# IntegraChat — backend/workers/analytics_worker.py
# (commit aa63765: "feat: add knowledge base management and analytics dashboard")
"""
backend/workers/analytics_worker.py
Analytics & evaluation background tasks:
- compute_daily_metrics: aggregates logs into per-tenant daily metrics
- compute_rag_quality: synthetic RAG evaluation (precision@k style)
- export_metrics_to_supabase: writes aggregated metrics to a supabase analytics table
"""
import os
import logging
import time
from typing import Dict, Any, List

# Celery is optional here: if the worker config (or Celery itself) cannot be
# imported, celery_app is set to None and task_decorator below falls back to
# plain synchronous functions.
try:
    from backend.workers.celeryconfig import celery_app
except Exception:
    celery_app = None

# Module-level logger; level name comes from the LOG_LEVEL env var (default "INFO").
logger = logging.getLogger("analytics_worker")
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))
# Placeholder: hook to your logging/event store. Replace with concrete DB queries
def fetch_raw_logs(since_ts: int = 0) -> List[Dict[str, Any]]:
    """
    Pull raw request events from the event store (Postgres / Supabase table).

    This is a stub: with no database wired up it returns three hard-coded
    sample events whose timestamps are fixed offsets from ``since_ts``.
    """
    # In prod: query your events table for entries after since_ts
    samples = [
        ("demo", "rag", 120, 12, 10),
        ("demo", "web", 70, 0, 20),
        ("demo", "llm", 400, 150, 30),
    ]
    return [
        {"tenant_id": tenant, "tool": tool, "latency_ms": latency,
         "tokens": tokens, "ts": since_ts + offset}
        for tenant, tool, latency, tokens, offset in samples
    ]
def aggregate_logs(logs: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Roll up raw events into per-tenant totals.

    For each tenant: request count, token total, summed latency, and a
    per-tool breakdown of count and summed latency. Events missing a field
    fall back to "unknown" (tenant/tool) or 0 (numeric fields).

    Returns a dict keyed by tenant_id.
    """
    summary: Dict[str, Dict[str, Any]] = {}
    for event in logs:
        tenant = event.get("tenant_id", "unknown")
        if tenant not in summary:
            summary[tenant] = {"requests": 0, "tokens": 0, "tools": {}, "total_latency_ms": 0}
        bucket = summary[tenant]
        latency = event.get("latency_ms", 0)
        bucket["requests"] += 1
        bucket["tokens"] += event.get("tokens", 0)
        bucket["total_latency_ms"] += latency
        tool = event.get("tool", "unknown")
        if tool not in bucket["tools"]:
            bucket["tools"][tool] = {"count": 0, "latency_ms": 0}
        tool_stats = bucket["tools"][tool]
        tool_stats["count"] += 1
        tool_stats["latency_ms"] += latency
    return summary
def write_metrics_supabase(metrics: Dict[str, Dict[str, Any]]):
    """
    Write aggregated per-tenant metrics to the Supabase analytics table.

    Falls back to logging the metrics when Supabase is not configured —
    including when the ingestion worker module (or its client) cannot be
    imported at all, which previously raised instead of degrading.

    Args:
        metrics: output of aggregate_logs(), keyed by tenant_id.

    Returns:
        {"status": "logged", "metrics_count": int} when falling back to logs,
        {"status": "ok", "count": int, "result": ...} on a successful upsert,
        or {"status": "error", "error": str} when the upsert fails.
    """
    # Deferred import so this module loads even without the ingestion worker;
    # any import failure is treated the same as "Supabase not configured".
    try:
        from backend.workers.ingestion_worker import SUPABASE
    except Exception:
        SUPABASE = None
    if SUPABASE is None:
        logger.info("Supabase not configured; metrics:\n%s", metrics)
        return {"status": "logged", "metrics_count": len(metrics)}
    table = os.getenv("SUPABASE_ANALYTICS_TABLE", "analytics_daily")
    # One date string for the whole batch (UTC day of the run).
    date_str = time.strftime("%Y-%m-%d", time.gmtime(int(time.time())))
    rows = [
        {
            "tenant_id": tenant_id,
            # .get defaults keep partial aggregates from raising KeyError.
            "date": date_str,
            "requests": data.get("requests", 0),
            "tokens": data.get("tokens", 0),
            "total_latency_ms": data.get("total_latency_ms", 0),
            "tools": data.get("tools", {}),
        }
        for tenant_id, data in metrics.items()
    ]
    try:
        res = SUPABASE.table(table).upsert(rows).execute()
        logger.info("Wrote %d metrics rows to supabase table %s", len(rows), table)
        return {"status": "ok", "count": len(rows), "result": res}
    except Exception as e:
        logger.exception("Failed to write metrics to supabase: %s", e)
        return {"status": "error", "error": str(e)}
# Celery decorator shim: registers tasks with Celery when available,
# otherwise leaves the function callable as-is so tasks run inline.
def task_decorator(func):
    """
    Register ``func`` as a Celery task when ``celery_app`` is configured.

    When Celery is unavailable (``celery_app is None``), return ``func``
    unchanged instead of wrapping it in an anonymous passthrough — the call
    behavior is identical, but the function's name, docstring, and signature
    are preserved for logging and introspection.
    """
    if celery_app is not None:
        return celery_app.task(func)
    return func
@task_decorator
def compute_daily_metrics(window_seconds: int = 86400):
    """
    Aggregate raw event logs from the last ``window_seconds`` seconds into
    per-tenant metrics and persist them via write_metrics_supabase().

    Returns the status dict produced by write_metrics_supabase().
    """
    logger.info("compute_daily_metrics started; window=%s", window_seconds)
    cutoff = int(time.time()) - window_seconds
    raw = fetch_raw_logs(since_ts=cutoff)
    result = write_metrics_supabase(aggregate_logs(raw))
    logger.info("compute_daily_metrics finished; res=%s", result)
    return result
@task_decorator
def compute_rag_quality(sample_size: int = 20, top_k: int = 3):
    """
    Synthetic RAG evaluation task.

    Samples seed queries, runs each through the RAG MCP client, and scores a
    simple precision@k: a query counts as a hit when any of its top ``top_k``
    result texts contains the substring "policy" (case-insensitive heuristic).

    Returns {"precision_at_k": float, "sample": int} on success, or
    {"error": str} when setup (e.g. the RAG client import) fails.
    """
    logger.info("compute_rag_quality started; sample_size=%d top_k=%d", sample_size, top_k)
    # In production: sample from DB. Here we simulate a simple check using mock data or minimal queries.
    try:
        # Fake sample of seed queries, repeated to reach roughly sample_size.
        seed_queries = ["What is our HR policy?", "What is refund policy?"]
        sample_queries = seed_queries * (sample_size // 2)
        # Try to call rag MCP if available.
        from backend.api.mcp_clients.rag_client import RagClient
        rag = RagClient()
        outcomes = []
        for query in sample_queries:
            hit = False
            try:
                response = rag.search({"tenant_id": "demo", "query": query})
                # Heuristic: any of the top-k result texts mentioning "policy" counts.
                top_texts = [item.get("text", "") for item in response.get("results", [])[:top_k]]
                hit = any("policy" in text.lower() for text in top_texts)
            except Exception:
                hit = False
            outcomes.append({"query": query, "success": hit})
        precision = sum(1 for o in outcomes if o["success"]) / max(1, len(outcomes))
        logger.info("compute_rag_quality precision=%.3f", precision)
        return {"precision_at_k": precision, "sample": len(outcomes)}
    except Exception as e:
        logger.exception("compute_rag_quality failed: %s", e)
        return {"error": str(e)}