"""
Graph manager for infrastructure topology.
Uses Neo4j if credentials provided, otherwise falls back to NetworkX.
"""
import logging
import os
from typing import Dict, Optional, List
logger = logging.getLogger(__name__)


class InfraGraph:
    """Infrastructure topology graph with an optional Neo4j mirror.

    A NetworkX ``DiGraph`` is always kept in memory and is what the query
    methods read.  When Neo4j credentials are supplied *and* the server is
    reachable, every call to :meth:`update_from_state` is also synced to Neo4j.
    """

    def __init__(
        self,
        uri: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
    ):
        """Create the graph, connecting to Neo4j only if all credentials are given.

        Args:
            uri: Neo4j connection URI; Neo4j is skipped when falsy.
            user: Neo4j user name; Neo4j is skipped when falsy.
            password: Neo4j password; Neo4j is skipped when falsy.

        A failed Neo4j connection is logged and leaves ``neo4j_driver`` as
        ``None`` so that later updates only touch the NetworkX graph.
        """
        self.neo4j_driver = None
        if uri and user and password:
            self.neo4j_driver = self._connect_neo4j(uri, user, password)

        # The NetworkX graph is always created, with or without Neo4j.
        import networkx as nx

        self.nx_graph = nx.DiGraph()

    @staticmethod
    def _connect_neo4j(uri, user, password):
        """Return a verified Neo4j driver, or ``None`` if connecting fails."""
        driver = None
        try:
            from neo4j import GraphDatabase

            driver = GraphDatabase.driver(uri, auth=(user, password))
            # Verify connection
            with driver.session() as session:
                session.run("RETURN 1")
        except Exception as exc:
            # Best-effort by design: a missing package or an unreachable server
            # must not prevent the NetworkX fallback from working.
            logger.warning(
                "Neo4j connection failed, using NetworkX fallback: %s", exc
            )
            if driver is not None:
                # Do not keep (or leak) a driver that failed verification;
                # otherwise later syncs would raise instead of falling back.
                try:
                    driver.close()
                except Exception:
                    logger.debug(
                        "Ignoring error while closing Neo4j driver", exc_info=True
                    )
            return None
        logger.info("Connected to Neo4j")
        return driver

    @staticmethod
    def _cypher_label(label) -> str:
        """Return *label* as a backtick-quoted Cypher label.

        Labels cannot be passed as query parameters, so the value has to be
        embedded in the query text.  Quoting it (and doubling any embedded
        backtick, including the ``\\u0060`` escape form) keeps a crafted
        component type from changing the query.

        Raises:
            ValueError: if *label* is not a non-empty string.
        """
        if not isinstance(label, str) or not label:
            raise ValueError(f"Invalid Neo4j label: {label!r}")
        escaped = label.replace("\\u0060", "`").replace("`", "``")
        return f"`{escaped}`"

    def close(self) -> None:
        """Close the Neo4j driver, if one is open. Safe to call repeatedly."""
        if self.neo4j_driver is not None:
            self.neo4j_driver.close()
            self.neo4j_driver = None

    def update_from_state(self, components: Dict):
        """Populate graph from simulator state.

        The NetworkX graph is cleared and rebuilt; if a Neo4j driver is
        available the same state is then written to Neo4j.

        Args:
            components: mapping of component id to a property mapping.  All
                properties become node attributes.  An optional
                ``"connections"`` iterable yields ``connects_to`` edges and an
                optional ``"runs_on"`` value yields a ``runs_on`` edge.
        """
        self.nx_graph.clear()
        for cid, props in components.items():
            self.nx_graph.add_node(cid, **props)
            # ``or []`` tolerates an explicit ``"connections": None``.
            for conn in props.get("connections") or []:
                self.nx_graph.add_edge(cid, conn, relation="connects_to")
            if "runs_on" in props:
                self.nx_graph.add_edge(cid, props["runs_on"], relation="runs_on")
        if self.neo4j_driver:
            self._update_neo4j(components)

    def _update_neo4j(self, components):
        """Sync with Neo4j (Cypher queries).

        Replaces the whole Neo4j graph with *components*.

        Raises:
            KeyError: if a component has no ``"type"`` entry.
            ValueError: if a component's ``"type"`` is not a usable label.
        """
        # Validate and prepare every node BEFORE touching the database, so a
        # malformed component cannot leave Neo4j wiped and half-populated.
        nodes = [
            (self._cypher_label(props["type"]), cid, props.get("status"))
            for cid, props in components.items()
        ]
        with self.neo4j_driver.session() as session:
            # Clear existing graph
            session.run("MATCH (n) DETACH DELETE n")
            # Create nodes
            for label, cid, status in nodes:
                session.run(
                    f"CREATE (:{label} {{id: $id, status: $status}})",
                    id=cid,
                    status=status,
                )
            # Create relationships (only after all nodes exist, so that
            # forward references between components can be matched).
            for cid, props in components.items():
                for conn in props.get("connections") or []:
                    session.run(
                        "MATCH (a {id: $aid}), (b {id: $bid}) CREATE (a)-[:CONNECTED_TO]->(b)",
                        aid=cid,
                        bid=conn,
                    )
                if "runs_on" in props:
                    session.run(
                        "MATCH (a {id: $aid}), (b {id: $bid}) CREATE (a)-[:RUNS_ON]->(b)",
                        aid=cid,
                        bid=props["runs_on"],
                    )

    def get_failing_components(self) -> List[str]:
        """Return list of components with status != 'up'.

        Nodes without a ``status`` attribute are reported as failing too.
        """
        return [
            node
            for node, data in self.nx_graph.nodes(data=True)
            if data.get("status") != "up"
        ]

    def to_networkx(self):
        """Return the internal NetworkX graph."""
        return self.nx_graph