Lars Talian
Implement canonical base graph and mutation policy
80ef9e0
"""Compile SnapshotSpec into lightweight canonical graph views.
These helpers intentionally stay small and dependency-free. The validator uses
them to reason about host membership, dependency edges, trust edges, evidence
locations, and mutation targets before any live container checks run.
"""
from __future__ import annotations
from dataclasses import dataclass
from open_range.protocols import SnapshotSpec
@dataclass(frozen=True, slots=True)
class CompiledGraphs:
"""Canonical graph-like views derived from a snapshot."""
hosts: frozenset[str]
users: frozenset[str]
principals: frozenset[str]
zones_by_host: dict[str, str]
services_by_host: dict[str, frozenset[str]]
dependency_edges: frozenset[tuple[str, str]]
trust_edges: frozenset[tuple[str, str, str]]
vuln_ids: frozenset[str]
evidence_locations: frozenset[str]
def compile_snapshot_graphs(snapshot: SnapshotSpec) -> CompiledGraphs:
"""Compile a snapshot into canonical graph views."""
topology = snapshot.topology or {}
hosts = _compile_hosts(topology)
users = _compile_users(topology)
principals = _compile_principals(topology, users)
zones_by_host = _compile_zones(topology, hosts)
services_by_host = _compile_services(topology, hosts)
dependency_edges = _compile_dependency_edges(topology)
trust_edges = _compile_trust_edges(topology)
vuln_ids = frozenset(v.id for v in snapshot.truth_graph.vulns if v.id)
evidence_locations = frozenset(item.location for item in snapshot.evidence_spec if item.location)
return CompiledGraphs(
hosts=hosts,
users=users,
principals=principals,
zones_by_host=zones_by_host,
services_by_host=services_by_host,
dependency_edges=dependency_edges,
trust_edges=trust_edges,
vuln_ids=vuln_ids,
evidence_locations=evidence_locations,
)
def _compile_hosts(topology: dict[str, object]) -> frozenset[str]:
raw_hosts = topology.get("hosts", [])
hosts: set[str] = set()
for raw in raw_hosts if isinstance(raw_hosts, list) else []:
if isinstance(raw, dict):
name = str(raw.get("name", "")).strip()
if name:
hosts.add(name)
else:
name = str(raw).strip()
if name:
hosts.add(name)
return frozenset(hosts)
def _compile_users(topology: dict[str, object]) -> frozenset[str]:
raw_users = topology.get("users", [])
users: set[str] = set()
for raw in raw_users if isinstance(raw_users, list) else []:
if not isinstance(raw, dict):
continue
username = str(raw.get("username", "")).strip()
if username:
users.add(username)
return frozenset(users)
def _compile_services(
topology: dict[str, object],
hosts: frozenset[str],
) -> dict[str, frozenset[str]]:
host_details = topology.get("host_details", {})
host_catalog = topology.get("host_catalog", {})
compiled: dict[str, frozenset[str]] = {}
for host in hosts:
detail = {}
if isinstance(host_details, dict):
raw_detail = host_details.get(host, {})
if isinstance(raw_detail, dict):
detail = raw_detail
if not detail and isinstance(host_catalog, dict):
raw_catalog = host_catalog.get(host, {})
if isinstance(raw_catalog, dict):
detail = raw_catalog
services = detail.get("services", [])
if not isinstance(services, list):
services = []
compiled[host] = frozenset(str(service) for service in services if service)
return compiled
def _compile_dependency_edges(topology: dict[str, object]) -> frozenset[tuple[str, str]]:
raw_edges = topology.get("dependency_edges", [])
edges: set[tuple[str, str]] = set()
for raw in raw_edges if isinstance(raw_edges, list) else []:
if not isinstance(raw, dict):
continue
source = str(raw.get("source", "")).strip()
target = str(raw.get("target", "")).strip()
if source and target:
edges.add((source, target))
if edges:
return frozenset(edges)
host_details = topology.get("host_details", {})
if isinstance(host_details, dict):
for source, raw_detail in host_details.items():
if not isinstance(raw_detail, dict):
continue
raw_targets = raw_detail.get("connects_to", [])
if not isinstance(raw_targets, list):
continue
for raw_target in raw_targets:
target = str(raw_target).strip()
if source and target:
edges.add((str(source).strip(), target))
return frozenset(edges)
def _compile_trust_edges(topology: dict[str, object]) -> frozenset[tuple[str, str, str]]:
raw_edges = topology.get("trust_edges", [])
edges: set[tuple[str, str, str]] = set()
for raw in raw_edges if isinstance(raw_edges, list) else []:
if not isinstance(raw, dict):
continue
source = str(raw.get("source", "")).strip()
target = str(raw.get("target", "")).strip()
edge_type = str(raw.get("type", "")).strip()
if source and target:
edges.add((source, target, edge_type))
return frozenset(edges)
def _compile_principals(
topology: dict[str, object],
users: frozenset[str],
) -> frozenset[str]:
principals = set(users)
raw_catalog = topology.get("principal_catalog", {})
if isinstance(raw_catalog, dict):
for raw_name in raw_catalog:
name = str(raw_name).strip()
if name:
principals.add(name)
return frozenset(principals)
def _compile_zones(
topology: dict[str, object],
hosts: frozenset[str],
) -> dict[str, str]:
zones_by_host: dict[str, str] = {}
raw_zones = topology.get("zones", {})
if isinstance(raw_zones, dict):
for raw_zone, raw_hosts in raw_zones.items():
zone = str(raw_zone).strip()
if not zone or not isinstance(raw_hosts, list):
continue
for raw_host in raw_hosts:
host = str(raw_host).strip()
if host:
zones_by_host[host] = zone
host_details = topology.get("host_details", {})
if isinstance(host_details, dict):
for raw_host, raw_detail in host_details.items():
host = str(raw_host).strip()
if not host or host not in hosts or not isinstance(raw_detail, dict):
continue
zone = str(raw_detail.get("zone", "")).strip()
if zone and host not in zones_by_host:
zones_by_host[host] = zone
return zones_by_host