File size: 7,255 Bytes
826dd96 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 | from __future__ import annotations
import os
from typing import Callable
import pandas as pd
import streamlit as st
from autofarm.paths import REPO_ROOT
from .constants import (
INTERVENTION_ACTION_TYPES,
INTERVENTION_SIM_CSV_PATH,
INTERVENTION_SIM_ERROR_KEY,
INTERVENTION_SIM_MAP_PATH,
INTERVENTION_SIM_VIEW_KEY,
SIM_INPUT_CHALLENGES_CSV_PATH,
SIM_INPUT_INTERVENTIONS_CSV_PATH,
SIM_INPUT_RESULTS_ROOT,
_ACTION_NAME_FOR_CSV,
_CHALLENGE_NAME_FOR_CSV,
)
def build_sim_input_interventions(snapshot) -> list[dict[str, object]]:
"""Extract robot treatment actions into rows for sim_input_interventions.csv."""
seen: set[tuple[int, int, str]] = set()
rows: list[dict[str, object]] = []
zone_lookup: dict[str, tuple[int, int]] = {}
for zone in snapshot.observation.zone_states:
zone_lookup[zone.zone_id] = (zone.row, zone.col)
for event in snapshot.event_history:
action_str = str(event.action)
if action_str not in INTERVENTION_ACTION_TYPES:
continue
if event.zone_id is None or event.zone_id not in zone_lookup:
continue
r, c = zone_lookup[event.zone_id]
csv_action = _ACTION_NAME_FOR_CSV.get(action_str, action_str)
key = (r, c, csv_action)
if key in seen:
continue
seen.add(key)
rows.append({"row": r, "col": c, "action": csv_action})
return rows
def build_sim_input_challenges(snapshot) -> list[dict[str, object]]:
"""Extract challenges into rows for sim_input_challenges.csv."""
zone_lookup: dict[str, tuple[int, int]] = {}
for zone in snapshot.observation.zone_states:
zone_lookup[zone.zone_id] = (zone.row, zone.col)
rows: list[dict[str, object]] = []
for challenge in [*snapshot.active_challenges, *snapshot.resolved_challenges]:
if challenge.zone_id not in zone_lookup:
continue
r, c = zone_lookup[challenge.zone_id]
display_name = challenge.name.lower()
stress = _CHALLENGE_NAME_FOR_CSV.get(display_name, display_name)
treated = challenge.resolution_status == "success"
rows.append({"row": r, "col": c, "challenge": stress, "treated": treated})
return rows
def write_sim_input_csvs(snapshot) -> None:
"""Write sim_input_interventions.csv and sim_input_challenges.csv from live simulator data."""
SIM_INPUT_RESULTS_ROOT.mkdir(parents=True, exist_ok=True)
intervention_rows = build_sim_input_interventions(snapshot)
lines = ["row,col,action"]
for row in intervention_rows:
lines.append(f"{row['row']},{row['col']},{row['action']}")
SIM_INPUT_INTERVENTIONS_CSV_PATH.write_text("\n".join(lines) + "\n", encoding="utf-8")
challenge_rows = build_sim_input_challenges(snapshot)
lines = ["row,col,challenge,treated"]
for row in challenge_rows:
treated_str = "true" if row["treated"] else "false"
lines.append(f"{row['row']},{row['col']},{row['challenge']},{treated_str}")
SIM_INPUT_CHALLENGES_CSV_PATH.write_text("\n".join(lines) + "\n", encoding="utf-8")
def _run_intervention_simulation_pipeline(snapshot) -> str | None:
"""Write live simulator data to CSVs and run the intervention simulation pipeline.
Returns an error message string on failure, or None on success.
"""
try:
write_sim_input_csvs(snapshot)
except Exception as exc:
return f"Failed to write simulator input CSVs: {exc}"
try:
import importlib.util
import matplotlib
matplotlib.use('Agg')
# intervention_simulation.py uses relative paths (e.g. Path("./results/..."))
# so we must ensure CWD is the repo root while it runs.
prev_cwd = os.getcwd()
os.chdir(REPO_ROOT)
try:
spec = importlib.util.spec_from_file_location(
"intervention_simulation",
REPO_ROOT / "apps" / "intervention_simulation.py",
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
mod.run_simulation()
finally:
os.chdir(prev_cwd)
except Exception as exc:
return f"Intervention simulation pipeline failed: {exc}"
return None
def render_intervention_sim_view(get_snapshot: Callable[[], object] | None = None) -> None:
st.subheader("Intervention Impact Analysis")
error = st.session_state.get(INTERVENTION_SIM_ERROR_KEY)
# If the pipeline hasn't produced a PNG yet (or an earlier run errored), run it now.
if error is None and not INTERVENTION_SIM_MAP_PATH.exists():
snapshot = get_snapshot() if get_snapshot is not None else None
if snapshot is None:
st.error("No simulator environment available. Please run the simulator first.")
if st.button("Back to Simulator"):
st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
st.rerun()
return
with st.spinner("Running intervention simulation pipeline (this trains a model and may take a minute)..."):
err = _run_intervention_simulation_pipeline(snapshot)
if err is not None:
st.session_state[INTERVENTION_SIM_ERROR_KEY] = err
st.rerun()
return
# Handle error state
if error is not None:
st.error(error)
col_retry, col_back = st.columns(2)
with col_retry:
if st.button("Retry"):
st.session_state[INTERVENTION_SIM_ERROR_KEY] = None
# Remove stale PNG so the pipeline re-runs
if INTERVENTION_SIM_MAP_PATH.exists():
INTERVENTION_SIM_MAP_PATH.unlink()
st.rerun()
with col_back:
if st.button("Back to Simulator"):
st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
st.rerun()
return
# Display the 3-panel before/after/delta map
if INTERVENTION_SIM_MAP_PATH.exists():
st.image(
str(INTERVENTION_SIM_MAP_PATH),
caption="Before / After / Delta yield prediction map",
use_container_width=True,
)
# Display CSV results summary alongside the map
if INTERVENTION_SIM_CSV_PATH.exists():
try:
results_df = pd.read_csv(INTERVENTION_SIM_CSV_PATH)
if not results_df.empty:
st.subheader("Simulation Results")
st.dataframe(results_df, use_container_width=True)
except Exception:
pass
else:
# Pipeline ran without error but produced no PNG -- this happens when
# both input CSVs are empty (no challenges placed and no robot
# interventions executed yet).
st.info(
"No intervention data to simulate. The robot has not yet performed "
"any treatment actions and no challenges have been placed on the field. "
"Go back to the simulator, advance a few steps so the robot executes "
"interventions or place challenges, then try again."
)
if st.button("Back to Simulator"):
st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
st.rerun()
|