| from __future__ import annotations |
|
|
| import base64 |
| import io |
| import json |
| import math |
| import os |
| import time |
| from dataclasses import dataclass, replace |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Callable |
| from urllib.parse import quote |
|
|
| import numpy as np |
| import pandas as pd |
| import plotly.graph_objects as go |
| import streamlit as st |
| import streamlit.components.v1 as components |
| from autofarm.adapters.datasets import DatasetCatalog |
| from autofarm.contracts import ( |
| ActionType, |
| ControlState, |
| EpisodeOutcomeStatus, |
| ExecutionOutcomeType, |
| MissionPlan, |
| ResolutionIntent, |
| SimulatorMode, |
| TerminalReason, |
| ) |
| from autofarm.debug_profile import ( |
| ACTIVE_DEBUG_SCENARIO, |
| ACTIVE_DEBUG_STRATEGY, |
| visible_riddle_scenarios, |
| visible_strategy_names, |
| ) |
| from autofarm.evaluation.baselines import HybridStrategy, build_strategies |
| from autofarm.paths import REPO_ROOT |
| from autofarm.urgency import ( |
| MISSING_YIELD_COLOR, |
| REAL_FIELD_CAUTION_URGENCY_THRESHOLD, |
| REAL_FIELD_CRITICAL_URGENCY_THRESHOLD, |
| ) |
| from autofarm.planning.utility import action_type_for_field_recommendation |
|
|
| from intervention import render_intervention_view, render_intervention_sim_view |
| from intervention.simulation_view import write_sim_input_csvs |
| from intervention.constants import ( |
| INTERVENTION_DATA_KEY, |
| INTERVENTION_PREDICTED_KEY, |
| INTERVENTION_SIM_ERROR_KEY, |
| INTERVENTION_SIM_MAP_PATH, |
| INTERVENTION_SIM_VIEW_KEY, |
| INTERVENTION_VIEW_KEY, |
| ) |
| from autofarm.sim.engine import ( |
| SimulatorEnvironment, |
| build_environment, |
| build_interactive_environment, |
| build_real_field_environment, |
| shortest_path_zone_ids, |
| ) |
| from autofarm.sim.playback import ( |
| PLAYBACK_MODE_AUTO, |
| PLAYBACK_MODE_MANUAL, |
| PLAYBACK_SPEED_1HZ, |
| PLAYBACK_SPEED_4HZ, |
| PLAYBACK_SPEED_8HZ, |
| default_playback_state, |
| playback_interval_ms, |
| playback_speed_options, |
| should_process_tick, |
| ) |
| from autofarm.sim.session_persistence import ( |
| PersistenceCoordinator, |
| SessionPersistenceResult, |
| create_session_id, |
| persist_interactive_session, |
| ) |
| from autofarm.sim.scenarios import RIDDLE_SAFETY_MAX_STEPS, available_riddles, interactive_challenge_templates |
|
|
# String keys. The ones referenced in the visible code index ``st.session_state``;
# the remaining ``*_KEY`` names presumably do too (their users are not in view).
ENV_KEY = "env"
ENV_GENERATION_KEY = "env_generation"
PERSISTENCE_GENERATION_KEY = "persistence_generation"
SESSION_ID_KEY = "simulator_session_id"
SESSION_PERSISTENCE_KEY = "simulator_session_persistence"
PERSISTENCE_COORDINATOR_KEY = "simulator_persistence_coordinator"
PERSISTENCE_WARNING_KEY = "simulator_persistence_warning"
LAST_BOARD_EVENT_KEY = "interactive_last_board_event"
SELECTED_ZONE_KEY = "selected_zone_id"
PENDING_CHALLENGE_TEMPLATE_KEY = "interactive_pending_challenge_template"
PENDING_CHALLENGE_RADIUS_KEY = "interactive_pending_challenge_radius"
PLACEMENT_FLASH_KEY = "interactive_placement_flash"
FIGURE_CACHE_KEY = "zone_grid_figure_cache"
STATIC_MAP_CACHE_KEY = "zone_grid_static_cache"
TIMELINE_WINDOW_KEY = "timeline_window_size"
SHOW_TIMELINE_BODY_KEY = "show_timeline_body"
SHOW_PLANNED_TRAJECTORY_KEY = "show_planned_trajectory"
PLAYBACK_MODE_KEY = "playback_mode"
PLAYBACK_RUNNING_KEY = "playback_running"
PLAYBACK_SPEED_KEY = "playback_speed"
PLAYBACK_LAST_TICK_KEY = "playback_last_processed_tick"
PLAYBACK_WARNING_KEY = "playback_warning"
PLAYBACK_NEXT_STEP_AT_KEY = "playback_next_step_at"
PLAYBACK_TELEMETRY_KEY = "playback_telemetry"
PLAYBACK_PENDING_STEP_KEY = "playback_pending_step_telemetry"
# Custom Streamlit components, declared from the ``components/`` directory
# located next to this file.
BOARD_COMPONENT = components.declare_component(
    "challenge_board",
    path=str((Path(__file__).parent / "components" / "challenge_board").resolve()),
)
AUTO_TICK_COMPONENT = components.declare_component(
    "auto_tick",
    path=str((Path(__file__).parent / "components" / "auto_tick").resolve()),
)
# Export file locations, all rooted at ``REPO_ROOT / "files_script_eval"``.
SCRIPT_EVAL_EXPORT_ROOT = REPO_ROOT / "files_script_eval"
FIELD_TILE_STATE_EXPORT_PATH = SCRIPT_EVAL_EXPORT_ROOT / "field_tile_states.json"
TILE_ACTION_LOG_EXPORT_PATH = SCRIPT_EVAL_EXPORT_ROOT / "tile_action_log.json"
# Anomaly types that finding_bucket() files under "Positive signal findings".
POSITIVE_SIGNAL_ANOMALIES = {"weed_presence"}
# Tuning constants. NOTE(review): the users of the next four values are
# outside this section, so their exact meaning (e.g. the trailing 0 in
# TIMELINE_WINDOW_OPTIONS) should be confirmed there.
SNAPSHOT_WRITE_INTERVAL = 5
TIMELINE_WINDOW_OPTIONS = [25, 50, 100, 250, 0]
PLANNED_TRAJECTORY_MOVE_STEP_BUDGET = 10
RECENT_TASK_ICON_TTL_STEPS = 5
# Maximum number of samples append_recent_metric() keeps per telemetry series.
PLAYBACK_TELEMETRY_WINDOW = 20
# Set of ActionType members grouped as "service" actions.
SERVICE_ACTIONS = {
    ActionType.REMOVE_WEEDS,
    ActionType.APPLY_FERTILIZER,
    ActionType.APPLY_WATER,
    ActionType.INSTALL_DRAINAGE,
    ActionType.SUBSOIL,
}
|
|
|
|
@dataclass(frozen=True)
class RealFieldStaticMapPayload:
    """Frozen record bundling a Plotly figure with zone ids, coordinates and sizes.

    NOTE(review): the code that builds and consumes this payload is outside
    this section; the grouping comments below are inferred from field names
    only and should be confirmed there.
    """

    figure: go.Figure
    # Per-zone sequences (presumably index-aligned with each other).
    zone_ids: tuple[str, ...]
    xs: tuple[int, ...]
    ys: tuple[int, ...]
    x_values: tuple[int, ...]
    y_values: tuple[int, ...]
    # Sizing values.
    grid_size: int
    cell_marker_size: int
    cell_text_size: int
    marker_ring_size: int
    step_marker_size: int
    # Named positions.
    home_base_x: float
    home_base_y: float
    entry_x: float
    entry_y: float
    # Bounds (presumably the world-space extent covered by the tiles).
    tile_world_x0: float
    tile_world_x1: float
    tile_world_y0: float
    tile_world_y1: float
|
|
|
|
@dataclass(frozen=True)
class RealFieldDynamicMapPayload:
    """Frozen record of per-render map values: cell styling, markers, trajectory.

    The dataclass is frozen, but the ``list`` fields themselves remain mutable.
    NOTE(review): producers/consumers are outside this section; the grouping
    comments below are inferred from field names only.
    """

    # Per-cell display values (presumably index-aligned with the static map's zones).
    texts: list[str]
    colors: list[str]
    line_colors: list[str]
    line_widths: list[int]
    hover_texts: list[str]
    # Zone ids; either may be None.
    previous_zone_id: str | None
    current_zone_id: str | None
    # Event marker series.
    event_marker_xs: list[float]
    event_marker_ys: list[float]
    event_marker_texts: list[str]
    event_marker_colors: list[str]
    event_marker_hovers: list[str]
    # Optional planned-trajectory overlay; None when absent.
    planned_trajectory: "PlannedTrajectoryPayload | None"
|
|
|
|
@dataclass(frozen=True)
class PlannedTrajectoryPayload:
    """Frozen record of a planned route plus waypoint and task-marker series.

    The dataclass is frozen, but the ``list`` fields themselves remain mutable.
    NOTE(review): field meanings are inferred from names only; the code that
    fills this payload is outside this section.
    """

    route_zone_ids: tuple[str, ...]
    starts_from_home: bool
    # Route polyline coordinates.
    route_xs: list[float]
    route_ys: list[float]
    # Waypoint coordinates.
    waypoint_xs: list[float]
    waypoint_ys: list[float]
    # Task marker series (presumably index-aligned with each other).
    task_marker_xs: list[float]
    task_marker_ys: list[float]
    task_marker_texts: list[str]
    task_marker_sizes: list[int]
    task_marker_colors: list[str]
    task_marker_hovers: list[str]
|
|
|
|
@dataclass(frozen=True)
class PlannedHorizonAction:
    """Frozen record for one planned action: its order, zone, task type and rationale.

    NOTE(review): whether ``order`` is 0- or 1-based is not visible in this
    section; confirm against the producer.
    """

    order: int
    zone_id: str
    task_type: str
    rationale: str
|
|
|
|
@dataclass(frozen=True)
class PlannedHorizonPayload:
    """Frozen record pairing a route (zone ids) with a tuple of PlannedHorizonAction."""

    route_zone_ids: tuple[str, ...]
    starts_from_home: bool
    actions: tuple[PlannedHorizonAction, ...]
|
|
|
|
def enum_value(value):
    """Return ``value.value`` when that attribute exists, otherwise ``value`` itself."""
    try:
        return value.value
    except AttributeError:
        # Plain values (no ``.value`` attribute) pass through unchanged.
        return value
|
|
|
|
def finding_bucket(anomaly_type: str) -> str:
    """Return the section title under which a finding of ``anomaly_type`` is listed."""
    is_positive_signal = anomaly_type in POSITIVE_SIGNAL_ANOMALIES
    return "Positive signal findings" if is_positive_signal else "Other findings"
|
|
|
|
def render_finding_groups(findings, *, title_when_empty: str) -> None:
    """Write findings to the page, grouped by the bucket from finding_bucket().

    When ``findings`` is empty or falsy, only ``title_when_empty`` is written.
    Otherwise each non-empty bucket is written as a heading followed by one
    ``- ...`` line per finding, positive-signal bucket first.
    """
    if not findings:
        st.write(title_when_empty)
        return

    lines_by_bucket: dict[str, list[str]] = {}
    for item in findings:
        kind = str(item.anomaly_type)
        bucket = finding_bucket(kind)
        if bucket not in lines_by_bucket:
            lines_by_bucket[bucket] = []
        lines_by_bucket[bucket].append(
            f"`{kind}` severity={item.severity} confidence={item.confidence:.2f}"
        )

    # Fixed display order; buckets without findings are skipped.
    for heading in ("Positive signal findings", "Other findings"):
        lines = lines_by_bucket.get(heading)
        if lines:
            st.write(heading)
            for line in lines:
                st.write(f"- {line}")
|
|
|
|
def event_control_intent(event):
    """Return ``event.control_event.requested_intent``, or None without a control event."""
    control_event = event.control_event
    return None if control_event is None else control_event.requested_intent
|
|
|
|
def event_terminal_reason(event):
    """Return ``event.guarded_context_after.terminal_reason``, or None without that context."""
    context_after = event.guarded_context_after
    return None if context_after is None else context_after.terminal_reason
|
|
|
|
def preferred_option(options: list[str], preferred: str) -> str:
    """Return ``preferred`` when it is one of ``options``, else the first option.

    Raises IndexError when ``options`` is empty and ``preferred`` is not in it.
    """
    return preferred if preferred in options else options[0]
|
|
|
|
def set_manual_primary_axis(axis: str) -> None:
    """Store ``axis`` in the session state under ``interactive_manual_primary_axis``."""
    session = st.session_state
    session["interactive_manual_primary_axis"] = axis
|
|
|
|
def arm_pending_challenge(template_id: str, radius: int) -> None:
    """Record a pending challenge (template id and integer radius) in the session state."""
    session = st.session_state
    session[PENDING_CHALLENGE_TEMPLATE_KEY] = template_id
    radius_value = int(radius)
    session[PENDING_CHALLENGE_RADIUS_KEY] = radius_value
|
|
|
|
def clear_pending_challenge() -> None:
    """Remove the pending challenge template and radius from the session state, if present."""
    for key in (PENDING_CHALLENGE_TEMPLATE_KEY, PENDING_CHALLENGE_RADIUS_KEY):
        st.session_state.pop(key, None)
|
|
|
|
def pending_challenge_request() -> str | None:
    """Return the pending challenge template id, or None when none is armed.

    Only a non-empty ``str`` stored under the template key counts as armed.
    """
    candidate = st.session_state.get(PENDING_CHALLENGE_TEMPLATE_KEY)
    if isinstance(candidate, str) and candidate:
        return candidate
    return None
|
|
|
|
def pending_challenge_radius() -> int:
    """Return the pending challenge radius clamped to the range 1..3.

    Falls back to 1 when no radius is stored or the stored value cannot be
    converted with ``int()`` (TypeError / ValueError).
    """
    stored = st.session_state.get(PENDING_CHALLENGE_RADIUS_KEY, 1)
    try:
        parsed = int(stored)
    except (TypeError, ValueError):
        return 1
    return max(1, min(3, parsed))
|
|
|
|
def set_placement_flash_message(success: bool, message: str) -> None:
    """Store a one-shot placement message (success flag + text) in the session state."""
    flash = dict(success=success, message=message)
    st.session_state[PLACEMENT_FLASH_KEY] = flash
|
|
|
|
def pop_placement_flash_message() -> tuple[bool, str] | None:
    """Remove and return the stored placement message as ``(success, message)``.

    Returns None when nothing is stored or the stored value is not a dict.
    A missing or falsy message becomes the empty string.
    """
    flash = st.session_state.pop(PLACEMENT_FLASH_KEY, None)
    if isinstance(flash, dict):
        succeeded = bool(flash.get("success"))
        text = str(flash.get("message") or "")
        return succeeded, text
    return None
|
|
|
|
def bump_env_generation() -> None:
    """Increment the environment generation counter in the session state (starts from 0)."""
    current = int(st.session_state.get(ENV_GENERATION_KEY, 0))
    st.session_state[ENV_GENERATION_KEY] = current + 1
|
|
|
|
def bump_persistence_generation() -> None:
    """Increment the persistence generation counter in the session state (starts from 0)."""
    current = int(st.session_state.get(PERSISTENCE_GENERATION_KEY, 0))
    st.session_state[PERSISTENCE_GENERATION_KEY] = current + 1
|
|
|
|
def is_persistent_world_mode_value(mode_name: str) -> bool:
    """Return True when ``mode_name`` is the value of the INTERACTIVE or REAL_FIELD mode."""
    persistent_values = {
        SimulatorMode.INTERACTIVE.value,
        SimulatorMode.REAL_FIELD.value,
    }
    return mode_name in persistent_values
|
|
|
|
def is_persistent_world_mode(mode: SimulatorMode) -> bool:
    """Return True when ``mode`` is SimulatorMode.INTERACTIVE or SimulatorMode.REAL_FIELD."""
    persistent_modes = {SimulatorMode.INTERACTIVE, SimulatorMode.REAL_FIELD}
    return mode in persistent_modes
|
|
|
|
def ensure_playback_state_defaults() -> None:
    """Seed every playback-related session-state key that is not set yet.

    Existing values are left untouched (``setdefault`` semantics).
    """
    base = default_playback_state()
    initial_values = (
        (PLAYBACK_MODE_KEY, base["mode"]),
        (PLAYBACK_RUNNING_KEY, base["running"]),
        (PLAYBACK_SPEED_KEY, base["speed"]),
        (PLAYBACK_LAST_TICK_KEY, base["last_processed_tick"]),
        (PLAYBACK_WARNING_KEY, None),
        (PLAYBACK_NEXT_STEP_AT_KEY, None),
        (PLAYBACK_TELEMETRY_KEY, default_playback_telemetry()),
        (PLAYBACK_PENDING_STEP_KEY, None),
    )
    for key, initial in initial_values:
        st.session_state.setdefault(key, initial)
|
|
|
|
def default_playback_telemetry() -> dict[str, object]:
    """Return a fresh, empty telemetry record.

    The record holds run-level fields plus, for every tracked metric, a
    ``last_<metric>`` value (None) and a ``recent_<metric>`` list (empty).
    Each call builds new list objects.
    """
    telemetry: dict[str, object] = {
        "speed": PLAYBACK_SPEED_1HZ,
        "started_at": None,
        "step_count": 0,
        "last_step_completed_at": None,
        "recent_step_intervals_ms": [],
    }
    tracked_metrics = (
        "engine_step_ms",
        "total_app_step_ms",
        "render_ms",
        "persistence_ms",
        "export_ms",
        "strategy_plan_ms",
        "decision_trace_ms",
    )
    for metric in tracked_metrics:
        telemetry[f"last_{metric}"] = None
        telemetry[f"recent_{metric}"] = []
    return telemetry
|
|
|
|
def append_recent_metric(series: object, value: float) -> list[float]:
    """Return a new list: ``series`` plus ``value`` (rounded to 3 places), trimmed.

    A ``series`` that is not a list is treated as empty. Only the newest
    ``PLAYBACK_TELEMETRY_WINDOW`` entries are kept; the input is not mutated.
    """
    window = list(map(float, series)) if isinstance(series, list) else []
    window.append(round(float(value), 3))
    return window[-PLAYBACK_TELEMETRY_WINDOW:]
|
|
|
|
def reset_playback_telemetry(speed: str) -> None:
    """Replace the stored telemetry with a fresh record for ``speed`` and drop any pending step."""
    fresh = {**default_playback_telemetry(), "speed": speed}
    st.session_state[PLAYBACK_TELEMETRY_KEY] = fresh
    st.session_state[PLAYBACK_PENDING_STEP_KEY] = None
|
|
|
|
def maybe_reset_playback_telemetry_for_speed_change(speed: str) -> None:
    """Reset telemetry when the stored telemetry was recorded at a different speed.

    Does nothing when no telemetry dict is stored or its speed already matches.
    While playback is running, the next step is rescheduled one interval of
    the new speed from now.
    """
    current = st.session_state.get(PLAYBACK_TELEMETRY_KEY)
    if not isinstance(current, dict):
        return
    if current.get("speed") == speed:
        return

    reset_playback_telemetry(speed)
    if not bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)):
        return
    now = time.monotonic()
    st.session_state[PLAYBACK_NEXT_STEP_AT_KEY] = now + playback_interval_ms(speed) / 1000.0
|
|
|
|
def begin_playback_step_telemetry(
    *,
    speed: str,
    engine_step_ms: float,
    step_started_at: float,
    step_perf: dict[str, float] | None,
) -> None:
    """Store the measurements of a just-executed step as the pending telemetry entry.

    The entry is completed later by finalize_playback_step_telemetry().
    ``step_perf`` is copied; None is stored as an empty dict.
    """
    pending_entry = dict(
        speed=speed,
        engine_step_ms=round(engine_step_ms, 3),
        step_started_at=step_started_at,
        step_perf=dict(step_perf or {}),
    )
    st.session_state[PLAYBACK_PENDING_STEP_KEY] = pending_entry
|
|
|
|
def finalize_playback_step_telemetry(
    *,
    speed: str,
    completed_at: float,
    total_app_step_ms: float,
    render_ms: float,
    persistence_ms: float,
    export_ms: float,
) -> None:
    """Fold the pending step entry and the given timings into the stored telemetry.

    Does nothing when no pending entry (a dict) is stored. Otherwise updates
    the ``last_*`` values and the windowed ``recent_*`` series, increments the
    step count, records the interval since the previously completed step,
    writes the telemetry back and clears the pending entry.
    """
    pending = st.session_state.get(PLAYBACK_PENDING_STEP_KEY)
    if not isinstance(pending, dict):
        return

    telemetry = st.session_state.get(PLAYBACK_TELEMETRY_KEY)
    if not isinstance(telemetry, dict):
        telemetry = default_playback_telemetry()

    # Interval between this step and the previously completed one, if any.
    intervals = telemetry.get("recent_step_intervals_ms")
    previous_completed_at = telemetry.get("last_step_completed_at")
    if isinstance(previous_completed_at, (int, float)):
        elapsed_ms = (completed_at - float(previous_completed_at)) * 1000.0
        intervals = append_recent_metric(intervals, elapsed_ms)

    # Optional engine-phase timings reported with the pending entry.
    optional_metrics: dict[str, float] = {}
    step_perf = pending.get("step_perf")
    if isinstance(step_perf, dict):
        for metric in ("strategy_plan_ms", "decision_trace_ms"):
            raw = step_perf.get(metric)
            if isinstance(raw, (int, float)):
                optional_metrics[metric] = float(raw)

    # Build every new value first, then apply them in a single update.
    updates: dict[str, object] = {
        "speed": speed,
        "started_at": telemetry.get("started_at") or completed_at,
        "step_count": int(telemetry.get("step_count", 0)) + 1,
        "last_step_completed_at": completed_at,
    }
    measured = (
        ("engine_step_ms", float(pending.get("engine_step_ms", 0.0))),
        ("total_app_step_ms", total_app_step_ms),
        ("render_ms", render_ms),
        ("persistence_ms", persistence_ms),
        ("export_ms", export_ms),
    )
    for metric, measurement in measured:
        updates[f"last_{metric}"] = round(measurement, 3)
        updates[f"recent_{metric}"] = append_recent_metric(
            telemetry.get(f"recent_{metric}"),
            measurement,
        )
    updates["recent_step_intervals_ms"] = intervals if isinstance(intervals, list) else []
    telemetry.update(updates)

    for metric, measurement in optional_metrics.items():
        telemetry[f"last_{metric}"] = round(measurement, 3)
        telemetry[f"recent_{metric}"] = append_recent_metric(
            telemetry.get(f"recent_{metric}"),
            measurement,
        )

    st.session_state[PLAYBACK_TELEMETRY_KEY] = telemetry
    st.session_state[PLAYBACK_PENDING_STEP_KEY] = None
|
|
|
|
def average_metric(series: object) -> float | None:
    """Return the mean of ``series`` rounded to 3 places, or None.

    None is returned when ``series`` is not a list or is empty.
    """
    if isinstance(series, list) and series:
        total = sum(float(sample) for sample in series)
        return round(total / len(series), 3)
    return None
|
|
|
|
def p95_metric(series: object) -> float | None:
    """Return the 95th-percentile sample of ``series`` rounded to 3 places, or None.

    Uses the nearest-rank rule: the value at position ``ceil(0.95 * n)``
    (1-based) of the sorted samples. None is returned when ``series`` is not
    a list or is empty.
    """
    if not (isinstance(series, list) and series):
        return None
    ordered = sorted(map(float, series))
    rank_index = math.ceil(0.95 * len(ordered)) - 1
    last_index = len(ordered) - 1
    return round(ordered[min(last_index, rank_index)], 3)
|
|
|
|
def max_metric(series: object) -> float | None:
    """Return the largest sample of ``series`` rounded to 3 places, or None.

    None is returned when ``series`` is not a list or is empty.
    """
    if isinstance(series, list) and series:
        largest = max(float(sample) for sample in series)
        return round(largest, 3)
    return None
|
|
|
|
def metric_summary(*, telemetry: dict[str, object], name: str) -> dict[str, float | None]:
    """Summarise one metric of ``telemetry`` as last / avg / p95 / max.

    Reads ``last_<name>`` and ``recent_<name>``; a non-numeric last value is
    reported as None.
    """
    recent = telemetry.get(f"recent_{name}")
    last = telemetry.get(f"last_{name}")
    last_rounded = round(float(last), 3) if isinstance(last, (int, float)) else None
    return {
        "last": last_rounded,
        "avg": average_metric(recent),
        "p95": p95_metric(recent),
        "max": max_metric(recent),
    }
|
|
|
|
def playback_telemetry_snapshot() -> dict[str, object]:
    """Return a summary of the stored playback telemetry.

    The result holds ``step_count``, ``achieved_hz`` (derived from the mean
    recent step interval; None when unknown or zero) and a last/avg/p95/max
    summary per metric. Without a stored telemetry dict, an all-empty
    snapshot with the same keys is returned.
    """
    metric_names = (
        "engine_step_ms",
        "total_app_step_ms",
        "render_ms",
        "persistence_ms",
        "export_ms",
        "strategy_plan_ms",
        "decision_trace_ms",
    )
    telemetry = st.session_state.get(PLAYBACK_TELEMETRY_KEY)
    if not isinstance(telemetry, dict):
        empty: dict[str, object] = {"step_count": 0, "achieved_hz": None}
        for metric in metric_names:
            empty[metric] = {"last": None, "avg": None, "p95": None, "max": None}
        return empty

    mean_interval_ms = average_metric(telemetry.get("recent_step_intervals_ms"))
    if mean_interval_ms is None or mean_interval_ms == 0.0:
        achieved_hz = None
    else:
        achieved_hz = round(1000.0 / mean_interval_ms, 3)

    snapshot: dict[str, object] = {
        "step_count": int(telemetry.get("step_count", 0)),
        "achieved_hz": achieved_hz,
    }
    for metric in metric_names:
        snapshot[metric] = metric_summary(telemetry=telemetry, name=metric)
    return snapshot
|
|
|
|
def render_playback_telemetry(*, speed: str, running: bool, mode: SimulatorMode) -> None:
    """Show a one-line caption summarising playback telemetry.

    While ``running``: shows the target rate and, once an achieved rate is
    available, the achieved rate plus engine timings (last / avg / p95).
    When not running: shows a shorter "last run" summary, but only if at
    least one step has been recorded. Real-field mode adds app-level timings.

    NOTE(review): this section of the file lost its indentation; the nesting
    of the Render/Persist/Export parts under the REAL_FIELD check and of the
    ``return`` under ``if running`` is the most plausible reading — confirm
    against the original layout.
    """
    # Target rate implied by the configured interval for this speed.
    target_hz = round(1000.0 / playback_interval_ms(speed), 2)
    telemetry = playback_telemetry_snapshot()
    achieved_hz = telemetry.get("achieved_hz")
    engine_step_ms = telemetry["engine_step_ms"]
    total_app_step_ms = telemetry["total_app_step_ms"]
    if running:
        if achieved_hz is None:
            # No step interval measured yet.
            st.caption(f"Target {target_hz:.2f} Hz. Telemetry will populate after the next auto-step.")
        else:
            # Missing (None) summary values are displayed as 0.0.
            parts = [
                f"Target {target_hz:.2f} Hz",
                f"Achieved {float(achieved_hz):.2f} Hz",
                (
                    f"Engine {float(engine_step_ms['last'] or 0.0):.1f} / "
                    f"{float(engine_step_ms['avg'] or 0.0):.1f} / "
                    f"{float(engine_step_ms['p95'] or 0.0):.1f} ms"
                ),
            ]
            if mode == SimulatorMode.REAL_FIELD:
                parts.append(
                    f"App {float(total_app_step_ms['last'] or 0.0):.1f} / "
                    f"{float(total_app_step_ms['avg'] or 0.0):.1f} / "
                    f"{float(total_app_step_ms['p95'] or 0.0):.1f} ms"
                )
                parts.append(
                    f"Render {float(telemetry['render_ms']['avg'] or 0.0):.1f} ms"
                )
                parts.append(
                    f"Persist {float(telemetry['persistence_ms']['avg'] or 0.0):.1f} ms"
                )
                parts.append(
                    f"Export {float(telemetry['export_ms']['avg'] or 0.0):.1f} ms"
                )
            st.caption(" | ".join(parts))
        return
    # Not running: summarise the previous run, if any steps were recorded.
    if int(telemetry.get("step_count", 0)) > 0:
        achieved_text = "n/a" if achieved_hz is None else f"{float(achieved_hz):.2f} Hz"
        parts = [
            f"Last run {achieved_text} achieved",
            f"Engine {float(engine_step_ms['last'] or 0.0):.1f} / {float(engine_step_ms['avg'] or 0.0):.1f} ms",
        ]
        if mode == SimulatorMode.REAL_FIELD:
            parts.append(
                f"App {float(total_app_step_ms['last'] or 0.0):.1f} / {float(total_app_step_ms['avg'] or 0.0):.1f} ms"
            )
        st.caption(" | ".join(parts))
|
|
|
|
def build_runtime_performance_metrics(env: SimulatorEnvironment) -> dict[str, object]:
    """Return ``env.metrics()``, extended with runtime performance data in real-field mode.

    Outside real-field mode the metrics object is returned as is. In
    real-field mode a shallow copy gains a ``runtime_performance`` entry with
    the playback telemetry snapshot and the last step's engine performance.
    """
    base_metrics = env.metrics()
    if env.mode == SimulatorMode.REAL_FIELD:
        enriched = dict(base_metrics)
        enriched["runtime_performance"] = {
            "playback": playback_telemetry_snapshot(),
            "last_step_engine_phases_ms": env.last_step_performance(),
        }
        return enriched
    return base_metrics
|
|
|
|
def pause_playback(*, clear_last_tick: bool) -> None:
    """Mark playback as stopped and drop the scheduled next-step time.

    When ``clear_last_tick`` is true, the last processed tick id is cleared too.
    """
    session = st.session_state
    session[PLAYBACK_RUNNING_KEY] = False
    if clear_last_tick:
        session[PLAYBACK_LAST_TICK_KEY] = None
    session[PLAYBACK_NEXT_STEP_AT_KEY] = None
|
|
|
|
def start_playback() -> None:
    """Mark playback as running, clear any warning, reset telemetry and schedule the first step.

    The first step is scheduled one interval (for the stored speed) from now.
    """
    session = st.session_state
    chosen_speed = str(session.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))
    session[PLAYBACK_RUNNING_KEY] = True
    session[PLAYBACK_WARNING_KEY] = None
    reset_playback_telemetry(chosen_speed)
    now = time.monotonic()
    session[PLAYBACK_NEXT_STEP_AT_KEY] = now + playback_interval_ms(chosen_speed) / 1000.0
|
|
|
|
def execute_simulator_step(env: SimulatorEnvironment, strategy) -> tuple[bool, float, float, dict[str, float]]:
    """Run one ``env.step(strategy)`` and report whether the simulator advanced.

    Returns ``(advanced, started_at, duration_ms, step_performance)`` where
    ``advanced`` is True only if both the step index and the event history
    grew, ``started_at`` is the ``time.perf_counter()`` reading taken before
    the step, and ``step_performance`` is ``env.last_step_performance()``.
    """
    step_index_before = int(env.step_index)
    events_before = len(env.event_history)

    started_at = time.perf_counter()
    env.step(strategy)
    elapsed_ms = (time.perf_counter() - started_at) * 1000.0

    index_advanced = int(env.step_index) > step_index_before
    events_grew = len(env.event_history) > events_before
    advanced = index_advanced and events_grew
    return advanced, started_at, elapsed_ms, env.last_step_performance()
|
|
|
|
def maybe_run_fragment_auto_step(env: SimulatorEnvironment, strategy) -> bool:
    """Run one automatic step when auto playback is running and the step is due.

    Returns False when nothing was attempted (manual mode, not running,
    environment done, first call that only schedules the step, or not yet
    due). Returns True after a step attempt; if that attempt did not advance
    the simulator, playback is paused and a warning is stored.
    """
    session = st.session_state
    in_auto_mode = str(session.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)) == PLAYBACK_MODE_AUTO
    if not in_auto_mode:
        return False
    is_running = bool(session.get(PLAYBACK_RUNNING_KEY, False))
    if not is_running or env.done:
        return False

    speed = str(session.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))
    interval_seconds = playback_interval_ms(speed) / 1000.0
    due_at = session.get(PLAYBACK_NEXT_STEP_AT_KEY)
    now = time.monotonic()
    if due_at is None:
        # Nothing scheduled yet: schedule the first step instead of stepping now.
        session[PLAYBACK_NEXT_STEP_AT_KEY] = now + interval_seconds
        return False
    if now < float(due_at):
        return False

    advanced, started_at, engine_ms, perf = execute_simulator_step(env, strategy)
    if not advanced:
        pause_playback(clear_last_tick=True)
        session[PLAYBACK_WARNING_KEY] = (
            "Playback paused because the last auto-step did not advance the simulator state."
        )
        return True

    begin_playback_step_telemetry(
        speed=str(session.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)),
        engine_step_ms=engine_ms,
        step_started_at=started_at,
        step_perf=perf,
    )
    session[PLAYBACK_WARNING_KEY] = None
    session[PLAYBACK_NEXT_STEP_AT_KEY] = time.monotonic() + interval_seconds
    return True
|
|
|
|
def render_auto_tick_component(*, enabled: bool, interval_ms: int, session_token: str):
    """Render the ``auto_tick`` custom component and return whatever it returns.

    The component is always mounted under the fixed key ``simulator_auto_tick``.
    """
    component_args = dict(
        enabled=enabled,
        interval_ms=interval_ms,
        session_token=session_token,
        key="simulator_auto_tick",
    )
    return AUTO_TICK_COMPONENT(**component_args)
|
|
|
|
def fragment_runner(run_every_seconds: float):
    """Return a Streamlit fragment decorator that reruns every ``run_every_seconds``.

    Prefers ``st.fragment`` and falls back to ``st.experimental_fragment``;
    returns None when the installed Streamlit offers neither.
    """
    for api_name in ("fragment", "experimental_fragment"):
        if hasattr(st, api_name):
            return getattr(st, api_name)(run_every=run_every_seconds)
    return None
|
|
|
|
def field_extent(zone_states) -> tuple[int, int]:
    """Return ``(max_row, max_col)`` over ``zone_states``.

    A zone whose ``row`` or ``col`` is None (or otherwise falsy) counts as 0
    for that axis. Empty input yields ``(0, 0)`` instead of raising
    ``ValueError``, and the input is materialised once so one-shot iterables
    (e.g. generators) are handled correctly.
    """
    zones = list(zone_states)
    return (
        max((zone.row or 0 for zone in zones), default=0),
        max((zone.col or 0 for zone in zones), default=0),
    )
|
|
|
|
def is_rectangular_field(zone_states) -> bool:
    """Return True when the number of zones equals rows * cols from field_extent()."""
    extent = field_extent(zone_states)
    expected_zone_count = extent[0] * extent[1]
    return len(zone_states) == expected_zone_count
|
|
|
|
def field_layout_metric(env: SimulatorEnvironment) -> tuple[str, str]:
    """Return a ``(label, "rows x cols")`` pair describing the field layout.

    The label is "Field Extent" in real-field mode and "Field Size" otherwise.
    """
    rows, cols = field_extent(env.zone_states)
    label = "Field Extent" if env.mode == SimulatorMode.REAL_FIELD else "Field Size"
    return label, f"{rows} x {cols}"
|
|
|
|
def _prewarm_simulator_resources() -> None:
    """Best-effort warm-up so the simulator page loads faster.

    Calls ``get_strategies()`` and, when no environment is stored under
    ``ENV_KEY`` yet, builds a real-field environment into the session state.
    The warm-up is optional, so failures never propagate; they are logged
    (with traceback) instead of being silently discarded, so a broken
    warm-up can still be diagnosed.
    """
    import logging  # local import keeps this block self-contained

    logger = logging.getLogger(__name__)

    try:
        # Presumably fills a Streamlit resource cache — confirm in get_strategies().
        get_strategies()
    except Exception:
        logger.warning("Simulator prewarm: loading strategies failed", exc_info=True)

    try:
        if ENV_KEY not in st.session_state:
            catalog = DatasetCatalog(ground_weeds=())
            st.session_state[ENV_KEY] = build_real_field_environment(
                catalog=catalog,
            )
    except Exception:
        logger.warning("Simulator prewarm: building the real-field environment failed", exc_info=True)
|
|
|
|
def render_sidebar_heading(label: str, *, size_rem: float) -> None:
    """Render ``label`` as a bold heading ``<div>`` with a font size of ``size_rem`` rem.

    ``label`` is inserted into the HTML as is (rendered with
    ``unsafe_allow_html=True``), so callers must pass trusted text.
    """
    css = (
        f"font-size:{size_rem:.2f}rem;font-weight:700;"
        "line-height:1.2;color:#0f172a;margin:0 0 0.35rem 0;"
    )
    markup = f"<div style='{css}'>{label}</div>"
    st.markdown(markup, unsafe_allow_html=True)
|
|
|
|
| def main() -> None: |
| st.set_page_config(page_title="AutoFarm Simulator", layout="wide") |
| st.markdown( |
| "<style>MainMenu {visibility: hidden;} header {visibility: hidden;}" |
| " .block-container {padding-top: 1rem;}</style>", |
| unsafe_allow_html=True, |
| ) |
| st.title("AutoFarm Simulator") |
|
|
| st.session_state.setdefault(INTERVENTION_VIEW_KEY, True) |
| st.session_state.setdefault(INTERVENTION_PREDICTED_KEY, False) |
| st.session_state.setdefault(INTERVENTION_SIM_VIEW_KEY, False) |
| st.session_state.setdefault(INTERVENTION_SIM_ERROR_KEY, None) |
|
|
| if st.session_state[INTERVENTION_VIEW_KEY]: |
| render_intervention_view(on_continue=_prewarm_simulator_resources) |
| return |
|
|
| if st.session_state[INTERVENTION_SIM_VIEW_KEY]: |
| def _get_snapshot(): |
| env = st.session_state.get(ENV_KEY) |
| return env.snapshot() if env is not None else None |
| render_intervention_sim_view(get_snapshot=_get_snapshot) |
| return |
|
|
| ensure_playback_state_defaults() |
|
|
| catalog = DatasetCatalog(ground_weeds=()) |
| strategies = get_strategies() |
| riddle_map = available_riddles() |
| challenge_palette = interactive_challenge_templates() |
| requested_mode = os.getenv("AUTOFARM_SIM_MODE", SimulatorMode.REAL_FIELD.value) |
| default_strategy = os.getenv("AUTOFARM_SIM_STRATEGY", ACTIVE_DEBUG_STRATEGY) |
| default_scenario = os.getenv("AUTOFARM_SIM_SCENARIO", ACTIVE_DEBUG_SCENARIO) |
| mode_name = SimulatorMode.RIDDLE.value if requested_mode == SimulatorMode.RIDDLE.value else SimulatorMode.REAL_FIELD.value |
| show_parked_lanes = False |
| playback_toggle_clicked = False |
|
|
| with st.sidebar: |
| render_sidebar_heading("Simulator Controls", size_rem=1.32) |
| if mode_name == SimulatorMode.RIDDLE.value: |
| grid_size = 5 |
| else: |
| grid_size = 0 |
| scenario_name = None |
| strategy_name = ACTIVE_DEBUG_STRATEGY |
| if mode_name == SimulatorMode.RIDDLE.value: |
| visible_scenarios = visible_riddle_scenarios(list(riddle_map), show_parked=show_parked_lanes) |
| scenario_choice = preferred_option(visible_scenarios, default_scenario) |
| scenario_name = st.selectbox( |
| "Riddle", |
| visible_scenarios, |
| index=visible_scenarios.index(scenario_choice), |
| ) |
| visible_strategies = visible_strategy_names(list(strategies), show_parked=show_parked_lanes) |
| if len(visible_strategies) == 1: |
| strategy_name = visible_strategies[0] |
| else: |
| preferred_strategy = preferred_option(visible_strategies, default_strategy) |
| strategy_name = st.selectbox( |
| "Strategy", |
| visible_strategies, |
| index=visible_strategies.index(preferred_strategy), |
| ) |
| st.divider() |
| render_sidebar_heading("Playback", size_rem=0.98) |
| playback_mode = st.radio( |
| "Playback Mode", |
| [PLAYBACK_MODE_MANUAL, PLAYBACK_MODE_AUTO], |
| key=PLAYBACK_MODE_KEY, |
| horizontal=True, |
| ) |
| playback_speed = str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)) |
| if playback_mode == PLAYBACK_MODE_AUTO: |
| available_playback_speeds = playback_speed_options(enable_8hz=True) |
| playback_speed = st.radio( |
| "Auto Speed", |
| available_playback_speeds, |
| key=PLAYBACK_SPEED_KEY, |
| horizontal=True, |
| ) |
| maybe_reset_playback_telemetry_for_speed_change(playback_speed) |
| playback_running = bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)) |
| toggle_label = "Pause Playback" if playback_running else "Start Playback" |
| playback_toggle_clicked = st.button(toggle_label, width="stretch") |
| render_playback_telemetry( |
| speed=playback_speed, |
| running=playback_running, |
| mode=SimulatorMode(mode_name), |
| ) |
| else: |
| playback_speed = str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)) |
|
|
| if playback_mode == PLAYBACK_MODE_MANUAL and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)): |
| pause_playback(clear_last_tick=False) |
| if playback_toggle_clicked: |
| if bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)): |
| pause_playback(clear_last_tick=False) |
| else: |
| start_playback() |
|
|
| auto_running = ( |
| str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)) == PLAYBACK_MODE_AUTO |
| and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)) |
| ) |
| render_sidebar_heading("Visualization", size_rem=0.98) |
| show_planned_trajectory = st.toggle( |
| "Show planned trajectory", |
| value=bool(st.session_state.get(SHOW_PLANNED_TRAJECTORY_KEY, True)), |
| key=SHOW_PLANNED_TRAJECTORY_KEY, |
| ) |
| st.divider() |
| reset_label = "Reset Riddle" if mode_name == SimulatorMode.RIDDLE.value else "Reset World" |
| step_label = "Advance One Step" if mode_name == SimulatorMode.RIDDLE.value else "Advance One Step" |
| reset = st.button(reset_label, width="stretch") |
| if mode_name == SimulatorMode.RIDDLE.value: |
| step_col, run_col = st.columns(2) |
| with step_col: |
| step_once = st.button(step_label, width="stretch", disabled=auto_running) |
| with run_col: |
| run_to_outcome = st.button("Run To Outcome", width="stretch", disabled=auto_running) |
| else: |
| step_once = st.button(step_label, width="stretch", disabled=auto_running) |
| run_to_outcome = False |
|
|
| st.divider() |
| render_sidebar_heading("Analysis", size_rem=0.98) |
| investigate_clicked = st.button("Intervention Impact", type="primary", width="stretch") |
|
|
| if investigate_clicked: |
| st.session_state[INTERVENTION_SIM_VIEW_KEY] = True |
| st.session_state[INTERVENTION_SIM_ERROR_KEY] = None |
| |
| if INTERVENTION_SIM_MAP_PATH.exists(): |
| INTERVENTION_SIM_MAP_PATH.unlink() |
| st.rerun() |
|
|
| if reset: |
| pause_playback(clear_last_tick=True) |
| st.session_state[PLAYBACK_WARNING_KEY] = None |
| reset_simulator_run_state() |
|
|
| session_id = get_or_create_session_id() |
| env = get_or_reset_environment( |
| mode_name=mode_name, |
| scenario_name=scenario_name, |
| reset=reset, |
| catalog=catalog, |
| grid_size=grid_size, |
| ) |
| strategy = strategies[strategy_name] |
| persistence_duration_ms = 0.0 |
| export_duration_ms = 0.0 |
| render_duration_ms = 0.0 |
| auto_tick_payload = None |
| run_every_seconds = playback_interval_ms(str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))) / 1000.0 |
| playback_fragment = fragment_runner(run_every_seconds) |
| if playback_fragment is not None: |
| @playback_fragment |
| def auto_step_fragment() -> None: |
| st.empty() |
| if maybe_run_fragment_auto_step(env, strategy): |
| bump_env_generation() |
| st.rerun() |
|
|
| auto_step_fragment() |
| else: |
| auto_tick_payload = render_auto_tick_component( |
| enabled=( |
| str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)) == PLAYBACK_MODE_AUTO |
| and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)) |
| and not env.done |
| ), |
| interval_ms=playback_interval_ms(str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))), |
| session_token=( |
| f"{session_id}:" |
| f"{int(st.session_state.get(ENV_GENERATION_KEY, 0))}:" |
| f"{str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ))}" |
| ), |
| ) |
| step_executed = reset or playback_toggle_clicked |
|
|
| if not env.current_assessments: |
| env.current_assessments, env.current_plan = strategy.plan(env.planning_observation()) |
| env.invalidate_cached_views() |
|
|
| placement_message = pop_placement_flash_message() |
|
|
| if step_once and not env.done: |
| st.session_state[PLAYBACK_WARNING_KEY] = None |
| step_advanced, _, _, _ = execute_simulator_step(env, strategy) |
| if step_advanced: |
| bump_env_generation() |
| step_executed = True |
|
|
| if run_to_outcome and env.mode == SimulatorMode.RIDDLE and not env.done: |
| stepped = False |
| for _ in range(max(1, RIDDLE_SAFETY_MAX_STEPS + 2)): |
| step_advanced, _, _, _ = execute_simulator_step(env, strategy) |
| if not step_advanced: |
| break |
| stepped = True |
| if env.done: |
| break |
| if stepped: |
| bump_env_generation() |
| step_executed = True |
|
|
| if playback_fragment is None: |
| should_tick, tick_id = should_process_tick( |
| mode=str(st.session_state.get(PLAYBACK_MODE_KEY, PLAYBACK_MODE_MANUAL)), |
| running=bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)), |
| env_done=env.done, |
| tick_payload=auto_tick_payload, |
| last_processed_tick=st.session_state.get(PLAYBACK_LAST_TICK_KEY), |
| ) |
| if should_tick and not step_executed: |
| step_advanced, step_started_at, step_duration_ms, step_perf = execute_simulator_step(env, strategy) |
| if step_advanced: |
| begin_playback_step_telemetry( |
| speed=str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)), |
| engine_step_ms=step_duration_ms, |
| step_started_at=step_started_at, |
| step_perf=step_perf, |
| ) |
| st.session_state[PLAYBACK_LAST_TICK_KEY] = tick_id |
| st.session_state[PLAYBACK_WARNING_KEY] = None |
| bump_env_generation() |
| step_executed = True |
| else: |
| pause_playback(clear_last_tick=True) |
| st.session_state[PLAYBACK_WARNING_KEY] = ( |
| "Playback paused because the last auto-step did not advance the simulator state." |
| ) |
|
|
| if env.done and bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False)): |
| pause_playback(clear_last_tick=True) |
|
|
| snapshot = env.snapshot() |
| persistence_started_at = time.perf_counter() |
| persistence = ensure_session_persistence( |
| session_id=session_id, |
| scenario_name=scenario_name or env.name, |
| snapshot=snapshot, |
| metrics_factory=lambda: build_runtime_performance_metrics(env), |
| ) |
| persistence_duration_ms = (time.perf_counter() - persistence_started_at) * 1000.0 |
| export_started_at = time.perf_counter() |
| write_script_eval_exports(env, snapshot, session_id=session_id) |
| export_duration_ms = (time.perf_counter() - export_started_at) * 1000.0 |
| current_task = snapshot.mission_plan.ordered_tasks[0] if snapshot.mission_plan.ordered_tasks else None |
|
|
| |
| |
| placement_slot = st.empty() |
| if placement_message is not None: |
| if placement_message[0]: |
| placement_slot.success(placement_message[1]) |
| else: |
| placement_slot.warning(placement_message[1]) |
|
|
| playback_warning = st.session_state.get(PLAYBACK_WARNING_KEY) |
| playback_slot = st.empty() |
| if playback_warning: |
| playback_slot.warning(str(playback_warning)) |
|
|
| persistence_warning = st.session_state.get(PERSISTENCE_WARNING_KEY) |
| persistence_slot = st.empty() |
| if persistence_warning: |
| persistence_slot.warning(str(persistence_warning)) |
|
|
| if env.mode == SimulatorMode.RIDDLE and scenario_name is not None: |
| render_primary_objective(riddle_map[scenario_name]) |
| render_started_at = time.perf_counter() |
| render_outcome_banner(snapshot, env) |
| selected_event = snapshot.event_history[-1] if snapshot.event_history else None |
|
|
| if env.mode == SimulatorMode.RIDDLE: |
| render_riddle_layout( |
| env, |
| snapshot, |
| riddle_map[scenario_name].target_zone_id, |
| False, |
| riddle_map[scenario_name], |
| selected_event, |
| ) |
| else: |
| render_interactive_layout(env, snapshot, selected_event, strategy, challenge_palette) |
|
|
| st.subheader("Plan & Steps") |
| render_plan_and_timeline(snapshot) |
| render_duration_ms = (time.perf_counter() - render_started_at) * 1000.0 |
|
|
| pending_auto_step = st.session_state.get(PLAYBACK_PENDING_STEP_KEY) |
| if isinstance(pending_auto_step, dict): |
| total_app_step_ms = (time.perf_counter() - float(pending_auto_step.get("step_started_at", 0.0))) * 1000.0 |
| finalize_playback_step_telemetry( |
| speed=str(st.session_state.get(PLAYBACK_SPEED_KEY, PLAYBACK_SPEED_1HZ)), |
| completed_at=time.perf_counter(), |
| total_app_step_ms=total_app_step_ms, |
| render_ms=render_duration_ms, |
| persistence_ms=persistence_duration_ms, |
| export_ms=export_duration_ms, |
| ) |
|
|
|
|
@st.cache_resource(show_spinner=False)
def get_strategies():
    """Return whatever ``build_strategies()`` produces.

    The ``st.cache_resource`` decorator caches the result, with the spinner
    disabled via ``show_spinner=False``.
    """
    strategies = build_strategies()
    return strategies
|
|
|
|
def get_or_reset_environment(
    *,
    mode_name: str,
    scenario_name: str | None,
    reset: bool,
    catalog,
    grid_size: int,
) -> SimulatorEnvironment:
    """Return the session's simulator environment, rebuilding it when needed.

    A rebuild happens when ``reset`` is true, when nothing is stored under
    ``ENV_KEY``, or when the requested mode, scenario or grid size differs
    from the values recorded in ``st.session_state``.

    Args:
        mode_name: Requested mode, compared against ``SimulatorMode`` values.
        scenario_name: Scenario to build; when falsy, the first entry of
            ``available_riddles()`` is used for the default branch.
        reset: Force a rebuild regardless of the recorded selection.
        catalog: Forwarded unchanged to the environment builders.
        grid_size: Grid size for the interactive and default builders.

    Returns:
        The environment stored under ``ENV_KEY``.
    """
    state = st.session_state
    requested = (mode_name, scenario_name, grid_size)
    recorded = (
        state.get("mode_name"),
        state.get("scenario_name"),
        state.get("grid_size"),
    )
    if not reset and ENV_KEY in state and recorded == requested:
        return state[ENV_KEY]

    # Pause playback and flush the persistence coordinator before the
    # stored environment is replaced.
    pause_playback(clear_last_tick=True)
    state[PLAYBACK_WARNING_KEY] = None
    flush_persistence_coordinator()

    if mode_name == SimulatorMode.INTERACTIVE.value:
        environment = build_interactive_environment(
            catalog=catalog,
            grid_size=grid_size,
        )
    elif mode_name == SimulatorMode.REAL_FIELD.value:
        environment = build_real_field_environment(catalog=catalog)
    else:
        environment = build_environment(
            scenario_name or list(available_riddles())[0],
            catalog=catalog,
            grid_size=grid_size,
        )
    state[ENV_KEY] = environment

    bump_env_generation()
    bump_persistence_generation()

    # Record the selection that produced this environment and drop
    # per-environment UI state.
    for key, value in (
        ("mode_name", mode_name),
        ("scenario_name", scenario_name),
        ("grid_size", grid_size),
        (LAST_BOARD_EVENT_KEY, None),
    ):
        state[key] = value
    clear_pending_challenge()
    state.pop(PLACEMENT_FLASH_KEY, None)
    return state[ENV_KEY]
|
|
|
|
def get_or_create_session_id() -> str:
    """Return the session id stored in ``st.session_state``, creating one if absent."""
    state = st.session_state
    if SESSION_ID_KEY in state:
        return state[SESSION_ID_KEY]
    state[SESSION_ID_KEY] = create_session_id()
    return state[SESSION_ID_KEY]
|
|
|
|
def reset_simulator_run_state() -> None:
    """Reset the per-run entries kept in ``st.session_state``.

    Closes and removes any stored ``PersistenceCoordinator``, assigns a new
    session id, zeroes both generation counters, and clears the persisted
    result, selected zone, last board event, pending challenge, placement
    flash and persistence warning.
    """
    state = st.session_state

    coordinator = state.pop(PERSISTENCE_COORDINATOR_KEY, None)
    if isinstance(coordinator, PersistenceCoordinator):
        coordinator.close()

    state[SESSION_ID_KEY] = create_session_id()
    for counter_key in (ENV_GENERATION_KEY, PERSISTENCE_GENERATION_KEY):
        state[counter_key] = 0
    for stale_key in (SESSION_PERSISTENCE_KEY, SELECTED_ZONE_KEY, LAST_BOARD_EVENT_KEY):
        state.pop(stale_key, None)

    clear_pending_challenge()
    state.pop(PLACEMENT_FLASH_KEY, None)
    state[PERSISTENCE_WARNING_KEY] = None
|
|
|
|
def get_or_create_persistence_coordinator() -> PersistenceCoordinator:
    """Return the session's ``PersistenceCoordinator``, creating and storing one if needed.

    A stored value that is not a ``PersistenceCoordinator`` instance is
    replaced by a fresh coordinator.
    """
    existing = st.session_state.get(PERSISTENCE_COORDINATOR_KEY)
    if not isinstance(existing, PersistenceCoordinator):
        existing = PersistenceCoordinator()
        st.session_state[PERSISTENCE_COORDINATOR_KEY] = existing
    return existing
|
|
|
|
def flush_persistence_coordinator() -> int:
    """Flush the stored persistence coordinator and surface any failure.

    Returns:
        The value returned by ``coordinator.flush()``, or ``0`` when no
        ``PersistenceCoordinator`` is stored in the session.
    """
    coordinator = st.session_state.get(PERSISTENCE_COORDINATOR_KEY)
    if isinstance(coordinator, PersistenceCoordinator):
        flushed_event_count = coordinator.flush()
        failure = coordinator.failure_message()
        if failure is not None:
            # Shown to the user later via the persistence warning slot.
            st.session_state[PERSISTENCE_WARNING_KEY] = (
                "Background snapshot persistence was disabled after a write failure. "
                f"Falling back to synchronous snapshot writes. Details: {failure}"
            )
        return flushed_event_count
    return 0
|
|
|
|
def ensure_session_persistence(
    *,
    session_id: str,
    scenario_name: str,
    snapshot,
    metrics_factory: Callable[[], dict[str, object]] | None = None,
) -> SessionPersistenceResult:
    """Persist the session for ``snapshot`` unless it is already up to date.

    The previously stored result is returned unchanged when its generation
    and event count both match the current ones. Otherwise the session is
    persisted through ``persist_interactive_session`` and the new result is
    stored under ``SESSION_PERSISTENCE_KEY``.

    Args:
        session_id: Forwarded to the persistence calls.
        scenario_name: Forwarded to the persistence calls.
        snapshot: Current snapshot; its ``event_history``, ``done`` flag and
            ``observation.mode`` drive the decisions below.
        metrics_factory: Optional callable, invoked only when a snapshot
            write is due.

    Returns:
        The stored or newly produced ``SessionPersistenceResult``.
    """
    state = st.session_state
    previous = state.get(SESSION_PERSISTENCE_KEY)
    generation = int(state.get(PERSISTENCE_GENERATION_KEY, 0))
    event_count = len(snapshot.event_history)

    # Only REAL_FIELD snapshots go through the persistence coordinator.
    coordinator = None
    if snapshot.observation.mode == SimulatorMode.REAL_FIELD:
        coordinator = get_or_create_persistence_coordinator()

    recorded_snapshot_events = previous.snapshot_event_count if previous is not None else 0
    coordinator_snapshot_events = (
        coordinator.latest_snapshot_event_count() if coordinator is not None else 0
    )
    latest_snapshot_event_count = max(recorded_snapshot_events, coordinator_snapshot_events)

    if previous is not None:
        if latest_snapshot_event_count != previous.snapshot_event_count:
            # Sync the stored result with the coordinator's newer count.
            previous = replace(previous, snapshot_event_count=latest_snapshot_event_count)
            state[SESSION_PERSISTENCE_KEY] = previous
        if previous.generation == generation and previous.event_count == event_count:
            return previous

    should_write_snapshot = (
        previous is None
        or previous.generation != generation
        or snapshot.done
        or event_count <= 2
        or (event_count - latest_snapshot_event_count) >= SNAPSHOT_WRITE_INTERVAL
    )
    write_snapshot_sync = should_write_snapshot and (
        coordinator is None
        or snapshot.done
        or coordinator.failure_message() is not None
    )

    if write_snapshot_sync and coordinator is not None:
        # Flush queued work before writing synchronously.
        latest_snapshot_event_count = max(
            latest_snapshot_event_count,
            flush_persistence_coordinator(),
        )

    metrics = None
    if should_write_snapshot and metrics_factory is not None:
        metrics = metrics_factory()

    if previous is None:
        last_logged_generation, last_logged_event_count = -1, 0
    else:
        last_logged_generation = previous.generation
        last_logged_event_count = previous.event_count

    result = persist_interactive_session(
        session_id=session_id,
        generation=generation,
        scenario_name=scenario_name,
        snapshot=snapshot,
        metrics=metrics,
        last_logged_generation=last_logged_generation,
        last_logged_event_count=last_logged_event_count,
        write_snapshot=write_snapshot_sync,
        last_snapshot_event_count=latest_snapshot_event_count,
    )

    wants_background_write = (
        should_write_snapshot and not write_snapshot_sync and coordinator is not None
    )
    if wants_background_write:
        queued = coordinator.enqueue_snapshot(
            session_id=session_id,
            generation=generation,
            scenario_name=scenario_name,
            snapshot=snapshot,
            metrics=metrics or {},
            last_snapshot_event_count=latest_snapshot_event_count,
        )
        if not queued:
            # The coordinator refused the job: warn and write the snapshot
            # synchronously, continuing from the result just produced.
            state[PERSISTENCE_WARNING_KEY] = (
                "Background snapshot persistence was unavailable for this session. "
                "Falling back to synchronous snapshot writes."
            )
            result = persist_interactive_session(
                session_id=session_id,
                generation=generation,
                scenario_name=scenario_name,
                snapshot=snapshot,
                metrics=metrics,
                last_logged_generation=result.generation,
                last_logged_event_count=result.event_count,
                write_snapshot=True,
                last_snapshot_event_count=result.snapshot_event_count,
            )

    state[SESSION_PERSISTENCE_KEY] = result
    return result
|
|
|
|
def render_session_state_panel(persistence: SessionPersistenceResult) -> None:
    """Render the "Session State" panel for a persistence result.

    Shows the session id, generation and persisted-at value, followed by a
    text block listing the snapshot, events and pointer paths.
    """
    st.divider()
    st.subheader("Session State")
    st.caption("Interactive session persistence for external inspection.")

    summary = {
        "session_id": persistence.session_id,
        "generation": persistence.generation,
        "persisted_at": persistence.persisted_at,
    }
    st.write(summary)

    path_lines = [
        f"snapshot: {persistence.snapshot_path}",
        f"events: {persistence.events_path}",
        f"pointer: {persistence.pointer_path}",
    ]
    st.code("\n".join(path_lines), language="text")
|
|
|
|
def export_timestamp() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
|
|
|
|
def challenge_records_for_step(snapshot, *, zone_id: str, step_index: int) -> list[dict[str, object]]:
    """List the challenges on ``zone_id`` that were in effect at ``step_index``.

    A challenge qualifies when it is on the zone, was placed at or before
    the step, and was either never resolved or resolved at or after it.
    Active challenges are listed before resolved ones.

    Returns:
        One plain dict per qualifying challenge.
    """
    exported_fields = (
        "instance_id",
        "template_id",
        "name",
        "description",
        "resolution_status",
        "resolution_message",
        "placed_step_index",
        "resolved_step_index",
    )
    matching: list[dict[str, object]] = []
    for item in (*snapshot.active_challenges, *snapshot.resolved_challenges):
        if item.zone_id != zone_id or item.placed_step_index > step_index:
            continue
        resolved_at = item.resolved_step_index
        if resolved_at is not None and step_index > resolved_at:
            continue
        matching.append({field: getattr(item, field) for field in exported_fields})
    return matching
|
|
|
|
def challenge_state_by_zone(snapshot) -> dict[str, dict[str, list[dict[str, object]]]]:
    """Group the snapshot's challenges by zone id and status.

    Returns:
        A mapping from zone id to ``{"active": [...], "resolved": [...]}``,
        each list holding one plain dict per challenge. Zones with no
        challenges do not appear.
    """
    exported_fields = (
        "instance_id",
        "template_id",
        "name",
        "description",
        "resolution_status",
        "resolution_message",
        "placed_step_index",
        "resolved_step_index",
    )
    sources = {
        "active": snapshot.active_challenges,
        "resolved": snapshot.resolved_challenges,
    }
    grouped: dict[str, dict[str, list[dict[str, object]]]] = {}
    for status, items in sources.items():
        for item in items:
            if item.zone_id not in grouped:
                grouped[item.zone_id] = {"active": [], "resolved": []}
            record = {field: getattr(item, field) for field in exported_fields}
            grouped[item.zone_id][status].append(record)
    return grouped
|
|
|
|
def build_field_tile_state_export(env: SimulatorEnvironment, snapshot, *, session_id: str) -> dict[str, object]:
    """Build the per-tile state export payload for the current snapshot.

    Each zone in ``snapshot.observation.zone_states`` becomes one tile
    record combining its baseline state, current state, evidence counters,
    challenges and latest event.

    Returns:
        A dict with session metadata and the tile records under ``"tiles"``.
    """
    observation = snapshot.observation
    challenges_by_zone = challenge_state_by_zone(snapshot)

    def _tile_record(zone) -> dict[str, object]:
        """Assemble the export record for a single zone."""
        zone_id = zone.zone_id
        baseline = env.baseline_zone_states.get(zone_id)
        evidence = observation.zone_evidence_state_by_zone.get(zone_id)
        last_event = env.latest_event_for_zone(zone_id)
        buckets = challenges_by_zone.get(zone_id, {"active": [], "resolved": []})

        if evidence is None:
            control_state, inspections, revisits = None, 0, 0
        else:
            control_state = evidence.control_state
            inspections = evidence.guarded_context.inspection_count
            revisits = evidence.guarded_context.revisit_count

        return {
            "zone_id": zone_id,
            "row": zone.row,
            "col": zone.col,
            "initial_state": None if baseline is None else baseline.model_dump(mode="json"),
            "current_state": zone.model_dump(mode="json"),
            "control_state": control_state,
            "inspection_count": inspections,
            "revisit_count": revisits,
            "active_challenges": buckets["active"],
            "resolved_challenges": buckets["resolved"],
            "active_challenge_summaries": [str(entry["name"]) for entry in buckets["active"]],
            "latest_event": None if last_event is None else last_event.model_dump(mode="json"),
        }

    tiles = [_tile_record(zone) for zone in observation.zone_states]
    return {
        "session_id": session_id,
        "exported_at": export_timestamp(),
        "mode": snapshot.observation.mode,
        "step_index": snapshot.observation.step_index,
        "event_count": len(snapshot.event_history),
        "tiles": tiles,
    }
|
|
|
|
def build_tile_action_log_export(snapshot, *, session_id: str) -> dict[str, object]:
    """Build the export payload listing field actions taken on tiles.

    Only events that have a zone id and whose ``str(event.action)`` matches
    one of the field action values are included. Each entry also carries
    the challenges in effect on that zone at the event's step.

    Returns:
        A dict with session metadata and the entries under ``"tile_actions"``.
    """
    field_action_names = {
        member.value
        for member in (
            ActionType.INSPECT,
            ActionType.REVISIT,
            ActionType.DEFER,
            ActionType.REMOVE_WEEDS,
            ActionType.MEASURE_SOIL,
            ActionType.APPLY_FERTILIZER,
            ActionType.APPLY_WATER,
            ActionType.INSTALL_DRAINAGE,
            ActionType.SUBSOIL,
        )
    }

    tile_actions: list[dict[str, object]] = []
    for event in snapshot.event_history:
        action_name = str(event.action)
        if event.zone_id is None or action_name not in field_action_names:
            continue

        challenges_at_step = challenge_records_for_step(
            snapshot,
            zone_id=event.zone_id,
            step_index=event.step_index,
        )

        # Either planner task may be missing; export None in that case.
        context = event.planner_context
        normalized_task = context.normalized_task if context is not None else None
        requested_task = (
            context.chosen_task_before_normalization if context is not None else None
        )

        tile_actions.append(
            {
                "step_index": event.step_index,
                "zone_id": event.zone_id,
                "action": action_name,
                "execution_outcome": str(event.execution_outcome),
                "reward": event.reward,
                "from_zone_id": event.details.get("from_zone_id"),
                "to_zone_id": event.details.get("to_zone_id"),
                "revealed_facts": list(event.details.get("revealed_facts") or []),
                "challenge_names": [entry["name"] for entry in challenges_at_step],
                "challenges": challenges_at_step,
                "planned_task": (
                    None if normalized_task is None else normalized_task.model_dump(mode="json")
                ),
                "requested_task": (
                    None if requested_task is None else requested_task.model_dump(mode="json")
                ),
            }
        )

    return {
        "session_id": session_id,
        "exported_at": export_timestamp(),
        "mode": snapshot.observation.mode,
        "step_index": snapshot.observation.step_index,
        "event_count": len(snapshot.event_history),
        "tile_actions": tile_actions,
    }
|
|
|
|
def write_script_eval_exports(env: SimulatorEnvironment, snapshot, *, session_id: str) -> None:
    """Write the JSON exports and simulation input CSVs for the current snapshot.

    Creates ``SCRIPT_EVAL_EXPORT_ROOT`` if needed, writes the tile-state and
    tile-action-log payloads as sorted, indented UTF-8 JSON, then calls
    ``write_sim_input_csvs``.
    """
    SCRIPT_EVAL_EXPORT_ROOT.mkdir(parents=True, exist_ok=True)

    # Builders are deferred so each payload is built immediately before its
    # own file is written.
    exports = (
        (
            FIELD_TILE_STATE_EXPORT_PATH,
            lambda: build_field_tile_state_export(env, snapshot, session_id=session_id),
        ),
        (
            TILE_ACTION_LOG_EXPORT_PATH,
            lambda: build_tile_action_log_export(snapshot, session_id=session_id),
        ),
    )
    for export_path, build_payload in exports:
        serialized = json.dumps(build_payload(), indent=2, sort_keys=True)
        export_path.write_text(serialized, encoding="utf-8")

    write_sim_input_csvs(snapshot)
|
|
|
|
def render_outcome_banner(snapshot, env: SimulatorEnvironment) -> None:
    """Render one status banner for the episode outcome.

    Success, failure, error and stopped outcomes each get their own banner.
    Any other status shows an in-progress message chosen by ``env.mode``.
    """
    outcome = snapshot.outcome
    status = outcome.status

    if status == EpisodeOutcomeStatus.SUCCESS:
        st.success(outcome.message)
        return
    if status == EpisodeOutcomeStatus.FAILURE:
        st.error(f"{outcome.message} ({outcome.done_reason})")
        return
    if status == EpisodeOutcomeStatus.ERROR:
        st.warning(f"{outcome.message} ({outcome.done_reason})")
        return
    if status == EpisodeOutcomeStatus.STOPPED:
        st.warning(outcome.message)
        return

    # No terminal outcome yet.
    if env.mode == SimulatorMode.RIDDLE:
        st.info("Riddle in progress.")
        return
    st.info(
        "Persistent world is running. The robot moves one neighboring field at a time, "
        "then performs field checks, soil sampling, or treatments on the field where it stands."
    )
|
|
def render_primary_objective(riddle) -> None:
    """Render the riddle's objective heading and three info notes side by side."""
    st.markdown(f"## Objective: {riddle.objective}")

    notes = (
        ("Riddle Note", "description"),
        ("Challenge", "challenge_summary"),
        ("Why It Matters", "why_it_matters"),
    )
    for column, (title, attribute) in zip(st.columns(3), notes):
        with column:
            st.info(f"**{title}**\n\n{getattr(riddle, attribute)}")
|
|
|
|
def battery_percentage_int(energy_remaining: float | None, max_energy: float | None) -> int | None:
    """Convert remaining energy into a whole-number battery percentage.

    Args:
        energy_remaining: Energy currently left, or ``None`` when unknown.
        max_energy: Energy at full charge, or ``None``/``0`` when unknown.

    Returns:
        ``None`` when either input is unavailable; otherwise an integer in
        ``0..100``. Fractions are rounded up, so any non-zero charge reports
        at least 1%.
    """
    if energy_remaining is None or max_energy is None or max_energy == 0:
        return None
    ratio = max(0.0, min(1.0, float(energy_remaining) / float(max_energy)))
    if ratio <= 0.0:
        return 0
    # Trim binary floating-point noise before rounding up: 0.07 * 100.0 is
    # 7.000000000000001, which a bare ceil() would report as 8%.
    percent = round(ratio * 100.0, 9)
    # The lower bound of 1 keeps vanishingly small charges from reading 0%.
    return max(1, min(100, math.ceil(percent)))
|
|
|
|
def render_battery_indicator(energy_remaining: float | None, max_energy: float | None) -> None:
    """Render a five-segment battery icon with the percentage next to it.

    Writes "Battery unavailable" when either value is missing or
    ``max_energy`` is zero. The colour is green at 60% and above, amber at
    30% and above, and red below that.
    """
    if energy_remaining is None or max_energy in {None, 0}:
        st.write("Battery unavailable")
        return

    percentage = battery_percentage_int(energy_remaining, max_energy)
    assert percentage is not None

    # round() rounds halves to the nearest even integer, so 10% lights no
    # segment and both 30% and 50% light two.
    filled = round(percentage / 20)

    fill_color, border_color = "#dc2626", "#b91c1c"
    for threshold, fill, border in ((60, "#16a34a", "#15803d"), (30, "#d97706", "#b45309")):
        if percentage >= threshold:
            fill_color, border_color = fill, border
            break

    segment_colors = [fill_color if index < filled else "#e2e8f0" for index in range(5)]
    segs = "".join(
        f'<div style="flex:1;height:100%;background:{color};border-radius:2px;"></div>'
        for color in segment_colors
    )
    # NOTE(review): the HTML lines are kept flush-left because the original
    # indentation inside this string is not recoverable — confirm.
    st.markdown(
        f"""
<div style="display:flex;align-items:center;gap:10px;justify-content:center;padding:4px 0;">
<div style="display:flex;align-items:center;">
<div style="width:68px;height:26px;border:2px solid {border_color};border-radius:5px;padding:3px;display:flex;gap:2px;background:#f8fafc;">
{segs}
</div>
<div style="width:5px;height:13px;background:{border_color};border-radius:0 3px 3px 0;margin-left:-1px;flex-shrink:0;"></div>
</div>
<span style="font-weight:700;font-size:14px;color:#1e293b;">{percentage}%</span>
</div>
""",
        unsafe_allow_html=True,
    )
|
|
|
|
def render_riddle_layout(
    env: SimulatorEnvironment,
    snapshot,
    target_zone_id: str | None,
    show_truth: bool,
    riddle,
    selected_event,
) -> None:
    """Render the riddle-mode page: field grid, status column and metrics.

    Args:
        env: Simulator environment being displayed.
        snapshot: Current snapshot; supplies the mission plan and observation.
        target_zone_id: Zone passed to the grid figure and used as the
            selection fallback.
        show_truth: Forwarded to the grid figure builder.
        riddle: Accepted for interface compatibility; not read here.
        selected_event: Event shown in the step summary.
    """
    # NOTE(review): block nesting was reconstructed from flattened source —
    # confirm which calls belong inside each column.
    observation = snapshot.observation
    grid_column, status_column = st.columns([3.8, 1.3])

    with grid_column:
        st.subheader("Field Grid")
        grid_figure = get_cached_zone_grid_figure(
            env,
            target_zone_id=target_zone_id,
            show_truth=show_truth,
            mission_plan=snapshot.mission_plan,
            show_planned_trajectory=show_planned_trajectory_enabled(),
        )
        selection_state = st.plotly_chart(
            grid_figure,
            width="stretch",
            on_select="rerun",
            selection_mode="points",
            key="riddle-grid",
        )
        update_selected_zone_from_plot_state(selection_state, env, fallback_zone_id=target_zone_id)
        render_selected_step_summary(selected_event)

    with status_column:
        step_metric_column, done_metric_column = st.columns(2)
        with step_metric_column:
            st.metric("Step", observation.step_index)
        with done_metric_column:
            done_label = "Yes" if snapshot.done else "No"
            st.metric("Done", done_label)
        if observation.max_energy is not None:
            st.write("**Battery**")
            render_battery_indicator(observation.energy_remaining, observation.max_energy)
        render_grid_legend()
        focused_zone_id = selected_zone_id(env, target_zone_id=target_zone_id)
        render_selected_zone_panel(env, snapshot, focused_zone_id)

    with st.container():
        render_metrics_overview(env)
|
|
|
|
def _render_challenge_status(snapshot) -> None:
    """Show up to three challenges, ordered by highest ``placed_step_index`` first."""
    every_challenge = [*snapshot.active_challenges, *snapshot.resolved_challenges]
    newest_first = sorted(
        every_challenge,
        key=lambda challenge: challenge.placed_step_index,
        reverse=True,
    )
    latest_three = newest_first[:3]

    st.markdown("**Recent Challenges**")
    if latest_three:
        for challenge in latest_three:
            zone_label = short_zone_label(challenge.zone_id)
            st.caption(f"· **{challenge.name}** @ {zone_label}")
    else:
        st.caption("No challenges placed yet.")
|
|
|
|
def render_challenge_placement_controls(challenge_palette) -> None:
    """Render the challenge picker, radius picker and Apply button.

    Pressing Apply arms a pending challenge with the chosen template and
    radius. Below the controls, either the pending request or a usage hint
    is shown.

    Args:
        challenge_palette: Mapping-like object from template id to an entry
            with a ``display_name`` attribute.
    """

    def _template_label(template_key) -> str:
        return challenge_palette[template_key].display_name

    def _radius_label(tiles) -> str:
        return f"{tiles} tile" if tiles == 1 else f"{tiles} tiles"

    # NOTE(review): nesting was reconstructed from flattened source — confirm
    # the pending-request message belongs inside the bordered container.
    with st.container(border=True):
        st.caption("Challenge Placement")
        template_column, radius_column, apply_column = st.columns([2.5, 1.1, 1.1])

        with template_column:
            template_id = st.selectbox(
                "Challenge",
                list(challenge_palette),
                format_func=_template_label,
                key="interactive_manual_template",
            )
        with radius_column:
            radius = st.selectbox(
                "Radius",
                [1, 2, 3],
                format_func=_radius_label,
                key="interactive_manual_radius",
            )
        with apply_column:
            st.write("")
            apply_clicked = st.button("Apply", key="interactive_manual_apply", width="stretch")
            if apply_clicked:
                arm_pending_challenge(template_id, int(radius))
                # Only call st.toast when the installed Streamlit provides it.
                if hasattr(st, "toast"):
                    st.toast("Apply by selecting the tile.")

        pending_request = pending_challenge_request()
        if pending_request is None:
            st.caption("Choose a challenge, press Apply, then click the target tile on the field.")
        else:
            pending_radius = pending_challenge_radius()
            pending_name = challenge_palette[pending_request].display_name
            st.info(
                "Apply by selecting the tile."
                f" Pending: {pending_name}, radius {pending_radius}."
            )
|
|
|
|
def maybe_apply_pending_challenge_selection(env: SimulatorEnvironment, zone_id: str | None, strategy) -> bool:
    """Place the pending challenge on ``zone_id``, if one is armed.

    On a successful placement, running playback is paused, the strategy
    replans, cached views are invalidated and the env generation is bumped.
    The outcome message is stored as a placement flash and a rerun is
    requested in both the success and failure cases.

    Returns:
        ``False`` when nothing is pending or ``zone_id`` is ``None``;
        ``True`` after a placement attempt.
    """
    template_id = pending_challenge_request()
    if template_id is None or zone_id is None:
        return False

    radius = pending_challenge_radius()
    clear_pending_challenge()
    st.session_state[SELECTED_ZONE_KEY] = zone_id

    try:
        placed, feedback = env.place_challenge(template_id, zone_id, radius=radius)
    except NotImplementedError as error:
        placed, feedback = False, str(error)

    if placed:
        playback_running = bool(st.session_state.get(PLAYBACK_RUNNING_KEY, False))
        if playback_running:
            pause_playback(clear_last_tick=True)
        # Replan against the world that now contains the new challenge.
        assessments, plan = strategy.plan(env.planning_observation())
        env.current_assessments, env.current_plan = assessments, plan
        env.invalidate_cached_views()
        bump_env_generation()

    set_placement_flash_message(placed, feedback)
    st.rerun()
    return True
|
|
|
|
def _render_world_map_fragment(
    env: SimulatorEnvironment,
    snapshot,
    selected_event,
    strategy,
) -> None:
    """Render the world map chart inside a stable fragment block.

    Wrapping this in @st.fragment gives the chart a stable, identifier-based
    parent block so it is never unmounted when unrelated elements above it
    shift the main-page element tree between reruns.
    """
    # NOTE(review): block nesting was reconstructed from flattened source —
    # confirm which calls belong inside the map column.
    chart_column, legend_column = st.columns([3.2, 1.2])

    with chart_column:
        st.subheader("World Map")
        world_figure = get_cached_zone_grid_figure(
            env,
            target_zone_id=None,
            show_truth=False,
            mission_plan=snapshot.mission_plan,
            show_planned_trajectory=show_planned_trajectory_enabled(),
        )
        selection_state = st.plotly_chart(
            world_figure,
            width="stretch",
            on_select="rerun",
            selection_mode="points",
            key="interactive-grid",
        )

        clicked_zone_id = selected_zone_from_plot_state(selection_state)
        if clicked_zone_id is not None:
            st.session_state[SELECTED_ZONE_KEY] = clicked_zone_id
            maybe_apply_pending_challenge_selection(env, clicked_zone_id, strategy)

        # Fall back to the robot's zone when it is a field zone, otherwise
        # to the zone next to home.
        field_zone_ids = {zone.zone_id for zone in env.zone_states}
        if env.current_zone_id in field_zone_ids:
            fallback_zone_id = env.current_zone_id
        else:
            fallback_zone_id = env.home_neighbor_zone_id
        update_selected_zone_from_plot_state(
            selection_state,
            env,
            fallback_zone_id=fallback_zone_id,
        )
        render_selected_step_summary(selected_event)

    with legend_column:
        render_grid_legend()


# Wrap with st.fragment when this Streamlit build provides it; otherwise the
# identity fallback leaves the function unchanged.
_render_world_map_fragment = getattr(st, "fragment", lambda func: func)(_render_world_map_fragment)
|
|
|
|
def render_interactive_layout(env: SimulatorEnvironment, snapshot, selected_event, strategy, challenge_palette) -> None:
    """Render the interactive page.

    In order: a status strip (step, battery, recent challenges), the
    challenge placement controls, the world map fragment, the selected-zone
    panel and the metrics overview.
    """
    # NOTE(review): block nesting was reconstructed from flattened source —
    # confirm which calls belong inside the bordered status strip.
    observation = snapshot.observation

    with st.container(border=True):
        step_column, battery_column, challenges_column = st.columns([0.8, 1.3, 3.4])
        with step_column:
            st.metric("Step", observation.step_index)
            if env.mode != SimulatorMode.REAL_FIELD:
                layout_label, layout_value = field_layout_metric(env)
                st.metric(layout_label, layout_value)
        with battery_column:
            st.caption("Battery")
            render_battery_indicator(observation.energy_remaining, observation.max_energy)
        with challenges_column:
            _render_challenge_status(snapshot)

    render_challenge_placement_controls(challenge_palette)

    _render_world_map_fragment(env, snapshot, selected_event, strategy)

    # Default the zone panel to the robot's zone when it is a field zone,
    # otherwise to the zone next to home.
    field_zone_ids = {zone.zone_id for zone in env.zone_states}
    if env.current_zone_id in field_zone_ids:
        default_zone_id = env.current_zone_id
    else:
        default_zone_id = env.home_neighbor_zone_id
    focused_zone_id = selected_zone_id(env, target_zone_id=default_zone_id)
    render_selected_zone_panel(env, snapshot, focused_zone_id)

    with st.container():
        render_metrics_overview(env)
|
|
|
|
def mission_plan_signature(plan: MissionPlan | None) -> tuple[tuple[str | None, str | None, str | None], ...]:
    """Summarise a mission plan as a hashable tuple of its targeted tasks.

    Tasks without a target zone are skipped. Each remaining task becomes
    ``(str(task_type), target_zone, str(control_intent) or None)``.

    Returns:
        An empty tuple when ``plan`` is ``None``; otherwise one entry per
        targeted task, in plan order.
    """
    if plan is None:
        return ()
    return tuple(
        (
            str(task.task_type),
            task.target_zone,
            None if task.control_intent is None else str(task.control_intent),
        )
        for task in plan.ordered_tasks
        if task.target_zone is not None
    )
|
|
|
|
def get_cached_zone_grid_figure(
    env: SimulatorEnvironment,
    *,
    target_zone_id: str | None,
    show_truth: bool,
    mission_plan: MissionPlan | None,
    show_planned_trajectory: bool,
) -> go.Figure:
    """Return the zone grid figure for these inputs, using a session cache.

    The cache key combines ``id(env)``, the env generation counter, the
    display options and the mission plan signature. After an insert that
    leaves more than 8 entries, all but the 7 most recently inserted other
    entries are dropped.
    """
    figure_cache = st.session_state.setdefault(FIGURE_CACHE_KEY, {})
    env_generation = int(st.session_state.get(ENV_GENERATION_KEY, 0))
    cache_key = (
        id(env),
        env_generation,
        target_zone_id,
        show_truth,
        show_planned_trajectory,
        mission_plan_signature(mission_plan),
    )

    cached_figure = figure_cache.get(cache_key)
    if cached_figure is not None:
        return cached_figure

    fresh_figure = build_zone_grid_figure(
        env,
        target_zone_id=target_zone_id,
        show_truth=show_truth,
        mission_plan=mission_plan,
        show_planned_trajectory=show_planned_trajectory,
    )
    figure_cache[cache_key] = fresh_figure

    if len(figure_cache) > 8:
        # Dicts iterate in insertion order, so the leading keys are oldest.
        other_keys = [key for key in figure_cache if key != cache_key]
        for evicted_key in other_keys[:-7]:
            figure_cache.pop(evicted_key, None)
    return fresh_figure
|
|
|
|
def get_real_field_static_map_payload(env: SimulatorEnvironment) -> RealFieldStaticMapPayload:
    """Return the static map payload for ``env``, building it on a cache miss.

    The payload bundles a base figure (home-base link line, icon, label and
    axis layout) with zone coordinates and marker sizes derived from the
    grid size. Payloads are cached in ``st.session_state`` keyed by
    ``id(env)``; after an insert that leaves more than 4 entries, all but
    the 3 most recently inserted other entries are dropped.
    """
    payload_cache = st.session_state.setdefault(STATIC_MAP_CACHE_KEY, {})
    cache_key = id(env)
    cached_payload = payload_cache.get(cache_key)
    if cached_payload is not None:
        return cached_payload

    # Marker and text sizes shrink as the grid grows beyond 3.
    grid_size = max(field_extent(env.zone_states))
    cells_beyond_three = max(0, grid_size - 3)
    cell_marker_size = max(28, 104 - ((grid_size - 3) * 7))
    cell_text_size = max(8, 15 - cells_beyond_three)
    marker_ring_size = cell_marker_size + max(12, 30 - cells_beyond_three * 2)
    step_marker_size = max(16, 28 - cells_beyond_three)

    if env.home_neighbor_zone_id is not None:
        home_base_zone = env._zone_state(env.home_neighbor_zone_id)
    else:
        home_base_zone = None
    if home_base_zone is not None:
        home_base_row = home_base_zone.row
    else:
        home_base_row = max(1, (grid_size + 1) // 2)

    # Rows are negated for plotting; missing row/col values count as 0.
    zone_ids = tuple(zone.zone_id for zone in env.zone_states)
    xs = tuple(zone.col or 0 for zone in env.zone_states)
    ys = tuple(-(zone.row or 0) for zone in env.zone_states)
    x_values = xs
    y_values = ys

    # The home base sits two columns left of the leftmost zone.
    home_base_x = min(x_values) - 2.0
    home_base_y = -home_base_row
    if home_base_zone is not None:
        entry_x = float(home_base_zone.col)
        entry_y = float(-(home_base_zone.row or home_base_row))
    else:
        entry_x = float(min(x_values))
        entry_y = float(-home_base_row)

    figure = go.Figure()
    link_trace = go.Scatter(
        x=[home_base_x, entry_x],
        y=[home_base_y, entry_y],
        mode="lines",
        line={"color": "#64748b", "width": 2, "dash": "dot"},
        hoverinfo="skip",
        showlegend=False,
    )
    figure.add_trace(link_trace)
    figure.add_layout_image(
        {
            "source": base_icon_data_uri(),
            "x": home_base_x,
            "y": home_base_y,
            "sizex": 1.95,
            "sizey": 1.95,
            "xref": "x",
            "yref": "y",
            "xanchor": "center",
            "yanchor": "middle",
            "layer": "above",
        }
    )
    figure.add_annotation(
        x=home_base_x,
        y=home_base_y - 1.25,
        text="Robot Home Base",
        showarrow=False,
        font={"size": max(9, cell_text_size), "color": "#334155"},
    )

    x_axis = {
        "title": "Column",
        "dtick": 1,
        "range": [home_base_x - 1.2, max(x_values) + 0.6],
        "showgrid": False,
        "zeroline": False,
        "constrain": "domain",
    }
    y_axis = {
        "title": "Row",
        "dtick": 1,
        "range": [min(min(y_values), home_base_y) - 1.6, max(y_values) + 0.6],
        "showgrid": False,
        "zeroline": False,
        "scaleanchor": "x",
        "scaleratio": 1,
    }
    figure.update_layout(
        margin={"l": 10, "r": 10, "t": 10, "b": 10},
        xaxis=x_axis,
        yaxis=y_axis,
        height=min(900, max(480, 90 * grid_size)),
        plot_bgcolor="#f8f5ef",
        paper_bgcolor="#ffffff",
        uirevision=1,
    )

    fresh_payload = RealFieldStaticMapPayload(
        figure=figure,
        zone_ids=zone_ids,
        xs=xs,
        ys=ys,
        x_values=x_values,
        y_values=y_values,
        grid_size=grid_size,
        cell_marker_size=cell_marker_size,
        cell_text_size=cell_text_size,
        marker_ring_size=marker_ring_size,
        step_marker_size=step_marker_size,
        home_base_x=home_base_x,
        home_base_y=home_base_y,
        entry_x=entry_x,
        entry_y=entry_y,
        tile_world_x0=min(x_values) - 0.5,
        tile_world_x1=max(x_values) + 0.5,
        tile_world_y0=min(y_values) - 0.5,
        tile_world_y1=max(y_values) + 0.5,
    )
    payload_cache[cache_key] = fresh_payload

    if len(payload_cache) > 4:
        # Dicts iterate in insertion order, so the leading keys are oldest.
        other_keys = [key for key in payload_cache if key != cache_key]
        for evicted_key in other_keys[:-3]:
            payload_cache.pop(evicted_key, None)
    return fresh_payload
|
|
|
|
def location_xy(
    env: SimulatorEnvironment,
    static_payload: RealFieldStaticMapPayload,
    zone_id: str | None,
) -> tuple[float | None, float | None]:
    """Translate a zone id into plot coordinates.

    Returns ``(None, None)`` when ``zone_id`` is ``None``, the home-base
    coordinates stored on ``static_payload`` when ``zone_id`` equals
    ``env.home_zone_id``, and otherwise ``(col, -row)`` of the zone looked up
    through ``env._zone_state``. A missing (falsy) row or column counts as 0.
    """
    if zone_id is None:
        return (None, None)
    if zone_id == env.home_zone_id:
        return (static_payload.home_base_x, static_payload.home_base_y)

    zone_state = env._zone_state(zone_id)
    column = zone_state.col or 0
    row = zone_state.row or 0
    # Rows are plotted on a negated y axis: y = -row.
    return (float(column), float(-row))
|
|
|
|
def build_planned_horizon_payload(
    env: SimulatorEnvironment,
    *,
    mission_plan: MissionPlan | None,
    move_step_budget: int = PLANNED_TRAJECTORY_MOVE_STEP_BUDGET,
) -> PlannedHorizonPayload | None:
    """Preview the upcoming route and tile actions of a mission plan.

    Walks ``plan.ordered_tasks`` (``mission_plan`` if truthy, otherwise
    ``env.current_plan``) starting from the robot's current zone, or the home
    zone when the current zone is unset, and chains the shortest paths between
    consecutive task targets until ``move_step_budget`` movement steps are
    used up.

    Returns ``None`` when there is no plan, the plan has no tasks, no start
    zone is available, or nothing could be collected. Otherwise returns a
    ``PlannedHorizonPayload`` holding the visited zone ids, whether the walk
    starts at the home zone, and the numbered non-MOVE actions reached.
    """
    plan = mission_plan or env.current_plan
    if plan is None or not plan.ordered_tasks:
        return None

    cursor_zone_id = env.current_zone_id or env.home_zone_id
    starts_from_home = env.current_zone_id == env.home_zone_id
    if cursor_zone_id is None:
        return None

    route: list[str] = []
    actions: list[PlannedHorizonAction] = []
    moves_spent = 0

    for task in plan.ordered_tasks:
        destination = task.target_zone
        if destination is None:
            continue

        if cursor_zone_id == destination:
            # Already standing on the target: a zero-move leg.
            leg = [destination]
        else:
            leg = shortest_path_zone_ids(
                env.zone_states,
                cursor_zone_id,
                destination,
                home_zone_id=env.home_zone_id,
                home_neighbor_zone_id=env.home_neighbor_zone_id,
            )
        if not leg:
            continue

        leg_moves = max(0, len(leg) - 1)
        moves_left = max(0, move_step_budget - moves_spent)
        fits_budget = leg_moves <= moves_left
        # A leg that overshoots the budget is cut to the part still affordable.
        visible_leg = leg if fits_budget else leg[: moves_left + 1]

        if visible_leg:
            # Avoid repeating the junction zone when this leg starts where the
            # route collected so far ends.
            continues_route = bool(route) and route[-1] == visible_leg[0]
            route.extend(visible_leg[1:] if continues_route else visible_leg)

        if not fits_budget:
            break

        moves_spent += leg_moves
        cursor_zone_id = destination

        if task.task_type != ActionType.MOVE:
            actions.append(
                PlannedHorizonAction(
                    order=len(actions) + 1,
                    zone_id=destination,
                    task_type=str(task.task_type),
                    rationale=task.rationale,
                )
            )
        # With the budget fully spent the loop deliberately keeps going:
        # further tasks on the current zone cost no moves and are still
        # listed, while the first task that needs a move ends the preview.

    if not route and not actions:
        return None

    return PlannedHorizonPayload(
        route_zone_ids=tuple(route),
        starts_from_home=starts_from_home,
        actions=tuple(actions),
    )
|
|
|
|
def build_planned_trajectory_payload(
    env: SimulatorEnvironment,
    *,
    mission_plan: MissionPlan | None,
    static_payload: RealFieldStaticMapPayload,
    move_step_budget: int = PLANNED_TRAJECTORY_MOVE_STEP_BUDGET,
) -> PlannedTrajectoryPayload | None:
    """Turn the planned horizon into plot-ready coordinate lists.

    Builds the horizon with ``build_planned_horizon_payload`` and converts its
    route and actions into x/y lists via ``location_xy``. Task markers carry
    the action order as text; the marker with order 1 is larger and fully
    opaque. Returns ``None`` when there is no horizon or when it yields
    neither route zones nor task markers.
    """
    horizon = build_planned_horizon_payload(
        env,
        mission_plan=mission_plan,
        move_step_budget=move_step_budget,
    )
    if horizon is None:
        return None

    def plot_points(zone_ids) -> tuple[list[float], list[float]]:
        # Coordinates for every zone id that resolves to a plot position.
        xs: list[float] = []
        ys: list[float] = []
        for zone_id in zone_ids:
            point_x, point_y = location_xy(env, static_payload, zone_id)
            if point_x is None or point_y is None:
                continue
            xs.append(point_x)
            ys.append(point_y)
        return xs, ys

    # One record per marker: (x, y, text, size, color, hover).
    markers: list[tuple[float, float, str, int, str, str]] = []
    markers_seen_per_zone: dict[str, int] = {}
    for action in horizon.actions:
        zone_id = action.zone_id
        slot = markers_seen_per_zone.get(zone_id, 0)
        markers_seen_per_zone[zone_id] = slot + 1
        # Several actions on one tile are fanned out so the badges do not overlap.
        shift_x, shift_y = numbered_step_offset(slot)
        base_x, base_y = location_xy(env, static_payload, zone_id)
        if base_x is None or base_y is None:
            continue
        is_next_action = action.order == 1
        markers.append(
            (
                base_x + (shift_x * 0.75),
                base_y + (shift_y * 0.75),
                str(action.order),
                static_payload.step_marker_size + (10 if is_next_action else 4),
                "#0f172a" if is_next_action else "rgba(15,23,42,0.72)",
                f"#{action.order} {humanize_action_name(action.task_type)} on "
                f"{short_zone_label(zone_id)}<br>{action.rationale}",
            )
        )

    if not horizon.route_zone_ids and not markers:
        return None

    route_xs, route_ys = plot_points(horizon.route_zone_ids)
    # Waypoint dots skip the first route entry (the zone the route starts from).
    waypoint_xs, waypoint_ys = plot_points(horizon.route_zone_ids[1:])

    return PlannedTrajectoryPayload(
        route_zone_ids=horizon.route_zone_ids,
        starts_from_home=horizon.starts_from_home,
        route_xs=route_xs,
        route_ys=route_ys,
        waypoint_xs=waypoint_xs,
        waypoint_ys=waypoint_ys,
        task_marker_xs=[marker[0] for marker in markers],
        task_marker_ys=[marker[1] for marker in markers],
        task_marker_texts=[marker[2] for marker in markers],
        task_marker_sizes=[marker[3] for marker in markers],
        task_marker_colors=[marker[4] for marker in markers],
        task_marker_hovers=[marker[5] for marker in markers],
    )
|
|
|
|
def build_real_field_dynamic_map_payload(
    env: SimulatorEnvironment,
    *,
    target_zone_id: str | None,
    show_truth: bool,
    static_payload: RealFieldStaticMapPayload,
    mission_plan: MissionPlan | None,
    show_planned_trajectory: bool,
) -> RealFieldDynamicMapPayload:
    """Collect everything on the field map that changes from step to step.

    Produces, in ``env.zone_states`` order, the per-tile label, fill colour,
    border style and hover text; numbered markers for the last eight events
    that have a zone; the previous/current robot zone ids; and, when
    ``show_planned_trajectory`` is true, the planned trajectory payload.

    ``target_zone_id`` is accepted but not read by this function.
    """
    assessment_by_zone = {item.zone_id: item for item in env.current_assessments}
    robot_zone_id = env.current_zone_id
    previous_zone_id = None
    if env.event_history:
        # The most recent event records where the robot came from, if it moved.
        previous_zone_id = env.event_history[-1].details.get("from_zone_id")
    last_event_by_zone = env.latest_events_by_zone()
    last_soil_event_by_zone = env.latest_zone_action_events(ActionType.MEASURE_SOIL)
    current_step = env.step_index
    badges_by_zone = build_zone_action_badges(env, mission_plan=mission_plan)

    tile_texts: list[str] = []
    tile_fills: list[str] = []
    tile_edge_colors: list[str] = []
    tile_edge_widths: list[int] = []
    tile_hovers: list[str] = []
    for zone in env.zone_states:
        zone_key = zone.zone_id
        zone_assessment = assessment_by_zone.get(zone_key)
        truth = env.hidden_zone_states[zone_key]

        cell_text = build_zone_cell_text(
            zone=zone,
            action_badges=badges_by_zone.get(zone_key, []),
        )
        if show_truth:
            cell_text += f"<br>{truth.true_priority_band}"
        tile_texts.append(cell_text)

        urgency_hint = zone.static_features.get("scenario_priority_hint", 0.0) or 0.0
        fill = priority_fill_color(
            float(urgency_hint),
            needs_intervention=visible_tile_needs_intervention(zone),
            yield_pred_missing=tile_has_missing_yield(zone),
        )
        tile_fills.append(fill)

        edge_color, edge_width = zone_border_style(
            hidden=truth,
            is_current_zone=zone_key == robot_zone_id,
            measured_fill_color=fill,
        )
        tile_edge_colors.append(edge_color)
        tile_edge_widths.append(edge_width)

        tile_hovers.append(
            build_zone_hover_text(
                zone_state=zone,
                assessment=zone_assessment,
                event=last_event_by_zone.get(zone_key),
                soil_event=last_soil_event_by_zone.get(zone_key),
                step_index=current_step,
            )
        )

    marker_xs: list[float] = []
    marker_ys: list[float] = []
    marker_texts: list[str] = []
    marker_colors: list[str] = []
    marker_hovers: list[str] = []
    events_seen_per_zone: dict[str, int] = {}
    for recent_event in env.event_history[-8:]:
        event_zone_id = recent_event.zone_id
        if event_zone_id is None:
            continue
        slot = events_seen_per_zone.get(event_zone_id, 0)
        events_seen_per_zone[event_zone_id] = slot + 1
        # Repeated events on one tile get distinct offsets so they stay readable.
        shift_x, shift_y = numbered_step_offset(slot)
        base_x, base_y = location_xy(env, static_payload, event_zone_id)
        if base_x is None or base_y is None:
            continue
        marker_xs.append(base_x + shift_x)
        marker_ys.append(base_y + shift_y)
        marker_texts.append(str(recent_event.step_index))
        marker_colors.append(action_color(str(recent_event.action)))
        marker_hovers.append(build_event_hover_text(recent_event))

    planned_trajectory = None
    if show_planned_trajectory:
        planned_trajectory = build_planned_trajectory_payload(
            env,
            mission_plan=mission_plan,
            static_payload=static_payload,
        )

    return RealFieldDynamicMapPayload(
        texts=tile_texts,
        colors=tile_fills,
        line_colors=tile_edge_colors,
        line_widths=tile_edge_widths,
        hover_texts=tile_hovers,
        previous_zone_id=previous_zone_id,
        current_zone_id=robot_zone_id,
        event_marker_xs=marker_xs,
        event_marker_ys=marker_ys,
        event_marker_texts=marker_texts,
        event_marker_colors=marker_colors,
        event_marker_hovers=marker_hovers,
        planned_trajectory=planned_trajectory,
    )
|
|
|
|
|
|
def build_zone_grid_figure(
    env: SimulatorEnvironment,
    target_zone_id: str | None,
    show_truth: bool,
    *,
    mission_plan: MissionPlan | None = None,
    show_planned_trajectory: bool = True,
) -> go.Figure:
    """Assemble the interactive field-grid figure for the current state.

    Starts from a copy of the cached static map figure and layers on, in this
    order: tile rectangles (layout shapes), a near-invisible square-marker
    scatter carrying tile text/hover/zone ids, the previous-position line and
    marker, the planned route / waypoints / task markers, recent event
    markers, and finally the robot ring and icon.
    """
    layout = get_real_field_static_map_payload(env)
    state = build_real_field_dynamic_map_payload(
        env,
        target_zone_id=target_zone_id,
        show_truth=show_truth,
        static_payload=layout,
        mission_plan=mission_plan,
        show_planned_trajectory=show_planned_trajectory,
    )
    figure = go.Figure(layout.figure)
    small_text_size = max(8, layout.cell_text_size - 1)

    # Tiles are drawn as unit squares centred on each zone position.
    tile_rects = []
    for center_x, center_y, fill, edge_color, edge_width in zip(
        layout.xs,
        layout.ys,
        state.colors,
        state.line_colors,
        state.line_widths,
        strict=False,
    ):
        tile_x = float(center_x)
        tile_y = float(center_y)
        tile_rects.append(
            {
                "type": "rect",
                "x0": tile_x - 0.5,
                "x1": tile_x + 0.5,
                "y0": tile_y - 0.5,
                "y1": tile_y + 0.5,
                "fillcolor": fill,
                "line": {"color": edge_color, "width": float(edge_width)},
                "layer": "below",
                "xref": "x",
                "yref": "y",
            }
        )
    figure.update_layout(shapes=tile_rects)

    # This scatter carries the tile text, hover text and zone id (customdata);
    # its markers use opacity 0.001 so the rectangles underneath stay visible.
    tile_marker = {
        "size": layout.cell_marker_size,
        "symbol": "square",
        "color": state.colors,
        "line": {"width": state.line_widths, "color": state.line_colors},
        "opacity": 0.001,
    }
    figure.add_trace(
        go.Scatter(
            x=list(layout.xs),
            y=list(layout.ys),
            mode="markers+text",
            showlegend=False,
            text=state.texts,
            textposition="middle center",
            textfont={"size": layout.cell_text_size},
            marker=tile_marker,
            hovertemplate="%{hovertext}<extra></extra>",
            hovertext=state.hover_texts,
            customdata=list(layout.zone_ids),
            selected={"marker": {"opacity": 1.0}},
            unselected={"marker": {"opacity": 1.0}},
        )
    )

    robot_moved = (
        state.previous_zone_id is not None
        and state.current_zone_id is not None
        and state.previous_zone_id != state.current_zone_id
    )
    if robot_moved:
        from_x, from_y = location_xy(env, layout, state.previous_zone_id)
        to_x, to_y = location_xy(env, layout, state.current_zone_id)
        if None not in (from_x, from_y, to_x, to_y):
            figure.add_trace(
                go.Scatter(
                    x=[from_x, to_x],
                    y=[from_y, to_y],
                    mode="lines",
                    line={"color": "#9aa6b2", "width": 3, "dash": "dot"},
                    hoverinfo="skip",
                    showlegend=False,
                )
            )
            previous_marker = {
                "size": max(12, layout.step_marker_size - 8),
                "symbol": "circle",
                "color": "rgba(100,116,139,0.35)",
                "line": {"width": 0},
            }
            figure.add_trace(
                go.Scatter(
                    x=[from_x],
                    y=[from_y],
                    mode="markers+text",
                    text=["Prev"],
                    textposition="bottom center",
                    textfont={"size": max(8, layout.cell_text_size - 2), "color": "#64748b"},
                    marker=previous_marker,
                    hovertemplate="Previous robot position<extra></extra>",
                    showlegend=False,
                )
            )

    planned = state.planned_trajectory
    if planned is not None:
        # A route line needs at least two points.
        if len(planned.route_xs) >= 2:
            figure.add_trace(
                go.Scatter(
                    x=planned.route_xs,
                    y=planned.route_ys,
                    mode="lines",
                    line={"color": "rgba(15,23,42,0.48)", "width": 5},
                    hoverinfo="skip",
                    showlegend=False,
                )
            )
        if planned.waypoint_xs:
            waypoint_marker = {
                "size": max(8, layout.step_marker_size - 8),
                "symbol": "circle",
                "color": "rgba(15,23,42,0.50)",
                "line": {"width": 1, "color": "rgba(255,255,255,0.95)"},
            }
            figure.add_trace(
                go.Scatter(
                    x=planned.waypoint_xs,
                    y=planned.waypoint_ys,
                    mode="markers",
                    marker=waypoint_marker,
                    hoverinfo="skip",
                    showlegend=False,
                )
            )
        if planned.task_marker_xs:
            task_marker = {
                "size": planned.task_marker_sizes,
                "symbol": "circle",
                "color": planned.task_marker_colors,
                "line": {"width": 2, "color": "white"},
            }
            figure.add_trace(
                go.Scatter(
                    x=planned.task_marker_xs,
                    y=planned.task_marker_ys,
                    mode="markers+text",
                    text=planned.task_marker_texts,
                    textposition="middle center",
                    textfont={"color": "white", "size": small_text_size},
                    marker=task_marker,
                    customdata=planned.task_marker_hovers,
                    hovertemplate="%{customdata}<extra></extra>",
                    showlegend=False,
                )
            )

    if state.event_marker_xs:
        event_marker = {
            "size": layout.step_marker_size,
            "symbol": "circle",
            "color": state.event_marker_colors,
            "line": {"width": 2, "color": "white"},
        }
        figure.add_trace(
            go.Scatter(
                x=state.event_marker_xs,
                y=state.event_marker_ys,
                mode="markers+text",
                text=state.event_marker_texts,
                textposition="middle center",
                textfont={"color": "white", "size": small_text_size},
                marker=event_marker,
                customdata=state.event_marker_hovers,
                hovertemplate="%{customdata}<extra></extra>",
                showlegend=False,
            )
        )

    if state.current_zone_id is None:
        return figure
    robot_x, robot_y = location_xy(env, layout, state.current_zone_id)
    if robot_x is None or robot_y is None:
        return figure

    robot_ring = {
        "size": layout.marker_ring_size + 20,
        "symbol": "circle-open",
        "color": "rgba(0,0,0,0)",
        "line": {"width": 10, "color": "#0f172a"},
    }
    figure.add_trace(
        go.Scatter(
            x=[robot_x],
            y=[robot_y],
            mode="markers",
            marker=robot_ring,
            hovertemplate="Robot current position<extra></extra>",
            showlegend=False,
        )
    )
    figure.add_layout_image(
        {
            "source": robot_icon_data_uri(),
            "x": robot_x,
            "y": robot_y,
            "sizex": 0.88,
            "sizey": 0.88,
            "xref": "x",
            "yref": "y",
            "xanchor": "center",
            "yanchor": "middle",
            "layer": "above",
        }
    )
    return figure
|
|
|
|
def build_latest_event_by_zone(event_history) -> dict[str, object]:
    """Map each zone id to the last event in ``event_history`` that names it.

    Events whose ``zone_id`` is ``None`` are ignored; later events replace
    earlier ones for the same zone.
    """
    return {
        event.zone_id: event
        for event in event_history
        if event.zone_id is not None
    }
|
|
|
|
def build_latest_zone_action_by_zone(event_history, action: ActionType) -> dict[str, object]:
    """Map each zone id to its last event in ``event_history`` with ``action``.

    Only events that have a ``zone_id`` and whose ``action`` equals the given
    one are considered; later matches replace earlier ones.
    """
    return {
        event.zone_id: event
        for event in event_history
        if event.zone_id is not None and event.action == action
    }
|
|
|
|
def latest_soil_measurement_event(event_history, zone_id: str):
    """Return the most recent MEASURE_SOIL event for ``zone_id``, or ``None``."""
    newest_first = reversed(event_history)
    return next(
        (
            event
            for event in newest_first
            if event.zone_id == zone_id and event.action == ActionType.MEASURE_SOIL
        ),
        None,
    )
|
|
|
|
def latest_field_check_event(event_history, zone_id: str):
    """Return the most recent INSPECT or REVISIT event for ``zone_id``, or ``None``."""
    field_check_actions = {ActionType.INSPECT, ActionType.REVISIT}
    newest_first = reversed(event_history)
    return next(
        (
            event
            for event in newest_first
            if event.zone_id == zone_id and event.action in field_check_actions
        ),
        None,
    )
|
|
|
|
def latest_zone_action_event(event_history, zone_id: str, action: ActionType):
    """Return the most recent event for ``zone_id`` whose action equals ``action``.

    Scans ``event_history`` from the end; returns ``None`` when nothing matches.
    """
    newest_first = reversed(event_history)
    return next(
        (
            event
            for event in newest_first
            if event.zone_id == zone_id and event.action == action
        ),
        None,
    )
|
|
|
|
def format_steps_ago(current_step_index: int, measured_step_index: int | None) -> str:
    """Describe how long ago a measurement was taken, in simulator steps.

    Returns ``"age unknown"`` when ``measured_step_index`` is ``None``. A
    measurement step at or after the current step is reported as
    ``"measured this step"``.
    """
    if measured_step_index is None:
        return "age unknown"
    elapsed = current_step_index - measured_step_index
    if elapsed <= 0:
        return "measured this step"
    if elapsed == 1:
        return "measured 1 step ago"
    return f"measured {elapsed} steps ago"
|
|
|
|
def format_steps_since(current_step_index: int, event_step_index: int | None, *, label: str = "checked") -> str:
    """Describe how many steps have passed since an event, prefixed by ``label``.

    Returns ``"not yet"`` when ``event_step_index`` is ``None``. An event step
    at or after the current step is reported as ``"<label> this step"``.
    """
    if event_step_index is None:
        return "not yet"
    elapsed = current_step_index - event_step_index
    if elapsed <= 0:
        return f"{label} this step"
    if elapsed == 1:
        return f"{label} 1 step ago"
    return f"{label} {elapsed} steps ago"
|
|
|
|
def format_optional_number(value, decimals: int = 2, suffix: str = "") -> str:
    """Format ``value`` as a fixed-point number followed by ``suffix``.

    Args:
        value: Anything ``float()`` accepts, or ``None``.
        decimals: Number of digits after the decimal point.
        suffix: Text appended to the formatted number (for example a unit).

    Returns:
        ``"—"`` when ``value`` is ``None`` or NaN, ``str(value)`` when it
        cannot be converted to a float, and the formatted number otherwise.
    """
    if value is None:
        return "—"
    try:
        numeric = float(value)
    except (TypeError, ValueError):
        return str(value)
    # NaN means "no value" just like None does; show the same placeholder
    # rather than rendering text such as "nan t/ha".
    if math.isnan(numeric):
        return "—"
    return f"{numeric:.{decimals}f}{suffix}"
|
|
|
|
| def normalize_priority_label(value) -> str: |
| text = str(value) |
| return { |
| "Inspect now": "Act now", |
| "Inspect soon": "Actionable", |
| "Watch": "Monitor", |
| "Low attention": "Low", |
| }.get(text, text) |
|
|
|
|
def short_action_label(action_name: str) -> str:
    """Return the one-letter badge used for an action name.

    Known actions share a small set of letters; any other name falls back to
    its first character, upper-cased.
    """
    fallback = action_name[:1].upper()
    letter_groups = (
        ("M", ("move", "patrol", "recharge", "end_mission")),
        ("C", ("inspect", "revisit", "defer", "remove_weeds")),
        ("S", ("measure_soil",)),
        ("F", ("apply_fertilizer",)),
        ("W", ("apply_water",)),
        ("D", ("install_drainage",)),
        ("U", ("subsoil",)),
    )
    for letter, action_names in letter_groups:
        if action_name in action_names:
            return letter
    return fallback
|
|
|
|
def action_color(action_name: str) -> str:
    """Return the hex colour used to draw markers for an action name.

    Unknown action names get the default colour ``#264653``.
    """
    color_groups = (
        ("#2563eb", ("move", "patrol", "recharge", "end_mission")),
        ("#1f7a8c", ("inspect", "defer", "remove_weeds")),
        ("#2a9d8f", ("revisit",)),
        ("#7c3aed", ("measure_soil",)),
        ("#4f772d", ("apply_fertilizer",)),
        ("#2a6f97", ("apply_water",)),
        ("#8d6e63", ("install_drainage",)),
        ("#6b705c", ("subsoil",)),
    )
    for color, action_names in color_groups:
        if action_name in action_names:
            return color
    return "#264653"
|
|
|
|
def build_zone_hover_text(*, zone_state, assessment, event, soil_event, step_index: int) -> str:
    """Build the ``<br>``-joined hover text shown for one field tile.

    Args:
        zone_state: Zone whose static features and soil measurement are shown.
        assessment: Current assessment for the zone, or ``None``.
        event: Latest event for the zone, or ``None`` if nothing ran there.
        soil_event: Latest soil-measurement event for the zone, or ``None``.
        step_index: Current simulator step, used to age the soil measurement.

    Returns:
        ``key=value`` lines joined with ``<br>``: last execution outcome,
        urgency score, an optional missing-yield note, the next action and
        the soil measurement details (or ``Not measured``).
    """
    lines = []
    if event is not None:
        lines.append(f"last_execution={humanize_execution_outcome(event.execution_outcome)}")
    else:
        lines.append("last_execution=No execution yet")

    # A stored hint of None (or absent static features) counts as 0.0 instead
    # of crashing float(); this matches how the tile fill colour reads the hint.
    static_features = zone_state.static_features or {}
    urgency_score = float(static_features.get("scenario_priority_hint", 0.0) or 0.0)
    lines.append(f"urgency_score={urgency_score:.2f}")
    if tile_has_missing_yield(zone_state):
        lines.append("yield_pred=Missing")

    next_action = humanize_action_name(str(assessment.recommended_action)) if assessment is not None else "Check field"
    lines.append(f"next_action={next_action}")

    measurement = zone_state.soil_measurement
    if measurement is None:
        lines.append("soil_measurement=Not measured")
        return "<br>".join(lines)

    measurement_age = format_steps_ago(
        step_index,
        soil_event.step_index if soil_event is not None else None,
    )
    lines.append(f"soil_measurement={measurement_age}")
    # (label, reading, format spec, unit suffix); a missing reading shows "—".
    readings = (
        ("moisture", measurement.moisture_m3m3, ".2f", " m3/m3"),
        ("nitrogen", measurement.nitrogen_gkg, ".2f", " g/kg"),
        ("organic_carbon", measurement.organic_carbon_gkg, ".1f", " g/kg"),
        ("ph", measurement.ph, ".1f", ""),
    )
    for label, reading, spec, unit in readings:
        if reading is None:
            lines.append(f"{label}=—")
        else:
            lines.append(f"{label}={reading:{spec}}{unit}")
    return "<br>".join(lines)
|
|
|
|
def build_event_hover_text(event) -> str:
    """Build the ``<br>``-joined hover text for one step-event marker.

    Always lists step, action, execution outcome and reward; control intent,
    control state, terminal reason and up to three revealed facts are added
    only when present on the event.
    """
    parts = [
        f"step={event.step_index}",
        f"step_action={humanize_action_name(str(event.action))}",
        f"execution={humanize_execution_outcome(event.execution_outcome)}",
        f"reward={event.reward:.2f}",
    ]

    intent = event_control_intent(event)
    if intent is not None:
        parts.append(f"intent={humanize_control_intent(intent)}")

    state_after = event.control_state_after
    if state_after is not None:
        parts.append(f"control_state={humanize_control_state(state_after)}")

    terminal_reason = event_terminal_reason(event)
    if terminal_reason is not None:
        parts.append(f"terminal_reason={humanize_terminal_reason(terminal_reason)}")

    revealed_facts = event.details.get("revealed_facts")
    if revealed_facts:
        # Only the first three facts are shown.
        parts.append("revealed=" + ", ".join(revealed_facts[:3]))

    return "<br>".join(parts)
|
|
|
|
# Module-local aliases for the real-field urgency thresholds imported from
# autofarm.urgency.
CAUTION_URGENCY_THRESHOLD = REAL_FIELD_CAUTION_URGENCY_THRESHOLD
RED_URGENCY_THRESHOLD = REAL_FIELD_CRITICAL_URGENCY_THRESHOLD
|
|
|
|
def show_planned_trajectory_enabled() -> bool:
    """Report whether the planned-trajectory overlay is switched on.

    Reads ``SHOW_PLANNED_TRAJECTORY_KEY`` from the Streamlit session state and
    treats a missing entry as enabled.
    """
    stored_flag = st.session_state.get(SHOW_PLANNED_TRAJECTORY_KEY, True)
    return bool(stored_flag)
|
|
|
|
def render_grid_legend() -> None:
    """Render the static HTML legend that explains the field-grid map.

    Covers tile fill colours, tile borders, action badges, the planned-route
    preview and the robot / step markers.
    """
    # NOTE(review): the urgency cut-offs (0.52, 0.978), the "last 5 steps"
    # window and the "next 10 movement steps" wording are hard-coded in this
    # text — confirm they match the values the map actually uses.
    legend_html = """
<div style="border:1px solid #c9c0ac;border-radius:8px;padding:10px 14px;
background:#fffaf0;font-size:0.82em;line-height:2.0;margin-top:4px">
<strong>Legend</strong><br>
<em>Tile fill</em><br>
• <span style="background:#bdd9bf;border-radius:3px;padding:1px 8px"> </span> Green: no intervention tile, urgency ≤ 0.52<br>
• <span style="background:#f1e3b0;border-radius:3px;padding:1px 8px"> </span> Sand: no intervention tile, urgency > 0.52<br>
• <span style="background:#edae49;border-radius:3px;padding:1px 8px"> </span> Orange: intervention tile, urgency < 0.978<br>
• <span style="background:#d1495b;border-radius:3px;padding:1px 8px"> </span> Red: intervention tile, urgency ≥ 0.978<br>
• <span style="background:#000000;border-radius:3px;padding:1px 8px"> </span> Black: predicted yield is missing, excluded from urgency planning<br>
<em>Tile border</em><br>
• <span style="color:#9aa6b2;font-size:1.2em">▬</span> Ground truth matches the robot's current belief<br>
• <span style="background:#bdd9bf;border-radius:3px;padding:1px 8px"> </span> <span style="background:#f1e3b0;border-radius:3px;padding:1px 8px"> </span> <span style="background:#edae49;border-radius:3px;padding:1px 8px"> </span> <span style="background:#d1495b;border-radius:3px;padding:1px 8px"> </span> Colored border: hidden ground truth urgency differs from the robot's current belief<br>
<em>Tile action badges</em><br>
• <span style="font-size:1.0em">1🌿</span> Planned tile action from the receding-horizon plan<br>
• <span style="font-size:1.0em">→</span> Planned action executed correctly in the last 5 steps<br>
• <span style="font-size:1.0em">✕</span> Planned action failed in the last 5 steps<br>
<em>Planned route</em><br>
• <span style="color:#0f172a;font-size:1.2em">──</span> Dark preview line for the next 10 movement steps<br>
• <span style="color:#0f172a;font-size:1.2em">●</span> Connected waypoint dots<br>
• <span style="display:inline-block;background:#0f172a;color:white;border-radius:999px;padding:0 7px;font-size:0.9em">1</span> Planned task marker<br>
<em>Markers</em><br>
• ⊕ 🤖 Robot position<br>
• ①② Step order dots
</div>
"""
    st.markdown(legend_html, unsafe_allow_html=True)
|
|
|
|
def render_selected_step_summary(selected_event) -> None:
    """Show a one-line caption for the given step event.

    The caption lists the step index, humanized action, location and execution
    outcome. Nothing is rendered when ``selected_event`` is ``None``.
    """
    if selected_event is None:
        return
    where = format_event_location(selected_event)
    result = humanize_execution_outcome(selected_event.execution_outcome)
    action_text = humanize_action_name(str(selected_event.action))
    summary = f"Latest step {selected_event.step_index}: {action_text} | {where} | {result}"
    st.caption(summary)
|
|
|
|
def selected_zone_id(env: SimulatorEnvironment, *, target_zone_id: str | None) -> str:
    """Return the zone id the UI should treat as selected.

    A selection already stored under ``SELECTED_ZONE_KEY`` wins when it names
    a zone of ``env``. Otherwise the first truthy candidate among
    ``target_zone_id``, the robot's current zone (only if it is one of the
    field zones) and the home-neighbour zone is used, falling back to the
    first zone of the field. The chosen fallback is written back to the
    session state.
    """
    known_zone_ids = {zone.zone_id for zone in env.zone_states}
    remembered = st.session_state.get(SELECTED_ZONE_KEY)
    if remembered in known_zone_ids:
        return remembered

    # The robot's zone only qualifies when it is one of the field zones.
    robot_zone_id = env.current_zone_id if env.current_zone_id in known_zone_ids else None
    for candidate in (target_zone_id, robot_zone_id, env.home_neighbor_zone_id):
        if candidate:
            chosen = candidate
            break
    else:
        chosen = env.zone_states[0].zone_id

    st.session_state[SELECTED_ZONE_KEY] = chosen
    return chosen
|
|
|
|
def update_selected_zone_from_plot_state(plot_state, env: SimulatorEnvironment, *, fallback_zone_id: str | None) -> None:
    """Sync the stored zone selection with a click on the field map.

    When ``plot_state`` names a zone that exists in ``env`` it becomes the
    selected zone. Otherwise ``selected_zone_id`` is called so that a valid
    selection (possibly ``fallback_zone_id``) is kept in the session state.
    """
    clicked_zone_id = selected_zone_from_plot_state(plot_state)
    is_known_zone = clicked_zone_id is not None and any(
        zone.zone_id == clicked_zone_id for zone in env.zone_states
    )
    if not is_known_zone:
        selected_zone_id(env, target_zone_id=fallback_zone_id)
        return
    st.session_state[SELECTED_ZONE_KEY] = clicked_zone_id
|
|
|
|
def selected_zone_from_plot_state(plot_state) -> str | None:
    """Extract the selected zone id from a plot selection state.

    ``plot_state``, its ``selection`` and each of its ``points`` may expose
    their fields either as attributes or as dict keys. Returns the first
    point ``customdata`` that is a string starting with ``"zone_"``, or
    ``None`` when there is no such point.
    """

    def read(container, name):
        # Attribute access first; dict lookup only when that yields nothing.
        found = getattr(container, name, None)
        if found is None and isinstance(container, dict):
            found = container.get(name)
        return found

    if plot_state is None:
        return None
    selection = read(plot_state, "selection")
    if selection is None:
        return None
    points = read(selection, "points")
    if not points:
        return None

    payloads = (read(point, "customdata") for point in points)
    return next(
        (
            payload
            for payload in payloads
            if isinstance(payload, str) and payload.startswith("zone_")
        ),
        None,
    )
|
|
|
|
def latest_event_for_zone(snapshot, zone_id: str):
    """Return the most recent event in ``snapshot.event_history`` for ``zone_id``, or ``None``."""
    newest_first = reversed(snapshot.event_history)
    return next((event for event in newest_first if event.zone_id == zone_id), None)
|
|
|
|
def latest_evidence_event_for_zone(snapshot, zone_id: str):
    """Return the newest event for ``zone_id`` whose evidence image exists on disk.

    An event qualifies when ``details["evidence_image_path"]`` is set and the
    path exists. Returns ``None`` when no event qualifies.
    """

    def has_evidence_image(event) -> bool:
        if event.zone_id != zone_id:
            return False
        image_path = event.details.get("evidence_image_path")
        return bool(image_path) and Path(image_path).exists()

    newest_first = reversed(snapshot.event_history)
    return next((event for event in newest_first if has_evidence_image(event)), None)
|
|
|
|
def overlay_weed_mask(evidence_path: str, mask_path: str) -> bytes:
    """Blend a blue semi-transparent weed segmentation mask over the evidence image.

    Args:
        evidence_path: Path of the evidence photo.
        mask_path: Path of the segmentation mask; pixels brighter than 128
            (after conversion to greyscale) are tinted.

    Returns:
        The blended image encoded as PNG bytes (RGB).
    """
    from PIL import Image

    # Image.open keeps the file handle open until the image is closed, so
    # both files are opened in a context manager. convert() returns a fully
    # loaded copy that stays usable after the handle is released.
    with Image.open(evidence_path) as evidence_file:
        evidence = evidence_file.convert("RGBA")
    with Image.open(mask_path) as mask_file:
        mask = mask_file.convert("L")

    if mask.size != evidence.size:
        # Nearest-neighbour keeps the resized mask at its original pixel values.
        mask = mask.resize(evidence.size, Image.NEAREST)

    mask_arr = np.array(mask)
    overlay = np.zeros((*mask_arr.shape, 4), dtype=np.uint8)
    overlay[mask_arr > 128] = [30, 100, 220, 150]
    # An (H, W, 4) uint8 array is read as RGBA without an explicit mode.
    blended = Image.alpha_composite(evidence, Image.fromarray(overlay))

    buf = io.BytesIO()
    blended.convert("RGB").save(buf, format="PNG")
    return buf.getvalue()
|
|
|
|
def render_selected_zone_panel(env: SimulatorEnvironment, snapshot, zone_id: str) -> None:
    """Render the collapsible "Selected Field" panel for one zone.

    The panel has four columns: overview, crop and imagery, soil and field,
    and robot state / actions. Values come from the zone's static features,
    weather proxies, soil measurement and the environment's event history.
    """

    def step_of(event):
        # Step index of an event, or None when the event never happened.
        return event.step_index if event is not None else None

    zone = env._zone_state(zone_id)
    observation = snapshot.observation
    evidence_state = observation.zone_evidence_state_by_zone.get(zone_id)
    workflow = observation.riddle_workflows_by_zone.get(zone_id)
    latest_event = env.latest_event_for_zone(zone_id)

    inspect_event = env.latest_zone_action_event(zone_id, ActionType.INSPECT)
    revisit_event = env.latest_zone_action_event(zone_id, ActionType.REVISIT)
    # The newer of the two counts as the last field check; on a tie the
    # inspect event wins because max() keeps the first maximum.
    field_checks = [check for check in (inspect_event, revisit_event) if check is not None]
    field_check_event = max(field_checks, key=lambda check: check.step_index, default=None)

    soil_event = env.latest_zone_action_event(zone_id, ActionType.MEASURE_SOIL)
    fertilizer_event = env.latest_zone_action_event(zone_id, ActionType.APPLY_FERTILIZER)
    water_event = env.latest_zone_action_event(zone_id, ActionType.APPLY_WATER)
    step_index = observation.step_index
    static = zone.static_features or {}
    weather = zone.weather_proxies or {}
    inspection_count = evidence_state.guarded_context.inspection_count if evidence_state else 0

    soil_measurement_summary = "not yet"
    reading = zone.soil_measurement
    if reading is not None:
        soil_measurement_summary = (
            f"{format_steps_ago(step_index, step_of(soil_event))} | "
            f"moisture {format_optional_number(reading.moisture_m3m3, 2)}, "
            f"nitrogen {format_optional_number(reading.nitrogen_gkg, 2)}, "
            f"organic C {format_optional_number(reading.organic_carbon_gkg, 1)}, "
            f"pH {format_optional_number(reading.ph, 1)}"
        )

    last_execution_summary = "none yet"
    if latest_event is not None:
        last_execution_summary = (
            f"{humanize_action_name(str(latest_event.action))} -> "
            f"{humanize_execution_outcome(latest_event.execution_outcome)}"
        )

    fertilizer_summary = "not yet"
    if zone.applied_fertilizer_amount_kg_ha is not None:
        recommendation = zone.fertilizer_recommendation
        fertilizer_product = recommendation.product_name if recommendation is not None else "fertilizer"
        fertilizer_summary = (
            f"{format_steps_ago(step_index, step_of(fertilizer_event))} "
            f"({fertilizer_product} @ {zone.applied_fertilizer_amount_kg_ha:.1f} kg/ha)"
        )

    water_summary = "not yet"
    if zone.applied_water_amount_mm is not None:
        water_summary = (
            f"{format_steps_ago(step_index, step_of(water_event))} "
            f"({zone.applied_water_amount_mm:.1f} mm)"
        )

    with st.expander(f"Selected Field: {short_zone_label(zone_id)}", expanded=False):
        overview_col, imagery_col, soil_col, robot_col = st.columns(4)

        with overview_col:
            st.markdown("**Overview**")
            st.write(f"Urgency score: {format_optional_number(static.get('scenario_priority_hint'), 3)}")
            st.write(f"Predicted yield missing: {'Yes' if tile_has_missing_yield(zone) else 'No'}")
            st.write(f"Status: {humanize_metric_token(str(static.get('status', 'unknown')))}")
            st.write(f"Primary stress: {humanize_metric_token(str(static.get('primary_stress', 'unknown')))}")
            st.write(f"Recommended action: {humanize_metric_token(str(static.get('recommended_action', 'none')))}")

        with imagery_col:
            st.markdown("**Crop And Imagery**")
            st.write(f"Yield: {format_optional_number(static.get('yield_tha'), 2, ' t/ha')}")
            st.write(f"Predicted yield: {format_optional_number(static.get('yield_pred'), 2, ' t/ha')}")
            st.write(f"Field mean yield: {format_optional_number(static.get('field_mean_yield'), 2, ' t/ha')}")
            st.write(f"Yield deficit: {format_optional_number(static.get('yield_deficit_pct'), 1, ' %')}")
            st.write(f"NDVI: {format_optional_number(static.get('ndvi_last'), 3)}")
            st.write(f"NDRE: {format_optional_number(static.get('ndre_last'), 3)}")
            st.write(f"NDWI: {format_optional_number(static.get('ndwi_last'), 3)}")

        with soil_col:
            st.markdown("**Soil And Field**")
            st.write(f"Nitrogen: {format_optional_number(static.get('nitrogen_0-5'), 1)}")
            st.write(f"Soil organic carbon: {format_optional_number(static.get('soc_0-5'), 1)}")
            st.write(f"Clay: {format_optional_number(static.get('clay_0-5'), 1)}")
            st.write(f"Sand: {format_optional_number(static.get('sand_0-5'), 1)}")
            st.write(f"Temperature mean: {format_optional_number(weather.get('temp_mean_mean'), 2)}")
            st.write(f"Total precipitation: {format_optional_number(weather.get('total_prec_mean'), 2)}")

        with robot_col:
            st.markdown("**Robot State And Actions**")
            st.write(f"Field checks: {inspection_count}")
            last_checked = format_steps_since(step_index, step_of(field_check_event))
            st.write(f"Last checked: {last_checked}")
            st.write(f"Last execution: {last_execution_summary}")
            st.write(f"Soil measured: {soil_measurement_summary}")
            st.write(f"Fertilizer applied: {fertilizer_summary}")
            st.write(f"Water applied: {water_summary}")

        # NOTE(review): the workflow caption is assumed to belong inside the
        # expander, below the columns — confirm the intended placement.
        if workflow is not None and workflow.last_message:
            st.caption(workflow.last_message)
|
|
|
|
def render_metrics_overview(env: SimulatorEnvironment) -> None:
    """Render the episode metrics as two collapsed, side-by-side panels.

    The left panel lists the grouped lines returned by ``summarize_metrics``;
    the right panel shows the raw ``env.metrics()`` payload as JSON.
    """
    episode_metrics = env.metrics()
    left_panel, right_panel = st.columns(2)

    with left_panel, st.expander("Run Summary", expanded=False):
        st.caption("Plain-language summary of what happened in the simulator episode.")
        for section_title, bullet_lines in summarize_metrics(episode_metrics).items():
            st.write(f"**{section_title}**")
            for bullet in bullet_lines:
                st.write(f"- {bullet}")

    with right_panel, st.expander("Technical Metrics", expanded=False):
        st.json(episode_metrics)
|
|
|
|
def summarize_metrics(metrics: dict[str, object]) -> dict[str, list[str]]:
    """Group selected metrics into headed lists of display-ready lines.

    Returns a mapping with the headings ``"Mission Progress"`` and
    ``"Outcome"``. Missing counters fall back to ``0``; a missing status or
    termination reason falls back to ``"unknown"`` / ``"in_progress"``.
    """

    def counter(key: str) -> int:
        # Counters are coerced to int so they print without a decimal part.
        return int(metrics.get(key, 0))

    progress_lines = [
        f"Steps taken: {counter('steps')}",
        f"Field checks: {counter('inspect_actions')}",
        f"Follow-up checks: {counter('revisit_actions')}",
        f"Soil samples: {counter('measure_soil_actions')}",
        f"Challenges resolved: {counter('resolved_challenges')}",
    ]

    status_text = humanize_metric_token(str(metrics.get("outcome_status", "unknown")))
    reason_text = humanize_metric_token(str(metrics.get("termination_reason", "in_progress")))
    outcome_lines = [
        f"Episode status: {status_text}",
        f"Termination reason: {reason_text}",
        f"Reward score: {metrics.get('total_reward', 0)}",
    ]

    return {"Mission Progress": progress_lines, "Outcome": outcome_lines}
|
|
|
|
def grid_dimension(zone_states) -> int:
    """Return the largest row or column index found across ``zone_states``.

    A zone whose ``row`` or ``col`` is ``None`` (or otherwise falsy) counts as
    ``0`` for that coordinate. The zones are walked exactly once, so a one-shot
    iterator is accepted, and an empty collection yields ``0`` instead of
    raising ``ValueError``.
    """
    return max(
        (max(zone.row or 0, zone.col or 0) for zone in zone_states),
        default=0,
    )
|
|
|
|
def add_field_grid_shapes(figure: go.Figure, zone_states) -> None:
    """Draw the tile grid beneath the traces of ``figure``.

    Zones are placed at ``x = col`` and ``y = -row`` (missing coordinates count
    as 0). When ``is_rectangular_field`` reports a rectangular layout, full-span
    grid lines are drawn: first the vertical ones, then the horizontal ones.
    Otherwise every zone gets its own unit square with a transparent fill.
    """
    columns = [zone.col or 0 for zone in zone_states]
    flipped_rows = [-(zone.row or 0) for zone in zone_states]

    if not is_rectangular_field(zone_states):
        # Irregular layout: outline each tile individually.
        for zone in zone_states:
            center_x = zone.col or 0
            center_y = -(zone.row or 0)
            figure.add_shape(
                type="rect",
                x0=center_x - 0.5,
                x1=center_x + 0.5,
                y0=center_y - 0.5,
                y1=center_y + 0.5,
                line={"color": "#d7d0c0", "width": 1},
                fillcolor="rgba(255,255,255,0)",
                layer="below",
            )
        return

    # Outer edges of the rectangle, half a tile beyond the extreme centers.
    x_low, x_high = min(columns) - 0.5, max(columns) + 0.5
    y_low, y_high = min(flipped_rows) - 0.5, max(flipped_rows) + 0.5

    # One cut after every distinct coordinate except the last, plus both edges.
    distinct_x = sorted(set(columns))
    distinct_y = sorted(set(flipped_rows), reverse=True)
    x_cuts = [x_low, *(x + 0.5 for x in distinct_x[:-1]), x_high]
    y_cuts = [y_high, *(y - 0.5 for y in distinct_y[:-1]), y_low]

    for cut_x in x_cuts:
        figure.add_shape(
            type="line",
            x0=cut_x,
            x1=cut_x,
            y0=y_low,
            y1=y_high,
            line={"color": "#d7d0c0", "width": 1},
            layer="below",
        )
    for cut_y in y_cuts:
        figure.add_shape(
            type="line",
            x0=x_low,
            x1=x_high,
            y0=cut_y,
            y1=cut_y,
            line={"color": "#d7d0c0", "width": 1},
            layer="below",
        )
|
|
|
|
def robot_icon_data_uri() -> str:
    """Return the robot icon as a percent-encoded ``data:image/svg+xml`` URI."""
    svg_lines = (
        "<svg xmlns='http://www.w3.org/2000/svg' width='64' height='64' viewBox='0 0 64 64'>",
        "<rect x='16' y='18' width='32' height='26' rx='8' fill='#1f2937'/>",
        "<rect x='21' y='24' width='8' height='8' rx='4' fill='#f8fafc'/>",
        "<rect x='35' y='24' width='8' height='8' rx='4' fill='#f8fafc'/>",
        "<rect x='26' y='36' width='12' height='4' rx='2' fill='#93c5fd'/>",
        "<rect x='20' y='46' width='4' height='10' rx='2' fill='#1f2937'/>",
        "<rect x='40' y='46' width='4' height='10' rx='2' fill='#1f2937'/>",
        "<rect x='13' y='28' width='4' height='10' rx='2' fill='#1f2937'/>",
        "<rect x='47' y='28' width='4' height='10' rx='2' fill='#1f2937'/>",
        "<rect x='30' y='10' width='4' height='8' rx='2' fill='#1f2937'/>",
        "<circle cx='32' cy='8' r='4' fill='#2563eb'/>",
        "</svg>",
    )
    markup = "\n".join(svg_lines)
    return "data:image/svg+xml;utf8," + quote(markup)
|
|
|
|
def base_icon_data_uri() -> str:
    """Return the base icon as a percent-encoded ``data:image/svg+xml`` URI."""
    svg_lines = (
        "<svg xmlns='http://www.w3.org/2000/svg' width='64' height='64' viewBox='0 0 64 64'>",
        "<rect x='10' y='30' width='44' height='20' rx='8' fill='#cbd5e1'/>",
        "<rect x='18' y='18' width='28' height='16' rx='6' fill='#475569'/>",
        "<rect x='28' y='8' width='8' height='10' rx='3' fill='#1e293b'/>",
        "<rect x='24' y='36' width='16' height='8' rx='4' fill='#93c5fd'/>",
        "</svg>",
    )
    markup = "\n".join(svg_lines)
    return "data:image/svg+xml;utf8," + quote(markup)
|
|
|
|
def planned_action_icon(action_name: str) -> str:
    """Return the glyph used to badge ``action_name`` on the field map.

    ``action_name`` is compared against ``ActionType`` member values; names
    without a dedicated glyph fall back to a bullet.
    """
    glyph_pairs = (
        (ActionType.INSPECT, "🔎"),
        (ActionType.REVISIT, "🔎"),
        (ActionType.DEFER, "⏸"),
        (ActionType.REMOVE_WEEDS, "🌿"),
        (ActionType.MEASURE_SOIL, "🧭"),
        (ActionType.APPLY_FERTILIZER, "🧪"),
        (ActionType.APPLY_WATER, "💧"),
        (ActionType.INSTALL_DRAINAGE, "🛠"),
        (ActionType.SUBSOIL, "⛏"),
    )
    glyph_by_action = {member.value: glyph for member, glyph in glyph_pairs}
    return glyph_by_action.get(action_name, "•")
|
|
|
|
def planned_task_succeeded(event, *, task_type: str) -> bool:
    """Report whether ``event`` carries the success outcome for ``task_type``.

    Each supported task type has exactly one outcome that counts as success.
    Task types outside that table never count as succeeded.
    """
    observed_outcome = str(event.execution_outcome)
    success_pairs = (
        (ActionType.INSPECT, ExecutionOutcomeType.INSPECTION_COMPLETED),
        (ActionType.REVISIT, ExecutionOutcomeType.INSPECTION_COMPLETED),
        (ActionType.DEFER, ExecutionOutcomeType.DEFERRED),
        (ActionType.REMOVE_WEEDS, ExecutionOutcomeType.WEED_REMOVAL_COMPLETED),
        (ActionType.MEASURE_SOIL, ExecutionOutcomeType.SOIL_MEASUREMENT_COMPLETED),
        (ActionType.APPLY_FERTILIZER, ExecutionOutcomeType.FERTILIZER_APPLIED),
        (ActionType.APPLY_WATER, ExecutionOutcomeType.WATER_APPLIED),
        (ActionType.INSTALL_DRAINAGE, ExecutionOutcomeType.DRAINAGE_INSTALLED),
        (ActionType.SUBSOIL, ExecutionOutcomeType.SUBSOIL_COMPLETED),
    )
    expected_outcome = {action.value: outcome.value for action, outcome in success_pairs}
    if task_type not in expected_outcome:
        return False
    return observed_outcome == expected_outcome[task_type]
|
|
|
|
def planned_task_failed(event, *, task_type: str) -> bool:
    """Report whether ``event`` ended in one of the failure outcomes.

    ``task_type`` is accepted but deliberately unused: the same outcomes count
    as failure for every task type.
    """
    del task_type
    observed_outcome = str(event.execution_outcome)
    failure_outcomes = (
        ExecutionOutcomeType.INVALID_TASK,
        ExecutionOutcomeType.MOVE_REJECTED,
        ExecutionOutcomeType.POSITION_REJECTED,
        ExecutionOutcomeType.RESOURCE_EXHAUSTED,
    )
    return any(observed_outcome == outcome.value for outcome in failure_outcomes)
|
|
|
|
def recent_task_status_by_zone_action(
    env: SimulatorEnvironment,
    *,
    ttl_steps: int = RECENT_TASK_ICON_TTL_STEPS,
) -> dict[tuple[str, str], str]:
    """Collect a status marker per ``(zone, task type)`` from recent events.

    The event history is walked newest-first and the walk stops at the first
    event older than ``ttl_steps`` steps before ``env.step_index``. Events
    without a targeted planner task, and move tasks, are ignored. For each key
    the first success or failure seen wins; events that are neither leave the
    key open for an older event to fill.
    """
    oldest_allowed_step = max(0, env.step_index - ttl_steps)
    markers: dict[tuple[str, str], str] = {}

    for past_event in reversed(env.event_history):
        if past_event.step_index < oldest_allowed_step:
            break

        context = past_event.planner_context
        task = None if context is None else context.normalized_task
        if task is None or task.target_zone is None:
            continue

        kind = str(task.task_type)
        if kind == ActionType.MOVE.value:
            continue

        key = (task.target_zone, kind)
        if key in markers:
            continue

        if planned_task_succeeded(past_event, task_type=kind):
            markers[key] = "→"
        elif planned_task_failed(past_event, task_type=kind):
            markers[key] = "✕"

    return markers
|
|
|
|
def build_zone_action_badges(
    env: SimulatorEnvironment,
    *,
    mission_plan: MissionPlan | None,
) -> dict[str, list[str]]:
    """Build the list of badge strings to show on each zone.

    Planned actions come first, each rendered as ``<order><icon>`` with a
    recent status marker appended when one exists for the same zone and task
    type. Recent statuses not covered by a planned action are then added as
    ``<icon><status>``.
    """
    zone_badges: dict[str, list[str]] = {}
    planned = build_planned_horizon_payload(env, mission_plan=mission_plan)
    outcome_markers = recent_task_status_by_zone_action(env)
    already_shown: set[tuple[str, str]] = set()

    planned_actions = planned.actions if planned is not None else ()
    for step in planned_actions:
        key = (step.zone_id, step.task_type)
        label = f"{step.order}{planned_action_icon(step.task_type)}"
        marker = outcome_markers.get(key)
        if marker is not None:
            label = label + marker
        zone_badges.setdefault(step.zone_id, []).append(label)
        already_shown.add(key)

    for key, marker in outcome_markers.items():
        if key in already_shown:
            continue
        zone_id, task_type = key
        zone_badges.setdefault(zone_id, []).append(f"{planned_action_icon(task_type)}{marker}")

    return zone_badges
|
|
|
|
def build_zone_cell_text(*, zone, action_badges: list[str]) -> str:
    """Return the text shown inside a zone cell: the badges joined by spaces.

    ``zone`` is accepted but unused. With no badges the cell text is empty.
    """
    del zone
    return " ".join(action_badges) if action_badges else ""
|
|
|
|
def numbered_step_offset(index: int) -> tuple[float, float]:
    """Return the ``(x, y)`` offset for the ``index``-th numbered step marker.

    Offsets fill a 3x3 grid row by row, starting top-left, and wrap around
    after nine entries.
    """
    column_shifts = (-0.28, -0.02, 0.24)
    row_shifts = (0.28, 0.02, -0.24)
    slot = index % (len(column_shifts) * len(row_shifts))
    row, column = divmod(slot, len(column_shifts))
    return column_shifts[column], row_shifts[row]
|
|
|
|
# Numeric priority used for each named priority band. ground_truth_fill_color
# looks a tile's band up here (lower-cased) when no numeric priority hint is
# available, and treats unknown band names as 0.0.
_BAND_TO_PRIORITY: dict[str, float] = {
    "low": 0.15,
    "medium": 0.35,
    "high": 0.65,
    "critical": 0.9,
}
|
|
|
|
def priority_fill_color(priority: float, *, needs_intervention: bool, yield_pred_missing: bool = False) -> str:
    """Pick the fill color for a tile.

    A missing yield prediction overrides everything else. Tiles needing
    intervention are red at or above ``RED_URGENCY_THRESHOLD`` and amber below
    it. Other tiles are pale yellow strictly above
    ``CAUTION_URGENCY_THRESHOLD`` and green otherwise.
    """
    if yield_pred_missing:
        return MISSING_YIELD_COLOR
    if not needs_intervention:
        return "#f1e3b0" if priority > CAUTION_URGENCY_THRESHOLD else "#bdd9bf"
    return "#d1495b" if priority >= RED_URGENCY_THRESHOLD else "#edae49"
|
|
|
|
def ground_truth_fill_color(hidden) -> str:
    """Return the fill color implied by a tile's hidden (ground-truth) state.

    The priority comes from ``hidden.true_priority_hint`` when that is set.
    Otherwise it is looked up from the ``value`` of
    ``hidden.true_priority_band``, defaulting to the ``"low"`` band.
    """
    intervention_needed = truth_tile_needs_intervention(hidden)

    numeric_hint = getattr(hidden, "true_priority_hint", None)
    if numeric_hint is None:
        band_member = getattr(hidden, "true_priority_band", None)
        band_name = getattr(band_member, "value", None) or "low"
        score = _BAND_TO_PRIORITY.get(str(band_name).lower(), 0.0)
    else:
        score = float(numeric_hint or 0.0)

    yield_missing = bool(getattr(hidden, "true_yield_pred_missing", False))
    return priority_fill_color(
        score,
        needs_intervention=intervention_needed,
        yield_pred_missing=yield_missing,
    )
|
|
|
|
def zone_border_style(*, hidden, is_current_zone: bool, measured_fill_color: str) -> tuple[str, int]:
    """Return ``(border_color, border_width)`` for a zone cell.

    - Robot's current zone: dark ink, width 4.
    - Ground-truth color differs from the measured fill: the ground-truth
      color as the border, width 3.
    - Colors agree: subtle blue-gray, width 1.
    """
    if not is_current_zone:
        expected_color = ground_truth_fill_color(hidden)
        disagrees = expected_color != measured_fill_color
        return (expected_color, 3) if disagrees else ("#9aa6b2", 1)
    return "#21312a", 4
|
|
|
|
def visible_tile_needs_intervention(zone_state) -> bool:
    """Report whether the visible features of a tile call for intervention.

    True when the tile's ``recommended_action`` maps to one of
    ``SERVICE_ACTIONS``, or when its ``needs_intervention`` feature is truthy.
    """
    features = zone_state.static_features
    suggested_action = action_type_for_field_recommendation(features.get("recommended_action"))
    return suggested_action in SERVICE_ACTIONS or bool(features.get("needs_intervention", False))
|
|
|
|
def tile_has_missing_yield(zone_state) -> bool:
    """Report whether the tile's ``yield_pred_missing`` feature is truthy."""
    missing_flag = zone_state.static_features.get("yield_pred_missing", False)
    return True if missing_flag else False
|
|
|
|
def truth_tile_needs_intervention(hidden) -> bool:
    """Report whether a tile's hidden (ground-truth) state needs intervention.

    True when any of the following holds:

    - ``hidden.true_needs_intervention`` is truthy;
    - ``hidden.true_recommended_action`` maps to one of ``SERVICE_ACTIONS``;
    - ``hidden.weed_removal_required`` is truthy, or a fertilizer or
      irrigation recommendation is present.

    Missing attributes are treated as "not set". The result is always a real
    ``bool``, so a non-boolean ``weed_removal_required`` value is never
    returned to the caller as-is.
    """
    if bool(getattr(hidden, "true_needs_intervention", False)):
        return True
    mapped_action = action_type_for_field_recommendation(getattr(hidden, "true_recommended_action", "none"))
    if mapped_action in SERVICE_ACTIONS:
        return True
    # Coerce explicitly: the bare ``or`` chain would hand back the raw
    # ``weed_removal_required`` value when it is truthy.
    return bool(
        getattr(hidden, "weed_removal_required", False)
        or getattr(hidden, "fertilizer_recommendation", None) is not None
        or getattr(hidden, "irrigation_recommendation", None) is not None
    )
|
|
|
|
def format_step_option(event) -> str:
    """Return a one-line label for ``event``: step number, action, location."""
    step_number = event.step_index
    action_label = humanize_action_name(str(event.action))
    location_label = format_event_location(event)
    return f"Step {step_number}: {action_label} | {location_label}"
|
|
|
|
def format_event_location(event) -> str:
    """Describe where ``event`` took place.

    A move with both endpoints known is shown as ``"<from> -> <to>"``. An event
    with no destination zone is a ``"global action"``. Anything else is shown
    as ``"at <zone>"``. The destination is ``details["to_zone_id"]`` when set,
    else ``event.zone_id``.
    """
    details = event.details
    origin = details.get("from_zone_id")
    destination = details.get("to_zone_id") or event.zone_id

    is_move = str(event.action) == ActionType.MOVE.value
    if is_move and origin and destination:
        return f"{short_zone_label(origin)} -> {short_zone_label(destination)}"
    if destination is None:
        return "global action"
    return f"at {short_zone_label(destination)}"
|
|
|
|
def current_location_label(zone_id: str | None) -> str:
    """Return the display label for the robot's location.

    ``None`` and ``"home_base"`` both read as ``"Home Base"``; any other id is
    shortened with ``short_zone_label``.
    """
    at_base = zone_id is None or zone_id == "home_base"
    return "Home Base" if at_base else short_zone_label(zone_id)
|
|
|
|
def short_zone_label(zone_id: str | None) -> str:
    """Shorten a zone id to a compact ``R<row>C<col>`` label.

    - Empty or ``None`` ids become ``"None"``.
    - ``"home_base"`` becomes ``"Home Base"``.
    - Ids made of three underscore-separated parts whose last two parts reduce
      to integers once ``r`` / ``c`` characters are removed become
      ``R<row>C<col>``.
    - Any other id is returned unchanged.
    """
    if not zone_id:
        return "None"
    if zone_id == "home_base":
        return "Home Base"

    pieces = zone_id.split("_")
    if len(pieces) != 3:
        return zone_id
    try:
        row_number = int(pieces[1].replace("r", ""))
        column_number = int(pieces[2].replace("c", ""))
    except ValueError:
        return zone_id
    return f"R{row_number}C{column_number}"
|
|
|
|
def priority_band_label(priority: float) -> str:
    """Map a priority score to its long band label.

    Bands, highest first: ``>= 0.8`` "Act now", ``>= 0.6`` "Actionable",
    ``>= 0.35`` "Monitor", otherwise "Low".
    """
    band_floors = (
        (0.8, "Act now"),
        (0.6, "Actionable"),
        (0.35, "Monitor"),
    )
    for floor, label in band_floors:
        if priority >= floor:
            return label
    return "Low"
|
|
|
|
def priority_band_short_label(priority: float) -> str:
    """Map a priority score to its abbreviated band label.

    Bands, highest first: ``>= 0.8`` "Now", ``>= 0.6`` "Act", ``>= 0.35``
    "Mon", otherwise "Low".
    """
    band_floors = (
        (0.8, "Now"),
        (0.6, "Act"),
        (0.35, "Mon"),
    )
    for floor, label in band_floors:
        if priority >= floor:
            return label
    return "Low"
|
|
|
|
def humanize_constraint(constraint_level) -> str:
    """Return the display text for a constraint level.

    The level is first reduced with ``enum_value``; values without a dedicated
    label are shown as their plain string form.
    """
    labels = {
        "none": "No constraint",
        "soft": "Needs reconfirmation",
        "hard": "High constraint",
    }
    token = enum_value(constraint_level)
    if token in labels:
        return labels[token]
    return str(token)
|
|
|
|
def humanize_action_name(action_name: str) -> str:
    """Return the display label for an action name.

    Several action names deliberately share one label. Names without a
    dedicated label are title-cased with underscores turned into spaces.
    """
    label_groups = (
        ("Move", ("move", "patrol", "recharge", "end_mission")),
        ("Check field", ("inspect", "revisit", "defer", "remove_weeds")),
        ("Take soil sample", ("measure_soil",)),
        ("Apply fertilizer", ("apply_fertilizer",)),
        ("Apply water", ("apply_water",)),
        ("Install drainage", ("install_drainage",)),
        ("Subsoil", ("subsoil",)),
    )
    label_by_action = {name: label for label, names in label_groups for name in names}
    try:
        return label_by_action[action_name]
    except KeyError:
        return action_name.replace("_", " ").title()
|
|
|
|
def humanize_metric_token(value: str) -> str:
    """Turn a ``snake_case`` token into title-cased words."""
    spaced = " ".join(value.split("_"))
    return spaced.title()
|
|
|
|
def humanize_control_intent(control_intent) -> str:
    """Return the display text for a control intent.

    The intent is first reduced with ``enum_value``; values without a
    dedicated label fall back to ``humanize_metric_token``.
    """
    labelled_intents = (
        (ResolutionIntent.CONTINUE_INSPECTION, "Continue field check"),
        (ResolutionIntent.REQUEST_RECHECK, "Continue field check"),
        (ResolutionIntent.DEFER_ZONE, "No action on tile"),
        (ResolutionIntent.STOP_FOR_RESOURCE_LIMIT, "Stop for resource limit"),
        (ResolutionIntent.STOP_FOR_BACKEND_ERROR, "Stop for backend error"),
    )
    labels = {member.value: text for member, text in labelled_intents}
    token = enum_value(control_intent)
    if token in labels:
        return labels[token]
    return humanize_metric_token(str(token))
|
|
|
|
def humanize_control_state(control_state) -> str:
    """Return the display text for a control state.

    The state is first reduced with ``enum_value``; values without a dedicated
    label fall back to ``humanize_metric_token``.
    """
    labelled_states = (
        (ControlState.UNSEEN, "Unseen"),
        (ControlState.INSPECT_REQUIRED, "Field check required"),
        (ControlState.RECHECK_REQUIRED, "Field check required"),
        (ControlState.RESOLVED_COMPLETE, "Resolved complete"),
        (ControlState.RESOLVED_DEFERRED, "No action applied"),
        (ControlState.RESOLVED_ERROR, "Resolved error"),
    )
    labels = {member.value: text for member, text in labelled_states}
    token = enum_value(control_state)
    if token in labels:
        return labels[token]
    return humanize_metric_token(str(token))
|
|
|
|
def humanize_terminal_reason(terminal_reason) -> str:
    """Return the display text for a terminal reason.

    The reason is first reduced with ``enum_value``; values without a
    dedicated label fall back to ``humanize_metric_token``.
    """
    labelled_reasons = (
        (TerminalReason.COMPLETION, "Completion"),
        (TerminalReason.DEFERRED, "No action applied"),
        (TerminalReason.RESOURCE_LIMIT, "Resource limit"),
        (TerminalReason.BACKEND_ERROR, "Backend error"),
    )
    labels = {member.value: text for member, text in labelled_reasons}
    token = enum_value(terminal_reason)
    if token in labels:
        return labels[token]
    return humanize_metric_token(str(token))
|
|
|
|
def humanize_execution_outcome(execution_outcome) -> str:
    """Return the display text for an execution outcome.

    The outcome is first reduced with ``enum_value``; values without a
    dedicated label fall back to ``humanize_metric_token``. Patrol and
    recharge completions share the "Move completed" label.
    """
    labelled_outcomes = (
        (ExecutionOutcomeType.MISSION_STOPPED, "Mission stopped"),
        (ExecutionOutcomeType.INVALID_TASK, "Invalid task"),
        (ExecutionOutcomeType.MOVE_COMPLETED, "Move completed"),
        (ExecutionOutcomeType.MOVE_SKIPPED, "Move skipped"),
        (ExecutionOutcomeType.MOVE_REJECTED, "Move rejected"),
        (ExecutionOutcomeType.POSITION_REJECTED, "Position rejected"),
        (ExecutionOutcomeType.PATROL_COMPLETED, "Move completed"),
        (ExecutionOutcomeType.RECHARGE_COMPLETED, "Move completed"),
        (ExecutionOutcomeType.INSPECTION_COMPLETED, "Field check completed"),
        (ExecutionOutcomeType.WEED_REMOVAL_COMPLETED, "Weed removal completed"),
        (ExecutionOutcomeType.SOIL_MEASUREMENT_COMPLETED, "Soil sample completed"),
        (ExecutionOutcomeType.FERTILIZER_APPLIED, "Fertilizer applied"),
        (ExecutionOutcomeType.WATER_APPLIED, "Water applied"),
        (ExecutionOutcomeType.DRAINAGE_INSTALLED, "Drainage installed"),
        (ExecutionOutcomeType.SUBSOIL_COMPLETED, "Subsoil completed"),
        (ExecutionOutcomeType.DEFERRED, "No action applied"),
        (ExecutionOutcomeType.RESOURCE_EXHAUSTED, "Resource exhausted"),
    )
    labels = {member.value: text for member, text in labelled_outcomes}
    token = enum_value(execution_outcome)
    if token in labels:
        return labels[token]
    return humanize_metric_token(str(token))
|
|
|
|
def render_plan(snapshot) -> None:
    """Render the mission plan followed by the field assessments.

    Shows the next task (action, optional control intent, target, rationale),
    then up to six queued tasks and up to six assessments. If the plan has no
    tasks, a placeholder line is written and nothing else is rendered.
    """
    queue = snapshot.mission_plan.ordered_tasks
    if not queue:
        st.write("No mission plan available yet.")
        return

    head = queue[0]
    st.write(f"**Next Robot Step:** {humanize_action_name(str(head.task_type))}")
    if head.control_intent is not None:
        st.write(f"**Control Intent:** {humanize_control_intent(head.control_intent)}")
    st.write(f"**Target Field:** {short_zone_label(head.target_zone)}")
    st.write(f"**Rationale:** {head.rationale}")

    st.write("**Plan Queue**")
    for queued in queue[:6]:
        action_label = humanize_action_name(str(queued.task_type))
        zone_label = short_zone_label(queued.target_zone)
        if queued.control_intent is None:
            intent_note = ""
        else:
            intent_note = f", {humanize_control_intent(queued.control_intent)}"
        st.write(f"- {action_label} on {zone_label} (planner score {queued.priority:.2f}{intent_note})")

    st.write("**Field Assessments**")
    for assessment in snapshot.assessments[:6]:
        zone_label = short_zone_label(assessment.zone_id)
        band_label = priority_band_label(assessment.priority_score)
        action_label = humanize_action_name(str(assessment.recommended_action))
        state_label = humanize_control_state(assessment.control_state)
        constraint_label = humanize_constraint(assessment.constraint_level)
        st.write(
            f"- {zone_label}: {band_label} | {action_label} next | {state_label} | "
            f"{constraint_label} | uncertainty {assessment.uncertainty:.2f}"
        )
|
|
|
|
def format_control_state_transition(event) -> str | None:
    """Describe how ``event`` changed the control state.

    Returns ``None`` when neither the before nor the after state is recorded,
    a single label when only one is recorded or both read the same, and
    ``"<before> -> <after>"`` otherwise.
    """
    recorded_states = [
        state
        for state in (event.control_state_before, event.control_state_after)
        if state is not None
    ]
    if not recorded_states:
        return None

    labels = [humanize_control_state(state) for state in recorded_states]
    if len(labels) == 1 or labels[0] == labels[1]:
        return labels[-1]
    return f"{labels[0]} -> {labels[1]}"
|
|
|
|
def describe_task(task) -> str:
    """Return a one-line description of a planner task.

    The description is the action label, followed by the target zone and the
    control intent when those are set. ``None`` yields a placeholder text.
    """
    if task is None:
        return "No planner task recorded"

    fragments = [humanize_action_name(str(task.task_type))]
    if task.target_zone is not None:
        fragments.append(f" on {short_zone_label(task.target_zone)}")
    if task.control_intent is not None:
        fragments.append(f" ({humanize_control_intent(task.control_intent)})")
    return "".join(fragments)
|
|
|
|
def render_assessment_snapshot(title: str, assessment) -> None:
    """Render one assessment under a bold ``title``.

    Writes a summary line, then the recommended control intent when set, then
    up to five supporting-evidence bullets. A missing assessment renders only
    a caption.
    """
    st.write(f"**{title}**")
    if assessment is None:
        st.caption("No assessment snapshot recorded for this step.")
        return

    zone_label = short_zone_label(assessment.zone_id)
    band_label = priority_band_label(assessment.priority_score)
    action_label = humanize_action_name(str(assessment.recommended_action))
    state_label = humanize_control_state(assessment.control_state)
    constraint_label = humanize_constraint(assessment.constraint_level)
    st.write(
        f"{zone_label} | {band_label} | {action_label} next | {state_label} | "
        f"{constraint_label} | uncertainty {assessment.uncertainty:.2f}"
    )

    intent = assessment.recommended_control_intent
    if intent is not None:
        st.write(f"Control intent: {humanize_control_intent(intent)}")

    evidence = assessment.supporting_evidence
    if evidence:
        st.write("Supporting evidence")
        for fact in evidence[:5]:
            st.write(f"- {fact}")
|
|
|
|
def render_assessment_after_step(summary: dict[str, object] | None) -> None:
    """Render the post-step field assessment summary.

    Writes a status line (zone, priority label, control state, uncertainty)
    and a visit-count line that includes revisits when ``revisit_count`` is
    present. An empty or missing summary renders only a caption.
    """
    st.write("**Field State After Step**")
    if not summary:
        st.caption("No post-step field assessment was recorded.")
        return

    zone_label = short_zone_label(summary.get("zone_id"))
    priority_label = normalize_priority_label(summary.get("inspection_priority"))
    uncertainty = float(summary.get("uncertainty", 0.0))
    st.write(
        f"{zone_label} | {priority_label} | "
        f"{summary.get('control_state')} | uncertainty {uncertainty:.2f}"
    )

    visit_line = f"Visit count: {summary.get('visit_count', 0)}"
    if summary.get("revisit_count") is not None:
        visit_line += f" | Revisits: {summary.get('revisit_count', 0)}"
    st.write(visit_line)
|
|
|
|
def render_plan_and_timeline(snapshot) -> None:
    """Render a "next step" banner for the plan, then the step timeline.

    The banner shows the first planned task with its rationale and a preview
    of the next three queued tasks. A toggle controls whether the timeline
    body is shown; its initial value comes from session state, defaulting to
    on unless the simulator is in real-field mode. When the timeline is shown
    and events exist, a depth selector is rendered before the timeline itself.
    """
    queued_tasks = snapshot.mission_plan.ordered_tasks
    if queued_tasks:
        upcoming = queued_tasks[0]
        target_label = "—" if not upcoming.target_zone else short_zone_label(upcoming.target_zone)
        if upcoming.control_intent:
            intent_suffix = f" ({humanize_control_intent(upcoming.control_intent)})"
        else:
            intent_suffix = ""
        preview_items = [
            f"{humanize_action_name(str(later.task_type))} {short_zone_label(later.target_zone)}"
            for later in queued_tasks[1:4]
        ]
        queue_preview = ", ".join(preview_items)
        banner = (
            f"**Next → {humanize_action_name(str(upcoming.task_type))} on {target_label}{intent_suffix}** "
            f"_{upcoming.rationale}_"
        )
        st.markdown(banner, unsafe_allow_html=True)
        if queue_preview:
            st.caption(f"Queue: {queue_preview}")

    timeline_on_by_default = snapshot.observation.mode != SimulatorMode.REAL_FIELD
    initial_toggle_value = bool(st.session_state.get(SHOW_TIMELINE_BODY_KEY, timeline_on_by_default))
    timeline_visible = st.toggle(
        "Show step timeline",
        value=initial_toggle_value,
        key=SHOW_TIMELINE_BODY_KEY,
    )

    if not timeline_visible:
        if snapshot.event_history:
            hidden_note = (
                f"Step timeline hidden. {len(snapshot.event_history)} recorded steps are available on demand."
            )
        else:
            hidden_note = "No events yet."
        st.caption(hidden_note)
        return

    if snapshot.event_history:

        def depth_label(value) -> str:
            # Option 0 stands for "no limit".
            return "All steps" if value == 0 else f"Latest {value} steps"

        depth = st.selectbox(
            "Timeline Depth",
            TIMELINE_WINDOW_OPTIONS,
            index=0,
            key=TIMELINE_WINDOW_KEY,
            format_func=depth_label,
        )
        recorded_steps = len(snapshot.event_history)
        if depth and recorded_steps > depth:
            st.caption(f"Showing the latest {depth} of {recorded_steps} steps.")

    render_step_timeline(snapshot)
|
|
|
|
def render_step_timeline(snapshot) -> None:
    """Render one expander per recorded step, with the latest step expanded.

    The number of steps shown is read from session state under
    ``TIMELINE_WINDOW_KEY``: ``0`` shows the whole history, any other value
    shows only that many of the most recent events. With no events a
    placeholder line is written instead.

    Each expander holds three summary columns (step overview, control outcome,
    evidence and context), the facts revealed by the step, the planner's
    assessment before the step, the planner's decision context, and the field
    assessment recorded after the step.
    """
    if not snapshot.event_history:
        st.write("No events yet.")
        return
    latest_step_index = snapshot.event_history[-1].step_index
    # Falls back to the first configured option when the selector has not been rendered yet.
    timeline_window = int(st.session_state.get(TIMELINE_WINDOW_KEY, TIMELINE_WINDOW_OPTIONS[0]))
    events = snapshot.event_history if timeline_window == 0 else snapshot.event_history[-timeline_window:]
    for event in events:
        transition = format_control_state_transition(event)
        # Expander title: step number, action, location, and the control-state change if any.
        label_parts = [
            f"Step {event.step_index}",
            humanize_action_name(str(event.action)),
            format_event_location(event),
        ]
        if transition is not None:
            label_parts.append(transition)
        with st.expander(" | ".join(label_parts), expanded=event.step_index == latest_step_index):
            summary_left, summary_middle, summary_right = st.columns([2, 1, 1])
            with summary_left:
                st.write("**Step Overview**")
                st.write(f"Action: {humanize_action_name(str(event.action))}")
                st.write(f"Location: {format_event_location(event)}")
                st.write(f"Outcome: {humanize_execution_outcome(event.execution_outcome)}")
                st.write(f"Reward: {event.reward:.2f}")
            with summary_middle:
                st.write("**Control Outcome**")
                if transition is not None:
                    st.write(f"Control state: {transition}")
                if event_control_intent(event) is not None:
                    st.write(f"Intent: {humanize_control_intent(event_control_intent(event))}")
                if event.control_event is not None and event.control_event.rejection_reason is not None:
                    st.write(f"Rejection reason: `{event.control_event.rejection_reason}`")
                if event_terminal_reason(event) is not None:
                    st.write(f"Terminal reason: {humanize_terminal_reason(event_terminal_reason(event))}")
                if event.details.get("reason"):
                    st.write(f"Step reason: {event.details['reason']}")
                if event.planner_context is not None:
                    chosen = event.planner_context.chosen_task_before_normalization
                    normalized = event.planner_context.normalized_task
                    # Only mention the normalized task when it differs from the planner's pick.
                    if chosen != normalized:
                        st.write(f"Normalized task: {describe_task(normalized)}")
            with summary_right:
                st.write("**Evidence And Context**")
                context = event.guarded_context_after
                if context is None:
                    st.caption("No context recorded.")
                else:
                    st.write(f"Field checks: {context.inspection_count}")
                    st.write(f"Revisits: {context.revisit_count}")
                if event.details.get("uncertainty_delta") is not None:
                    st.write(f"Uncertainty delta: {float(event.details['uncertainty_delta']):+.2f}")
                if event.planner_context is not None:
                    st.write(f"Evidence reused: {'Yes' if event.planner_context.evidence_reused else 'No'}")
                    if event.planner_context.fixture_identity is not None:
                        st.write(f"Fixture: `{event.planner_context.fixture_identity}`")

            # At most six revealed facts are listed; a placeholder is shown when there are none.
            revealed_facts = event.details.get("revealed_facts") or ["No new field information was revealed."]
            st.write("**What Happened In This Step**")
            for fact in revealed_facts[:6]:
                st.write(f"- {fact}")

            render_assessment_snapshot(
                "Planner View Before Step",
                event.planner_context.selected_assessment if event.planner_context else None,
            )
            if event.planner_context is not None:
                st.write("**Why This Decision**")
                max_e = snapshot.observation.max_energy
                # NOTE(review): only battery_before_step is None-checked here;
                # estimated_battery_after_step is passed through unchecked, so confirm
                # battery_percentage_int tolerates None.
                if event.planner_context.battery_before_step is not None and max_e:
                    pct_before = battery_percentage_int(event.planner_context.battery_before_step, max_e)
                    pct_after = battery_percentage_int(event.planner_context.estimated_battery_after_step, max_e)
                    st.write(f"Battery: {pct_before}% → {pct_after}%")
                if event.planner_context.return_distance_before_step is not None:
                    st.write(f"Hops to base: {event.planner_context.return_distance_before_step} → {event.planner_context.return_distance_after_step}")
                # Regular and adjacent candidates are listed together, regular ones first.
                all_candidates = list(event.planner_context.candidate_summaries) + list(event.planner_context.adjacent_candidate_summaries)
                if all_candidates:
                    st.write("Other options considered")
                    for summary in all_candidates:
                        target_label = short_zone_label(summary.target_zone) if summary.target_zone else "—"
                        # Battery infeasibility takes precedence over any other rejection reason.
                        if summary.rejected_for_battery_infeasibility:
                            note = " [battery-infeasible]"
                        elif summary.rejection_reason:
                            note = f" [{summary.rejection_reason}]"
                        else:
                            note = ""
                        st.write(f"- {humanize_action_name(str(summary.task_type))} {target_label} — utility {summary.net_utility:.2f}{note}")
            render_assessment_after_step(event.details.get("assessment_summary_after_step"))
|
|
|
|
# Call main() only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()
|
|