autofarm / apps /intervention /simulation_view.py
isabelku's picture
AutoFarm Space deploy
826dd96
from __future__ import annotations
import os
from typing import Callable
import pandas as pd
import streamlit as st
from autofarm.paths import REPO_ROOT
from .constants import (
INTERVENTION_ACTION_TYPES,
INTERVENTION_SIM_CSV_PATH,
INTERVENTION_SIM_ERROR_KEY,
INTERVENTION_SIM_MAP_PATH,
INTERVENTION_SIM_VIEW_KEY,
SIM_INPUT_CHALLENGES_CSV_PATH,
SIM_INPUT_INTERVENTIONS_CSV_PATH,
SIM_INPUT_RESULTS_ROOT,
_ACTION_NAME_FOR_CSV,
_CHALLENGE_NAME_FOR_CSV,
)
def build_sim_input_interventions(snapshot) -> list[dict[str, object]]:
    """Collect robot treatment actions as rows for sim_input_interventions.csv.

    Walks ``snapshot.event_history`` in order and keeps only events whose
    stringified action is listed in ``INTERVENTION_ACTION_TYPES`` and whose
    ``zone_id`` is one of the zones in ``snapshot.observation.zone_states``.

    Returns:
        A list of ``{"row", "col", "action"}`` dicts, one per distinct
        (row, col, action) combination, in order of first occurrence.
    """
    # Map every known zone id to its grid position.
    coords_by_zone: dict[str, tuple[int, int]] = {
        zone.zone_id: (zone.row, zone.col)
        for zone in snapshot.observation.zone_states
    }

    emitted: set[tuple[int, int, str]] = set()
    result: list[dict[str, object]] = []
    for event in snapshot.event_history:
        action_name = str(event.action)
        if action_name not in INTERVENTION_ACTION_TYPES:
            continue

        zone_id = event.zone_id
        if zone_id is None:
            continue
        coords = coords_by_zone.get(zone_id)
        if coords is None:
            # The event refers to a zone that is not in the observation.
            continue

        grid_row, grid_col = coords
        # Translate to the CSV spelling; unmapped actions keep their own name.
        csv_action = _ACTION_NAME_FOR_CSV.get(action_name, action_name)
        signature = (grid_row, grid_col, csv_action)
        if signature not in emitted:
            emitted.add(signature)
            result.append({"row": grid_row, "col": grid_col, "action": csv_action})
    return result
def build_sim_input_challenges(snapshot) -> list[dict[str, object]]:
    """Collect active and resolved challenges as rows for sim_input_challenges.csv.

    Challenges whose ``zone_id`` is not among ``snapshot.observation.zone_states``
    are skipped. Active challenges come first, followed by resolved ones.

    Returns:
        A list of ``{"row", "col", "challenge", "treated"}`` dicts, where
        ``treated`` is True only when the challenge's ``resolution_status``
        equals ``"success"``.
    """
    # Map every known zone id to its grid position.
    coords_by_zone: dict[str, tuple[int, int]] = {
        zone.zone_id: (zone.row, zone.col)
        for zone in snapshot.observation.zone_states
    }

    every_challenge = [*snapshot.active_challenges, *snapshot.resolved_challenges]
    output: list[dict[str, object]] = []
    for item in every_challenge:
        position = coords_by_zone.get(item.zone_id)
        if position is None:
            # Unknown zone: nothing to place on the grid.
            continue

        grid_row, grid_col = position
        lowered_name = item.name.lower()
        output.append(
            {
                "row": grid_row,
                "col": grid_col,
                # Translate to the CSV spelling; unmapped names stay lower-cased.
                "challenge": _CHALLENGE_NAME_FOR_CSV.get(lowered_name, lowered_name),
                "treated": item.resolution_status == "success",
            }
        )
    return output
def write_sim_input_csvs(snapshot) -> None:
    """Write sim_input_interventions.csv and sim_input_challenges.csv from live simulator data.

    Creates ``SIM_INPUT_RESULTS_ROOT`` if needed, then writes:

    * ``SIM_INPUT_INTERVENTIONS_CSV_PATH`` with columns ``row,col,action``
    * ``SIM_INPUT_CHALLENGES_CSV_PATH`` with columns ``row,col,challenge,treated``
      (``treated`` is written as ``true`` / ``false``)

    Both files are UTF-8 and always contain at least the header line.
    Fields are emitted through ``csv.writer`` so that a name containing a
    comma, quote or newline is quoted instead of corrupting the row.
    """
    # Local imports keep this block self-contained.
    import csv
    import io

    def _to_csv_text(header, records) -> str:
        """Serialise a header plus records into newline-terminated CSV text."""
        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(header)
        writer.writerows(records)
        return buffer.getvalue()

    SIM_INPUT_RESULTS_ROOT.mkdir(parents=True, exist_ok=True)

    intervention_rows = build_sim_input_interventions(snapshot)
    SIM_INPUT_INTERVENTIONS_CSV_PATH.write_text(
        _to_csv_text(
            ["row", "col", "action"],
            # str() keeps the same textual form an f-string would produce.
            [[str(r["row"]), str(r["col"]), str(r["action"])] for r in intervention_rows],
        ),
        encoding="utf-8",
    )

    challenge_rows = build_sim_input_challenges(snapshot)
    SIM_INPUT_CHALLENGES_CSV_PATH.write_text(
        _to_csv_text(
            ["row", "col", "challenge", "treated"],
            [
                [
                    str(r["row"]),
                    str(r["col"]),
                    str(r["challenge"]),
                    "true" if r["treated"] else "false",
                ]
                for r in challenge_rows
            ],
        ),
        encoding="utf-8",
    )
def _run_intervention_simulation_pipeline(snapshot) -> str | None:
    """Write live simulator data to CSVs and run the intervention simulation pipeline.

    The pipeline script ``apps/intervention_simulation.py`` under ``REPO_ROOT``
    is loaded from its file path and its ``run_simulation()`` is called.

    Returns:
        An error message string on failure, or None on success. Exceptions
        from either stage are caught and reported through the return value.
    """
    try:
        write_sim_input_csvs(snapshot)
    except Exception as exc:
        return f"Failed to write simulator input CSVs: {exc}"

    try:
        import importlib.util
        import matplotlib

        # Select the non-interactive backend before the pipeline script runs.
        matplotlib.use('Agg')

        script_path = REPO_ROOT / "apps" / "intervention_simulation.py"

        # intervention_simulation.py uses relative paths (e.g. Path("./results/..."))
        # so we must ensure CWD is the repo root while it runs.
        # NOTE: os.chdir affects the whole process, not just this call.
        prev_cwd = os.getcwd()
        os.chdir(REPO_ROOT)
        try:
            spec = importlib.util.spec_from_file_location(
                "intervention_simulation",
                script_path,
            )
            # spec_from_file_location may return None, and a spec may have no
            # loader; fail with a clear message instead of an AttributeError.
            if spec is None or spec.loader is None:
                raise ImportError(
                    f"cannot load intervention simulation module from {script_path}"
                )
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
            mod.run_simulation()
        finally:
            # Always restore the caller's working directory.
            os.chdir(prev_cwd)
    except Exception as exc:
        return f"Intervention simulation pipeline failed: {exc}"
    return None
def _render_back_to_simulator_button() -> None:
    """Render the "Back to Simulator" button; on click, leave this view and rerun."""
    if st.button("Back to Simulator"):
        st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
        st.rerun()


def render_intervention_sim_view(get_snapshot: Callable[[], object] | None = None) -> None:
    """Render the "Intervention Impact Analysis" view.

    Flow:

    1. If no error is recorded in session state and the result PNG does not
       exist, obtain a snapshot via ``get_snapshot`` and run the pipeline.
    2. If an error is recorded, show it with "Retry" and "Back to Simulator".
    3. Otherwise show the PNG (plus the results CSV as a table when it can be
       read), or an informational message when no PNG was produced.

    Args:
        get_snapshot: Optional zero-argument callable returning the current
            simulator snapshot. When it is None, or returns None, an error is
            shown instead of running the pipeline.

    Note:
        While neither a PNG nor a recorded error exists, every script run of
        this view executes the pipeline again, including the run triggered by
        clicking "Back to Simulator".
    """
    st.subheader("Intervention Impact Analysis")
    error = st.session_state.get(INTERVENTION_SIM_ERROR_KEY)

    # Run the pipeline only when no error is recorded and no PNG exists yet.
    # After an error, the pipeline runs again only once "Retry" clears it.
    if error is None and not INTERVENTION_SIM_MAP_PATH.exists():
        snapshot = get_snapshot() if get_snapshot is not None else None
        if snapshot is None:
            st.error("No simulator environment available. Please run the simulator first.")
            _render_back_to_simulator_button()
            return
        with st.spinner("Running intervention simulation pipeline (this trains a model and may take a minute)..."):
            err = _run_intervention_simulation_pipeline(snapshot)
        if err is not None:
            st.session_state[INTERVENTION_SIM_ERROR_KEY] = err
            st.rerun()
            return
        if INTERVENTION_SIM_MAP_PATH.exists():
            st.rerun()
            return
        # The run succeeded but produced no PNG. Rerunning here would re-enter
        # this branch and loop forever, so fall through to the info message.

    # Handle error state
    if error is not None:
        st.error(error)
        col_retry, col_back = st.columns(2)
        with col_retry:
            if st.button("Retry"):
                st.session_state[INTERVENTION_SIM_ERROR_KEY] = None
                # Remove stale PNG so the pipeline re-runs
                if INTERVENTION_SIM_MAP_PATH.exists():
                    INTERVENTION_SIM_MAP_PATH.unlink()
                st.rerun()
        with col_back:
            _render_back_to_simulator_button()
        return

    # Display the 3-panel before/after/delta map
    if INTERVENTION_SIM_MAP_PATH.exists():
        st.image(
            str(INTERVENTION_SIM_MAP_PATH),
            caption="Before / After / Delta yield prediction map",
            use_container_width=True,
        )
        # Display CSV results summary alongside the map
        if INTERVENTION_SIM_CSV_PATH.exists():
            try:
                results_df = pd.read_csv(INTERVENTION_SIM_CSV_PATH)
                if not results_df.empty:
                    st.subheader("Simulation Results")
                    st.dataframe(results_df, use_container_width=True)
            except pd.errors.EmptyDataError:
                # An empty file simply means there is no table to show.
                pass
            except Exception as exc:
                # The table is best-effort: keep the map, but say what went wrong.
                st.warning(f"Could not load simulation results table: {exc}")
    else:
        # Pipeline ran without error but produced no PNG -- this happens when
        # both input CSVs are empty (no challenges placed and no robot
        # interventions executed yet).
        st.info(
            "No intervention data to simulate. The robot has not yet performed "
            "any treatment actions and no challenges have been placed on the field. "
            "Go back to the simulator, advance a few steps so the robot executes "
            "interventions or place challenges, then try again."
        )

    _render_back_to_simulator_button()