AdithyaVardan commited on
Commit
cf1b61a
·
1 Parent(s): 54f37d0

Fix defunct Neo4j connection: add keep-alive, liveness check, retry on SessionExpired

Browse files
graph_store/api.py CHANGED
@@ -11,6 +11,20 @@ logger = logging.getLogger(__name__)
11
  router = APIRouter(prefix="/graph", tags=["graph"])
12
 
13
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
  def _get_driver():
15
  from graph_store.writer import get_driver
16
  return get_driver()
 
11
  router = APIRouter(prefix="/graph", tags=["graph"])
12
 
13
 
14
+ @router.get("/nodes")
15
+ async def graph_nodes(limit: int = 50) -> dict:
16
+ from graph_store.writer import get_driver
17
+ from graph_store.config import settings
18
+ driver = get_driver()
19
+ async with driver.session(database=settings.neo4j_database) as session:
20
+ result = await session.run(
21
+ "MATCH (n) WHERE NOT n:Chunk AND NOT n:Document RETURN labels(n)[0] AS label, n.name AS name LIMIT $limit",
22
+ limit=limit,
23
+ )
24
+ records = await result.data()
25
+ return {"count": len(records), "nodes": records}
26
+
27
+
28
  def _get_driver():
29
  from graph_store.writer import get_driver
30
  return get_driver()
graph_store/reader.py CHANGED
@@ -76,10 +76,21 @@ LIMIT 20
76
  # ---------------------------------------------------------------------------
77
 
78
  async def _run_query(driver: AsyncDriver, cypher: str, **params) -> list[str]:
79
- async with driver.session(database=settings.neo4j_database) as session:
80
- result = await session.run(cypher, **params)
81
- records = await result.data()
82
- return [r["text"] for r in records if r.get("text")]
 
 
 
 
 
 
 
 
 
 
 
83
 
84
 
85
  async def _union_queries(driver: AsyncDriver, queries: list[tuple[str, dict]]) -> list[str]:
 
76
  # ---------------------------------------------------------------------------
77
 
78
  async def _run_query(driver: AsyncDriver, cypher: str, **params) -> list[str]:
79
+ from neo4j.exceptions import SessionExpired
80
+ try:
81
+ async with driver.session(database=settings.neo4j_database) as session:
82
+ result = await session.run(cypher, **params)
83
+ records = await result.data()
84
+ return [r["text"] for r in records if r.get("text")]
85
+ except SessionExpired:
86
+ logger.warning("reader: SessionExpired — resetting driver and retrying")
87
+ from graph_store.writer import close_driver, get_driver
88
+ await close_driver()
89
+ driver = get_driver()
90
+ async with driver.session(database=settings.neo4j_database) as session:
91
+ result = await session.run(cypher, **params)
92
+ records = await result.data()
93
+ return [r["text"] for r in records if r.get("text")]
94
 
95
 
96
  async def _union_queries(driver: AsyncDriver, queries: list[tuple[str, dict]]) -> list[str]:
graph_store/stream.py CHANGED
@@ -34,6 +34,10 @@ async def graph_stream(websocket: WebSocket):
34
  driver = AsyncGraphDatabase.driver(
35
  settings.neo4j_uri,
36
  auth=(settings.neo4j_username, settings.neo4j_password),
 
 
 
 
37
  )
38
  try:
39
  async with driver.session(database=settings.neo4j_database) as session:
 
34
  driver = AsyncGraphDatabase.driver(
35
  settings.neo4j_uri,
36
  auth=(settings.neo4j_username, settings.neo4j_password),
37
+ max_connection_lifetime=1800,
38
+ connection_acquisition_timeout=30,
39
+ keep_alive=True,
40
+ liveness_check_timeout=10,
41
  )
42
  try:
43
  async with driver.session(database=settings.neo4j_database) as session:
graph_store/writer.py CHANGED
@@ -107,6 +107,10 @@ def get_driver() -> AsyncDriver:
107
  _driver = AsyncGraphDatabase.driver(
108
  settings.neo4j_uri,
109
  auth=(settings.neo4j_username, settings.neo4j_password),
 
 
 
 
110
  )
111
  return _driver
112
 
 
107
  _driver = AsyncGraphDatabase.driver(
108
  settings.neo4j_uri,
109
  auth=(settings.neo4j_username, settings.neo4j_password),
110
+ max_connection_lifetime=1800,
111
+ connection_acquisition_timeout=30,
112
+ keep_alive=True,
113
+ liveness_check_timeout=10,
114
  )
115
  return _driver
116