Spaces:
Runtime error
Runtime error
| """Graph-native path solvability checks.""" | |
| from __future__ import annotations | |
| from collections import defaultdict, deque | |
| from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec | |
| from open_range.validator.graphs import CompiledGraphs, compile_snapshot_graphs | |
| class PathSolvabilityCheck: | |
| """Verify that vuln and flag hosts are reachable in the compiled host graph.""" | |
| async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult: | |
| compiled = compile_snapshot_graphs(snapshot) | |
| issues: list[str] = [] | |
| if not compiled.hosts: | |
| return CheckResult( | |
| name="path_solvability", | |
| passed=False, | |
| error="snapshot has no compiled hosts", | |
| ) | |
| start_hosts = _start_hosts(compiled) | |
| vuln_hosts = {v.host for v in snapshot.truth_graph.vulns if v.host} | |
| flag_hosts = {flag.host for flag in snapshot.flags if flag.host} | |
| target_hosts = sorted(vuln_hosts.union(flag_hosts)) | |
| if not target_hosts: | |
| return CheckResult( | |
| name="path_solvability", | |
| passed=False, | |
| error="snapshot has no vuln or flag hosts to solve toward", | |
| ) | |
| adjacency = build_host_adjacency(snapshot, compiled) | |
| unreachable = [ | |
| host | |
| for host in target_hosts | |
| if not _reachable_from_any(host, start_hosts, adjacency) | |
| ] | |
| if unreachable: | |
| issues.append(f"unreachable target hosts from start set {sorted(start_hosts)}: {unreachable}") | |
| for flag_host in sorted(flag_hosts): | |
| if not ( | |
| flag_host in vuln_hosts | |
| or _reachable_from_any(flag_host, vuln_hosts or start_hosts, adjacency) | |
| ): | |
| issues.append( | |
| f"flag host '{flag_host}' is not grounded by any vuln host or start host" | |
| ) | |
| passed = len(issues) == 0 | |
| return CheckResult( | |
| name="path_solvability", | |
| passed=passed, | |
| details={ | |
| "start_hosts": sorted(start_hosts), | |
| "target_hosts": target_hosts, | |
| "issues": issues, | |
| }, | |
| error="" if passed else "; ".join(issues), | |
| ) | |
| def _start_hosts(compiled: CompiledGraphs) -> set[str]: | |
| starts = { | |
| host | |
| for host in compiled.hosts | |
| if host in {"attacker", "internet"} | |
| or compiled.zones_by_host.get(host) == "external" | |
| } | |
| if starts: | |
| return starts | |
| if compiled.hosts: | |
| return {sorted(compiled.hosts)[0]} | |
| return set() | |
| def build_host_adjacency( | |
| snapshot: SnapshotSpec, | |
| compiled: CompiledGraphs, | |
| ) -> dict[str, set[str]]: | |
| adjacency: dict[str, set[str]] = defaultdict(set) | |
| for source, target in compiled.dependency_edges: | |
| adjacency[source].add(target) | |
| principal_hosts = _principal_hosts(snapshot) | |
| for source_principal, target_principal, _edge_type in compiled.trust_edges: | |
| source_hosts = principal_hosts.get(source_principal, set()) | |
| target_hosts = principal_hosts.get(target_principal, set()) | |
| for source_host in source_hosts: | |
| for target_host in target_hosts: | |
| if source_host and target_host: | |
| adjacency[source_host].add(target_host) | |
| return adjacency | |
| def has_host_path( | |
| start: str, | |
| target: str, | |
| adjacency: dict[str, set[str]], | |
| ) -> bool: | |
| return _has_path(start, target, adjacency) | |
| def _principal_hosts(snapshot: SnapshotSpec) -> dict[str, set[str]]: | |
| topology = snapshot.topology or {} | |
| mapping: dict[str, set[str]] = defaultdict(set) | |
| raw_users = topology.get("users", []) | |
| if isinstance(raw_users, list): | |
| for raw in raw_users: | |
| if not isinstance(raw, dict): | |
| continue | |
| username = str(raw.get("username", "")).strip() | |
| if not username: | |
| continue | |
| for raw_host in raw.get("hosts", []): | |
| host = str(raw_host).strip() | |
| if host: | |
| mapping[username].add(host) | |
| raw_catalog = topology.get("principal_catalog", {}) | |
| if isinstance(raw_catalog, dict): | |
| for raw_name, raw_principal in raw_catalog.items(): | |
| name = str(raw_name).strip() | |
| if not name or not isinstance(raw_principal, dict): | |
| continue | |
| for raw_host in raw_principal.get("hosts", []): | |
| host = str(raw_host).strip() | |
| if host: | |
| mapping[name].add(host) | |
| return mapping | |
| def _reachable_from_any( | |
| target: str, | |
| starts: set[str], | |
| adjacency: dict[str, set[str]], | |
| ) -> bool: | |
| for start in starts: | |
| if start == target: | |
| return True | |
| if _has_path(start, target, adjacency): | |
| return True | |
| return False | |
| def _has_path(start: str, target: str, adjacency: dict[str, set[str]]) -> bool: | |
| queue: deque[str] = deque([start]) | |
| seen = {start} | |
| while queue: | |
| current = queue.popleft() | |
| for neighbor in adjacency.get(current, set()): | |
| if neighbor == target: | |
| return True | |
| if neighbor in seen: | |
| continue | |
| seen.add(neighbor) | |
| queue.append(neighbor) | |
| return False | |