autofarm / apps /simulator.py
isabelku's picture
AutoFarm Space deploy
826dd96
from __future__ import annotations
import base64
import io
import json
import math
import os
import time
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable
from urllib.parse import quote
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
import streamlit.components.v1 as components
from autofarm.adapters.datasets import DatasetCatalog
from autofarm.contracts import (
ActionType,
ControlState,
EpisodeOutcomeStatus,
ExecutionOutcomeType,
MissionPlan,
ResolutionIntent,
SimulatorMode,
TerminalReason,
)
from autofarm.debug_profile import (
ACTIVE_DEBUG_SCENARIO,
ACTIVE_DEBUG_STRATEGY,
visible_riddle_scenarios,
visible_strategy_names,
)
from autofarm.evaluation.baselines import HybridStrategy, build_strategies
from autofarm.paths import REPO_ROOT
from autofarm.urgency import (
MISSING_YIELD_COLOR,
REAL_FIELD_CAUTION_URGENCY_THRESHOLD,
REAL_FIELD_CRITICAL_URGENCY_THRESHOLD,
)
from autofarm.planning.utility import action_type_for_field_recommendation
from intervention import render_intervention_view, render_intervention_sim_view
from intervention.simulation_view import write_sim_input_csvs
from intervention.constants import (
INTERVENTION_DATA_KEY,
INTERVENTION_PREDICTED_KEY,
INTERVENTION_SIM_ERROR_KEY,
INTERVENTION_SIM_MAP_PATH,
INTERVENTION_SIM_VIEW_KEY,
INTERVENTION_VIEW_KEY,
)
from autofarm.sim.engine import (
SimulatorEnvironment,
build_environment,
build_interactive_environment,
build_real_field_environment,
shortest_path_zone_ids,
)
from autofarm.sim.playback import (
PLAYBACK_MODE_AUTO,
PLAYBACK_MODE_MANUAL,
PLAYBACK_SPEED_1HZ,
PLAYBACK_SPEED_4HZ,
PLAYBACK_SPEED_8HZ,
default_playback_state,
playback_interval_ms,
playback_speed_options,
should_process_tick,
)
from autofarm.sim.session_persistence import (
PersistenceCoordinator,
SessionPersistenceResult,
create_session_id,
persist_interactive_session,
)
from autofarm.sim.scenarios import RIDDLE_SAFETY_MAX_STEPS, available_riddles, interactive_challenge_templates
ENV_KEY = "env"
ENV_GENERATION_KEY = "env_generation"
PERSISTENCE_GENERATION_KEY = "persistence_generation"
SESSION_ID_KEY = "simulator_session_id"
SESSION_PERSISTENCE_KEY = "simulator_session_persistence"
PERSISTENCE_COORDINATOR_KEY = "simulator_persistence_coordinator"
PERSISTENCE_WARNING_KEY = "simulator_persistence_warning"
LAST_BOARD_EVENT_KEY = "interactive_last_board_event"
SELECTED_ZONE_KEY = "selected_zone_id"
PENDING_CHALLENGE_TEMPLATE_KEY = "interactive_pending_challenge_template"
PENDING_CHALLENGE_RADIUS_KEY = "interactive_pending_challenge_radius"
PLACEMENT_FLASH_KEY = "interactive_placement_flash"
FIGURE_CACHE_KEY = "zone_grid_figure_cache"
STATIC_MAP_CACHE_KEY = "zone_grid_static_cache"
TIMELINE_WINDOW_KEY = "timeline_window_size"
SHOW_TIMELINE_BODY_KEY = "show_timeline_body"
SHOW_PLANNED_TRAJECTORY_KEY = "show_planned_trajectory"
PLAYBACK_MODE_KEY = "playback_mode"
PLAYBACK_RUNNING_KEY = "playback_running"
PLAYBACK_SPEED_KEY = "playback_speed"
PLAYBACK_LAST_TICK_KEY = "playback_last_processed_tick"
PLAYBACK_WARNING_KEY = "playback_warning"
PLAYBACK_NEXT_STEP_AT_KEY = "playback_next_step_at"
PLAYBACK_TELEMETRY_KEY = "playback_telemetry"
PLAYBACK_PENDING_STEP_KEY = "playback_pending_step_telemetry"
BOARD_COMPONENT = components.declare_component(
"challenge_board",
path=str((Path(__file__).parent / "components" / "challenge_board").resolve()),
)
AUTO_TICK_COMPONENT = components.declare_component(
"auto_tick",
path=str((Path(__file__).parent / "components" / "auto_tick").resolve()),
)
SCRIPT_EVAL_EXPORT_ROOT = REPO_ROOT / "files_script_eval"
FIELD_TILE_STATE_EXPORT_PATH = SCRIPT_EVAL_EXPORT_ROOT / "field_tile_states.json"
TILE_ACTION_LOG_EXPORT_PATH = SCRIPT_EVAL_EXPORT_ROOT / "tile_action_log.json"
POSITIVE_SIGNAL_ANOMALIES = {"weed_presence"}
SNAPSHOT_WRITE_INTERVAL = 5
TIMELINE_WINDOW_OPTIONS = [25, 50, 100, 250, 0]
PLANNED_TRAJECTORY_MOVE_STEP_BUDGET = 10
RECENT_TASK_ICON_TTL_STEPS = 5
PLAYBACK_TELEMETRY_WINDOW = 20
SERVICE_ACTIONS = {
ActionType.REMOVE_WEEDS,
ActionType.APPLY_FERTILIZER,
ActionType.APPLY_WATER,
ActionType.INSTALL_DRAINAGE,
ActionType.SUBSOIL,
}
@dataclass(frozen=True)
class RealFieldStaticMapPayload:
figure: go.Figure
zone_ids: tuple[str, ...]
xs: tuple[int, ...]
ys: tuple[int, ...]
x_values: tuple[int, ...]
y_values: tuple[int, ...]
grid_size: int
cell_marker_size: int
cell_text_size: int
marker_ring_size: int
step_marker_size: int
home_base_x: float
home_base_y: float
entry_x: float
entry_y: float
tile_world_x0: float
tile_world_x1: float
tile_world_y0: float
tile_world_y1: float
@dataclass(frozen=True)
class RealFieldDynamicMapPayload:
texts: list[str]
colors: list[str]
line_colors: list[str]
line_widths: list[int]
hover_texts: list[str]
previous_zone_id: str | None
current_zone_id: str | None
event_marker_xs: list[float]
event_marker_ys: list[float]
event_marker_texts: list[str]
event_marker_colors: list[str]
event_marker_hovers: list[str]
planned_trajectory: "PlannedTrajectoryPayload | None"
@dataclass(frozen=True)
class PlannedTrajectoryPayload:
route_zone_ids: tuple[str, ...]
starts_from_home: bool
route_xs: list[float]
route_ys: list[float]
waypoint_xs: list[float]
waypoint_ys: list[float]
task_marker_xs: list[float]
task_marker_ys: list[float]
task_marker_texts: list[str]
task_marker_sizes: list[int]
task_marker_colors: list[str]
task_marker_hovers: list[str]
@dataclass(frozen=True)
class PlannedHorizonAction:
order: int
zone_id: str
task_type: str
rationale: str
@dataclass(frozen=True)
class PlannedHorizonPayload:
route_zone_ids: tuple[str, ...]
starts_from_home: bool
actions: tuple[PlannedHorizonAction, ...]
def enum_value(value):
return getattr(value, "value", value)
def finding_bucket(anomaly_type: str) -> str:
if anomaly_type in POSITIVE_SIGNAL_ANOMALIES:
return "Positive signal findings"
return "Other findings"
def render_finding_groups(findings, *, title_when_empty: str) -> None:
if not findings:
st.write(title_when_empty)
return
grouped: dict[str, list[str]] = {}
for finding in findings:
anomaly_type = str(finding.anomaly_type)
grouped.setdefault(finding_bucket(anomaly_type), []).append(
f"`{anomaly_type}` severity={finding.severity} confidence={finding.confidence:.2f}"
)
for section_title in (
"Positive signal findings",
"Other findings",
):
rows = grouped.get(section_title)
if not rows:
continue
st.write(section_title)
for row in rows:
st.write(f"- {row}")
def event_control_intent(event):
if event.control_event is None:
return None
return event.control_event.requested_intent
def event_terminal_reason(event):
if event.guarded_context_after is None:
return None
return event.guarded_context_after.terminal_reason
def preferred_option(options: list[str], preferred: str) -> str:
if preferred in options:
return preferred
return options[0]
def set_manual_primary_axis(axis: str) -> None:
st.session_state["interactive_manual_primary_axis"] = axis
def arm_pending_challenge(template_id: str, radius: int) -> None:
st.session_state[PENDING_CHALLENGE_TEMPLATE_KEY] = template_id
st.session_state[PENDING_CHALLENGE_RADIUS_KEY] = int(radius)
def clear_pending_challenge() -> None:
st.session_state.pop(PENDING_CHALLENGE_TEMPLATE_KEY, None)
st.session_state.pop(PENDING_CHALLENGE_RADIUS_KEY, None)
def pending_challenge_request() -> str | None:
template_id = st.session_state.get(PENDING_CHALLENGE_TEMPLATE_KEY)
if not isinstance(template_id, str) or not template_id:
return None
return template_id
def pending_challenge_radius() -> int:
radius = st.session_state.get(PENDING_CHALLENGE_RADIUS_KEY, 1)
try:
return max(1, min(3, int(radius)))
except (TypeError, ValueError):
return 1
def set_placement_flash_message(success: bool, message: str) -> None:
st.session_state[PLACEMENT_FLASH_KEY] = {"success": success, "message": message}
def pop_placement_flash_message() -> tuple[bool, str] | None:
payload = st.session_state.pop(PLACEMENT_FLASH_KEY, None)
if not isinstance(payload, dict):
return None
return bool(payload.get("success")), str(payload.get("message") or "")
def bump_env_generation() -> None:
st.session_state[ENV_GENERATION_KEY] = int(st.session_state.get(ENV_GENERATION_KEY, 0)) + 1
def bump_persistence_generation() -> None:
st.session_state[PERSISTENCE_GENERATION_KEY] = int(st.session_state.get(PERSISTENCE_GENERATION_KEY, 0)) + 1
def is_persistent_world_mode_value(mode_name: str) -> bool:
return mode_name in {SimulatorMode.INTERACTIVE.value, SimulatorMode.REAL_FIELD.value}
def is_persistent_world_mode(mode: SimulatorMode) -> bool:
return mode in {SimulatorMode.INTERACTIVE, SimulatorMode.REAL_FIELD}
def ensure_playback_state_defaults() -> None:
defaults = default_playback_state()
st.session_state.setdefault(PLAYBACK_MODE_KEY, defaults["mode"])
st.session_state.setdefault(PLAYBACK_RUNNING_KEY, defaults["running"])
st.session_state.setdefault(PLAYBACK_SPEED_KEY, defaults["speed"])
st.session_state.setdefault(PLAYBACK_LAST_TICK_KEY, defaults["last_processed_tick"])
st.session_state.setdefault(PLAYBACK_WARNING_KEY, None)
st.session_state.setdefault(PLAYBACK_NEXT_STEP_AT_KEY, None)
st.session_state.setdefault(PLAYBACK_TELEMETRY_KEY, default_playback_telemetry())
st.session_state.setdefault(PLAYBACK_PENDING_STEP_KEY, None)
def default_playback_telemetry() -> dict[str, object]:
return {
"speed": PLAYBACK_SPEED_1HZ,
"started_at": None,
"step_count": 0,
"last_step_completed_at": None,
"recent_step_intervals_ms": [],
"last_engine_step_ms": None,
"recent_engine_step_ms": [],
"last_total_app_step_ms": None,
"recent_total_app_step_ms": [],
"last_render_ms": None,
"recent_render_ms": [],
"last_persistence_ms": None,
"recent_persistence_ms": [],
"last_export_ms": None,
"recent_export_ms": [],
"last_strategy_plan_ms": None,
"recent_strategy_plan_ms": [],
"last_decision_trace_ms": None,
"recent_decision_trace_ms": [],
}
def append_recent_metric(series: object, value: float) -> list[float]:
values = [float(item) for item in series] if isinstance(series, list) else []
values.append(round(float(value), 3))
return values[-PLAYBACK_TELEMETRY_WINDOW:]
def reset_playback_telemetry(speed: str) -> None:
telemetry = default_playback_telemetry()
telemetry["speed"] = speed
st.session_state[PLAYBACK_TELEMETRY_KEY] = telemetry
st.session_state[PLAYBACK_PENDING_STEP_KEY] = None
def maybe_reset_playback_telemetry_for_speed_change(speed: str) -> None:
telemetry = st.session_state.get(PLAYBACK_TELEMETRY_KEY)
if not isinstance(telemetry, dict) or telemetry.get("speed") == speed:
return
reset_playback_telemetry(speed)
if bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)):
st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = time.monotonic() + (
playback_interval_ms(speed) / 1000.0
)
def begin_playback_step_telemetry(
*,
speed: str,
engine_step_ms: float,
step_started_at: float,
step_perf: dict[str, float] | None,
) -> None:
st.session_state[PLAYBACK_PENDING_STEP_KEY] = {
"speed": speed,
"engine_step_ms": round(engine_step_ms, 3),
"step_started_at": step_started_at,
"step_perf": dict(step_perf or {}),
}
def finalize_playback_step_telemetry(
*,
speed: str,
completed_at: float,
total_app_step_ms: float,
render_ms: float,
persistence_ms: float,
export_ms: float,
) -> None:
pending = st.session_state.get(PLAYBACK_PENDING_STEP_KEY)
if not isinstance(pending, dict):
return
telemetry = st.session_state.get(PLAYBACK_TELEMETRY_KEY)
if not isinstance(telemetry, dict):
telemetry = default_playback_telemetry()
last_step_completed_at = telemetry.get("last_step_completed_at")
recent_intervals = telemetry.get("recent_step_intervals_ms")
if isinstance(last_step_completed_at, (int, float)):
recent_intervals = append_recent_metric(
recent_intervals,
(completed_at - float(last_step_completed_at)) * 1000.0,
)
step_perf = pending.get("step_perf")
strategy_plan_ms = None
decision_trace_ms = None
if isinstance(step_perf, dict):
if isinstance(step_perf.get("strategy_plan_ms"), (int, float)):
strategy_plan_ms = float(step_perf["strategy_plan_ms"])
if isinstance(step_perf.get("decision_trace_ms"), (int, float)):
decision_trace_ms = float(step_perf["decision_trace_ms"])
telemetry.update(
{
"speed": speed,
"started_at": telemetry.get("started_at") or completed_at,
"step_count": int(telemetry.get("step_count", 0)) + 1,
"last_step_completed_at": completed_at,
"last_engine_step_ms": round(float(pending.get("engine_step_ms", 0.0)), 3),
"recent_engine_step_ms": append_recent_metric(
telemetry.get("recent_engine_step_ms"),
float(pending.get("engine_step_ms", 0.0)),
),
"last_total_app_step_ms": round(total_app_step_ms, 3),
"recent_total_app_step_ms": append_recent_metric(
telemetry.get("recent_total_app_step_ms"),
total_app_step_ms,
),
"last_render_ms": round(render_ms, 3),
"recent_render_ms": append_recent_metric(
telemetry.get("recent_render_ms"),
render_ms,
),
"last_persistence_ms": round(persistence_ms, 3),
"recent_persistence_ms": append_recent_metric(
telemetry.get("recent_persistence_ms"),
persistence_ms,
),
"last_export_ms": round(export_ms, 3),
"recent_export_ms": append_recent_metric(
telemetry.get("recent_export_ms"),
export_ms,
),
"recent_step_intervals_ms": recent_intervals if isinstance(recent_intervals, list) else [],
}
)
if strategy_plan_ms is not None:
telemetry["last_strategy_plan_ms"] = round(strategy_plan_ms, 3)
telemetry["recent_strategy_plan_ms"] = append_recent_metric(
telemetry.get("recent_strategy_plan_ms"),
strategy_plan_ms,
)
if decision_trace_ms is not None:
telemetry["last_decision_trace_ms"] = round(decision_trace_ms, 3)
telemetry["recent_decision_trace_ms"] = append_recent_metric(
telemetry.get("recent_decision_trace_ms"),
decision_trace_ms,
)
st.session_state[PLAYBACK_TELEMETRY_KEY] = telemetry
st.session_state[PLAYBACK_PENDING_STEP_KEY] = None
def average_metric(series: object) -> float | None:
if not isinstance(series, list) or not series:
return None
return round(sum(float(item) for item in series) / len(series), 3)
def p95_metric(series: object) -> float | None:
if not isinstance(series, list) or not series:
return None
values = sorted(float(item) for item in series)
index = min(len(values) - 1, math.ceil(0.95 * len(values)) - 1)
return round(values[index], 3)
def max_metric(series: object) -> float | None:
if not isinstance(series, list) or not series:
return None
return round(max(float(item) for item in series), 3)
def metric_summary(*, telemetry: dict[str, object], name: str) -> dict[str, float | None]:
last_key = f"last_{name}"
recent_key = f"recent_{name}"
last_value = telemetry.get(last_key)
return {
"last": round(float(last_value), 3) if isinstance(last_value, (int, float)) else None,
"avg": average_metric(telemetry.get(recent_key)),
"p95": p95_metric(telemetry.get(recent_key)),
"max": max_metric(telemetry.get(recent_key)),
}
def playback_telemetry_snapshot() -> dict[str, object]:
telemetry = st.session_state.get(PLAYBACK_TELEMETRY_KEY)
if not isinstance(telemetry, dict):
return {
"step_count": 0,
"achieved_hz": None,
"engine_step_ms": {"last": None, "avg": None, "p95": None, "max": None},
"total_app_step_ms": {"last": None, "avg": None, "p95": None, "max": None},
"render_ms": {"last": None, "avg": None, "p95": None, "max": None},
"persistence_ms": {"last": None, "avg": None, "p95": None, "max": None},
"export_ms": {"last": None, "avg": None, "p95": None, "max": None},
"strategy_plan_ms": {"last": None, "avg": None, "p95": None, "max": None},
"decision_trace_ms": {"last": None, "avg": None, "p95": None, "max": None},
}
avg_interval_ms = average_metric(telemetry.get("recent_step_intervals_ms"))
achieved_hz = None if avg_interval_ms in {None, 0.0} else round(1000.0 / avg_interval_ms, 3)
return {
"step_count": int(telemetry.get("step_count", 0)),
"achieved_hz": achieved_hz,
"engine_step_ms": metric_summary(telemetry=telemetry, name="engine_step_ms"),
"total_app_step_ms": metric_summary(telemetry=telemetry, name="total_app_step_ms"),
"render_ms": metric_summary(telemetry=telemetry, name="render_ms"),
"persistence_ms": metric_summary(telemetry=telemetry, name="persistence_ms"),
"export_ms": metric_summary(telemetry=telemetry, name="export_ms"),
"strategy_plan_ms": metric_summary(telemetry=telemetry, name="strategy_plan_ms"),
"decision_trace_ms": metric_summary(telemetry=telemetry, name="decision_trace_ms"),
}
def render_playback_telemetry(*, speed: str, running: bool, mode: SimulatorMode) -> None:
target_hz = round(1000.0 / playback_interval_ms(speed), 2)
telemetry = playback_telemetry_snapshot()
achieved_hz = telemetry.get("achieved_hz")
engine_step_ms = telemetry["engine_step_ms"]
total_app_step_ms = telemetry["total_app_step_ms"]
if running:
if achieved_hz is None:
st.caption(f"Target {target_hz:.2f} Hz. Telemetry will populate after the next auto-step.")
else:
parts = [
f"Target {target_hz:.2f} Hz",
f"Achieved {float(achieved_hz):.2f} Hz",
(
f"Engine {float(engine_step_ms['last'] or 0.0):.1f} / "
f"{float(engine_step_ms['avg'] or 0.0):.1f} / "
f"{float(engine_step_ms['p95'] or 0.0):.1f} ms"
),
]
if mode == SimulatorMode.REAL_FIELD:
parts.append(
f"App {float(total_app_step_ms['last'] or 0.0):.1f} / "
f"{float(total_app_step_ms['avg'] or 0.0):.1f} / "
f"{float(total_app_step_ms['p95'] or 0.0):.1f} ms"
)
parts.append(
f"Render {float(telemetry['render_ms']['avg'] or 0.0):.1f} ms"
)
parts.append(
f"Persist {float(telemetry['persistence_ms']['avg'] or 0.0):.1f} ms"
)
parts.append(
f"Export {float(telemetry['export_ms']['avg'] or 0.0):.1f} ms"
)
st.caption(" | ".join(parts))
return
if int(telemetry.get("step_count", 0)) > 0:
achieved_text = "n/a" if achieved_hz is None else f"{float(achieved_hz):.2f} Hz"
parts = [
f"Last run {achieved_text} achieved",
f"Engine {float(engine_step_ms['last'] or 0.0):.1f} / {float(engine_step_ms['avg'] or 0.0):.1f} ms",
]
if mode == SimulatorMode.REAL_FIELD:
parts.append(
f"App {float(total_app_step_ms['last'] or 0.0):.1f} / {float(total_app_step_ms['avg'] or 0.0):.1f} ms"
)
st.caption(" | ".join(parts))
def build_runtime_performance_metrics(env: SimulatorEnvironment) -> dict[str, object]:
metrics = env.metrics()
if env.mode != SimulatorMode.REAL_FIELD:
return metrics
metrics = dict(metrics)
metrics["runtime_performance"] = {
"playback": playback_telemetry_snapshot(),
"last_step_engine_phases_ms": env.last_step_performance(),
}
return metrics
def pause_playback(*, clear_last_tick: bool) -> None:
st.session_state[PLAYBACK_RUNNING_KEY] = False
if clear_last_tick:
st.session_state[PLAYBACK_LAST_TICK_KEY] = None
st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = None
def start_playback() -> None:
speed = str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))
st.session_state[PLAYBACK_RUNNING_KEY] = True
st.session_state[PLAYBACK_WARNING_KEY] = None
reset_playback_telemetry(speed)
st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = time.monotonic() + (
playback_interval_ms(speed) / 1000.0
)
def execute_simulator_step(env: SimulatorEnvironment, strategy) -> tuple[bool, float, float, dict[str, float]]:
before_step_index = int(env.step_index)
before_event_count = len(env.event_history)
started_at = time.perf_counter()
env.step(strategy) # type: ignore[arg-type]
duration_ms = (time.perf_counter() - started_at) * 1000.0
after_step_index = int(env.step_index)
after_event_count = len(env.event_history)
return (
after_step_index > before_step_index and after_event_count > before_event_count,
started_at,
duration_ms,
env.last_step_performance(),
)
def maybe_run_fragment_auto_step(env: SimulatorEnvironment, strategy) -> bool:
if str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)) != PLAYBACK_MODE_AUTO:
return False
if not bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)) or env.done:
return False
interval_seconds = playback_interval_ms(str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))) / 1000.0
next_step_at = st.session_state.get(PLAYBACK_NEXT_STEP_AT_KEY)
now = time.monotonic()
if next_step_at is None:
st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = now + interval_seconds
return False
if now < float(next_step_at):
return False
advanced, step_started_at, step_duration_ms, step_perf = execute_simulator_step(env, strategy)
if advanced:
begin_playback_step_telemetry(
speed=str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)),
engine_step_ms=step_duration_ms,
step_started_at=step_started_at,
step_perf=step_perf,
)
st.session_state[PLAYBACK_WARNING_KEY] = None
st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = time.monotonic() + interval_seconds
else:
pause_playback(clear_last_tick=True)
st.session_state[PLAYBACK_WARNING_KEY] = (
"Playback paused because the last auto-step did not advance the simulator state."
)
return True
def render_auto_tick_component(*, enabled: bool, interval_ms: int, session_token: str):
return AUTO_TICK_COMPONENT(
enabled=enabled,
interval_ms=interval_ms,
session_token=session_token,
key="simulator_auto_tick",
)
def fragment_runner(run_every_seconds: float):
if hasattr(st, "fragment"):
return st.fragment(run_every=run_every_seconds)
if hasattr(st, "experimental_fragment"):
return st.experimental_fragment(run_every=run_every_seconds)
return None
def field_extent(zone_states) -> tuple[int, int]:
return (
max((zone.row or 0) for zone in zone_states),
max((zone.col or 0) for zone in zone_states),
)
def is_rectangular_field(zone_states) -> bool:
rows, cols = field_extent(zone_states)
return len(zone_states) == rows * cols
def field_layout_metric(env: SimulatorEnvironment) -> tuple[str, str]:
rows, cols = field_extent(env.zone_states)
if env.mode == SimulatorMode.REAL_FIELD:
return "Field Extent", f"{rows} x {cols}"
return "Field Size", f"{rows} x {cols}"
def _prewarm_simulator_resources() -> None:
"""Warm st.cache_resource caches so the simulator page loads instantly."""
try:
get_strategies()
except Exception:
pass
try:
if ENV_KEY not in st.session_state:
catalog = DatasetCatalog(ground_weeds=())
st.session_state[ENV_KEY] = build_real_field_environment(
catalog=catalog,
)
except Exception:
pass
def render_sidebar_heading(label: str, *, size_rem: float) -> None:
st.markdown(
(
f"<div style='font-size:{size_rem:.2f}rem;font-weight:700;"
"line-height:1.2;color:#0f172a;margin:0 0 0.35rem 0;'>"
f"{label}</div>"
),
unsafe_allow_html=True,
)
def main() -> None:
st.set_page_config(page_title="AutoFarm Simulator", layout="wide")
st.markdown(
"<style>MainMenu {visibility: hidden;} header {visibility: hidden;}"
" .block-container {padding-top: 1rem;}</style>",
unsafe_allow_html=True,
)
st.title("AutoFarm Simulator")
st.session_state.setdefault(INTERVENTION_VIEW_KEY, True)
st.session_state.setdefault(INTERVENTION_PREDICTED_KEY, False)
st.session_state.setdefault(INTERVENTION_SIM_VIEW_KEY, False)
st.session_state.setdefault(INTERVENTION_SIM_ERROR_KEY, None)
if st.session_state[INTERVENTION_VIEW_KEY]:
render_intervention_view(on_continue=_prewarm_simulator_resources)
return
if st.session_state[INTERVENTION_SIM_VIEW_KEY]:
def _get_snapshot():
env = st.session_state.get(ENV_KEY)
return env.snapshot() if env is not None else None
render_intervention_sim_view(get_snapshot=_get_snapshot)
return
ensure_playback_state_defaults()
catalog = DatasetCatalog(ground_weeds=())
strategies = get_strategies()
riddle_map = available_riddles()
challenge_palette = interactive_challenge_templates()
requested_mode = os.getenv("AUTOFARM_SIM_MODE", SimulatorMode.REAL_FIELD.value)
default_strategy = os.getenv("AUTOFARM_SIM_STRATEGY", ACTIVE_DEBUG_STRATEGY)
default_scenario = os.getenv("AUTOFARM_SIM_SCENARIO", ACTIVE_DEBUG_SCENARIO)
mode_name = SimulatorMode.RIDDLE.value if requested_mode == SimulatorMode.RIDDLE.value else SimulatorMode.REAL_FIELD.value
show_parked_lanes = False
playback_toggle_clicked = False
with st.sidebar:
render_sidebar_heading("Simulator Controls", size_rem=1.32)
if mode_name == SimulatorMode.RIDDLE.value:
grid_size = 5
else:
grid_size = 0
scenario_name = None
strategy_name = ACTIVE_DEBUG_STRATEGY
if mode_name == SimulatorMode.RIDDLE.value:
visible_scenarios = visible_riddle_scenarios(list(riddle_map), show_parked=show_parked_lanes)
scenario_choice = preferred_option(visible_scenarios, default_scenario)
scenario_name = st.selectbox(
"Riddle",
visible_scenarios,
index=visible_scenarios.index(scenario_choice),
)
visible_strategies = visible_strategy_names(list(strategies), show_parked=show_parked_lanes)
if len(visible_strategies) == 1:
strategy_name = visible_strategies[0]
else:
preferred_strategy = preferred_option(visible_strategies, default_strategy)
strategy_name = st.selectbox(
"Strategy",
visible_strategies,
index=visible_strategies.index(preferred_strategy),
)
st.divider()
render_sidebar_heading("Playback", size_rem=0.98)
playback_mode = st.radio(
"Playback Mode",
[PLAYBACK_MODE_MANUAL, PLAYBACK_MODE_AUTO],
key=PLAYBACK_MODE_KEY,
horizontal=True,
)
playback_speed = str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))
if playback_mode == PLAYBACK_MODE_AUTO:
available_playback_speeds = playback_speed_options(enable_8hz=True)
playback_speed = st.radio(
"Auto Speed",
available_playback_speeds,
key=PLAYBACK_SPEED_KEY,
horizontal=True,
)
maybe_reset_playback_telemetry_for_speed_change(playback_speed)
playback_running = bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False))
toggle_label = "Pause Playback" if playback_running else "Start Playback"
playback_toggle_clicked = st.button(toggle_label, width="stretch")
render_playback_telemetry(
speed=playback_speed,
running=playback_running,
mode=SimulatorMode(mode_name),
)
else:
playback_speed = str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))
if playback_mode == PLAYBACK_MODE_MANUAL and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)):
pause_playback(clear_last_tick=False)
if playback_toggle_clicked:
if bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)):
pause_playback(clear_last_tick=False)
else:
start_playback()
auto_running = (
str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)) == PLAYBACK_MODE_AUTO
and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False))
)
render_sidebar_heading("Visualization", size_rem=0.98)
show_planned_trajectory = st.toggle(
"Show planned trajectory",
value=bool(st.session_state.get(SHOW_PLANNED_TRAJECTORY_KEY, True)),
key=SHOW_PLANNED_TRAJECTORY_KEY,
)
st.divider()
reset_label = "Reset Riddle" if mode_name == SimulatorMode.RIDDLE.value else "Reset World"
step_label = "Advance One Step" if mode_name == SimulatorMode.RIDDLE.value else "Advance One Step"
reset = st.button(reset_label, width="stretch")
if mode_name == SimulatorMode.RIDDLE.value:
step_col, run_col = st.columns(2)
with step_col:
step_once = st.button(step_label, width="stretch", disabled=auto_running)
with run_col:
run_to_outcome = st.button("Run To Outcome", width="stretch", disabled=auto_running)
else:
step_once = st.button(step_label, width="stretch", disabled=auto_running)
run_to_outcome = False
st.divider()
render_sidebar_heading("Analysis", size_rem=0.98)
investigate_clicked = st.button("Intervention Impact", type="primary", width="stretch")
if investigate_clicked:
st.session_state[INTERVENTION_SIM_VIEW_KEY] = True
st.session_state[INTERVENTION_SIM_ERROR_KEY] = None
# Remove stale PNG so the pipeline re-runs with current simulator data
if INTERVENTION_SIM_MAP_PATH.exists():
INTERVENTION_SIM_MAP_PATH.unlink()
st.rerun()
if reset:
pause_playback(clear_last_tick=True)
st.session_state[PLAYBACK_WARNING_KEY] = None
reset_simulator_run_state()
session_id = get_or_create_session_id()
env = get_or_reset_environment(
mode_name=mode_name,
scenario_name=scenario_name,
reset=reset,
catalog=catalog,
grid_size=grid_size,
)
strategy = strategies[strategy_name]
persistence_duration_ms = 0.0
export_duration_ms = 0.0
render_duration_ms = 0.0
auto_tick_payload = None
run_every_seconds = playback_interval_ms(str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))) / 1000.0
playback_fragment = fragment_runner(run_every_seconds)
if playback_fragment is not None:
@playback_fragment
def auto_step_fragment() -> None:
st.empty()
if maybe_run_fragment_auto_step(env, strategy):
bump_env_generation()
st.rerun()
auto_step_fragment()
else:
auto_tick_payload = render_auto_tick_component(
enabled=(
str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)) == PLAYBACK_MODE_AUTO
and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False))
and not env.done
),
interval_ms=playback_interval_ms(str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))),
session_token=(
f"{session_id}:"
f"{int(st.session_state.get(ENV_GENERATION_KEY, 0))}:"
f"{str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))}"
),
)
step_executed = reset or playback_toggle_clicked
if not env.current_assessments:
env.current_assessments, env.current_plan = strategy.plan(env.planning_observation()) # type: ignore[arg-type]
env.invalidate_cached_views()
placement_message = pop_placement_flash_message()
if step_once and not env.done:
st.session_state[PLAYBACK_WARNING_KEY] = None
step_advanced, _, _, _ = execute_simulator_step(env, strategy)
if step_advanced:
bump_env_generation()
step_executed = True
if run_to_outcome and env.mode == SimulatorMode.RIDDLE and not env.done:
stepped = False
for _ in range(max(1, RIDDLE_SAFETY_MAX_STEPS + 2)):
step_advanced, _, _, _ = execute_simulator_step(env, strategy)
if not step_advanced:
break
stepped = True
if env.done:
break
if stepped:
bump_env_generation()
step_executed = True
if playback_fragment is None:
should_tick, tick_id = should_process_tick(
mode=str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)),
running=bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)),
env_done=env.done,
tick_payload=auto_tick_payload,
last_processed_tick=st.session_state.get(PLAYBACK_LAST_TICK_KEY),
)
if should_tick and not step_executed:
step_advanced, step_started_at, step_duration_ms, step_perf = execute_simulator_step(env, strategy)
if step_advanced:
begin_playback_step_telemetry(
speed=str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)),
engine_step_ms=step_duration_ms,
step_started_at=step_started_at,
step_perf=step_perf,
)
st.session_state[PLAYBACK_LAST_TICK_KEY] = tick_id
st.session_state[PLAYBACK_WARNING_KEY] = None
bump_env_generation()
step_executed = True
else:
pause_playback(clear_last_tick=True)
st.session_state[PLAYBACK_WARNING_KEY] = (
"Playback paused because the last auto-step did not advance the simulator state."
)
if env.done and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)):
pause_playback(clear_last_tick=True)
snapshot = env.snapshot()
persistence_started_at = time.perf_counter()
persistence = ensure_session_persistence(
session_id=session_id,
scenario_name=scenario_name or env.name,
snapshot=snapshot,
metrics_factory=lambda: build_runtime_performance_metrics(env),
)
persistence_duration_ms = (time.perf_counter() - persistence_started_at) * 1000.0
export_started_at = time.perf_counter()
write_script_eval_exports(env, snapshot, session_id=session_id)
export_duration_ms = (time.perf_counter() - export_started_at) * 1000.0
current_task = snapshot.mission_plan.ordered_tasks[0] if snapshot.mission_plan.ordered_tasks else None
# Use fixed-slot placeholders so these optional banners never shift the
# cursor position of elements below (which would remount the chart).
placement_slot = st.empty()
if placement_message is not None:
if placement_message[0]:
placement_slot.success(placement_message[1])
else:
placement_slot.warning(placement_message[1])
playback_warning = st.session_state.get(PLAYBACK_WARNING_KEY)
playback_slot = st.empty()
if playback_warning:
playback_slot.warning(str(playback_warning))
persistence_warning = st.session_state.get(PERSISTENCE_WARNING_KEY)
persistence_slot = st.empty()
if persistence_warning:
persistence_slot.warning(str(persistence_warning))
if env.mode == SimulatorMode.RIDDLE and scenario_name is not None:
render_primary_objective(riddle_map[scenario_name])
render_started_at = time.perf_counter()
render_outcome_banner(snapshot, env)
selected_event = snapshot.event_history[-1] if snapshot.event_history else None
if env.mode == SimulatorMode.RIDDLE:
render_riddle_layout(
env,
snapshot,
riddle_map[scenario_name].target_zone_id,
False,
riddle_map[scenario_name],
selected_event,
)
else:
render_interactive_layout(env, snapshot, selected_event, strategy, challenge_palette)
st.subheader("Plan & Steps")
render_plan_and_timeline(snapshot)
render_duration_ms = (time.perf_counter() - render_started_at) * 1000.0
pending_auto_step = st.session_state.get(PLAYBACK_PENDING_STEP_KEY)
if isinstance(pending_auto_step, dict):
total_app_step_ms = (time.perf_counter() - float(pending_auto_step.get("step_started_at", 0.0))) * 1000.0
finalize_playback_step_telemetry(
speed=str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)),
completed_at=time.perf_counter(),
total_app_step_ms=total_app_step_ms,
render_ms=render_duration_ms,
persistence_ms=persistence_duration_ms,
export_ms=export_duration_ms,
)
@st.cache_resource(show_spinner=False)
def get_strategies():
return build_strategies()
def get_or_reset_environment(
*,
mode_name: str,
scenario_name: str | None,
reset: bool,
catalog,
grid_size: int,
) -> SimulatorEnvironment:
active_mode = st.session_state.get("mode_name")
active_scenario = st.session_state.get("scenario_name")
active_grid_size = st.session_state.get("grid_size")
if (
reset
or ENV_KEY not in st.session_state
or active_mode != mode_name
or active_scenario != scenario_name
or active_grid_size != grid_size
):
pause_playback(clear_last_tick=True)
st.session_state[PLAYBACK_WARNING_KEY] = None
flush_persistence_coordinator()
if mode_name == SimulatorMode.INTERACTIVE.value:
st.session_state[ENV_KEY] = build_interactive_environment(
catalog=catalog,
grid_size=grid_size,
)
elif mode_name == SimulatorMode.REAL_FIELD.value:
st.session_state[ENV_KEY] = build_real_field_environment(
catalog=catalog,
)
else:
st.session_state[ENV_KEY] = build_environment(
scenario_name or list(available_riddles())[0],
catalog=catalog,
grid_size=grid_size,
)
bump_env_generation()
bump_persistence_generation()
st.session_state["mode_name"] = mode_name
st.session_state["scenario_name"] = scenario_name
st.session_state["grid_size"] = grid_size
st.session_state[LAST_BOARD_EVENT_KEY] = None
clear_pending_challenge()
st.session_state.pop(PLACEMENT_FLASH_KEY, None)
return st.session_state[ENV_KEY]
def get_or_create_session_id() -> str:
if SESSION_ID_KEY not in st.session_state:
st.session_state[SESSION_ID_KEY] = create_session_id()
return st.session_state[SESSION_ID_KEY]
def reset_simulator_run_state() -> None:
coordinator = st.session_state.pop(PERSISTENCE_COORDINATOR_KEY, None)
if isinstance(coordinator, PersistenceCoordinator):
coordinator.close()
st.session_state[SESSION_ID_KEY] = create_session_id()
st.session_state[ENV_GENERATION_KEY] = 0
st.session_state[PERSISTENCE_GENERATION_KEY] = 0
st.session_state.pop(SESSION_PERSISTENCE_KEY, None)
st.session_state.pop(SELECTED_ZONE_KEY, None)
st.session_state.pop(LAST_BOARD_EVENT_KEY, None)
clear_pending_challenge()
st.session_state.pop(PLACEMENT_FLASH_KEY, None)
st.session_state[PERSISTENCE_WARNING_KEY] = None
def get_or_create_persistence_coordinator() -> PersistenceCoordinator:
coordinator = st.session_state.get(PERSISTENCE_COORDINATOR_KEY)
if isinstance(coordinator, PersistenceCoordinator):
return coordinator
coordinator = PersistenceCoordinator()
st.session_state[PERSISTENCE_COORDINATOR_KEY] = coordinator
return coordinator
def flush_persistence_coordinator() -> int:
coordinator = st.session_state.get(PERSISTENCE_COORDINATOR_KEY)
if not isinstance(coordinator, PersistenceCoordinator):
return 0
snapshot_event_count = coordinator.flush()
failure_message = coordinator.failure_message()
if failure_message is not None:
st.session_state[PERSISTENCE_WARNING_KEY] = (
"Background snapshot persistence was disabled after a write failure. "
f"Falling back to synchronous snapshot writes. Details: {failure_message}"
)
return snapshot_event_count
def ensure_session_persistence(
*,
session_id: str,
scenario_name: str,
snapshot,
metrics_factory: Callable[[], dict[str, object]] | None = None,
) -> SessionPersistenceResult:
persisted = st.session_state.get(SESSION_PERSISTENCE_KEY)
generation = int(st.session_state.get(PERSISTENCE_GENERATION_KEY, 0))
event_count = len(snapshot.event_history)
coordinator = (
get_or_create_persistence_coordinator()
if snapshot.observation.mode == SimulatorMode.REAL_FIELD
else None
)
latest_snapshot_event_count = max(
persisted.snapshot_event_count if persisted is not None else 0,
coordinator.latest_snapshot_event_count() if coordinator is not None else 0,
)
if persisted is not None and latest_snapshot_event_count != persisted.snapshot_event_count:
persisted = replace(persisted, snapshot_event_count=latest_snapshot_event_count)
st.session_state[SESSION_PERSISTENCE_KEY] = persisted
if (
persisted is not None
and persisted.generation == generation
and persisted.event_count == event_count
):
return persisted
should_write_snapshot = (
persisted is None
or persisted.generation != generation
or snapshot.done
or event_count <= 2
or (event_count - latest_snapshot_event_count) >= SNAPSHOT_WRITE_INTERVAL
)
write_snapshot_sync = should_write_snapshot and (
coordinator is None
or snapshot.done
or coordinator.failure_message() is not None
)
if write_snapshot_sync and coordinator is not None:
latest_snapshot_event_count = max(latest_snapshot_event_count, flush_persistence_coordinator())
metrics = metrics_factory() if should_write_snapshot and metrics_factory is not None else None
result = persist_interactive_session(
session_id=session_id,
generation=generation,
scenario_name=scenario_name,
snapshot=snapshot,
metrics=metrics,
last_logged_generation=persisted.generation if persisted is not None else -1,
last_logged_event_count=persisted.event_count if persisted is not None else 0,
write_snapshot=write_snapshot_sync,
last_snapshot_event_count=latest_snapshot_event_count,
)
if should_write_snapshot and not write_snapshot_sync and coordinator is not None:
queued = coordinator.enqueue_snapshot(
session_id=session_id,
generation=generation,
scenario_name=scenario_name,
snapshot=snapshot,
metrics=metrics or {},
last_snapshot_event_count=latest_snapshot_event_count,
)
if not queued:
st.session_state[PERSISTENCE_WARNING_KEY] = (
"Background snapshot persistence was unavailable for this session. "
"Falling back to synchronous snapshot writes."
)
result = persist_interactive_session(
session_id=session_id,
generation=generation,
scenario_name=scenario_name,
snapshot=snapshot,
metrics=metrics,
last_logged_generation=result.generation,
last_logged_event_count=result.event_count,
write_snapshot=True,
last_snapshot_event_count=result.snapshot_event_count,
)
st.session_state[SESSION_PERSISTENCE_KEY] = result
return result
def render_session_state_panel(persistence: SessionPersistenceResult) -> None:
st.divider()
st.subheader("Session State")
st.caption("Interactive session persistence for external inspection.")
st.write(
{
"session_id": persistence.session_id,
"generation": persistence.generation,
"persisted_at": persistence.persisted_at,
}
)
st.code(
"\n".join(
[
f"snapshot: {persistence.snapshot_path}",
f"events: {persistence.events_path}",
f"pointer: {persistence.pointer_path}",
]
),
language="text",
)
def export_timestamp() -> str:
return datetime.now(timezone.utc).isoformat()
def challenge_records_for_step(snapshot, *, zone_id: str, step_index: int) -> list[dict[str, object]]:
records: list[dict[str, object]] = []
for challenge in [*snapshot.active_challenges, *snapshot.resolved_challenges]:
if challenge.zone_id != zone_id:
continue
if challenge.placed_step_index > step_index:
continue
if challenge.resolved_step_index is not None and step_index > challenge.resolved_step_index:
continue
records.append(
{
"instance_id": challenge.instance_id,
"template_id": challenge.template_id,
"name": challenge.name,
"description": challenge.description,
"resolution_status": challenge.resolution_status,
"resolution_message": challenge.resolution_message,
"placed_step_index": challenge.placed_step_index,
"resolved_step_index": challenge.resolved_step_index,
}
)
return records
def challenge_state_by_zone(snapshot) -> dict[str, dict[str, list[dict[str, object]]]]:
by_zone: dict[str, dict[str, list[dict[str, object]]]] = {}
for status_name, challenges in (
("active", snapshot.active_challenges),
("resolved", snapshot.resolved_challenges),
):
for challenge in challenges:
zone_bucket = by_zone.setdefault(challenge.zone_id, {"active": [], "resolved": []})
zone_bucket[status_name].append(
{
"instance_id": challenge.instance_id,
"template_id": challenge.template_id,
"name": challenge.name,
"description": challenge.description,
"resolution_status": challenge.resolution_status,
"resolution_message": challenge.resolution_message,
"placed_step_index": challenge.placed_step_index,
"resolved_step_index": challenge.resolved_step_index,
}
)
return by_zone
def build_field_tile_state_export(env: SimulatorEnvironment, snapshot, *, session_id: str) -> dict[str, object]:
challenge_state = challenge_state_by_zone(snapshot)
tile_records: list[dict[str, object]] = []
for zone in snapshot.observation.zone_states:
baseline_zone = env.baseline_zone_states.get(zone.zone_id)
evidence_state = snapshot.observation.zone_evidence_state_by_zone.get(zone.zone_id)
latest_event = env.latest_event_for_zone(zone.zone_id)
zone_challenges = challenge_state.get(zone.zone_id, {"active": [], "resolved": []})
tile_records.append(
{
"zone_id": zone.zone_id,
"row": zone.row,
"col": zone.col,
"initial_state": baseline_zone.model_dump(mode="json") if baseline_zone is not None else None,
"current_state": zone.model_dump(mode="json"),
"control_state": evidence_state.control_state if evidence_state is not None else None,
"inspection_count": (
evidence_state.guarded_context.inspection_count if evidence_state is not None else 0
),
"revisit_count": (
evidence_state.guarded_context.revisit_count if evidence_state is not None else 0
),
"active_challenges": zone_challenges["active"],
"resolved_challenges": zone_challenges["resolved"],
"active_challenge_summaries": [str(item["name"]) for item in zone_challenges["active"]],
"latest_event": latest_event.model_dump(mode="json") if latest_event is not None else None,
}
)
return {
"session_id": session_id,
"exported_at": export_timestamp(),
"mode": snapshot.observation.mode,
"step_index": snapshot.observation.step_index,
"event_count": len(snapshot.event_history),
"tiles": tile_records,
}
def build_tile_action_log_export(snapshot, *, session_id: str) -> dict[str, object]:
tile_actions: list[dict[str, object]] = []
field_action_names = {
ActionType.INSPECT.value,
ActionType.REVISIT.value,
ActionType.DEFER.value,
ActionType.REMOVE_WEEDS.value,
ActionType.MEASURE_SOIL.value,
ActionType.APPLY_FERTILIZER.value,
ActionType.APPLY_WATER.value,
ActionType.INSTALL_DRAINAGE.value,
ActionType.SUBSOIL.value,
}
for event in snapshot.event_history:
if event.zone_id is None or str(event.action) not in field_action_names:
continue
active_challenges = challenge_records_for_step(
snapshot,
zone_id=event.zone_id,
step_index=event.step_index,
)
tile_actions.append(
{
"step_index": event.step_index,
"zone_id": event.zone_id,
"action": str(event.action),
"execution_outcome": str(event.execution_outcome),
"reward": event.reward,
"from_zone_id": event.details.get("from_zone_id"),
"to_zone_id": event.details.get("to_zone_id"),
"revealed_facts": list(event.details.get("revealed_facts") or []),
"challenge_names": [record["name"] for record in active_challenges],
"challenges": active_challenges,
"planned_task": (
event.planner_context.normalized_task.model_dump(mode="json")
if event.planner_context is not None and event.planner_context.normalized_task is not None
else None
),
"requested_task": (
event.planner_context.chosen_task_before_normalization.model_dump(mode="json")
if event.planner_context is not None
and event.planner_context.chosen_task_before_normalization is not None
else None
),
}
)
return {
"session_id": session_id,
"exported_at": export_timestamp(),
"mode": snapshot.observation.mode,
"step_index": snapshot.observation.step_index,
"event_count": len(snapshot.event_history),
"tile_actions": tile_actions,
}
def write_script_eval_exports(env: SimulatorEnvironment, snapshot, *, session_id: str) -> None:
SCRIPT_EVAL_EXPORT_ROOT.mkdir(parents=True, exist_ok=True)
FIELD_TILE_STATE_EXPORT_PATH.write_text(
json.dumps(
build_field_tile_state_export(env, snapshot, session_id=session_id),
indent=2,
sort_keys=True,
),
encoding="utf-8",
)
TILE_ACTION_LOG_EXPORT_PATH.write_text(
json.dumps(
build_tile_action_log_export(snapshot, session_id=session_id),
indent=2,
sort_keys=True,
),
encoding="utf-8",
)
write_sim_input_csvs(snapshot)
def render_outcome_banner(snapshot, env: SimulatorEnvironment) -> None:
outcome = snapshot.outcome
if outcome.status == EpisodeOutcomeStatus.SUCCESS:
st.success(outcome.message)
elif outcome.status == EpisodeOutcomeStatus.FAILURE:
st.error(f"{outcome.message} ({outcome.done_reason})")
elif outcome.status == EpisodeOutcomeStatus.ERROR:
st.warning(f"{outcome.message} ({outcome.done_reason})")
elif outcome.status == EpisodeOutcomeStatus.STOPPED:
st.warning(outcome.message)
elif env.mode == SimulatorMode.RIDDLE:
st.info("Riddle in progress.")
else:
st.info("Persistent world is running. The robot moves one neighboring field at a time, then performs field checks, soil sampling, or treatments on the field where it stands.")
def render_primary_objective(riddle) -> None:
st.markdown(f"## Objective: {riddle.objective}")
note_left, note_center, note_right = st.columns(3)
with note_left:
st.info(f"**Riddle Note**\n\n{riddle.description}")
with note_center:
st.info(f"**Challenge**\n\n{riddle.challenge_summary}")
with note_right:
st.info(f"**Why It Matters**\n\n{riddle.why_it_matters}")
def battery_percentage_int(energy_remaining: float | None, max_energy: float | None) -> int | None:
if energy_remaining is None or max_energy in {None, 0}:
return None
ratio = max(0.0, min(1.0, float(energy_remaining) / float(max_energy)))
if ratio <= 0.0:
return 0
return max(0, min(100, int(math.ceil(ratio * 100.0))))
def render_battery_indicator(energy_remaining: float | None, max_energy: float | None) -> None:
if energy_remaining is None or max_energy in {None, 0}:
st.write("Battery unavailable")
return
percentage = battery_percentage_int(energy_remaining, max_energy)
assert percentage is not None
filled = round(percentage / 20)
if percentage >= 60:
fill_color, border_color = "#16a34a", "#15803d"
elif percentage >= 30:
fill_color, border_color = "#d97706", "#b45309"
else:
fill_color, border_color = "#dc2626", "#b91c1c"
segs = "".join(
f'<div style="flex:1;height:100%;background:{fill_color if i < filled else "#e2e8f0"};border-radius:2px;"></div>'
for i in range(5)
)
st.markdown(
f"""
<div style="display:flex;align-items:center;gap:10px;justify-content:center;padding:4px 0;">
<div style="display:flex;align-items:center;">
<div style="width:68px;height:26px;border:2px solid {border_color};border-radius:5px;padding:3px;display:flex;gap:2px;background:#f8fafc;">
{segs}
</div>
<div style="width:5px;height:13px;background:{border_color};border-radius:0 3px 3px 0;margin-left:-1px;flex-shrink:0;"></div>
</div>
<span style="font-weight:700;font-size:14px;color:#1e293b;">{percentage}%</span>
</div>
""",
unsafe_allow_html=True,
)
def render_riddle_layout(
env: SimulatorEnvironment,
snapshot,
target_zone_id: str | None,
show_truth: bool,
riddle,
selected_event,
) -> None:
map_col, right_col = st.columns([3.8, 1.3])
with map_col:
st.subheader("Field Grid")
figure = get_cached_zone_grid_figure(
env,
target_zone_id=target_zone_id,
show_truth=show_truth,
mission_plan=snapshot.mission_plan,
show_planned_trajectory=show_planned_trajectory_enabled(),
)
plot_state = st.plotly_chart(
figure,
width="stretch",
on_select="rerun",
selection_mode="points",
# Keep a stable widget identity so step updates do not remount the chart.
key="riddle-grid",
)
update_selected_zone_from_plot_state(plot_state, env, fallback_zone_id=target_zone_id)
render_selected_step_summary(selected_event)
with right_col:
step_col, done_col = st.columns(2)
with step_col:
st.metric("Step", snapshot.observation.step_index)
with done_col:
st.metric("Done", "Yes" if snapshot.done else "No")
if snapshot.observation.max_energy is not None:
st.write("**Battery**")
render_battery_indicator(snapshot.observation.energy_remaining, snapshot.observation.max_energy)
render_grid_legend()
render_selected_zone_panel(env, snapshot, selected_zone_id(env, target_zone_id=target_zone_id))
with st.container():
render_metrics_overview(env)
def _render_challenge_status(snapshot) -> None:
all_challenges = list(snapshot.active_challenges) + list(snapshot.resolved_challenges)
recent = sorted(all_challenges, key=lambda c: c.placed_step_index, reverse=True)[:3]
st.markdown("**Recent Challenges**")
if not recent:
st.caption("No challenges placed yet.")
return
for ch in recent:
zone = short_zone_label(ch.zone_id)
st.caption(f"· **{ch.name}** @ {zone}")
def render_challenge_placement_controls(challenge_palette) -> None:
with st.container(border=True):
st.caption("Challenge Placement")
challenge_col, radius_col, action_col = st.columns([2.5, 1.1, 1.1])
with challenge_col:
template_id = st.selectbox(
"Challenge",
list(challenge_palette),
format_func=lambda item: challenge_palette[item].display_name,
key="interactive_manual_template",
)
with radius_col:
radius = st.selectbox(
"Radius",
[1, 2, 3],
format_func=lambda value: f"{value} tile" if value == 1 else f"{value} tiles",
key="interactive_manual_radius",
)
with action_col:
st.write("")
if st.button("Apply", key="interactive_manual_apply", width="stretch"):
arm_pending_challenge(template_id, int(radius))
if hasattr(st, "toast"):
st.toast("Apply by selecting the tile.")
pending_request = pending_challenge_request()
if pending_request is not None:
radius = pending_challenge_radius()
st.info(
"Apply by selecting the tile."
f" Pending: {challenge_palette[pending_request].display_name}, radius {radius}."
)
else:
st.caption("Choose a challenge, press Apply, then click the target tile on the field.")
def maybe_apply_pending_challenge_selection(env: SimulatorEnvironment, zone_id: str | None, strategy) -> bool:
pending_request = pending_challenge_request()
if pending_request is None or zone_id is None:
return False
template_id = pending_request
radius = pending_challenge_radius()
clear_pending_challenge()
st.session_state[SELECTED_ZONE_KEY] = zone_id
try:
success, message = env.place_challenge(template_id, zone_id, radius=radius)
except NotImplementedError as exc:
success, message = False, str(exc)
if success:
if bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)):
pause_playback(clear_last_tick=True)
env.current_assessments, env.current_plan = strategy.plan(env.planning_observation()) # type: ignore[arg-type]
env.invalidate_cached_views()
bump_env_generation()
set_placement_flash_message(success, message)
st.rerun()
return True
def _render_world_map_fragment(
env: SimulatorEnvironment,
snapshot,
selected_event,
strategy,
) -> None:
"""Render the world map chart inside a stable fragment block.
Wrapping this in @st.fragment gives the chart a stable, identifier-based
parent block so it is never unmounted when unrelated elements above it shift
the main-page element tree between reruns.
"""
map_col, side_col = st.columns([3.2, 1.2])
with map_col:
st.subheader("World Map")
figure = get_cached_zone_grid_figure(
env,
target_zone_id=None,
show_truth=False,
mission_plan=snapshot.mission_plan,
show_planned_trajectory=show_planned_trajectory_enabled(),
)
plot_state = st.plotly_chart(
figure,
width="stretch",
on_select="rerun",
selection_mode="points",
key="interactive-grid",
)
clicked_zone_id = selected_zone_from_plot_state(plot_state)
if clicked_zone_id is not None:
st.session_state[SELECTED_ZONE_KEY] = clicked_zone_id
maybe_apply_pending_challenge_selection(env, clicked_zone_id, strategy)
update_selected_zone_from_plot_state(
plot_state,
env,
fallback_zone_id=(env.current_zone_id if env.current_zone_id in {zone.zone_id for zone in env.zone_states} else env.home_neighbor_zone_id),
)
render_selected_step_summary(selected_event)
with side_col:
render_grid_legend()
# Wrap with @st.fragment when available so the chart block has a stable
# identifier-based anchor in the page tree, independent of positional shifts
# caused by conditional elements elsewhere on the page.
_render_world_map_fragment = getattr(st, "fragment", lambda f: f)(_render_world_map_fragment)
def render_interactive_layout(env: SimulatorEnvironment, snapshot, selected_event, strategy, challenge_palette) -> None:
with st.container(border=True):
step_col, battery_col, challenge_col = st.columns([0.8, 1.3, 3.4])
with step_col:
st.metric("Step", snapshot.observation.step_index)
if env.mode != SimulatorMode.REAL_FIELD:
metric_label, metric_value = field_layout_metric(env)
st.metric(metric_label, metric_value)
with battery_col:
st.caption("Battery")
render_battery_indicator(snapshot.observation.energy_remaining, snapshot.observation.max_energy)
with challenge_col:
_render_challenge_status(snapshot)
render_challenge_placement_controls(challenge_palette)
_render_world_map_fragment(env, snapshot, selected_event, strategy)
render_selected_zone_panel(
env,
snapshot,
selected_zone_id(
env,
target_zone_id=(env.current_zone_id if env.current_zone_id in {zone.zone_id for zone in env.zone_states} else env.home_neighbor_zone_id),
),
)
with st.container():
render_metrics_overview(env)
def mission_plan_signature(plan: MissionPlan | None) -> tuple[tuple[str | None, str | None, str | None], ...]:
if plan is None:
return ()
signature: list[tuple[str | None, str | None, str | None]] = []
for task in plan.ordered_tasks:
if task.target_zone is None:
continue
signature.append(
(
str(task.task_type),
task.target_zone,
str(task.control_intent) if task.control_intent is not None else None,
)
)
return tuple(signature)
def get_cached_zone_grid_figure(
env: SimulatorEnvironment,
*,
target_zone_id: str | None,
show_truth: bool,
mission_plan: MissionPlan | None,
show_planned_trajectory: bool,
) -> go.Figure:
cache = st.session_state.setdefault(FIGURE_CACHE_KEY, {})
generation = int(st.session_state.get(ENV_GENERATION_KEY, 0))
cache_key = (
id(env),
generation,
target_zone_id,
show_truth,
show_planned_trajectory,
mission_plan_signature(mission_plan),
)
figure = cache.get(cache_key)
if figure is not None:
return figure
figure = build_zone_grid_figure(
env,
target_zone_id=target_zone_id,
show_truth=show_truth,
mission_plan=mission_plan,
show_planned_trajectory=show_planned_trajectory,
)
cache[cache_key] = figure
if len(cache) > 8:
stale_keys = [key for key in cache if key != cache_key]
for stale_key in stale_keys[:-7]:
cache.pop(stale_key, None)
return figure
def get_real_field_static_map_payload(env: SimulatorEnvironment) -> RealFieldStaticMapPayload:
cache = st.session_state.setdefault(STATIC_MAP_CACHE_KEY, {})
cache_key = id(env)
payload = cache.get(cache_key)
if payload is not None:
return payload
grid_size = max(field_extent(env.zone_states))
cell_marker_size = max(28, 104 - ((grid_size - 3) * 7))
cell_text_size = max(8, 15 - max(0, grid_size - 3))
marker_ring_size = cell_marker_size + max(12, 30 - max(0, grid_size - 3) * 2)
step_marker_size = max(16, 28 - max(0, grid_size - 3))
home_base_zone = env._zone_state(env.home_neighbor_zone_id) if env.home_neighbor_zone_id is not None else None
home_base_row = home_base_zone.row if home_base_zone is not None else max(1, (grid_size + 1) // 2)
zone_ids = tuple(zone.zone_id for zone in env.zone_states)
xs = tuple(zone.col or 0 for zone in env.zone_states)
ys = tuple(-(zone.row or 0) for zone in env.zone_states)
x_values = xs
y_values = ys
home_base_x = min(x_values) - 2.0
home_base_y = -home_base_row
entry_x = float(home_base_zone.col if home_base_zone is not None else min(x_values))
entry_y = float(-(home_base_zone.row or home_base_row) if home_base_zone is not None else -home_base_row)
figure = go.Figure()
figure.add_trace(
go.Scatter(
x=[home_base_x, entry_x],
y=[home_base_y, entry_y],
mode="lines",
line={"color": "#64748b", "width": 2, "dash": "dot"},
hoverinfo="skip",
showlegend=False,
)
)
figure.add_layout_image(
dict(
source=base_icon_data_uri(),
x=home_base_x,
y=home_base_y,
sizex=1.95,
sizey=1.95,
xref="x",
yref="y",
xanchor="center",
yanchor="middle",
layer="above",
)
)
figure.add_annotation(
x=home_base_x,
y=home_base_y - 1.25,
text="Robot Home Base",
showarrow=False,
font={"size": max(9, cell_text_size), "color": "#334155"},
)
figure.update_layout(
margin={"l": 10, "r": 10, "t": 10, "b": 10},
xaxis={"title": "Column", "dtick": 1, "range": [home_base_x - 1.2, max(x_values) + 0.6], "showgrid": False, "zeroline": False, "constrain": "domain"},
yaxis={"title": "Row", "dtick": 1, "range": [min(min(y_values), home_base_y) - 1.6, max(y_values) + 0.6], "showgrid": False, "zeroline": False, "scaleanchor": "x", "scaleratio": 1},
height=min(900, max(480, 90 * grid_size)),
plot_bgcolor="#f8f5ef",
paper_bgcolor="#ffffff",
# Stable uirevision tells Plotly.js to preserve the current viewport (zoom/pan)
# across data updates, preventing a full-reset flash on every simulation step.
uirevision=1,
)
payload = RealFieldStaticMapPayload(
figure=figure,
zone_ids=zone_ids,
xs=xs,
ys=ys,
x_values=x_values,
y_values=y_values,
grid_size=grid_size,
cell_marker_size=cell_marker_size,
cell_text_size=cell_text_size,
marker_ring_size=marker_ring_size,
step_marker_size=step_marker_size,
home_base_x=home_base_x,
home_base_y=home_base_y,
entry_x=entry_x,
entry_y=entry_y,
tile_world_x0=min(x_values) - 0.5,
tile_world_x1=max(x_values) + 0.5,
tile_world_y0=min(y_values) - 0.5,
tile_world_y1=max(y_values) + 0.5,
)
cache[cache_key] = payload
if len(cache) > 4:
stale_keys = [key for key in cache if key != cache_key]
for stale_key in stale_keys[:-3]:
cache.pop(stale_key, None)
return payload
def location_xy(
env: SimulatorEnvironment,
static_payload: RealFieldStaticMapPayload,
zone_id: str | None,
) -> tuple[float | None, float | None]:
if zone_id is None:
return None, None
if zone_id == env.home_zone_id:
return static_payload.home_base_x, static_payload.home_base_y
zone = env._zone_state(zone_id)
return float(zone.col or 0), float(-(zone.row or 0))
def build_planned_horizon_payload(
env: SimulatorEnvironment,
*,
mission_plan: MissionPlan | None,
move_step_budget: int = PLANNED_TRAJECTORY_MOVE_STEP_BUDGET,
) -> PlannedHorizonPayload | None:
plan = mission_plan or env.current_plan
if plan is None or not plan.ordered_tasks:
return None
anchor_zone_id = env.current_zone_id or env.home_zone_id
starts_from_home = env.current_zone_id == env.home_zone_id
if anchor_zone_id is None:
return None
route_zone_ids: list[str] = []
planned_actions: list[PlannedHorizonAction] = []
move_steps_used = 0
action_order = 0
for task in plan.ordered_tasks:
target_zone_id = task.target_zone
if target_zone_id is None:
continue
if anchor_zone_id == target_zone_id:
path = [target_zone_id]
else:
path = shortest_path_zone_ids(
env.zone_states,
anchor_zone_id,
target_zone_id,
home_zone_id=env.home_zone_id,
home_neighbor_zone_id=env.home_neighbor_zone_id,
)
if not path:
continue
path_move_count = max(0, len(path) - 1)
remaining_move_budget = max(0, move_step_budget - move_steps_used)
path_to_add = path
reached_target = True
if path_move_count > remaining_move_budget:
path_to_add = path[: remaining_move_budget + 1]
reached_target = False
if path_to_add:
if not route_zone_ids:
route_zone_ids.extend(path_to_add)
elif route_zone_ids[-1] == path_to_add[0]:
route_zone_ids.extend(path_to_add[1:])
else:
route_zone_ids.extend(path_to_add)
if not reached_target:
break
move_steps_used += path_move_count
anchor_zone_id = target_zone_id
if task.task_type != ActionType.MOVE:
action_order += 1
planned_actions.append(
PlannedHorizonAction(
order=action_order,
zone_id=target_zone_id,
task_type=str(task.task_type),
rationale=task.rationale,
)
)
if move_steps_used >= move_step_budget:
continue
if not route_zone_ids and not planned_actions:
return None
return PlannedHorizonPayload(
route_zone_ids=tuple(route_zone_ids),
starts_from_home=starts_from_home,
actions=tuple(planned_actions),
)
def build_planned_trajectory_payload(
env: SimulatorEnvironment,
*,
mission_plan: MissionPlan | None,
static_payload: RealFieldStaticMapPayload,
move_step_budget: int = PLANNED_TRAJECTORY_MOVE_STEP_BUDGET,
) -> PlannedTrajectoryPayload | None:
horizon = build_planned_horizon_payload(
env,
mission_plan=mission_plan,
move_step_budget=move_step_budget,
)
if horizon is None:
return None
task_marker_xs: list[float] = []
task_marker_ys: list[float] = []
task_marker_texts: list[str] = []
task_marker_sizes: list[int] = []
task_marker_colors: list[str] = []
task_marker_hovers: list[str] = []
per_zone_offsets: dict[str, int] = {}
for planned_action in horizon.actions:
target_zone_id = planned_action.zone_id
zone_offset = per_zone_offsets.get(target_zone_id, 0)
per_zone_offsets[target_zone_id] = zone_offset + 1
offset_x, offset_y = numbered_step_offset(zone_offset)
marker_x, marker_y = location_xy(env, static_payload, target_zone_id)
if marker_x is None or marker_y is None:
continue
task_marker_xs.append(marker_x + (offset_x * 0.75))
task_marker_ys.append(marker_y + (offset_y * 0.75))
task_marker_texts.append(str(planned_action.order))
task_marker_sizes.append(static_payload.step_marker_size + (10 if planned_action.order == 1 else 4))
task_marker_colors.append("#0f172a" if planned_action.order == 1 else "rgba(15,23,42,0.72)")
task_marker_hovers.append(
f"#{planned_action.order} {humanize_action_name(planned_action.task_type)} on "
f"{short_zone_label(target_zone_id)}<br>{planned_action.rationale}"
)
if not horizon.route_zone_ids and not task_marker_xs:
return None
route_xs: list[float] = []
route_ys: list[float] = []
for zone_id in horizon.route_zone_ids:
marker_x, marker_y = location_xy(env, static_payload, zone_id)
if marker_x is None or marker_y is None:
continue
route_xs.append(marker_x)
route_ys.append(marker_y)
waypoint_zone_ids = horizon.route_zone_ids[1:]
waypoint_xs: list[float] = []
waypoint_ys: list[float] = []
for zone_id in waypoint_zone_ids:
marker_x, marker_y = location_xy(env, static_payload, zone_id)
if marker_x is None or marker_y is None:
continue
waypoint_xs.append(marker_x)
waypoint_ys.append(marker_y)
return PlannedTrajectoryPayload(
route_zone_ids=horizon.route_zone_ids,
starts_from_home=horizon.starts_from_home,
route_xs=route_xs,
route_ys=route_ys,
waypoint_xs=waypoint_xs,
waypoint_ys=waypoint_ys,
task_marker_xs=task_marker_xs,
task_marker_ys=task_marker_ys,
task_marker_texts=task_marker_texts,
task_marker_sizes=task_marker_sizes,
task_marker_colors=task_marker_colors,
task_marker_hovers=task_marker_hovers,
)
def build_real_field_dynamic_map_payload(
env: SimulatorEnvironment,
*,
target_zone_id: str | None,
show_truth: bool,
static_payload: RealFieldStaticMapPayload,
mission_plan: MissionPlan | None,
show_planned_trajectory: bool,
) -> RealFieldDynamicMapPayload:
assessments = {assessment.zone_id: assessment for assessment in env.current_assessments}
current_zone_id = env.current_zone_id
previous_zone_id = env.event_history[-1].details.get("from_zone_id") if env.event_history else None
latest_event_by_zone = env.latest_events_by_zone()
latest_soil_event_by_zone = env.latest_zone_action_events(ActionType.MEASURE_SOIL)
step_index = env.step_index
zone_action_badges = build_zone_action_badges(env, mission_plan=mission_plan)
texts: list[str] = []
colors: list[str] = []
line_colors: list[str] = []
line_widths: list[int] = []
hover_texts: list[str] = []
for zone in env.zone_states:
assessment = assessments.get(zone.zone_id)
hidden = env.hidden_zone_states[zone.zone_id]
label = build_zone_cell_text(
zone=zone,
action_badges=zone_action_badges.get(zone.zone_id, []),
)
if show_truth:
label += f"<br>{hidden.true_priority_band}"
texts.append(label)
fill_color = priority_fill_color(
float(zone.static_features.get("scenario_priority_hint", 0.0) or 0.0),
needs_intervention=visible_tile_needs_intervention(zone),
yield_pred_missing=tile_has_missing_yield(zone),
)
colors.append(fill_color)
border_color, border_width = zone_border_style(
hidden=hidden,
is_current_zone=zone.zone_id == current_zone_id,
measured_fill_color=fill_color,
)
line_colors.append(border_color)
line_widths.append(border_width)
hover_texts.append(
build_zone_hover_text(
zone_state=zone,
assessment=assessment,
event=latest_event_by_zone.get(zone.zone_id),
soil_event=latest_soil_event_by_zone.get(zone.zone_id),
step_index=step_index,
)
)
event_marker_xs: list[float] = []
event_marker_ys: list[float] = []
event_marker_texts: list[str] = []
event_marker_colors: list[str] = []
event_marker_hovers: list[str] = []
per_zone_offsets: dict[str, int] = {}
for event in env.event_history[-8:]:
if event.zone_id is None:
continue
zone_offset = per_zone_offsets.get(event.zone_id, 0)
per_zone_offsets[event.zone_id] = zone_offset + 1
offset_x, offset_y = numbered_step_offset(zone_offset)
event_x, event_y = location_xy(env, static_payload, event.zone_id)
if event_x is None or event_y is None:
continue
event_marker_xs.append(event_x + offset_x)
event_marker_ys.append(event_y + offset_y)
event_marker_texts.append(str(event.step_index))
event_marker_colors.append(action_color(str(event.action)))
event_marker_hovers.append(build_event_hover_text(event))
planned_trajectory = (
build_planned_trajectory_payload(
env,
mission_plan=mission_plan,
static_payload=static_payload,
)
if show_planned_trajectory
else None
)
return RealFieldDynamicMapPayload(
texts=texts,
colors=colors,
line_colors=line_colors,
line_widths=line_widths,
hover_texts=hover_texts,
previous_zone_id=previous_zone_id,
current_zone_id=current_zone_id,
event_marker_xs=event_marker_xs,
event_marker_ys=event_marker_ys,
event_marker_texts=event_marker_texts,
event_marker_colors=event_marker_colors,
event_marker_hovers=event_marker_hovers,
planned_trajectory=planned_trajectory,
)
def build_zone_grid_figure(
env: SimulatorEnvironment,
target_zone_id: str | None,
show_truth: bool,
*,
mission_plan: MissionPlan | None = None,
show_planned_trajectory: bool = True,
) -> go.Figure:
static_payload = get_real_field_static_map_payload(env)
dynamic_payload = build_real_field_dynamic_map_payload(
env,
target_zone_id=target_zone_id,
show_truth=show_truth,
static_payload=static_payload,
mission_plan=mission_plan,
show_planned_trajectory=show_planned_trajectory,
)
figure = go.Figure(static_payload.figure)
# Draw cell tiles as native Plotly shapes (SVG rects in data coords) instead of a
# raster layout_image. Plotly updates shape attributes in-place via Plotly.react()
# with no image-swap step, eliminating the per-step flash caused by the old approach.
cell_shapes = [
{
"type": "rect",
"x0": float(x) - 0.5,
"x1": float(x) + 0.5,
"y0": float(y) - 0.5,
"y1": float(y) + 0.5,
"fillcolor": fill_color,
"line": {"color": line_color, "width": float(line_width)},
"layer": "below",
"xref": "x",
"yref": "y",
}
for x, y, fill_color, line_color, line_width in zip(
static_payload.xs,
static_payload.ys,
dynamic_payload.colors,
dynamic_payload.line_colors,
dynamic_payload.line_widths,
strict=False,
)
]
figure.update_layout(shapes=cell_shapes)
figure.add_trace(
go.Scatter(
x=list(static_payload.xs),
y=list(static_payload.ys),
mode="markers+text",
showlegend=False,
text=dynamic_payload.texts,
textposition="middle center",
textfont={"size": static_payload.cell_text_size},
marker={
"size": static_payload.cell_marker_size,
"symbol": "square",
"color": dynamic_payload.colors,
"line": {"width": dynamic_payload.line_widths, "color": dynamic_payload.line_colors},
"opacity": 0.001,
},
hovertemplate="%{hovertext}<extra></extra>",
hovertext=dynamic_payload.hover_texts,
customdata=list(static_payload.zone_ids),
selected={"marker": {"opacity": 1.0}},
unselected={"marker": {"opacity": 1.0}},
)
)
if (
dynamic_payload.previous_zone_id is not None
and dynamic_payload.current_zone_id is not None
and dynamic_payload.previous_zone_id != dynamic_payload.current_zone_id
):
previous_x, previous_y = location_xy(env, static_payload, dynamic_payload.previous_zone_id)
current_x, current_y = location_xy(env, static_payload, dynamic_payload.current_zone_id)
if previous_x is not None and previous_y is not None and current_x is not None and current_y is not None:
figure.add_trace(
go.Scatter(
x=[previous_x, current_x],
y=[previous_y, current_y],
mode="lines",
line={"color": "#9aa6b2", "width": 3, "dash": "dot"},
hoverinfo="skip",
showlegend=False,
)
)
figure.add_trace(
go.Scatter(
x=[previous_x],
y=[previous_y],
mode="markers+text",
text=["Prev"],
textposition="bottom center",
textfont={"size": max(8, static_payload.cell_text_size - 2), "color": "#64748b"},
marker={"size": max(12, static_payload.step_marker_size - 8), "symbol": "circle", "color": "rgba(100,116,139,0.35)", "line": {"width": 0}},
hovertemplate="Previous robot position<extra></extra>",
showlegend=False,
)
)
if dynamic_payload.planned_trajectory is not None:
planned = dynamic_payload.planned_trajectory
if len(planned.route_xs) >= 2:
figure.add_trace(
go.Scatter(
x=planned.route_xs,
y=planned.route_ys,
mode="lines",
line={"color": "rgba(15,23,42,0.48)", "width": 5},
hoverinfo="skip",
showlegend=False,
)
)
if planned.waypoint_xs:
figure.add_trace(
go.Scatter(
x=planned.waypoint_xs,
y=planned.waypoint_ys,
mode="markers",
marker={
"size": max(8, static_payload.step_marker_size - 8),
"symbol": "circle",
"color": "rgba(15,23,42,0.50)",
"line": {"width": 1, "color": "rgba(255,255,255,0.95)"},
},
hoverinfo="skip",
showlegend=False,
)
)
if planned.task_marker_xs:
figure.add_trace(
go.Scatter(
x=planned.task_marker_xs,
y=planned.task_marker_ys,
mode="markers+text",
text=planned.task_marker_texts,
textposition="middle center",
textfont={"color": "white", "size": max(8, static_payload.cell_text_size - 1)},
marker={
"size": planned.task_marker_sizes,
"symbol": "circle",
"color": planned.task_marker_colors,
"line": {"width": 2, "color": "white"},
},
customdata=planned.task_marker_hovers,
hovertemplate="%{customdata}<extra></extra>",
showlegend=False,
)
)
if dynamic_payload.event_marker_xs:
figure.add_trace(
go.Scatter(
x=dynamic_payload.event_marker_xs,
y=dynamic_payload.event_marker_ys,
mode="markers+text",
text=dynamic_payload.event_marker_texts,
textposition="middle center",
textfont={"color": "white", "size": max(8, static_payload.cell_text_size - 1)},
marker={
"size": static_payload.step_marker_size,
"symbol": "circle",
"color": dynamic_payload.event_marker_colors,
"line": {"width": 2, "color": "white"},
},
customdata=dynamic_payload.event_marker_hovers,
hovertemplate="%{customdata}<extra></extra>",
showlegend=False,
)
)
if dynamic_payload.current_zone_id is not None:
robot_x, robot_y = location_xy(env, static_payload, dynamic_payload.current_zone_id)
if robot_x is None or robot_y is None:
return figure
figure.add_trace(
go.Scatter(
x=[robot_x],
y=[robot_y],
mode="markers",
marker={
"size": static_payload.marker_ring_size + 20,
"symbol": "circle-open",
"color": "rgba(0,0,0,0)",
"line": {"width": 10, "color": "#0f172a"},
},
hovertemplate="Robot current position<extra></extra>",
showlegend=False,
)
)
figure.add_layout_image(
dict(
source=robot_icon_data_uri(),
x=robot_x,
y=robot_y,
sizex=0.88,
sizey=0.88,
xref="x",
yref="y",
xanchor="center",
yanchor="middle",
layer="above",
)
)
return figure
def build_latest_event_by_zone(event_history) -> dict[str, object]:
latest = {}
for event in event_history:
if event.zone_id is not None:
latest[event.zone_id] = event
return latest
def build_latest_zone_action_by_zone(event_history, action: ActionType) -> dict[str, object]:
latest = {}
for event in event_history:
if event.zone_id is not None and event.action == action:
latest[event.zone_id] = event
return latest
def latest_soil_measurement_event(event_history, zone_id: str):
for event in reversed(event_history):
if event.zone_id == zone_id and event.action == ActionType.MEASURE_SOIL:
return event
return None
def latest_field_check_event(event_history, zone_id: str):
for event in reversed(event_history):
if event.zone_id == zone_id and event.action in {ActionType.INSPECT, ActionType.REVISIT}:
return event
return None
def latest_zone_action_event(event_history, zone_id: str, action: ActionType):
for event in reversed(event_history):
if event.zone_id == zone_id and event.action == action:
return event
return None
def format_steps_ago(current_step_index: int, measured_step_index: int | None) -> str:
if measured_step_index is None:
return "age unknown"
delta = max(0, current_step_index - measured_step_index)
if delta == 0:
return "measured this step"
if delta == 1:
return "measured 1 step ago"
return f"measured {delta} steps ago"
def format_steps_since(current_step_index: int, event_step_index: int | None, *, label: str = "checked") -> str:
if event_step_index is None:
return "not yet"
delta = max(0, current_step_index - event_step_index)
if delta == 0:
return f"{label} this step"
if delta == 1:
return f"{label} 1 step ago"
return f"{label} {delta} steps ago"
def format_optional_number(value, decimals: int = 2, suffix: str = "") -> str:
if value is None:
return "—"
try:
numeric = float(value)
except (TypeError, ValueError):
return str(value)
return f"{numeric:.{decimals}f}{suffix}"
def normalize_priority_label(value) -> str:
text = str(value)
return {
"Inspect now": "Act now",
"Inspect soon": "Actionable",
"Watch": "Monitor",
"Low attention": "Low",
}.get(text, text)
def short_action_label(action_name: str) -> str:
return {
"move": "M",
"inspect": "C",
"revisit": "C",
"defer": "C",
"patrol": "M",
"recharge": "M",
"remove_weeds": "C",
"measure_soil": "S",
"apply_fertilizer": "F",
"apply_water": "W",
"install_drainage": "D",
"subsoil": "U",
"end_mission": "M",
}.get(action_name, action_name[:1].upper())
def action_color(action_name: str) -> str:
return {
"move": "#2563eb",
"inspect": "#1f7a8c",
"revisit": "#2a9d8f",
"defer": "#1f7a8c",
"patrol": "#2563eb",
"recharge": "#2563eb",
"remove_weeds": "#1f7a8c",
"measure_soil": "#7c3aed",
"apply_fertilizer": "#4f772d",
"apply_water": "#2a6f97",
"install_drainage": "#8d6e63",
"subsoil": "#6b705c",
"end_mission": "#2563eb",
}.get(action_name, "#264653")
def build_zone_hover_text(*, zone_state, assessment, event, soil_event, step_index: int) -> str:
lines = []
if event is not None:
lines.append(f"last_execution={humanize_execution_outcome(event.execution_outcome)}")
else:
lines.append("last_execution=No execution yet")
urgency_score = float(zone_state.static_features.get("scenario_priority_hint", 0.0))
lines.append(f"urgency_score={urgency_score:.2f}")
if tile_has_missing_yield(zone_state):
lines.append("yield_pred=Missing")
next_action = humanize_action_name(str(assessment.recommended_action)) if assessment is not None else "Check field"
lines.append(f"next_action={next_action}")
if zone_state.soil_measurement is not None:
measurement_age = format_steps_ago(
step_index,
soil_event.step_index if soil_event is not None else None,
)
lines.append(f"soil_measurement={measurement_age}")
measurement = zone_state.soil_measurement
lines.append(
f"moisture={measurement.moisture_m3m3:.2f} m3/m3"
if measurement.moisture_m3m3 is not None
else "moisture=—"
)
lines.append(
f"nitrogen={measurement.nitrogen_gkg:.2f} g/kg"
if measurement.nitrogen_gkg is not None
else "nitrogen=—"
)
lines.append(
f"organic_carbon={measurement.organic_carbon_gkg:.1f} g/kg"
if measurement.organic_carbon_gkg is not None
else "organic_carbon=—"
)
lines.append(
f"ph={measurement.ph:.1f}"
if measurement.ph is not None
else "ph=—"
)
else:
lines.append("soil_measurement=Not measured")
return "<br>".join(lines)
def build_event_hover_text(event) -> str:
lines = [
f"step={event.step_index}",
f"step_action={humanize_action_name(str(event.action))}",
f"execution={humanize_execution_outcome(event.execution_outcome)}",
f"reward={event.reward:.2f}",
]
if event_control_intent(event) is not None:
lines.append(f"intent={humanize_control_intent(event_control_intent(event))}")
if event.control_state_after is not None:
lines.append(f"control_state={humanize_control_state(event.control_state_after)}")
if event_terminal_reason(event) is not None:
lines.append(f"terminal_reason={humanize_terminal_reason(event_terminal_reason(event))}")
if event.details.get("revealed_facts"):
lines.append("revealed=" + ", ".join(event.details["revealed_facts"][:3]))
return "<br>".join(lines)
CAUTION_URGENCY_THRESHOLD = REAL_FIELD_CAUTION_URGENCY_THRESHOLD
RED_URGENCY_THRESHOLD = REAL_FIELD_CRITICAL_URGENCY_THRESHOLD
def show_planned_trajectory_enabled() -> bool:
return bool(st.session_state.get(SHOW_PLANNED_TRAJECTORY_KEY, True))
def render_grid_legend() -> None:
st.markdown(
"""
<div style="border:1px solid #c9c0ac;border-radius:8px;padding:10px 14px;
background:#fffaf0;font-size:0.82em;line-height:2.0;margin-top:4px">
<strong>Legend</strong><br>
<em>Tile fill</em><br>
&bull; <span style="background:#bdd9bf;border-radius:3px;padding:1px 8px">&nbsp;</span>&nbsp;Green: no intervention tile, urgency &le; 0.52<br>
&bull; <span style="background:#f1e3b0;border-radius:3px;padding:1px 8px">&nbsp;</span>&nbsp;Sand: no intervention tile, urgency &gt; 0.52<br>
&bull; <span style="background:#edae49;border-radius:3px;padding:1px 8px">&nbsp;</span>&nbsp;Orange: intervention tile, urgency &lt; 0.978<br>
&bull; <span style="background:#d1495b;border-radius:3px;padding:1px 8px">&nbsp;</span>&nbsp;Red: intervention tile, urgency &ge; 0.978<br>
&bull; <span style="background:#000000;border-radius:3px;padding:1px 8px">&nbsp;</span>&nbsp;Black: predicted yield is missing, excluded from urgency planning<br>
<em>Tile border</em><br>
&bull; <span style="color:#9aa6b2;font-size:1.2em">&#9644;</span> Ground truth matches the robot&apos;s current belief<br>
&bull; <span style="background:#bdd9bf;border-radius:3px;padding:1px 8px">&nbsp;</span>&nbsp;<span style="background:#f1e3b0;border-radius:3px;padding:1px 8px">&nbsp;</span>&nbsp;<span style="background:#edae49;border-radius:3px;padding:1px 8px">&nbsp;</span>&nbsp;<span style="background:#d1495b;border-radius:3px;padding:1px 8px">&nbsp;</span>&nbsp;Colored border: hidden ground truth urgency differs from the robot&apos;s current belief<br>
<em>Tile action badges</em><br>
&bull; <span style="font-size:1.0em">1🌿</span> Planned tile action from the receding-horizon plan<br>
&bull; <span style="font-size:1.0em">→</span> Planned action executed correctly in the last 5 steps<br>
&bull; <span style="font-size:1.0em">✕</span> Planned action failed in the last 5 steps<br>
<em>Planned route</em><br>
&bull; <span style="color:#0f172a;font-size:1.2em">&#9472;&#9472;</span> Dark preview line for the next 10 movement steps<br>
&bull; <span style="color:#0f172a;font-size:1.2em">&#9679;</span> Connected waypoint dots<br>
&bull; <span style="display:inline-block;background:#0f172a;color:white;border-radius:999px;padding:0 7px;font-size:0.9em">1</span> Planned task marker<br>
<em>Markers</em><br>
&bull; &#x2295;&nbsp;🤖 Robot position<br>
&bull; &#x2460;&#x2461; Step order dots
</div>
""",
unsafe_allow_html=True,
)
def render_selected_step_summary(selected_event) -> None:
if selected_event is None:
return
location = format_event_location(selected_event)
outcome = humanize_execution_outcome(selected_event.execution_outcome)
st.caption(
f"Latest step {selected_event.step_index}: "
f"{humanize_action_name(str(selected_event.action))} | {location} | {outcome}"
)
def selected_zone_id(env: SimulatorEnvironment, *, target_zone_id: str | None) -> str:
valid_zone_ids = {zone.zone_id for zone in env.zone_states}
current = st.session_state.get(SELECTED_ZONE_KEY)
if current in valid_zone_ids:
return current
fallback = (
target_zone_id
or (env.current_zone_id if env.current_zone_id in valid_zone_ids else None)
or env.home_neighbor_zone_id
or env.zone_states[0].zone_id
)
st.session_state[SELECTED_ZONE_KEY] = fallback
return fallback
def update_selected_zone_from_plot_state(plot_state, env: SimulatorEnvironment, *, fallback_zone_id: str | None) -> None:
zone_id = selected_zone_from_plot_state(plot_state)
if zone_id is not None and zone_id in {zone.zone_id for zone in env.zone_states}:
st.session_state[SELECTED_ZONE_KEY] = zone_id
return
selected_zone_id(env, target_zone_id=fallback_zone_id)
def selected_zone_from_plot_state(plot_state) -> str | None:
if plot_state is None:
return None
selection = getattr(plot_state, "selection", None)
if selection is None and isinstance(plot_state, dict):
selection = plot_state.get("selection")
if selection is None:
return None
points = getattr(selection, "points", None)
if points is None and isinstance(selection, dict):
points = selection.get("points")
if not points:
return None
for point in points:
customdata = getattr(point, "customdata", None)
if customdata is None and isinstance(point, dict):
customdata = point.get("customdata")
if isinstance(customdata, str) and customdata.startswith("zone_"):
return customdata
return None
def latest_event_for_zone(snapshot, zone_id: str):
for event in reversed(snapshot.event_history):
if event.zone_id == zone_id:
return event
return None
def latest_evidence_event_for_zone(snapshot, zone_id: str):
for event in reversed(snapshot.event_history):
if event.zone_id != zone_id:
continue
evidence_path = event.details.get("evidence_image_path")
if evidence_path and Path(evidence_path).exists():
return event
return None
def overlay_weed_mask(evidence_path: str, mask_path: str) -> bytes:
"""Blend a blue semi-transparent weed segmentation mask over the evidence image."""
import io
import numpy as np
from PIL import Image
evidence = Image.open(evidence_path).convert("RGBA")
mask = Image.open(mask_path).convert("L")
if mask.size != evidence.size:
mask = mask.resize(evidence.size, Image.NEAREST)
mask_arr = np.array(mask)
overlay = np.zeros((*mask_arr.shape, 4), dtype=np.uint8)
overlay[mask_arr > 128] = [30, 100, 220, 150] # blue, semi-transparent
blended = Image.alpha_composite(evidence, Image.fromarray(overlay, "RGBA"))
buf = io.BytesIO()
blended.convert("RGB").save(buf, format="PNG")
return buf.getvalue()
def render_selected_zone_panel(env: SimulatorEnvironment, snapshot, zone_id: str) -> None:
zone = env._zone_state(zone_id)
evidence_state = snapshot.observation.zone_evidence_state_by_zone.get(zone_id)
workflow = snapshot.observation.riddle_workflows_by_zone.get(zone_id)
latest_event = env.latest_event_for_zone(zone_id)
inspect_event = env.latest_zone_action_event(zone_id, ActionType.INSPECT)
revisit_event = env.latest_zone_action_event(zone_id, ActionType.REVISIT)
if inspect_event is None:
field_check_event = revisit_event
elif revisit_event is None:
field_check_event = inspect_event
else:
field_check_event = inspect_event if inspect_event.step_index >= revisit_event.step_index else revisit_event
soil_event = env.latest_zone_action_event(zone_id, ActionType.MEASURE_SOIL)
fertilizer_event = env.latest_zone_action_event(zone_id, ActionType.APPLY_FERTILIZER)
water_event = env.latest_zone_action_event(zone_id, ActionType.APPLY_WATER)
step_index = snapshot.observation.step_index
static = zone.static_features or {}
weather = zone.weather_proxies or {}
inspection_count = evidence_state.guarded_context.inspection_count if evidence_state else 0
if zone.soil_measurement is not None:
m = zone.soil_measurement
soil_measurement_summary = (
f"{format_steps_ago(step_index, soil_event.step_index if soil_event is not None else None)} | "
f"moisture {format_optional_number(m.moisture_m3m3, 2)}, "
f"nitrogen {format_optional_number(m.nitrogen_gkg, 2)}, "
f"organic C {format_optional_number(m.organic_carbon_gkg, 1)}, "
f"pH {format_optional_number(m.ph, 1)}"
)
else:
soil_measurement_summary = "not yet"
if latest_event is not None:
last_execution_summary = (
f"{humanize_action_name(str(latest_event.action))} -> "
f"{humanize_execution_outcome(latest_event.execution_outcome)}"
)
else:
last_execution_summary = "none yet"
if zone.applied_fertilizer_amount_kg_ha is not None:
fertilizer_product = (
zone.fertilizer_recommendation.product_name
if zone.fertilizer_recommendation is not None
else "fertilizer"
)
fertilizer_summary = (
f"{format_steps_ago(step_index, fertilizer_event.step_index if fertilizer_event is not None else None)} "
f"({fertilizer_product} @ {zone.applied_fertilizer_amount_kg_ha:.1f} kg/ha)"
)
else:
fertilizer_summary = "not yet"
if zone.applied_water_amount_mm is not None:
water_summary = (
f"{format_steps_ago(step_index, water_event.step_index if water_event is not None else None)} "
f"({zone.applied_water_amount_mm:.1f} mm)"
)
else:
water_summary = "not yet"
with st.expander(f"Selected Field: {short_zone_label(zone_id)}", expanded=False):
overview_col, imagery_col, soil_col, robot_col = st.columns(4)
with overview_col:
st.markdown("**Overview**")
st.write(f"Urgency score: {format_optional_number(static.get('scenario_priority_hint'), 3)}")
st.write(f"Predicted yield missing: {'Yes' if tile_has_missing_yield(zone) else 'No'}")
st.write(f"Status: {humanize_metric_token(str(static.get('status', 'unknown')))}")
st.write(f"Primary stress: {humanize_metric_token(str(static.get('primary_stress', 'unknown')))}")
st.write(f"Recommended action: {humanize_metric_token(str(static.get('recommended_action', 'none')))}")
with imagery_col:
st.markdown("**Crop And Imagery**")
st.write(f"Yield: {format_optional_number(static.get('yield_tha'), 2, ' t/ha')}")
st.write(f"Predicted yield: {format_optional_number(static.get('yield_pred'), 2, ' t/ha')}")
st.write(f"Field mean yield: {format_optional_number(static.get('field_mean_yield'), 2, ' t/ha')}")
st.write(f"Yield deficit: {format_optional_number(static.get('yield_deficit_pct'), 1, ' %')}")
st.write(f"NDVI: {format_optional_number(static.get('ndvi_last'), 3)}")
st.write(f"NDRE: {format_optional_number(static.get('ndre_last'), 3)}")
st.write(f"NDWI: {format_optional_number(static.get('ndwi_last'), 3)}")
with soil_col:
st.markdown("**Soil And Field**")
st.write(f"Nitrogen: {format_optional_number(static.get('nitrogen_0-5'), 1)}")
st.write(f"Soil organic carbon: {format_optional_number(static.get('soc_0-5'), 1)}")
st.write(f"Clay: {format_optional_number(static.get('clay_0-5'), 1)}")
st.write(f"Sand: {format_optional_number(static.get('sand_0-5'), 1)}")
st.write(f"Temperature mean: {format_optional_number(weather.get('temp_mean_mean'), 2)}")
st.write(f"Total precipitation: {format_optional_number(weather.get('total_prec_mean'), 2)}")
with robot_col:
st.markdown("**Robot State And Actions**")
st.write(f"Field checks: {inspection_count}")
st.write(
f"Last checked: "
f"{format_steps_since(step_index, field_check_event.step_index if field_check_event is not None else None)}"
)
st.write(f"Last execution: {last_execution_summary}")
st.write(f"Soil measured: {soil_measurement_summary}")
st.write(f"Fertilizer applied: {fertilizer_summary}")
st.write(f"Water applied: {water_summary}")
if workflow is not None and workflow.last_message:
st.caption(workflow.last_message)
def render_metrics_overview(env: SimulatorEnvironment) -> None:
metrics = env.metrics()
summary_col, technical_col = st.columns(2)
with summary_col:
with st.expander("Run Summary", expanded=False):
st.caption("Plain-language summary of what happened in the simulator episode.")
for heading, lines in summarize_metrics(metrics).items():
st.write(f"**{heading}**")
for line in lines:
st.write(f"- {line}")
with technical_col:
with st.expander("Technical Metrics", expanded=False):
st.json(metrics)
def summarize_metrics(metrics: dict[str, object]) -> dict[str, list[str]]:
return {
"Mission Progress": [
f"Steps taken: {int(metrics.get('steps', 0))}",
f"Field checks: {int(metrics.get('inspect_actions', 0))}",
f"Follow-up checks: {int(metrics.get('revisit_actions', 0))}",
f"Soil samples: {int(metrics.get('measure_soil_actions', 0))}",
f"Challenges resolved: {int(metrics.get('resolved_challenges', 0))}",
],
"Outcome": [
f"Episode status: {humanize_metric_token(str(metrics.get('outcome_status', 'unknown')))}",
f"Termination reason: {humanize_metric_token(str(metrics.get('termination_reason', 'in_progress')))}",
f"Reward score: {metrics.get('total_reward', 0)}",
],
}
def grid_dimension(zone_states) -> int:
return max(max((zone.row or 0) for zone in zone_states), max((zone.col or 0) for zone in zone_states))
def add_field_grid_shapes(figure: go.Figure, zone_states) -> None:
x_values = [zone.col or 0 for zone in zone_states]
y_values = [-(zone.row or 0) for zone in zone_states]
if is_rectangular_field(zone_states):
vertical_boundaries = [min(x_values) - 0.5] + [x + 0.5 for x in sorted(set(x_values))[:-1]] + [max(x_values) + 0.5]
horizontal_boundaries = [max(y_values) + 0.5] + [y - 0.5 for y in sorted(set(y_values), reverse=True)[:-1]] + [min(y_values) - 0.5]
for boundary_x in vertical_boundaries:
figure.add_shape(
type="line",
x0=boundary_x,
x1=boundary_x,
y0=min(y_values) - 0.5,
y1=max(y_values) + 0.5,
line={"color": "#d7d0c0", "width": 1},
layer="below",
)
for boundary_y in horizontal_boundaries:
figure.add_shape(
type="line",
x0=min(x_values) - 0.5,
x1=max(x_values) + 0.5,
y0=boundary_y,
y1=boundary_y,
line={"color": "#d7d0c0", "width": 1},
layer="below",
)
return
for zone in zone_states:
col = zone.col or 0
row = zone.row or 0
figure.add_shape(
type="rect",
x0=col - 0.5,
x1=col + 0.5,
y0=-(row) - 0.5,
y1=-(row) + 0.5,
line={"color": "#d7d0c0", "width": 1},
fillcolor="rgba(255,255,255,0)",
layer="below",
)
def robot_icon_data_uri() -> str:
svg = """
<svg xmlns='http://www.w3.org/2000/svg' width='64' height='64' viewBox='0 0 64 64'>
<rect x='16' y='18' width='32' height='26' rx='8' fill='#1f2937'/>
<rect x='21' y='24' width='8' height='8' rx='4' fill='#f8fafc'/>
<rect x='35' y='24' width='8' height='8' rx='4' fill='#f8fafc'/>
<rect x='26' y='36' width='12' height='4' rx='2' fill='#93c5fd'/>
<rect x='20' y='46' width='4' height='10' rx='2' fill='#1f2937'/>
<rect x='40' y='46' width='4' height='10' rx='2' fill='#1f2937'/>
<rect x='13' y='28' width='4' height='10' rx='2' fill='#1f2937'/>
<rect x='47' y='28' width='4' height='10' rx='2' fill='#1f2937'/>
<rect x='30' y='10' width='4' height='8' rx='2' fill='#1f2937'/>
<circle cx='32' cy='8' r='4' fill='#2563eb'/>
</svg>
""".strip()
return f"data:image/svg+xml;utf8,{quote(svg)}"
def base_icon_data_uri() -> str:
svg = """
<svg xmlns='http://www.w3.org/2000/svg' width='64' height='64' viewBox='0 0 64 64'>
<rect x='10' y='30' width='44' height='20' rx='8' fill='#cbd5e1'/>
<rect x='18' y='18' width='28' height='16' rx='6' fill='#475569'/>
<rect x='28' y='8' width='8' height='10' rx='3' fill='#1e293b'/>
<rect x='24' y='36' width='16' height='8' rx='4' fill='#93c5fd'/>
</svg>
""".strip()
return f"data:image/svg+xml;utf8,{quote(svg)}"
def planned_action_icon(action_name: str) -> str:
return {
ActionType.INSPECT.value: "🔎",
ActionType.REVISIT.value: "🔎",
ActionType.DEFER.value: "⏸",
ActionType.REMOVE_WEEDS.value: "🌿",
ActionType.MEASURE_SOIL.value: "🧭",
ActionType.APPLY_FERTILIZER.value: "🧪",
ActionType.APPLY_WATER.value: "💧",
ActionType.INSTALL_DRAINAGE.value: "🛠",
ActionType.SUBSOIL.value: "⛏",
}.get(action_name, "•")
def planned_task_succeeded(event, *, task_type: str) -> bool:
success_by_task = {
ActionType.INSPECT.value: ExecutionOutcomeType.INSPECTION_COMPLETED.value,
ActionType.REVISIT.value: ExecutionOutcomeType.INSPECTION_COMPLETED.value,
ActionType.DEFER.value: ExecutionOutcomeType.DEFERRED.value,
ActionType.REMOVE_WEEDS.value: ExecutionOutcomeType.WEED_REMOVAL_COMPLETED.value,
ActionType.MEASURE_SOIL.value: ExecutionOutcomeType.SOIL_MEASUREMENT_COMPLETED.value,
ActionType.APPLY_FERTILIZER.value: ExecutionOutcomeType.FERTILIZER_APPLIED.value,
ActionType.APPLY_WATER.value: ExecutionOutcomeType.WATER_APPLIED.value,
ActionType.INSTALL_DRAINAGE.value: ExecutionOutcomeType.DRAINAGE_INSTALLED.value,
ActionType.SUBSOIL.value: ExecutionOutcomeType.SUBSOIL_COMPLETED.value,
}
return str(event.execution_outcome) == success_by_task.get(task_type)
def planned_task_failed(event, *, task_type: str) -> bool:
del task_type
return str(event.execution_outcome) in {
ExecutionOutcomeType.INVALID_TASK.value,
ExecutionOutcomeType.MOVE_REJECTED.value,
ExecutionOutcomeType.POSITION_REJECTED.value,
ExecutionOutcomeType.RESOURCE_EXHAUSTED.value,
}
def recent_task_status_by_zone_action(
env: SimulatorEnvironment,
*,
ttl_steps: int = RECENT_TASK_ICON_TTL_STEPS,
) -> dict[tuple[str, str], str]:
statuses: dict[tuple[str, str], str] = {}
minimum_step_index = max(0, env.step_index - ttl_steps)
for event in reversed(env.event_history):
if event.step_index < minimum_step_index:
break
planner_task = event.planner_context.normalized_task if event.planner_context is not None else None
if planner_task is None or planner_task.target_zone is None:
continue
task_type = str(planner_task.task_type)
if task_type == ActionType.MOVE.value:
continue
status_key = (planner_task.target_zone, task_type)
if status_key in statuses:
continue
if planned_task_succeeded(event, task_type=task_type):
statuses[status_key] = "→"
elif planned_task_failed(event, task_type=task_type):
statuses[status_key] = "✕"
return statuses
def build_zone_action_badges(
env: SimulatorEnvironment,
*,
mission_plan: MissionPlan | None,
) -> dict[str, list[str]]:
badges_by_zone: dict[str, list[str]] = {}
horizon = build_planned_horizon_payload(env, mission_plan=mission_plan)
recent_statuses = recent_task_status_by_zone_action(env)
represented_keys: set[tuple[str, str]] = set()
if horizon is not None:
for action in horizon.actions:
badge = f"{action.order}{planned_action_icon(action.task_type)}"
status = recent_statuses.get((action.zone_id, action.task_type))
if status is not None:
badge += status
badges_by_zone.setdefault(action.zone_id, []).append(badge)
represented_keys.add((action.zone_id, action.task_type))
for (zone_id, task_type), status in recent_statuses.items():
if (zone_id, task_type) in represented_keys:
continue
badges_by_zone.setdefault(zone_id, []).append(f"{planned_action_icon(task_type)}{status}")
return badges_by_zone
def build_zone_cell_text(*, zone, action_badges: list[str]) -> str:
del zone
if not action_badges:
return ""
return "<br>".join([" ".join(action_badges)])
def numbered_step_offset(index: int) -> tuple[float, float]:
offsets = [
(-0.28, 0.28),
(-0.02, 0.28),
(0.24, 0.28),
(-0.28, 0.02),
(-0.02, 0.02),
(0.24, 0.02),
(-0.28, -0.24),
(-0.02, -0.24),
(0.24, -0.24),
]
return offsets[index % len(offsets)]
_BAND_TO_PRIORITY: dict[str, float] = {
"low": 0.15,
"medium": 0.35,
"high": 0.65,
"critical": 0.9,
}
def priority_fill_color(priority: float, *, needs_intervention: bool, yield_pred_missing: bool = False) -> str:
if yield_pred_missing:
return MISSING_YIELD_COLOR
if needs_intervention:
if priority >= RED_URGENCY_THRESHOLD:
return "#d1495b"
return "#edae49"
if priority > CAUTION_URGENCY_THRESHOLD:
return "#f1e3b0"
return "#bdd9bf"
def ground_truth_fill_color(hidden) -> str:
needs_intervention = truth_tile_needs_intervention(hidden)
priority_hint = getattr(hidden, "true_priority_hint", None)
if priority_hint is not None:
priority = float(priority_hint or 0.0)
else:
band = getattr(getattr(hidden, "true_priority_band", None), "value", None) or "low"
priority = _BAND_TO_PRIORITY.get(str(band).lower(), 0.0)
return priority_fill_color(
priority,
needs_intervention=needs_intervention,
yield_pred_missing=bool(getattr(hidden, "true_yield_pred_missing", False)),
)
def zone_border_style(*, hidden, is_current_zone: bool, measured_fill_color: str) -> tuple[str, int]:
"""Return (border_color, border_width).
Robot position → dark ink, width 4
Belief/truth discrepancy → ground truth fill color as border, width 3
Belief matches truth → subtle blue-gray, width 1
"""
if is_current_zone:
return "#21312a", 4
truth_color = ground_truth_fill_color(hidden)
if truth_color != measured_fill_color:
return truth_color, 3 # discrepancy: ground truth differs from robot's view
return "#9aa6b2", 1 # visited, measurement matches ground truth
def visible_tile_needs_intervention(zone_state) -> bool:
mapped_action = action_type_for_field_recommendation(zone_state.static_features.get("recommended_action"))
if mapped_action in SERVICE_ACTIONS:
return True
return bool(zone_state.static_features.get("needs_intervention", False))
def tile_has_missing_yield(zone_state) -> bool:
return bool(zone_state.static_features.get("yield_pred_missing", False))
def truth_tile_needs_intervention(hidden) -> bool:
if bool(getattr(hidden, "true_needs_intervention", False)):
return True
mapped_action = action_type_for_field_recommendation(getattr(hidden, "true_recommended_action", "none"))
if mapped_action in SERVICE_ACTIONS:
return True
return (
getattr(hidden, "weed_removal_required", False)
or getattr(hidden, "fertilizer_recommendation", None) is not None
or getattr(hidden, "irrigation_recommendation", None) is not None
)
def format_step_option(event) -> str:
return f"Step {event.step_index}: {humanize_action_name(str(event.action))} | {format_event_location(event)}"
def format_event_location(event) -> str:
from_zone = event.details.get("from_zone_id")
to_zone = event.details.get("to_zone_id") or event.zone_id
if str(event.action) == ActionType.MOVE.value and from_zone and to_zone:
return f"{short_zone_label(from_zone)} -> {short_zone_label(to_zone)}"
if to_zone is None:
return "global action"
return f"at {short_zone_label(to_zone)}"
def current_location_label(zone_id: str | None) -> str:
if zone_id in {None, "home_base"}:
return "Home Base"
return short_zone_label(zone_id)
def short_zone_label(zone_id: str | None) -> str:
if not zone_id:
return "None"
if zone_id == "home_base":
return "Home Base"
try:
_, row_token, col_token = zone_id.split("_")
return f"R{int(row_token.replace('r', ''))}C{int(col_token.replace('c', ''))}"
except (ValueError, IndexError):
return zone_id
def priority_band_label(priority: float) -> str:
if priority >= 0.8:
return "Act now"
if priority >= 0.6:
return "Actionable"
if priority >= 0.35:
return "Monitor"
return "Low"
def priority_band_short_label(priority: float) -> str:
if priority >= 0.8:
return "Now"
if priority >= 0.6:
return "Act"
if priority >= 0.35:
return "Mon"
return "Low"
def humanize_constraint(constraint_level) -> str:
value = enum_value(constraint_level)
return {
"none": "No constraint",
"soft": "Needs reconfirmation",
"hard": "High constraint",
}.get(value, str(value))
def humanize_action_name(action_name: str) -> str:
return {
"move": "Move",
"inspect": "Check field",
"revisit": "Check field",
"defer": "Check field",
"patrol": "Move",
"recharge": "Move",
"remove_weeds": "Check field",
"measure_soil": "Take soil sample",
"apply_fertilizer": "Apply fertilizer",
"apply_water": "Apply water",
"install_drainage": "Install drainage",
"subsoil": "Subsoil",
"end_mission": "Move",
}.get(action_name, action_name.replace("_", " ").title())
def humanize_metric_token(value: str) -> str:
return value.replace("_", " ").title()
def humanize_control_intent(control_intent) -> str:
value = enum_value(control_intent)
return {
ResolutionIntent.CONTINUE_INSPECTION.value: "Continue field check",
ResolutionIntent.REQUEST_RECHECK.value: "Continue field check",
ResolutionIntent.DEFER_ZONE.value: "No action on tile",
ResolutionIntent.STOP_FOR_RESOURCE_LIMIT.value: "Stop for resource limit",
ResolutionIntent.STOP_FOR_BACKEND_ERROR.value: "Stop for backend error",
}.get(value, humanize_metric_token(str(value)))
def humanize_control_state(control_state) -> str:
value = enum_value(control_state)
return {
ControlState.UNSEEN.value: "Unseen",
ControlState.INSPECT_REQUIRED.value: "Field check required",
ControlState.RECHECK_REQUIRED.value: "Field check required",
ControlState.RESOLVED_COMPLETE.value: "Resolved complete",
ControlState.RESOLVED_DEFERRED.value: "No action applied",
ControlState.RESOLVED_ERROR.value: "Resolved error",
}.get(value, humanize_metric_token(str(value)))
def humanize_terminal_reason(terminal_reason) -> str:
value = enum_value(terminal_reason)
return {
TerminalReason.COMPLETION.value: "Completion",
TerminalReason.DEFERRED.value: "No action applied",
TerminalReason.RESOURCE_LIMIT.value: "Resource limit",
TerminalReason.BACKEND_ERROR.value: "Backend error",
}.get(value, humanize_metric_token(str(value)))
def humanize_execution_outcome(execution_outcome) -> str:
value = enum_value(execution_outcome)
return {
ExecutionOutcomeType.MISSION_STOPPED.value: "Mission stopped",
ExecutionOutcomeType.INVALID_TASK.value: "Invalid task",
ExecutionOutcomeType.MOVE_COMPLETED.value: "Move completed",
ExecutionOutcomeType.MOVE_SKIPPED.value: "Move skipped",
ExecutionOutcomeType.MOVE_REJECTED.value: "Move rejected",
ExecutionOutcomeType.POSITION_REJECTED.value: "Position rejected",
ExecutionOutcomeType.PATROL_COMPLETED.value: "Move completed",
ExecutionOutcomeType.RECHARGE_COMPLETED.value: "Move completed",
ExecutionOutcomeType.INSPECTION_COMPLETED.value: "Field check completed",
ExecutionOutcomeType.WEED_REMOVAL_COMPLETED.value: "Weed removal completed",
ExecutionOutcomeType.SOIL_MEASUREMENT_COMPLETED.value: "Soil sample completed",
ExecutionOutcomeType.FERTILIZER_APPLIED.value: "Fertilizer applied",
ExecutionOutcomeType.WATER_APPLIED.value: "Water applied",
ExecutionOutcomeType.DRAINAGE_INSTALLED.value: "Drainage installed",
ExecutionOutcomeType.SUBSOIL_COMPLETED.value: "Subsoil completed",
ExecutionOutcomeType.DEFERRED.value: "No action applied",
ExecutionOutcomeType.RESOURCE_EXHAUSTED.value: "Resource exhausted",
}.get(value, humanize_metric_token(str(value)))
def render_plan(snapshot) -> None:
if not snapshot.mission_plan.ordered_tasks:
st.write("No mission plan available yet.")
return
current_task = snapshot.mission_plan.ordered_tasks[0]
st.write(f"**Next Robot Step:** {humanize_action_name(str(current_task.task_type))}")
if current_task.control_intent is not None:
st.write(f"**Control Intent:** {humanize_control_intent(current_task.control_intent)}")
st.write(f"**Target Field:** {short_zone_label(current_task.target_zone)}")
st.write(f"**Rationale:** {current_task.rationale}")
st.write("**Plan Queue**")
for task in snapshot.mission_plan.ordered_tasks[:6]:
st.write(
f"- {humanize_action_name(str(task.task_type))} on {short_zone_label(task.target_zone)} "
f"(planner score {task.priority:.2f}"
+ (f", {humanize_control_intent(task.control_intent)}" if task.control_intent is not None else "")
+ ")"
)
st.write("**Field Assessments**")
for assessment in snapshot.assessments[:6]:
st.write(
f"- {short_zone_label(assessment.zone_id)}: {priority_band_label(assessment.priority_score)} | "
f"{humanize_action_name(str(assessment.recommended_action))} next | "
f"{humanize_control_state(assessment.control_state)} | "
f"{humanize_constraint(assessment.constraint_level)} | uncertainty {assessment.uncertainty:.2f}"
)
def format_control_state_transition(event) -> str | None:
before = event.control_state_before
after = event.control_state_after
if before is None and after is None:
return None
if before is None:
return humanize_control_state(after)
if after is None:
return humanize_control_state(before)
before_label = humanize_control_state(before)
after_label = humanize_control_state(after)
if before_label == after_label:
return after_label
return f"{before_label} -> {after_label}"
def describe_task(task) -> str:
if task is None:
return "No planner task recorded"
task_text = humanize_action_name(str(task.task_type))
if task.target_zone is not None:
task_text += f" on {short_zone_label(task.target_zone)}"
if task.control_intent is not None:
task_text += f" ({humanize_control_intent(task.control_intent)})"
return task_text
def render_assessment_snapshot(title: str, assessment) -> None:
st.write(f"**{title}**")
if assessment is None:
st.caption("No assessment snapshot recorded for this step.")
return
st.write(
f"{short_zone_label(assessment.zone_id)} | {priority_band_label(assessment.priority_score)} | "
f"{humanize_action_name(str(assessment.recommended_action))} next | "
f"{humanize_control_state(assessment.control_state)} | "
f"{humanize_constraint(assessment.constraint_level)} | uncertainty {assessment.uncertainty:.2f}"
)
if assessment.recommended_control_intent is not None:
st.write(f"Control intent: {humanize_control_intent(assessment.recommended_control_intent)}")
if assessment.supporting_evidence:
st.write("Supporting evidence")
for fact in assessment.supporting_evidence[:5]:
st.write(f"- {fact}")
def render_assessment_after_step(summary: dict[str, object] | None) -> None:
st.write("**Field State After Step**")
if not summary:
st.caption("No post-step field assessment was recorded.")
return
st.write(
f"{short_zone_label(summary.get('zone_id'))} | {normalize_priority_label(summary.get('inspection_priority'))} | "
f"{summary.get('control_state')} | uncertainty {float(summary.get('uncertainty', 0.0)):.2f}"
)
st.write(
f"Visit count: {summary.get('visit_count', 0)}"
+ (f" | Revisits: {summary.get('revisit_count', 0)}" if summary.get("revisit_count") is not None else "")
)
def render_plan_and_timeline(snapshot) -> None:
"""Show the current plan as an inline 'next step' banner, then the full step history below."""
plan = snapshot.mission_plan
if plan.ordered_tasks:
task = plan.ordered_tasks[0]
target_label = short_zone_label(task.target_zone) if task.target_zone else "—"
intent_suffix = f" ({humanize_control_intent(task.control_intent)})" if task.control_intent else ""
queue_preview = ", ".join(
f"{humanize_action_name(str(t.task_type))} {short_zone_label(t.target_zone)}"
for t in plan.ordered_tasks[1:4]
)
st.markdown(
f"**Next → {humanize_action_name(str(task.task_type))} on {target_label}{intent_suffix}** &nbsp; "
f"_{task.rationale}_",
unsafe_allow_html=True,
)
if queue_preview:
st.caption(f"Queue: {queue_preview}")
default_show_timeline = snapshot.observation.mode != SimulatorMode.REAL_FIELD
show_timeline = st.toggle(
"Show step timeline",
value=bool(st.session_state.get(SHOW_TIMELINE_BODY_KEY, default_show_timeline)),
key=SHOW_TIMELINE_BODY_KEY,
)
if not show_timeline:
if snapshot.event_history:
st.caption(f"Step timeline hidden. {len(snapshot.event_history)} recorded steps are available on demand.")
else:
st.caption("No events yet.")
return
if snapshot.event_history:
timeline_window = st.selectbox(
"Timeline Depth",
TIMELINE_WINDOW_OPTIONS,
index=0,
key=TIMELINE_WINDOW_KEY,
format_func=lambda value: "All steps" if value == 0 else f"Latest {value} steps",
)
total_steps = len(snapshot.event_history)
if timeline_window and total_steps > timeline_window:
st.caption(f"Showing the latest {timeline_window} of {total_steps} steps.")
render_step_timeline(snapshot)
def render_step_timeline(snapshot) -> None:
if not snapshot.event_history:
st.write("No events yet.")
return
latest_step_index = snapshot.event_history[-1].step_index
timeline_window = int(st.session_state.get(TIMELINE_WINDOW_KEY, TIMELINE_WINDOW_OPTIONS[0]))
events = snapshot.event_history if timeline_window == 0 else snapshot.event_history[-timeline_window:]
for event in events:
transition = format_control_state_transition(event)
label_parts = [
f"Step {event.step_index}",
humanize_action_name(str(event.action)),
format_event_location(event),
]
if transition is not None:
label_parts.append(transition)
with st.expander(" | ".join(label_parts), expanded=event.step_index == latest_step_index):
summary_left, summary_middle, summary_right = st.columns([2, 1, 1])
with summary_left:
st.write("**Step Overview**")
st.write(f"Action: {humanize_action_name(str(event.action))}")
st.write(f"Location: {format_event_location(event)}")
st.write(f"Outcome: {humanize_execution_outcome(event.execution_outcome)}")
st.write(f"Reward: {event.reward:.2f}")
with summary_middle:
st.write("**Control Outcome**")
if transition is not None:
st.write(f"Control state: {transition}")
if event_control_intent(event) is not None:
st.write(f"Intent: {humanize_control_intent(event_control_intent(event))}")
if event.control_event is not None and event.control_event.rejection_reason is not None:
st.write(f"Rejection reason: `{event.control_event.rejection_reason}`")
if event_terminal_reason(event) is not None:
st.write(f"Terminal reason: {humanize_terminal_reason(event_terminal_reason(event))}")
if event.details.get("reason"):
st.write(f"Step reason: {event.details['reason']}")
if event.planner_context is not None:
chosen = event.planner_context.chosen_task_before_normalization
normalized = event.planner_context.normalized_task
if chosen != normalized:
st.write(f"Normalized task: {describe_task(normalized)}")
with summary_right:
st.write("**Evidence And Context**")
context = event.guarded_context_after
if context is None:
st.caption("No context recorded.")
else:
st.write(f"Field checks: {context.inspection_count}")
st.write(f"Revisits: {context.revisit_count}")
if event.details.get("uncertainty_delta") is not None:
st.write(f"Uncertainty delta: {float(event.details['uncertainty_delta']):+.2f}")
if event.planner_context is not None:
st.write(f"Evidence reused: {'Yes' if event.planner_context.evidence_reused else 'No'}")
if event.planner_context.fixture_identity is not None:
st.write(f"Fixture: `{event.planner_context.fixture_identity}`")
revealed_facts = event.details.get("revealed_facts") or ["No new field information was revealed."]
st.write("**What Happened In This Step**")
for fact in revealed_facts[:6]:
st.write(f"- {fact}")
render_assessment_snapshot(
"Planner View Before Step",
event.planner_context.selected_assessment if event.planner_context else None,
)
if event.planner_context is not None:
st.write("**Why This Decision**")
max_e = snapshot.observation.max_energy
if event.planner_context.battery_before_step is not None and max_e:
pct_before = battery_percentage_int(event.planner_context.battery_before_step, max_e)
pct_after = battery_percentage_int(event.planner_context.estimated_battery_after_step, max_e)
st.write(f"Battery: {pct_before}% → {pct_after}%")
if event.planner_context.return_distance_before_step is not None:
st.write(f"Hops to base: {event.planner_context.return_distance_before_step}{event.planner_context.return_distance_after_step}")
all_candidates = list(event.planner_context.candidate_summaries) + list(event.planner_context.adjacent_candidate_summaries)
if all_candidates:
st.write("Other options considered")
for summary in all_candidates:
target_label = short_zone_label(summary.target_zone) if summary.target_zone else "—"
if summary.rejected_for_battery_infeasibility:
note = " [battery-infeasible]"
elif summary.rejection_reason:
note = f" [{summary.rejection_reason}]"
else:
note = ""
st.write(f"- {humanize_action_name(str(summary.task_type))} {target_label} — utility {summary.net_utility:.2f}{note}")
render_assessment_after_step(event.details.get("assessment_summary_after_step"))
if __name__ == "__main__":
main()