File size: 16,583 Bytes
1d10b0a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
"""TRACe evaluation metrics for RAG systems (per RAGBench paper: arXiv:2407.11005).

TRACe Framework (4 metrics):
- uTilization (T): Fraction of retrieved context the generator uses
  Formula: Utilization = Σ Len(U_i) / Σ Len(d_i)
  where U_i = utilized spans in doc d_i
  
- Relevance (R): Fraction of retrieved context relevant to query
  Formula: Relevance = Σ Len(R_i) / Σ Len(d_i)
  where R_i = relevant spans in doc d_i
  
- Adherence (A): Whether response is grounded in context (no hallucinations)
  Boolean/Span-level: All response claims must be supported by docs
  
- Completeness (C): Fraction of relevant info covered by response
  Formula: Completeness = Len(R_i ∩ U_i) / Len(R_i)
  where R_i ∩ U_i = intersection of relevant AND utilized spans

Note: This is a 4-metric framework. The stylization "TRACe" does not include a 5th "E=Evaluation" metric.

GPT Labeling Integration:
This module also supports advanced GPT-based labeling using sentence-level annotations
to compute metrics more accurately than rule-based heuristics. See advanced_rag_evaluator.py
for the detailed implementation.
"""
from typing import List, Dict, Optional
import numpy as np
from dataclasses import dataclass
import re
from collections import Counter


@dataclass
class TRACEScores:
    """Container for the four TRACE evaluation scores of a single RAG response."""
    utilization: float    # fraction of retrieved context used by the response
    relevance: float      # fraction of retrieved context relevant to the query
    adherence: float      # 1.0 if fully grounded in context, else 0.0
    completeness: float   # fraction of relevant info covered by the response

    def to_dict(self) -> Dict:
        """Return the scores (plus their average) as a plain dictionary."""
        result = {
            name: getattr(self, name)
            for name in ("utilization", "relevance", "adherence", "completeness")
        }
        result["average"] = self.average()
        return result

    def average(self) -> float:
        """Return the unweighted mean of the four metric scores."""
        total = (self.utilization + self.relevance +
                 self.adherence + self.completeness)
        return total / 4


class TRACEEvaluator:
    """TRACe evaluation metrics for RAG systems (per RAGBench paper arXiv:2407.11005)."""
    
    def __init__(
        self,
        llm_client=None,
        chunking_strategy: Optional[str] = None,
        embedding_model: Optional[str] = None,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None
    ):
        """Initialize TRACe evaluator.
        
        Args:
            llm_client: Optional LLM client for LLM-based evaluation
            chunking_strategy: Chunking strategy used (e.g., 'dense', 'sparse', 'hybrid')
            embedding_model: Embedding model used for vector retrieval
            chunk_size: Size of chunks used
            chunk_overlap: Overlap size between chunks
        """
        self.llm_client = llm_client
        self.chunking_strategy = chunking_strategy
        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    
    def evaluate(
        self,
        query: str,
        response: str,
        retrieved_documents: List[str],
        ground_truth: Optional[str] = None
    ) -> TRACEScores:
        """Evaluate a RAG response using TRACE metrics.
        
        Args:
            query: User query
            response: Generated response
            retrieved_documents: List of retrieved documents
            ground_truth: Optional ground truth answer
            
        Returns:
            TRACEScores object
        """
        utilization = self._compute_utilization(response, retrieved_documents)
        relevance = self._compute_relevance(query, retrieved_documents)
        adherence = self._compute_adherence(response, retrieved_documents)
        completeness = self._compute_completeness(query, response, ground_truth)
        
        return TRACEScores(
            utilization=utilization,
            relevance=relevance,
            adherence=adherence,
            completeness=completeness
        )
    
    def _compute_utilization(
        self,
        response: str,
        retrieved_documents: List[str]
    ) -> float:
        """Compute utilization score.
        
        Measures how well the system uses retrieved documents.
        Score based on:
        - Number of documents that contributed to the response
        - Proportion of retrieved documents used
        
        Args:
            response: Generated response
            retrieved_documents: List of retrieved documents
            
        Returns:
            Utilization score (0-1)
        """
        if not retrieved_documents or not response:
            return 0.0
        
        response_lower = response.lower()
        response_words = set(self._tokenize(response_lower))
        
        # Count how many documents contributed
        docs_used = 0
        total_overlap = 0
        
        for doc in retrieved_documents:
            doc_lower = doc.lower()
            doc_words = set(self._tokenize(doc_lower))
            
            # Check for significant overlap
            overlap = len(response_words & doc_words)
            if overlap > 5:  # Threshold for significant contribution
                docs_used += 1
                total_overlap += overlap
        
        # Score based on proportion of documents used
        proportion_used = docs_used / len(retrieved_documents)
        
        # Also consider depth of utilization
        avg_overlap = total_overlap / len(retrieved_documents) if retrieved_documents else 0
        depth_score = min(avg_overlap / 20, 1.0)  # Normalize
        
        # Combined score
        utilization_score = 0.6 * proportion_used + 0.4 * depth_score
        
        return min(utilization_score, 1.0)
    
    def _compute_relevance(
        self,
        query: str,
        retrieved_documents: List[str]
    ) -> float:
        """Compute relevance score.
        
        Measures relevance of retrieved documents to the query.
        Uses lexical overlap and keyword matching.
        
        Args:
            query: User query
            retrieved_documents: List of retrieved documents
            
        Returns:
            Relevance score (0-1)
        """
        if not retrieved_documents or not query:
            return 0.0
        
        query_lower = query.lower()
        query_words = set(self._tokenize(query_lower))
        query_keywords = self._extract_keywords(query_lower)
        
        relevance_scores = []
        
        for doc in retrieved_documents:
            doc_lower = doc.lower()
            doc_words = set(self._tokenize(doc_lower))
            
            # Lexical overlap
            overlap = len(query_words & doc_words)
            overlap_score = overlap / len(query_words) if query_words else 0
            
            # Keyword matching
            keyword_matches = sum(1 for kw in query_keywords if kw in doc_lower)
            keyword_score = keyword_matches / len(query_keywords) if query_keywords else 0
            
            # Combined relevance for this document
            doc_relevance = 0.5 * overlap_score + 0.5 * keyword_score
            relevance_scores.append(doc_relevance)
        
        # Average relevance across documents
        return float(np.mean(relevance_scores))
    
    def _compute_adherence(
        self,
        response: str,
        retrieved_documents: List[str]
    ) -> float:
        """Compute adherence score (Boolean: 0.0 = hallucinated, 1.0 = grounded).
        
        Per RAGBench paper: Adherence is whether ALL response claims are grounded.
        Example-level: Boolean indicating if entire response is supported by documents.
        
        Args:
            response: Generated response
            retrieved_documents: List of retrieved documents
            
        Returns:
            Adherence score (1.0 = fully grounded, 0.0 = contains hallucinations)
        """
        if not retrieved_documents or not response:
            return 0.0
        
        # Combine all documents
        combined_docs = " ".join(retrieved_documents).lower()
        doc_words = set(self._tokenize(combined_docs))
        
        # Analyze response
        response_lower = response.lower()
        response_sentences = self._split_sentences(response_lower)
        
        if not response_sentences:
            return 0.0
        
        # Check if ALL sentences are grounded (Boolean logic per paper)
        # If ANY sentence has low grounding, response contains hallucination
        grounding_threshold = 0.5  # At least 50% of words must be in docs
        all_grounded = True
        
        for sentence in response_sentences:
            sentence_words = set(self._tokenize(sentence))
            
            if not sentence_words:  # Skip empty sentences
                continue
            
            # Check what proportion of sentence words appear in documents
            grounded_words = len(sentence_words & doc_words)
            grounding_ratio = grounded_words / len(sentence_words)
            
            # If any sentence is below threshold, mark as hallucinated
            if grounding_ratio < grounding_threshold:
                all_grounded = False
                break
        
        # Return Boolean: 1.0 if fully grounded, 0.0 if contains hallucination
        return 1.0 if all_grounded else 0.0
    
    def _compute_completeness(
        self,
        query: str,
        response: str,
        ground_truth: Optional[str] = None
    ) -> float:
        """Compute completeness score.
        
        Per RAGBench: Completeness = Len(R_i ∩ U_i) / Len(R_i)
        How much of the relevant information is covered by the response.
        
        Args:
            query: User query
            response: Generated response
            ground_truth: Optional ground truth answer
            
        Returns:
            Completeness score (0-1)
        """
        if not response or not query:
            return 0.0
        
        response_lower = response.lower()
        response_words = set(self._tokenize(response_lower))
        
        if not response_words:
            return 0.0
        
        completeness_scores = []
        
        # Score 1: Response length (must have substantive content)
        min_content_words = 10  # At least 10 meaningful words
        length_score = min(len(response_words) / min_content_words, 1.0)
        completeness_scores.append(length_score * 0.3)  # Weight: 30%
        
        # Score 2: Ground truth coverage (if available)
        if ground_truth:
            gt_lower = ground_truth.lower()
            gt_words = set(self._tokenize(gt_lower))
            
            if gt_words:
                # Completeness = intersection / relevant_set
                # How much of ground truth info is in response
                overlap = len(gt_words & response_words)
                gt_coverage = overlap / len(gt_words)
                completeness_scores.append(gt_coverage * 0.7)  # Weight: 70%
            else:
                completeness_scores.append(0.0)
        else:
            # Without ground truth, use query type matching heuristic
            query_lower = query.lower()
            
            # Check for key information based on query type
            answer_patterns = {
                "what": ["is", "are", "can", "does"],
                "when": ["year", "date", "time", "century", "period"],
                "where": ["location", "place", "country", "city", "region"],
                "who": ["person", "people", "name", "character"],
                "why": ["because", "due", "reason", "cause"],
                "how": ["method", "process", "step", "way"]
            }
            
            base_score = 0.3  # Default if no query type match
            
            for q_type, keywords in answer_patterns.items():
                if q_type in query_lower:
                    # Check if response contains relevant keywords
                    keyword_matches = sum(1 for kw in keywords if kw in response_lower)
                    if keyword_matches > 0:
                        base_score = 0.7
                    break
            
            completeness_scores.append(base_score)
        
        # Return average completeness
        return float(np.mean(completeness_scores)) if completeness_scores else 0.0
    
    def _tokenize(self, text: str) -> List[str]:
        """Tokenize text into words."""
        # Remove punctuation and split
        text = re.sub(r'[^\w\s]', ' ', text)
        words = text.split()
        # Filter out very short words and common stop words
        stop_words = {"a", "an", "the", "is", "are", "was", "were", "in", "on", "at", "to", "for"}
        return [w for w in words if len(w) > 2 and w not in stop_words]
    
    def _extract_keywords(self, text: str) -> List[str]:
        """Extract keywords from text."""
        words = self._tokenize(text)
        # Simple keyword extraction - words that appear in query
        # In production, use TF-IDF or similar
        word_freq = Counter(words)
        # Return words that appear at least once
        return list(word_freq.keys())
    
    def _split_sentences(self, text: str) -> List[str]:
        """Split text into sentences."""
        # Simple sentence splitting
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def evaluate_batch(
        self,
        test_data: List[Dict]
    ) -> Dict:
        """Evaluate multiple test cases.
        
        Args:
            test_data: List of test cases, each containing:
                - query: User query
                - response: Generated response
                - retrieved_documents: Retrieved documents
                - ground_truth: Ground truth answer (optional)
        
        Returns:
            Dictionary with aggregated scores and metadata, plus detailed per-query info
        """
        all_scores = []
        detailed_results = []
        
        for i, test_case in enumerate(test_data):
            print(f"Evaluating test case {i+1}/{len(test_data)}")
            
            query = test_case.get("query", "")
            response = test_case.get("response", "")
            retrieved_documents = test_case.get("retrieved_documents", [])
            ground_truth = test_case.get("ground_truth")
            
            scores = self.evaluate(
                query=query,
                response=response,
                retrieved_documents=retrieved_documents,
                ground_truth=ground_truth
            )
            
            all_scores.append(scores)
            
            # Store detailed information for each query
            detailed_results.append({
                "query_id": i + 1,
                "question": query,
                "llm_response": response,
                "retrieved_documents": retrieved_documents,
                "ground_truth": ground_truth,
                "metrics": {
                    "utilization": float(scores.utilization),
                    "relevance": float(scores.relevance),
                    "adherence": float(scores.adherence),
                    "completeness": float(scores.completeness),
                    "average": float(scores.average())
                }
            })
        
        # Aggregate scores
        avg_utilization = np.mean([s.utilization for s in all_scores])
        avg_relevance = np.mean([s.relevance for s in all_scores])
        avg_adherence = np.mean([s.adherence for s in all_scores])
        avg_completeness = np.mean([s.completeness for s in all_scores])
        
        results = {
            "utilization": float(avg_utilization),
            "relevance": float(avg_relevance),
            "adherence": float(avg_adherence),
            "completeness": float(avg_completeness),
            "average": float((avg_utilization + avg_relevance + 
                            avg_adherence + avg_completeness) / 4),
            "num_samples": len(test_data),
            "individual_scores": [s.to_dict() for s in all_scores],
            # Include detailed per-query information
            "detailed_results": detailed_results,
            # Include evaluation metadata for reproducibility
            "evaluation_config": {
                "chunking_strategy": self.chunking_strategy,
                "embedding_model": self.embedding_model,
                "chunk_size": self.chunk_size,
                "chunk_overlap": self.chunk_overlap
            }
        }
        
        return results