open-range / src /open_range /validator /graph_reward_grounding.py
Lars Talian
Tighten graph checks and trust-principal lint
5b3b677
"""Graph-native reward grounding checks."""
from __future__ import annotations
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.validator.graphs import compile_snapshot_graphs
from open_range.validator.path_solvability import build_host_adjacency, has_host_path
class GraphRewardGroundingCheck:
"""Verify rewards are grounded by graph facts before live checks run."""
async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
compiled = compile_snapshot_graphs(snapshot)
issues: list[str] = []
if not snapshot.flags:
return CheckResult(
name="graph_reward_grounding",
passed=False,
error="snapshot has no flags to ground",
)
adjacency = build_host_adjacency(snapshot, compiled)
vuln_hosts = {v.host for v in snapshot.truth_graph.vulns if v.host}
for flag in snapshot.flags:
if flag.host not in compiled.hosts:
issues.append(f"flag '{flag.id}' references unknown host '{flag.host}'")
continue
if flag.host in vuln_hosts:
continue
if vuln_hosts and not any(
has_host_path(source, flag.host, adjacency) for source in vuln_hosts
):
issues.append(
f"flag '{flag.id}' on '{flag.host}' is not reachable from any vuln host"
)
passed = len(issues) == 0
return CheckResult(
name="graph_reward_grounding",
passed=passed,
details={"issues": issues},
error="" if passed else "; ".join(issues),
)