Cortex / evaluation /store.py
aditya-joshi-05's picture
Updates and fixes
27edbb8
"""
Cortex RAG — Evaluation Store (SQLite)
Two tables:
query_logs — one row per query: routing, CRAG grade, latency, chunk scores
eval_metrics — one row per query: RAGAS scores (written async after generation)
SQLite is the right choice here: zero infrastructure, works on Railway/Render
out of the box, and a dashboard corpus of ~10k queries fits in <50MB.
Swap to Postgres trivially later by changing the connection string.
The store is intentionally append-only. No deletes, no updates.
This preserves the full history for trend analysis in the dashboard.
"""
from __future__ import annotations
import json
import logging
import sqlite3
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Database file used when EvalStore is constructed without an explicit path.
# The path is relative, so it resolves against the process's working directory.
_DEFAULT_DB_PATH = Path("data/cortex_eval.db")
# ── Schema ─────────────────────────────────────────────────────
# SQL script executed by EvalStore._init_db() through executescript().
# Every statement uses IF NOT EXISTS, so re-running it against an existing
# database leaves existing tables and indexes in place.
# NOTE: eval_metrics.query_log_id declares a REFERENCES clause, but _conn()
# never issues "PRAGMA foreign_keys = ON"; SQLite only enforces foreign keys
# when that pragma is enabled on the connection.
_DDL = """
CREATE TABLE IF NOT EXISTS query_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
query TEXT NOT NULL,
intent TEXT,
strategies TEXT, -- JSON list
retriever_hits TEXT, -- JSON dict
crag_grade TEXT,
crag_rewritten INTEGER DEFAULT 0, -- bool
web_search_used INTEGER DEFAULT 0, -- bool
num_chunks INTEGER DEFAULT 0,
top_chunk_score REAL DEFAULT 0.0,
latency_ms REAL DEFAULT 0.0,
model TEXT,
extractor TEXT
);
CREATE TABLE IF NOT EXISTS eval_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
query_log_id INTEGER NOT NULL REFERENCES query_logs(id),
timestamp REAL NOT NULL,
faithfulness REAL, -- 0-1: does answer contradict context?
answer_relevancy REAL, -- 0-1: does answer address the question?
context_precision REAL, -- 0-1: are retrieved chunks relevant?
context_utilisation REAL, -- 0-1: fraction of chunks cited in answer
mean_chunk_score REAL -- average retrieval score of final chunks
);
CREATE INDEX IF NOT EXISTS idx_query_logs_ts ON query_logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_eval_metrics_id ON eval_metrics(query_log_id);
"""
# ── Dataclasses ────────────────────────────────────────────────
@dataclass
class QueryLogEntry:
    """Values for one row of the ``query_logs`` table.

    Only ``query`` is required; every other field has a default. The row's
    ``id`` and ``timestamp`` are not part of this class: they are supplied
    when the entry is inserted by ``EvalStore.log_query``.

    ``strategies`` and ``retriever_hits`` are serialised with ``json.dumps``
    on insert, so their contents must be JSON-serialisable. The two boolean
    flags are stored as integers.
    """

    query: str
    intent: str = ""
    # default_factory gives each instance its own empty container, so the
    # declared types (list / dict) match the defaults.
    strategies: list[str] = field(default_factory=list)
    retriever_hits: dict = field(default_factory=dict)
    crag_grade: str = ""
    crag_rewritten: bool = False
    web_search_used: bool = False
    num_chunks: int = 0
    top_chunk_score: float = 0.0
    latency_ms: float = 0.0
    model: str = ""
    extractor: str = ""

    def __post_init__(self) -> None:
        """Replace an explicitly passed ``None`` with an empty container.

        Kept for backward compatibility: callers that pass
        ``strategies=None`` or ``retriever_hits=None`` still get ``[]`` / ``{}``.
        """
        if self.strategies is None:
            self.strategies = []
        if self.retriever_hits is None:
            self.retriever_hits = {}
@dataclass
class EvalMetricEntry:
    """Values for one row of the ``eval_metrics`` table.

    ``query_log_id`` is the id of the ``query_logs`` row these scores belong
    to. Every score is optional and defaults to ``None``; the store passes the
    values straight through as SQL parameters on insert.
    """

    query_log_id: int
    # The five scores below map one-to-one onto the eval_metrics columns of
    # the same name.
    faithfulness: float | None = None
    answer_relevancy: float | None = None
    context_precision: float | None = None
    context_utilisation: float | None = None
    mean_chunk_score: float | None = None
# ── Store ──────────────────────────────────────────────────────
class EvalStore:
    """
    SQLite-backed store for query logs and eval metrics.

    Each method opens its own short-lived connection through ``_conn()``,
    commits (or rolls back) and closes it before returning, so no connection
    object is ever shared between calls or threads.

    Usage:
        store = EvalStore()
        log_id = store.log_query(entry)
        store.log_metrics(EvalMetricEntry(query_log_id=log_id, faithfulness=0.92, ...))
    """

    def __init__(self, db_path: str | Path = _DEFAULT_DB_PATH) -> None:
        """Open (or create) the store at *db_path*.

        The parent directory is created when missing, and the schema is
        applied immediately so the tables exist before the first write.
        """
        self._path = Path(db_path)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    # ── Write ──────────────────────────────────────────────────
    def log_query(self, entry: QueryLogEntry) -> int:
        """Insert a query log row. Returns the new row id.

        The row timestamp is ``time.time()`` at insert. ``strategies`` and
        ``retriever_hits`` are stored as JSON text and the two boolean flags
        as integers.
        """
        with self._conn() as conn:
            cur = conn.execute(
                """INSERT INTO query_logs
                   (timestamp, query, intent, strategies, retriever_hits,
                    crag_grade, crag_rewritten, web_search_used,
                    num_chunks, top_chunk_score, latency_ms, model, extractor)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (
                    time.time(),
                    entry.query,
                    entry.intent,
                    json.dumps(entry.strategies),
                    json.dumps(entry.retriever_hits),
                    entry.crag_grade,
                    int(entry.crag_rewritten),
                    int(entry.web_search_used),
                    entry.num_chunks,
                    entry.top_chunk_score,
                    entry.latency_ms,
                    entry.model,
                    entry.extractor,
                ),
            )
            # Returning from inside the ``with`` still runs the commit in _conn().
            return cur.lastrowid

    def log_metrics(self, entry: EvalMetricEntry) -> None:
        """Insert an eval_metrics row stamped with the current time.

        Scores left as ``None`` on *entry* are passed through unchanged as
        SQL parameters.
        """
        with self._conn() as conn:
            conn.execute(
                """INSERT INTO eval_metrics
                   (query_log_id, timestamp, faithfulness, answer_relevancy,
                    context_precision, context_utilisation, mean_chunk_score)
                   VALUES (?,?,?,?,?,?,?)""",
                (
                    entry.query_log_id,
                    time.time(),
                    entry.faithfulness,
                    entry.answer_relevancy,
                    entry.context_precision,
                    entry.context_utilisation,
                    entry.mean_chunk_score,
                ),
            )

    # ── Read ───────────────────────────────────────────────────
    def get_recent_queries(self, limit: int = 100) -> list[dict]:
        """Last N query logs joined with their eval metrics (if available).

        Rows come back newest first. Queries without metrics are still
        returned (LEFT JOIN) with ``None`` in the metric columns. The
        ``strategies`` column is decoded from JSON back into a list when
        possible.
        """
        with self._conn() as conn:
            rows = conn.execute(
                """SELECT q.id, q.timestamp, q.query, q.intent, q.strategies,
                          q.crag_grade, q.web_search_used, q.num_chunks,
                          q.top_chunk_score, q.latency_ms,
                          e.faithfulness, e.answer_relevancy,
                          e.context_precision, e.context_utilisation,
                          e.mean_chunk_score
                   FROM query_logs q
                   LEFT JOIN eval_metrics e ON e.query_log_id = q.id
                   ORDER BY q.timestamp DESC, q.id DESC
                   LIMIT ?""",
                (limit,),
            ).fetchall()
        # The id tie-breaker above keeps the order deterministic when two
        # rows share a timestamp (coarse clocks can produce equal values).
        return [self._row_to_dict(r) for r in rows]

    def get_metric_timeseries(self, days: int = 7) -> list[dict]:
        """
        Hourly-bucketed metric averages over the last N days.
        Used for the trend line chart in the dashboard.

        ``hour_bucket`` is the number of whole hours between the start of the
        window (now minus *days*) and the query timestamp, so bucket 0 is the
        oldest hour. Only queries that have an eval_metrics row are counted
        (inner JOIN).
        """
        since = time.time() - days * 86400
        with self._conn() as conn:
            rows = conn.execute(
                """SELECT
                       CAST((q.timestamp - ?) / 3600 AS INTEGER) AS hour_bucket,
                       AVG(e.faithfulness) AS faithfulness,
                       AVG(e.answer_relevancy) AS answer_relevancy,
                       AVG(e.context_precision) AS context_precision,
                       AVG(e.mean_chunk_score) AS mean_chunk_score,
                       COUNT(*) AS query_count
                   FROM query_logs q
                   JOIN eval_metrics e ON e.query_log_id = q.id
                   WHERE q.timestamp > ?
                   GROUP BY hour_bucket
                   ORDER BY hour_bucket""",
                (since, since),
            ).fetchall()
        # Rows are sqlite3.Row objects whose keys are the column aliases
        # above, so the dict keys follow the SQL without a second name list.
        return [dict(r) for r in rows]

    def get_summary_stats(self) -> dict:
        """Aggregate stats for the dashboard header metrics.

        Returned keys:
            total_queries         number of query_logs rows
            evaluated_queries     number of distinct queries with metrics
            avg_*                 rounded averages; 0 when there is no data
            avg_latency_ms        average over rows with latency_ms > 0
            crag_grade_dist       {grade: count}, empty grades excluded
            strategy_dist         {stored JSON text of strategies: count}
        """
        with self._conn() as conn:
            total = conn.execute("SELECT COUNT(*) FROM query_logs").fetchone()[0]
            # DISTINCT: nothing in the schema stops a query from getting more
            # than one metrics row, and this figure is a count of queries.
            with_metrics = conn.execute(
                "SELECT COUNT(DISTINCT query_log_id) FROM eval_metrics"
            ).fetchone()[0]
            avgs = conn.execute(
                """SELECT AVG(faithfulness), AVG(answer_relevancy),
                          AVG(context_precision), AVG(mean_chunk_score)
                   FROM eval_metrics"""
            ).fetchone()
            grade_dist = conn.execute(
                """SELECT crag_grade, COUNT(*) as cnt
                   FROM query_logs WHERE crag_grade != ''
                   GROUP BY crag_grade"""
            ).fetchall()
            strategy_dist = conn.execute(
                """SELECT strategies, COUNT(*) as cnt
                   FROM query_logs GROUP BY strategies"""
            ).fetchall()
            avg_latency = conn.execute(
                "SELECT AVG(latency_ms) FROM query_logs WHERE latency_ms > 0"
            ).fetchone()[0]
        # AVG() yields NULL (None) on an empty input; ``or 0`` turns that
        # into a number the dashboard can display.
        return {
            "total_queries": total,
            "evaluated_queries": with_metrics,
            "avg_faithfulness": round(avgs[0] or 0, 3),
            "avg_answer_relevancy": round(avgs[1] or 0, 3),
            "avg_context_precision": round(avgs[2] or 0, 3),
            "avg_chunk_score": round(avgs[3] or 0, 3),
            "avg_latency_ms": round(avg_latency or 0, 1),
            "crag_grade_dist": {r[0]: r[1] for r in grade_dist},
            "strategy_dist": {r[0]: r[1] for r in strategy_dist},
        }

    # ── Init ───────────────────────────────────────────────────
    def _init_db(self) -> None:
        """Apply the schema script; safe to run on an existing database."""
        with self._conn() as conn:
            conn.executescript(_DDL)
        logger.info("EvalStore ready at %s", self._path)

    @contextmanager
    def _conn(self):
        """Yield a fresh connection; commit on success, roll back on error.

        The connection is always closed on exit. Rows are returned as
        ``sqlite3.Row`` so callers can read columns by name or position.
        """
        conn = sqlite3.connect(self._path, timeout=10, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    @staticmethod
    def _row_to_dict(row) -> dict:
        """Convert a row to a dict, decoding JSON text columns when possible.

        Decoding is best-effort: a value that is not valid JSON is left as
        stored rather than failing the whole read.
        """
        d = dict(row)
        for key in ("strategies",):
            raw = d.get(key)
            if not raw:
                continue
            try:
                d[key] = json.loads(raw)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; TypeError covers a
                # non-text value. Anything else is a real bug and propagates.
                logger.debug("Column %r is not valid JSON; keeping raw value", key)
        return d