""" Graph manager for infrastructure topology. Uses Neo4j if credentials provided, otherwise falls back to NetworkX. """ import logging import os from typing import Dict, Optional, List logger = logging.getLogger(__name__) class InfraGraph: def __init__(self, uri: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None): self.neo4j_driver = None if uri and user and password: try: from neo4j import GraphDatabase self.neo4j_driver = GraphDatabase.driver(uri, auth=(user, password)) # Verify connection with self.neo4j_driver.session() as session: session.run("RETURN 1") logger.info("Connected to Neo4j") except Exception as e: logger.warning(f"Neo4j connection failed, using NetworkX fallback: {e}") # Always create NetworkX graph as fallback import networkx as nx self.nx_graph = nx.DiGraph() def update_from_state(self, components: Dict): """Populate graph from simulator state.""" self.nx_graph.clear() for cid, props in components.items(): self.nx_graph.add_node(cid, **props) for conn in props.get("connections", []): self.nx_graph.add_edge(cid, conn, relation="connects_to") if "runs_on" in props: self.nx_graph.add_edge(cid, props["runs_on"], relation="runs_on") if self.neo4j_driver: self._update_neo4j(components) def _update_neo4j(self, components): """Sync with Neo4j (Cypher queries).""" with self.neo4j_driver.session() as session: # Clear existing graph session.run("MATCH (n) DETACH DELETE n") # Create nodes for cid, props in components.items(): session.run( f"CREATE (:{props['type']} {{id: $id, status: $status}})", id=cid, status=props['status'] ) # Create relationships for cid, props in components.items(): for conn in props.get("connections", []): session.run( "MATCH (a {id: $aid}), (b {id: $bid}) CREATE (a)-[:CONNECTED_TO]->(b)", aid=cid, bid=conn ) if "runs_on" in props: session.run( "MATCH (a {id: $aid}), (b {id: $bid}) CREATE (a)-[:RUNS_ON]->(b)", aid=cid, bid=props["runs_on"] ) def get_failing_components(self) -> List[str]: """Return list of components with status != 'up'.""" failing = [] for node, data in self.nx_graph.nodes(data=True): if data.get("status") != "up": failing.append(node) return failing def to_networkx(self): """Return the internal NetworkX graph.""" return self.nx_graph