Dokumentassistent / scripts /evaluate.py
XQ
Add evaluation and update README
a493f04
raw
history blame
25.7 kB
"""RAGAS matrix evaluation: run a curated test set through multiple
configurations of the RAG pipeline and persist results.
Two pre-defined experiments:
chunking — varies CHUNK_STRATEGY (FIXED_SIZE / RECURSIVE / SEMANTIC),
fixes router=react, top_k=5
router — varies router (react / pipeline), fixes
chunking=RECURSIVE, top_k=5
all — runs both experiments
quick — single cell (chunking=RECURSIVE, router=react), for smoke testing
Each cell:
1. Builds an in-memory Qdrant + BM25 index for its chunking strategy
(indices are reused across cells with the same chunking).
2. Runs each test question through the chosen router.
3. Sends the (question, answer, contexts, reference) tuples to RAGAS,
using the judge LLM from EVALUATOR_LLM_PROVIDER (or generation LLM if unset).
Output:
eval/runs/<timestamp>_<config>.json — full result (config + per-sample)
eval/runs/<timestamp>_<config>.md — human-readable aggregate table
Usage:
python -m scripts.evaluate --experiment quick
python -m scripts.evaluate --experiment chunking
python -m scripts.evaluate --experiment all --top-k 5
Env vars:
LLM_PROVIDER=groq (generation LLM)
GROQ_API_KEY=gsk_... (required for groq)
EVALUATOR_LLM_PROVIDER=groq (judge LLM; empty = reuse generation)
"""
import argparse
import datetime as dt
import json
import logging
import os
import shutil
import sys
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import yaml
from langchain_core.output_parsers import StrOutputParser
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, PROJECT_ROOT)
from src.agent.intent_classifier import IntentClassifier # noqa: E402
from src.agent.plan_and_execute import PlanAndExecuteRouter # noqa: E402
from src.agent.router import QueryRouter # noqa: E402
from src.config import Settings, load_settings # noqa: E402
from src.evaluation.evaluator import RAGEvaluator # noqa: E402
from src.ingestion.pipeline import IngestionPipeline # noqa: E402
from src.models import ChunkStrategy, GenerationResponse # noqa: E402
from src.provider import ( # noqa: E402
create_embeddings,
create_evaluator_llm,
create_llm,
create_reranker,
)
from src.retrieval.bm25_search import BM25Search # noqa: E402
from src.retrieval.embedder import Embedder # noqa: E402
from src.retrieval.hybrid import HybridRetriever # noqa: E402
from src.retrieval.reranker import Reranker # noqa: E402
from src.retrieval.vector_store import VectorStore # noqa: E402
logger = logging.getLogger(__name__)
DOCS_DIR = os.path.join(PROJECT_ROOT, "docs")
QA_SET_PATH = os.path.join(PROJECT_ROOT, "eval", "qa_set.yaml")
RUNS_DIR = os.path.join(PROJECT_ROOT, "eval", "runs")
# ---------------------------------------------------------------------------
# Run configuration
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class RunConfig:
"""Single experiment cell configuration."""
name: str
chunking: ChunkStrategy
router: str # "react" or "pipeline"
top_k: int
def _build_run_configs(experiment: str, top_k: int) -> list[RunConfig]:
"""Build the list of RunConfig cells for the requested experiment.
Args:
experiment: One of "quick", "chunking", "router", "all".
top_k: Top-K passed to every cell.
Returns:
Ordered list of RunConfig cells to execute.
Raises:
ValueError: If the experiment name is unknown.
"""
if experiment == "quick":
return [
RunConfig("recursive_react", ChunkStrategy.RECURSIVE, "react", top_k),
]
if experiment == "chunking":
return [
RunConfig("fixed_react", ChunkStrategy.FIXED_SIZE, "react", top_k),
RunConfig("recursive_react", ChunkStrategy.RECURSIVE, "react", top_k),
RunConfig("semantic_react", ChunkStrategy.SEMANTIC, "react", top_k),
]
if experiment == "router":
return [
RunConfig("recursive_pipeline", ChunkStrategy.RECURSIVE, "pipeline", top_k),
RunConfig("recursive_react", ChunkStrategy.RECURSIVE, "react", top_k),
]
if experiment == "all":
return [
RunConfig("fixed_react", ChunkStrategy.FIXED_SIZE, "react", top_k),
RunConfig("recursive_react", ChunkStrategy.RECURSIVE, "react", top_k),
RunConfig("semantic_react", ChunkStrategy.SEMANTIC, "react", top_k),
RunConfig("recursive_pipeline", ChunkStrategy.RECURSIVE, "pipeline", top_k),
]
raise ValueError(
f"Unknown experiment '{experiment}'. Expected one of: "
f"quick, chunking, router, all."
)
# ---------------------------------------------------------------------------
# QA set loading
# ---------------------------------------------------------------------------
def _load_qa_set(path: str) -> list[dict[str, Any]]:
"""Load and validate the curated QA set, keeping only reviewed entries.
Args:
path: Path to eval/qa_set.yaml.
Returns:
List of reviewed question dicts.
Raises:
FileNotFoundError: If the file does not exist.
ValueError: If the file is malformed or contains no reviewed entries.
"""
if not os.path.exists(path):
raise FileNotFoundError(
f"QA set not found at {path}. Run scripts/generate_qa_set.py first, "
f"then curate the draft into eval/qa_set.yaml."
)
with open(path, "r", encoding="utf-8") as fh:
data = yaml.safe_load(fh)
if isinstance(data, dict):
questions = data.get("questions", [])
else:
questions = data
if not isinstance(questions, list):
raise ValueError(f"Invalid QA set format in {path}: expected list of questions.")
reviewed = [q for q in questions if isinstance(q, dict) and q.get("reviewed")]
if not reviewed:
raise ValueError(
f"No reviewed questions in {path}. "
f"Set 'reviewed: true' on entries you want to keep."
)
logger.info("Loaded %d reviewed questions from %s", len(reviewed), path)
return reviewed
# ---------------------------------------------------------------------------
# Index building (with caching across cells)
# ---------------------------------------------------------------------------
@dataclass
class BuiltIndex:
"""Bundle of artifacts produced by ingesting docs/ with one chunking strategy."""
vector_store: VectorStore
bm25: BM25Search
embedder: Embedder
qdrant_path: str # tmp dir owned by this index, cleaned up at the end
def _build_index(
chunking: ChunkStrategy,
settings: Settings,
embeddings: Any,
) -> BuiltIndex:
"""Ingest docs/ with the given chunking strategy and build dense + sparse indices.
Args:
chunking: ChunkStrategy enum value.
settings: Application settings.
embeddings: LangChain Embeddings instance.
Returns:
BuiltIndex bundle ready to feed into a router.
"""
logger.info("Building index for chunking=%s ...", chunking.value)
pipeline = IngestionPipeline(
strategy=chunking,
chunk_size=settings.chunk_size,
chunk_overlap=settings.chunk_overlap,
embeddings=embeddings if chunking == ChunkStrategy.SEMANTIC else None,
)
chunks = pipeline.ingest_directory(DOCS_DIR)
if not chunks:
raise RuntimeError(f"No chunks produced for chunking={chunking.value}")
logger.info(" -> %d chunks", len(chunks))
embedder = Embedder(embeddings)
vectors = embedder.embed_batch([c.text for c in chunks])
qdrant_path = tempfile.mkdtemp(prefix=f"eval_qdrant_{chunking.value}_")
vector_store = VectorStore(
path=qdrant_path,
collection_name=f"eval_{chunking.value}",
dimension=settings.embedding_dimension,
)
vector_store.add_chunks(chunks, vectors)
bm25 = BM25Search()
bm25.index(chunks)
return BuiltIndex(
vector_store=vector_store,
bm25=bm25,
embedder=embedder,
qdrant_path=qdrant_path,
)
# ---------------------------------------------------------------------------
# Router construction
# ---------------------------------------------------------------------------
def _build_router(
router_kind: str,
index: BuiltIndex,
reranker: Reranker,
llm: Any,
settings: Settings,
) -> Any:
"""Build the requested router (PlanAndExecuteRouter or QueryRouter).
Args:
router_kind: "react" for PlanAndExecuteRouter, "pipeline" for QueryRouter.
index: BuiltIndex bundle.
reranker: Shared Reranker instance.
llm: Generation LLM instance.
settings: Application settings.
Returns:
A router instance exposing ``route(query, top_k)``.
"""
hybrid = HybridRetriever(
vector_store=index.vector_store,
bm25_search=index.bm25,
embedder=index.embedder,
dense_weight=settings.dense_weight,
bm25_weight=settings.bm25_weight,
)
if router_kind == "react":
return PlanAndExecuteRouter(
llm=llm,
hybrid_retriever=hybrid,
reranker=reranker,
vector_store=index.vector_store,
default_top_k=settings.top_k,
)
if router_kind == "pipeline":
classifier = IntentClassifier(llm=llm, model_name=settings.generation_model)
llm_chain = llm | StrOutputParser()
return QueryRouter(
intent_classifier=classifier,
hybrid_retriever=hybrid,
reranker=reranker,
llm_chain=llm_chain,
translate_query=settings.translate_query,
)
raise ValueError(f"Unknown router kind: {router_kind!r}")
# ---------------------------------------------------------------------------
# Per-cell run
# ---------------------------------------------------------------------------
def _generate_records(
config: RunConfig,
qa_set: list[dict[str, Any]],
index: BuiltIndex,
reranker: Reranker,
llm: Any,
settings: Settings,
) -> list[dict[str, Any]]:
"""Run the router over the QA set and collect raw records.
Args:
config: Cell configuration.
qa_set: Reviewed test questions.
index: Pre-built dense + sparse indices for this chunking.
reranker: Shared Reranker.
llm: Generation LLM.
settings: Application settings.
Returns:
List of raw record dicts (one per question), each containing the
generated answer and retrieved contexts.
"""
router = _build_router(config.router, index, reranker, llm, settings)
records: list[dict[str, Any]] = []
for entry in qa_set:
question = entry["question"]
logger.info(" Q: %s", question)
try:
response: GenerationResponse = router.route(query=question, top_k=config.top_k)
except Exception as exc:
logger.error("Router failed for question %r: %s", question, exc)
response = GenerationResponse(
answer="",
sources=[],
intent=None, # type: ignore[arg-type]
confidence=0.0,
)
ctx_texts = [r.chunk.text for r in response.sources]
records.append(
{
"question": question,
"answer": response.answer,
"retrieved_contexts": ctx_texts,
"reference_en": entry.get("reference_en", ""),
"source_quote_da": entry.get("source_quote_da", ""),
"source_doc": entry.get("source_doc", ""),
"category": entry.get("category", ""),
}
)
return records
def _judge_records(
raw_records: list[dict[str, Any]],
judge: RAGEvaluator,
) -> dict[str, Any]:
"""Run the RAGAS judge over a list of pre-computed raw records.
Args:
raw_records: List of records as produced by ``_generate_records``.
judge: RAGEvaluator wrapping the judge LLM.
Returns:
Dict with ``aggregate`` and ``per_sample`` keys from the judge.
"""
questions = [r["question"] for r in raw_records]
answers = [r["answer"] for r in raw_records]
contexts = [r["retrieved_contexts"] for r in raw_records]
ground_truths: list[str | dict[str, Any]] = [
{
"reference_en": r.get("reference_en", ""),
"source_quote_da": r.get("source_quote_da", ""),
}
for r in raw_records
]
return judge.evaluate(
questions=questions,
answers=answers,
contexts=contexts,
ground_truths=ground_truths,
)
def _run_cell(
config: RunConfig,
qa_set: list[dict[str, Any]],
index: BuiltIndex,
reranker: Reranker,
llm: Any,
judge: RAGEvaluator,
settings: Settings,
checkpoint_path: Path,
) -> dict[str, Any]:
"""Run one experiment cell end-to-end.
The raw records (questions / answers / contexts) are checkpointed to
``checkpoint_path`` BEFORE the RAGAS judge is invoked, so a judge crash
does not waste the generation work. Use ``--rejudge <checkpoint>`` to
re-run the judge step on a saved checkpoint.
Args:
config: The cell configuration.
qa_set: List of reviewed test questions.
index: Pre-built dense + sparse indices for this chunking.
reranker: Shared Reranker.
llm: Generation LLM.
judge: RAGEvaluator wrapping the judge LLM.
settings: Application settings.
checkpoint_path: Where to write the raw-records checkpoint.
Returns:
Dict with config metadata, aggregate scores, per-sample scores, and
the raw answers/contexts collected from the router.
"""
logger.info("=== Running cell %s ===", config.name)
raw_records = _generate_records(config, qa_set, index, reranker, llm, settings)
# Checkpoint BEFORE the judge so a RAGAS crash does not waste generation.
checkpoint_payload = {
"config": {
"name": config.name,
"chunking": config.chunking.value,
"router": config.router,
"top_k": config.top_k,
},
"raw_records": raw_records,
"n_samples": len(raw_records),
}
_write_json(checkpoint_path, checkpoint_payload)
logger.info("Wrote raw-records checkpoint: %s", checkpoint_path)
logger.info(" -> calling RAGAS judge ...")
eval_result = _judge_records(raw_records, judge)
return {
"config": {
"name": config.name,
"chunking": config.chunking.value,
"router": config.router,
"top_k": config.top_k,
},
"aggregate": eval_result["aggregate"],
"per_sample": eval_result["per_sample"],
"raw_records": raw_records,
"n_samples": len(qa_set),
}
# ---------------------------------------------------------------------------
# Persistence
# ---------------------------------------------------------------------------
def _safe_json(obj: Any) -> Any:
"""Convert non-JSON-serialisable values (numpy / pandas) to plain Python."""
if hasattr(obj, "item") and callable(obj.item):
try:
return obj.item()
except Exception: # noqa: BLE001
return str(obj)
if isinstance(obj, dict):
return {k: _safe_json(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [_safe_json(v) for v in obj]
return obj
def _write_json(path: Path, payload: dict[str, Any]) -> None:
"""Write a JSON result file with safe serialisation."""
with open(path, "w", encoding="utf-8") as fh:
json.dump(_safe_json(payload), fh, indent=2, ensure_ascii=False)
def _format_markdown(
timestamp: str,
settings: Settings,
cell_results: list[dict[str, Any]],
) -> str:
"""Format the matrix results as a Markdown report.
Args:
timestamp: ISO timestamp for the report header.
settings: Application settings (used for env metadata).
cell_results: One dict per cell, as produced by _run_cell.
Returns:
Markdown string with a metadata block and one aggregate table.
"""
judge_provider = settings.evaluator_llm_provider or settings.llm_provider
judge_model_label = (
settings.evaluator_llm_model
or {
"groq": settings.groq_model,
"openai": settings.openai_model,
"anthropic": settings.anthropic_model,
"google_genai": settings.google_model,
"ollama": settings.ollama_model,
}.get(judge_provider, "(provider default)")
)
lines: list[str] = []
lines.append(f"# RAGAS Evaluation — {timestamp}")
lines.append("")
lines.append("## Setup")
lines.append("")
lines.append(f"- Generation LLM: `{settings.llm_provider}` / "
f"`{settings.groq_model if settings.llm_provider == 'groq' else settings.generation_model}`")
lines.append(f"- Judge LLM: `{judge_provider}` / `{judge_model_label}`")
lines.append(f"- Embeddings: `{settings.embedding_provider}` / `{settings.local_embedding_model}`")
lines.append(f"- Reranker: `{settings.reranker_model}`")
lines.append(f"- Samples: {cell_results[0]['n_samples'] if cell_results else 0}")
lines.append("")
if not cell_results:
return "\n".join(lines)
metric_keys: list[str] = []
for cell in cell_results:
for key in cell["aggregate"].keys():
if key not in metric_keys:
metric_keys.append(key)
header = ["Config", "Chunking", "Router", "top_k", *metric_keys]
lines.append("## Aggregate Scores")
lines.append("")
lines.append("| " + " | ".join(header) + " |")
lines.append("|" + "|".join("---" for _ in header) + "|")
for cell in cell_results:
cfg = cell["config"]
row = [
cfg["name"],
cfg["chunking"],
cfg["router"],
str(cfg["top_k"]),
]
for key in metric_keys:
value = cell["aggregate"].get(key)
row.append(f"{value:.4f}" if isinstance(value, (int, float)) else "—")
lines.append("| " + " | ".join(row) + " |")
lines.append("")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Run a RAGAS evaluation matrix over the curated test set.",
)
parser.add_argument(
"--experiment",
choices=["quick", "chunking", "router", "all"],
default="quick",
help="Pre-defined experiment to run (default: quick).",
)
parser.add_argument("--top-k", type=int, default=5, help="Top-K per query (default 5).")
parser.add_argument(
"--qa-set",
type=str,
default=QA_SET_PATH,
help=f"Path to curated QA set YAML (default: {QA_SET_PATH}).",
)
parser.add_argument(
"--runs-dir",
type=str,
default=RUNS_DIR,
help=f"Directory for output JSON+MD files (default: {RUNS_DIR}).",
)
parser.add_argument(
"--rejudge",
type=str,
default="",
metavar="CHECKPOINT_PATH",
help=(
"Re-run only the RAGAS judge on a previously saved checkpoint "
"(eval/runs/<ts>_<cell>.checkpoint.json). Skips index building "
"and router invocation entirely."
),
)
return parser.parse_args()
def _rejudge_from_checkpoint(
checkpoint_path: str,
settings: Settings,
runs_dir: Path,
timestamp: str,
) -> None:
"""Re-run only the RAGAS judge against a saved raw-records checkpoint.
Skips index building, router invocation, and the temp Qdrant lifecycle
entirely. The new judged result is written next to the runs dir under a
fresh timestamp.
Args:
checkpoint_path: Path to a checkpoint.json file from a prior run.
settings: Application settings (used to build the judge LLM).
runs_dir: Directory for output files.
timestamp: Timestamp string used in output filenames.
"""
logger.info("Rejudge mode: loading checkpoint %s", checkpoint_path)
with open(checkpoint_path, "r", encoding="utf-8") as fh:
checkpoint = json.load(fh)
config_dict = checkpoint.get("config", {})
raw_records = checkpoint.get("raw_records", [])
if not raw_records:
raise ValueError(f"Checkpoint {checkpoint_path} contains no raw_records.")
logger.info(
"Checkpoint config: %s | %d records",
config_dict.get("name", "(unknown)"),
len(raw_records),
)
judge_llm = create_evaluator_llm(settings)
embeddings = create_embeddings(settings)
judge = RAGEvaluator(llm=judge_llm, embeddings=embeddings)
logger.info("Calling RAGAS judge on cached records ...")
eval_result = _judge_records(raw_records, judge)
cell_result = {
"config": config_dict,
"aggregate": eval_result["aggregate"],
"per_sample": eval_result["per_sample"],
"raw_records": raw_records,
"n_samples": len(raw_records),
}
cell_name = config_dict.get("name", "rejudged")
out_json = runs_dir / f"{timestamp}_{cell_name}.rejudged.json"
out_md = runs_dir / f"{timestamp}_{cell_name}.rejudged.md"
_write_json(out_json, cell_result)
with open(out_md, "w", encoding="utf-8") as fh:
fh.write(_format_markdown(timestamp, settings, [cell_result]))
logger.info("Wrote rejudged JSON: %s", out_json)
logger.info("Wrote rejudged Markdown: %s", out_md)
print(f"\nRejudge done. Markdown report: {out_md}")
def main() -> None:
"""Run the requested experiment matrix and persist results."""
args = parse_args()
settings = load_settings()
logging.basicConfig(
level=getattr(logging, settings.log_level.upper(), logging.INFO),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
timestamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
runs_dir = Path(args.runs_dir)
runs_dir.mkdir(parents=True, exist_ok=True)
if args.rejudge:
logger.info("=== RAGAS Rejudge Start (%s) ===", timestamp)
_rejudge_from_checkpoint(args.rejudge, settings, runs_dir, timestamp)
return
logger.info("=== RAGAS Matrix Evaluation Start (%s) ===", timestamp)
qa_set = _load_qa_set(args.qa_set)
run_configs = _build_run_configs(args.experiment, args.top_k)
logger.info("Experiment: %s | %d cells | %d questions",
args.experiment, len(run_configs), len(qa_set))
# --- Build shared resources -------------------------------------------
llm = create_llm(settings)
judge_llm = create_evaluator_llm(settings)
embeddings = create_embeddings(settings)
judge = RAGEvaluator(llm=judge_llm, embeddings=embeddings)
reranker = Reranker(model=create_reranker(settings.reranker_model))
# --- Build indices once per chunking strategy -------------------------
indices: dict[ChunkStrategy, BuiltIndex] = {}
needed_chunkings = {cfg.chunking for cfg in run_configs}
for chunking in needed_chunkings:
indices[chunking] = _build_index(chunking, settings, embeddings)
# --- Run cells ---------------------------------------------------------
cell_results: list[dict[str, Any]] = []
try:
for config in run_configs:
checkpoint_path = runs_dir / f"{timestamp}_{config.name}.checkpoint.json"
cell_result = _run_cell(
config=config,
qa_set=qa_set,
index=indices[config.chunking],
reranker=reranker,
llm=llm,
judge=judge,
settings=settings,
checkpoint_path=checkpoint_path,
)
cell_results.append(cell_result)
# Per-cell judged-result JSON
cell_json_path = runs_dir / f"{timestamp}_{config.name}.json"
_write_json(cell_json_path, cell_result)
logger.info("Wrote cell result: %s", cell_json_path)
finally:
# --- Combined JSON + Markdown report -------------------------------
combined_path = runs_dir / f"{timestamp}_{args.experiment}.json"
md_path = runs_dir / f"{timestamp}_{args.experiment}.md"
_write_json(
combined_path,
{
"timestamp": timestamp,
"experiment": args.experiment,
"qa_set_path": args.qa_set,
"n_samples": len(qa_set),
"cells": cell_results,
},
)
with open(md_path, "w", encoding="utf-8") as fh:
fh.write(_format_markdown(timestamp, settings, cell_results))
logger.info("Wrote combined JSON: %s", combined_path)
logger.info("Wrote Markdown report: %s", md_path)
# --- Cleanup tmp Qdrant dirs ---------------------------------------
for index in indices.values():
shutil.rmtree(index.qdrant_path, ignore_errors=True)
logger.info("Cleaned up %d tmp Qdrant dirs", len(indices))
print(f"\nDone. Markdown report: {md_path}")
if __name__ == "__main__":
main()