"""Graph-level consistency checks for compiled snapshot state.""" from __future__ import annotations from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec from open_range.validator.graphs import compile_snapshot_graphs class GraphConsistencyCheck: """Verify internal consistency of the canonical graph views.""" async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult: compiled = compile_snapshot_graphs(snapshot) issues: list[str] = [] for source, target in compiled.dependency_edges: if source not in compiled.hosts or target not in compiled.hosts: issues.append(f"dependency edge '{source}->{target}' references unknown host") for source, target, _edge_type in compiled.trust_edges: if source not in compiled.principals or target not in compiled.principals: issues.append(f"trust edge '{source}->{target}' references unknown principal") lineage = snapshot.lineage if lineage.generation_depth == 0 and lineage.parent_snapshot_id: issues.append("root snapshot must not have parent_snapshot_id") if lineage.generation_depth > 0 and not lineage.parent_snapshot_id: issues.append("child snapshot missing parent_snapshot_id") if snapshot.mutation_plan is not None: if snapshot.mutation_plan.parent_snapshot_id != lineage.parent_snapshot_id: issues.append("mutation plan parent does not match lineage parent") for op in snapshot.mutation_plan.ops: if op.op_type in {"add_service", "seed_vuln"}: host = op.target_selector.get("host", "") if host and host not in compiled.hosts: issues.append( f"mutation '{op.mutation_id}' targets unknown host '{host}'" ) if op.op_type == "add_dependency_edge": source = op.target_selector.get("source", "") target = op.target_selector.get("target", "") if source and source not in compiled.hosts: issues.append( f"mutation '{op.mutation_id}' source host '{source}' missing" ) if target and target not in compiled.hosts: issues.append( f"mutation '{op.mutation_id}' target host '{target}' missing" ) if op.op_type == "add_trust_edge": source = op.target_selector.get("source", "") target = op.target_selector.get("target", "") if source and source not in compiled.principals: issues.append( f"mutation '{op.mutation_id}' source principal '{source}' missing" ) if target and target not in compiled.principals: issues.append( f"mutation '{op.mutation_id}' target principal '{target}' missing" ) passed = len(issues) == 0 return CheckResult( name="graph_consistency", passed=passed, details={ "hosts": len(compiled.hosts), "users": len(compiled.users), "principals": len(compiled.principals), "dependency_edges": len(compiled.dependency_edges), "trust_edges": len(compiled.trust_edges), }, error="" if passed else "; ".join(issues), )