# Source: CapStoneRAG10 / advanced_rag_evaluator.py
# Commit: 80326a1 — "Add F1Score calculation for adherence metric aggregation"
"""Advanced RAG evaluation using GPT-4 labeling prompts (RAGBench paper approach).
This module implements the evaluation methodology from the RAGBench paper,
using LLM-based labeling to assess:
- Context Relevance
- Context Utilization
- Completeness
- Adherence
Reference: RAGBench Paper - arXiv:2407.11005
"""
from typing import List, Dict, Optional, Tuple
import json
import re
from dataclasses import dataclass
import numpy as np
from sklearn.metrics import mean_squared_error, roc_auc_score, auc, f1_score, precision_score, recall_score
from sklearn.preprocessing import label_binarize
import warnings
@dataclass
class SentenceSupportInfo:
    """Information about support for a response sentence.

    Mirrors one entry of the ``sentence_support_information`` array in the
    GPT labeling JSON output.
    """
    response_sentence_key: str  # Key of the response sentence (e.g. "a", "b")
    explanation: str  # Why the sentence is or is not supported
    supporting_sentence_keys: List[str]  # Document sentence keys backing the claim
    fully_supported: bool  # True when the sentence is fully grounded in the docs
@dataclass
class GPTLabelingOutput:
    """Output from GPT labeling prompt.

    Field names mirror the JSON schema requested in the labeling prompt
    (see GPTLabelingPromptGenerator.LABELING_PROMPT_TEMPLATE elsewhere in
    this file).
    """
    relevance_explanation: str  # Step-by-step breakdown of document relevance
    all_relevant_sentence_keys: List[str]  # Doc sentence keys pertinent to the question
    overall_supported_explanation: str  # Claim-by-claim support assessment
    overall_supported: bool  # True when the whole response is supported by the docs
    sentence_support_information: List[Dict]  # Raw per-sentence dicts (shape of SentenceSupportInfo)
    all_utilized_sentence_keys: List[str]  # Doc sentence keys actually used in the answer
@dataclass
class AdvancedTRACEScores:
    """Advanced TRACE scores with detailed metrics.

    Holds the four TRACE metrics (all in [0, 1]) plus sentence-level
    support counts derived from the GPT labeling output.
    """
    context_relevance: float  # Fraction of retrieved context relevant to query
    context_utilization: float  # Fraction of retrieved context used in response
    completeness: float  # Fraction of relevant info covered by response
    adherence: float  # Whether response is grounded in context (no hallucinations)
    # Additional metrics from GPT labeling
    overall_supported: bool  # Whether response is fully supported by documents
    num_fully_supported_sentences: int  # Number of fully supported sentences
    num_partially_supported_sentences: int  # Number of partially supported sentences
    num_unsupported_sentences: int  # Number of unsupported sentences

    def to_dict(self) -> Dict:
        """Convert to a JSON-serializable dictionary (includes derived stats)."""
        return {
            "context_relevance": float(self.context_relevance),
            "context_utilization": float(self.context_utilization),
            "completeness": float(self.completeness),
            "adherence": float(self.adherence),
            "average": self.average(),
            "rmse_aggregation": self.rmse_aggregation(),
            "rmse_per_metric": self.get_rmse_per_metric(),
            "overall_supported": self.overall_supported,
            "fully_supported_sentences": self.num_fully_supported_sentences,
            "partially_supported_sentences": self.num_partially_supported_sentences,
            "unsupported_sentences": self.num_unsupported_sentences,
        }

    def average(self) -> float:
        """Return the unweighted mean of the four TRACE metrics."""
        return (self.context_relevance + self.context_utilization +
                self.completeness + self.adherence) / 4

    def rmse_aggregation(self) -> float:
        """Calculate RMSE aggregation across all four TRACE metrics.

        RMSE aggregation penalizes inconsistency across metrics.
        If all metrics are equal, RMSE is 0 (perfect consistency).
        If metrics vary, RMSE increases with variance.

        Formula: RMSE = sqrt(((R-mu)^2 + (A-mu)^2 + (C-mu)^2 + (U-mu)^2) / 4)
        where mu = average of all metrics. This is the population standard
        deviation of the four metric values.

        Returns:
            RMSE value (0-1), where 0 = perfect consistency
        """
        metrics = [
            self.context_relevance,
            self.context_utilization,
            self.completeness,
            self.adherence,
        ]
        mean = self.average()
        squared_errors = [(m - mean) ** 2 for m in metrics]
        return float(np.sqrt(np.mean(squared_errors)))

    def get_rmse_per_metric(self) -> Dict[str, object]:
        """Calculate the per-metric deviation from the mean metric value.

        Shows how much each metric deviates from the mean, indicating
        which metrics are inconsistent relative to overall performance.
        For a single value, sqrt((x - mean)^2) is simply |x - mean|.

        Returns:
            Dict with the absolute deviation for each metric
            ("<metric>_rmse" keys, float values) plus
            "max_deviation_metric": the name (str) of the most
            inconsistent metric. The mixed value types are why the return
            annotation is Dict[str, object].
        """
        mean = self.average()
        metrics_dict = {
            "context_relevance": self.context_relevance,
            "context_utilization": self.context_utilization,
            "completeness": self.completeness,
            "adherence": self.adherence,
        }
        # sqrt((m - mean)^2) == abs(m - mean); use abs() directly.
        rmse_per_metric: Dict[str, object] = {
            f"{name}_rmse": float(abs(value - mean))
            for name, value in metrics_dict.items()
        }
        # Identify the metric with the largest deviation (ties -> first).
        max_metric = max(rmse_per_metric.items(), key=lambda kv: kv[1])
        rmse_per_metric["max_deviation_metric"] = max_metric[0].replace("_rmse", "")
        return rmse_per_metric
class RMSECalculator:
"""Calculate RMSE (Root Mean Squared Error) for evaluation metrics."""
@staticmethod
def compute_rmse_for_metric(predicted: List[float], ground_truth: List[float]) -> float:
"""Compute RMSE for a single metric.
Args:
predicted: List of predicted metric values
ground_truth: List of ground truth metric values
Returns:
RMSE value
"""
if len(predicted) != len(ground_truth):
raise ValueError("Predicted and ground truth must have same length")
if len(predicted) == 0:
return 0.0
try:
mse = mean_squared_error(ground_truth, predicted)
rmse = np.sqrt(mse)
return float(rmse)
except Exception as e:
warnings.warn(f"Error computing RMSE: {e}")
return 0.0
@staticmethod
def compute_rmse_single_trace_evaluation(
predicted_scores: AdvancedTRACEScores,
ground_truth_scores: AdvancedTRACEScores
) -> Dict[str, float]:
"""Compute RMSE metrics for a single TRACE evaluation.
Args:
predicted_scores: AdvancedTRACEScores from evaluation
ground_truth_scores: AdvancedTRACEScores from ground truth
Returns:
Dictionary with individual metric RMSE and aggregated RMSE
"""
metrics = {
"context_relevance": (predicted_scores.context_relevance, ground_truth_scores.context_relevance),
"context_utilization": (predicted_scores.context_utilization, ground_truth_scores.context_utilization),
"completeness": (predicted_scores.completeness, ground_truth_scores.completeness),
"adherence": (predicted_scores.adherence, ground_truth_scores.adherence)
}
rmse_per_metric = {}
for metric_name, (pred, truth) in metrics.items():
# Calculate RMSE for this single metric comparison
rmse_per_metric[metric_name] = float((pred - truth) ** 2) ** 0.5
# Aggregated RMSE: root mean square of all metric RMSEs
aggregated_rmse = np.sqrt(np.mean(list(rmse_per_metric.values()))) if rmse_per_metric else 0.0
return {
"per_metric": rmse_per_metric,
"aggregated_rmse": float(aggregated_rmse)
}
@staticmethod
def compute_rmse_aggregation_for_batch(results: List[Dict]) -> Dict:
"""Compute RMSE aggregation scores for batch evaluation (consistency within metrics).
Measures consistency of each TRACE score across evaluations.
Args:
results: List of evaluation results with metrics
Returns:
Dictionary with RMSE aggregation stats for each metric
"""
metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
aggregation_stats = {}
for metric in metrics:
values = []
for result in results:
if "metrics" in result and metric in result["metrics"]:
values.append(result["metrics"][metric])
if len(values) > 0:
mean_val = np.mean(values)
std_val = np.std(values)
# RMSE aggregation = std dev (how much metric varies across evaluations)
aggregation_stats[metric] = {
"mean": float(mean_val),
"std_dev": float(std_val),
"min": float(np.min(values)),
"max": float(np.max(values)),
"variance": float(std_val ** 2),
"count": len(values)
}
return aggregation_stats
@staticmethod
def compute_rmse_all_metrics(results: List[Dict]) -> Dict[str, float]:
"""Compute RMSE for all metrics across multiple test cases.
Args:
results: List of evaluation results with predicted and ground truth scores
Returns:
Dictionary mapping metric names to RMSE values
"""
metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
rmse_results = {}
for metric in metrics:
predicted = []
ground_truth = []
for result in results:
if "metrics" in result and metric in result["metrics"]:
predicted.append(result["metrics"][metric])
# Check for ground truth in different possible locations
if (("ground_truth_scores" in result and
metric in result["ground_truth_scores"])):
ground_truth.append(result["ground_truth_scores"][metric])
else:
# Skip this result if no ground truth available
predicted.pop()
if len(predicted) > 0 and len(ground_truth) > 0:
rmse_results[metric] = RMSECalculator.compute_rmse_for_metric(
predicted, ground_truth
)
# Compute average RMSE across all metrics
if rmse_results:
rmse_results["average"] = np.mean(list(rmse_results.values()))
return rmse_results
@staticmethod
def compute_trace_rmse_aggregation(results: List[Dict]) -> Dict[str, float]:
"""Compute RMSE aggregation across TRACE metrics for multiple evaluations.
This method computes consistency metrics across evaluations:
- Calculates individual metric RMSEs
- Computes aggregate RMSE showing consistency
- Returns overall evaluation quality metrics
Args:
results: List of evaluation results with metrics and ground truth
Returns:
Dictionary with:
- per_metric_rmse: RMSE for each of the 4 TRACE metrics
- aggregated_rmse: Overall consistency metric (0 = perfect, higher = less consistent)
- consistency_score: Inverse of aggregated_rmse (1 = perfect, 0 = no consistency)
"""
metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
per_metric_rmse = {}
# Compute per-metric RMSE
for metric in metrics:
predicted = []
ground_truth = []
for result in results:
if "metrics" in result and metric in result["metrics"]:
predicted.append(result["metrics"][metric])
if "ground_truth_scores" in result and metric in result["ground_truth_scores"]:
ground_truth.append(result["ground_truth_scores"][metric])
else:
if predicted:
predicted.pop()
if len(predicted) > 0 and len(ground_truth) > 0:
per_metric_rmse[metric] = RMSECalculator.compute_rmse_for_metric(
predicted, ground_truth
)
# Aggregate RMSE across metrics
if per_metric_rmse:
metric_rmses = list(per_metric_rmse.values())
aggregated_rmse = np.sqrt(np.mean([r ** 2 for r in metric_rmses]))
consistency_score = 1.0 - min(aggregated_rmse, 1.0) # Invert and cap at 0
else:
aggregated_rmse = 0.0
consistency_score = 0.0
return {
"per_metric_rmse": per_metric_rmse,
"aggregated_rmse": float(aggregated_rmse),
"consistency_score": float(consistency_score), # 0-1, where 1 = perfect consistency
"num_evaluations": len(results),
"evaluated_metrics": metrics
}
@staticmethod
def get_per_metric_rmse_breakdown(results: List[Dict]) -> Dict:
"""Compute detailed RMSE breakdown for each TRACE metric.
Analyzes RMSE for each metric individually to identify which metrics
have the highest prediction errors. Useful for understanding which
parts of the evaluation system need improvement.
Args:
results: List of evaluation results with metrics and ground truth
Returns:
Dictionary with:
- per_metric: RMSE for each metric with % contribution
- worst_performing_metric: Metric with highest RMSE
- best_performing_metric: Metric with lowest RMSE
- metric_details: Detailed stats for each metric
"""
metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
per_metric_rmse = {}
metric_details = {}
for metric in metrics:
predicted = []
ground_truth = []
for result in results:
if "metrics" in result and metric in result["metrics"]:
predicted.append(result["metrics"][metric])
if "ground_truth_scores" in result and metric in result["ground_truth_scores"]:
ground_truth.append(result["ground_truth_scores"][metric])
else:
if predicted:
predicted.pop()
if len(predicted) > 0 and len(ground_truth) > 0:
rmse = RMSECalculator.compute_rmse_for_metric(predicted, ground_truth)
per_metric_rmse[metric] = float(rmse)
# Calculate detailed statistics
errors = [abs(p - t) for p, t in zip(predicted, ground_truth)]
metric_details[metric] = {
"rmse": float(rmse),
"mean_absolute_error": float(np.mean(errors)),
"max_error": float(np.max(errors)),
"min_error": float(np.min(errors)),
"std_dev": float(np.std(errors)),
"num_samples": len(predicted)
}
# Calculate percentage contribution
total_rmse_squared = sum(r ** 2 for r in per_metric_rmse.values()) if per_metric_rmse else 0
for metric in per_metric_rmse:
if total_rmse_squared > 0:
percentage = (per_metric_rmse[metric] ** 2 / total_rmse_squared) * 100
metric_details[metric]["rmse_contribution_percent"] = float(percentage)
# Find best and worst metrics
worst_metric = max(per_metric_rmse.items(), key=lambda x: x[1]) if per_metric_rmse else (None, 0)
best_metric = min(per_metric_rmse.items(), key=lambda x: x[1]) if per_metric_rmse else (None, 0)
return {
"per_metric": per_metric_rmse,
"worst_performing_metric": worst_metric[0],
"worst_rmse": float(worst_metric[1]),
"best_performing_metric": best_metric[0],
"best_rmse": float(best_metric[1]),
"metric_details": metric_details,
"num_evaluations": len(results)
}
class AUCROCCalculator:
    """Calculate AUCROC (Area Under ROC Curve) for binary classification metrics."""

    @staticmethod
    def binary_labels_from_threshold(scores: List[float], threshold: float = 0.5) -> List[int]:
        """Convert continuous scores to binary labels using threshold.

        Args:
            scores: List of continuous scores
            threshold: Inclusive lower bound for label 1 (default 0.5)

        Returns:
            Binary labels (0 or 1)
        """
        return [1 if score >= threshold else 0 for score in scores]

    @staticmethod
    def compute_auc_for_metric(predicted: List[float], ground_truth: List[float]) -> float:
        """Compute AUCROC for a single metric.

        Ground truth is binarized at 0.5; predictions are used as continuous
        scores. AUCROC is undefined (returns 0.0) with fewer than two samples
        or when only one class appears in the binarized ground truth.

        Args:
            predicted: List of predicted metric values (0-1)
            ground_truth: List of ground truth metric values (0-1)

        Returns:
            AUCROC value (0-1), or 0 if computation fails

        Raises:
            ValueError: If the input lists have different lengths.
        """
        if len(predicted) != len(ground_truth):
            raise ValueError("Predicted and ground truth must have same length")
        if len(predicted) <= 1:
            return 0.0
        try:
            # Convert to binary labels using 0.5 threshold
            ground_truth_binary = AUCROCCalculator.binary_labels_from_threshold(
                ground_truth, threshold=0.5
            )
            # AUCROC requires both a positive and a negative class
            if len(set(ground_truth_binary)) < 2:
                return 0.0
            auc_score = roc_auc_score(ground_truth_binary, predicted)
            return float(auc_score)
        except Exception as e:
            warnings.warn(f"Error computing AUCROC: {e}")
            return 0.0

    @staticmethod
    def compute_per_metric_statistics(results: List[Dict]) -> Dict:
        """Compute per-metric statistics for batch evaluation.

        Provides detailed statistics on each TRACE metric without requiring ground truth.

        Args:
            results: List of evaluation results with metrics

        Returns:
            Dictionary with detailed statistics for each metric
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        stats = {}
        for metric in metrics:
            values = [r["metrics"][metric] for r in results
                      if "metrics" in r and metric in r["metrics"]]
            if len(values) > 0:
                # Count near-perfect (>= 0.95) and poor (< 0.3) scores
                perfect_count = sum(1 for v in values if v >= 0.95)
                poor_count = sum(1 for v in values if v < 0.3)
                stats[metric] = {
                    "mean": float(np.mean(values)),
                    "median": float(np.median(values)),
                    "std_dev": float(np.std(values)),
                    "min": float(np.min(values)),
                    "max": float(np.max(values)),
                    "percentile_25": float(np.percentile(values, 25)),
                    "percentile_75": float(np.percentile(values, 75)),
                    "perfect_count": int(perfect_count),
                    "poor_count": int(poor_count),
                    "sample_count": len(values)
                }
        return stats

    @staticmethod
    def compute_auc_all_metrics(results: List[Dict]) -> Dict[str, float]:
        """Compute AUCROC for all metrics across multiple test cases.

        Results without a matching ground-truth value are skipped, keeping
        the predicted and ground-truth lists index-aligned (the previous
        append-then-pop logic could misalign them).

        Args:
            results: List of evaluation results with predicted and ground truth scores

        Returns:
            Dictionary mapping metric names to AUCROC values (plus "average")
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        auc_results = {}
        for metric in metrics:
            predicted = []
            ground_truth = []
            for result in results:
                pred_metrics = result.get("metrics") or {}
                truth_metrics = result.get("ground_truth_scores") or {}
                # Only results carrying BOTH values contribute
                if metric in pred_metrics and metric in truth_metrics:
                    predicted.append(pred_metrics[metric])
                    ground_truth.append(truth_metrics[metric])
            if len(predicted) > 1:
                try:
                    auc_results[metric] = AUCROCCalculator.compute_auc_for_metric(
                        predicted, ground_truth
                    )
                except Exception:
                    auc_results[metric] = 0.0
        # Compute average AUCROC across all metrics
        if auc_results:
            auc_results["average"] = float(np.mean(list(auc_results.values())))
        return auc_results
class F1ScoreCalculator:
    """Calculate F1Score for evaluation metrics (especially for adherence)."""

    @staticmethod
    def compute_f1_for_metric(predicted: List[float], ground_truth: List[float],
                              threshold: float = 0.5) -> Dict[str, float]:
        """Compute F1 Score for a single metric using binary classification.

        Converts continuous scores to binary labels using threshold, then calculates:
        - Precision: TP / (TP + FP)
        - Recall: TP / (TP + FN)
        - F1 Score: 2 * (Precision * Recall) / (Precision + Recall)

        Undefined ratios (zero denominator) are reported as 0.0, matching
        sklearn's ``zero_division=0`` behavior; the counts are computed
        directly since binary P/R/F1 needs only three integers.

        Args:
            predicted: List of predicted metric values (0-1)
            ground_truth: List of ground truth metric values (0-1)
            threshold: Threshold for binary classification (default 0.5)

        Returns:
            Dictionary with F1, Precision, Recall scores

        Raises:
            ValueError: If the input lists have different lengths.
        """
        if len(predicted) != len(ground_truth):
            raise ValueError("Predicted and ground truth must have same length")
        if len(predicted) <= 1:
            return {"f1_score": 0.0, "precision": 0.0, "recall": 0.0}
        # Binarize both score lists at the threshold
        pred_binary = [1 if score >= threshold else 0 for score in predicted]
        truth_binary = [1 if score >= threshold else 0 for score in ground_truth]
        # Confusion-matrix counts for the positive class
        tp = sum(1 for p, t in zip(pred_binary, truth_binary) if p == 1 and t == 1)
        fp = sum(1 for p, t in zip(pred_binary, truth_binary) if p == 1 and t == 0)
        fn = sum(1 for p, t in zip(pred_binary, truth_binary) if p == 0 and t == 1)
        precision = tp / (tp + fp) if (tp + fp) else 0.0
        recall = tp / (tp + fn) if (tp + fn) else 0.0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
        return {
            "f1_score": float(f1),
            "precision": float(precision),
            "recall": float(recall)
        }

    @staticmethod
    def _collect_pairs(results: List[Dict], metric: str) -> Tuple[List[float], List[float]]:
        """Collect index-aligned (predicted, ground_truth) values for one metric.

        Only results carrying BOTH values contribute, which fixes the old
        append-then-pop logic that could misalign the two lists.
        """
        predicted: List[float] = []
        ground_truth: List[float] = []
        for result in results:
            pred_metrics = result.get("metrics") or {}
            truth_metrics = result.get("ground_truth_scores") or {}
            if metric in pred_metrics and metric in truth_metrics:
                predicted.append(pred_metrics[metric])
                ground_truth.append(truth_metrics[metric])
        return predicted, ground_truth

    @staticmethod
    def compute_adherence_f1(results: List[Dict]) -> Dict[str, float]:
        """Compute F1 Score specifically for adherence metric aggregation.

        Adherence is a binary metric (0 or 1), so F1 Score is particularly relevant.
        Measures how well the predicted adherence scores match ground truth.

        Args:
            results: List of evaluation results with predicted and ground truth scores

        Returns:
            Dictionary with:
            - adherence_f1: F1 Score for adherence
            - adherence_precision: Precision for adherence
            - adherence_recall: Recall for adherence
            - num_evaluations: Number of evaluations used
        """
        predicted, ground_truth = F1ScoreCalculator._collect_pairs(results, "adherence")
        if len(predicted) == 0:
            return {
                "adherence_f1": 0.0,
                "adherence_precision": 0.0,
                "adherence_recall": 0.0,
                "num_evaluations": 0
            }
        f1_metrics = F1ScoreCalculator.compute_f1_for_metric(predicted, ground_truth)
        return {
            "adherence_f1": f1_metrics["f1_score"],
            "adherence_precision": f1_metrics["precision"],
            "adherence_recall": f1_metrics["recall"],
            "num_evaluations": len(predicted)
        }

    @staticmethod
    def compute_f1_all_metrics(results: List[Dict]) -> Dict[str, float]:
        """Compute F1 Score for all TRACE metrics.

        Args:
            results: List of evaluation results with predicted and ground truth scores

        Returns:
            Dictionary mapping metric names to F1 Scores with precision/recall
            (keys "<metric>_f1" / "<metric>_precision" / "<metric>_recall"),
            plus "average_f1" when at least one metric was computable.
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        f1_results = {}
        for metric in metrics:
            predicted, ground_truth = F1ScoreCalculator._collect_pairs(results, metric)
            if len(predicted) > 0:
                f1_metrics = F1ScoreCalculator.compute_f1_for_metric(predicted, ground_truth)
                f1_results[f"{metric}_f1"] = f1_metrics["f1_score"]
                f1_results[f"{metric}_precision"] = f1_metrics["precision"]
                f1_results[f"{metric}_recall"] = f1_metrics["recall"]
        # Compute average F1 across all metrics
        f1_scores = [v for k, v in f1_results.items() if k.endswith("_f1")]
        if f1_scores:
            f1_results["average_f1"] = float(np.mean(f1_scores))
        return f1_results
class DocumentSentencizer:
    """Split documents into sentences with keys (0a, 0b, 1a, etc.)."""

    # Split after ., ! or ? when followed by whitespace and a capital letter,
    # or at end of string. NOTE(review): abbreviations ("Dr. Smith") will be
    # split too — acceptable for labeling-prompt purposes.
    _SENTENCE_PATTERN = r'(?<=[.!?])\s+(?=[A-Z])|(?<=[.!?])\s*$'

    @staticmethod
    def _letter_key(index: int) -> str:
        """Return a letter key for a 0-based index: 'a'..'z', then 'aa', 'ab', ...

        Uses bijective base-26 so keys never repeat. (The previous
        ``chr(97 + index % 26)`` scheme produced duplicate keys once a text
        exceeded 26 sentences.)
        """
        index += 1  # shift to 1-based for bijective base-26
        letters = []
        while index > 0:
            index, rem = divmod(index - 1, 26)
            letters.append(chr(97 + rem))
        return "".join(reversed(letters))

    @staticmethod
    def sentencize_documents(documents: List[str]) -> Tuple[List[Dict], str]:
        """Split documents into sentences with keys.

        Args:
            documents: List of document texts

        Returns:
            Tuple of (sentence_list, formatted_string)
            Where sentence_list = [{"key": "0a", "text": "..."}, ...]
        """
        sentence_list = []
        formatted_parts = []
        for doc_idx, document in enumerate(documents):
            sentences = re.split(DocumentSentencizer._SENTENCE_PATTERN, document.strip())
            sentences = [s.strip() for s in sentences if s.strip()]
            for sent_idx, sentence in enumerate(sentences):
                # Key like 0a, 0b, 1a, ... (document index + unique letter suffix)
                key = f"{doc_idx}{DocumentSentencizer._letter_key(sent_idx)}"
                sentence_list.append({"key": key, "text": sentence})
                formatted_parts.append(f"{key}. {sentence}")
        formatted_string = "\n".join(formatted_parts)
        return sentence_list, formatted_string

    @staticmethod
    def sentencize_response(response: str) -> Tuple[List[Dict], str]:
        """Split response into sentences with keys (a, b, c, etc.).

        Args:
            response: Response text

        Returns:
            Tuple of (sentence_list, formatted_string)
        """
        sentence_list = []
        formatted_parts = []
        sentences = re.split(DocumentSentencizer._SENTENCE_PATTERN, response.strip())
        sentences = [s.strip() for s in sentences if s.strip()]
        for sent_idx, sentence in enumerate(sentences):
            key = DocumentSentencizer._letter_key(sent_idx)  # a, b, c, ...
            sentence_list.append({"key": key, "text": sentence})
            formatted_parts.append(f"{key}. {sentence}")
        formatted_string = "\n".join(formatted_parts)
        return sentence_list, formatted_string
class GPTLabelingPromptGenerator:
    """Generate structured GPT labeling prompts for factual evaluation and citation audit."""

    # Improved Template with clear hierarchy and explicit constraints.
    # The {documents}/{question}/{answer} placeholders are filled by
    # generate_labeling_prompt(); literal braces in the JSON schema are
    # escaped as {{ }} so str.format() leaves them intact.
    LABELING_PROMPT_TEMPLATE = """### ROLE
You are a Fact-Checking and Citation Specialist. Your task is to perform a rigorous audit of a response against provided documents to determine its accuracy, relevance, and level of support.
### TASK OVERVIEW
1. **Analyze Documents**: Review the provided documents and identify information relevant to the user's question.
2. **Evaluate Response**: Review the provided answer sentence-by-sentence.
3. **Verify Support**: Map each answer sentence to specific supporting sentences in the documents.
4. **Identify Utilization**: Determine which document sentences were actually used (directly or implicitly) to form the answer.
### INPUT DATA
**Documents (Split into Sentences with Keys):**
'''
{documents}
'''
**The Original Question:**
'''
{question}
'''
**The Answer to Evaluate (Split into Sentences with Keys):**
'''
{answer}
'''
### OUTPUT REQUIREMENTS
You must respond with a valid JSON object.
**Constraints:**
- Do NOT include any preamble or postamble (e.g., "Here is the analysis...").
- Do NOT wrap the JSON in markdown code blocks (e.g., no ```json).
- Use proper escaping for quotes and newlines within JSON strings.
### JSON SCHEMA
{{
"relevance_explanation": "A step-by-step breakdown of document information and its utility for the question.",
"all_relevant_sentence_keys": ["List of doc keys pertinent to the question, regardless of use in answer"],
"overall_supported_explanation": "Claim-by-claim assessment of the response's accuracy before a final conclusion.",
"overall_supported": boolean,
"sentence_support_information": [
{{
"response_sentence_key": "string",
"explanation": "Why the sentence is or is not supported.",
"supporting_sentence_keys": ["doc_keys", "OR: 'supported_without_sentence', 'general', 'well_known_fact', 'numerical_reasoning'"],
"fully_supported": boolean
}}
],
"all_utilized_sentence_keys": ["List of doc keys actually used to construct the answer"]
}}"""

    @staticmethod
    def generate_labeling_prompt(
        question: str,
        response: str,
        documents: List[str]
    ) -> Tuple[str, List[Dict], List[Dict]]:
        """Generate the high-fidelity GPT labeling prompt.

        Args:
            question: The original user question.
            response: The LLM response to evaluate.
            documents: List of raw retrieved documents.

        Returns:
            A tuple of (formatted_prompt, list_of_doc_sentences, list_of_resp_sentences).
            The sentence lists are the keyed dicts produced by
            DocumentSentencizer, returned so callers can map keys in the
            LLM's answer back to sentence text.
        """
        # Sentencize documents and response (keys "0a", "0b", ... / "a", "b", ...)
        doc_sentences, doc_formatted = DocumentSentencizer.sentencize_documents(documents)
        resp_sentences, resp_formatted = DocumentSentencizer.sentencize_response(response)
        # Inject data into the structured template
        prompt = GPTLabelingPromptGenerator.LABELING_PROMPT_TEMPLATE.format(
            documents=doc_formatted,
            question=question,
            answer=resp_formatted
        )
        return prompt, doc_sentences, resp_sentences
class AdvancedRAGEvaluator:
    """Advanced RAG evaluator using GPT labeling prompts."""

    def __init__(self, llm_client=None, chunking_strategy: Optional[str] = None,
                 embedding_model: Optional[str] = None, chunk_size: Optional[int] = None,
                 chunk_overlap: Optional[int] = None):
        """Initialize evaluator.

        Args:
            llm_client: LLM client for generating labels; expected to expose a
                ``generate(prompt, max_tokens, temperature)`` method (and
                optionally ``model_name``). When None, evaluation falls back
                to a rule-based heuristic.
            chunking_strategy: Chunking strategy used
            embedding_model: Embedding model used
            chunk_size: Chunk size used
            chunk_overlap: Chunk overlap used
        """
        # Only llm_client affects behavior in the methods visible here; the
        # chunking/embedding fields appear to be bookkeeping metadata —
        # TODO confirm against the rest of the class.
        self.llm_client = llm_client
        self.chunking_strategy = chunking_strategy
        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    def evaluate(
        self,
        question: str,
        response: str,
        retrieved_documents: List[str],
        ground_truth: Optional[str] = None
    ) -> Tuple[AdvancedTRACEScores, Optional[Dict]]:
        """Evaluate response using GPT labeling approach.

        Args:
            question: User question
            response: LLM response
            retrieved_documents: Retrieved context documents
            ground_truth: Optional ground truth answer (only forwarded to
                ``_compute_completeness``)

        Returns:
            Tuple of (AdvancedTRACEScores, llm_request_info dict). The dict
            is empty when labeling failed before a request could be recorded.
        """
        # Generate labeling prompt and get LLM labels with request info
        gpt_result = self._get_gpt_labels(question, response, retrieved_documents)
        llm_request_info = {}
        if not gpt_result:
            # No LLM client or hard failure: fall back to rule-based evaluation
            scores = self._fallback_evaluation(question, response, retrieved_documents)
            return scores, llm_request_info
        gpt_labels = gpt_result.get("labels")
        llm_request_info = gpt_result.get("llm_request_info", {})
        if not gpt_labels:
            # LLM responded but labels could not be parsed: fall back, while
            # still returning the recorded request info for the audit trail
            scores = self._fallback_evaluation(question, response, retrieved_documents)
            return scores, llm_request_info
        # Compute the four TRACE metrics from GPT labels
        context_relevance = self._compute_context_relevance(gpt_labels)
        context_utilization = self._compute_context_utilization(gpt_labels)
        completeness = self._compute_completeness(gpt_labels, ground_truth)
        adherence = self._compute_adherence(gpt_labels)
        # Bucket response sentences by support level. NOTE(review): the
        # buckets can overlap — a sentence flagged fully_supported but with
        # an empty supporting_sentence_keys list is counted both as fully
        # supported and as unsupported.
        fully_supported = sum(1 for s in gpt_labels.sentence_support_information
                              if s.get("fully_supported", False))
        partially_supported = sum(1 for s in gpt_labels.sentence_support_information
                                  if not s.get("fully_supported", False) and
                                  s.get("supporting_sentence_keys", []))
        unsupported = sum(1 for s in gpt_labels.sentence_support_information
                          if not s.get("supporting_sentence_keys", []))
        scores = AdvancedTRACEScores(
            context_relevance=context_relevance,
            context_utilization=context_utilization,
            completeness=completeness,
            adherence=adherence,
            overall_supported=gpt_labels.overall_supported,
            num_fully_supported_sentences=fully_supported,
            num_partially_supported_sentences=partially_supported,
            num_unsupported_sentences=unsupported
        )
        return scores, llm_request_info
    def _get_gpt_labels(self, question: str, response: str,
                        documents: List[str]) -> Optional[Dict]:
        """Get GPT labels using labeling prompt with rate limiting for 30 RPM.

        Args:
            question: User question
            response: LLM response
            documents: Retrieved documents

        Returns:
            Dict containing 'labels' (GPTLabelingOutput, or None when the
            LLM reply was empty/unparseable) and 'llm_request_info' with a
            complete audit trail; None when there is no client or the
            request itself failed.
        """
        if not self.llm_client:
            print("[WARN] No LLM client available - using fallback evaluation")
            return None
        try:
            # Generate prompt (the keyed sentence lists are currently unused here)
            prompt, doc_sentences, resp_sentences = (
                GPTLabelingPromptGenerator.generate_labeling_prompt(
                    question, response, documents
                )
            )
            # Store LLM request info for audit trail
            llm_request_info = {
                "query": question,
                "context_documents": documents,
                "llm_response": response,
                "labeling_prompt": prompt,
                "model": getattr(self.llm_client, 'model_name', 'groq-default'),
                "temperature": 0.0,
                "max_tokens": 2048
            }
            # Log rate limiting info before making API call.
            # NOTE(review): rate limiting is assumed to happen inside the
            # client's generate() — not enforced here; confirm in the client.
            print(f"\n[EVALUATION] Making GPT labeling API call...")
            print(f"[EVALUATION] This respects the 30 RPM rate limit")
            # Call LLM to get labels (rate limiting is handled internally)
            llm_response = self.llm_client.generate(
                prompt=prompt,
                max_tokens=2048,
                temperature=0.0  # Deterministic for consistent labeling
            )
            # Store full LLM response in request info
            llm_request_info["full_llm_response"] = llm_response
            # Log the actual response
            print(f"\n[LLM RESPONSE] {llm_response}\n")
            # Check if response is empty
            if not llm_response or not llm_response.strip():
                print(f"[WARN] Empty LLM response received")
                return {"labels": None, "llm_request_info": llm_request_info}
            # Parse JSON response
            try:
                # Try to extract JSON from response (in case there's surrounding text)
                json_str = llm_response.strip()
                # If response contains markdown code blocks, extract the JSON
                # despite the prompt asking the model not to use them
                if "```json" in json_str:
                    json_str = json_str.split("```json")[1].split("```")[0].strip()
                elif "```" in json_str:
                    json_str = json_str.split("```")[1].split("```")[0].strip()
                labels_dict = json.loads(json_str)
                # Missing keys default to empty/False rather than raising
                gpt_output = GPTLabelingOutput(
                    relevance_explanation=labels_dict.get("relevance_explanation", ""),
                    all_relevant_sentence_keys=labels_dict.get("all_relevant_sentence_keys", []),
                    overall_supported_explanation=labels_dict.get("overall_supported_explanation", ""),
                    overall_supported=labels_dict.get("overall_supported", False),
                    sentence_support_information=labels_dict.get("sentence_support_information", []),
                    all_utilized_sentence_keys=labels_dict.get("all_utilized_sentence_keys", [])
                )
                return {"labels": gpt_output, "llm_request_info": llm_request_info}
            except (json.JSONDecodeError, ValueError, IndexError) as e:
                # Parse failure: keep the audit trail but signal no labels
                print(f"[WARN] Failed to parse LLM response: {e}")
                print(f"[WARN] Raw response: {llm_response[:200]}")
                return {"labels": None, "llm_request_info": llm_request_info}
        except Exception as e:
            print(f"[WARN] Error getting GPT labels: {e}")
            return None
def _compute_context_relevance(self, gpt_labels: GPTLabelingOutput) -> float:
"""Compute context relevance metric.
Context Relevance = Number of relevant sentences / Total sentences
"""
if not gpt_labels.all_relevant_sentence_keys:
return 0.0
return min(1.0, len(gpt_labels.all_relevant_sentence_keys) / 20.0) # Normalize
def _compute_context_utilization(self, gpt_labels: GPTLabelingOutput) -> float:
"""Compute context utilization metric.
Context Utilization = Number of utilized sentences / Number of relevant sentences
"""
relevant_count = len(gpt_labels.all_relevant_sentence_keys)
utilized_count = len(gpt_labels.all_utilized_sentence_keys)
if relevant_count == 0:
return 0.0
return min(1.0, utilized_count / relevant_count)
def _compute_completeness(self, gpt_labels: GPTLabelingOutput,
ground_truth: Optional[str] = None) -> float:
"""Compute completeness metric.
Completeness = Relevant sentences used / All relevant sentences
"""
relevant_set = set(gpt_labels.all_relevant_sentence_keys)
utilized_set = set(gpt_labels.all_utilized_sentence_keys)
intersection = len(relevant_set & utilized_set)
if len(relevant_set) == 0:
return 1.0 if len(utilized_set) == 0 else 0.0
return intersection / len(relevant_set)
def _compute_adherence(self, gpt_labels: GPTLabelingOutput) -> float:
"""Compute adherence metric (Boolean: 1.0 = fully grounded, 0.0 = contains hallucination).
Per RAGBench paper: Adherence is whether ALL response sentences are fully supported by context.
If even ONE sentence is not fully supported, adherence = 0.0
"""
total_sentences = len(gpt_labels.sentence_support_information)
if total_sentences == 0:
return 1.0
# Check if ALL sentences are fully supported
fully_supported_count = sum(
1 for s in gpt_labels.sentence_support_information
if s.get("fully_supported", False)
)
# Boolean: 1.0 if all sentences are fully supported, 0.0 if any sentence is not fully supported
return 1.0 if fully_supported_count == total_sentences else 0.0
def _fallback_evaluation(self, question: str, response: str,
                         documents: List[str]) -> AdvancedTRACEScores:
    """Fallback rule-based evaluation when LLM unavailable.

    Scores every TRACE dimension with one lexical-overlap heuristic: the
    fraction of response tokens that also appear somewhere in the documents.
    Sentence-level counts are unavailable without the LLM labeler, so they
    are reported as zero.
    """
    vocabulary = set()
    for document in documents:
        vocabulary.update(document.lower().split())
    answer_tokens = set(response.lower().split())
    overlap_ratio = len(answer_tokens & vocabulary) / max(len(answer_tokens), 1)
    return AdvancedTRACEScores(
        context_relevance=overlap_ratio,
        context_utilization=overlap_ratio,
        completeness=overlap_ratio,
        adherence=overlap_ratio,
        overall_supported=overlap_ratio > 0.5,
        num_fully_supported_sentences=0,
        num_partially_supported_sentences=0,
        num_unsupported_sentences=0
    )
def _save_checkpoint(self, checkpoint_file: str, detailed_results: List[Dict],
                     last_index: int, total_samples: int) -> None:
    """Best-effort write of evaluation progress; failures are logged, not raised."""
    try:
        checkpoint_data = {
            'detailed_results': detailed_results,
            'last_index': last_index,
            'total_samples': total_samples
        }
        # default=str keeps non-JSON-serializable values (e.g. numpy scalars)
        # from aborting the checkpoint write.
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint_data, f, default=str)
        print(f"[CHECKPOINT] Saved progress at sample {last_index}/{total_samples}")
    except Exception as e:
        print(f"[CHECKPOINT] Failed to save: {e}")

def evaluate_batch(self, test_cases: List[Dict], checkpoint_file: str = None,
                   resume: bool = True) -> Dict:
    """Evaluate multiple test cases with checkpoint support.

    Args:
        test_cases: List of test cases with question, response, etc.
        checkpoint_file: Optional file to save/resume progress
        resume: Whether to resume from checkpoint if exists

    Returns:
        Dictionary with aggregated scores and detailed results
    """
    detailed_results = []
    start_index = 0
    # Try to resume from checkpoint
    if checkpoint_file and resume:
        try:
            import os
            if os.path.exists(checkpoint_file):
                with open(checkpoint_file, 'r') as f:
                    checkpoint_data = json.load(f)
                detailed_results = checkpoint_data.get('detailed_results', [])
                start_index = len(detailed_results)
                print(f"[CHECKPOINT] Resuming from checkpoint at sample {start_index}/{len(test_cases)}")
        except Exception as e:
            print(f"[CHECKPOINT] Could not load checkpoint: {e}")
    for i, test_case in enumerate(test_cases):
        # Skip samples already restored from the checkpoint
        if i < start_index:
            continue
        print(f"Evaluating test case {i+1}/{len(test_cases)}")
        question = test_case.get("query", "")
        response = test_case.get("response", "")
        documents = test_case.get("retrieved_documents", [])
        # NOTE(review): the response is deliberately used as its own
        # ground-truth reference answer — confirm this is intended upstream.
        ground_truth = response
        # RAGBench ground-truth scores, consumed later by RMSE/AUCROC/F1
        ground_truth_scores = test_case.get("ground_truth_scores", {})
        # evaluate() returns (scores, llm_request_info)
        scores, llm_request_info = self.evaluate(question, response, documents, ground_truth)
        # Store detailed results with ground truth for RMSE/AUCROC computation
        result_dict = {
            "query_id": i + 1,
            "question": question,
            "prompt": llm_request_info.get("labeling_prompt", "") if llm_request_info else "",
            "llm_response": llm_request_info.get("full_llm_response", "") if llm_request_info else "",
            "metrics": scores.to_dict(),
            "ground_truth_scores": ground_truth_scores
        }
        detailed_results.append(result_dict)
        # Periodically persist progress (every 5 samples) for resume capability
        if checkpoint_file and (i + 1) % 5 == 0:
            self._save_checkpoint(checkpoint_file, detailed_results, i + 1, len(test_cases))
    # Final save so a completed run is not partially re-evaluated on resume
    # (previously a run whose length was not a multiple of 5 left a stale
    # checkpoint behind).
    if checkpoint_file and detailed_results:
        self._save_checkpoint(checkpoint_file, detailed_results,
                              len(detailed_results), len(test_cases))
    # Aggregate over detailed_results rather than this run's scores so that
    # samples restored from a checkpoint are included in the averages
    # (previously resumed samples were silently excluded while num_samples
    # still reported the full count).
    scores_dicts = [r.get("metrics", {}) for r in detailed_results]
    # Extract metric values safely from dictionaries
    context_relevance_vals = [s.get("context_relevance", 0) for s in scores_dicts]
    context_utilization_vals = [s.get("context_utilization", 0) for s in scores_dicts]
    completeness_vals = [s.get("completeness", 0) for s in scores_dicts]
    adherence_vals = [s.get("adherence", 0) for s in scores_dicts]
    average_vals = [s.get("average", 0) for s in scores_dicts]
    results = {
        "context_relevance": float(np.mean(context_relevance_vals)) if context_relevance_vals else 0.0,
        "context_utilization": float(np.mean(context_utilization_vals)) if context_utilization_vals else 0.0,
        "completeness": float(np.mean(completeness_vals)) if completeness_vals else 0.0,
        "adherence": float(np.mean(adherence_vals)) if adherence_vals else 0.0,
        "average": float(np.mean(average_vals)) if average_vals else 0.0,
        "num_samples": len(test_cases),
        "detailed_results": detailed_results,
        "evaluation_config": {
            "chunking_strategy": self.chunking_strategy,
            "embedding_model": self.embedding_model,
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "evaluation_method": "gpt_labeling_prompts"
        }
    }
    # Compute RMSE aggregation and per-metric statistics
    rmse_metrics = RMSECalculator.compute_rmse_aggregation_for_batch(detailed_results)
    per_metric_stats = AUCROCCalculator.compute_per_metric_statistics(detailed_results)
    if rmse_metrics:
        results["rmse_metrics"] = rmse_metrics
    if per_metric_stats:
        results["per_metric_statistics"] = per_metric_stats
    # Compute RMSE against RAGBench ground truth (per RAGBench paper requirement)
    # This compares predicted scores vs original scores in the dataset
    rmse_vs_ground_truth = RMSECalculator.compute_trace_rmse_aggregation(detailed_results)
    if rmse_vs_ground_truth and rmse_vs_ground_truth.get("per_metric_rmse"):
        results["rmse_vs_ground_truth"] = rmse_vs_ground_truth
    # Compute AUCROC against RAGBench ground truth (per RAGBench paper requirement)
    aucroc_vs_ground_truth = AUCROCCalculator.compute_auc_all_metrics(detailed_results)
    if aucroc_vs_ground_truth:
        results["aucroc_vs_ground_truth"] = aucroc_vs_ground_truth
    # Compute F1 Score for adherence aggregation
    adherence_f1_scores = F1ScoreCalculator.compute_adherence_f1(detailed_results)
    if adherence_f1_scores:
        results["adherence_f1_scores"] = adherence_f1_scores
    # Compute F1 Scores for all metrics
    f1_all_metrics = F1ScoreCalculator.compute_f1_all_metrics(detailed_results)
    if f1_all_metrics:
        results["f1_scores"] = f1_all_metrics
    return results