power / app.py
hkayabilisim's picture
Upload 6 files
d01368b
import gradio as gd
import geopandas as gpd
import plotly.graph_objects as go
import networkx as nx
import rasterio
from scipy.stats import norm
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
def visualize_network(node_gson, edge_gson):
    """Build a map figure of the network from two uploaded GeoJSON files.

    Args:
        node_gson: Uploaded node file object; its ``name`` attribute is the
            path read with geopandas. The file must provide ``x_coord`` and
            ``y_coord`` columns.
        edge_gson: Uploaded edge file object; its ``name`` attribute is the
            path read with geopandas. Every line geometry is drawn in blue.

    Returns:
        A plotly ``Figure`` with one marker trace for the nodes and one line
        trace per edge part. An empty figure is returned when either file is
        missing.
    """
    if node_gson is None or edge_gson is None:
        return go.Figure()
    print(node_gson.name, edge_gson.name)

    node_df = gpd.read_file(node_gson.name)
    edge_df = gpd.read_file(edge_gson.name)

    fig = go.Figure(
        go.Scattermapbox(
            lat=node_df['y_coord'].tolist(),
            lon=node_df['x_coord'].tolist(),
            mode='markers',
            marker=go.scattermapbox.Marker(size=6),
        )
    )

    for geometry in edge_df.geometry:
        if geometry is None or geometry.is_empty:
            continue
        # Multi-part geometries expose their parts through ``geoms``; a
        # single-part line is treated as its own only part.
        parts = getattr(geometry, 'geoms', None)
        if parts is None:
            parts = [geometry]
        for part in parts:
            # Pass every vertex so lines with more than two points are drawn
            # in full instead of failing on a two-value unpack.
            line_lon, line_lat = part.coords.xy
            fig.add_trace(
                go.Scattermapbox(
                    lon=list(line_lon),
                    lat=list(line_lat),
                    mode='lines',
                    line=dict(width=3, color='blue'),
                )
            )

    # Fixed initial view of the map.
    fig.update_layout(
        mapbox_style="open-street-map",
        showlegend=False,
        mapbox=dict(
            center=go.layout.mapbox.Center(lat=35.22, lon=-97.4),
            zoom=10,
        ),
    )
    return fig
def generate_power_network(nodes, edges):
    """Build an undirected networkx graph from node and edge tables.

    Args:
        nodes: Table with the columns ``NODE_ID``, ``x_coord``, ``y_coord``,
            ``pwr_plant``, ``eq_vuln``, ``serv_area``, ``n_bldgs`` and
            ``income``; one graph node is added per row.
        edges: Table with the columns ``FROM_NODE``, ``TO_NODE``, ``EDGE_ID``
            and ``length``; one graph edge is added per row.

    Returns:
        The ``nx.Graph``. Loading is best-effort: when a table cannot be
        loaded a hint is printed and the graph built so far is returned.
    """
    # Initialize Graph
    G_power = nx.Graph()

    # Add nodes with node ID, coordinates and attributes. Columns are iterated
    # by position, so the tables do not need a default 0..n-1 index.
    try:
        node_rows = zip(
            nodes['NODE_ID'], nodes['x_coord'], nodes['y_coord'],
            nodes['pwr_plant'], nodes['eq_vuln'], nodes['serv_area'],
            nodes['n_bldgs'], nodes['income'],
        )
        for node_id, x, y, plant, vuln, area, n_bldgs, income in node_rows:
            G_power.add_node(
                node_id,
                pos=(x, y),
                power_plant=plant,
                eq_vuln_str=vuln,
                service_area=area,
                num_buildings=n_bldgs,
                income=income,
            )
    except (KeyError, TypeError, ValueError):
        print('Check node list and make sure all the fields are correctly named')

    # Add edges with ID and length information.
    try:
        edge_rows = zip(
            edges['FROM_NODE'], edges['TO_NODE'],
            edges['EDGE_ID'], edges['length'],
        )
        for from_node, to_node, edge_id, length in edge_rows:
            G_power.add_edge(from_node, to_node, ID=edge_id, length=length)
    except (KeyError, TypeError, ValueError):
        print('Check edge list and make sure all the fields are correctly named')

    return G_power
def power_hazard_impact(hazard_tiff, nodes, eq_vuln):
    """Sample the hazard raster at vulnerable nodes and compute p_failure.

    Args:
        hazard_tiff: Path (or other object accepted by ``rasterio.open``) of
            the hazard raster; band 1 is sampled.
        nodes: Table with ``eq_vuln`` and ``geometry`` columns. Only rows
            whose ``eq_vuln`` is not null are processed.
        eq_vuln: Fragility table with a ``vuln_string`` key column and the
            columns ``med_*`` / ``beta_*`` for the states ``Slight``,
            ``Moderate``, ``Extensive`` and ``Complete``.

    Returns:
        A copy of the vulnerable rows with the added columns ``pga_val``,
        ``med_*``, ``beta_*`` (lower-case state names) and ``p_failure``.
        ``p_failure`` is the normal CDF of
        ``(ln(pga_val) - ln(med_extensive)) / beta_extensive``.
    """
    print(type(hazard_tiff), type(nodes), type(eq_vuln))

    # Work on a copy so the new columns are not written onto a slice of
    # ``nodes``.
    vulnerable_components = nodes[nodes['eq_vuln'].notna()].copy()

    # The context manager closes the dataset; the band is read once instead
    # of once per point.
    with rasterio.open(hazard_tiff) as eq_map:
        band = eq_map.read(1)
        n_rows, n_cols = band.shape
        vulnerable_components_pga = []
        for point in vulnerable_components['geometry']:
            x = point.xy[0][0]
            y = point.xy[1][0]
            row, col = eq_map.index(x, y)
            # Explicit bounds check: a negative row/col would otherwise wrap
            # around and return a value from the opposite side of the raster.
            if 0 <= row < n_rows and 0 <= col < n_cols:
                pga_val = band[row, col]
            else:
                pga_val = 0
            vulnerable_components_pga.append(pga_val)
    vulnerable_components['pga_val'] = vulnerable_components_pga

    # Look up the fragility parameters of each component by its eq_vuln key.
    fragility = eq_vuln.set_index('vuln_string')
    for prefix in ('med', 'beta'):
        for state in ('Slight', 'Moderate', 'Extensive', 'Complete'):
            target = prefix + '_' + state.lower()
            vulnerable_components[target] = vulnerable_components['eq_vuln'].map(
                fragility[prefix + '_' + state])

    # log(0) is -inf, which the CDF maps to a probability of 0; silence the
    # divide-by-zero warning for points sampled outside the raster.
    with np.errstate(divide='ignore'):
        p_failure = norm.cdf(
            (np.log(vulnerable_components['pga_val'])
             - np.log(vulnerable_components['med_extensive']))
            / vulnerable_components['beta_extensive'])
    vulnerable_components['p_failure'] = p_failure
    return vulnerable_components
def find_negative_columns(matrix):
    """Return the indices of the columns whose entries are all negative.

    Args:
        matrix: Array with at least two dimensions; columns are taken along
            the second axis.

    Returns:
        List of column indices, in ascending order, for which every entry of
        the column is below zero.
    """
    column_count = matrix.shape[1]
    return [
        idx
        for idx in range(column_count)
        if np.all(matrix[:, idx] < 0)
    ]
def pwr_eq_impact_analysis(G_power, nodes, vulnerable_components):
    """Count the buildings cut off from every power plant after the hazard.

    Components with ``p_failure >= 0.5`` are removed from a copy of the
    graph. A destination node (``n_bldgs > 0``) is impacted when no source
    node (``pwr_plant == 1``) that is still in the damaged graph can reach it.

    Args:
        G_power: networkx graph whose nodes are the ``NODE_ID`` values.
        nodes: Table with ``NODE_ID``, ``pwr_plant``, ``n_bldgs`` and
            (optionally) ``income`` columns.
        vulnerable_components: Table with ``NODE_ID`` and ``p_failure``
            columns.

    Returns:
        ``(total, by_category)`` where ``total`` is the number of impacted
        buildings and ``by_category`` lists the impacted buildings for the
        ``income`` values ``low``, ``medium``, ``high``, ``commercial`` and
        ``industrial``, in that order. All five are 0 when ``nodes`` has no
        ``income`` column.
    """
    # Most likely scenario
    failed = vulnerable_components['p_failure'] >= 0.5
    removal_components = list(
        vulnerable_components.loc[failed.to_numpy(), 'NODE_ID'].astype(int))
    G_power_dmg = G_power.copy()
    G_power_dmg.remove_nodes_from(removal_components)

    source_nodes = nodes[nodes['pwr_plant'] == 1]
    source_nodes_idx = list(source_nodes['NODE_ID'].astype(int))
    destination_nodes = nodes[nodes['n_bldgs'] > 0]
    destination_idx = destination_nodes['NODE_ID'].astype(int)

    # Only reachability matters, so collect everything each surviving source
    # can reach once instead of running a shortest-path search per pair.
    served = set()
    for source_id in source_nodes_idx:
        if source_id not in G_power_dmg:
            # The plant itself failed or was never part of the graph.
            continue
        served.add(source_id)
        served |= nx.descendants(G_power_dmg, source_id)

    # Find impacted destinations: those that no source can reach.
    unserved = ~destination_idx.isin(served).to_numpy()
    nodes_impacted = destination_nodes[unserved]

    # Total buildings
    tot_bldgs_impacted = int(nodes_impacted['n_bldgs'].sum())

    # Divide by income/use levels
    categories = ('low', 'medium', 'high', 'commercial', 'industrial')
    if 'income' in nodes_impacted.columns:
        impacted_by_category = [
            int(nodes_impacted.loc[
                (nodes_impacted['income'] == category).to_numpy(),
                'n_bldgs'].sum())
            for category in categories
        ]
    else:
        impacted_by_category = [0] * len(categories)

    return tot_bldgs_impacted, impacted_by_category
def run(node_gson, edge_gson, eq_vuln, hazard_tiff="pga_eq_centerville.tif"):
    """Run the power-network impact analysis and tabulate impacted buildings.

    Args:
        node_gson: Uploaded node file object; its ``name`` attribute is the
            path read with geopandas.
        edge_gson: Uploaded edge file object; its ``name`` attribute is the
            path read with geopandas.
        eq_vuln: Fragility table passed on to ``power_hazard_impact``.
        hazard_tiff: Path of the hazard raster; defaults to the raster that
            was previously hard-coded.

    Returns:
        A one-row ``pandas.DataFrame`` with the impacted building counts in
        the columns ``Low Income``, ``Medium Income``, ``High Income``,
        ``Commercial`` and ``Industrial``. When any input is missing, an
        empty frame with those columns is returned instead of failing.
    """
    pwr_labels = ['Low Income', 'Medium Income', 'High Income', 'Commercial', 'Industrial']

    # Nothing to analyse until both files and the fragility table are given.
    if node_gson is None or edge_gson is None or eq_vuln is None:
        return pd.DataFrame(columns=pwr_labels)

    nodes = gpd.read_file(node_gson.name)
    edges = gpd.read_file(edge_gson.name)
    G_power = generate_power_network(nodes, edges)
    power_vulnerable_components = power_hazard_impact(hazard_tiff, nodes, eq_vuln)

    # The total is not shown in the result table; only the per-category
    # counts are.
    _, pwr_data = pwr_eq_impact_analysis(G_power, nodes, power_vulnerable_components)
    pwr_data = list(pwr_data)

    print(pwr_data, pwr_labels)
    df = pd.DataFrame([pwr_data], columns=pwr_labels)
    print(df)
    return df
# Gradio user interface.
# NOTE(review): the original indentation was lost; the nesting of the layout
# containers below is reconstructed and should be confirmed.
with gd.Blocks() as demo:
    with gd.Column():
        gd.Markdown("""
* If you don't have data, select Examples below to load existing data
* Click Visualize to see the power network on the map
* Run Analysis to see the number of buildings affected
* Earthquake TIFF (pga) is hard-coded
* Network information is loaded via geojson files.
""")
        # Fragility table handed to run() as ``eq_vuln``.
        eq_vuln = gd.DataFrame()
        with gd.Row():
            node_gson = gd.File()
            edge_gson = gd.File()
        show_button = gd.Button("Visualize Network")
        network_plot = gd.Plot(go.Figure())
        run_button = gd.Button("Run Analysis")
        pwr_labels = ['Low Income', 'Medium Income', 'High Income', 'Commercial', 'Industrial']
        result = gd.DataFrame(headers=pwr_labels)

    # Event wiring: each button feeds the uploaded inputs to its handler.
    show_button.click(visualize_network, inputs=[node_gson, edge_gson], outputs=network_plot)
    run_button.click(run, inputs=[node_gson, edge_gson, eq_vuln], outputs=result)

    # The example row is ordered to match ``inputs``: edges, nodes, fragility.
    gd.Examples(
        label="Examples",
        examples=[["epn_links.geojson", "epn_nodes.geojson", "eq_pwr_vulnerability.csv"]],
        inputs=[edge_gson, node_gson, eq_vuln])

if __name__ == "__main__":
    demo.launch()