| from __future__ import annotations |
|
|
| import os |
| from typing import Callable |
|
|
| import pandas as pd |
| import streamlit as st |
|
|
| from autofarm.paths import REPO_ROOT |
|
|
| from .constants import ( |
| INTERVENTION_ACTION_TYPES, |
| INTERVENTION_SIM_CSV_PATH, |
| INTERVENTION_SIM_ERROR_KEY, |
| INTERVENTION_SIM_MAP_PATH, |
| INTERVENTION_SIM_VIEW_KEY, |
| SIM_INPUT_CHALLENGES_CSV_PATH, |
| SIM_INPUT_INTERVENTIONS_CSV_PATH, |
| SIM_INPUT_RESULTS_ROOT, |
| _ACTION_NAME_FOR_CSV, |
| _CHALLENGE_NAME_FOR_CSV, |
| ) |
|
|
|
|
def build_sim_input_interventions(snapshot) -> list[dict[str, object]]:
    """Build the rows of sim_input_interventions.csv from the event history.

    Events are kept only when their stringified action is listed in
    ``INTERVENTION_ACTION_TYPES`` and their ``zone_id`` is one of the zones in
    ``snapshot.observation.zone_states``. Action names are translated through
    ``_ACTION_NAME_FOR_CSV`` (unknown names pass through unchanged), and each
    ``(row, col, action)`` combination is emitted once, in order of first
    appearance.

    Returns:
        A list of dicts with the keys ``"row"``, ``"col"`` and ``"action"``.
    """
    grid_position: dict[str, tuple[int, int]] = {
        zone.zone_id: (zone.row, zone.col)
        for zone in snapshot.observation.zone_states
    }

    # Dicts keep insertion order, so the keys double as the "already seen" set
    # while the values accumulate the output rows in first-seen order.
    unique_rows: dict[tuple[int, int, str], dict[str, object]] = {}
    for event in snapshot.event_history:
        action_name = str(event.action)
        if action_name not in INTERVENTION_ACTION_TYPES:
            continue

        zone_id = event.zone_id
        if zone_id is None or zone_id not in grid_position:
            continue

        grid_row, grid_col = grid_position[zone_id]
        csv_name = _ACTION_NAME_FOR_CSV.get(action_name, action_name)
        dedupe_key = (grid_row, grid_col, csv_name)
        if dedupe_key not in unique_rows:
            unique_rows[dedupe_key] = {
                "row": grid_row,
                "col": grid_col,
                "action": csv_name,
            }

    return list(unique_rows.values())
|
|
|
|
def build_sim_input_challenges(snapshot) -> list[dict[str, object]]:
    """Build the rows of sim_input_challenges.csv from the snapshot's challenges.

    Active challenges are listed first, followed by resolved ones. A challenge
    whose ``zone_id`` is not among ``snapshot.observation.zone_states`` is
    skipped. The lowercased challenge name is translated through
    ``_CHALLENGE_NAME_FOR_CSV`` (unknown names pass through unchanged), and
    ``treated`` is true only when ``resolution_status`` equals ``"success"``.

    Returns:
        A list of dicts with the keys ``"row"``, ``"col"``, ``"challenge"``
        and ``"treated"``.
    """
    grid_position: dict[str, tuple[int, int]] = {
        zone.zone_id: (zone.row, zone.col)
        for zone in snapshot.observation.zone_states
    }

    records: list[dict[str, object]] = []
    challenge_groups = (snapshot.active_challenges, snapshot.resolved_challenges)
    for group in challenge_groups:
        for challenge in group:
            position = grid_position.get(challenge.zone_id)
            if position is None:
                continue

            lowered_name = challenge.name.lower()
            records.append(
                {
                    "row": position[0],
                    "col": position[1],
                    "challenge": _CHALLENGE_NAME_FOR_CSV.get(lowered_name, lowered_name),
                    "treated": challenge.resolution_status == "success",
                }
            )
    return records
|
|
|
|
def _sim_input_csv_text(header, records) -> str:
    """Serialize a header and data rows to CSV text with LF line endings.

    Fields are quoted by the ``csv`` module only when they contain a comma,
    quote character or line break, so plain values are emitted exactly as a
    simple comma-join would produce them.
    """
    import csv
    import io

    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(records)
    return buffer.getvalue()


def write_sim_input_csvs(snapshot) -> None:
    """Write sim_input_interventions.csv and sim_input_challenges.csv from live simulator data.

    The output directory ``SIM_INPUT_RESULTS_ROOT`` is created if missing.
    Both files always receive a header line, even when there are no data rows,
    and are written as UTF-8. The ``treated`` column is written as the
    lowercase literals ``true`` / ``false``.

    Args:
        snapshot: Simulator snapshot passed through to
            ``build_sim_input_interventions`` and ``build_sim_input_challenges``.
    """
    SIM_INPUT_RESULTS_ROOT.mkdir(parents=True, exist_ok=True)

    intervention_rows = build_sim_input_interventions(snapshot)
    interventions_csv = _sim_input_csv_text(
        ("row", "col", "action"),
        [(row["row"], row["col"], row["action"]) for row in intervention_rows],
    )
    SIM_INPUT_INTERVENTIONS_CSV_PATH.write_text(interventions_csv, encoding="utf-8")

    challenge_rows = build_sim_input_challenges(snapshot)
    challenges_csv = _sim_input_csv_text(
        ("row", "col", "challenge", "treated"),
        [
            (
                row["row"],
                row["col"],
                # Challenge names can fall back to a raw display name, which
                # may contain commas; the csv writer quotes those safely.
                row["challenge"],
                "true" if row["treated"] else "false",
            )
            for row in challenge_rows
        ],
    )
    SIM_INPUT_CHALLENGES_CSV_PATH.write_text(challenges_csv, encoding="utf-8")
|
|
|
|
def _run_intervention_simulation_pipeline(snapshot) -> str | None:
    """Export the snapshot to the simulator input CSVs, then run the simulation script.

    The script ``apps/intervention_simulation.py`` under ``REPO_ROOT`` is
    loaded from its file path and its ``run_simulation()`` is called while the
    working directory is temporarily switched to ``REPO_ROOT``. The previous
    working directory is restored afterwards, whether or not the run succeeds.

    Returns:
        ``None`` when both stages complete without raising; otherwise an error
        message naming the stage that failed.
    """
    # Stage 1: dump the live simulator state to the input CSVs.
    try:
        write_sim_input_csvs(snapshot)
    except Exception as write_error:
        return f"Failed to write simulator input CSVs: {write_error}"

    # Stage 2: load and execute the simulation script.
    try:
        import importlib.util
        import matplotlib

        # Select the Agg backend before the script is loaded
        # (presumably so plotting works without a display -- confirm).
        matplotlib.use('Agg')

        original_dir = os.getcwd()
        os.chdir(REPO_ROOT)
        try:
            script_path = REPO_ROOT / "apps" / "intervention_simulation.py"
            module_spec = importlib.util.spec_from_file_location(
                "intervention_simulation",
                script_path,
            )
            simulation = importlib.util.module_from_spec(module_spec)
            module_spec.loader.exec_module(simulation)
            simulation.run_simulation()
        finally:
            os.chdir(original_dir)
    except Exception as pipeline_error:
        return f"Intervention simulation pipeline failed: {pipeline_error}"
    else:
        return None
|
|
|
|
def render_intervention_sim_view(get_snapshot: Callable[[], object] | None = None) -> None:
    """Render the "Intervention Impact Analysis" page.

    The page works through three stages on each script run:

    1. If no error is stored in the session and no prediction map exists on
       disk, the pipeline is run on the snapshot from ``get_snapshot``. A
       failure is stored under ``INTERVENTION_SIM_ERROR_KEY`` and the script
       reruns to show it.
    2. If an error is stored, it is shown with "Retry" and "Back to Simulator"
       buttons. Retry clears the error and removes any existing map file so
       the pipeline runs again.
    3. Otherwise the prediction map is shown, with the results table when the
       results CSV is readable and non-empty, or an explanatory note when the
       pipeline finished without producing a map.

    Args:
        get_snapshot: Optional zero-argument callable returning the current
            simulator snapshot. When it is ``None`` or returns ``None``, the
            page reports that no simulator environment is available.
    """
    st.subheader("Intervention Impact Analysis")

    error = st.session_state.get(INTERVENTION_SIM_ERROR_KEY)

    # Stage 1: nothing recorded and nothing on disk yet -> run the pipeline now.
    if error is None and not INTERVENTION_SIM_MAP_PATH.exists():
        snapshot = get_snapshot() if get_snapshot is not None else None
        if snapshot is None:
            st.error("No simulator environment available. Please run the simulator first.")
            if st.button("Back to Simulator"):
                st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
                st.rerun()
            return
        with st.spinner("Running intervention simulation pipeline (this trains a model and may take a minute)..."):
            err = _run_intervention_simulation_pipeline(snapshot)
        if err is not None:
            # Persist the failure and rerun so the error stage below renders it.
            st.session_state[INTERVENTION_SIM_ERROR_KEY] = err
            st.rerun()
            return

    # Stage 2: a previous run failed -> show the message and recovery options.
    if error is not None:
        st.error(error)
        col_retry, col_back = st.columns(2)
        with col_retry:
            if st.button("Retry"):
                st.session_state[INTERVENTION_SIM_ERROR_KEY] = None
                # Remove any existing map so stage 1 runs the pipeline again.
                if INTERVENTION_SIM_MAP_PATH.exists():
                    INTERVENTION_SIM_MAP_PATH.unlink()
                st.rerun()
        with col_back:
            if st.button("Back to Simulator"):
                st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
                st.rerun()
        return

    # Stage 3: show whatever the pipeline produced.
    if INTERVENTION_SIM_MAP_PATH.exists():
        st.image(
            str(INTERVENTION_SIM_MAP_PATH),
            caption="Before / After / Delta yield prediction map",
            use_container_width=True,
        )

        # The results table is optional: a problem here must not take the page
        # down, but it should not disappear without explanation either.
        if INTERVENTION_SIM_CSV_PATH.exists():
            try:
                results_df = pd.read_csv(INTERVENTION_SIM_CSV_PATH)
                if not results_df.empty:
                    st.subheader("Simulation Results")
                    st.dataframe(results_df, use_container_width=True)
            except pd.errors.EmptyDataError:
                # A file with nothing to parse just means there is no table to show.
                pass
            except Exception as exc:
                st.warning(f"Could not display the simulation results table: {exc}")
    else:
        # The pipeline finished without an error but also without writing a map.
        st.info(
            "No intervention data to simulate. The robot has not yet performed "
            "any treatment actions and no challenges have been placed on the field. "
            "Go back to the simulator, advance a few steps so the robot executes "
            "interventions or place challenges, then try again."
        )

    if st.button("Back to Simulator"):
        st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
        st.rerun()
|
|