Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import wntr | |
| import tempfile | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import traceback | |
| import io | |
| import os | |
| import base64 | |
| import json | |
def simulate(inp_file, sim_type):
    """Run a hydraulic simulation on an uploaded EPANET ``.inp`` file.

    Args:
        inp_file: Raw bytes of the ``.inp`` file (the upload component is
            declared with ``type="binary"``).
        sim_type: ``"EPANET"`` (case-insensitive) selects
            ``wntr.sim.EpanetSimulator``; any other value selects
            ``wntr.sim.WNTRSimulator``.

    Returns:
        A ``(pressure_summary, demand_summary)`` tuple. Each summary holds one
        ``"<node>: <time-averaged value> <unit>"`` line per node. On failure
        the first element carries the traceback and the second is empty.
    """
    inp_path = None
    try:
        if inp_file is None:
            raise ValueError("No .inp file was uploaded.")

        # WNTR loads models from a path, so the upload is spooled to disk.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".inp") as tmp:
            inp_path = tmp.name
            tmp.write(inp_file)

        wn = wntr.network.WaterNetworkModel(inp_path)

        if sim_type.upper() == "EPANET":
            # EPANET writes <prefix>.inp/.rpt/.bin scratch files. Keeping them
            # in a throwaway directory avoids littering the working directory
            # and collisions between concurrent requests.
            with tempfile.TemporaryDirectory() as workdir:
                results = wntr.sim.EpanetSimulator(wn).run_sim(
                    file_prefix=os.path.join(workdir, "epanet")
                )
        else:
            results = wntr.sim.WNTRSimulator(wn).run_sim()

        pressure = results.node["pressure"].mean().to_dict()
        # WNTR reports demand in SI units (m^3/s); convert so the values
        # match the "L/s" label used below.
        demand = (results.node["demand"].mean() * 1000.0).to_dict()

        pressure_summary = "\n".join(f"{k}: {v:.2f} m" for k, v in pressure.items())
        demand_summary = "\n".join(f"{k}: {v:.2f} L/s" for k, v in demand.items())
        return pressure_summary, demand_summary
    except Exception:
        return f"Simulation error:\n{traceback.format_exc()}", ""
    finally:
        # The spooled upload is only needed while the model is being built.
        if inp_path is not None and os.path.exists(inp_path):
            os.remove(inp_path)
def simulate_disaster(inp_file, failure_element, failure_type):
    """Simulate the network after a single element failure.

    Args:
        inp_file: Raw bytes of the EPANET ``.inp`` file.
        failure_element: Name of the pipe (for ``"pipe_break"``) or node
            (for ``"node_closure"``) that fails.
        failure_type: ``"pipe_break"`` or ``"node_closure"``.

    Returns:
        A ``(pressure_summary, demand_summary)`` tuple of newline-separated
        per-node time averages. On failure the first element is an error
        message and the second is empty.
    """
    inp_path = None
    try:
        if inp_file is None:
            raise ValueError("No .inp file was uploaded.")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".inp") as tmp:
            inp_path = tmp.name
            tmp.write(inp_file)

        wn = wntr.network.WaterNetworkModel(inp_path)

        if failure_type == "pipe_break":
            # split_pipe requires names for the new pipe segment and for the
            # junction inserted at the split point.
            # NOTE(review): splitting alone does not add a leak at the new
            # junction; confirm whether a leak should be modelled as well.
            wn = wntr.morph.split_pipe(
                wn,
                failure_element,
                f"{failure_element}_B",
                f"{failure_element}_break_node",
            )
        elif failure_type == "node_closure":
            # NOTE(review): presumably intended to take the node out of
            # service; confirm that WNTR honours this attribute.
            wn.get_node(failure_element).is_isolated = True
        else:
            # Previously an unset/unknown type silently ran a baseline
            # simulation, which looked like a successful failure scenario.
            raise ValueError(f"Unknown failure type: {failure_type!r}")

        results = wntr.sim.WNTRSimulator(wn).run_sim()

        pressure = results.node["pressure"].mean().to_dict()
        # WNTR reports demand in SI units (m^3/s); convert so the values
        # match the "L/s" label used below.
        demand = (results.node["demand"].mean() * 1000.0).to_dict()

        pressure_summary = "\n".join(f"{k}: {v:.2f} m" for k, v in pressure.items())
        demand_summary = "\n".join(f"{k}: {v:.2f} L/s" for k, v in demand.items())
        return pressure_summary, demand_summary
    except Exception as e:
        return f"Disaster simulation error: {str(e)}", ""
    finally:
        # Remove the spooled upload; it is only needed to build the model.
        if inp_path is not None and os.path.exists(inp_path):
            os.remove(inp_path)
def simulate_quality(inp_file, quality_type, trace_node):
    """Run an EPANET water-quality simulation and summarise node quality.

    Args:
        inp_file: Raw bytes of the EPANET ``.inp`` file.
        quality_type: Quality analysis to run. The values offered by the UI
            (``"AGE"``, ``"CHEM"``, ``"trace"``) are accepted
            case-insensitively.
        trace_node: Name of the source node; only applied for trace runs.

    Returns:
        Newline-separated ``"<node>: <time-averaged quality>"`` lines, or an
        error message starting with ``"Quality simulation error:"``.
    """
    inp_path = None
    try:
        if inp_file is None:
            raise ValueError("No .inp file was uploaded.")
        if not quality_type:
            raise ValueError("No quality type was selected.")

        # Translate the UI labels to the names WNTR expects
        # ("CHEMICAL", "AGE", "TRACE").
        parameter = str(quality_type).upper()
        if parameter == "CHEM":
            parameter = "CHEMICAL"

        with tempfile.NamedTemporaryFile(delete=False, suffix=".inp") as tmp:
            inp_path = tmp.name
            tmp.write(inp_file)

        wn = wntr.network.WaterNetworkModel(inp_path)

        quality_options = wn.options.quality
        if hasattr(quality_options, "parameter"):
            quality_options.parameter = parameter
        else:
            # Presumably an older WNTR release that exposes this as ``mode``.
            quality_options.mode = parameter
        if parameter == "TRACE" and trace_node:
            quality_options.trace_node = trace_node

        # Keep EPANET's scratch files out of the working directory.
        with tempfile.TemporaryDirectory() as workdir:
            results = wntr.sim.EpanetSimulator(wn).run_sim(
                file_prefix=os.path.join(workdir, "quality")
            )

        quality = results.node["quality"].mean().to_dict()
        summary = "\n".join(f"{k}: {v:.2f}" for k, v in quality.items())
        return summary
    except Exception as e:
        return f"Quality simulation error: {str(e)}"
    finally:
        # Remove the spooled upload; it is only needed to build the model.
        if inp_path is not None and os.path.exists(inp_path):
            os.remove(inp_path)
def _parse_summary_table(text):
    """Parse ``"<node>: <value> [unit]"`` lines into a Node/Value DataFrame."""
    rows = []
    for line in (text or "").splitlines():
        if not line.strip():
            continue
        node, separator, remainder = line.partition(": ")
        fields = remainder.split()
        if not separator or not fields:
            raise ValueError(f"Malformed summary line: {line!r}")
        # The summaries carry a unit suffix ("12.34 m", "0.50 L/s"); only the
        # leading token is numeric.
        rows.append((node, float(fields[0])))
    return pd.DataFrame(rows, columns=["Node", "Value"])


def analyze_results(analysis_type, pressure_str, demand_str):
    """Derive a simple metric from pressure and demand summaries.

    Args:
        analysis_type: ``"Resilience"`` or ``"Morphology"``.
        pressure_str: Newline-separated ``"<node>: <value> [unit]"`` lines.
        demand_str: Same format as ``pressure_str``, holding demands.

    Returns:
        A one-line result, ``"Unknown analysis type."`` for any other
        ``analysis_type``, or a message starting with ``"Analysis error:"``
        when the summaries cannot be parsed.
    """
    try:
        pressure = _parse_summary_table(pressure_str)
        demand = _parse_summary_table(demand_str)

        if analysis_type == "Resilience":
            if pressure.empty or demand.empty:
                raise ValueError("Run a simulation first; no results to analyze.")
            avg_pressure = pressure["Value"].mean()
            avg_demand = demand["Value"].mean()
            resilience_index = round((avg_pressure / 50) * (avg_demand / 1.0), 3)
            return f"Estimated resilience index: {resilience_index}"
        elif analysis_type == "Morphology":
            return f"The network has {len(pressure)} active nodes."
        else:
            return "Unknown analysis type."
    except Exception as e:
        return f"Analysis error: {str(e)}"
def simulate_economic_loss(inp_file):
    """Estimate a demand-shortfall "economic loss" figure for the network.

    Each node's time-averaged demand is compared with the largest
    time-averaged demand in the network; the per-node shortfalls (in percent)
    are summed.

    Args:
        inp_file: Raw bytes of the EPANET ``.inp`` file.

    Returns:
        ``"Total economic loss: <value>%"`` or a message starting with
        ``"Economic loss error:"``.
    """
    inp_path = None
    try:
        if inp_file is None:
            raise ValueError("No .inp file was uploaded.")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".inp") as tmp:
            inp_path = tmp.name
            tmp.write(inp_file)

        wn = wntr.network.WaterNetworkModel(inp_path)
        results = wntr.sim.WNTRSimulator(wn).run_sim()

        node_demand = results.node["demand"].mean()
        peak_demand = node_demand.max()
        # A zero, negative or NaN peak would turn the ratio below into
        # inf/NaN and print a meaningless "nan%".
        if not peak_demand > 0:
            raise ValueError("Peak node demand is not positive; loss is undefined.")

        # NOTE(review): this sums per-node percentages, so the total can
        # exceed 100 — confirm whether a mean was intended.
        loss_by_node = (1.0 - node_demand / peak_demand) * 100
        total_loss = loss_by_node.sum()
        return f"Total economic loss: {total_loss:.2f}%"
    except Exception as e:
        return f"Economic loss error: {str(e)}"
    finally:
        # Remove the spooled upload; it is only needed to build the model.
        if inp_path is not None and os.path.exists(inp_path):
            os.remove(inp_path)
def simulate_criticality(inp_file):
    """Close pipes one at a time and report the total delivered demand.

    Only the first five pipes of the model are assessed. Each scenario
    reloads the model from disk so closures do not accumulate.

    Args:
        inp_file: Raw bytes of the EPANET ``.inp`` file.

    Returns:
        A JSON object mapping pipe name to the sum of time-averaged node
        demands with that pipe closed, or a message starting with
        ``"Criticality error:"``.
    """
    inp_path = None
    try:
        if inp_file is None:
            raise ValueError("No .inp file was uploaded.")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".inp") as tmp:
            inp_path = tmp.name
            tmp.write(inp_file)

        wn = wntr.network.WaterNetworkModel(inp_path)
        results = {}
        for pipe in wn.pipe_name_list[:5]:
            wn_temp = wntr.network.WaterNetworkModel(inp_path)
            # Close the pipe through its initial status, using the enum
            # rather than a raw string, so the closure is the state the
            # simulation starts from.
            wn_temp.get_link(pipe).initial_status = wntr.network.LinkStatus.Closed
            res = wntr.sim.WNTRSimulator(wn_temp).run_sim()
            demand = res.node["demand"].mean().sum()
            # Plain float keeps json.dumps independent of numpy scalar types.
            results[pipe] = round(float(demand), 3)
        return json.dumps(results, indent=2)
    except Exception as e:
        return f"Criticality error: {str(e)}"
    finally:
        # The file is re-read for every scenario, so remove it only at the end.
        if inp_path is not None and os.path.exists(inp_path):
            os.remove(inp_path)
def plot_network(inp_file):
    """Render the network layout as a base64-encoded PNG data URI.

    Args:
        inp_file: Raw bytes of the EPANET ``.inp`` file.

    Returns:
        A ``"data:image/png;base64,..."`` string, or a message starting with
        ``"Plot error:"``.
    """
    # NOTE(review): the result feeds a gr.Image output; confirm that the
    # installed Gradio version accepts a data URI (and the error text) there.
    inp_path = None
    fig = None
    try:
        if inp_file is None:
            raise ValueError("No .inp file was uploaded.")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".inp") as tmp:
            inp_path = tmp.name
            tmp.write(inp_file)

        wn = wntr.network.WaterNetworkModel(inp_path)

        fig, ax = plt.subplots()
        # Network drawing lives in wntr.graphics; the model object itself
        # has no draw_network method.
        wntr.graphics.plot_network(wn, ax=ax)

        buf = io.BytesIO()
        fig.savefig(buf, format="png")
        image_data = base64.b64encode(buf.getvalue()).decode("utf-8")
        return f"data:image/png;base64,{image_data}"
    except Exception as e:
        return f"Plot error: {str(e)}"
    finally:
        # Close the figure even on failure so figures do not pile up in a
        # long-running server process.
        if fig is not None:
            plt.close(fig)
        if inp_path is not None and os.path.exists(inp_path):
            os.remove(inp_path)
# Gradio front end: one tab per analysis. The upload widget created in the
# "Simulate" tab is reused as the input of every other tab's callback.
with gr.Blocks() as demo:
    gr.Markdown("# 💧 WNTR Simulation & Analysis Tool")

    with gr.Tab("Simulate"):
        file_input = gr.File(label="Upload .inp File", type="binary")
        sim_type = gr.Radio(["EPANET", "WNTR"], label="Simulation Type", value="EPANET")
        pressure_output = gr.Textbox(label="Pressure Summary")
        demand_output = gr.Textbox(label="Demand Summary")
        run_button = gr.Button("Run Simulation")
        run_button.click(
            fn=simulate,
            inputs=[file_input, sim_type],
            outputs=[pressure_output, demand_output],
        )

    with gr.Tab("Disaster"):
        failure_element = gr.Textbox(label="Element ID")
        failure_type = gr.Dropdown(["pipe_break", "node_closure"], label="Failure Type")
        d_pressure = gr.Textbox(label="Pressure")
        d_demand = gr.Textbox(label="Demand")
        disaster_button = gr.Button("Simulate Disaster")
        disaster_button.click(
            fn=simulate_disaster,
            inputs=[file_input, failure_element, failure_type],
            outputs=[d_pressure, d_demand],
        )

    with gr.Tab("Water Quality"):
        quality_type = gr.Dropdown(["AGE", "CHEM", "trace"], label="Quality Type")
        trace_node = gr.Textbox(label="Trace Node")
        quality_output = gr.Textbox(label="Water Quality Result")
        quality_button = gr.Button("Simulate Water Quality")
        quality_button.click(
            fn=simulate_quality,
            inputs=[file_input, quality_type, trace_node],
            outputs=quality_output,
        )

    with gr.Tab("Analysis"):
        # Reads the text boxes filled in by the "Simulate" tab.
        analysis_type = gr.Dropdown(["Resilience", "Morphology"], label="Analysis Type")
        analysis_output = gr.Textbox(label="Analysis Result")
        analyze_button = gr.Button("Analyze")
        analyze_button.click(
            fn=analyze_results,
            inputs=[analysis_type, pressure_output, demand_output],
            outputs=analysis_output,
        )

    with gr.Tab("Economic Loss"):
        econ_output = gr.Textbox(label="Economic Loss Summary")
        econ_button = gr.Button("Compute Economic Loss")
        econ_button.click(fn=simulate_economic_loss, inputs=file_input, outputs=econ_output)

    with gr.Tab("Criticality"):
        critical_output = gr.Textbox(label="Criticality Result")
        critical_button = gr.Button("Run Criticality Assessment")
        critical_button.click(fn=simulate_criticality, inputs=file_input, outputs=critical_output)

    with gr.Tab("Network Plot"):
        net_plot = gr.Image(label="Network Diagram")
        plot_button = gr.Button("Plot Network")
        plot_button.click(fn=plot_network, inputs=file_input, outputs=net_plot)

# Listen on all interfaces, port 7860.
demo.launch(server_name="0.0.0.0", server_port=7860)