Spaces:
Sleeping
Sleeping
"""
Cortex RAG — Evaluation Store (SQLite)

Two tables:
    query_logs   — one row per query: routing, CRAG grade, latency, chunk scores
    eval_metrics — one row per query: RAGAS scores (written async after generation)

SQLite is the right choice here: zero infrastructure, works on Railway/Render
out of the box, and a dashboard corpus of ~10k queries fits in <50MB.
Swap to Postgres trivially later by changing the connection string.

The store is intentionally append-only. No deletes, no updates.
This preserves the full history for trend analysis in the dashboard.
"""
from __future__ import annotations

import json
import logging
import sqlite3
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)

# Database file used when a store is created without an explicit path.
_DEFAULT_DB_PATH = Path("data/cortex_eval.db")

# ── Schema ─────────────────────────────────────────────────────

# Run as a single script. Every statement is guarded by IF NOT EXISTS, so
# executing it against an already-initialised database changes nothing.
_DDL = """
CREATE TABLE IF NOT EXISTS query_logs (
    id               INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp        REAL NOT NULL,
    query            TEXT NOT NULL,
    intent           TEXT,
    strategies       TEXT,               -- JSON list
    retriever_hits   TEXT,               -- JSON dict
    crag_grade       TEXT,
    crag_rewritten   INTEGER DEFAULT 0,  -- bool
    web_search_used  INTEGER DEFAULT 0,  -- bool
    num_chunks       INTEGER DEFAULT 0,
    top_chunk_score  REAL DEFAULT 0.0,
    latency_ms       REAL DEFAULT 0.0,
    model            TEXT,
    extractor        TEXT
);

CREATE TABLE IF NOT EXISTS eval_metrics (
    id                   INTEGER PRIMARY KEY AUTOINCREMENT,
    query_log_id         INTEGER NOT NULL REFERENCES query_logs(id),
    timestamp            REAL NOT NULL,
    faithfulness         REAL,  -- 0-1: does answer contradict context?
    answer_relevancy     REAL,  -- 0-1: does answer address the question?
    context_precision    REAL,  -- 0-1: are retrieved chunks relevant?
    context_utilisation  REAL,  -- 0-1: fraction of chunks cited in answer
    mean_chunk_score     REAL   -- average retrieval score of final chunks
);

CREATE INDEX IF NOT EXISTS idx_query_logs_ts ON query_logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_eval_metrics_id ON eval_metrics(query_log_id);
"""
# ── Dataclasses ────────────────────────────────────────────────
@dataclass
class QueryLogEntry:
    """Routing and retrieval record for a single query.

    Only ``query`` is required; every other field defaults to an empty or
    zero value. ``strategies`` and ``retriever_hits`` may be passed as
    ``None``, in which case they are replaced by an empty list / dict.
    """

    query: str
    intent: str = ""
    # Factories give every instance its own container instead of a shared one.
    strategies: list[str] = field(default_factory=list)
    retriever_hits: dict = field(default_factory=dict)
    crag_grade: str = ""
    crag_rewritten: bool = False
    web_search_used: bool = False
    num_chunks: int = 0
    top_chunk_score: float = 0.0
    latency_ms: float = 0.0
    model: str = ""
    extractor: str = ""

    def __post_init__(self) -> None:
        """Normalise an explicit ``None`` for the container fields."""
        if self.strategies is None:
            self.strategies = []
        if self.retriever_hits is None:
            self.retriever_hits = {}
@dataclass
class EvalMetricEntry:
    """Evaluation scores for one logged query.

    ``query_log_id`` identifies the query log row the scores belong to.
    Every score is optional; ``None`` means the metric was not computed.
    """

    query_log_id: int
    faithfulness: Optional[float] = None         # 0-1: does answer contradict context?
    answer_relevancy: Optional[float] = None     # 0-1: does answer address the question?
    context_precision: Optional[float] = None    # 0-1: are retrieved chunks relevant?
    context_utilisation: Optional[float] = None  # 0-1: fraction of chunks cited in answer
    mean_chunk_score: Optional[float] = None     # average retrieval score of final chunks
# ── Store ──────────────────────────────────────────────────────
class EvalStore:
    """
    Thread-safe SQLite-backed store for query logs and eval metrics.

    Every operation opens, commits and closes its own connection, so one
    instance can be shared between threads without sharing a connection.
    The public API only inserts and reads rows; nothing is updated or deleted.

    Usage:
        store = EvalStore()
        log_id = store.log_query(entry)
        store.log_metrics(EvalMetricEntry(query_log_id=log_id, faithfulness=0.92, ...))
    """

    def __init__(self, db_path: str | Path = _DEFAULT_DB_PATH) -> None:
        """Open (or create) the database at ``db_path``.

        The parent directory is created if it does not exist, and the schema
        is applied immediately.
        """
        self._path = Path(db_path)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    # ── Write ──────────────────────────────────────────────────

    def log_query(self, entry: QueryLogEntry) -> int:
        """Insert a query log row. Returns the new row id.

        The row is stamped with the current time. ``strategies`` and
        ``retriever_hits`` are stored as JSON text, the two flags as 0/1.
        """
        with self._conn() as conn:
            cur = conn.execute(
                """INSERT INTO query_logs
                   (timestamp, query, intent, strategies, retriever_hits,
                    crag_grade, crag_rewritten, web_search_used,
                    num_chunks, top_chunk_score, latency_ms, model, extractor)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (
                    time.time(),
                    entry.query,
                    entry.intent,
                    json.dumps(entry.strategies),
                    json.dumps(entry.retriever_hits),
                    entry.crag_grade,
                    int(entry.crag_rewritten),
                    int(entry.web_search_used),
                    entry.num_chunks,
                    entry.top_chunk_score,
                    entry.latency_ms,
                    entry.model,
                    entry.extractor,
                ),
            )
            return cur.lastrowid

    def log_metrics(self, entry: EvalMetricEntry) -> None:
        """Insert an eval_metrics row, stamped with the current time."""
        with self._conn() as conn:
            conn.execute(
                """INSERT INTO eval_metrics
                   (query_log_id, timestamp, faithfulness, answer_relevancy,
                    context_precision, context_utilisation, mean_chunk_score)
                   VALUES (?,?,?,?,?,?,?)""",
                (
                    entry.query_log_id,
                    time.time(),
                    entry.faithfulness,
                    entry.answer_relevancy,
                    entry.context_precision,
                    entry.context_utilisation,
                    entry.mean_chunk_score,
                ),
            )

    # ── Read ───────────────────────────────────────────────────

    def get_recent_queries(self, limit: int = 100) -> list[dict]:
        """Last N query logs joined with their eval metrics (if available).

        Rows are ordered newest first. Metric columns are ``None`` for a
        query with no eval_metrics row; ``strategies`` is decoded from JSON.
        """
        with self._conn() as conn:
            rows = conn.execute(
                """SELECT q.id, q.timestamp, q.query, q.intent, q.strategies,
                          q.crag_grade, q.web_search_used, q.num_chunks,
                          q.top_chunk_score, q.latency_ms,
                          e.faithfulness, e.answer_relevancy,
                          e.context_precision, e.context_utilisation,
                          e.mean_chunk_score
                   FROM query_logs q
                   LEFT JOIN eval_metrics e ON e.query_log_id = q.id
                   ORDER BY q.timestamp DESC
                   LIMIT ?""",
                (limit,),
            ).fetchall()
        return [self._row_to_dict(r) for r in rows]

    def get_metric_timeseries(self, days: int = 7) -> list[dict]:
        """
        Hourly-bucketed metric averages over the last N days.
        Used for the trend line chart in the dashboard.

        ``hour_bucket`` is the number of whole hours between the start of
        the window (now minus ``days``) and the query's timestamp. Only
        queries that have an eval_metrics row are included.
        """
        since = time.time() - days * 86400
        with self._conn() as conn:
            rows = conn.execute(
                """SELECT
                       CAST((q.timestamp - ?) / 3600 AS INTEGER) AS hour_bucket,
                       AVG(e.faithfulness) AS faithfulness,
                       AVG(e.answer_relevancy) AS answer_relevancy,
                       AVG(e.context_precision) AS context_precision,
                       AVG(e.mean_chunk_score) AS mean_chunk_score,
                       COUNT(*) AS query_count
                   FROM query_logs q
                   JOIN eval_metrics e ON e.query_log_id = q.id
                   WHERE q.timestamp > ?
                   GROUP BY hour_bucket
                   ORDER BY hour_bucket""",
                (since, since),
            ).fetchall()
        # Every selected column is aliased, so the Row keys are the dict keys.
        return [dict(row) for row in rows]

    def get_summary_stats(self) -> dict:
        """Aggregate stats for the dashboard header metrics.

        Averages are reported as 0 when there is nothing to average.
        ``strategy_dist`` is keyed by the stored JSON text of each strategy
        list; ``crag_grade_dist`` skips rows with an empty grade.
        """
        with self._conn() as conn:
            total = conn.execute("SELECT COUNT(*) FROM query_logs").fetchone()[0]
            with_metrics = conn.execute("SELECT COUNT(*) FROM eval_metrics").fetchone()[0]
            avgs = conn.execute(
                """SELECT AVG(faithfulness), AVG(answer_relevancy),
                          AVG(context_precision), AVG(mean_chunk_score)
                   FROM eval_metrics"""
            ).fetchone()
            grade_dist = conn.execute(
                """SELECT crag_grade, COUNT(*) as cnt
                   FROM query_logs WHERE crag_grade != ''
                   GROUP BY crag_grade"""
            ).fetchall()
            strategy_dist = conn.execute(
                """SELECT strategies, COUNT(*) as cnt
                   FROM query_logs GROUP BY strategies"""
            ).fetchall()
            avg_latency = conn.execute(
                "SELECT AVG(latency_ms) FROM query_logs WHERE latency_ms > 0"
            ).fetchone()[0]
        return {
            "total_queries": total,
            "evaluated_queries": with_metrics,
            "avg_faithfulness": round(avgs[0] or 0, 3),
            "avg_answer_relevancy": round(avgs[1] or 0, 3),
            "avg_context_precision": round(avgs[2] or 0, 3),
            "avg_chunk_score": round(avgs[3] or 0, 3),
            "avg_latency_ms": round(avg_latency or 0, 1),
            "crag_grade_dist": {r[0]: r[1] for r in grade_dist},
            "strategy_dist": {r[0]: r[1] for r in strategy_dist},
        }

    # ── Init ───────────────────────────────────────────────────

    def _init_db(self) -> None:
        """Apply the schema script and log where the store lives."""
        with self._conn() as conn:
            conn.executescript(_DDL)
        logger.info("EvalStore ready at %s", self._path)

    @contextmanager
    def _conn(self):
        """Yield a fresh connection; commit on success, roll back on error.

        The connection is always closed on exit, and any exception raised
        inside the ``with`` body is re-raised after the rollback.
        """
        conn = sqlite3.connect(self._path, timeout=10, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    @staticmethod
    def _row_to_dict(row) -> dict:
        """Convert a result row to a dict, decoding the JSON ``strategies`` column.

        A value that is empty or is not valid JSON is left as stored.
        """
        d = dict(row)
        for key in ("strategies",):
            if d.get(key):
                try:
                    d[key] = json.loads(d[key])
                except (ValueError, TypeError):
                    # Best effort: keep the raw stored value if it cannot be decoded.
                    pass
        return d