# open-range / tests / test_runtime.py
# Lars Talian
# fix(runtime): stabilize live admission boot path (#102)
# 5b99233 unverified
"""Tests for the managed snapshot runtime."""
from __future__ import annotations
import json
import logging
from pathlib import Path
import pytest
import yaml
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.server.compose_runner import BootedSnapshotProject
from open_range.server.environment import RangeEnvironment
from open_range.server.runtime import ManagedSnapshotRuntime
from open_range.validator.validator import ValidationResult
class TestManagedSnapshotRuntime:
    """Tests for ``ManagedSnapshotRuntime`` configuration, pooling and admission.

    Most tests build the runtime with the ``offline`` validator profile (opted
    in through ``allow_insecure_offline_profile=True``) and with
    ``refill_enabled=False``.  Tests that touch the compose/boot path inject
    in-test fakes that only record the calls made to them.
    """

    def test_from_env_defaults_to_training_and_live(self, tier1_manifest, tmp_path, monkeypatch):
        """``from_env`` picks the training profile and live admission when unset."""
        manifest_path = tmp_path / "manifest.yaml"
        manifest_path.write_text(yaml.safe_dump(tier1_manifest), encoding="utf-8")
        monkeypatch.setenv("OPENRANGE_RUNTIME_MANIFEST", str(manifest_path))
        monkeypatch.setenv("OPENRANGE_SNAPSHOT_DIR", str(tmp_path / "snapshots"))
        # The defaults are only observable when none of the switches are set.
        monkeypatch.delenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", raising=False)
        monkeypatch.delenv("OPENRANGE_ENABLE_LIVE_ADMISSION", raising=False)
        monkeypatch.delenv("OPENRANGE_ALLOW_NON_LIVE_ADMISSION", raising=False)

        runtime = ManagedSnapshotRuntime.from_env()

        assert runtime.validator_profile == "training"
        assert runtime.live_admission_enabled is True

    def test_from_env_rejects_non_live_without_explicit_opt_out(
        self,
        tier1_manifest,
        tmp_path,
        monkeypatch,
    ):
        """Offline profile with live admission off raises without the opt-out variable."""
        manifest_path = tmp_path / "manifest.yaml"
        manifest_path.write_text(yaml.safe_dump(tier1_manifest), encoding="utf-8")
        monkeypatch.setenv("OPENRANGE_RUNTIME_MANIFEST", str(manifest_path))
        monkeypatch.setenv("OPENRANGE_SNAPSHOT_DIR", str(tmp_path / "snapshots"))
        monkeypatch.setenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", "offline")
        monkeypatch.setenv("OPENRANGE_ENABLE_LIVE_ADMISSION", "0")
        monkeypatch.delenv("OPENRANGE_ALLOW_NON_LIVE_ADMISSION", raising=False)

        # The error message is expected to name the variable that opts out.
        with pytest.raises(RuntimeError, match="OPENRANGE_ALLOW_NON_LIVE_ADMISSION=1"):
            ManagedSnapshotRuntime.from_env()

    def test_from_env_allows_non_live_with_explicit_opt_out_warning(
        self,
        tier1_manifest,
        tmp_path,
        monkeypatch,
        caplog,
    ):
        """With the opt-out variable set, ``from_env`` succeeds and logs a warning."""
        manifest_path = tmp_path / "manifest.yaml"
        manifest_path.write_text(yaml.safe_dump(tier1_manifest), encoding="utf-8")
        monkeypatch.setenv("OPENRANGE_RUNTIME_MANIFEST", str(manifest_path))
        monkeypatch.setenv("OPENRANGE_SNAPSHOT_DIR", str(tmp_path / "snapshots"))
        monkeypatch.setenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", "offline")
        monkeypatch.setenv("OPENRANGE_ENABLE_LIVE_ADMISSION", "0")
        monkeypatch.setenv("OPENRANGE_ALLOW_NON_LIVE_ADMISSION", "1")

        with caplog.at_level(logging.WARNING):
            runtime = ManagedSnapshotRuntime.from_env()

        assert runtime.validator_profile == "offline"
        assert runtime.live_admission_enabled is False
        assert "strict live admission" in caplog.text
        assert "OPENRANGE_ALLOW_NON_LIVE_ADMISSION=1" in caplog.text

    def test_offline_validator_profile_includes_static_checks(self, tier1_manifest, tmp_path):
        """The offline profile installs exactly the expected checks, in order."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            refill_enabled=False,
        )

        names = [type(check).__name__ for check in runtime.validator.checks]

        assert names == [
            "ManifestComplianceCheck",
            "GraphConsistencyCheck",
            "PathSolvabilityCheck",
            "GraphEvidenceSufficiencyCheck",
            "GraphRewardGroundingCheck",
            "StructuralSnapshotCheck",
            "TaskFeasibilityCheck",
        ]

    def test_training_validator_profile_includes_live_checks(self, tier1_manifest, tmp_path):
        """The training profile starts with the graph checks and adds the live ones."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="training",
            refill_enabled=False,
        )

        names = [type(check).__name__ for check in runtime.validator.checks]

        # Only the first five positions are pinned; the rest are checked by presence.
        assert names[:5] == [
            "ManifestComplianceCheck",
            "GraphConsistencyCheck",
            "PathSolvabilityCheck",
            "GraphEvidenceSufficiencyCheck",
            "GraphRewardGroundingCheck",
        ]
        assert "BuildBootCheck" in names
        assert "ExploitabilityCheck" in names
        assert "PatchabilityCheck" in names
        assert "EvidenceCheck" in names
        assert "RewardGroundingCheck" in names
        assert "DifficultyCheck" in names

    def test_offline_validator_profile_requires_explicit_opt_out(
        self,
        tier1_manifest,
        tmp_path,
        monkeypatch,
    ):
        """Constructing an offline runtime without any opt-out raises ``RuntimeError``."""
        monkeypatch.delenv("OPENRANGE_ALLOW_OFFLINE_ADMISSION", raising=False)

        with pytest.raises(RuntimeError, match="OPENRANGE_ALLOW_OFFLINE_ADMISSION=1"):
            ManagedSnapshotRuntime(
                manifest=tier1_manifest,
                store_dir=tmp_path / "snapshots",
                validator_profile="offline",
                refill_enabled=False,
            )

    def test_offline_validator_profile_logs_warning_when_opted_out(
        self,
        tier1_manifest,
        tmp_path,
        caplog,
    ):
        """Opting in to the offline profile emits a warning about disabled checks."""
        caplog.set_level(logging.WARNING)

        ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            refill_enabled=False,
        )

        assert any(
            "container-backed admission checks are disabled" in record.message
            for record in caplog.records
        )

    def test_from_env_defaults_to_training_validator_profile(self, tmp_path, monkeypatch):
        """``from_env`` defaults to training when pointed at the repo's tier-1 manifest."""
        repo_root = Path(__file__).resolve().parent.parent
        monkeypatch.setenv(
            "OPENRANGE_RUNTIME_MANIFEST",
            str(repo_root / "manifests" / "tier1_basic.yaml"),
        )
        monkeypatch.setenv("OPENRANGE_SNAPSHOT_DIR", str(tmp_path / "snapshots"))
        monkeypatch.delenv("OPENRANGE_RUNTIME_VALIDATOR_PROFILE", raising=False)
        monkeypatch.delenv("OPENRANGE_ALLOW_OFFLINE_ADMISSION", raising=False)
        # Clear the live-admission switches too, so values inherited from the
        # developer's or CI environment cannot change what this test observes.
        monkeypatch.delenv("OPENRANGE_ENABLE_LIVE_ADMISSION", raising=False)
        monkeypatch.delenv("OPENRANGE_ALLOW_NON_LIVE_ADMISSION", raising=False)

        runtime = ManagedSnapshotRuntime.from_env()

        assert runtime.validator_profile == "training"

    def test_start_preloads_snapshot_pool(self, tier1_manifest, tmp_path):
        """``start`` fills the pool so ``list_snapshots`` returns ``pool_size`` entries."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=2,
            refill_enabled=False,
        )
        runtime.start()
        try:
            listing = runtime.list_snapshots()
            assert len(listing) == 2
            assert all(item["snapshot_id"] for item in listing)
        finally:
            runtime.stop()

    def test_start_revalidates_persisted_snapshots_by_default(self, tier1_manifest, tmp_path):
        """A tampered persisted spec fails startup unless validation is ``"trust"``."""
        store_dir = tmp_path / "snapshots"

        # First run: populate the store with one admitted snapshot.
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=store_dir,
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            refill_enabled=False,
        )
        runtime.start()
        runtime.stop()

        # Corrupt the persisted spec by emptying its vulns, golden path and flags.
        spec_path = next(store_dir.glob("*/spec.json"))
        raw = json.loads(spec_path.read_text(encoding="utf-8"))
        raw["truth_graph"]["vulns"] = []
        raw["golden_path"] = []
        raw["flags"] = []
        spec_path.write_text(json.dumps(raw, indent=2), encoding="utf-8")

        # Second run with default settings must refuse the corrupted snapshot.
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=store_dir,
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            refill_enabled=False,
        )
        with pytest.raises(RuntimeError, match="persisted snapshot failed startup revalidation"):
            runtime.start()

        # Third run opts out of revalidation; start/stop must complete without raising.
        trust_runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=store_dir,
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            refill_enabled=False,
            persisted_snapshot_validation="trust",
        )
        trust_runtime.start()
        trust_runtime.stop()

    def test_start_materializes_rendered_artifacts(self, tier1_manifest, tmp_path):
        """Admitted snapshots have compose and Dockerfile artifacts written to disk."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            refill_enabled=False,
        )
        runtime.start()
        try:
            admitted = runtime.acquire_snapshot()
            artifacts_dir = tmp_path / "snapshots" / admitted.snapshot_id / "artifacts"
            assert (artifacts_dir / "docker-compose.yml").exists()
            assert (artifacts_dir / "Dockerfile.web").exists()
            assert admitted.snapshot.compose
            assert "services" in admitted.snapshot.compose
        finally:
            runtime.stop()

    def test_acquire_snapshot_returns_admitted_snapshot(self, tier1_manifest, tmp_path):
        """``acquire_snapshot`` yields an id plus a snapshot with vulns and flags."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            selection_strategy="latest",
            refill_enabled=False,
        )
        runtime.start()
        try:
            admitted = runtime.acquire_snapshot()
            assert admitted.snapshot_id
            assert admitted.snapshot.truth_graph.vulns
            assert admitted.snapshot.flags
        finally:
            runtime.stop()

    def test_get_snapshot_by_id_returns_exact_snapshot(self, tier1_manifest, tmp_path):
        """``get_snapshot`` returns the same id, first flag value and compose."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            refill_enabled=False,
        )
        runtime.start()
        try:
            first = runtime.acquire_snapshot()
            loaded = runtime.get_snapshot(first.snapshot_id)
            assert loaded.snapshot_id == first.snapshot_id
            assert loaded.snapshot.flags[0].value == first.snapshot.flags[0].value
            assert loaded.snapshot.compose == first.snapshot.compose
        finally:
            runtime.stop()

    def test_start_records_root_and_child_lineage(self, tier1_manifest, tmp_path):
        """A two-snapshot pool contains generation depths 0 and 1 and a parent link."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=2,
            selection_strategy="latest",
            parent_selection_strategy="policy",
            refill_enabled=False,
        )
        runtime.start()
        try:
            listing = runtime.list_snapshots()
            assert len(listing) == 2
            depths = {item["generation_depth"] for item in listing}
            assert 0 in depths
            assert 1 in depths
            assert any(item["parent_snapshot_id"] for item in listing)
        finally:
            runtime.stop()

    def test_status_reports_parent_selection_strategy(self, tier1_manifest, tmp_path):
        """``status()`` echoes the configured ``parent_selection_strategy``."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            parent_selection_strategy="policy",
            refill_enabled=False,
        )

        # The runtime is intentionally not started before status() is queried.
        status = runtime.status()

        assert status["parent_selection_strategy"] == "policy"

    def test_acquire_snapshot_exposes_lineage_metadata(self, tier1_manifest, tmp_path):
        """Acquired snapshots carry lineage with a matching id and a root id."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=2,
            refill_enabled=False,
        )
        runtime.start()
        try:
            admitted = runtime.acquire_snapshot()
            assert admitted.snapshot.lineage.snapshot_id == admitted.snapshot_id
            assert admitted.snapshot.lineage.root_snapshot_id
        finally:
            runtime.stop()

    def test_live_admission_boots_bundle_and_marks_snapshot(
        self,
        tier1_manifest,
        tmp_path,
    ):
        """Live admission boots via the compose runner and marks the snapshot validated."""

        class FakeContainers:
            """Container stand-in that records ``exec``/``cp`` calls and reports healthy."""

            def __init__(self) -> None:
                self.exec_calls: list[tuple[str, str]] = []
                self.cp_calls: list[tuple[str, str, str]] = []

            async def exec(self, container: str, cmd: str, **kwargs) -> str:
                # Every command succeeds with the same canned output.
                self.exec_calls.append((container, cmd))
                return "ok"

            async def cp(self, container: str, src: str, dest: str) -> None:
                self.cp_calls.append((container, src, dest))

            async def is_healthy(self, container: str) -> bool:
                return True

        class FakeComposeRunner:
            """Compose runner stand-in that records boots and teardowns."""

            def __init__(self) -> None:
                self.boot_calls: list[tuple[str, str]] = []
                self.teardown_calls: list[str] = []
                self.containers = FakeContainers()

            def boot(self, *, snapshot_id, artifacts_dir, compose, project_name=None):
                # ``project_name`` is accepted (and honoured) so this fake has the
                # same keyword signature as the other compose-runner fakes in
                # this class; when omitted the name is derived from the id.
                self.boot_calls.append((snapshot_id, str(artifacts_dir)))
                return BootedSnapshotProject(
                    project_name=project_name or f"openrange-{snapshot_id}",
                    compose_file=artifacts_dir / "docker-compose.yml",
                    artifacts_dir=artifacts_dir,
                    containers=self.containers,  # type: ignore[arg-type]
                )

            def teardown(self, project):
                self.teardown_calls.append(project.project_name)

        class FakeLiveValidator:
            """Live validator stand-in that always passes with one check result."""

            async def validate(self, snapshot, containers):
                return ValidationResult(
                    passed=True,
                    checks=[CheckResult(name="live_checks", passed=True)],
                    total_time_s=0.0,
                )

        compose_runner = FakeComposeRunner()
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            refill_enabled=False,
            live_admission_enabled=True,
            compose_runner=compose_runner,  # type: ignore[arg-type]
            live_validator=FakeLiveValidator(),  # type: ignore[arg-type]
        )
        runtime.start()
        try:
            admitted = runtime.acquire_snapshot()
            expected_project = f"openrange-{admitted.snapshot_id}"

            assert compose_runner.boot_calls
            assert compose_runner.teardown_calls == [expected_project]
            assert admitted.snapshot.topology["live_validated"] is True
            assert admitted.snapshot.compose["x-project-name"] == expected_project
            assert any(
                dest.endswith("/var/www/portal/index.php")
                for _, _, dest in compose_runner.containers.cp_calls
            )

            listing = runtime.list_snapshots()
            assert listing[0]["live_validated"] is True
        finally:
            runtime.stop()

    def test_training_live_validation_uses_compose_runner_boot(
        self,
        tier1_manifest,
        tmp_path,
    ):
        """Training-profile validation boots a project through the compose runner."""

        class FakeContainers:
            """Container stand-in that records ``exec``/``cp`` calls and reports healthy."""

            def __init__(self) -> None:
                self.exec_calls: list[tuple[str, str]] = []
                self.cp_calls: list[tuple[str, str, str]] = []

            async def exec(self, container: str, cmd: str, **kwargs) -> str:
                self.exec_calls.append((container, cmd))
                return "ok"

            async def cp(self, container: str, src: str, dest: str) -> None:
                self.cp_calls.append((container, src, dest))

            async def is_healthy(self, container: str) -> bool:
                return True

        class FakeComposeRunner:
            """Compose runner stand-in that records boots, payloads and teardowns."""

            def __init__(self) -> None:
                self.boot_calls: list[tuple[str, str, str | None]] = []
                self.compose_payloads: list[dict[str, object]] = []
                self.teardown_calls: list[str] = []
                self.containers = FakeContainers()

            def boot(self, *, snapshot_id, artifacts_dir, compose, project_name=None):
                self.boot_calls.append((snapshot_id, str(artifacts_dir), project_name))
                self.compose_payloads.append(compose)
                return BootedSnapshotProject(
                    project_name=project_name or f"openrange-{snapshot_id}",
                    compose_file=artifacts_dir / "docker-compose.yml",
                    artifacts_dir=artifacts_dir,
                    containers=self.containers,  # type: ignore[arg-type]
                )

            def teardown(self, project):
                self.teardown_calls.append(project.project_name)

        class FakeTrainingValidator:
            """Validator stand-in that records its inputs and always passes."""

            def __init__(self) -> None:
                self.calls: list[tuple[SnapshotSpec, object]] = []

            async def validate(self, snapshot, containers):
                self.calls.append((snapshot, containers))
                return ValidationResult(
                    passed=True,
                    checks=[CheckResult(name="build_boot", passed=True)],
                    total_time_s=0.0,
                )

        compose_runner = FakeComposeRunner()
        validator = FakeTrainingValidator()
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="training",
            pool_size=1,
            refill_enabled=False,
            compose_runner=compose_runner,  # type: ignore[arg-type]
            validator=validator,  # type: ignore[arg-type]
        )
        runtime.start()
        try:
            listing = runtime.list_snapshots()
            assert listing
            assert compose_runner.boot_calls

            snapshot_id, artifacts_dir, project_name = compose_runner.boot_calls[0]
            # Validation is expected to boot from a directory named after the snapshot.
            assert Path(artifacts_dir).name.startswith(f"openrange-validate-{snapshot_id}")

            expected_project_name = runtime._project_name(snapshot_id)
            assert project_name == expected_project_name
            assert compose_runner.compose_payloads[0]["services"]
            assert compose_runner.teardown_calls == [expected_project_name]
            assert validator.calls
            assert any(
                dest.endswith("/var/www/portal/index.php")
                for _, _, dest in compose_runner.containers.cp_calls
            )
        finally:
            runtime.stop()

    def test_activate_snapshot_project_uses_unique_episode_project_name(
        self,
        tier1_manifest,
        tmp_path,
    ):
        """Activating a snapshot for an episode boots under an episode-suffixed name."""

        class FakeContainers:
            """Container stand-in that records ``exec``/``cp`` calls and reports healthy."""

            def __init__(self) -> None:
                self.exec_calls: list[tuple[str, str]] = []
                self.cp_calls: list[tuple[str, str, str]] = []

            async def exec(self, container: str, cmd: str, **kwargs) -> str:
                self.exec_calls.append((container, cmd))
                return "ok"

            async def cp(self, container: str, src: str, dest: str) -> None:
                self.cp_calls.append((container, src, dest))

            async def is_healthy(self, container: str) -> bool:
                return True

        class FakeComposeRunner:
            """Compose runner stand-in that records boots and teardowns."""

            def __init__(self) -> None:
                self.boot_calls: list[tuple[str, str, str | None]] = []
                self.teardown_calls: list[str] = []
                self.containers = FakeContainers()

            def project_name_for(self, snapshot_id: str) -> str:
                # Name derived from the snapshot id, truncated to 63 characters.
                return f"openrange-{snapshot_id}"[:63]

            def boot(self, *, snapshot_id, artifacts_dir, compose, project_name=None):
                self.boot_calls.append((snapshot_id, str(artifacts_dir), project_name))
                return BootedSnapshotProject(
                    project_name=project_name or f"openrange-{snapshot_id}",
                    compose_file=artifacts_dir / "docker-compose.yml",
                    artifacts_dir=artifacts_dir,
                    containers=self.containers,  # type: ignore[arg-type]
                )

            def teardown(self, project):
                self.teardown_calls.append(project.project_name)

        compose_runner = FakeComposeRunner()
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            refill_enabled=False,
            compose_runner=compose_runner,  # type: ignore[arg-type]
        )
        runtime.start()
        try:
            admitted = runtime.acquire_snapshot()
            project = runtime.activate_snapshot_project(
                snapshot_id=admitted.snapshot_id,
                snapshot=admitted.snapshot,
                episode_id="episode-123",
            )

            assert compose_runner.boot_calls
            _, artifacts_dir, project_name = compose_runner.boot_calls[0]
            # Compare in POSIX form so the suffix check does not depend on the
            # platform's path separator.
            assert Path(artifacts_dir).as_posix().endswith(
                f"{admitted.snapshot_id}/artifacts"
            )
            assert project_name == f"openrange-{admitted.snapshot_id}-episode-123"

            runtime.teardown_snapshot_project(project)
            assert compose_runner.teardown_calls == [project.project_name]
        finally:
            runtime.stop()
class TestEnvironmentRuntimeIntegration:
    """Tests for how ``RangeEnvironment.reset`` interacts with a snapshot runtime.

    Tests that start a real ``ManagedSnapshotRuntime`` nest their cleanup so
    ``runtime.stop()`` still runs when building or closing the environment
    raises.
    """

    def test_reset_rejects_direct_live_docker_overlay(self):
        """Passing a snapshot directly to ``reset`` in docker mode is refused."""
        snapshot = SnapshotSpec(
            topology={"hosts": ["attacker", "siem", "web"]},
            compose={"services": {"attacker": {}, "siem": {}, "web": {}}},
            files={"web:/var/www/html/index.php": "<?php echo 'hi'; ?>"},
            task={"red_briefing": "Go.", "blue_briefing": "Watch."},
        )
        env = RangeEnvironment(docker_available=True, execution_mode="docker")
        # Replace the docker accessor with a stub returning a placeholder object.
        env._get_docker = lambda: object()  # type: ignore[method-assign]
        try:
            with pytest.raises(RuntimeError, match="Direct docker snapshot reset is disabled"):
                env.reset(snapshot=snapshot)
        finally:
            env.close()

    def test_reset_uses_managed_runtime_snapshot(self, tier1_manifest, tmp_path):
        """``reset()`` with no arguments loads a snapshot from the runtime."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            refill_enabled=False,
        )
        runtime.start()
        try:
            env = RangeEnvironment(runtime=runtime, docker_available=False)
            try:
                obs = env.reset()
                assert "Range ready" in obs.stdout
                assert env.snapshot is not None
                assert env.snapshot.truth_graph.vulns
            finally:
                env.close()
        finally:
            # Runs even if constructing or closing the environment raised.
            runtime.stop()

    def test_reset_snapshot_id_uses_runtime_store(self, tier1_manifest, tmp_path):
        """``reset(snapshot_id=...)`` loads that snapshot from the runtime."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            refill_enabled=False,
        )
        runtime.start()
        try:
            env = RangeEnvironment(runtime=runtime, docker_available=False)
            try:
                admitted = runtime.acquire_snapshot()
                env.reset(snapshot_id=admitted.snapshot_id)
                assert env.snapshot is not None
                assert env.snapshot.flags[0].value == admitted.snapshot.flags[0].value
            finally:
                env.close()
        finally:
            # Runs even if constructing or closing the environment raised.
            runtime.stop()

    def test_missing_snapshot_id_raises(self, tier1_manifest, tmp_path):
        """Resetting to an unknown snapshot id raises ``FileNotFoundError``."""
        runtime = ManagedSnapshotRuntime(
            manifest=tier1_manifest,
            store_dir=tmp_path / "snapshots",
            validator_profile="offline",
            allow_insecure_offline_profile=True,
            pool_size=1,
            refill_enabled=False,
        )
        runtime.start()
        try:
            env = RangeEnvironment(runtime=runtime, docker_available=False)
            try:
                with pytest.raises(FileNotFoundError):
                    env.reset(snapshot_id="missing_snapshot")
            finally:
                env.close()
        finally:
            # Runs even if constructing or closing the environment raised.
            runtime.stop()

    def test_reset_activates_clean_runtime_project_and_tears_down_previous(self):
        """Each reset activates a new project and tears down the previous one."""

        class FakeRuntime:
            """Runtime stand-in serving one fixed snapshot and recording calls."""

            def __init__(self, snapshot: SnapshotSpec) -> None:
                self.snapshot = snapshot
                self.activate_calls: list[tuple[str, str | None]] = []
                self.teardown_calls: list[str] = []
                self.recorded: list[bool] = []

            def acquire_snapshot(self):
                # Ad-hoc object exposing the two attributes read off an admitted snapshot.
                return type(
                    "Admitted",
                    (),
                    {"snapshot_id": "snap-001", "snapshot": self.snapshot},
                )()

            def get_snapshot(self, snapshot_id: str):
                assert snapshot_id == "snap-001"
                return self.acquire_snapshot()

            def activate_snapshot_project(self, *, snapshot_id, snapshot, episode_id=None):
                self.activate_calls.append((snapshot_id, episode_id))
                name = f"project-{episode_id}"
                return BootedSnapshotProject(
                    project_name=name,
                    compose_file=Path("/tmp/docker-compose.yml"),
                    artifacts_dir=Path("/tmp"),
                    containers=ContainerSet(
                        project_name=name,
                        container_ids={
                            "web": "cid-web",
                            "attacker": "cid-attacker",
                            "siem": "cid-siem",
                        },
                    ),
                )

            def teardown_snapshot_project(self, project):
                self.teardown_calls.append(project.project_name)

            def record_episode_result(self, **kwargs):
                self.recorded.append(bool(kwargs.get("completed", False)))

        snapshot = SnapshotSpec(
            topology={"hosts": ["attacker", "siem", "web"]},
            compose={"services": {"attacker": {}, "siem": {}, "web": {}}},
            task={"red_briefing": "Go.", "blue_briefing": "Watch."},
        )
        runtime = FakeRuntime(snapshot)
        env = RangeEnvironment(
            runtime=runtime,  # type: ignore[arg-type]
            docker_available=True,
            execution_mode="docker",
        )
        # Stub the docker accessor and NPC start-up; record any overlay attempt
        # so the test can assert that none happened.
        env._get_docker = lambda: object()  # type: ignore[method-assign]
        apply_calls: list[str] = []
        env._apply_snapshot = lambda snapshot: apply_calls.append("overlay")  # type: ignore[method-assign]
        env._start_npcs = lambda snapshot: None  # type: ignore[method-assign]
        try:
            env.reset(episode_id="ep-1")
            assert runtime.activate_calls == [("snap-001", "ep-1")]
            assert apply_calls == []
            assert env.snapshot is not None
            assert env.snapshot.compose["x-project-name"] == "project-ep-1"
            assert env._container_name("web") == "cid-web"

            # A second reset must tear down the first episode's project.
            env.reset(episode_id="ep-2")
            assert runtime.activate_calls == [("snap-001", "ep-1"), ("snap-001", "ep-2")]
            assert runtime.teardown_calls == ["project-ep-1"]
            assert env.snapshot is not None
            assert env.snapshot.compose["x-project-name"] == "project-ep-2"
        finally:
            env.close()

        # Closing the environment tears down the project that was still active.
        assert runtime.teardown_calls == ["project-ep-1", "project-ep-2"]