Spaces:
Sleeping
Sleeping
| """ | |
| backend/workers/analytics_worker.py | |
| Analytics & evaluation background tasks: | |
| - compute_daily_metrics: aggregates logs into per-tenant daily metrics | |
| - compute_rag_quality: synthetic RAG evaluation (precision@k style) | |
| - export_metrics_to_supabase: writes aggregated metrics to a supabase analytics table | |
| """ | |
import functools
import logging
import os
import time
from typing import Any, Dict, List

try:
    from backend.workers.celeryconfig import celery_app
except Exception:
    celery_app = None
# Module-level logger for this worker; verbosity is configurable via the
# LOG_LEVEL environment variable (defaults to "INFO").
logger = logging.getLogger("analytics_worker")
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))
# Placeholder: hook to your logging/event store. Replace with concrete DB queries
def fetch_raw_logs(since_ts: int = 0) -> List[Dict[str, Any]]:
    """
    Fetch raw request events from the event store (Postgres / Supabase table).

    Stub implementation: in production this should query the events table for
    entries with a timestamp after ``since_ts``; here it returns a fixed set
    of demo events whose timestamps are offset from ``since_ts``.
    """
    demo_events = [
        ("rag", 120, 12, 10),
        ("web", 70, 0, 20),
        ("llm", 400, 150, 30),
    ]
    return [
        {
            "tenant_id": "demo",
            "tool": tool,
            "latency_ms": latency,
            "tokens": tokens,
            "ts": since_ts + offset,
        }
        for tool, latency, tokens, offset in demo_events
    ]
def aggregate_logs(logs: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Aggregate raw event logs per tenant.

    For each tenant this accumulates the request count, token total, summed
    latency, and a per-tool breakdown (count + summed latency).

    Returns a dict keyed by tenant_id.
    """
    summary: Dict[str, Dict[str, Any]] = {}
    for event in logs:
        tenant = event.get("tenant_id", "unknown")
        latency = event.get("latency_ms", 0)

        bucket = summary.get(tenant)
        if bucket is None:
            bucket = {"requests": 0, "tokens": 0, "tools": {}, "total_latency_ms": 0}
            summary[tenant] = bucket

        bucket["requests"] += 1
        bucket["tokens"] += event.get("tokens", 0)
        bucket["total_latency_ms"] += latency

        tool_name = event.get("tool", "unknown")
        tool_stats = bucket["tools"].setdefault(tool_name, {"count": 0, "latency_ms": 0})
        tool_stats["count"] += 1
        tool_stats["latency_ms"] += latency
    return summary
def write_metrics_supabase(metrics: Dict[str, Dict[str, Any]]):
    """
    Write aggregated per-tenant metrics to the Supabase analytics table.

    Falls back to logging when Supabase is unavailable — either the client is
    unconfigured (SUPABASE is None) or the ingestion_worker module cannot be
    imported. Previously an ImportError here crashed the task outright,
    contradicting the documented best-effort fallback.

    Args:
        metrics: mapping of tenant_id -> aggregate dict as produced by
            aggregate_logs (keys: requests, tokens, total_latency_ms, tools).

    Returns:
        A status dict: {"status": "logged"|"ok"|"error", ...}.
    """
    try:
        # Lazy import so module load never hard-depends on ingestion_worker.
        from backend.workers.ingestion_worker import SUPABASE
    except Exception:
        SUPABASE = None
    if SUPABASE is None:
        logger.info("Supabase not configured; metrics:\n%s", metrics)
        return {"status": "logged", "metrics_count": len(metrics)}

    table = os.getenv("SUPABASE_ANALYTICS_TABLE", "analytics_daily")
    ts = int(time.time())
    # Hoisted out of the loop: the date string is identical for every row.
    date_str = time.strftime("%Y-%m-%d", time.gmtime(ts))
    rows = [
        {
            "tenant_id": tenant_id,
            "date": date_str,
            "requests": data["requests"],
            "tokens": data["tokens"],
            "total_latency_ms": data["total_latency_ms"],
            "tools": data["tools"],
        }
        for tenant_id, data in metrics.items()
    ]
    try:
        res = SUPABASE.table(table).upsert(rows).execute()
        logger.info("Wrote %d metrics rows to supabase table %s", len(rows), table)
        return {"status": "ok", "count": len(rows), "result": res}
    except Exception as e:
        logger.exception("Failed to write metrics to supabase: %s", e)
        return {"status": "error", "error": str(e)}
# Celery decorator
def task_decorator(func):
    """
    Register ``func`` as a Celery task when a broker is configured; otherwise
    return a transparent synchronous wrapper so the code still runs locally.
    """
    if celery_app is not None:
        return celery_app.task(func)

    # functools.wraps preserves __name__/__doc__ on the fallback wrapper so
    # logging and debugging still show the real task name.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper
@task_decorator  # register as a Celery task when a broker is configured
def compute_daily_metrics(window_seconds: int = 86400):
    """
    Aggregate raw event logs from the last ``window_seconds`` into per-tenant
    daily metrics and persist them via write_metrics_supabase.

    The module defines task_decorator for exactly this purpose but never
    applied it, so the task was never registered with Celery.

    Args:
        window_seconds: lookback window in seconds (default: one day).

    Returns:
        The status dict returned by write_metrics_supabase.
    """
    logger.info("compute_daily_metrics started; window=%s", window_seconds)
    since_ts = int(time.time()) - window_seconds
    logs = fetch_raw_logs(since_ts=since_ts)
    metrics = aggregate_logs(logs)
    res = write_metrics_supabase(metrics)
    logger.info("compute_daily_metrics finished; res=%s", res)
    return res
@task_decorator  # register as a Celery task when a broker is configured
def compute_rag_quality(sample_size: int = 20, top_k: int = 3):
    """
    Synthetic RAG evaluation:
      - build ``sample_size`` seed queries
      - ask the RAG MCP server to search each one
      - score a simple precision@k using a keyword heuristic

    Args:
        sample_size: number of queries to evaluate.
        top_k: number of top results inspected per query.

    Returns:
        {"precision_at_k": float, "sample": int} on success, or
        {"error": str} if the evaluation could not run at all.
    """
    logger.info("compute_rag_quality started; sample_size=%d top_k=%d", sample_size, top_k)
    # In production: sample from DB. Here we simulate a simple check using mock data or minimal queries.
    try:
        seeds = ["What is our HR policy?", "What is refund policy?"]
        # Cycle through the seeds to produce exactly sample_size queries.
        # The previous `seeds * (sample_size // 2)` silently dropped one
        # query whenever sample_size was odd.
        sample_queries = [seeds[i % len(seeds)] for i in range(sample_size)]
        results = []
        # Try to call rag MCP if available
        from backend.api.mcp_clients.rag_client import RagClient
        rag = RagClient()
        for q in sample_queries:
            try:
                r = rag.search({"tenant_id": "demo", "query": q})
                # Heuristic success: any of the top_k result texts mentions
                # the keyword "policy".
                top_texts = [it.get("text", "") for it in r.get("results", [])[:top_k]]
                success = any("policy" in t.lower() for t in top_texts)
                results.append({"query": q, "success": success})
            except Exception:
                # A single failed search counts as a miss, not a task failure.
                results.append({"query": q, "success": False})
        precision = sum(1 for r in results if r["success"]) / max(1, len(results))
        logger.info("compute_rag_quality precision=%.3f", precision)
        return {"precision_at_k": precision, "sample": len(results)}
    except Exception as e:
        logger.exception("compute_rag_quality failed: %s", e)
        return {"error": str(e)}