GodSpeed / graph_store /reader.py
AdithyaVardan's picture
Fix defunct Neo4j connection: add keep-alive, liveness check, retry on SessionExpired
cf1b61a
from __future__ import annotations
import logging
from neo4j import AsyncDriver
from graph_store.config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cypher — traversal queries
# ---------------------------------------------------------------------------
_TRAVERSE_INCIDENT_VIA_LIBRARY = """
MATCH (i:Incident {incident_id: $incident_id})
-[:CAUSED_BY]->(s:Service)
-[:DEPENDS_ON]->(l:Library)
<-[:REFERENCES]-(c:Chunk {team_id: $team_id})
RETURN DISTINCT c.text AS text
"""
_TRAVERSE_INCIDENT_VIA_MENTIONS = """
MATCH (i:Incident {incident_id: $incident_id})
-[:CAUSED_BY]->(s:Service)
<-[:MENTIONS]-(c:Chunk {team_id: $team_id})
RETURN DISTINCT c.text AS text
"""
_TRAVERSE_INCIDENT_VIA_DOCS = """
MATCH (i:Incident {incident_id: $incident_id})
-[:CAUSED_BY]->(s:Service)
<-[:DOCUMENTS]-(d:Document)
-[:HAS_CHUNK]->(c:Chunk {team_id: $team_id})
RETURN DISTINCT c.text AS text
"""
_TRAVERSE_SERVICE_MENTIONS = """
MATCH (s:Service {name: $service_name, team_id: $team_id})
<-[:MENTIONS]-(c:Chunk {team_id: $team_id})
RETURN DISTINCT c.text AS text
"""
_TRAVERSE_SERVICE_VIA_LIBRARY = """
MATCH (s:Service {name: $service_name, team_id: $team_id})
-[:DEPENDS_ON]->(l:Library)
<-[:REFERENCES]-(c:Chunk {team_id: $team_id})
RETURN DISTINCT c.text AS text
"""
_TRAVERSE_SERVICE_VIA_DOCS = """
MATCH (s:Service {name: $service_name, team_id: $team_id})
<-[:DOCUMENTS]-(d:Document)
-[:HAS_CHUNK]->(c:Chunk {team_id: $team_id})
RETURN DISTINCT c.text AS text
"""
_FIND_LIBRARY_CHUNKS = """
MATCH (l:Library {name: $library_name})
<-[:REFERENCES]-(c:Chunk {team_id: $team_id})
RETURN c.text AS text
ORDER BY c.chunk_index ASC
"""
_FIND_RELATED_CHUNKS = """
MATCH (start:Chunk {chunk_id: $chunk_id})-[:MENTIONS|REFERENCES]->(pivot)
WITH pivot
MATCH (c:Chunk {team_id: $team_id})-[:MENTIONS|REFERENCES]->(pivot)
WHERE c.chunk_id <> $chunk_id
RETURN DISTINCT c.text AS text
LIMIT 20
"""
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _run_query(driver: AsyncDriver, cypher: str, **params) -> list[str]:
from neo4j.exceptions import SessionExpired
try:
async with driver.session(database=settings.neo4j_database) as session:
result = await session.run(cypher, **params)
records = await result.data()
return [r["text"] for r in records if r.get("text")]
except SessionExpired:
logger.warning("reader: SessionExpired — resetting driver and retrying")
from graph_store.writer import close_driver, get_driver
await close_driver()
driver = get_driver()
async with driver.session(database=settings.neo4j_database) as session:
result = await session.run(cypher, **params)
records = await result.data()
return [r["text"] for r in records if r.get("text")]
async def _union_queries(driver: AsyncDriver, queries: list[tuple[str, dict]]) -> list[str]:
seen: set[str] = set()
texts: list[str] = []
for cypher, params in queries:
for text in await _run_query(driver, cypher, **params):
if text not in seen:
seen.add(text)
texts.append(text)
return texts
# ---------------------------------------------------------------------------
# Public read API
# ---------------------------------------------------------------------------
async def traverse_from_incident(
incident_id: str, team_id: str, driver: AsyncDriver
) -> list[str]:
return await _union_queries(
driver,
[
(_TRAVERSE_INCIDENT_VIA_LIBRARY, {"incident_id": incident_id, "team_id": team_id}),
(_TRAVERSE_INCIDENT_VIA_MENTIONS, {"incident_id": incident_id, "team_id": team_id}),
(_TRAVERSE_INCIDENT_VIA_DOCS, {"incident_id": incident_id, "team_id": team_id}),
],
)
async def traverse_from_service(
service_name: str, team_id: str, driver: AsyncDriver
) -> list[str]:
return await _union_queries(
driver,
[
(_TRAVERSE_SERVICE_MENTIONS, {"service_name": service_name, "team_id": team_id}),
(_TRAVERSE_SERVICE_VIA_LIBRARY, {"service_name": service_name, "team_id": team_id}),
(_TRAVERSE_SERVICE_VIA_DOCS, {"service_name": service_name, "team_id": team_id}),
],
)
async def find_library_chunks(
library_name: str, team_id: str, driver: AsyncDriver
) -> list[str]:
return await _run_query(
driver, _FIND_LIBRARY_CHUNKS, library_name=library_name, team_id=team_id
)
async def find_related_chunks(
chunk_id: str, team_id: str, driver: AsyncDriver
) -> list[str]:
return await _run_query(
driver, _FIND_RELATED_CHUNKS, chunk_id=chunk_id, team_id=team_id
)