"""Streamlit view and helpers for the intervention impact simulation."""

from __future__ import annotations

import csv
import io
import os
from typing import Callable

import pandas as pd
import streamlit as st

from autofarm.paths import REPO_ROOT

from .constants import (
    INTERVENTION_ACTION_TYPES,
    INTERVENTION_SIM_CSV_PATH,
    INTERVENTION_SIM_ERROR_KEY,
    INTERVENTION_SIM_MAP_PATH,
    INTERVENTION_SIM_VIEW_KEY,
    SIM_INPUT_CHALLENGES_CSV_PATH,
    SIM_INPUT_INTERVENTIONS_CSV_PATH,
    SIM_INPUT_RESULTS_ROOT,
    _ACTION_NAME_FOR_CSV,
    _CHALLENGE_NAME_FOR_CSV,
)


def _zone_positions(snapshot) -> dict[str, tuple[int, int]]:
    """Map each observed zone id to its ``(row, col)`` grid position."""
    return {
        zone.zone_id: (zone.row, zone.col)
        for zone in snapshot.observation.zone_states
    }


def build_sim_input_interventions(snapshot) -> list[dict[str, object]]:
    """Extract robot treatment actions into rows for sim_input_interventions.csv.

    Walks ``snapshot.event_history`` and keeps events whose action (as a
    string) is in ``INTERVENTION_ACTION_TYPES`` and whose zone is present in
    the current observation. Each ``(row, col, action)`` combination is
    emitted at most once, in order of first appearance.

    Returns:
        A list of dicts with the keys ``"row"``, ``"col"`` and ``"action"``.
    """
    zone_lookup = _zone_positions(snapshot)
    seen: set[tuple[int, int, str]] = set()
    rows: list[dict[str, object]] = []

    for event in snapshot.event_history:
        action_str = str(event.action)
        if action_str not in INTERVENTION_ACTION_TYPES:
            continue
        if event.zone_id is None or event.zone_id not in zone_lookup:
            continue

        r, c = zone_lookup[event.zone_id]
        # Fall back to the raw action string when no CSV alias is defined.
        csv_action = _ACTION_NAME_FOR_CSV.get(action_str, action_str)
        key = (r, c, csv_action)
        if key in seen:
            continue
        seen.add(key)
        rows.append({"row": r, "col": c, "action": csv_action})

    return rows


def build_sim_input_challenges(snapshot) -> list[dict[str, object]]:
    """Extract challenges into rows for sim_input_challenges.csv.

    Covers both ``snapshot.active_challenges`` and
    ``snapshot.resolved_challenges`` (in that order); challenges whose zone
    is not present in the current observation are skipped.

    Returns:
        A list of dicts with the keys ``"row"``, ``"col"``, ``"challenge"``
        and ``"treated"``. ``"treated"`` is ``True`` only when the
        challenge's ``resolution_status`` equals ``"success"``.
    """
    zone_lookup = _zone_positions(snapshot)
    rows: list[dict[str, object]] = []

    for challenge in [*snapshot.active_challenges, *snapshot.resolved_challenges]:
        if challenge.zone_id not in zone_lookup:
            continue

        r, c = zone_lookup[challenge.zone_id]
        display_name = challenge.name.lower()
        # Fall back to the lower-cased display name when no CSV alias exists.
        stress = _CHALLENGE_NAME_FOR_CSV.get(display_name, display_name)
        treated = challenge.resolution_status == "success"
        rows.append({"row": r, "col": c, "challenge": stress, "treated": treated})

    return rows


def _write_csv(path, header, records) -> None:
    """Write ``header`` plus ``records`` to ``path`` as UTF-8 CSV.

    Uses ``csv.writer`` so that values containing commas, quotes or newlines
    are quoted instead of corrupting the file. Rows are terminated with
    ``"\\n"``, matching the previous hand-built output for plain values.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(records)
    path.write_text(buffer.getvalue(), encoding="utf-8")


def write_sim_input_csvs(snapshot) -> None:
    """Write sim_input_interventions.csv and sim_input_challenges.csv from live simulator data.

    Creates ``SIM_INPUT_RESULTS_ROOT`` if needed. Both files are always
    written; with no matching data they contain only the header line.
    """
    SIM_INPUT_RESULTS_ROOT.mkdir(parents=True, exist_ok=True)

    _write_csv(
        SIM_INPUT_INTERVENTIONS_CSV_PATH,
        ("row", "col", "action"),
        [
            (row["row"], row["col"], row["action"])
            for row in build_sim_input_interventions(snapshot)
        ],
    )

    _write_csv(
        SIM_INPUT_CHALLENGES_CSV_PATH,
        ("row", "col", "challenge", "treated"),
        [
            # Booleans are serialised as lower-case "true"/"false".
            (
                row["row"],
                row["col"],
                row["challenge"],
                "true" if row["treated"] else "false",
            )
            for row in build_sim_input_challenges(snapshot)
        ],
    )


def _run_intervention_simulation_pipeline(snapshot) -> str | None:
    """Write live simulator data to CSVs and run the intervention simulation pipeline.

    Returns an error message string on failure, or None on success.
    """
    try:
        write_sim_input_csvs(snapshot)
    except Exception as exc:
        return f"Failed to write simulator input CSVs: {exc}"

    try:
        import importlib.util

        import matplotlib

        # Select the Agg backend before the simulation module is executed.
        matplotlib.use('Agg')

        # intervention_simulation.py uses relative paths (e.g. Path("./results/..."))
        # so we must ensure CWD is the repo root while it runs.
        prev_cwd = os.getcwd()
        os.chdir(REPO_ROOT)
        try:
            script_path = REPO_ROOT / "apps" / "intervention_simulation.py"
            spec = importlib.util.spec_from_file_location(
                "intervention_simulation",
                script_path,
            )
            # spec_from_file_location may return None (or a spec without a
            # loader); fail with a readable message instead of an
            # AttributeError on None.
            if spec is None or spec.loader is None:
                raise ImportError(f"cannot load simulation module from {script_path}")
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
            mod.run_simulation()
        finally:
            # Always restore the caller's working directory.
            os.chdir(prev_cwd)
    except Exception as exc:
        return f"Intervention simulation pipeline failed: {exc}"

    return None


def _render_back_button() -> None:
    """Render the "Back to Simulator" button and leave the view when clicked."""
    if st.button("Back to Simulator"):
        st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
        st.rerun()


def render_intervention_sim_view(get_snapshot: Callable[[], object] | None = None) -> None:
    """Render the "Intervention Impact Analysis" view.

    Flow:
      1. If no error is stored and the map PNG does not exist, obtain a
         snapshot via ``get_snapshot`` and run the simulation pipeline.
      2. If an error is stored in the session state, show it together with
         "Retry" and "Back to Simulator" buttons.
      3. Otherwise show the map (and the results table when the results CSV
         exists), or an informational message when no PNG was produced.

    Args:
        get_snapshot: Optional zero-argument callable returning the current
            simulator snapshot. When it is ``None`` or returns ``None`` and
            the pipeline needs to run, an error with a back button is shown.
    """
    st.subheader("Intervention Impact Analysis")

    error = st.session_state.get(INTERVENTION_SIM_ERROR_KEY)

    # If the pipeline hasn't produced a PNG yet, run it now. A stored error
    # skips this step; the user has to press "Retry" first.
    if error is None and not INTERVENTION_SIM_MAP_PATH.exists():
        snapshot = get_snapshot() if get_snapshot is not None else None
        if snapshot is None:
            st.error("No simulator environment available. Please run the simulator first.")
            _render_back_button()
            return

        with st.spinner("Running intervention simulation pipeline (this trains a model and may take a minute)..."):
            err = _run_intervention_simulation_pipeline(snapshot)
        if err is not None:
            # Persist the error and rerun so the error branch below shows it.
            st.session_state[INTERVENTION_SIM_ERROR_KEY] = err
            st.rerun()
            return

    # Handle error state
    if error is not None:
        st.error(error)
        col_retry, col_back = st.columns(2)
        with col_retry:
            if st.button("Retry"):
                st.session_state[INTERVENTION_SIM_ERROR_KEY] = None
                # Remove stale PNG so the pipeline re-runs
                INTERVENTION_SIM_MAP_PATH.unlink(missing_ok=True)
                st.rerun()
        with col_back:
            _render_back_button()
        return

    # Display the 3-panel before/after/delta map
    if INTERVENTION_SIM_MAP_PATH.exists():
        st.image(
            str(INTERVENTION_SIM_MAP_PATH),
            caption="Before / After / Delta yield prediction map",
            use_container_width=True,
        )

        # Display CSV results summary alongside the map
        if INTERVENTION_SIM_CSV_PATH.exists():
            try:
                results_df = pd.read_csv(INTERVENTION_SIM_CSV_PATH)
                if not results_df.empty:
                    st.subheader("Simulation Results")
                    st.dataframe(results_df, use_container_width=True)
            except Exception as exc:
                # The table is optional: keep the map visible, but tell the
                # user why the table is missing instead of hiding the failure.
                st.warning(f"Could not load simulation results table: {exc}")
    else:
        # Pipeline ran without error but produced no PNG -- this happens when
        # both input CSVs are empty (no challenges placed and no robot
        # interventions executed yet).
        st.info(
            "No intervention data to simulate. The robot has not yet performed "
            "any treatment actions and no challenges have been placed on the field. "
            "Go back to the simulator, advance a few steps so the robot executes "
            "interventions or place challenges, then try again."
        )

    _render_back_button()