"""
backend/workers/analytics_worker.py

Analytics & evaluation background tasks:

- compute_daily_metrics: aggregates logs into per-tenant daily metrics
- compute_rag_quality: synthetic RAG evaluation (precision@k style)
- write_metrics_supabase: writes aggregated metrics to a Supabase analytics
  table (or logs them when Supabase is not configured)
"""
import functools
import logging
import os
import time
from typing import Any, Dict, List

try:
    from backend.workers.celeryconfig import celery_app
except Exception:
    # Celery is optional: without it the tasks below stay plain callables.
    celery_app = None

logger = logging.getLogger("analytics_worker")
# logging only recognises upper-case level names ("INFO", not "info").
logger.setLevel(os.getenv("LOG_LEVEL", "INFO").upper())


# Placeholder: hook to your logging/event store. Replace with concrete DB queries.
def fetch_raw_logs(since_ts: int = 0) -> List[Dict[str, Any]]:
    """
    Fetch raw log events newer than ``since_ts``.

    This is a stub: it always returns three hard-coded events for the
    ``"demo"`` tenant whose ``ts`` values are offsets from ``since_ts``.

    Args:
        since_ts: Lower bound timestamp; only used to derive the dummy ``ts``.

    Returns:
        A list of event dicts with the keys ``tenant_id``, ``tool``,
        ``latency_ms``, ``tokens`` and ``ts``.
    """
    # In prod: query your events table for entries after since_ts.
    return [
        {"tenant_id": "demo", "tool": "rag", "latency_ms": 120, "tokens": 12, "ts": since_ts + 10},
        {"tenant_id": "demo", "tool": "web", "latency_ms": 70, "tokens": 0, "ts": since_ts + 20},
        {"tenant_id": "demo", "tool": "llm", "latency_ms": 400, "tokens": 150, "ts": since_ts + 30},
    ]


def aggregate_logs(logs: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Aggregate raw events per tenant.

    Missing or ``None`` values are tolerated: a missing/``None`` tenant or
    tool is counted under ``"unknown"`` and a missing/``None`` ``tokens`` or
    ``latency_ms`` counts as 0.

    Args:
        logs: Event dicts as produced by ``fetch_raw_logs``.

    Returns:
        A dict keyed by tenant id. Each value holds ``requests``, ``tokens``,
        ``total_latency_ms`` and ``tools`` (a dict keyed by tool name with
        ``count`` and summed ``latency_ms``).
    """
    out: Dict[str, Dict[str, Any]] = {}
    for event in logs:
        tenant_id = event.get("tenant_id")
        if tenant_id is None:
            tenant_id = "unknown"
        tool = event.get("tool")
        if tool is None:
            tool = "unknown"
        # `or 0` turns an explicit None (e.g. a NULL column) into 0.
        tokens = event.get("tokens") or 0
        latency_ms = event.get("latency_ms") or 0

        tenant = out.setdefault(
            tenant_id,
            {"requests": 0, "tokens": 0, "tools": {}, "total_latency_ms": 0},
        )
        tenant["requests"] += 1
        tenant["tokens"] += tokens
        tenant["total_latency_ms"] += latency_ms

        tool_stats = tenant["tools"].setdefault(tool, {"count": 0, "latency_ms": 0})
        tool_stats["count"] += 1
        tool_stats["latency_ms"] += latency_ms
    return out


def write_metrics_supabase(metrics: Dict[str, Dict[str, Any]]):
    """
    Write aggregated metrics to the Supabase analytics table.

    The table name comes from ``SUPABASE_ANALYTICS_TABLE`` (default
    ``"analytics_daily"``). Every row is stamped with the current UTC date.
    Falls back to logging the metrics if Supabase is not configured.

    Args:
        metrics: Per-tenant aggregates as returned by ``aggregate_logs``.

    Returns:
        ``{"status": "logged", "metrics_count": n}`` when Supabase is not
        configured, ``{"status": "ok", "count": n, "result": res}`` on
        success, or ``{"status": "error", "error": msg}`` if the upsert fails.
    """
    from backend.workers.ingestion_worker import SUPABASE

    if SUPABASE is None:
        logger.info("Supabase not configured; metrics:\n%s", metrics)
        return {"status": "logged", "metrics_count": len(metrics)}

    table = os.getenv("SUPABASE_ANALYTICS_TABLE", "analytics_daily")
    # The date is the same for every row, so compute it once.
    date_str = time.strftime("%Y-%m-%d", time.gmtime(int(time.time())))
    rows = [
        {
            "tenant_id": tenant_id,
            "date": date_str,
            "requests": data["requests"],
            "tokens": data["tokens"],
            "total_latency_ms": data["total_latency_ms"],
            "tools": data["tools"],
        }
        for tenant_id, data in metrics.items()
    ]
    try:
        res = SUPABASE.table(table).upsert(rows).execute()
        logger.info("Wrote %d metrics rows to supabase table %s", len(rows), table)
        # NOTE(review): `res` is the raw client response; confirm it is
        # serializable if a Celery result backend stores this return value.
        return {"status": "ok", "count": len(rows), "result": res}
    except Exception as e:
        logger.exception("Failed to write metrics to supabase: %s", e)
        return {"status": "error", "error": str(e)}


# Celery decorator
def task_decorator(func):
    """
    Register ``func`` as a Celery task when Celery is available.

    Without Celery the function is wrapped in a plain pass-through callable
    that keeps the original name and docstring.
    """
    if celery_app is not None:
        return celery_app.task(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper


@task_decorator
def compute_daily_metrics(window_seconds: int = 86400):
    """
    Compute daily metrics by fetching raw logs and aggregating them.

    Args:
        window_seconds: How far back from now to fetch logs (default one day).

    Returns:
        The status dict returned by ``write_metrics_supabase``.
    """
    logger.info("compute_daily_metrics started; window=%s", window_seconds)
    since_ts = int(time.time()) - window_seconds
    logs = fetch_raw_logs(since_ts=since_ts)
    metrics = aggregate_logs(logs)
    res = write_metrics_supabase(metrics)
    logger.info("compute_daily_metrics finished; res=%s", res)
    return res


@task_decorator
def compute_rag_quality(sample_size: int = 20, top_k: int = 3):
    """
    Synthetic RAG evaluation:

    - build ``sample_size`` seed queries (cycled from a fixed list)
    - ask the RAG client to search for each one
    - count a query as a success when any of the first ``top_k`` result
      texts contains the keyword "policy" (case-insensitive)

    A failed search counts as an unsuccessful query rather than aborting
    the run.

    Args:
        sample_size: Number of queries to issue.
        top_k: Number of leading results inspected per query.

    Returns:
        ``{"precision_at_k": float, "sample": int}`` on completion, or
        ``{"error": msg}`` if the evaluation could not run at all (for
        example when the RAG client cannot be imported).
    """
    logger.info("compute_rag_quality started; sample_size=%d top_k=%d", sample_size, top_k)
    # In production: sample from DB. Here we simulate a simple check using
    # mock data or minimal queries.
    try:
        # Fake sample: cycle the seeds so exactly `sample_size` queries are
        # issued, including odd sizes and sample_size=1.
        seed_queries = ["What is our HR policy?", "What is refund policy?"]
        sample_queries = [seed_queries[i % len(seed_queries)] for i in range(sample_size)]
        # A negative top_k would slice from the end; treat it as "inspect nothing".
        k = max(0, top_k)
        results = []

        # Try to call rag MCP if available
        from backend.api.mcp_clients.rag_client import RagClient

        rag = RagClient()
        for q in sample_queries:
            try:
                r = rag.search({"tenant_id": "demo", "query": q})
                # r should contain scored results; heuristic: check if any
                # of the top items contains a keyword.
                items = (r.get("results") or [])[:k]
                top_texts = [(it.get("text") or "") for it in items]
                success = any("policy" in t.lower() for t in top_texts)
                results.append({"query": q, "success": success})
            except Exception as exc:
                # Best effort: one failed query is a miss, but leave a trace.
                logger.warning("compute_rag_quality search failed for %r: %s", q, exc)
                results.append({"query": q, "success": False})

        hits = sum(1 for item in results if item["success"])
        precision = hits / max(1, len(results))
        logger.info("compute_rag_quality precision=%.3f", precision)
        return {"precision_at_k": precision, "sample": len(results)}
    except Exception as e:
        logger.exception("compute_rag_quality failed: %s", e)
        return {"error": str(e)}