| """ |
| Graph manager for infrastructure topology. |
| Uses Neo4j if credentials provided, otherwise falls back to NetworkX. |
| """ |
| import logging |
| import os |
| from typing import Dict, Optional, List |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class InfraGraph: |
| def __init__(self, uri: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None): |
| self.neo4j_driver = None |
| if uri and user and password: |
| try: |
| from neo4j import GraphDatabase |
| self.neo4j_driver = GraphDatabase.driver(uri, auth=(user, password)) |
| |
| with self.neo4j_driver.session() as session: |
| session.run("RETURN 1") |
| logger.info("Connected to Neo4j") |
| except Exception as e: |
| logger.warning(f"Neo4j connection failed, using NetworkX fallback: {e}") |
| |
| |
| import networkx as nx |
| self.nx_graph = nx.DiGraph() |
| |
| def update_from_state(self, components: Dict): |
| """Populate graph from simulator state.""" |
| self.nx_graph.clear() |
| for cid, props in components.items(): |
| self.nx_graph.add_node(cid, **props) |
| for conn in props.get("connections", []): |
| self.nx_graph.add_edge(cid, conn, relation="connects_to") |
| if "runs_on" in props: |
| self.nx_graph.add_edge(cid, props["runs_on"], relation="runs_on") |
| |
| if self.neo4j_driver: |
| self._update_neo4j(components) |
| |
| def _update_neo4j(self, components): |
| """Sync with Neo4j (Cypher queries).""" |
| with self.neo4j_driver.session() as session: |
| |
| session.run("MATCH (n) DETACH DELETE n") |
| |
| for cid, props in components.items(): |
| session.run( |
| f"CREATE (:{props['type']} {{id: $id, status: $status}})", |
| id=cid, status=props['status'] |
| ) |
| |
| for cid, props in components.items(): |
| for conn in props.get("connections", []): |
| session.run( |
| "MATCH (a {id: $aid}), (b {id: $bid}) CREATE (a)-[:CONNECTED_TO]->(b)", |
| aid=cid, bid=conn |
| ) |
| if "runs_on" in props: |
| session.run( |
| "MATCH (a {id: $aid}), (b {id: $bid}) CREATE (a)-[:RUNS_ON]->(b)", |
| aid=cid, bid=props["runs_on"] |
| ) |
| |
| def get_failing_components(self) -> List[str]: |
| """Return list of components with status != 'up'.""" |
| failing = [] |
| for node, data in self.nx_graph.nodes(data=True): |
| if data.get("status") != "up": |
| failing.append(node) |
| return failing |
| |
| def to_networkx(self): |
| """Return the internal NetworkX graph.""" |
| return self.nx_graph |