Lars Talian commited on
Commit
d2dbaf9
·
1 Parent(s): 0d2d9bf

runtime: canonicalize snapshot store semantics and revalidate persisted snapshots

Browse files
src/open_range/builder/snapshot_store.py CHANGED
@@ -46,7 +46,6 @@ class SnapshotStore:
46
  The snapshot ID string.
47
  """
48
  if snapshot_id is None:
49
- hosts = snapshot.topology.get("hosts", [])
50
  vuln_types = [v.type for v in snapshot.truth_graph.vulns]
51
  snapshot_id = (
52
  f"snap_{'_'.join(vuln_types[:3])}"
@@ -63,21 +62,7 @@ class SnapshotStore:
63
  )
64
 
65
  # Write metadata sidecar for fast listing
66
- meta = {
67
- "snapshot_id": snapshot_id,
68
- "vuln_classes": [v.type for v in snapshot.truth_graph.vulns],
69
- "golden_path_steps": len(snapshot.golden_path),
70
- "flag_count": len(snapshot.flags),
71
- "npc_count": len(snapshot.npc_personas),
72
- "has_compose": bool(snapshot.compose),
73
- "has_payload_files": bool(snapshot.files),
74
- "live_validated": bool(snapshot.topology.get("live_validated", False)),
75
- "parent_snapshot_id": snapshot.lineage.parent_snapshot_id,
76
- "root_snapshot_id": snapshot.lineage.root_snapshot_id,
77
- "generation_depth": snapshot.lineage.generation_depth,
78
- "mutation_summary": list(snapshot.lineage.mutation_summary),
79
- "stored_at": time.time(),
80
- }
81
  meta_path = snap_dir / "metadata.json"
82
  meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8")
83
 
@@ -113,38 +98,69 @@ class SnapshotStore:
113
  else: # latest -- sort by parent dir mtime
114
  chosen = max(spec_files, key=lambda p: p.stat().st_mtime)
115
 
116
- raw = json.loads(chosen.read_text(encoding="utf-8"))
117
  return StoredSnapshot(
118
  snapshot_id=chosen.parent.name,
119
- snapshot=SnapshotSpec.model_validate(raw),
120
  )
121
 
122
  async def list_entries(self) -> list[StoredSnapshot]:
123
  """Return every stored snapshot plus its persisted ID."""
124
  entries: list[StoredSnapshot] = []
125
  for spec_path in sorted(self.store_dir.glob("*/spec.json")):
126
- raw = json.loads(spec_path.read_text(encoding="utf-8"))
127
  entries.append(
128
  StoredSnapshot(
129
  snapshot_id=spec_path.parent.name,
130
- snapshot=SnapshotSpec.model_validate(raw),
131
  )
132
  )
133
  return entries
134
 
 
 
 
 
135
  async def list_snapshots(self) -> list[dict[str, Any]]:
136
  """List all snapshots with their metadata.
137
 
138
  Returns:
139
  List of metadata dicts, sorted by stored_at descending.
140
  """
 
 
141
  results: list[dict[str, Any]] = []
142
- for meta_path in self.store_dir.glob("*/metadata.json"):
 
 
143
  try:
144
- meta = json.loads(meta_path.read_text(encoding="utf-8"))
145
- results.append(meta)
 
 
 
 
 
 
 
146
  except (json.JSONDecodeError, OSError) as exc:
147
- logger.warning("Skipping corrupt metadata: %s (%s)", meta_path, exc)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
148
 
149
  results.sort(key=lambda m: m.get("stored_at", 0), reverse=True)
150
  return results
@@ -158,8 +174,7 @@ class SnapshotStore:
158
  spec_path = self.store_dir / snapshot_id / "spec.json"
159
  if not spec_path.exists():
160
  raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
161
- raw = json.loads(spec_path.read_text(encoding="utf-8"))
162
- return SnapshotSpec.model_validate(raw)
163
 
164
  async def get_entry(self, snapshot_id: str) -> StoredSnapshot:
165
  """Load a specific snapshot plus its ID."""
@@ -167,3 +182,34 @@ class SnapshotStore:
167
  snapshot_id=snapshot_id,
168
  snapshot=await self.get(snapshot_id),
169
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
  The snapshot ID string.
47
  """
48
  if snapshot_id is None:
 
49
  vuln_types = [v.type for v in snapshot.truth_graph.vulns]
50
  snapshot_id = (
51
  f"snap_{'_'.join(vuln_types[:3])}"
 
62
  )
63
 
64
  # Write metadata sidecar for fast listing
65
+ meta = self._metadata_from_snapshot(snapshot_id, snapshot)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66
  meta_path = snap_dir / "metadata.json"
67
  meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8")
68
 
 
98
  else: # latest -- sort by parent dir mtime
99
  chosen = max(spec_files, key=lambda p: p.stat().st_mtime)
100
 
 
101
  return StoredSnapshot(
102
  snapshot_id=chosen.parent.name,
103
+ snapshot=self._load_spec(chosen),
104
  )
105
 
106
  async def list_entries(self) -> list[StoredSnapshot]:
107
  """Return every stored snapshot plus its persisted ID."""
108
  entries: list[StoredSnapshot] = []
109
  for spec_path in sorted(self.store_dir.glob("*/spec.json")):
 
110
  entries.append(
111
  StoredSnapshot(
112
  snapshot_id=spec_path.parent.name,
113
+ snapshot=self._load_spec(spec_path),
114
  )
115
  )
116
  return entries
117
 
118
+ async def count_entries(self) -> int:
119
+ """Return canonical snapshot count based on persisted specs."""
120
+ return len(await self.list_entries())
121
+
122
  async def list_snapshots(self) -> list[dict[str, Any]]:
123
  """List all snapshots with their metadata.
124
 
125
  Returns:
126
  List of metadata dicts, sorted by stored_at descending.
127
  """
128
+ entries = await self.list_entries()
129
+ spec_ids = {entry.snapshot_id for entry in entries}
130
  results: list[dict[str, Any]] = []
131
+ for entry in entries:
132
+ meta_path = self.store_dir / entry.snapshot_id / "metadata.json"
133
+ existing_meta: dict[str, Any] | None = None
134
  try:
135
+ if meta_path.exists():
136
+ loaded = json.loads(meta_path.read_text(encoding="utf-8"))
137
+ if isinstance(loaded, dict):
138
+ existing_meta = loaded
139
+ else:
140
+ logger.warning(
141
+ "Repairing metadata sidecar with non-object payload: %s",
142
+ meta_path,
143
+ )
144
  except (json.JSONDecodeError, OSError) as exc:
145
+ logger.warning("Repairing corrupt metadata: %s (%s)", meta_path, exc)
146
+
147
+ stored_at = existing_meta.get("stored_at") if existing_meta else None
148
+ canonical = self._metadata_from_snapshot(
149
+ entry.snapshot_id,
150
+ entry.snapshot,
151
+ stored_at=stored_at if isinstance(stored_at, (int, float)) else None,
152
+ )
153
+ results.append(canonical)
154
+
155
+ if existing_meta != canonical:
156
+ try:
157
+ meta_path.write_text(json.dumps(canonical, indent=2), encoding="utf-8")
158
+ except OSError as exc:
159
+ logger.warning("Failed to repair metadata sidecar %s (%s)", meta_path, exc)
160
+
161
+ for meta_path in self.store_dir.glob("*/metadata.json"):
162
+ if meta_path.parent.name not in spec_ids:
163
+ logger.warning("Ignoring orphan metadata without spec.json: %s", meta_path)
164
 
165
  results.sort(key=lambda m: m.get("stored_at", 0), reverse=True)
166
  return results
 
174
  spec_path = self.store_dir / snapshot_id / "spec.json"
175
  if not spec_path.exists():
176
  raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
177
+ return self._load_spec(spec_path)
 
178
 
179
  async def get_entry(self, snapshot_id: str) -> StoredSnapshot:
180
  """Load a specific snapshot plus its ID."""
 
182
  snapshot_id=snapshot_id,
183
  snapshot=await self.get(snapshot_id),
184
  )
185
+
186
+ @staticmethod
187
+ def _metadata_from_snapshot(
188
+ snapshot_id: str,
189
+ snapshot: SnapshotSpec,
190
+ *,
191
+ stored_at: float | None = None,
192
+ ) -> dict[str, Any]:
193
+ return {
194
+ "snapshot_id": snapshot_id,
195
+ "vuln_classes": [v.type for v in snapshot.truth_graph.vulns],
196
+ "golden_path_steps": len(snapshot.golden_path),
197
+ "flag_count": len(snapshot.flags),
198
+ "npc_count": len(snapshot.npc_personas),
199
+ "has_compose": bool(snapshot.compose),
200
+ "has_payload_files": bool(snapshot.files),
201
+ "live_validated": bool(snapshot.topology.get("live_validated", False)),
202
+ "parent_snapshot_id": snapshot.lineage.parent_snapshot_id,
203
+ "root_snapshot_id": snapshot.lineage.root_snapshot_id,
204
+ "generation_depth": snapshot.lineage.generation_depth,
205
+ "mutation_summary": list(snapshot.lineage.mutation_summary),
206
+ "stored_at": float(time.time() if stored_at is None else stored_at),
207
+ }
208
+
209
+ @staticmethod
210
+ def _load_spec(spec_path: Path) -> SnapshotSpec:
211
+ try:
212
+ raw = json.loads(spec_path.read_text(encoding="utf-8"))
213
+ return SnapshotSpec.model_validate(raw)
214
+ except Exception as exc: # noqa: BLE001
215
+ raise ValueError(f"invalid snapshot spec at {spec_path}: {exc}") from exc
src/open_range/server/runtime.py CHANGED
@@ -66,6 +66,13 @@ _VALIDATOR_PROFILE_ALIASES = {
66
  "strict": "training",
67
  }
68
  _LIVE_VALIDATOR_PROFILES = {"training"}
 
 
 
 
 
 
 
69
 
70
 
71
  def _env_flag(name: str, default: bool = False) -> bool:
@@ -318,6 +325,17 @@ def _normalize_validator_profile(profile: str | None) -> str:
318
  return normalized
319
 
320
 
 
 
 
 
 
 
 
 
 
 
 
321
  def _graph_checks(manifest: dict[str, Any]) -> list[Any]:
322
  return [
323
  ManifestComplianceCheck(manifest),
@@ -392,6 +410,7 @@ class ManagedSnapshotRuntime:
392
  compose_runner: ComposeProjectRunner | None = None,
393
  live_validator: ValidatorGate | None = None,
394
  enable_patch_validation: bool = False,
 
395
  mutation_policy: PopulationMutationPolicy | None = None,
396
  ) -> None:
397
  self.manifest_path = (
@@ -408,7 +427,16 @@ class ManagedSnapshotRuntime:
408
  self.validator_profile = _normalize_validator_profile(
409
  validator_profile or os.getenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", "offline")
410
  )
 
 
 
 
411
  self.validator = validator or _build_validator(self.validator_profile, self.manifest)
 
 
 
 
 
412
  self.renderer = SnapshotRenderer()
413
  self.curriculum = CurriculumTracker()
414
  self.pool_size = max(1, pool_size)
@@ -458,6 +486,10 @@ class ManagedSnapshotRuntime:
458
  "OPENRANGE_ENABLE_PATCH_VALIDATION",
459
  default=False,
460
  ),
 
 
 
 
461
  )
462
 
463
  @staticmethod
@@ -477,6 +509,7 @@ class ManagedSnapshotRuntime:
477
  if existing < self.pool_size:
478
  self._top_up_pool(self.pool_size - existing)
479
  self._ensure_existing_artifacts()
 
480
 
481
  available = self.snapshot_count()
482
  if available == 0:
@@ -528,6 +561,7 @@ class ManagedSnapshotRuntime:
528
  if alternative is not None:
529
  stored = alternative
530
 
 
531
  result = RuntimeSnapshot(snapshot_id=stored.snapshot_id, snapshot=stored.snapshot)
532
  self._track_acquisition(result.snapshot_id)
533
  return result
@@ -544,14 +578,15 @@ class ManagedSnapshotRuntime:
544
  if not recent_ids:
545
  return set()
546
 
547
- all_meta = self.list_snapshots()
548
- meta_by_id = {m.get("snapshot_id"): m for m in all_meta}
549
  vuln_types: set[str] = set()
550
  for sid in recent_ids:
551
- meta = meta_by_id.get(sid)
552
- if meta:
553
- vuln_types.update(meta.get("vuln_classes", []))
554
  return vuln_types
 
555
  def _is_diverse(self, snapshot: SnapshotSpec) -> bool:
556
  """Return True if *snapshot* has at least one vuln type not in recent history."""
557
  recent = self._recent_vuln_types()
@@ -569,32 +604,29 @@ class ManagedSnapshotRuntime:
569
  """Try to find a snapshot in the store whose vulns don't fully overlap."""
570
  from open_range.builder.snapshot_store import StoredSnapshot
571
 
572
- all_meta = self.list_snapshots()
573
  recent = self._recent_vuln_types()
574
 
575
- for meta in all_meta:
576
- sid = meta.get("snapshot_id", "")
577
  if sid == exclude_id:
578
  continue
579
- candidate_vulns = set(meta.get("vuln_classes", []))
580
  if not candidate_vulns or not candidate_vulns.issubset(recent):
581
- try:
582
- entry = _run_coro_sync(self.store.get_entry(sid))
583
- return entry
584
- except Exception: # noqa: BLE001
585
- continue
586
  return None
587
 
588
  def get_snapshot(self, snapshot_id: str) -> RuntimeSnapshot:
589
  self.start()
590
  stored = _run_coro_sync(self.store.get_entry(snapshot_id))
 
591
  return RuntimeSnapshot(snapshot_id=stored.snapshot_id, snapshot=stored.snapshot)
592
 
593
  def list_snapshots(self) -> list[dict[str, Any]]:
594
  return _run_coro_sync(self.store.list_snapshots())
595
 
596
  def snapshot_count(self) -> int:
597
- return len(self.list_snapshots())
598
 
599
  def status(self) -> dict[str, Any]:
600
  return {
@@ -604,6 +636,7 @@ class ManagedSnapshotRuntime:
604
  "selection_strategy": self.selection_strategy,
605
  "parent_selection_strategy": self.parent_selection_strategy,
606
  "validator_profile": self.validator_profile,
 
607
  "refill_enabled": self.refill_enabled,
608
  "live_admission_enabled": self.live_admission_enabled,
609
  "snapshot_count": self.snapshot_count(),
@@ -666,30 +699,33 @@ class ManagedSnapshotRuntime:
666
  self._generate_and_store_snapshot()
667
 
668
  def _ensure_existing_artifacts(self) -> None:
669
- for meta in self.list_snapshots():
670
- snapshot_id = str(meta.get("snapshot_id", ""))
671
- if not snapshot_id:
672
- continue
673
  artifacts_dir = self._artifacts_dir(snapshot_id)
674
  if artifacts_dir.exists():
675
  continue
676
- stored = _run_coro_sync(self.store.get_entry(snapshot_id))
677
  materialized = self._materialize_snapshot(stored.snapshot, snapshot_id)
678
  _run_coro_sync(self.store.store(materialized, snapshot_id=snapshot_id))
679
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
680
  def _generate_and_store_snapshot(self) -> str:
681
  last_error: str | None = None
682
- parent_snapshot: SnapshotSpec | None = None
683
- parent_snapshot_id: str | None = None
684
- existing = self.list_snapshots()
685
- if existing:
686
- parent_snapshot_id = str(existing[0].get("snapshot_id", "") or "")
687
- if parent_snapshot_id:
688
- try:
689
- parent_snapshot = _run_coro_sync(self.store.get(parent_snapshot_id))
690
- except FileNotFoundError:
691
- parent_snapshot = None
692
- parent_snapshot_id = None
693
 
694
  for attempt in range(1, self.generation_retries + 1):
695
  context = self._build_context()
 
66
  "strict": "training",
67
  }
68
  _LIVE_VALIDATOR_PROFILES = {"training"}
69
+ _PERSISTED_SNAPSHOT_VALIDATION_ALIASES = {
70
+ "none": "trust",
71
+ "disabled": "trust",
72
+ "off": "trust",
73
+ "revalidate": "offline",
74
+ "strict": "offline",
75
+ }
76
 
77
 
78
  def _env_flag(name: str, default: bool = False) -> bool:
 
325
  return normalized
326
 
327
 
328
+ def _normalize_persisted_snapshot_validation(policy: str | None) -> str:
329
+ normalized = (policy or "offline").strip().lower()
330
+ normalized = _PERSISTED_SNAPSHOT_VALIDATION_ALIASES.get(normalized, normalized)
331
+ if normalized not in {"trust", "offline"}:
332
+ raise ValueError(
333
+ f"Unsupported persisted snapshot validation policy {policy!r}. "
334
+ "Expected 'trust' or 'offline'."
335
+ )
336
+ return normalized
337
+
338
+
339
  def _graph_checks(manifest: dict[str, Any]) -> list[Any]:
340
  return [
341
  ManifestComplianceCheck(manifest),
 
410
  compose_runner: ComposeProjectRunner | None = None,
411
  live_validator: ValidatorGate | None = None,
412
  enable_patch_validation: bool = False,
413
+ persisted_snapshot_validation: str | None = None,
414
  mutation_policy: PopulationMutationPolicy | None = None,
415
  ) -> None:
416
  self.manifest_path = (
 
427
  self.validator_profile = _normalize_validator_profile(
428
  validator_profile or os.getenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", "offline")
429
  )
430
+ self.persisted_snapshot_validation = _normalize_persisted_snapshot_validation(
431
+ persisted_snapshot_validation
432
+ or os.getenv("OPENRANGE_PERSISTED_SNAPSHOT_VALIDATION", "offline")
433
+ )
434
  self.validator = validator or _build_validator(self.validator_profile, self.manifest)
435
+ self.persisted_validator = (
436
+ _build_validator("offline", self.manifest)
437
+ if self.persisted_snapshot_validation == "offline"
438
+ else None
439
+ )
440
  self.renderer = SnapshotRenderer()
441
  self.curriculum = CurriculumTracker()
442
  self.pool_size = max(1, pool_size)
 
486
  "OPENRANGE_ENABLE_PATCH_VALIDATION",
487
  default=False,
488
  ),
489
+ persisted_snapshot_validation=os.getenv(
490
+ "OPENRANGE_PERSISTED_SNAPSHOT_VALIDATION",
491
+ "offline",
492
+ ),
493
  )
494
 
495
  @staticmethod
 
509
  if existing < self.pool_size:
510
  self._top_up_pool(self.pool_size - existing)
511
  self._ensure_existing_artifacts()
512
+ self._revalidate_persisted_snapshots()
513
 
514
  available = self.snapshot_count()
515
  if available == 0:
 
561
  if alternative is not None:
562
  stored = alternative
563
 
564
+ self._assert_persisted_snapshot_valid(stored.snapshot_id, stored.snapshot)
565
  result = RuntimeSnapshot(snapshot_id=stored.snapshot_id, snapshot=stored.snapshot)
566
  self._track_acquisition(result.snapshot_id)
567
  return result
 
578
  if not recent_ids:
579
  return set()
580
 
581
+ entries = _run_coro_sync(self.store.list_entries())
582
+ by_id = {entry.snapshot_id: entry for entry in entries}
583
  vuln_types: set[str] = set()
584
  for sid in recent_ids:
585
+ entry = by_id.get(sid)
586
+ if entry:
587
+ vuln_types.update(v.type for v in entry.snapshot.truth_graph.vulns)
588
  return vuln_types
589
+
590
  def _is_diverse(self, snapshot: SnapshotSpec) -> bool:
591
  """Return True if *snapshot* has at least one vuln type not in recent history."""
592
  recent = self._recent_vuln_types()
 
604
  """Try to find a snapshot in the store whose vulns don't fully overlap."""
605
  from open_range.builder.snapshot_store import StoredSnapshot
606
 
607
+ entries = _run_coro_sync(self.store.list_entries())
608
  recent = self._recent_vuln_types()
609
 
610
+ for entry in entries:
611
+ sid = entry.snapshot_id
612
  if sid == exclude_id:
613
  continue
614
+ candidate_vulns = {v.type for v in entry.snapshot.truth_graph.vulns}
615
  if not candidate_vulns or not candidate_vulns.issubset(recent):
616
+ return entry
 
 
 
 
617
  return None
618
 
619
  def get_snapshot(self, snapshot_id: str) -> RuntimeSnapshot:
620
  self.start()
621
  stored = _run_coro_sync(self.store.get_entry(snapshot_id))
622
+ self._assert_persisted_snapshot_valid(stored.snapshot_id, stored.snapshot)
623
  return RuntimeSnapshot(snapshot_id=stored.snapshot_id, snapshot=stored.snapshot)
624
 
625
  def list_snapshots(self) -> list[dict[str, Any]]:
626
  return _run_coro_sync(self.store.list_snapshots())
627
 
628
  def snapshot_count(self) -> int:
629
+ return int(_run_coro_sync(self.store.count_entries()))
630
 
631
  def status(self) -> dict[str, Any]:
632
  return {
 
636
  "selection_strategy": self.selection_strategy,
637
  "parent_selection_strategy": self.parent_selection_strategy,
638
  "validator_profile": self.validator_profile,
639
+ "persisted_snapshot_validation": self.persisted_snapshot_validation,
640
  "refill_enabled": self.refill_enabled,
641
  "live_admission_enabled": self.live_admission_enabled,
642
  "snapshot_count": self.snapshot_count(),
 
699
  self._generate_and_store_snapshot()
700
 
701
  def _ensure_existing_artifacts(self) -> None:
702
+ for stored in _run_coro_sync(self.store.list_entries()):
703
+ snapshot_id = stored.snapshot_id
 
 
704
  artifacts_dir = self._artifacts_dir(snapshot_id)
705
  if artifacts_dir.exists():
706
  continue
 
707
  materialized = self._materialize_snapshot(stored.snapshot, snapshot_id)
708
  _run_coro_sync(self.store.store(materialized, snapshot_id=snapshot_id))
709
 
710
+ def _revalidate_persisted_snapshots(self) -> None:
711
+ if self.persisted_snapshot_validation == "trust":
712
+ return
713
+ for entry in _run_coro_sync(self.store.list_entries()):
714
+ self._assert_persisted_snapshot_valid(entry.snapshot_id, entry.snapshot)
715
+
716
+ def _assert_persisted_snapshot_valid(self, snapshot_id: str, snapshot: SnapshotSpec) -> None:
717
+ if self.persisted_validator is None:
718
+ return
719
+ result = _run_coro_sync(self.persisted_validator.validate(snapshot, ContainerSet()))
720
+ if result.passed:
721
+ return
722
+ raise RuntimeError(
723
+ "persisted snapshot failed startup revalidation "
724
+ f"({snapshot_id}): {self._validation_error(result)}"
725
+ )
726
+
727
  def _generate_and_store_snapshot(self) -> str:
728
  last_error: str | None = None
 
 
 
 
 
 
 
 
 
 
 
729
 
730
  for attempt in range(1, self.generation_retries + 1):
731
  context = self._build_context()
tests/test_builder.py CHANGED
@@ -2,6 +2,7 @@
2
 
3
  import json
4
  import tempfile
 
5
 
6
  import pytest
7
 
@@ -542,6 +543,52 @@ async def test_snapshot_store_list():
542
  assert "snap_b" in ids
543
 
544
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
545
  @pytest.mark.asyncio
546
  async def test_snapshot_store_get_by_id():
547
  from open_range.builder.snapshot_store import SnapshotStore
 
2
 
3
  import json
4
  import tempfile
5
+ from pathlib import Path
6
 
7
  import pytest
8
 
 
543
  assert "snap_b" in ids
544
 
545
 
546
+ @pytest.mark.asyncio
547
+ async def test_snapshot_store_repairs_missing_metadata_from_spec():
548
+ from open_range.builder.snapshot_store import SnapshotStore
549
+
550
+ with tempfile.TemporaryDirectory() as tmpdir:
551
+ store = SnapshotStore(store_dir=tmpdir)
552
+ spec = SnapshotSpec(topology={"hosts": ["web"]})
553
+ await store.store(spec, snapshot_id="snap_a")
554
+
555
+ metadata_path = Path(tmpdir) / "snap_a" / "metadata.json"
556
+ metadata_path.unlink()
557
+
558
+ listing = await store.list_snapshots()
559
+ assert len(listing) == 1
560
+ assert listing[0]["snapshot_id"] == "snap_a"
561
+ assert metadata_path.exists()
562
+
563
+ selected = await store.select_entry(strategy="latest")
564
+ assert selected.snapshot_id == "snap_a"
565
+
566
+
567
+ @pytest.mark.asyncio
568
+ async def test_snapshot_store_ignores_orphan_metadata_without_spec():
569
+ from open_range.builder.snapshot_store import SnapshotStore
570
+
571
+ with tempfile.TemporaryDirectory() as tmpdir:
572
+ store = SnapshotStore(store_dir=tmpdir)
573
+ spec = SnapshotSpec(topology={"hosts": ["web"]})
574
+ await store.store(spec, snapshot_id="snap_real")
575
+
576
+ orphan_dir = Path(tmpdir) / "orphan_meta"
577
+ orphan_dir.mkdir(parents=True, exist_ok=True)
578
+ (orphan_dir / "metadata.json").write_text(
579
+ json.dumps({"snapshot_id": "orphan_meta", "stored_at": 9999999999}),
580
+ encoding="utf-8",
581
+ )
582
+
583
+ listing = await store.list_snapshots()
584
+ ids = {meta["snapshot_id"] for meta in listing}
585
+ assert ids == {"snap_real"}
586
+ assert await store.count_entries() == 1
587
+
588
+ selected = await store.select_entry(strategy="latest")
589
+ assert selected.snapshot_id == "snap_real"
590
+
591
+
592
  @pytest.mark.asyncio
593
  async def test_snapshot_store_get_by_id():
594
  from open_range.builder.snapshot_store import SnapshotStore
tests/test_runtime.py CHANGED
@@ -2,6 +2,7 @@
2
 
3
  from __future__ import annotations
4
 
 
5
  from pathlib import Path
6
 
7
  import pytest
@@ -70,6 +71,44 @@ class TestManagedSnapshotRuntime:
70
  finally:
71
  runtime.stop()
72
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  def test_start_materializes_rendered_artifacts(self, tier1_manifest, tmp_path):
74
  runtime = ManagedSnapshotRuntime(
75
  manifest=tier1_manifest,
 
2
 
3
  from __future__ import annotations
4
 
5
+ import json
6
  from pathlib import Path
7
 
8
  import pytest
 
71
  finally:
72
  runtime.stop()
73
 
74
+ def test_start_revalidates_persisted_snapshots_by_default(self, tier1_manifest, tmp_path):
75
+ store_dir = tmp_path / "snapshots"
76
+
77
+ runtime = ManagedSnapshotRuntime(
78
+ manifest=tier1_manifest,
79
+ store_dir=store_dir,
80
+ pool_size=1,
81
+ refill_enabled=False,
82
+ )
83
+ runtime.start()
84
+ runtime.stop()
85
+
86
+ spec_path = next(store_dir.glob("*/spec.json"))
87
+ raw = json.loads(spec_path.read_text(encoding="utf-8"))
88
+ raw["truth_graph"]["vulns"] = []
89
+ raw["golden_path"] = []
90
+ raw["flags"] = []
91
+ spec_path.write_text(json.dumps(raw, indent=2), encoding="utf-8")
92
+
93
+ runtime = ManagedSnapshotRuntime(
94
+ manifest=tier1_manifest,
95
+ store_dir=store_dir,
96
+ pool_size=1,
97
+ refill_enabled=False,
98
+ )
99
+ with pytest.raises(RuntimeError, match="persisted snapshot failed startup revalidation"):
100
+ runtime.start()
101
+
102
+ trust_runtime = ManagedSnapshotRuntime(
103
+ manifest=tier1_manifest,
104
+ store_dir=store_dir,
105
+ pool_size=1,
106
+ refill_enabled=False,
107
+ persisted_snapshot_validation="trust",
108
+ )
109
+ trust_runtime.start()
110
+ trust_runtime.stop()
111
+
112
  def test_start_materializes_rendered_artifacts(self, tier1_manifest, tmp_path):
113
  runtime = ManagedSnapshotRuntime(
114
  manifest=tier1_manifest,