File size: 1,996 Bytes
80ef9e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Graph-native evidence sufficiency checks."""

from __future__ import annotations

from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.validator.graphs import compile_snapshot_graphs


class GraphEvidenceSufficiencyCheck:
    """Check that every vuln and flag has at least one supporting evidence host.

    A subject (vuln or flag) counts as supported when the set of hosts
    derived from the compiled evidence locations contains either the
    subject's own host or the literal host ``"siem"``.
    """

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Run the evidence sufficiency check against *snapshot*.

        Args:
            snapshot: Snapshot whose graphs are compiled and whose
                ``truth_graph.vulns`` and ``flags`` are inspected.
            containers: Not used by this check's body.

        Returns:
            A ``CheckResult`` named ``"graph_evidence_sufficiency"``. It
            fails immediately when the compiled graphs have no evidence
            locations; otherwise it fails when any vuln or flag lacks a
            supporting evidence host, with the individual issues joined by
            ``"; "`` in ``error`` and listed in ``details["issues"]``.
        """
        graphs = compile_snapshot_graphs(snapshot)

        # Reduce each evidence location to its host, skipping locations
        # whose host part comes back empty.
        hosts_with_evidence: set[str] = set()
        for raw_location in graphs.evidence_locations:
            host = _location_host(raw_location)
            if host:
                hosts_with_evidence.add(host)

        if not graphs.evidence_locations:
            return CheckResult(
                name="graph_evidence_sufficiency",
                passed=False,
                error="snapshot has no evidence locations",
            )

        problems: list[str] = []

        # A subject is unsupported when neither its own host nor "siem"
        # appears among the evidence hosts.
        problems.extend(
            f"vuln '{vuln.id}' on host '{vuln.host}' has no supporting evidence host"
            for vuln in snapshot.truth_graph.vulns
            if hosts_with_evidence.isdisjoint({vuln.host, "siem"})
        )
        problems.extend(
            f"flag '{flag.id}' on host '{flag.host}' has no supporting evidence host"
            for flag in snapshot.flags
            if hosts_with_evidence.isdisjoint({flag.host, "siem"})
        )

        all_supported = not problems
        report = {
            "evidence_hosts": sorted(hosts_with_evidence),
            "issues": problems,
        }
        if all_supported:
            error_text = ""
        else:
            error_text = "; ".join(problems)

        return CheckResult(
            name="graph_evidence_sufficiency",
            passed=all_supported,
            details=report,
            error=error_text,
        )


def _location_host(location: str) -> str:
    if ":" not in location:
        return "siem"
    return location.split(":", 1)[0].strip()