Spaces:
Sleeping
Sleeping
RGB Evaluation
fix: Information Integration evaluation - handle multiple answer variants with pipe-separated format
5253a83
| """ | |
| Evaluation Metrics for RGB RAG Benchmark | |
| Implements metrics from the research paper: | |
| - Accuracy: For noise robustness and information integration | |
| - Rejection Rate: For negative rejection | |
| - Error Detection Rate & Error Correction Rate: For counterfactual robustness | |
| """ | |
| import re | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from dataclasses import dataclass, field | |
| from collections import defaultdict | |
@dataclass
class EvaluationResult:
    """Results for a single evaluation.

    Mutable accumulator: the evaluate_* methods of RGBEvaluator bump the
    counters, and the rate properties derive percentages on demand.

    NOTE: this class was written with dataclass field syntax (including
    ``field(default_factory=dict)``) but was missing the ``@dataclass``
    decorator; the metric methods are accessed attribute-style elsewhere
    in this file (``round(self.accuracy, 2)``, ``{r.accuracy:>6.2f}``),
    so they must be properties.
    """

    # Task identifier, e.g. "negative_rejection" or "noise_robustness_20%"
    task_type: str
    # Name of the model under evaluation
    model_name: str
    total_samples: int = 0
    correct: int = 0
    incorrect: int = 0
    rejected: int = 0
    errors_detected: int = 0
    errors_corrected: int = 0
    # Breakdown by noise level (for noise robustness)
    accuracy_by_noise: Dict[int, float] = field(default_factory=dict)

    @property
    def accuracy(self) -> float:
        """Accuracy percentage (0.0 when there are no samples)."""
        if self.total_samples == 0:
            return 0.0
        return (self.correct / self.total_samples) * 100

    @property
    def rejection_rate(self) -> float:
        """Rejection rate percentage (0.0 when there are no samples)."""
        if self.total_samples == 0:
            return 0.0
        return (self.rejected / self.total_samples) * 100

    @property
    def error_detection_rate(self) -> float:
        """Error detection rate percentage (0.0 when there are no samples)."""
        if self.total_samples == 0:
            return 0.0
        return (self.errors_detected / self.total_samples) * 100

    @property
    def error_correction_rate(self) -> float:
        """Error correction rate percentage (0.0 when there are no samples)."""
        if self.total_samples == 0:
            return 0.0
        return (self.errors_corrected / self.total_samples) * 100

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary (rates rounded to 2 decimal places)."""
        return {
            'task_type': self.task_type,
            'model_name': self.model_name,
            'total_samples': self.total_samples,
            'correct': self.correct,
            'accuracy': round(self.accuracy, 2),
            'rejected': self.rejected,
            'rejection_rate': round(self.rejection_rate, 2),
            'errors_detected': self.errors_detected,
            'error_detection_rate': round(self.error_detection_rate, 2),
            'errors_corrected': self.errors_corrected,
            'error_correction_rate': round(self.error_correction_rate, 2),
            'accuracy_by_noise': self.accuracy_by_noise,
        }
class RGBEvaluator:
    """
    Evaluator for RGB benchmark metrics.

    Provides answer normalization and matching, plus one evaluate_* entry
    point per RGB task: noise robustness, negative rejection, information
    integration, and counterfactual robustness.
    """

    # Keywords that indicate the model is rejecting/refusing to answer
    # Primary keywords from Figure 3 of the paper - exact phrases expected
    PRIMARY_REJECTION_PHRASES = [
        "i can not answer the question because of the insufficient information in documents",
        "insufficient information in documents",
        "can not answer",
        "cannot answer",
    ]

    # Secondary rejection keywords for more flexible matching
    REJECTION_KEYWORDS = [
        "i don't know",
        "i cannot",
        "i can't",
        "unable to",
        "not able to",
        "insufficient information",
        "no information",
        "cannot determine",
        "cannot answer",
        "not enough information",
        "don't have enough",
        "unable to determine",
        "cannot find",
        "no relevant",
        "not mentioned",
        "not provided",
        "not specified",
        "unclear",
        "unknown",
        "i'm not sure",
        "i am not sure",
        "cannot be determined",
        "information is not available",
        "does not provide",
    ]

    # Keywords indicating error detection in counterfactual scenarios
    # NOTE(review): broad cues like "actually"/"however"/"in fact" may
    # over-count detections on hedged answers — confirm against the paper's
    # evaluation protocol.
    ERROR_DETECTION_KEYWORDS = [
        "incorrect",
        "wrong",
        "false",
        "error",
        "mistake",
        "inaccurate",
        "not true",
        "not correct",
        "factually incorrect",
        "contradicts",
        "actually",
        "in fact",
        "however",
        "but actually",
        "the correct answer",
        "should be",
    ]

    def __init__(self):
        """Initialize the evaluator (stateless; kept for API symmetry)."""
        pass

    def normalize_answer(self, answer: str) -> str:
        """
        Normalize an answer for comparison.

        Removes punctuation, extra whitespace, and converts to lowercase.
        Also removes commas from dates for consistent matching.
        """
        if not answer:
            return ""
        # Convert to lowercase
        answer = answer.lower().strip()
        # Remove common punctuation at the end
        answer = re.sub(r'[.!?,;:]+$', '', answer)
        # Remove all commas within the text (helps with date matching like "Jan, 2, 2022" -> "jan 2 2022")
        answer = answer.replace(',', '')
        # Remove extra whitespace
        answer = ' '.join(answer.split())
        return answer

    def is_rejection(self, response: str) -> bool:
        """
        Check if the model's response is a rejection (refusing to answer).

        According to Figure 3, the expected rejection phrase is:
        "I can not answer the question because of the insufficient information in documents."

        Args:
            response: The model's response text.

        Returns:
            True if the response is a rejection, False otherwise.
        """
        response_lower = response.lower().strip()
        # Check for exact primary phrases first (as per Figure 3), then fall
        # back to more flexible keyword matching.
        if any(phrase in response_lower for phrase in self.PRIMARY_REJECTION_PHRASES):
            return True
        return any(keyword in response_lower for keyword in self.REJECTION_KEYWORDS)

    def _matches(self, norm_response: str, norm_truth: str, strict: bool) -> bool:
        """
        Match one normalized ground-truth alternative against a normalized response.

        Non-strict strategies (any one suffices):
          1. ground truth contained in response,
          2. shorter response contained in ground truth,
          3. >= 80% of the ground truth's tokens appear in the response.
        """
        if strict:
            return norm_response == norm_truth
        # Check if ground truth is contained in response
        if norm_truth in norm_response:
            return True
        # Check if response is contained in ground truth (for short answers)
        if len(norm_response) < len(norm_truth) and norm_response in norm_truth:
            return True
        # Check for token overlap
        truth_tokens = set(norm_truth.split())
        response_tokens = set(norm_response.split())
        if len(truth_tokens) > 0:
            overlap = len(truth_tokens & response_tokens) / len(truth_tokens)
            if overlap >= 0.8:  # 80% token overlap
                return True
        return False

    def is_correct(self, response: str, ground_truth: str, strict: bool = False) -> bool:
        """
        Check if the response matches the ground truth answer.

        Supports pipe-separated alternatives (for information integration with
        variants): a plain ground truth is simply the one-alternative case, so
        both forms share the same matching logic in _matches().

        Args:
            response: The model's response.
            ground_truth: The correct answer (can be pipe-separated alternatives).
            strict: If True, requires exact match. If False, allows partial match.

        Returns:
            True if the answer is correct, False otherwise.
        """
        norm_response = self.normalize_answer(response)
        if not norm_response:
            return False
        # split("|") yields a single element when there is no pipe, so the
        # single-answer case needs no special path. Empty alternatives (e.g.
        # an empty ground truth) are skipped, preserving the old behavior of
        # returning False for them.
        alternatives = [self.normalize_answer(alt.strip()) for alt in ground_truth.split("|")]
        return any(
            self._matches(norm_response, alternative, strict)
            for alternative in alternatives
            if alternative
        )

    def detects_error(self, response: str, counterfactual_answer: Optional[str]) -> bool:
        """
        Check if the model detects an error in counterfactual information.

        Args:
            response: The model's response.
            counterfactual_answer: The incorrect answer in the documents.

        Returns:
            True if the model detected the error, False otherwise.
        """
        response_lower = response.lower()
        # Check for error detection keywords
        if any(keyword in response_lower for keyword in self.ERROR_DETECTION_KEYWORDS):
            return True
        # Check if model explicitly rejects the counterfactual answer
        if counterfactual_answer:
            cf_lower = counterfactual_answer.lower()
            # Look for patterns like "X is incorrect" or "not X"
            if f"not {cf_lower}" in response_lower or f"{cf_lower} is wrong" in response_lower:
                return True
        return False

    def corrects_error(self, response: str, correct_answer: str, counterfactual_answer: Optional[str]) -> bool:
        """
        Check if the model corrects the error with the right answer.

        Args:
            response: The model's response.
            correct_answer: The actual correct answer.
            counterfactual_answer: The incorrect answer in the documents.

        Returns:
            True if the model corrected the error, False otherwise.
        """
        # First check if the model provides the correct answer
        if not self.is_correct(response, correct_answer):
            return False
        # Make sure it's not just repeating the counterfactual
        if counterfactual_answer:
            norm_response = self.normalize_answer(response)
            norm_cf = self.normalize_answer(counterfactual_answer)
            # If response contains both, that's okay (it detected and corrected)
            # If it only contains the counterfactual, that's not correcting
            if norm_cf in norm_response and self.normalize_answer(correct_answer) not in norm_response:
                return False
        return True

    def evaluate_noise_robustness(
        self,
        responses: List[str],
        ground_truths: List[str],
        model_name: str,
        noise_ratio: float
    ) -> "EvaluationResult":
        """
        Evaluate noise robustness for a specific noise ratio.

        Args:
            responses: List of model responses.
            ground_truths: List of correct answers.
            model_name: Name of the model being evaluated.
            noise_ratio: The noise ratio tested (0.0 to 1.0).

        Returns:
            EvaluationResult with accuracy metrics.
        """
        result = EvaluationResult(
            task_type=f"noise_robustness_{int(noise_ratio*100)}%",
            model_name=model_name,
            total_samples=len(responses)
        )
        # Calculate accuracy for this noise level
        for response, truth in zip(responses, ground_truths):
            if self.is_correct(response, truth):
                result.correct += 1
            else:
                result.incorrect += 1
        return result

    def evaluate_negative_rejection(
        self,
        responses: List[str],
        model_name: str
    ) -> "EvaluationResult":
        """
        Evaluate negative rejection (ability to reject when no answer exists).

        Args:
            responses: List of model responses.
            model_name: Name of the model being evaluated.

        Returns:
            EvaluationResult with rejection rate.
        """
        result = EvaluationResult(
            task_type="negative_rejection",
            model_name=model_name,
            total_samples=len(responses)
        )
        for response in responses:
            if self.is_rejection(response):
                result.rejected += 1
            else:
                result.incorrect += 1  # Should have rejected but didn't
        return result

    def evaluate_information_integration(
        self,
        responses: List[str],
        ground_truths: List[str],
        model_name: str
    ) -> "EvaluationResult":
        """
        Evaluate information integration (ability to combine info from multiple docs).

        Args:
            responses: List of model responses.
            ground_truths: List of correct answers (may be pipe-separated variants).
            model_name: Name of the model being evaluated.

        Returns:
            EvaluationResult with accuracy metrics.
        """
        result = EvaluationResult(
            task_type="information_integration",
            model_name=model_name,
            total_samples=len(responses)
        )
        for response, truth in zip(responses, ground_truths):
            if self.is_correct(response, truth):
                result.correct += 1
            else:
                result.incorrect += 1
        return result

    def evaluate_counterfactual_robustness(
        self,
        responses: List[str],
        ground_truths: List[str],
        counterfactual_answers: List[str],
        model_name: str
    ) -> "EvaluationResult":
        """
        Evaluate counterfactual robustness.

        Args:
            responses: List of model responses.
            ground_truths: List of correct answers.
            counterfactual_answers: List of counterfactual (wrong) answers.
            model_name: Name of the model being evaluated.

        Returns:
            EvaluationResult with error detection and correction rates.
        """
        result = EvaluationResult(
            task_type="counterfactual_robustness",
            model_name=model_name,
            total_samples=len(responses)
        )
        for response, truth, cf_answer in zip(responses, ground_truths, counterfactual_answers):
            if self.detects_error(response, cf_answer):
                result.errors_detected += 1
            if self.corrects_error(response, truth, cf_answer):
                result.errors_corrected += 1
                result.correct += 1
            else:
                result.incorrect += 1
        return result
def format_results_table(results: List[EvaluationResult]) -> str:
    """
    Format evaluation results as a readable table.

    Groups the results by task type and renders a section per task with the
    columns appropriate for that task's metric.

    Args:
        results: List of evaluation results.

    Returns:
        Formatted string table.
    """
    divider = "=" * 80
    lines = ["\n" + divider, "RGB RAG EVALUATION RESULTS", divider]

    # Group by task type so each task gets its own section
    grouped = defaultdict(list)
    for res in results:
        grouped[res.task_type].append(res)

    for task, bucket in grouped.items():
        lines.append(f"\n--- {task.upper().replace('_', ' ')} ---")
        if task == "noise_robustness":
            lines.append(f"{'Model':<30} {'Accuracy':<10} {'Noise Level Breakdown'}")
            lines.append("-" * 70)
            for res in bucket:
                breakdown = " | ".join(
                    f"N{level}:{acc:.1f}%" for level, acc in res.accuracy_by_noise.items()
                )
                lines.append(f"{res.model_name:<30} {res.accuracy:>6.2f}% {breakdown}")
        elif task == "negative_rejection":
            lines.append(f"{'Model':<30} {'Rejection Rate':<15} {'Samples'}")
            lines.append("-" * 60)
            for res in bucket:
                lines.append(f"{res.model_name:<30} {res.rejection_rate:>10.2f}% {res.total_samples}")
        elif task == "information_integration":
            lines.append(f"{'Model':<30} {'Accuracy':<10} {'Correct/Total'}")
            lines.append("-" * 60)
            for res in bucket:
                lines.append(f"{res.model_name:<30} {res.accuracy:>6.2f}% {res.correct}/{res.total_samples}")
        elif task == "counterfactual_robustness":
            lines.append(f"{'Model':<30} {'Error Det.':<12} {'Error Corr.':<12}")
            lines.append("-" * 60)
            for res in bucket:
                lines.append(
                    f"{res.model_name:<30} {res.error_detection_rate:>8.2f}% {res.error_correction_rate:>8.2f}%"
                )

    lines.append("\n" + divider)
    return "\n".join(lines)
if __name__ == "__main__":
    # Smoke-test the evaluator's rejection detection on a few canned replies.
    evaluator = RGBEvaluator()

    samples = [
        "I don't know the answer to that question.",
        "The capital of France is Paris.",
        "I cannot determine the answer from the given information.",
        "Based on the documents, the answer is 42.",
    ]

    print("Testing rejection detection:")
    for reply in samples:
        print(f" '{reply[:50]}...' -> Rejection: {evaluator.is_rejection(reply)}")