GodSpeed / src /analytics /router.py
Ananth Shyam
fix: enhance admin functionality and error handling in ingestion and analytics processes
e646563
"""Analytics API — aggregates from Redis query events and Neo4j graph data."""
from __future__ import annotations
import csv
import io
import json
from datetime import datetime, timedelta
from typing import Literal
import redis.asyncio as aioredis
from fastapi import APIRouter, Query
from fastapi.responses import StreamingResponse
from src.config import settings
from src.utils.logger import get_logger
logger = get_logger(__name__)
router = APIRouter(prefix="/api/analytics", tags=["analytics"])
DateRange = Literal["7d", "30d", "90d", "all"]
REDIS_QUERY_KEY = "gs:queries" # lpush list; each item is a JSON query event
async def _redis() -> aioredis.Redis:
return aioredis.from_url(settings.redis_url, decode_responses=True)
def _cutoff(date_range: DateRange) -> datetime | None:
if date_range == "all":
return None
days = {"7d": 7, "30d": 30, "90d": 90}[date_range]
return datetime.utcnow() - timedelta(days=days)
async def _load_events(date_range: DateRange) -> list[dict]:
"""Return query events from Redis. Returns [] if Redis is unavailable."""
r = await _redis()
try:
raw_list = await r.lrange(REDIS_QUERY_KEY, 0, 9999)
except Exception as exc:
logger.warning("analytics_redis_read_failed", extra={"error": str(exc)})
return []
finally:
await r.aclose()
cutoff = _cutoff(date_range)
events = []
for raw in raw_list:
try:
ev = json.loads(raw)
if cutoff:
ts = datetime.fromisoformat(ev.get("created_at", "1970-01-01T00:00:00"))
if ts < cutoff:
continue
events.append(ev)
except Exception:
continue
return events
# ---------------------------------------------------------------------------
# GET /api/analytics/queries
# ---------------------------------------------------------------------------
@router.get("/queries")
async def get_queries(date_range: DateRange = Query(default="30d")) -> dict:
events = await _load_events(date_range)
if not events:
return {
"query_count": 0,
"unique_users": 0,
"avg_response_time_ms": 0,
"success_rate": 0.0,
"trend": {"data": []},
}
total = len(events)
successful = sum(1 for e in events if e.get("success", True))
durations = [e["duration_ms"] for e in events if "duration_ms" in e]
avg_duration = int(sum(durations) / len(durations)) if durations else 0
users = {e.get("team_id", "unknown") for e in events}
daily: dict[str, int] = {}
for ev in events:
day = ev.get("created_at", "")[:10]
if day:
daily[day] = daily.get(day, 0) + 1
return {
"query_count": total,
"unique_users": len(users),
"avg_response_time_ms": avg_duration,
"success_rate": round(successful / total, 3),
"trend": {"data": [{"date": d, "count": c} for d, c in sorted(daily.items())]},
}
# ---------------------------------------------------------------------------
# GET /api/analytics/topics
# ---------------------------------------------------------------------------
@router.get("/topics")
async def get_topics(limit: int = Query(default=10, ge=1, le=50)) -> dict:
r = await _redis()
try:
results = await r.zrevrange("gs:topics", 0, limit - 1, withscores=True)
except Exception as exc:
logger.warning("topics_redis_read_failed", extra={"error": str(exc)})
return {"topics": []}
finally:
await r.aclose()
return {"topics": [{"topic": t, "count": int(c)} for t, c in results]}
# ---------------------------------------------------------------------------
# GET /api/analytics/knowledge-health
# ---------------------------------------------------------------------------
_EMPTY_DOMAINS = [
{"domain": label, "coverage": 0.0, "freshness": None, "accuracy": None, "score": 0.0}
for label in ["Service", "Library", "Incident", "Team"]
]
async def _compute_freshness() -> float | None:
"""Fraction of documents ingested/updated within the last 30 days.
Returns None if Supabase is unavailable or unconfigured."""
try:
from src.auth.db import _client as _sb_client
sb = _sb_client()
cutoff = (datetime.utcnow() - timedelta(days=30)).isoformat()
total_result = sb.table("documents").select("id", count="exact").execute()
recent_result = sb.table("documents").select("id", count="exact").gte("updated_at", cutoff).execute()
total = total_result.count or 0
recent = recent_result.count or 0
if total == 0:
return None
return round(recent / total, 2)
except Exception:
logger.warning("analytics: freshness query failed — returning None")
return None
@router.get("/knowledge-health")
async def get_knowledge_health() -> dict:
rows: list[dict] = []
try:
from graph_store.config import settings as neo4j_settings
from neo4j import AsyncGraphDatabase
driver = AsyncGraphDatabase.driver(
neo4j_settings.neo4j_uri,
auth=(neo4j_settings.neo4j_username, neo4j_settings.neo4j_password),
)
async with driver.session() as session:
result = await session.run(
"MATCH (n) RETURN labels(n)[0] AS label, count(n) AS cnt"
)
rows = await result.data()
await driver.close()
except Exception as exc:
logger.warning("knowledge_health_neo4j_error", extra={"error": str(exc)})
counts: dict[str, int] = {r["label"]: r["cnt"] for r in rows if r.get("label")}
total_nodes = sum(counts.values()) or 1
freshness = await _compute_freshness()
def _score(label: str) -> dict:
cnt = counts.get(label, 0)
coverage = min(1.0, cnt / max(total_nodes * 0.25, 1))
parts = [coverage]
if freshness is not None:
parts.append(freshness)
return {
"domain": label,
"coverage": round(coverage, 2),
"freshness": freshness,
"accuracy": None,
"score": round(sum(parts) / len(parts), 2),
}
domains = [_score(lbl) for lbl in ["Service", "Library", "Incident", "Team"]]
if not any(d["coverage"] > 0 for d in domains):
domains = _EMPTY_DOMAINS
overall = round(sum(d["score"] for d in domains) / len(domains), 2)
return {"overall_score": overall, "domains": domains}
# ---------------------------------------------------------------------------
# GET /api/analytics/dependencies
# ---------------------------------------------------------------------------
@router.get("/dependencies")
async def get_dependencies() -> dict:
rows: list[dict] = []
try:
from graph_store.config import settings as neo4j_settings
from neo4j import AsyncGraphDatabase
driver = AsyncGraphDatabase.driver(
neo4j_settings.neo4j_uri,
auth=(neo4j_settings.neo4j_username, neo4j_settings.neo4j_password),
)
async with driver.session() as session:
result = await session.run(
"""
MATCH (n)
WHERE n:Service OR n:Library
RETURN labels(n)[0] AS type,
n.name AS name,
coalesce(n.version, '0.0.0') AS current_version,
coalesce(n.latest_version, '0.0.0') AS latest_version,
coalesce(n.breaking_change, false) AS breaking_change,
coalesce(n.team, 'unknown') AS team
LIMIT 100
"""
)
rows = await result.data()
await driver.close()
except Exception as exc:
logger.warning("dependencies_neo4j_error", extra={"error": str(exc)})
now = datetime.utcnow().isoformat()
deps = []
for r in rows:
name = r.get("name")
if not name:
continue # skip nodes without a name property
deps.append({
"name": str(name),
"type": (r.get("type") or "service").lower(),
"current_version": r.get("current_version", "0.0.0"),
"latest_version": r.get("latest_version", "0.0.0"),
"breaking_change": bool(r.get("breaking_change", False)),
"teams": [str(r.get("team", "unknown"))],
"last_checked": now,
})
return {"dependencies": deps}
# ---------------------------------------------------------------------------
# GET /api/analytics/coverage-gaps
# ---------------------------------------------------------------------------
@router.get("/coverage-gaps")
async def get_coverage_gaps() -> dict:
"""Least-documented entities per label — shows where ingestion has the highest impact."""
rows: list[dict] = []
try:
from graph_store.writer import get_driver
driver = get_driver()
async with driver.session() as session:
result = await session.run("""
MATCH (n)
WHERE n:Service OR n:Library OR n:Incident OR n:Team
OPTIONAL MATCH (n)<-[:MENTIONS|REFERENCES]-(c:Chunk)
RETURN labels(n)[0] AS label,
n.name AS name,
count(c) AS mention_count
ORDER BY mention_count ASC, label ASC
LIMIT 60
""")
rows = await result.data()
except Exception as exc:
logger.warning("coverage_gaps_neo4j_error", extra={"error": str(exc)})
gaps: dict[str, list[dict]] = {"Service": [], "Library": [], "Incident": [], "Team": []}
for r in rows:
label = r.get("label")
name = r.get("name")
if label in gaps and name:
gaps[label].append({"name": name, "mention_count": r.get("mention_count", 0)})
return {"gaps": gaps}
# ---------------------------------------------------------------------------
# GET /api/analytics/escalations
# ---------------------------------------------------------------------------
@router.get("/escalations")
async def get_escalations(limit: int = Query(default=50, ge=1, le=200)) -> dict:
r = await _redis()
try:
raw_list = await r.lrange("gs:escalations", 0, limit - 1)
total = await r.llen("gs:escalations")
except Exception as exc:
logger.warning("escalations_redis_read_failed", extra={"error": str(exc)})
return {"escalations": [], "total": 0}
finally:
await r.aclose()
escalations = []
for raw in raw_list:
try:
escalations.append(json.loads(raw))
except Exception:
continue
return {"escalations": escalations, "total": total}
# ---------------------------------------------------------------------------
# GET /api/analytics/export
# ---------------------------------------------------------------------------
@router.get("/export")
async def export_analytics(
scope: str = Query(default="full"),
format: str = Query(default="csv"),
date_range: DateRange = Query(default="30d"),
) -> StreamingResponse:
events = await _load_events(date_range)
if format == "csv":
buf = io.StringIO()
writer = csv.DictWriter(
buf,
fieldnames=["created_at", "query", "team_id", "success", "duration_ms"],
)
writer.writeheader()
for ev in events:
writer.writerow({
"created_at": ev.get("created_at", ""),
"query": ev.get("query", ""),
"team_id": ev.get("team_id", ""),
"success": ev.get("success", True),
"duration_ms": ev.get("duration_ms", 0),
})
content = buf.getvalue().encode("utf-8")
media = "text/csv"
suffix = "csv"
else:
# PDF renderer (e.g. weasyprint) not yet wired — return CSV fallback
content = b"PDF export not yet implemented. Use format=csv.\n"
media = "text/plain"
suffix = "txt"
filename = f"godspeed-{scope}-{date_range}.{suffix}"
return StreamingResponse(
iter([content]),
media_type=media,
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)