File size: 7,255 Bytes
826dd96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
from __future__ import annotations

import os
from typing import Callable

import pandas as pd
import streamlit as st

from autofarm.paths import REPO_ROOT

from .constants import (
    INTERVENTION_ACTION_TYPES,
    INTERVENTION_SIM_CSV_PATH,
    INTERVENTION_SIM_ERROR_KEY,
    INTERVENTION_SIM_MAP_PATH,
    INTERVENTION_SIM_VIEW_KEY,
    SIM_INPUT_CHALLENGES_CSV_PATH,
    SIM_INPUT_INTERVENTIONS_CSV_PATH,
    SIM_INPUT_RESULTS_ROOT,
    _ACTION_NAME_FOR_CSV,
    _CHALLENGE_NAME_FOR_CSV,
)


def build_sim_input_interventions(snapshot) -> list[dict[str, object]]:
    """Collect robot treatment actions as rows for sim_input_interventions.csv.

    Events in ``snapshot.event_history`` are visited in order.  An event is
    kept when its stringified action is in ``INTERVENTION_ACTION_TYPES`` and
    its ``zone_id`` is not None and matches a zone in
    ``snapshot.observation.zone_states``.  The action name is translated via
    ``_ACTION_NAME_FOR_CSV`` (names without a mapping pass through unchanged).
    Each distinct (row, col, action) combination is emitted once, at the
    position of its first occurrence.

    Returns:
        A list of ``{"row": ..., "col": ..., "action": ...}`` dicts.
    """
    coords_by_zone: dict[str, tuple[int, int]] = {
        zone.zone_id: (zone.row, zone.col)
        for zone in snapshot.observation.zone_states
    }

    # An insertion-ordered dict serves as both the "already seen" set and the
    # ordered collection of output rows.
    unique_rows: dict[tuple[int, int, str], dict[str, object]] = {}
    for event in snapshot.event_history:
        action_name = str(event.action)
        if action_name not in INTERVENTION_ACTION_TYPES:
            continue
        zone_id = event.zone_id
        if zone_id is None or zone_id not in coords_by_zone:
            continue
        grid_row, grid_col = coords_by_zone[zone_id]
        csv_name = _ACTION_NAME_FOR_CSV.get(action_name, action_name)
        dedupe_key = (grid_row, grid_col, csv_name)
        if dedupe_key not in unique_rows:
            unique_rows[dedupe_key] = {
                "row": grid_row,
                "col": grid_col,
                "action": csv_name,
            }
    return list(unique_rows.values())


def build_sim_input_challenges(snapshot) -> list[dict[str, object]]:
    """Collect challenges as rows for sim_input_challenges.csv.

    Active challenges are listed first, followed by resolved ones.  A
    challenge whose ``zone_id`` does not match any zone in
    ``snapshot.observation.zone_states`` is skipped.  The lower-cased
    challenge name is translated via ``_CHALLENGE_NAME_FOR_CSV`` (names
    without a mapping pass through unchanged), and ``treated`` is true only
    when ``resolution_status`` equals ``"success"``.

    Returns:
        A list of ``{"row", "col", "challenge", "treated"}`` dicts.
    """
    coords_by_zone: dict[str, tuple[int, int]] = {
        zone.zone_id: (zone.row, zone.col)
        for zone in snapshot.observation.zone_states
    }
    csv_rows: list[dict[str, object]] = []
    all_challenges = [*snapshot.active_challenges, *snapshot.resolved_challenges]
    for item in all_challenges:
        # Lookup values are always (row, col) tuples, so None means "unknown zone".
        coords = coords_by_zone.get(item.zone_id)
        if coords is None:
            continue
        label = item.name.lower()
        csv_rows.append(
            {
                "row": coords[0],
                "col": coords[1],
                "challenge": _CHALLENGE_NAME_FOR_CSV.get(label, label),
                "treated": item.resolution_status == "success",
            }
        )
    return csv_rows


def _rows_to_csv_text(header: list[str], rows: list[tuple[object, ...]]) -> str:
    """Serialize *header* followed by *rows* as CSV text, one newline-terminated line per record."""
    import csv
    import io

    buffer = io.StringIO()
    # The csv writer quotes fields containing commas, quotes or newlines, which
    # hand-built f-string lines would silently turn into malformed records.
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    return buffer.getvalue()


def write_sim_input_csvs(snapshot) -> None:
    """Write sim_input_interventions.csv and sim_input_challenges.csv from live simulator data.

    Creates ``SIM_INPUT_RESULTS_ROOT`` (including parents) if it does not
    exist, then writes the rows produced by ``build_sim_input_interventions``
    to ``SIM_INPUT_INTERVENTIONS_CSV_PATH`` and the rows produced by
    ``build_sim_input_challenges`` to ``SIM_INPUT_CHALLENGES_CSV_PATH``.  Both
    files are UTF-8 and always contain a header line, even when there are no
    data rows.  The ``treated`` column is written as ``true``/``false``.
    """
    SIM_INPUT_RESULTS_ROOT.mkdir(parents=True, exist_ok=True)

    intervention_rows = build_sim_input_interventions(snapshot)
    interventions_csv = _rows_to_csv_text(
        ["row", "col", "action"],
        [(row["row"], row["col"], row["action"]) for row in intervention_rows],
    )
    SIM_INPUT_INTERVENTIONS_CSV_PATH.write_text(interventions_csv, encoding="utf-8")

    challenge_rows = build_sim_input_challenges(snapshot)
    challenges_csv = _rows_to_csv_text(
        ["row", "col", "challenge", "treated"],
        [
            (
                row["row"],
                row["col"],
                row["challenge"],
                "true" if row["treated"] else "false",
            )
            for row in challenge_rows
        ],
    )
    SIM_INPUT_CHALLENGES_CSV_PATH.write_text(challenges_csv, encoding="utf-8")


def _run_intervention_simulation_pipeline(snapshot) -> str | None:
    """Write live simulator data to CSVs and run the intervention simulation pipeline.

    The pipeline script ``apps/intervention_simulation.py`` under
    ``REPO_ROOT`` is loaded from its file path and its ``run_simulation()``
    function is called.  The process working directory is switched to
    ``REPO_ROOT`` for the duration of the run and restored afterwards, even
    when the run fails.

    Returns:
        An error message string on failure, or None on success.  Exceptions
        from either stage are caught and reported through the return value
        rather than raised.
    """
    try:
        write_sim_input_csvs(snapshot)
    except Exception as exc:
        return f"Failed to write simulator input CSVs: {exc}"

    try:
        import importlib.util
        import matplotlib
        # Select the non-interactive backend before the pipeline script runs.
        matplotlib.use('Agg')

        script_path = REPO_ROOT / "apps" / "intervention_simulation.py"

        # intervention_simulation.py uses relative paths (e.g. Path("./results/..."))
        # so we must ensure CWD is the repo root while it runs.
        # NOTE(review): os.chdir affects the whole process; confirm nothing else
        # depends on the working directory while the pipeline is running.
        prev_cwd = os.getcwd()
        os.chdir(REPO_ROOT)
        try:
            spec = importlib.util.spec_from_file_location(
                "intervention_simulation",
                script_path,
            )
            # spec_from_file_location may return None, and a spec may carry no
            # loader; fail with a descriptive message instead of an opaque
            # AttributeError on None.
            if spec is None or spec.loader is None:
                raise ImportError(
                    f"Cannot load intervention simulation module from {script_path}"
                )
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
            mod.run_simulation()
        finally:
            os.chdir(prev_cwd)
    except Exception as exc:
        return f"Intervention simulation pipeline failed: {exc}"

    return None


def render_intervention_sim_view(get_snapshot: Callable[[], object] | None = None) -> None:
    """Render the "Intervention Impact Analysis" view.

    Flow:
      1. If no error is stored in session state and the map PNG does not
         exist, obtain a snapshot via ``get_snapshot`` and run the pipeline.
         A pipeline failure is stored under ``INTERVENTION_SIM_ERROR_KEY``
         and the script is rerun.
      2. If an error is stored, show it with "Retry" and "Back to Simulator"
         buttons and stop.
      3. Otherwise show the map image (plus the results table when the CSV is
         present and non-empty), or an informational message when the
         pipeline produced no PNG.

    Args:
        get_snapshot: Zero-argument callable returning the current simulator
            snapshot, or None when no simulator is available.  When it is
            None, or returns None, an error with a back button is shown
            instead of running the pipeline.
    """
    st.subheader("Intervention Impact Analysis")

    error = st.session_state.get(INTERVENTION_SIM_ERROR_KEY)

    # Run the pipeline only when there is no stored error and no PNG yet.  A
    # stored error is handled below; "Retry" clears it so this branch runs again.
    # NOTE(review): when the pipeline succeeds without producing a PNG, this
    # branch appears to run again on every script rerun -- confirm whether
    # that repeated work is acceptable.
    if error is None and not INTERVENTION_SIM_MAP_PATH.exists():
        snapshot = get_snapshot() if get_snapshot is not None else None
        if snapshot is None:
            st.error("No simulator environment available. Please run the simulator first.")
            if st.button("Back to Simulator"):
                st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
                st.rerun()
            return
        with st.spinner("Running intervention simulation pipeline (this trains a model and may take a minute)..."):
            err = _run_intervention_simulation_pipeline(snapshot)
        if err is not None:
            st.session_state[INTERVENTION_SIM_ERROR_KEY] = err
            st.rerun()
            return

    # Handle error state
    if error is not None:
        st.error(error)
        col_retry, col_back = st.columns(2)
        with col_retry:
            if st.button("Retry"):
                st.session_state[INTERVENTION_SIM_ERROR_KEY] = None
                # Remove stale PNG so the pipeline re-runs
                if INTERVENTION_SIM_MAP_PATH.exists():
                    INTERVENTION_SIM_MAP_PATH.unlink()
                st.rerun()
        with col_back:
            if st.button("Back to Simulator"):
                st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
                st.rerun()
        return

    # Display the 3-panel before/after/delta map
    if INTERVENTION_SIM_MAP_PATH.exists():
        st.image(
            str(INTERVENTION_SIM_MAP_PATH),
            caption="Before / After / Delta yield prediction map",
            use_container_width=True,
        )

        # Display CSV results summary alongside the map
        if INTERVENTION_SIM_CSV_PATH.exists():
            try:
                results_df = pd.read_csv(INTERVENTION_SIM_CSV_PATH)
            except pd.errors.EmptyDataError:
                # A completely empty file is treated like an empty table:
                # there is nothing to show and nothing to report.
                results_df = None
            except Exception as exc:
                # The table is supplementary to the map, so do not abort the
                # view -- but tell the user instead of hiding the failure.
                results_df = None
                st.warning(f"Could not load simulation results table: {exc}")
            if results_df is not None and not results_df.empty:
                st.subheader("Simulation Results")
                st.dataframe(results_df, use_container_width=True)
    else:
        # Pipeline ran without error but produced no PNG -- this happens when
        # both input CSVs are empty (no challenges placed and no robot
        # interventions executed yet).
        st.info(
            "No intervention data to simulate. The robot has not yet performed "
            "any treatment actions and no challenges have been placed on the field. "
            "Go back to the simulator, advance a few steps so the robot executes "
            "interventions or place challenges, then try again."
        )

    if st.button("Back to Simulator"):
        st.session_state[INTERVENTION_SIM_VIEW_KEY] = False
        st.rerun()