Spaces:
Sleeping
Sleeping
| import gradio as gd | |
| import geopandas as gpd | |
| import plotly.graph_objects as go | |
| import networkx as nx | |
| import rasterio | |
| from scipy.stats import norm | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import pandas as pd | |
| def visualize_network(node_gson, edge_gson): | |
| if node_gson == None or edge_gson == None: | |
| return go.Figure() | |
| print(node_gson.name, edge_gson.name) | |
| node_df = gpd.read_file(node_gson.name) | |
| edge_df = gpd.read_file(edge_gson.name) | |
| lon = node_df['x_coord'].tolist() | |
| lat = node_df['y_coord'].tolist() | |
| fig = go.Figure( | |
| go.Scattermapbox( | |
| lat=lat, | |
| lon=lon, | |
| mode='markers', | |
| marker=go.scattermapbox.Marker( | |
| size=6 | |
| ), | |
| )) | |
| for i in edge_df.geometry: | |
| lon, lat = i.geoms[0].coords.xy | |
| start_lon, end_lon = lon | |
| start_lat, end_lat = lat | |
| fig.add_trace( | |
| go.Scattermapbox( | |
| lon = [start_lon, end_lon], | |
| lat = [start_lat, end_lat], | |
| mode = 'lines', | |
| line = dict(width = 3,color = 'blue'), | |
| ) | |
| ) | |
| fig.update_layout( | |
| mapbox_style="open-street-map", | |
| showlegend = False, | |
| mapbox=dict( | |
| center=go.layout.mapbox.Center( | |
| lat=35.22, | |
| lon=-97.4 | |
| ), | |
| zoom=10 | |
| ), | |
| ) | |
| return fig | |
| def generate_power_network(nodes,edges): | |
| # Initialize Graph | |
| G_power = nx.Graph() | |
| try: | |
| # Add Nodes with Node ID and Coordinates | |
| for i in range(len(nodes)): | |
| G_power.add_nodes_from([nodes['NODE_ID'][i]],pos=(nodes['x_coord'][i],nodes['y_coord'][i]), | |
| power_plant=nodes['pwr_plant'][i],eq_vuln_str = nodes['eq_vuln'][i], | |
| service_area=nodes['serv_area'][i],num_buildings=nodes['n_bldgs'][i], | |
| income=nodes['income'][i]) | |
| del i | |
| except: | |
| print('Check node list and make sure all the fields are correctly named') | |
| # Add Edges with length, and road type information | |
| try: | |
| for i in range(len(edges)): | |
| e = (edges['FROM_NODE'][i],edges['TO_NODE'][i]) | |
| G_power.add_edge(*e,ID=edges['EDGE_ID'][i],length=edges['length'][i]) | |
| except: | |
| print('Check edge list and make sure all the fields are correctly named') | |
| return G_power | |
| def power_hazard_impact(hazard_tiff, nodes, eq_vuln): | |
| print(type(hazard_tiff), type(nodes), type(eq_vuln)) | |
| eq_map = rasterio.open(hazard_tiff) | |
| vulnerable_components = nodes[nodes['eq_vuln'].notna()] | |
| vulnerable_components_pga = [] | |
| for point in vulnerable_components['geometry']: | |
| x = point.xy[0][0] | |
| y = point.xy[1][0] | |
| row, col = eq_map.index(x,y) | |
| try: | |
| pga_val = eq_map.read(1)[row,col] | |
| except: | |
| pga_val = 0 | |
| vulnerable_components_pga.append(pga_val) | |
| vulnerable_components['pga_val'] = vulnerable_components_pga | |
| vulnerable_components['med_slight'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['med_Slight']) | |
| vulnerable_components['med_moderate'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['med_Moderate']) | |
| vulnerable_components['med_extensive'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['med_Extensive']) | |
| vulnerable_components['med_complete'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['med_Complete']) | |
| vulnerable_components['beta_slight'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['beta_Slight']) | |
| vulnerable_components['beta_moderate'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['beta_Moderate']) | |
| vulnerable_components['beta_extensive'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['beta_Extensive']) | |
| vulnerable_components['beta_complete'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['beta_Complete']) | |
| p_failure = norm.cdf((np.log(vulnerable_components['pga_val'])-np.log(vulnerable_components['med_extensive']))/vulnerable_components['beta_extensive']) | |
| vulnerable_components['p_failure'] = p_failure | |
| return vulnerable_components | |
| def find_negative_columns(matrix): | |
| # Initialize empty list to store column indices | |
| negative_columns = [] | |
| # Loop through each column | |
| for col in range(matrix.shape[1]): | |
| # Check if both elements in the column are negative | |
| if np.all(matrix[:, col] < 0): | |
| negative_columns.append(col) | |
| # Return the list of column indices with negative values in both rows | |
| return negative_columns | |
| def pwr_eq_impact_analysis(G_power,nodes,vulnerable_components): | |
| # Most likely scenario | |
| removal_coponents = list(vulnerable_components[vulnerable_components['p_failure']>=0.5]['NODE_ID'].astype(int)) | |
| G_power_dmg = G_power.copy() | |
| G_power_dmg.remove_nodes_from(removal_coponents) | |
| source_nodes = nodes[nodes['pwr_plant']==1] | |
| source_nodes_idx = list(source_nodes['NODE_ID'].astype(int)) | |
| destination_nodes = nodes[nodes['n_bldgs']>0] | |
| destination_idx = list(destination_nodes['NODE_ID'].astype(int)) | |
| tmp_dist= [] | |
| for i in range(len(source_nodes_idx)): | |
| source_i = source_nodes_idx[i] | |
| for j in range(len(destination_idx)): | |
| dest_j = destination_idx[j] | |
| try: | |
| path_tmp = nx.shortest_path_length(G_power_dmg, source=source_i, target=dest_j, weight='length', method='dijkstra') | |
| tmp_dist.append(path_tmp) | |
| except: | |
| tmp_dist.append(-9999) | |
| dist_array = np.array(tmp_dist).reshape((len(source_nodes_idx),len(destination_idx))) | |
| idx_impacted = find_negative_columns(dist_array) | |
| # Find impacted based on index | |
| nodes_impacted = destination_nodes.iloc[idx_impacted] | |
| # Total buildings/total population | |
| tot_bldgs_impacted = int(sum(nodes_impacted['n_bldgs'])) | |
| # Divide by income/use levels | |
| try: | |
| low_income_impacted = int(sum(nodes_impacted['n_bldgs'][nodes_impacted['income']=='low'])) | |
| except: | |
| low_income_impacted = 0 | |
| try: | |
| med_income_impacted = int(sum(nodes_impacted['n_bldgs'][nodes_impacted['income']=='medium'])) | |
| except: | |
| med_income_impacted = 0 | |
| try: | |
| high_income_impacted = int(sum(nodes_impacted['n_bldgs'][nodes_impacted['income']=='high'])) | |
| except: | |
| high_income_impacted = 0 | |
| try: | |
| commercial_impacted = int(sum(nodes_impacted['n_bldgs'][nodes_impacted['income']=='commercial'])) | |
| except: | |
| commercial_impacted = 0 | |
| try: | |
| industrial_impacted = int(sum(nodes_impacted['n_bldgs'][nodes_impacted['income']=='industrial'])) | |
| except: | |
| industrial_impacted = 0 | |
| return tot_bldgs_impacted, [low_income_impacted,med_income_impacted,high_income_impacted,commercial_impacted,industrial_impacted] | |
| def run(node_gson, edge_gson, eq_vuln): | |
| nodes = gpd.read_file(node_gson.name) | |
| edges = gpd.read_file(edge_gson.name) | |
| G_power = generate_power_network(nodes,edges) | |
| power_vulnerable_components = power_hazard_impact("pga_eq_centerville.tif", | |
| nodes, eq_vuln) | |
| pwr_tot_bldgs_impacted, [pwr_low_income_impacted,pwr_med_income_impacted,pwr_high_income_impacted,pwr_commercial_impacted,pwr_industrial_impacted] = \ | |
| pwr_eq_impact_analysis(G_power,nodes,power_vulnerable_components) | |
| pwr_data = [pwr_low_income_impacted,pwr_med_income_impacted,pwr_high_income_impacted,pwr_commercial_impacted,pwr_industrial_impacted] | |
| pwr_labels = ['Low Income', 'Medium Income', 'High Income', 'Commercial', 'Industrial'] | |
| print(pwr_data, pwr_labels) | |
| df = pd.DataFrame([pwr_data],columns=pwr_labels) | |
| print(df) | |
| return df | |
| with gd.Blocks() as demo: | |
| with gd.Column(): | |
| gd.Markdown(""" | |
| * If you don't have data, select Examples below to load existing data | |
| * Click Visualize to see the power network on the map | |
| * Run Analysis to see the number of buildings effected | |
| * Earthquake TIFF (pga) is hard-coded | |
| * Network information is loaded via geogson files. | |
| """) | |
| eq_vuln = gd.DataFrame() | |
| with gd.Row(): | |
| node_gson = gd.File() | |
| edge_gson = gd.File() | |
| show_button = gd.Button("Visualize Network") | |
| network_plot = gd.Plot(go.Figure()) | |
| run_button = gd.Button("Run Analysis") | |
| pwr_labels = ['Low Income', 'Medium Income', 'High Income', 'Commercial', 'Industrial'] | |
| result = gd.DataFrame(headers=pwr_labels) | |
| show_button.click(visualize_network, inputs=[node_gson, edge_gson], outputs=network_plot) | |
| run_button.click(run, inputs=[node_gson,edge_gson, eq_vuln], outputs=result) | |
| gd.Examples( | |
| label="Examples", | |
| examples=[["epn_links.geojson", "epn_nodes.geojson","eq_pwr_vulnerability.csv"]], | |
| inputs=[edge_gson, node_gson, eq_vuln]) | |
| if __name__ == "__main__": | |
| demo.launch() |