File size: 10,970 Bytes
f0d100b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27edbb8
f0d100b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
"""
Cortex RAG — Evaluation Store (SQLite)

Two tables:
  query_logs   — one row per query: routing, CRAG grade, latency, chunk scores
  eval_metrics — one row per query: RAGAS scores (written async after generation)

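SQLite is the right choice here: zero infrastructure, works on Railway/Render
out of the box (given a persistent volume for the database file), and a
dashboard corpus of ~10k queries fits in <50MB.
Moving to Postgres later is straightforward but is more than a connection-string
change: the store uses the `sqlite3` driver directly and SQLite-specific SQL
(`?` placeholders, `AUTOINCREMENT`).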

The store is intentionally append-only. No deletes, no updates.
This preserves the full history for trend analysis in the dashboard.
"""
from __future__ import annotations

import json
import logging
import sqlite3
import time
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

_DEFAULT_DB_PATH = Path("data/cortex_eval.db")

# ── Schema ─────────────────────────────────────────────────────

# Schema script executed via executescript() by EvalStore._init_db() every
# time an EvalStore is constructed. Every statement uses IF NOT EXISTS, so
# re-running it against an existing database is a no-op — which also means
# it never alters the columns of a table that already exists.
# NOTE(review): SQLite only enforces the REFERENCES clause on
# eval_metrics.query_log_id when PRAGMA foreign_keys=ON is set on the
# connection; this module does not appear to set it — confirm if intended.
# NOTE(review): query_log_id is not UNIQUE, so several eval_metrics rows can
# exist for one query even though the design describes one row per query.
_DDL = """
CREATE TABLE IF NOT EXISTS query_logs (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp       REAL    NOT NULL,
    query           TEXT    NOT NULL,
    intent          TEXT,
    strategies      TEXT,           -- JSON list
    retriever_hits  TEXT,           -- JSON dict
    crag_grade      TEXT,
    crag_rewritten  INTEGER DEFAULT 0,   -- bool
    web_search_used INTEGER DEFAULT 0,   -- bool
    num_chunks      INTEGER DEFAULT 0,
    top_chunk_score REAL    DEFAULT 0.0,
    latency_ms      REAL    DEFAULT 0.0,
    model           TEXT,
    extractor       TEXT
);

CREATE TABLE IF NOT EXISTS eval_metrics (
    id                  INTEGER PRIMARY KEY AUTOINCREMENT,
    query_log_id        INTEGER NOT NULL REFERENCES query_logs(id),
    timestamp           REAL    NOT NULL,
    faithfulness        REAL,   -- 0-1: does answer contradict context?
    answer_relevancy    REAL,   -- 0-1: does answer address the question?
    context_precision   REAL,   -- 0-1: are retrieved chunks relevant?
    context_utilisation REAL,   -- 0-1: fraction of chunks cited in answer
    mean_chunk_score    REAL    -- average retrieval score of final chunks
);

CREATE INDEX IF NOT EXISTS idx_query_logs_ts   ON query_logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_eval_metrics_id ON eval_metrics(query_log_id);
"""


# ── Dataclasses ────────────────────────────────────────────────

@dataclass
class QueryLogEntry:
    """One routed query, as written to a ``query_logs`` row.

    ``strategies`` and ``retriever_hits`` are stored as JSON text by
    ``EvalStore.log_query``, so their contents should be JSON-serialisable.
    Leaving either as ``None`` (the default) gives the instance its own fresh
    empty list/dict, so instances never share mutable state.
    """

    query: str
    intent: str = ""
    # Optional[...] because the declared default is None; __post_init__
    # replaces None with a new empty container.
    strategies: Optional[list[str]] = None
    retriever_hits: Optional[dict] = None
    crag_grade: str = ""
    crag_rewritten: bool = False
    web_search_used: bool = False
    num_chunks: int = 0
    top_chunk_score: float = 0.0
    latency_ms: float = 0.0
    model: str = ""
    extractor: str = ""

    def __post_init__(self) -> None:
        # Normalise None (default or passed explicitly) to empty containers so
        # the stored JSON is always a list/dict rather than "null".
        if self.strategies is None:
            self.strategies = []
        if self.retriever_hits is None:
            self.retriever_hits = {}


@dataclass
class EvalMetricEntry:
    """Evaluation scores for one logged query, written as an ``eval_metrics`` row.

    ``query_log_id`` is the row id returned by ``EvalStore.log_query``.
    Every score is optional; a score left as ``None`` is passed to SQLite
    unchanged and therefore stored as NULL.
    """

    query_log_id: int
    # Scores — the schema comments describe them as 0-1 values; no range
    # check is performed here.
    faithfulness: float | None = None
    answer_relevancy: float | None = None
    context_precision: float | None = None
    context_utilisation: float | None = None
    mean_chunk_score: float | None = None


# ── Store ──────────────────────────────────────────────────────

class EvalStore:
    """
    Thread-safe SQLite-backed store for query logs and eval metrics.

    No connection is ever shared: every public method opens its own
    short-lived connection via ``_conn``, and SQLite's file locking
    serialises concurrent writers (each connection waits up to 10 s for a
    lock before failing).

    Usage:
        store = EvalStore()
        log_id = store.log_query(entry)
        store.log_metrics(EvalMetricEntry(query_log_id=log_id, faithfulness=0.92, ...))

    NOTE: ``db_path=":memory:"`` is not supported. Because each call opens a
    fresh connection, every call would see its own empty in-memory database.
    Use a file path instead (e.g. a temporary file in tests).
    """

    def __init__(self, db_path: str | Path = _DEFAULT_DB_PATH) -> None:
        """Bind the store to *db_path*, creating its parent directory and the schema if missing."""
        self._path = Path(db_path)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    # ── Write ──────────────────────────────────────────────────

    def log_query(self, entry: QueryLogEntry) -> int:
        """
        Insert a ``query_logs`` row for *entry* and return its new row id.

        The row is timestamped with ``time.time()``. ``strategies`` and
        ``retriever_hits`` are stored as JSON text, and booleans as 0/1.
        """
        with self._conn() as conn:
            cur = conn.execute(
                """INSERT INTO query_logs
                   (timestamp, query, intent, strategies, retriever_hits,
                    crag_grade, crag_rewritten, web_search_used,
                    num_chunks, top_chunk_score, latency_ms, model, extractor)
                   VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (
                    time.time(),
                    entry.query,
                    entry.intent,
                    json.dumps(entry.strategies),
                    json.dumps(entry.retriever_hits),
                    entry.crag_grade,
                    int(entry.crag_rewritten),
                    int(entry.web_search_used),
                    entry.num_chunks,
                    entry.top_chunk_score,
                    entry.latency_ms,
                    entry.model,
                    entry.extractor,
                ),
            )
            return cur.lastrowid

    def log_metrics(self, entry: EvalMetricEntry) -> None:
        """
        Insert an ``eval_metrics`` row for *entry*, timestamped with ``time.time()``.

        NOTE: ``entry.query_log_id`` is not checked against ``query_logs``.
        SQLite only enforces the schema's REFERENCES clause when
        ``PRAGMA foreign_keys=ON`` is set, and ``_conn`` does not set it.
        """
        with self._conn() as conn:
            conn.execute(
                """INSERT INTO eval_metrics
                   (query_log_id, timestamp, faithfulness, answer_relevancy,
                    context_precision, context_utilisation, mean_chunk_score)
                   VALUES (?,?,?,?,?,?,?)""",
                (
                    entry.query_log_id,
                    time.time(),
                    entry.faithfulness,
                    entry.answer_relevancy,
                    entry.context_precision,
                    entry.context_utilisation,
                    entry.mean_chunk_score,
                ),
            )

    # ── Read ───────────────────────────────────────────────────

    def get_recent_queries(self, limit: int = 100) -> list[dict]:
        """
        Return the last *limit* query logs, newest first, joined with their eval metrics.

        Metric columns are ``None`` for queries that have not been evaluated.
        If a query has several ``eval_metrics`` rows, only the most recently
        inserted one (highest id) is joined, so each query appears at most
        once and *limit* counts queries. ``strategies`` is decoded from JSON
        when possible. ``web_search_used`` is the stored 0/1 integer.
        """
        with self._conn() as conn:
            rows = conn.execute(
                # The correlated subquery picks a single eval row per query.
                # A plain LEFT JOIN would repeat the query row once per metric row.
                # q.id breaks timestamp ties so the order is deterministic.
                """SELECT q.id, q.timestamp, q.query, q.intent, q.strategies,
                          q.crag_grade, q.web_search_used, q.num_chunks,
                          q.top_chunk_score, q.latency_ms,
                          e.faithfulness, e.answer_relevancy,
                          e.context_precision, e.context_utilisation,
                          e.mean_chunk_score
                   FROM query_logs q
                   LEFT JOIN eval_metrics e
                          ON e.id = (SELECT MAX(id) FROM eval_metrics
                                     WHERE query_log_id = q.id)
                   ORDER BY q.timestamp DESC, q.id DESC
                   LIMIT ?""",
                (limit,),
            ).fetchall()
        return [self._row_to_dict(r) for r in rows]

    def get_metric_timeseries(self, days: int = 7) -> list[dict]:
        """
        Hourly-bucketed metric averages over the last *days* days.

        Used for the trend line chart in the dashboard. Only queries with at
        least one ``eval_metrics`` row contribute. Buckets without any are
        omitted, and results are ordered by ``hour_bucket``.

        Each dict has these keys:

        - ``hour_bucket``: whole hours from the window start to the query
          timestamp (0 = first hour of the window).
        - ``faithfulness``, ``answer_relevancy``, ``context_precision``,
          ``mean_chunk_score``: averages over the bucket's eval_metrics
          rows. A value is ``None`` when all of its inputs are NULL.
        - ``query_count``: number of distinct queries in the bucket.
        - ``bucket_start``: epoch seconds at which the bucket starts. The
          window start is computed internally, so this is what maps
          ``hour_bucket`` to wall-clock time.
        """
        since = time.time() - days * 86400
        with self._conn() as conn:
            rows = conn.execute(
                """SELECT
                       CAST((q.timestamp - ?) / 3600 AS INTEGER) AS hour_bucket,
                       AVG(e.faithfulness)        AS faithfulness,
                       AVG(e.answer_relevancy)    AS answer_relevancy,
                       AVG(e.context_precision)   AS context_precision,
                       AVG(e.mean_chunk_score)    AS mean_chunk_score,
                       COUNT(DISTINCT q.id)       AS query_count
                   FROM query_logs q
                   JOIN eval_metrics e ON e.query_log_id = q.id
                   WHERE q.timestamp > ?
                   GROUP BY hour_bucket
                   ORDER BY hour_bucket""",
                (since, since),
            ).fetchall()
        buckets = []
        for r in rows:
            bucket = dict(r)  # keys are the column aliases above
            bucket["bucket_start"] = since + bucket["hour_bucket"] * 3600
            buckets.append(bucket)
        return buckets

    def get_summary_stats(self) -> dict:
        """
        Aggregate stats for the dashboard header metrics.

        Returns a dict with these keys:

        - ``total_queries``: number of ``query_logs`` rows.
        - ``evaluated_queries``: number of distinct queries that have at
          least one ``eval_metrics`` row.
        - ``avg_faithfulness``, ``avg_answer_relevancy``,
          ``avg_context_precision``, ``avg_chunk_score``: averages over all
          ``eval_metrics`` rows, rounded to 3 decimal places.
        - ``avg_latency_ms``: mean over rows with a positive latency,
          rounded to 1 decimal place.
        - ``crag_grade_dist``: row count per CRAG grade. Empty and NULL
          grades are excluded.
        - ``strategy_dist``: row count per stored ``strategies`` value. It
          is keyed by the JSON text of the whole list, not by individual
          strategy.

        Each average is 0 when there is no data.
        """
        with self._conn() as conn:
            total = conn.execute("SELECT COUNT(*) FROM query_logs").fetchone()[0]
            # DISTINCT so re-evaluated queries are not counted twice.
            with_metrics = conn.execute(
                "SELECT COUNT(DISTINCT query_log_id) FROM eval_metrics"
            ).fetchone()[0]
            avgs = conn.execute(
                """SELECT AVG(faithfulness), AVG(answer_relevancy),
                          AVG(context_precision), AVG(mean_chunk_score)
                   FROM eval_metrics"""
            ).fetchone()
            grade_dist = conn.execute(
                """SELECT crag_grade, COUNT(*) as cnt
                   FROM query_logs WHERE crag_grade != ''
                   GROUP BY crag_grade"""
            ).fetchall()
            strategy_dist = conn.execute(
                """SELECT strategies, COUNT(*) as cnt
                   FROM query_logs GROUP BY strategies"""
            ).fetchall()
            avg_latency = conn.execute(
                "SELECT AVG(latency_ms) FROM query_logs WHERE latency_ms > 0"
            ).fetchone()[0]

        # AVG() yields NULL (None) on an empty set, hence the `or 0` fallbacks.
        return {
            "total_queries": total,
            "evaluated_queries": with_metrics,
            "avg_faithfulness":      round(avgs[0] or 0, 3),
            "avg_answer_relevancy":  round(avgs[1] or 0, 3),
            "avg_context_precision": round(avgs[2] or 0, 3),
            "avg_chunk_score":       round(avgs[3] or 0, 3),
            "avg_latency_ms":        round(avg_latency or 0, 1),
            "crag_grade_dist":  {r[0]: r[1] for r in grade_dist},
            "strategy_dist":    {r[0]: r[1] for r in strategy_dist},
        }

    # ── Init ───────────────────────────────────────────────────

    def _init_db(self) -> None:
        """Create the tables and indexes from ``_DDL`` if they do not already exist."""
        with self._conn() as conn:
            conn.executescript(_DDL)
        logger.info("EvalStore ready at %s", self._path)

    @contextmanager
    def _conn(self):
        """
        Yield a new connection to the store's database file.

        The transaction is committed if the ``with`` body succeeds and rolled
        back if it raises, and the connection is always closed afterwards.
        Rows are returned as ``sqlite3.Row``, which supports access by column
        name and ``dict(row)``.
        """
        conn = sqlite3.connect(self._path, timeout=10, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    @staticmethod
    def _row_to_dict(row) -> dict:
        """
        Convert a ``sqlite3.Row`` to a plain dict and decode its JSON columns.

        A value that cannot be decoded is left exactly as stored.
        """
        d = dict(row)
        for key in ("strategies",):
            if d.get(key):
                try:
                    d[key] = json.loads(d[key])
                except (TypeError, ValueError):
                    # json.JSONDecodeError subclasses ValueError. TypeError
                    # covers non-text values. Either way, keep the raw value.
                    pass
        return d