Spaces:
Sleeping
Sleeping
Ananth Shyam
fix: enhance admin functionality and error handling in ingestion and analytics processes
e646563 | """Analytics API — aggregates from Redis query events and Neo4j graph data.""" | |
| from __future__ import annotations | |
| import csv | |
| import io | |
| import json | |
| from datetime import datetime, timedelta | |
| from typing import Literal | |
| import redis.asyncio as aioredis | |
| from fastapi import APIRouter, Query | |
| from fastapi.responses import StreamingResponse | |
| from src.config import settings | |
| from src.utils.logger import get_logger | |
| logger = get_logger(__name__) | |
| router = APIRouter(prefix="/api/analytics", tags=["analytics"]) | |
| DateRange = Literal["7d", "30d", "90d", "all"] | |
| REDIS_QUERY_KEY = "gs:queries" # lpush list; each item is a JSON query event | |
| async def _redis() -> aioredis.Redis: | |
| return aioredis.from_url(settings.redis_url, decode_responses=True) | |
| def _cutoff(date_range: DateRange) -> datetime | None: | |
| if date_range == "all": | |
| return None | |
| days = {"7d": 7, "30d": 30, "90d": 90}[date_range] | |
| return datetime.utcnow() - timedelta(days=days) | |
| async def _load_events(date_range: DateRange) -> list[dict]: | |
| """Return query events from Redis. Returns [] if Redis is unavailable.""" | |
| r = await _redis() | |
| try: | |
| raw_list = await r.lrange(REDIS_QUERY_KEY, 0, 9999) | |
| except Exception as exc: | |
| logger.warning("analytics_redis_read_failed", extra={"error": str(exc)}) | |
| return [] | |
| finally: | |
| await r.aclose() | |
| cutoff = _cutoff(date_range) | |
| events = [] | |
| for raw in raw_list: | |
| try: | |
| ev = json.loads(raw) | |
| if cutoff: | |
| ts = datetime.fromisoformat(ev.get("created_at", "1970-01-01T00:00:00")) | |
| if ts < cutoff: | |
| continue | |
| events.append(ev) | |
| except Exception: | |
| continue | |
| return events | |
| # --------------------------------------------------------------------------- | |
| # GET /api/analytics/queries | |
| # --------------------------------------------------------------------------- | |
| async def get_queries(date_range: DateRange = Query(default="30d")) -> dict: | |
| events = await _load_events(date_range) | |
| if not events: | |
| return { | |
| "query_count": 0, | |
| "unique_users": 0, | |
| "avg_response_time_ms": 0, | |
| "success_rate": 0.0, | |
| "trend": {"data": []}, | |
| } | |
| total = len(events) | |
| successful = sum(1 for e in events if e.get("success", True)) | |
| durations = [e["duration_ms"] for e in events if "duration_ms" in e] | |
| avg_duration = int(sum(durations) / len(durations)) if durations else 0 | |
| users = {e.get("team_id", "unknown") for e in events} | |
| daily: dict[str, int] = {} | |
| for ev in events: | |
| day = ev.get("created_at", "")[:10] | |
| if day: | |
| daily[day] = daily.get(day, 0) + 1 | |
| return { | |
| "query_count": total, | |
| "unique_users": len(users), | |
| "avg_response_time_ms": avg_duration, | |
| "success_rate": round(successful / total, 3), | |
| "trend": {"data": [{"date": d, "count": c} for d, c in sorted(daily.items())]}, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # GET /api/analytics/topics | |
| # --------------------------------------------------------------------------- | |
| async def get_topics(limit: int = Query(default=10, ge=1, le=50)) -> dict: | |
| r = await _redis() | |
| try: | |
| results = await r.zrevrange("gs:topics", 0, limit - 1, withscores=True) | |
| except Exception as exc: | |
| logger.warning("topics_redis_read_failed", extra={"error": str(exc)}) | |
| return {"topics": []} | |
| finally: | |
| await r.aclose() | |
| return {"topics": [{"topic": t, "count": int(c)} for t, c in results]} | |
| # --------------------------------------------------------------------------- | |
| # GET /api/analytics/knowledge-health | |
| # --------------------------------------------------------------------------- | |
| _EMPTY_DOMAINS = [ | |
| {"domain": label, "coverage": 0.0, "freshness": None, "accuracy": None, "score": 0.0} | |
| for label in ["Service", "Library", "Incident", "Team"] | |
| ] | |
| async def _compute_freshness() -> float | None: | |
| """Fraction of documents ingested/updated within the last 30 days. | |
| Returns None if Supabase is unavailable or unconfigured.""" | |
| try: | |
| from src.auth.db import _client as _sb_client | |
| sb = _sb_client() | |
| cutoff = (datetime.utcnow() - timedelta(days=30)).isoformat() | |
| total_result = sb.table("documents").select("id", count="exact").execute() | |
| recent_result = sb.table("documents").select("id", count="exact").gte("updated_at", cutoff).execute() | |
| total = total_result.count or 0 | |
| recent = recent_result.count or 0 | |
| if total == 0: | |
| return None | |
| return round(recent / total, 2) | |
| except Exception: | |
| logger.warning("analytics: freshness query failed — returning None") | |
| return None | |
| async def get_knowledge_health() -> dict: | |
| rows: list[dict] = [] | |
| try: | |
| from graph_store.config import settings as neo4j_settings | |
| from neo4j import AsyncGraphDatabase | |
| driver = AsyncGraphDatabase.driver( | |
| neo4j_settings.neo4j_uri, | |
| auth=(neo4j_settings.neo4j_username, neo4j_settings.neo4j_password), | |
| ) | |
| async with driver.session() as session: | |
| result = await session.run( | |
| "MATCH (n) RETURN labels(n)[0] AS label, count(n) AS cnt" | |
| ) | |
| rows = await result.data() | |
| await driver.close() | |
| except Exception as exc: | |
| logger.warning("knowledge_health_neo4j_error", extra={"error": str(exc)}) | |
| counts: dict[str, int] = {r["label"]: r["cnt"] for r in rows if r.get("label")} | |
| total_nodes = sum(counts.values()) or 1 | |
| freshness = await _compute_freshness() | |
| def _score(label: str) -> dict: | |
| cnt = counts.get(label, 0) | |
| coverage = min(1.0, cnt / max(total_nodes * 0.25, 1)) | |
| parts = [coverage] | |
| if freshness is not None: | |
| parts.append(freshness) | |
| return { | |
| "domain": label, | |
| "coverage": round(coverage, 2), | |
| "freshness": freshness, | |
| "accuracy": None, | |
| "score": round(sum(parts) / len(parts), 2), | |
| } | |
| domains = [_score(lbl) for lbl in ["Service", "Library", "Incident", "Team"]] | |
| if not any(d["coverage"] > 0 for d in domains): | |
| domains = _EMPTY_DOMAINS | |
| overall = round(sum(d["score"] for d in domains) / len(domains), 2) | |
| return {"overall_score": overall, "domains": domains} | |
| # --------------------------------------------------------------------------- | |
| # GET /api/analytics/dependencies | |
| # --------------------------------------------------------------------------- | |
| async def get_dependencies() -> dict: | |
| rows: list[dict] = [] | |
| try: | |
| from graph_store.config import settings as neo4j_settings | |
| from neo4j import AsyncGraphDatabase | |
| driver = AsyncGraphDatabase.driver( | |
| neo4j_settings.neo4j_uri, | |
| auth=(neo4j_settings.neo4j_username, neo4j_settings.neo4j_password), | |
| ) | |
| async with driver.session() as session: | |
| result = await session.run( | |
| """ | |
| MATCH (n) | |
| WHERE n:Service OR n:Library | |
| RETURN labels(n)[0] AS type, | |
| n.name AS name, | |
| coalesce(n.version, '0.0.0') AS current_version, | |
| coalesce(n.latest_version, '0.0.0') AS latest_version, | |
| coalesce(n.breaking_change, false) AS breaking_change, | |
| coalesce(n.team, 'unknown') AS team | |
| LIMIT 100 | |
| """ | |
| ) | |
| rows = await result.data() | |
| await driver.close() | |
| except Exception as exc: | |
| logger.warning("dependencies_neo4j_error", extra={"error": str(exc)}) | |
| now = datetime.utcnow().isoformat() | |
| deps = [] | |
| for r in rows: | |
| name = r.get("name") | |
| if not name: | |
| continue # skip nodes without a name property | |
| deps.append({ | |
| "name": str(name), | |
| "type": (r.get("type") or "service").lower(), | |
| "current_version": r.get("current_version", "0.0.0"), | |
| "latest_version": r.get("latest_version", "0.0.0"), | |
| "breaking_change": bool(r.get("breaking_change", False)), | |
| "teams": [str(r.get("team", "unknown"))], | |
| "last_checked": now, | |
| }) | |
| return {"dependencies": deps} | |
| # --------------------------------------------------------------------------- | |
| # GET /api/analytics/coverage-gaps | |
| # --------------------------------------------------------------------------- | |
| async def get_coverage_gaps() -> dict: | |
| """Least-documented entities per label — shows where ingestion has the highest impact.""" | |
| rows: list[dict] = [] | |
| try: | |
| from graph_store.writer import get_driver | |
| driver = get_driver() | |
| async with driver.session() as session: | |
| result = await session.run(""" | |
| MATCH (n) | |
| WHERE n:Service OR n:Library OR n:Incident OR n:Team | |
| OPTIONAL MATCH (n)<-[:MENTIONS|REFERENCES]-(c:Chunk) | |
| RETURN labels(n)[0] AS label, | |
| n.name AS name, | |
| count(c) AS mention_count | |
| ORDER BY mention_count ASC, label ASC | |
| LIMIT 60 | |
| """) | |
| rows = await result.data() | |
| except Exception as exc: | |
| logger.warning("coverage_gaps_neo4j_error", extra={"error": str(exc)}) | |
| gaps: dict[str, list[dict]] = {"Service": [], "Library": [], "Incident": [], "Team": []} | |
| for r in rows: | |
| label = r.get("label") | |
| name = r.get("name") | |
| if label in gaps and name: | |
| gaps[label].append({"name": name, "mention_count": r.get("mention_count", 0)}) | |
| return {"gaps": gaps} | |
| # --------------------------------------------------------------------------- | |
| # GET /api/analytics/escalations | |
| # --------------------------------------------------------------------------- | |
| async def get_escalations(limit: int = Query(default=50, ge=1, le=200)) -> dict: | |
| r = await _redis() | |
| try: | |
| raw_list = await r.lrange("gs:escalations", 0, limit - 1) | |
| total = await r.llen("gs:escalations") | |
| except Exception as exc: | |
| logger.warning("escalations_redis_read_failed", extra={"error": str(exc)}) | |
| return {"escalations": [], "total": 0} | |
| finally: | |
| await r.aclose() | |
| escalations = [] | |
| for raw in raw_list: | |
| try: | |
| escalations.append(json.loads(raw)) | |
| except Exception: | |
| continue | |
| return {"escalations": escalations, "total": total} | |
| # --------------------------------------------------------------------------- | |
| # GET /api/analytics/export | |
| # --------------------------------------------------------------------------- | |
| async def export_analytics( | |
| scope: str = Query(default="full"), | |
| format: str = Query(default="csv"), | |
| date_range: DateRange = Query(default="30d"), | |
| ) -> StreamingResponse: | |
| events = await _load_events(date_range) | |
| if format == "csv": | |
| buf = io.StringIO() | |
| writer = csv.DictWriter( | |
| buf, | |
| fieldnames=["created_at", "query", "team_id", "success", "duration_ms"], | |
| ) | |
| writer.writeheader() | |
| for ev in events: | |
| writer.writerow({ | |
| "created_at": ev.get("created_at", ""), | |
| "query": ev.get("query", ""), | |
| "team_id": ev.get("team_id", ""), | |
| "success": ev.get("success", True), | |
| "duration_ms": ev.get("duration_ms", 0), | |
| }) | |
| content = buf.getvalue().encode("utf-8") | |
| media = "text/csv" | |
| suffix = "csv" | |
| else: | |
| # PDF renderer (e.g. weasyprint) not yet wired — return CSV fallback | |
| content = b"PDF export not yet implemented. Use format=csv.\n" | |
| media = "text/plain" | |
| suffix = "txt" | |
| filename = f"godspeed-{scope}-{date_range}.{suffix}" | |
| return StreamingResponse( | |
| iter([content]), | |
| media_type=media, | |
| headers={"Content-Disposition": f'attachment; filename="{filename}"'}, | |
| ) | |