Spaces:
Runtime error
Runtime error
File size: 6,084 Bytes
8c486a8 ca6fc4d 8c486a8 ca6fc4d 8c486a8 7fedc25 8c486a8 7fedc25 8c486a8 7fedc25 8c486a8 7fedc25 8c486a8 ca6fc4d 8c486a8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 | """Check 7: Task feasibility — golden path references real hosts, evidence targets exist."""
from __future__ import annotations
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.validator.graphs import compile_snapshot_graphs
from open_range.validator.path_solvability import build_host_adjacency, has_host_path
class TaskFeasibilityCheck:
"""Verify:
1. Every golden-path step references a host that exists in the topology.
2. Every evidence_spec item references a container that exists.
3. Red's exploit chain references vulns that exist in truth_graph.
"""
async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
issues: list[str] = []
compiled = compile_snapshot_graphs(snapshot)
adjacency = build_host_adjacency(snapshot, compiled)
# Collect known host names from topology.
topo_hosts: set[str] = set()
raw_hosts = snapshot.topology.get("hosts", [])
for h in raw_hosts:
if isinstance(h, dict):
topo_hosts.add(h.get("name", ""))
else:
topo_hosts.add(str(h))
topo_hosts.discard("")
# Fail early if topology has no hosts.
if not topo_hosts:
return CheckResult(
name="task_feasibility",
passed=False,
details={"issues": ["Topology has no hosts defined"]},
error="Topology has no hosts defined",
)
# 1. Golden-path hosts exist in topology.
for step in snapshot.golden_path:
host = getattr(step, "host", None) or "attacker"
if host not in topo_hosts:
issues.append(f"golden path step {step.step}: host '{host}' not in topology")
# 2. Evidence targets reference existing containers.
for item in snapshot.evidence_spec:
loc = item.location
if ":" in loc:
host = loc.split(":")[0]
else:
host = "siem"
if host not in topo_hosts:
issues.append(f"evidence item '{item.type}' references unknown host '{host}'")
# 3. Exploit chain vuln IDs exist in truth_graph.
vuln_ids = {v.id for v in snapshot.truth_graph.vulns}
for step in snapshot.truth_graph.exploit_chain:
if step.vuln_id and step.vuln_id not in vuln_ids:
issues.append(f"exploit chain references unknown vuln '{step.vuln_id}'")
# 4. Flag hosts exist in topology.
for flag in snapshot.flags:
if flag.host not in topo_hosts:
issues.append(f"flag '{flag.id}' references unknown host '{flag.host}'")
vuln_by_id = {v.id: v for v in snapshot.truth_graph.vulns}
flag_by_value = {flag.value: flag for flag in snapshot.flags}
submit_steps = {
step.command.removeprefix("submit_flag ").strip(): step
for step in snapshot.golden_path
if step.command.strip().startswith("submit_flag ")
}
exploit_steps = {step.vuln_id: step for step in snapshot.truth_graph.exploit_chain}
plan = snapshot.mutation_plan
if plan is not None:
for op in plan.ops:
if op.op_type != "seed_vuln":
continue
vuln_id = str(op.params.get("instantiated_vuln_id", "")).strip()
flag_value = str(op.params.get("instantiated_flag_value", "")).strip()
exploit_command = str(op.params.get("instantiated_exploit_command", "")).strip()
flag_host = str(op.params.get("instantiated_flag_host", "")).strip()
vuln = vuln_by_id.get(vuln_id)
if vuln is None:
issues.append(
f"seed_vuln mutation '{op.mutation_id}' did not materialize a vuln id"
)
continue
flag = flag_by_value.get(flag_value)
if flag is None:
issues.append(
f"seed_vuln mutation '{op.mutation_id}' did not materialize a flag"
)
continue
if flag_host and flag.host != flag_host:
issues.append(
f"seed_vuln mutation '{op.mutation_id}' flag host mismatch "
f"('{flag.host}' != '{flag_host}')"
)
if flag_value not in submit_steps:
issues.append(
f"seed_vuln mutation '{op.mutation_id}' missing submit_flag step for "
f"'{flag_value}'"
)
exploit = exploit_steps.get(vuln_id)
if exploit is None:
issues.append(
f"seed_vuln mutation '{op.mutation_id}' missing exploit_chain linkage"
)
elif exploit_command and exploit.command != exploit_command:
issues.append(
f"seed_vuln mutation '{op.mutation_id}' exploit command mismatch"
)
if exploit_command and not any(
exploit_command in step.command for step in snapshot.golden_path
):
issues.append(
f"seed_vuln mutation '{op.mutation_id}' missing golden-path exploit step"
)
if flag.host != vuln.host and not has_host_path(vuln.host, flag.host, adjacency):
issues.append(
f"seed_vuln mutation '{op.mutation_id}' flag host '{flag.host}' "
f"is unreachable from vuln host '{vuln.host}'"
)
passed = len(issues) == 0
return CheckResult(
name="task_feasibility",
passed=passed,
details={"issues": issues},
error="" if passed else f"{len(issues)} feasibility issue(s)",
)
|