Implementation Guide - Missing RMSE and AUCROC Metrics

Purpose: Fix critical gaps in RAGBench compliance
Priority: CRITICAL
Estimated Time: 2-3 hours
Status: Ready for implementation


Overview

This guide provides a step-by-step implementation of the three missing components:

  1. Ground Truth Score Extraction from RAGBench dataset
  2. RMSE Metric Computation (Root Mean Squared Error)
  3. AUCROC Metric Computation (Area Under ROC Curve)

Step 1: Extract Ground Truth Scores from Dataset

1.1 Inspect RAGBench Dataset Structure

First, understand what scores are available in the dataset:

# Quick test script to inspect dataset
from datasets import load_dataset

dataset = load_dataset("rungalileo/ragbench", "covidqa", split="test")

# Look at first sample
sample = dataset[0]
print("Available keys:", sample.keys())
print("\nSample structure:")
for key, value in sample.items():
    if not isinstance(value, list):
        print(f"{key}: {value}")

Expected Keys in RAGBench:

  • question
  • documents
  • answer
  • context_relevance (ground truth)
  • context_utilization (ground truth)
  • completeness (ground truth)
  • adherence (ground truth)
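
Field names can differ slightly between RAGBench sub-datasets, so it is worth checking coverage across the whole split before wiring up the loader. A minimal sketch of such a check (the field list is an assumption; adjust it to whatever the inspection script in 1.1 actually prints):

# Optional: check ground-truth field coverage across the test split
# Field names below are assumed; adjust to what Step 1.1 prints
from collections import Counter
from datasets import load_dataset

dataset = load_dataset("rungalileo/ragbench", "covidqa", split="test")

fields = ["context_relevance", "context_utilization", "completeness", "adherence"]
coverage = Counter()
for sample in dataset:
    for field in fields:
        if sample.get(field) is not None:
            coverage[field] += 1

for field in fields:
    print(f"{field}: {coverage[field]}/{len(dataset)} samples have a value")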

1.2 Modify dataset_loader.py

Location: dataset_loader.py, lines 79-110

Current Code:

def _process_ragbench_item(self, item: Dict, dataset_name: str) -> Dict:
    processed = {
        "question": item.get("question", ""),
        "answer": item.get("answer", ""),
        "context": "",
        "documents": [],
        "dataset": dataset_name
    }
    # ... rest of code

Updated Code (ADD THIS):

def _process_ragbench_item(self, item: Dict, dataset_name: str) -> Dict:
    processed = {
        "question": item.get("question", ""),
        "answer": item.get("answer", ""),
        "context": "",
        "documents": [],
        "dataset": dataset_name,
        # NEW: Extract ground truth evaluation scores from RAGBench
        "ground_truth_scores": {
            "context_relevance": item.get("context_relevance", None),
            "context_utilization": item.get("context_utilization", None),
            "completeness": item.get("completeness", None),
            "adherence": item.get("adherence", None),
            # Backup: some datasets may use different names
            "relevance": item.get("relevance", None),
            "utilization": item.get("utilization", None),
        },
        # NEW: Store whether response is supported (binary label for AUCROC)
        "overall_supported": item.get("overall_supported", None),
    }
    
    # ... rest of existing code ...
    return processed

1.3 Validation

Add validation to ensure scores are extracted:

# Add at end of load_dataset() method
def load_dataset(self, dataset_name: str, split: str = "test", 
                 max_samples: Optional[int] = None) -> List[Dict]:
    # ... existing code ...
    
    # NEW: Validate that ground truth scores were extracted
    if processed_data:
        sample = processed_data[0]
        if sample.get("ground_truth_scores"):
            gt_scores = sample["ground_truth_scores"]
            if any(v is not None for v in gt_scores.values()):
                print(f"[OK] Ground truth scores found: {[k for k, v in gt_scores.items() if v is not None]}")
            else:
                print(f"[WARN] Ground truth scores empty - may not be in dataset")
        else:
            print(f"[WARN] No ground_truth_scores key - check dataset structure")
    
    return processed_data

Step 2: Implement RMSE Metric Computation

2.1 Add RMSE Methods to advanced_rag_evaluator.py

Location: Add after line 440 in advanced_rag_evaluator.py

Code to Add:

from sklearn.metrics import mean_squared_error
import numpy as np
from typing import Dict, List, Tuple


class RMSECalculator:
    """Compute RMSE for evaluation metrics."""
    
    @staticmethod
    def compute_rmse_for_metric(
        predicted_values: List[float],
        ground_truth_values: List[float],
        metric_name: str = "metric"
    ) -> Tuple[float, Dict]:
        """
        Compute RMSE for a single metric.
        
        Args:
            predicted_values: Predicted metric values (0-1)
            ground_truth_values: Ground truth metric values (0-1)
            metric_name: Name of metric for logging
            
        Returns:
            Tuple of (rmse_value, stats_dict)
        """
        # Filter out None values
        valid_indices = [
            i for i, (p, g) in enumerate(zip(predicted_values, ground_truth_values))
            if p is not None and g is not None
        ]
        
        if not valid_indices:
            return 0.0, {"error": "No valid predictions"}
        
        valid_predicted = [predicted_values[i] for i in valid_indices]
        valid_ground_truth = [ground_truth_values[i] for i in valid_indices]
        
        # Compute MSE and RMSE
        mse = mean_squared_error(valid_ground_truth, valid_predicted)
        rmse = np.sqrt(mse)
        
        # Compute additional statistics
        mean_abs_error = np.mean(np.abs(np.array(valid_predicted) - np.array(valid_ground_truth)))
        max_error = np.max(np.abs(np.array(valid_predicted) - np.array(valid_ground_truth)))
        
        stats = {
            "rmse": float(rmse),
            "mse": float(mse),
            "mae": float(mean_abs_error),
            "max_error": float(max_error),
            "n_samples": len(valid_indices),
            "metric_name": metric_name
        }
        
        return rmse, stats
    
    @staticmethod
    def compute_rmse_all_metrics(
        predicted_scores_list: List[Dict],
        ground_truth_scores_list: List[Dict]
    ) -> Dict:
        """
        Compute RMSE for all evaluation metrics.
        
        Args:
            predicted_scores_list: List of predicted score dicts
                [{"context_relevance": 0.8, "context_utilization": 0.75, ...}, ...]
            ground_truth_scores_list: List of ground truth score dicts
                [{"context_relevance": 0.85, "context_utilization": 0.7, ...}, ...]
                
        Returns:
            Dictionary with RMSE results
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        rmse_results = {}
        
        print("\nComputing RMSE for all metrics...")
        print("-" * 50)
        
        for metric in metrics:
            # Extract metric values
            predicted = [
                s.get(metric) if isinstance(s, dict) else getattr(s, metric, None)
                for s in predicted_scores_list
            ]
            ground_truth = [
                s.get(metric) if s else None
                for s in ground_truth_scores_list
            ]
            
            # Compute RMSE
            rmse, stats = RMSECalculator.compute_rmse_for_metric(
                predicted, ground_truth, metric
            )
            
            rmse_results[metric] = stats

            if "error" not in stats:
                print(f"{metric:25s}: RMSE = {rmse:.4f}, MAE = {stats['mae']:.4f}, "
                      f"N = {stats['n_samples']}")
            else:
                print(f"{metric:25s}: {stats['error']}")
        
        # Compute average RMSE across all metrics
        valid_rmses = [v["rmse"] for v in rmse_results.values() if "rmse" in v]
        rmse_results["average_rmse"] = float(np.mean(valid_rmses)) if valid_rmses else 0.0
        
        print("-" * 50)
        print(f"Average RMSE across metrics: {rmse_results['average_rmse']:.4f}")
        
        return rmse_results
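
Before integrating with the pipeline, the class can be exercised on its own. A quick sanity check with made-up placeholder scores (not real evaluation output):

# Standalone sanity check with placeholder scores (not real data)
predicted = [
    {"context_relevance": 0.80, "context_utilization": 0.70, "completeness": 0.75, "adherence": 0.90},
    {"context_relevance": 0.60, "context_utilization": 0.65, "completeness": 0.70, "adherence": 0.85},
]
ground_truth = [
    {"context_relevance": 0.85, "context_utilization": 0.75, "completeness": 0.80, "adherence": 1.00},
    {"context_relevance": 0.55, "context_utilization": 0.60, "completeness": 0.65, "adherence": 0.75},
]

results = RMSECalculator.compute_rmse_all_metrics(predicted, ground_truth)
print(results["average_rmse"])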

2.2 Integration with Evaluation Pipeline

Modify: evaluation_pipeline.py to call RMSE computation

Add to UnifiedEvaluationPipeline.evaluate_batch():

def evaluate_batch(self, test_cases: List[Dict], method: Literal["trace", "gpt_labeling", "hybrid"] = "trace") -> Dict:
    """Evaluate multiple test cases (with RMSE computation)."""
    
    all_scores = []
    detailed_results = []
    ground_truth_scores_list = []  # NEW
    
    for i, test_case in enumerate(test_cases):
        # ... existing code ...
        
        # NEW: Collect ground truth scores
        if test_case.get("ground_truth_scores"):
            ground_truth_scores_list.append(test_case["ground_truth_scores"])
        
        # ... rest of loop ...
    
    # NEW: Compute RMSE if ground truth available
    rmse_results = None
    if ground_truth_scores_list and len(ground_truth_scores_list) == len(all_scores):
        from advanced_rag_evaluator import RMSECalculator
        rmse_results = RMSECalculator.compute_rmse_all_metrics(
            [s.to_dict() if hasattr(s, 'to_dict') else s for s in all_scores],
            ground_truth_scores_list
        )
    
    results = {
        # ... existing fields ...
        "rmse_metrics": rmse_results,  # NEW
    }
    
    return results
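
The s.to_dict() branch above assumes each predicted score object exposes the four metric values under the same keys the calculators read. An illustrative sketch of that shape (not the actual score class in advanced_rag_evaluator.py):

# Illustrative only: the calculators accept plain dicts or objects with to_dict()
from dataclasses import dataclass, asdict

@dataclass
class ExampleScores:
    context_relevance: float
    context_utilization: float
    completeness: float
    adherence: float

    def to_dict(self) -> dict:
        return asdict(self)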

Step 3: Implement AUCROC Metric Computation

3.1 Add AUCROC Methods to advanced_rag_evaluator.py

Location: Add after RMSE section in advanced_rag_evaluator.py

Code to Add:

from sklearn.metrics import roc_auc_score, roc_curve


class AUCROCCalculator:
    """Compute AUCROC for evaluation metrics."""
    
    @staticmethod
    def compute_auc_for_metric(
        predictions: List[float],
        labels: List[int],
        metric_name: str = "metric"
    ) -> Tuple[float, Dict]:
        """
        Compute AUC-ROC for a single metric.
        
        Args:
            predictions: Predicted scores (0-1)
            labels: Binary labels (0 or 1)
            metric_name: Name of metric
            
        Returns:
            Tuple of (auc_score, stats_dict)
        """
        # Filter out None values
        valid_indices = [
            i for i, (p, l) in enumerate(zip(predictions, labels))
            if p is not None and l is not None
        ]
        
        if not valid_indices or len(set([labels[i] for i in valid_indices])) < 2:
            return 0.0, {"error": "Insufficient samples or no class variance"}
        
        valid_predictions = [predictions[i] for i in valid_indices]
        valid_labels = [labels[i] for i in valid_indices]
        
        try:
            # Compute AUC-ROC
            auc = roc_auc_score(valid_labels, valid_predictions)
            
            # Compute ROC curve for plotting
            fpr, tpr, thresholds = roc_curve(valid_labels, valid_predictions)
            
            stats = {
                "auc": float(auc),
                "fpr": fpr.tolist(),
                "tpr": tpr.tolist(),
                "thresholds": thresholds.tolist(),
                "n_samples": len(valid_indices),
                "n_positive": sum(valid_labels),
                "n_negative": len(valid_labels) - sum(valid_labels),
                "metric_name": metric_name
            }
            
            return auc, stats
        
        except Exception as e:
            return 0.0, {"error": str(e)}
    
    @staticmethod
    def binary_labels_from_ground_truth(
        ground_truth_scores: List[Dict],
        threshold: float = 0.5
    ) -> List[int]:
        """
        Generate binary labels from ground truth scores.
        
        Binary classification task:
        - Positive (1): Response is well-supported (adherence or overall_supported)
        - Negative (0): Response is not well-supported
        
        Args:
            ground_truth_scores: List of ground truth score dictionaries
            threshold: Threshold for converting scores to labels
            
        Returns:
            List of binary labels (0 or 1)
        """
        labels = []
        
        for gt_dict in ground_truth_scores:
            # Try different possible fields
            if gt_dict is None:
                labels.append(None)
                continue
            
            # Check for binary label first
            if "overall_supported" in gt_dict and gt_dict["overall_supported"] is not None:
                labels.append(1 if gt_dict["overall_supported"] else 0)
            # Use adherence as proxy for support
            elif "adherence" in gt_dict and gt_dict["adherence"] is not None:
                labels.append(1 if gt_dict["adherence"] >= threshold else 0)
            else:
                labels.append(None)
        
        return labels
    
    @staticmethod
    def compute_auc_all_metrics(
        predicted_scores_list: List[Dict],
        ground_truth_scores_list: List[Dict]
    ) -> Dict:
        """
        Compute AUC-ROC for all evaluation metrics.
        
        Args:
            predicted_scores_list: List of predicted score dicts
            ground_truth_scores_list: List of ground truth score dicts
            
        Returns:
            Dictionary with AUC-ROC results
        """
        metrics = ["context_relevance", "context_utilization", "completeness", "adherence"]
        auc_results = {}
        
        print("\nComputing AUC-ROC for all metrics...")
        print("-" * 50)
        
        # Generate binary labels from ground truth
        binary_labels = AUCROCCalculator.binary_labels_from_ground_truth(ground_truth_scores_list)
        
        # Filter out None labels
        valid_label_indices = [i for i, l in enumerate(binary_labels) if l is not None]
        
        if not valid_label_indices:
            print("[WARN] No valid labels for AUC-ROC computation")
            return {"error": "No valid labels"}
        
        valid_labels = [binary_labels[i] for i in valid_label_indices]
        
        for metric in metrics:
            # Extract predictions
            predictions = [
                s.get(metric) if isinstance(s, dict) else getattr(s, metric, None)
                for s in predicted_scores_list
            ]
            
            # Filter to valid indices
            valid_predictions = [predictions[i] for i in valid_label_indices]
            
            # Compute AUC
            auc, stats = AUCROCCalculator.compute_auc_for_metric(
                valid_predictions, valid_labels, metric
            )
            
            auc_results[metric] = stats
            
            if "error" not in stats:
                print(f"{metric:25s}: AUC = {auc:.4f}, N = {stats['n_samples']}, "
                      f"Pos = {stats['n_positive']}, Neg = {stats['n_negative']}")
            else:
                print(f"{metric:25s}: {stats['error']}")
        
        # Compute average AUC
        valid_aucs = [v["auc"] for v in auc_results.values() if "auc" in v]
        auc_results["average_auc"] = float(np.mean(valid_aucs)) if valid_aucs else 0.0
        
        print("-" * 50)
        print(f"Average AUC-ROC across metrics: {auc_results['average_auc']:.4f}")
        
        return auc_results
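
As with RMSE, the calculator can be exercised standalone before the pipeline is wired up. Placeholder scores only; the labels here come from the adherence-based fallback in binary_labels_from_ground_truth:

# Standalone sanity check with placeholder scores (not real data)
predicted = [
    {"context_relevance": 0.9, "context_utilization": 0.8, "completeness": 0.85, "adherence": 0.90},
    {"context_relevance": 0.3, "context_utilization": 0.4, "completeness": 0.35, "adherence": 0.20},
    {"context_relevance": 0.7, "context_utilization": 0.6, "completeness": 0.65, "adherence": 0.80},
    {"context_relevance": 0.2, "context_utilization": 0.3, "completeness": 0.25, "adherence": 0.10},
]
ground_truth = [
    {"adherence": 0.95},  # label 1
    {"adherence": 0.10},  # label 0
    {"adherence": 0.80},  # label 1
    {"adherence": 0.05},  # label 0
]

results = AUCROCCalculator.compute_auc_all_metrics(predicted, ground_truth)
print(results["average_auc"])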

3.2 Integration with Evaluation Pipeline

Modify: evaluation_pipeline.py to call AUCROC computation

Add to UnifiedEvaluationPipeline.evaluate_batch():

def evaluate_batch(self, test_cases: List[Dict], method: Literal["trace", "gpt_labeling", "hybrid"] = "trace") -> Dict:
    """Evaluate multiple test cases (with AUCROC computation)."""
    
    # ... existing code ...
    
    # NEW: Compute AUCROC if ground truth available
    auc_results = None
    if ground_truth_scores_list and len(ground_truth_scores_list) == len(all_scores):
        from advanced_rag_evaluator import AUCROCCalculator
        auc_results = AUCROCCalculator.compute_auc_all_metrics(
            [s.to_dict() if hasattr(s, 'to_dict') else s for s in all_scores],
            ground_truth_scores_list
        )
    
    results = {
        # ... existing fields ...
        "rmse_metrics": rmse_results,
        "auc_metrics": auc_results,  # NEW
    }
    
    return results

Step 4: Display Results in Streamlit UI

4.1 Modify streamlit_app.py

Add to evaluation_interface() after displaying TRACE results:

def evaluation_interface():
    # ... existing code ...
    
    if st.session_state.evaluation_results:
        results = st.session_state.evaluation_results
        
        st.success("✅ Evaluation Complete!")
        
        # ... existing metric display ...
        
        # NEW: Display RMSE metrics
        if results.get("rmse_metrics"):
            st.markdown("### 📊 RMSE Metrics (vs Ground Truth)")
            
            rmse_data = results["rmse_metrics"]
            if "error" not in rmse_data:
                rmse_df = pd.DataFrame([
                    {
                        "Metric": k,
                        "RMSE": v.get("rmse", 0),
                        "MAE": v.get("mae", 0),
                        "Max Error": v.get("max_error", 0),
                        "Samples": v.get("n_samples", 0)
                    }
                    for k, v in rmse_data.items()
                    if k != "average_rmse" and isinstance(v, dict)
                ])
                
                col1, col2 = st.columns(2)
                with col1:
                    st.dataframe(rmse_df, use_container_width=True)
                with col2:
                    st.metric("Average RMSE", f"{rmse_data.get('average_rmse', 0):.4f}")
        
        # NEW: Display AUC-ROC metrics
        if results.get("auc_metrics"):
            st.markdown("### 📈 AUC-ROC Metrics")
            
            auc_data = results["auc_metrics"]
            if "error" not in auc_data:
                auc_df = pd.DataFrame([
                    {
                        "Metric": k,
                        "AUC": v.get("auc", 0),
                        "Samples": v.get("n_samples", 0),
                        "Pos": v.get("n_positive", 0),
                        "Neg": v.get("n_negative", 0)
                    }
                    for k, v in auc_data.items()
                    if k != "average_auc" and isinstance(v, dict)
                ])
                
                col1, col2 = st.columns(2)
                with col1:
                    st.dataframe(auc_df, use_container_width=True)
                with col2:
                    st.metric("Average AUC-ROC", f"{auc_data.get('average_auc', 0):.4f}")

Step 5: Testing and Validation

5.1 Unit Tests

Create test_rmse_aucroc.py:

"""Test RMSE and AUCROC computation."""

import pytest
import numpy as np
from advanced_rag_evaluator import RMSECalculator, AUCROCCalculator


def test_rmse_computation():
    """Test RMSE calculation."""
    predicted = [0.8, 0.7, 0.9, 0.6]
    ground_truth = [0.75, 0.8, 0.85, 0.65]
    
    rmse, stats = RMSECalculator.compute_rmse_for_metric(
        predicted, ground_truth, "test_metric"
    )
    
    assert rmse >= 0, "RMSE should be non-negative"
    assert stats["n_samples"] == 4, "Should have 4 samples"
    assert 0.05 < rmse < 0.15, f"RMSE should be ~0.07, got {rmse}"
    
    print(f"✅ RMSE test passed: {rmse:.4f}")


def test_auc_computation():
    """Test AUC-ROC calculation."""
    predictions = [0.1, 0.2, 0.8, 0.9]
    labels = [0, 0, 1, 1]
    
    auc, stats = AUCROCCalculator.compute_auc_for_metric(
        predictions, labels, "test_metric"
    )
    
    assert 0 <= auc <= 1, "AUC should be in [0, 1]"
    assert auc == 1.0, "Perfect predictions should have AUC=1.0"
    assert stats["n_samples"] == 4, "Should have 4 samples"
    
    print(f"✅ AUC test passed: {auc:.4f}")


if __name__ == "__main__":
    test_rmse_computation()
    test_auc_computation()
    print("\n✅ All tests passed!")

5.2 Integration Test

"""Test end-to-end RMSE/AUCROC computation."""

from evaluation_pipeline import UnifiedEvaluationPipeline

# Test data
test_cases = [
    {
        "query": "What is machine learning?",
        "response": "Machine learning is...",
        "retrieved_documents": ["Doc 1", "Doc 2"],
        "ground_truth_scores": {
            "context_relevance": 0.8,
            "context_utilization": 0.75,
            "completeness": 0.82,
            "adherence": 0.9
        }
    },
    # ... more test cases ...
]

pipeline = UnifiedEvaluationPipeline(llm_client=None)
results = pipeline.evaluate_batch(test_cases, method="trace")

print("RMSE Results:")
print(results.get("rmse_metrics"))

print("\nAUC-ROC Results:")
print(results.get("auc_metrics"))

Checklist for Implementation

  • Extract ground truth scores in dataset_loader.py
  • Test ground truth extraction with sample dataset
  • Implement RMSECalculator class
  • Implement AUCROCCalculator class
  • Integrate RMSE into evaluation pipeline
  • Integrate AUCROC into evaluation pipeline
  • Add RMSE display in Streamlit
  • Add AUCROC display in Streamlit
  • Write unit tests
  • Run integration tests
  • Verify output format matches RAGBench paper
  • Update documentation

Expected Output

After implementation, evaluation results should include:

{
  "context_relevance": 0.82,
  "context_utilization": 0.75,
  "completeness": 0.79,
  "adherence": 0.88,
  "average": 0.81,
  "rmse_metrics": {
    "context_relevance": {
      "rmse": 0.0456,
      "mae": 0.0382,
      "max_error": 0.1234,
      "n_samples": 10
    },
    "context_utilization": {...},
    "completeness": {...},
    "adherence": {...},
    "average_rmse": 0.0512
  },
  "auc_metrics": {
    "context_relevance": {
      "auc": 0.92,
      "n_samples": 10,
      "n_positive": 5,
      "n_negative": 5
    },
    "context_utilization": {...},
    "completeness": {...},
    "adherence": {...},
    "average_auc": 0.89
  }
}
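
If these results need to be compared against the RAGBench paper numbers later (see the checklist above), the batch results dict can be written to disk after evaluation. A minimal sketch; the filename is an arbitrary choice:

import json

# Persist batch results for later comparison; default=str guards against
# any values that are not natively JSON-serializable
with open("evaluation_results.json", "w") as f:
    json.dump(results, f, indent=2, default=str)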

References

  • sklearn.metrics.mean_squared_error
  • sklearn.metrics.roc_auc_score
  • RAGBench Paper Section 4.3
  • RMSE Formula: sqrt(1/n * Σ(predicted - actual)²)
  • AUC-ROC: Area under the ROC curve for binary classification