| import streamlit as st |
| from pysd import read_vensim, read_xmile |
| import os |
| import matplotlib.pyplot as plt |
| import numpy as np |
| from pathlib import Path |
| import networkx as nx |
| from pyvis.network import Network |
| import tempfile |
| import base64 |
| from fpdf import FPDF |
| import polars as pl |
| import pandas as pd |
|
|
| st.set_page_config(page_title="π PySD System Dynamics Simulator", layout="wide") |
| st.title("π¦ System Dynamics Model Simulator using PySD") |
|
|
| UPLOAD_DIR = Path("uploaded_models") |
| UPLOAD_DIR.mkdir(exist_ok=True) |
|
|
| |
| model_instance = None |
| model_docs = None |
| sim_result = None |
| uploaded_file = st.file_uploader("Upload a `.mdl` (Vensim) or `.xmile` (XMILE) model", type=["mdl", "xmile"]) |
|
|
| if uploaded_file is not None: |
| model_instance = None |
| model_docs = None |
| sim_result = None |
| file_suffix = os.path.splitext(uploaded_file.name)[-1].lower() |
| model_path = UPLOAD_DIR / uploaded_file.name |
| with open(model_path, "wb") as f: |
| f.write(uploaded_file.getbuffer()) |
| st.success(f"Model uploaded to `{model_path}`") |
| try: |
| if file_suffix == ".mdl": |
| model_instance = read_vensim(str(model_path)) |
| elif file_suffix == ".xmile": |
| model_instance = read_xmile(str(model_path)) |
| else: |
| st.error("Unsupported file type.") |
| st.stop() |
|
|
| |
| model_docs = model_instance.doc() |
|
|
| |
| st.write("Type of model_docs:", type(model_docs)) |
| if isinstance(model_docs, pl.DataFrame): |
| st.dataframe(model_docs.head()) |
| elif isinstance(model_docs, pd.DataFrame): |
| st.dataframe(model_docs.head()) |
| else: |
| st.write("model_docs:", model_docs) |
|
|
| st.success("β
Model parsed successfully!") |
|
|
| except Exception as e: |
| st.error(f"Error loading model: {e}") |
| st.stop() |
|
|
| st.subheader("β±οΈ Simulation Settings") |
| initial_time = st.number_input("Initial Time", value=0.0) |
| final_time = st.number_input("Final Time", value=100.0) |
| time_step = st.number_input("Time Step", value=1.0) |
|
|
| if st.button("Run Simulation") and model_instance: |
| try: |
| time_vector = np.arange(initial_time, final_time + time_step, time_step) |
| sim_result = model_instance.run(return_timestamps=time_vector) |
| st.success("β
Simulation completed successfully!") |
| |
| sim_result_df = pl.DataFrame(sim_result) |
| st.dataframe(sim_result_df) |
| |
| csv_data = sim_result_df.to_csv().encode("utf-8") |
| st.download_button("β¬οΈ Download CSV", csv_data, "simulation_output.csv", "text/csv") |
| st.subheader("π Output Plots") |
| dynamic_vars = [col for col in sim_result_df.columns if sim_result_df[col].n_unique() > 1] |
| default_selection = dynamic_vars[:2] if len(dynamic_vars) >= 2 else dynamic_vars |
| selected_vars = st.multiselect("Select variables to plot", dynamic_vars, default=default_selection) |
| if selected_vars: |
| |
| st.line_chart(sim_result_df.to_pandas()[selected_vars]) |
| except Exception as e: |
| st.error(f"β Simulation failed: {e}") |
|
|
| st.subheader("π Causal Loop Diagram (CLD)") |
| if st.button("Generate CLD") and model_instance and model_docs is not None: |
| try: |
| graph = nx.DiGraph() |
| if isinstance(model_docs, pl.DataFrame): |
| model_docs_df = model_docs |
| elif isinstance(model_docs, pd.DataFrame): |
| model_docs_df = pl.from_pandas(model_docs) |
| else: |
| st.error( |
| f"β Unexpected type for model_docs: {type(model_docs)}. Expected Polars or Pandas DataFrame.") |
| return |
| for row in model_docs_df.iter_rows(): |
| var = row[0] |
| inputs = row[8] if len(row) > 8 else [] |
| if isinstance(inputs, list): |
| for src in inputs: |
| graph.add_edge(src, var) |
| net = Network(height='500px', width='100%', directed=True) |
| net.from_nx(graph) |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f: |
| net.save_graph(f.name) |
| html_path = f.name |
| with open(html_path, 'r') as f: |
| html = f.read() |
| b64 = base64.b64encode(html.encode()).decode() |
| st.components.v1.html( |
| f'<iframe src="data:text/html;base64,{b64}" width="100%" height="500"></iframe>', height=550) |
| except Exception as e: |
| st.error(f"β Failed to generate CLD: {e}") |
|
|
| st.subheader("π¦ Stock & Flow Diagram (SFD)") |
| if st.button("Generate SFD") and model_instance and model_docs is not None: |
| try: |
| g = nx.DiGraph() |
| if isinstance(model_docs, pl.DataFrame): |
| model_docs_df = model_docs |
| elif isinstance(model_docs, pd.DataFrame): |
| model_docs_df = pl.from_pandas(model_docs) |
| else: |
| st.error( |
| f"β Unexpected type for model_docs: {type(model_docs)}. Expected Polars or Pandas DataFrame.") |
| return |
|
|
| for row in model_docs_df.iter_rows(): |
| var = row[0] |
| kind = row[7] if len(row) > 7 else "" |
| inputs = row[8] if len(row) > 8 else [] |
| color = "skyblue" if kind == "stock" else "lightgreen" if kind == "flow" else "lightgrey" |
| g.add_node(var, label=var, color=color) |
| if isinstance(inputs, list): |
| for src in inputs: |
| g.add_edge(src, var) |
| net = Network(height='500px', width='100%', directed=True) |
| net.from_nx(g) |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f: |
| net.save_graph(f.name) |
| html_path = f.name |
| with open(html_path, 'r') as f: |
| html = f.read() |
| b64 = base64.b64encode(html.encode()).decode() |
| st.components.v1.html( |
| f'<iframe src="data:text/html;base64,{b64}" width="100%" height="550"></iframe>', |
| height=550) |
| except Exception as e: |
| st.error(f"β Failed to generate SFD: {e}") |
|
|
| st.subheader("π§ Feedback Loop Detection (Beta)") |
| if st.button("Detect Feedback Loops") and model_instance and model_docs is not None: |
| try: |
| G = nx.DiGraph() |
| if isinstance(model_docs, pl.DataFrame): |
| model_docs_df = model_docs |
| elif isinstance(model_docs, pd.DataFrame): |
| model_docs_df = pl.from_pandas(model_docs) |
| else: |
| st.error( |
| f"β Unexpected type for model_docs: {type(model_docs)}. Expected Polars or Pandas DataFrame.") |
| return |
|
|
| for row in model_docs_df.iter_rows(): |
| var = row[0] |
| inputs = row[8] if len(row) > 8 else [] |
| if isinstance(inputs, list): |
| for src in inputs: |
| G.add_edge(src, var) |
| cycles = list(nx.simple_cycles(G)) |
| if cycles: |
| st.success(f"Found {len(cycles)} loop(s):") |
| for i, cycle in enumerate(cycles, 1): |
| st.markdown(f"**Loop {i}:** {' β '.join(cycle)} β {cycle[0]}") |
| else: |
| st.info("No feedback loops detected.") |
| except Exception as e: |
| st.error(f"Loop detection failed: {e}") |
|
|
| if sim_result is not None and st.button("Generate PDF Report"): |
| try: |
| pdf = FPDF() |
| pdf.add_page() |
| pdf.set_font("Arial", size=12) |
| pdf.cell(200, 10, txt="PySD Simulation Report", ln=1, align="C") |
| pdf.ln(5) |
| pdf.set_font("Arial", size=10) |
| sim_result_df = pl.DataFrame(sim_result) |
| for col in sim_result_df.columns[:10]: |
| pdf.cell(200, 10, |
| txt=f"Sample output for {col}: {sim_result_df[col][-1]:.2f}", |
| ln=1) |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as f: |
| pdf.output(f.name) |
| pdf_path = f.name |
| with open(pdf_path, "rb") as f: |
| st.download_button("π Download PDF Report", f.read(), |
| "pysd_simulation_report.pdf", "application/pdf") |
| except Exception as e: |
| st.error(f"Failed to generate PDF report: {e}") |
|
|