from __future__ import annotations

import base64
import io
import json
import math
import os
import time
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable
from urllib.parse import quote

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
import streamlit.components.v1 as components

from autofarm.adapters.datasets import DatasetCatalog
from autofarm.contracts import (
    ActionType,
    ControlState,
    EpisodeOutcomeStatus,
    ExecutionOutcomeType,
    MissionPlan,
    ResolutionIntent,
    SimulatorMode,
    TerminalReason,
)
from autofarm.debug_profile import (
    ACTIVE_DEBUG_SCENARIO,
    ACTIVE_DEBUG_STRATEGY,
    visible_riddle_scenarios,
    visible_strategy_names,
)
from autofarm.evaluation.baselines import HybridStrategy, build_strategies
from autofarm.paths import REPO_ROOT
from autofarm.urgency import (
    MISSING_YIELD_COLOR,
    REAL_FIELD_CAUTION_URGENCY_THRESHOLD,
    REAL_FIELD_CRITICAL_URGENCY_THRESHOLD,
)
from autofarm.planning.utility import action_type_for_field_recommendation
from intervention import render_intervention_view, render_intervention_sim_view
from intervention.simulation_view import write_sim_input_csvs
from intervention.constants import (
    INTERVENTION_DATA_KEY,
    INTERVENTION_PREDICTED_KEY,
    INTERVENTION_SIM_ERROR_KEY,
    INTERVENTION_SIM_MAP_PATH,
    INTERVENTION_SIM_VIEW_KEY,
    INTERVENTION_VIEW_KEY,
)
from autofarm.sim.engine import (
    SimulatorEnvironment,
    build_environment,
    build_interactive_environment,
    build_real_field_environment,
    shortest_path_zone_ids,
)
from autofarm.sim.playback import (
    PLAYBACK_MODE_AUTO,
    PLAYBACK_MODE_MANUAL,
    PLAYBACK_SPEED_1HZ,
    PLAYBACK_SPEED_4HZ,
    PLAYBACK_SPEED_8HZ,
    default_playback_state,
    playback_interval_ms,
    playback_speed_options,
    should_process_tick,
)
from autofarm.sim.session_persistence import (
    PersistenceCoordinator,
    SessionPersistenceResult,
    create_session_id,
    persist_interactive_session,
)
from autofarm.sim.scenarios import RIDDLE_SAFETY_MAX_STEPS, available_riddles, interactive_challenge_templates

# --- st.session_state keys: environment, session identity and persistence ---
ENV_KEY = "env"
ENV_GENERATION_KEY = "env_generation"
PERSISTENCE_GENERATION_KEY = "persistence_generation"
SESSION_ID_KEY = "simulator_session_id"
SESSION_PERSISTENCE_KEY = "simulator_session_persistence"
PERSISTENCE_COORDINATOR_KEY = "simulator_persistence_coordinator"
PERSISTENCE_WARNING_KEY = "simulator_persistence_warning"

# --- st.session_state keys: board interaction and challenge placement ---
LAST_BOARD_EVENT_KEY = "interactive_last_board_event"
SELECTED_ZONE_KEY = "selected_zone_id"
PENDING_CHALLENGE_TEMPLATE_KEY = "interactive_pending_challenge_template"
PENDING_CHALLENGE_RADIUS_KEY = "interactive_pending_challenge_radius"
PLACEMENT_FLASH_KEY = "interactive_placement_flash"

# --- keys for figure caches and view toggles ---
# NOTE(review): presumably also st.session_state keys; only
# SHOW_PLANNED_TRAJECTORY_KEY is visibly used that way in this part of the file.
FIGURE_CACHE_KEY = "zone_grid_figure_cache"
STATIC_MAP_CACHE_KEY = "zone_grid_static_cache"
TIMELINE_WINDOW_KEY = "timeline_window_size"
SHOW_TIMELINE_BODY_KEY = "show_timeline_body"
SHOW_PLANNED_TRAJECTORY_KEY = "show_planned_trajectory"

# --- st.session_state keys: playback controls and telemetry ---
PLAYBACK_MODE_KEY = "playback_mode"
PLAYBACK_RUNNING_KEY = "playback_running"
PLAYBACK_SPEED_KEY = "playback_speed"
PLAYBACK_LAST_TICK_KEY = "playback_last_processed_tick"
PLAYBACK_WARNING_KEY = "playback_warning"
PLAYBACK_NEXT_STEP_AT_KEY = "playback_next_step_at"
PLAYBACK_TELEMETRY_KEY = "playback_telemetry"
PLAYBACK_PENDING_STEP_KEY = "playback_pending_step_telemetry"

# Custom Streamlit components, loaded from directories next to this file.
BOARD_COMPONENT = components.declare_component(
    "challenge_board",
    path=str((Path(__file__).parent / "components" / "challenge_board").resolve()),
)
AUTO_TICK_COMPONENT = components.declare_component(
    "auto_tick",
    path=str((Path(__file__).parent / "components" / "auto_tick").resolve()),
)

# JSON export locations under the repository root.
SCRIPT_EVAL_EXPORT_ROOT = REPO_ROOT / "files_script_eval"
FIELD_TILE_STATE_EXPORT_PATH = SCRIPT_EVAL_EXPORT_ROOT / "field_tile_states.json"
TILE_ACTION_LOG_EXPORT_PATH = SCRIPT_EVAL_EXPORT_ROOT / "tile_action_log.json"

# Anomaly types that finding_bucket() reports as "Positive signal findings".
POSITIVE_SIGNAL_ANOMALIES = {"weed_presence"}
# Minimum number of new events between snapshot writes (see ensure_session_persistence).
SNAPSHOT_WRITE_INTERVAL = 5
# NOTE(review): 0 presumably means "show everything" — confirm at the usage site.
TIMELINE_WINDOW_OPTIONS = [25, 50, 100, 250, 0]
PLANNED_TRAJECTORY_MOVE_STEP_BUDGET = 10
# NOTE(review): presumably the number of steps a recent-task icon stays visible;
# the usage is outside this part of the file.
RECENT_TASK_ICON_TTL_STEPS = 5
# Maximum number of samples kept in each rolling telemetry series
# (applied by append_recent_metric).
PLAYBACK_TELEMETRY_WINDOW = 20
# Action types grouped as "service" actions.
# NOTE(review): consumers are outside this part of the file — confirm intent.
SERVICE_ACTIONS = {
    ActionType.REMOVE_WEEDS,
    ActionType.APPLY_FERTILIZER,
    ActionType.APPLY_WATER,
    ActionType.INSTALL_DRAINAGE,
    ActionType.SUBSOIL,
}


@dataclass(frozen=True)
class RealFieldStaticMapPayload:
    """Immutable bundle of a Plotly figure plus per-zone layout and sizing values.

    NOTE(review): presumably the step-invariant (cacheable) part of the
    real-field map; the builder is outside this part of the file.
    """

    figure: go.Figure
    zone_ids: tuple[str, ...]
    xs: tuple[int, ...]
    ys: tuple[int, ...]
    x_values: tuple[int, ...]
    y_values: tuple[int, ...]
    grid_size: int
    cell_marker_size: int
    cell_text_size: int
    marker_ring_size: int
    step_marker_size: int
    home_base_x: float
    home_base_y: float
    entry_x: float
    entry_y: float
    tile_world_x0: float
    tile_world_x1: float
    tile_world_y0: float
    tile_world_y1: float


@dataclass(frozen=True)
class RealFieldDynamicMapPayload:
    """Immutable bundle of per-zone styling, event markers and an optional trajectory.

    NOTE(review): presumably the per-step counterpart of
    ``RealFieldStaticMapPayload``, with lists parallel to its ``zone_ids`` —
    confirm against the builder.
    """

    texts: list[str]
    colors: list[str]
    line_colors: list[str]
    line_widths: list[int]
    hover_texts: list[str]
    previous_zone_id: str | None
    current_zone_id: str | None
    event_marker_xs: list[float]
    event_marker_ys: list[float]
    event_marker_texts: list[str]
    event_marker_colors: list[str]
    event_marker_hovers: list[str]
    # Quoted forward reference: PlannedTrajectoryPayload is declared just below.
    planned_trajectory: "PlannedTrajectoryPayload | None"


@dataclass(frozen=True)
class PlannedTrajectoryPayload:
    """Immutable drawing data for a planned route: polyline, waypoints and task markers."""

    route_zone_ids: tuple[str, ...]
    starts_from_home: bool
    route_xs: list[float]
    route_ys: list[float]
    waypoint_xs: list[float]
    waypoint_ys: list[float]
    task_marker_xs: list[float]
    task_marker_ys: list[float]
    task_marker_texts: list[str]
    task_marker_sizes: list[int]
    task_marker_colors: list[str]
    task_marker_hovers: list[str]


@dataclass(frozen=True)
class PlannedHorizonAction:
    """One planned action: its order, target zone, task type and rationale text."""

    order: int
    zone_id: str
    task_type: str
    rationale: str


@dataclass(frozen=True)
class PlannedHorizonPayload:
    """Immutable planned horizon: the route's zone ids plus the ordered actions."""

    route_zone_ids: tuple[str, ...]
    starts_from_home: bool
    actions: tuple[PlannedHorizonAction, ...]
def enum_value(value): return getattr(value, "value", value) def finding_bucket(anomaly_type: str) -> str: if anomaly_type in POSITIVE_SIGNAL_ANOMALIES: return "Positive signal findings" return "Other findings" def render_finding_groups(findings, *, title_when_empty: str) -> None: if not findings: st.write(title_when_empty) return grouped: dict[str, list[str]] = {} for finding in findings: anomaly_type = str(finding.anomaly_type) grouped.setdefault(finding_bucket(anomaly_type), []).append( f"`{anomaly_type}` severity={finding.severity} confidence={finding.confidence:.2f}" ) for section_title in ( "Positive signal findings", "Other findings", ): rows = grouped.get(section_title) if not rows: continue st.write(section_title) for row in rows: st.write(f"- {row}") def event_control_intent(event): if event.control_event is None: return None return event.control_event.requested_intent def event_terminal_reason(event): if event.guarded_context_after is None: return None return event.guarded_context_after.terminal_reason def preferred_option(options: list[str], preferred: str) -> str: if preferred in options: return preferred return options[0] def set_manual_primary_axis(axis: str) -> None: st.session_state["interactive_manual_primary_axis"] = axis def arm_pending_challenge(template_id: str, radius: int) -> None: st.session_state[PENDING_CHALLENGE_TEMPLATE_KEY] = template_id st.session_state[PENDING_CHALLENGE_RADIUS_KEY] = int(radius) def clear_pending_challenge() -> None: st.session_state.pop(PENDING_CHALLENGE_TEMPLATE_KEY, None) st.session_state.pop(PENDING_CHALLENGE_RADIUS_KEY, None) def pending_challenge_request() -> str | None: template_id = st.session_state.get(PENDING_CHALLENGE_TEMPLATE_KEY) if not isinstance(template_id, str) or not template_id: return None return template_id def pending_challenge_radius() -> int: radius = st.session_state.get(PENDING_CHALLENGE_RADIUS_KEY, 1) try: return max(1, min(3, int(radius))) except (TypeError, ValueError): return 1 def 
set_placement_flash_message(success: bool, message: str) -> None: st.session_state[PLACEMENT_FLASH_KEY] = {"success": success, "message": message} def pop_placement_flash_message() -> tuple[bool, str] | None: payload = st.session_state.pop(PLACEMENT_FLASH_KEY, None) if not isinstance(payload, dict): return None return bool(payload.get("success")), str(payload.get("message") or "") def bump_env_generation() -> None: st.session_state[ENV_GENERATION_KEY] = int(st.session_state.get(ENV_GENERATION_KEY, 0)) + 1 def bump_persistence_generation() -> None: st.session_state[PERSISTENCE_GENERATION_KEY] = int(st.session_state.get(PERSISTENCE_GENERATION_KEY, 0)) + 1 def is_persistent_world_mode_value(mode_name: str) -> bool: return mode_name in {SimulatorMode.INTERACTIVE.value, SimulatorMode.REAL_FIELD.value} def is_persistent_world_mode(mode: SimulatorMode) -> bool: return mode in {SimulatorMode.INTERACTIVE, SimulatorMode.REAL_FIELD} def ensure_playback_state_defaults() -> None: defaults = default_playback_state() st.session_state.setdefault(PLAYBACK_MODE_KEY, defaults["mode"]) st.session_state.setdefault(PLAYBACK_RUNNING_KEY, defaults["running"]) st.session_state.setdefault(PLAYBACK_SPEED_KEY, defaults["speed"]) st.session_state.setdefault(PLAYBACK_LAST_TICK_KEY, defaults["last_processed_tick"]) st.session_state.setdefault(PLAYBACK_WARNING_KEY, None) st.session_state.setdefault(PLAYBACK_NEXT_STEP_AT_KEY, None) st.session_state.setdefault(PLAYBACK_TELEMETRY_KEY, default_playback_telemetry()) st.session_state.setdefault(PLAYBACK_PENDING_STEP_KEY, None) def default_playback_telemetry() -> dict[str, object]: return { "speed": PLAYBACK_SPEED_1HZ, "started_at": None, "step_count": 0, "last_step_completed_at": None, "recent_step_intervals_ms": [], "last_engine_step_ms": None, "recent_engine_step_ms": [], "last_total_app_step_ms": None, "recent_total_app_step_ms": [], "last_render_ms": None, "recent_render_ms": [], "last_persistence_ms": None, "recent_persistence_ms": [], 
"last_export_ms": None, "recent_export_ms": [], "last_strategy_plan_ms": None, "recent_strategy_plan_ms": [], "last_decision_trace_ms": None, "recent_decision_trace_ms": [], } def append_recent_metric(series: object, value: float) -> list[float]: values = [float(item) for item in series] if isinstance(series, list) else [] values.append(round(float(value), 3)) return values[-PLAYBACK_TELEMETRY_WINDOW:] def reset_playback_telemetry(speed: str) -> None: telemetry = default_playback_telemetry() telemetry["speed"] = speed st.session_state[PLAYBACK_TELEMETRY_KEY] = telemetry st.session_state[PLAYBACK_PENDING_STEP_KEY] = None def maybe_reset_playback_telemetry_for_speed_change(speed: str) -> None: telemetry = st.session_state.get(PLAYBACK_TELEMETRY_KEY) if not isinstance(telemetry, dict) or telemetry.get("speed") == speed: return reset_playback_telemetry(speed) if bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)): st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = time.monotonic() + ( playback_interval_ms(speed) / 1000.0 ) def begin_playback_step_telemetry( *, speed: str, engine_step_ms: float, step_started_at: float, step_perf: dict[str, float] | None, ) -> None: st.session_state[PLAYBACK_PENDING_STEP_KEY] = { "speed": speed, "engine_step_ms": round(engine_step_ms, 3), "step_started_at": step_started_at, "step_perf": dict(step_perf or {}), } def finalize_playback_step_telemetry( *, speed: str, completed_at: float, total_app_step_ms: float, render_ms: float, persistence_ms: float, export_ms: float, ) -> None: pending = st.session_state.get(PLAYBACK_PENDING_STEP_KEY) if not isinstance(pending, dict): return telemetry = st.session_state.get(PLAYBACK_TELEMETRY_KEY) if not isinstance(telemetry, dict): telemetry = default_playback_telemetry() last_step_completed_at = telemetry.get("last_step_completed_at") recent_intervals = telemetry.get("recent_step_intervals_ms") if isinstance(last_step_completed_at, (int, float)): recent_intervals = append_recent_metric( 
recent_intervals, (completed_at - float(last_step_completed_at)) * 1000.0, ) step_perf = pending.get("step_perf") strategy_plan_ms = None decision_trace_ms = None if isinstance(step_perf, dict): if isinstance(step_perf.get("strategy_plan_ms"), (int, float)): strategy_plan_ms = float(step_perf["strategy_plan_ms"]) if isinstance(step_perf.get("decision_trace_ms"), (int, float)): decision_trace_ms = float(step_perf["decision_trace_ms"]) telemetry.update( { "speed": speed, "started_at": telemetry.get("started_at") or completed_at, "step_count": int(telemetry.get("step_count", 0)) + 1, "last_step_completed_at": completed_at, "last_engine_step_ms": round(float(pending.get("engine_step_ms", 0.0)), 3), "recent_engine_step_ms": append_recent_metric( telemetry.get("recent_engine_step_ms"), float(pending.get("engine_step_ms", 0.0)), ), "last_total_app_step_ms": round(total_app_step_ms, 3), "recent_total_app_step_ms": append_recent_metric( telemetry.get("recent_total_app_step_ms"), total_app_step_ms, ), "last_render_ms": round(render_ms, 3), "recent_render_ms": append_recent_metric( telemetry.get("recent_render_ms"), render_ms, ), "last_persistence_ms": round(persistence_ms, 3), "recent_persistence_ms": append_recent_metric( telemetry.get("recent_persistence_ms"), persistence_ms, ), "last_export_ms": round(export_ms, 3), "recent_export_ms": append_recent_metric( telemetry.get("recent_export_ms"), export_ms, ), "recent_step_intervals_ms": recent_intervals if isinstance(recent_intervals, list) else [], } ) if strategy_plan_ms is not None: telemetry["last_strategy_plan_ms"] = round(strategy_plan_ms, 3) telemetry["recent_strategy_plan_ms"] = append_recent_metric( telemetry.get("recent_strategy_plan_ms"), strategy_plan_ms, ) if decision_trace_ms is not None: telemetry["last_decision_trace_ms"] = round(decision_trace_ms, 3) telemetry["recent_decision_trace_ms"] = append_recent_metric( telemetry.get("recent_decision_trace_ms"), decision_trace_ms, ) 
st.session_state[PLAYBACK_TELEMETRY_KEY] = telemetry st.session_state[PLAYBACK_PENDING_STEP_KEY] = None def average_metric(series: object) -> float | None: if not isinstance(series, list) or not series: return None return round(sum(float(item) for item in series) / len(series), 3) def p95_metric(series: object) -> float | None: if not isinstance(series, list) or not series: return None values = sorted(float(item) for item in series) index = min(len(values) - 1, math.ceil(0.95 * len(values)) - 1) return round(values[index], 3) def max_metric(series: object) -> float | None: if not isinstance(series, list) or not series: return None return round(max(float(item) for item in series), 3) def metric_summary(*, telemetry: dict[str, object], name: str) -> dict[str, float | None]: last_key = f"last_{name}" recent_key = f"recent_{name}" last_value = telemetry.get(last_key) return { "last": round(float(last_value), 3) if isinstance(last_value, (int, float)) else None, "avg": average_metric(telemetry.get(recent_key)), "p95": p95_metric(telemetry.get(recent_key)), "max": max_metric(telemetry.get(recent_key)), } def playback_telemetry_snapshot() -> dict[str, object]: telemetry = st.session_state.get(PLAYBACK_TELEMETRY_KEY) if not isinstance(telemetry, dict): return { "step_count": 0, "achieved_hz": None, "engine_step_ms": {"last": None, "avg": None, "p95": None, "max": None}, "total_app_step_ms": {"last": None, "avg": None, "p95": None, "max": None}, "render_ms": {"last": None, "avg": None, "p95": None, "max": None}, "persistence_ms": {"last": None, "avg": None, "p95": None, "max": None}, "export_ms": {"last": None, "avg": None, "p95": None, "max": None}, "strategy_plan_ms": {"last": None, "avg": None, "p95": None, "max": None}, "decision_trace_ms": {"last": None, "avg": None, "p95": None, "max": None}, } avg_interval_ms = average_metric(telemetry.get("recent_step_intervals_ms")) achieved_hz = None if avg_interval_ms in {None, 0.0} else round(1000.0 / avg_interval_ms, 3) return 
{ "step_count": int(telemetry.get("step_count", 0)), "achieved_hz": achieved_hz, "engine_step_ms": metric_summary(telemetry=telemetry, name="engine_step_ms"), "total_app_step_ms": metric_summary(telemetry=telemetry, name="total_app_step_ms"), "render_ms": metric_summary(telemetry=telemetry, name="render_ms"), "persistence_ms": metric_summary(telemetry=telemetry, name="persistence_ms"), "export_ms": metric_summary(telemetry=telemetry, name="export_ms"), "strategy_plan_ms": metric_summary(telemetry=telemetry, name="strategy_plan_ms"), "decision_trace_ms": metric_summary(telemetry=telemetry, name="decision_trace_ms"), } def render_playback_telemetry(*, speed: str, running: bool, mode: SimulatorMode) -> None: target_hz = round(1000.0 / playback_interval_ms(speed), 2) telemetry = playback_telemetry_snapshot() achieved_hz = telemetry.get("achieved_hz") engine_step_ms = telemetry["engine_step_ms"] total_app_step_ms = telemetry["total_app_step_ms"] if running: if achieved_hz is None: st.caption(f"Target {target_hz:.2f} Hz. 
Telemetry will populate after the next auto-step.") else: parts = [ f"Target {target_hz:.2f} Hz", f"Achieved {float(achieved_hz):.2f} Hz", ( f"Engine {float(engine_step_ms['last'] or 0.0):.1f} / " f"{float(engine_step_ms['avg'] or 0.0):.1f} / " f"{float(engine_step_ms['p95'] or 0.0):.1f} ms" ), ] if mode == SimulatorMode.REAL_FIELD: parts.append( f"App {float(total_app_step_ms['last'] or 0.0):.1f} / " f"{float(total_app_step_ms['avg'] or 0.0):.1f} / " f"{float(total_app_step_ms['p95'] or 0.0):.1f} ms" ) parts.append( f"Render {float(telemetry['render_ms']['avg'] or 0.0):.1f} ms" ) parts.append( f"Persist {float(telemetry['persistence_ms']['avg'] or 0.0):.1f} ms" ) parts.append( f"Export {float(telemetry['export_ms']['avg'] or 0.0):.1f} ms" ) st.caption(" | ".join(parts)) return if int(telemetry.get("step_count", 0)) > 0: achieved_text = "n/a" if achieved_hz is None else f"{float(achieved_hz):.2f} Hz" parts = [ f"Last run {achieved_text} achieved", f"Engine {float(engine_step_ms['last'] or 0.0):.1f} / {float(engine_step_ms['avg'] or 0.0):.1f} ms", ] if mode == SimulatorMode.REAL_FIELD: parts.append( f"App {float(total_app_step_ms['last'] or 0.0):.1f} / {float(total_app_step_ms['avg'] or 0.0):.1f} ms" ) st.caption(" | ".join(parts)) def build_runtime_performance_metrics(env: SimulatorEnvironment) -> dict[str, object]: metrics = env.metrics() if env.mode != SimulatorMode.REAL_FIELD: return metrics metrics = dict(metrics) metrics["runtime_performance"] = { "playback": playback_telemetry_snapshot(), "last_step_engine_phases_ms": env.last_step_performance(), } return metrics def pause_playback(*, clear_last_tick: bool) -> None: st.session_state[PLAYBACK_RUNNING_KEY] = False if clear_last_tick: st.session_state[PLAYBACK_LAST_TICK_KEY] = None st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = None def start_playback() -> None: speed = str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)) st.session_state[PLAYBACK_RUNNING_KEY] = True 
st.session_state[PLAYBACK_WARNING_KEY] = None reset_playback_telemetry(speed) st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = time.monotonic() + ( playback_interval_ms(speed) / 1000.0 ) def execute_simulator_step(env: SimulatorEnvironment, strategy) -> tuple[bool, float, float, dict[str, float]]: before_step_index = int(env.step_index) before_event_count = len(env.event_history) started_at = time.perf_counter() env.step(strategy) # type: ignore[arg-type] duration_ms = (time.perf_counter() - started_at) * 1000.0 after_step_index = int(env.step_index) after_event_count = len(env.event_history) return ( after_step_index > before_step_index and after_event_count > before_event_count, started_at, duration_ms, env.last_step_performance(), ) def maybe_run_fragment_auto_step(env: SimulatorEnvironment, strategy) -> bool: if str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)) != PLAYBACK_MODE_AUTO: return False if not bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)) or env.done: return False interval_seconds = playback_interval_ms(str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))) / 1000.0 next_step_at = st.session_state.get(PLAYBACK_NEXT_STEP_AT_KEY) now = time.monotonic() if next_step_at is None: st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = now + interval_seconds return False if now < float(next_step_at): return False advanced, step_started_at, step_duration_ms, step_perf = execute_simulator_step(env, strategy) if advanced: begin_playback_step_telemetry( speed=str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)), engine_step_ms=step_duration_ms, step_started_at=step_started_at, step_perf=step_perf, ) st.session_state[PLAYBACK_WARNING_KEY] = None st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = time.monotonic() + interval_seconds else: pause_playback(clear_last_tick=True) st.session_state[PLAYBACK_WARNING_KEY] = ( "Playback paused because the last auto-step did not advance the simulator state." 
) return True def render_auto_tick_component(*, enabled: bool, interval_ms: int, session_token: str): return AUTO_TICK_COMPONENT( enabled=enabled, interval_ms=interval_ms, session_token=session_token, key="simulator_auto_tick", ) def fragment_runner(run_every_seconds: float): if hasattr(st, "fragment"): return st.fragment(run_every=run_every_seconds) if hasattr(st, "experimental_fragment"): return st.experimental_fragment(run_every=run_every_seconds) return None def field_extent(zone_states) -> tuple[int, int]: return ( max((zone.row or 0) for zone in zone_states), max((zone.col or 0) for zone in zone_states), ) def is_rectangular_field(zone_states) -> bool: rows, cols = field_extent(zone_states) return len(zone_states) == rows * cols def field_layout_metric(env: SimulatorEnvironment) -> tuple[str, str]: rows, cols = field_extent(env.zone_states) if env.mode == SimulatorMode.REAL_FIELD: return "Field Extent", f"{rows} x {cols}" return "Field Size", f"{rows} x {cols}" def _prewarm_simulator_resources() -> None: """Warm st.cache_resource caches so the simulator page loads instantly.""" try: get_strategies() except Exception: pass try: if ENV_KEY not in st.session_state: catalog = DatasetCatalog(ground_weeds=()) st.session_state[ENV_KEY] = build_real_field_environment( catalog=catalog, ) except Exception: pass def render_sidebar_heading(label: str, *, size_rem: float) -> None: st.markdown( ( f"
" f"{label}
" ), unsafe_allow_html=True, ) def main() -> None: st.set_page_config(page_title="AutoFarm Simulator", layout="wide") st.markdown( "", unsafe_allow_html=True, ) st.title("AutoFarm Simulator") st.session_state.setdefault(INTERVENTION_VIEW_KEY, True) st.session_state.setdefault(INTERVENTION_PREDICTED_KEY, False) st.session_state.setdefault(INTERVENTION_SIM_VIEW_KEY, False) st.session_state.setdefault(INTERVENTION_SIM_ERROR_KEY, None) if st.session_state[INTERVENTION_VIEW_KEY]: render_intervention_view(on_continue=_prewarm_simulator_resources) return if st.session_state[INTERVENTION_SIM_VIEW_KEY]: def _get_snapshot(): env = st.session_state.get(ENV_KEY) return env.snapshot() if env is not None else None render_intervention_sim_view(get_snapshot=_get_snapshot) return ensure_playback_state_defaults() catalog = DatasetCatalog(ground_weeds=()) strategies = get_strategies() riddle_map = available_riddles() challenge_palette = interactive_challenge_templates() requested_mode = os.getenv("AUTOFARM_SIM_MODE", SimulatorMode.REAL_FIELD.value) default_strategy = os.getenv("AUTOFARM_SIM_STRATEGY", ACTIVE_DEBUG_STRATEGY) default_scenario = os.getenv("AUTOFARM_SIM_SCENARIO", ACTIVE_DEBUG_SCENARIO) mode_name = SimulatorMode.RIDDLE.value if requested_mode == SimulatorMode.RIDDLE.value else SimulatorMode.REAL_FIELD.value show_parked_lanes = False playback_toggle_clicked = False with st.sidebar: render_sidebar_heading("Simulator Controls", size_rem=1.32) if mode_name == SimulatorMode.RIDDLE.value: grid_size = 5 else: grid_size = 0 scenario_name = None strategy_name = ACTIVE_DEBUG_STRATEGY if mode_name == SimulatorMode.RIDDLE.value: visible_scenarios = visible_riddle_scenarios(list(riddle_map), show_parked=show_parked_lanes) scenario_choice = preferred_option(visible_scenarios, default_scenario) scenario_name = st.selectbox( "Riddle", visible_scenarios, index=visible_scenarios.index(scenario_choice), ) visible_strategies = visible_strategy_names(list(strategies), 
show_parked=show_parked_lanes) if len(visible_strategies) == 1: strategy_name = visible_strategies[0] else: preferred_strategy = preferred_option(visible_strategies, default_strategy) strategy_name = st.selectbox( "Strategy", visible_strategies, index=visible_strategies.index(preferred_strategy), ) st.divider() render_sidebar_heading("Playback", size_rem=0.98) playback_mode = st.radio( "Playback Mode", [PLAYBACK_MODE_MANUAL, PLAYBACK_MODE_AUTO], key=PLAYBACK_MODE_KEY, horizontal=True, ) playback_speed = str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)) if playback_mode == PLAYBACK_MODE_AUTO: available_playback_speeds = playback_speed_options(enable_8hz=True) playback_speed = st.radio( "Auto Speed", available_playback_speeds, key=PLAYBACK_SPEED_KEY, horizontal=True, ) maybe_reset_playback_telemetry_for_speed_change(playback_speed) playback_running = bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)) toggle_label = "Pause Playback" if playback_running else "Start Playback" playback_toggle_clicked = st.button(toggle_label, width="stretch") render_playback_telemetry( speed=playback_speed, running=playback_running, mode=SimulatorMode(mode_name), ) else: playback_speed = str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)) if playback_mode == PLAYBACK_MODE_MANUAL and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)): pause_playback(clear_last_tick=False) if playback_toggle_clicked: if bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)): pause_playback(clear_last_tick=False) else: start_playback() auto_running = ( str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)) == PLAYBACK_MODE_AUTO and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)) ) render_sidebar_heading("Visualization", size_rem=0.98) show_planned_trajectory = st.toggle( "Show planned trajectory", value=bool(st.session_state.get(SHOW_PLANNED_TRAJECTORY_KEY, True)), key=SHOW_PLANNED_TRAJECTORY_KEY, ) st.divider() reset_label = "Reset Riddle" if 
mode_name == SimulatorMode.RIDDLE.value else "Reset World" step_label = "Advance One Step" if mode_name == SimulatorMode.RIDDLE.value else "Advance One Step" reset = st.button(reset_label, width="stretch") if mode_name == SimulatorMode.RIDDLE.value: step_col, run_col = st.columns(2) with step_col: step_once = st.button(step_label, width="stretch", disabled=auto_running) with run_col: run_to_outcome = st.button("Run To Outcome", width="stretch", disabled=auto_running) else: step_once = st.button(step_label, width="stretch", disabled=auto_running) run_to_outcome = False st.divider() render_sidebar_heading("Analysis", size_rem=0.98) investigate_clicked = st.button("Intervention Impact", type="primary", width="stretch") if investigate_clicked: st.session_state[INTERVENTION_SIM_VIEW_KEY] = True st.session_state[INTERVENTION_SIM_ERROR_KEY] = None # Remove stale PNG so the pipeline re-runs with current simulator data if INTERVENTION_SIM_MAP_PATH.exists(): INTERVENTION_SIM_MAP_PATH.unlink() st.rerun() if reset: pause_playback(clear_last_tick=True) st.session_state[PLAYBACK_WARNING_KEY] = None reset_simulator_run_state() session_id = get_or_create_session_id() env = get_or_reset_environment( mode_name=mode_name, scenario_name=scenario_name, reset=reset, catalog=catalog, grid_size=grid_size, ) strategy = strategies[strategy_name] persistence_duration_ms = 0.0 export_duration_ms = 0.0 render_duration_ms = 0.0 auto_tick_payload = None run_every_seconds = playback_interval_ms(str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))) / 1000.0 playback_fragment = fragment_runner(run_every_seconds) if playback_fragment is not None: @playback_fragment def auto_step_fragment() -> None: st.empty() if maybe_run_fragment_auto_step(env, strategy): bump_env_generation() st.rerun() auto_step_fragment() else: auto_tick_payload = render_auto_tick_component( enabled=( str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)) == PLAYBACK_MODE_AUTO and 
bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)) and not env.done ), interval_ms=playback_interval_ms(str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))), session_token=( f"{session_id}:" f"{int(st.session_state.get(ENV_GENERATION_KEY, 0))}:" f"{str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))}" ), ) step_executed = reset or playback_toggle_clicked if not env.current_assessments: env.current_assessments, env.current_plan = strategy.plan(env.planning_observation()) # type: ignore[arg-type] env.invalidate_cached_views() placement_message = pop_placement_flash_message() if step_once and not env.done: st.session_state[PLAYBACK_WARNING_KEY] = None step_advanced, _, _, _ = execute_simulator_step(env, strategy) if step_advanced: bump_env_generation() step_executed = True if run_to_outcome and env.mode == SimulatorMode.RIDDLE and not env.done: stepped = False for _ in range(max(1, RIDDLE_SAFETY_MAX_STEPS + 2)): step_advanced, _, _, _ = execute_simulator_step(env, strategy) if not step_advanced: break stepped = True if env.done: break if stepped: bump_env_generation() step_executed = True if playback_fragment is None: should_tick, tick_id = should_process_tick( mode=str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)), running=bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)), env_done=env.done, tick_payload=auto_tick_payload, last_processed_tick=st.session_state.get(PLAYBACK_LAST_TICK_KEY), ) if should_tick and not step_executed: step_advanced, step_started_at, step_duration_ms, step_perf = execute_simulator_step(env, strategy) if step_advanced: begin_playback_step_telemetry( speed=str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)), engine_step_ms=step_duration_ms, step_started_at=step_started_at, step_perf=step_perf, ) st.session_state[PLAYBACK_LAST_TICK_KEY] = tick_id st.session_state[PLAYBACK_WARNING_KEY] = None bump_env_generation() step_executed = True else: pause_playback(clear_last_tick=True) 
st.session_state[PLAYBACK_WARNING_KEY] = ( "Playback paused because the last auto-step did not advance the simulator state." ) if env.done and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)): pause_playback(clear_last_tick=True) snapshot = env.snapshot() persistence_started_at = time.perf_counter() persistence = ensure_session_persistence( session_id=session_id, scenario_name=scenario_name or env.name, snapshot=snapshot, metrics_factory=lambda: build_runtime_performance_metrics(env), ) persistence_duration_ms = (time.perf_counter() - persistence_started_at) * 1000.0 export_started_at = time.perf_counter() write_script_eval_exports(env, snapshot, session_id=session_id) export_duration_ms = (time.perf_counter() - export_started_at) * 1000.0 current_task = snapshot.mission_plan.ordered_tasks[0] if snapshot.mission_plan.ordered_tasks else None # Use fixed-slot placeholders so these optional banners never shift the # cursor position of elements below (which would remount the chart). placement_slot = st.empty() if placement_message is not None: if placement_message[0]: placement_slot.success(placement_message[1]) else: placement_slot.warning(placement_message[1]) playback_warning = st.session_state.get(PLAYBACK_WARNING_KEY) playback_slot = st.empty() if playback_warning: playback_slot.warning(str(playback_warning)) persistence_warning = st.session_state.get(PERSISTENCE_WARNING_KEY) persistence_slot = st.empty() if persistence_warning: persistence_slot.warning(str(persistence_warning)) if env.mode == SimulatorMode.RIDDLE and scenario_name is not None: render_primary_objective(riddle_map[scenario_name]) render_started_at = time.perf_counter() render_outcome_banner(snapshot, env) selected_event = snapshot.event_history[-1] if snapshot.event_history else None if env.mode == SimulatorMode.RIDDLE: render_riddle_layout( env, snapshot, riddle_map[scenario_name].target_zone_id, False, riddle_map[scenario_name], selected_event, ) else: render_interactive_layout(env, 
def get_or_reset_environment(
    *,
    mode_name: str,
    scenario_name: str | None,
    reset: bool,
    catalog,
    grid_size: int,
) -> SimulatorEnvironment:
    """Return the session's simulator environment, rebuilding it when stale.

    The environment stored under ``ENV_KEY`` is reused as long as ``reset`` is
    false and the stored mode, scenario and grid size all match the requested
    ones. Otherwise playback is paused, pending persistence work is flushed,
    a fresh environment is built for the requested mode, both generation
    counters are bumped and the per-run UI state is cleared.

    Args:
        mode_name: Requested mode, compared against ``SimulatorMode`` values.
        scenario_name: Riddle scenario; when falsy the first available riddle
            is used for the riddle branch.
        reset: Force a rebuild even when nothing else changed.
        catalog: Dataset catalog forwarded to the environment builders.
        grid_size: Grid size forwarded to the interactive/riddle builders.

    Returns:
        The environment currently stored in the Streamlit session state.
    """
    state = st.session_state
    active_mode = state.get("mode_name")
    active_scenario = state.get("scenario_name")
    active_grid_size = state.get("grid_size")
    selection_unchanged = (
        active_mode == mode_name
        and active_scenario == scenario_name
        and active_grid_size == grid_size
    )

    # Fast path: nothing about the requested run differs from the stored one.
    if not reset and ENV_KEY in state and selection_unchanged:
        return state[ENV_KEY]

    # Quiesce playback and background persistence before swapping the world.
    pause_playback(clear_last_tick=True)
    state[PLAYBACK_WARNING_KEY] = None
    flush_persistence_coordinator()

    if mode_name == SimulatorMode.INTERACTIVE.value:
        environment = build_interactive_environment(
            catalog=catalog,
            grid_size=grid_size,
        )
    elif mode_name == SimulatorMode.REAL_FIELD.value:
        environment = build_real_field_environment(
            catalog=catalog,
        )
    else:
        riddle_name = scenario_name or list(available_riddles())[0]
        environment = build_environment(
            riddle_name,
            catalog=catalog,
            grid_size=grid_size,
        )
    state[ENV_KEY] = environment

    bump_env_generation()
    bump_persistence_generation()

    # Remember what the stored environment was built for.
    for key, value in (
        ("mode_name", mode_name),
        ("scenario_name", scenario_name),
        ("grid_size", grid_size),
    ):
        state[key] = value

    # Drop UI state that referred to the previous environment.
    state[LAST_BOARD_EVENT_KEY] = None
    clear_pending_challenge()
    state.pop(PLACEMENT_FLASH_KEY, None)
    return state[ENV_KEY]
def ensure_session_persistence(
    *,
    session_id: str,
    scenario_name: str,
    snapshot,
    metrics_factory: Callable[[], dict[str, object]] | None = None,
) -> SessionPersistenceResult:
    """Persist the current session state and cache the result in session state.

    The previously stored result (``SESSION_PERSISTENCE_KEY``) is returned
    unchanged when its generation and event count both match the current ones.
    Otherwise ``persist_interactive_session`` is called, and a snapshot write
    is requested when any of the ``should_write_snapshot`` conditions holds.
    In REAL_FIELD mode a ``PersistenceCoordinator`` is used: the snapshot is
    enqueued on it unless the episode is done or the coordinator reports a
    failure, in which case the snapshot is written through the direct call.

    Args:
        session_id: Identifier forwarded to the persistence layer.
        scenario_name: Scenario label forwarded to the persistence layer.
        snapshot: Environment snapshot; ``event_history``, ``done`` and
            ``observation.mode`` are read here.
        metrics_factory: Optional callable producing metrics; it is invoked
            only when a snapshot write is due.

    Returns:
        The persistence result now stored under ``SESSION_PERSISTENCE_KEY``.
    """
    persisted = st.session_state.get(SESSION_PERSISTENCE_KEY)
    generation = int(st.session_state.get(PERSISTENCE_GENERATION_KEY, 0))
    event_count = len(snapshot.event_history)
    # Only REAL_FIELD sessions get a coordinator; other modes always take the
    # direct (synchronous) write path below.
    coordinator = (
        get_or_create_persistence_coordinator()
        if snapshot.observation.mode == SimulatorMode.REAL_FIELD
        else None
    )
    # Take the larger of the cached and coordinator-reported snapshot counts
    # (presumably the coordinator advances its count as queued writes finish —
    # confirm against PersistenceCoordinator).
    latest_snapshot_event_count = max(
        persisted.snapshot_event_count if persisted is not None else 0,
        coordinator.latest_snapshot_event_count() if coordinator is not None else 0,
    )
    if persisted is not None and latest_snapshot_event_count != persisted.snapshot_event_count:
        # Fold the newer snapshot count back into the cached result.
        persisted = replace(persisted, snapshot_event_count=latest_snapshot_event_count)
        st.session_state[SESSION_PERSISTENCE_KEY] = persisted
    if (
        persisted is not None
        and persisted.generation == generation
        and persisted.event_count == event_count
    ):
        # Nothing new since the last persisted state.
        return persisted
    # A snapshot is due on the first write, after a generation change, at the
    # end of the episode, during the first events, or once enough events have
    # accumulated since the last snapshot.
    should_write_snapshot = (
        persisted is None
        or persisted.generation != generation
        or snapshot.done
        or event_count <= 2
        or (event_count - latest_snapshot_event_count) >= SNAPSHOT_WRITE_INTERVAL
    )
    # Write inline when there is no coordinator, the episode finished, or the
    # coordinator has reported a failure.
    write_snapshot_sync = should_write_snapshot and (
        coordinator is None
        or snapshot.done
        or coordinator.failure_message() is not None
    )
    if write_snapshot_sync and coordinator is not None:
        # Flush queued work first and adopt the count the flush reports.
        latest_snapshot_event_count = max(latest_snapshot_event_count, flush_persistence_coordinator())
    # Metrics are only computed when a snapshot is actually going to be written.
    metrics = metrics_factory() if should_write_snapshot and metrics_factory is not None else None
    result = persist_interactive_session(
        session_id=session_id,
        generation=generation,
        scenario_name=scenario_name,
        snapshot=snapshot,
        metrics=metrics,
        last_logged_generation=persisted.generation if persisted is not None else -1,
        last_logged_event_count=persisted.event_count if persisted is not None else 0,
        write_snapshot=write_snapshot_sync,
        last_snapshot_event_count=latest_snapshot_event_count,
    )
    if should_write_snapshot and not write_snapshot_sync and coordinator is not None:
        # Snapshot is due but not written inline: hand it to the coordinator.
        queued = coordinator.enqueue_snapshot(
            session_id=session_id,
            generation=generation,
            scenario_name=scenario_name,
            snapshot=snapshot,
            metrics=metrics or {},
            last_snapshot_event_count=latest_snapshot_event_count,
        )
        if not queued:
            # Enqueue was refused: surface a warning and persist again with the
            # snapshot write forced, continuing from the result just obtained.
            st.session_state[PERSISTENCE_WARNING_KEY] = (
                "Background snapshot persistence was unavailable for this session. "
                "Falling back to synchronous snapshot writes."
            )
            result = persist_interactive_session(
                session_id=session_id,
                generation=generation,
                scenario_name=scenario_name,
                snapshot=snapshot,
                metrics=metrics,
                last_logged_generation=result.generation,
                last_logged_event_count=result.event_count,
                write_snapshot=True,
                last_snapshot_event_count=result.snapshot_event_count,
            )
    st.session_state[SESSION_PERSISTENCE_KEY] = result
    return result
def challenge_state_by_zone(snapshot) -> dict[str, dict[str, list[dict[str, object]]]]:
    """Group the snapshot's challenges per zone, split into active/resolved.

    Returns a mapping ``zone_id -> {"active": [...], "resolved": [...]}``
    where each list holds plain-dict records describing one challenge. Zones
    appear in the order they are first encountered (active challenges are
    visited before resolved ones).
    """

    def _as_record(item) -> dict[str, object]:
        # Flat, JSON-friendly view of a single challenge instance.
        return {
            "instance_id": item.instance_id,
            "template_id": item.template_id,
            "name": item.name,
            "description": item.description,
            "resolution_status": item.resolution_status,
            "resolution_message": item.resolution_message,
            "placed_step_index": item.placed_step_index,
            "resolved_step_index": item.resolved_step_index,
        }

    grouped: dict[str, dict[str, list[dict[str, object]]]] = {}
    sources = {
        "active": snapshot.active_challenges,
        "resolved": snapshot.resolved_challenges,
    }
    for status, items in sources.items():
        for item in items:
            if item.zone_id not in grouped:
                grouped[item.zone_id] = {"active": [], "resolved": []}
            grouped[item.zone_id][status].append(_as_record(item))
    return grouped
def build_tile_action_log_export(snapshot, *, session_id: str) -> dict[str, object]:
    """Build the per-tile action log export for a snapshot.

    Every event that targets a zone and whose action is one of the field
    actions listed below becomes one entry, annotated with the challenges
    that ``challenge_records_for_step`` reports for that zone and step and
    with the planner's normalized/requested tasks when a planner context is
    attached to the event.

    Args:
        snapshot: Environment snapshot providing ``event_history`` and
            ``observation``.
        session_id: Identifier echoed into the export header.

    Returns:
        A dict with export metadata and the ``tile_actions`` list.
    """

    def _dump_task(task):
        # Planner tasks are serialized only when present.
        return task.model_dump(mode="json") if task is not None else None

    exported_action_names = {
        member.value
        for member in (
            ActionType.INSPECT,
            ActionType.REVISIT,
            ActionType.DEFER,
            ActionType.REMOVE_WEEDS,
            ActionType.MEASURE_SOIL,
            ActionType.APPLY_FERTILIZER,
            ActionType.APPLY_WATER,
            ActionType.INSTALL_DRAINAGE,
            ActionType.SUBSOIL,
        )
    }

    entries: list[dict[str, object]] = []
    for event in snapshot.event_history:
        # Skip events without a zone and actions that are not field work.
        if event.zone_id is None:
            continue
        if str(event.action) not in exported_action_names:
            continue

        challenges_at_step = challenge_records_for_step(
            snapshot,
            zone_id=event.zone_id,
            step_index=event.step_index,
        )

        context = event.planner_context
        if context is None:
            planned_task = None
            requested_task = None
        else:
            planned_task = _dump_task(context.normalized_task)
            requested_task = _dump_task(context.chosen_task_before_normalization)

        entry: dict[str, object] = {
            "step_index": event.step_index,
            "zone_id": event.zone_id,
            "action": str(event.action),
            "execution_outcome": str(event.execution_outcome),
            "reward": event.reward,
            "from_zone_id": event.details.get("from_zone_id"),
            "to_zone_id": event.details.get("to_zone_id"),
            "revealed_facts": list(event.details.get("revealed_facts") or []),
            "challenge_names": [record["name"] for record in challenges_at_step],
            "challenges": challenges_at_step,
            "planned_task": planned_task,
            "requested_task": requested_task,
        }
        entries.append(entry)

    observation = snapshot.observation
    return {
        "session_id": session_id,
        "exported_at": export_timestamp(),
        "mode": observation.mode,
        "step_index": observation.step_index,
        "event_count": len(snapshot.event_history),
        "tile_actions": entries,
    }
def battery_percentage_int(energy_remaining: float | None, max_energy: float | None) -> int | None:
    """Return the battery level as a whole percentage, rounded up.

    Args:
        energy_remaining: Energy currently available, or ``None`` if unknown.
        max_energy: Full-charge capacity, or ``None``/``0`` if unknown.

    Returns:
        ``None`` when either input is unavailable (or the capacity is zero);
        otherwise an ``int`` in ``0..100``. Any strictly positive charge is
        reported as at least 1 so a nearly-empty battery never reads as 0 %.
    """
    if energy_remaining is None or max_energy in {None, 0}:
        return None
    # Multiply before dividing: ``(7 / 100) * 100.0`` evaluates to
    # 7.000000000000001 and would be ceil'ed to 8, whereas ``7 * 100.0 / 100``
    # is exactly 7.0.
    percent = float(energy_remaining) * 100.0 / float(max_energy)
    percent = max(0.0, min(100.0, percent))
    if percent <= 0.0:
        return 0
    # Round away residual binary-float noise before taking the ceiling, then
    # keep the "positive charge shows at least 1 %" guarantee.
    return max(1, min(100, math.ceil(round(percent, 9))))
' for i in range(5) ) st.markdown( f"""
{segs}
{percentage}%
def _render_challenge_status(snapshot) -> None:
    """Render a short list of the three most recently placed challenges.

    Active and resolved challenges are pooled and ordered by
    ``placed_step_index`` (newest first); a placeholder caption is shown when
    there are none.
    """
    pooled = [*snapshot.active_challenges, *snapshot.resolved_challenges]
    pooled.sort(key=lambda item: item.placed_step_index, reverse=True)
    newest = pooled[:3]

    st.markdown("**Recent Challenges**")
    if newest:
        for challenge in newest:
            zone_label = short_zone_label(challenge.zone_id)
            st.caption(f"· **{challenge.name}** @ {zone_label}")
    else:
        st.caption("No challenges placed yet.")
def maybe_apply_pending_challenge_selection(env: SimulatorEnvironment, zone_id: str | None, strategy) -> bool:
    """Place the armed challenge on the clicked zone, if one is pending.

    Args:
        env: Environment on which the challenge is placed.
        zone_id: Zone that was clicked, or ``None`` when nothing was selected.
        strategy: Planner used to recompute assessments and plan after a
            successful placement.

    Returns:
        ``False`` when there is no pending request or no zone. Otherwise the
        placement is attempted, a flash message is stored and a rerun is
        requested; the final ``return True`` follows that rerun call.
    """
    pending_request = pending_challenge_request()
    if pending_request is None or zone_id is None:
        return False
    template_id = pending_request
    radius = pending_challenge_radius()
    # Disarm before attempting placement so the request is consumed exactly
    # once, whether or not the placement succeeds.
    clear_pending_challenge()
    st.session_state[SELECTED_ZONE_KEY] = zone_id
    try:
        success, message = env.place_challenge(template_id, zone_id, radius=radius)
    except NotImplementedError as exc:
        # An unsupported placement is reported to the user as a failed one.
        success, message = False, str(exc)
    if success:
        if bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)):
            pause_playback(clear_last_tick=True)
        # The world changed: re-plan and invalidate everything derived from it.
        env.current_assessments, env.current_plan = strategy.plan(env.planning_observation())  # type: ignore[arg-type]
        env.invalidate_cached_views()
        bump_env_generation()
    set_placement_flash_message(success, message)
    # NOTE(review): st.rerun() normally aborts the current script run, so the
    # return below is presumably only reached when rerun is stubbed — confirm.
    st.rerun()
    return True
Wrapping this in @st.fragment gives the chart a stable, identifier-based parent block so it is never unmounted when unrelated elements above it shift the main-page element tree between reruns. """ map_col, side_col = st.columns([3.2, 1.2]) with map_col: st.subheader("World Map") figure = get_cached_zone_grid_figure( env, target_zone_id=None, show_truth=False, mission_plan=snapshot.mission_plan, show_planned_trajectory=show_planned_trajectory_enabled(), ) plot_state = st.plotly_chart( figure, width="stretch", on_select="rerun", selection_mode="points", key="interactive-grid", ) clicked_zone_id = selected_zone_from_plot_state(plot_state) if clicked_zone_id is not None: st.session_state[SELECTED_ZONE_KEY] = clicked_zone_id maybe_apply_pending_challenge_selection(env, clicked_zone_id, strategy) update_selected_zone_from_plot_state( plot_state, env, fallback_zone_id=(env.current_zone_id if env.current_zone_id in {zone.zone_id for zone in env.zone_states} else env.home_neighbor_zone_id), ) render_selected_step_summary(selected_event) with side_col: render_grid_legend() # Wrap with @st.fragment when available so the chart block has a stable # identifier-based anchor in the page tree, independent of positional shifts # caused by conditional elements elsewhere on the page. 
def mission_plan_signature(plan: MissionPlan | None) -> tuple[tuple[str | None, str | None, str | None], ...]:
    """Return a hashable fingerprint of a mission plan.

    Each task that has a target zone contributes one
    ``(task_type, target_zone, control_intent)`` triple, with the task type
    and (when present) the control intent stringified. Tasks without a target
    zone are ignored; ``None`` yields an empty tuple.
    """
    if plan is None:
        return ()

    def _triple(task) -> tuple[str | None, str | None, str | None]:
        intent = task.control_intent
        return (
            str(task.task_type),
            task.target_zone,
            None if intent is None else str(intent),
        )

    return tuple(
        _triple(task)
        for task in plan.ordered_tasks
        if task.target_zone is not None
    )
not None: return figure figure = build_zone_grid_figure( env, target_zone_id=target_zone_id, show_truth=show_truth, mission_plan=mission_plan, show_planned_trajectory=show_planned_trajectory, ) cache[cache_key] = figure if len(cache) > 8: stale_keys = [key for key in cache if key != cache_key] for stale_key in stale_keys[:-7]: cache.pop(stale_key, None) return figure def get_real_field_static_map_payload(env: SimulatorEnvironment) -> RealFieldStaticMapPayload: cache = st.session_state.setdefault(STATIC_MAP_CACHE_KEY, {}) cache_key = id(env) payload = cache.get(cache_key) if payload is not None: return payload grid_size = max(field_extent(env.zone_states)) cell_marker_size = max(28, 104 - ((grid_size - 3) * 7)) cell_text_size = max(8, 15 - max(0, grid_size - 3)) marker_ring_size = cell_marker_size + max(12, 30 - max(0, grid_size - 3) * 2) step_marker_size = max(16, 28 - max(0, grid_size - 3)) home_base_zone = env._zone_state(env.home_neighbor_zone_id) if env.home_neighbor_zone_id is not None else None home_base_row = home_base_zone.row if home_base_zone is not None else max(1, (grid_size + 1) // 2) zone_ids = tuple(zone.zone_id for zone in env.zone_states) xs = tuple(zone.col or 0 for zone in env.zone_states) ys = tuple(-(zone.row or 0) for zone in env.zone_states) x_values = xs y_values = ys home_base_x = min(x_values) - 2.0 home_base_y = -home_base_row entry_x = float(home_base_zone.col if home_base_zone is not None else min(x_values)) entry_y = float(-(home_base_zone.row or home_base_row) if home_base_zone is not None else -home_base_row) figure = go.Figure() figure.add_trace( go.Scatter( x=[home_base_x, entry_x], y=[home_base_y, entry_y], mode="lines", line={"color": "#64748b", "width": 2, "dash": "dot"}, hoverinfo="skip", showlegend=False, ) ) figure.add_layout_image( dict( source=base_icon_data_uri(), x=home_base_x, y=home_base_y, sizex=1.95, sizey=1.95, xref="x", yref="y", xanchor="center", yanchor="middle", layer="above", ) ) figure.add_annotation( 
def build_planned_horizon_payload(
    env: SimulatorEnvironment,
    *,
    mission_plan: MissionPlan | None,
    move_step_budget: int = PLANNED_TRAJECTORY_MOVE_STEP_BUDGET,
) -> PlannedHorizonPayload | None:
    """Project the plan's upcoming route and actions within a move budget.

    Walks the ordered tasks from the robot's current zone (or the home zone),
    chaining shortest paths between consecutive targets. The walk stops at the
    first task whose path does not fit into the remaining move budget; the
    part of that path that still fits is kept in the route. Every reached
    non-MOVE task is recorded as a numbered planned action.

    Args:
        env: Environment providing zones, the robot position and, when
            ``mission_plan`` is falsy, the current plan.
        mission_plan: Plan to project; falls back to ``env.current_plan``.
        move_step_budget: Maximum number of moves to lay out.

    Returns:
        The horizon payload, or ``None`` when there is no plan, no starting
        zone, or nothing to show.
    """
    plan = mission_plan or env.current_plan
    if plan is None or not plan.ordered_tasks:
        return None

    cursor = env.current_zone_id or env.home_zone_id
    starts_from_home = env.current_zone_id == env.home_zone_id
    if cursor is None:
        return None

    route: list[str] = []
    actions: list[PlannedHorizonAction] = []
    moves_spent = 0

    for task in plan.ordered_tasks:
        destination = task.target_zone
        if destination is None:
            continue

        if cursor == destination:
            hop = [destination]
        else:
            hop = shortest_path_zone_ids(
                env.zone_states,
                cursor,
                destination,
                home_zone_id=env.home_zone_id,
                home_neighbor_zone_id=env.home_neighbor_zone_id,
            )
        if not hop:
            # Unreachable target: leave the cursor where it is.
            continue

        hop_moves = max(0, len(hop) - 1)
        budget_left = max(0, move_step_budget - moves_spent)
        within_budget = hop_moves <= budget_left
        segment = hop if within_budget else hop[: budget_left + 1]

        if segment:
            # Avoid duplicating the zone shared by consecutive segments.
            continues_route = bool(route) and route[-1] == segment[0]
            route.extend(segment[1:] if continues_route else segment)

        if not within_budget:
            break

        moves_spent += hop_moves
        cursor = destination
        if task.task_type != ActionType.MOVE:
            actions.append(
                PlannedHorizonAction(
                    order=len(actions) + 1,
                    zone_id=destination,
                    task_type=str(task.task_type),
                    rationale=task.rationale,
                )
            )

    if not route and not actions:
        return None
    return PlannedHorizonPayload(
        route_zone_ids=tuple(route),
        starts_from_home=starts_from_home,
        actions=tuple(actions),
    )
PLANNED_TRAJECTORY_MOVE_STEP_BUDGET, ) -> PlannedTrajectoryPayload | None: horizon = build_planned_horizon_payload( env, mission_plan=mission_plan, move_step_budget=move_step_budget, ) if horizon is None: return None task_marker_xs: list[float] = [] task_marker_ys: list[float] = [] task_marker_texts: list[str] = [] task_marker_sizes: list[int] = [] task_marker_colors: list[str] = [] task_marker_hovers: list[str] = [] per_zone_offsets: dict[str, int] = {} for planned_action in horizon.actions: target_zone_id = planned_action.zone_id zone_offset = per_zone_offsets.get(target_zone_id, 0) per_zone_offsets[target_zone_id] = zone_offset + 1 offset_x, offset_y = numbered_step_offset(zone_offset) marker_x, marker_y = location_xy(env, static_payload, target_zone_id) if marker_x is None or marker_y is None: continue task_marker_xs.append(marker_x + (offset_x * 0.75)) task_marker_ys.append(marker_y + (offset_y * 0.75)) task_marker_texts.append(str(planned_action.order)) task_marker_sizes.append(static_payload.step_marker_size + (10 if planned_action.order == 1 else 4)) task_marker_colors.append("#0f172a" if planned_action.order == 1 else "rgba(15,23,42,0.72)") task_marker_hovers.append( f"#{planned_action.order} {humanize_action_name(planned_action.task_type)} on " f"{short_zone_label(target_zone_id)}
{planned_action.rationale}" ) if not horizon.route_zone_ids and not task_marker_xs: return None route_xs: list[float] = [] route_ys: list[float] = [] for zone_id in horizon.route_zone_ids: marker_x, marker_y = location_xy(env, static_payload, zone_id) if marker_x is None or marker_y is None: continue route_xs.append(marker_x) route_ys.append(marker_y) waypoint_zone_ids = horizon.route_zone_ids[1:] waypoint_xs: list[float] = [] waypoint_ys: list[float] = [] for zone_id in waypoint_zone_ids: marker_x, marker_y = location_xy(env, static_payload, zone_id) if marker_x is None or marker_y is None: continue waypoint_xs.append(marker_x) waypoint_ys.append(marker_y) return PlannedTrajectoryPayload( route_zone_ids=horizon.route_zone_ids, starts_from_home=horizon.starts_from_home, route_xs=route_xs, route_ys=route_ys, waypoint_xs=waypoint_xs, waypoint_ys=waypoint_ys, task_marker_xs=task_marker_xs, task_marker_ys=task_marker_ys, task_marker_texts=task_marker_texts, task_marker_sizes=task_marker_sizes, task_marker_colors=task_marker_colors, task_marker_hovers=task_marker_hovers, ) def build_real_field_dynamic_map_payload( env: SimulatorEnvironment, *, target_zone_id: str | None, show_truth: bool, static_payload: RealFieldStaticMapPayload, mission_plan: MissionPlan | None, show_planned_trajectory: bool, ) -> RealFieldDynamicMapPayload: assessments = {assessment.zone_id: assessment for assessment in env.current_assessments} current_zone_id = env.current_zone_id previous_zone_id = env.event_history[-1].details.get("from_zone_id") if env.event_history else None latest_event_by_zone = env.latest_events_by_zone() latest_soil_event_by_zone = env.latest_zone_action_events(ActionType.MEASURE_SOIL) step_index = env.step_index zone_action_badges = build_zone_action_badges(env, mission_plan=mission_plan) texts: list[str] = [] colors: list[str] = [] line_colors: list[str] = [] line_widths: list[int] = [] hover_texts: list[str] = [] for zone in env.zone_states: assessment = 
assessments.get(zone.zone_id) hidden = env.hidden_zone_states[zone.zone_id] label = build_zone_cell_text( zone=zone, action_badges=zone_action_badges.get(zone.zone_id, []), ) if show_truth: label += f"
{hidden.true_priority_band}" texts.append(label) fill_color = priority_fill_color( float(zone.static_features.get("scenario_priority_hint", 0.0) or 0.0), needs_intervention=visible_tile_needs_intervention(zone), yield_pred_missing=tile_has_missing_yield(zone), ) colors.append(fill_color) border_color, border_width = zone_border_style( hidden=hidden, is_current_zone=zone.zone_id == current_zone_id, measured_fill_color=fill_color, ) line_colors.append(border_color) line_widths.append(border_width) hover_texts.append( build_zone_hover_text( zone_state=zone, assessment=assessment, event=latest_event_by_zone.get(zone.zone_id), soil_event=latest_soil_event_by_zone.get(zone.zone_id), step_index=step_index, ) ) event_marker_xs: list[float] = [] event_marker_ys: list[float] = [] event_marker_texts: list[str] = [] event_marker_colors: list[str] = [] event_marker_hovers: list[str] = [] per_zone_offsets: dict[str, int] = {} for event in env.event_history[-8:]: if event.zone_id is None: continue zone_offset = per_zone_offsets.get(event.zone_id, 0) per_zone_offsets[event.zone_id] = zone_offset + 1 offset_x, offset_y = numbered_step_offset(zone_offset) event_x, event_y = location_xy(env, static_payload, event.zone_id) if event_x is None or event_y is None: continue event_marker_xs.append(event_x + offset_x) event_marker_ys.append(event_y + offset_y) event_marker_texts.append(str(event.step_index)) event_marker_colors.append(action_color(str(event.action))) event_marker_hovers.append(build_event_hover_text(event)) planned_trajectory = ( build_planned_trajectory_payload( env, mission_plan=mission_plan, static_payload=static_payload, ) if show_planned_trajectory else None ) return RealFieldDynamicMapPayload( texts=texts, colors=colors, line_colors=line_colors, line_widths=line_widths, hover_texts=hover_texts, previous_zone_id=previous_zone_id, current_zone_id=current_zone_id, event_marker_xs=event_marker_xs, event_marker_ys=event_marker_ys, event_marker_texts=event_marker_texts, 
        event_marker_colors=event_marker_colors,
        event_marker_hovers=event_marker_hovers,
        planned_trajectory=planned_trajectory,
    )


def build_zone_grid_figure(
    env: SimulatorEnvironment,
    target_zone_id: str | None,
    show_truth: bool,
    *,
    mission_plan: MissionPlan | None = None,
    show_planned_trajectory: bool = True,
) -> go.Figure:
    """Assemble the field-map figure from the static and dynamic map payloads.

    The figure is copied from ``static_payload.figure`` and then extended, in
    this order, with: tile rectangles (layout shapes), a nearly transparent
    marker/text trace carrying tile labels, hover text and zone ids, the
    previous-to-current robot segment, the planned trajectory (route, waypoints,
    task markers), recent event markers, and finally the robot ring and icon.

    Args:
        env: Simulator environment the payloads are built from.
        target_zone_id: Forwarded to the dynamic payload builder.
        show_truth: Forwarded to the dynamic payload builder.
        mission_plan: Optional plan forwarded to the dynamic payload builder.
        show_planned_trajectory: Forwarded to the dynamic payload builder.

    Returns:
        The populated figure. When the robot's zone has no coordinates the
        figure is returned without the robot ring and icon.
    """
    # NOTE(review): string literals in this copy of the source look like they lost
    # embedded markup (e.g. hovertemplate suffixes) — confirm against the original file.
    static_payload = get_real_field_static_map_payload(env)
    dynamic_payload = build_real_field_dynamic_map_payload(
        env,
        target_zone_id=target_zone_id,
        show_truth=show_truth,
        static_payload=static_payload,
        mission_plan=mission_plan,
        show_planned_trajectory=show_planned_trajectory,
    )
    figure = go.Figure(static_payload.figure)
    # Draw cell tiles as native Plotly shapes (SVG rects in data coords) instead of a
    # raster layout_image. Plotly updates shape attributes in-place via Plotly.react()
    # with no image-swap step, eliminating the per-step flash caused by the old approach.
    cell_shapes = [
        {
            "type": "rect",
            "x0": float(x) - 0.5,
            "x1": float(x) + 0.5,
            "y0": float(y) - 0.5,
            "y1": float(y) + 0.5,
            "fillcolor": fill_color,
            "line": {"color": line_color, "width": float(line_width)},
            "layer": "below",
            "xref": "x",
            "yref": "y",
        }
        for x, y, fill_color, line_color, line_width in zip(
            static_payload.xs,
            static_payload.ys,
            dynamic_payload.colors,
            dynamic_payload.line_colors,
            dynamic_payload.line_widths,
            strict=False,
        )
    ]
    figure.update_layout(shapes=cell_shapes)
    # The tiles themselves are the shapes above; this trace is almost fully transparent
    # (opacity 0.001) and carries the per-tile text, hover text and zone-id customdata.
    figure.add_trace(
        go.Scatter(
            x=list(static_payload.xs),
            y=list(static_payload.ys),
            mode="markers+text",
            showlegend=False,
            text=dynamic_payload.texts,
            textposition="middle center",
            textfont={"size": static_payload.cell_text_size},
            marker={
                "size": static_payload.cell_marker_size,
                "symbol": "square",
                "color": dynamic_payload.colors,
                "line": {"width": dynamic_payload.line_widths, "color": dynamic_payload.line_colors},
                "opacity": 0.001,
            },
            hovertemplate="%{hovertext}",
            hovertext=dynamic_payload.hover_texts,
            customdata=list(static_payload.zone_ids),
            selected={"marker": {"opacity": 1.0}},
            unselected={"marker": {"opacity": 1.0}},
        )
    )
    # Dotted segment and "Prev" marker, only when the robot actually changed zones.
    if (
        dynamic_payload.previous_zone_id is not None
        and dynamic_payload.current_zone_id is not None
        and dynamic_payload.previous_zone_id != dynamic_payload.current_zone_id
    ):
        previous_x, previous_y = location_xy(env, static_payload, dynamic_payload.previous_zone_id)
        current_x, current_y = location_xy(env, static_payload, dynamic_payload.current_zone_id)
        if previous_x is not None and previous_y is not None and current_x is not None and current_y is not None:
            figure.add_trace(
                go.Scatter(
                    x=[previous_x, current_x],
                    y=[previous_y, current_y],
                    mode="lines",
                    line={"color": "#9aa6b2", "width": 3, "dash": "dot"},
                    hoverinfo="skip",
                    showlegend=False,
                )
            )
            figure.add_trace(
                go.Scatter(
                    x=[previous_x],
                    y=[previous_y],
                    mode="markers+text",
                    text=["Prev"],
                    textposition="bottom center",
                    textfont={"size": max(8, static_payload.cell_text_size - 2), "color": "#64748b"},
                    marker={"size": max(12, static_payload.step_marker_size - 8), "symbol": "circle", "color": "rgba(100,116,139,0.35)", "line": {"width": 0}},
                    hovertemplate="Previous robot position",
                    showlegend=False,
                )
            )
    # Planned trajectory overlay: route line, waypoint dots, then numbered task markers.
    if dynamic_payload.planned_trajectory is not None:
        planned = dynamic_payload.planned_trajectory
        if len(planned.route_xs) >= 2:
            figure.add_trace(
                go.Scatter(
                    x=planned.route_xs,
                    y=planned.route_ys,
                    mode="lines",
                    line={"color": "rgba(15,23,42,0.48)", "width": 5},
                    hoverinfo="skip",
                    showlegend=False,
                )
            )
        if planned.waypoint_xs:
            figure.add_trace(
                go.Scatter(
                    x=planned.waypoint_xs,
                    y=planned.waypoint_ys,
                    mode="markers",
                    marker={
                        "size": max(8, static_payload.step_marker_size - 8),
                        "symbol": "circle",
                        "color": "rgba(15,23,42,0.50)",
                        "line": {"width": 1, "color": "rgba(255,255,255,0.95)"},
                    },
                    hoverinfo="skip",
                    showlegend=False,
                )
            )
        if planned.task_marker_xs:
            figure.add_trace(
                go.Scatter(
                    x=planned.task_marker_xs,
                    y=planned.task_marker_ys,
                    mode="markers+text",
                    text=planned.task_marker_texts,
                    textposition="middle center",
                    textfont={"color": "white", "size": max(8, static_payload.cell_text_size - 1)},
                    marker={
                        "size": planned.task_marker_sizes,
                        "symbol": "circle",
                        "color": planned.task_marker_colors,
                        "line": {"width": 2, "color": "white"},
                    },
                    customdata=planned.task_marker_hovers,
                    hovertemplate="%{customdata}",
                    showlegend=False,
                )
            )
    # Numbered dots for the recent events collected in the dynamic payload.
    if dynamic_payload.event_marker_xs:
        figure.add_trace(
            go.Scatter(
                x=dynamic_payload.event_marker_xs,
                y=dynamic_payload.event_marker_ys,
                mode="markers+text",
                text=dynamic_payload.event_marker_texts,
                textposition="middle center",
                textfont={"color": "white", "size": max(8, static_payload.cell_text_size - 1)},
                marker={
                    "size": static_payload.step_marker_size,
                    "symbol": "circle",
                    "color": dynamic_payload.event_marker_colors,
                    "line": {"width": 2, "color": "white"},
                },
                customdata=dynamic_payload.event_marker_hovers,
                hovertemplate="%{customdata}",
                showlegend=False,
            )
        )
    # Robot ring plus icon, added last.
    if dynamic_payload.current_zone_id is not None:
        robot_x, robot_y = location_xy(env, static_payload, dynamic_payload.current_zone_id)
        if robot_x is None or robot_y is None:
            # No coordinates for the robot's zone: return the map without a robot marker.
            return figure
        figure.add_trace(
            go.Scatter(
                x=[robot_x],
                y=[robot_y],
                mode="markers",
                marker={
                    "size": static_payload.marker_ring_size + 20,
                    "symbol": "circle-open",
                    "color": "rgba(0,0,0,0)",
                    "line": {"width": 10, "color": "#0f172a"},
                },
                hovertemplate="Robot current position",
                showlegend=False,
            )
        )
        figure.add_layout_image(
            dict(
                source=robot_icon_data_uri(),
                x=robot_x,
                y=robot_y,
                sizex=0.88,
                sizey=0.88,
                xref="x",
                yref="y",
                xanchor="center",
                yanchor="middle",
                layer="above",
            )
        )
    return figure
def build_latest_event_by_zone(event_history) -> dict[str, object]:
    """Map each zone id to the last event in ``event_history`` recorded for it.

    Events whose ``zone_id`` is ``None`` are ignored; later events overwrite
    earlier ones for the same zone.
    """
    return {entry.zone_id: entry for entry in event_history if entry.zone_id is not None}


def build_latest_zone_action_by_zone(event_history, action: ActionType) -> dict[str, object]:
    """Map each zone id to its last event whose ``action`` equals ``action``."""
    return {
        entry.zone_id: entry
        for entry in event_history
        if entry.zone_id is not None and entry.action == action
    }


def latest_soil_measurement_event(event_history, zone_id: str):
    """Return the most recent soil-measurement event for ``zone_id``, or ``None``."""
    newest_first = reversed(event_history)
    return next(
        (
            entry
            for entry in newest_first
            if entry.zone_id == zone_id and entry.action == ActionType.MEASURE_SOIL
        ),
        None,
    )
def latest_field_check_event(event_history, zone_id: str):
    """Return the most recent inspect or revisit event for ``zone_id``, or ``None``."""
    for event in reversed(event_history):
        if event.zone_id == zone_id and event.action in {ActionType.INSPECT, ActionType.REVISIT}:
            return event
    return None


def latest_zone_action_event(event_history, zone_id: str, action: ActionType):
    """Return the most recent event for ``zone_id`` whose action equals ``action``, or ``None``."""
    for event in reversed(event_history):
        if event.zone_id == zone_id and event.action == action:
            return event
    return None


def format_steps_ago(current_step_index: int, measured_step_index: int | None) -> str:
    """Describe how many steps ago a measurement was taken.

    Returns ``"age unknown"`` when the measurement step is not known; otherwise
    the same wording as ``format_steps_since`` with the label ``"measured"``.
    """
    if measured_step_index is None:
        return "age unknown"
    return format_steps_since(current_step_index, measured_step_index, label="measured")


def format_steps_since(current_step_index: int, event_step_index: int | None, *, label: str = "checked") -> str:
    """Describe how many steps ago an event happened, prefixed with ``label``.

    Returns ``"not yet"`` when ``event_step_index`` is ``None``. Negative
    differences are clamped to zero.
    """
    if event_step_index is None:
        return "not yet"
    delta = max(0, current_step_index - event_step_index)
    if delta == 0:
        return f"{label} this step"
    if delta == 1:
        return f"{label} 1 step ago"
    return f"{label} {delta} steps ago"


def format_optional_number(value, decimals: int = 2, suffix: str = "") -> str:
    """Format ``value`` with a fixed number of decimals, or a dash when missing.

    ``None`` and NaN both render as ``"—"`` so a missing numeric field never
    shows up as ``"nan"`` next to a unit suffix. Values that cannot be converted
    to ``float`` are returned via ``str(value)``.
    """
    if value is None:
        return "—"
    try:
        numeric = float(value)
    except (TypeError, ValueError):
        return str(value)
    if math.isnan(numeric):
        # NaN is the numeric spelling of "missing"; show it like None.
        return "—"
    return f"{numeric:.{decimals}f}{suffix}"


def normalize_priority_label(value) -> str:
    """Translate legacy inspection-priority wording to the current labels.

    Unknown values are returned as ``str(value)``.
    """
    text = str(value)
    return {
        "Inspect now": "Act now",
        "Inspect soon": "Actionable",
        "Watch": "Monitor",
        "Low attention": "Low",
    }.get(text, text)


def short_action_label(action_name: str) -> str:
    """Return the one-letter badge for an action name.

    Unknown names fall back to their upper-cased first character.
    """
    return {
        "move": "M",
        "inspect": "C",
        "revisit": "C",
        "defer": "C",
        "patrol": "M",
        "recharge": "M",
        "remove_weeds": "C",
        "measure_soil": "S",
        "apply_fertilizer": "F",
        "apply_water": "W",
        "install_drainage": "D",
        "subsoil": "U",
        "end_mission": "M",
    }.get(action_name, action_name[:1].upper())


def action_color(action_name: str) -> str:
    """Return the marker color (hex string) for an action name, with a default for unknown names."""
    return {
        "move": "#2563eb",
        "inspect": "#1f7a8c",
        "revisit": "#2a9d8f",
        "defer": "#1f7a8c",
        "patrol": "#2563eb",
        "recharge": "#2563eb",
        "remove_weeds": "#1f7a8c",
        "measure_soil": "#7c3aed",
        "apply_fertilizer": "#4f772d",
        "apply_water": "#2a6f97",
        "install_drainage": "#8d6e63",
        "subsoil": "#6b705c",
        "end_mission": "#2563eb",
    }.get(action_name, "#264653")
def build_zone_hover_text(*, zone_state, assessment, event, soil_event, step_index: int) -> str:
    """Build the multi-line hover label for one field tile.

    Args:
        zone_state: Zone whose ``static_features`` and ``soil_measurement`` are shown.
        assessment: Latest assessment for the zone, or ``None``.
        event: Latest event recorded for the zone, or ``None``.
        soil_event: Latest soil-measurement event for the zone, or ``None``.
        step_index: Current step index, used to express the measurement age.

    Returns:
        ``key=value`` lines joined with ``<br>``, the line-break markup Plotly
        hover labels understand.
    """
    lines = []
    if event is not None:
        lines.append(f"last_execution={humanize_execution_outcome(event.execution_outcome)}")
    else:
        lines.append("last_execution=No execution yet")

    # `or 0.0` guards against the key being present with a None value, matching
    # how the tile fill color reads the same hint.
    urgency_score = float(zone_state.static_features.get("scenario_priority_hint", 0.0) or 0.0)
    lines.append(f"urgency_score={urgency_score:.2f}")
    if tile_has_missing_yield(zone_state):
        lines.append("yield_pred=Missing")

    next_action = humanize_action_name(str(assessment.recommended_action)) if assessment is not None else "Check field"
    lines.append(f"next_action={next_action}")

    if zone_state.soil_measurement is not None:
        measurement_age = format_steps_ago(
            step_index,
            soil_event.step_index if soil_event is not None else None,
        )
        lines.append(f"soil_measurement={measurement_age}")
        measurement = zone_state.soil_measurement
        lines.append(
            f"moisture={measurement.moisture_m3m3:.2f} m3/m3"
            if measurement.moisture_m3m3 is not None
            else "moisture=—"
        )
        lines.append(
            f"nitrogen={measurement.nitrogen_gkg:.2f} g/kg"
            if measurement.nitrogen_gkg is not None
            else "nitrogen=—"
        )
        lines.append(
            f"organic_carbon={measurement.organic_carbon_gkg:.1f} g/kg"
            if measurement.organic_carbon_gkg is not None
            else "organic_carbon=—"
        )
        lines.append(f"ph={measurement.ph:.1f}" if measurement.ph is not None else "ph=—")
    else:
        lines.append("soil_measurement=Not measured")
    # NOTE(review): the separator literal was a raw line break in the reviewed copy of
    # this file (markup appears stripped); `<br>` is what Plotly renders as a new line.
    return "<br>".join(lines)
def build_event_hover_text(event) -> str:
    """Build the multi-line hover label for one step-event marker.

    Always lists step, action, execution outcome and reward; adds control
    intent, resulting control state, terminal reason and up to three revealed
    facts when the event carries them.

    Returns:
        ``key=value`` lines joined with ``<br>``, the line-break markup Plotly
        hover labels understand.
    """
    lines = [
        f"step={event.step_index}",
        f"step_action={humanize_action_name(str(event.action))}",
        f"execution={humanize_execution_outcome(event.execution_outcome)}",
        f"reward={event.reward:.2f}",
    ]
    # Read each optional field once instead of calling the accessor twice.
    control_intent = event_control_intent(event)
    if control_intent is not None:
        lines.append(f"intent={humanize_control_intent(control_intent)}")
    if event.control_state_after is not None:
        lines.append(f"control_state={humanize_control_state(event.control_state_after)}")
    terminal_reason = event_terminal_reason(event)
    if terminal_reason is not None:
        lines.append(f"terminal_reason={humanize_terminal_reason(terminal_reason)}")
    revealed_facts = event.details.get("revealed_facts")
    if revealed_facts:
        lines.append("revealed=" + ", ".join(revealed_facts[:3]))
    # NOTE(review): the separator literal was a raw line break in the reviewed copy of
    # this file (markup appears stripped); `<br>` is what Plotly renders as a new line.
    return "<br>".join(lines)
# Local aliases for the real-field urgency thresholds imported from autofarm.urgency.
CAUTION_URGENCY_THRESHOLD = REAL_FIELD_CAUTION_URGENCY_THRESHOLD
RED_URGENCY_THRESHOLD = REAL_FIELD_CRITICAL_URGENCY_THRESHOLD


def show_planned_trajectory_enabled() -> bool:
    """Return the planned-trajectory toggle from session state (defaults to ``True``)."""
    return bool(st.session_state.get(SHOW_PLANNED_TRAJECTORY_KEY, True))


def render_grid_legend() -> None:
    """Render the static legend for the field map via ``st.markdown``.

    The legend text is passed with ``unsafe_allow_html=True``.
    """
    # NOTE(review): the legend literal below looks like it lost its HTML markup in this
    # copy of the source — restore it from the original file before applying this edit.
    # NOTE(review): the 0.52 / 0.978 thresholds are written as literals here; confirm they
    # match CAUTION_URGENCY_THRESHOLD and RED_URGENCY_THRESHOLD.
    st.markdown(
        """
Legend
Tile fill
  Green: no intervention tile, urgency ≤ 0.52
  Sand: no intervention tile, urgency > 0.52
  Orange: intervention tile, urgency < 0.978
  Red: intervention tile, urgency ≥ 0.978
  Black: predicted yield is missing, excluded from urgency planning
Tile border
Ground truth matches the robot's current belief
        Colored border: hidden ground truth urgency differs from the robot's current belief
Tile action badges
1🌿 Planned tile action from the receding-horizon plan
Planned action executed correctly in the last 5 steps
Planned action failed in the last 5 steps
Planned route
── Dark preview line for the next 10 movement steps
Connected waypoint dots
1 Planned task marker
Markers
• ⊕ 🤖 Robot position
• ①② Step order dots
""",
        unsafe_allow_html=True,
    )
def render_selected_step_summary(selected_event) -> None:
    """Show a one-line caption describing ``selected_event``; do nothing for ``None``."""
    if selected_event is not None:
        where = format_event_location(selected_event)
        result = humanize_execution_outcome(selected_event.execution_outcome)
        step = selected_event.step_index
        action_label = humanize_action_name(str(selected_event.action))
        st.caption(f"Latest step {step}: {action_label} | {where} | {result}")


def selected_zone_id(env: SimulatorEnvironment, *, target_zone_id: str | None) -> str:
    """Return the selected zone id, choosing and storing a fallback when needed.

    A session-state selection that names an existing zone wins. Otherwise the
    first truthy candidate among the target zone, the robot's current zone (if
    it is a known zone), the home-neighbor zone and the first zone is written
    to session state and returned.
    """
    known_ids = {zone.zone_id for zone in env.zone_states}
    stored = st.session_state.get(SELECTED_ZONE_KEY)
    if stored in known_ids:
        return stored

    choice = target_zone_id
    if not choice and env.current_zone_id in known_ids:
        choice = env.current_zone_id
    if not choice:
        choice = env.home_neighbor_zone_id
    if not choice:
        choice = env.zone_states[0].zone_id
    st.session_state[SELECTED_ZONE_KEY] = choice
    return choice


def update_selected_zone_from_plot_state(plot_state, env: SimulatorEnvironment, *, fallback_zone_id: str | None) -> None:
    """Store the zone picked on the plot, or fall back to ``selected_zone_id``."""
    picked = selected_zone_from_plot_state(plot_state)
    is_known = picked is not None and any(zone.zone_id == picked for zone in env.zone_states)
    if is_known:
        st.session_state[SELECTED_ZONE_KEY] = picked
    else:
        selected_zone_id(env, target_zone_id=fallback_zone_id)


def selected_zone_from_plot_state(plot_state) -> str | None:
    """Extract the first selected zone id from a plot selection state.

    The state, its ``selection`` and each point may be objects with attributes
    or plain dicts. A point counts when its ``customdata`` is a string starting
    with ``"zone_"``. Returns ``None`` when nothing matches.
    """

    def _field(container, name):
        # Attribute access first, dict lookup as the fallback.
        found = getattr(container, name, None)
        if found is None and isinstance(container, dict):
            found = container.get(name)
        return found

    if plot_state is None:
        return None
    selection = _field(plot_state, "selection")
    if selection is None:
        return None
    points = _field(selection, "points")
    if not points:
        return None
    for point in points:
        tag = _field(point, "customdata")
        if isinstance(tag, str) and tag.startswith("zone_"):
            return tag
    return None


def latest_event_for_zone(snapshot, zone_id: str):
    """Return the most recent event in ``snapshot.event_history`` for ``zone_id``, or ``None``."""
    return next(
        (entry for entry in reversed(snapshot.event_history) if entry.zone_id == zone_id),
        None,
    )
def latest_evidence_event_for_zone(snapshot, zone_id: str):
    """Return the newest event for ``zone_id`` whose evidence image exists on disk.

    Looks at ``event.details["evidence_image_path"]`` and checks the path with
    ``Path.exists``. Returns ``None`` when no event qualifies.
    """
    for event in reversed(snapshot.event_history):
        if event.zone_id != zone_id:
            continue
        evidence_path = event.details.get("evidence_image_path")
        if evidence_path and Path(evidence_path).exists():
            return event
    return None


def overlay_weed_mask(evidence_path: str, mask_path: str) -> bytes:
    """Blend a blue semi-transparent weed segmentation mask over the evidence image.

    Args:
        evidence_path: Path of the evidence image.
        mask_path: Path of the mask image; it is read as grayscale and resized
            to the evidence size when the sizes differ.

    Returns:
        The blended image encoded as PNG bytes.
    """
    from PIL import Image

    # Open inside context managers so the underlying files are closed right after
    # the pixel data has been converted into independent in-memory images.
    with Image.open(evidence_path) as evidence_file:
        evidence = evidence_file.convert("RGBA")
    with Image.open(mask_path) as mask_file:
        mask = mask_file.convert("L")
    if mask.size != evidence.size:
        mask = mask.resize(evidence.size, Image.NEAREST)
    mask_arr = np.array(mask)
    overlay = np.zeros((*mask_arr.shape, 4), dtype=np.uint8)
    overlay[mask_arr > 128] = [30, 100, 220, 150]  # blue, semi-transparent
    blended = Image.alpha_composite(evidence, Image.fromarray(overlay, "RGBA"))
    buf = io.BytesIO()
    blended.convert("RGB").save(buf, format="PNG")
    return buf.getvalue()


def render_selected_zone_panel(env: SimulatorEnvironment, snapshot, zone_id: str) -> None:
    """Render the collapsible "Selected Field" panel for one zone.

    The panel has four columns: overview, crop and imagery, soil and field, and
    robot state and actions. The values come from the zone's static features
    and weather proxies, the zone's latest events, and the snapshot observation.
    """
    zone = env._zone_state(zone_id)
    evidence_state = snapshot.observation.zone_evidence_state_by_zone.get(zone_id)
    workflow = snapshot.observation.riddle_workflows_by_zone.get(zone_id)
    latest_event = env.latest_event_for_zone(zone_id)
    inspect_event = env.latest_zone_action_event(zone_id, ActionType.INSPECT)
    revisit_event = env.latest_zone_action_event(zone_id, ActionType.REVISIT)
    # The latest field check is whichever of inspect/revisit happened last
    # (inspect wins a tie).
    if inspect_event is None:
        field_check_event = revisit_event
    elif revisit_event is None:
        field_check_event = inspect_event
    else:
        field_check_event = inspect_event if inspect_event.step_index >= revisit_event.step_index else revisit_event
    soil_event = env.latest_zone_action_event(zone_id, ActionType.MEASURE_SOIL)
    fertilizer_event = env.latest_zone_action_event(zone_id, ActionType.APPLY_FERTILIZER)
    water_event = env.latest_zone_action_event(zone_id, ActionType.APPLY_WATER)
    step_index = snapshot.observation.step_index
    static = zone.static_features or {}
    weather = zone.weather_proxies or {}
    inspection_count = evidence_state.guarded_context.inspection_count if evidence_state else 0

    def _applied_age(event) -> str:
        # Age wording for fertilizer/water: "applied ..." rather than the
        # soil-measurement wording "measured ...".
        if event is None:
            return "age unknown"
        return format_steps_since(step_index, event.step_index, label="applied")

    if zone.soil_measurement is not None:
        m = zone.soil_measurement
        soil_measurement_summary = (
            f"{format_steps_ago(step_index, soil_event.step_index if soil_event is not None else None)} | "
            f"moisture {format_optional_number(m.moisture_m3m3, 2)}, "
            f"nitrogen {format_optional_number(m.nitrogen_gkg, 2)}, "
            f"organic C {format_optional_number(m.organic_carbon_gkg, 1)}, "
            f"pH {format_optional_number(m.ph, 1)}"
        )
    else:
        soil_measurement_summary = "not yet"

    if latest_event is not None:
        last_execution_summary = (
            f"{humanize_action_name(str(latest_event.action))} -> "
            f"{humanize_execution_outcome(latest_event.execution_outcome)}"
        )
    else:
        last_execution_summary = "none yet"

    if zone.applied_fertilizer_amount_kg_ha is not None:
        fertilizer_product = (
            zone.fertilizer_recommendation.product_name
            if zone.fertilizer_recommendation is not None
            else "fertilizer"
        )
        fertilizer_summary = (
            f"{_applied_age(fertilizer_event)} "
            f"({fertilizer_product} @ {zone.applied_fertilizer_amount_kg_ha:.1f} kg/ha)"
        )
    else:
        fertilizer_summary = "not yet"

    if zone.applied_water_amount_mm is not None:
        water_summary = (
            f"{_applied_age(water_event)} "
            f"({zone.applied_water_amount_mm:.1f} mm)"
        )
    else:
        water_summary = "not yet"

    with st.expander(f"Selected Field: {short_zone_label(zone_id)}", expanded=False):
        overview_col, imagery_col, soil_col, robot_col = st.columns(4)
        with overview_col:
            st.markdown("**Overview**")
            st.write(f"Urgency score: {format_optional_number(static.get('scenario_priority_hint'), 3)}")
            st.write(f"Predicted yield missing: {'Yes' if tile_has_missing_yield(zone) else 'No'}")
            st.write(f"Status: {humanize_metric_token(str(static.get('status', 'unknown')))}")
            st.write(f"Primary stress: {humanize_metric_token(str(static.get('primary_stress', 'unknown')))}")
            st.write(f"Recommended action: {humanize_metric_token(str(static.get('recommended_action', 'none')))}")
        with imagery_col:
            st.markdown("**Crop And Imagery**")
            st.write(f"Yield: {format_optional_number(static.get('yield_tha'), 2, ' t/ha')}")
            st.write(f"Predicted yield: {format_optional_number(static.get('yield_pred'), 2, ' t/ha')}")
            st.write(f"Field mean yield: {format_optional_number(static.get('field_mean_yield'), 2, ' t/ha')}")
            st.write(f"Yield deficit: {format_optional_number(static.get('yield_deficit_pct'), 1, ' %')}")
            st.write(f"NDVI: {format_optional_number(static.get('ndvi_last'), 3)}")
            st.write(f"NDRE: {format_optional_number(static.get('ndre_last'), 3)}")
            st.write(f"NDWI: {format_optional_number(static.get('ndwi_last'), 3)}")
        with soil_col:
            st.markdown("**Soil And Field**")
            st.write(f"Nitrogen: {format_optional_number(static.get('nitrogen_0-5'), 1)}")
            st.write(f"Soil organic carbon: {format_optional_number(static.get('soc_0-5'), 1)}")
            st.write(f"Clay: {format_optional_number(static.get('clay_0-5'), 1)}")
            st.write(f"Sand: {format_optional_number(static.get('sand_0-5'), 1)}")
            st.write(f"Temperature mean: {format_optional_number(weather.get('temp_mean_mean'), 2)}")
            st.write(f"Total precipitation: {format_optional_number(weather.get('total_prec_mean'), 2)}")
        with robot_col:
            st.markdown("**Robot State And Actions**")
            st.write(f"Field checks: {inspection_count}")
            st.write(
                f"Last checked: "
                f"{format_steps_since(step_index, field_check_event.step_index if field_check_event is not None else None)}"
            )
            st.write(f"Last execution: {last_execution_summary}")
            st.write(f"Soil measured: {soil_measurement_summary}")
            st.write(f"Fertilizer applied: {fertilizer_summary}")
            st.write(f"Water applied: {water_summary}")
            if workflow is not None and workflow.last_message:
                st.caption(workflow.last_message)
def render_metrics_overview(env: SimulatorEnvironment) -> None:
    """Render two side-by-side expanders: a plain-language run summary and the raw metrics JSON."""
    episode_metrics = env.metrics()
    left, right = st.columns(2)

    with left, st.expander("Run Summary", expanded=False):
        st.caption("Plain-language summary of what happened in the simulator episode.")
        for section_title, bullet_lines in summarize_metrics(episode_metrics).items():
            st.write(f"**{section_title}**")
            for bullet in bullet_lines:
                st.write(f"- {bullet}")

    with right, st.expander("Technical Metrics", expanded=False):
        st.json(episode_metrics)
def summarize_metrics(metrics: dict[str, object]) -> dict[str, list[str]]:
    """Turn the raw metrics dict into headed lists of plain-language lines.

    Returns a dict with the headings ``"Mission Progress"`` and ``"Outcome"``;
    missing counters default to 0 and missing status tokens to
    ``"unknown"`` / ``"in_progress"``.
    """
    return {
        "Mission Progress": [
            f"Steps taken: {int(metrics.get('steps', 0))}",
            f"Field checks: {int(metrics.get('inspect_actions', 0))}",
            f"Follow-up checks: {int(metrics.get('revisit_actions', 0))}",
            f"Soil samples: {int(metrics.get('measure_soil_actions', 0))}",
            f"Challenges resolved: {int(metrics.get('resolved_challenges', 0))}",
        ],
        "Outcome": [
            f"Episode status: {humanize_metric_token(str(metrics.get('outcome_status', 'unknown')))}",
            f"Termination reason: {humanize_metric_token(str(metrics.get('termination_reason', 'in_progress')))}",
            f"Reward score: {metrics.get('total_reward', 0)}",
        ],
    }


def grid_dimension(zone_states) -> int:
    """Return the largest row or column index among ``zone_states``.

    Missing (``None``) rows/columns count as 0. An empty collection yields 0
    instead of raising.
    """
    return max((max(zone.row or 0, zone.col or 0) for zone in zone_states), default=0)


def add_field_grid_shapes(figure: go.Figure, zone_states) -> None:
    """Add light grid lines (or per-tile outlines) for the field to ``figure``.

    For a rectangular field, one line is added per column and row boundary.
    Otherwise each zone gets its own transparent outlined rectangle. Rows are
    plotted on a negated y axis (``y = -row``).
    """
    x_values = [zone.col or 0 for zone in zone_states]
    y_values = [-(zone.row or 0) for zone in zone_states]
    if is_rectangular_field(zone_states):
        # Field extents are loop-invariant; compute them once.
        x_min, x_max = min(x_values), max(x_values)
        y_min, y_max = min(y_values), max(y_values)
        vertical_boundaries = [x_min - 0.5] + [x + 0.5 for x in sorted(set(x_values))[:-1]] + [x_max + 0.5]
        horizontal_boundaries = (
            [y_max + 0.5] + [y - 0.5 for y in sorted(set(y_values), reverse=True)[:-1]] + [y_min - 0.5]
        )
        for boundary_x in vertical_boundaries:
            figure.add_shape(
                type="line",
                x0=boundary_x,
                x1=boundary_x,
                y0=y_min - 0.5,
                y1=y_max + 0.5,
                line={"color": "#d7d0c0", "width": 1},
                layer="below",
            )
        for boundary_y in horizontal_boundaries:
            figure.add_shape(
                type="line",
                x0=x_min - 0.5,
                x1=x_max + 0.5,
                y0=boundary_y,
                y1=boundary_y,
                line={"color": "#d7d0c0", "width": 1},
                layer="below",
            )
        return
    for zone in zone_states:
        col = zone.col or 0
        row = zone.row or 0
        figure.add_shape(
            type="rect",
            x0=col - 0.5,
            x1=col + 0.5,
            y0=-(row) - 0.5,
            y1=-(row) + 0.5,
            line={"color": "#d7d0c0", "width": 1},
            fillcolor="rgba(255,255,255,0)",
            layer="below",
        )
def robot_icon_data_uri() -> str:
    """Return the robot icon as an SVG ``data:`` URI (URL-quoted)."""
    # NOTE(review): the SVG markup is empty in this copy of the source — restore it from
    # the original file before applying this edit.
    svg = """ """.strip()
    return f"data:image/svg+xml;utf8,{quote(svg)}"


def base_icon_data_uri() -> str:
    """Return the home-base icon as an SVG ``data:`` URI (URL-quoted)."""
    # NOTE(review): the SVG markup is empty in this copy of the source — restore it from
    # the original file before applying this edit.
    svg = """ """.strip()
    return f"data:image/svg+xml;utf8,{quote(svg)}"


def planned_action_icon(action_name: str) -> str:
    """Return the emoji badge for a planned action value; unknown values get a bullet."""
    return {
        ActionType.INSPECT.value: "🔎",
        ActionType.REVISIT.value: "🔎",
        ActionType.DEFER.value: "⏸",
        ActionType.REMOVE_WEEDS.value: "🌿",
        ActionType.MEASURE_SOIL.value: "🧭",
        ActionType.APPLY_FERTILIZER.value: "🧪",
        ActionType.APPLY_WATER.value: "💧",
        ActionType.INSTALL_DRAINAGE.value: "🛠",
        ActionType.SUBSOIL.value: "⛏",
    }.get(action_name, "•")


def planned_task_succeeded(event, *, task_type: str) -> bool:
    """Return whether ``event``'s execution outcome is the success outcome for ``task_type``.

    Task types without an entry in the table never count as succeeded.
    """
    success_by_task = {
        ActionType.INSPECT.value: ExecutionOutcomeType.INSPECTION_COMPLETED.value,
        ActionType.REVISIT.value: ExecutionOutcomeType.INSPECTION_COMPLETED.value,
        ActionType.DEFER.value: ExecutionOutcomeType.DEFERRED.value,
        ActionType.REMOVE_WEEDS.value: ExecutionOutcomeType.WEED_REMOVAL_COMPLETED.value,
        ActionType.MEASURE_SOIL.value: ExecutionOutcomeType.SOIL_MEASUREMENT_COMPLETED.value,
        ActionType.APPLY_FERTILIZER.value: ExecutionOutcomeType.FERTILIZER_APPLIED.value,
        ActionType.APPLY_WATER.value: ExecutionOutcomeType.WATER_APPLIED.value,
        ActionType.INSTALL_DRAINAGE.value: ExecutionOutcomeType.DRAINAGE_INSTALLED.value,
        ActionType.SUBSOIL.value: ExecutionOutcomeType.SUBSOIL_COMPLETED.value,
    }
    return str(event.execution_outcome) == success_by_task.get(task_type)


def planned_task_failed(event, *, task_type: str) -> bool:
    """Return whether ``event``'s execution outcome is one of the failure outcomes.

    ``task_type`` is accepted for signature parity with ``planned_task_succeeded``
    but is not used.
    """
    del task_type
    return str(event.execution_outcome) in {
        ExecutionOutcomeType.INVALID_TASK.value,
        ExecutionOutcomeType.MOVE_REJECTED.value,
        ExecutionOutcomeType.POSITION_REJECTED.value,
        ExecutionOutcomeType.RESOURCE_EXHAUSTED.value,
    }
def recent_task_status_by_zone_action(
    env: SimulatorEnvironment,
    *,
    ttl_steps: int = RECENT_TASK_ICON_TTL_STEPS,
) -> dict[tuple[str, str], str]:
    """Collect a status marker per (zone, task type) from recent events.

    Walks ``env.event_history`` newest-first and stops at the first event older
    than ``ttl_steps`` steps. Events without a planner task, without a target
    zone, or whose task is a move are skipped. The newest event per key that
    counts as succeeded or failed decides the marker.
    """
    statuses: dict[tuple[str, str], str] = {}
    oldest_step = max(0, env.step_index - ttl_steps)
    for event in reversed(env.event_history):
        if event.step_index < oldest_step:
            break
        context = event.planner_context
        task = None if context is None else context.normalized_task
        if task is None or task.target_zone is None:
            continue
        kind = str(task.task_type)
        if kind == ActionType.MOVE.value:
            continue
        key = (task.target_zone, kind)
        if key in statuses:
            continue
        if planned_task_succeeded(event, task_type=kind):
            statuses[key] = "→"
        elif planned_task_failed(event, task_type=kind):
            statuses[key] = "✕"
    return statuses


def build_zone_action_badges(
    env: SimulatorEnvironment,
    *,
    mission_plan: MissionPlan | None,
) -> dict[str, list[str]]:
    """Build the badge strings shown on each tile, keyed by zone id.

    Planned horizon actions come first (order number + icon, plus a recent
    status marker when one exists); recent statuses that match no planned
    action are appended afterwards as icon + marker.
    """
    horizon = build_planned_horizon_payload(env, mission_plan=mission_plan)
    recent_statuses = recent_task_status_by_zone_action(env)

    badges_by_zone: dict[str, list[str]] = {}
    planned_keys: set[tuple[str, str]] = set()
    for action in (horizon.actions if horizon is not None else ()):
        key = (action.zone_id, action.task_type)
        badge = f"{action.order}{planned_action_icon(action.task_type)}"
        marker = recent_statuses.get(key)
        if marker is not None:
            badge = f"{badge}{marker}"
        badges_by_zone.setdefault(action.zone_id, []).append(badge)
        planned_keys.add(key)

    for key, marker in recent_statuses.items():
        if key not in planned_keys:
            zone_id, task_type = key
            badges_by_zone.setdefault(zone_id, []).append(f"{planned_action_icon(task_type)}{marker}")
    return badges_by_zone


def build_zone_cell_text(*, zone, action_badges: list[str]) -> str:
    """Return the tile label: the badges joined by single spaces, or ``""`` when there are none.

    ``zone`` is accepted for interface compatibility but is not used.
    """
    del zone
    # Joining a one-element list with any separator returns that element, so
    # the result is the space-joined badge row.
    return " ".join(action_badges) if action_badges else ""
def numbered_step_offset(index: int) -> tuple[float, float]:
    """Return the (dx, dy) offset for the ``index``-th marker inside a tile.

    Offsets form a 3x3 grid filled row by row from the top-left; indices wrap
    around after nine markers.
    """
    column_shifts = (-0.28, -0.02, 0.24)
    row_shifts = (0.28, 0.02, -0.24)
    slots = [(dx, dy) for dy in row_shifts for dx in column_shifts]
    return slots[index % len(slots)]


# Representative priority value used when only a priority band name is known.
_BAND_TO_PRIORITY: dict[str, float] = {
    "low": 0.15,
    "medium": 0.35,
    "high": 0.65,
    "critical": 0.9,
}


def priority_fill_color(priority: float, *, needs_intervention: bool, yield_pred_missing: bool = False) -> str:
    """Return the tile fill color for an urgency value.

    A missing predicted yield overrides everything. Intervention tiles are red
    at or above the red threshold and orange below it; other tiles are sand
    above the caution threshold and green otherwise.
    """
    if yield_pred_missing:
        return MISSING_YIELD_COLOR
    if needs_intervention:
        return "#d1495b" if priority >= RED_URGENCY_THRESHOLD else "#edae49"
    return "#f1e3b0" if priority > CAUTION_URGENCY_THRESHOLD else "#bdd9bf"


def ground_truth_fill_color(hidden) -> str:
    """Return the fill color a tile would have according to its hidden ground truth.

    Uses ``true_priority_hint`` when present; otherwise maps the
    ``true_priority_band`` enum value through ``_BAND_TO_PRIORITY`` (defaulting
    to the ``"low"`` band, and to 0.0 for unknown band names).
    """
    intervention_needed = truth_tile_needs_intervention(hidden)
    hint = getattr(hidden, "true_priority_hint", None)
    if hint is None:
        band_enum = getattr(hidden, "true_priority_band", None)
        band_name = getattr(band_enum, "value", None) or "low"
        priority = _BAND_TO_PRIORITY.get(str(band_name).lower(), 0.0)
    else:
        priority = float(hint or 0.0)
    yield_missing = bool(getattr(hidden, "true_yield_pred_missing", False))
    return priority_fill_color(
        priority,
        needs_intervention=intervention_needed,
        yield_pred_missing=yield_missing,
    )


def zone_border_style(*, hidden, is_current_zone: bool, measured_fill_color: str) -> tuple[str, int]:
    """Return ``(border_color, border_width)`` for a tile.

    - Robot's current tile: dark ink, width 4.
    - Ground-truth color differs from the displayed fill: the ground-truth
      color as border, width 3.
    - Otherwise: subtle blue-gray, width 1.
    """
    if is_current_zone:
        return "#21312a", 4
    truth_color = ground_truth_fill_color(hidden)
    if truth_color == measured_fill_color:
        # The displayed fill agrees with the ground truth.
        return "#9aa6b2", 1
    # Discrepancy: surface the ground-truth color on the border.
    return truth_color, 3
def visible_tile_needs_intervention(zone_state) -> bool:
    """Return whether the zone's visible features call for an intervention.

    True when the recommended action maps to a service action, otherwise the
    truthiness of the ``needs_intervention`` static feature.
    """
    features = zone_state.static_features
    mapped = action_type_for_field_recommendation(features.get("recommended_action"))
    return mapped in SERVICE_ACTIONS or bool(features.get("needs_intervention", False))


def tile_has_missing_yield(zone_state) -> bool:
    """Return whether the zone is flagged as having no predicted yield."""
    flag = zone_state.static_features.get("yield_pred_missing", False)
    return bool(flag)


def truth_tile_needs_intervention(hidden) -> bool:
    """Return whether the hidden ground truth calls for an intervention.

    Checked in order: ``true_needs_intervention``, a recommended action that
    maps to a service action, ``weed_removal_required``, then the presence of a
    fertilizer or irrigation recommendation.
    """
    if bool(getattr(hidden, "true_needs_intervention", False)):
        return True
    recommended = getattr(hidden, "true_recommended_action", "none")
    if action_type_for_field_recommendation(recommended) in SERVICE_ACTIONS:
        return True
    weed_flag = getattr(hidden, "weed_removal_required", False)
    if weed_flag:
        return weed_flag
    if getattr(hidden, "fertilizer_recommendation", None) is not None:
        return True
    return getattr(hidden, "irrigation_recommendation", None) is not None


def format_step_option(event) -> str:
    """Return the select-box label for a step event: step number, action and location."""
    action_label = humanize_action_name(str(event.action))
    where = format_event_location(event)
    return f"Step {event.step_index}: {action_label} | {where}"


def format_event_location(event) -> str:
    """Describe where an event happened.

    Moves with both endpoints known read ``"<from> -> <to>"``; events without
    any zone read ``"global action"``; everything else reads ``"at <zone>"``.
    """
    origin = event.details.get("from_zone_id")
    destination = event.details.get("to_zone_id") or event.zone_id
    is_move = str(event.action) == ActionType.MOVE.value
    if is_move and origin and destination:
        return f"{short_zone_label(origin)} -> {short_zone_label(destination)}"
    if destination is None:
        return "global action"
    return f"at {short_zone_label(destination)}"


def current_location_label(zone_id: str | None) -> str:
    """Return the display label for the robot's location (``None`` means home base)."""
    at_home = zone_id in {None, "home_base"}
    return "Home Base" if at_home else short_zone_label(zone_id)


def short_zone_label(zone_id: str | None) -> str:
    """Shorten a zone id of the form ``<prefix>_r<row>_c<col>`` to ``R<row>C<col>``.

    Falsy ids give ``"None"``, ``"home_base"`` gives ``"Home Base"``, and ids
    that do not parse are returned unchanged.
    """
    if not zone_id:
        return "None"
    if zone_id == "home_base":
        return "Home Base"
    pieces = zone_id.split("_")
    if len(pieces) != 3:
        return zone_id
    row_token, col_token = pieces[1], pieces[2]
    try:
        row_number = int(row_token.replace("r", ""))
        col_number = int(col_token.replace("c", ""))
    except ValueError:
        return zone_id
    return f"R{row_number}C{col_number}"
def priority_band_label(priority: float) -> str:
    """Return the long priority label for a score (cut-offs at 0.8, 0.6 and 0.35)."""
    for floor, label in ((0.8, "Act now"), (0.6, "Actionable"), (0.35, "Monitor")):
        if priority >= floor:
            return label
    return "Low"


def priority_band_short_label(priority: float) -> str:
    """Return the abbreviated priority label for a score (same cut-offs as the long label)."""
    for floor, label in ((0.8, "Now"), (0.6, "Act"), (0.35, "Mon")):
        if priority >= floor:
            return label
    return "Low"


def humanize_constraint(constraint_level) -> str:
    """Return display text for a constraint level; unknown levels are shown as-is."""
    value = enum_value(constraint_level)
    labels = {
        "none": "No constraint",
        "soft": "Needs reconfirmation",
        "hard": "High constraint",
    }
    return labels.get(value, str(value))


def humanize_action_name(action_name: str) -> str:
    """Return display text for an action name.

    Unknown names are title-cased with underscores replaced by spaces.
    """
    labels = {
        "move": "Move",
        "inspect": "Check field",
        "revisit": "Check field",
        "defer": "Check field",
        "patrol": "Move",
        "recharge": "Move",
        "remove_weeds": "Check field",
        "measure_soil": "Take soil sample",
        "apply_fertilizer": "Apply fertilizer",
        "apply_water": "Apply water",
        "install_drainage": "Install drainage",
        "subsoil": "Subsoil",
        "end_mission": "Move",
    }
    fallback = action_name.replace("_", " ").title()
    return labels.get(action_name, fallback)


def humanize_metric_token(value: str) -> str:
    """Title-case a snake_case token, replacing underscores with spaces."""
    spaced = value.replace("_", " ")
    return spaced.title()


def humanize_control_intent(control_intent) -> str:
    """Return display text for a control intent; unknown intents are title-cased."""
    value = enum_value(control_intent)
    labels = {
        ResolutionIntent.CONTINUE_INSPECTION.value: "Continue field check",
        ResolutionIntent.REQUEST_RECHECK.value: "Continue field check",
        ResolutionIntent.DEFER_ZONE.value: "No action on tile",
        ResolutionIntent.STOP_FOR_RESOURCE_LIMIT.value: "Stop for resource limit",
        ResolutionIntent.STOP_FOR_BACKEND_ERROR.value: "Stop for backend error",
    }
    fallback = humanize_metric_token(str(value))
    return labels.get(value, fallback)


def humanize_control_state(control_state) -> str:
    """Return display text for a control state; unknown states are title-cased."""
    value = enum_value(control_state)
    labels = {
        ControlState.UNSEEN.value: "Unseen",
        ControlState.INSPECT_REQUIRED.value: "Field check required",
        ControlState.RECHECK_REQUIRED.value: "Field check required",
        ControlState.RESOLVED_COMPLETE.value: "Resolved complete",
        ControlState.RESOLVED_DEFERRED.value: "No action applied",
        ControlState.RESOLVED_ERROR.value: "Resolved error",
    }
    fallback = humanize_metric_token(str(value))
    return labels.get(value, fallback)
def humanize_terminal_reason(terminal_reason) -> str:
    """Return display text for a terminal reason; unknown reasons are title-cased."""
    value = enum_value(terminal_reason)
    labels = {
        TerminalReason.COMPLETION.value: "Completion",
        TerminalReason.DEFERRED.value: "No action applied",
        TerminalReason.RESOURCE_LIMIT.value: "Resource limit",
        TerminalReason.BACKEND_ERROR.value: "Backend error",
    }
    fallback = humanize_metric_token(str(value))
    return labels.get(value, fallback)


def humanize_execution_outcome(execution_outcome) -> str:
    """Return display text for an execution outcome; unknown outcomes are title-cased."""
    value = enum_value(execution_outcome)
    labels = {
        ExecutionOutcomeType.MISSION_STOPPED.value: "Mission stopped",
        ExecutionOutcomeType.INVALID_TASK.value: "Invalid task",
        ExecutionOutcomeType.MOVE_COMPLETED.value: "Move completed",
        ExecutionOutcomeType.MOVE_SKIPPED.value: "Move skipped",
        ExecutionOutcomeType.MOVE_REJECTED.value: "Move rejected",
        ExecutionOutcomeType.POSITION_REJECTED.value: "Position rejected",
        ExecutionOutcomeType.PATROL_COMPLETED.value: "Move completed",
        ExecutionOutcomeType.RECHARGE_COMPLETED.value: "Move completed",
        ExecutionOutcomeType.INSPECTION_COMPLETED.value: "Field check completed",
        ExecutionOutcomeType.WEED_REMOVAL_COMPLETED.value: "Weed removal completed",
        ExecutionOutcomeType.SOIL_MEASUREMENT_COMPLETED.value: "Soil sample completed",
        ExecutionOutcomeType.FERTILIZER_APPLIED.value: "Fertilizer applied",
        ExecutionOutcomeType.WATER_APPLIED.value: "Water applied",
        ExecutionOutcomeType.DRAINAGE_INSTALLED.value: "Drainage installed",
        ExecutionOutcomeType.SUBSOIL_COMPLETED.value: "Subsoil completed",
        ExecutionOutcomeType.DEFERRED.value: "No action applied",
        ExecutionOutcomeType.RESOURCE_EXHAUSTED.value: "Resource exhausted",
    }
    fallback = humanize_metric_token(str(value))
    return labels.get(value, fallback)


def render_plan(snapshot) -> None:
    """Render the mission plan: next step, first six queued tasks and first six assessments."""
    queued_tasks = snapshot.mission_plan.ordered_tasks
    if not queued_tasks:
        st.write("No mission plan available yet.")
        return

    head = snapshot.mission_plan.ordered_tasks[0]
    st.write(f"**Next Robot Step:** {humanize_action_name(str(head.task_type))}")
    if head.control_intent is not None:
        st.write(f"**Control Intent:** {humanize_control_intent(head.control_intent)}")
    st.write(f"**Target Field:** {short_zone_label(head.target_zone)}")
    st.write(f"**Rationale:** {head.rationale}")

    st.write("**Plan Queue**")
    for task in snapshot.mission_plan.ordered_tasks[:6]:
        entry = (
            f"- {humanize_action_name(str(task.task_type))} on {short_zone_label(task.target_zone)} "
            f"(planner score {task.priority:.2f}"
        )
        if task.control_intent is not None:
            entry += f", {humanize_control_intent(task.control_intent)}"
        entry += ")"
        st.write(entry)

    st.write("**Field Assessments**")
    for assessment in snapshot.assessments[:6]:
        summary = (
            f"- {short_zone_label(assessment.zone_id)}: {priority_band_label(assessment.priority_score)} | "
            f"{humanize_action_name(str(assessment.recommended_action))} next | "
            f"{humanize_control_state(assessment.control_state)} | "
            f"{humanize_constraint(assessment.constraint_level)} | uncertainty {assessment.uncertainty:.2f}"
        )
        st.write(summary)
def format_control_state_transition(event) -> str | None:
    """Describe the control-state change recorded on ``event``.

    Returns ``None`` when neither state is recorded, a single label when only
    one state is known or both labels are equal, and ``"<before> -> <after>"``
    otherwise.
    """
    recorded = [
        state
        for state in (event.control_state_before, event.control_state_after)
        if state is not None
    ]
    if not recorded:
        return None
    labels = [humanize_control_state(state) for state in recorded]
    if len(labels) == 1 or labels[0] == labels[1]:
        return labels[-1]
    return f"{labels[0]} -> {labels[1]}"


def describe_task(task) -> str:
    """Return a one-line description of a planner task (action, target zone, control intent)."""
    if task is None:
        return "No planner task recorded"
    pieces = [humanize_action_name(str(task.task_type))]
    if task.target_zone is not None:
        pieces.append(f" on {short_zone_label(task.target_zone)}")
    if task.control_intent is not None:
        pieces.append(f" ({humanize_control_intent(task.control_intent)})")
    return "".join(pieces)
({humanize_control_intent(task.control_intent)})" return task_text def render_assessment_snapshot(title: str, assessment) -> None: st.write(f"**{title}**") if assessment is None: st.caption("No assessment snapshot recorded for this step.") return st.write( f"{short_zone_label(assessment.zone_id)} | {priority_band_label(assessment.priority_score)} | " f"{humanize_action_name(str(assessment.recommended_action))} next | " f"{humanize_control_state(assessment.control_state)} | " f"{humanize_constraint(assessment.constraint_level)} | uncertainty {assessment.uncertainty:.2f}" ) if assessment.recommended_control_intent is not None: st.write(f"Control intent: {humanize_control_intent(assessment.recommended_control_intent)}") if assessment.supporting_evidence: st.write("Supporting evidence") for fact in assessment.supporting_evidence[:5]: st.write(f"- {fact}") def render_assessment_after_step(summary: dict[str, object] | None) -> None: st.write("**Field State After Step**") if not summary: st.caption("No post-step field assessment was recorded.") return st.write( f"{short_zone_label(summary.get('zone_id'))} | {normalize_priority_label(summary.get('inspection_priority'))} | " f"{summary.get('control_state')} | uncertainty {float(summary.get('uncertainty', 0.0)):.2f}" ) st.write( f"Visit count: {summary.get('visit_count', 0)}" + (f" | Revisits: {summary.get('revisit_count', 0)}" if summary.get("revisit_count") is not None else "") ) def render_plan_and_timeline(snapshot) -> None: """Show the current plan as an inline 'next step' banner, then the full step history below.""" plan = snapshot.mission_plan if plan.ordered_tasks: task = plan.ordered_tasks[0] target_label = short_zone_label(task.target_zone) if task.target_zone else "—" intent_suffix = f" ({humanize_control_intent(task.control_intent)})" if task.control_intent else "" queue_preview = ", ".join( f"{humanize_action_name(str(t.task_type))} {short_zone_label(t.target_zone)}" for t in plan.ordered_tasks[1:4] ) 
st.markdown( f"**Next → {humanize_action_name(str(task.task_type))} on {target_label}{intent_suffix}**   " f"_{task.rationale}_", unsafe_allow_html=True, ) if queue_preview: st.caption(f"Queue: {queue_preview}") default_show_timeline = snapshot.observation.mode != SimulatorMode.REAL_FIELD show_timeline = st.toggle( "Show step timeline", value=bool(st.session_state.get(SHOW_TIMELINE_BODY_KEY, default_show_timeline)), key=SHOW_TIMELINE_BODY_KEY, ) if not show_timeline: if snapshot.event_history: st.caption(f"Step timeline hidden. {len(snapshot.event_history)} recorded steps are available on demand.") else: st.caption("No events yet.") return if snapshot.event_history: timeline_window = st.selectbox( "Timeline Depth", TIMELINE_WINDOW_OPTIONS, index=0, key=TIMELINE_WINDOW_KEY, format_func=lambda value: "All steps" if value == 0 else f"Latest {value} steps", ) total_steps = len(snapshot.event_history) if timeline_window and total_steps > timeline_window: st.caption(f"Showing the latest {timeline_window} of {total_steps} steps.") render_step_timeline(snapshot) def render_step_timeline(snapshot) -> None: if not snapshot.event_history: st.write("No events yet.") return latest_step_index = snapshot.event_history[-1].step_index timeline_window = int(st.session_state.get(TIMELINE_WINDOW_KEY, TIMELINE_WINDOW_OPTIONS[0])) events = snapshot.event_history if timeline_window == 0 else snapshot.event_history[-timeline_window:] for event in events: transition = format_control_state_transition(event) label_parts = [ f"Step {event.step_index}", humanize_action_name(str(event.action)), format_event_location(event), ] if transition is not None: label_parts.append(transition) with st.expander(" | ".join(label_parts), expanded=event.step_index == latest_step_index): summary_left, summary_middle, summary_right = st.columns([2, 1, 1]) with summary_left: st.write("**Step Overview**") st.write(f"Action: {humanize_action_name(str(event.action))}") st.write(f"Location: 
{format_event_location(event)}") st.write(f"Outcome: {humanize_execution_outcome(event.execution_outcome)}") st.write(f"Reward: {event.reward:.2f}") with summary_middle: st.write("**Control Outcome**") if transition is not None: st.write(f"Control state: {transition}") if event_control_intent(event) is not None: st.write(f"Intent: {humanize_control_intent(event_control_intent(event))}") if event.control_event is not None and event.control_event.rejection_reason is not None: st.write(f"Rejection reason: `{event.control_event.rejection_reason}`") if event_terminal_reason(event) is not None: st.write(f"Terminal reason: {humanize_terminal_reason(event_terminal_reason(event))}") if event.details.get("reason"): st.write(f"Step reason: {event.details['reason']}") if event.planner_context is not None: chosen = event.planner_context.chosen_task_before_normalization normalized = event.planner_context.normalized_task if chosen != normalized: st.write(f"Normalized task: {describe_task(normalized)}") with summary_right: st.write("**Evidence And Context**") context = event.guarded_context_after if context is None: st.caption("No context recorded.") else: st.write(f"Field checks: {context.inspection_count}") st.write(f"Revisits: {context.revisit_count}") if event.details.get("uncertainty_delta") is not None: st.write(f"Uncertainty delta: {float(event.details['uncertainty_delta']):+.2f}") if event.planner_context is not None: st.write(f"Evidence reused: {'Yes' if event.planner_context.evidence_reused else 'No'}") if event.planner_context.fixture_identity is not None: st.write(f"Fixture: `{event.planner_context.fixture_identity}`") revealed_facts = event.details.get("revealed_facts") or ["No new field information was revealed."] st.write("**What Happened In This Step**") for fact in revealed_facts[:6]: st.write(f"- {fact}") render_assessment_snapshot( "Planner View Before Step", event.planner_context.selected_assessment if event.planner_context else None, ) if event.planner_context 
is not None: st.write("**Why This Decision**") max_e = snapshot.observation.max_energy if event.planner_context.battery_before_step is not None and max_e: pct_before = battery_percentage_int(event.planner_context.battery_before_step, max_e) pct_after = battery_percentage_int(event.planner_context.estimated_battery_after_step, max_e) st.write(f"Battery: {pct_before}% → {pct_after}%") if event.planner_context.return_distance_before_step is not None: st.write(f"Hops to base: {event.planner_context.return_distance_before_step} → {event.planner_context.return_distance_after_step}") all_candidates = list(event.planner_context.candidate_summaries) + list(event.planner_context.adjacent_candidate_summaries) if all_candidates: st.write("Other options considered") for summary in all_candidates: target_label = short_zone_label(summary.target_zone) if summary.target_zone else "—" if summary.rejected_for_battery_infeasibility: note = " [battery-infeasible]" elif summary.rejection_reason: note = f" [{summary.rejection_reason}]" else: note = "" st.write(f"- {humanize_action_name(str(summary.task_type))} {target_label} — utility {summary.net_utility:.2f}{note}") render_assessment_after_step(event.details.get("assessment_summary_after_step")) if __name__ == "__main__": main()