"""Advanced RAG evaluation using GPT-4 labeling prompts (RAGBench paper approach). This module implements the evaluation methodology from the RAGBench paper, using LLM-based labeling to assess: - Context Relevance - Context Utilization - Completeness - Adherence Reference: RAGBench Paper - arXiv:2407.11005 """ from typing import List, Dict, Optional, Tuple import json import re from dataclasses import dataclass import numpy as np from sklearn.metrics import mean_squared_error, roc_auc_score, auc, f1_score, precision_score, recall_score from sklearn.preprocessing import label_binarize import warnings @dataclass class SentenceSupportInfo: """Information about support for a response sentence.""" response_sentence_key: str explanation: str supporting_sentence_keys: List[str] fully_supported: bool @dataclass class GPTLabelingOutput: """Output from GPT labeling prompt.""" relevance_explanation: str all_relevant_sentence_keys: List[str] overall_supported_explanation: str overall_supported: bool sentence_support_information: List[Dict] all_utilized_sentence_keys: List[str] @dataclass class AdvancedTRACEScores: """Advanced TRACE scores with detailed metrics.""" context_relevance: float # Fraction of retrieved context relevant to query context_utilization: float # Fraction of retrieved context used in response completeness: float # Fraction of relevant info covered by response adherence: float # Whether response is grounded in context (no hallucinations) # Additional metrics from GPT labeling overall_supported: bool # Whether response is fully supported by documents num_fully_supported_sentences: int # Number of fully supported sentences num_partially_supported_sentences: int # Number of partially supported sentences num_unsupported_sentences: int # Number of unsupported sentences def to_dict(self) -> Dict: """Convert to dictionary.""" return { "context_relevance": float(self.context_relevance), "context_utilization": float(self.context_utilization), "completeness": 
float(self.completeness), "adherence": float(self.adherence), "average": self.average(), "rmse_aggregation": self.rmse_aggregation(), "rmse_per_metric": self.get_rmse_per_metric(), "overall_supported": self.overall_supported, "fully_supported_sentences": self.num_fully_supported_sentences, "partially_supported_sentences": self.num_partially_supported_sentences, "unsupported_sentences": self.num_unsupported_sentences, } def average(self) -> float: """Calculate average score.""" return (self.context_relevance + self.context_utilization + self.completeness + self.adherence) / 4 def rmse_aggregation(self) -> float: """Calculate RMSE aggregation across all four TRACE metrics. RMSE aggregation penalizes inconsistency across metrics. If all metrics are equal, RMSE is 0 (perfect consistency). If metrics vary, RMSE increases with variance. Formula: RMSE = sqrt(((R-μ)² + (A-μ)² + (C-μ)² + (U-μ)²) / 4) where μ = average of all metrics Returns: RMSE value (0-1), where 0 = perfect consistency """ metrics = [ self.context_relevance, self.context_utilization, self.completeness, self.adherence ] mean = self.average() # Calculate mean squared error from the mean squared_errors = [(m - mean) ** 2 for m in metrics] mse = np.mean(squared_errors) rmse = np.sqrt(mse) return float(rmse) def get_rmse_per_metric(self) -> Dict[str, float]: """Calculate RMSE contribution for each metric. Shows how much each metric deviates from the mean, indicating which metrics are inconsistent relative to overall performance. 
Formula for each metric: sqrt((metric - mean)²) Returns: Dict with RMSE values for each metric: { "context_relevance_rmse": float, "context_utilization_rmse": float, "completeness_rmse": float, "adherence_rmse": float, "max_deviation_metric": str (name of most inconsistent metric) } """ mean = self.average() metrics_dict = { "context_relevance": self.context_relevance, "context_utilization": self.context_utilization, "completeness": self.completeness, "adherence": self.adherence } # Calculate RMSE contribution for each metric rmse_per_metric = {} for metric_name, metric_value in metrics_dict.items(): deviation = metric_value - mean rmse_value = np.sqrt(deviation ** 2) rmse_per_metric[f"{metric_name}_rmse"] = float(rmse_value) # Find the metric with highest deviation (most inconsistent) max_metric = max(rmse_per_metric.items(), key=lambda x: x[1]) rmse_per_metric["max_deviation_metric"] = max_metric[0].replace("_rmse", "") return rmse_per_metric class RMSECalculator: """Calculate RMSE (Root Mean Squared Error) for evaluation metrics.""" @staticmethod def compute_rmse_for_metric(predicted: List[float], ground_truth: List[float]) -> float: """Compute RMSE for a single metric. Args: predicted: List of predicted metric values ground_truth: List of ground truth metric values Returns: RMSE value """ if len(predicted) != len(ground_truth): raise ValueError("Predicted and ground truth must have same length") if len(predicted) == 0: return 0.0 try: mse = mean_squared_error(ground_truth, predicted) rmse = np.sqrt(mse) return float(rmse) except Exception as e: warnings.warn(f"Error computing RMSE: {e}") return 0.0 @staticmethod def compute_rmse_single_trace_evaluation( predicted_scores: AdvancedTRACEScores, ground_truth_scores: AdvancedTRACEScores ) -> Dict[str, float]: """Compute RMSE metrics for a single TRACE evaluation. 
Args: predicted_scores: AdvancedTRACEScores from evaluation ground_truth_scores: AdvancedTRACEScores from ground truth Returns: Dictionary with individual metric RMSE and aggregated RMSE """ metrics = { "context_relevance": (predicted_scores.context_relevance, ground_truth_scores.context_relevance), "context_utilization": (predicted_scores.context_utilization, ground_truth_scores.context_utilization), "completeness": (predicted_scores.completeness, ground_truth_scores.completeness), "adherence": (predicted_scores.adherence, ground_truth_scores.adherence) } rmse_per_metric = {} for metric_name, (pred, truth) in metrics.items(): # Calculate RMSE for this single metric comparison rmse_per_metric[metric_name] = float((pred - truth) ** 2) ** 0.5 # Aggregated RMSE: root mean square of all metric RMSEs aggregated_rmse = np.sqrt(np.mean(list(rmse_per_metric.values()))) if rmse_per_metric else 0.0 return { "per_metric": rmse_per_metric, "aggregated_rmse": float(aggregated_rmse) } @staticmethod def compute_rmse_aggregation_for_batch(results: List[Dict]) -> Dict: """Compute RMSE aggregation scores for batch evaluation (consistency within metrics). Measures consistency of each TRACE score across evaluations. 
Args: results: List of evaluation results with metrics Returns: Dictionary with RMSE aggregation stats for each metric """ metrics = ["context_relevance", "context_utilization", "completeness", "adherence"] aggregation_stats = {} for metric in metrics: values = [] for result in results: if "metrics" in result and metric in result["metrics"]: values.append(result["metrics"][metric]) if len(values) > 0: mean_val = np.mean(values) std_val = np.std(values) # RMSE aggregation = std dev (how much metric varies across evaluations) aggregation_stats[metric] = { "mean": float(mean_val), "std_dev": float(std_val), "min": float(np.min(values)), "max": float(np.max(values)), "variance": float(std_val ** 2), "count": len(values) } return aggregation_stats @staticmethod def compute_rmse_all_metrics(results: List[Dict]) -> Dict[str, float]: """Compute RMSE for all metrics across multiple test cases. Args: results: List of evaluation results with predicted and ground truth scores Returns: Dictionary mapping metric names to RMSE values """ metrics = ["context_relevance", "context_utilization", "completeness", "adherence"] rmse_results = {} for metric in metrics: predicted = [] ground_truth = [] for result in results: if "metrics" in result and metric in result["metrics"]: predicted.append(result["metrics"][metric]) # Check for ground truth in different possible locations if (("ground_truth_scores" in result and metric in result["ground_truth_scores"])): ground_truth.append(result["ground_truth_scores"][metric]) else: # Skip this result if no ground truth available predicted.pop() if len(predicted) > 0 and len(ground_truth) > 0: rmse_results[metric] = RMSECalculator.compute_rmse_for_metric( predicted, ground_truth ) # Compute average RMSE across all metrics if rmse_results: rmse_results["average"] = np.mean(list(rmse_results.values())) return rmse_results @staticmethod def compute_trace_rmse_aggregation(results: List[Dict]) -> Dict[str, float]: """Compute RMSE aggregation across 
TRACE metrics for multiple evaluations. This method computes consistency metrics across evaluations: - Calculates individual metric RMSEs - Computes aggregate RMSE showing consistency - Returns overall evaluation quality metrics Args: results: List of evaluation results with metrics and ground truth Returns: Dictionary with: - per_metric_rmse: RMSE for each of the 4 TRACE metrics - aggregated_rmse: Overall consistency metric (0 = perfect, higher = less consistent) - consistency_score: Inverse of aggregated_rmse (1 = perfect, 0 = no consistency) """ metrics = ["context_relevance", "context_utilization", "completeness", "adherence"] per_metric_rmse = {} # Compute per-metric RMSE for metric in metrics: predicted = [] ground_truth = [] for result in results: if "metrics" in result and metric in result["metrics"]: predicted.append(result["metrics"][metric]) if "ground_truth_scores" in result and metric in result["ground_truth_scores"]: ground_truth.append(result["ground_truth_scores"][metric]) else: if predicted: predicted.pop() if len(predicted) > 0 and len(ground_truth) > 0: per_metric_rmse[metric] = RMSECalculator.compute_rmse_for_metric( predicted, ground_truth ) # Aggregate RMSE across metrics if per_metric_rmse: metric_rmses = list(per_metric_rmse.values()) aggregated_rmse = np.sqrt(np.mean([r ** 2 for r in metric_rmses])) consistency_score = 1.0 - min(aggregated_rmse, 1.0) # Invert and cap at 0 else: aggregated_rmse = 0.0 consistency_score = 0.0 return { "per_metric_rmse": per_metric_rmse, "aggregated_rmse": float(aggregated_rmse), "consistency_score": float(consistency_score), # 0-1, where 1 = perfect consistency "num_evaluations": len(results), "evaluated_metrics": metrics } @staticmethod def get_per_metric_rmse_breakdown(results: List[Dict]) -> Dict: """Compute detailed RMSE breakdown for each TRACE metric. Analyzes RMSE for each metric individually to identify which metrics have the highest prediction errors. 
Useful for understanding which parts of the evaluation system need improvement. Args: results: List of evaluation results with metrics and ground truth Returns: Dictionary with: - per_metric: RMSE for each metric with % contribution - worst_performing_metric: Metric with highest RMSE - best_performing_metric: Metric with lowest RMSE - metric_details: Detailed stats for each metric """ metrics = ["context_relevance", "context_utilization", "completeness", "adherence"] per_metric_rmse = {} metric_details = {} for metric in metrics: predicted = [] ground_truth = [] for result in results: if "metrics" in result and metric in result["metrics"]: predicted.append(result["metrics"][metric]) if "ground_truth_scores" in result and metric in result["ground_truth_scores"]: ground_truth.append(result["ground_truth_scores"][metric]) else: if predicted: predicted.pop() if len(predicted) > 0 and len(ground_truth) > 0: rmse = RMSECalculator.compute_rmse_for_metric(predicted, ground_truth) per_metric_rmse[metric] = float(rmse) # Calculate detailed statistics errors = [abs(p - t) for p, t in zip(predicted, ground_truth)] metric_details[metric] = { "rmse": float(rmse), "mean_absolute_error": float(np.mean(errors)), "max_error": float(np.max(errors)), "min_error": float(np.min(errors)), "std_dev": float(np.std(errors)), "num_samples": len(predicted) } # Calculate percentage contribution total_rmse_squared = sum(r ** 2 for r in per_metric_rmse.values()) if per_metric_rmse else 0 for metric in per_metric_rmse: if total_rmse_squared > 0: percentage = (per_metric_rmse[metric] ** 2 / total_rmse_squared) * 100 metric_details[metric]["rmse_contribution_percent"] = float(percentage) # Find best and worst metrics worst_metric = max(per_metric_rmse.items(), key=lambda x: x[1]) if per_metric_rmse else (None, 0) best_metric = min(per_metric_rmse.items(), key=lambda x: x[1]) if per_metric_rmse else (None, 0) return { "per_metric": per_metric_rmse, "worst_performing_metric": worst_metric[0], 
"worst_rmse": float(worst_metric[1]), "best_performing_metric": best_metric[0], "best_rmse": float(best_metric[1]), "metric_details": metric_details, "num_evaluations": len(results) } class AUCROCCalculator: """Calculate AUCROC (Area Under ROC Curve) for binary classification metrics.""" @staticmethod def binary_labels_from_threshold(scores: List[float], threshold: float = 0.5) -> List[int]: """Convert continuous scores to binary labels using threshold. Args: scores: List of continuous scores threshold: Threshold for binary classification Returns: Binary labels (0 or 1) """ return [1 if score >= threshold else 0 for score in scores] @staticmethod def compute_auc_for_metric(predicted: List[float], ground_truth: List[float]) -> float: """Compute AUCROC for a single metric. Args: predicted: List of predicted metric values (0-1) ground_truth: List of ground truth metric values (0-1) Returns: AUCROC value (0-1), or 0 if computation fails """ if len(predicted) != len(ground_truth): raise ValueError("Predicted and ground truth must have same length") if len(predicted) <= 1: return 0.0 try: # Convert to binary labels using 0.5 threshold ground_truth_binary = AUCROCCalculator.binary_labels_from_threshold( ground_truth, threshold=0.5 ) # Check if we have both classes in ground truth if len(set(ground_truth_binary)) < 2: # Only one class present, cannot compute AUCROC return 0.0 # Compute AUCROC auc_score = roc_auc_score(ground_truth_binary, predicted) return float(auc_score) except Exception as e: warnings.warn(f"Error computing AUCROC: {e}") return 0.0 @staticmethod def compute_per_metric_statistics(results: List[Dict]) -> Dict: """Compute per-metric statistics for batch evaluation. Provides detailed statistics on each TRACE metric without requiring ground truth. 
Args: results: List of evaluation results with metrics Returns: Dictionary with detailed statistics for each metric """ metrics = ["context_relevance", "context_utilization", "completeness", "adherence"] stats = {} for metric in metrics: values = [] for result in results: if "metrics" in result and metric in result["metrics"]: values.append(result["metrics"][metric]) if len(values) > 0: # Count how many perfect (1.0) and poor (<0.3) scores perfect_count = sum(1 for v in values if v >= 0.95) poor_count = sum(1 for v in values if v < 0.3) stats[metric] = { "mean": float(np.mean(values)), "median": float(np.median(values)), "std_dev": float(np.std(values)), "min": float(np.min(values)), "max": float(np.max(values)), "percentile_25": float(np.percentile(values, 25)), "percentile_75": float(np.percentile(values, 75)), "perfect_count": int(perfect_count), "poor_count": int(poor_count), "sample_count": len(values) } return stats @staticmethod def compute_auc_all_metrics(results: List[Dict]) -> Dict[str, float]: """Compute AUCROC for all metrics across multiple test cases. 
Args: results: List of evaluation results with predicted and ground truth scores Returns: Dictionary mapping metric names to AUCROC values """ metrics = ["context_relevance", "context_utilization", "completeness", "adherence"] auc_results = {} for metric in metrics: predicted = [] ground_truth = [] for result in results: if "metrics" in result and metric in result["metrics"]: predicted.append(result["metrics"][metric]) # Check for ground truth in different possible locations if (("ground_truth_scores" in result and metric in result["ground_truth_scores"])): ground_truth.append(result["ground_truth_scores"][metric]) else: # Skip this result if no ground truth available predicted.pop() if len(predicted) > 1 and len(ground_truth) > 1: try: auc_results[metric] = AUCROCCalculator.compute_auc_for_metric( predicted, ground_truth ) except Exception: auc_results[metric] = 0.0 # Compute average AUCROC across all metrics if auc_results: auc_results["average"] = np.mean(list(auc_results.values())) return auc_results class F1ScoreCalculator: """Calculate F1Score for evaluation metrics (especially for adherence).""" @staticmethod def compute_f1_for_metric(predicted: List[float], ground_truth: List[float], threshold: float = 0.5) -> Dict[str, float]: """Compute F1 Score for a single metric using binary classification. 
Converts continuous scores to binary labels using threshold, then calculates: - Precision: TP / (TP + FP) - Recall: TP / (TP + FN) - F1 Score: 2 * (Precision * Recall) / (Precision + Recall) Args: predicted: List of predicted metric values (0-1) ground_truth: List of ground truth metric values (0-1) threshold: Threshold for binary classification (default 0.5) Returns: Dictionary with F1, Precision, Recall scores """ if len(predicted) != len(ground_truth): raise ValueError("Predicted and ground truth must have same length") if len(predicted) <= 1: return {"f1_score": 0.0, "precision": 0.0, "recall": 0.0} try: # Convert continuous scores to binary labels pred_binary = [1 if score >= threshold else 0 for score in predicted] truth_binary = [1 if score >= threshold else 0 for score in ground_truth] # Calculate metrics f1 = f1_score(truth_binary, pred_binary, zero_division=0) precision = precision_score(truth_binary, pred_binary, zero_division=0) recall = recall_score(truth_binary, pred_binary, zero_division=0) return { "f1_score": float(f1), "precision": float(precision), "recall": float(recall) } except Exception as e: warnings.warn(f"Error computing F1 Score: {e}") return {"f1_score": 0.0, "precision": 0.0, "recall": 0.0} @staticmethod def compute_adherence_f1(results: List[Dict]) -> Dict[str, float]: """Compute F1 Score specifically for adherence metric aggregation. Adherence is a binary metric (0 or 1), so F1 Score is particularly relevant. Measures how well the predicted adherence scores match ground truth. 
Args: results: List of evaluation results with predicted and ground truth scores Returns: Dictionary with: - adherence_f1: F1 Score for adherence - adherence_precision: Precision for adherence - adherence_recall: Recall for adherence - num_evaluations: Number of evaluations used """ predicted = [] ground_truth = [] for result in results: if "metrics" in result and "adherence" in result["metrics"]: predicted.append(result["metrics"]["adherence"]) if "ground_truth_scores" in result and "adherence" in result["ground_truth_scores"]: ground_truth.append(result["ground_truth_scores"]["adherence"]) else: if predicted: predicted.pop() if len(predicted) == 0 or len(ground_truth) == 0: return { "adherence_f1": 0.0, "adherence_precision": 0.0, "adherence_recall": 0.0, "num_evaluations": 0 } f1_metrics = F1ScoreCalculator.compute_f1_for_metric(predicted, ground_truth) return { "adherence_f1": f1_metrics["f1_score"], "adherence_precision": f1_metrics["precision"], "adherence_recall": f1_metrics["recall"], "num_evaluations": len(predicted) } @staticmethod def compute_f1_all_metrics(results: List[Dict]) -> Dict[str, float]: """Compute F1 Score for all TRACE metrics. 
Args: results: List of evaluation results with predicted and ground truth scores Returns: Dictionary mapping metric names to F1 Scores with precision/recall """ metrics = ["context_relevance", "context_utilization", "completeness", "adherence"] f1_results = {} for metric in metrics: predicted = [] ground_truth = [] for result in results: if "metrics" in result and metric in result["metrics"]: predicted.append(result["metrics"][metric]) if "ground_truth_scores" in result and metric in result["ground_truth_scores"]: ground_truth.append(result["ground_truth_scores"][metric]) else: if predicted: predicted.pop() if len(predicted) > 0 and len(ground_truth) > 0: f1_metrics = F1ScoreCalculator.compute_f1_for_metric(predicted, ground_truth) f1_results[f"{metric}_f1"] = f1_metrics["f1_score"] f1_results[f"{metric}_precision"] = f1_metrics["precision"] f1_results[f"{metric}_recall"] = f1_metrics["recall"] # Compute average F1 across all metrics f1_scores = [v for k, v in f1_results.items() if k.endswith("_f1")] if f1_scores: f1_results["average_f1"] = float(np.mean(f1_scores)) return f1_results class DocumentSentencizer: """Split documents into sentences with keys (0a, 0b, 1a, etc.).""" @staticmethod def sentencize_documents(documents: List[str]) -> Tuple[List[Dict], str]: """Split documents into sentences with keys. Args: documents: List of document texts Returns: Tuple of (sentence_list, formatted_string) Where sentence_list = [{"key": "0a", "text": "..."}, ...] """ sentence_list = [] formatted_parts = [] # Split by common sentence endings sentence_pattern = r'(?<=[.!?])\s+(?=[A-Z])|(?<=[.!?])\s*$' for doc_idx, document in enumerate(documents): sentences = re.split(sentence_pattern, document.strip()) sentences = [s.strip() for s in sentences if s.strip()] for sent_idx, sentence in enumerate(sentences): # Generate key like 0a, 0b, 1a, 1b, etc. key = f"{doc_idx}{chr(97 + (sent_idx % 26))}" sentence_list.append({"key": key, "text": sentence}) formatted_parts.append(f"{key}. 
{sentence}") formatted_string = "\n".join(formatted_parts) return sentence_list, formatted_string @staticmethod def sentencize_response(response: str) -> Tuple[List[Dict], str]: """Split response into sentences with keys (a, b, c, etc.). Args: response: Response text Returns: Tuple of (sentence_list, formatted_string) """ sentence_list = [] formatted_parts = [] # Split by sentence endings sentence_pattern = r'(?<=[.!?])\s+(?=[A-Z])|(?<=[.!?])\s*$' sentences = re.split(sentence_pattern, response.strip()) sentences = [s.strip() for s in sentences if s.strip()] for sent_idx, sentence in enumerate(sentences): key = chr(97 + (sent_idx % 26)) # a, b, c, ... sentence_list.append({"key": key, "text": sentence}) formatted_parts.append(f"{key}. {sentence}") formatted_string = "\n".join(formatted_parts) return sentence_list, formatted_string class GPTLabelingPromptGenerator: """Generate structured GPT labeling prompts for factual evaluation and citation audit.""" # Improved Template with clear hierarchy and explicit constraints LABELING_PROMPT_TEMPLATE = """### ROLE You are a Fact-Checking and Citation Specialist. Your task is to perform a rigorous audit of a response against provided documents to determine its accuracy, relevance, and level of support. ### TASK OVERVIEW 1. **Analyze Documents**: Review the provided documents and identify information relevant to the user's question. 2. **Evaluate Response**: Review the provided answer sentence-by-sentence. 3. **Verify Support**: Map each answer sentence to specific supporting sentences in the documents. 4. **Identify Utilization**: Determine which document sentences were actually used (directly or implicitly) to form the answer. ### INPUT DATA **Documents (Split into Sentences with Keys):** ''' {documents} ''' **The Original Question:** ''' {question} ''' **The Answer to Evaluate (Split into Sentences with Keys):** ''' {answer} ''' ### OUTPUT REQUIREMENTS You must respond with a valid JSON object. 
**Constraints:** - Do NOT include any preamble or postamble (e.g., "Here is the analysis..."). - Do NOT wrap the JSON in markdown code blocks (e.g., no ```json). - Use proper escaping for quotes and newlines within JSON strings. ### JSON SCHEMA {{ "relevance_explanation": "A step-by-step breakdown of document information and its utility for the question.", "all_relevant_sentence_keys": ["List of doc keys pertinent to the question, regardless of use in answer"], "overall_supported_explanation": "Claim-by-claim assessment of the response's accuracy before a final conclusion.", "overall_supported": boolean, "sentence_support_information": [ {{ "response_sentence_key": "string", "explanation": "Why the sentence is or is not supported.", "supporting_sentence_keys": ["doc_keys", "OR: 'supported_without_sentence', 'general', 'well_known_fact', 'numerical_reasoning'"], "fully_supported": boolean }} ], "all_utilized_sentence_keys": ["List of doc keys actually used to construct the answer"] }}""" @staticmethod def generate_labeling_prompt( question: str, response: str, documents: List[str] ) -> Tuple[str, List[Dict], List[Dict]]: """Generate the high-fidelity GPT labeling prompt. Args: question: The original user question. response: The LLM response to evaluate. documents: List of raw retrieved documents. 
Returns: A tuple of (formatted_prompt, list_of_doc_sentences, list_of_resp_sentences) """ # Sentencize documents and response doc_sentences, doc_formatted = DocumentSentencizer.sentencize_documents(documents) resp_sentences, resp_formatted = DocumentSentencizer.sentencize_response(response) # Inject data into the structured template prompt = GPTLabelingPromptGenerator.LABELING_PROMPT_TEMPLATE.format( documents=doc_formatted, question=question, answer=resp_formatted ) return prompt, doc_sentences, resp_sentences class AdvancedRAGEvaluator: """Advanced RAG evaluator using GPT labeling prompts.""" def __init__(self, llm_client=None, chunking_strategy: Optional[str] = None, embedding_model: Optional[str] = None, chunk_size: Optional[int] = None, chunk_overlap: Optional[int] = None): """Initialize evaluator. Args: llm_client: LLM client for generating labels chunking_strategy: Chunking strategy used embedding_model: Embedding model used chunk_size: Chunk size used chunk_overlap: Chunk overlap used """ self.llm_client = llm_client self.chunking_strategy = chunking_strategy self.embedding_model = embedding_model self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap def evaluate( self, question: str, response: str, retrieved_documents: List[str], ground_truth: Optional[str] = None ) -> Tuple[AdvancedTRACEScores, Optional[Dict]]: """Evaluate response using GPT labeling approach. 
Args: question: User question response: LLM response retrieved_documents: Retrieved context documents ground_truth: Optional ground truth answer Returns: Tuple of (AdvancedTRACEScores, llm_request_info dict) """ # Generate labeling prompt and get LLM labels with request info gpt_result = self._get_gpt_labels(question, response, retrieved_documents) llm_request_info = {} if not gpt_result: # Fallback to rule-based evaluation scores = self._fallback_evaluation(question, response, retrieved_documents) return scores, llm_request_info gpt_labels = gpt_result.get("labels") llm_request_info = gpt_result.get("llm_request_info", {}) if not gpt_labels: # Fallback to rule-based evaluation scores = self._fallback_evaluation(question, response, retrieved_documents) return scores, llm_request_info # Compute metrics from GPT labels context_relevance = self._compute_context_relevance(gpt_labels) context_utilization = self._compute_context_utilization(gpt_labels) completeness = self._compute_completeness(gpt_labels, ground_truth) adherence = self._compute_adherence(gpt_labels) # Count supported sentences fully_supported = sum(1 for s in gpt_labels.sentence_support_information if s.get("fully_supported", False)) partially_supported = sum(1 for s in gpt_labels.sentence_support_information if not s.get("fully_supported", False) and s.get("supporting_sentence_keys", [])) unsupported = sum(1 for s in gpt_labels.sentence_support_information if not s.get("supporting_sentence_keys", [])) scores = AdvancedTRACEScores( context_relevance=context_relevance, context_utilization=context_utilization, completeness=completeness, adherence=adherence, overall_supported=gpt_labels.overall_supported, num_fully_supported_sentences=fully_supported, num_partially_supported_sentences=partially_supported, num_unsupported_sentences=unsupported ) return scores, llm_request_info def _get_gpt_labels(self, question: str, response: str, documents: List[str]) -> Optional[Dict]: """Get GPT labels using labeling 
prompt with rate limiting for 30 RPM. Args: question: User question response: LLM response documents: Retrieved documents Returns: Dict containing 'labels' (GPTLabelingOutput) and 'llm_request_info' with complete audit trail """ if not self.llm_client: print("[WARN] No LLM client available - using fallback evaluation") return None try: # Generate prompt prompt, doc_sentences, resp_sentences = ( GPTLabelingPromptGenerator.generate_labeling_prompt( question, response, documents ) ) # Store LLM request info for audit trail llm_request_info = { "query": question, "context_documents": documents, "llm_response": response, "labeling_prompt": prompt, "model": getattr(self.llm_client, 'model_name', 'groq-default'), "temperature": 0.0, "max_tokens": 2048 } # Log rate limiting info before making API call print(f"\n[EVALUATION] Making GPT labeling API call...") print(f"[EVALUATION] This respects the 30 RPM rate limit") # Call LLM to get labels (rate limiting is handled internally) llm_response = self.llm_client.generate( prompt=prompt, max_tokens=2048, temperature=0.0 # Deterministic for consistent labeling ) # Store full LLM response in request info llm_request_info["full_llm_response"] = llm_response # Log the actual response print(f"\n[LLM RESPONSE] {llm_response}\n") # Check if response is empty if not llm_response or not llm_response.strip(): print(f"[WARN] Empty LLM response received") return {"labels": None, "llm_request_info": llm_request_info} # Parse JSON response try: # Try to extract JSON from response (in case there's surrounding text) json_str = llm_response.strip() # If response contains markdown code blocks, extract the JSON if "```json" in json_str: json_str = json_str.split("```json")[1].split("```")[0].strip() elif "```" in json_str: json_str = json_str.split("```")[1].split("```")[0].strip() labels_dict = json.loads(json_str) gpt_output = GPTLabelingOutput( relevance_explanation=labels_dict.get("relevance_explanation", ""), 
all_relevant_sentence_keys=labels_dict.get("all_relevant_sentence_keys", []), overall_supported_explanation=labels_dict.get("overall_supported_explanation", ""), overall_supported=labels_dict.get("overall_supported", False), sentence_support_information=labels_dict.get("sentence_support_information", []), all_utilized_sentence_keys=labels_dict.get("all_utilized_sentence_keys", []) ) return {"labels": gpt_output, "llm_request_info": llm_request_info} except (json.JSONDecodeError, ValueError, IndexError) as e: print(f"[WARN] Failed to parse LLM response: {e}") print(f"[WARN] Raw response: {llm_response[:200]}") return {"labels": None, "llm_request_info": llm_request_info} except Exception as e: print(f"[WARN] Error getting GPT labels: {e}") return None def _compute_context_relevance(self, gpt_labels: GPTLabelingOutput) -> float: """Compute context relevance metric. Context Relevance = Number of relevant sentences / Total sentences """ if not gpt_labels.all_relevant_sentence_keys: return 0.0 return min(1.0, len(gpt_labels.all_relevant_sentence_keys) / 20.0) # Normalize def _compute_context_utilization(self, gpt_labels: GPTLabelingOutput) -> float: """Compute context utilization metric. Context Utilization = Number of utilized sentences / Number of relevant sentences """ relevant_count = len(gpt_labels.all_relevant_sentence_keys) utilized_count = len(gpt_labels.all_utilized_sentence_keys) if relevant_count == 0: return 0.0 return min(1.0, utilized_count / relevant_count) def _compute_completeness(self, gpt_labels: GPTLabelingOutput, ground_truth: Optional[str] = None) -> float: """Compute completeness metric. 
Completeness = Relevant sentences used / All relevant sentences """ relevant_set = set(gpt_labels.all_relevant_sentence_keys) utilized_set = set(gpt_labels.all_utilized_sentence_keys) intersection = len(relevant_set & utilized_set) if len(relevant_set) == 0: return 1.0 if len(utilized_set) == 0 else 0.0 return intersection / len(relevant_set) def _compute_adherence(self, gpt_labels: GPTLabelingOutput) -> float: """Compute adherence metric (Boolean: 1.0 = fully grounded, 0.0 = contains hallucination). Per RAGBench paper: Adherence is whether ALL response sentences are fully supported by context. If even ONE sentence is not fully supported, adherence = 0.0 """ total_sentences = len(gpt_labels.sentence_support_information) if total_sentences == 0: return 1.0 # Check if ALL sentences are fully supported fully_supported_count = sum( 1 for s in gpt_labels.sentence_support_information if s.get("fully_supported", False) ) # Boolean: 1.0 if all sentences are fully supported, 0.0 if any sentence is not fully supported return 1.0 if fully_supported_count == total_sentences else 0.0 def _fallback_evaluation(self, question: str, response: str, documents: List[str]) -> AdvancedTRACEScores: """Fallback rule-based evaluation when LLM unavailable.""" # Simple heuristics when LLM not available response_words = set(response.lower().split()) doc_words = set() for doc in documents: doc_words.update(doc.lower().split()) overlap = len(response_words & doc_words) / max(len(response_words), 1) return AdvancedTRACEScores( context_relevance=overlap, context_utilization=overlap, completeness=overlap, adherence=overlap, overall_supported=overlap > 0.5, num_fully_supported_sentences=0, num_partially_supported_sentences=0, num_unsupported_sentences=0 ) def evaluate_batch(self, test_cases: List[Dict], checkpoint_file: str = None, resume: bool = True) -> Dict: """Evaluate multiple test cases with checkpoint support. Args: test_cases: List of test cases with question, response, etc. 
            checkpoint_file: Optional file to save/resume progress
            resume: Whether to resume from checkpoint if exists

        Returns:
            Dictionary with aggregated scores and detailed results
        """
        all_scores = []
        detailed_results = []
        start_index = 0

        # Try to resume from checkpoint
        if checkpoint_file and resume:
            try:
                import os
                if os.path.exists(checkpoint_file):
                    with open(checkpoint_file, 'r') as f:
                        checkpoint_data = json.load(f)
                    detailed_results = checkpoint_data.get('detailed_results', [])
                    # Resume position is inferred from how many results were saved.
                    start_index = len(detailed_results)
                    print(f"[CHECKPOINT] Resuming from checkpoint at sample {start_index}/{len(test_cases)}")
            except Exception as e:
                # Best-effort resume: a broken checkpoint just restarts from 0.
                print(f"[CHECKPOINT] Could not load checkpoint: {e}")

        for i, test_case in enumerate(test_cases):
            # Skip already processed samples
            if i < start_index:
                continue
            print(f"Evaluating test case {i+1}/{len(test_cases)}")
            question = test_case.get("query", "")
            response = test_case.get("response", "")
            documents = test_case.get("retrieved_documents", [])
            # NOTE(review): the model response is passed as its own ground-truth
            # reference here — confirm this is intentional and not a bug.
            ground_truth = response  # Use response as ground truth reference answer
            ground_truth_scores = test_case.get("ground_truth_scores", {})  # Extract RAGBench ground truth scores

            # evaluate now returns (scores, llm_request_info)
            scores, llm_request_info = self.evaluate(question, response, documents, ground_truth)
            all_scores.append(scores)

            # Store detailed results with ground truth for RMSE/AUCROC computation
            result_dict = {
                "query_id": i + 1,
                "question": question,
                "prompt": llm_request_info.get("labeling_prompt", "") if llm_request_info else "",
                "llm_response": llm_request_info.get("full_llm_response", "") if llm_request_info else "",
                "metrics": scores.to_dict(),
                "ground_truth_scores": ground_truth_scores  # Include RAGBench ground truth for RMSE/AUCROC
            }
            detailed_results.append(result_dict)

            # Save a checkpoint periodically (for resume capability)
            if checkpoint_file and (i + 1) % 5 == 0:  # Save every 5 samples
                try:
                    checkpoint_data = {
                        'detailed_results': detailed_results,
                        'last_index': i + 1,
                        'total_samples': len(test_cases)
                    }
                    with open(checkpoint_file, 'w') as f:
                        # default=str stringifies values json cannot serialize
                        # natively (e.g. numpy scalars) rather than raising.
                        json.dump(checkpoint_data, f, default=str)
                    print(f"[CHECKPOINT] Saved progress at sample {i + 1}/{len(test_cases)}")
                except Exception as e:
                    # A failed checkpoint write never aborts the evaluation run.
                    print(f"[CHECKPOINT] Failed to save: {e}")

        # Aggregate scores - convert dictionary results to proper format
        scores_dicts = [s.to_dict() for s in all_scores]

        # Extract metric values safely from dictionaries
        context_relevance_vals = [s.get("context_relevance", 0) for s in scores_dicts]
        context_utilization_vals = [s.get("context_utilization", 0) for s in scores_dicts]
        completeness_vals = [s.get("completeness", 0) for s in scores_dicts]
        adherence_vals = [s.get("adherence", 0) for s in scores_dicts]
        average_vals = [s.get("average", 0) for s in scores_dicts]

        results = {
            "context_relevance": float(np.mean(context_relevance_vals)) if context_relevance_vals else 0.0,
            "context_utilization": float(np.mean(context_utilization_vals)) if context_utilization_vals else 0.0,
            "completeness": float(np.mean(completeness_vals)) if completeness_vals else 0.0,
            "adherence": float(np.mean(adherence_vals)) if adherence_vals else 0.0,
            "average": float(np.mean(average_vals)) if average_vals else 0.0,
            "num_samples": len(test_cases),
            "detailed_results": detailed_results,
            "evaluation_config": {
                "chunking_strategy": self.chunking_strategy,
                "embedding_model": self.embedding_model,
                "chunk_size": self.chunk_size,
                "chunk_overlap": self.chunk_overlap,
                "evaluation_method": "gpt_labeling_prompts"
            }
        }

        # Compute RMSE aggregation and per-metric statistics
        rmse_metrics = RMSECalculator.compute_rmse_aggregation_for_batch(detailed_results)
        per_metric_stats = AUCROCCalculator.compute_per_metric_statistics(detailed_results)
        if rmse_metrics:
            results["rmse_metrics"] = rmse_metrics
        if per_metric_stats:
            results["per_metric_statistics"] = per_metric_stats

        # Compute RMSE against RAGBench ground truth (per RAGBench paper requirement)
        # This compares predicted scores vs original scores in the dataset
        rmse_vs_ground_truth = RMSECalculator.compute_trace_rmse_aggregation(detailed_results)
        if rmse_vs_ground_truth and rmse_vs_ground_truth.get("per_metric_rmse"):
            results["rmse_vs_ground_truth"] = rmse_vs_ground_truth

        # Compute AUCROC against RAGBench ground truth (per RAGBench paper requirement)
        aucroc_vs_ground_truth = AUCROCCalculator.compute_auc_all_metrics(detailed_results)
        if aucroc_vs_ground_truth:
            results["aucroc_vs_ground_truth"] = aucroc_vs_ground_truth

        # Compute F1 Score for adherence aggregation
        adherence_f1_scores = F1ScoreCalculator.compute_adherence_f1(detailed_results)
        if adherence_f1_scores:
            results["adherence_f1_scores"] = adherence_f1_scores

        # Compute F1 Scores for all metrics
        f1_all_metrics = F1ScoreCalculator.compute_f1_all_metrics(detailed_results)
        if f1_all_metrics:
            results["f1_scores"] = f1_all_metrics

        return results