petter2025's picture
Create infra_graph.py
34d6eec verified
raw
history blame
3.05 kB
"""
Graph manager for infrastructure topology.
Uses Neo4j if credentials provided, otherwise falls back to NetworkX.
"""
import logging
import os
from typing import Dict, Optional, List
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class InfraGraph:
    """Infrastructure topology graph, optionally mirrored to Neo4j.

    An in-memory ``networkx.DiGraph`` (``self.nx_graph``) is always kept and
    is what the query methods read. When a URI, user and password are all
    supplied and the connection check succeeds, each ``update_from_state``
    call also rewrites the topology in Neo4j through ``self.neo4j_driver``.
    """

    def __init__(
        self,
        uri: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
    ):
        """Create the graph and, if credentials are given, connect to Neo4j.

        Args:
            uri: Neo4j connection URI passed to ``GraphDatabase.driver``.
            user: Neo4j user name.
            password: Neo4j password.

        Any failure to import the driver or to run the connection check is
        logged as a warning and swallowed; ``neo4j_driver`` then stays
        ``None`` so later updates only touch the NetworkX graph.
        """
        self.neo4j_driver = None
        if uri and user and password:
            driver = None
            try:
                from neo4j import GraphDatabase

                driver = GraphDatabase.driver(uri, auth=(user, password))
                # Verify connection
                with driver.session() as session:
                    session.run("RETURN 1")
            except Exception as e:
                # Deliberately best-effort: Neo4j is optional.
                logger.warning(
                    "Neo4j connection failed, using NetworkX fallback: %s", e
                )
                # Do not keep (or leak) a driver that failed its check,
                # otherwise every later update would try to use it.
                if driver is not None:
                    try:
                        driver.close()
                    except Exception as close_err:
                        logger.debug(
                            "Ignoring error while closing Neo4j driver: %s",
                            close_err,
                        )
            else:
                # Only publish the driver once the check has passed.
                self.neo4j_driver = driver
                logger.info("Connected to Neo4j")

        # Always create NetworkX graph as fallback
        import networkx as nx

        self.nx_graph = nx.DiGraph()

    def close(self) -> None:
        """Close the Neo4j driver if one is open; safe to call repeatedly."""
        driver, self.neo4j_driver = self.neo4j_driver, None
        if driver is not None:
            driver.close()

    @staticmethod
    def _validate_label(label) -> str:
        """Return ``label`` unchanged if it is safe to use as a Cypher label.

        The label is interpolated into the query text, so only non-empty
        ASCII identifiers (letters, digits, underscore; not starting with a
        digit) are accepted.

        Raises:
            ValueError: if ``label`` is not a string of that form.
        """
        if not (
            isinstance(label, str)
            and label.isascii()
            and label.isidentifier()
        ):
            raise ValueError(f"Invalid Neo4j label: {label!r}")
        return label

    def update_from_state(self, components: Dict):
        """Populate graph from simulator state.

        Args:
            components: mapping of component id to a property dict. The
                properties are stored as node attributes. An optional
                ``"connections"`` iterable yields ``connects_to`` edges and
                an optional ``"runs_on"`` entry yields a ``runs_on`` edge.

        The NetworkX graph is cleared and rebuilt; if a Neo4j driver is
        present the same state is then written to Neo4j.
        """
        self.nx_graph.clear()
        for cid, props in components.items():
            self.nx_graph.add_node(cid, **props)
            # NOTE: add_edge creates the target node if it does not exist
            # yet; such a node has no attributes unless it is also a key of
            # ``components``.
            for conn in props.get("connections", []):
                self.nx_graph.add_edge(cid, conn, relation="connects_to")
            if "runs_on" in props:
                self.nx_graph.add_edge(
                    cid, props["runs_on"], relation="runs_on"
                )
        if self.neo4j_driver:
            self._update_neo4j(components)

    def _update_neo4j(self, components):
        """Sync with Neo4j (Cypher queries).

        Deletes every node in the database, then recreates one node per
        component (labelled with its ``"type"``, carrying ``id`` and
        ``status``) and the ``CONNECTED_TO`` / ``RUNS_ON`` relationships.

        Raises:
            KeyError: if a component lacks ``"type"`` or ``"status"``.
            ValueError: if a component ``"type"`` is not a valid label.
        """
        # Resolve everything that can fail locally *before* touching the
        # database, so bad input cannot leave it wiped and half rebuilt.
        node_rows = [
            (cid, self._validate_label(props["type"]), props["status"])
            for cid, props in components.items()
        ]

        with self.neo4j_driver.session() as session:
            # Clear existing graph
            session.run("MATCH (n) DETACH DELETE n")
            # Create nodes
            for cid, label, status in node_rows:
                session.run(
                    f"CREATE (:{label} {{id: $id, status: $status}})",
                    id=cid, status=status,
                )
            # Create relationships (MATCH finds nothing, and so creates
            # nothing, when either endpoint was not created above).
            for cid, props in components.items():
                for conn in props.get("connections", []):
                    session.run(
                        "MATCH (a {id: $aid}), (b {id: $bid}) CREATE (a)-[:CONNECTED_TO]->(b)",
                        aid=cid, bid=conn,
                    )
                if "runs_on" in props:
                    session.run(
                        "MATCH (a {id: $aid}), (b {id: $bid}) CREATE (a)-[:RUNS_ON]->(b)",
                        aid=cid, bid=props["runs_on"],
                    )

    def get_failing_components(self) -> List[str]:
        """Return list of components with status != 'up'.

        Nodes without a ``status`` attribute are included as well.
        """
        return [
            node
            for node, data in self.nx_graph.nodes(data=True)
            if data.get("status") != "up"
        ]

    def to_networkx(self):
        """Return the internal NetworkX graph."""
        return self.nx_graph