import gradio as gd import geopandas as gpd import plotly.graph_objects as go import networkx as nx import rasterio from scipy.stats import norm import numpy as np import matplotlib.pyplot as plt import pandas as pd def visualize_network(node_gson, edge_gson): if node_gson == None or edge_gson == None: return go.Figure() print(node_gson.name, edge_gson.name) node_df = gpd.read_file(node_gson.name) edge_df = gpd.read_file(edge_gson.name) lon = node_df['x_coord'].tolist() lat = node_df['y_coord'].tolist() fig = go.Figure( go.Scattermapbox( lat=lat, lon=lon, mode='markers', marker=go.scattermapbox.Marker( size=6 ), )) for i in edge_df.geometry: lon, lat = i.geoms[0].coords.xy start_lon, end_lon = lon start_lat, end_lat = lat fig.add_trace( go.Scattermapbox( lon = [start_lon, end_lon], lat = [start_lat, end_lat], mode = 'lines', line = dict(width = 3,color = 'blue'), ) ) fig.update_layout( mapbox_style="open-street-map", showlegend = False, mapbox=dict( center=go.layout.mapbox.Center( lat=35.22, lon=-97.4 ), zoom=10 ), ) return fig def generate_power_network(nodes,edges): # Initialize Graph G_power = nx.Graph() try: # Add Nodes with Node ID and Coordinates for i in range(len(nodes)): G_power.add_nodes_from([nodes['NODE_ID'][i]],pos=(nodes['x_coord'][i],nodes['y_coord'][i]), power_plant=nodes['pwr_plant'][i],eq_vuln_str = nodes['eq_vuln'][i], service_area=nodes['serv_area'][i],num_buildings=nodes['n_bldgs'][i], income=nodes['income'][i]) del i except: print('Check node list and make sure all the fields are correctly named') # Add Edges with length, and road type information try: for i in range(len(edges)): e = (edges['FROM_NODE'][i],edges['TO_NODE'][i]) G_power.add_edge(*e,ID=edges['EDGE_ID'][i],length=edges['length'][i]) except: print('Check edge list and make sure all the fields are correctly named') return G_power def power_hazard_impact(hazard_tiff, nodes, eq_vuln): print(type(hazard_tiff), type(nodes), type(eq_vuln)) eq_map = rasterio.open(hazard_tiff) vulnerable_components = nodes[nodes['eq_vuln'].notna()] vulnerable_components_pga = [] for point in vulnerable_components['geometry']: x = point.xy[0][0] y = point.xy[1][0] row, col = eq_map.index(x,y) try: pga_val = eq_map.read(1)[row,col] except: pga_val = 0 vulnerable_components_pga.append(pga_val) vulnerable_components['pga_val'] = vulnerable_components_pga vulnerable_components['med_slight'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['med_Slight']) vulnerable_components['med_moderate'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['med_Moderate']) vulnerable_components['med_extensive'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['med_Extensive']) vulnerable_components['med_complete'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['med_Complete']) vulnerable_components['beta_slight'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['beta_Slight']) vulnerable_components['beta_moderate'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['beta_Moderate']) vulnerable_components['beta_extensive'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['beta_Extensive']) vulnerable_components['beta_complete'] = vulnerable_components['eq_vuln'].map(eq_vuln.set_index('vuln_string')['beta_Complete']) p_failure = norm.cdf((np.log(vulnerable_components['pga_val'])-np.log(vulnerable_components['med_extensive']))/vulnerable_components['beta_extensive']) vulnerable_components['p_failure'] = p_failure return vulnerable_components def find_negative_columns(matrix): # Initialize empty list to store column indices negative_columns = [] # Loop through each column for col in range(matrix.shape[1]): # Check if both elements in the column are negative if np.all(matrix[:, col] < 0): negative_columns.append(col) # Return the list of column indices with negative values in both rows return negative_columns def pwr_eq_impact_analysis(G_power,nodes,vulnerable_components): # Most likely scenario removal_coponents = list(vulnerable_components[vulnerable_components['p_failure']>=0.5]['NODE_ID'].astype(int)) G_power_dmg = G_power.copy() G_power_dmg.remove_nodes_from(removal_coponents) source_nodes = nodes[nodes['pwr_plant']==1] source_nodes_idx = list(source_nodes['NODE_ID'].astype(int)) destination_nodes = nodes[nodes['n_bldgs']>0] destination_idx = list(destination_nodes['NODE_ID'].astype(int)) tmp_dist= [] for i in range(len(source_nodes_idx)): source_i = source_nodes_idx[i] for j in range(len(destination_idx)): dest_j = destination_idx[j] try: path_tmp = nx.shortest_path_length(G_power_dmg, source=source_i, target=dest_j, weight='length', method='dijkstra') tmp_dist.append(path_tmp) except: tmp_dist.append(-9999) dist_array = np.array(tmp_dist).reshape((len(source_nodes_idx),len(destination_idx))) idx_impacted = find_negative_columns(dist_array) # Find impacted based on index nodes_impacted = destination_nodes.iloc[idx_impacted] # Total buildings/total population tot_bldgs_impacted = int(sum(nodes_impacted['n_bldgs'])) # Divide by income/use levels try: low_income_impacted = int(sum(nodes_impacted['n_bldgs'][nodes_impacted['income']=='low'])) except: low_income_impacted = 0 try: med_income_impacted = int(sum(nodes_impacted['n_bldgs'][nodes_impacted['income']=='medium'])) except: med_income_impacted = 0 try: high_income_impacted = int(sum(nodes_impacted['n_bldgs'][nodes_impacted['income']=='high'])) except: high_income_impacted = 0 try: commercial_impacted = int(sum(nodes_impacted['n_bldgs'][nodes_impacted['income']=='commercial'])) except: commercial_impacted = 0 try: industrial_impacted = int(sum(nodes_impacted['n_bldgs'][nodes_impacted['income']=='industrial'])) except: industrial_impacted = 0 return tot_bldgs_impacted, [low_income_impacted,med_income_impacted,high_income_impacted,commercial_impacted,industrial_impacted] def run(node_gson, edge_gson, eq_vuln): nodes = gpd.read_file(node_gson.name) edges = gpd.read_file(edge_gson.name) G_power = generate_power_network(nodes,edges) power_vulnerable_components = power_hazard_impact("pga_eq_centerville.tif", nodes, eq_vuln) pwr_tot_bldgs_impacted, [pwr_low_income_impacted,pwr_med_income_impacted,pwr_high_income_impacted,pwr_commercial_impacted,pwr_industrial_impacted] = \ pwr_eq_impact_analysis(G_power,nodes,power_vulnerable_components) pwr_data = [pwr_low_income_impacted,pwr_med_income_impacted,pwr_high_income_impacted,pwr_commercial_impacted,pwr_industrial_impacted] pwr_labels = ['Low Income', 'Medium Income', 'High Income', 'Commercial', 'Industrial'] print(pwr_data, pwr_labels) df = pd.DataFrame([pwr_data],columns=pwr_labels) print(df) return df with gd.Blocks() as demo: with gd.Column(): gd.Markdown(""" * If you don't have data, select Examples below to load existing data * Click Visualize to see the power network on the map * Run Analysis to see the number of buildings effected * Earthquake TIFF (pga) is hard-coded * Network information is loaded via geogson files. """) eq_vuln = gd.DataFrame() with gd.Row(): node_gson = gd.File() edge_gson = gd.File() show_button = gd.Button("Visualize Network") network_plot = gd.Plot(go.Figure()) run_button = gd.Button("Run Analysis") pwr_labels = ['Low Income', 'Medium Income', 'High Income', 'Commercial', 'Industrial'] result = gd.DataFrame(headers=pwr_labels) show_button.click(visualize_network, inputs=[node_gson, edge_gson], outputs=network_plot) run_button.click(run, inputs=[node_gson,edge_gson, eq_vuln], outputs=result) gd.Examples( label="Examples", examples=[["epn_links.geojson", "epn_nodes.geojson","eq_pwr_vulnerability.csv"]], inputs=[edge_gson, node_gson, eq_vuln]) if __name__ == "__main__": demo.launch()