Spaces:
Sleeping
Sleeping
Ananth Shyam
fix: enhance admin functionality and error handling in ingestion and analytics processes
e646563 | from __future__ import annotations | |
| from src.utils.logger import Timer, get_logger as _get_logger | |
| import logging | |
| from neo4j import AsyncGraphDatabase | |
| from fastapi import APIRouter, HTTPException | |
| from graph_store.config import settings | |
| from graph_store.models import GraphIngestRequest, GraphTraverseRequest | |
# Module-level logger, obtained from the project's logging helper and keyed by this module's name.
| logger = _get_logger(__name__) | |
# Router created with the "/graph" URL prefix and the "graph" tag.
# NOTE(review): no route decorators are visible in this view — confirm how the handlers below are registered.
| router = APIRouter(prefix="/graph", tags=["graph"]) | |
def _fresh_driver():
    """Build a new async Neo4j driver from the configured connection settings.

    Every call constructs a separate driver object; the handlers in this
    module create one per request and close it themselves when finished.

    Returns:
        The driver returned by ``AsyncGraphDatabase.driver`` for
        ``settings.neo4j_uri`` with username/password authentication.
    """
    credentials = (settings.neo4j_username, settings.neo4j_password)
    # Connection options passed straight through to the Neo4j driver.
    connection_options = {
        "max_connection_lifetime": 300,
        "connection_acquisition_timeout": 60,
        "keep_alive": True,
    }
    return AsyncGraphDatabase.driver(
        settings.neo4j_uri,
        auth=credentials,
        **connection_options,
    )
async def graph_nodes(limit: int = 50) -> dict:
    """List graph nodes that carry neither the ``Chunk`` nor ``Document`` label.

    Args:
        limit: Value bound to the Cypher ``$limit`` parameter (default 50).

    Returns:
        ``{"count": <number of records>, "nodes": <list of records>}`` where
        each record has the keys ``label`` (first label of the node) and
        ``name`` (the node's ``name`` property).
    """
    cypher = (
        "MATCH (n) WHERE NOT n:Chunk AND NOT n:Document "
        "RETURN labels(n)[0] AS label, n.name AS name LIMIT $limit"
    )
    neo4j_driver = _fresh_driver()
    with Timer() as timer:
        try:
            async with neo4j_driver.session(database=settings.neo4j_database) as db_session:
                cursor = await db_session.run(cypher, limit=limit)
                nodes = await cursor.data()
        finally:
            # The per-request driver is always closed, even if the query fails.
            await neo4j_driver.close()
    node_count = len(nodes)
    logger.info("graph_nodes", extra={"count": node_count, "duration_ms": timer.ms})
    return {"count": node_count, "nodes": nodes}
| async def graph_ingest(request: GraphIngestRequest) -> dict: | |
| from ingestion.storage.supabase_store import get_client | |
| from graph_store.extractor import extract_batch | |
| from graph_store.writer import ensure_indexes, upsert_chunk | |
| sb = get_client() | |
| try: | |
| result = ( | |
| sb.table("chunks") | |
| .select("chunk_id, doc_id, text, source, source_type, team_id, chunk_index") | |
| .in_("chunk_id", request.chunk_ids) | |
| .eq("team_id", request.team_id) | |
| .execute() | |
| ) | |
| rows = result.data or [] | |
| except Exception: | |
| logger.exception("graph_ingest_supabase_error", extra={"chunk_ids": request.chunk_ids[:5]}) | |
| raise HTTPException(status_code=502, detail="Failed to fetch chunks from Supabase") | |
| if not rows: | |
| return {"ingested": 0, "total": 0, "failed": 0} | |
| texts = [r["text"] for r in rows] | |
| # Run Gemini extraction first (can take minutes) — driver created AFTER this | |
| extractions = await extract_batch(texts) | |
| class _ChunkProxy: | |
| def __init__(self, row: dict) -> None: | |
| self.chunk_id = row["chunk_id"] | |
| self.doc_id = row["doc_id"] | |
| self.text = row["text"] | |
| self.source = row["source"] | |
| self.source_type = row["source_type"] | |
| self.team_id = row["team_id"] | |
| self.chunk_index = row["chunk_index"] | |
| # Fresh driver opened AFTER extraction so the connection isn't idle during Gemini calls | |
| driver = _fresh_driver() | |
| try: | |
| await ensure_indexes(driver) | |
| ingested = 0 | |
| for row, extraction in zip(rows, extractions): | |
| try: | |
| await upsert_chunk(_ChunkProxy(row), extraction, driver) | |
| ingested += 1 | |
| except Exception: | |
| logger.exception("graph/ingest: upsert failed for chunk_id=%s", row["chunk_id"]) | |
| finally: | |
| await driver.close() | |
| total = len(rows) | |
| failed = total - ingested | |
| logger.info("graph_ingest_done", extra={"ingested": ingested, "total": total, "failed": failed}) | |
| return {"ingested": ingested, "total": total, "failed": failed} | |
async def graph_traverse(type: str, name: str, team_id: str) -> dict:
    """Run one of the graph reader traversals and return the chunks it yields.

    Args:
        type: Which traversal to run: ``"incident"``, ``"service"``, or
            ``"library"``. (The name shadows the builtin but is part of the
            handler's public interface, so it is kept.)
        name: Name passed to the selected traversal.
        team_id: Team identifier passed to the selected traversal.

    Returns:
        ``{"type": type, "name": name, "team_id": team_id, "chunks": <result>}``
        where ``chunks`` is whatever the selected reader function returned.

    Raises:
        HTTPException: 400 when ``type`` is not one of the supported values.
    """
    from graph_store.reader import (
        find_library_chunks,
        traverse_from_incident,
        traverse_from_service,
    )

    traversals = {
        "incident": traverse_from_incident,
        "service": traverse_from_service,
        "library": find_library_chunks,
    }
    traverse = traversals.get(type)
    # Reject an unsupported type before a driver is created, so a bad request
    # never allocates (and then has to close) a Neo4j driver.
    if traverse is None:
        raise HTTPException(status_code=400, detail="type must be incident, service, or library")

    driver = _fresh_driver()
    try:
        texts = await traverse(name, team_id, driver)
    finally:
        await driver.close()
    return {"type": type, "name": name, "team_id": team_id, "chunks": texts}