"""TRACe evaluation metrics for RAG systems (per RAGBench paper: arXiv:2407.11005).
TRACe Framework (4 metrics):
- uTilization (T): Fraction of retrieved context the generator uses
Formula: Utilization = Σ Len(U_i) / Σ Len(d_i)
where U_i = utilized spans in doc d_i
- Relevance (R): Fraction of retrieved context relevant to query
Formula: Relevance = Σ Len(R_i) / Σ Len(d_i)
where R_i = relevant spans in doc d_i
- Adherence (A): Whether response is grounded in context (no hallucinations)
Boolean/Span-level: All response claims must be supported by docs
- Completeness (C): Fraction of relevant info covered by response
Formula: Completeness = Σ Len(R_i ∩ U_i) / Σ Len(R_i)
where R_i ∩ U_i = intersection of relevant AND utilized spans
Note: TRACe is a four-metric framework; the lowercase "e" is stylization, not a fifth "Evaluation" metric.
GPT Labeling Integration:
This module also supports advanced GPT-based labeling using sentence-level annotations
to compute metrics more accurately than rule-based heuristics. See advanced_rag_evaluator.py
for the detailed implementation.
"""
from typing import List, Dict, Optional
import numpy as np
from dataclasses import dataclass
import re
from collections import Counter
@dataclass
class TRACEScores:
"""Container for TRACE evaluation scores."""
utilization: float
relevance: float
adherence: float
completeness: float
def to_dict(self) -> Dict:
"""Convert to dictionary."""
return {
"utilization": self.utilization,
"relevance": self.relevance,
"adherence": self.adherence,
"completeness": self.completeness,
"average": self.average()
}
def average(self) -> float:
"""Calculate average score."""
return (self.utilization + self.relevance +
self.adherence + self.completeness) / 4
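# Example (illustrative values):
#   TRACEScores(utilization=0.5, relevance=0.75, adherence=1.0,
#               completeness=0.75).average()  # -> 0.75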
class TRACEEvaluator:
"""TRACe evaluation metrics for RAG systems (per RAGBench paper arXiv:2407.11005)."""
def __init__(
self,
llm_client=None,
chunking_strategy: Optional[str] = None,
embedding_model: Optional[str] = None,
chunk_size: Optional[int] = None,
chunk_overlap: Optional[int] = None
):
"""Initialize TRACe evaluator.
Args:
llm_client: Optional LLM client for LLM-based evaluation
chunking_strategy: Chunking strategy used (e.g., 'dense', 'sparse', 'hybrid')
embedding_model: Embedding model used for vector retrieval
chunk_size: Size of chunks used
chunk_overlap: Overlap size between chunks
"""
self.llm_client = llm_client
self.chunking_strategy = chunking_strategy
self.embedding_model = embedding_model
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def evaluate(
self,
query: str,
response: str,
retrieved_documents: List[str],
ground_truth: Optional[str] = None
) -> TRACEScores:
"""Evaluate a RAG response using TRACE metrics.
Args:
query: User query
response: Generated response
retrieved_documents: List of retrieved documents
ground_truth: Optional ground truth answer
Returns:
TRACEScores object
"""
utilization = self._compute_utilization(response, retrieved_documents)
relevance = self._compute_relevance(query, retrieved_documents)
adherence = self._compute_adherence(response, retrieved_documents)
completeness = self._compute_completeness(query, response, ground_truth)
return TRACEScores(
utilization=utilization,
relevance=relevance,
adherence=adherence,
completeness=completeness
)
def _compute_utilization(
self,
response: str,
retrieved_documents: List[str]
) -> float:
"""Compute utilization score.
Measures how well the system uses retrieved documents.
Score based on:
- Number of documents that contributed to the response
- Proportion of retrieved documents used
Args:
response: Generated response
retrieved_documents: List of retrieved documents
Returns:
Utilization score (0-1)
"""
if not retrieved_documents or not response:
return 0.0
response_lower = response.lower()
response_words = set(self._tokenize(response_lower))
# Count how many documents contributed
docs_used = 0
total_overlap = 0
for doc in retrieved_documents:
doc_lower = doc.lower()
doc_words = set(self._tokenize(doc_lower))
# Check for significant overlap
overlap = len(response_words & doc_words)
if overlap > 5: # Threshold for significant contribution
docs_used += 1
total_overlap += overlap
# Score based on proportion of documents used
proportion_used = docs_used / len(retrieved_documents)
# Also consider depth of utilization
avg_overlap = total_overlap / len(retrieved_documents) if retrieved_documents else 0
        depth_score = min(avg_overlap / 20, 1.0)  # Normalize: 20+ avg overlapping words = full depth
# Combined score
utilization_score = 0.6 * proportion_used + 0.4 * depth_score
return min(utilization_score, 1.0)
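    # Worked example (hypothetical overlaps): 4 retrieved docs sharing
    # [12, 8, 3, 2] content words with the response. Only the first two
    # exceed the >5 threshold, so docs_used = 2 and total_overlap = 20:
    #   proportion_used = 2 / 4 = 0.5
    #   avg_overlap     = 20 / 4 = 5  ->  depth_score = 5 / 20 = 0.25
    #   utilization     = 0.6 * 0.5 + 0.4 * 0.25 = 0.40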
def _compute_relevance(
self,
query: str,
retrieved_documents: List[str]
) -> float:
"""Compute relevance score.
Measures relevance of retrieved documents to the query.
Uses lexical overlap and keyword matching.
Args:
query: User query
retrieved_documents: List of retrieved documents
Returns:
Relevance score (0-1)
"""
if not retrieved_documents or not query:
return 0.0
query_lower = query.lower()
query_words = set(self._tokenize(query_lower))
query_keywords = self._extract_keywords(query_lower)
relevance_scores = []
for doc in retrieved_documents:
doc_lower = doc.lower()
doc_words = set(self._tokenize(doc_lower))
# Lexical overlap
overlap = len(query_words & doc_words)
overlap_score = overlap / len(query_words) if query_words else 0
# Keyword matching
keyword_matches = sum(1 for kw in query_keywords if kw in doc_lower)
keyword_score = keyword_matches / len(query_keywords) if query_keywords else 0
# Combined relevance for this document
doc_relevance = 0.5 * overlap_score + 0.5 * keyword_score
relevance_scores.append(doc_relevance)
# Average relevance across documents
return float(np.mean(relevance_scores))
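    # Worked example (hypothetical): query tokens {"french", "revolution",
    # "start", "year"}. A doc contains the exact tokens "revolution" and
    # "year" (overlap_score = 2/4 = 0.5) and also matches "start" as a
    # substring of "started" (keyword_score = 3/4 = 0.75), giving
    # doc_relevance = 0.5 * 0.5 + 0.5 * 0.75 = 0.625 for that document.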
def _compute_adherence(
self,
response: str,
retrieved_documents: List[str]
) -> float:
"""Compute adherence score (Boolean: 0.0 = hallucinated, 1.0 = grounded).
Per RAGBench paper: Adherence is whether ALL response claims are grounded.
Example-level: Boolean indicating if entire response is supported by documents.
Args:
response: Generated response
retrieved_documents: List of retrieved documents
Returns:
Adherence score (1.0 = fully grounded, 0.0 = contains hallucinations)
"""
if not retrieved_documents or not response:
return 0.0
# Combine all documents
combined_docs = " ".join(retrieved_documents).lower()
doc_words = set(self._tokenize(combined_docs))
# Analyze response
response_lower = response.lower()
response_sentences = self._split_sentences(response_lower)
if not response_sentences:
return 0.0
# Check if ALL sentences are grounded (Boolean logic per paper)
# If ANY sentence has low grounding, response contains hallucination
grounding_threshold = 0.5 # At least 50% of words must be in docs
all_grounded = True
for sentence in response_sentences:
sentence_words = set(self._tokenize(sentence))
if not sentence_words: # Skip empty sentences
continue
# Check what proportion of sentence words appear in documents
grounded_words = len(sentence_words & doc_words)
grounding_ratio = grounded_words / len(sentence_words)
# If any sentence is below threshold, mark as hallucinated
if grounding_ratio < grounding_threshold:
all_grounded = False
break
# Return Boolean: 1.0 if fully grounded, 0.0 if contains hallucination
return 1.0 if all_grounded else 0.0
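    # Example: the sentence "the moon is made of cheese" tokenizes to
    # {"moon", "made", "cheese"}; if the docs only contain "moon", the
    # grounding ratio is 1/3 < 0.5, so the whole response scores 0.0.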
def _compute_completeness(
self,
query: str,
response: str,
ground_truth: Optional[str] = None
) -> float:
"""Compute completeness score.
Per RAGBench: Completeness = Σ Len(R_i ∩ U_i) / Σ Len(R_i),
i.e. how much of the relevant information the response covers.
Approximated here as a weighted sum: 30% substantive length plus
70% ground-truth coverage (query-type heuristic when no ground truth).
Args:
query: User query
response: Generated response
ground_truth: Optional ground truth answer
Returns:
Completeness score (0-1)
"""
if not response or not query:
return 0.0
response_lower = response.lower()
response_words = set(self._tokenize(response_lower))
if not response_words:
return 0.0
        # Weighted sum of components: 30% substantive length + 70% coverage
        completeness = 0.0
        # Component 1 (weight 0.3): response must have substantive content
        min_content_words = 10  # At least 10 meaningful words
        length_score = min(len(response_words) / min_content_words, 1.0)
        completeness += 0.3 * length_score
        # Component 2 (weight 0.7): ground truth coverage (if available)
        if ground_truth:
            gt_words = set(self._tokenize(ground_truth.lower()))
            if gt_words:
                # Completeness = intersection / relevant set:
                # how much of the ground-truth info appears in the response
                gt_coverage = len(gt_words & response_words) / len(gt_words)
                completeness += 0.7 * gt_coverage
        else:
            # Without ground truth, fall back to a query-type heuristic
            query_lower = query.lower()
            # Keywords an answer to each query type is expected to contain
            answer_patterns = {
                "what": ["is", "are", "can", "does"],
                "when": ["year", "date", "time", "century", "period"],
                "where": ["location", "place", "country", "city", "region"],
                "who": ["person", "people", "name", "character"],
                "why": ["because", "due", "reason", "cause"],
                "how": ["method", "process", "step", "way"]
            }
            base_score = 0.3  # Default if no query type matches
            for q_type, keywords in answer_patterns.items():
                if q_type in query_lower:
                    # Check if the response contains the expected keywords
                    keyword_matches = sum(1 for kw in keywords if kw in response_lower)
                    if keyword_matches > 0:
                        base_score = 0.7
                        break
            completeness += 0.7 * base_score
        return float(min(completeness, 1.0))
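    # Worked example (hypothetical): a response with 10+ content words gives
    # length_score = 1.0; if 6 of 10 ground-truth content words appear in the
    # response, gt_coverage = 0.6 and
    # completeness = 0.3 * 1.0 + 0.7 * 0.6 = 0.72.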
def _tokenize(self, text: str) -> List[str]:
"""Tokenize text into words."""
# Remove punctuation and split
text = re.sub(r'[^\w\s]', ' ', text)
words = text.split()
# Filter out very short words and common stop words
stop_words = {"a", "an", "the", "is", "are", "was", "were", "in", "on", "at", "to", "for"}
return [w for w in words if len(w) > 2 and w not in stop_words]
    def _extract_keywords(self, text: str) -> List[str]:
        """Extract keywords from text."""
        words = self._tokenize(text)
        # Simple keyword extraction: unique content words, in order of first
        # appearance. In production, use TF-IDF or a similar weighting scheme.
        return list(Counter(words).keys())
def _split_sentences(self, text: str) -> List[str]:
"""Split text into sentences."""
# Simple sentence splitting
sentences = re.split(r'[.!?]+', text)
return [s.strip() for s in sentences if s.strip()]
def evaluate_batch(
self,
test_data: List[Dict]
) -> Dict:
"""Evaluate multiple test cases.
Args:
test_data: List of test cases, each containing:
- query: User query
- response: Generated response
- retrieved_documents: Retrieved documents
- ground_truth: Ground truth answer (optional)
Returns:
Dictionary with aggregated scores and metadata, plus detailed per-query info
"""
all_scores = []
detailed_results = []
for i, test_case in enumerate(test_data):
print(f"Evaluating test case {i+1}/{len(test_data)}")
query = test_case.get("query", "")
response = test_case.get("response", "")
retrieved_documents = test_case.get("retrieved_documents", [])
ground_truth = test_case.get("ground_truth")
scores = self.evaluate(
query=query,
response=response,
retrieved_documents=retrieved_documents,
ground_truth=ground_truth
)
all_scores.append(scores)
# Store detailed information for each query
detailed_results.append({
"query_id": i + 1,
"question": query,
"llm_response": response,
"retrieved_documents": retrieved_documents,
"ground_truth": ground_truth,
"metrics": {
"utilization": float(scores.utilization),
"relevance": float(scores.relevance),
"adherence": float(scores.adherence),
"completeness": float(scores.completeness),
"average": float(scores.average())
}
})
# Aggregate scores
avg_utilization = np.mean([s.utilization for s in all_scores])
avg_relevance = np.mean([s.relevance for s in all_scores])
avg_adherence = np.mean([s.adherence for s in all_scores])
avg_completeness = np.mean([s.completeness for s in all_scores])
results = {
"utilization": float(avg_utilization),
"relevance": float(avg_relevance),
"adherence": float(avg_adherence),
"completeness": float(avg_completeness),
"average": float((avg_utilization + avg_relevance +
avg_adherence + avg_completeness) / 4),
"num_samples": len(test_data),
"individual_scores": [s.to_dict() for s in all_scores],
# Include detailed per-query information
"detailed_results": detailed_results,
# Include evaluation metadata for reproducibility
"evaluation_config": {
"chunking_strategy": self.chunking_strategy,
"embedding_model": self.embedding_model,
"chunk_size": self.chunk_size,
"chunk_overlap": self.chunk_overlap
}
}
return results
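# --- Minimal usage sketch (illustrative data; all strings and config values
# below are made up for demonstration, not drawn from any dataset) ---
if __name__ == "__main__":
    evaluator = TRACEEvaluator(
        chunking_strategy="dense",
        embedding_model="all-MiniLM-L6-v2",  # example name, not a requirement
        chunk_size=512,
        chunk_overlap=64,
    )
    scores = evaluator.evaluate(
        query="What is retrieval-augmented generation?",
        response=(
            "Retrieval-augmented generation combines a retriever that fetches "
            "relevant documents with a generator that conditions on them."
        ),
        retrieved_documents=[
            "Retrieval-augmented generation (RAG) combines a document retriever "
            "that fetches relevant documents with a generator that conditions on them.",
            "Attention mechanisms let transformers weigh tokens in a sequence.",
        ],
        ground_truth="RAG combines a document retriever with a conditioned generator.",
    )
    print(scores.to_dict())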