#!/usr/bin/env python3
"""
Evaluation Runner

Orchestrates comprehensive evaluation of the RAG system using all available metrics.
Provides an automated testing pipeline and performance-monitoring capabilities.
"""

import json
import logging
import time
from dataclasses import asdict
from pathlib import Path
from typing import Any, Dict, List, Optional

from .core import BenchmarkResults, EvaluationMetrics, EvaluationResult
from .metrics import (
    CitationAccuracyTracker,
    ErrorTracker,
    LatencyTracker,
    TaskCompletionTracker,
    ThroughputTracker,
    UserSatisfactionTracker,
    calculate_bert_score,
    calculate_bleu_score,
    calculate_faithfulness_score,
    calculate_rouge_scores,
    mean_reciprocal_rank,
    ndcg_at_k,
    precision_at_k,
    recall_at_k,
)

logger = logging.getLogger(__name__)

class EvaluationRunner:
    """
    Main evaluation runner that orchestrates comprehensive RAG system evaluation.

    Supports:
    - Retrieval quality assessment (precision@K, recall@K, MRR, NDCG)
    - Generation quality evaluation (BLEU, ROUGE, BERTScore, faithfulness)
    - System performance monitoring (latency, throughput, error rates)
    - User experience metrics (satisfaction, task completion, citations)
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize the evaluation runner with a configuration."""
        self.config = config or self._get_default_config()

        # Initialize metric trackers
        self.latency_tracker = LatencyTracker()
        self.throughput_tracker = ThroughputTracker()
        self.error_tracker = ErrorTracker()
        self.satisfaction_tracker = UserSatisfactionTracker()
        self.completion_tracker = TaskCompletionTracker()
        self.citation_tracker = CitationAccuracyTracker()

        # Per-query results storage
        self.results: List[EvaluationResult] = []

    def _get_default_config(self) -> Dict[str, Any]:
        """Get the default evaluation configuration."""
        return {
            "retrieval_k_values": [1, 3, 5, 10],
            "generation_metrics": ["bleu", "rouge", "bert_score", "faithfulness"],
            "system_metrics": ["latency", "throughput", "error_rate"],
            "user_metrics": ["satisfaction", "task_completion", "citation_accuracy"],
            "output_dir": "evaluation_results",
            "save_detailed_results": True,
            "log_level": "INFO",
        }
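
    # NOTE: a config passed to __init__ replaces the defaults wholesale
    # (`config or self._get_default_config()` does not merge), so a partial
    # override must carry every key the runner reads. A minimal sketch of a
    # safe override:
    #
    #     config = EvaluationRunner()._get_default_config()
    #     config["retrieval_k_values"] = [1, 5]  # evaluate only @1 and @5
    #     runner = EvaluationRunner(config)
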
    def evaluate_retrieval(
        self,
        retrieved_docs: List[str],
        relevant_docs: List[str],
        query_id: Optional[str] = None,
    ) -> Dict[str, float]:
        """
        Evaluate retrieval quality for a single query.

        Args:
            retrieved_docs: Retrieved document IDs in ranked order
            relevant_docs: Relevant document IDs (ground truth)
            query_id: Optional query identifier for tracking

        Returns:
            Dictionary containing retrieval metrics
        """
        relevant_set = set(relevant_docs)
        metrics = {}

        # Calculate metrics for each configured K, skipping K values larger
        # than the number of retrieved documents
        for k in self.config["retrieval_k_values"]:
            if k <= len(retrieved_docs):
                metrics[f"precision_at_{k}"] = precision_at_k(retrieved_docs, relevant_set, k)
                metrics[f"recall_at_{k}"] = recall_at_k(retrieved_docs, relevant_set, k)
                metrics[f"ndcg_at_{k}"] = ndcg_at_k(retrieved_docs, relevant_set, k)

        # Calculate MRR (the helper takes batched inputs, so wrap the single
        # query's lists)
        if relevant_docs:
            mrr = mean_reciprocal_rank([retrieved_docs], [relevant_set])
            metrics["mean_reciprocal_rank"] = mrr

        logger.info(f"Retrieval evaluation completed for query {query_id}: {metrics}")
        return metrics
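
    # Worked example (standard definitions; illustrative, not from a real run).
    # With retrieved_docs = ["d1", "d2", "d3"] and relevant_docs = ["d1", "d3"]:
    #   precision@2 = |{d1}| / 2             = 0.5  (one of the top 2 is relevant)
    #   recall@2    = |{d1}| / |{d1, d3}|    = 0.5  (one of the 2 relevant docs found)
    #   MRR         = 1 / rank of first hit  = 1.0  (d1 is ranked first)
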
    def evaluate_generation(
        self,
        generated_text: str,
        reference_text: str,
        context: Optional[str] = None,
        query_id: Optional[str] = None,
    ) -> Dict[str, float]:
        """
        Evaluate generation quality for a single response.

        Args:
            generated_text: Generated response text
            reference_text: Reference/ground-truth text
            context: Optional context used for generation (required for the
                faithfulness score)
            query_id: Optional query identifier for tracking

        Returns:
            Dictionary containing generation quality metrics
        """
        metrics = {}

        # Calculate only the configured generation metrics
        if "bleu" in self.config["generation_metrics"]:
            metrics["bleu_score"] = calculate_bleu_score(generated_text, reference_text)

        if "rouge" in self.config["generation_metrics"]:
            metrics.update(calculate_rouge_scores(generated_text, reference_text))

        if "bert_score" in self.config["generation_metrics"]:
            metrics["bert_score"] = calculate_bert_score(generated_text, reference_text)

        # Faithfulness needs the retrieval context; skip it when none is given
        if "faithfulness" in self.config["generation_metrics"] and context:
            metrics["faithfulness_score"] = calculate_faithfulness_score(generated_text, [context])

        logger.info(f"Generation evaluation completed for query {query_id}: {metrics}")
        return metrics
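
    # Quick reference for interpreting the scores above (all assumed to lie in
    # [0, 1] with higher being better, per the helpers in .metrics):
    #   bleu_score / rouge_*  - n-gram overlap with the reference text
    #   bert_score            - embedding-based semantic similarity
    #   faithfulness_score    - how well the answer is grounded in `context`
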
    def evaluate_system_performance(
        self,
        start_time: float,
        end_time: float,
        error_occurred: bool = False,
        query_id: Optional[str] = None,
    ) -> Dict[str, float]:
        """
        Evaluate system performance metrics.

        Args:
            start_time: Request start timestamp (seconds, e.g. from time.time())
            end_time: Request end timestamp
            error_occurred: Whether an error occurred during processing
            query_id: Optional query identifier for tracking

        Returns:
            Dictionary containing system performance metrics
        """
        metrics = {}

        # Track latency for this request plus the running average
        latency = end_time - start_time
        self.latency_tracker.add_measurement(latency)
        metrics["latency"] = latency
        metrics["avg_latency"] = self.latency_tracker.get_average()

        # Track throughput
        self.throughput_tracker.add_request()
        metrics["current_throughput"] = self.throughput_tracker.get_throughput()

        # Track errors (every request counts toward the error-rate denominator)
        if error_occurred:
            self.error_tracker.add_error()
        self.error_tracker.add_request()
        metrics["error_rate"] = self.error_tracker.get_error_rate()

        logger.info(f"System performance evaluation for query {query_id}: {metrics}")
        return metrics
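
    # A minimal usage sketch timing one request; `rag_pipeline` and its
    # .process method are hypothetical placeholders for the real pipeline:
    #
    #     start = time.time()
    #     try:
    #         answer = rag_pipeline.process(query)
    #         runner.evaluate_system_performance(start, time.time(), False, query_id)
    #     except Exception:
    #         runner.evaluate_system_performance(start, time.time(), True, query_id)
    #         raise
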
    def evaluate_user_experience(
        self,
        satisfaction_score: Optional[float] = None,
        task_completed: Optional[bool] = None,
        citations_accurate: Optional[bool] = None,
        query_id: Optional[str] = None,
    ) -> Dict[str, float]:
        """
        Evaluate user experience metrics. Each metric is tracked only when a
        value is provided.

        Args:
            satisfaction_score: User satisfaction rating (1-5)
            task_completed: Whether the user's task was completed successfully
            citations_accurate: Whether the citations were accurate
            query_id: Optional query identifier for tracking

        Returns:
            Dictionary containing user experience metrics
        """
        metrics = {}

        # Track satisfaction
        if satisfaction_score is not None:
            self.satisfaction_tracker.add_rating(satisfaction_score)
            metrics["satisfaction_score"] = satisfaction_score
            metrics["avg_satisfaction"] = self.satisfaction_tracker.get_average_satisfaction()

        # Track task completion
        if task_completed is not None:
            self.completion_tracker.add_completion(task_completed)
            metrics["task_completed"] = task_completed
            metrics["completion_rate"] = self.completion_tracker.get_completion_rate()

        # Track citation accuracy
        if citations_accurate is not None:
            self.citation_tracker.add_citation_check(citations_accurate)
            metrics["citations_accurate"] = citations_accurate
            metrics["citation_accuracy_rate"] = self.citation_tracker.get_accuracy_rate()

        logger.info(f"User experience evaluation for query {query_id}: {metrics}")
        return metrics
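
    # Example call with illustrative values (any omitted argument leaves the
    # corresponding tracker untouched):
    #
    #     runner.evaluate_user_experience(
    #         satisfaction_score=4.5,   # 1-5 scale
    #         task_completed=True,
    #         citations_accurate=False,
    #         query_id="demo_1",
    #     )
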
    def run_comprehensive_evaluation(self, test_queries: List[Dict[str, Any]]) -> BenchmarkResults:
        """
        Run a comprehensive evaluation across all test queries.

        Args:
            test_queries: List of test query dictionaries containing:
                - query: The question/query text
                - expected_docs: List of expected relevant documents
                - expected_answer: Expected answer text
                - query_id: Optional unique identifier

        Returns:
            BenchmarkResults containing comprehensive evaluation metrics
        """
        logger.info(f"Starting comprehensive evaluation with {len(test_queries)} queries")

        all_metrics = []
        start_time = time.time()

        for i, test_query in enumerate(test_queries):
            query_id = test_query.get("query_id", f"query_{i}")
            logger.info(f"Evaluating query {i + 1}/{len(test_queries)}: {query_id}")
            # Record the start time outside the try block so the error path
            # below can always reference it
            query_start = time.time()

            try:
                # Initialize evaluation metrics for this query
                eval_metrics = EvaluationMetrics()

                # TODO: Replace with an actual RAG pipeline call, e.g.
                # retrieved_docs, generated_response = rag_pipeline.process(test_query["query"])
                # For now, use mock data supplied with the test query
                retrieved_docs = test_query.get("mock_retrieved_docs", [])
                generated_response = test_query.get("mock_response", "")
                query_end = time.time()
                # Evaluate retrieval if expected docs are provided
                if "expected_docs" in test_query and retrieved_docs:
                    retrieval_metrics = self.evaluate_retrieval(retrieved_docs, test_query["expected_docs"], query_id)
                    eval_metrics.retrieval_metrics.update(retrieval_metrics)

                # Evaluate generation if an expected answer is provided
                if "expected_answer" in test_query and generated_response:
                    generation_metrics = self.evaluate_generation(
                        generated_response,
                        test_query["expected_answer"],
                        test_query.get("context", ""),
                        query_id,
                    )
                    eval_metrics.generation_metrics.update(generation_metrics)

                # Evaluate system performance
                system_metrics = self.evaluate_system_performance(query_start, query_end, False, query_id)
                eval_metrics.system_metrics.update(system_metrics)

                # Evaluate user experience (with default values when absent)
                user_metrics = self.evaluate_user_experience(
                    satisfaction_score=test_query.get("satisfaction", 4.0),
                    task_completed=test_query.get("task_completed", True),
                    citations_accurate=test_query.get("citations_accurate", True),
                    query_id=query_id,
                )
                eval_metrics.user_metrics.update(user_metrics)

                # Store results
                result = EvaluationResult(
                    query_id=query_id,
                    query=test_query["query"],
                    metrics=eval_metrics,
                    timestamp=time.time(),
                )
                self.results.append(result)
                all_metrics.append(eval_metrics)
            except Exception as e:
                logger.error(f"Error evaluating query {query_id}: {e}")
                # Record the failure in the system metrics; the failed query is
                # excluded from the per-query aggregates below
                self.evaluate_system_performance(query_start, time.time(), True, query_id)

        total_time = time.time() - start_time

        # Aggregate results
        benchmark_results = self._aggregate_results(all_metrics, total_time)

        # Save results if configured
        if self.config["save_detailed_results"]:
            self._save_results(benchmark_results)

        logger.info(f"Comprehensive evaluation completed in {total_time:.2f}s")
        return benchmark_results

    def _aggregate_results(self, all_metrics: List[EvaluationMetrics], total_time: float) -> BenchmarkResults:
        """Aggregate individual evaluation results into a benchmark summary."""
        if not all_metrics:
            return BenchmarkResults()

        # Macro-average retrieval metrics over the queries that reported them
        retrieval_aggregates = {}
        for metric_name in [
            "precision_at_1",
            "precision_at_3",
            "precision_at_5",
            "recall_at_1",
            "recall_at_3",
            "recall_at_5",
            "ndcg_at_1",
            "ndcg_at_3",
            "ndcg_at_5",
            "mean_reciprocal_rank",
        ]:
            values = [m.retrieval_metrics[metric_name] for m in all_metrics if metric_name in m.retrieval_metrics]
            if values:
                retrieval_aggregates[f"avg_{metric_name}"] = sum(values) / len(values)

        # Macro-average generation metrics; "bert_score" matches the key
        # stored by evaluate_generation (the rouge_* keys are assumed to come
        # from calculate_rouge_scores)
        generation_aggregates = {}
        for metric_name in [
            "bleu_score",
            "rouge_1_f1",
            "rouge_2_f1",
            "rouge_l_f1",
            "bert_score",
            "faithfulness_score",
        ]:
            values = [m.generation_metrics[metric_name] for m in all_metrics if metric_name in m.generation_metrics]
            if values:
                generation_aggregates[f"avg_{metric_name}"] = sum(values) / len(values)
        # System performance aggregates (latency min/max over per-query values)
        system_aggregates = {
            "avg_latency": self.latency_tracker.get_average(),
            "max_latency": max(m.system_metrics.get("latency", 0) for m in all_metrics),
            "min_latency": min(m.system_metrics.get("latency", float("inf")) for m in all_metrics),
            "throughput": self.throughput_tracker.get_throughput(),
            "error_rate": self.error_tracker.get_error_rate(),
            "total_queries": len(all_metrics),
            "total_time": total_time,
        }

        # User experience aggregates
        user_aggregates = {
            "avg_satisfaction": self.satisfaction_tracker.get_average_satisfaction(),
            "completion_rate": self.completion_tracker.get_completion_rate(),
            "citation_accuracy_rate": self.citation_tracker.get_accuracy_rate(),
        }

        return BenchmarkResults(
            total_queries=len(all_metrics),
            avg_retrieval_metrics=retrieval_aggregates,
            avg_generation_metrics=generation_aggregates,
            system_performance=system_aggregates,
            user_experience=user_aggregates,
            timestamp=time.time(),
            evaluation_time=total_time,
        )

    def _save_results(self, benchmark_results: BenchmarkResults) -> None:
        """Save evaluation results to disk."""
        output_dir = Path(self.config["output_dir"])
        output_dir.mkdir(parents=True, exist_ok=True)

        # Use one timestamp so the summary and detailed files pair up
        run_stamp = int(time.time())

        # Save benchmark summary
        benchmark_file = output_dir / f"benchmark_results_{run_stamp}.json"
        with open(benchmark_file, "w") as f:
            json.dump(asdict(benchmark_results), f, indent=2)

        # Save detailed per-query results
        detailed_file = output_dir / f"detailed_results_{run_stamp}.json"
        detailed_results = [asdict(result) for result in self.results]
        with open(detailed_file, "w") as f:
            json.dump(detailed_results, f, indent=2)

        logger.info(f"Results saved to {output_dir}")
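
    # The saved files are plain JSON, so a past run can be reloaded for
    # comparison (the file name below is illustrative):
    #
    #     with open("evaluation_results/benchmark_results_1700000000.json") as f:
    #         summary = json.load(f)
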
    def get_summary_report(self) -> str:
        """Generate a human-readable summary report."""
        if not self.results:
            return "No evaluation results available."

        # Re-aggregate from stored results, approximating total time as the
        # sum of the recorded per-query latencies
        latest_benchmark = self._aggregate_results(
            [r.metrics for r in self.results],
            sum(r.metrics.system_metrics.get("latency", 0) for r in self.results),
        )

        report = []
        report.append("=" * 60)
        report.append("RAG SYSTEM EVALUATION SUMMARY")
        report.append("=" * 60)
        report.append(f"Total Queries Evaluated: {latest_benchmark.total_queries}")
        report.append(f"Evaluation Time: {latest_benchmark.evaluation_time:.2f}s")
        report.append("")

        # Retrieval performance
        report.append("RETRIEVAL PERFORMANCE:")
        report.append("-" * 25)
        for metric, value in latest_benchmark.avg_retrieval_metrics.items():
            report.append(f"  {metric}: {value:.3f}")
        report.append("")

        # Generation quality
        report.append("GENERATION QUALITY:")
        report.append("-" * 20)
        for metric, value in latest_benchmark.avg_generation_metrics.items():
            report.append(f"  {metric}: {value:.3f}")
        report.append("")

        # System performance
        report.append("SYSTEM PERFORMANCE:")
        report.append("-" * 20)
        for metric, value in latest_benchmark.system_performance.items():
            if isinstance(value, float):
                report.append(f"  {metric}: {value:.3f}")
            else:
                report.append(f"  {metric}: {value}")
        report.append("")

        # User experience
        report.append("USER EXPERIENCE:")
        report.append("-" * 17)
        for metric, value in latest_benchmark.user_experience.items():
            report.append(f"  {metric}: {value:.3f}")
        report.append("=" * 60)

        return "\n".join(report)

def load_test_queries(file_path: str) -> List[Dict[str, Any]]:
    """Load test queries from a JSON file."""
    with open(file_path, "r") as f:
        return json.load(f)
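
# A test-query file is a JSON array of objects shaped like the dictionaries
# accepted by run_comprehensive_evaluation (illustrative example, mirroring the
# mock query below):
#
#     [
#         {
#             "query_id": "test_1",
#             "query": "What is the remote work policy?",
#             "expected_docs": ["remote_work_policy.md"],
#             "expected_answer": "Employees can work remotely up to 3 days per week."
#         }
#     ]
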
if __name__ == "__main__":
    # Example usage
    logging.basicConfig(level=logging.INFO)

    # Initialize the runner
    runner = EvaluationRunner()

    # Load test queries (replace with an actual file)
    # test_queries = load_test_queries("evaluation/questions.json")

    # Mock test queries for demonstration
    test_queries = [
        {
            "query_id": "test_1",
            "query": "What is the remote work policy?",
            "expected_docs": ["remote_work_policy.md"],
            "expected_answer": "Employees can work remotely up to 3 days per week.",
            "mock_retrieved_docs": ["remote_work_policy.md", "employee_handbook.md"],
            "mock_response": "Based on company policy, employees can work remotely up to 3 days per week.",
        }
    ]

    # Run the evaluation
    results = runner.run_comprehensive_evaluation(test_queries)

    # Print the summary
    print(runner.get_summary_report())