#!/usr/bin/env python3
"""
Enhanced Evaluation Runner with Deterministic Groundedness

Integrates deterministic evaluation controls with the existing evaluation system
to provide reproducible groundedness and citation accuracy measurements.
"""
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
from tqdm import tqdm

from .deterministic import (
    evaluate_citation_accuracy_deterministic,
    evaluate_groundedness_deterministic,
    get_evaluation_seed,
    setup_deterministic_evaluation,
)

logger = logging.getLogger(__name__)


class EnhancedEvaluationRunner:
    """
    Enhanced evaluation runner with deterministic groundedness evaluation.

    Extends the original evaluation functionality with:
    - Deterministic groundedness scoring
    - Enhanced citation accuracy validation
    - Reproducible evaluation results
    - Fallback mechanisms for API failures
    """

    def __init__(
        self,
        target_url: Optional[str] = None,
        chat_endpoint: str = "/chat",
        timeout: int = 30,
        evaluation_seed: Optional[int] = None,
    ):
        """Initialize enhanced evaluation runner."""
        self.target_url = target_url or os.getenv(
            "EVAL_TARGET_URL", "https://msse-team-3-ai-engineering-project.hf.space"
        )
        self.chat_endpoint = chat_endpoint
        self.timeout = timeout

        # Setup deterministic evaluation
        self.evaluation_seed = evaluation_seed or get_evaluation_seed()
        self.deterministic_evaluator = setup_deterministic_evaluation(self.evaluation_seed)

        # Results storage
        self.results = []
        self.latencies = []
        self.groundedness_scores = []
        self.citation_scores = []

        logger.info(f"Enhanced evaluation runner initialized with seed: {self.evaluation_seed}")

    def evaluate_single_query(self, question: Dict[str, Any], gold_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Evaluate a single query with enhanced groundedness and citation accuracy.

        Args:
            question: Question dictionary with id and question text
            gold_data: Gold standard data with expected answer and sources

        Returns:
            Comprehensive evaluation result dictionary
        """
        query_id = str(question["id"])
        question_text = question["question"]

        # Prepare API request
        payload = {"message": question_text, "include_sources": True}
        url = self.target_url.rstrip("/") + self.chat_endpoint

        # Track timing
        start_time = time.time()

        try:
            # Make API request
            response = requests.post(url, json=payload, timeout=self.timeout)
            latency = time.time() - start_time
            self.latencies.append(latency)

            if response.status_code != 200:
                return {
                    "id": query_id,
                    "question": question_text,
                    "status_code": response.status_code,
                    "error": response.text,
                    "latency_s": latency,
                }

            # Parse response
            data = response.json()
            response_text = data.get("response", "")
            returned_sources = data.get("sources", []) or []

            # Get gold standard data
            gold_answer = gold_data.get("answer", "")
            expected_sources = gold_data.get("expected_sources", [])

            # Enhanced groundedness evaluation
            groundedness_metrics = self._evaluate_groundedness_enhanced(response_text, returned_sources, gold_answer)

            # Deterministic citation accuracy
            citation_metrics = evaluate_citation_accuracy_deterministic(
                response_text, returned_sources, expected_sources, self.deterministic_evaluator
            )

            # Traditional overlap score for comparison
            overlap_score = self._calculate_token_overlap(gold_answer, response_text)

            # Store metrics for aggregation
            self.groundedness_scores.append(groundedness_metrics["groundedness_score"])
            self.citation_scores.append(citation_metrics["citation_accuracy"])

            return {
                "id": query_id,
                "question": question_text,
                "response": response_text,
                "latency_s": latency,
                # Enhanced metrics
                "groundedness_metrics": groundedness_metrics,
                "citation_metrics": citation_metrics,
                # Traditional metrics for comparison
                "overlap_score": overlap_score,
                "returned_sources": returned_sources,
                "expected_sources": expected_sources,
                # Metadata
                "evaluation_seed": self.evaluation_seed,
                "timestamp": time.time(),
            }

        except Exception as e:
            latency = time.time() - start_time
            self.latencies.append(latency)
            return {
                "id": query_id,
                "question": question_text,
                "status_code": "error",
                "error": str(e),
                "latency_s": latency,
            }

    def _evaluate_groundedness_enhanced(
        self, response_text: str, returned_sources: List[Dict[str, Any]], gold_answer: str
    ) -> Dict[str, float]:
        """
        Enhanced groundedness evaluation with multiple approaches.

        Combines:
        1. Deterministic source-based groundedness (exact matches, token
           overlap, and passage coverage; primary, weight 0.7)
        2. Reference comparison against the gold answer (secondary, weight 0.3)
        """
        # Extract source passages
        source_passages = []
        for source in returned_sources:
            if isinstance(source, dict):
                # Try different keys for content
                content = (
                    source.get("content") or source.get("text") or source.get("snippet") or source.get("passage", "")
                )
                if content:
                    source_passages.append(str(content))
            else:
                source_passages.append(str(source))

        # Deterministic source-based groundedness
        source_groundedness = evaluate_groundedness_deterministic(
            response_text, source_passages, self.deterministic_evaluator
        )

        # Reference-based groundedness (compare to gold answer)
        reference_groundedness = evaluate_groundedness_deterministic(
            response_text, [gold_answer] if gold_answer else [], self.deterministic_evaluator
        )

        # Combine metrics with appropriate weighting
        combined_score = (
            source_groundedness["groundedness_score"] * 0.7  # Source-based primary
            + reference_groundedness["groundedness_score"] * 0.3  # Reference secondary
        )

        # Compile comprehensive metrics
        metrics = {
            "groundedness_score": combined_score,
            "source_groundedness": source_groundedness["groundedness_score"],
            "reference_groundedness": reference_groundedness["groundedness_score"],
            "passage_coverage": source_groundedness["passage_coverage"],
            "token_overlap": source_groundedness["token_overlap"],
            "exact_matches": source_groundedness["exact_matches"],
            "num_sources_used": len(source_passages),
        }

        return self.deterministic_evaluator.normalize_metrics(metrics)
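
    # Worked example of the weighted blend above, with illustrative scores:
    # if source_groundedness = 0.80 and reference_groundedness = 0.50, then
    # combined_score = 0.7 * 0.80 + 0.3 * 0.50 = 0.56 + 0.15 = 0.71.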

    def _calculate_token_overlap(self, gold: str, response: str) -> float:
        """Calculate traditional token overlap score for comparison."""
        gold_tokens = set(gold.lower().split())
        response_tokens = set(response.lower().split())
        if not gold_tokens:
            return 0.0
        overlap = gold_tokens & response_tokens
        return len(overlap) / len(gold_tokens)
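
    # Worked example with illustrative strings: gold = "The cat sat" and
    # response = "a cat sat down" give gold_tokens = {"the", "cat", "sat"}
    # and overlap = {"cat", "sat"}, so the score is 2 / 3 ≈ 0.667.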

    def run_evaluation(
        self, questions_file: str, gold_file: str, output_file: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Run comprehensive evaluation with enhanced groundedness.

        Args:
            questions_file: Path to questions JSON file
            gold_file: Path to gold answers JSON file
            output_file: Optional output file path

        Returns:
            Complete evaluation results dictionary
        """
        # Load data
        with open(questions_file, "r", encoding="utf-8") as f:
            questions = json.load(f)
        with open(gold_file, "r", encoding="utf-8") as f:
            gold_data = json.load(f)

        logger.info(f"Starting enhanced evaluation with {len(questions)} questions")

        # Process questions in deterministic order
        sorted_questions = self.deterministic_evaluator.ensure_deterministic_order(
            questions, key_func=lambda x: str(x.get("id", ""))
        )

        # Reset results for fresh run
        self.results = []
        self.latencies = []
        self.groundedness_scores = []
        self.citation_scores = []

        # Evaluate each question
        for question in tqdm(sorted_questions, desc="Evaluating questions"):
            query_id = str(question["id"])
            gold_info = gold_data.get(query_id, {})
            result = self.evaluate_single_query(question, gold_info)
            self.results.append(result)

        # Calculate summary metrics
        summary = self._calculate_summary_metrics()

        # Prepare output
        output = {
            "summary": summary,
            "results": self.deterministic_evaluator.sort_evaluation_results(self.results),
            "configuration": {
                "target_url": self.target_url,
                "evaluation_seed": self.evaluation_seed,
                "deterministic_mode": True,
                "timestamp": time.time(),
            },
        }

        # Save results
        if output_file:
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(output, f, indent=2)
            logger.info(f"Enhanced evaluation results saved to {output_file}")

        return output

    def _calculate_summary_metrics(self) -> Dict[str, Any]:
        """Calculate comprehensive summary metrics."""
        successful_results = [r for r in self.results if "error" not in r]

        summary = {
            "target_url": self.target_url,
            "n_questions": len(self.results),
            "n_successful": len(successful_results),
            "evaluation_seed": self.evaluation_seed,
        }

        # Latency metrics
        if self.latencies:
            sorted_latencies = sorted(self.latencies)
            summary.update(
                {
                    "latency_p50_s": sorted_latencies[len(sorted_latencies) // 2],
                    "latency_p95_s": sorted_latencies[max(0, int(len(sorted_latencies) * 0.95) - 1)],
                    "avg_latency_s": sum(self.latencies) / len(self.latencies),
                    "max_latency_s": max(self.latencies),
                    "min_latency_s": min(self.latencies),
                }
            )
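
        # The index arithmetic above, worked through for 20 recorded latencies
        # (illustrative count): p50 reads sorted_latencies[20 // 2] = index 10,
        # and p95 reads sorted_latencies[max(0, int(20 * 0.95) - 1)] = index 18.
        # This is a simple nearest-rank approximation, not interpolation.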

        # Enhanced groundedness metrics
        if self.groundedness_scores:
            summary.update(
                {
                    "avg_groundedness": sum(self.groundedness_scores) / len(self.groundedness_scores),
                    "min_groundedness": min(self.groundedness_scores),
                    "max_groundedness": max(self.groundedness_scores),
                }
            )

        # Citation accuracy metrics
        if self.citation_scores:
            summary.update(
                {
                    "avg_citation_accuracy": sum(self.citation_scores) / len(self.citation_scores),
                    "min_citation_accuracy": min(self.citation_scores),
                    "max_citation_accuracy": max(self.citation_scores),
                }
            )

        # Traditional overlap scores for comparison
        overlap_scores = [
            r.get("overlap_score", 0) for r in successful_results if isinstance(r.get("overlap_score"), (int, float))
        ]
        if overlap_scores:
            summary["avg_overlap"] = sum(overlap_scores) / len(overlap_scores)

        # Normalize all metrics
        return self.deterministic_evaluator.normalize_metrics(summary)

    def print_summary(self) -> None:
        """Print a formatted summary of evaluation results."""
        if not self.results:
            print("No evaluation results available.")
            return

        summary = self._calculate_summary_metrics()

        print("\n" + "=" * 70)
        print("ENHANCED RAG EVALUATION SUMMARY")
        print("=" * 70)
        print(f"Target URL: {summary['target_url']}")
        print(f"Evaluation Seed: {summary['evaluation_seed']}")
        print(f"Questions: {summary['n_successful']}/{summary['n_questions']} successful")
        print()

        print("PERFORMANCE METRICS:")
        print("-" * 25)
        if "avg_latency_s" in summary:
            print(f" Average Latency: {summary['avg_latency_s']:.3f}s")
            print(f" P50 Latency: {summary['latency_p50_s']:.3f}s")
            print(f" P95 Latency: {summary['latency_p95_s']:.3f}s")
        print()

        print("GROUNDEDNESS EVALUATION:")
        print("-" * 26)
        if "avg_groundedness" in summary:
            print(f" Average Groundedness: {summary['avg_groundedness']:.4f}")
            print(f" Min Groundedness: {summary['min_groundedness']:.4f}")
            print(f" Max Groundedness: {summary['max_groundedness']:.4f}")
        print()

        print("CITATION ACCURACY:")
        print("-" * 19)
        if "avg_citation_accuracy" in summary:
            print(f" Average Citation Accuracy: {summary['avg_citation_accuracy']:.4f}")
            print(f" Min Citation Accuracy: {summary['min_citation_accuracy']:.4f}")
            print(f" Max Citation Accuracy: {summary['max_citation_accuracy']:.4f}")
        print()

        if "avg_overlap" in summary:
            print("COMPARISON METRICS:")
            print("-" * 20)
            print(f" Traditional Overlap Score: {summary['avg_overlap']:.4f}")

        print("=" * 70)


def run_enhanced_evaluation(
    questions_file: Optional[str] = None,
    gold_file: Optional[str] = None,
    output_file: Optional[str] = None,
    target_url: Optional[str] = None,
    evaluation_seed: Optional[int] = None,
) -> Dict[str, Any]:
    """
    Convenience function to run enhanced evaluation.

    Args:
        questions_file: Path to questions JSON (default: evaluation/questions.json)
        gold_file: Path to gold answers JSON (default: evaluation/gold_answers.json)
        output_file: Output file path (default: evaluation/enhanced_results.json)
        target_url: Target API URL (default: from environment)
        evaluation_seed: Random seed for reproducibility (default: from environment)

    Returns:
        Complete evaluation results
    """
    # Set defaults
    eval_dir = Path(__file__).parent.parent.parent / "evaluation"
    questions_file = questions_file or str(eval_dir / "questions.json")
    gold_file = gold_file or str(eval_dir / "gold_answers.json")
    output_file = output_file or str(eval_dir / "enhanced_results.json")

    # Initialize runner
    runner = EnhancedEvaluationRunner(target_url=target_url, evaluation_seed=evaluation_seed)

    # Run evaluation
    results = runner.run_evaluation(questions_file, gold_file, output_file)

    # Print summary
    runner.print_summary()

    return results
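

# Programmatic usage sketch (illustrative; the local URL and seed below are
# assumptions, not defaults defined in this module):
#
#     results = run_enhanced_evaluation(
#         target_url="http://localhost:8000",
#         evaluation_seed=42,
#     )
#     print(results["summary"].get("avg_groundedness"))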


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run enhanced RAG evaluation")
    parser.add_argument("--questions", help="Questions JSON file")
    parser.add_argument("--gold", help="Gold answers JSON file")
    parser.add_argument("--output", help="Output results file")
    parser.add_argument("--target", help="Target API URL")
    parser.add_argument("--seed", type=int, help="Evaluation seed")
    args = parser.parse_args()

    # Setup logging
    logging.basicConfig(level=logging.INFO)

    # Run evaluation
    run_enhanced_evaluation(
        questions_file=args.questions,
        gold_file=args.gold,
        output_file=args.output,
        target_url=args.target,
        evaluation_seed=args.seed,
    )
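
# Example invocation (the package path below is an assumption; because of the
# relative import of .deterministic at the top, this file must be run as a
# module rather than as a plain script):
#
#     python -m evaluation_package.enhanced_runner --seed 42 \
#         --target https://msse-team-3-ai-engineering-project.hf.space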