Spaces:
Build error
Build error
| """ | |
| Evaluator - RAG-The-Game-Changer | |
| Comprehensive evaluation orchestrator for RAG systems. | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Any, Dict, List, Optional | |
| from dataclasses import dataclass, field | |
| from .metrics import MetricsCalculator | |
| from .hallucination_detection import HallucinationDetector | |
| from .benchmarks import BenchmarkRunner | |
| logger = logging.getLogger(__name__) | |
@dataclass
class EvaluationConfig:
    """Configuration for evaluation runs.

    Fix: the original declared ``field(default_factory=...)`` attributes
    without the ``@dataclass`` decorator, so no ``__init__`` was generated
    and ``EvaluationConfig(**kwargs)`` raised ``TypeError``.
    """

    # Named test datasets: dataset name -> list of query records.
    datasets: Dict[str, List[Dict]] = field(default_factory=dict)
    # Metric names to compute for the pipeline under test.
    metrics: List[str] = field(
        default_factory=lambda: ["precision", "recall", "ndcg", "rouge", "bertscore"]
    )
    # Benchmark suite names to run; empty list disables benchmarking.
    benchmarks: List[str] = field(default_factory=list)
    # Cut-off depths for rank-sensitive retrieval metrics.
    top_k_values: List[int] = field(default_factory=lambda: [5, 10, 20])
    enable_hallucination_check: bool = True
    enable_quality_assessment: bool = True
@dataclass
class EvaluationResult:
    """Result from evaluation run.

    Fixes: the original lacked the ``@dataclass`` decorator, and placed the
    defaulted ``metadata`` field before the required ``evaluation_time_ms``,
    which is an illegal dataclass field order. ``metadata`` is now last;
    callers construct this with keyword arguments, so the reorder is
    backward-compatible.
    """

    rag_pipeline_id: str
    overall_score: float
    metric_scores: Dict[str, float]
    benchmark_results: List[Dict[str, Any]]
    hallucination_stats: Dict[str, Any]
    quality_score: float
    evaluation_time_ms: float
    # Free-form run metadata (config echo, top-k values, ...).
    metadata: Dict[str, Any] = field(default_factory=dict)
class Evaluator:
    """Main evaluation orchestrator for RAG systems.

    Coordinates metric calculation, benchmark runs, hallucination detection,
    and a heuristic quality assessment, then folds everything into one
    weighted overall score.
    """

    # Config keys forwarded to EvaluationConfig. Anything else in ``config``
    # (e.g. "benchmark_config") belongs to other collaborators; forwarding it
    # blindly via EvaluationConfig(**self.config) raised TypeError.
    _EVAL_CONFIG_KEYS = frozenset(
        {
            "datasets",
            "metrics",
            "benchmarks",
            "top_k_values",
            "enable_hallucination_check",
            "enable_quality_assessment",
        }
    )

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Build the orchestrator.

        Args:
            config: Optional settings. Keys matching EvaluationConfig fields
                configure the run; "benchmark_config" is handed to the
                BenchmarkRunner.
        """
        self.config = config or {}
        eval_kwargs = {k: v for k, v in self.config.items() if k in self._EVAL_CONFIG_KEYS}
        self.eval_config = EvaluationConfig(**eval_kwargs)
        self.metrics_calculator = MetricsCalculator()
        self.hallucination_detector = (
            HallucinationDetector() if self.eval_config.enable_hallucination_check else None
        )
        self.benchmark_runner = BenchmarkRunner(self.config.get("benchmark_config"))

    async def evaluate(self, rag_pipeline, test_data: Dict[str, List[Dict]]) -> "EvaluationResult":
        """Run comprehensive evaluation of RAG pipeline.

        Args:
            rag_pipeline: Pipeline exposing ``async query(query=..., top_k=...)``
                returning an object with ``answer``, ``retrieved_chunks`` and
                ``confidence`` (assumed contract — confirm against caller).
            test_data: Mapping of dataset name -> list of query records.

        Returns:
            EvaluationResult aggregating metric, benchmark, hallucination and
            quality scores.
        """
        # get_running_loop() is the non-deprecated way to read the loop clock
        # from inside a coroutine (get_event_loop() is deprecated here).
        start_time = asyncio.get_running_loop().time()
        logger.info(f"Starting evaluation for {self.eval_config.metrics} metrics")

        metric_scores: Dict[str, float] = {}
        benchmark_results: List[Dict[str, Any]] = []
        hallucination_stats: Dict[str, Any] = {}
        quality_score = 0.0

        # 1. Metrics-based evaluation (always runs).
        metric_scores = await self._evaluate_metrics(rag_pipeline, test_data)

        # 2. Benchmarks (only when configured).
        if self.eval_config.benchmarks:
            benchmark_results = await self.benchmark_runner.run_all(rag_pipeline, test_data)

        # 3. Hallucination check (only when the detector was enabled).
        if self.hallucination_detector:
            hallucination_stats = await self._evaluate_hallucinations(rag_pipeline, test_data)

        # 4. Heuristic quality assessment.
        if self.eval_config.enable_quality_assessment:
            quality_score = await self._assess_quality(rag_pipeline, test_data)

        overall_score = self._calculate_overall_score(
            metric_scores, benchmark_results, hallucination_stats, quality_score
        )
        evaluation_time = (asyncio.get_running_loop().time() - start_time) * 1000

        result = EvaluationResult(
            rag_pipeline_id=str(id(rag_pipeline)),
            overall_score=overall_score,
            metric_scores=metric_scores,
            benchmark_results=[
                {"name": r.get("name"), "score": r.get("score"), "details": r.get("details")}
                for r in benchmark_results
            ],
            hallucination_stats=hallucination_stats,
            quality_score=quality_score,
            metadata={
                "config": self.eval_config.metrics,
                "top_k_values": self.eval_config.top_k_values,
            },
            evaluation_time_ms=evaluation_time,
        )
        logger.info(f"Evaluation complete. Overall score: {overall_score:.4f}")
        return result

    async def _evaluate_metrics(
        self, rag_pipeline, test_data: Dict[str, List[Dict]]
    ) -> Dict[str, float]:
        """Evaluate RAG pipeline using configured metrics.

        A metric that raises is logged and scored 0.0 so one bad metric
        cannot abort the whole run.
        """
        scores: Dict[str, float] = {}
        for metric in self.eval_config.metrics:
            try:
                score = await self.metrics_calculator.calculate_metric(
                    metric=metric,
                    rag_pipeline=rag_pipeline,
                    test_data=test_data,
                    top_k_values=self.eval_config.top_k_values,
                )
                scores[metric] = score
                logger.info(f"Metric {metric}: {score:.4f}")
            except Exception as e:
                logger.error(f"Error calculating metric {metric}: {e}")
                scores[metric] = 0.0
        return scores

    async def _evaluate_hallucinations(
        self, rag_pipeline, test_data: Dict[str, List[Dict]]
    ) -> Dict[str, Any]:
        """Evaluate hallucination rate of RAG pipeline.

        Returns a stats dict with total/hallucinated counts, the rate, and
        per-query detail records. Empty dict when the detector is disabled.
        """
        if not self.hallucination_detector:
            return {}

        all_queries: List[Dict] = []
        for dataset_queries in test_data.values():
            all_queries.extend(dataset_queries[:50])  # Sample 50 queries per dataset

        hallucinated = 0
        total = 0
        detailed_results: List[Dict[str, Any]] = []
        for item in all_queries:
            try:
                query = item.get("query", "")
                result = await rag_pipeline.query(query=query, top_k=5)
                answer = result.answer
                # Default to "" so a chunk missing "content" cannot inject
                # None into the detector's context list.
                retrieved_contexts = [chunk.get("content", "") for chunk in result.retrieved_chunks]
                is_hallucinated = await self.hallucination_detector.detect_hallucination(
                    query=query, answer=answer, contexts=retrieved_contexts
                )
                if is_hallucinated:
                    hallucinated += 1
                total += 1
                detailed_results.append(
                    {
                        "query": query,
                        "answer": answer,
                        "hallucinated": is_hallucinated,
                        "confidence": result.confidence,
                    }
                )
            except Exception as e:
                # Best-effort: a single failing query is skipped, not fatal.
                logger.error(f"Error checking hallucination: {e}")
                continue

        hallucination_rate = hallucinated / total if total > 0 else 0
        stats = {
            "total_queries": total,
            "hallucinated_count": hallucinated,
            "hallucination_rate": hallucination_rate,
            "results": detailed_results,
        }
        logger.info(f"Hallucination rate: {hallucination_rate:.2%}")
        return stats

    async def _assess_quality(self, rag_pipeline, test_data: Dict[str, List[Dict]]) -> float:
        """Assess overall quality of RAG responses.

        Averages relevance, coherence and completeness heuristics over up to
        50 sampled queries per dataset; a failing query scores 0.0.
        """
        all_queries: List[Dict] = []
        for dataset_queries in test_data.values():
            all_queries.extend(dataset_queries[:50])

        quality_scores: List[float] = []
        for item in all_queries:
            try:
                query = item.get("query", "")
                result = await rag_pipeline.query(query=query, top_k=5)
                answer = result.answer
                retrieved_chunks = result.retrieved_chunks
                relevance_score = self._assess_relevance(query, answer, retrieved_chunks)
                coherence_score = self._assess_coherence(answer)
                completeness_score = self._assess_completeness(query, answer)
                quality = (relevance_score + coherence_score + completeness_score) / 3
                quality_scores.append(quality)
            except Exception as e:
                logger.error(f"Error assessing quality: {e}")
                quality_scores.append(0.0)

        avg_quality = sum(quality_scores) / len(quality_scores) if quality_scores else 0.0
        logger.info(f"Average quality score: {avg_quality:.4f}")
        return avg_quality

    def _assess_relevance(self, query: str, answer: str, contexts: List) -> float:
        """Assess relevance of answer to query via simple keyword overlap.

        Returns the mean of (answer∩query)/|query| and (answer∩context)/|context|;
        0.5 (neutral) when the query has no words.
        """
        query_words = set(query.lower().split())
        answer_words = set(answer.lower().split())
        context_words = set(" ".join([c.get("content", "") for c in contexts]).lower().split())

        if len(query_words) == 0:
            return 0.5
        query_overlap = len(answer_words & query_words) / len(query_words)
        context_overlap = (
            len(answer_words & context_words) / len(context_words) if context_words else 0
        )
        return (query_overlap + context_overlap) / 2

    def _assess_coherence(self, answer: str) -> float:
        """Assess coherence of generated answer.

        Penalizes 0.2 per adjacent sentence pair that shares no word.
        Fix: blank segments from ``split(".")`` are discarded — previously an
        answer ending in "." always lost 0.2 for the empty trailing segment.
        """
        sentences = [s for s in answer.split(".") if s.strip()]
        if len(sentences) <= 1:
            return 1.0
        score = 1.0
        for i in range(len(sentences) - 1):
            s1_words = set(sentences[i].lower().split())
            s2_words = set(sentences[i + 1].lower().split())
            # Adjacent sentences with no shared vocabulary may be incoherent.
            if len(s1_words & s2_words) == 0:
                score -= 0.2
        return max(0.0, score)

    def _assess_completeness(self, query: str, answer: str) -> float:
        """Assess completeness of answer relative to query.

        Fraction of query words echoed in the answer, plus a 0.2 bonus,
        capped at 1.0. Empty query counts as fully addressed.
        """
        query_words = set(query.lower().split())
        answer_words = set(answer.lower().split())
        if len(query_words) == 0:
            return 1.0
        addressed = len(query_words & answer_words) / len(query_words)
        return min(1.0, addressed + 0.2)  # Bonus for covering all query aspects

    def _calculate_overall_score(
        self,
        metric_scores: Dict[str, float],
        benchmark_results: List[Dict],
        hallucination_stats: Dict,
        quality_score: float,
    ) -> float:
        """Calculate weighted overall evaluation score.

        NOTE(review): disabled components contribute 0 under their full
        weight (e.g. no benchmarks -> 0.3 weight lost); weights are kept
        as-is to preserve score comparability.
        """
        weights = {"metrics": 0.4, "benchmarks": 0.3, "hallucination": 0.2, "quality": 0.1}

        metric_avg = sum(metric_scores.values()) / len(metric_scores) if metric_scores else 0.0
        if benchmark_results:
            benchmark_avg = sum(r.get("score", 0) for r in benchmark_results) / len(
                benchmark_results
            )
        else:
            benchmark_avg = 0.0
        # Higher is better: invert the hallucination rate.
        hallucination_rate = hallucination_stats.get("hallucination_rate", 0)
        hallucination_score = 1.0 - hallucination_rate

        return (
            weights["metrics"] * metric_avg
            + weights["benchmarks"] * benchmark_avg
            + weights["hallucination"] * hallucination_score
            + weights["quality"] * quality_score
        )

    def generate_report(self, result: "EvaluationResult") -> str:
        """Generate human-readable evaluation report."""
        lines = [
            "=" * 80,
            "RAG PIPELINE EVALUATION REPORT",
            "=" * 80,
            "",
            f"Pipeline ID: {result.rag_pipeline_id}",
            f"Overall Score: {result.overall_score:.4f}",
            f"Quality Score: {result.quality_score:.4f}",
            f"Evaluation Time: {result.evaluation_time_ms:.2f}ms",
            "",
            "-" * 80,
            "METRIC SCORES",
            "-" * 80,
        ]
        for metric, score in result.metric_scores.items():
            lines.append(f"  {metric.upper()}: {score:.4f}")
        lines.extend(
            [
                "",
                "-" * 80,
                "HALLUCINATION STATS",
                "-" * 80,
                f"  Total Queries: {result.hallucination_stats.get('total_queries', 0)}",
                f"  Hallucinated: {result.hallucination_stats.get('hallucinated_count', 0)}",
                f"  Hallucination Rate: {result.hallucination_stats.get('hallucination_rate', 0):.2%}",
                "",
                "-" * 80,
                "BENCHMARK RESULTS",
                "-" * 80,
            ]
        )
        for bench in result.benchmark_results:
            # evaluate() builds entries with .get(), so score may be None
            # when a benchmark failed; guard the numeric format.
            score = bench.get("score")
            score = 0.0 if score is None else score
            lines.append(f"  {bench.get('name')}: {score:.4f}")
        lines.extend(
            [
                "",
                "=" * 80,
                "END OF REPORT",
                "=" * 80,
            ]
        )
        return "\n".join(lines)