"""
backend/workers/analytics_worker.py

Analytics & evaluation background tasks:
 - compute_daily_metrics: aggregates logs into per-tenant daily metrics
 - compute_rag_quality: synthetic RAG evaluation (precision@k style)
 - export_metrics_to_supabase: writes aggregated metrics to a supabase analytics table
"""

import os
import logging
import time
from typing import Dict, Any, List

try:
    from backend.workers.celeryconfig import celery_app
except Exception:
    # Celery is optional; without it the tasks below run as plain functions.
    celery_app = None

logger = logging.getLogger("analytics_worker")
logger.setLevel(os.getenv("LOG_LEVEL", "INFO").upper())

# Placeholder: hook this up to your logging/event store; replace with concrete DB queries.
def fetch_raw_logs(since_ts: int = 0) -> List[Dict[str, Any]]:
    """
    Fetch raw logs from the event store (Postgres / Supabase table).
    Here we return dummy data if no DB configured.
    """
    # In prod: query your events table for entries after since_ts
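    # A minimal sketch of that query, assuming a Supabase table named "events"
    # with a numeric "ts" column (both names are assumptions, not part of this
    # codebase), using the shared SUPABASE client from ingestion_worker:
    #
    #     res = SUPABASE.table("events").select("*").gte("ts", since_ts).execute()
    #     return res.data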
    return [
        {"tenant_id": "demo", "tool": "rag", "latency_ms": 120, "tokens": 12, "ts": since_ts + 10},
        {"tenant_id": "demo", "tool": "web", "latency_ms": 70, "tokens": 0, "ts": since_ts + 20},
        {"tenant_id": "demo", "tool": "llm", "latency_ms": 400, "tokens": 150, "ts": since_ts + 30},
    ]


def aggregate_logs(logs: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Simple aggregator by tenant: counts requests, tokens, latency per tool.
    Returns a dict keyed by tenant_id.
    """
    out = {}
    for e in logs:
        tid = e.get("tenant_id", "unknown")
        out.setdefault(tid, {"requests": 0, "tokens": 0, "tools": {}, "total_latency_ms": 0})
        out[tid]["requests"] += 1
        out[tid]["tokens"] += e.get("tokens", 0)
        out[tid]["total_latency_ms"] += e.get("latency_ms", 0)
        tool = e.get("tool", "unknown")
        out[tid]["tools"].setdefault(tool, {"count": 0, "latency_ms": 0})
        out[tid]["tools"][tool]["count"] += 1
        out[tid]["tools"][tool]["latency_ms"] += e.get("latency_ms", 0)
    return out
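
# Illustrative output of aggregate_logs for the dummy logs above (shown as a
# comment for readers; not computed at import time):
#   {"demo": {"requests": 3, "tokens": 162, "total_latency_ms": 590,
#             "tools": {"rag": {"count": 1, "latency_ms": 120},
#                       "web": {"count": 1, "latency_ms": 70},
#                       "llm": {"count": 1, "latency_ms": 400}}}}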


def write_metrics_supabase(metrics: Dict[str, Dict[str, Any]]):
    """
    Writes aggregated metrics to the Supabase analytics table.
    Falls back to logging when Supabase is not configured or unavailable.
    """
    # Import lazily and guard it so the logging fallback also covers the case
    # where the ingestion worker (and its Supabase client) is absent.
    try:
        from backend.workers.ingestion_worker import SUPABASE
    except Exception:
        SUPABASE = None
    if SUPABASE is None:
        logger.info("Supabase not configured; metrics:\n%s", metrics)
        return {"status": "logged", "metrics_count": len(metrics)}

    table = os.getenv("SUPABASE_ANALYTICS_TABLE", "analytics_daily")
    rows = []
    ts = int(time.time())
    for tenant_id, data in metrics.items():
        rows.append({
            "tenant_id": tenant_id,
            "date": time.strftime("%Y-%m-%d", time.gmtime(ts)),
            "requests": data["requests"],
            "tokens": data["tokens"],
            "total_latency_ms": data["total_latency_ms"],
            "tools": data["tools"],
        })
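    # Schema assumption: for the upsert to deduplicate one row per tenant per
    # day, the target table needs a unique constraint on (tenant_id, date);
    # otherwise supabase-py's upsert matches on the table's primary key.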
    try:
        res = SUPABASE.table(table).upsert(rows).execute()
        logger.info("Wrote %d metrics rows to Supabase table %s", len(rows), table)
        # Return only the response data (not the raw client response object)
        # so the result stays JSON-serializable for Celery result backends.
        return {"status": "ok", "count": len(rows), "result": getattr(res, "data", None)}
    except Exception as e:
        logger.exception("Failed to write metrics to Supabase: %s", e)
        return {"status": "error", "error": str(e)}


# Celery decorator: register as a task when a broker is configured,
# otherwise leave the function untouched as a plain callable.
def task_decorator(func):
    if celery_app is not None:
        return celery_app.task(func)
    return func
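
# With a broker configured, the decorated tasks can be queued the usual
# Celery way, e.g. compute_daily_metrics.delay() or .apply_async(countdown=60);
# without one, they remain plain callables and run synchronously.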


@task_decorator
def compute_daily_metrics(window_seconds: int = 86400):
    """
    Compute daily metrics by fetching raw logs and aggregating.
    """
    logger.info("compute_daily_metrics started; window=%s", window_seconds)
    since_ts = int(time.time()) - window_seconds
    logs = fetch_raw_logs(since_ts=since_ts)
    metrics = aggregate_logs(logs)
    res = write_metrics_supabase(metrics)
    logger.info("compute_daily_metrics finished; res=%s", res)
    return res


@task_decorator
def compute_rag_quality(sample_size: int = 20, top_k: int = 3):
    """
    Synthetic RAG evaluation:
     - sample random chunks (or seed queries)
     - ask the RAG server to search
     - compute simple precision@k using exact string matching heuristic
    """
    logger.info("compute_rag_quality started; sample_size=%d top_k=%d", sample_size, top_k)

    # In production: sample queries from the DB. Here we simulate with two
    # fixed seed queries (sample_size is rounded down to an even count).
    try:
        # fake sample
        sample_queries = ["What is our HR policy?", "What is the refund policy?"] * (sample_size // 2)
        results = []
        # Try to call rag MCP if available
        from backend.api.mcp_clients.rag_client import RagClient
        rag = RagClient()
        for q in sample_queries:
            try:
                r = rag.search({"tenant_id": "demo", "query": q})
                # r should contain scored results; heuristic: count the query
                # as a success if any top-k result text mentions "policy"
                top_texts = [it.get("text", "") for it in r.get("results", [])[:top_k]]
                success = any("policy" in t.lower() for t in top_texts)
                results.append({"query": q, "success": success})
            except Exception:
                results.append({"query": q, "success": False})
        hit_rate = sum(1 for r in results if r["success"]) / max(1, len(results))
        logger.info("compute_rag_quality hit_rate@%d=%.3f", top_k, hit_rate)
        # Key name kept as "precision_at_k" for compatibility with any
        # downstream consumers, even though the metric is a hit rate.
        return {"precision_at_k": hit_rate, "sample": len(results)}
    except Exception as e:
        logger.exception("compute_rag_quality failed: %s", e)
        return {"error": str(e)}