"""Tests for validator checks — all run without Docker via mock_containers."""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from open_range.protocols import (
CheckResult,
EvidenceItem,
ExecResult,
ExploitStep,
FlagSpec,
GoldenPathStep,
MutationOp,
MutationPlan,
NPCPersona,
SnapshotSpec,
TaskSpec,
TruthGraph,
Vulnerability,
)
from open_range.validator.validator import ValidatorGate
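# These tests drive every check through the `mock_containers` fixture (assumed
# to live in conftest.py). A minimal sketch of the interface the tests rely on,
# inferred from usage below rather than from the fixture's source:
#
#   class MockContainers:
#       healthy: set[str]                          # hosts that report healthy
#       exec_results: dict[tuple[str, str], str]   # (host, command substring) -> stdout
#       exec_status: dict[tuple[str, str], int]    # (host, command substring) -> exit code (default 0)
#       restarted: list[str]                       # hosts restarted by checks
#
#       async def exec_run(self, container: str, cmd: str, **kw) -> ExecResult: ...
#       async def exec(self, container: str, cmd: str, **kw) -> str: ...
#
# Keys such as ("web", "sed") or ("attacker", "/dev/tcp/") appear to match by
# substring, so one registered entry covers any command on that host that
# contains the key.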
@pytest.mark.asyncio
async def test_manifest_compliance_rejects_illegal_mutation_plan(
tier1_manifest,
sample_snapshot_spec,
mock_containers,
):
from open_range.validator.manifest_compliance import ManifestComplianceCheck
spec = sample_snapshot_spec.model_copy(deep=True)
spec.mutation_plan = MutationPlan(
parent_snapshot_id="root_snap",
ops=[
MutationOp(
mutation_id="illegal1",
op_type="seed_vuln",
target_selector={"host": "web"},
params={"vuln_type": "totally_fake_bug"},
)
],
)
spec.lineage.parent_snapshot_id = "root_snap"
spec.lineage.generation_depth = 1
result = await ManifestComplianceCheck(tier1_manifest).check(spec, mock_containers)
assert result.passed is False
assert "illegal family" in result.error
@pytest.mark.asyncio
async def test_manifest_compliance_rejects_incompatible_seed_vuln_host(
tier1_manifest,
sample_snapshot_spec,
mock_containers,
):
from open_range.validator.manifest_compliance import ManifestComplianceCheck
spec = sample_snapshot_spec.model_copy(deep=True)
spec.mutation_plan = MutationPlan(
parent_snapshot_id="root_snap",
ops=[
MutationOp(
mutation_id="illegal_host",
op_type="seed_vuln",
target_selector={"host": "firewall"},
params={
"vuln_type": "path_traversal",
"template_id": "vuln_path_traversal",
"required_services": ["nginx", "php-fpm"],
},
)
],
)
spec.lineage.parent_snapshot_id = "root_snap"
spec.lineage.generation_depth = 1
result = await ManifestComplianceCheck(tier1_manifest).check(spec, mock_containers)
assert result.passed is False
assert "incompatible with required services" in result.error
@pytest.mark.asyncio
async def test_graph_consistency_rejects_missing_parent_lineage(sample_snapshot_spec, mock_containers):
from open_range.validator.graph_consistency import GraphConsistencyCheck
spec = sample_snapshot_spec.model_copy(deep=True)
spec.mutation_plan = MutationPlan(
parent_snapshot_id="root_snap",
ops=[
MutationOp(
mutation_id="mut1",
op_type="add_benign_noise",
target_selector={"location": "siem:noise.log"},
params={"location": "siem:noise.log"},
)
],
)
spec.lineage.generation_depth = 1
spec.lineage.parent_snapshot_id = None
result = await GraphConsistencyCheck().check(spec, mock_containers)
assert result.passed is False
assert "missing parent_snapshot_id" in result.error
@pytest.mark.asyncio
async def test_path_solvability_passes_for_reachable_flag_host(mock_containers):
from open_range.validator.path_solvability import PathSolvabilityCheck
spec = SnapshotSpec(
topology={
"hosts": ["attacker", "web", "db"],
"zones": {"external": ["attacker"], "dmz": ["web"], "internal": ["db"]},
"dependency_edges": [
{"source": "attacker", "target": "web"},
{"source": "web", "target": "db"},
],
"host_details": {
"attacker": {"services": ["nmap"]},
"web": {"services": ["nginx"]},
"db": {"services": ["mysql"]},
},
},
truth_graph=TruthGraph(
vulns=[Vulnerability(id="v1", type="sqli", host="web", service="nginx")],
),
flags=[FlagSpec(id="f1", value="FLAG{ok}", path="/var/flags/flag1.txt", host="db")],
evidence_spec=[EvidenceItem(type="log_entry", location="siem:/var/log/siem/all.log")],
golden_path=[GoldenPathStep(step=1, command="nmap web", expect_in_stdout="80/tcp")],
task=TaskSpec(red_briefing="go", blue_briefing="watch"),
)
result = await PathSolvabilityCheck().check(spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_graph_evidence_sufficiency_fails_without_supporting_hosts(mock_containers):
from open_range.validator.graph_evidence import GraphEvidenceSufficiencyCheck
spec = SnapshotSpec(
topology={
"hosts": ["attacker", "web", "db"],
"zones": {"external": ["attacker"], "dmz": ["web"], "internal": ["db"]},
"dependency_edges": [{"source": "attacker", "target": "web"}],
"host_details": {
"attacker": {"services": ["nmap"]},
"web": {"services": ["nginx"]},
"db": {"services": ["mysql"]},
},
},
truth_graph=TruthGraph(
vulns=[Vulnerability(id="v1", type="sqli", host="db", service="mysql")],
),
flags=[FlagSpec(id="f1", value="FLAG{db}", path="/var/flags/flag1.txt", host="db")],
evidence_spec=[EvidenceItem(type="log_entry", location="web:/var/log/access.log")],
golden_path=[GoldenPathStep(step=1, command="scan", expect_in_stdout="ok")],
task=TaskSpec(red_briefing="go", blue_briefing="watch"),
)
result = await GraphEvidenceSufficiencyCheck().check(spec, mock_containers)
assert result.passed is False
assert "no supporting evidence host" in result.error
@pytest.mark.asyncio
async def test_graph_reward_grounding_fails_when_flag_host_unreachable(mock_containers):
from open_range.validator.graph_reward_grounding import GraphRewardGroundingCheck
spec = SnapshotSpec(
topology={
"hosts": ["attacker", "web", "db"],
"zones": {"external": ["attacker"], "dmz": ["web"], "internal": ["db"]},
"dependency_edges": [{"source": "attacker", "target": "web"}],
"host_details": {
"attacker": {"services": ["nmap"]},
"web": {"services": ["nginx"]},
"db": {"services": ["mysql"]},
},
},
truth_graph=TruthGraph(
vulns=[Vulnerability(id="v1", type="sqli", host="web", service="nginx")],
),
flags=[FlagSpec(id="f1", value="FLAG{db}", path="/var/flags/flag1.txt", host="db")],
evidence_spec=[EvidenceItem(type="log_entry", location="siem:/var/log/siem/all.log")],
golden_path=[GoldenPathStep(step=1, command="scan", expect_in_stdout="ok")],
task=TaskSpec(red_briefing="go", blue_briefing="watch"),
)
result = await GraphRewardGroundingCheck().check(spec, mock_containers)
assert result.passed is False
assert "not reachable from any vuln host" in result.error
@pytest.mark.asyncio
async def test_graph_checks_allow_trust_based_host_pivots(mock_containers):
from open_range.validator.graph_reward_grounding import GraphRewardGroundingCheck
from open_range.validator.path_solvability import PathSolvabilityCheck
spec = SnapshotSpec(
topology={
"hosts": ["attacker", "web", "db"],
"zones": {"external": ["attacker"], "dmz": ["web"], "internal": ["db"]},
"dependency_edges": [{"source": "attacker", "target": "web"}],
"trust_edges": [{"source": "websvc", "target": "dbsvc", "type": "credential_reuse"}],
"host_details": {
"attacker": {"services": ["nmap"]},
"web": {"services": ["nginx"]},
"db": {"services": ["mysql"]},
},
"principal_catalog": {
"websvc": {"username": "websvc", "hosts": ["web"], "is_login_account": False},
"dbsvc": {"username": "dbsvc", "hosts": ["db"], "is_login_account": False},
},
},
truth_graph=TruthGraph(
vulns=[Vulnerability(id="v1", type="credential_reuse", host="web", service="nginx")],
),
flags=[FlagSpec(id="f1", value="FLAG{db}", path="/var/flags/flag1.txt", host="db")],
evidence_spec=[EvidenceItem(type="log_entry", location="db:/var/log/mysql.log")],
golden_path=[GoldenPathStep(step=1, command="scan", expect_in_stdout="ok")],
task=TaskSpec(red_briefing="go", blue_briefing="watch"),
)
path_result = await PathSolvabilityCheck().check(spec, mock_containers)
reward_result = await GraphRewardGroundingCheck().check(spec, mock_containers)
assert path_result.passed is True
assert reward_result.passed is True
# ---------------------------------------------------------------------------
# Check 1: BuildBoot
# ---------------------------------------------------------------------------
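# BuildBootCheck is exercised purely through health state: every host listed in
# spec.topology["hosts"] must be present in mock_containers.healthy, and the
# failure message is expected to name the unhealthy host.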
@pytest.mark.asyncio
async def test_build_boot_passes_when_all_healthy(sample_snapshot_spec, mock_containers):
from open_range.validator.build_boot import BuildBootCheck
for h in sample_snapshot_spec.topology["hosts"]:
mock_containers.healthy.add(h)
result = await BuildBootCheck().check(sample_snapshot_spec, mock_containers)
assert result.passed is True
assert result.name == "build_boot"
@pytest.mark.asyncio
async def test_build_boot_fails_when_host_unhealthy(sample_snapshot_spec, mock_containers):
from open_range.validator.build_boot import BuildBootCheck
# Mark all except 'web' as healthy
for h in sample_snapshot_spec.topology["hosts"]:
if h != "web":
mock_containers.healthy.add(h)
result = await BuildBootCheck().check(sample_snapshot_spec, mock_containers)
assert result.passed is False
assert "web" in result.error
# ---------------------------------------------------------------------------
# Check 2: Exploitability
# ---------------------------------------------------------------------------
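# Contract exercised below, inferred from the assertions rather than from the
# check's source: each golden-path step runs on step.host (defaulting to
# "attacker"); stdout matching is whitespace-normalized substring matching;
# meta-commands such as submit_flag are skipped; in strict mode (the default,
# require_expectation=True) a step with no expect_in_stdout fails; a nonzero
# exit code fails a step even when the expected output is present; and an
# empty golden path fails outright.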
@pytest.mark.asyncio
async def test_exploitability_passes_when_golden_path_succeeds(mock_containers):
from open_range.validator.exploitability import ExploitabilityCheck
# Use a minimal spec with distinct commands to avoid substring collisions.
spec = SnapshotSpec(
golden_path=[
GoldenPathStep(step=1, command="nmap -sV 10.0.1.0/24", expect_in_stdout="80/tcp"),
GoldenPathStep(step=2, command="curl http://target/page", expect_in_stdout="OK"),
],
)
for step in spec.golden_path:
mock_containers.exec_results[("attacker", step.command)] = step.expect_in_stdout
result = await ExploitabilityCheck().check(spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_exploitability_normalizes_whitespace_for_tool_output(mock_containers):
from open_range.validator.exploitability import ExploitabilityCheck
spec = SnapshotSpec(
golden_path=[
GoldenPathStep(
step=1,
command="nmap -sV 10.0.1.0/24",
expect_in_stdout="80/tcp open http",
),
],
)
mock_containers.exec_results[("attacker", "nmap -sV 10.0.1.0/24")] = (
"80/tcp open http nginx"
)
result = await ExploitabilityCheck().check(spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_exploitability_fails_when_step_output_missing(
sample_snapshot_spec, mock_containers
):
from open_range.validator.exploitability import ExploitabilityCheck
    # Register the expected output for every golden-path step...
for step in sample_snapshot_spec.golden_path:
host = getattr(step, "host", None) or "attacker"
mock_containers.exec_results[(host, step.command)] = step.expect_in_stdout
    # ...then override the first step to return the wrong output
first_step = sample_snapshot_spec.golden_path[0]
host = getattr(first_step, "host", None) or "attacker"
mock_containers.exec_results[(host, first_step.command)] = "totally wrong output"
result = await ExploitabilityCheck().check(sample_snapshot_spec, mock_containers)
assert result.passed is False
assert "failed" in result.error
@pytest.mark.asyncio
async def test_exploitability_fails_on_empty_golden_path(mock_containers):
from open_range.validator.exploitability import ExploitabilityCheck
spec = SnapshotSpec(golden_path=[])
result = await ExploitabilityCheck().check(spec, mock_containers)
assert result.passed is False
assert "empty" in result.error
@pytest.mark.asyncio
async def test_exploitability_skips_meta_commands(mock_containers):
from open_range.validator.exploitability import ExploitabilityCheck
spec = SnapshotSpec(
golden_path=[
GoldenPathStep(step=1, command="curl http://web/", expect_in_stdout="Welcome"),
GoldenPathStep(step=2, command="submit_flag FLAG{abc}", expect_in_stdout="correct"),
],
)
mock_containers.exec_results[("attacker", "curl http://web/")] = "Welcome"
result = await ExploitabilityCheck().check(spec, mock_containers)
assert result.passed is True
assert result.details["skipped_steps"] == [2]
@pytest.mark.asyncio
async def test_exploitability_fails_when_expectation_missing_in_strict_mode(mock_containers):
from open_range.validator.exploitability import ExploitabilityCheck
spec = SnapshotSpec(
golden_path=[
GoldenPathStep(step=1, command="curl http://web/", expect_in_stdout=""),
],
)
mock_containers.exec_results[("attacker", "curl http://web/")] = "Welcome"
result = await ExploitabilityCheck().check(spec, mock_containers)
assert result.passed is False
assert result.details["require_expectation"] is True
assert result.details["failed_steps"][0]["error"] == (
"golden path step 1 has no expect_in_stdout"
)
@pytest.mark.asyncio
async def test_exploitability_allows_missing_expectation_in_lenient_mode(mock_containers):
from open_range.validator.exploitability import ExploitabilityCheck
spec = SnapshotSpec(
golden_path=[
GoldenPathStep(step=1, command="curl http://web/", expect_in_stdout=""),
],
)
mock_containers.exec_results[("attacker", "curl http://web/")] = "Welcome"
result = await ExploitabilityCheck(require_expectation=False).check(spec, mock_containers)
assert result.passed is True
assert result.details["require_expectation"] is False
assert result.details["unvalidated_steps"] == [1]
@pytest.mark.asyncio
async def test_exploitability_fails_on_nonzero_exit_even_with_expected_output(mock_containers):
from open_range.validator.exploitability import ExploitabilityCheck
spec = SnapshotSpec(
golden_path=[
GoldenPathStep(step=1, command="curl http://web/", expect_in_stdout="Welcome"),
],
)
mock_containers.exec_results[("attacker", "curl http://web/")] = "Welcome"
mock_containers.exec_status[("attacker", "curl http://web/")] = 7
result = await ExploitabilityCheck().check(spec, mock_containers)
assert result.passed is False
assert result.details["failed_steps"][0]["error"] == "command failed with exit_code=7"
# ---------------------------------------------------------------------------
# Check 3: Patchability
# ---------------------------------------------------------------------------
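# Contract exercised below, inferred from the assertions: for each vuln with an
# executable remediation, the remediation runs on the vuln's host and must exit
# 0; the matching golden-path step is then re-run from the attacker and must no
# longer produce its expected output. A retest that exits nonzero or raises is
# recorded as inconclusive and fails. Prose or empty remediations are recorded
# as untestable, and the check fails when no vuln had a testable remediation.
# The patched container is restarted after each vuln to restore state.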
@pytest.mark.asyncio
async def test_patchability_fails_when_no_vulns(mock_containers):
from open_range.validator.patchability import PatchabilityCheck
spec = SnapshotSpec(truth_graph=TruthGraph(vulns=[]))
result = await PatchabilityCheck().check(spec, mock_containers)
assert result.passed is False
@pytest.mark.asyncio
async def test_patchability_passes_when_patch_breaks_exploit(mock_containers):
"""Patch applied -> golden path step no longer produces expected output -> PASS."""
from open_range.validator.patchability import PatchabilityCheck
spec = SnapshotSpec(
truth_graph=TruthGraph(
vulns=[
Vulnerability(
id="v1",
type="sqli",
host="web",
remediation="sed -i 's/unsafe/safe/' /var/www/app.php",
),
],
exploit_chain=[
ExploitStep(vuln_id="v1", command="curl http://web/search?q=exploit"),
],
),
golden_path=[
GoldenPathStep(
step=1,
command="curl http://web/search?q=exploit",
expect_in_stdout="SECRET_DATA",
),
],
)
# Remediation exec succeeds (exit 0 by default)
mock_containers.exec_results[("web", "sed")] = ""
# After patch, golden path step returns DIFFERENT output (no SECRET_DATA)
mock_containers.exec_results[("attacker", "curl http://web/search?q=exploit")] = "no results"
result = await PatchabilityCheck().check(spec, mock_containers)
assert result.passed is True
assert result.name == "patchability"
@pytest.mark.asyncio
async def test_patchability_fails_when_exploit_still_works(mock_containers):
"""Patch applied but golden path step still succeeds -> FAIL."""
from open_range.validator.patchability import PatchabilityCheck
spec = SnapshotSpec(
truth_graph=TruthGraph(
vulns=[
Vulnerability(
id="v1",
type="sqli",
host="web",
remediation="sed -i 's/unsafe/safe/' /var/www/app.php",
),
],
exploit_chain=[
ExploitStep(vuln_id="v1", command="curl http://web/search?q=exploit"),
],
),
golden_path=[
GoldenPathStep(
step=1,
command="curl http://web/search?q=exploit",
expect_in_stdout="SECRET_DATA",
),
],
)
# Remediation exec succeeds
mock_containers.exec_results[("web", "sed")] = ""
# After patch, golden path step STILL returns the expected output (patch didn't work)
mock_containers.exec_results[("attacker", "curl http://web/search?q=exploit")] = "SECRET_DATA"
result = await PatchabilityCheck().check(spec, mock_containers)
assert result.passed is False
assert "exploitable after remediation" in result.error
@pytest.mark.asyncio
async def test_patchability_fails_when_remediation_command_exits_nonzero(mock_containers):
from open_range.validator.patchability import PatchabilityCheck
spec = SnapshotSpec(
truth_graph=TruthGraph(
vulns=[
Vulnerability(
id="v1",
type="sqli",
host="web",
remediation="sed -i 's/unsafe/safe/' /var/www/app.php",
),
],
exploit_chain=[
ExploitStep(vuln_id="v1", command="curl http://web/search?q=exploit"),
],
),
golden_path=[
GoldenPathStep(
step=1,
command="curl http://web/search?q=exploit",
expect_in_stdout="SECRET_DATA",
),
],
)
mock_containers.exec_results[("web", "sed")] = "sed: cannot read /var/www/app.php"
mock_containers.exec_status[("web", "sed")] = 2
result = await PatchabilityCheck().check(spec, mock_containers)
assert result.passed is False
first = result.details["vuln_results"][0]
assert first["passed"] is False
assert "remediation command failed" in first["reason"]
assert mock_containers.restarted == ["web"]
@pytest.mark.asyncio
async def test_patchability_fails_when_retest_command_is_inconclusive(mock_containers):
from open_range.validator.patchability import PatchabilityCheck
spec = SnapshotSpec(
truth_graph=TruthGraph(
vulns=[
Vulnerability(
id="v1",
type="sqli",
host="web",
remediation="sed -i 's/unsafe/safe/' /var/www/app.php",
),
],
exploit_chain=[
ExploitStep(vuln_id="v1", command="curl http://web/search?q=exploit"),
],
),
golden_path=[
GoldenPathStep(
step=1,
command="curl http://web/search?q=exploit",
expect_in_stdout="SECRET_DATA",
),
],
)
mock_containers.exec_results[("web", "sed")] = ""
mock_containers.exec_results[("attacker", "curl http://web/search?q=exploit")] = "curl: (7) failed to connect"
mock_containers.exec_status[("attacker", "curl http://web/search?q=exploit")] = 7
result = await PatchabilityCheck().check(spec, mock_containers)
assert result.passed is False
vuln_result = result.details["vuln_results"][0]
assert vuln_result["passed"] is False
assert vuln_result["reason"] == "retest inconclusive after remediation"
assert "retest command failed (exit_code=7)" in vuln_result["details"][0]["reason"]
assert mock_containers.restarted == ["web"]
@pytest.mark.asyncio
async def test_patchability_fails_when_retest_execution_is_inconclusive(mock_containers):
from open_range.validator.patchability import PatchabilityCheck
spec = SnapshotSpec(
truth_graph=TruthGraph(
vulns=[
Vulnerability(
id="v1",
type="sqli",
host="web",
remediation="sed -i 's/unsafe/safe/' /var/www/app.php",
),
],
exploit_chain=[
ExploitStep(vuln_id="v1", command="curl http://web/search?q=exploit"),
],
),
golden_path=[
GoldenPathStep(
step=1,
command="curl http://web/search?q=exploit",
expect_in_stdout="SECRET_DATA",
),
],
)
async def exec_run_side_effect(container: str, cmd: str, **kwargs):
if container == "web" and "sed -i 's/unsafe/safe/' /var/www/app.php" in cmd:
return ExecResult(stdout="", exit_code=0)
if container == "attacker" and "curl http://web/search?q=exploit" in cmd:
raise RuntimeError("timeout")
return ExecResult(stdout="", exit_code=0)
mock_containers.exec_run = exec_run_side_effect
result = await PatchabilityCheck().check(spec, mock_containers)
assert result.passed is False
vuln_result = result.details["vuln_results"][0]
assert vuln_result["passed"] is False
assert vuln_result["reason"] == "retest inconclusive after remediation"
assert vuln_result["details"][0]["reason"] == "retest execution raised: timeout"
assert mock_containers.restarted == ["web"]
@pytest.mark.asyncio
async def test_patchability_skips_prose_remediation(mock_containers):
"""Non-executable remediation (prose) is skipped with warning, fails if all skipped."""
from open_range.validator.patchability import PatchabilityCheck
spec = SnapshotSpec(
truth_graph=TruthGraph(
vulns=[
Vulnerability(
id="v1",
type="sqli",
host="web",
remediation="Use parameterized queries instead of string concatenation",
),
],
exploit_chain=[
ExploitStep(vuln_id="v1", command="curl http://web/search?q=exploit"),
],
),
golden_path=[
GoldenPathStep(
step=1,
command="curl http://web/search?q=exploit",
expect_in_stdout="SECRET_DATA",
),
],
)
result = await PatchabilityCheck().check(spec, mock_containers)
assert result.passed is False
# Verify it was recorded as a failure (not silently skipped)
vuln_results = result.details["vuln_results"]
assert len(vuln_results) == 1
assert vuln_results[0]["passed"] is False
assert "not executable" in vuln_results[0]["reason"]
@pytest.mark.asyncio
async def test_patchability_fails_when_all_skipped(mock_containers):
"""If all vulns are skipped (no testable remediation), check FAILS."""
from open_range.validator.patchability import PatchabilityCheck
spec = SnapshotSpec(
truth_graph=TruthGraph(
vulns=[
Vulnerability(
id="v1",
type="sqli",
host="web",
remediation="", # empty remediation
),
Vulnerability(
id="v2",
type="xss",
host="web",
remediation="Sanitize all user input before rendering", # prose
),
],
exploit_chain=[
ExploitStep(vuln_id="v1", command="cmd1"),
ExploitStep(vuln_id="v2", command="cmd2"),
],
),
golden_path=[
GoldenPathStep(step=1, command="cmd1", expect_in_stdout="found"),
GoldenPathStep(step=2, command="cmd2", expect_in_stdout="xss"),
],
)
result = await PatchabilityCheck().check(spec, mock_containers)
assert result.passed is False
assert "no vulns had testable remediation" in result.error
@pytest.mark.asyncio
async def test_patchability_restarts_container_after_patch(mock_containers):
"""Container is restarted after each vuln's test to restore pre-patched state."""
from open_range.validator.patchability import PatchabilityCheck
spec = SnapshotSpec(
truth_graph=TruthGraph(
vulns=[
Vulnerability(
id="v1",
type="sqli",
host="web",
remediation="sed -i 's/bad/good/' /app.php",
),
Vulnerability(
id="v2",
type="lfi",
host="web",
remediation="rm /etc/sensitive_file",
),
],
exploit_chain=[
ExploitStep(vuln_id="v1", command="curl http://web/search?q=inject"),
ExploitStep(vuln_id="v2", command="curl http://web/read?file=../../etc/passwd"),
],
),
golden_path=[
GoldenPathStep(
step=1,
command="curl http://web/search?q=inject",
expect_in_stdout="SQLI_RESULT",
),
GoldenPathStep(
step=2,
command="curl http://web/read?file=../../etc/passwd",
expect_in_stdout="root:x:0",
),
],
)
# Remediation succeeds for both
mock_containers.exec_results[("web", "sed")] = ""
mock_containers.exec_results[("web", "rm")] = ""
    # After patch, both exploits stop returning the expected output
mock_containers.exec_results[("attacker", "curl http://web/search")] = "blocked"
mock_containers.exec_results[("attacker", "curl http://web/read")] = "forbidden"
result = await PatchabilityCheck().check(spec, mock_containers)
assert result.passed is True
# Container should have been restarted once per tested vuln
assert mock_containers.restarted == ["web", "web"]
# ---------------------------------------------------------------------------
# Check 4: Evidence
# ---------------------------------------------------------------------------
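# Contract exercised below, inferred from the assertions: each EvidenceItem
# location has the form "host:path"; log_entry items are verified with
# `grep -c <pattern> <path>` (both operands shell-quoted via shlex) and
# artifact items with `test -f`; zero matches, a nonzero exit code, or grep
# error text all fail. An empty evidence_spec passes vacuously.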
@pytest.mark.asyncio
async def test_evidence_passes_vacuously_when_no_spec(mock_containers):
from open_range.validator.evidence import EvidenceCheck
spec = SnapshotSpec(evidence_spec=[])
result = await EvidenceCheck().check(spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_evidence_fails_when_pattern_missing(mock_containers):
from open_range.validator.evidence import EvidenceCheck
spec = SnapshotSpec(
evidence_spec=[
EvidenceItem(type="log_entry", location="siem:/var/log/test.log", pattern="ATTACK"),
]
)
# Return "0" matches from grep
mock_containers.exec_results[("siem", "grep")] = "0"
result = await EvidenceCheck().check(spec, mock_containers)
assert result.passed is False
@pytest.mark.asyncio
async def test_evidence_fails_when_grep_returns_error_text(mock_containers):
from open_range.validator.evidence import EvidenceCheck
spec = SnapshotSpec(
evidence_spec=[
EvidenceItem(type="log_entry", location="siem:/var/log/missing.log", pattern="ATTACK"),
]
)
mock_containers.exec_results[("siem", "grep")] = "grep: /var/log/missing.log: No such file or directory"
mock_containers.exec_status[("siem", "grep")] = 2
result = await EvidenceCheck().check(spec, mock_containers)
assert result.passed is False
assert "No such file or directory" in result.details["missing"][0]["error"]
@pytest.mark.asyncio
async def test_evidence_fails_on_nonzero_exit_even_when_output_present(mock_containers):
from open_range.validator.evidence import EvidenceCheck
spec = SnapshotSpec(
evidence_spec=[
EvidenceItem(type="artifact", location="siem:/var/log/test.log"),
]
)
mock_containers.exec_status[("siem", "test -f")] = 1
result = await EvidenceCheck().check(spec, mock_containers)
assert result.passed is False
assert result.details["missing"][0]["location"] == "siem:/var/log/test.log"
@pytest.mark.asyncio
async def test_evidence_quotes_pattern_and_location_path():
"""Evidence grep command must quote pattern and path from snapshot content."""
import shlex
from open_range.validator.evidence import EvidenceCheck
class RecordingContainers:
def __init__(self) -> None:
self.calls: list[tuple[str, str]] = []
async def exec_run(self, container: str, cmd: str, **kwargs) -> ExecResult:
self.calls.append((container, cmd))
return ExecResult(stdout="1", exit_code=0)
containers = RecordingContainers()
pattern = "ERR'; touch /tmp/pwn #"
path = "/var/log/app; echo PWNED"
spec = SnapshotSpec(
evidence_spec=[
EvidenceItem(type="log_entry", location=f"siem:{path}", pattern=pattern),
],
)
result = await EvidenceCheck().check(spec, containers) # type: ignore[arg-type]
assert result.passed is True
assert containers.calls
assert containers.calls[0][1] == f"grep -c {shlex.quote(pattern)} {shlex.quote(path)}"
# ---------------------------------------------------------------------------
# Check 5: Reward grounding
# ---------------------------------------------------------------------------
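# Contract exercised below, inferred from the assertions: filesystem flags are
# read with `cat -- <quoted path>` and must yield the exact flag value; paths
# of the form "db:database.table.column" are verified via a mysql query with
# the root password passed as a shlex-quoted MYSQL_PWD prefix; the literal
# path "db:sql" marks a deployment artifact and is skipped; malformed db paths
# are rejected without executing anything; and a spec with no flags fails.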
@pytest.mark.asyncio
async def test_reward_grounding_passes_when_flag_found(mock_containers):
from open_range.validator.reward_grounding import RewardGroundingCheck
spec = SnapshotSpec(
flags=[FlagSpec(id="f1", value="FLAG{abc}", path="/var/flags/f.txt", host="web")]
)
mock_containers.exec_results[("web", "cat")] = "FLAG{abc}"
result = await RewardGroundingCheck().check(spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_reward_grounding_fails_when_flag_wrong(mock_containers):
from open_range.validator.reward_grounding import RewardGroundingCheck
spec = SnapshotSpec(
flags=[FlagSpec(id="f1", value="FLAG{abc}", path="/var/flags/f.txt", host="web")]
)
mock_containers.exec_results[("web", "cat")] = "FLAG{wrong}"
result = await RewardGroundingCheck().check(spec, mock_containers)
assert result.passed is False
@pytest.mark.asyncio
async def test_reward_grounding_no_flags(mock_containers):
from open_range.validator.reward_grounding import RewardGroundingCheck
spec = SnapshotSpec(flags=[])
result = await RewardGroundingCheck().check(spec, mock_containers)
assert result.passed is False
@pytest.mark.asyncio
async def test_reward_grounding_verifies_db_flag(mock_containers):
"""DB-stored flag verified via MySQL query — value found."""
from open_range.validator.reward_grounding import RewardGroundingCheck
spec = SnapshotSpec(
flags=[FlagSpec(id="f1", value="FLAG{db_flag}", path="db:flags.secrets.flag", host="db")]
)
mock_containers.exec_results[("db", "mysql")] = "FLAG{db_flag}"
result = await RewardGroundingCheck().check(spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_reward_grounding_fails_db_flag_not_found(mock_containers):
"""DB-stored flag verified via MySQL query — wrong value returned."""
from open_range.validator.reward_grounding import RewardGroundingCheck
spec = SnapshotSpec(
flags=[FlagSpec(id="f1", value="FLAG{db_flag}", path="db:flags.secrets.flag", host="db")]
)
mock_containers.exec_results[("db", "mysql")] = "FLAG{wrong_value}"
result = await RewardGroundingCheck().check(spec, mock_containers)
assert result.passed is False
assert "1 flag(s)" in result.error
@pytest.mark.asyncio
async def test_reward_grounding_skips_db_sql_path(mock_containers):
"""db:sql deployment artifacts are skipped, not treated as flag locations."""
from open_range.validator.reward_grounding import RewardGroundingCheck
spec = SnapshotSpec(
flags=[
FlagSpec(id="f1", value="FLAG{abc}", path="/var/flags/f.txt", host="web"),
FlagSpec(id="f2", value="FLAG{deploy}", path="db:sql", host="db"),
]
)
mock_containers.exec_results[("web", "cat")] = "FLAG{abc}"
result = await RewardGroundingCheck().check(spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_reward_grounding_quotes_filesystem_path():
"""Filesystem flag paths with shell metacharacters must be quoted."""
from open_range.validator.reward_grounding import RewardGroundingCheck
class RecordingContainers:
def __init__(self):
self.calls: list[tuple[str, str]] = []
async def exec_run(self, container: str, cmd: str, **kwargs):
self.calls.append((container, cmd))
return ExecResult(stdout="FLAG{abc}", exit_code=0)
async def exec(self, container: str, cmd: str, **kwargs) -> str:
return (await self.exec_run(container, cmd, **kwargs)).combined_output
containers = RecordingContainers()
spec = SnapshotSpec(
flags=[FlagSpec(id="f1", value="FLAG{abc}", path="/tmp/f; echo PWNED", host="web")]
)
result = await RewardGroundingCheck().check(spec, containers) # type: ignore[arg-type]
assert result.passed is True
assert containers.calls
assert containers.calls[0][1] == "cat -- '/tmp/f; echo PWNED'"
@pytest.mark.asyncio
async def test_reward_grounding_rejects_invalid_db_identifier_path():
"""Malformed DB paths must fail rather than altering SQL semantics."""
from open_range.validator.reward_grounding import RewardGroundingCheck
class RecordingContainers:
def __init__(self):
self.calls: list[tuple[str, str]] = []
async def exec_run(self, container: str, cmd: str, **kwargs):
self.calls.append((container, cmd))
return ExecResult(stdout="FLAG{abc}", exit_code=0)
async def exec(self, container: str, cmd: str, **kwargs) -> str:
return (await self.exec_run(container, cmd, **kwargs)).combined_output
containers = RecordingContainers()
spec = SnapshotSpec(
flags=[
FlagSpec(
id="f1",
value="FLAG{abc}",
path="db:flags.secrets.flag FROM secrets; SELECT 'x' --",
host="db",
)
]
)
result = await RewardGroundingCheck().check(spec, containers) # type: ignore[arg-type]
assert result.passed is False
assert "invalid db flag path format" in result.details["results"][0]["error"]
assert containers.calls == []
@pytest.mark.asyncio
async def test_reward_grounding_quotes_mysql_password_from_snapshot():
"""DB checks must not rely on unquoted shell expansion for credentials."""
import shlex
from open_range.validator.reward_grounding import RewardGroundingCheck
class RecordingContainers:
def __init__(self):
self.calls: list[tuple[str, str]] = []
async def exec_run(self, container: str, cmd: str, **kwargs):
self.calls.append((container, cmd))
return ExecResult(stdout="FLAG{abc}", exit_code=0)
async def exec(self, container: str, cmd: str, **kwargs) -> str:
return (await self.exec_run(container, cmd, **kwargs)).combined_output
containers = RecordingContainers()
password = "pa ss;$(id)"
spec = SnapshotSpec(
topology={"mysql_root_password": password},
flags=[FlagSpec(id="f1", value="FLAG{abc}", path="db:flags.secrets.flag", host="db")],
)
result = await RewardGroundingCheck().check(spec, containers) # type: ignore[arg-type]
assert result.passed is True
assert containers.calls
cmd = containers.calls[0][1]
assert cmd.startswith(
f"MYSQL_PWD={shlex.quote(password)} mysql -u root -N -e "
)
assert "-p$MYSQL_ROOT_PASSWORD" not in cmd
# ---------------------------------------------------------------------------
# Check 6: Isolation
# ---------------------------------------------------------------------------
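# Contract exercised below, inferred from the assertions: flag values must not
# appear in either briefing, and zone isolation is probed from the attacker
# across multiple ports with an argument-safe TCP connect of the form
# `bash -lc 'echo > /dev/tcp/"$1"/"$2"' _ <host> <port>`; any probe reporting
# OPEN fails the check, naming the host and port.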
@pytest.mark.asyncio
async def test_isolation_passes_clean_briefing(mock_containers):
from open_range.validator.isolation import IsolationCheck
spec = SnapshotSpec(
topology={"hosts": ["web"], "zones": {}},
flags=[FlagSpec(id="f1", value="FLAG{secret}", path="/f.txt", host="web")],
golden_path=[],
task=TaskSpec(
red_briefing="Investigate the network.",
blue_briefing="Monitor for threats.",
),
)
result = await IsolationCheck().check(spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_isolation_fails_when_flag_in_briefing(mock_containers):
from open_range.validator.isolation import IsolationCheck
spec = SnapshotSpec(
topology={"hosts": ["web"], "zones": {}},
flags=[FlagSpec(id="f1", value="FLAG{secret}", path="/f.txt", host="web")],
golden_path=[],
task=TaskSpec(
red_briefing="The flag is FLAG{secret}, go find it.",
blue_briefing="Monitor for threats.",
),
)
result = await IsolationCheck().check(spec, mock_containers)
assert result.passed is False
assert "flag value leaked" in result.error
@pytest.mark.asyncio
async def test_isolation_checks_multiple_ports(mock_containers):
"""Zone isolation probes multiple ports — all CLOSED means pass."""
from open_range.validator.isolation import IsolationCheck
spec = SnapshotSpec(
topology={
"hosts": ["attacker", "db"],
"zones": {"internal": ["db"]},
},
flags=[],
golden_path=[],
task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."),
)
# All probes return CLOSED
mock_containers.exec_results[("attacker", "/dev/tcp/")] = "CLOSED"
result = await IsolationCheck().check(spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_isolation_fails_on_non_ssh_port(mock_containers):
"""Zone isolation detects reachability on a non-SSH port (e.g. 3306)."""
from open_range.validator.isolation import IsolationCheck
spec = SnapshotSpec(
topology={
"hosts": ["attacker", "db"],
"zones": {"internal": ["db"]},
},
flags=[],
golden_path=[],
task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."),
)
# Only port 3306 is OPEN; everything else CLOSED.
async def exec_side_effect(container, cmd, **kwargs):
if container == "attacker" and "/dev/tcp/" in cmd:
if " 3306 " in cmd:
return "OPEN"
return "CLOSED"
return ""
mock_containers.exec = exec_side_effect
result = await IsolationCheck().check(spec, mock_containers)
assert result.passed is False
assert "3306" in result.error
assert "db" in result.error
@pytest.mark.asyncio
async def test_isolation_uses_argument_safe_tcp_probe_for_target_name():
"""Target names are passed as positional args, not interpolated into script."""
from open_range.validator.isolation import IsolationCheck
class RecordingContainers:
def __init__(self) -> None:
self.calls: list[tuple[str, str]] = []
async def exec(self, container: str, cmd: str, **kwargs) -> str:
self.calls.append((container, cmd))
return "CLOSED"
containers = RecordingContainers()
target = "db'; touch /tmp/pwn #"
spec = SnapshotSpec(
topology={"hosts": ["attacker", "db"], "zones": {"internal": [target]}},
flags=[],
golden_path=[],
task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."),
)
result = await IsolationCheck().check(spec, containers) # type: ignore[arg-type]
assert result.passed is True
assert containers.calls
first_cmd = containers.calls[0][1]
script_part, _, arg_part = first_cmd.partition(" _ ")
assert "bash -lc 'echo > /dev/tcp/\"$1\"/\"$2\"'" in script_part
assert "touch /tmp/pwn" not in script_part
assert "touch /tmp/pwn" in arg_part
# ---------------------------------------------------------------------------
# Check 7: Task feasibility
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_task_feasibility_passes_valid_spec(sample_snapshot_spec, mock_containers):
from open_range.validator.task_feasibility import TaskFeasibilityCheck
result = await TaskFeasibilityCheck().check(sample_snapshot_spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_task_feasibility_fails_unknown_flag_host(mock_containers):
from open_range.validator.task_feasibility import TaskFeasibilityCheck
spec = SnapshotSpec(
topology={"hosts": ["web", "attacker"], "zones": {}},
flags=[FlagSpec(id="f1", value="FLAG{x}", path="/f.txt", host="nonexistent")],
golden_path=[],
)
result = await TaskFeasibilityCheck().check(spec, mock_containers)
assert result.passed is False
@pytest.mark.asyncio
async def test_task_feasibility_rejects_seed_vuln_without_task_path(mock_containers):
from open_range.validator.task_feasibility import TaskFeasibilityCheck
spec = SnapshotSpec(
topology={
"hosts": ["attacker", "web", "db"],
"zones": {"external": ["attacker"], "dmz": ["web"], "internal": ["db"]},
"dependency_edges": [{"source": "attacker", "target": "web"}],
"host_details": {
"attacker": {"services": ["nmap"]},
"web": {"services": ["nginx", "php-fpm"]},
"db": {"services": ["mysql"]},
},
},
truth_graph=TruthGraph(
vulns=[Vulnerability(id="path_traversal_1", type="path_traversal", host="web")],
exploit_chain=[
ExploitStep(
vuln_id="path_traversal_1",
command="curl 'http://web/download?file=../../../var/flags/path_traversal_mut1.txt'",
description="download planted flag",
)
],
),
flags=[FlagSpec(id="flag1", value="FLAG{path_mut1}", path="/var/flags/path_traversal_mut1.txt", host="web")],
evidence_spec=[EvidenceItem(type="log_entry", location="web:/var/log/app/access.log")],
golden_path=[
GoldenPathStep(
step=1,
command="curl http://web/",
expect_in_stdout="Welcome",
host="attacker",
)
],
task=TaskSpec(red_briefing="go", blue_briefing="watch"),
mutation_plan=MutationPlan(
parent_snapshot_id="root_snap",
ops=[
MutationOp(
mutation_id="seed_path",
op_type="seed_vuln",
target_selector={"host": "web"},
params={
"vuln_type": "path_traversal",
"instantiated_vuln_id": "path_traversal_1",
"instantiated_flag_id": "flag1",
"instantiated_flag_value": "FLAG{path_mut1}",
"instantiated_flag_host": "web",
"instantiated_exploit_command": "curl 'http://web/download?file=../../../var/flags/path_traversal_mut1.txt'",
},
)
],
),
)
spec.lineage.parent_snapshot_id = "root_snap"
spec.lineage.generation_depth = 1
result = await TaskFeasibilityCheck().check(spec, mock_containers)
assert result.passed is False
assert any(
"missing submit_flag step" in issue for issue in result.details["issues"]
)
# ---------------------------------------------------------------------------
# Check 8: Difficulty
# ---------------------------------------------------------------------------
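# Contract exercised below, inferred from the assertions: tier-1 specs need a
# minimum golden-path length (8 steps pass and 3 fail, so the exact threshold,
# presumably defined in DifficultyCheck, lies in between); a single step is
# rejected as trivial; and consecutive duplicate commands are rejected.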
@pytest.mark.asyncio
async def test_difficulty_passes_tier1_with_8_steps(mock_containers):
from open_range.validator.difficulty import DifficultyCheck
steps = [GoldenPathStep(step=i, command=f"cmd_{i}") for i in range(1, 9)]
spec = SnapshotSpec(
topology={"tier": 1},
golden_path=steps,
truth_graph=TruthGraph(vulns=[
Vulnerability(id="v1", type="sqli", host="web"),
]),
)
result = await DifficultyCheck().check(spec, mock_containers)
assert result.passed is True
@pytest.mark.asyncio
async def test_difficulty_fails_tier1_with_3_steps(mock_containers):
from open_range.validator.difficulty import DifficultyCheck
steps = [GoldenPathStep(step=i, command=f"cmd_{i}") for i in range(1, 4)]
spec = SnapshotSpec(
topology={"tier": 1},
golden_path=steps,
truth_graph=TruthGraph(vulns=[
Vulnerability(id="v1", type="sqli", host="web"),
]),
)
result = await DifficultyCheck().check(spec, mock_containers)
assert result.passed is False
assert "steps" in result.error
@pytest.mark.asyncio
async def test_difficulty_fails_single_step(mock_containers):
from open_range.validator.difficulty import DifficultyCheck
spec = SnapshotSpec(
topology={"tier": 1},
golden_path=[GoldenPathStep(step=1, command="cmd1")],
truth_graph=TruthGraph(vulns=[
Vulnerability(id="v1", type="sqli", host="web"),
]),
)
result = await DifficultyCheck().check(spec, mock_containers)
assert result.passed is False
assert "trivial" in result.error or "steps" in result.error
@pytest.mark.asyncio
async def test_difficulty_fails_duplicate_consecutive(mock_containers):
from open_range.validator.difficulty import DifficultyCheck
steps = [GoldenPathStep(step=i, command="same_cmd") for i in range(1, 9)]
spec = SnapshotSpec(
topology={"tier": 1},
golden_path=steps,
truth_graph=TruthGraph(vulns=[
Vulnerability(id="v1", type="sqli", host="web"),
]),
)
result = await DifficultyCheck().check(spec, mock_containers)
assert result.passed is False
assert "duplicate" in result.error
# ---------------------------------------------------------------------------
# Check 9: NPC consistency — mechanical card validation
# ---------------------------------------------------------------------------
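# Contract exercised below, inferred from the assertions: mechanical card
# validation requires security_awareness within [0, 1] and consistency between
# awareness and susceptibility (high awareness with high susceptibility fails,
# as does low awareness with all susceptibility scores < 0.3). All results are
# advisory.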
@pytest.mark.asyncio
async def test_npc_consistency_passes_no_personas(mock_containers):
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(npc_personas=[])
result = await NPCConsistencyCheck().check(spec, mock_containers)
assert result.passed is True
assert result.advisory is True
@pytest.mark.asyncio
async def test_npc_consistency_fails_high_awareness_high_susceptibility(mock_containers):
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(
npc_personas=[
NPCPersona(
name="Alice",
security_awareness=0.9,
susceptibility={"phishing_email": 0.8},
)
]
)
result = await NPCConsistencyCheck().check(spec, mock_containers)
assert result.passed is False
assert result.advisory is True
@pytest.mark.asyncio
async def test_npc_consistency_fails_out_of_range_awareness(mock_containers):
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(
npc_personas=[
NPCPersona(name="BadAwareness", security_awareness=1.5)
]
)
result = await NPCConsistencyCheck().check(spec, mock_containers)
assert result.passed is False
assert "out of [0, 1]" in result.details["issues"][0]
@pytest.mark.asyncio
async def test_npc_consistency_fails_low_awareness_low_susceptibility(mock_containers):
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(
npc_personas=[
NPCPersona(
name="Bob",
security_awareness=0.2,
susceptibility={"phishing_email": 0.1, "vishing": 0.1},
)
]
)
result = await NPCConsistencyCheck().check(spec, mock_containers)
assert result.passed is False
assert "all susceptibility scores < 0.3" in result.details["issues"][0]
@pytest.mark.asyncio
async def test_npc_consistency_passes_valid_personas(mock_containers):
"""Valid personas with consistent awareness and susceptibility pass.
David (high awareness) passes the LLM test by correctly rejecting phishing.
    Janet (mid-range awareness) is skipped; only personas outside the
    mid-range get the LLM behavioral test.
"""
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(
npc_personas=[
NPCPersona(
name="David Chen",
role="CISO",
department="Security",
security_awareness=0.95,
susceptibility={
"phishing_email": 0.05,
"credential_sharing": 0.01,
"attachment_opening": 0.1,
"vishing": 0.05,
},
accounts={"ldap": "dchen"},
),
NPCPersona(
name="Janet Smith",
role="Marketing Coordinator",
department="Marketing",
security_awareness=0.5,
susceptibility={
"phishing_email": 0.5,
"credential_sharing": 0.3,
},
),
]
)
# David (high awareness) should report phishing to IT.
mock_acompletion = AsyncMock(
return_value=_make_llm_response(
"report_to_IT",
"This is clearly a phishing attempt.",
["urgency", "suspicious link"],
)
)
with patch("litellm.acompletion", mock_acompletion):
result = await NPCConsistencyCheck().check(spec, mock_containers)
assert result.passed is True
# Only David should be tested (high awareness) — Janet is mid-range.
assert mock_acompletion.call_count == 1
# ---------------------------------------------------------------------------
# Check 9: NPC consistency — LLM behavioral tests
# ---------------------------------------------------------------------------
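# Note: _make_llm_response is also used by the mechanical-validation test
# above; as a module-level helper it resolves at call time, after the module
# has fully loaded. Inferred behavioral contract: high-awareness personas must
# reject the test phishing and low-awareness personas must fall for it,
# mid-range (0.3-0.8) personas are skipped, and LLM errors degrade gracefully.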
def _make_llm_response(action: str, reasoning: str = "", suspicious: list | None = None):
"""Helper to create a mock litellm response."""
content = json.dumps({
"action": action,
"reasoning": reasoning,
"suspicious_elements_noticed": suspicious or [],
})
message = MagicMock()
message.content = content
choice = MagicMock()
choice.message = message
response = MagicMock()
response.choices = [choice]
return response
@pytest.mark.asyncio
async def test_npc_consistency_llm_high_awareness_rejects_phishing(mock_containers):
"""High-awareness NPC correctly rejects phishing — check passes."""
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(
npc_personas=[
NPCPersona(
name="David Chen",
role="CISO",
department="Security",
security_awareness=0.95,
susceptibility={
"phishing_email": 0.05,
"credential_sharing": 0.01,
},
accounts={"ldap": "dchen"},
),
]
)
mock_acompletion = AsyncMock(
return_value=_make_llm_response(
"report_to_IT",
"This looks like a phishing attempt.",
["suspicious link", "urgency"],
)
)
with patch("litellm.acompletion", mock_acompletion):
result = await NPCConsistencyCheck().check(spec, mock_containers)
assert result.passed is True
assert result.advisory is True
@pytest.mark.asyncio
async def test_npc_consistency_llm_high_awareness_falls_for_phishing(mock_containers):
"""High-awareness NPC falls for phishing — check fails."""
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(
npc_personas=[
NPCPersona(
name="David Chen",
role="CISO",
department="Security",
security_awareness=0.95,
susceptibility={
"phishing_email": 0.05,
},
accounts={"ldap": "dchen"},
),
]
)
mock_acompletion = AsyncMock(
return_value=_make_llm_response(
"click_link",
"I should reset my password as requested.",
)
)
with patch("litellm.acompletion", mock_acompletion):
result = await NPCConsistencyCheck().check(spec, mock_containers)
assert result.passed is False
assert result.advisory is True
assert "fell for test phishing" in result.details["issues"][0]
@pytest.mark.asyncio
async def test_npc_consistency_llm_low_awareness_falls_for_phishing(mock_containers):
"""Low-awareness NPC falls for phishing — check passes (expected behavior)."""
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(
npc_personas=[
NPCPersona(
name="Janet Smith",
role="Marketing Coordinator",
department="Marketing",
security_awareness=0.2,
susceptibility={
"phishing_email": 0.7,
"attachment_opening": 0.8,
},
accounts={"ldap": "jsmith"},
),
]
)
mock_acompletion = AsyncMock(
return_value=_make_llm_response(
"click_link",
"I need to reset my password right away!",
)
)
with patch("litellm.acompletion", mock_acompletion):
result = await NPCConsistencyCheck().check(spec, mock_containers)
assert result.passed is True
assert result.advisory is True
@pytest.mark.asyncio
async def test_npc_consistency_llm_low_awareness_rejects_phishing(mock_containers):
"""Low-awareness NPC rejects phishing — check fails (miscalibrated persona)."""
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(
npc_personas=[
NPCPersona(
name="Janet Smith",
role="Marketing Coordinator",
department="Marketing",
security_awareness=0.2,
susceptibility={
"phishing_email": 0.7,
},
accounts={"ldap": "jsmith"},
),
]
)
mock_acompletion = AsyncMock(
return_value=_make_llm_response(
"report_to_IT",
"This seems suspicious, I'll forward it to IT.",
["suspicious URL"],
)
)
with patch("litellm.acompletion", mock_acompletion):
result = await NPCConsistencyCheck().check(spec, mock_containers)
assert result.passed is False
assert result.advisory is True
assert "rejected test phishing" in result.details["issues"][0]
assert "miscalibrated" in result.details["issues"][0]
@pytest.mark.asyncio
async def test_npc_consistency_llm_failure_degrades_gracefully(mock_containers):
"""LLM failure does not cause the check to fail — degrades gracefully."""
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(
npc_personas=[
NPCPersona(
name="David Chen",
role="CISO",
security_awareness=0.95,
susceptibility={"phishing_email": 0.05},
accounts={"ldap": "dchen"},
),
]
)
mock_acompletion = AsyncMock(side_effect=Exception("API rate limit exceeded"))
with patch("litellm.acompletion", mock_acompletion):
result = await NPCConsistencyCheck().check(spec, mock_containers)
# LLM failure should not cause the check to fail — mechanical checks passed.
assert result.passed is True
assert result.advisory is True
@pytest.mark.asyncio
async def test_npc_consistency_skips_mid_awareness_llm_test(mock_containers):
"""Mid-range awareness personas (0.3-0.8) skip LLM behavioral test."""
from open_range.validator.npc_consistency import NPCConsistencyCheck
spec = SnapshotSpec(
npc_personas=[
NPCPersona(
name="Bob Neutral",
role="Accountant",
security_awareness=0.5,
susceptibility={"phishing_email": 0.5},
),
]
)
# The LLM should NOT be called for mid-range personas.
mock_acompletion = AsyncMock(
side_effect=AssertionError("LLM should not be called for mid-range awareness")
)
with patch("litellm.acompletion", mock_acompletion):
result = await NPCConsistencyCheck().check(spec, mock_containers)
assert result.passed is True
mock_acompletion.assert_not_called()
# ---------------------------------------------------------------------------
# Check 10: Realism review (LLM advisory)
# ---------------------------------------------------------------------------
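# Contract exercised below, inferred from the assertions: the check sends a
# redacted spec summary (no flag values, no golden-path commands) to
# litellm.acompletion and parses a JSON verdict. It is advisory throughout:
# LLM errors and malformed responses degrade to a pass, while a well-formed
# negative verdict fails and surfaces the reported issues.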
@pytest.mark.asyncio
async def test_realism_review_advisory_flag(mock_containers):
from open_range.validator.realism_review import RealismReviewCheck
spec = SnapshotSpec(topology={"hosts": ["web"], "tier": 1})
with patch("litellm.acompletion", AsyncMock(side_effect=Exception("no provider configured"))):
result = await RealismReviewCheck().check(spec, mock_containers)
# Should pass (advisory) when the LLM path is unavailable or misconfigured.
assert result.advisory is True
assert result.passed is True
@pytest.mark.asyncio
async def test_realism_review_passes_with_llm(mock_containers):
"""Realism review passes when LLM finds no issues."""
from open_range.validator.realism_review import RealismReviewCheck
spec = SnapshotSpec(
topology={"hosts": ["web", "db"], "tier": 1},
truth_graph=TruthGraph(vulns=[
Vulnerability(id="v1", type="sqli", host="web"),
]),
golden_path=[GoldenPathStep(step=i, command=f"cmd_{i}") for i in range(1, 9)],
task=TaskSpec(
red_briefing="Investigate the corporate network.",
blue_briefing="Monitor for threats.",
),
)
llm_response_content = json.dumps({"pass": True, "issues": []})
message = MagicMock()
message.content = llm_response_content
choice = MagicMock()
choice.message = message
mock_response = MagicMock()
mock_response.choices = [choice]
mock_acompletion = AsyncMock(return_value=mock_response)
with patch("litellm.acompletion", mock_acompletion):
result = await RealismReviewCheck().check(spec, mock_containers)
assert result.passed is True
assert result.advisory is True
assert result.details["issues"] == []
@pytest.mark.asyncio
async def test_realism_review_fails_with_issues(mock_containers):
"""Realism review fails when LLM finds issues (but still advisory)."""
from open_range.validator.realism_review import RealismReviewCheck
spec = SnapshotSpec(
topology={"hosts": ["web"], "tier": 1},
truth_graph=TruthGraph(vulns=[
Vulnerability(id="v1", type="sqli", host="web"),
]),
golden_path=[GoldenPathStep(step=1, command="cmd_1")],
task=TaskSpec(
red_briefing="Find the SQL injection vulnerability.",
blue_briefing="Monitor for threats.",
),
)
issues = ["Red briefing leaks vulnerability type (SQL injection)"]
llm_response_content = json.dumps({"pass": False, "issues": issues})
message = MagicMock()
message.content = llm_response_content
choice = MagicMock()
choice.message = message
mock_response = MagicMock()
mock_response.choices = [choice]
mock_acompletion = AsyncMock(return_value=mock_response)
with patch("litellm.acompletion", mock_acompletion):
result = await RealismReviewCheck().check(spec, mock_containers)
assert result.passed is False
assert result.advisory is True
assert "SQL injection" in result.error
assert result.details["issues"] == issues
@pytest.mark.asyncio
async def test_realism_review_never_sends_flags_to_llm(mock_containers):
"""Verify the redacted summary sent to LLM never contains flag values."""
from open_range.validator.realism_review import RealismReviewCheck
flag_value = "FLAG{super_secret_123}"
spec = SnapshotSpec(
topology={"hosts": ["web", "db"], "tier": 1},
truth_graph=TruthGraph(vulns=[
Vulnerability(id="v1", type="sqli", host="web"),
]),
flags=[FlagSpec(id="f1", value=flag_value, path="/f.txt", host="db")],
golden_path=[
GoldenPathStep(
step=1,
command=f"cat /f.txt # reveals {flag_value}",
expect_in_stdout=flag_value,
),
],
task=TaskSpec(
red_briefing="Investigate the network.",
blue_briefing="Monitor for threats.",
),
)
captured_messages = []
async def capturing_acompletion(**kwargs):
captured_messages.append(kwargs.get("messages", []))
content = json.dumps({"pass": True, "issues": []})
message = MagicMock()
message.content = content
choice = MagicMock()
choice.message = message
response = MagicMock()
response.choices = [choice]
return response
with patch("litellm.acompletion", capturing_acompletion):
result = await RealismReviewCheck().check(spec, mock_containers)
assert result.passed is True
# Verify the flag value was NOT in the messages sent to LLM
for msg_list in captured_messages:
for msg in msg_list:
assert flag_value not in msg.get("content", ""), \
"Flag value was leaked to the LLM reviewer!"
# Also verify golden path commands are not leaked
assert "cat /f.txt" not in msg.get("content", ""), \
"Golden path command was leaked to the LLM reviewer!"
@pytest.mark.asyncio
async def test_realism_review_llm_failure_degrades_gracefully(mock_containers):
"""LLM failure should not block validation — passes with advisory note."""
from open_range.validator.realism_review import RealismReviewCheck
spec = SnapshotSpec(
topology={"hosts": ["web"], "tier": 1},
truth_graph=TruthGraph(vulns=[
Vulnerability(id="v1", type="sqli", host="web"),
]),
golden_path=[GoldenPathStep(step=i, command=f"cmd_{i}") for i in range(1, 9)],
task=TaskSpec(red_briefing="Investigate.", blue_briefing="Monitor."),
)
mock_acompletion = AsyncMock(side_effect=Exception("API key invalid"))
with patch("litellm.acompletion", mock_acompletion):
result = await RealismReviewCheck().check(spec, mock_containers)
# Should pass even though LLM failed — advisory check degrades gracefully.
assert result.passed is True
assert result.advisory is True
assert "LLM review failed" in result.details["note"]
@pytest.mark.asyncio
async def test_realism_review_malformed_llm_response(mock_containers):
"""Malformed LLM JSON response degrades gracefully."""
from open_range.validator.realism_review import RealismReviewCheck
spec = SnapshotSpec(
topology={"hosts": ["web"], "tier": 1},
truth_graph=TruthGraph(vulns=[
Vulnerability(id="v1", type="sqli", host="web"),
]),
golden_path=[GoldenPathStep(step=i, command=f"cmd_{i}") for i in range(1, 9)],
task=TaskSpec(red_briefing="Investigate.", blue_briefing="Monitor."),
)
# Return invalid JSON
message = MagicMock()
message.content = "not valid json at all"
choice = MagicMock()
choice.message = message
mock_response = MagicMock()
mock_response.choices = [choice]
mock_acompletion = AsyncMock(return_value=mock_response)
with patch("litellm.acompletion", mock_acompletion):
result = await RealismReviewCheck().check(spec, mock_containers)
# Should pass gracefully — bad JSON is handled like an LLM failure.
assert result.passed is True
assert result.advisory is True
# ---------------------------------------------------------------------------
# ValidatorGate
# ---------------------------------------------------------------------------
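# Gate semantics exercised below: checks run sequentially; a non-advisory
# failure, or an exception (which is caught and recorded as a failure), stops
# the pipeline; advisory failures are recorded but block neither subsequent
# checks nor the overall pass.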
@pytest.mark.asyncio
async def test_validator_gate_all_pass(sample_snapshot_spec, mock_containers):
"""Gate with passing checks returns passed=True."""
class AlwaysPass:
async def check(self, snapshot, containers):
return CheckResult(name="pass_check", passed=True)
gate = ValidatorGate([AlwaysPass(), AlwaysPass()])
result = await gate.validate(sample_snapshot_spec, mock_containers)
assert result.passed is True
assert len(result.checks) == 2
assert result.total_time_s >= 0
@pytest.mark.asyncio
async def test_validator_gate_fail_fast_on_mechanical(sample_snapshot_spec, mock_containers):
"""Mechanical failure stops the pipeline."""
class Fail:
async def check(self, snapshot, containers):
return CheckResult(name="fail_check", passed=False, error="broken")
class NeverReached:
async def check(self, snapshot, containers):
raise AssertionError("should not be reached")
gate = ValidatorGate([Fail(), NeverReached()])
result = await gate.validate(sample_snapshot_spec, mock_containers)
assert result.passed is False
assert len(result.checks) == 1 # second check was never run
@pytest.mark.asyncio
async def test_validator_gate_advisory_does_not_block(sample_snapshot_spec, mock_containers):
"""Advisory check failure does not prevent overall pass."""
class Pass:
async def check(self, snapshot, containers):
return CheckResult(name="ok", passed=True)
class AdvisoryFail:
async def check(self, snapshot, containers):
return CheckResult(name="adv", passed=False, advisory=True, error="meh")
gate = ValidatorGate([Pass(), AdvisoryFail()])
result = await gate.validate(sample_snapshot_spec, mock_containers)
assert result.passed is True
assert len(result.checks) == 2
@pytest.mark.asyncio
async def test_validator_gate_catches_exception(sample_snapshot_spec, mock_containers):
"""Unhandled exception in a check is caught and recorded as failure."""
class Boom:
async def check(self, snapshot, containers):
raise RuntimeError("kaboom")
gate = ValidatorGate([Boom()])
result = await gate.validate(sample_snapshot_spec, mock_containers)
assert result.passed is False
assert "kaboom" in result.checks[0].error
@pytest.mark.asyncio
async def test_validator_gate_advisory_failure_does_not_stop_pipeline(
sample_snapshot_spec, mock_containers
):
"""Advisory failures do not stop the pipeline — subsequent checks still run."""
class MechanicalPass:
async def check(self, snapshot, containers):
return CheckResult(name="mechanical_ok", passed=True)
class AdvisoryFail:
async def check(self, snapshot, containers):
return CheckResult(name="advisory_fail", passed=False, advisory=True, error="soft fail")
class SecondMechanicalPass:
async def check(self, snapshot, containers):
return CheckResult(name="mechanical_ok_2", passed=True)
gate = ValidatorGate([MechanicalPass(), AdvisoryFail(), SecondMechanicalPass()])
result = await gate.validate(sample_snapshot_spec, mock_containers)
assert result.passed is True
assert len(result.checks) == 3 # All three checks ran
@pytest.mark.asyncio
async def test_validator_gate_mixed_mechanical_advisory(sample_snapshot_spec, mock_containers):
"""Mechanical pass + advisory fail = overall pass."""
checks_run = []
class Mechanical:
async def check(self, snapshot, containers):
checks_run.append("mechanical")
return CheckResult(name="mechanical", passed=True)
class Advisory:
async def check(self, snapshot, containers):
checks_run.append("advisory")
return CheckResult(name="advisory", passed=False, advisory=True, error="issue")
gate = ValidatorGate([Mechanical(), Advisory()])
result = await gate.validate(sample_snapshot_spec, mock_containers)
assert result.passed is True
assert checks_run == ["mechanical", "advisory"]
# Advisory failure should be recorded
assert result.checks[1].passed is False
assert result.checks[1].advisory is True
@pytest.mark.asyncio
async def test_validator_gate_mechanical_fail_before_advisory(
sample_snapshot_spec, mock_containers
):
"""Mechanical failure before advisory checks prevents advisory from running."""
class MechanicalFail:
async def check(self, snapshot, containers):
return CheckResult(name="mech_fail", passed=False, error="hard fail")
class AdvisoryNeverReached:
async def check(self, snapshot, containers):
raise AssertionError("should not reach advisory check")
gate = ValidatorGate([MechanicalFail(), AdvisoryNeverReached()])
result = await gate.validate(sample_snapshot_spec, mock_containers)
assert result.passed is False
assert len(result.checks) == 1
# ---------------------------------------------------------------------------
# Full 10-check pipeline simulation
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_full_pipeline_all_mechanical_pass_advisory_pass(
sample_snapshot_spec, mock_containers
):
"""Simulate all 10 checks passing."""
class MechanicalPass:
async def check(self, snapshot, containers):
return CheckResult(name="mechanical", passed=True)
class NPCConsistencyCheck:
async def check(self, snapshot, containers):
return CheckResult(name="npc_consistency", passed=True, advisory=True)
class RealismReviewCheck:
async def check(self, snapshot, containers):
return CheckResult(name="realism_review", passed=True, advisory=True)
checks = [MechanicalPass() for _ in range(8)]
checks.append(NPCConsistencyCheck())
checks.append(RealismReviewCheck())
gate = ValidatorGate(checks)
result = await gate.validate(sample_snapshot_spec, mock_containers)
assert result.passed is True
assert len(result.checks) == 10
@pytest.mark.asyncio
async def test_full_pipeline_mechanical_pass_advisory_fail(
sample_snapshot_spec, mock_containers
):
"""All mechanical pass, advisory checks fail — overall still passes."""
class MechanicalPass:
async def check(self, snapshot, containers):
return CheckResult(name="mechanical", passed=True)
class NPCConsistencyCheck:
async def check(self, snapshot, containers):
return CheckResult(
name="npc_consistency", passed=False, advisory=True,
error="NPC miscalibrated"
)
class RealismReviewCheck:
async def check(self, snapshot, containers):
return CheckResult(
name="realism_review", passed=False, advisory=True,
error="briefing leakage"
)
checks = [MechanicalPass() for _ in range(8)]
checks.append(NPCConsistencyCheck())
checks.append(RealismReviewCheck())
gate = ValidatorGate(checks)
result = await gate.validate(sample_snapshot_spec, mock_containers)
assert result.passed is True
assert len(result.checks) == 10
# Advisory failures are recorded
assert result.checks[8].passed is False
assert result.checks[8].advisory is True
assert result.checks[9].passed is False
assert result.checks[9].advisory is True
@pytest.mark.asyncio
async def test_full_pipeline_mechanical_fail_skips_rest(
sample_snapshot_spec, mock_containers
):
"""Mechanical failure at check 3 stops pipeline — checks 4-10 never run."""
run_order = []
class TrackedPass:
def __init__(self, n):
self.n = n
async def check(self, snapshot, containers):
run_order.append(self.n)
return CheckResult(name=f"check_{self.n}", passed=True)
class TrackedFail:
def __init__(self, n):
self.n = n
async def check(self, snapshot, containers):
run_order.append(self.n)
return CheckResult(name=f"check_{self.n}", passed=False, error="fail")
checks = [TrackedPass(1), TrackedPass(2), TrackedFail(3)]
checks.extend([TrackedPass(i) for i in range(4, 9)])
    # Checks 9 and 10 stand in for the advisory checks and are never reached
checks.append(TrackedPass(9))
checks.append(TrackedPass(10))
gate = ValidatorGate(checks)
result = await gate.validate(sample_snapshot_spec, mock_containers)
assert result.passed is False
assert run_order == [1, 2, 3] # Only first 3 ran
assert len(result.checks) == 3