Spaces:
Runtime error
Runtime error
File size: 1,996 Bytes
80ef9e0 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | """Graph-native evidence sufficiency checks."""
from __future__ import annotations
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.validator.graphs import compile_snapshot_graphs
class GraphEvidenceSufficiencyCheck:
"""Verify that the compiled world exposes enough evidence for key facts."""
async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
compiled = compile_snapshot_graphs(snapshot)
evidence_hosts = {
_location_host(location)
for location in compiled.evidence_locations
if _location_host(location)
}
issues: list[str] = []
if not compiled.evidence_locations:
return CheckResult(
name="graph_evidence_sufficiency",
passed=False,
error="snapshot has no evidence locations",
)
for vuln in snapshot.truth_graph.vulns:
supporting_hosts = {vuln.host, "siem"}
if not evidence_hosts.intersection(supporting_hosts):
issues.append(
f"vuln '{vuln.id}' on host '{vuln.host}' has no supporting evidence host"
)
for flag in snapshot.flags:
supporting_hosts = {flag.host, "siem"}
if not evidence_hosts.intersection(supporting_hosts):
issues.append(
f"flag '{flag.id}' on host '{flag.host}' has no supporting evidence host"
)
passed = len(issues) == 0
return CheckResult(
name="graph_evidence_sufficiency",
passed=passed,
details={
"evidence_hosts": sorted(evidence_hosts),
"issues": issues,
},
error="" if passed else "; ".join(issues),
)
def _location_host(location: str) -> str:
if ":" not in location:
return "siem"
return location.split(":", 1)[0].strip()
|