File size: 9,219 Bytes
f0d100b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""
Cortex RAG — RAGAS Evaluation Harness (Phase 4)

Why reference-free metrics?
────────────────────────────
Classic RAG evaluation requires ground-truth answers (golden QA pairs).
We don't have those at runtime. RAGAS provides three metrics that need
only (question, answer, retrieved_contexts):

  faithfulness        — Does the answer make claims supported by the context?
                        Computed by asking an LLM to identify each claim in
                        the answer, then checking each claim against the context.
                        Score = supported_claims / total_claims.

  answer_relevancy    — Does the answer actually address the question?
                        Computed by generating N hypothetical questions from the
                        answer and measuring cosine similarity to the original
                        question. Low score = answer talks about something else.

  context_precision   — Are the retrieved chunks actually relevant to the query?
                        Computed by asking an LLM whether each chunk is useful
                        for answering the query. Score = relevant_chunks / total.

We also compute two lightweight custom metrics without any LLM calls:

  context_utilisation — What fraction of the retrieved chunks are cited in the
                        answer? (Count [1], [2]... citation markers.) A low score
                        means the generator ignored most of what was retrieved.

  mean_chunk_score    — Average retrieval score (post-reranking) of the final
                        chunks. Tracks retrieval quality independently of answer
                        quality. Useful for spotting when CRAG rewrites help.

Running mode
────────────
Evaluation is async — it runs in a background thread after the response
has been streamed to the user, so it never adds latency to the query path.
Results are written to the EvalStore (SQLite) and appear in the dashboard.

If RAGAS is not installed or the LLM call fails, only the two custom
metrics (context_utilisation, mean_chunk_score) are computed and stored.
This ensures the evaluation pipeline never blocks ingestion or queries.
"""
from __future__ import annotations

import logging
import math
import re
import threading
from dataclasses import dataclass, field
from typing import Optional

from evaluation.store import EvalMetricEntry, EvalStore
from retrieval.dense import RetrievedChunk

logger = logging.getLogger(__name__)


@dataclass
class EvalInput:
    """Everything needed to evaluate one query-response pair.

    Instances are handed to ``RAGASEvaluator.evaluate`` /
    ``RAGASEvaluator.evaluate_async``.
    """
    # Identifier of the logged query; used to name the background thread and
    # to key the stored metrics entry.
    query_log_id: int
    # The user's question (sent to RAGAS as "question").
    query: str
    # The generated answer (sent to RAGAS as "answer"; also scanned for
    # [N] citation markers).
    answer: str
    # Final retrieved chunks; defaults to a fresh empty list per instance.
    chunks: list[RetrievedChunk] = field(default_factory=list)


@dataclass
class EvalResult:
    """Scores for one evaluated query-response pair.

    Every score defaults to ``None``, i.e. "not computed".
    """
    # Metrics filled in from the RAGAS run.
    faithfulness: Optional[float] = None
    answer_relevancy: Optional[float] = None
    context_precision: Optional[float] = None
    # Custom metrics computed without RAGAS.
    context_utilisation: Optional[float] = None
    mean_chunk_score: Optional[float] = None

    def as_store_entry(self, query_log_id: int) -> EvalMetricEntry:
        """Return an ``EvalMetricEntry`` holding these scores for *query_log_id*."""
        metric_values = {
            "faithfulness": self.faithfulness,
            "answer_relevancy": self.answer_relevancy,
            "context_precision": self.context_precision,
            "context_utilisation": self.context_utilisation,
            "mean_chunk_score": self.mean_chunk_score,
        }
        return EvalMetricEntry(query_log_id=query_log_id, **metric_values)


class RAGASEvaluator:
    """
    Computes RAGAS + custom metrics for a query-response pair.

    The custom metrics (context utilisation, mean chunk score) are computed
    on every call. The RAGAS metrics are attempted only when the ``ragas``
    and ``datasets`` packages can be imported and the input has at least one
    chunk; a RAGAS metric that is missing, non-numeric or non-finite is
    reported as ``None``.

    Usage — fire-and-forget (non-blocking):
        evaluator = RAGASEvaluator(store)
        evaluator.evaluate_async(EvalInput(
            query_log_id=log_id,
            query="What is attention?",
            answer="Attention is...",
            chunks=final_chunks,
        ))

    Usage — blocking (for testing):
        result = evaluator.evaluate(eval_input)
    """

    # One bracketed citation group: "[3]" as well as grouped "[1, 2]".
    _CITATION_GROUP_RE = re.compile(r"\[(\d+(?:\s*,\s*\d+)*)\]")

    # Result columns read back from the RAGAS run.
    _RAGAS_METRIC_NAMES = ("faithfulness", "answer_relevancy", "context_precision")

    def __init__(self, store: Optional[EvalStore] = None) -> None:
        """
        Args:
            store: Where results are written by the async path. A default
                ``EvalStore()`` is created only when ``None`` is passed.
        """
        # Identity check, not truthiness: a caller-supplied store must be
        # used even if it happens to evaluate as falsy.
        self._store = store if store is not None else EvalStore()
        self._ragas_available = self._check_ragas()

    # ── Public API ─────────────────────────────────────────────

    def evaluate_async(self, inp: EvalInput) -> None:
        """
        Run evaluation in a daemon thread. Returns immediately.
        Results are written to EvalStore when complete.
        """
        thread = threading.Thread(
            target=self._run_and_store,
            args=(inp,),
            daemon=True,
            name=f"ragas-eval-{inp.query_log_id}",
        )
        thread.start()

    def evaluate(self, inp: EvalInput) -> EvalResult:
        """
        Blocking evaluation. Returns EvalResult.

        The RAGAS fields stay ``None`` when RAGAS is unavailable, when there
        are no chunks, or when the RAGAS run fails.
        """
        # ── Custom metrics (no LLM, always computed) ──────────
        result = EvalResult(
            context_utilisation=self._context_utilisation(inp.answer, inp.chunks),
            mean_chunk_score=self._mean_chunk_score(inp.chunks),
        )

        # ── RAGAS metrics (LLM-based, may be skipped) ─────────
        if not self._ragas_available:
            logger.debug("RAGAS not installed — only custom metrics computed.")
            return result
        if not inp.chunks:
            # No contexts to judge the answer against.
            return result

        ragas_scores = self._run_ragas(inp)
        result.faithfulness = ragas_scores.get("faithfulness")
        result.answer_relevancy = ragas_scores.get("answer_relevancy")
        result.context_precision = ragas_scores.get("context_precision")
        return result

    # ── Private ────────────────────────────────────────────────

    def _run_and_store(self, inp: EvalInput) -> None:
        """
        Thread target: evaluate *inp* and persist the result.

        Never raises; any failure is logged as a warning so the background
        thread cannot take anything else down with it.
        """
        try:
            result = self.evaluate(inp)
            self._store.log_metrics(result.as_store_entry(inp.query_log_id))
            logger.debug(
                "Eval stored for query %d: faith=%.2f rel=%.2f prec=%.2f util=%.2f",
                inp.query_log_id,
                # None (metric not computed) is logged as 0.
                result.faithfulness or 0,
                result.answer_relevancy or 0,
                result.context_precision or 0,
                result.context_utilisation or 0,
            )
        except Exception as exc:
            # Top-level boundary of the background thread: keep the traceback,
            # otherwise failures here are very hard to diagnose.
            logger.warning(
                "Eval failed for query %d: %s", inp.query_log_id, exc, exc_info=True
            )

    def _run_ragas(self, inp: EvalInput) -> dict:
        """
        Call RAGAS library. Returns dict of metric_name → score, where a
        score is ``None`` if its column is absent or its value is not a
        finite number.
        Returns empty dict on any failure.
        """
        try:
            from datasets import Dataset  # type: ignore
            from ragas import evaluate as ragas_evaluate  # type: ignore
            from ragas.metrics import (  # type: ignore
                answer_relevancy,
                context_precision,
                faithfulness,
            )
            from config import get_settings

            # The return value is not used here. The call is kept because it
            # may have side effects this block cannot see.
            # NOTE(review): confirm, and drop the call if it has none.
            get_settings()

            # RAGAS expects a HuggingFace Dataset
            # NOTE(review): some ragas releases require a reference /
            # ground-truth column for context_precision; if the pinned
            # version does, this call fails and {} is returned — confirm.
            dataset = Dataset.from_dict({
                "question": [inp.query],
                "answer": [inp.answer],
                "contexts": [[c.parent_text or c.text for c in inp.chunks]],
                # reference not available at runtime — omit context_recall
            })

            scores = ragas_evaluate(
                dataset,
                metrics=[faithfulness, answer_relevancy, context_precision],
                raise_exceptions=False,
            )
            frame = scores.to_pandas()
            # With raise_exceptions=False a failed metric can come back as
            # NaN; _clean_score turns that into None instead of storing NaN.
            return {
                name: self._clean_score(frame[name].iloc[0]) if name in frame else None
                for name in self._RAGAS_METRIC_NAMES
            }

        except Exception as exc:
            logger.warning("RAGAS evaluation failed: %s", exc)
            return {}

    @staticmethod
    def _clean_score(value: object) -> Optional[float]:
        """
        Convert a raw metric cell to a float.

        Returns ``None`` when *value* cannot be converted to float or is
        NaN / infinite.
        """
        try:
            score = float(value)  # type: ignore[arg-type]
        except (TypeError, ValueError):
            return None
        return score if math.isfinite(score) else None

    # ── Custom metrics (no LLM required) ──────────────────────

    @staticmethod
    def _context_utilisation(answer: str, chunks: list[RetrievedChunk]) -> float:
        """
        Fraction of retrieved chunks cited in the answer, rounded to 3 places.

        Looks for inline [N] citation markers, including comma-separated
        groups such as [1, 2]. N is 1-based; numbers outside
        1..len(chunks) are ignored. Returns 0.0 when there are no chunks or
        the answer is empty.
        """
        if not chunks or not answer:
            return 0.0
        cited_indices = {
            int(number)  # int() tolerates the whitespace around each number
            for group in RAGASEvaluator._CITATION_GROUP_RE.findall(answer)
            for number in group.split(",")
        }
        total = len(chunks)
        cited = sum(1 for index in cited_indices if 1 <= index <= total)
        return round(cited / total, 3)

    @staticmethod
    def _mean_chunk_score(chunks: list[RetrievedChunk]) -> float:
        """Average ``score`` of the chunks, rounded to 3 places (0.0 if empty)."""
        if not chunks:
            return 0.0
        return round(sum(c.score for c in chunks) / len(chunks), 3)

    @staticmethod
    def _check_ragas() -> bool:
        """
        Return True when both ``ragas`` and ``datasets`` import successfully.

        Any exception raised by those imports means "unavailable", so
        constructing the evaluator never fails because of them.
        """
        try:
            import ragas  # type: ignore  # noqa: F401
            import datasets  # type: ignore  # noqa: F401
        except ImportError:
            return False
        except Exception as exc:
            # Installed but failing to import is still "not available".
            logger.warning(
                "RAGAS import failed (%s); only custom metrics will be computed.",
                exc,
            )
            return False
        return True