open-range / src /open_range /validator /path_solvability.py
Lars Talian
Tighten graph checks and trust-principal lint
5b3b677
"""Graph-native path solvability checks."""
from __future__ import annotations
from collections import defaultdict, deque
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.validator.graphs import CompiledGraphs, compile_snapshot_graphs
class PathSolvabilityCheck:
"""Verify that vuln and flag hosts are reachable in the compiled host graph."""
async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
compiled = compile_snapshot_graphs(snapshot)
issues: list[str] = []
if not compiled.hosts:
return CheckResult(
name="path_solvability",
passed=False,
error="snapshot has no compiled hosts",
)
start_hosts = _start_hosts(compiled)
vuln_hosts = {v.host for v in snapshot.truth_graph.vulns if v.host}
flag_hosts = {flag.host for flag in snapshot.flags if flag.host}
target_hosts = sorted(vuln_hosts.union(flag_hosts))
if not target_hosts:
return CheckResult(
name="path_solvability",
passed=False,
error="snapshot has no vuln or flag hosts to solve toward",
)
adjacency = build_host_adjacency(snapshot, compiled)
unreachable = [
host
for host in target_hosts
if not _reachable_from_any(host, start_hosts, adjacency)
]
if unreachable:
issues.append(f"unreachable target hosts from start set {sorted(start_hosts)}: {unreachable}")
for flag_host in sorted(flag_hosts):
if not (
flag_host in vuln_hosts
or _reachable_from_any(flag_host, vuln_hosts or start_hosts, adjacency)
):
issues.append(
f"flag host '{flag_host}' is not grounded by any vuln host or start host"
)
passed = len(issues) == 0
return CheckResult(
name="path_solvability",
passed=passed,
details={
"start_hosts": sorted(start_hosts),
"target_hosts": target_hosts,
"issues": issues,
},
error="" if passed else "; ".join(issues),
)
def _start_hosts(compiled: CompiledGraphs) -> set[str]:
starts = {
host
for host in compiled.hosts
if host in {"attacker", "internet"}
or compiled.zones_by_host.get(host) == "external"
}
if starts:
return starts
if compiled.hosts:
return {sorted(compiled.hosts)[0]}
return set()
def build_host_adjacency(
snapshot: SnapshotSpec,
compiled: CompiledGraphs,
) -> dict[str, set[str]]:
adjacency: dict[str, set[str]] = defaultdict(set)
for source, target in compiled.dependency_edges:
adjacency[source].add(target)
principal_hosts = _principal_hosts(snapshot)
for source_principal, target_principal, _edge_type in compiled.trust_edges:
source_hosts = principal_hosts.get(source_principal, set())
target_hosts = principal_hosts.get(target_principal, set())
for source_host in source_hosts:
for target_host in target_hosts:
if source_host and target_host:
adjacency[source_host].add(target_host)
return adjacency
def has_host_path(
start: str,
target: str,
adjacency: dict[str, set[str]],
) -> bool:
return _has_path(start, target, adjacency)
def _principal_hosts(snapshot: SnapshotSpec) -> dict[str, set[str]]:
topology = snapshot.topology or {}
mapping: dict[str, set[str]] = defaultdict(set)
raw_users = topology.get("users", [])
if isinstance(raw_users, list):
for raw in raw_users:
if not isinstance(raw, dict):
continue
username = str(raw.get("username", "")).strip()
if not username:
continue
for raw_host in raw.get("hosts", []):
host = str(raw_host).strip()
if host:
mapping[username].add(host)
raw_catalog = topology.get("principal_catalog", {})
if isinstance(raw_catalog, dict):
for raw_name, raw_principal in raw_catalog.items():
name = str(raw_name).strip()
if not name or not isinstance(raw_principal, dict):
continue
for raw_host in raw_principal.get("hosts", []):
host = str(raw_host).strip()
if host:
mapping[name].add(host)
return mapping
def _reachable_from_any(
target: str,
starts: set[str],
adjacency: dict[str, set[str]],
) -> bool:
for start in starts:
if start == target:
return True
if _has_path(start, target, adjacency):
return True
return False
def _has_path(start: str, target: str, adjacency: dict[str, set[str]]) -> bool:
queue: deque[str] = deque([start])
seen = {start}
while queue:
current = queue.popleft()
for neighbor in adjacency.get(current, set()):
if neighbor == target:
return True
if neighbor in seen:
continue
seen.add(neighbor)
queue.append(neighbor)
return False