"""
backend/workers/analytics_worker.py
Analytics & evaluation background tasks:
- compute_daily_metrics: aggregates logs into per-tenant daily metrics
- compute_rag_quality: synthetic RAG evaluation (precision@k style)
- export_metrics_to_supabase: writes aggregated metrics to a supabase analytics table
"""
import os
import logging
import time
import functools
from typing import Dict, Any, List

try:
    from backend.workers.celeryconfig import celery_app
except Exception:
    celery_app = None

logger = logging.getLogger("analytics_worker")
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))

# Placeholder: hook to your logging/event store. Replace with concrete DB queries.
def fetch_raw_logs(since_ts: int = 0) -> List[Dict[str, Any]]:
    """
    Fetch raw logs from the event store (Postgres / Supabase table).
    Returns dummy data if no DB is configured.
    """
    # In prod: query your events table for entries after since_ts.
    return [
        {"tenant_id": "demo", "tool": "rag", "latency_ms": 120, "tokens": 12, "ts": since_ts + 10},
        {"tenant_id": "demo", "tool": "web", "latency_ms": 70, "tokens": 0, "ts": since_ts + 20},
        {"tenant_id": "demo", "tool": "llm", "latency_ms": 400, "tokens": 150, "ts": since_ts + 30},
    ]
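
# A minimal sketch of what the production path might look like, assuming the
# SUPABASE client from ingestion_worker and an "events" table with a "ts"
# column (both assumptions; adjust names to your schema):
#
#   from backend.workers.ingestion_worker import SUPABASE
#
#   def fetch_raw_logs_supabase(since_ts: int = 0) -> List[Dict[str, Any]]:
#       table = os.getenv("SUPABASE_EVENTS_TABLE", "events")
#       res = SUPABASE.table(table).select("*").gte("ts", since_ts).execute()
#       return res.data or []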

def aggregate_logs(logs: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Simple aggregator by tenant: counts requests, tokens, and latency per tool.
    Returns a dict keyed by tenant_id.
    """
    out: Dict[str, Dict[str, Any]] = {}
    for e in logs:
        tid = e.get("tenant_id", "unknown")
        out.setdefault(tid, {"requests": 0, "tokens": 0, "tools": {}, "total_latency_ms": 0})
        out[tid]["requests"] += 1
        out[tid]["tokens"] += e.get("tokens", 0)
        out[tid]["total_latency_ms"] += e.get("latency_ms", 0)
        tool = e.get("tool", "unknown")
        out[tid]["tools"].setdefault(tool, {"count": 0, "latency_ms": 0})
        out[tid]["tools"][tool]["count"] += 1
        out[tid]["tools"][tool]["latency_ms"] += e.get("latency_ms", 0)
    return out
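
# Illustrative output for the three dummy log entries above (values follow
# directly from those entries):
#
#   {"demo": {"requests": 3, "tokens": 162, "total_latency_ms": 590,
#             "tools": {"rag": {"count": 1, "latency_ms": 120},
#                       "web": {"count": 1, "latency_ms": 70},
#                       "llm": {"count": 1, "latency_ms": 400}}}}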

def write_metrics_supabase(metrics: Dict[str, Dict[str, Any]]):
    """
    Write aggregated metrics to the Supabase analytics table.
    Falls back to logging if Supabase is not configured.
    """
    try:
        from backend.workers.ingestion_worker import SUPABASE
    except Exception:
        SUPABASE = None
    if SUPABASE is None:
        logger.info("Supabase not configured; metrics:\n%s", metrics)
        return {"status": "logged", "metrics_count": len(metrics)}
    table = os.getenv("SUPABASE_ANALYTICS_TABLE", "analytics_daily")
    rows = []
    ts = int(time.time())
    for tenant_id, data in metrics.items():
        rows.append({
            "tenant_id": tenant_id,
            "date": time.strftime("%Y-%m-%d", time.gmtime(ts)),
            "requests": data["requests"],
            "tokens": data["tokens"],
            "total_latency_ms": data["total_latency_ms"],
            "tools": data["tools"],
        })
    try:
        res = SUPABASE.table(table).upsert(rows).execute()
        logger.info("Wrote %d metrics rows to Supabase table %s", len(rows), table)
        # Return only the JSON-serializable payload so the value survives a
        # Celery result backend (the raw APIResponse object would not).
        return {"status": "ok", "count": len(rows), "result": getattr(res, "data", None)}
    except Exception as e:
        logger.exception("Failed to write metrics to Supabase: %s", e)
        return {"status": "error", "error": str(e)}

# Celery decorator: register as a Celery task when a broker is configured,
# otherwise fall back to a plain function so the code still runs locally.
def task_decorator(func):
    if celery_app is not None:
        return celery_app.task(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@task_decorator
def compute_daily_metrics(window_seconds: int = 86400):
    """
    Compute daily metrics by fetching raw logs and aggregating them.
    """
    logger.info("compute_daily_metrics started; window=%s", window_seconds)
    since_ts = int(time.time()) - window_seconds
    logs = fetch_raw_logs(since_ts=since_ts)
    metrics = aggregate_logs(logs)
    res = write_metrics_supabase(metrics)
    logger.info("compute_daily_metrics finished; res=%s", res)
    return res
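
# Usage is the same either way: call compute_daily_metrics() directly for a
# synchronous run, or compute_daily_metrics.delay() to enqueue it when Celery
# is configured (.delay() only exists on the Celery-registered variant).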

@task_decorator
def compute_rag_quality(sample_size: int = 20, top_k: int = 3):
    """
    Synthetic RAG evaluation:
    - sample seed queries (in production, sample random chunks from the DB)
    - ask the RAG server to search
    - score each query with a keyword-containment heuristic over the top-k
      results (a hit rate in the precision@k spirit)
    """
    logger.info("compute_rag_quality started; sample_size=%d top_k=%d", sample_size, top_k)
    # In production: sample from the DB. Here we simulate the check with fixed queries.
    try:
        # Fake sample of seed queries.
        sample_queries = ["What is our HR policy?", "What is refund policy?"] * (sample_size // 2)
        results = []
        # Try to call the RAG MCP server if available.
        from backend.api.mcp_clients.rag_client import RagClient
        rag = RagClient()
        for q in sample_queries:
            try:
                r = rag.search({"tenant_id": "demo", "query": q})
                # r should contain scored results; heuristic: check whether any
                # of the top-k texts contains the expected keyword.
                top_texts = [it.get("text", "") for it in r.get("results", [])[:top_k]]
                success = any("policy" in t.lower() for t in top_texts)
                results.append({"query": q, "success": success})
            except Exception:
                results.append({"query": q, "success": False})
        precision = sum(1 for r in results if r["success"]) / max(1, len(results))
        logger.info("compute_rag_quality precision=%.3f", precision)
        return {"precision_at_k": precision, "sample": len(results)}
    except Exception as e:
        logger.exception("compute_rag_quality failed: %s", e)
        return {"error": str(e)}