"""Graph adapter: NetworkX fallback and a full Neo4j adapter (lazy imports).

This file provides a production-ready adapter implementation that will use the
`neo4j` python driver when available and fall back to an in-memory NetworkX
graph otherwise.
"""
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# networkx is an optional dependency: when it cannot be imported, ``nx`` is
# left as ``None`` and NetworkXGraph raises at construction time instead.
# The catch is deliberately broad so that a broken install (not only a
# missing one) still lets this module be imported.
try:
    import networkx as nx
except Exception:
    nx = None


class NetworkXGraph:
    """In-memory graph backed by a ``networkx.MultiDiGraph``.

    Offers a small node/edge API; Cypher queries are not supported.
    """

    def __init__(self) -> None:
        """Create an empty multi-digraph.

        Raises:
            RuntimeError: if the optional ``networkx`` import failed.
        """
        if nx is None:
            raise RuntimeError("networkx is not available")
        self.g = nx.MultiDiGraph()

    def add_node(self, node_id: str, **props: Any) -> None:
        """Add node ``node_id`` with ``props`` stored as node attributes."""
        self.g.add_node(node_id, **props)

    def add_edge(self, a: str, b: str, **props: Any) -> None:
        """Add a directed edge ``a -> b`` with ``props`` as edge attributes."""
        self.g.add_edge(a, b, **props)

    def find_nodes(self, key: str, value: Any) -> List[str]:
        """Return the ids of all nodes whose attribute ``key`` equals ``value``.

        Nodes that do not carry ``key`` are compared as ``None``, so passing
        ``value=None`` also matches nodes without that attribute.
        """
        return [
            node_id
            for node_id, attrs in self.g.nodes(data=True)
            if attrs.get(key) == value
        ]

    def run_cypher(self, query: str, **params: Any):
        """Always raise: this backend cannot execute Cypher.

        Raises:
            NotImplementedError: unconditionally.
        """
        raise NotImplementedError("Cypher not supported for NetworkX fallback")


class Neo4jAdapter:
    """Small wrapper around the ``neo4j`` driver, imported lazily.

    Construction never raises: if the driver package cannot be imported or
    the driver object cannot be created, the adapter is left in an
    "unavailable" state that callers can detect with ``is_available()``.
    """

    def __init__(self, uri: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None):
        """Try to create a driver for ``uri`` using basic auth.

        Args:
            uri: Bolt URI; defaults to ``bolt://localhost:7687``.
            user: user name; defaults to ``neo4j``.
            password: password; defaults to ``testpassword``.
        """
        self._driver = None
        self._connected = False
        self._uri = uri or "bolt://localhost:7687"
        self._user = user or "neo4j"
        # NOTE(review): hard-coded fallback credential kept for backward
        # compatibility; production callers should always pass a password.
        self._password = password or "testpassword"
        try:
            # Imported here so the module can be loaded without neo4j installed.
            from neo4j import GraphDatabase

            self._driver = GraphDatabase.driver(self._uri, auth=(self._user, self._password))
            # Set once the driver object exists; no connectivity check is
            # performed here.
            self._connected = True
        except Exception as e:
            logger.info("Neo4j driver not available or connection failed: %s", e)
            self._driver = None
            self._connected = False

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a backtick-quoted Cypher identifier.

        Labels and relationship types cannot be sent as query parameters, so
        they are quoted and embedded backticks are doubled.

        Raises:
            ValueError: if ``name`` is not a non-empty string.
        """
        if not isinstance(name, str) or not name:
            raise ValueError(f"invalid Cypher identifier: {name!r}")
        # NOTE(review): older Neo4j servers reportedly treat the escape
        # sequence \u0060 as a backtick inside quoted names; normalise it so
        # it is doubled like a literal one -- confirm against target version.
        escaped = name.replace("\\u0060", "`").replace("`", "``")
        return f"`{escaped}`"

    def is_available(self) -> bool:
        """Return True while a driver object is held."""
        return self._driver is not None

    def close(self) -> None:
        """Close the driver (best effort) and mark the adapter unavailable.

        Safe to call more than once; errors raised by the driver while
        closing are logged at debug level and otherwise ignored.
        """
        driver, self._driver = self._driver, None
        self._connected = False
        if driver is None:
            return
        try:
            driver.close()
        except Exception:
            logger.debug("Ignoring error while closing Neo4j driver", exc_info=True)

    def run(self, cypher: str, **params: Any) -> List[Dict[str, Any]]:
        """Execute ``cypher`` and return every record as a plain dict.

        Args:
            cypher: the query text.
            **params: query parameters, referenced as ``$name`` in the query.

        Raises:
            RuntimeError: if no driver is available.
        """
        if not self._driver:
            raise RuntimeError("Neo4j driver not available")
        with self._driver.session() as session:
            # Pass one mapping instead of **params so a Cypher parameter
            # named e.g. ``query`` or ``parameters`` cannot collide with the
            # driver's own argument names.
            result = session.run(cypher, params)
            # Materialise inside the ``with`` block, while the session is open.
            return [dict(record) for record in result]

    def create_node(self, labels: List[str], props: Dict[str, Any]) -> Dict[str, Any]:
        """Create a node and return the first result row (``{"id": ...}``).

        Args:
            labels: labels to attach; may be empty for an unlabelled node.
            props: properties to set on the node.

        Returns:
            The first row returned by the query, or ``{}`` if there is none.

        Raises:
            ValueError: if a label is not a non-empty string.
        """
        # Emit ":label" only for labels that exist, so an empty list yields
        # "(n $props)" rather than the invalid "(n: $props)".
        label_clause = "".join(":" + self._quote_identifier(label) for label in labels or ())
        cypher = f"CREATE (n{label_clause} $props) RETURN id(n) as id"
        rows = self.run(cypher, props=props or {})
        return rows[0] if rows else {}

    def create_relationship(self, a_id: int, b_id: int, rel_type: str, props: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Create a relationship ``(a)-[rel_type]->(b)`` between two node ids.

        Args:
            a_id: internal id of the start node.
            b_id: internal id of the end node.
            rel_type: relationship type.
            props: optional relationship properties.

        Returns:
            The first row returned by the query, or ``{}`` if there is none
            (for example when either node id does not match).

        Raises:
            ValueError: if ``rel_type`` is not a non-empty string.
        """
        rel = self._quote_identifier(rel_type)
        cypher = (
            "MATCH (a),(b) WHERE id(a)=$aid AND id(b)=$bid "
            f"CREATE (a)-[r:{rel} $props]->(b) RETURN id(r) as id"
        )
        rows = self.run(cypher, aid=a_id, bid=b_id, props=props or {})
        return rows[0] if rows else {}