"""Operator debugging console for OpenRange. Provides a lightweight HTML-based single-page application for monitoring the range environment state, viewing action history, and triggering resets. """ from __future__ import annotations import copy import time from typing import Any from fastapi import APIRouter, Request from fastapi.responses import HTMLResponse, JSONResponse console_router = APIRouter(prefix="/console", tags=["console"]) # --------------------------------------------------------------------------- # In-memory action history (shared across the module) # --------------------------------------------------------------------------- _action_history: list[dict[str, Any]] = [] _MAX_HISTORY = 50 # keep more than 20 internally, but serve 20 _published_episode: dict[str, dict[str, Any]] | None = None def record_action(action_record: dict[str, Any]) -> None: """Append an action record to the console history ring buffer.""" _action_history.append(action_record) if len(_action_history) > _MAX_HISTORY: del _action_history[: len(_action_history) - _MAX_HISTORY] def clear_history() -> None: """Clear the action history (called on reset).""" _action_history.clear() def get_history(limit: int = 20) -> list[dict[str, Any]]: """Return the most recent *limit* action records, newest first.""" return list(reversed(_action_history[-limit:])) def publish_episode(snapshot: Any, state: Any) -> None: """Publish the latest episode summary for console readers. This is the bridge used by real reset/step traffic from HTTP handlers, where OpenEnv creates short-lived environment instances per request. """ global _published_episode topo = snapshot.topology if snapshot and isinstance(snapshot.topology, dict) else {} hosts = topo.get("hosts", []) zones = topo.get("zones", {}) vuln_count = 0 if snapshot is not None and getattr(snapshot, "truth_graph", None) is not None: vuln_count = len(getattr(snapshot.truth_graph, "vulns", []) or []) _published_episode = { "snapshot": { "id": getattr(state, "episode_id", None), "tier": topo.get("tier", getattr(state, "tier", 1)), "hosts": list(hosts) if isinstance(hosts, list) else [], "zones": copy.deepcopy(zones) if isinstance(zones, dict) else {}, "vuln_count": vuln_count, }, "episode": { "step_count": int(getattr(state, "step_count", 0) or 0), "flags_found": len(getattr(state, "flags_found", []) or []), "mode": getattr(state, "mode", ""), "services_status": copy.deepcopy(getattr(state, "services_status", {}) or {}), }, } def clear_episode() -> None: """Clear the published episode summary.""" global _published_episode _published_episode = None def get_published_episode() -> dict[str, dict[str, Any]] | None: """Return a defensive copy of the published episode summary.""" return copy.deepcopy(_published_episode) # --------------------------------------------------------------------------- # API routes # --------------------------------------------------------------------------- @console_router.get("/api/snapshot") async def api_snapshot(request: Request) -> JSONResponse: """Return current snapshot metadata (no truth graph or flags).""" ctx = _get_env_context(request) published = ctx.get("published_episode") if published is not None: return JSONResponse({ **published["snapshot"], "state_scope": ctx["state_scope"], "session_id": ctx["session_id"], "warning": ctx["warning"], }) env = ctx["env"] snapshot = env.snapshot if snapshot is None: return JSONResponse({ "id": None, "tier": None, "hosts": [], "zones": {}, "vuln_count": 0, "state_scope": ctx["state_scope"], "session_id": ctx["session_id"], "warning": ctx["warning"], }) topo = snapshot.topology if isinstance(snapshot.topology, dict) else {} hosts = topo.get("hosts", []) zones = topo.get("zones", {}) tier = topo.get("tier", 1) vuln_count = len(snapshot.truth_graph.vulns) if snapshot.truth_graph else 0 return JSONResponse({ "id": env.state.episode_id, "tier": tier, "hosts": hosts, "zones": zones, "vuln_count": vuln_count, "state_scope": ctx["state_scope"], "session_id": ctx["session_id"], "warning": ctx["warning"], }) @console_router.get("/api/episode") async def api_episode(request: Request) -> JSONResponse: """Return current episode state.""" ctx = _get_env_context(request) published = ctx.get("published_episode") if published is not None: return JSONResponse({ **published["episode"], "state_scope": ctx["state_scope"], "session_id": ctx["session_id"], "warning": ctx["warning"], }) env = ctx["env"] state = env.state return JSONResponse({ "step_count": state.step_count, "flags_found": len(state.flags_found), "mode": state.mode, "services_status": state.services_status, "state_scope": ctx["state_scope"], "session_id": ctx["session_id"], "warning": ctx["warning"], }) @console_router.get("/api/history") async def api_history() -> JSONResponse: """Return recent action history (last 20 actions with timestamps).""" return JSONResponse(get_history(20)) @console_router.get("", response_class=HTMLResponse) @console_router.get("/", response_class=HTMLResponse) async def console_page() -> HTMLResponse: """Serve the single-page operator console.""" return HTMLResponse(_CONSOLE_HTML) # --------------------------------------------------------------------------- # Helper to retrieve the environment from app state # --------------------------------------------------------------------------- def _get_env_context(request: Request) -> dict[str, Any]: """Resolve the environment context used by the console endpoints. Priority: 1. Active OpenEnv WebSocket session environment (session-scoped truth) 2. Published reset/step state from real HTTP traffic 3. ``app.state.env`` fallback environment (global app scope) 4. Lazily created fallback environment (tests/dev) """ app = request.app server = getattr(app.state, "openenv_server", None) sessions = getattr(server, "_sessions", None) if isinstance(sessions, dict) and sessions: if len(sessions) == 1: session_id, env = next(iter(sessions.items())) return { "env": env, "published_episode": None, "state_scope": "websocket_session", "session_id": session_id, "warning": None, } session_info = getattr(server, "_session_info", {}) selected_id = max( sessions.keys(), key=lambda sid: float(getattr(session_info.get(sid), "last_activity_at", 0.0) or 0.0), ) return { "env": sessions[selected_id], "published_episode": None, "state_scope": "websocket_session", "session_id": selected_id, "warning": ( f"{len(sessions)} active sessions detected; " f"showing the most recently active session ({selected_id})." ), } published = get_published_episode() if published is not None: return { "env": None, "published_episode": published, "state_scope": "published_episode", "session_id": None, "warning": ( "No active WebSocket session found; console is showing the most " "recent reset/step state observed by the server." ), } if hasattr(app.state, "env"): return { "env": app.state.env, "published_episode": None, "state_scope": "app_state_env", "session_id": None, "warning": ( "No active WebSocket session found; console is showing shared " "app-state environment data." ), } # Fallback: create an ephemeral environment (tests/dev) from open_range.server.environment import RangeEnvironment if not hasattr(app.state, "_fallback_env"): app.state._fallback_env = RangeEnvironment(docker_available=False) return { "env": app.state._fallback_env, "published_episode": None, "state_scope": "fallback_env", "session_id": None, "warning": "Console is using a fallback environment (no server session available).", } def _get_env(request: Request) -> Any: """Compatibility helper for callers that only need the env object.""" return _get_env_context(request)["env"] # --------------------------------------------------------------------------- # HTML template (inline JS, no build step) # --------------------------------------------------------------------------- _CONSOLE_HTML = """\ OpenRange Operator Console

OpenRange Operator Console

Debugging dashboard — auto-refreshes every 2s

Snapshot
No snapshot loaded
Episode State
Waiting for data...
Action History (last 20)
No actions recorded
"""