"""RAGAS matrix evaluation: run a curated test set through multiple configurations of the RAG pipeline and persist results. Two pre-defined experiments: chunking — varies CHUNK_STRATEGY (FIXED_SIZE / RECURSIVE / SEMANTIC), fixes router=react, top_k=5 router — varies router (react / pipeline), fixes chunking=RECURSIVE, top_k=5 all — runs both experiments quick — single cell (chunking=RECURSIVE, router=react), for smoke testing Each cell: 1. Builds an in-memory Qdrant + BM25 index for its chunking strategy (indices are reused across cells with the same chunking). 2. Runs each test question through the chosen router. 3. Sends the (question, answer, contexts, reference) tuples to RAGAS, using the judge LLM from EVALUATOR_LLM_PROVIDER (or generation LLM if unset). Output: eval/runs/_.json — full result (config + per-sample) eval/runs/_.md — human-readable aggregate table Usage: python -m scripts.evaluate --experiment quick python -m scripts.evaluate --experiment chunking python -m scripts.evaluate --experiment all --top-k 5 Env vars: LLM_PROVIDER=groq (generation LLM) GROQ_API_KEY=gsk_... (required for groq) EVALUATOR_LLM_PROVIDER=groq (judge LLM; empty = reuse generation) """ import argparse import datetime as dt import json import logging import os import shutil import sys import tempfile from dataclasses import dataclass from pathlib import Path from typing import Any import yaml from langchain_core.output_parsers import StrOutputParser PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) sys.path.insert(0, PROJECT_ROOT) from src.agent.intent_classifier import IntentClassifier # noqa: E402 from src.agent.plan_and_execute import PlanAndExecuteRouter # noqa: E402 from src.agent.router import QueryRouter # noqa: E402 from src.config import Settings, load_settings # noqa: E402 from src.evaluation.evaluator import RAGEvaluator # noqa: E402 from src.ingestion.pipeline import IngestionPipeline # noqa: E402 from src.models import ChunkStrategy, GenerationResponse # noqa: E402 from src.provider import ( # noqa: E402 create_embeddings, create_evaluator_llm, create_llm, create_reranker, ) from src.retrieval.bm25_search import BM25Search # noqa: E402 from src.retrieval.embedder import Embedder # noqa: E402 from src.retrieval.hybrid import HybridRetriever # noqa: E402 from src.retrieval.reranker import Reranker # noqa: E402 from src.retrieval.vector_store import VectorStore # noqa: E402 logger = logging.getLogger(__name__) DOCS_DIR = os.path.join(PROJECT_ROOT, "docs") QA_SET_PATH = os.path.join(PROJECT_ROOT, "eval", "qa_set.yaml") RUNS_DIR = os.path.join(PROJECT_ROOT, "eval", "runs") # --------------------------------------------------------------------------- # Run configuration # --------------------------------------------------------------------------- @dataclass(frozen=True) class RunConfig: """Single experiment cell configuration.""" name: str chunking: ChunkStrategy router: str # "react" or "pipeline" top_k: int def _build_run_configs(experiment: str, top_k: int) -> list[RunConfig]: """Build the list of RunConfig cells for the requested experiment. Args: experiment: One of "quick", "chunking", "router", "all". top_k: Top-K passed to every cell. Returns: Ordered list of RunConfig cells to execute. Raises: ValueError: If the experiment name is unknown. """ if experiment == "quick": return [ RunConfig("recursive_react", ChunkStrategy.RECURSIVE, "react", top_k), ] if experiment == "chunking": return [ RunConfig("fixed_react", ChunkStrategy.FIXED_SIZE, "react", top_k), RunConfig("recursive_react", ChunkStrategy.RECURSIVE, "react", top_k), RunConfig("semantic_react", ChunkStrategy.SEMANTIC, "react", top_k), ] if experiment == "router": return [ RunConfig("recursive_pipeline", ChunkStrategy.RECURSIVE, "pipeline", top_k), RunConfig("recursive_react", ChunkStrategy.RECURSIVE, "react", top_k), ] if experiment == "all": return [ RunConfig("fixed_react", ChunkStrategy.FIXED_SIZE, "react", top_k), RunConfig("recursive_react", ChunkStrategy.RECURSIVE, "react", top_k), RunConfig("semantic_react", ChunkStrategy.SEMANTIC, "react", top_k), RunConfig("recursive_pipeline", ChunkStrategy.RECURSIVE, "pipeline", top_k), ] raise ValueError( f"Unknown experiment '{experiment}'. Expected one of: " f"quick, chunking, router, all." ) # --------------------------------------------------------------------------- # QA set loading # --------------------------------------------------------------------------- def _load_qa_set(path: str) -> list[dict[str, Any]]: """Load and validate the curated QA set, keeping only reviewed entries. Args: path: Path to eval/qa_set.yaml. Returns: List of reviewed question dicts. Raises: FileNotFoundError: If the file does not exist. ValueError: If the file is malformed or contains no reviewed entries. """ if not os.path.exists(path): raise FileNotFoundError( f"QA set not found at {path}. Run scripts/generate_qa_set.py first, " f"then curate the draft into eval/qa_set.yaml." ) with open(path, "r", encoding="utf-8") as fh: data = yaml.safe_load(fh) if isinstance(data, dict): questions = data.get("questions", []) else: questions = data if not isinstance(questions, list): raise ValueError(f"Invalid QA set format in {path}: expected list of questions.") reviewed = [q for q in questions if isinstance(q, dict) and q.get("reviewed")] if not reviewed: raise ValueError( f"No reviewed questions in {path}. " f"Set 'reviewed: true' on entries you want to keep." ) logger.info("Loaded %d reviewed questions from %s", len(reviewed), path) return reviewed # --------------------------------------------------------------------------- # Index building (with caching across cells) # --------------------------------------------------------------------------- @dataclass class BuiltIndex: """Bundle of artifacts produced by ingesting docs/ with one chunking strategy.""" vector_store: VectorStore bm25: BM25Search embedder: Embedder qdrant_path: str # tmp dir owned by this index, cleaned up at the end def _build_index( chunking: ChunkStrategy, settings: Settings, embeddings: Any, ) -> BuiltIndex: """Ingest docs/ with the given chunking strategy and build dense + sparse indices. Args: chunking: ChunkStrategy enum value. settings: Application settings. embeddings: LangChain Embeddings instance. Returns: BuiltIndex bundle ready to feed into a router. """ logger.info("Building index for chunking=%s ...", chunking.value) pipeline = IngestionPipeline( strategy=chunking, chunk_size=settings.chunk_size, chunk_overlap=settings.chunk_overlap, embeddings=embeddings if chunking == ChunkStrategy.SEMANTIC else None, ) chunks = pipeline.ingest_directory(DOCS_DIR) if not chunks: raise RuntimeError(f"No chunks produced for chunking={chunking.value}") logger.info(" -> %d chunks", len(chunks)) embedder = Embedder(embeddings) vectors = embedder.embed_batch([c.text for c in chunks]) qdrant_path = tempfile.mkdtemp(prefix=f"eval_qdrant_{chunking.value}_") vector_store = VectorStore( path=qdrant_path, collection_name=f"eval_{chunking.value}", dimension=settings.embedding_dimension, ) vector_store.add_chunks(chunks, vectors) bm25 = BM25Search() bm25.index(chunks) return BuiltIndex( vector_store=vector_store, bm25=bm25, embedder=embedder, qdrant_path=qdrant_path, ) # --------------------------------------------------------------------------- # Router construction # --------------------------------------------------------------------------- def _build_router( router_kind: str, index: BuiltIndex, reranker: Reranker, llm: Any, settings: Settings, ) -> Any: """Build the requested router (PlanAndExecuteRouter or QueryRouter). Args: router_kind: "react" for PlanAndExecuteRouter, "pipeline" for QueryRouter. index: BuiltIndex bundle. reranker: Shared Reranker instance. llm: Generation LLM instance. settings: Application settings. Returns: A router instance exposing ``route(query, top_k)``. """ hybrid = HybridRetriever( vector_store=index.vector_store, bm25_search=index.bm25, embedder=index.embedder, dense_weight=settings.dense_weight, bm25_weight=settings.bm25_weight, ) if router_kind == "react": return PlanAndExecuteRouter( llm=llm, hybrid_retriever=hybrid, reranker=reranker, vector_store=index.vector_store, default_top_k=settings.top_k, ) if router_kind == "pipeline": classifier = IntentClassifier(llm=llm, model_name=settings.generation_model) llm_chain = llm | StrOutputParser() return QueryRouter( intent_classifier=classifier, hybrid_retriever=hybrid, reranker=reranker, llm_chain=llm_chain, translate_query=settings.translate_query, ) raise ValueError(f"Unknown router kind: {router_kind!r}") # --------------------------------------------------------------------------- # Per-cell run # --------------------------------------------------------------------------- def _generate_records( config: RunConfig, qa_set: list[dict[str, Any]], index: BuiltIndex, reranker: Reranker, llm: Any, settings: Settings, ) -> list[dict[str, Any]]: """Run the router over the QA set and collect raw records. Args: config: Cell configuration. qa_set: Reviewed test questions. index: Pre-built dense + sparse indices for this chunking. reranker: Shared Reranker. llm: Generation LLM. settings: Application settings. Returns: List of raw record dicts (one per question), each containing the generated answer and retrieved contexts. """ router = _build_router(config.router, index, reranker, llm, settings) records: list[dict[str, Any]] = [] for entry in qa_set: question = entry["question"] logger.info(" Q: %s", question) try: response: GenerationResponse = router.route(query=question, top_k=config.top_k) except Exception as exc: logger.error("Router failed for question %r: %s", question, exc) response = GenerationResponse( answer="", sources=[], intent=None, # type: ignore[arg-type] confidence=0.0, ) ctx_texts = [r.chunk.text for r in response.sources] records.append( { "question": question, "answer": response.answer, "retrieved_contexts": ctx_texts, "reference_en": entry.get("reference_en", ""), "source_quote_da": entry.get("source_quote_da", ""), "source_doc": entry.get("source_doc", ""), "category": entry.get("category", ""), } ) return records def _judge_records( raw_records: list[dict[str, Any]], judge: RAGEvaluator, ) -> dict[str, Any]: """Run the RAGAS judge over a list of pre-computed raw records. Args: raw_records: List of records as produced by ``_generate_records``. judge: RAGEvaluator wrapping the judge LLM. Returns: Dict with ``aggregate`` and ``per_sample`` keys from the judge. """ questions = [r["question"] for r in raw_records] answers = [r["answer"] for r in raw_records] contexts = [r["retrieved_contexts"] for r in raw_records] ground_truths: list[str | dict[str, Any]] = [ { "reference_en": r.get("reference_en", ""), "source_quote_da": r.get("source_quote_da", ""), } for r in raw_records ] return judge.evaluate( questions=questions, answers=answers, contexts=contexts, ground_truths=ground_truths, ) def _run_cell( config: RunConfig, qa_set: list[dict[str, Any]], index: BuiltIndex, reranker: Reranker, llm: Any, judge: RAGEvaluator, settings: Settings, checkpoint_path: Path, ) -> dict[str, Any]: """Run one experiment cell end-to-end. The raw records (questions / answers / contexts) are checkpointed to ``checkpoint_path`` BEFORE the RAGAS judge is invoked, so a judge crash does not waste the generation work. Use ``--rejudge `` to re-run the judge step on a saved checkpoint. Args: config: The cell configuration. qa_set: List of reviewed test questions. index: Pre-built dense + sparse indices for this chunking. reranker: Shared Reranker. llm: Generation LLM. judge: RAGEvaluator wrapping the judge LLM. settings: Application settings. checkpoint_path: Where to write the raw-records checkpoint. Returns: Dict with config metadata, aggregate scores, per-sample scores, and the raw answers/contexts collected from the router. """ logger.info("=== Running cell %s ===", config.name) raw_records = _generate_records(config, qa_set, index, reranker, llm, settings) # Checkpoint BEFORE the judge so a RAGAS crash does not waste generation. checkpoint_payload = { "config": { "name": config.name, "chunking": config.chunking.value, "router": config.router, "top_k": config.top_k, }, "raw_records": raw_records, "n_samples": len(raw_records), } _write_json(checkpoint_path, checkpoint_payload) logger.info("Wrote raw-records checkpoint: %s", checkpoint_path) logger.info(" -> calling RAGAS judge ...") eval_result = _judge_records(raw_records, judge) return { "config": { "name": config.name, "chunking": config.chunking.value, "router": config.router, "top_k": config.top_k, }, "aggregate": eval_result["aggregate"], "per_sample": eval_result["per_sample"], "raw_records": raw_records, "n_samples": len(qa_set), } # --------------------------------------------------------------------------- # Persistence # --------------------------------------------------------------------------- def _safe_json(obj: Any) -> Any: """Convert non-JSON-serialisable values (numpy / pandas) to plain Python.""" if hasattr(obj, "item") and callable(obj.item): try: return obj.item() except Exception: # noqa: BLE001 return str(obj) if isinstance(obj, dict): return {k: _safe_json(v) for k, v in obj.items()} if isinstance(obj, (list, tuple)): return [_safe_json(v) for v in obj] return obj def _write_json(path: Path, payload: dict[str, Any]) -> None: """Write a JSON result file with safe serialisation.""" with open(path, "w", encoding="utf-8") as fh: json.dump(_safe_json(payload), fh, indent=2, ensure_ascii=False) def _format_markdown( timestamp: str, settings: Settings, cell_results: list[dict[str, Any]], ) -> str: """Format the matrix results as a Markdown report. Args: timestamp: ISO timestamp for the report header. settings: Application settings (used for env metadata). cell_results: One dict per cell, as produced by _run_cell. Returns: Markdown string with a metadata block and one aggregate table. """ judge_provider = settings.evaluator_llm_provider or settings.llm_provider judge_model_label = ( settings.evaluator_llm_model or { "groq": settings.groq_model, "openai": settings.openai_model, "anthropic": settings.anthropic_model, "google_genai": settings.google_model, "ollama": settings.ollama_model, }.get(judge_provider, "(provider default)") ) lines: list[str] = [] lines.append(f"# RAGAS Evaluation — {timestamp}") lines.append("") lines.append("## Setup") lines.append("") lines.append(f"- Generation LLM: `{settings.llm_provider}` / " f"`{settings.groq_model if settings.llm_provider == 'groq' else settings.generation_model}`") lines.append(f"- Judge LLM: `{judge_provider}` / `{judge_model_label}`") lines.append(f"- Embeddings: `{settings.embedding_provider}` / `{settings.local_embedding_model}`") lines.append(f"- Reranker: `{settings.reranker_model}`") lines.append(f"- Samples: {cell_results[0]['n_samples'] if cell_results else 0}") lines.append("") if not cell_results: return "\n".join(lines) metric_keys: list[str] = [] for cell in cell_results: for key in cell["aggregate"].keys(): if key not in metric_keys: metric_keys.append(key) header = ["Config", "Chunking", "Router", "top_k", *metric_keys] lines.append("## Aggregate Scores") lines.append("") lines.append("| " + " | ".join(header) + " |") lines.append("|" + "|".join("---" for _ in header) + "|") for cell in cell_results: cfg = cell["config"] row = [ cfg["name"], cfg["chunking"], cfg["router"], str(cfg["top_k"]), ] for key in metric_keys: value = cell["aggregate"].get(key) row.append(f"{value:.4f}" if isinstance(value, (int, float)) else "—") lines.append("| " + " | ".join(row) + " |") lines.append("") return "\n".join(lines) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def parse_args() -> argparse.Namespace: """Parse command-line arguments.""" parser = argparse.ArgumentParser( description="Run a RAGAS evaluation matrix over the curated test set.", ) parser.add_argument( "--experiment", choices=["quick", "chunking", "router", "all"], default="quick", help="Pre-defined experiment to run (default: quick).", ) parser.add_argument("--top-k", type=int, default=5, help="Top-K per query (default 5).") parser.add_argument( "--qa-set", type=str, default=QA_SET_PATH, help=f"Path to curated QA set YAML (default: {QA_SET_PATH}).", ) parser.add_argument( "--runs-dir", type=str, default=RUNS_DIR, help=f"Directory for output JSON+MD files (default: {RUNS_DIR}).", ) parser.add_argument( "--rejudge", type=str, default="", metavar="CHECKPOINT_PATH", help=( "Re-run only the RAGAS judge on a previously saved checkpoint " "(eval/runs/_.checkpoint.json). Skips index building " "and router invocation entirely." ), ) return parser.parse_args() def _rejudge_from_checkpoint( checkpoint_path: str, settings: Settings, runs_dir: Path, timestamp: str, ) -> None: """Re-run only the RAGAS judge against a saved raw-records checkpoint. Skips index building, router invocation, and the temp Qdrant lifecycle entirely. The new judged result is written next to the runs dir under a fresh timestamp. Args: checkpoint_path: Path to a checkpoint.json file from a prior run. settings: Application settings (used to build the judge LLM). runs_dir: Directory for output files. timestamp: Timestamp string used in output filenames. """ logger.info("Rejudge mode: loading checkpoint %s", checkpoint_path) with open(checkpoint_path, "r", encoding="utf-8") as fh: checkpoint = json.load(fh) config_dict = checkpoint.get("config", {}) raw_records = checkpoint.get("raw_records", []) if not raw_records: raise ValueError(f"Checkpoint {checkpoint_path} contains no raw_records.") logger.info( "Checkpoint config: %s | %d records", config_dict.get("name", "(unknown)"), len(raw_records), ) judge_llm = create_evaluator_llm(settings) embeddings = create_embeddings(settings) judge = RAGEvaluator(llm=judge_llm, embeddings=embeddings) logger.info("Calling RAGAS judge on cached records ...") eval_result = _judge_records(raw_records, judge) cell_result = { "config": config_dict, "aggregate": eval_result["aggregate"], "per_sample": eval_result["per_sample"], "raw_records": raw_records, "n_samples": len(raw_records), } cell_name = config_dict.get("name", "rejudged") out_json = runs_dir / f"{timestamp}_{cell_name}.rejudged.json" out_md = runs_dir / f"{timestamp}_{cell_name}.rejudged.md" _write_json(out_json, cell_result) with open(out_md, "w", encoding="utf-8") as fh: fh.write(_format_markdown(timestamp, settings, [cell_result])) logger.info("Wrote rejudged JSON: %s", out_json) logger.info("Wrote rejudged Markdown: %s", out_md) print(f"\nRejudge done. Markdown report: {out_md}") def main() -> None: """Run the requested experiment matrix and persist results.""" args = parse_args() settings = load_settings() logging.basicConfig( level=getattr(logging, settings.log_level.upper(), logging.INFO), format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) timestamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S") runs_dir = Path(args.runs_dir) runs_dir.mkdir(parents=True, exist_ok=True) if args.rejudge: logger.info("=== RAGAS Rejudge Start (%s) ===", timestamp) _rejudge_from_checkpoint(args.rejudge, settings, runs_dir, timestamp) return logger.info("=== RAGAS Matrix Evaluation Start (%s) ===", timestamp) qa_set = _load_qa_set(args.qa_set) run_configs = _build_run_configs(args.experiment, args.top_k) logger.info("Experiment: %s | %d cells | %d questions", args.experiment, len(run_configs), len(qa_set)) # --- Build shared resources ------------------------------------------- llm = create_llm(settings) judge_llm = create_evaluator_llm(settings) embeddings = create_embeddings(settings) judge = RAGEvaluator(llm=judge_llm, embeddings=embeddings) reranker = Reranker(model=create_reranker(settings.reranker_model)) # --- Build indices once per chunking strategy ------------------------- indices: dict[ChunkStrategy, BuiltIndex] = {} needed_chunkings = {cfg.chunking for cfg in run_configs} for chunking in needed_chunkings: indices[chunking] = _build_index(chunking, settings, embeddings) # --- Run cells --------------------------------------------------------- cell_results: list[dict[str, Any]] = [] try: for config in run_configs: checkpoint_path = runs_dir / f"{timestamp}_{config.name}.checkpoint.json" cell_result = _run_cell( config=config, qa_set=qa_set, index=indices[config.chunking], reranker=reranker, llm=llm, judge=judge, settings=settings, checkpoint_path=checkpoint_path, ) cell_results.append(cell_result) # Per-cell judged-result JSON cell_json_path = runs_dir / f"{timestamp}_{config.name}.json" _write_json(cell_json_path, cell_result) logger.info("Wrote cell result: %s", cell_json_path) finally: # --- Combined JSON + Markdown report ------------------------------- combined_path = runs_dir / f"{timestamp}_{args.experiment}.json" md_path = runs_dir / f"{timestamp}_{args.experiment}.md" _write_json( combined_path, { "timestamp": timestamp, "experiment": args.experiment, "qa_set_path": args.qa_set, "n_samples": len(qa_set), "cells": cell_results, }, ) with open(md_path, "w", encoding="utf-8") as fh: fh.write(_format_markdown(timestamp, settings, cell_results)) logger.info("Wrote combined JSON: %s", combined_path) logger.info("Wrote Markdown report: %s", md_path) # --- Cleanup tmp Qdrant dirs --------------------------------------- for index in indices.values(): shutil.rmtree(index.qdrant_path, ignore_errors=True) logger.info("Cleaned up %d tmp Qdrant dirs", len(indices)) print(f"\nDone. Markdown report: {md_path}") if __name__ == "__main__": main()