"""TRACe evaluation metrics for RAG systems (per RAGBench paper: arXiv:2407.11005). TRACe Framework (4 metrics): - uTilization (T): Fraction of retrieved context the generator uses Formula: Utilization = Σ Len(U_i) / Σ Len(d_i) where U_i = utilized spans in doc d_i - Relevance (R): Fraction of retrieved context relevant to query Formula: Relevance = Σ Len(R_i) / Σ Len(d_i) where R_i = relevant spans in doc d_i - Adherence (A): Whether response is grounded in context (no hallucinations) Boolean/Span-level: All response claims must be supported by docs - Completeness (C): Fraction of relevant info covered by response Formula: Completeness = Len(R_i ∩ U_i) / Len(R_i) where R_i ∩ U_i = intersection of relevant AND utilized spans Note: This is a 4-metric framework. The stylization "TRACe" does not include a 5th "E=Evaluation" metric. GPT Labeling Integration: This module also supports advanced GPT-based labeling using sentence-level annotations to compute metrics more accurately than rule-based heuristics. See advanced_rag_evaluator.py for the detailed implementation. """ from typing import List, Dict, Optional import numpy as np from dataclasses import dataclass import re from collections import Counter @dataclass class TRACEScores: """Container for TRACE evaluation scores.""" utilization: float relevance: float adherence: float completeness: float def to_dict(self) -> Dict: """Convert to dictionary.""" return { "utilization": self.utilization, "relevance": self.relevance, "adherence": self.adherence, "completeness": self.completeness, "average": self.average() } def average(self) -> float: """Calculate average score.""" return (self.utilization + self.relevance + self.adherence + self.completeness) / 4 class TRACEEvaluator: """TRACe evaluation metrics for RAG systems (per RAGBench paper arXiv:2407.11005).""" def __init__( self, llm_client=None, chunking_strategy: Optional[str] = None, embedding_model: Optional[str] = None, chunk_size: Optional[int] = None, chunk_overlap: Optional[int] = None ): """Initialize TRACe evaluator. Args: llm_client: Optional LLM client for LLM-based evaluation chunking_strategy: Chunking strategy used (e.g., 'dense', 'sparse', 'hybrid') embedding_model: Embedding model used for vector retrieval chunk_size: Size of chunks used chunk_overlap: Overlap size between chunks """ self.llm_client = llm_client self.chunking_strategy = chunking_strategy self.embedding_model = embedding_model self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap def evaluate( self, query: str, response: str, retrieved_documents: List[str], ground_truth: Optional[str] = None ) -> TRACEScores: """Evaluate a RAG response using TRACE metrics. Args: query: User query response: Generated response retrieved_documents: List of retrieved documents ground_truth: Optional ground truth answer Returns: TRACEScores object """ utilization = self._compute_utilization(response, retrieved_documents) relevance = self._compute_relevance(query, retrieved_documents) adherence = self._compute_adherence(response, retrieved_documents) completeness = self._compute_completeness(query, response, ground_truth) return TRACEScores( utilization=utilization, relevance=relevance, adherence=adherence, completeness=completeness ) def _compute_utilization( self, response: str, retrieved_documents: List[str] ) -> float: """Compute utilization score. Measures how well the system uses retrieved documents. 
class TRACEEvaluator:
    """TRACe evaluation metrics for RAG systems (per RAGBench paper arXiv:2407.11005)."""

    def __init__(
        self,
        llm_client=None,
        chunking_strategy: Optional[str] = None,
        embedding_model: Optional[str] = None,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None
    ):
        """Initialize TRACe evaluator.

        Args:
            llm_client: Optional LLM client for LLM-based evaluation
            chunking_strategy: Chunking strategy used (e.g., 'dense', 'sparse', 'hybrid')
            embedding_model: Embedding model used for vector retrieval
            chunk_size: Size of chunks used
            chunk_overlap: Overlap size between chunks
        """
        self.llm_client = llm_client
        self.chunking_strategy = chunking_strategy
        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def evaluate(
        self,
        query: str,
        response: str,
        retrieved_documents: List[str],
        ground_truth: Optional[str] = None
    ) -> TRACEScores:
        """Evaluate a RAG response using TRACe metrics.

        Args:
            query: User query
            response: Generated response
            retrieved_documents: List of retrieved documents
            ground_truth: Optional ground truth answer

        Returns:
            TRACEScores object
        """
        utilization = self._compute_utilization(response, retrieved_documents)
        relevance = self._compute_relevance(query, retrieved_documents)
        adherence = self._compute_adherence(response, retrieved_documents)
        completeness = self._compute_completeness(query, response, ground_truth)

        return TRACEScores(
            utilization=utilization,
            relevance=relevance,
            adherence=adherence,
            completeness=completeness
        )

    def _compute_utilization(
        self,
        response: str,
        retrieved_documents: List[str]
    ) -> float:
        """Compute utilization score.

        Measures how well the system uses retrieved documents.
        Score based on:
        - Number of documents that contributed to the response
        - Proportion of retrieved documents used

        Args:
            response: Generated response
            retrieved_documents: List of retrieved documents

        Returns:
            Utilization score (0-1)
        """
        if not retrieved_documents or not response:
            return 0.0

        response_lower = response.lower()
        response_words = set(self._tokenize(response_lower))

        # Count how many documents contributed
        docs_used = 0
        total_overlap = 0

        for doc in retrieved_documents:
            doc_lower = doc.lower()
            doc_words = set(self._tokenize(doc_lower))

            # Check for significant overlap
            overlap = len(response_words & doc_words)
            if overlap > 5:  # Threshold for significant contribution
                docs_used += 1
            total_overlap += overlap

        # Score based on proportion of documents used
        proportion_used = docs_used / len(retrieved_documents)

        # Also consider depth of utilization (empty input already returned above)
        avg_overlap = total_overlap / len(retrieved_documents)
        depth_score = min(avg_overlap / 20, 1.0)  # Normalize

        # Combined score
        utilization_score = 0.6 * proportion_used + 0.4 * depth_score

        return min(utilization_score, 1.0)
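    # Toy illustration of the utilization heuristic above (numbers made up):
    # with 4 retrieved docs, 2 of which clear the 5-token overlap threshold
    # (proportion_used = 0.5), and an average overlap of 10 tokens
    # (depth_score = 10 / 20 = 0.5), the combined score is
    # 0.6 * 0.5 + 0.4 * 0.5 = 0.5.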
    def _compute_relevance(
        self,
        query: str,
        retrieved_documents: List[str]
    ) -> float:
        """Compute relevance score.

        Measures relevance of retrieved documents to the query.
        Uses lexical overlap and keyword matching.

        Args:
            query: User query
            retrieved_documents: List of retrieved documents

        Returns:
            Relevance score (0-1)
        """
        if not retrieved_documents or not query:
            return 0.0

        query_lower = query.lower()
        query_words = set(self._tokenize(query_lower))
        query_keywords = self._extract_keywords(query_lower)

        relevance_scores = []

        for doc in retrieved_documents:
            doc_lower = doc.lower()
            doc_words = set(self._tokenize(doc_lower))

            # Lexical overlap
            overlap = len(query_words & doc_words)
            overlap_score = overlap / len(query_words) if query_words else 0

            # Keyword matching
            keyword_matches = sum(1 for kw in query_keywords if kw in doc_lower)
            keyword_score = keyword_matches / len(query_keywords) if query_keywords else 0

            # Combined relevance for this document
            doc_relevance = 0.5 * overlap_score + 0.5 * keyword_score
            relevance_scores.append(doc_relevance)

        # Average relevance across documents
        return float(np.mean(relevance_scores))

    def _compute_adherence(
        self,
        response: str,
        retrieved_documents: List[str]
    ) -> float:
        """Compute adherence score (Boolean: 0.0 = hallucinated, 1.0 = grounded).

        Per RAGBench paper: Adherence is whether ALL response claims are grounded.
        Example-level: Boolean indicating if entire response is supported by documents.

        Args:
            response: Generated response
            retrieved_documents: List of retrieved documents

        Returns:
            Adherence score (1.0 = fully grounded, 0.0 = contains hallucinations)
        """
        if not retrieved_documents or not response:
            return 0.0

        # Combine all documents
        combined_docs = " ".join(retrieved_documents).lower()
        doc_words = set(self._tokenize(combined_docs))

        # Analyze response
        response_lower = response.lower()
        response_sentences = self._split_sentences(response_lower)

        if not response_sentences:
            return 0.0

        # Check if ALL sentences are grounded (Boolean logic per paper).
        # If ANY sentence has low grounding, the response contains a hallucination.
        grounding_threshold = 0.5  # At least 50% of words must be in docs
        all_grounded = True

        for sentence in response_sentences:
            sentence_words = set(self._tokenize(sentence))
            if not sentence_words:  # Skip empty sentences
                continue

            # Check what proportion of sentence words appear in documents
            grounded_words = len(sentence_words & doc_words)
            grounding_ratio = grounded_words / len(sentence_words)

            # If any sentence is below threshold, mark as hallucinated
            if grounding_ratio < grounding_threshold:
                all_grounded = False
                break

        # Return Boolean: 1.0 if fully grounded, 0.0 if contains hallucination
        return 1.0 if all_grounded else 0.0

    def _compute_completeness(
        self,
        query: str,
        response: str,
        ground_truth: Optional[str] = None
    ) -> float:
        """Compute completeness score.

        Per RAGBench: Completeness = Σ Len(R_i ∩ U_i) / Σ Len(R_i), i.e. how
        much of the relevant information is covered by the response. Without
        labeled spans, this method approximates it with a weighted heuristic.

        Args:
            query: User query
            response: Generated response
            ground_truth: Optional ground truth answer

        Returns:
            Completeness score (0-1)
        """
        if not response or not query:
            return 0.0

        response_lower = response.lower()
        response_words = set(self._tokenize(response_lower))

        if not response_words:
            return 0.0

        completeness_scores = []

        # Score 1: Response length (must have substantive content)
        min_content_words = 10  # At least 10 meaningful words
        length_score = min(len(response_words) / min_content_words, 1.0)
        completeness_scores.append(length_score * 0.3)  # Weight: 30%

        # Score 2: Ground truth coverage (if available)
        if ground_truth:
            gt_lower = ground_truth.lower()
            gt_words = set(self._tokenize(gt_lower))

            if gt_words:
                # Completeness = intersection / relevant_set:
                # how much of the ground truth info is in the response
                overlap = len(gt_words & response_words)
                gt_coverage = overlap / len(gt_words)
                completeness_scores.append(gt_coverage * 0.7)  # Weight: 70%
            else:
                completeness_scores.append(0.0)
        else:
            # Without ground truth, use a query-type matching heuristic
            query_lower = query.lower()

            # Check for key information based on query type
            answer_patterns = {
                "what": ["is", "are", "can", "does"],
                "when": ["year", "date", "time", "century", "period"],
                "where": ["location", "place", "country", "city", "region"],
                "who": ["person", "people", "name", "character"],
                "why": ["because", "due", "reason", "cause"],
                "how": ["method", "process", "step", "way"]
            }

            base_score = 0.3  # Default if no query type match
            for q_type, keywords in answer_patterns.items():
                if q_type in query_lower:
                    # Check if response contains relevant keywords
                    keyword_matches = sum(1 for kw in keywords if kw in response_lower)
                    if keyword_matches > 0:
                        base_score = 0.7
                        break

            completeness_scores.append(base_score)

        # Sum the weighted components (np.mean here would halve the intended
        # 30/70 weights and cap the score near 0.5); clamp to 1.0 for safety
        return float(min(sum(completeness_scores), 1.0)) if completeness_scores else 0.0
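    # Illustration of the adherence heuristic (comment-only sketch; the
    # strings are made up). With retrieved_documents =
    # ["Paris is the capital of France."], whose tokens after stop-word and
    # short-word filtering are {paris, capital, france}:
    #   - "Paris is the capital."           -> tokens {paris, capital},
    #     all grounded (ratio 1.0 >= 0.5)           -> adherence 1.0
    #   - "Paris has exactly nine museums." -> tokens {paris, has, exactly,
    #     nine, museums}, 1/5 grounded (0.2 < 0.5)  -> adherence 0.0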
    def _tokenize(self, text: str) -> List[str]:
        """Tokenize text into words."""
        # Remove punctuation and split
        text = re.sub(r'[^\w\s]', ' ', text)
        words = text.split()

        # Filter out very short words and common stop words
        stop_words = {"a", "an", "the", "is", "are", "was", "were",
                      "in", "on", "at", "to", "for"}
        return [w for w in words if len(w) > 2 and w not in stop_words]

    def _extract_keywords(self, text: str) -> List[str]:
        """Extract keywords from text."""
        words = self._tokenize(text)

        # Simple keyword extraction: deduplicate tokens while preserving
        # first-seen order. In production, use TF-IDF or similar.
        word_freq = Counter(words)
        return list(word_freq.keys())

    def _split_sentences(self, text: str) -> List[str]:
        """Split text into sentences."""
        # Simple sentence splitting
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip()]

    def evaluate_batch(
        self,
        test_data: List[Dict]
    ) -> Dict:
        """Evaluate multiple test cases.

        Args:
            test_data: List of test cases, each containing:
                - query: User query
                - response: Generated response
                - retrieved_documents: Retrieved documents
                - ground_truth: Ground truth answer (optional)

        Returns:
            Dictionary with aggregated scores and metadata, plus detailed
            per-query info
        """
        all_scores = []
        detailed_results = []

        for i, test_case in enumerate(test_data):
            print(f"Evaluating test case {i+1}/{len(test_data)}")

            query = test_case.get("query", "")
            response = test_case.get("response", "")
            retrieved_documents = test_case.get("retrieved_documents", [])
            ground_truth = test_case.get("ground_truth")

            scores = self.evaluate(
                query=query,
                response=response,
                retrieved_documents=retrieved_documents,
                ground_truth=ground_truth
            )
            all_scores.append(scores)

            # Store detailed information for each query
            detailed_results.append({
                "query_id": i + 1,
                "question": query,
                "llm_response": response,
                "retrieved_documents": retrieved_documents,
                "ground_truth": ground_truth,
                "metrics": {
                    "utilization": float(scores.utilization),
                    "relevance": float(scores.relevance),
                    "adherence": float(scores.adherence),
                    "completeness": float(scores.completeness),
                    "average": float(scores.average())
                }
            })

        # Aggregate scores
        avg_utilization = np.mean([s.utilization for s in all_scores])
        avg_relevance = np.mean([s.relevance for s in all_scores])
        avg_adherence = np.mean([s.adherence for s in all_scores])
        avg_completeness = np.mean([s.completeness for s in all_scores])

        results = {
            "utilization": float(avg_utilization),
            "relevance": float(avg_relevance),
            "adherence": float(avg_adherence),
            "completeness": float(avg_completeness),
            "average": float((avg_utilization + avg_relevance
                              + avg_adherence + avg_completeness) / 4),
            "num_samples": len(test_data),
            "individual_scores": [s.to_dict() for s in all_scores],
            # Include detailed per-query information
            "detailed_results": detailed_results,
            # Include evaluation metadata for reproducibility
            "evaluation_config": {
                "chunking_strategy": self.chunking_strategy,
                "embedding_model": self.embedding_model,
                "chunk_size": self.chunk_size,
                "chunk_overlap": self.chunk_overlap
            }
        }

        return results
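
if __name__ == "__main__":
    # Minimal usage sketch. The query, response, documents, and config values
    # below are illustrative examples, not project fixtures; the embedding
    # model name is a placeholder.
    evaluator = TRACEEvaluator(
        chunking_strategy="hybrid",
        embedding_model="example-embedding-model",  # placeholder name
        chunk_size=512,
        chunk_overlap=64
    )

    scores = evaluator.evaluate(
        query="When was the Eiffel Tower built and how tall is it?",
        response="The Eiffel Tower was completed in 1889 and stands about 330 meters tall.",
        retrieved_documents=[
            "The Eiffel Tower was completed in 1889 for the World's Fair.",
            "The tower stands about 330 meters tall and is made of wrought iron."
        ],
        ground_truth="The Eiffel Tower was completed in 1889 and is about 330 meters tall."
    )
    print(scores.to_dict())

    # evaluate_batch takes a list of dicts with the same fields
    batch_results = evaluator.evaluate_batch([{
        "query": "When was the Eiffel Tower built and how tall is it?",
        "response": "The Eiffel Tower was completed in 1889 and stands about 330 meters tall.",
        "retrieved_documents": ["The Eiffel Tower was completed in 1889 for the World's Fair."],
        "ground_truth": "The Eiffel Tower was completed in 1889 and is about 330 meters tall."
    }])
    print(f"Average score across batch: {batch_results['average']:.3f}")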