| """Evaluation runner — runs the RAG pipeline on an eval dataset and scores it. | |
| Produces per-example and aggregate metrics for both retrieval and generation. | |
| """ | |
import logging
from dataclasses import dataclass, field

from src.evaluation.dataset import EvalDataset, EvalExample
from src.evaluation.metrics import evaluate_generation, evaluate_retrieval
from src.generation.rag_engine import RAGEngine

logger = logging.getLogger(__name__)


@dataclass
class ExampleResult:
    """Evaluation results for a single example."""

    query: str
    retrieval_metrics: dict
    generation_metrics: dict
    retrieved_paper_ids: list[str]
    answer: str


@dataclass
class EvalReport:
    """Aggregate evaluation report."""

    dataset_name: str
    num_examples: int
    example_results: list[ExampleResult]
    aggregate_retrieval: dict = field(default_factory=dict)
    aggregate_generation: dict = field(default_factory=dict)


class EvalRunner:
    """Runs evaluation on a dataset using the RAG engine."""

    def __init__(self, rag_engine: RAGEngine, top_k: int = 5):
        self.engine = rag_engine
        self.top_k = top_k

    def evaluate_example(self, example: EvalExample) -> ExampleResult:
        """Evaluate a single example: query the engine, then score both stages."""
        response = self.engine.query(question=example.query, top_k=self.top_k)
        # Each retrieved source is a dict that carries the id of its paper.
        retrieved_paper_ids = [s["paper_id"] for s in response.sources]
        relevant_ids = set(example.relevant_paper_ids)

        retrieval_metrics = evaluate_retrieval(
            retrieved_ids=retrieved_paper_ids,
            relevant_ids=relevant_ids,
        )
        generation_metrics = evaluate_generation(
            prediction=response.answer,
            reference=example.reference_answer,
            keywords=example.keywords if example.keywords else None,
        )
        return ExampleResult(
            query=example.query,
            retrieval_metrics=retrieval_metrics,
            generation_metrics=generation_metrics,
            retrieved_paper_ids=retrieved_paper_ids,
            answer=response.answer,
        )

    def run(self, dataset: EvalDataset) -> EvalReport:
        """Run evaluation on the full dataset."""
        logger.info("Running evaluation on %r (%d examples)", dataset.name, len(dataset))
        results = []
        for i, example in enumerate(dataset):
            logger.info("Evaluating %d/%d: %r", i + 1, len(dataset), example.query)
            result = self.evaluate_example(example)
            results.append(result)

        report = EvalReport(
            dataset_name=dataset.name,
            num_examples=len(results),
            example_results=results,
        )
        if results:
            # Aggregate retrieval metrics: every example shares the same keys,
            # so average each key directly across examples.
            all_ret_keys = results[0].retrieval_metrics.keys()
            report.aggregate_retrieval = {
                key: sum(r.retrieval_metrics[key] for r in results) / len(results)
                for key in all_ret_keys
            }
            # Collect union of all generation metric keys (some examples may
            # have keywords and others may not, so keys can differ); a missing
            # key counts as 0.0 in the average.
            all_gen_keys: set[str] = set()
            for r in results:
                all_gen_keys.update(r.generation_metrics.keys())
            report.aggregate_generation = {
                key: sum(r.generation_metrics.get(key, 0.0) for r in results) / len(results)
                for key in sorted(all_gen_keys)
            }

        logger.info(
            "Evaluation complete — retrieval: %s, generation: %s",
            {k: round(v, 3) for k, v in report.aggregate_retrieval.items()},
            {k: round(v, 3) for k, v in report.aggregate_generation.items()},
        )
        return report
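

# Usage sketch (illustrative only): how the runner might be wired up.
# Assumptions, not guaranteed by this module: `RAGEngine()` being
# default-constructible and an `EvalDataset.load(path)` classmethod. The real
# entry points in src.generation.rag_engine and src.evaluation.dataset may
# take different arguments, so treat this as a template, not a working script.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    engine = RAGEngine()  # assumption: adjust to the real constructor
    dataset = EvalDataset.load("data/eval/examples.json")  # hypothetical loader and path
    runner = EvalRunner(rag_engine=engine, top_k=5)

    report = runner.run(dataset)
    print(f"{report.dataset_name}: {report.num_examples} examples")
    print("retrieval:", {k: round(v, 3) for k, v in report.aggregate_retrieval.items()})
    print("generation:", {k: round(v, 3) for k, v in report.aggregate_generation.items()})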