"""Graph adapter: NetworkX fallback and a full Neo4j adapter (lazy imports). This file provides a production-ready adapter implementation that will use the `neo4j` python driver when available and fall back to an in-memory NetworkX graph otherwise. """ from typing import Any, Dict, List, Optional import logging logger = logging.getLogger(__name__) try: import networkx as nx except Exception: nx = None class NetworkXGraph: def __init__(self): if nx is None: raise RuntimeError("networkx is not available") self.g = nx.MultiDiGraph() def add_node(self, node_id: str, **props: Any): self.g.add_node(node_id, **props) def add_edge(self, a: str, b: str, **props: Any): self.g.add_edge(a, b, **props) def find_nodes(self, key: str, value: str) -> List[str]: return [n for n, d in self.g.nodes(data=True) if d.get(key) == value] def run_cypher(self, query: str, **params: Any): # Not applicable for NetworkX; provide simple pattern matcher if needed raise NotImplementedError("Cypher not supported for NetworkX fallback") class Neo4jAdapter: def __init__(self, uri: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None): self._driver = None self._connected = False self._uri = uri or "bolt://localhost:7687" self._user = user or "neo4j" self._password = password or "testpassword" try: # lazy import to avoid importing heavy driver at module import time from neo4j import GraphDatabase self._driver = GraphDatabase.driver(self._uri, auth=(self._user, self._password)) self._connected = True except Exception as e: logger.info("Neo4j driver not available or connection failed: %s", e) self._driver = None def is_available(self) -> bool: return self._driver is not None def close(self): if self._driver: try: self._driver.close() except Exception: pass def run(self, cypher: str, **params: Any) -> List[Dict[str, Any]]: if not self._driver: raise RuntimeError("Neo4j driver not available") with self._driver.session() as session: res = session.run(cypher, **params) return [dict(record) for record in res] def create_node(self, labels: List[str], props: Dict[str, Any]) -> Dict[str, Any]: lbl = ":".join(labels) if labels else "" cypher = f"CREATE (n:{lbl} $props) RETURN id(n) as id" rows = self.run(cypher, props=props) return rows[0] if rows else {} def create_relationship(self, a_id: int, b_id: int, rel_type: str, props: Dict[str, Any] = None) -> Dict[str, Any]: props = props or {} cypher = "MATCH (a),(b) WHERE id(a)=$aid AND id(b)=$bid CREATE (a)-[r:%s $props]->(b) RETURN id(r) as id" % rel_type rows = self.run(cypher, aid=a_id, bid=b_id, props=props) return rows[0] if rows else {}