Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import logging | |
| from neo4j import AsyncDriver | |
| from graph_store.config import settings | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Cypher — traversal queries | |
| # --------------------------------------------------------------------------- | |
| _TRAVERSE_INCIDENT_VIA_LIBRARY = """ | |
| MATCH (i:Incident {incident_id: $incident_id}) | |
| -[:CAUSED_BY]->(s:Service) | |
| -[:DEPENDS_ON]->(l:Library) | |
| <-[:REFERENCES]-(c:Chunk {team_id: $team_id}) | |
| RETURN DISTINCT c.text AS text | |
| """ | |
| _TRAVERSE_INCIDENT_VIA_MENTIONS = """ | |
| MATCH (i:Incident {incident_id: $incident_id}) | |
| -[:CAUSED_BY]->(s:Service) | |
| <-[:MENTIONS]-(c:Chunk {team_id: $team_id}) | |
| RETURN DISTINCT c.text AS text | |
| """ | |
| _TRAVERSE_INCIDENT_VIA_DOCS = """ | |
| MATCH (i:Incident {incident_id: $incident_id}) | |
| -[:CAUSED_BY]->(s:Service) | |
| <-[:DOCUMENTS]-(d:Document) | |
| -[:HAS_CHUNK]->(c:Chunk {team_id: $team_id}) | |
| RETURN DISTINCT c.text AS text | |
| """ | |
| _TRAVERSE_SERVICE_MENTIONS = """ | |
| MATCH (s:Service {name: $service_name, team_id: $team_id}) | |
| <-[:MENTIONS]-(c:Chunk {team_id: $team_id}) | |
| RETURN DISTINCT c.text AS text | |
| """ | |
| _TRAVERSE_SERVICE_VIA_LIBRARY = """ | |
| MATCH (s:Service {name: $service_name, team_id: $team_id}) | |
| -[:DEPENDS_ON]->(l:Library) | |
| <-[:REFERENCES]-(c:Chunk {team_id: $team_id}) | |
| RETURN DISTINCT c.text AS text | |
| """ | |
| _TRAVERSE_SERVICE_VIA_DOCS = """ | |
| MATCH (s:Service {name: $service_name, team_id: $team_id}) | |
| <-[:DOCUMENTS]-(d:Document) | |
| -[:HAS_CHUNK]->(c:Chunk {team_id: $team_id}) | |
| RETURN DISTINCT c.text AS text | |
| """ | |
| _FIND_LIBRARY_CHUNKS = """ | |
| MATCH (l:Library {name: $library_name}) | |
| <-[:REFERENCES]-(c:Chunk {team_id: $team_id}) | |
| RETURN c.text AS text | |
| ORDER BY c.chunk_index ASC | |
| """ | |
| _FIND_RELATED_CHUNKS = """ | |
| MATCH (start:Chunk {chunk_id: $chunk_id})-[:MENTIONS|REFERENCES]->(pivot) | |
| WITH pivot | |
| MATCH (c:Chunk {team_id: $team_id})-[:MENTIONS|REFERENCES]->(pivot) | |
| WHERE c.chunk_id <> $chunk_id | |
| RETURN DISTINCT c.text AS text | |
| LIMIT 20 | |
| """ | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| async def _run_query(driver: AsyncDriver, cypher: str, **params) -> list[str]: | |
| from neo4j.exceptions import SessionExpired | |
| try: | |
| async with driver.session(database=settings.neo4j_database) as session: | |
| result = await session.run(cypher, **params) | |
| records = await result.data() | |
| return [r["text"] for r in records if r.get("text")] | |
| except SessionExpired: | |
| logger.warning("reader: SessionExpired — resetting driver and retrying") | |
| from graph_store.writer import close_driver, get_driver | |
| await close_driver() | |
| driver = get_driver() | |
| async with driver.session(database=settings.neo4j_database) as session: | |
| result = await session.run(cypher, **params) | |
| records = await result.data() | |
| return [r["text"] for r in records if r.get("text")] | |
| async def _union_queries(driver: AsyncDriver, queries: list[tuple[str, dict]]) -> list[str]: | |
| seen: set[str] = set() | |
| texts: list[str] = [] | |
| for cypher, params in queries: | |
| for text in await _run_query(driver, cypher, **params): | |
| if text not in seen: | |
| seen.add(text) | |
| texts.append(text) | |
| return texts | |
| # --------------------------------------------------------------------------- | |
| # Public read API | |
| # --------------------------------------------------------------------------- | |
| async def traverse_from_incident( | |
| incident_id: str, team_id: str, driver: AsyncDriver | |
| ) -> list[str]: | |
| return await _union_queries( | |
| driver, | |
| [ | |
| (_TRAVERSE_INCIDENT_VIA_LIBRARY, {"incident_id": incident_id, "team_id": team_id}), | |
| (_TRAVERSE_INCIDENT_VIA_MENTIONS, {"incident_id": incident_id, "team_id": team_id}), | |
| (_TRAVERSE_INCIDENT_VIA_DOCS, {"incident_id": incident_id, "team_id": team_id}), | |
| ], | |
| ) | |
| async def traverse_from_service( | |
| service_name: str, team_id: str, driver: AsyncDriver | |
| ) -> list[str]: | |
| return await _union_queries( | |
| driver, | |
| [ | |
| (_TRAVERSE_SERVICE_MENTIONS, {"service_name": service_name, "team_id": team_id}), | |
| (_TRAVERSE_SERVICE_VIA_LIBRARY, {"service_name": service_name, "team_id": team_id}), | |
| (_TRAVERSE_SERVICE_VIA_DOCS, {"service_name": service_name, "team_id": team_id}), | |
| ], | |
| ) | |
| async def find_library_chunks( | |
| library_name: str, team_id: str, driver: AsyncDriver | |
| ) -> list[str]: | |
| return await _run_query( | |
| driver, _FIND_LIBRARY_CHUNKS, library_name=library_name, team_id=team_id | |
| ) | |
| async def find_related_chunks( | |
| chunk_id: str, team_id: str, driver: AsyncDriver | |
| ) -> list[str]: | |
| return await _run_query( | |
| driver, _FIND_RELATED_CHUNKS, chunk_id=chunk_id, team_id=team_id | |
| ) | |