import base64
import contextlib
import json
import math
import os
import tempfile

import matplotlib.pyplot as plt
import networkx as nx
import pandas as pd
import requests
import seaborn as sns
import streamlit as st
from fpdf import FPDF
from pysd import read_vensim, read_xmile
from pyvis.network import Network


# --- Page setup and Hugging Face credentials --------------------------------
st.set_page_config(page_title="System Dynamics CLD/SFD Visualizer", layout="wide")
# NOTE(review): the leading emoji was mis-encoded in the source this was
# recovered from; the glyph below is a best guess from context -- confirm.
st.title("🔄 System Dynamics Simulator with CLD/SFD and LLM Reports")

# Prefer a token from Streamlit secrets and fall back to asking the user.
# KeyError covers "secrets exist but this key is missing". FileNotFoundError is
# caught as well because Streamlit is understood to raise it (or a subclass)
# when no secrets file exists at all -- TODO confirm for the pinned version.
try:
    HF_API_TOKEN = st.secrets["HF_API_TOKEN"]
except (KeyError, FileNotFoundError):
    HF_API_TOKEN = st.text_input("Enter Hugging Face API Token", type="password")

# Inference endpoint that ask_llm() posts prompts to.
HF_MODEL_URL = "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.1"


def ask_llm(prompt, timeout=60):
    """Send *prompt* to the configured Hugging Face endpoint and return text.

    Args:
        prompt: Text sent as the ``inputs`` field of the JSON payload.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``generated_text`` of the first result on success. On any failure
        (missing token, transport error, non-200 status, or a response body
        that is not the expected list-of-dicts) a human-readable message is
        returned instead, so callers can render the value directly.
    """
    # NOTE(review): the emoji in the messages below were mis-encoded in the
    # source this was recovered from; the glyphs are best guesses -- confirm.
    if not HF_API_TOKEN:
        return "🔑 API token required."

    headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
    try:
        # Without a timeout a stalled connection would block the app forever.
        response = requests.post(
            HF_MODEL_URL,
            headers=headers,
            json={"inputs": prompt},
            timeout=timeout,
        )
    except requests.RequestException as exc:
        return f"⚠️ Error: {exc}"

    if response.status_code != 200:
        return f"⚠️ Error: {response.text}"

    try:
        return response.json()[0]["generated_text"]
    except (ValueError, KeyError, IndexError, TypeError):
        # A 200 reply that is not JSON, or JSON of a different shape.
        return f"⚠️ Error: unexpected response format: {response.text}"


def export_csv(df):
    """Return an HTML anchor that downloads *df* as ``simulation_results.csv``.

    The frame is serialised with its index, UTF-8 encoded, and embedded in a
    base64 ``data:`` URL, so no file is written on the server.

    Args:
        df: Object exposing ``to_csv(index=True)`` (a pandas DataFrame here).

    Returns:
        An ``<a>`` element as a string, intended for
        ``st.markdown(..., unsafe_allow_html=True)``.
    """
    csv_text = df.to_csv(index=True)
    payload = base64.b64encode(csv_text.encode("utf-8")).decode("ascii")
    # "text/csv" is the registered media type for CSV; "file/csv" is not one.
    # NOTE(review): the label emoji was mis-encoded in the source this was
    # recovered from; the glyph is a best guess from context -- confirm.
    return (
        f'<a href="data:text/csv;base64,{payload}" '
        'download="simulation_results.csv">📥 Download CSV</a>'
    )


def generate_network_graph(dependencies):
    """Build a directed graph with one ``input -> variable`` edge per entry.

    Args:
        dependencies: Mapping from a variable name to an iterable of the
            names it depends on.

    Returns:
        A ``networkx.DiGraph`` whose edges point from each dependency to the
        variable that uses it. Variables with no listed inputs contribute no
        edges and therefore only appear if something else depends on them.
    """
    graph = nx.DiGraph()
    graph.add_edges_from(
        (source, target)
        for target, sources in dependencies.items()
        for source in sources
    )
    return graph


def draw_pyvis_graph(G):
    """Render *G* with pyvis and return the path of the generated HTML file.

    Args:
        G: Graph exposing ``nodes`` and ``edges`` (a networkx DiGraph here).

    Returns:
        Path of a newly created temporary ``.html`` file. The file is not
        deleted here; the caller owns it and should remove it after reading.
    """
    net = Network(height="600px", width="100%", directed=True)
    net.barnes_hut()
    for node in G.nodes:
        net.add_node(node, label=node)
    for source, target in G.edges:
        net.add_edge(source, target)
    # NOTE(review): repulsion() is configured after barnes_hut(); presumably
    # the later call decides the active physics solver -- confirm in pyvis.
    net.repulsion()

    # Reserve a unique path but close the handle straight away: keeping it
    # open leaks a descriptor and prevents reopening the path on Windows.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as tmp:
        path = tmp.name
    net.save_graph(path)
    return path


# --- Main application flow --------------------------------------------------
# NOTE(review): this section was recovered from a copy whose indentation was
# lost and whose emoji were mis-encoded. Nesting was reconstructed from the
# data flow, and the emoji are best guesses from context -- confirm both.


def _parse_constant(raw):
    """Return *raw* as a finite float, or None if it is not a plain number."""
    if isinstance(raw, bool):
        return None
    try:
        number = float(raw)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None


def _std_across_runs(history):
    """Return the per-index standard deviation of every column across runs."""
    labels = [label for label, _, _ in history]
    frames = [frame for _, _, frame in history]
    stacked = pd.concat(frames, axis=0, keys=labels)
    return stacked.groupby(level=1).std()


def _remove_quietly(path):
    """Delete *path*, ignoring the case where it is already gone or locked."""
    with contextlib.suppress(OSError):
        os.remove(path)


uploaded_file = st.file_uploader(
    "Upload a Vensim (.mdl) or XMILE (.xmile) file", type=["mdl", "xmile"]
)

if uploaded_file:
    # The model readers are given a filesystem path, so spill the upload to a
    # temporary file that is removed again in the ``finally`` clause below.
    suffix = os.path.splitext(uploaded_file.name)[-1].lower()
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
        tmp_file.write(uploaded_file.getvalue())
        model_path = tmp_file.name

    try:
        if suffix == ".mdl":
            model = read_vensim(model_path)
        else:
            model = read_xmile(model_path)
        st.success("✅ Model loaded successfully!")

        # NOTE(review): the code below treats the result of model.doc() as a
        # mapping with "constant_auxiliaries" and "dependencies" entries --
        # confirm this matches the documentation API of the installed PySD.
        docs = model.doc()
        constants = docs.get("constant_auxiliaries", {})

        # Keep only constants whose value is a finite number. Parsing with
        # float() also accepts negative and scientific-notation values, which
        # a digit-only check would silently drop.
        default_params = {}
        for const_name, info in constants.items():
            if "value" not in info:
                continue
            number = _parse_constant(info["value"])
            if number is not None:
                default_params[const_name] = number

        preset_options = {
            "Default": default_params,
            "High Sensitivity": {k: v * 1.2 for k, v in default_params.items()},
            "Low Sensitivity": {k: v * 0.8 for k, v in default_params.items()},
        }

        uploaded_json = st.file_uploader("Or upload your own preset (JSON)", type=["json"])
        if uploaded_json:
            try:
                user_preset = json.load(uploaded_json)
                if not isinstance(user_preset, dict):
                    raise ValueError("preset must be a JSON object of name -> number")
                # Coerce up front so a non-numeric entry is reported here
                # rather than failing later inside st.number_input.
                preset_options["User Defined"] = {
                    str(key): float(value) for key, value in user_preset.items()
                }
            except (ValueError, TypeError) as e:
                st.warning(f"⚠️ Failed to load custom preset: {e}")

        preset = st.selectbox("Select a preset", options=list(preset_options.keys()))
        parameters = {
            const: st.number_input(f"{const}", value=val)
            for const, val in preset_options[preset].items()
        }

        # NOTE(review): the checkbox value is not consulted anywhere; every
        # stored run is always overlaid below. Kept so the UI is unchanged.
        compare_runs = st.checkbox("🔁 Compare this run with previous?")
        if "run_history" not in st.session_state:
            st.session_state["run_history"] = []
        history = st.session_state["run_history"]

        if st.button("Run Simulation"):
            result = model.run(params=parameters)
            st.write("✅ result type:", type(result))
            # Number each run so labels stay unique: repeating a preset would
            # otherwise create overlapping column names and break the join
            # used for the combined export.
            run_label = f"{preset} #{len(history) + 1}"
            history.append((run_label, parameters.copy(), result.copy()))

        # Render from session state rather than inside the button branch. A
        # button is only True on the rerun triggered by its click, so output
        # drawn inside the branch disappears on the next widget interaction.
        if history:
            _, _, result = history[-1]

            st.subheader("📊 Simulation Output")
            st.dataframe(result)
            st.markdown(export_csv(result), unsafe_allow_html=True)

            st.subheader("📈 Time-Series Plots")
            all_columns = result.columns.tolist()
            selected_vars = st.multiselect(
                "Select variables to plot", all_columns, default=all_columns
            )
            for var in selected_vars:
                fig, ax = plt.subplots()
                for name, _, df in history:
                    # Earlier runs may lack this variable (e.g. they came
                    # from a different uploaded model).
                    if var in df.columns:
                        ax.plot(df.index, df[var], label=name)
                ax.set_title(f"{var} Comparison")
                ax.set_xlabel("Time")
                ax.set_ylabel(var)
                ax.legend()
                st.pyplot(fig)
                # Release the figure; otherwise they pile up across reruns.
                plt.close(fig)

            st.subheader("📤 Export Comparison Results")
            combined_df = pd.DataFrame()
            for name, _, df in history:
                df_renamed = df.copy()
                df_renamed.columns = [f"{col} ({name})" for col in df_renamed.columns]
                if combined_df.empty:
                    combined_df = df_renamed
                else:
                    combined_df = combined_df.join(df_renamed, how="outer")

            csv_all = combined_df.to_csv(index=True)
            b64_all = base64.b64encode(csv_all.encode("utf-8")).decode("ascii")
            st.markdown(
                f'<a href="data:text/csv;base64,{b64_all}" '
                'download="all_simulation_runs.csv">'
                "📥 Download All Comparison Results</a>",
                unsafe_allow_html=True,
            )

            st.subheader("📉 % Difference Matrix (vs. First Run)")
            if len(history) > 1:
                _, _, base_df = history[0]
                _, _, last_df = history[-1]
                try:
                    # Preserve the first run's column order (a set
                    # intersection would shuffle it between reruns).
                    common_cols = [c for c in base_df.columns if c in last_df.columns]
                    if common_cols:
                        delta = last_df[common_cols] - base_df[common_cols]
                        diff_df = (delta / base_df[common_cols] * 100).round(2)
                        st.dataframe(diff_df)
                    else:
                        st.warning("⚠️ No common variables to compute differences.")
                except Exception as e:
                    st.warning(f"❌ Could not compute differences: {e}")

            st.subheader("🔥 Sensitivity Heatmap (Std. Dev. Across Runs)")
            if len(history) > 1:
                try:
                    std_dev = _std_across_runs(history).T
                    fig, ax = plt.subplots(figsize=(10, len(std_dev) // 2 + 2))
                    sns.heatmap(std_dev, cmap="YlGnBu", ax=ax, annot=True, fmt=".2f")
                    ax.set_title("Standard Deviation of Each Variable Across Runs")
                    st.pyplot(fig)
                    plt.close(fig)
                except Exception as e:
                    st.warning(f"⚠️ Could not generate sensitivity heatmap: {e}")

        # Kept outside the results branch so this button is never nested
        # under "Run Simulation": a nested button cannot fire, because the
        # outer button is False again on the rerun caused by the inner click.
        st.subheader("📝 Generate Full Model Summary Report")
        if st.button("🧠 Summarize with LLM"):
            dependencies = docs.get("dependencies", {})
            loops = list(nx.simple_cycles(generate_network_graph(dependencies)))

            sensitive_vars = "No runs available"
            if len(history) > 1:
                try:
                    ranked = (
                        _std_across_runs(history)
                        .max(axis=0)
                        .sort_values(ascending=False)
                    )
                    sensitive_vars = list(ranked.head(5).index)
                except Exception:
                    sensitive_vars = "Could not calculate sensitivity"

            summary_prompt = (
                "You are an expert in System Dynamics. Summarize the following "
                "model and simulation results.\n"
                "Model components (first 1000 chars):\n"
                f"{str(model.components)[:1000]}\n"
                "Detected loops:\n"
                f"{loops}\n"
                "Variables potentially most sensitive (based on max standard "
                "deviation across runs):\n"
                f"{sensitive_vars}\n"
                "Provide insights on model behavior and feedback structure.\n"
            )
            summary_response = ask_llm(summary_prompt)
            st.markdown(f"### 🤖 LLM-Generated Summary\n\n{summary_response}")

            md_content = f"# System Dynamics Model Summary\n\n{summary_response}"
            b64_md = base64.b64encode(md_content.encode("utf-8")).decode("ascii")
            st.markdown(
                f'<a href="data:text/markdown;base64,{b64_md}" '
                'download="model_summary.md">📥 Download Markdown Report</a>',
                unsafe_allow_html=True,
            )

            pdf = FPDF()
            pdf.add_page()
            pdf.set_auto_page_break(auto=True, margin=15)
            pdf.set_font("Arial", size=12)
            for line in summary_response.splitlines():
                # NOTE(review): FPDF's built-in fonts are understood to cover
                # Latin-1 only, so unsupported characters (e.g. emoji in an
                # error message) are replaced instead of aborting the export.
                pdf.multi_cell(0, 10, line.encode("latin-1", "replace").decode("latin-1"))

            # Close the handle before FPDF writes to the path, and always
            # remove the file once its bytes are embedded in the link.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf:
                pdf_path = tmp_pdf.name
            try:
                pdf.output(pdf_path)
                with open(pdf_path, "rb") as f:
                    b64_pdf = base64.b64encode(f.read()).decode("ascii")
            finally:
                _remove_quietly(pdf_path)
            st.markdown(
                f'<a href="data:application/pdf;base64,{b64_pdf}" '
                'download="model_summary.pdf">📄 Download PDF Report</a>',
                unsafe_allow_html=True,
            )

        st.subheader("🔗 Causal Loop and Stock-Flow Diagram")
        if "dependencies" in docs:
            G = generate_network_graph(docs["dependencies"])
            loops = list(nx.simple_cycles(G))
            st.markdown(f"Detected **{len(loops)}** feedback loop(s).")
            for i, loop in enumerate(loops):
                st.markdown(f"Loop {i+1}: {' → '.join(loop)}")

            html_path = draw_pyvis_graph(G)
            try:
                with open(html_path, "r", encoding="utf-8") as file:
                    html_content = file.read()
            finally:
                # The HTML is embedded inline below; the file is not needed.
                _remove_quietly(html_path)
            st.components.v1.html(html_content, height=600, scrolling=True)

        st.subheader("🤖 Ask the LLM about the Model")
        question = st.text_area("What would you like to ask?")
        if st.button("Ask LLM"):
            model_info = str(model.components)[:1000]
            prompt = (
                "The following is a system dynamics model fragment:\n"
                f"{model_info}\n"
                f"Question: {question}\n"
            )
            response = ask_llm(prompt)
            st.markdown(f"**Answer:**\n\n{response}")

    except Exception as e:
        # Top-level UI boundary: report the problem instead of a stack trace.
        st.error(f"❌ Error: {str(e)}")

    finally:
        # model_path is always bound here because it is assigned before the
        # ``try`` block is entered.
        _remove_quietly(model_path)