open-range / src /open_range /validator /graph_consistency.py
Lars Talian
Implement canonical base graph and mutation policy
80ef9e0
"""Graph-level consistency checks for compiled snapshot state."""
from __future__ import annotations
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.validator.graphs import compile_snapshot_graphs
class GraphConsistencyCheck:
"""Verify internal consistency of the canonical graph views."""
async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
compiled = compile_snapshot_graphs(snapshot)
issues: list[str] = []
for source, target in compiled.dependency_edges:
if source not in compiled.hosts or target not in compiled.hosts:
issues.append(f"dependency edge '{source}->{target}' references unknown host")
for source, target, _edge_type in compiled.trust_edges:
if source not in compiled.principals or target not in compiled.principals:
issues.append(f"trust edge '{source}->{target}' references unknown principal")
lineage = snapshot.lineage
if lineage.generation_depth == 0 and lineage.parent_snapshot_id:
issues.append("root snapshot must not have parent_snapshot_id")
if lineage.generation_depth > 0 and not lineage.parent_snapshot_id:
issues.append("child snapshot missing parent_snapshot_id")
if snapshot.mutation_plan is not None:
if snapshot.mutation_plan.parent_snapshot_id != lineage.parent_snapshot_id:
issues.append("mutation plan parent does not match lineage parent")
for op in snapshot.mutation_plan.ops:
if op.op_type in {"add_service", "seed_vuln"}:
host = op.target_selector.get("host", "")
if host and host not in compiled.hosts:
issues.append(
f"mutation '{op.mutation_id}' targets unknown host '{host}'"
)
if op.op_type == "add_dependency_edge":
source = op.target_selector.get("source", "")
target = op.target_selector.get("target", "")
if source and source not in compiled.hosts:
issues.append(
f"mutation '{op.mutation_id}' source host '{source}' missing"
)
if target and target not in compiled.hosts:
issues.append(
f"mutation '{op.mutation_id}' target host '{target}' missing"
)
if op.op_type == "add_trust_edge":
source = op.target_selector.get("source", "")
target = op.target_selector.get("target", "")
if source and source not in compiled.principals:
issues.append(
f"mutation '{op.mutation_id}' source principal '{source}' missing"
)
if target and target not in compiled.principals:
issues.append(
f"mutation '{op.mutation_id}' target principal '{target}' missing"
)
passed = len(issues) == 0
return CheckResult(
name="graph_consistency",
passed=passed,
details={
"hosts": len(compiled.hosts),
"users": len(compiled.users),
"principals": len(compiled.principals),
"dependency_edges": len(compiled.dependency_edges),
"trust_edges": len(compiled.trust_edges),
},
error="" if passed else "; ".join(issues),
)