Spaces:
Runtime error
Runtime error
| """Graph-native evidence sufficiency checks.""" | |
| from __future__ import annotations | |
| from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec | |
| from open_range.validator.graphs import compile_snapshot_graphs | |
| class GraphEvidenceSufficiencyCheck: | |
| """Verify that the compiled world exposes enough evidence for key facts.""" | |
| async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult: | |
| compiled = compile_snapshot_graphs(snapshot) | |
| evidence_hosts = { | |
| _location_host(location) | |
| for location in compiled.evidence_locations | |
| if _location_host(location) | |
| } | |
| issues: list[str] = [] | |
| if not compiled.evidence_locations: | |
| return CheckResult( | |
| name="graph_evidence_sufficiency", | |
| passed=False, | |
| error="snapshot has no evidence locations", | |
| ) | |
| for vuln in snapshot.truth_graph.vulns: | |
| supporting_hosts = {vuln.host, "siem"} | |
| if not evidence_hosts.intersection(supporting_hosts): | |
| issues.append( | |
| f"vuln '{vuln.id}' on host '{vuln.host}' has no supporting evidence host" | |
| ) | |
| for flag in snapshot.flags: | |
| supporting_hosts = {flag.host, "siem"} | |
| if not evidence_hosts.intersection(supporting_hosts): | |
| issues.append( | |
| f"flag '{flag.id}' on host '{flag.host}' has no supporting evidence host" | |
| ) | |
| passed = len(issues) == 0 | |
| return CheckResult( | |
| name="graph_evidence_sufficiency", | |
| passed=passed, | |
| details={ | |
| "evidence_hosts": sorted(evidence_hosts), | |
| "issues": issues, | |
| }, | |
| error="" if passed else "; ".join(issues), | |
| ) | |
| def _location_host(location: str) -> str: | |
| if ":" not in location: | |
| return "siem" | |
| return location.split(":", 1)[0].strip() | |