# CapStoneRAG10 / trace_evaluator.py
"""TRACe evaluation metrics for RAG systems (per RAGBench paper: arXiv:2407.11005).
TRACe Framework (4 metrics):
- uTilization (T): Fraction of retrieved context the generator uses
Formula: Utilization = Σ Len(U_i) / Σ Len(d_i)
where U_i = utilized spans in doc d_i
- Relevance (R): Fraction of retrieved context relevant to query
Formula: Relevance = Σ Len(R_i) / Σ Len(d_i)
where R_i = relevant spans in doc d_i
- Adherence (A): Whether response is grounded in context (no hallucinations)
Boolean/Span-level: All response claims must be supported by docs
- Completeness (C): Fraction of relevant info covered by response
Formula: Completeness = Σ Len(R_i ∩ U_i) / Σ Len(R_i)
where R_i ∩ U_i = intersection of relevant AND utilized spans
Note: TRACe is a four-metric framework; the lowercase "e" in the stylization is not a fifth "Evaluation" metric.
GPT Labeling Integration:
This module also supports advanced GPT-based labeling using sentence-level annotations
to compute metrics more accurately than rule-based heuristics. See advanced_rag_evaluator.py
for the detailed implementation.
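Example (illustrative usage; the query and documents below are made up):
    >>> evaluator = TRACEEvaluator()
    >>> scores = evaluator.evaluate(
    ...     query="What is the capital of France?",
    ...     response="Paris is the capital of France.",
    ...     retrieved_documents=["Paris is the capital and most populous city of France."],
    ... )
    >>> sorted(scores.to_dict())
    ['adherence', 'average', 'completeness', 'relevance', 'utilization']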
"""
from typing import List, Dict, Optional
import numpy as np
from dataclasses import dataclass
import re
from collections import Counter
@dataclass
class TRACEScores:
"""Container for TRACE evaluation scores."""
utilization: float
relevance: float
adherence: float
completeness: float
def to_dict(self) -> Dict:
"""Convert to dictionary."""
return {
"utilization": self.utilization,
"relevance": self.relevance,
"adherence": self.adherence,
"completeness": self.completeness,
"average": self.average()
}
def average(self) -> float:
"""Calculate average score."""
return (self.utilization + self.relevance +
self.adherence + self.completeness) / 4
class TRACEEvaluator:
"""TRACe evaluation metrics for RAG systems (per RAGBench paper arXiv:2407.11005)."""
def __init__(
self,
llm_client=None,
chunking_strategy: Optional[str] = None,
embedding_model: Optional[str] = None,
chunk_size: Optional[int] = None,
chunk_overlap: Optional[int] = None
):
"""Initialize TRACe evaluator.
Args:
llm_client: Optional LLM client for LLM-based evaluation
chunking_strategy: Chunking strategy used (e.g., 'dense', 'sparse', 'hybrid')
embedding_model: Embedding model used for vector retrieval
chunk_size: Size of chunks used
chunk_overlap: Overlap size between chunks
"""
self.llm_client = llm_client
self.chunking_strategy = chunking_strategy
self.embedding_model = embedding_model
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def evaluate(
self,
query: str,
response: str,
retrieved_documents: List[str],
ground_truth: Optional[str] = None
) -> TRACEScores:
"""Evaluate a RAG response using TRACE metrics.
Args:
query: User query
response: Generated response
retrieved_documents: List of retrieved documents
ground_truth: Optional ground truth answer
Returns:
TRACEScores object
"""
utilization = self._compute_utilization(response, retrieved_documents)
relevance = self._compute_relevance(query, retrieved_documents)
adherence = self._compute_adherence(response, retrieved_documents)
completeness = self._compute_completeness(query, response, ground_truth)
return TRACEScores(
utilization=utilization,
relevance=relevance,
adherence=adherence,
completeness=completeness
)
def _compute_utilization(
self,
response: str,
retrieved_documents: List[str]
) -> float:
"""Compute utilization score.
Measures how well the system uses retrieved documents.
        Heuristic score based on:
        - Proportion of retrieved documents that contributed to the response
        - Depth of lexical overlap between the response and the documents
Args:
response: Generated response
retrieved_documents: List of retrieved documents
Returns:
Utilization score (0-1)
"""
if not retrieved_documents or not response:
return 0.0
response_lower = response.lower()
response_words = set(self._tokenize(response_lower))
# Count how many documents contributed
docs_used = 0
total_overlap = 0
for doc in retrieved_documents:
doc_lower = doc.lower()
doc_words = set(self._tokenize(doc_lower))
# Check for significant overlap
overlap = len(response_words & doc_words)
if overlap > 5: # Threshold for significant contribution
docs_used += 1
total_overlap += overlap
# Score based on proportion of documents used
proportion_used = docs_used / len(retrieved_documents)
# Also consider depth of utilization
avg_overlap = total_overlap / len(retrieved_documents) if retrieved_documents else 0
depth_score = min(avg_overlap / 20, 1.0) # Normalize
# Combined score
utilization_score = 0.6 * proportion_used + 0.4 * depth_score
return min(utilization_score, 1.0)
def _compute_relevance(
self,
query: str,
retrieved_documents: List[str]
) -> float:
"""Compute relevance score.
Measures relevance of retrieved documents to the query.
Uses lexical overlap and keyword matching.
Args:
query: User query
retrieved_documents: List of retrieved documents
Returns:
Relevance score (0-1)
"""
if not retrieved_documents or not query:
return 0.0
query_lower = query.lower()
query_words = set(self._tokenize(query_lower))
query_keywords = self._extract_keywords(query_lower)
relevance_scores = []
for doc in retrieved_documents:
doc_lower = doc.lower()
doc_words = set(self._tokenize(doc_lower))
# Lexical overlap
overlap = len(query_words & doc_words)
overlap_score = overlap / len(query_words) if query_words else 0
# Keyword matching
keyword_matches = sum(1 for kw in query_keywords if kw in doc_lower)
keyword_score = keyword_matches / len(query_keywords) if query_keywords else 0
# Combined relevance for this document
doc_relevance = 0.5 * overlap_score + 0.5 * keyword_score
relevance_scores.append(doc_relevance)
# Average relevance across documents
return float(np.mean(relevance_scores))
def _compute_adherence(
self,
response: str,
retrieved_documents: List[str]
) -> float:
"""Compute adherence score (Boolean: 0.0 = hallucinated, 1.0 = grounded).
Per RAGBench paper: Adherence is whether ALL response claims are grounded.
Example-level: Boolean indicating if entire response is supported by documents.
Args:
response: Generated response
retrieved_documents: List of retrieved documents
Returns:
Adherence score (1.0 = fully grounded, 0.0 = contains hallucinations)
"""
if not retrieved_documents or not response:
return 0.0
# Combine all documents
combined_docs = " ".join(retrieved_documents).lower()
doc_words = set(self._tokenize(combined_docs))
# Analyze response
response_lower = response.lower()
response_sentences = self._split_sentences(response_lower)
if not response_sentences:
return 0.0
# Check if ALL sentences are grounded (Boolean logic per paper)
# If ANY sentence has low grounding, response contains hallucination
grounding_threshold = 0.5 # At least 50% of words must be in docs
all_grounded = True
for sentence in response_sentences:
sentence_words = set(self._tokenize(sentence))
if not sentence_words: # Skip empty sentences
continue
# Check what proportion of sentence words appear in documents
grounded_words = len(sentence_words & doc_words)
grounding_ratio = grounded_words / len(sentence_words)
# If any sentence is below threshold, mark as hallucinated
if grounding_ratio < grounding_threshold:
all_grounded = False
break
# Return Boolean: 1.0 if fully grounded, 0.0 if contains hallucination
return 1.0 if all_grounded else 0.0
def _compute_completeness(
self,
query: str,
response: str,
ground_truth: Optional[str] = None
) -> float:
"""Compute completeness score.
        Per RAGBench: Completeness = Σ Len(R_i ∩ U_i) / Σ Len(R_i)
How much of the relevant information is covered by the response.
Args:
query: User query
response: Generated response
ground_truth: Optional ground truth answer
Returns:
Completeness score (0-1)
"""
if not response or not query:
return 0.0
response_lower = response.lower()
response_words = set(self._tokenize(response_lower))
if not response_words:
return 0.0
completeness_scores = []
# Score 1: Response length (must have substantive content)
min_content_words = 10 # At least 10 meaningful words
length_score = min(len(response_words) / min_content_words, 1.0)
completeness_scores.append(length_score * 0.3) # Weight: 30%
# Score 2: Ground truth coverage (if available)
if ground_truth:
gt_lower = ground_truth.lower()
gt_words = set(self._tokenize(gt_lower))
if gt_words:
# Completeness = intersection / relevant_set
# How much of ground truth info is in response
overlap = len(gt_words & response_words)
gt_coverage = overlap / len(gt_words)
completeness_scores.append(gt_coverage * 0.7) # Weight: 70%
else:
completeness_scores.append(0.0)
else:
# Without ground truth, use query type matching heuristic
query_lower = query.lower()
# Check for key information based on query type
answer_patterns = {
"what": ["is", "are", "can", "does"],
"when": ["year", "date", "time", "century", "period"],
"where": ["location", "place", "country", "city", "region"],
"who": ["person", "people", "name", "character"],
"why": ["because", "due", "reason", "cause"],
"how": ["method", "process", "step", "way"]
}
base_score = 0.3 # Default if no query type match
for q_type, keywords in answer_patterns.items():
if q_type in query_lower:
# Check if response contains relevant keywords
keyword_matches = sum(1 for kw in keywords if kw in response_lower)
if keyword_matches > 0:
base_score = 0.7
break
completeness_scores.append(base_score)
        # Weighted sum of the component scores (per the weights noted above), capped at 1.0
        return float(min(sum(completeness_scores), 1.0)) if completeness_scores else 0.0
def _tokenize(self, text: str) -> List[str]:
"""Tokenize text into words."""
# Remove punctuation and split
text = re.sub(r'[^\w\s]', ' ', text)
words = text.split()
# Filter out very short words and common stop words
stop_words = {"a", "an", "the", "is", "are", "was", "were", "in", "on", "at", "to", "for"}
return [w for w in words if len(w) > 2 and w not in stop_words]
def _extract_keywords(self, text: str) -> List[str]:
"""Extract keywords from text."""
words = self._tokenize(text)
        # Simple keyword extraction: unique content words after stop-word filtering.
        # In production, use TF-IDF or a similar weighting scheme.
        word_freq = Counter(words)
        # Return unique content words in order of first occurrence
        return list(word_freq.keys())
def _split_sentences(self, text: str) -> List[str]:
"""Split text into sentences."""
# Simple sentence splitting
sentences = re.split(r'[.!?]+', text)
return [s.strip() for s in sentences if s.strip()]
def evaluate_batch(
self,
test_data: List[Dict]
) -> Dict:
"""Evaluate multiple test cases.
Args:
test_data: List of test cases, each containing:
- query: User query
- response: Generated response
- retrieved_documents: Retrieved documents
- ground_truth: Ground truth answer (optional)
Returns:
Dictionary with aggregated scores and metadata, plus detailed per-query info
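        Example test case (illustrative values):
            {
                "query": "What is the capital of France?",
                "response": "Paris is the capital of France.",
                "retrieved_documents": ["Paris is the capital of France."],
                "ground_truth": "Paris is the capital of France."  # optional
            }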
"""
all_scores = []
detailed_results = []
for i, test_case in enumerate(test_data):
print(f"Evaluating test case {i+1}/{len(test_data)}")
query = test_case.get("query", "")
response = test_case.get("response", "")
retrieved_documents = test_case.get("retrieved_documents", [])
ground_truth = test_case.get("ground_truth")
scores = self.evaluate(
query=query,
response=response,
retrieved_documents=retrieved_documents,
ground_truth=ground_truth
)
all_scores.append(scores)
# Store detailed information for each query
detailed_results.append({
"query_id": i + 1,
"question": query,
"llm_response": response,
"retrieved_documents": retrieved_documents,
"ground_truth": ground_truth,
"metrics": {
"utilization": float(scores.utilization),
"relevance": float(scores.relevance),
"adherence": float(scores.adherence),
"completeness": float(scores.completeness),
"average": float(scores.average())
}
})
# Aggregate scores
avg_utilization = np.mean([s.utilization for s in all_scores])
avg_relevance = np.mean([s.relevance for s in all_scores])
avg_adherence = np.mean([s.adherence for s in all_scores])
avg_completeness = np.mean([s.completeness for s in all_scores])
results = {
"utilization": float(avg_utilization),
"relevance": float(avg_relevance),
"adherence": float(avg_adherence),
"completeness": float(avg_completeness),
"average": float((avg_utilization + avg_relevance +
avg_adherence + avg_completeness) / 4),
"num_samples": len(test_data),
"individual_scores": [s.to_dict() for s in all_scores],
# Include detailed per-query information
"detailed_results": detailed_results,
# Include evaluation metadata for reproducibility
"evaluation_config": {
"chunking_strategy": self.chunking_strategy,
"embedding_model": self.embedding_model,
"chunk_size": self.chunk_size,
"chunk_overlap": self.chunk_overlap
}
}
return results
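

# Minimal illustrative smoke test (not part of the original evaluation pipeline; the
# query, response, and documents below are made up). Prints the aggregated TRACe scores.
if __name__ == "__main__":
    _evaluator = TRACEEvaluator()
    _results = _evaluator.evaluate_batch([
        {
            "query": "What is the capital of France?",
            "response": "Paris is the capital of France.",
            "retrieved_documents": [
                "Paris is the capital and most populous city of France.",
                "France is a country in Western Europe.",
            ],
            "ground_truth": "Paris is the capital of France.",
        }
    ])
    for _metric in ("utilization", "relevance", "adherence", "completeness", "average"):
        print(f"{_metric}: {_results[_metric]:.3f}")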