Ananth Shyam
fix: enhance admin functionality and error handling in ingestion and analytics processes
e646563
from __future__ import annotations
from src.utils.logger import Timer, get_logger as _get_logger
import logging
from neo4j import AsyncGraphDatabase
from fastapi import APIRouter, HTTPException
from graph_store.config import settings
from graph_store.models import GraphIngestRequest, GraphTraverseRequest
logger = _get_logger(__name__)
router = APIRouter(prefix="/graph", tags=["graph"])
def _fresh_driver():
return AsyncGraphDatabase.driver(
settings.neo4j_uri,
auth=(settings.neo4j_username, settings.neo4j_password),
max_connection_lifetime=300,
connection_acquisition_timeout=60,
keep_alive=True,
)
@router.get("/nodes")
async def graph_nodes(limit: int = 50) -> dict:
driver = _fresh_driver()
with Timer() as t:
try:
async with driver.session(database=settings.neo4j_database) as session:
result = await session.run(
"MATCH (n) WHERE NOT n:Chunk AND NOT n:Document RETURN labels(n)[0] AS label, n.name AS name LIMIT $limit",
limit=limit,
)
records = await result.data()
finally:
await driver.close()
logger.info("graph_nodes", extra={"count": len(records), "duration_ms": t.ms})
return {"count": len(records), "nodes": records}
@router.post("/ingest")
async def graph_ingest(request: GraphIngestRequest) -> dict:
from ingestion.storage.supabase_store import get_client
from graph_store.extractor import extract_batch
from graph_store.writer import ensure_indexes, upsert_chunk
sb = get_client()
try:
result = (
sb.table("chunks")
.select("chunk_id, doc_id, text, source, source_type, team_id, chunk_index")
.in_("chunk_id", request.chunk_ids)
.eq("team_id", request.team_id)
.execute()
)
rows = result.data or []
except Exception:
logger.exception("graph_ingest_supabase_error", extra={"chunk_ids": request.chunk_ids[:5]})
raise HTTPException(status_code=502, detail="Failed to fetch chunks from Supabase")
if not rows:
return {"ingested": 0, "total": 0, "failed": 0}
texts = [r["text"] for r in rows]
# Run Gemini extraction first (can take minutes) — driver created AFTER this
extractions = await extract_batch(texts)
class _ChunkProxy:
def __init__(self, row: dict) -> None:
self.chunk_id = row["chunk_id"]
self.doc_id = row["doc_id"]
self.text = row["text"]
self.source = row["source"]
self.source_type = row["source_type"]
self.team_id = row["team_id"]
self.chunk_index = row["chunk_index"]
# Fresh driver opened AFTER extraction so the connection isn't idle during Gemini calls
driver = _fresh_driver()
try:
await ensure_indexes(driver)
ingested = 0
for row, extraction in zip(rows, extractions):
try:
await upsert_chunk(_ChunkProxy(row), extraction, driver)
ingested += 1
except Exception:
logger.exception("graph/ingest: upsert failed for chunk_id=%s", row["chunk_id"])
finally:
await driver.close()
total = len(rows)
failed = total - ingested
logger.info("graph_ingest_done", extra={"ingested": ingested, "total": total, "failed": failed})
return {"ingested": ingested, "total": total, "failed": failed}
@router.get("/traverse")
async def graph_traverse(type: str, name: str, team_id: str) -> dict:
from graph_store.reader import (
find_library_chunks,
traverse_from_incident,
traverse_from_service,
)
driver = _fresh_driver()
try:
if type == "incident":
texts = await traverse_from_incident(name, team_id, driver)
elif type == "service":
texts = await traverse_from_service(name, team_id, driver)
elif type == "library":
texts = await find_library_chunks(name, team_id, driver)
else:
raise HTTPException(status_code=400, detail="type must be incident, service, or library")
finally:
await driver.close()
return {"type": type, "name": name, "team_id": team_id, "chunks": texts}