Spaces:
Sleeping
Sleeping
| """Advanced RAG evaluation using GPT-4 labeling prompts (RAGBench paper approach). | |
| This module implements the evaluation methodology from the RAGBench paper, | |
| using LLM-based labeling to assess: | |
| - Context Relevance | |
| - Context Utilization | |
| - Completeness | |
| - Adherence | |
| Reference: RAGBench Paper - arXiv:2407.11005 | |
| """ | |
| from typing import List, Dict, Optional, Tuple | |
| import json | |
| import re | |
| from dataclasses import dataclass | |
| import numpy as np | |
| from sklearn.metrics import mean_squared_error, roc_auc_score, auc, f1_score, precision_score, recall_score | |
| from sklearn.preprocessing import label_binarize | |
| import warnings | |
@dataclass
class SentenceSupportInfo:
    """Information about support for a single response sentence.

    Mirrors one entry of the GPT labeling output's
    ``sentence_support_information`` list (see GPTLabelingOutput).
    """

    # Key of the response sentence being assessed (e.g. "a", "b").
    response_sentence_key: str
    # LLM's explanation of why the sentence is or is not supported.
    explanation: str
    # Document sentence keys (e.g. "0a") backing this sentence, or special
    # markers such as "general" / "well_known_fact" / "numerical_reasoning".
    supporting_sentence_keys: List[str]
    # True when every claim in the sentence is grounded in the documents.
    fully_supported: bool
@dataclass
class GPTLabelingOutput:
    """Parsed output from the GPT labeling prompt (one evaluation call)."""

    # Step-by-step reasoning about document relevance to the question.
    relevance_explanation: str
    # Document sentence keys relevant to the question (used or not).
    all_relevant_sentence_keys: List[str]
    # Claim-by-claim reasoning about whether the response is supported.
    overall_supported_explanation: str
    # True when the whole response is grounded in the documents.
    overall_supported: bool
    # Per-sentence support entries (dicts parsed from the LLM's JSON).
    sentence_support_information: List[Dict]
    # Document sentence keys actually used to construct the answer.
    all_utilized_sentence_keys: List[str]
@dataclass
class AdvancedTRACEScores:
    """Advanced TRACE scores with detailed metrics.

    Holds the four RAGBench TRACE metrics (each in [0, 1]) plus
    sentence-level support counts derived from the GPT labeling output.
    """

    context_relevance: float    # Fraction of retrieved context relevant to query
    context_utilization: float  # Fraction of retrieved context used in response
    completeness: float         # Fraction of relevant info covered by response
    adherence: float            # Whether response is grounded in context (no hallucinations)
    # Additional metrics from GPT labeling
    overall_supported: bool                  # Whether response is fully supported by documents
    num_fully_supported_sentences: int       # Number of fully supported sentences
    num_partially_supported_sentences: int   # Number of partially supported sentences
    num_unsupported_sentences: int           # Number of unsupported sentences

    def to_dict(self) -> Dict:
        """Convert to a plain dictionary, including derived aggregates."""
        return {
            "context_relevance": float(self.context_relevance),
            "context_utilization": float(self.context_utilization),
            "completeness": float(self.completeness),
            "adherence": float(self.adherence),
            "average": self.average(),
            "rmse_aggregation": self.rmse_aggregation(),
            "rmse_per_metric": self.get_rmse_per_metric(),
            "overall_supported": self.overall_supported,
            "fully_supported_sentences": self.num_fully_supported_sentences,
            "partially_supported_sentences": self.num_partially_supported_sentences,
            "unsupported_sentences": self.num_unsupported_sentences,
        }

    def average(self) -> float:
        """Calculate the arithmetic mean of the four TRACE metrics."""
        return (self.context_relevance + self.context_utilization +
                self.completeness + self.adherence) / 4

    def rmse_aggregation(self) -> float:
        """Calculate RMSE aggregation across all four TRACE metrics.

        RMSE aggregation penalizes inconsistency across metrics.
        If all metrics are equal, RMSE is 0 (perfect consistency).
        If metrics vary, RMSE increases with variance.

        Formula: RMSE = sqrt(((R-mu)^2 + (A-mu)^2 + (C-mu)^2 + (U-mu)^2) / 4)
        where mu = average of all metrics.
        (Equivalently, this is the population standard deviation of the
        four metric values.)

        Returns:
            RMSE value (0-1), where 0 = perfect consistency.
        """
        metrics = [
            self.context_relevance,
            self.context_utilization,
            self.completeness,
            self.adherence
        ]
        mean = self.average()
        # Mean squared deviation from the cross-metric mean.
        squared_errors = [(m - mean) ** 2 for m in metrics]
        mse = np.mean(squared_errors)
        rmse = np.sqrt(mse)
        return float(rmse)

    def get_rmse_per_metric(self) -> Dict[str, float]:
        """Calculate the RMSE contribution for each metric.

        Shows how much each metric deviates from the mean, indicating
        which metrics are inconsistent relative to overall performance.

        Formula for each metric: sqrt((metric - mean)^2) == |metric - mean|

        Returns:
            Dict with "<metric>_rmse" float entries for the four metrics,
            plus "max_deviation_metric" (str): name of the most
            inconsistent metric. NOTE: the str value means the dict is not
            strictly Dict[str, float]; kept for caller compatibility.
        """
        mean = self.average()
        metrics_dict = {
            "context_relevance": self.context_relevance,
            "context_utilization": self.context_utilization,
            "completeness": self.completeness,
            "adherence": self.adherence
        }
        # Per-metric absolute deviation from the cross-metric mean.
        rmse_per_metric = {}
        for metric_name, metric_value in metrics_dict.items():
            deviation = metric_value - mean
            rmse_value = np.sqrt(deviation ** 2)
            rmse_per_metric[f"{metric_name}_rmse"] = float(rmse_value)
        # Identify the metric with the highest deviation (most inconsistent).
        max_metric = max(rmse_per_metric.items(), key=lambda x: x[1])
        rmse_per_metric["max_deviation_metric"] = max_metric[0].replace("_rmse", "")
        return rmse_per_metric
class RMSECalculator:
    """Calculate RMSE (Root Mean Squared Error) for evaluation metrics.

    All methods are stateless and exposed as static methods; callers invoke
    them on the class, e.g. ``RMSECalculator.compute_rmse_for_metric(...)``.
    """

    @staticmethod
    def compute_rmse_for_metric(predicted: List[float], ground_truth: List[float]) -> float:
        """Compute RMSE for a single metric.

        Args:
            predicted: List of predicted metric values
            ground_truth: List of ground truth metric values

        Returns:
            RMSE value (0.0 for empty input, or on computation error)

        Raises:
            ValueError: If the two lists differ in length.
        """
        if len(predicted) != len(ground_truth):
            raise ValueError("Predicted and ground truth must have same length")
        if len(predicted) == 0:
            return 0.0
        try:
            # RMSE = sqrt(mean((truth - pred)^2)). Computed directly with
            # numpy (equivalent to sklearn.metrics.mean_squared_error for
            # numeric 1-D inputs, without the extra dependency).
            diff = np.asarray(ground_truth, dtype=float) - np.asarray(predicted, dtype=float)
            rmse = np.sqrt(np.mean(diff ** 2))
            return float(rmse)
        except Exception as e:
            # Best-effort: warn and degrade to 0.0 rather than abort a batch.
            warnings.warn(f"Error computing RMSE: {e}")
            return 0.0

    @staticmethod
    def compute_rmse_single_trace_evaluation(
        predicted_scores: "AdvancedTRACEScores",
        ground_truth_scores: "AdvancedTRACEScores"
    ) -> Dict[str, float]:
        """Compute RMSE metrics for a single TRACE evaluation.

        Args:
            predicted_scores: AdvancedTRACEScores from evaluation
            ground_truth_scores: AdvancedTRACEScores from ground truth

        Returns:
            Dictionary with:
            - per_metric: per-metric RMSE (absolute error for one sample)
            - aggregated_rmse: root mean square of the per-metric RMSEs
        """
        metrics = {
            "context_relevance": (predicted_scores.context_relevance, ground_truth_scores.context_relevance),
            "context_utilization": (predicted_scores.context_utilization, ground_truth_scores.context_utilization),
            "completeness": (predicted_scores.completeness, ground_truth_scores.completeness),
            "adherence": (predicted_scores.adherence, ground_truth_scores.adherence)
        }
        rmse_per_metric = {}
        for metric_name, (pred, truth) in metrics.items():
            # For a single observation, RMSE reduces to the absolute error:
            # sqrt((pred - truth)^2) == |pred - truth|.
            rmse_per_metric[metric_name] = float(abs(pred - truth))
        # Aggregated RMSE: root mean square of all metric RMSEs.
        # BUGFIX: the per-metric values must be squared before averaging;
        # the previous sqrt(mean(rmse)) was not a root mean square and
        # disagreed with compute_trace_rmse_aggregation below.
        if rmse_per_metric:
            aggregated_rmse = np.sqrt(np.mean([r ** 2 for r in rmse_per_metric.values()]))
        else:
            aggregated_rmse = 0.0
        return {
            "per_metric": rmse_per_metric,
            "aggregated_rmse": float(aggregated_rmse)
        }

    @staticmethod
    def compute_rmse_aggregation_for_batch(results: List[Dict]) -> Dict:
        """Compute RMSE aggregation scores for batch evaluation (consistency within metrics).

        Measures consistency of each TRACE score across evaluations; no
        ground truth is required.

        Args:
            results: List of evaluation results with a "metrics" dict each

        Returns:
            Dictionary with spread statistics (mean/std/min/max/variance/count)
            for each metric that appears in at least one result.
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        aggregation_stats = {}
        for metric in metrics:
            values = []
            for result in results:
                if "metrics" in result and metric in result["metrics"]:
                    values.append(result["metrics"][metric])
            if len(values) > 0:
                mean_val = np.mean(values)
                std_val = np.std(values)
                # RMSE aggregation = std dev (how much the metric varies
                # across evaluations).
                aggregation_stats[metric] = {
                    "mean": float(mean_val),
                    "std_dev": float(std_val),
                    "min": float(np.min(values)),
                    "max": float(np.max(values)),
                    "variance": float(std_val ** 2),
                    "count": len(values)
                }
        return aggregation_stats

    @staticmethod
    def compute_rmse_all_metrics(results: List[Dict]) -> Dict[str, float]:
        """Compute RMSE for all metrics across multiple test cases.

        Args:
            results: List of evaluation results with predicted ("metrics")
                and ground truth ("ground_truth_scores") values

        Returns:
            Dictionary mapping metric names to RMSE values, plus "average".
            Results lacking ground truth for a metric are skipped for it.
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        rmse_results = {}
        for metric in metrics:
            predicted = []
            ground_truth = []
            for result in results:
                if "metrics" in result and metric in result["metrics"]:
                    predicted.append(result["metrics"][metric])
                    if ("ground_truth_scores" in result and
                            metric in result["ground_truth_scores"]):
                        ground_truth.append(result["ground_truth_scores"][metric])
                    else:
                        # No ground truth: drop the prediction we just added
                        # so both lists stay aligned.
                        predicted.pop()
            if len(predicted) > 0 and len(ground_truth) > 0:
                rmse_results[metric] = RMSECalculator.compute_rmse_for_metric(
                    predicted, ground_truth
                )
        # Average RMSE across all metrics that could be computed.
        if rmse_results:
            rmse_results["average"] = float(np.mean(list(rmse_results.values())))
        return rmse_results

    @staticmethod
    def compute_trace_rmse_aggregation(results: List[Dict]) -> Dict[str, float]:
        """Compute RMSE aggregation across TRACE metrics for multiple evaluations.

        This method computes consistency metrics across evaluations:
        - Calculates individual metric RMSEs
        - Computes aggregate RMSE showing consistency
        - Returns overall evaluation quality metrics

        Args:
            results: List of evaluation results with metrics and ground truth

        Returns:
            Dictionary with:
            - per_metric_rmse: RMSE for each of the 4 TRACE metrics
            - aggregated_rmse: overall consistency (0 = perfect)
            - consistency_score: 1 - min(aggregated_rmse, 1) (1 = perfect)
            - num_evaluations / evaluated_metrics: bookkeeping
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        per_metric_rmse = {}
        # Compute per-metric RMSE over aligned predicted/ground-truth pairs.
        for metric in metrics:
            predicted = []
            ground_truth = []
            for result in results:
                if "metrics" in result and metric in result["metrics"]:
                    predicted.append(result["metrics"][metric])
                    if "ground_truth_scores" in result and metric in result["ground_truth_scores"]:
                        ground_truth.append(result["ground_truth_scores"][metric])
                    else:
                        if predicted:
                            predicted.pop()
            if len(predicted) > 0 and len(ground_truth) > 0:
                per_metric_rmse[metric] = RMSECalculator.compute_rmse_for_metric(
                    predicted, ground_truth
                )
        # Aggregate RMSE across metrics (root mean square of the RMSEs).
        if per_metric_rmse:
            metric_rmses = list(per_metric_rmse.values())
            aggregated_rmse = np.sqrt(np.mean([r ** 2 for r in metric_rmses]))
            consistency_score = 1.0 - min(aggregated_rmse, 1.0)  # Invert and cap at 0
        else:
            aggregated_rmse = 0.0
            consistency_score = 0.0
        return {
            "per_metric_rmse": per_metric_rmse,
            "aggregated_rmse": float(aggregated_rmse),
            "consistency_score": float(consistency_score),  # 0-1, 1 = perfect consistency
            "num_evaluations": len(results),
            "evaluated_metrics": metrics
        }

    @staticmethod
    def get_per_metric_rmse_breakdown(results: List[Dict]) -> Dict:
        """Compute detailed RMSE breakdown for each TRACE metric.

        Analyzes RMSE for each metric individually to identify which metrics
        have the highest prediction errors. Useful for understanding which
        parts of the evaluation system need improvement.

        Args:
            results: List of evaluation results with metrics and ground truth

        Returns:
            Dictionary with:
            - per_metric: RMSE for each metric
            - worst_performing_metric / worst_rmse: highest-RMSE metric
            - best_performing_metric / best_rmse: lowest-RMSE metric
            - metric_details: detailed error stats per metric (including
              "rmse_contribution_percent" when total RMSE is nonzero)
            - num_evaluations: number of input results
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        per_metric_rmse = {}
        metric_details = {}
        for metric in metrics:
            predicted = []
            ground_truth = []
            for result in results:
                if "metrics" in result and metric in result["metrics"]:
                    predicted.append(result["metrics"][metric])
                    if "ground_truth_scores" in result and metric in result["ground_truth_scores"]:
                        ground_truth.append(result["ground_truth_scores"][metric])
                    else:
                        if predicted:
                            predicted.pop()
            if len(predicted) > 0 and len(ground_truth) > 0:
                rmse = RMSECalculator.compute_rmse_for_metric(predicted, ground_truth)
                per_metric_rmse[metric] = float(rmse)
                # Detailed error statistics for this metric.
                errors = [abs(p - t) for p, t in zip(predicted, ground_truth)]
                metric_details[metric] = {
                    "rmse": float(rmse),
                    "mean_absolute_error": float(np.mean(errors)),
                    "max_error": float(np.max(errors)),
                    "min_error": float(np.min(errors)),
                    "std_dev": float(np.std(errors)),
                    "num_samples": len(predicted)
                }
        # Percentage contribution of each metric to total squared RMSE.
        total_rmse_squared = sum(r ** 2 for r in per_metric_rmse.values()) if per_metric_rmse else 0
        for metric in per_metric_rmse:
            if total_rmse_squared > 0:
                percentage = (per_metric_rmse[metric] ** 2 / total_rmse_squared) * 100
                metric_details[metric]["rmse_contribution_percent"] = float(percentage)
        # Identify best and worst metrics by RMSE.
        worst_metric = max(per_metric_rmse.items(), key=lambda x: x[1]) if per_metric_rmse else (None, 0)
        best_metric = min(per_metric_rmse.items(), key=lambda x: x[1]) if per_metric_rmse else (None, 0)
        return {
            "per_metric": per_metric_rmse,
            "worst_performing_metric": worst_metric[0],
            "worst_rmse": float(worst_metric[1]),
            "best_performing_metric": best_metric[0],
            "best_rmse": float(best_metric[1]),
            "metric_details": metric_details,
            "num_evaluations": len(results)
        }
class AUCROCCalculator:
    """Calculate AUCROC (Area Under ROC Curve) for binary classification metrics.

    All methods are stateless static methods, invoked on the class.
    """

    @staticmethod
    def binary_labels_from_threshold(scores: List[float], threshold: float = 0.5) -> List[int]:
        """Convert continuous scores to binary labels using a threshold.

        Args:
            scores: List of continuous scores
            threshold: Threshold for binary classification (score >= threshold -> 1)

        Returns:
            Binary labels (0 or 1)
        """
        return [1 if score >= threshold else 0 for score in scores]

    @staticmethod
    def compute_auc_for_metric(predicted: List[float], ground_truth: List[float]) -> float:
        """Compute AUCROC for a single metric.

        Args:
            predicted: List of predicted metric values (0-1)
            ground_truth: List of ground truth metric values (0-1)

        Returns:
            AUCROC value (0-1), or 0.0 if computation fails or is undefined

        Raises:
            ValueError: If the two lists differ in length.
        """
        if len(predicted) != len(ground_truth):
            raise ValueError("Predicted and ground truth must have same length")
        if len(predicted) <= 1:
            # AUCROC is meaningless for a single sample.
            return 0.0
        try:
            # Binarize ground truth at 0.5 so ROC analysis applies.
            ground_truth_binary = AUCROCCalculator.binary_labels_from_threshold(
                ground_truth, threshold=0.5
            )
            # AUCROC requires both classes in the ground truth.
            if len(set(ground_truth_binary)) < 2:
                return 0.0
            auc_score = roc_auc_score(ground_truth_binary, predicted)
            return float(auc_score)
        except Exception as e:
            warnings.warn(f"Error computing AUCROC: {e}")
            return 0.0

    @staticmethod
    def compute_per_metric_statistics(results: List[Dict]) -> Dict:
        """Compute per-metric statistics for batch evaluation.

        Provides detailed statistics on each TRACE metric without requiring
        ground truth.

        Args:
            results: List of evaluation results with a "metrics" dict each

        Returns:
            Dictionary with detailed statistics for each metric present
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        stats = {}
        for metric in metrics:
            values = []
            for result in results:
                if "metrics" in result and metric in result["metrics"]:
                    values.append(result["metrics"][metric])
            if len(values) > 0:
                # Count near-perfect (>= 0.95) and poor (< 0.3) scores.
                perfect_count = sum(1 for v in values if v >= 0.95)
                poor_count = sum(1 for v in values if v < 0.3)
                stats[metric] = {
                    "mean": float(np.mean(values)),
                    "median": float(np.median(values)),
                    "std_dev": float(np.std(values)),
                    "min": float(np.min(values)),
                    "max": float(np.max(values)),
                    "percentile_25": float(np.percentile(values, 25)),
                    "percentile_75": float(np.percentile(values, 75)),
                    "perfect_count": int(perfect_count),
                    "poor_count": int(poor_count),
                    "sample_count": len(values)
                }
        return stats

    @staticmethod
    def compute_auc_all_metrics(results: List[Dict]) -> Dict[str, float]:
        """Compute AUCROC for all metrics across multiple test cases.

        Args:
            results: List of evaluation results with predicted ("metrics")
                and ground truth ("ground_truth_scores") values

        Returns:
            Dictionary mapping metric names to AUCROC values, plus "average"
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        auc_results = {}
        for metric in metrics:
            predicted = []
            ground_truth = []
            for result in results:
                if "metrics" in result and metric in result["metrics"]:
                    predicted.append(result["metrics"][metric])
                    if ("ground_truth_scores" in result and
                            metric in result["ground_truth_scores"]):
                        ground_truth.append(result["ground_truth_scores"][metric])
                    else:
                        # No ground truth: drop the prediction to keep lists aligned.
                        predicted.pop()
            if len(predicted) > 1 and len(ground_truth) > 1:
                try:
                    auc_results[metric] = AUCROCCalculator.compute_auc_for_metric(
                        predicted, ground_truth
                    )
                except Exception:
                    auc_results[metric] = 0.0
        # Average AUCROC across all metrics that could be computed.
        if auc_results:
            auc_results["average"] = float(np.mean(list(auc_results.values())))
        return auc_results
class F1ScoreCalculator:
    """Calculate F1 Score for evaluation metrics (especially for adherence).

    All methods are stateless static methods, invoked on the class.
    """

    @staticmethod
    def compute_f1_for_metric(predicted: List[float], ground_truth: List[float],
                              threshold: float = 0.5) -> Dict[str, float]:
        """Compute F1 Score for a single metric using binary classification.

        Converts continuous scores to binary labels using the threshold, then
        calculates:
        - Precision: TP / (TP + FP)
        - Recall: TP / (TP + FN)
        - F1 Score: 2 * (Precision * Recall) / (Precision + Recall)

        Args:
            predicted: List of predicted metric values (0-1)
            ground_truth: List of ground truth metric values (0-1)
            threshold: Threshold for binary classification (default 0.5)

        Returns:
            Dictionary with "f1_score", "precision" and "recall"
            (all 0.0 when fewer than two samples or on error)

        Raises:
            ValueError: If the two lists differ in length.
        """
        if len(predicted) != len(ground_truth):
            raise ValueError("Predicted and ground truth must have same length")
        if len(predicted) <= 1:
            return {"f1_score": 0.0, "precision": 0.0, "recall": 0.0}
        try:
            # Binarize both sides at the threshold.
            pred_binary = [1 if score >= threshold else 0 for score in predicted]
            truth_binary = [1 if score >= threshold else 0 for score in ground_truth]
            # zero_division=0 keeps degenerate cases (no positives) at 0.0
            # instead of raising.
            f1 = f1_score(truth_binary, pred_binary, zero_division=0)
            precision = precision_score(truth_binary, pred_binary, zero_division=0)
            recall = recall_score(truth_binary, pred_binary, zero_division=0)
            return {
                "f1_score": float(f1),
                "precision": float(precision),
                "recall": float(recall)
            }
        except Exception as e:
            warnings.warn(f"Error computing F1 Score: {e}")
            return {"f1_score": 0.0, "precision": 0.0, "recall": 0.0}

    @staticmethod
    def compute_adherence_f1(results: List[Dict]) -> Dict[str, float]:
        """Compute F1 Score specifically for adherence metric aggregation.

        Adherence is a binary metric (0 or 1), so F1 Score is particularly
        relevant. Measures how well the predicted adherence scores match
        ground truth.

        Args:
            results: List of evaluation results with predicted and ground
                truth scores

        Returns:
            Dictionary with adherence_f1 / adherence_precision /
            adherence_recall / num_evaluations (all zero when no aligned
            predicted+ground-truth pairs exist)
        """
        predicted = []
        ground_truth = []
        for result in results:
            if "metrics" in result and "adherence" in result["metrics"]:
                predicted.append(result["metrics"]["adherence"])
                if "ground_truth_scores" in result and "adherence" in result["ground_truth_scores"]:
                    ground_truth.append(result["ground_truth_scores"]["adherence"])
                else:
                    # No ground truth: drop the prediction to keep lists aligned.
                    if predicted:
                        predicted.pop()
        if len(predicted) == 0 or len(ground_truth) == 0:
            return {
                "adherence_f1": 0.0,
                "adherence_precision": 0.0,
                "adherence_recall": 0.0,
                "num_evaluations": 0
            }
        f1_metrics = F1ScoreCalculator.compute_f1_for_metric(predicted, ground_truth)
        return {
            "adherence_f1": f1_metrics["f1_score"],
            "adherence_precision": f1_metrics["precision"],
            "adherence_recall": f1_metrics["recall"],
            "num_evaluations": len(predicted)
        }

    @staticmethod
    def compute_f1_all_metrics(results: List[Dict]) -> Dict[str, float]:
        """Compute F1 Score for all TRACE metrics.

        Args:
            results: List of evaluation results with predicted and ground
                truth scores

        Returns:
            Dictionary mapping "<metric>_f1" / "<metric>_precision" /
            "<metric>_recall" to scores, plus "average_f1"
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        f1_results = {}
        for metric in metrics:
            predicted = []
            ground_truth = []
            for result in results:
                if "metrics" in result and metric in result["metrics"]:
                    predicted.append(result["metrics"][metric])
                    if "ground_truth_scores" in result and metric in result["ground_truth_scores"]:
                        ground_truth.append(result["ground_truth_scores"][metric])
                    else:
                        if predicted:
                            predicted.pop()
            if len(predicted) > 0 and len(ground_truth) > 0:
                f1_metrics = F1ScoreCalculator.compute_f1_for_metric(predicted, ground_truth)
                f1_results[f"{metric}_f1"] = f1_metrics["f1_score"]
                f1_results[f"{metric}_precision"] = f1_metrics["precision"]
                f1_results[f"{metric}_recall"] = f1_metrics["recall"]
        # Average F1 across all metrics that could be computed.
        f1_scores = [v for k, v in f1_results.items() if k.endswith("_f1")]
        if f1_scores:
            f1_results["average_f1"] = float(np.mean(f1_scores))
        return f1_results
class DocumentSentencizer:
    """Split documents into sentences with keys (0a, 0b, 1a, etc.)."""

    # Sentence boundary: split after . ! or ? followed by whitespace and a
    # capital letter, or at end of text. Compiled once and shared.
    _SENTENCE_RE = re.compile(r'(?<=[.!?])\s+(?=[A-Z])|(?<=[.!?])\s*$')

    @staticmethod
    def _index_to_letters(idx: int) -> str:
        """Convert a 0-based index to a letter key: 0->'a', 25->'z', 26->'aa'.

        BUGFIX: the previous chr(97 + idx % 26) wrapped around at 26
        sentences, producing duplicate keys within a document. Spreadsheet
        style letters keep keys unique; output is identical for idx < 26.
        """
        letters = ""
        idx += 1  # shift to 1-based for the base-26 bijective encoding
        while idx > 0:
            idx, rem = divmod(idx - 1, 26)
            letters = chr(97 + rem) + letters
        return letters

    @staticmethod
    def sentencize_documents(documents: List[str]) -> Tuple[List[Dict], str]:
        """Split documents into sentences with keys.

        Args:
            documents: List of document texts

        Returns:
            Tuple of (sentence_list, formatted_string)
            where sentence_list = [{"key": "0a", "text": "..."}, ...]
            and formatted_string is one "key. sentence" per line.
        """
        sentence_list = []
        formatted_parts = []
        for doc_idx, document in enumerate(documents):
            sentences = DocumentSentencizer._SENTENCE_RE.split(document.strip())
            sentences = [s.strip() for s in sentences if s.strip()]
            for sent_idx, sentence in enumerate(sentences):
                # Key like 0a, 0b, 1a ... (0aa after 26 sentences).
                key = f"{doc_idx}{DocumentSentencizer._index_to_letters(sent_idx)}"
                sentence_list.append({"key": key, "text": sentence})
                formatted_parts.append(f"{key}. {sentence}")
        formatted_string = "\n".join(formatted_parts)
        return sentence_list, formatted_string

    @staticmethod
    def sentencize_response(response: str) -> Tuple[List[Dict], str]:
        """Split a response into sentences with keys (a, b, c, etc.).

        Args:
            response: Response text

        Returns:
            Tuple of (sentence_list, formatted_string), same shape as
            sentencize_documents but with letter-only keys.
        """
        sentence_list = []
        formatted_parts = []
        sentences = DocumentSentencizer._SENTENCE_RE.split(response.strip())
        sentences = [s.strip() for s in sentences if s.strip()]
        for sent_idx, sentence in enumerate(sentences):
            key = DocumentSentencizer._index_to_letters(sent_idx)  # a, b, c, ...
            sentence_list.append({"key": key, "text": sentence})
            formatted_parts.append(f"{key}. {sentence}")
        formatted_string = "\n".join(formatted_parts)
        return sentence_list, formatted_string
class GPTLabelingPromptGenerator:
    """Generate structured GPT labeling prompts for factual evaluation and citation audit."""

    # Improved template with clear hierarchy and explicit constraints.
    # NOTE: {{ and }} are literal braces (str.format escaping); {documents},
    # {question} and {answer} are filled in by generate_labeling_prompt.
    LABELING_PROMPT_TEMPLATE = """### ROLE
You are a Fact-Checking and Citation Specialist. Your task is to perform a rigorous audit of a response against provided documents to determine its accuracy, relevance, and level of support.
### TASK OVERVIEW
1. **Analyze Documents**: Review the provided documents and identify information relevant to the user's question.
2. **Evaluate Response**: Review the provided answer sentence-by-sentence.
3. **Verify Support**: Map each answer sentence to specific supporting sentences in the documents.
4. **Identify Utilization**: Determine which document sentences were actually used (directly or implicitly) to form the answer.
### INPUT DATA
**Documents (Split into Sentences with Keys):**
'''
{documents}
'''
**The Original Question:**
'''
{question}
'''
**The Answer to Evaluate (Split into Sentences with Keys):**
'''
{answer}
'''
### OUTPUT REQUIREMENTS
You must respond with a valid JSON object.
**Constraints:**
- Do NOT include any preamble or postamble (e.g., "Here is the analysis...").
- Do NOT wrap the JSON in markdown code blocks (e.g., no ```json).
- Use proper escaping for quotes and newlines within JSON strings.
### JSON SCHEMA
{{
"relevance_explanation": "A step-by-step breakdown of document information and its utility for the question.",
"all_relevant_sentence_keys": ["List of doc keys pertinent to the question, regardless of use in answer"],
"overall_supported_explanation": "Claim-by-claim assessment of the response's accuracy before a final conclusion.",
"overall_supported": boolean,
"sentence_support_information": [
{{
"response_sentence_key": "string",
"explanation": "Why the sentence is or is not supported.",
"supporting_sentence_keys": ["doc_keys", "OR: 'supported_without_sentence', 'general', 'well_known_fact', 'numerical_reasoning'"],
"fully_supported": boolean
}}
],
"all_utilized_sentence_keys": ["List of doc keys actually used to construct the answer"]
}}"""

    @staticmethod
    def generate_labeling_prompt(
        question: str,
        response: str,
        documents: List[str]
    ) -> Tuple[str, List[Dict], List[Dict]]:
        """Generate the high-fidelity GPT labeling prompt.

        Args:
            question: The original user question.
            response: The LLM response to evaluate.
            documents: List of raw retrieved documents.

        Returns:
            A tuple of (formatted_prompt, list_of_doc_sentences,
            list_of_resp_sentences).
        """
        # Sentencize documents and response so the labeler can cite stable keys.
        doc_sentences, doc_formatted = DocumentSentencizer.sentencize_documents(documents)
        resp_sentences, resp_formatted = DocumentSentencizer.sentencize_response(response)
        # Inject data into the structured template.
        prompt = GPTLabelingPromptGenerator.LABELING_PROMPT_TEMPLATE.format(
            documents=doc_formatted,
            question=question,
            answer=resp_formatted
        )
        return prompt, doc_sentences, resp_sentences
| class AdvancedRAGEvaluator: | |
| """Advanced RAG evaluator using GPT labeling prompts.""" | |
| def __init__(self, llm_client=None, chunking_strategy: Optional[str] = None, | |
| embedding_model: Optional[str] = None, chunk_size: Optional[int] = None, | |
| chunk_overlap: Optional[int] = None): | |
| """Initialize evaluator. | |
| Args: | |
| llm_client: LLM client for generating labels | |
| chunking_strategy: Chunking strategy used | |
| embedding_model: Embedding model used | |
| chunk_size: Chunk size used | |
| chunk_overlap: Chunk overlap used | |
| """ | |
| self.llm_client = llm_client | |
| self.chunking_strategy = chunking_strategy | |
| self.embedding_model = embedding_model | |
| self.chunk_size = chunk_size | |
| self.chunk_overlap = chunk_overlap | |
    def evaluate(
        self,
        question: str,
        response: str,
        retrieved_documents: List[str],
        ground_truth: Optional[str] = None
    ) -> Tuple[AdvancedTRACEScores, Optional[Dict]]:
        """Evaluate response using GPT labeling approach.

        Tries LLM-based labeling first; if no labels can be obtained, falls
        back to rule-based scoring via self._fallback_evaluation.

        Args:
            question: User question
            response: LLM response
            retrieved_documents: Retrieved context documents
            ground_truth: Optional ground truth answer (used for completeness)

        Returns:
            Tuple of (AdvancedTRACEScores, llm_request_info dict); the dict
            is empty when labeling was skipped or failed before a request
            record was produced.
        """
        # Generate labeling prompt and get LLM labels with request info.
        # _get_gpt_labels returns None (no client / hard failure) or a dict
        # with "labels" and "llm_request_info".
        gpt_result = self._get_gpt_labels(question, response, retrieved_documents)
        llm_request_info = {}
        if not gpt_result:
            # Fallback to rule-based evaluation (no LLM labels at all).
            scores = self._fallback_evaluation(question, response, retrieved_documents)
            return scores, llm_request_info
        gpt_labels = gpt_result.get("labels")
        llm_request_info = gpt_result.get("llm_request_info", {})
        if not gpt_labels:
            # Labels missing/unparseable: fall back but keep the audit trail.
            scores = self._fallback_evaluation(question, response, retrieved_documents)
            return scores, llm_request_info
        # Compute the four TRACE metrics from the GPT labels.
        context_relevance = self._compute_context_relevance(gpt_labels)
        context_utilization = self._compute_context_utilization(gpt_labels)
        completeness = self._compute_completeness(gpt_labels, ground_truth)
        adherence = self._compute_adherence(gpt_labels)
        # Count supported sentences. Each entry of
        # sentence_support_information is a dict parsed from the LLM's JSON.
        fully_supported = sum(1 for s in gpt_labels.sentence_support_information
                              if s.get("fully_supported", False))
        # Partially supported: not fully supported but cites at least one key.
        partially_supported = sum(1 for s in gpt_labels.sentence_support_information
                                  if not s.get("fully_supported", False) and
                                  s.get("supporting_sentence_keys", []))
        # Unsupported: cites no keys at all. NOTE(review): a sentence marked
        # fully_supported but with empty supporting_sentence_keys would be
        # counted in both tallies — confirm intended.
        unsupported = sum(1 for s in gpt_labels.sentence_support_information
                          if not s.get("supporting_sentence_keys", []))
        scores = AdvancedTRACEScores(
            context_relevance=context_relevance,
            context_utilization=context_utilization,
            completeness=completeness,
            adherence=adherence,
            overall_supported=gpt_labels.overall_supported,
            num_fully_supported_sentences=fully_supported,
            num_partially_supported_sentences=partially_supported,
            num_unsupported_sentences=unsupported
        )
        return scores, llm_request_info
def _get_gpt_labels(self, question: str, response: str,
                    documents: List[str]) -> Optional[Dict]:
    """Request RAGBench-style labels from the LLM for one (question, response, docs) triple.

    Builds the labeling prompt, calls the configured LLM client (the client
    itself enforces the 30 RPM rate limit), and parses the JSON reply into a
    GPTLabelingOutput.

    Args:
        question: User question.
        response: LLM response being evaluated.
        documents: Retrieved context documents.

    Returns:
        Dict with 'labels' (GPTLabelingOutput, or None when the reply was
        empty/unparseable) and 'llm_request_info' (complete audit trail),
        or None when no client is configured or the call itself raised.
    """
    if not self.llm_client:
        print("[WARN] No LLM client available - using fallback evaluation")
        return None

    def _strip_fences(text: str) -> str:
        # Models often wrap the JSON payload in markdown code fences.
        if "```json" in text:
            return text.split("```json")[1].split("```")[0].strip()
        if "```" in text:
            return text.split("```")[1].split("```")[0].strip()
        return text

    try:
        prompt, _doc_keys, _resp_keys = (
            GPTLabelingPromptGenerator.generate_labeling_prompt(
                question, response, documents
            )
        )
        # Audit trail: capture everything that goes into the API call.
        llm_request_info = {
            "query": question,
            "context_documents": documents,
            "llm_response": response,
            "labeling_prompt": prompt,
            "model": getattr(self.llm_client, 'model_name', 'groq-default'),
            "temperature": 0.0,
            "max_tokens": 2048
        }
        print(f"\n[EVALUATION] Making GPT labeling API call...")
        print(f"[EVALUATION] This respects the 30 RPM rate limit")
        # temperature=0.0 keeps labeling deterministic across runs.
        llm_response = self.llm_client.generate(
            prompt=prompt,
            max_tokens=2048,
            temperature=0.0
        )
        llm_request_info["full_llm_response"] = llm_response
        print(f"\n[LLM RESPONSE] {llm_response}\n")
        if not llm_response or not llm_response.strip():
            print(f"[WARN] Empty LLM response received")
            return {"labels": None, "llm_request_info": llm_request_info}
        try:
            candidate = _strip_fences(llm_response.strip())
            parsed = json.loads(candidate)
            labels = GPTLabelingOutput(
                relevance_explanation=parsed.get("relevance_explanation", ""),
                all_relevant_sentence_keys=parsed.get("all_relevant_sentence_keys", []),
                overall_supported_explanation=parsed.get("overall_supported_explanation", ""),
                overall_supported=parsed.get("overall_supported", False),
                sentence_support_information=parsed.get("sentence_support_information", []),
                all_utilized_sentence_keys=parsed.get("all_utilized_sentence_keys", [])
            )
            return {"labels": labels, "llm_request_info": llm_request_info}
        except (json.JSONDecodeError, ValueError, IndexError) as e:
            print(f"[WARN] Failed to parse LLM response: {e}")
            print(f"[WARN] Raw response: {llm_response[:200]}")
            return {"labels": None, "llm_request_info": llm_request_info}
    except Exception as e:
        # Prompt generation or the API call itself failed; caller falls back
        # to the rule-based evaluation path.
        print(f"[WARN] Error getting GPT labels: {e}")
        return None
| def _compute_context_relevance(self, gpt_labels: GPTLabelingOutput) -> float: | |
| """Compute context relevance metric. | |
| Context Relevance = Number of relevant sentences / Total sentences | |
| """ | |
| if not gpt_labels.all_relevant_sentence_keys: | |
| return 0.0 | |
| return min(1.0, len(gpt_labels.all_relevant_sentence_keys) / 20.0) # Normalize | |
| def _compute_context_utilization(self, gpt_labels: GPTLabelingOutput) -> float: | |
| """Compute context utilization metric. | |
| Context Utilization = Number of utilized sentences / Number of relevant sentences | |
| """ | |
| relevant_count = len(gpt_labels.all_relevant_sentence_keys) | |
| utilized_count = len(gpt_labels.all_utilized_sentence_keys) | |
| if relevant_count == 0: | |
| return 0.0 | |
| return min(1.0, utilized_count / relevant_count) | |
| def _compute_completeness(self, gpt_labels: GPTLabelingOutput, | |
| ground_truth: Optional[str] = None) -> float: | |
| """Compute completeness metric. | |
| Completeness = Relevant sentences used / All relevant sentences | |
| """ | |
| relevant_set = set(gpt_labels.all_relevant_sentence_keys) | |
| utilized_set = set(gpt_labels.all_utilized_sentence_keys) | |
| intersection = len(relevant_set & utilized_set) | |
| if len(relevant_set) == 0: | |
| return 1.0 if len(utilized_set) == 0 else 0.0 | |
| return intersection / len(relevant_set) | |
| def _compute_adherence(self, gpt_labels: GPTLabelingOutput) -> float: | |
| """Compute adherence metric (Boolean: 1.0 = fully grounded, 0.0 = contains hallucination). | |
| Per RAGBench paper: Adherence is whether ALL response sentences are fully supported by context. | |
| If even ONE sentence is not fully supported, adherence = 0.0 | |
| """ | |
| total_sentences = len(gpt_labels.sentence_support_information) | |
| if total_sentences == 0: | |
| return 1.0 | |
| # Check if ALL sentences are fully supported | |
| fully_supported_count = sum( | |
| 1 for s in gpt_labels.sentence_support_information | |
| if s.get("fully_supported", False) | |
| ) | |
| # Boolean: 1.0 if all sentences are fully supported, 0.0 if any sentence is not fully supported | |
| return 1.0 if fully_supported_count == total_sentences else 0.0 | |
def _fallback_evaluation(self, question: str, response: str,
                         documents: List[str]) -> "AdvancedTRACEScores":
    """Rule-based fallback evaluation used when no LLM client is available.

    Scores every TRACE metric with one crude signal: the fraction of
    response words that also occur anywhere in the retrieved documents.
    Sentence-level support counts cannot be derived without the LLM
    labels, so they are reported as 0.
    """
    corpus_words = set()
    for doc in documents:
        corpus_words.update(doc.lower().split())
    resp_words = set(response.lower().split())
    # max(..., 1) guards the empty-response case against division by zero.
    overlap = len(resp_words & corpus_words) / max(len(resp_words), 1)
    return AdvancedTRACEScores(
        context_relevance=overlap,
        context_utilization=overlap,
        completeness=overlap,
        adherence=overlap,
        overall_supported=overlap > 0.5,
        num_fully_supported_sentences=0,
        num_partially_supported_sentences=0,
        num_unsupported_sentences=0
    )
def evaluate_batch(self, test_cases: List[Dict], checkpoint_file: Optional[str] = None,
                   resume: bool = True) -> Dict:
    """Evaluate multiple test cases with checkpoint support.

    Runs self.evaluate() on every test case, periodically persisting
    progress to ``checkpoint_file`` so an interrupted run can resume.

    Args:
        test_cases: List of test cases with question, response, etc.
        checkpoint_file: Optional file to save/resume progress
        resume: Whether to resume from checkpoint if exists

    Returns:
        Dictionary with aggregated scores and detailed results
    """
    all_scores = []
    detailed_results = []
    start_index = 0
    # Try to resume from checkpoint: restore detailed_results and skip the
    # samples already processed.
    # NOTE(review): on resume, all_scores starts empty, so the aggregated
    # means below cover only the NEW samples while detailed_results covers
    # all of them — confirm this asymmetry is intended.
    if checkpoint_file and resume:
        try:
            import os
            if os.path.exists(checkpoint_file):
                with open(checkpoint_file, 'r') as f:
                    checkpoint_data = json.load(f)
                detailed_results = checkpoint_data.get('detailed_results', [])
                start_index = len(detailed_results)
                print(f"[CHECKPOINT] Resuming from checkpoint at sample {start_index}/{len(test_cases)}")
        except Exception as e:
            # Best-effort resume: a corrupt/unreadable checkpoint just
            # restarts from scratch rather than aborting the batch.
            print(f"[CHECKPOINT] Could not load checkpoint: {e}")
    for i, test_case in enumerate(test_cases):
        # Skip already processed samples
        if i < start_index:
            continue
        print(f"Evaluating test case {i+1}/{len(test_cases)}")
        question = test_case.get("query", "")
        response = test_case.get("response", "")
        documents = test_case.get("retrieved_documents", [])
        # NOTE(review): the response is passed as its own ground truth;
        # verify this is the intended reference for completeness scoring.
        ground_truth = response  # Use response as ground truth reference answer
        ground_truth_scores = test_case.get("ground_truth_scores", {})  # Extract RAGBench ground truth scores
        # evaluate now returns (scores, llm_request_info)
        scores, llm_request_info = self.evaluate(question, response, documents, ground_truth)
        all_scores.append(scores)
        # Store detailed results with ground truth for RMSE/AUCROC computation
        result_dict = {
            "query_id": i + 1,
            "question": question,
            "prompt": llm_request_info.get("labeling_prompt", "") if llm_request_info else "",
            "llm_response": llm_request_info.get("full_llm_response", "") if llm_request_info else "",
            "metrics": scores.to_dict(),
            "ground_truth_scores": ground_truth_scores  # Include RAGBench ground truth for RMSE/AUCROC
        }
        detailed_results.append(result_dict)
        # Save checkpoint every 5 samples (for resume capability).
        # NOTE(review): up to 4 trailing results may never be flushed since
        # there is no final save after the loop — confirm acceptable.
        if checkpoint_file and (i + 1) % 5 == 0:
            try:
                checkpoint_data = {
                    'detailed_results': detailed_results,
                    'last_index': i + 1,
                    'total_samples': len(test_cases)
                }
                with open(checkpoint_file, 'w') as f:
                    # default=str stringifies any non-JSON-serializable value
                    json.dump(checkpoint_data, f, default=str)
                print(f"[CHECKPOINT] Saved progress at sample {i + 1}/{len(test_cases)}")
            except Exception as e:
                print(f"[CHECKPOINT] Failed to save: {e}")
    # Aggregate scores - convert dictionary results to proper format
    scores_dicts = [s.to_dict() for s in all_scores]
    # Extract metric values safely from dictionaries
    context_relevance_vals = [s.get("context_relevance", 0) for s in scores_dicts]
    context_utilization_vals = [s.get("context_utilization", 0) for s in scores_dicts]
    completeness_vals = [s.get("completeness", 0) for s in scores_dicts]
    adherence_vals = [s.get("adherence", 0) for s in scores_dicts]
    average_vals = [s.get("average", 0) for s in scores_dicts]
    results = {
        "context_relevance": float(np.mean(context_relevance_vals)) if context_relevance_vals else 0.0,
        "context_utilization": float(np.mean(context_utilization_vals)) if context_utilization_vals else 0.0,
        "completeness": float(np.mean(completeness_vals)) if completeness_vals else 0.0,
        "adherence": float(np.mean(adherence_vals)) if adherence_vals else 0.0,
        "average": float(np.mean(average_vals)) if average_vals else 0.0,
        "num_samples": len(test_cases),
        "detailed_results": detailed_results,
        "evaluation_config": {
            "chunking_strategy": self.chunking_strategy,
            "embedding_model": self.embedding_model,
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "evaluation_method": "gpt_labeling_prompts"
        }
    }
    # Compute RMSE aggregation and per-metric statistics
    rmse_metrics = RMSECalculator.compute_rmse_aggregation_for_batch(detailed_results)
    per_metric_stats = AUCROCCalculator.compute_per_metric_statistics(detailed_results)
    if rmse_metrics:
        results["rmse_metrics"] = rmse_metrics
    if per_metric_stats:
        results["per_metric_statistics"] = per_metric_stats
    # Compute RMSE against RAGBench ground truth (per RAGBench paper requirement)
    # This compares predicted scores vs original scores in the dataset
    rmse_vs_ground_truth = RMSECalculator.compute_trace_rmse_aggregation(detailed_results)
    if rmse_vs_ground_truth and rmse_vs_ground_truth.get("per_metric_rmse"):
        results["rmse_vs_ground_truth"] = rmse_vs_ground_truth
    # Compute AUCROC against RAGBench ground truth (per RAGBench paper requirement)
    aucroc_vs_ground_truth = AUCROCCalculator.compute_auc_all_metrics(detailed_results)
    if aucroc_vs_ground_truth:
        results["aucroc_vs_ground_truth"] = aucroc_vs_ground_truth
    # Compute F1 Score for adherence aggregation
    adherence_f1_scores = F1ScoreCalculator.compute_adherence_f1(detailed_results)
    if adherence_f1_scores:
        results["adherence_f1_scores"] = adherence_f1_scores
    # Compute F1 Scores for all metrics
    f1_all_metrics = F1ScoreCalculator.compute_f1_all_metrics(detailed_results)
    if f1_all_metrics:
        results["f1_scores"] = f1_all_metrics
    return results