File size: 1,727 Bytes
80ef9e0
 
 
 
 
 
5b3b677
80ef9e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b3b677
80ef9e0
 
 
 
 
 
 
 
 
5b3b677
 
 
80ef9e0
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
"""Graph-native reward grounding checks."""

from __future__ import annotations

from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.validator.graphs import compile_snapshot_graphs
from open_range.validator.path_solvability import build_host_adjacency, has_host_path


class GraphRewardGroundingCheck:
    """Static check that each flag in a snapshot is anchored in its graphs.

    A flag counts as grounded when its host appears in the compiled graph
    hosts and, if the truth graph lists any vuln hosts, the flag's host is
    either one of them or has a host path from at least one of them.
    """

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Evaluate reward grounding for *snapshot* and report the outcome.

        Args:
            snapshot: Spec providing ``flags`` and ``truth_graph.vulns``.
            containers: Accepted as part of the check signature; this
                method does not read it.

        Returns:
            A ``CheckResult`` named ``graph_reward_grounding``. It fails
            immediately when the snapshot has no flags; otherwise it passes
            only when no grounding issue was recorded, and the issues are
            exposed both in ``details["issues"]`` and, joined, in ``error``.
        """
        check_name = "graph_reward_grounding"

        # Graphs are compiled up front, before the empty-flags guard.
        graphs = compile_snapshot_graphs(snapshot)

        if not snapshot.flags:
            return CheckResult(
                name=check_name,
                passed=False,
                error="snapshot has no flags to ground",
            )

        host_links = build_host_adjacency(snapshot, graphs)
        exploitable = {vuln.host for vuln in snapshot.truth_graph.vulns if vuln.host}

        problems: list[str] = []
        for flag in snapshot.flags:
            if flag.host not in graphs.hosts:
                problems.append(f"flag '{flag.id}' references unknown host '{flag.host}'")
            elif flag.host in exploitable:
                # The flag sits directly on a vulnerable host.
                pass
            elif not exploitable:
                # With no vuln hosts there is nothing to search from, so no
                # reachability issue is recorded for this flag.
                pass
            elif not any(
                has_host_path(origin, flag.host, host_links) for origin in exploitable
            ):
                problems.append(
                    f"flag '{flag.id}' on '{flag.host}' is not reachable from any vuln host"
                )

        # Joining an empty list yields "", so a passing result carries an
        # empty error string.
        return CheckResult(
            name=check_name,
            passed=not problems,
            details={"issues": problems},
            error="; ".join(problems),
        )