# hugging2021's picture
# Upload folder using huggingface_hub
# 40f6dcf verified
"""
Evaluator - RAG-The-Game-Changer
Comprehensive evaluation orchestrator for RAG systems.
"""
import asyncio
import logging
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List, Optional

from .benchmarks import BenchmarkRunner
from .hallucination_detection import HallucinationDetector
from .metrics import MetricsCalculator
logger = logging.getLogger(__name__)
@dataclass
class EvaluationConfig:
    """Configuration for evaluation runs."""

    # Dataset name -> list of query items to evaluate against.
    datasets: Dict[str, List[Dict]] = field(default_factory=dict)
    # Metric names forwarded to MetricsCalculator.calculate_metric.
    metrics: List[str] = field(
        default_factory=lambda: ["precision", "recall", "ndcg", "rouge", "bertscore"]
    )
    # Benchmark suite names; an empty list skips the benchmark phase.
    benchmarks: List[str] = field(default_factory=list)
    # Retrieval cut-offs used by rank-based metrics.
    top_k_values: List[int] = field(default_factory=lambda: [5, 10, 20])
    # When False, no HallucinationDetector is constructed and the
    # hallucination phase is skipped entirely.
    enable_hallucination_check: bool = True
    # When False, the heuristic quality-assessment phase is skipped.
    enable_quality_assessment: bool = True
@dataclass
class EvaluationResult:
    """Result from evaluation run."""

    rag_pipeline_id: str
    overall_score: float
    metric_scores: Dict[str, float]
    benchmark_results: List[Dict[str, Any]]
    hallucination_stats: Dict[str, Any]
    quality_score: float
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Must carry a default: it follows `metadata`, which has a
    # default_factory, and a non-default dataclass field after a defaulted
    # one raises TypeError at class-creation time. Evaluator.evaluate()
    # always supplies the real value by keyword.
    evaluation_time_ms: float = 0.0
class Evaluator:
    """Main evaluation orchestrator for RAG systems.

    Aggregates four signals into a single weighted score:

    1. Metric-based evaluation via ``MetricsCalculator``.
    2. Benchmark suites via ``BenchmarkRunner`` (if configured).
    3. Hallucination rate via ``HallucinationDetector`` (if enabled).
    4. Heuristic answer quality (relevance / coherence / completeness).

    The evaluated ``rag_pipeline`` is expected to expose an async
    ``query(query=..., top_k=...)`` method whose result object has
    ``answer``, ``retrieved_chunks`` (list of dicts with a "content" key)
    and ``confidence`` attributes — this is how the code below uses it;
    confirm against the actual pipeline class.
    """

    # Number of queries sampled per dataset for the hallucination and
    # quality phases (keeps evaluation time bounded on large datasets).
    SAMPLE_SIZE = 50

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Build the orchestrator.

        Args:
            config: Optional raw configuration dict. Keys matching
                ``EvaluationConfig`` fields configure the run; other keys
                (e.g. "benchmark_config") are consumed by sub-components.
        """
        self.config = config or {}
        # Forward only the keys EvaluationConfig declares: the raw dict may
        # legitimately carry extra entries (this very method reads
        # "benchmark_config" below), and passing them to the dataclass
        # constructor would raise TypeError.
        known = {f.name for f in fields(EvaluationConfig)}
        self.eval_config = EvaluationConfig(
            **{k: v for k, v in self.config.items() if k in known}
        )
        self.metrics_calculator = MetricsCalculator()
        self.hallucination_detector = (
            HallucinationDetector() if self.eval_config.enable_hallucination_check else None
        )
        self.benchmark_runner = BenchmarkRunner(self.config.get("benchmark_config"))

    async def evaluate(
        self, rag_pipeline, test_data: Dict[str, List[Dict]]
    ) -> "EvaluationResult":
        """Run comprehensive evaluation of a RAG pipeline.

        Args:
            rag_pipeline: Pipeline under test (see class docstring for the
                expected interface).
            test_data: Mapping of dataset name -> list of query items.

        Returns:
            EvaluationResult with per-signal scores, a weighted overall
            score, and wall-clock timing in milliseconds.
        """
        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() here is deprecated.
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        logger.info("Starting evaluation for %s metrics", self.eval_config.metrics)

        # 1. Metrics-based evaluation (always runs).
        metric_scores = await self._evaluate_metrics(rag_pipeline, test_data)

        # 2. Benchmarks (only when any are configured).
        benchmark_results: List[Dict[str, Any]] = []
        if self.eval_config.benchmarks:
            benchmark_results = await self.benchmark_runner.run_all(rag_pipeline, test_data)

        # 3. Hallucination check (only when the detector was enabled).
        hallucination_stats: Dict[str, Any] = {}
        if self.hallucination_detector:
            hallucination_stats = await self._evaluate_hallucinations(rag_pipeline, test_data)

        # 4. Heuristic quality assessment.
        quality_score = 0.0
        if self.eval_config.enable_quality_assessment:
            quality_score = await self._assess_quality(rag_pipeline, test_data)

        overall_score = self._calculate_overall_score(
            metric_scores, benchmark_results, hallucination_stats, quality_score
        )
        evaluation_time = (loop.time() - start_time) * 1000

        result = EvaluationResult(
            rag_pipeline_id=str(id(rag_pipeline)),
            overall_score=overall_score,
            metric_scores=metric_scores,
            benchmark_results=[
                {"name": r.get("name"), "score": r.get("score"), "details": r.get("details")}
                for r in benchmark_results
            ],
            hallucination_stats=hallucination_stats,
            quality_score=quality_score,
            metadata={
                "config": self.eval_config.metrics,
                "top_k_values": self.eval_config.top_k_values,
            },
            evaluation_time_ms=evaluation_time,
        )
        logger.info("Evaluation complete. Overall score: %.4f", overall_score)
        return result

    def _sample_queries(self, test_data: Dict[str, List[Dict]]) -> List[Dict]:
        """Take the first SAMPLE_SIZE query items from each dataset."""
        sampled: List[Dict] = []
        for dataset_queries in test_data.values():
            sampled.extend(dataset_queries[: self.SAMPLE_SIZE])
        return sampled

    async def _evaluate_metrics(
        self, rag_pipeline, test_data: Dict[str, List[Dict]]
    ) -> Dict[str, float]:
        """Compute each configured metric; a failing metric scores 0.0."""
        scores: Dict[str, float] = {}
        for metric in self.eval_config.metrics:
            try:
                score = await self.metrics_calculator.calculate_metric(
                    metric=metric,
                    rag_pipeline=rag_pipeline,
                    test_data=test_data,
                    top_k_values=self.eval_config.top_k_values,
                )
                scores[metric] = score
                logger.info("Metric %s: %.4f", metric, score)
            except Exception as e:
                # Best-effort: one broken metric must not abort the run.
                logger.error("Error calculating metric %s: %s", metric, e)
                scores[metric] = 0.0
        return scores

    async def _evaluate_hallucinations(
        self, rag_pipeline, test_data: Dict[str, List[Dict]]
    ) -> Dict[str, Any]:
        """Estimate the hallucination rate on a sample of queries.

        Returns:
            Dict with total_queries, hallucinated_count, hallucination_rate
            and per-query results; empty dict when no detector is enabled.
        """
        if not self.hallucination_detector:
            return {}
        hallucinated = 0
        total = 0
        detailed_results: List[Dict[str, Any]] = []
        for item in self._sample_queries(test_data):
            try:
                query = item.get("query", "")
                result = await rag_pipeline.query(query=query, top_k=5)
                answer = result.answer
                # Default missing "content" to "" so the detector is never
                # handed None entries.
                retrieved_contexts = [
                    chunk.get("content", "") for chunk in result.retrieved_chunks
                ]
                is_hallucinated = await self.hallucination_detector.detect_hallucination(
                    query=query, answer=answer, contexts=retrieved_contexts
                )
                if is_hallucinated:
                    hallucinated += 1
                total += 1
                detailed_results.append(
                    {
                        "query": query,
                        "answer": answer,
                        "hallucinated": is_hallucinated,
                        "confidence": result.confidence,
                    }
                )
            except Exception as e:
                # Skip queries the pipeline or detector cannot handle;
                # they are excluded from the rate's denominator.
                logger.error("Error checking hallucination: %s", e)
                continue
        hallucination_rate = hallucinated / total if total > 0 else 0
        stats = {
            "total_queries": total,
            "hallucinated_count": hallucinated,
            "hallucination_rate": hallucination_rate,
            "results": detailed_results,
        }
        logger.info("Hallucination rate: %.2f%%", hallucination_rate * 100)
        return stats

    async def _assess_quality(
        self, rag_pipeline, test_data: Dict[str, List[Dict]]
    ) -> float:
        """Average heuristic quality over a sample of queries.

        Quality per query is the mean of relevance, coherence and
        completeness; a query that errors contributes 0.0.
        """
        quality_scores: List[float] = []
        for item in self._sample_queries(test_data):
            try:
                query = item.get("query", "")
                result = await rag_pipeline.query(query=query, top_k=5)
                answer = result.answer
                retrieved_chunks = result.retrieved_chunks
                relevance_score = self._assess_relevance(query, answer, retrieved_chunks)
                coherence_score = self._assess_coherence(answer)
                completeness_score = self._assess_completeness(query, answer)
                quality_scores.append(
                    (relevance_score + coherence_score + completeness_score) / 3
                )
            except Exception as e:
                logger.error("Error assessing quality: %s", e)
                quality_scores.append(0.0)
        avg_quality = sum(quality_scores) / len(quality_scores) if quality_scores else 0.0
        logger.info("Average quality score: %.4f", avg_quality)
        return avg_quality

    def _assess_relevance(self, query: str, answer: str, contexts: List) -> float:
        """Heuristic relevance: mean of query-word and context-word overlap.

        Returns 0.5 (neutral) for an empty query.
        """
        query_words = set(query.lower().split())
        answer_words = set(answer.lower().split())
        context_words = set(
            " ".join(c.get("content", "") for c in contexts).lower().split()
        )
        if not query_words:
            return 0.5
        query_overlap = len(answer_words & query_words) / len(query_words)
        context_overlap = (
            len(answer_words & context_words) / len(context_words) if context_words else 0
        )
        return (query_overlap + context_overlap) / 2

    def _assess_coherence(self, answer: str) -> float:
        """Heuristic coherence of the generated answer.

        Each adjacent sentence pair sharing zero words costs 0.2, floored
        at 0.0; answers with at most one sentence score 1.0.
        """
        # Drop empty fragments (e.g. the "" produced by a trailing period);
        # otherwise every well-formed answer ending in "." would be charged
        # a spurious zero-overlap penalty against the empty fragment.
        sentences = [s for s in answer.split(".") if s.strip()]
        if len(sentences) <= 1:
            return 1.0
        score = 1.0
        for first, second in zip(sentences, sentences[1:]):
            if not set(first.lower().split()) & set(second.lower().split()):
                score -= 0.2
        return max(0.0, score)

    def _assess_completeness(self, query: str, answer: str) -> float:
        """Heuristic completeness: fraction of query words echoed in the
        answer, plus a 0.2 bonus, capped at 1.0. Empty query scores 1.0."""
        query_words = set(query.lower().split())
        answer_words = set(answer.lower().split())
        if not query_words:
            return 1.0
        addressed = len(query_words & answer_words) / len(query_words)
        return min(1.0, addressed + 0.2)

    def _calculate_overall_score(
        self,
        metric_scores: Dict[str, float],
        benchmark_results: List[Dict],
        hallucination_stats: Dict,
        quality_score: float,
    ) -> float:
        """Weighted overall score: metrics 0.4, benchmarks 0.3,
        hallucination 0.2, quality 0.1.

        Note: with no hallucination stats the rate defaults to 0, so the
        hallucination component contributes its full 0.2 weight.
        """
        weights = {"metrics": 0.4, "benchmarks": 0.3, "hallucination": 0.2, "quality": 0.1}
        metric_avg = (
            sum(metric_scores.values()) / len(metric_scores) if metric_scores else 0.0
        )
        benchmark_avg = (
            sum(r.get("score", 0) for r in benchmark_results) / len(benchmark_results)
            if benchmark_results
            else 0.0
        )
        hallucination_score = 1.0 - hallucination_stats.get("hallucination_rate", 0)
        return (
            weights["metrics"] * metric_avg
            + weights["benchmarks"] * benchmark_avg
            + weights["hallucination"] * hallucination_score
            + weights["quality"] * quality_score
        )

    def generate_report(self, result: "EvaluationResult") -> str:
        """Render an EvaluationResult as a human-readable plain-text report."""
        lines = [
            "=" * 80,
            "RAG PIPELINE EVALUATION REPORT",
            "=" * 80,
            "",
            f"Pipeline ID: {result.rag_pipeline_id}",
            f"Overall Score: {result.overall_score:.4f}",
            f"Quality Score: {result.quality_score:.4f}",
            f"Evaluation Time: {result.evaluation_time_ms:.2f}ms",
            "",
            "-" * 80,
            "METRIC SCORES",
            "-" * 80,
        ]
        for metric, score in result.metric_scores.items():
            lines.append(f" {metric.upper()}: {score:.4f}")
        lines.extend(
            [
                "",
                "-" * 80,
                "HALLUCINATION STATS",
                "-" * 80,
                f" Total Queries: {result.hallucination_stats.get('total_queries', 0)}",
                f" Hallucinated: {result.hallucination_stats.get('hallucinated_count', 0)}",
                f" Hallucination Rate: {result.hallucination_stats.get('hallucination_rate', 0):.2%}",
                "",
                "-" * 80,
                "BENCHMARK RESULTS",
                "-" * 80,
            ]
        )
        for bench in result.benchmark_results:
            score = bench.get("score")
            # evaluate() builds these entries with dict.get, so name/score
            # may be None; report "n/a" rather than crash on formatting.
            if isinstance(score, (int, float)):
                lines.append(f" {bench.get('name')}: {score:.4f}")
            else:
                lines.append(f" {bench.get('name')}: n/a")
        lines.extend(
            [
                "",
                "=" * 80,
                "END OF REPORT",
                "=" * 80,
            ]
        )
        return "\n".join(lines)