Lars Talian commited on
Commit
0d7e48b
·
2 Parent(s): a72929aeddb429

Merge remote-tracking branch 'origin/main' into codex/issue-77-20260308

Browse files
src/open_range/builder/snapshot_store.py CHANGED
@@ -46,7 +46,6 @@ class SnapshotStore:
46
  The snapshot ID string.
47
  """
48
  if snapshot_id is None:
49
- hosts = snapshot.topology.get("hosts", [])
50
  vuln_types = [v.type for v in snapshot.truth_graph.vulns]
51
  snapshot_id = (
52
  f"snap_{'_'.join(vuln_types[:3])}"
@@ -63,21 +62,7 @@ class SnapshotStore:
63
  )
64
 
65
  # Write metadata sidecar for fast listing
66
- meta = {
67
- "snapshot_id": snapshot_id,
68
- "vuln_classes": [v.type for v in snapshot.truth_graph.vulns],
69
- "golden_path_steps": len(snapshot.golden_path),
70
- "flag_count": len(snapshot.flags),
71
- "npc_count": len(snapshot.npc_personas),
72
- "has_compose": bool(snapshot.compose),
73
- "has_payload_files": bool(snapshot.files),
74
- "live_validated": bool(snapshot.topology.get("live_validated", False)),
75
- "parent_snapshot_id": snapshot.lineage.parent_snapshot_id,
76
- "root_snapshot_id": snapshot.lineage.root_snapshot_id,
77
- "generation_depth": snapshot.lineage.generation_depth,
78
- "mutation_summary": list(snapshot.lineage.mutation_summary),
79
- "stored_at": time.time(),
80
- }
81
  meta_path = snap_dir / "metadata.json"
82
  meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8")
83
 
@@ -113,38 +98,69 @@ class SnapshotStore:
113
  else: # latest -- sort by parent dir mtime
114
  chosen = max(spec_files, key=lambda p: p.stat().st_mtime)
115
 
116
- raw = json.loads(chosen.read_text(encoding="utf-8"))
117
  return StoredSnapshot(
118
  snapshot_id=chosen.parent.name,
119
- snapshot=SnapshotSpec.model_validate(raw),
120
  )
121
 
122
  async def list_entries(self) -> list[StoredSnapshot]:
123
  """Return every stored snapshot plus its persisted ID."""
124
  entries: list[StoredSnapshot] = []
125
  for spec_path in sorted(self.store_dir.glob("*/spec.json")):
126
- raw = json.loads(spec_path.read_text(encoding="utf-8"))
127
  entries.append(
128
  StoredSnapshot(
129
  snapshot_id=spec_path.parent.name,
130
- snapshot=SnapshotSpec.model_validate(raw),
131
  )
132
  )
133
  return entries
134
 
 
 
 
 
135
  async def list_snapshots(self) -> list[dict[str, Any]]:
136
  """List all snapshots with their metadata.
137
 
138
  Returns:
139
  List of metadata dicts, sorted by stored_at descending.
140
  """
 
 
141
  results: list[dict[str, Any]] = []
142
- for meta_path in self.store_dir.glob("*/metadata.json"):
 
 
143
  try:
144
- meta = json.loads(meta_path.read_text(encoding="utf-8"))
145
- results.append(meta)
 
 
 
 
 
 
 
146
  except (json.JSONDecodeError, OSError) as exc:
147
- logger.warning("Skipping corrupt metadata: %s (%s)", meta_path, exc)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
148
 
149
  results.sort(key=lambda m: m.get("stored_at", 0), reverse=True)
150
  return results
@@ -158,8 +174,7 @@ class SnapshotStore:
158
  spec_path = self.store_dir / snapshot_id / "spec.json"
159
  if not spec_path.exists():
160
  raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
161
- raw = json.loads(spec_path.read_text(encoding="utf-8"))
162
- return SnapshotSpec.model_validate(raw)
163
 
164
  async def get_entry(self, snapshot_id: str) -> StoredSnapshot:
165
  """Load a specific snapshot plus its ID."""
@@ -167,3 +182,34 @@ class SnapshotStore:
167
  snapshot_id=snapshot_id,
168
  snapshot=await self.get(snapshot_id),
169
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
  The snapshot ID string.
47
  """
48
  if snapshot_id is None:
 
49
  vuln_types = [v.type for v in snapshot.truth_graph.vulns]
50
  snapshot_id = (
51
  f"snap_{'_'.join(vuln_types[:3])}"
 
62
  )
63
 
64
  # Write metadata sidecar for fast listing
65
+ meta = self._metadata_from_snapshot(snapshot_id, snapshot)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66
  meta_path = snap_dir / "metadata.json"
67
  meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8")
68
 
 
98
  else: # latest -- sort by parent dir mtime
99
  chosen = max(spec_files, key=lambda p: p.stat().st_mtime)
100
 
 
101
  return StoredSnapshot(
102
  snapshot_id=chosen.parent.name,
103
+ snapshot=self._load_spec(chosen),
104
  )
105
 
106
  async def list_entries(self) -> list[StoredSnapshot]:
107
  """Return every stored snapshot plus its persisted ID."""
108
  entries: list[StoredSnapshot] = []
109
  for spec_path in sorted(self.store_dir.glob("*/spec.json")):
 
110
  entries.append(
111
  StoredSnapshot(
112
  snapshot_id=spec_path.parent.name,
113
+ snapshot=self._load_spec(spec_path),
114
  )
115
  )
116
  return entries
117
 
118
+ async def count_entries(self) -> int:
119
+ """Return canonical snapshot count based on persisted specs."""
120
+ return len(await self.list_entries())
121
+
122
  async def list_snapshots(self) -> list[dict[str, Any]]:
123
  """List all snapshots with their metadata.
124
 
125
  Returns:
126
  List of metadata dicts, sorted by stored_at descending.
127
  """
128
+ entries = await self.list_entries()
129
+ spec_ids = {entry.snapshot_id for entry in entries}
130
  results: list[dict[str, Any]] = []
131
+ for entry in entries:
132
+ meta_path = self.store_dir / entry.snapshot_id / "metadata.json"
133
+ existing_meta: dict[str, Any] | None = None
134
  try:
135
+ if meta_path.exists():
136
+ loaded = json.loads(meta_path.read_text(encoding="utf-8"))
137
+ if isinstance(loaded, dict):
138
+ existing_meta = loaded
139
+ else:
140
+ logger.warning(
141
+ "Repairing metadata sidecar with non-object payload: %s",
142
+ meta_path,
143
+ )
144
  except (json.JSONDecodeError, OSError) as exc:
145
+ logger.warning("Repairing corrupt metadata: %s (%s)", meta_path, exc)
146
+
147
+ stored_at = existing_meta.get("stored_at") if existing_meta else None
148
+ canonical = self._metadata_from_snapshot(
149
+ entry.snapshot_id,
150
+ entry.snapshot,
151
+ stored_at=stored_at if isinstance(stored_at, (int, float)) else None,
152
+ )
153
+ results.append(canonical)
154
+
155
+ if existing_meta != canonical:
156
+ try:
157
+ meta_path.write_text(json.dumps(canonical, indent=2), encoding="utf-8")
158
+ except OSError as exc:
159
+ logger.warning("Failed to repair metadata sidecar %s (%s)", meta_path, exc)
160
+
161
+ for meta_path in self.store_dir.glob("*/metadata.json"):
162
+ if meta_path.parent.name not in spec_ids:
163
+ logger.warning("Ignoring orphan metadata without spec.json: %s", meta_path)
164
 
165
  results.sort(key=lambda m: m.get("stored_at", 0), reverse=True)
166
  return results
 
174
  spec_path = self.store_dir / snapshot_id / "spec.json"
175
  if not spec_path.exists():
176
  raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
177
+ return self._load_spec(spec_path)
 
178
 
179
  async def get_entry(self, snapshot_id: str) -> StoredSnapshot:
180
  """Load a specific snapshot plus its ID."""
 
182
  snapshot_id=snapshot_id,
183
  snapshot=await self.get(snapshot_id),
184
  )
185
+
186
+ @staticmethod
187
+ def _metadata_from_snapshot(
188
+ snapshot_id: str,
189
+ snapshot: SnapshotSpec,
190
+ *,
191
+ stored_at: float | None = None,
192
+ ) -> dict[str, Any]:
193
+ return {
194
+ "snapshot_id": snapshot_id,
195
+ "vuln_classes": [v.type for v in snapshot.truth_graph.vulns],
196
+ "golden_path_steps": len(snapshot.golden_path),
197
+ "flag_count": len(snapshot.flags),
198
+ "npc_count": len(snapshot.npc_personas),
199
+ "has_compose": bool(snapshot.compose),
200
+ "has_payload_files": bool(snapshot.files),
201
+ "live_validated": bool(snapshot.topology.get("live_validated", False)),
202
+ "parent_snapshot_id": snapshot.lineage.parent_snapshot_id,
203
+ "root_snapshot_id": snapshot.lineage.root_snapshot_id,
204
+ "generation_depth": snapshot.lineage.generation_depth,
205
+ "mutation_summary": list(snapshot.lineage.mutation_summary),
206
+ "stored_at": float(time.time() if stored_at is None else stored_at),
207
+ }
208
+
209
+ @staticmethod
210
+ def _load_spec(spec_path: Path) -> SnapshotSpec:
211
+ try:
212
+ raw = json.loads(spec_path.read_text(encoding="utf-8"))
213
+ return SnapshotSpec.model_validate(raw)
214
+ except Exception as exc: # noqa: BLE001
215
+ raise ValueError(f"invalid snapshot spec at {spec_path}: {exc}") from exc
src/open_range/server/app.py CHANGED
@@ -2,6 +2,7 @@
2
 
3
  from __future__ import annotations
4
 
 
5
  import logging
6
  import os
7
 
@@ -10,6 +11,24 @@ from fastapi import FastAPI
10
  logger = logging.getLogger(__name__)
11
 
12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  def create_app() -> FastAPI:
14
  """Create the OpenRange app through the canonical OpenEnv factory."""
15
  from openenv.core.env_server import create_app as create_openenv_app
@@ -37,6 +56,9 @@ def create_app() -> FastAPI:
37
  RangeObservation,
38
  env_name="open_range",
39
  )
 
 
 
40
 
41
  # Mount custom Gradio dashboard at /web if gradio is available
42
  try:
 
2
 
3
  from __future__ import annotations
4
 
5
+ import inspect
6
  import logging
7
  import os
8
 
 
11
  logger = logging.getLogger(__name__)
12
 
13
 
14
+ def _extract_openenv_server(fastapp: FastAPI) -> object | None:
15
+ """Best-effort extraction of OpenEnv's HTTPEnvServer from route closure."""
16
+ for route in fastapp.router.routes:
17
+ if getattr(route, "path", None) != "/ws":
18
+ continue
19
+ endpoint = getattr(route, "endpoint", None)
20
+ if endpoint is None:
21
+ continue
22
+ try:
23
+ closure = inspect.getclosurevars(endpoint)
24
+ except Exception:
25
+ continue
26
+ server = closure.nonlocals.get("self")
27
+ if server is not None and hasattr(server, "active_sessions"):
28
+ return server
29
+ return None
30
+
31
+
32
  def create_app() -> FastAPI:
33
  """Create the OpenRange app through the canonical OpenEnv factory."""
34
  from openenv.core.env_server import create_app as create_openenv_app
 
56
  RangeObservation,
57
  env_name="open_range",
58
  )
59
+ openenv_server = _extract_openenv_server(fastapp)
60
+ if openenv_server is not None:
61
+ fastapp.state.openenv_server = openenv_server
62
 
63
  # Mount custom Gradio dashboard at /web if gradio is available
64
  try:
src/open_range/server/console.py CHANGED
@@ -47,10 +47,20 @@ def get_history(limit: int = 20) -> list[dict[str, Any]]:
47
  @console_router.get("/api/snapshot")
48
  async def api_snapshot(request: Request) -> JSONResponse:
49
  """Return current snapshot metadata (no truth graph or flags)."""
50
- env = _get_env(request)
 
51
  snapshot = env.snapshot
52
  if snapshot is None:
53
- return JSONResponse({"id": None, "tier": None, "hosts": [], "zones": {}, "vuln_count": 0})
 
 
 
 
 
 
 
 
 
54
 
55
  topo = snapshot.topology if isinstance(snapshot.topology, dict) else {}
56
  hosts = topo.get("hosts", [])
@@ -64,19 +74,26 @@ async def api_snapshot(request: Request) -> JSONResponse:
64
  "hosts": hosts,
65
  "zones": zones,
66
  "vuln_count": vuln_count,
 
 
 
67
  })
68
 
69
 
70
  @console_router.get("/api/episode")
71
  async def api_episode(request: Request) -> JSONResponse:
72
  """Return current episode state."""
73
- env = _get_env(request)
 
74
  state = env.state
75
  return JSONResponse({
76
  "step_count": state.step_count,
77
  "flags_found": len(state.flags_found),
78
  "mode": state.mode,
79
  "services_status": state.services_status,
 
 
 
80
  })
81
 
82
 
@@ -98,20 +115,70 @@ async def console_page() -> HTMLResponse:
98
  # ---------------------------------------------------------------------------
99
 
100
 
101
- def _get_env(request: Request) -> Any:
102
- """Retrieve the RangeEnvironment from the app's state.
103
 
104
- The app.py startup stores the environment instance as ``app.state.env``.
105
- If that attribute is missing we fall back to importing a fresh one.
 
 
106
  """
107
  app = request.app
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
108
  if hasattr(app.state, "env"):
109
- return app.state.env
110
- # Fallback: create an ephemeral environment (tests, etc.)
 
 
 
 
 
 
 
 
 
111
  from open_range.server.environment import RangeEnvironment
 
112
  if not hasattr(app.state, "_fallback_env"):
113
  app.state._fallback_env = RangeEnvironment(docker_available=False)
114
- return app.state._fallback_env
 
 
 
 
 
 
 
 
 
 
115
 
116
 
117
  # ---------------------------------------------------------------------------
 
47
  @console_router.get("/api/snapshot")
48
  async def api_snapshot(request: Request) -> JSONResponse:
49
  """Return current snapshot metadata (no truth graph or flags)."""
50
+ ctx = _get_env_context(request)
51
+ env = ctx["env"]
52
  snapshot = env.snapshot
53
  if snapshot is None:
54
+ return JSONResponse({
55
+ "id": None,
56
+ "tier": None,
57
+ "hosts": [],
58
+ "zones": {},
59
+ "vuln_count": 0,
60
+ "state_scope": ctx["state_scope"],
61
+ "session_id": ctx["session_id"],
62
+ "warning": ctx["warning"],
63
+ })
64
 
65
  topo = snapshot.topology if isinstance(snapshot.topology, dict) else {}
66
  hosts = topo.get("hosts", [])
 
74
  "hosts": hosts,
75
  "zones": zones,
76
  "vuln_count": vuln_count,
77
+ "state_scope": ctx["state_scope"],
78
+ "session_id": ctx["session_id"],
79
+ "warning": ctx["warning"],
80
  })
81
 
82
 
83
  @console_router.get("/api/episode")
84
  async def api_episode(request: Request) -> JSONResponse:
85
  """Return current episode state."""
86
+ ctx = _get_env_context(request)
87
+ env = ctx["env"]
88
  state = env.state
89
  return JSONResponse({
90
  "step_count": state.step_count,
91
  "flags_found": len(state.flags_found),
92
  "mode": state.mode,
93
  "services_status": state.services_status,
94
+ "state_scope": ctx["state_scope"],
95
+ "session_id": ctx["session_id"],
96
+ "warning": ctx["warning"],
97
  })
98
 
99
 
 
115
  # ---------------------------------------------------------------------------
116
 
117
 
118
+ def _get_env_context(request: Request) -> dict[str, Any]:
119
+ """Resolve the environment context used by the console endpoints.
120
 
121
+ Priority:
122
+ 1. Active OpenEnv WebSocket session environment (session-scoped truth)
123
+ 2. ``app.state.env`` fallback environment (global app scope)
124
+ 3. Lazily created fallback environment (tests/dev)
125
  """
126
  app = request.app
127
+
128
+ server = getattr(app.state, "openenv_server", None)
129
+ sessions = getattr(server, "_sessions", None)
130
+ if isinstance(sessions, dict) and sessions:
131
+ if len(sessions) == 1:
132
+ session_id, env = next(iter(sessions.items()))
133
+ return {
134
+ "env": env,
135
+ "state_scope": "websocket_session",
136
+ "session_id": session_id,
137
+ "warning": None,
138
+ }
139
+
140
+ session_info = getattr(server, "_session_info", {})
141
+ selected_id = max(
142
+ sessions.keys(),
143
+ key=lambda sid: float(getattr(session_info.get(sid), "last_activity_at", 0.0) or 0.0),
144
+ )
145
+ return {
146
+ "env": sessions[selected_id],
147
+ "state_scope": "websocket_session",
148
+ "session_id": selected_id,
149
+ "warning": (
150
+ f"{len(sessions)} active sessions detected; "
151
+ f"showing the most recently active session ({selected_id})."
152
+ ),
153
+ }
154
+
155
  if hasattr(app.state, "env"):
156
+ return {
157
+ "env": app.state.env,
158
+ "state_scope": "app_state_env",
159
+ "session_id": None,
160
+ "warning": (
161
+ "No active WebSocket session found; console is showing shared "
162
+ "app-state environment data."
163
+ ),
164
+ }
165
+
166
+ # Fallback: create an ephemeral environment (tests/dev)
167
  from open_range.server.environment import RangeEnvironment
168
+
169
  if not hasattr(app.state, "_fallback_env"):
170
  app.state._fallback_env = RangeEnvironment(docker_available=False)
171
+ return {
172
+ "env": app.state._fallback_env,
173
+ "state_scope": "fallback_env",
174
+ "session_id": None,
175
+ "warning": "Console is using a fallback environment (no server session available).",
176
+ }
177
+
178
+
179
+ def _get_env(request: Request) -> Any:
180
+ """Compatibility helper for callers that only need the env object."""
181
+ return _get_env_context(request)["env"]
182
 
183
 
184
  # ---------------------------------------------------------------------------
src/open_range/server/environment.py CHANGED
@@ -1276,6 +1276,62 @@ class RangeEnvironment(Environment[RangeAction, RangeObservation, RangeState]):
1276
  )
1277
  return self._container_name(name)
1278
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1279
  # -----------------------------------------------------------------
1280
  # Core API
1281
  # -----------------------------------------------------------------
@@ -1354,6 +1410,9 @@ class RangeEnvironment(Environment[RangeAction, RangeObservation, RangeState]):
1354
  # Start NPC traffic for this episode
1355
  self._start_npcs(self._snapshot)
1356
 
 
 
 
1357
  # Build initial briefing
1358
  task = self._snapshot.task
1359
  if isinstance(task, dict):
@@ -1439,6 +1498,7 @@ class RangeEnvironment(Environment[RangeAction, RangeObservation, RangeState]):
1439
 
1440
  if cmd_name in meta_handlers:
1441
  obs = meta_handlers[cmd_name](action)
 
1442
  obs = self._apply_rewards(action, obs)
1443
  self._check_termination(obs)
1444
  self._report_if_done(obs)
@@ -1484,6 +1544,7 @@ class RangeEnvironment(Environment[RangeAction, RangeObservation, RangeState]):
1484
 
1485
  # Refresh NPC traffic log for reward computation
1486
  self._refresh_npc_traffic_log()
 
1487
 
1488
  # Build observation
1489
  obs = RangeObservation(
@@ -1620,8 +1681,8 @@ class RangeEnvironment(Environment[RangeAction, RangeObservation, RangeState]):
1620
 
1621
  In production (docker or subprocess mode with real infrastructure),
1622
  queries the SIEM container for actual log-based alerts. Falls back
1623
- to synthetic alerts derived from ALL Red actions when SIEM queries
1624
- return nothing or in unit-test mock mode.
1625
  """
1626
  # Try real SIEM query in non-mock modes
1627
  if self._docker_available is not False or self._execution_mode == "subprocess":
@@ -1629,7 +1690,23 @@ class RangeEnvironment(Environment[RangeAction, RangeObservation, RangeState]):
1629
  if siem_alerts:
1630
  return siem_alerts
1631
 
1632
- return []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1633
 
1634
  # -----------------------------------------------------------------
1635
  # Introspection (for reward computation and debugging)
 
1276
  )
1277
  return self._container_name(name)
1278
 
1279
+ def _topology_host_names(self) -> list[str]:
1280
+ """Return deduplicated host names from the active snapshot topology."""
1281
+ if not self._snapshot or not isinstance(self._snapshot.topology, dict):
1282
+ return []
1283
+ hosts = self._snapshot.topology.get("hosts", [])
1284
+ names: list[str] = []
1285
+ for host in hosts:
1286
+ if isinstance(host, str):
1287
+ candidate = host
1288
+ elif isinstance(host, dict):
1289
+ candidate = host.get("name") or host.get("hostname") or ""
1290
+ else:
1291
+ candidate = ""
1292
+ name = str(candidate).strip()
1293
+ if name and name not in names:
1294
+ names.append(name)
1295
+ return names
1296
+
1297
+ def _refresh_services_status(self) -> None:
1298
+ """Refresh ``state.services_status`` from runtime/container health.
1299
+
1300
+ Availability reward should never rely on an empty status map after reset.
1301
+ When health cannot be verified, host status is marked ``"unknown"``.
1302
+ """
1303
+ host_names = self._topology_host_names()
1304
+ if not host_names:
1305
+ self._state.services_status = {}
1306
+ return
1307
+
1308
+ status_map = {host: "unknown" for host in host_names}
1309
+
1310
+ if self._execution_mode == "docker" and self._docker_available is not False:
1311
+ client = self._get_docker()
1312
+ if client is not None:
1313
+ for host in host_names:
1314
+ container_name = self._container_name(host)
1315
+ try:
1316
+ container = client.containers.get(container_name)
1317
+ status_map[host] = str(getattr(container, "status", "unknown") or "unknown")
1318
+ except Exception:
1319
+ status_map[host] = "down"
1320
+ self._state.services_status = status_map
1321
+ return
1322
+
1323
+ if self._execution_mode == "subprocess" and self._snapshot and self._snapshot.services:
1324
+ checks_by_host: dict[str, list[bool]] = {}
1325
+ for svc in self._snapshot.services:
1326
+ host = str(getattr(svc, "host", "") or "").strip()
1327
+ if not host:
1328
+ continue
1329
+ checks_by_host.setdefault(host, []).append(self._probe_readiness(svc.readiness))
1330
+ for host, checks in checks_by_host.items():
1331
+ status_map[host] = "healthy" if checks and all(checks) else "degraded"
1332
+
1333
+ self._state.services_status = status_map
1334
+
1335
  # -----------------------------------------------------------------
1336
  # Core API
1337
  # -----------------------------------------------------------------
 
1410
  # Start NPC traffic for this episode
1411
  self._start_npcs(self._snapshot)
1412
 
1413
+ # Prime service health map for availability reward grounding.
1414
+ self._refresh_services_status()
1415
+
1416
  # Build initial briefing
1417
  task = self._snapshot.task
1418
  if isinstance(task, dict):
 
1498
 
1499
  if cmd_name in meta_handlers:
1500
  obs = meta_handlers[cmd_name](action)
1501
+ self._refresh_services_status()
1502
  obs = self._apply_rewards(action, obs)
1503
  self._check_termination(obs)
1504
  self._report_if_done(obs)
 
1544
 
1545
  # Refresh NPC traffic log for reward computation
1546
  self._refresh_npc_traffic_log()
1547
+ self._refresh_services_status()
1548
 
1549
  # Build observation
1550
  obs = RangeObservation(
 
1681
 
1682
  In production (docker or subprocess mode with real infrastructure),
1683
  queries the SIEM container for actual log-based alerts. Falls back
1684
+ to synthetic alerts derived from Red action history when SIEM queries
1685
+ return nothing or in unit-test mock mode (capped to recent 20 lines).
1686
  """
1687
  # Try real SIEM query in non-mock modes
1688
  if self._docker_available is not False or self._execution_mode == "subprocess":
 
1690
  if siem_alerts:
1691
  return siem_alerts
1692
 
1693
+ # Fallback: synthesize alerts from recent Red actions so Blue still
1694
+ # receives actionable signal in mock/degraded SIEM paths.
1695
+ synthetic: list[str] = []
1696
+ for record in self._red_history:
1697
+ if record.get("type") in ("hallucinated_flag", "evidence"):
1698
+ continue
1699
+ command = str(record.get("command", "")).strip()
1700
+ if not command:
1701
+ continue
1702
+ step = record.get("step", "?")
1703
+ cmd_name = str(record.get("cmd_name", "")).strip() or _extract_command_name(command)
1704
+ target = str(record.get("target", "")).strip()
1705
+ if target:
1706
+ synthetic.append(f"[synthetic] step={step} cmd={cmd_name} target={target} :: {command}")
1707
+ else:
1708
+ synthetic.append(f"[synthetic] step={step} cmd={cmd_name} :: {command}")
1709
+ return synthetic[-20:]
1710
 
1711
  # -----------------------------------------------------------------
1712
  # Introspection (for reward computation and debugging)
src/open_range/server/runtime.py CHANGED
@@ -68,6 +68,13 @@ _VALIDATOR_PROFILE_ALIASES = {
68
  }
69
  _LIVE_VALIDATOR_PROFILES = {"training"}
70
  _ALLOW_OFFLINE_ADMISSION_ENV = "OPENRANGE_ALLOW_OFFLINE_ADMISSION"
 
 
 
 
 
 
 
71
 
72
 
73
  def _env_flag(name: str, default: bool = False) -> bool:
@@ -320,6 +327,17 @@ def _normalize_validator_profile(profile: str | None) -> str:
320
  return normalized
321
 
322
 
 
 
 
 
 
 
 
 
 
 
 
323
  def _graph_checks(manifest: dict[str, Any]) -> list[Any]:
324
  return [
325
  ManifestComplianceCheck(manifest),
@@ -395,6 +413,7 @@ class ManagedSnapshotRuntime:
395
  compose_runner: ComposeProjectRunner | None = None,
396
  live_validator: ValidatorGate | None = None,
397
  enable_patch_validation: bool = False,
 
398
  mutation_policy: PopulationMutationPolicy | None = None,
399
  ) -> None:
400
  self.manifest_path = (
@@ -418,7 +437,16 @@ class ManagedSnapshotRuntime:
418
  or os.getenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", _DEFAULT_VALIDATOR_PROFILE)
419
  )
420
  self._enforce_validator_profile_policy()
 
 
 
 
421
  self.validator = validator or _build_validator(self.validator_profile, self.manifest)
 
 
 
 
 
422
  self.renderer = SnapshotRenderer()
423
  self.curriculum = CurriculumTracker()
424
  self.pool_size = max(1, pool_size)
@@ -475,6 +503,10 @@ class ManagedSnapshotRuntime:
475
  "OPENRANGE_ENABLE_PATCH_VALIDATION",
476
  default=False,
477
  ),
 
 
 
 
478
  )
479
 
480
  def _enforce_validator_profile_policy(self) -> None:
@@ -516,6 +548,7 @@ class ManagedSnapshotRuntime:
516
  if existing < self.pool_size:
517
  self._top_up_pool(self.pool_size - existing)
518
  self._ensure_existing_artifacts()
 
519
 
520
  available = self.snapshot_count()
521
  if available == 0:
@@ -567,6 +600,7 @@ class ManagedSnapshotRuntime:
567
  if alternative is not None:
568
  stored = alternative
569
 
 
570
  result = RuntimeSnapshot(snapshot_id=stored.snapshot_id, snapshot=stored.snapshot)
571
  self._track_acquisition(result.snapshot_id)
572
  return result
@@ -583,14 +617,15 @@ class ManagedSnapshotRuntime:
583
  if not recent_ids:
584
  return set()
585
 
586
- all_meta = self.list_snapshots()
587
- meta_by_id = {m.get("snapshot_id"): m for m in all_meta}
588
  vuln_types: set[str] = set()
589
  for sid in recent_ids:
590
- meta = meta_by_id.get(sid)
591
- if meta:
592
- vuln_types.update(meta.get("vuln_classes", []))
593
  return vuln_types
 
594
  def _is_diverse(self, snapshot: SnapshotSpec) -> bool:
595
  """Return True if *snapshot* has at least one vuln type not in recent history."""
596
  recent = self._recent_vuln_types()
@@ -608,32 +643,29 @@ class ManagedSnapshotRuntime:
608
  """Try to find a snapshot in the store whose vulns don't fully overlap."""
609
  from open_range.builder.snapshot_store import StoredSnapshot
610
 
611
- all_meta = self.list_snapshots()
612
  recent = self._recent_vuln_types()
613
 
614
- for meta in all_meta:
615
- sid = meta.get("snapshot_id", "")
616
  if sid == exclude_id:
617
  continue
618
- candidate_vulns = set(meta.get("vuln_classes", []))
619
  if not candidate_vulns or not candidate_vulns.issubset(recent):
620
- try:
621
- entry = _run_coro_sync(self.store.get_entry(sid))
622
- return entry
623
- except Exception: # noqa: BLE001
624
- continue
625
  return None
626
 
627
  def get_snapshot(self, snapshot_id: str) -> RuntimeSnapshot:
628
  self.start()
629
  stored = _run_coro_sync(self.store.get_entry(snapshot_id))
 
630
  return RuntimeSnapshot(snapshot_id=stored.snapshot_id, snapshot=stored.snapshot)
631
 
632
  def list_snapshots(self) -> list[dict[str, Any]]:
633
  return _run_coro_sync(self.store.list_snapshots())
634
 
635
  def snapshot_count(self) -> int:
636
- return len(self.list_snapshots())
637
 
638
  def status(self) -> dict[str, Any]:
639
  return {
@@ -644,6 +676,7 @@ class ManagedSnapshotRuntime:
644
  "parent_selection_strategy": self.parent_selection_strategy,
645
  "validator_profile": self.validator_profile,
646
  "allow_insecure_offline_profile": self.allow_insecure_offline_profile,
 
647
  "refill_enabled": self.refill_enabled,
648
  "live_admission_enabled": self.live_admission_enabled,
649
  "snapshot_count": self.snapshot_count(),
@@ -706,30 +739,33 @@ class ManagedSnapshotRuntime:
706
  self._generate_and_store_snapshot()
707
 
708
  def _ensure_existing_artifacts(self) -> None:
709
- for meta in self.list_snapshots():
710
- snapshot_id = str(meta.get("snapshot_id", ""))
711
- if not snapshot_id:
712
- continue
713
  artifacts_dir = self._artifacts_dir(snapshot_id)
714
  if artifacts_dir.exists():
715
  continue
716
- stored = _run_coro_sync(self.store.get_entry(snapshot_id))
717
  materialized = self._materialize_snapshot(stored.snapshot, snapshot_id)
718
  _run_coro_sync(self.store.store(materialized, snapshot_id=snapshot_id))
719
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
720
  def _generate_and_store_snapshot(self) -> str:
721
  last_error: str | None = None
722
- parent_snapshot: SnapshotSpec | None = None
723
- parent_snapshot_id: str | None = None
724
- existing = self.list_snapshots()
725
- if existing:
726
- parent_snapshot_id = str(existing[0].get("snapshot_id", "") or "")
727
- if parent_snapshot_id:
728
- try:
729
- parent_snapshot = _run_coro_sync(self.store.get(parent_snapshot_id))
730
- except FileNotFoundError:
731
- parent_snapshot = None
732
- parent_snapshot_id = None
733
 
734
  for attempt in range(1, self.generation_retries + 1):
735
  context = self._build_context()
 
68
  }
69
  _LIVE_VALIDATOR_PROFILES = {"training"}
70
  _ALLOW_OFFLINE_ADMISSION_ENV = "OPENRANGE_ALLOW_OFFLINE_ADMISSION"
71
+ _PERSISTED_SNAPSHOT_VALIDATION_ALIASES = {
72
+ "none": "trust",
73
+ "disabled": "trust",
74
+ "off": "trust",
75
+ "revalidate": "offline",
76
+ "strict": "offline",
77
+ }
78
 
79
 
80
  def _env_flag(name: str, default: bool = False) -> bool:
 
327
  return normalized
328
 
329
 
330
+ def _normalize_persisted_snapshot_validation(policy: str | None) -> str:
331
+ normalized = (policy or "offline").strip().lower()
332
+ normalized = _PERSISTED_SNAPSHOT_VALIDATION_ALIASES.get(normalized, normalized)
333
+ if normalized not in {"trust", "offline"}:
334
+ raise ValueError(
335
+ f"Unsupported persisted snapshot validation policy {policy!r}. "
336
+ "Expected 'trust' or 'offline'."
337
+ )
338
+ return normalized
339
+
340
+
341
  def _graph_checks(manifest: dict[str, Any]) -> list[Any]:
342
  return [
343
  ManifestComplianceCheck(manifest),
 
413
  compose_runner: ComposeProjectRunner | None = None,
414
  live_validator: ValidatorGate | None = None,
415
  enable_patch_validation: bool = False,
416
+ persisted_snapshot_validation: str | None = None,
417
  mutation_policy: PopulationMutationPolicy | None = None,
418
  ) -> None:
419
  self.manifest_path = (
 
437
  or os.getenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", _DEFAULT_VALIDATOR_PROFILE)
438
  )
439
  self._enforce_validator_profile_policy()
440
+ self.persisted_snapshot_validation = _normalize_persisted_snapshot_validation(
441
+ persisted_snapshot_validation
442
+ or os.getenv("OPENRANGE_PERSISTED_SNAPSHOT_VALIDATION", "offline")
443
+ )
444
  self.validator = validator or _build_validator(self.validator_profile, self.manifest)
445
+ self.persisted_validator = (
446
+ _build_validator("offline", self.manifest)
447
+ if self.persisted_snapshot_validation == "offline"
448
+ else None
449
+ )
450
  self.renderer = SnapshotRenderer()
451
  self.curriculum = CurriculumTracker()
452
  self.pool_size = max(1, pool_size)
 
503
  "OPENRANGE_ENABLE_PATCH_VALIDATION",
504
  default=False,
505
  ),
506
+ persisted_snapshot_validation=os.getenv(
507
+ "OPENRANGE_PERSISTED_SNAPSHOT_VALIDATION",
508
+ "offline",
509
+ ),
510
  )
511
 
512
  def _enforce_validator_profile_policy(self) -> None:
 
548
  if existing < self.pool_size:
549
  self._top_up_pool(self.pool_size - existing)
550
  self._ensure_existing_artifacts()
551
+ self._revalidate_persisted_snapshots()
552
 
553
  available = self.snapshot_count()
554
  if available == 0:
 
600
  if alternative is not None:
601
  stored = alternative
602
 
603
+ self._assert_persisted_snapshot_valid(stored.snapshot_id, stored.snapshot)
604
  result = RuntimeSnapshot(snapshot_id=stored.snapshot_id, snapshot=stored.snapshot)
605
  self._track_acquisition(result.snapshot_id)
606
  return result
 
617
  if not recent_ids:
618
  return set()
619
 
620
+ entries = _run_coro_sync(self.store.list_entries())
621
+ by_id = {entry.snapshot_id: entry for entry in entries}
622
  vuln_types: set[str] = set()
623
  for sid in recent_ids:
624
+ entry = by_id.get(sid)
625
+ if entry:
626
+ vuln_types.update(v.type for v in entry.snapshot.truth_graph.vulns)
627
  return vuln_types
628
+
629
  def _is_diverse(self, snapshot: SnapshotSpec) -> bool:
630
  """Return True if *snapshot* has at least one vuln type not in recent history."""
631
  recent = self._recent_vuln_types()
 
643
  """Try to find a snapshot in the store whose vulns don't fully overlap."""
644
  from open_range.builder.snapshot_store import StoredSnapshot
645
 
646
+ entries = _run_coro_sync(self.store.list_entries())
647
  recent = self._recent_vuln_types()
648
 
649
+ for entry in entries:
650
+ sid = entry.snapshot_id
651
  if sid == exclude_id:
652
  continue
653
+ candidate_vulns = {v.type for v in entry.snapshot.truth_graph.vulns}
654
  if not candidate_vulns or not candidate_vulns.issubset(recent):
655
+ return entry
 
 
 
 
656
  return None
657
 
658
  def get_snapshot(self, snapshot_id: str) -> RuntimeSnapshot:
659
  self.start()
660
  stored = _run_coro_sync(self.store.get_entry(snapshot_id))
661
+ self._assert_persisted_snapshot_valid(stored.snapshot_id, stored.snapshot)
662
  return RuntimeSnapshot(snapshot_id=stored.snapshot_id, snapshot=stored.snapshot)
663
 
664
  def list_snapshots(self) -> list[dict[str, Any]]:
665
  return _run_coro_sync(self.store.list_snapshots())
666
 
667
  def snapshot_count(self) -> int:
668
+ return int(_run_coro_sync(self.store.count_entries()))
669
 
670
  def status(self) -> dict[str, Any]:
671
  return {
 
676
  "parent_selection_strategy": self.parent_selection_strategy,
677
  "validator_profile": self.validator_profile,
678
  "allow_insecure_offline_profile": self.allow_insecure_offline_profile,
679
+ "persisted_snapshot_validation": self.persisted_snapshot_validation,
680
  "refill_enabled": self.refill_enabled,
681
  "live_admission_enabled": self.live_admission_enabled,
682
  "snapshot_count": self.snapshot_count(),
 
739
  self._generate_and_store_snapshot()
740
 
741
  def _ensure_existing_artifacts(self) -> None:
742
+ for stored in _run_coro_sync(self.store.list_entries()):
743
+ snapshot_id = stored.snapshot_id
 
 
744
  artifacts_dir = self._artifacts_dir(snapshot_id)
745
  if artifacts_dir.exists():
746
  continue
 
747
  materialized = self._materialize_snapshot(stored.snapshot, snapshot_id)
748
  _run_coro_sync(self.store.store(materialized, snapshot_id=snapshot_id))
749
 
750
+ def _revalidate_persisted_snapshots(self) -> None:
751
+ if self.persisted_snapshot_validation == "trust":
752
+ return
753
+ for entry in _run_coro_sync(self.store.list_entries()):
754
+ self._assert_persisted_snapshot_valid(entry.snapshot_id, entry.snapshot)
755
+
756
+ def _assert_persisted_snapshot_valid(self, snapshot_id: str, snapshot: SnapshotSpec) -> None:
757
+ if self.persisted_validator is None:
758
+ return
759
+ result = _run_coro_sync(self.persisted_validator.validate(snapshot, ContainerSet()))
760
+ if result.passed:
761
+ return
762
+ raise RuntimeError(
763
+ "persisted snapshot failed startup revalidation "
764
+ f"({snapshot_id}): {self._validation_error(result)}"
765
+ )
766
+
767
  def _generate_and_store_snapshot(self) -> str:
768
  last_error: str | None = None
 
 
 
 
 
 
 
 
 
 
 
769
 
770
  for attempt in range(1, self.generation_retries + 1):
771
  context = self._build_context()
tests/test_builder.py CHANGED
@@ -2,6 +2,7 @@
2
 
3
  import json
4
  import tempfile
 
5
 
6
  import pytest
7
 
@@ -542,6 +543,52 @@ async def test_snapshot_store_list():
542
  assert "snap_b" in ids
543
 
544
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
545
  @pytest.mark.asyncio
546
  async def test_snapshot_store_get_by_id():
547
  from open_range.builder.snapshot_store import SnapshotStore
 
2
 
3
  import json
4
  import tempfile
5
+ from pathlib import Path
6
 
7
  import pytest
8
 
 
543
  assert "snap_b" in ids
544
 
545
 
546
+ @pytest.mark.asyncio
547
+ async def test_snapshot_store_repairs_missing_metadata_from_spec():
548
+ from open_range.builder.snapshot_store import SnapshotStore
549
+
550
+ with tempfile.TemporaryDirectory() as tmpdir:
551
+ store = SnapshotStore(store_dir=tmpdir)
552
+ spec = SnapshotSpec(topology={"hosts": ["web"]})
553
+ await store.store(spec, snapshot_id="snap_a")
554
+
555
+ metadata_path = Path(tmpdir) / "snap_a" / "metadata.json"
556
+ metadata_path.unlink()
557
+
558
+ listing = await store.list_snapshots()
559
+ assert len(listing) == 1
560
+ assert listing[0]["snapshot_id"] == "snap_a"
561
+ assert metadata_path.exists()
562
+
563
+ selected = await store.select_entry(strategy="latest")
564
+ assert selected.snapshot_id == "snap_a"
565
+
566
+
567
+ @pytest.mark.asyncio
568
+ async def test_snapshot_store_ignores_orphan_metadata_without_spec():
569
+ from open_range.builder.snapshot_store import SnapshotStore
570
+
571
+ with tempfile.TemporaryDirectory() as tmpdir:
572
+ store = SnapshotStore(store_dir=tmpdir)
573
+ spec = SnapshotSpec(topology={"hosts": ["web"]})
574
+ await store.store(spec, snapshot_id="snap_real")
575
+
576
+ orphan_dir = Path(tmpdir) / "orphan_meta"
577
+ orphan_dir.mkdir(parents=True, exist_ok=True)
578
+ (orphan_dir / "metadata.json").write_text(
579
+ json.dumps({"snapshot_id": "orphan_meta", "stored_at": 9999999999}),
580
+ encoding="utf-8",
581
+ )
582
+
583
+ listing = await store.list_snapshots()
584
+ ids = {meta["snapshot_id"] for meta in listing}
585
+ assert ids == {"snap_real"}
586
+ assert await store.count_entries() == 1
587
+
588
+ selected = await store.select_entry(strategy="latest")
589
+ assert selected.snapshot_id == "snap_real"
590
+
591
+
592
  @pytest.mark.asyncio
593
  async def test_snapshot_store_get_by_id():
594
  from open_range.builder.snapshot_store import SnapshotStore
tests/test_console_context.py ADDED
@@ -0,0 +1,64 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Unit tests for console environment context resolution."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from types import SimpleNamespace
6
+
7
+ from open_range.server.console import _get_env_context
8
+ from open_range.server.environment import RangeEnvironment
9
+
10
+
11
+ class _Req:
12
+ def __init__(self, app):
13
+ self.app = app
14
+
15
+
16
+ def _app_with_state(**kwargs):
17
+ return SimpleNamespace(state=SimpleNamespace(**kwargs))
18
+
19
+
20
+ def test_prefers_active_websocket_session_env():
21
+ fallback_env = RangeEnvironment(docker_available=False)
22
+ ws_env = RangeEnvironment(docker_available=False)
23
+ server = SimpleNamespace(
24
+ _sessions={"session_a": ws_env},
25
+ _session_info={"session_a": SimpleNamespace(last_activity_at=10.0)},
26
+ )
27
+ request = _Req(_app_with_state(env=fallback_env, openenv_server=server))
28
+
29
+ ctx = _get_env_context(request)
30
+ assert ctx["env"] is ws_env
31
+ assert ctx["state_scope"] == "websocket_session"
32
+ assert ctx["session_id"] == "session_a"
33
+ assert ctx["warning"] is None
34
+
35
+
36
+ def test_uses_app_state_env_when_no_active_session():
37
+ fallback_env = RangeEnvironment(docker_available=False)
38
+ server = SimpleNamespace(_sessions={}, _session_info={})
39
+ request = _Req(_app_with_state(env=fallback_env, openenv_server=server))
40
+
41
+ ctx = _get_env_context(request)
42
+ assert ctx["env"] is fallback_env
43
+ assert ctx["state_scope"] == "app_state_env"
44
+ assert ctx["session_id"] is None
45
+ assert isinstance(ctx["warning"], str) and ctx["warning"]
46
+
47
+
48
+ def test_multiple_sessions_selects_most_recent_and_warns():
49
+ older_env = RangeEnvironment(docker_available=False)
50
+ newer_env = RangeEnvironment(docker_available=False)
51
+ server = SimpleNamespace(
52
+ _sessions={"old": older_env, "new": newer_env},
53
+ _session_info={
54
+ "old": SimpleNamespace(last_activity_at=10.0),
55
+ "new": SimpleNamespace(last_activity_at=20.0),
56
+ },
57
+ )
58
+ request = _Req(_app_with_state(openenv_server=server))
59
+
60
+ ctx = _get_env_context(request)
61
+ assert ctx["env"] is newer_env
62
+ assert ctx["state_scope"] == "websocket_session"
63
+ assert ctx["session_id"] == "new"
64
+ assert "active sessions" in (ctx["warning"] or "").lower()
tests/test_environment.py CHANGED
@@ -66,6 +66,12 @@ class TestReset:
66
  assert isinstance(obs, RangeObservation)
67
  assert env.snapshot is not None
68
 
 
 
 
 
 
 
69
 
70
  class TestTargetResolution:
71
  """Target selection should honor manifest-compiled metadata."""
@@ -187,6 +193,14 @@ class TestBlueStep:
187
  obs = env.step(RangeAction(command="", mode="blue"))
188
  assert obs.stderr != ""
189
 
 
 
 
 
 
 
 
 
190
  def test_step_passes_timeout_override_to_executor(self):
191
  env = RangeEnvironment(docker_available=False)
192
  env.reset(snapshot=_MINIMAL_SNAPSHOT)
 
66
  assert isinstance(obs, RangeObservation)
67
  assert env.snapshot is not None
68
 
69
+ def test_reset_initializes_services_status_from_topology_hosts(self):
70
+ env = RangeEnvironment(docker_available=False)
71
+ env.reset(snapshot=_MINIMAL_SNAPSHOT)
72
+ # In mock mode service health is unknown, but hosts should be tracked.
73
+ assert set(env.state.services_status.keys()) == {"attacker", "siem"}
74
+
75
 
76
  class TestTargetResolution:
77
  """Target selection should honor manifest-compiled metadata."""
 
193
  obs = env.step(RangeAction(command="", mode="blue"))
194
  assert obs.stderr != ""
195
 
196
+ def test_blue_alerts_fall_back_to_synthetic_red_history(self):
197
+ env = RangeEnvironment(docker_available=False)
198
+ env.reset(snapshot=_MINIMAL_SNAPSHOT)
199
+ env.step(RangeAction(command="nmap -sV web", mode="red"))
200
+ obs = env.step(RangeAction(command="tail -n 50 /var/log/siem/all.log", mode="blue"))
201
+ assert obs.alerts
202
+ assert any("synthetic" in alert.lower() for alert in obs.alerts)
203
+
204
  def test_step_passes_timeout_override_to_executor(self):
205
  env = RangeEnvironment(docker_available=False)
206
  env.reset(snapshot=_MINIMAL_SNAPSHOT)
tests/test_runtime.py CHANGED
@@ -2,6 +2,7 @@
2
 
3
  from __future__ import annotations
4
 
 
5
  from pathlib import Path
6
 
7
  import pytest
@@ -120,6 +121,50 @@ class TestManagedSnapshotRuntime:
120
  finally:
121
  runtime.stop()
122
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
  def test_start_materializes_rendered_artifacts(self, tier1_manifest, tmp_path):
124
  runtime = ManagedSnapshotRuntime(
125
  manifest=tier1_manifest,
 
2
 
3
  from __future__ import annotations
4
 
5
+ import json
6
  from pathlib import Path
7
 
8
  import pytest
 
121
  finally:
122
  runtime.stop()
123
 
124
+ def test_start_revalidates_persisted_snapshots_by_default(self, tier1_manifest, tmp_path):
125
+ store_dir = tmp_path / "snapshots"
126
+
127
+ runtime = ManagedSnapshotRuntime(
128
+ manifest=tier1_manifest,
129
+ store_dir=store_dir,
130
+ validator_profile="offline",
131
+ allow_insecure_offline_profile=True,
132
+ pool_size=1,
133
+ refill_enabled=False,
134
+ )
135
+ runtime.start()
136
+ runtime.stop()
137
+
138
+ spec_path = next(store_dir.glob("*/spec.json"))
139
+ raw = json.loads(spec_path.read_text(encoding="utf-8"))
140
+ raw["truth_graph"]["vulns"] = []
141
+ raw["golden_path"] = []
142
+ raw["flags"] = []
143
+ spec_path.write_text(json.dumps(raw, indent=2), encoding="utf-8")
144
+
145
+ runtime = ManagedSnapshotRuntime(
146
+ manifest=tier1_manifest,
147
+ store_dir=store_dir,
148
+ validator_profile="offline",
149
+ allow_insecure_offline_profile=True,
150
+ pool_size=1,
151
+ refill_enabled=False,
152
+ )
153
+ with pytest.raises(RuntimeError, match="persisted snapshot failed startup revalidation"):
154
+ runtime.start()
155
+
156
+ trust_runtime = ManagedSnapshotRuntime(
157
+ manifest=tier1_manifest,
158
+ store_dir=store_dir,
159
+ validator_profile="offline",
160
+ allow_insecure_offline_profile=True,
161
+ pool_size=1,
162
+ refill_enabled=False,
163
+ persisted_snapshot_validation="trust",
164
+ )
165
+ trust_runtime.start()
166
+ trust_runtime.stop()
167
+
168
  def test_start_materializes_rendered_artifacts(self, tier1_manifest, tmp_path):
169
  runtime = ManagedSnapshotRuntime(
170
  manifest=tier1_manifest,