| """TRACe evaluation metrics for RAG systems (per RAGBench paper: arXiv:2407.11005). | |
| TRACe Framework (4 metrics): | |
| - uTilization (T): Fraction of retrieved context the generator uses | |
| Formula: Utilization = Σ Len(U_i) / Σ Len(d_i) | |
| where U_i = utilized spans in doc d_i | |
| - Relevance (R): Fraction of retrieved context relevant to query | |
| Formula: Relevance = Σ Len(R_i) / Σ Len(d_i) | |
| where R_i = relevant spans in doc d_i | |
| - Adherence (A): Whether response is grounded in context (no hallucinations) | |
| Boolean/Span-level: All response claims must be supported by docs | |
| - Completeness (C): Fraction of relevant info covered by response | |
| Formula: Completeness = Len(R_i ∩ U_i) / Len(R_i) | |
| where R_i ∩ U_i = intersection of relevant AND utilized spans | |
| Note: This is a 4-metric framework. The stylization "TRACe" does not include a 5th "E=Evaluation" metric. | |
| GPT Labeling Integration: | |
| This module also supports advanced GPT-based labeling using sentence-level annotations | |
| to compute metrics more accurately than rule-based heuristics. See advanced_rag_evaluator.py | |
| for the detailed implementation. | |
| """ | |
from typing import List, Dict, Optional
import numpy as np
from dataclasses import dataclass
import re
from collections import Counter

@dataclass
class TRACEScores:
    """Container for TRACe evaluation scores."""

    utilization: float
    relevance: float
    adherence: float
    completeness: float

    def to_dict(self) -> Dict:
        """Convert to a dictionary."""
        return {
            "utilization": self.utilization,
            "relevance": self.relevance,
            "adherence": self.adherence,
            "completeness": self.completeness,
            "average": self.average(),
        }

    def average(self) -> float:
        """Calculate the average of the four scores."""
        return (self.utilization + self.relevance +
                self.adherence + self.completeness) / 4

class TRACEEvaluator:
    """TRACe evaluation metrics for RAG systems (per RAGBench paper arXiv:2407.11005)."""

    def __init__(
        self,
        llm_client=None,
        chunking_strategy: Optional[str] = None,
        embedding_model: Optional[str] = None,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None,
    ):
        """Initialize the TRACe evaluator.

        Args:
            llm_client: Optional LLM client for LLM-based evaluation
            chunking_strategy: Chunking strategy used (e.g., 'dense', 'sparse', 'hybrid')
            embedding_model: Embedding model used for vector retrieval
            chunk_size: Size of chunks used
            chunk_overlap: Overlap size between chunks
        """
        self.llm_client = llm_client
        self.chunking_strategy = chunking_strategy
        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def evaluate(
        self,
        query: str,
        response: str,
        retrieved_documents: List[str],
        ground_truth: Optional[str] = None,
    ) -> TRACEScores:
        """Evaluate a RAG response using the TRACe metrics.

        Args:
            query: User query
            response: Generated response
            retrieved_documents: List of retrieved documents
            ground_truth: Optional ground truth answer

        Returns:
            TRACEScores object
        """
        utilization = self._compute_utilization(response, retrieved_documents)
        relevance = self._compute_relevance(query, retrieved_documents)
        adherence = self._compute_adherence(response, retrieved_documents)
        completeness = self._compute_completeness(query, response, ground_truth)
        return TRACEScores(
            utilization=utilization,
            relevance=relevance,
            adherence=adherence,
            completeness=completeness,
        )

    def _compute_utilization(
        self,
        response: str,
        retrieved_documents: List[str],
    ) -> float:
        """Compute the utilization score.

        Heuristic lexical-overlap proxy for the paper's span-based formula.
        The score combines:
        - the proportion of retrieved documents that contributed to the response
        - the depth of word overlap between the response and those documents

        Args:
            response: Generated response
            retrieved_documents: List of retrieved documents

        Returns:
            Utilization score (0-1)
        """
        if not retrieved_documents or not response:
            return 0.0

        response_words = set(self._tokenize(response.lower()))

        # Count how many documents contributed to the response
        docs_used = 0
        total_overlap = 0
        for doc in retrieved_documents:
            doc_words = set(self._tokenize(doc.lower()))
            # Check for significant word overlap with the response
            overlap = len(response_words & doc_words)
            if overlap > 5:  # Threshold for a significant contribution
                docs_used += 1
                total_overlap += overlap

        # Score based on the proportion of documents used
        proportion_used = docs_used / len(retrieved_documents)

        # Also consider the depth of utilization
        avg_overlap = total_overlap / len(retrieved_documents) if retrieved_documents else 0
        depth_score = min(avg_overlap / 20, 1.0)  # Normalize

        # Combined score
        utilization_score = 0.6 * proportion_used + 0.4 * depth_score
        return min(utilization_score, 1.0)

    def _compute_relevance(
        self,
        query: str,
        retrieved_documents: List[str],
    ) -> float:
        """Compute the relevance score.

        Measures the relevance of the retrieved documents to the query using
        lexical overlap and keyword matching.

        Args:
            query: User query
            retrieved_documents: List of retrieved documents

        Returns:
            Relevance score (0-1)
        """
        if not retrieved_documents or not query:
            return 0.0

        query_lower = query.lower()
        query_words = set(self._tokenize(query_lower))
        query_keywords = self._extract_keywords(query_lower)

        relevance_scores = []
        for doc in retrieved_documents:
            doc_lower = doc.lower()
            doc_words = set(self._tokenize(doc_lower))

            # Lexical overlap with the query
            overlap = len(query_words & doc_words)
            overlap_score = overlap / len(query_words) if query_words else 0

            # Keyword matching
            keyword_matches = sum(1 for kw in query_keywords if kw in doc_lower)
            keyword_score = keyword_matches / len(query_keywords) if query_keywords else 0

            # Combined relevance for this document
            doc_relevance = 0.5 * overlap_score + 0.5 * keyword_score
            relevance_scores.append(doc_relevance)

        # Average relevance across documents
        return float(np.mean(relevance_scores))

    def _compute_adherence(
        self,
        response: str,
        retrieved_documents: List[str],
    ) -> float:
        """Compute the adherence score (Boolean: 0.0 = hallucinated, 1.0 = grounded).

        Per the RAGBench paper, adherence is whether ALL response claims are
        grounded: an example-level Boolean indicating that the entire response
        is supported by the documents.

        Args:
            response: Generated response
            retrieved_documents: List of retrieved documents

        Returns:
            Adherence score (1.0 = fully grounded, 0.0 = contains hallucinations)
        """
        if not retrieved_documents or not response:
            return 0.0

        # Combine all documents
        combined_docs = " ".join(retrieved_documents).lower()
        doc_words = set(self._tokenize(combined_docs))

        # Analyze the response sentence by sentence
        response_sentences = self._split_sentences(response.lower())
        if not response_sentences:
            return 0.0

        # Check whether ALL sentences are grounded (Boolean logic per the paper).
        # If ANY sentence has low grounding, the response contains a hallucination.
        grounding_threshold = 0.5  # At least 50% of a sentence's words must appear in the docs
        all_grounded = True
        for sentence in response_sentences:
            sentence_words = set(self._tokenize(sentence))
            if not sentence_words:  # Skip empty sentences
                continue

            # Proportion of sentence words that appear in the documents
            grounded_words = len(sentence_words & doc_words)
            grounding_ratio = grounded_words / len(sentence_words)

            # If any sentence falls below the threshold, mark the response as hallucinated
            if grounding_ratio < grounding_threshold:
                all_grounded = False
                break

        # Return Boolean: 1.0 if fully grounded, 0.0 if it contains a hallucination
        return 1.0 if all_grounded else 0.0

    def _compute_completeness(
        self,
        query: str,
        response: str,
        ground_truth: Optional[str] = None,
    ) -> float:
        """Compute the completeness score.

        Per RAGBench: Completeness = Σ Len(R_i ∩ U_i) / Σ Len(R_i), i.e. how much
        of the relevant information is covered by the response. Without span
        annotations, this method uses a heuristic proxy based on ground-truth
        coverage (when available) and response content.

        Args:
            query: User query
            response: Generated response
            ground_truth: Optional ground truth answer

        Returns:
            Completeness score (0-1)
        """
        if not response or not query:
            return 0.0

        response_lower = response.lower()
        response_words = set(self._tokenize(response_lower))
        if not response_words:
            return 0.0

        completeness_scores = []

        # Score 1: response length (must have substantive content), weight 30%
        min_content_words = 10  # At least 10 meaningful words
        length_score = min(len(response_words) / min_content_words, 1.0)
        completeness_scores.append(length_score * 0.3)

        # Score 2: ground truth coverage (if available), weight 70%
        if ground_truth:
            gt_words = set(self._tokenize(ground_truth.lower()))
            if gt_words:
                # Completeness ~ intersection / relevant set:
                # how much of the ground-truth information appears in the response
                overlap = len(gt_words & response_words)
                gt_coverage = overlap / len(gt_words)
                completeness_scores.append(gt_coverage * 0.7)
            else:
                completeness_scores.append(0.0)
        else:
            # Without ground truth, fall back to a query-type keyword heuristic
            query_lower = query.lower()
            answer_patterns = {
                "what": ["is", "are", "can", "does"],
                "when": ["year", "date", "time", "century", "period"],
                "where": ["location", "place", "country", "city", "region"],
                "who": ["person", "people", "name", "character"],
                "why": ["because", "due", "reason", "cause"],
                "how": ["method", "process", "step", "way"],
            }
            base_score = 0.3  # Default if no query type matches
            for q_type, keywords in answer_patterns.items():
                if q_type in query_lower:
                    # Check whether the response contains the expected keywords
                    keyword_matches = sum(1 for kw in keywords if kw in response_lower)
                    if keyword_matches > 0:
                        base_score = 0.7
                        break
            completeness_scores.append(base_score)

        # Sum the weighted components (averaging them would halve the intended score)
        return min(float(sum(completeness_scores)), 1.0)

    def _tokenize(self, text: str) -> List[str]:
        """Tokenize text into words."""
        # Remove punctuation and split on whitespace
        text = re.sub(r'[^\w\s]', ' ', text)
        words = text.split()
        # Filter out very short words and common stop words
        stop_words = {"a", "an", "the", "is", "are", "was", "were", "in", "on", "at", "to", "for"}
        return [w for w in words if len(w) > 2 and w not in stop_words]

    def _extract_keywords(self, text: str) -> List[str]:
        """Extract keywords from text."""
        words = self._tokenize(text)
        # Simple keyword extraction: the unique tokenized words.
        # In production, use TF-IDF or a similar weighting scheme.
        word_freq = Counter(words)
        return list(word_freq.keys())

    def _split_sentences(self, text: str) -> List[str]:
        """Split text into sentences."""
        # Simple sentence splitting on terminal punctuation
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip()]

    def evaluate_batch(
        self,
        test_data: List[Dict],
    ) -> Dict:
        """Evaluate multiple test cases.

        Args:
            test_data: List of test cases, each containing:
                - query: User query
                - response: Generated response
                - retrieved_documents: Retrieved documents
                - ground_truth: Ground truth answer (optional)

        Returns:
            Dictionary with aggregated scores, per-query details, and evaluation metadata
        """
        all_scores = []
        detailed_results = []

        for i, test_case in enumerate(test_data):
            print(f"Evaluating test case {i + 1}/{len(test_data)}")
            query = test_case.get("query", "")
            response = test_case.get("response", "")
            retrieved_documents = test_case.get("retrieved_documents", [])
            ground_truth = test_case.get("ground_truth")

            scores = self.evaluate(
                query=query,
                response=response,
                retrieved_documents=retrieved_documents,
                ground_truth=ground_truth,
            )
            all_scores.append(scores)

            # Store detailed information for each query
            detailed_results.append({
                "query_id": i + 1,
                "question": query,
                "llm_response": response,
                "retrieved_documents": retrieved_documents,
                "ground_truth": ground_truth,
                "metrics": {
                    "utilization": float(scores.utilization),
                    "relevance": float(scores.relevance),
                    "adherence": float(scores.adherence),
                    "completeness": float(scores.completeness),
                    "average": float(scores.average()),
                },
            })

        # Aggregate scores across all test cases
        avg_utilization = np.mean([s.utilization for s in all_scores])
        avg_relevance = np.mean([s.relevance for s in all_scores])
        avg_adherence = np.mean([s.adherence for s in all_scores])
        avg_completeness = np.mean([s.completeness for s in all_scores])

        results = {
            "utilization": float(avg_utilization),
            "relevance": float(avg_relevance),
            "adherence": float(avg_adherence),
            "completeness": float(avg_completeness),
            "average": float((avg_utilization + avg_relevance +
                              avg_adherence + avg_completeness) / 4),
            "num_samples": len(test_data),
            "individual_scores": [s.to_dict() for s in all_scores],
            # Detailed per-query information
            "detailed_results": detailed_results,
            # Evaluation metadata for reproducibility
            "evaluation_config": {
                "chunking_strategy": self.chunking_strategy,
                "embedding_model": self.embedding_model,
                "chunk_size": self.chunk_size,
                "chunk_overlap": self.chunk_overlap,
            },
        }
        return results
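

if __name__ == "__main__":
    # Minimal usage sketch of the rule-based evaluator defined above.
    # The query, response, documents, and embedding model name are made-up
    # illustrative values; only TRACEEvaluator and TRACEScores come from this module.
    evaluator = TRACEEvaluator(
        chunking_strategy="hybrid",
        embedding_model="example-embedding-model",  # placeholder name
        chunk_size=512,
        chunk_overlap=64,
    )

    # Single-example evaluation returns a TRACEScores object.
    scores = evaluator.evaluate(
        query="When was the Eiffel Tower completed?",
        response="The Eiffel Tower was completed in 1889 for the World's Fair in Paris.",
        retrieved_documents=[
            "The Eiffel Tower was completed in 1889 as the entrance arch to the World's Fair.",
            "It is located on the Champ de Mars in Paris, France.",
        ],
        ground_truth="The Eiffel Tower was completed in 1889.",
    )
    print(scores.to_dict())

    # evaluate_batch takes a list of dicts with the same fields and returns
    # aggregated metrics plus per-query details and the evaluation_config metadata.
    batch_results = evaluator.evaluate_batch([
        {
            "query": "When was the Eiffel Tower completed?",
            "response": "The Eiffel Tower was completed in 1889.",
            "retrieved_documents": ["The Eiffel Tower was completed in 1889."],
            "ground_truth": "The Eiffel Tower was completed in 1889.",
        },
    ])
    print(batch_results["average"], batch_results["num_samples"])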