Spaces:
Runtime error
Runtime error
| """Compile SnapshotSpec into lightweight canonical graph views. | |
| These helpers intentionally stay small and dependency-free. The validator uses | |
| them to reason about host membership, dependency edges, trust edges, evidence | |
| locations, and mutation targets before any live container checks run. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from open_range.protocols import SnapshotSpec | |
| class CompiledGraphs: | |
| """Canonical graph-like views derived from a snapshot.""" | |
| hosts: frozenset[str] | |
| users: frozenset[str] | |
| principals: frozenset[str] | |
| zones_by_host: dict[str, str] | |
| services_by_host: dict[str, frozenset[str]] | |
| dependency_edges: frozenset[tuple[str, str]] | |
| trust_edges: frozenset[tuple[str, str, str]] | |
| vuln_ids: frozenset[str] | |
| evidence_locations: frozenset[str] | |
| def compile_snapshot_graphs(snapshot: SnapshotSpec) -> CompiledGraphs: | |
| """Compile a snapshot into canonical graph views.""" | |
| topology = snapshot.topology or {} | |
| hosts = _compile_hosts(topology) | |
| users = _compile_users(topology) | |
| principals = _compile_principals(topology, users) | |
| zones_by_host = _compile_zones(topology, hosts) | |
| services_by_host = _compile_services(topology, hosts) | |
| dependency_edges = _compile_dependency_edges(topology) | |
| trust_edges = _compile_trust_edges(topology) | |
| vuln_ids = frozenset(v.id for v in snapshot.truth_graph.vulns if v.id) | |
| evidence_locations = frozenset(item.location for item in snapshot.evidence_spec if item.location) | |
| return CompiledGraphs( | |
| hosts=hosts, | |
| users=users, | |
| principals=principals, | |
| zones_by_host=zones_by_host, | |
| services_by_host=services_by_host, | |
| dependency_edges=dependency_edges, | |
| trust_edges=trust_edges, | |
| vuln_ids=vuln_ids, | |
| evidence_locations=evidence_locations, | |
| ) | |
| def _compile_hosts(topology: dict[str, object]) -> frozenset[str]: | |
| raw_hosts = topology.get("hosts", []) | |
| hosts: set[str] = set() | |
| for raw in raw_hosts if isinstance(raw_hosts, list) else []: | |
| if isinstance(raw, dict): | |
| name = str(raw.get("name", "")).strip() | |
| if name: | |
| hosts.add(name) | |
| else: | |
| name = str(raw).strip() | |
| if name: | |
| hosts.add(name) | |
| return frozenset(hosts) | |
| def _compile_users(topology: dict[str, object]) -> frozenset[str]: | |
| raw_users = topology.get("users", []) | |
| users: set[str] = set() | |
| for raw in raw_users if isinstance(raw_users, list) else []: | |
| if not isinstance(raw, dict): | |
| continue | |
| username = str(raw.get("username", "")).strip() | |
| if username: | |
| users.add(username) | |
| return frozenset(users) | |
| def _compile_services( | |
| topology: dict[str, object], | |
| hosts: frozenset[str], | |
| ) -> dict[str, frozenset[str]]: | |
| host_details = topology.get("host_details", {}) | |
| host_catalog = topology.get("host_catalog", {}) | |
| compiled: dict[str, frozenset[str]] = {} | |
| for host in hosts: | |
| detail = {} | |
| if isinstance(host_details, dict): | |
| raw_detail = host_details.get(host, {}) | |
| if isinstance(raw_detail, dict): | |
| detail = raw_detail | |
| if not detail and isinstance(host_catalog, dict): | |
| raw_catalog = host_catalog.get(host, {}) | |
| if isinstance(raw_catalog, dict): | |
| detail = raw_catalog | |
| services = detail.get("services", []) | |
| if not isinstance(services, list): | |
| services = [] | |
| compiled[host] = frozenset(str(service) for service in services if service) | |
| return compiled | |
| def _compile_dependency_edges(topology: dict[str, object]) -> frozenset[tuple[str, str]]: | |
| raw_edges = topology.get("dependency_edges", []) | |
| edges: set[tuple[str, str]] = set() | |
| for raw in raw_edges if isinstance(raw_edges, list) else []: | |
| if not isinstance(raw, dict): | |
| continue | |
| source = str(raw.get("source", "")).strip() | |
| target = str(raw.get("target", "")).strip() | |
| if source and target: | |
| edges.add((source, target)) | |
| if edges: | |
| return frozenset(edges) | |
| host_details = topology.get("host_details", {}) | |
| if isinstance(host_details, dict): | |
| for source, raw_detail in host_details.items(): | |
| if not isinstance(raw_detail, dict): | |
| continue | |
| raw_targets = raw_detail.get("connects_to", []) | |
| if not isinstance(raw_targets, list): | |
| continue | |
| for raw_target in raw_targets: | |
| target = str(raw_target).strip() | |
| if source and target: | |
| edges.add((str(source).strip(), target)) | |
| return frozenset(edges) | |
| def _compile_trust_edges(topology: dict[str, object]) -> frozenset[tuple[str, str, str]]: | |
| raw_edges = topology.get("trust_edges", []) | |
| edges: set[tuple[str, str, str]] = set() | |
| for raw in raw_edges if isinstance(raw_edges, list) else []: | |
| if not isinstance(raw, dict): | |
| continue | |
| source = str(raw.get("source", "")).strip() | |
| target = str(raw.get("target", "")).strip() | |
| edge_type = str(raw.get("type", "")).strip() | |
| if source and target: | |
| edges.add((source, target, edge_type)) | |
| return frozenset(edges) | |
| def _compile_principals( | |
| topology: dict[str, object], | |
| users: frozenset[str], | |
| ) -> frozenset[str]: | |
| principals = set(users) | |
| raw_catalog = topology.get("principal_catalog", {}) | |
| if isinstance(raw_catalog, dict): | |
| for raw_name in raw_catalog: | |
| name = str(raw_name).strip() | |
| if name: | |
| principals.add(name) | |
| return frozenset(principals) | |
| def _compile_zones( | |
| topology: dict[str, object], | |
| hosts: frozenset[str], | |
| ) -> dict[str, str]: | |
| zones_by_host: dict[str, str] = {} | |
| raw_zones = topology.get("zones", {}) | |
| if isinstance(raw_zones, dict): | |
| for raw_zone, raw_hosts in raw_zones.items(): | |
| zone = str(raw_zone).strip() | |
| if not zone or not isinstance(raw_hosts, list): | |
| continue | |
| for raw_host in raw_hosts: | |
| host = str(raw_host).strip() | |
| if host: | |
| zones_by_host[host] = zone | |
| host_details = topology.get("host_details", {}) | |
| if isinstance(host_details, dict): | |
| for raw_host, raw_detail in host_details.items(): | |
| host = str(raw_host).strip() | |
| if not host or host not in hosts or not isinstance(raw_detail, dict): | |
| continue | |
| zone = str(raw_detail.get("zone", "")).strip() | |
| if zone and host not in zones_by_host: | |
| zones_by_host[host] = zone | |
| return zones_by_host | |