"""Tests for the managed snapshot runtime.""" from __future__ import annotations import json import logging from pathlib import Path import pytest import yaml from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec from open_range.server.compose_runner import BootedSnapshotProject from open_range.server.environment import RangeEnvironment from open_range.server.runtime import ManagedSnapshotRuntime from open_range.validator.validator import ValidationResult class TestManagedSnapshotRuntime: def test_from_env_defaults_to_training_and_live(self, tier1_manifest, tmp_path, monkeypatch): manifest_path = tmp_path / "manifest.yaml" manifest_path.write_text(yaml.safe_dump(tier1_manifest), encoding="utf-8") monkeypatch.setenv("OPENRANGE_RUNTIME_MANIFEST", str(manifest_path)) monkeypatch.setenv("OPENRANGE_SNAPSHOT_DIR", str(tmp_path / "snapshots")) monkeypatch.delenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", raising=False) monkeypatch.delenv("OPENRANGE_ENABLE_LIVE_ADMISSION", raising=False) monkeypatch.delenv("OPENRANGE_ALLOW_NON_LIVE_ADMISSION", raising=False) runtime = ManagedSnapshotRuntime.from_env() assert runtime.validator_profile == "training" assert runtime.live_admission_enabled is True def test_from_env_rejects_non_live_without_explicit_opt_out( self, tier1_manifest, tmp_path, monkeypatch, ): manifest_path = tmp_path / "manifest.yaml" manifest_path.write_text(yaml.safe_dump(tier1_manifest), encoding="utf-8") monkeypatch.setenv("OPENRANGE_RUNTIME_MANIFEST", str(manifest_path)) monkeypatch.setenv("OPENRANGE_SNAPSHOT_DIR", str(tmp_path / "snapshots")) monkeypatch.setenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", "offline") monkeypatch.setenv("OPENRANGE_ENABLE_LIVE_ADMISSION", "0") monkeypatch.delenv("OPENRANGE_ALLOW_NON_LIVE_ADMISSION", raising=False) with pytest.raises(RuntimeError, match="OPENRANGE_ALLOW_NON_LIVE_ADMISSION=1"): ManagedSnapshotRuntime.from_env() def test_from_env_allows_non_live_with_explicit_opt_out_warning( self, tier1_manifest, tmp_path, monkeypatch, caplog, ): manifest_path = tmp_path / "manifest.yaml" manifest_path.write_text(yaml.safe_dump(tier1_manifest), encoding="utf-8") monkeypatch.setenv("OPENRANGE_RUNTIME_MANIFEST", str(manifest_path)) monkeypatch.setenv("OPENRANGE_SNAPSHOT_DIR", str(tmp_path / "snapshots")) monkeypatch.setenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", "offline") monkeypatch.setenv("OPENRANGE_ENABLE_LIVE_ADMISSION", "0") monkeypatch.setenv("OPENRANGE_ALLOW_NON_LIVE_ADMISSION", "1") with caplog.at_level(logging.WARNING): runtime = ManagedSnapshotRuntime.from_env() assert runtime.validator_profile == "offline" assert runtime.live_admission_enabled is False assert "strict live admission" in caplog.text assert "OPENRANGE_ALLOW_NON_LIVE_ADMISSION=1" in caplog.text def test_offline_validator_profile_includes_static_checks(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, refill_enabled=False, ) names = [type(check).__name__ for check in runtime.validator.checks] assert names == [ "ManifestComplianceCheck", "GraphConsistencyCheck", "PathSolvabilityCheck", "GraphEvidenceSufficiencyCheck", "GraphRewardGroundingCheck", "StructuralSnapshotCheck", "TaskFeasibilityCheck", ] def test_training_validator_profile_includes_live_checks(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="training", refill_enabled=False, ) names = [type(check).__name__ for check in runtime.validator.checks] assert names[:5] == [ "ManifestComplianceCheck", "GraphConsistencyCheck", "PathSolvabilityCheck", "GraphEvidenceSufficiencyCheck", "GraphRewardGroundingCheck", ] assert "BuildBootCheck" in names assert "ExploitabilityCheck" in names assert "PatchabilityCheck" in names assert "EvidenceCheck" in names assert "RewardGroundingCheck" in names assert "DifficultyCheck" in names def test_offline_validator_profile_requires_explicit_opt_out( self, tier1_manifest, tmp_path, monkeypatch, ): monkeypatch.delenv("OPENRANGE_ALLOW_OFFLINE_ADMISSION", raising=False) with pytest.raises(RuntimeError, match="OPENRANGE_ALLOW_OFFLINE_ADMISSION=1"): ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", refill_enabled=False, ) def test_offline_validator_profile_logs_warning_when_opted_out( self, tier1_manifest, tmp_path, caplog, ): caplog.set_level("WARNING") ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, refill_enabled=False, ) assert any( "container-backed admission checks are disabled" in record.message for record in caplog.records ) def test_from_env_defaults_to_training_validator_profile(self, tmp_path, monkeypatch): repo_root = Path(__file__).resolve().parent.parent monkeypatch.setenv( "OPENRANGE_RUNTIME_MANIFEST", str(repo_root / "manifests" / "tier1_basic.yaml"), ) monkeypatch.setenv("OPENRANGE_SNAPSHOT_DIR", str(tmp_path / "snapshots")) monkeypatch.delenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", raising=False) monkeypatch.delenv("OPENRANGE_ALLOW_OFFLINE_ADMISSION", raising=False) runtime = ManagedSnapshotRuntime.from_env() assert runtime.validator_profile == "training" def test_start_preloads_snapshot_pool(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=2, refill_enabled=False, ) runtime.start() try: listing = runtime.list_snapshots() assert len(listing) == 2 assert all(item["snapshot_id"] for item in listing) finally: runtime.stop() def test_start_revalidates_persisted_snapshots_by_default(self, tier1_manifest, tmp_path): store_dir = tmp_path / "snapshots" runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=store_dir, validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, refill_enabled=False, ) runtime.start() runtime.stop() spec_path = next(store_dir.glob("*/spec.json")) raw = json.loads(spec_path.read_text(encoding="utf-8")) raw["truth_graph"]["vulns"] = [] raw["golden_path"] = [] raw["flags"] = [] spec_path.write_text(json.dumps(raw, indent=2), encoding="utf-8") runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=store_dir, validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, refill_enabled=False, ) with pytest.raises(RuntimeError, match="persisted snapshot failed startup revalidation"): runtime.start() trust_runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=store_dir, validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, refill_enabled=False, persisted_snapshot_validation="trust", ) trust_runtime.start() trust_runtime.stop() def test_start_materializes_rendered_artifacts(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, refill_enabled=False, ) runtime.start() try: admitted = runtime.acquire_snapshot() artifacts_dir = tmp_path / "snapshots" / admitted.snapshot_id / "artifacts" assert (artifacts_dir / "docker-compose.yml").exists() assert (artifacts_dir / "Dockerfile.web").exists() assert admitted.snapshot.compose assert "services" in admitted.snapshot.compose finally: runtime.stop() def test_acquire_snapshot_returns_admitted_snapshot(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, selection_strategy="latest", refill_enabled=False, ) runtime.start() try: admitted = runtime.acquire_snapshot() assert admitted.snapshot_id assert admitted.snapshot.truth_graph.vulns assert admitted.snapshot.flags finally: runtime.stop() def test_get_snapshot_by_id_returns_exact_snapshot(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, refill_enabled=False, ) runtime.start() try: first = runtime.acquire_snapshot() loaded = runtime.get_snapshot(first.snapshot_id) assert loaded.snapshot_id == first.snapshot_id assert loaded.snapshot.flags[0].value == first.snapshot.flags[0].value assert loaded.snapshot.compose == first.snapshot.compose finally: runtime.stop() def test_start_records_root_and_child_lineage(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=2, selection_strategy="latest", parent_selection_strategy="policy", refill_enabled=False, ) runtime.start() try: listing = runtime.list_snapshots() assert len(listing) == 2 depths = {item["generation_depth"] for item in listing} assert 0 in depths assert 1 in depths assert any(item["parent_snapshot_id"] for item in listing) finally: runtime.stop() def test_status_reports_parent_selection_strategy(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, parent_selection_strategy="policy", refill_enabled=False, ) status = runtime.status() assert status["parent_selection_strategy"] == "policy" def test_acquire_snapshot_exposes_lineage_metadata(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=2, refill_enabled=False, ) runtime.start() try: admitted = runtime.acquire_snapshot() assert admitted.snapshot.lineage.snapshot_id == admitted.snapshot_id assert admitted.snapshot.lineage.root_snapshot_id finally: runtime.stop() def test_live_admission_boots_bundle_and_marks_snapshot( self, tier1_manifest, tmp_path, ): class FakeContainers: def __init__(self) -> None: self.exec_calls: list[tuple[str, str]] = [] self.cp_calls: list[tuple[str, str, str]] = [] async def exec(self, container: str, cmd: str, **kwargs) -> str: self.exec_calls.append((container, cmd)) if "mysql -u root" in cmd: return "ok" return "ok" async def cp(self, container: str, src: str, dest: str) -> None: self.cp_calls.append((container, src, dest)) async def is_healthy(self, container: str) -> bool: return True class FakeComposeRunner: def __init__(self) -> None: self.boot_calls: list[tuple[str, str]] = [] self.teardown_calls: list[str] = [] self.containers = FakeContainers() def boot(self, *, snapshot_id, artifacts_dir, compose): self.boot_calls.append((snapshot_id, str(artifacts_dir))) return BootedSnapshotProject( project_name=f"openrange-{snapshot_id}", compose_file=artifacts_dir / "docker-compose.yml", artifacts_dir=artifacts_dir, containers=self.containers, # type: ignore[arg-type] ) def teardown(self, project): self.teardown_calls.append(project.project_name) class FakeLiveValidator: async def validate(self, snapshot, containers): return ValidationResult( passed=True, checks=[CheckResult(name="live_checks", passed=True)], total_time_s=0.0, ) compose_runner = FakeComposeRunner() runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, refill_enabled=False, live_admission_enabled=True, compose_runner=compose_runner, # type: ignore[arg-type] live_validator=FakeLiveValidator(), # type: ignore[arg-type] ) runtime.start() try: admitted = runtime.acquire_snapshot() assert compose_runner.boot_calls assert compose_runner.teardown_calls == [f"openrange-{admitted.snapshot_id}"] assert admitted.snapshot.topology["live_validated"] is True assert admitted.snapshot.compose["x-project-name"] == f"openrange-{admitted.snapshot_id}" assert any(dest.endswith("/var/www/portal/index.php") for _, _, dest in compose_runner.containers.cp_calls) listing = runtime.list_snapshots() assert listing[0]["live_validated"] is True finally: runtime.stop() def test_training_live_validation_uses_compose_runner_boot( self, tier1_manifest, tmp_path, ): class FakeContainers: def __init__(self) -> None: self.exec_calls: list[tuple[str, str]] = [] self.cp_calls: list[tuple[str, str, str]] = [] async def exec(self, container: str, cmd: str, **kwargs) -> str: self.exec_calls.append((container, cmd)) return "ok" async def cp(self, container: str, src: str, dest: str) -> None: self.cp_calls.append((container, src, dest)) async def is_healthy(self, container: str) -> bool: return True class FakeComposeRunner: def __init__(self) -> None: self.boot_calls: list[tuple[str, str, str | None]] = [] self.compose_payloads: list[dict[str, object]] = [] self.teardown_calls: list[str] = [] self.containers = FakeContainers() def boot(self, *, snapshot_id, artifacts_dir, compose, project_name=None): self.boot_calls.append((snapshot_id, str(artifacts_dir), project_name)) self.compose_payloads.append(compose) return BootedSnapshotProject( project_name=project_name or f"openrange-{snapshot_id}", compose_file=artifacts_dir / "docker-compose.yml", artifacts_dir=artifacts_dir, containers=self.containers, # type: ignore[arg-type] ) def teardown(self, project): self.teardown_calls.append(project.project_name) class FakeTrainingValidator: def __init__(self) -> None: self.calls: list[tuple[SnapshotSpec, object]] = [] async def validate(self, snapshot, containers): self.calls.append((snapshot, containers)) return ValidationResult( passed=True, checks=[CheckResult(name="build_boot", passed=True)], total_time_s=0.0, ) compose_runner = FakeComposeRunner() validator = FakeTrainingValidator() runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="training", pool_size=1, refill_enabled=False, compose_runner=compose_runner, # type: ignore[arg-type] validator=validator, # type: ignore[arg-type] ) runtime.start() try: listing = runtime.list_snapshots() assert listing assert compose_runner.boot_calls snapshot_id, artifacts_dir, project_name = compose_runner.boot_calls[0] assert Path(artifacts_dir).name.startswith(f"openrange-validate-{snapshot_id}") expected_project_name = runtime._project_name(snapshot_id) assert project_name == expected_project_name assert compose_runner.compose_payloads[0]["services"] assert compose_runner.teardown_calls == [expected_project_name] assert validator.calls assert any( dest.endswith("/var/www/portal/index.php") for _, _, dest in compose_runner.containers.cp_calls ) finally: runtime.stop() def test_activate_snapshot_project_uses_unique_episode_project_name( self, tier1_manifest, tmp_path, ): class FakeContainers: def __init__(self) -> None: self.exec_calls: list[tuple[str, str]] = [] self.cp_calls: list[tuple[str, str, str]] = [] async def exec(self, container: str, cmd: str, **kwargs) -> str: self.exec_calls.append((container, cmd)) return "ok" async def cp(self, container: str, src: str, dest: str) -> None: self.cp_calls.append((container, src, dest)) async def is_healthy(self, container: str) -> bool: return True class FakeComposeRunner: def __init__(self) -> None: self.boot_calls: list[tuple[str, str, str | None]] = [] self.teardown_calls: list[str] = [] self.containers = FakeContainers() def project_name_for(self, snapshot_id: str) -> str: return f"openrange-{snapshot_id}"[:63] def boot(self, *, snapshot_id, artifacts_dir, compose, project_name=None): self.boot_calls.append((snapshot_id, str(artifacts_dir), project_name)) return BootedSnapshotProject( project_name=project_name or f"openrange-{snapshot_id}", compose_file=artifacts_dir / "docker-compose.yml", artifacts_dir=artifacts_dir, containers=self.containers, # type: ignore[arg-type] ) def teardown(self, project): self.teardown_calls.append(project.project_name) compose_runner = FakeComposeRunner() runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, refill_enabled=False, compose_runner=compose_runner, # type: ignore[arg-type] ) runtime.start() try: admitted = runtime.acquire_snapshot() project = runtime.activate_snapshot_project( snapshot_id=admitted.snapshot_id, snapshot=admitted.snapshot, episode_id="episode-123", ) assert compose_runner.boot_calls _, artifacts_dir, project_name = compose_runner.boot_calls[0] assert artifacts_dir.endswith(f"{admitted.snapshot_id}/artifacts") assert project_name == f"openrange-{admitted.snapshot_id}-episode-123" runtime.teardown_snapshot_project(project) assert compose_runner.teardown_calls == [project.project_name] finally: runtime.stop() class TestEnvironmentRuntimeIntegration: def test_reset_rejects_direct_live_docker_overlay(self): snapshot = SnapshotSpec( topology={"hosts": ["attacker", "siem", "web"]}, compose={"services": {"attacker": {}, "siem": {}, "web": {}}}, files={"web:/var/www/html/index.php": ""}, task={"red_briefing": "Go.", "blue_briefing": "Watch."}, ) env = RangeEnvironment(docker_available=True, execution_mode="docker") env._get_docker = lambda: object() # type: ignore[method-assign] try: with pytest.raises(RuntimeError, match="Direct docker snapshot reset is disabled"): env.reset(snapshot=snapshot) finally: env.close() def test_reset_uses_managed_runtime_snapshot(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, refill_enabled=False, ) runtime.start() env = RangeEnvironment(runtime=runtime, docker_available=False) try: obs = env.reset() assert "Range ready" in obs.stdout assert env.snapshot is not None assert env.snapshot.truth_graph.vulns finally: env.close() runtime.stop() def test_reset_snapshot_id_uses_runtime_store(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, refill_enabled=False, ) runtime.start() env = RangeEnvironment(runtime=runtime, docker_available=False) try: admitted = runtime.acquire_snapshot() env.reset(snapshot_id=admitted.snapshot_id) assert env.snapshot is not None assert env.snapshot.flags[0].value == admitted.snapshot.flags[0].value finally: env.close() runtime.stop() def test_missing_snapshot_id_raises(self, tier1_manifest, tmp_path): runtime = ManagedSnapshotRuntime( manifest=tier1_manifest, store_dir=tmp_path / "snapshots", validator_profile="offline", allow_insecure_offline_profile=True, pool_size=1, refill_enabled=False, ) runtime.start() env = RangeEnvironment(runtime=runtime, docker_available=False) try: with pytest.raises(FileNotFoundError): env.reset(snapshot_id="missing_snapshot") finally: env.close() runtime.stop() def test_reset_activates_clean_runtime_project_and_tears_down_previous(self): class FakeRuntime: def __init__(self, snapshot: SnapshotSpec) -> None: self.snapshot = snapshot self.activate_calls: list[tuple[str, str | None]] = [] self.teardown_calls: list[str] = [] self.recorded: list[bool] = [] def acquire_snapshot(self): return type( "Admitted", (), {"snapshot_id": "snap-001", "snapshot": self.snapshot}, )() def get_snapshot(self, snapshot_id: str): assert snapshot_id == "snap-001" return self.acquire_snapshot() def activate_snapshot_project(self, *, snapshot_id, snapshot, episode_id=None): self.activate_calls.append((snapshot_id, episode_id)) return BootedSnapshotProject( project_name=f"project-{episode_id}", compose_file=Path("/tmp/docker-compose.yml"), artifacts_dir=Path("/tmp"), containers=ContainerSet( project_name=f"project-{episode_id}", container_ids={"web": "cid-web", "attacker": "cid-attacker", "siem": "cid-siem"}, ), ) def teardown_snapshot_project(self, project): self.teardown_calls.append(project.project_name) def record_episode_result(self, **kwargs): self.recorded.append(bool(kwargs.get("completed", False))) snapshot = SnapshotSpec( topology={"hosts": ["attacker", "siem", "web"]}, compose={"services": {"attacker": {}, "siem": {}, "web": {}}}, task={"red_briefing": "Go.", "blue_briefing": "Watch."}, ) runtime = FakeRuntime(snapshot) env = RangeEnvironment( runtime=runtime, # type: ignore[arg-type] docker_available=True, execution_mode="docker", ) env._get_docker = lambda: object() # type: ignore[method-assign] apply_calls: list[str] = [] env._apply_snapshot = lambda snapshot: apply_calls.append("overlay") # type: ignore[method-assign] env._start_npcs = lambda snapshot: None # type: ignore[method-assign] try: env.reset(episode_id="ep-1") assert runtime.activate_calls == [("snap-001", "ep-1")] assert apply_calls == [] assert env.snapshot is not None assert env.snapshot.compose["x-project-name"] == "project-ep-1" assert env._container_name("web") == "cid-web" env.reset(episode_id="ep-2") assert runtime.activate_calls == [("snap-001", "ep-1"), ("snap-001", "ep-2")] assert runtime.teardown_calls == ["project-ep-1"] assert env.snapshot is not None assert env.snapshot.compose["x-project-name"] == "project-ep-2" finally: env.close() assert runtime.teardown_calls == ["project-ep-1", "project-ep-2"]