Pysd_rev1 / main.py
razaali10's picture
Update main.py
56cf796 verified
import streamlit as st
from pysd import read_vensim, read_xmile
import os
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
import networkx as nx
from pyvis.network import Network
import tempfile
import base64
from fpdf import FPDF
import polars as pl # Import Polars
import pandas as pd
st.set_page_config(page_title="πŸ“ˆ PySD System Dynamics Simulator", layout="wide")
st.title("πŸ“¦ System Dynamics Model Simulator using PySD")
UPLOAD_DIR = Path("uploaded_models")
UPLOAD_DIR.mkdir(exist_ok=True)
# Globals
model_instance = None
model_docs = None
sim_result = None
uploaded_file = st.file_uploader("Upload a `.mdl` (Vensim) or `.xmile` (XMILE) model", type=["mdl", "xmile"])
if uploaded_file is not None:
model_instance = None
model_docs = None
sim_result = None
file_suffix = os.path.splitext(uploaded_file.name)[-1].lower()
model_path = UPLOAD_DIR / uploaded_file.name
with open(model_path, "wb") as f:
f.write(uploaded_file.getbuffer())
st.success(f"Model uploaded to `{model_path}`")
try:
if file_suffix == ".mdl":
model_instance = read_vensim(str(model_path))
elif file_suffix == ".xmile":
model_instance = read_xmile(str(model_path))
else:
st.error("Unsupported file type.")
st.stop()
# Moved the following lines *inside* the try block
model_docs = model_instance.doc()
# Debug: Print the type and content of model_docs
st.write("Type of model_docs:", type(model_docs))
if isinstance(model_docs, pl.DataFrame): # Check if it's a Polars DataFrame
st.dataframe(model_docs.head()) # Show the first few rows
elif isinstance(model_docs, pd.DataFrame):
st.dataframe(model_docs.head())
else:
st.write("model_docs:", model_docs) # Print the entire object if not a DataFrame
st.success("βœ… Model parsed successfully!") # Only show success if NO error
except Exception as e:
st.error(f"Error loading model: {e}")
st.stop() # Stop execution *after* showing the error
st.subheader("⏱️ Simulation Settings")
initial_time = st.number_input("Initial Time", value=0.0)
final_time = st.number_input("Final Time", value=100.0)
time_step = st.number_input("Time Step", value=1.0)
if st.button("Run Simulation") and model_instance:
try:
time_vector = np.arange(initial_time, final_time + time_step, time_step)
sim_result = model_instance.run(return_timestamps=time_vector)
st.success("βœ… Simulation completed successfully!")
# Use Polars DataFrame instead of Pandas
sim_result_df = pl.DataFrame(sim_result)
st.dataframe(sim_result_df)
# Convert to CSV using Polars
csv_data = sim_result_df.to_csv().encode("utf-8")
st.download_button("⬇️ Download CSV", csv_data, "simulation_output.csv", "text/csv")
st.subheader("πŸ“Š Output Plots")
dynamic_vars = [col for col in sim_result_df.columns if sim_result_df[col].n_unique() > 1]
default_selection = dynamic_vars[:2] if len(dynamic_vars) >= 2 else dynamic_vars
selected_vars = st.multiselect("Select variables to plot", dynamic_vars, default=default_selection)
if selected_vars:
# Convert the Polars DataFrame to a Pandas DataFrame for Streamlit compatibility
st.line_chart(sim_result_df.to_pandas()[selected_vars])
except Exception as e:
st.error(f"❌ Simulation failed: {e}")
st.subheader("πŸ” Causal Loop Diagram (CLD)")
if st.button("Generate CLD") and model_instance and model_docs is not None: # check model_instance
try:
graph = nx.DiGraph()
if isinstance(model_docs, pl.DataFrame):
model_docs_df = model_docs
elif isinstance(model_docs, pd.DataFrame):
model_docs_df = pl.from_pandas(model_docs)
else:
st.error(
f"❌ Unexpected type for model_docs: {type(model_docs)}. Expected Polars or Pandas DataFrame.")
return # changed from return to st.stop()
for row in model_docs_df.iter_rows():
var = row[0] # Index is the variable name in the original dict
inputs = row[8] if len(row) > 8 else [] # 'inputs' is at index 8
if isinstance(inputs, list): # make sure inputs is a list
for src in inputs:
graph.add_edge(src, var)
net = Network(height='500px', width='100%', directed=True)
net.from_nx(graph)
with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
net.save_graph(f.name)
html_path = f.name
with open(html_path, 'r') as f:
html = f.read()
b64 = base64.b64encode(html.encode()).decode()
st.components.v1.html(
f'<iframe src="data:text/html;base64,{b64}" width="100%" height="500"></iframe>', height=550)
except Exception as e:
st.error(f"❌ Failed to generate CLD: {e}")
st.subheader("πŸ“¦ Stock & Flow Diagram (SFD)")
if st.button("Generate SFD") and model_instance and model_docs is not None: # check model_instance:
try:
g = nx.DiGraph()
if isinstance(model_docs, pl.DataFrame):
model_docs_df = model_docs
elif isinstance(model_docs, pd.DataFrame):
model_docs_df = pl.from_pandas(model_docs)
else:
st.error(
f"❌ Unexpected type for model_docs: {type(model_docs)}. Expected Polars or Pandas DataFrame.")
return # changed from return to st.stop()
for row in model_docs_df.iter_rows():
var = row[0]
kind = row[7] if len(row) > 7 else "" # 'kind' is at index 7
inputs = row[8] if len(row) > 8 else [] # 'inputs' is at index 8
color = "skyblue" if kind == "stock" else "lightgreen" if kind == "flow" else "lightgrey"
g.add_node(var, label=var, color=color)
if isinstance(inputs, list): # make sure inputs is a list
for src in inputs:
g.add_edge(src, var)
net = Network(height='500px', width='100%', directed=True)
net.from_nx(g)
with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
net.save_graph(f.name)
html_path = f.name
with open(html_path, 'r') as f:
html = f.read()
b64 = base64.b64encode(html.encode()).decode()
st.components.v1.html(
f'<iframe src="data:text/html;base64,{b64}" width="100%" height="550"></iframe>',
height=550)
except Exception as e:
st.error(f"❌ Failed to generate SFD: {e}")
st.subheader("🧠 Feedback Loop Detection (Beta)")
if st.button("Detect Feedback Loops") and model_instance and model_docs is not None: # check model_instance
try:
G = nx.DiGraph()
if isinstance(model_docs, pl.DataFrame):
model_docs_df = model_docs
elif isinstance(model_docs, pd.DataFrame):
model_docs_df = pl.from_pandas(model_docs)
else:
st.error(
f"❌ Unexpected type for model_docs: {type(model_docs)}. Expected Polars or Pandas DataFrame.")
return # changed from return to st.stop()
for row in model_docs_df.iter_rows():
var = row[0]
inputs = row[8] if len(row) > 8 else [] # 'inputs' is at index 8
if isinstance(inputs, list): # make sure inputs is a list
for src in inputs:
G.add_edge(src, var)
cycles = list(nx.simple_cycles(G))
if cycles:
st.success(f"Found {len(cycles)} loop(s):")
for i, cycle in enumerate(cycles, 1):
st.markdown(f"**Loop {i}:** {' ➝ '.join(cycle)} ➝ {cycle[0]}")
else:
st.info("No feedback loops detected.")
except Exception as e:
st.error(f"Loop detection failed: {e}")
if sim_result is not None and st.button("Generate PDF Report"):
try:
pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
pdf.cell(200, 10, txt="PySD Simulation Report", ln=1, align="C")
pdf.ln(5)
pdf.set_font("Arial", size=10)
sim_result_df = pl.DataFrame(sim_result) # convert sim_result to polars df
for col in sim_result_df.columns[:10]:
pdf.cell(200, 10,
txt=f"Sample output for {col}: {sim_result_df[col][-1]:.2f}",
ln=1) # use polars to access last element
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as f:
pdf.output(f.name)
pdf_path = f.name
with open(pdf_path, "rb") as f:
st.download_button("πŸ“„ Download PDF Report", f.read(),
"pysd_simulation_report.pdf", "application/pdf")
except Exception as e:
st.error(f"Failed to generate PDF report: {e}")