| """ |
| Telemetry: in-memory counters + HF Dataset persistence. |
| |
| In-memory β rolling deque(500), per-metric pass counters, avg latency. |
| Resets on restart. Powers /metrics (live session stats). |
| |
| Persistent β events flushed as JSONL shards to TELEMETRY_REPO HF dataset |
| every FLUSH_EVERY queries. Powers /report (accumulated history). |
| Falls back to in-memory if TELEMETRY_REPO is unset or write fails. |
| |
| TELEMETRY_REPO is inferred from HF Spaces env vars: |
| SPACE_AUTHOR_NAME + SPACE_REPO_NAME β {author}/{repo}-telemetry |
| Override with explicit TELEMETRY_REPO env var. |
| """ |
|
|
| import json |
| import logging |
| import os |
| import threading |
| from collections import deque, defaultdict |
| from datetime import datetime, timezone |
|
|
| from grader import GradeReport |
|
|
log = logging.getLogger(__name__)


# Size of the rolling in-memory event window (powers /metrics session stats).
BUFFER_SIZE = 500
# Number of queued events that triggers a background flush to the HF dataset.
FLUSH_EVERY = 20


# Guards all mutable module state below; record(), live_stats() and _flush()
# may run on different threads.
_lock = threading.Lock()
_events: deque = deque(maxlen=BUFFER_SIZE)  # rolling window of recent events
_unflushed: list = []  # events queued for persistence, drained by _flush()
_counters: dict = defaultdict(float)  # running totals: pass counts, latency sums


# Infer the telemetry dataset repo from HF Spaces env vars as
# {author}/{repo}-telemetry; an explicit TELEMETRY_REPO env var overrides.
# Empty string disables persistence (in-memory only).
_space_author = os.environ.get("SPACE_AUTHOR_NAME", "")
_space_repo = os.environ.get("SPACE_REPO_NAME", "ai-response-validator")
TELEMETRY_REPO = os.environ.get(
    "TELEMETRY_REPO",
    f"{_space_author}/{_space_repo}-telemetry" if _space_author else "",
)


# Metric names expected in GradeReport results; drives stats aggregation.
_METRICS = ["pii_leakage", "token_budget", "answer_relevancy", "faithfulness", "chain_terminology"]
|
|
|
|
def record(
    client: str,
    domain: str,
    query_len: int,
    latency_ms: dict[str, float],
    report: GradeReport,
    docs_retrieved: int,
    min_retrieval_score: float,
) -> None:
    """Record one query event. Thread-safe.

    Updates the in-memory rolling buffer and counters, and — when
    TELEMETRY_REPO is configured — queues the event for persistence. Once
    FLUSH_EVERY events are queued, a daemon thread uploads them as a JSONL
    shard so the caller never blocks on network I/O.

    Args:
        client: Identifier of the calling client.
        domain: Query domain label.
        query_len: Length of the user query (units defined by caller).
        latency_ms: Per-stage latency in ms (e.g. retrieve/generate/grade).
        report: Grading result; each result feeds per-metric counters.
        docs_retrieved: Number of documents returned by retrieval.
        min_retrieval_score: Lowest retrieval score for this query.
    """
    event = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "client": client,
        "domain": domain,
        "query_len": query_len,
        "latency_ms": {k: round(v) for k, v in latency_ms.items()},
        "metrics": {r.metric: round(r.score, 4) for r in report.results},
        "metric_passed": {r.metric: r.passed for r in report.results},
        "overall_pass": report.overall,
        "docs_retrieved": docs_retrieved,
        "min_retrieval_score": round(min_retrieval_score, 4),
    }
    with _lock:
        _events.append(event)
        # Only queue for persistence when a target repo exists; otherwise
        # _unflushed would grow without bound (nothing ever drains it when
        # TELEMETRY_REPO is unset).
        if TELEMETRY_REPO:
            _unflushed.append(event)
        _counters["total"] += 1
        if report.overall:
            _counters["overall_pass"] += 1
        for r in report.results:
            _counters[f"{r.metric}_total"] += 1
            if r.passed:
                _counters[f"{r.metric}_pass"] += 1
        for stage, ms in latency_ms.items():
            _counters[f"lat_{stage}_sum"] += ms
            _counters[f"lat_{stage}_n"] += 1
        should_flush = bool(TELEMETRY_REPO) and len(_unflushed) >= FLUSH_EVERY

    # Spawn the upload outside the lock so grading threads are never blocked
    # on network I/O.
    if should_flush:
        threading.Thread(target=_flush, daemon=True).start()
|
|
|
|
def live_stats() -> dict:
    """Aggregate the current session's in-memory telemetry (/metrics endpoint).

    Returns overall and per-metric pass rates plus average per-stage latency,
    all computed from the process-local counters (reset on restart).
    """
    with _lock:
        total = int(_counters.get("total", 0))
        if not total:
            return {"total_queries": 0, "message": "No queries recorded this session."}

        per_metric = {}
        for name in _METRICS:
            seen = int(_counters.get(f"{name}_total", 0))
            ok = int(_counters.get(f"{name}_pass", 0))
            per_metric[name] = {
                "pass_rate": round(ok / seen, 3) if seen else None,
                "pass_count": ok,
                "total": seen,
            }

        # Average only over stages that have actually been recorded.
        avg_latency = {
            stage: round(_counters[f"lat_{stage}_sum"] / n)
            for stage in ("retrieve", "generate", "grade")
            if (n := _counters.get(f"lat_{stage}_n", 0))
        }

        return {
            "source": "in_memory",
            "total_queries": total,
            "overall_pass_rate": round(_counters.get("overall_pass", 0) / total, 3),
            "metrics": per_metric,
            "avg_latency_ms": avg_latency,
            "events_in_buffer": len(_events),
            "telemetry_repo": TELEMETRY_REPO or None,
        }
|
|
|
|
def persistent_report() -> dict:
    """Aggregate accumulated telemetry from HF Dataset shards (/report endpoint).

    Downloads every JSONL shard under events/ in TELEMETRY_REPO and computes
    overall/per-metric pass rates plus a per-client breakdown. Falls back to
    the in-memory session stats when TELEMETRY_REPO is unset or any HF call
    fails (network, auth, missing repo).
    """
    if not TELEMETRY_REPO:
        log.info("TELEMETRY_REPO not set — report from in-memory only")
        return {"source": "in_memory", **live_stats()}

    try:
        # Imported lazily so the module loads without huggingface_hub installed.
        from huggingface_hub import HfApi
        hf_token = os.environ.get("HF_TOKEN")
        api = HfApi(token=hf_token)

        files = api.list_repo_files(TELEMETRY_REPO, repo_type="dataset")
        shard_paths = [f for f in files if f.startswith("events/") and f.endswith(".jsonl")]
        if not shard_paths:
            return {"source": "hf_dataset", "repo": TELEMETRY_REPO,
                    "message": "No shards yet — data accumulates after first flush."}

        events = []
        for path in shard_paths:
            # hf_hub_download returns a local cache path, not file contents.
            local_path = api.hf_hub_download(
                TELEMETRY_REPO, path, repo_type="dataset", token=hf_token,
            )
            with open(local_path) as f:
                for line in f:
                    if line.strip():
                        events.append(json.loads(line))

        if not events:
            return {"source": "hf_dataset", "repo": TELEMETRY_REPO, "total_events": 0}

        total = len(events)
        overall_pass = sum(1 for e in events if e.get("overall_pass"))

        # Pass rate is over ALL events; avg_score only over events that
        # actually graded this metric (older shards may lack newer metrics).
        metric_stats = {}
        for m in _METRICS:
            passed = sum(1 for e in events if e.get("metric_passed", {}).get(m))
            scores = [e["metrics"][m] for e in events if m in e.get("metrics", {})]
            metric_stats[m] = {
                "pass_rate": round(passed / total, 3),
                "avg_score": round(sum(scores) / len(scores), 3) if scores else None,
            }

        client_breakdown = defaultdict(lambda: {"total": 0, "pass": 0})
        for e in events:
            c = e.get("client", "unknown")
            client_breakdown[c]["total"] += 1
            if e.get("overall_pass"):
                client_breakdown[c]["pass"] += 1

        return {
            "source": "hf_dataset",
            "repo": TELEMETRY_REPO,
            "total_queries": total,
            "overall_pass_rate": round(overall_pass / total, 3),
            "first_event": min(e["ts"] for e in events),
            "last_event": max(e["ts"] for e in events),
            "metrics": metric_stats,
            "by_client": {
                c: {"total": v["total"], "pass_rate": round(v["pass"] / v["total"], 3)}
                for c, v in client_breakdown.items()
            },
            "shards_read": len(shard_paths),
        }

    except Exception as e:
        log.warning("HF Dataset report failed (%s) — falling back to in-memory", e)
        return {"source": "in_memory_fallback", **live_stats()}
|
|
|
|
def _flush() -> None:
    """Upload buffered events to the HF Dataset as one JSONL shard.

    Runs in a background daemon thread started by record(). On upload failure
    the batch is returned to the FRONT of the buffer (preserving chronological
    order) and the buffer is capped at BUFFER_SIZE so repeated failures cannot
    grow memory without bound. A missing HF_TOKEN drops the batch: without
    credentials a retry can never succeed, so re-queueing would only churn.
    """
    with _lock:
        if not _unflushed:
            return
        batch = list(_unflushed)
        _unflushed.clear()

    try:
        # Imported lazily so the module loads without huggingface_hub installed.
        from huggingface_hub import HfApi
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            log.warning("HF_TOKEN not set — telemetry flush skipped")
            return

        api = HfApi(token=hf_token)
        try:
            # Best effort: the repo usually exists already; any real
            # permission/connectivity problem will surface on upload below.
            api.create_repo(TELEMETRY_REPO, repo_type="dataset", exist_ok=True, private=False)
        except Exception:
            pass

        # Microsecond timestamp keeps shard names unique across flushes.
        ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f")
        content = "\n".join(json.dumps(e) for e in batch).encode()
        api.upload_file(
            path_or_fileobj=content,
            path_in_repo=f"events/shard_{ts}.jsonl",
            repo_id=TELEMETRY_REPO,
            repo_type="dataset",
        )
        log.info("Flushed %d telemetry events to %s", len(batch), TELEMETRY_REPO)

    except Exception as e:
        log.warning("Telemetry flush failed: %s — events returned to buffer", e)
        with _lock:
            # Prepend (not extend) so events stay in chronological order
            # ahead of anything recorded during the failed upload attempt.
            _unflushed[:0] = batch
            if len(_unflushed) > BUFFER_SIZE:
                # Keep only the newest BUFFER_SIZE events.
                del _unflushed[:len(_unflushed) - BUFFER_SIZE]
|
|