
Tutorial 07: Model Validation, Testing & Quality Assurance

Overview

This tutorial covers comprehensive model validation, testing strategies, and quality assurance processes essential for production-ready AI systems. We'll explore NTF's unified metrics utilities, statistical validation methods, bias detection, robustness testing, and systematic evaluation frameworks.

Table of Contents

  1. Validation Fundamentals
  2. NTF Metrics Utilities
  3. Train/Validation/Test Splits
  4. Cross-Validation Techniques
  5. Statistical Significance Testing
  6. Bias and Fairness Detection
  7. Robustness Testing
  8. Adversarial Testing
  9. Domain Shift Detection
  10. Calibration and Confidence Estimation
  11. A/B Testing Framework
  12. Regression Testing for Models
  13. Quality Gates and Release Criteria

Validation Fundamentals

Why Validation Matters

Validation ensures your model:

  • Generalizes to unseen data
  • Doesn't overfit training distributions
  • Meets performance requirements
  • Behaves safely across edge cases
  • Maintains consistency across versions

Validation Pyramid

                   Production Monitoring
                            /\
                           /  \
                          /    \
                         /------\
                        /  A/B   \
                       /  Testing \
                      /------------\
                     /   Holdout    \
                    /    Testing     \
                   /------------------\
                  /  Cross-Validation  \
                 /----------------------\
                /    Train/Val Split     \
               /--------------------------\

Key Principles:

  1. Data Isolation: Never leak test data into training
  2. Distribution Matching: Test data should match production distribution
  3. Statistical Power: Ensure sufficient sample sizes
  4. Multiple Metrics: Evaluate across diverse dimensions
  5. Reproducibility: Fixed seeds and documented procedures (see the seed-fixing sketch below)
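
To make Principle 5 concrete, here is a minimal seed-fixing sketch (illustrative only, not an NTF utility; the helper name set_seed and the choice of libraries to seed are assumptions):

import os
import random

import numpy as np
import torch

def set_seed(seed: int = 42):
    """Fix the RNG seeds that affect data splits, dataloaders, and model init."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    # Optional: trade some speed for deterministic cuDNN kernels
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)  # call once, before creating splits, dataloaders, and models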

NTF Metrics Utilities

Using NTF's Unified Evaluation Interface

NTF provides comprehensive metrics utilities through ntf.utils.metrics. This replaces manual metric implementations with a unified, efficient interface.

from ntf.utils.metrics import (
    compute_perplexity,
    compute_accuracy,
    evaluate_model,
    compare_models,
    benchmark_throughput,
    EvaluationResults
)
from torch.utils.data import DataLoader
import torch

# Load your model and tokenizer
model, tokenizer = load_model_and_tokenizer("path/to/model")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Prepare test dataloader
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Comprehensive evaluation
results = evaluate_model(
    model=model,
    dataloader=test_dataloader,
    device=device,
    compute_generation_metrics=True,
    tokenizer=tokenizer
)

print(f"Perplexity: {results.perplexity:.2f}")
print(f"Loss: {results.loss:.4f}")
print(f"Token Accuracy: {results.token_accuracy:.4f}")
print(f"BLEU Score: {results.bleu_score:.4f}")
print(f"ROUGE-L: {results.rouge_l:.4f}")

Individual Metric Functions

For specific metric computation, NTF provides standalone functions:

# Compute perplexity only
perplexity = compute_perplexity(model, test_dataloader, device)
print(f"Perplexity: {perplexity:.2f}")

# Compute accuracy only
accuracy = compute_accuracy(model, test_dataloader, device)
print(f"Accuracy: {accuracy:.4f}")

Comparing Multiple Checkpoints

NTF makes it easy to compare different model checkpoints:

from ntf.utils.metrics import compare_models

# Compare two model versions
comparison = compare_models(
    model_a=model_v1,
    model_b=model_v2,
    dataloader=val_dataloader,
    device=device
)

print(f"Model A Perplexity: {comparison['model_a']['perplexity']:.2f}")
print(f"Model B Perplexity: {comparison['model_b']['perplexity']:.2f}")
print(f"Improvement: {comparison['improvement']['perplexity']:.2f}%")
print(f"Accuracy Gain: {comparison['improvement']['accuracy']:.2f}%")

Benchmarking Throughput

For production deployment, benchmark model throughput:

throughput_results = benchmark_throughput(
    model=model,
    tokenizer=tokenizer,
    device=device,
    sequence_length=512,
    batch_size=1,
    num_iterations=10
)

print(f"Prefill Throughput: {throughput_results['prefill_throughput']:.2f} tokens/sec")
print(f"Decode Throughput: {throughput_results['decode_throughput']:.2f} tokens/sec")

Metric Selection Guide

Different tasks require different evaluation metrics. Use this guide to select appropriate metrics:

| Task Type          | Recommended Metrics                | NTF Functions                                    |
| ------------------ | ---------------------------------- | ------------------------------------------------ |
| Text Generation    | Perplexity, BLEU, ROUGE, BERTScore | evaluate_model(compute_generation_metrics=True)  |
| Classification     | Accuracy, F1, Precision, Recall    | compute_accuracy() + custom F1                   |
| Summarization      | ROUGE, BERTScore                   | evaluate_model() with ROUGE                      |
| Translation        | BLEU, chrF, COMET                  | evaluate_model() with BLEU                       |
| Question Answering | Exact Match, F1                    | Custom implementation                            |
| Language Modeling  | Perplexity                         | compute_perplexity()                             |
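
Where the table says "custom F1" or "Custom implementation", the metrics are easy to write yourself. Here is a minimal sketch (assumptions: integer class labels for the classification F1, and whitespace-tokenized, lower-cased answer strings for the QA metrics):

from collections import Counter

from sklearn.metrics import f1_score

def classification_f1(predictions, labels, average="macro"):
    """Macro-averaged F1 for classification predictions."""
    return f1_score(labels, predictions, average=average)

def qa_exact_match(prediction: str, reference: str) -> float:
    """1.0 if the normalized answers match exactly, else 0.0."""
    return float(prediction.strip().lower() == reference.strip().lower())

def qa_token_f1(prediction: str, reference: str) -> float:
    """Token-level F1 between a predicted and a reference answer."""
    pred_tokens = prediction.strip().lower().split()
    ref_tokens = reference.strip().lower().split()
    if not pred_tokens or not ref_tokens:
        return float(pred_tokens == ref_tokens)
    common = Counter(pred_tokens) & Counter(ref_tokens)
    num_same = sum(common.values())
    if num_same == 0:
        return 0.0
    precision = num_same / len(pred_tokens)
    recall = num_same / len(ref_tokens)
    return 2 * precision * recall / (precision + recall)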

Checkpoint Comparison Workflow

Here's a complete workflow for comparing multiple checkpoints during development:

from ntf.utils.metrics import evaluate_model
from pathlib import Path
import json

def compare_checkpoints(checkpoint_paths, eval_dataset, tokenizer, device):
    """Compare multiple checkpoints on the same evaluation dataset."""
    
    from torch.utils.data import DataLoader
    eval_dataloader = DataLoader(eval_dataset, batch_size=32, shuffle=False)
    
    results = {}
    
    for checkpoint_path in checkpoint_paths:
        print(f"\nEvaluating {checkpoint_path}...")
        
        # Load checkpoint
        model, _ = load_model_and_tokenizer(checkpoint_path)
        model.to(device)
        model.eval()
        
        # Evaluate
        eval_results = evaluate_model(
            model=model,
            dataloader=eval_dataloader,
            device=device,
            compute_generation_metrics=True,
            tokenizer=tokenizer
        )
        
        # Store results
        results[checkpoint_path] = {
            'perplexity': eval_results.perplexity,
            'loss': eval_results.loss,
            'accuracy': eval_results.accuracy,
            'bleu': eval_results.bleu_score,
            'rouge_l': eval_results.rouge_l
        }
        
        print(f"  Perplexity: {eval_results.perplexity:.2f}")
        print(f"  Accuracy: {eval_results.accuracy:.4f}")
    
    # Find best checkpoint
    best_checkpoint = min(results.keys(), key=lambda k: results[k]['perplexity'])
    print(f"\nBest checkpoint (lowest perplexity): {best_checkpoint}")
    
    return results

# Usage
checkpoints = [
    "./checkpoints/step_1000",
    "./checkpoints/step_2000",
    "./checkpoints/step_3000",
    "./checkpoints/final"
]

all_results = compare_checkpoints(
    checkpoints, 
    val_dataset, 
    tokenizer, 
    device
)

# Save results for tracking
with open("./evaluation_results.json", "w") as f:
    json.dump(all_results, f, indent=2)

Train/Validation/Test Splits

Strategic Data Partitioning

from sklearn.model_selection import train_test_split
import numpy as np

def strategic_data_split(data, labels, strategy='stratified'):
    """
    Create train/val/test splits with proper stratification.
    
    Args:
        data: Input samples
        labels: Corresponding labels
        strategy: 'stratified', 'temporal', 'grouped'
    
    Returns:
        train, val, test splits
    """
    if strategy == 'stratified':
        # First split: train+val vs test (80/20)
        X_train_val, X_test, y_train_val, y_test = train_test_split(
            data, labels, 
            test_size=0.2, 
            stratify=labels,  # Maintain class distribution
            random_state=42
        )
        
        # Second split: train vs val (80/20 of remaining = 64/16 total)
        X_train, X_val, y_train, y_val = train_test_split(
            X_train_val, y_train_val,
            test_size=0.2,
            stratify=y_train_val,
            random_state=42
        )
        
    elif strategy == 'temporal':
        # Time-based split for temporal data
        split_point_1 = int(len(data) * 0.6)
        split_point_2 = int(len(data) * 0.8)
        
        X_train = data[:split_point_1]
        X_val = data[split_point_1:split_point_2]
        X_test = data[split_point_2:]
        
        y_train = labels[:split_point_1]
        y_val = labels[split_point_1:split_point_2]
        y_test = labels[split_point_2:]
        
    elif strategy == 'grouped':
        # Group-based split (e.g., by user, document, session)
        from sklearn.model_selection import GroupShuffleSplit
        
        groups = np.asarray(get_groups(data))  # Your grouping logic; as an array so it can be indexed below
        
        gss = GroupShuffleSplit(
            n_splits=1, 
            test_size=0.2, 
            random_state=42
        )
        train_val_idx, test_idx = next(gss.split(data, groups=groups))
        
        X_train_val, X_test = data[train_val_idx], data[test_idx]
        y_train_val, y_test = labels[train_val_idx], labels[test_idx]
        
        # Split train_val further
        gss2 = GroupShuffleSplit(
            n_splits=1, 
            test_size=0.2, 
            random_state=42
        )
        train_idx, val_idx = next(gss2.split(
            X_train_val, 
            groups=groups[train_val_idx]
        ))
        
        X_train, X_val = X_train_val[train_idx], X_train_val[val_idx]
        y_train, y_val = y_train_val[train_idx], y_train_val[val_idx]
    
    return {
        'train': (X_train, y_train),
        'val': (X_val, y_val),
        'test': (X_test, y_test)
    }

# Usage example
splits = strategic_data_split(texts, labels, strategy='stratified')
print(f"Train: {len(splits['train'][0])}, Val: {len(splits['val'][0])}, Test: {len(splits['test'][0])}")

Split Ratio Guidelines

| Dataset Size | Train | Val  | Test | Rationale                                |
| ------------ | ----- | ---- | ---- | ---------------------------------------- |
| < 10K        | 70%   | 15%  | 15%  | Need more validation signal              |
| 10K-100K     | 80%   | 10%  | 10%  | Balanced approach                        |
| 100K-1M      | 90%   | 5%   | 5%   | Large data, less validation needed       |
| > 1M         | 95%   | 2.5% | 2.5% | Massive data, small holdouts sufficient  |
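
To apply these guidelines in code, a small helper that mirrors the table above could look like this (illustrative only; the thresholds come straight from the table):

def suggested_split_fractions(n_samples: int):
    """Return (train, val, test) fractions based on dataset size."""
    if n_samples < 10_000:
        return 0.70, 0.15, 0.15
    if n_samples < 100_000:
        return 0.80, 0.10, 0.10
    if n_samples < 1_000_000:
        return 0.90, 0.05, 0.05
    return 0.95, 0.025, 0.025

train_frac, val_frac, test_frac = suggested_split_fractions(len(texts))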

Common Splitting Mistakes

❌ Data Leakage:

# WRONG: Preprocessing before split
scaler.fit(data)  # Fits on ALL data including test!
data_scaled = scaler.transform(data)
# Then split...

# CORRECT: Split first
X_train, X_test = train_test_split(data, test_size=0.2)
scaler.fit(X_train)  # Fit only on train
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)  # Transform test with train stats

❌ Temporal Leakage:

# WRONG: Random shuffle on time-series
random.shuffle(time_series_data)

# CORRECT: Respect temporal order
cutoff = int(len(data) * 0.8)
train = data[:cutoff]
test = data[cutoff:]

Cross-Validation Techniques

K-Fold Cross-Validation

from sklearn.model_selection import KFold, StratifiedKFold
import numpy as np
import torch
from torch.utils.data import DataLoader

def k_fold_cross_validation(model_class, config, data, labels, k=5):
    """
    Perform k-fold cross-validation for robust performance estimation.
    
    Returns metrics for each fold and aggregate statistics.
    """
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)
    
    fold_metrics = []
    
    for fold, (train_idx, val_idx) in enumerate(skf.split(data, labels)):
        print(f"\n{'='*50}")
        print(f"FOLD {fold + 1}/{k}")
        print(f"{'='*50}")
        
        # Prepare fold data
        X_train, X_val = data[train_idx], data[val_idx]
        y_train, y_val = labels[train_idx], labels[val_idx]
        
        # Initialize fresh model for this fold
        model = model_class(config)
        
        # Train on this fold
        trainer = Trainer(
            model=model,
            train_data=(X_train, y_train),
            val_data=(X_val, y_val),
            config=config
        )
        
        trainer.train()
        
        # Evaluate
        metrics = trainer.evaluate()
        fold_metrics.append(metrics)
        
        print(f"Fold {fold + 1} Accuracy: {metrics['accuracy']:.4f}")
        print(f"Fold {fold + 1} F1: {metrics['f1']:.4f}")
    
    # Aggregate results
    aggregated = {}
    for key in fold_metrics[0].keys():
        values = [m[key] for m in fold_metrics]
        aggregated[key] = {
            'mean': np.mean(values),
            'std': np.std(values),
            'min': np.min(values),
            'max': np.max(values),
            'all': values
        }
    
    print(f"\n{'='*50}")
    print("CROSS-VALIDATION SUMMARY")
    print(f"{'='*50}")
    for metric, stats in aggregated.items():
        print(f"{metric}: {stats['mean']:.4f} Β± {stats['std']:.4f}")
        print(f"  Range: [{stats['min']:.4f}, {stats['max']:.4f}]")
    
    return aggregated, fold_metrics

# Usage
cv_results, fold_details = k_fold_cross_validation(
    MyModel, config, texts, labels, k=5
)

Nested Cross-Validation for Hyperparameter Tuning

from sklearn.model_selection import ParameterGrid

def nested_cross_validation(model_class, base_config, data, labels, 
                           param_grid, outer_k=5, inner_k=3):
    """
    Nested CV: Outer loop for evaluation, inner loop for hyperparameter selection.
    
    Prevents optimistic bias from hyperparameter tuning.
    """
    outer_cv = StratifiedKFold(n_splits=outer_k, shuffle=True, random_state=42)
    outer_scores = []
    best_params_per_fold = []
    
    for outer_fold, (outer_train_idx, outer_test_idx) in enumerate(
        outer_cv.split(data, labels)
    ):
        print(f"\n{'='*60}")
        print(f"OUTER FOLD {outer_fold + 1}/{outer_k}")
        print(f"{'='*60}")
        
        # Outer split
        X_outer_train = data[outer_train_idx]
        X_outer_test = data[outer_test_idx]
        y_outer_train = labels[outer_train_idx]
        y_outer_test = labels[outer_test_idx]
        
        # Inner CV for hyperparameter selection
        inner_cv = StratifiedKFold(n_splits=inner_k, shuffle=True, random_state=42)
        
        # Note: dicts are unhashable, so key the score table by sorted item tuples
        param_grid_combos = list(ParameterGrid(param_grid))
        param_scores = {tuple(sorted(p.items())): [] for p in param_grid_combos}
        
        for inner_train_idx, inner_val_idx in inner_cv.split(
            X_outer_train, y_outer_train
        ):
            X_inner_train = X_outer_train[inner_train_idx]
            X_inner_val = X_outer_train[inner_val_idx]
            y_inner_train = y_outer_train[inner_train_idx]
            y_inner_val = y_outer_train[inner_val_idx]
            
            # Test each parameter combination
            for params in param_grid_combos:
                config = {**base_config, **params}
                model = model_class(config)
                
                trainer = Trainer(
                    model=model,
                    train_data=(X_inner_train, y_inner_train),
                    val_data=(X_inner_val, y_inner_val),
                    config=config
                )
                
                trainer.train()
                metrics = trainer.evaluate()
                
                param_scores[tuple(sorted(params.items()))].append(metrics['accuracy'])
        
        # Select best parameters based on inner CV
        best_key = max(param_scores.keys(),
                       key=lambda k: np.mean(param_scores[k]))
        best_params = dict(best_key)
        best_score = np.mean(param_scores[best_key])
        
        print(f"Best params for outer fold {outer_fold + 1}: {best_params}")
        print(f"Inner CV score: {best_score:.4f}")
        
        # Train final model for this outer fold with best params
        final_config = {**base_config, **best_params}
        final_model = model_class(final_config)
        
        final_trainer = Trainer(
            model=final_model,
            train_data=(X_outer_train, y_outer_train),
            val_data=None,  # Use all outer train data
            config=final_config
        )
        
        final_trainer.train()
        
        # Evaluate on held-out outer test set
        test_metrics = final_trainer.evaluate_on_test(X_outer_test, y_outer_test)
        outer_scores.append(test_metrics['accuracy'])
        best_params_per_fold.append(best_params)
    
    # Final aggregated results
    print(f"\n{'='*60}")
    print("NESTED CV FINAL RESULTS")
    print(f"{'='*60}")
    print(f"Test Accuracy: {np.mean(outer_scores):.4f} Β± {np.std(outer_scores):.4f}")
    print(f"Range: [{np.min(outer_scores):.4f}, {np.max(outer_scores):.4f}]")
    
    return {
        'mean_score': np.mean(outer_scores),
        'std_score': np.std(outer_scores),
        'scores': outer_scores,
        'best_params_per_fold': best_params_per_fold
    }

# Example usage
param_grid = {
    'learning_rate': [1e-5, 3e-5, 5e-5],
    'batch_size': [16, 32],
    'num_layers': [6, 12]
}

nested_results = nested_cross_validation(
    TransformerModel, 
    base_config, 
    data, 
    labels, 
    param_grid,
    outer_k=5,
    inner_k=3
)

Leave-One-Out and Leave-P-Out

from sklearn.model_selection import LeaveOneOut, LeavePOut

def leave_one_out_validation(model_class, config, data, labels, max_samples=1000):
    """
    Leave-One-Out CV: Extremely thorough but computationally expensive.
    Use only for small datasets (< 1000 samples).
    """
    if len(data) > max_samples:
        print(f"Warning: LOO is too expensive for {len(data)} samples.")
        print(f"Consider using k-fold with k=10 instead.")
        return None
    
    loo = LeaveOneOut()
    scores = []
    
    for train_idx, test_idx in loo.split(data):
        X_train, X_test = data[train_idx], data[test_idx]
        y_train, y_test = labels[train_idx], labels[test_idx]
        
        model = model_class(config)
        trainer = Trainer(model, (X_train, y_train), None, config)
        trainer.train()
        
        metrics = trainer.evaluate_on_test(X_test, y_test)
        scores.append(metrics['accuracy'])
    
    return {'mean': np.mean(scores), 'std': np.std(scores), 'all': scores}

Statistical Significance Testing

Comparing Two Models

from scipy import stats
import numpy as np

def paired_t_test(model_a_predictions, model_b_predictions, ground_truth):
    """
    Paired t-test to compare two models on the same test set.
    
    Tests if the difference in performance is statistically significant.
    """
    # Calculate per-sample correctness
    correct_a = (model_a_predictions == ground_truth).astype(int)
    correct_b = (model_b_predictions == ground_truth).astype(int)
    
    # Paired t-test on correctness scores
    t_statistic, p_value = stats.ttest_rel(correct_a, correct_b)
    
    acc_a = np.mean(correct_a)
    acc_b = np.mean(correct_b)
    
    print(f"Model A Accuracy: {acc_a:.4f}")
    print(f"Model B Accuracy: {acc_b:.4f}")
    print(f"Difference: {acc_b - acc_a:.4f}")
    print(f"T-statistic: {t_statistic:.4f}")
    print(f"P-value: {p_value:.6f}")
    
    if p_value < 0.05:
        direction = "significantly better" if acc_b > acc_a else "significantly worse"
        print(f"Result: Model B is {direction} than Model A (p < 0.05)")
    else:
        print("Result: No significant difference (p >= 0.05)")
    
    return {
        't_statistic': t_statistic,
        'p_value': p_value,
        'significant': p_value < 0.05,
        'accuracy_difference': acc_b - acc_a
    }

def mcnemar_test(model_a_predictions, model_b_predictions, ground_truth):
    """
    McNemar's test for paired nominal data.
    More appropriate than t-test for classification accuracy comparison.
    """
    # Build contingency table
    both_correct = np.sum((model_a_predictions == ground_truth) & 
                         (model_b_predictions == ground_truth))
    a_correct_b_wrong = np.sum((model_a_predictions == ground_truth) & 
                               (model_b_predictions != ground_truth))
    a_wrong_b_correct = np.sum((model_a_predictions != ground_truth) & 
                               (model_b_predictions == ground_truth))
    both_wrong = np.sum((model_a_predictions != ground_truth) & 
                       (model_b_predictions != ground_truth))
    
    print("Contingency Table:")
    print(f"                Model B Correct | Model B Wrong")
    print(f"Model A Correct     {both_correct:6d}      {a_correct_b_wrong:6d}")
    print(f"Model A Wrong       {a_wrong_b_correct:6d}      {both_wrong:6d}")
    
    # McNemar's test statistic (with continuity correction)
    b = a_correct_b_wrong
    c = a_wrong_b_correct
    
    if b + c == 0:
        print("Cannot perform test: no discordant pairs")
        return None
    
    chi2 = (abs(b - c) - 1) ** 2 / (b + c)
    p_value = 1 - stats.chi2.cdf(chi2, 1)
    
    print(f"\nMcNemar's Chi-squared: {chi2:.4f}")
    print(f"P-value: {p_value:.6f}")
    
    if p_value < 0.05:
        winner = "Model B" if c > b else "Model A"
        print(f"Result: {winner} is significantly better (p < 0.05)")
    
    return {
        'chi2': chi2,
        'p_value': p_value,
        'significant': p_value < 0.05,
        'discordant_pairs': {'b': b, 'c': c}
    }

# Bootstrap confidence intervals
def bootstrap_confidence_interval(predictions, ground_truth, 
                                  metric_fn, n_bootstrap=1000, 
                                  confidence_level=0.95):
    """
    Estimate confidence intervals using bootstrapping.
    """
    n_samples = len(predictions)
    bootstrap_scores = []
    
    for i in range(n_bootstrap):
        # Sample with replacement
        indices = np.random.choice(n_samples, size=n_samples, replace=True)
        sampled_preds = predictions[indices]
        sampled_true = ground_truth[indices]
        
        score = metric_fn(sampled_preds, sampled_true)
        bootstrap_scores.append(score)
    
    # Calculate confidence interval
    alpha = 1 - confidence_level
    lower_percentile = alpha / 2 * 100
    upper_percentile = (1 - alpha / 2) * 100
    
    ci_lower = np.percentile(bootstrap_scores, lower_percentile)
    ci_upper = np.percentile(bootstrap_scores, upper_percentile)
    mean_score = np.mean(bootstrap_scores)
    std_score = np.std(bootstrap_scores)
    
    print(f"Bootstrap Results ({n_bootstrap} iterations):")
    print(f"Mean Score: {mean_score:.4f}")
    print(f"Std Dev: {std_score:.4f}")
    print(f"{confidence_level*100}% CI: [{ci_lower:.4f}, {ci_upper:.4f}]")
    
    return {
        'mean': mean_score,
        'std': std_score,
        'ci_lower': ci_lower,
        'ci_upper': ci_upper,
        'bootstrap_scores': bootstrap_scores
    }

# Usage example
from sklearn.metrics import accuracy_score

result = paired_t_test(preds_v1, preds_v2, labels)
mcnemar_result = mcnemar_test(preds_v1, preds_v2, labels)
ci_result = bootstrap_confidence_interval(preds_v2, labels, accuracy_score)

Multiple Comparison Correction

from scipy import stats
from statsmodels.stats.multitest import multipletests

def compare_multiple_models(model_predictions_list, ground_truth, method='fdr_bh'):
    """
    Compare multiple models with correction for multiple comparisons.
    
    Args:
        model_predictions_list: List of (model_name, predictions) tuples
        ground_truth: True labels
        method: Correction method ('bonferroni', 'fdr_bh', 'holm', etc.)
    """
    # Use first model as baseline
    baseline_name, baseline_preds = model_predictions_list[0]
    baseline_correct = (baseline_preds == ground_truth).astype(int)
    
    p_values = []
    model_names = []
    
    for model_name, model_preds in model_predictions_list[1:]:
        model_correct = (model_preds == ground_truth).astype(int)
        
        # Paired t-test against baseline
        _, p_value = stats.ttest_rel(model_correct, baseline_correct)
        p_values.append(p_value)
        model_names.append(model_name)
    
    # Apply correction
    reject, corrected_p_values, _, _ = multipletests(
        p_values, 
        alpha=0.05, 
        method=method
    )
    
    print(f"Multiple Comparison Correction: {method}")
    print(f"{'Model':<20} {'Raw P':<10} {'Corrected P':<12} {'Significant'}")
    print("-" * 55)
    
    for name, raw_p, corr_p, sig in zip(model_names, p_values, corrected_p_values, reject):
        print(f"{name:<20} {raw_p:<10.6f} {corr_p:<12.6f} {'Yes' if sig else 'No'}")
    
    return {
        'model_names': model_names,
        'raw_p_values': p_values,
        'corrected_p_values': corrected_p_values,
        'reject_null': reject
    }

Bias and Fairness Detection

Demographic Parity and Equalized Odds

import pandas as pd
from sklearn.metrics import confusion_matrix

class FairnessAuditor:
    def __init__(self, predictions, ground_truth, sensitive_attributes):
        """
        Args:
            predictions: Model predictions
            ground_truth: True labels
            sensitive_attributes: Dict of protected attributes 
                                 (e.g., {'gender': [...], 'race': [...]})
        """
        self.predictions = np.array(predictions)
        self.ground_truth = np.array(ground_truth)
        self.sensitive_attributes = sensitive_attributes
        
    def demographic_parity(self, attribute_name):
        """
        Check if positive prediction rates are equal across groups.
        
        Demographic parity: P(Ŷ=1|A=0) = P(Ŷ=1|A=1)
        """
        attribute = self.sensitive_attributes[attribute_name]
        unique_groups = np.unique(attribute)
        
        positive_rates = {}
        for group in unique_groups:
            mask = attribute == group
            positive_rate = np.mean(self.predictions[mask] == 1)
            positive_rates[group] = positive_rate
        
        # Calculate disparity
        rates = list(positive_rates.values())
        max_disparity = max(rates) - min(rates)
        
        print(f"Demographic Parity for '{attribute_name}':")
        print(f"{'Group':<15} {'Positive Rate':<15}")
        print("-" * 30)
        for group, rate in positive_rates.items():
            print(f"{str(group):<15} {rate:.4f}")
        print(f"\nMax Disparity: {max_disparity:.4f}")
        
        # Rule of thumb: disparity < 0.1 is acceptable
        passed = max_disparity < 0.1
        print(f"Status: {'PASS' if passed else 'FAIL'} (threshold: 0.1)")
        
        return {
            'positive_rates': positive_rates,
            'max_disparity': max_disparity,
            'passed': passed
        }
    
    def equalized_odds(self, attribute_name):
        """
        Check if TPR and FPR are equal across groups.
        
        Equalized odds: P(Ŷ=1|Y=1,A=0) = P(Ŷ=1|Y=1,A=1)
                        and P(Ŷ=1|Y=0,A=0) = P(Ŷ=1|Y=0,A=1)
        """
        attribute = self.sensitive_attributes[attribute_name]
        unique_groups = np.unique(attribute)
        
        tpr_by_group = {}
        fpr_by_group = {}
        
        for group in unique_groups:
            mask = attribute == group
            
            # True Positive Rate (Recall)
            positive_mask = mask & (self.ground_truth == 1)
            if np.sum(positive_mask) > 0:
                tpr = np.mean(self.predictions[positive_mask] == 1)
            else:
                tpr = 0.0
            
            # False Positive Rate
            negative_mask = mask & (self.ground_truth == 0)
            if np.sum(negative_mask) > 0:
                fpr = np.mean(self.predictions[negative_mask] == 1)
            else:
                fpr = 0.0
            
            tpr_by_group[group] = tpr
            fpr_by_group[group] = fpr
        
        # Calculate disparities
        tpr_disparity = max(tpr_by_group.values()) - min(tpr_by_group.values())
        fpr_disparity = max(fpr_by_group.values()) - min(fpr_by_group.values())
        
        print(f"\nEqualized Odds for '{attribute_name}':")
        print(f"{'Group':<10} {'TPR':<10} {'FPR':<10}")
        print("-" * 30)
        for group in unique_groups:
            print(f"{str(group):<10} {tpr_by_group[group]:.4f}   {fpr_by_group[group]:.4f}")
        
        print(f"\nTPR Disparity: {tpr_disparity:.4f}")
        print(f"FPR Disparity: {fpr_disparity:.4f}")
        
        passed = (tpr_disparity < 0.1) and (fpr_disparity < 0.1)
        print(f"Status: {'PASS' if passed else 'FAIL'} (threshold: 0.1)")
        
        return {
            'tpr_by_group': tpr_by_group,
            'fpr_by_group': fpr_by_group,
            'tpr_disparity': tpr_disparity,
            'fpr_disparity': fpr_disparity,
            'passed': passed
        }
    
    def predictive_parity(self, attribute_name):
        """
        Check if precision is equal across groups.
        
        Predictive parity: P(Y=1|Ŷ=1,A=0) = P(Y=1|Ŷ=1,A=1)
        """
        attribute = self.sensitive_attributes[attribute_name]
        unique_groups = np.unique(attribute)
        
        precision_by_group = {}
        
        for group in unique_groups:
            mask = attribute == group
            predicted_positive_mask = mask & (self.predictions == 1)
            
            if np.sum(predicted_positive_mask) > 0:
                precision = np.mean(self.ground_truth[predicted_positive_mask] == 1)
            else:
                precision = 0.0
            
            precision_by_group[group] = precision
        
        disparity = max(precision_by_group.values()) - min(precision_by_group.values())
        
        print(f"\nPredictive Parity for '{attribute_name}':")
        print(f"{'Group':<15} {'Precision':<15}")
        print("-" * 30)
        for group, prec in precision_by_group.items():
            print(f"{str(group):<15} {prec:.4f}")
        print(f"\nDisparity: {disparity:.4f}")
        
        passed = disparity < 0.1
        print(f"Status: {'PASS' if passed else 'FAIL'}")
        
        return {
            'precision_by_group': precision_by_group,
            'disparity': disparity,
            'passed': passed
        }
    
    def generate_fairness_report(self):
        """Generate comprehensive fairness report for all attributes."""
        report = {}
        
        for attr_name in self.sensitive_attributes.keys():
            print(f"\n{'='*60}")
            print(f"FAIRNESS AUDIT: {attr_name.upper()}")
            print(f"{'='*60}")
            
            report[attr_name] = {
                'demographic_parity': self.demographic_parity(attr_name),
                'equalized_odds': self.equalized_odds(attr_name),
                'predictive_parity': self.predictive_parity(attr_name)
            }
        
        return report

# Usage example
auditor = FairnessAuditor(
    predictions=model_preds,
    ground_truth=true_labels,
    sensitive_attributes={
        'gender': gender_array,
        'age_group': age_group_array,
        'region': region_array
    }
)

fairness_report = auditor.generate_fairness_report()

Bias Mitigation Strategies

class BiasMitigator:
    def __init__(self, model, training_data, sensitive_attributes):
        self.model = model
        self.training_data = training_data
        self.sensitive_attributes = sensitive_attributes
    
    def reweighting(self, target_attribute):
        """
        Reweight samples to balance representation across groups.
        """
        attribute = self.sensitive_attributes[target_attribute]
        unique_groups, counts = np.unique(attribute, return_counts=True)
        
        # Calculate weights inversely proportional to group size
        total_samples = len(attribute)
        weights = np.zeros_like(attribute, dtype=float)
        
        for group, count in zip(unique_groups, counts):
            mask = attribute == group
            # Weight = total_samples / (num_groups * count_in_group)
            weights[mask] = total_samples / (len(unique_groups) * count)
        
        # Normalize weights
        weights = weights * len(attribute) / np.sum(weights)
        
        print(f"Reweighting for '{target_attribute}':")
        print(f"Weight range: [{weights.min():.4f}, {weights.max():.4f}]")
        
        return weights
    
    def adversarial_debiasing(self, debias_epochs=10):
        """
        Train adversary to predict sensitive attribute from representations.
        Update model to minimize adversary's success.
        """
        # Implementation would add adversarial head to model
        # and alternate between main task and adversarial training
        pass
    
    def threshold_optimization(self, val_predictions, val_labels, 
                               sensitive_attributes, target_attribute):
        """
        Optimize decision thresholds per group to equalize metrics.
        """
        attribute = sensitive_attributes[target_attribute]
        unique_groups = np.unique(attribute)
        
        optimal_thresholds = {}
        
        for group in unique_groups:
            mask = attribute == group
            group_preds = val_predictions[mask]
            group_labels = val_labels[mask]
            
            # Find threshold that equalizes TPR across groups
            best_threshold = 0.5
            best_metric = 0
            
            for threshold in np.arange(0.1, 0.9, 0.05):
                binarized_preds = (group_preds > threshold).astype(int)
                tpr = np.sum((binarized_preds == 1) & (group_labels == 1)) / \
                      np.sum(group_labels == 1)
                
                # Optimize for equal TPR (simplified)
                if tpr > best_metric:
                    best_metric = tpr
                    best_threshold = threshold
            
            optimal_thresholds[group] = best_threshold
        
        print(f"Optimal thresholds for '{target_attribute}':")
        for group, thresh in optimal_thresholds.items():
            print(f"  Group {group}: {thresh:.2f}")
        
        return optimal_thresholds
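
The per-sample weights returned by reweighting() can drive a weighted sampler so that under-represented groups are drawn more often during training. A brief usage sketch (assuming a PyTorch Dataset named train_dataset whose rows line up with the sensitive-attribute arrays):

import torch
from torch.utils.data import DataLoader, WeightedRandomSampler

mitigator = BiasMitigator(model, train_dataset, {'gender': gender_array})
sample_weights = mitigator.reweighting('gender')

# Draw samples with probability proportional to their weights
sampler = WeightedRandomSampler(
    weights=torch.as_tensor(sample_weights, dtype=torch.double),
    num_samples=len(sample_weights),
    replacement=True
)
train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)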

Robustness Testing

Perturbation Testing

import random
import string

class RobustnessTester:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
    
    def character_level_perturbations(self, texts, perturbation_rate=0.1):
        """
        Test robustness to character-level noise.
        
        Types:
        - Random character insertion
        - Random character deletion
        - Random character substitution
        - Adjacent character swap
        """
        perturbed_texts = []
        
        for text in texts:
            chars = list(text)
            num_perturbations = max(1, int(len(chars) * perturbation_rate))
            
            for _ in range(num_perturbations):
                op = random.choice(['insert', 'delete', 'substitute', 'swap'])
                idx = random.randint(0, len(chars) - 1)
                
                if op == 'insert' and chars[idx].isalpha():
                    chars.insert(idx, random.choice(string.ascii_lowercase))
                elif op == 'delete' and len(chars) > 1:
                    chars.pop(idx)
                elif op == 'substitute' and chars[idx].isalpha():
                    chars[idx] = random.choice(string.ascii_lowercase)
                elif op == 'swap' and idx < len(chars) - 1:
                    chars[idx], chars[idx + 1] = chars[idx + 1], chars[idx]
            
            perturbed_texts.append(''.join(chars))
        
        return perturbed_texts
    
    def word_level_perturbations(self, texts, perturbation_rate=0.1):
        """
        Test robustness to word-level noise.
        
        Types:
        - Random word deletion
        - Random word insertion (synonyms or random)
        - Word order shuffling (local)
        """
        perturbed_texts = []
        
        for text in texts:
            words = text.split()
            num_perturbations = max(1, int(len(words) * perturbation_rate))
            
            for _ in range(num_perturbations):
                op = random.choice(['delete', 'insert', 'shuffle'])
                idx = random.randint(0, len(words) - 1)
                
                if op == 'delete' and len(words) > 1:
                    words.pop(idx)
                elif op == 'insert':
                    # Insert random common word
                    common_words = ['the', 'a', 'is', 'it', 'this', 'that']
                    words.insert(idx, random.choice(common_words))
                elif op == 'shuffle' and idx < len(words) - 1:
                    words[idx], words[idx + 1] = words[idx + 1], words[idx]
            
            perturbed_texts.append(' '.join(words))
        
        return perturbed_texts
    
    def synonym_replacement(self, texts, replacement_rate=0.1):
        """
        Replace words with synonyms using WordNet or similar.
        """
        try:
            from nltk.corpus import wordnet
            from nltk import word_tokenize, pos_tag
        except ImportError:
            print("NLTK not available. Install with: pip install nltk")
            return texts
        
        perturbed_texts = []
        
        for text in texts:
            words = text.split()
            num_replacements = max(1, int(len(words) * replacement_rate))
            
            for _ in range(num_replacements):
                idx = random.randint(0, len(words) - 1)
                word = words[idx]
                
                # Get synonyms
                synsets = wordnet.synsets(word)
                if synsets:
                    synonyms = []
                    for synset in synsets:
                        for lemma in synset.lemmas():
                            synonym = lemma.name().replace('_', ' ')
                            if synonym.lower() != word.lower():
                                synonyms.append(synonym)
                    
                    if synonyms:
                        words[idx] = random.choice(synonyms)
            
            perturbed_texts.append(' '.join(words))
        
        return perturbed_texts
    
    def back_translation(self, texts, intermediate_lang='de'):
        """
        Test robustness via back-translation (round-trip translation).
        
        Translate to intermediate language and back.
        """
        try:
            from transformers import MarianMTModel, MarianTokenizer
            
            # Load translation models
            trans_to_tokenizer = MarianTokenizer.from_pretrained(
                f'Helsinki-NLP/opus-mt-en-{intermediate_lang}'
            )
            trans_to_model = MarianMTModel.from_pretrained(
                f'Helsinki-NLP/opus-mt-en-{intermediate_lang}'
            )
            
            trans_back_tokenizer = MarianTokenizer.from_pretrained(
                f'Helsinki-NLP/opus-mt-{intermediate_lang}-en'
            )
            trans_back_model = MarianMTModel.from_pretrained(
                f'Helsinki-NLP/opus-mt-{intermediate_lang}-en'
            )
        except Exception as e:
            print(f"Translation models not available: {e}")
            return texts
        
        perturbed_texts = []
        
        for text in texts:
            # Translate to intermediate language
            inputs_to = trans_to_tokenizer(text, return_tensors='pt', padding=True)
            translated_to = trans_to_model.generate(**inputs_to)
            intermediate = trans_to_tokenizer.decode(translated_to[0], skip_special_tokens=True)
            
            # Translate back to English
            inputs_back = trans_back_tokenizer(intermediate, return_tensors='pt', padding=True)
            translated_back = trans_back_model.generate(**inputs_back)
            back_translated = trans_back_tokenizer.decode(
                translated_back[0], 
                skip_special_tokens=True
            )
            
            perturbed_texts.append(back_translated)
        
        return perturbed_texts
    
    def evaluate_robustness(self, original_texts, labels, perturbation_fn, 
                           perturbation_name):
        """
        Evaluate model performance on perturbed data.
        """
        perturbed_texts = perturbation_fn(original_texts)
        
        # Get predictions for original and perturbed
        orig_preds = self.model.predict(original_texts)
        pert_preds = self.model.predict(perturbed_texts)
        
        # Calculate metrics
        orig_accuracy = np.mean(orig_preds == labels)
        pert_accuracy = np.mean(pert_preds == labels)
        
        # Consistency: predictions unchanged despite perturbation
        consistency = np.mean(orig_preds == pert_preds)
        
        print(f"\nRobustness Test: {perturbation_name}")
        print(f"Original Accuracy: {orig_accuracy:.4f}")
        print(f"Perturbed Accuracy: {pert_accuracy:.4f}")
        print(f"Accuracy Drop: {orig_accuracy - pert_accuracy:.4f}")
        print(f"Prediction Consistency: {consistency:.4f}")
        
        return {
            'original_accuracy': orig_accuracy,
            'perturbed_accuracy': pert_accuracy,
            'accuracy_drop': orig_accuracy - pert_accuracy,
            'consistency': consistency
        }
    
    def comprehensive_robustness_suite(self, texts, labels):
        """Run all robustness tests."""
        results = {}
        
        # character_level_perturbations and word_level_perturbations each apply
        # a random mix of operations, so one entry per level is sufficient
        tests = [
            ('Character Perturbations', lambda t: self.character_level_perturbations(t, 0.1)),
            ('Word Perturbations', lambda t: self.word_level_perturbations(t, 0.1)),
            ('Synonym Replacement', lambda t: self.synonym_replacement(t, 0.1)),
        ]
        
        for test_name, pert_fn in tests:
            results[test_name] = self.evaluate_robustness(
                texts, labels, pert_fn, test_name
            )
        
        # Summary
        print(f"\n{'='*60}")
        print("ROBUSTNESS SUMMARY")
        print(f"{'='*60}")
        print(f"{'Test':<25} {'Acc Drop':<12} {'Consistency':<12}")
        print("-" * 50)
        
        for test_name, metrics in results.items():
            print(f"{test_name:<25} {metrics['accuracy_drop']:.4f}      {metrics['consistency']:.4f}")
        
        return results

Stress Testing

class StressTester:
    def __init__(self, model):
        self.model = model
    
    def length_stress_test(self, texts, labels):
        """Test performance across different input lengths."""
        lengths = [len(text.split()) for text in texts]
        
        # Bin by length
        bins = [(0, 10), (10, 25), (25, 50), (50, 100), (100, float('inf'))]
        results = {}
        
        for min_len, max_len in bins:
            mask = [(min_len <= l < max_len) for l in lengths]
            bin_texts = [t for t, m in zip(texts, mask) if m]
            bin_labels = [l for l, m in zip(labels, mask) if m]
            
            if len(bin_labels) > 0:
                preds = self.model.predict(bin_texts)
                accuracy = np.mean(preds == bin_labels)
                
                bin_name = f"{min_len}-{max_len if max_len != float('inf') else '∞'}"
                results[bin_name] = {
                    'count': len(bin_labels),
                    'accuracy': accuracy
                }
                
                print(f"Length {bin_name}: {accuracy:.4f} (n={len(bin_labels)})")
        
        return results
    
    def rare_class_stress_test(self, texts, labels):
        """Test performance on rare/underrepresented classes."""
        unique_classes, counts = np.unique(labels, return_counts=True)
        
        results = {}
        
        for cls, count in zip(unique_classes, counts):
            mask = labels == cls
            cls_texts = [t for t, m in zip(texts, mask) if m]
            cls_labels = [l for l, m in zip(labels, mask) if m]
            
            preds = self.model.predict(cls_texts)
            accuracy = np.mean(preds == cls_labels)
            
            frequency = 'rare' if count < len(labels) * 0.05 else 'common'
            
            results[cls] = {
                'count': count,
                'frequency': frequency,
                'accuracy': accuracy
            }
            
            print(f"Class {cls}: {accuracy:.4f} (n={count}, {frequency})")
        
        return results
    
    def edge_case_stress_test(self, edge_cases):
        """Test specific edge cases."""
        results = {}
        
        for case_name, (text, expected_label) in edge_cases.items():
            pred = self.model.predict([text])[0]
            correct = pred == expected_label
            
            results[case_name] = {
                'expected': expected_label,
                'predicted': pred,
                'correct': correct
            }
            
            status = "✓" if correct else "✗"
            print(f"{status} {case_name}: Expected={expected_label}, Got={pred}")
        
        return results

Adversarial Testing

TextFooler-Style Adversarial Attacks

class AdversarialAttacker:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
    
    def calculate_importance_scores(self, text, label):
        """
        Calculate importance score for each word by masking.
        """
        words = text.split()
        importance_scores = []
        
        # Get original prediction probability
        orig_probs = self.model.predict_proba([text])[0]
        orig_prob = orig_probs[label]
        
        for i, word in enumerate(words):
            # Mask this word
            masked_words = words[:i] + ['[MASK]'] + words[i+1:]
            masked_text = ' '.join(masked_words)
            
            # Get prediction with masked word
            masked_probs = self.model.predict_proba([masked_text])[0]
            masked_prob = masked_probs[label]
            
            # Importance = drop in probability
            importance = orig_prob - masked_prob
            importance_scores.append(importance)
        
        return importance_scores
    
    def find_synonyms(self, word, top_k=10):
        """Find synonyms for a word."""
        try:
            from nltk.corpus import wordnet
        except ImportError:
            return [word]
        
        synonyms = []
        for synset in wordnet.synsets(word):
            for lemma in synset.lemmas():
                synonym = lemma.name().replace('_', ' ')
                if synonym.lower() != word.lower() and synonym.isalpha():
                    synonyms.append(synonym)
        
        return synonyms[:top_k]
    
    def textfooler_attack(self, text, true_label, max_iterations=10):
        """
        Implement TextFooler-style adversarial attack.
        
        Strategy:
        1. Identify important words
        2. Replace with synonyms that change prediction
        3. Ensure semantic similarity and grammaticality
        """
        words = text.split()
        current_text = text
        attacked_words = set()
        
        for iteration in range(max_iterations):
            # Get current prediction
            pred = self.model.predict([current_text])[0]
            
            # Check if attack succeeded
            if pred != true_label:
                print(f"Attack succeeded at iteration {iteration + 1}")
                return {
                    'success': True,
                    'adversarial_text': current_text,
                    'iterations': iteration + 1,
                    'attacked_words': attacked_words
                }
            
            # Calculate importance scores
            importance_scores = self.calculate_importance_scores(
                current_text, true_label
            )
            
            # Sort words by importance (descending)
            sorted_indices = np.argsort(importance_scores)[::-1]
            
            # Try to replace most important unattacked word
            attacked = False
            for idx in sorted_indices:
                if idx in attacked_words:
                    continue
                
                word = words[idx]
                synonyms = self.find_synonyms(word)
                
                # Try each synonym
                for synonym in synonyms:
                    # Create candidate text
                    candidate_words = words.copy()
                    candidate_words[idx] = synonym
                    candidate_text = ' '.join(candidate_words)
                    
                    # Check if prediction changes
                    new_pred = self.model.predict([candidate_text])[0]
                    
                    if new_pred != true_label:
                        # Attack successful
                        current_text = candidate_text
                        words = candidate_words
                        attacked_words.add(idx)
                        attacked = True
                        break
                    
                    # Also check if probability of true label decreases
                    orig_probs = self.model.predict_proba([current_text])[0]
                    new_probs = self.model.predict_proba([candidate_text])[0]
                    
                    if new_probs[true_label] < orig_probs[true_label]:
                        # Accept if it reduces confidence
                        current_text = candidate_text
                        words = candidate_words
                        attacked_words.add(idx)
                        attacked = True
                        break
                
                if attacked:
                    break
            
            if not attacked:
                print("No successful perturbation found")
                break
        
        return {
            'success': False,
            'adversarial_text': current_text,
            'iterations': max_iterations,
            'attacked_words': attacked_words
        }
    
    def generate_adversarial_dataset(self, texts, labels, attack_rate=0.3):
        """
        Generate adversarial examples for a portion of the dataset.
        """
        num_to_attack = int(len(texts) * attack_rate)
        indices = np.random.choice(len(texts), num_to_attack, replace=False)
        
        adversarial_examples = []
        success_count = 0
        
        for idx in indices:
            text = texts[idx]
            label = labels[idx]
            
            result = self.textfooler_attack(text, label)
            
            if result['success']:
                success_count += 1
                adversarial_examples.append({
                    'original': text,
                    'adversarial': result['adversarial_text'],
                    'label': label,
                    'iterations': result['iterations']
                })
        
        attack_success_rate = success_count / num_to_attack
        
        print(f"\nAdversarial Attack Summary:")
        print(f"Attempted: {num_to_attack}")
        print(f"Successful: {success_count}")
        print(f"Success Rate: {attack_success_rate:.4f}")
        
        return adversarial_examples, attack_success_rate

Adversarial Training

class AdversarialTrainer:
    def __init__(self, model, attacker, config):
        self.model = model
        self.attacker = attacker
        self.config = config
    
    def train_with_adversarial_examples(self, train_loader, adversarial_ratio=0.5):
        """
        Train model augmented with adversarial examples.
        
        Mix of clean and adversarial examples improves robustness.
        """
        self.model.train()
        
        for epoch in range(self.config.num_epochs):
            total_loss = 0
            
            for batch_idx, (texts, labels) in enumerate(train_loader):
                # Get adversarial examples for this batch
                adv_examples = []
                adv_labels = []
                
                for text, label in zip(texts, labels):
                    if random.random() < adversarial_ratio:
                        result = self.attacker.textfooler_attack(text, label, max_iterations=5)
                        if result['success']:
                            adv_examples.append(result['adversarial_text'])
                            adv_labels.append(label)
                        else:
                            adv_examples.append(text)
                            adv_labels.append(label)
                    else:
                        adv_examples.append(text)
                        adv_labels.append(label)
                
                # Train on mixed batch
                loss = self.model.train_step(adv_examples, adv_labels)
                total_loss += loss
            
            avg_loss = total_loss / len(train_loader)
            print(f"Epoch {epoch + 1}: Loss = {avg_loss:.4f}")
        
        return self.model

Domain Shift Detection

Distribution Comparison

from scipy import stats
from scipy.spatial.distance import jensenshannon
from scipy.stats import wasserstein_distance
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import numpy as np

class DomainShiftDetector:
    def __init__(self, source_data, target_data, model):
        """
        Args:
            source_data: Training distribution data
            target_data: New/target distribution data
            model: Trained model for extracting representations
        """
        self.source_data = source_data
        self.target_data = target_data
        self.model = model
    
    def extract_representations(self, texts):
        """Extract hidden representations from model."""
        representations = []
        
        for text in texts:
            rep = self.model.get_hidden_representation(text)
            representations.append(rep)
        
        return np.array(representations)
    
    def compare_label_distributions(self, source_labels, target_labels):
        """Compare label distribution between domains."""
        # Get unique labels
        all_labels = np.unique(np.concatenate([source_labels, target_labels]))
        
        # Calculate distributions
        source_dist = np.array([np.mean(source_labels == l) for l in all_labels])
        target_dist = np.array([np.mean(target_labels == l) for l in all_labels])
        
        # Jensen-Shannon divergence
        js_div = jensenshannon(source_dist, target_dist)
        
        # Total Variation Distance
        tv_dist = 0.5 * np.sum(np.abs(source_dist - target_dist))
        
        print(f"Label Distribution Comparison:")
        print(f"Jensen-Shannon Divergence: {js_div:.4f}")
        print(f"Total Variation Distance: {tv_dist:.4f}")
        
        # Visualization
        fig, ax = plt.subplots(figsize=(10, 6))
        x = np.arange(len(all_labels))
        width = 0.35
        
        ax.bar(x - width/2, source_dist, width, label='Source', alpha=0.8)
        ax.bar(x + width/2, target_dist, width, label='Target', alpha=0.8)
        
        ax.set_xlabel('Class')
        ax.set_ylabel('Proportion')
        ax.set_title('Label Distribution: Source vs Target')
        ax.set_xticks(x)
        ax.set_xticklabels(all_labels)
        ax.legend()
        
        plt.tight_layout()
        plt.savefig('label_distribution_comparison.png')
        plt.show()
        
        return {
            'js_divergence': js_div,
            'tv_distance': tv_dist,
            'source_distribution': source_dist,
            'target_distribution': target_dist
        }
    
    def compare_feature_distributions(self):
        """Compare feature distributions using statistical tests."""
        # Extract representations
        print("Extracting source representations...")
        source_reps = self.extract_representations(self.source_data)
        
        print("Extracting target representations...")
        target_reps = self.extract_representations(self.target_data)
        
        # Per-feature comparison (Kolmogorov-Smirnov test)
        n_features = source_reps.shape[1]
        ks_statistics = []
        ks_p_values = []
        
        for i in range(min(n_features, 100)):  # Sample features for efficiency
            stat, p_val = stats.ks_2samp(source_reps[:, i], target_reps[:, i])
            ks_statistics.append(stat)
            ks_p_values.append(p_val)
        
        # Summary statistics
        mean_ks_stat = np.mean(ks_statistics)
        frac_significant = np.mean([p < 0.05 for p in ks_p_values])
        
        print(f"\nFeature Distribution Comparison:")
        print(f"Mean KS Statistic: {mean_ks_stat:.4f}")
        print(f"Fraction of Significant Features (p<0.05): {frac_significant:.4f}")
        
        # Wasserstein distance on aggregated representations
        source_means = np.mean(source_reps, axis=0)
        target_means = np.mean(target_reps, axis=0)
        
        wasserstein_dist = wasserstein_distance(source_means, target_means)
        print(f"Wasserstein Distance (means): {wasserstein_dist:.4f}")
        
        return {
            'mean_ks_statistic': mean_ks_stat,
            'frac_significant': frac_significant,
            'wasserstein_distance': wasserstein_dist
        }
    
    def visualize_domain_shift(self):
        """Visualize domain shift using t-SNE."""
        # Combine data
        all_data = np.concatenate([self.source_data, self.target_data])
        all_labels = ['Source'] * len(self.source_data) + \
                     ['Target'] * len(self.target_data)
        
        # Extract representations
        reps = self.extract_representations(all_data)
        
        # t-SNE visualization
        tsne = TSNE(n_components=2, random_state=42, perplexity=30)
        reps_2d = tsne.fit_transform(reps)
        
        # Plot
        fig, ax = plt.subplots(figsize=(12, 10))
        
        source_mask = np.array(all_labels) == 'Source'
        target_mask = np.array(all_labels) == 'Target'
        
        ax.scatter(reps_2d[source_mask, 0], reps_2d[source_mask, 1],
                  alpha=0.5, label='Source', s=10)
        ax.scatter(reps_2d[target_mask, 0], reps_2d[target_mask, 1],
                  alpha=0.5, label='Target', s=10)
        
        ax.set_xlabel('t-SNE Dimension 1')
        ax.set_ylabel('t-SNE Dimension 2')
        ax.set_title('Domain Shift Visualization: Source vs Target')
        ax.legend()
        ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig('domain_shift_tsne.png')
        plt.show()
    
    def detect_covariate_shift(self):
        """Detect covariate shift using KLIEP-like method."""
        # Simplified covariate shift detection
        source_reps = self.extract_representations(self.source_data)
        target_reps = self.extract_representations(self.target_data)
        
        # Train classifier to distinguish source from target
        X = np.concatenate([source_reps, target_reps])
        y = np.concatenate([
            np.zeros(len(source_reps)),
            np.ones(len(target_reps))
        ])
        
        from sklearn.linear_model import LogisticRegression
        from sklearn.model_selection import cross_val_score
        
        # If a classifier can easily distinguish source from target, there is
        # significant shift. Use cross-validated accuracy so the estimate is
        # not inflated by scoring on the same data the classifier was fit on.
        clf = LogisticRegression(random_state=42, max_iter=1000)
        accuracy = cross_val_score(clf, X, y, cv=5).mean()
        
        print(f"Covariate Shift Detection:")
        print(f"Source/Target Classifier Accuracy: {accuracy:.4f}")
        
        if accuracy > 0.7:
            print("WARNING: Significant covariate shift detected!")
        elif accuracy > 0.55:
            print("MODERATE: Some covariate shift present")
        else:
            print("LOW: Minimal covariate shift")
        
        return {
            'classifier_accuracy': accuracy,
            'shift_severity': 'high' if accuracy > 0.7 else 
                             'moderate' if accuracy > 0.55 else 'low'
        }
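
A minimal usage sketch of the detector above. The class name DomainShiftDetector is assumed here for illustration (use whatever name the class was defined with), and training_texts / production_texts are placeholder datasets:

# Hedged usage sketch -- DomainShiftDetector and the data variables below are
# illustrative names, not part of any library API.
detector = DomainShiftDetector(
    source_data=training_texts,     # distribution the model was trained on
    target_data=production_texts,   # newly collected texts to compare against
    model=model
)

feature_shift = detector.compare_feature_distributions()
covariate_shift = detector.detect_covariate_shift()
detector.visualize_domain_shift()

if covariate_shift['shift_severity'] != 'low':
    print("Consider re-validating on labeled target-domain data before release.")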

Calibration and Confidence Estimation

import numpy as np
import matplotlib.pyplot as plt
from scipy.special import softmax
from sklearn.calibration import calibration_curve

class CalibrationAnalyzer:
    def __init__(self, model):
        self.model = model
    
    def get_predicted_probabilities(self, texts):
        """Get predicted probabilities from model."""
        return self.model.predict_proba(texts)
    
    def plot_calibration_curve(self, texts, labels, n_bins=10):
        """
        Plot reliability diagram (calibration curve).
        """
        probs = self.get_predicted_probabilities(texts)
        
        # For binary classification, calibrate P(positive class)
        if probs.shape[1] == 2:
            prob_positive = probs[:, 1]
            outcomes = np.asarray(labels)
        else:
            # For multiclass, calibrate the confidence of the predicted class:
            # the binary "positive" outcome is "the top prediction was correct"
            prob_positive = np.max(probs, axis=1)
            outcomes = (np.argmax(probs, axis=1) == np.asarray(labels)).astype(int)
        
        # Calculate calibration curve
        fraction_of_positives, mean_predicted_value = calibration_curve(
            outcomes, prob_positive, n_bins=n_bins
        )
        
        # Plot
        fig, ax = plt.subplots(figsize=(8, 8))
        
        ax.plot(mean_predicted_value, fraction_of_positives, 's-', 
               label='Model', markersize=10)
        ax.plot([0, 1], [0, 1], 'k--', label='Perfectly calibrated')
        
        ax.set_xlabel('Mean Predicted Probability')
        ax.set_ylabel('Fraction of Positives')
        ax.set_title('Calibration Curve (Reliability Diagram)')
        ax.legend(loc='upper left')
        ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.savefig('calibration_curve.png')
        plt.show()
        
        # Calculate Expected Calibration Error (ECE)
        ece = self.calculate_ece(outcomes, prob_positive, n_bins)
        
        return {
            'fraction_of_positives': fraction_of_positives,
            'mean_predicted_value': mean_predicted_value,
            'ece': ece
        }
    
    def calculate_ece(self, labels, probabilities, n_bins=10):
        """
        Calculate Expected Calibration Error.
        
        ECE measures the weighted average difference between predicted
        confidence and actual accuracy across bins:
            ECE = sum_b (n_b / N) * |acc(b) - conf(b)|
        """
        bin_boundaries = np.linspace(0, 1, n_bins + 1)
        ece = 0.0
        
        for i in range(n_bins):
            # Find samples in this bin
            in_bin = (probabilities > bin_boundaries[i]) & \
                     (probabilities <= bin_boundaries[i + 1])
            
            prop_in_bin = np.mean(in_bin)
            
            if prop_in_bin > 0:
                # Average confidence in bin
                avg_confidence = np.mean(probabilities[in_bin])
                
                # Actual accuracy in bin
                avg_accuracy = np.mean(labels[in_bin] == 
                                      (probabilities[in_bin] > 0.5).astype(int))
                
                # Weighted difference
                ece += np.abs(avg_accuracy - avg_confidence) * prop_in_bin
        
        print(f"Expected Calibration Error (ECE): {ece:.4f}")
        
        return ece
    
    def temperature_scaling(self, val_texts, val_labels):
        """
        Apply temperature scaling to improve calibration.
        
        Find optimal temperature T that minimizes NLL on validation set.
        """
        from scipy.optimize import minimize_scalar
        
        logits = self.model.get_logits(val_texts)
        
        def nll_loss(T):
            scaled_logits = logits / T
            probs = softmax(scaled_logits, axis=1)
            
            # Negative log likelihood
            nll = -np.mean(np.log(probs[np.arange(len(val_labels)), val_labels]))
            return nll
        
        # Find optimal temperature
        result = minimize_scalar(nll_loss, bounds=(0.1, 10.0), method='bounded')
        optimal_T = result.x
        
        print(f"Optimal Temperature: {optimal_T:.4f}")
        print(f"NLL before scaling: {nll_loss(1.0):.4f}")
        print(f"NLL after scaling: {nll_loss(optimal_T):.4f}")
        
        return optimal_T
    
    def apply_temperature(self, texts, temperature):
        """Apply temperature scaling to predictions."""
        logits = self.model.get_logits(texts)
        scaled_logits = logits / temperature
        calibrated_probs = softmax(scaled_logits, axis=1)
        
        return calibrated_probs
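
A short usage sketch that ties the calibration steps together. It assumes the model exposes predict_proba and get_logits as used above; test_texts, test_labels, val_texts, and val_labels are placeholder data splits:

analyzer = CalibrationAnalyzer(model)

# 1. Measure calibration on held-out data
calib = analyzer.plot_calibration_curve(test_texts, test_labels, n_bins=10)
print(f"ECE before scaling: {calib['ece']:.4f}")

# 2. Fit the temperature on the validation split (never on the test split)
T = analyzer.temperature_scaling(val_texts, val_labels)

# 3. Use the fitted temperature for calibrated predictions at inference time
calibrated_probs = analyzer.apply_temperature(test_texts, temperature=T)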

A/B Testing Framework

import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from scipy import stats

class ABTestingFramework:
    def __init__(self, experiment_name):
        self.experiment_name = experiment_name
        self.results = []
    
    def design_experiment(self, control_model, treatment_model, 
                         sample_size, duration_days, metrics):
        """
        Design A/B test with proper power analysis.
        """
        from statsmodels.stats.power import zt_ind_solve_power
        
        # Power analysis
        effect_size = 0.1  # Minimum detectable effect (standardized difference, Cohen's d)
        alpha = 0.05
        power = 0.8
        
        required_n = zt_ind_solve_power(
            effect_size=effect_size,
            alpha=alpha,
            power=power,
            ratio=1  # Equal split
        )
        
        required_n = int(np.ceil(required_n))
        
        print(f"A/B Test Design: {self.experiment_name}")
        print(f"Required samples per variant: {required_n}")
        print(f"Total required: {required_n * 2}")
        print(f"Planned sample size: {sample_size}")
        print(f"Duration: {duration_days} days")
        
        if sample_size // 2 < required_n:
            print(f"WARNING: Planned sample size per variant may be underpowered!")
        
        return {
            'required_per_variant': required_n,
            'planned_per_variant': sample_size // 2,
            'effect_size': effect_size,
            'alpha': alpha,
            'power': power
        }
    
    def assign_users(self, user_ids, assignment_ratio=0.5):
        """
        Randomly assign users to control or treatment.
        """
        np.random.seed(42)  # Reproducible assignment
        
        assignments = np.random.rand(len(user_ids)) < assignment_ratio
        user_assignments = {
            uid: 'treatment' if assign else 'control'
            for uid, assign in zip(user_ids, assignments)
        }
        
        # Verify balance
        n_control = sum(1 for v in user_assignments.values() if v == 'control')
        n_treatment = len(user_ids) - n_control
        
        print(f"User Assignment:")
        print(f"Control: {n_control} ({n_control/len(user_ids)*100:.1f}%)")
        print(f"Treatment: {n_treatment} ({n_treatment/len(user_ids)*100:.1f}%)")
        
        return user_assignments
    
    def collect_metrics(self, interactions, user_assignments):
        """
        Collect and aggregate metrics from user interactions.
        """
        # Add assignment to interactions
        df = pd.DataFrame(interactions)
        df['variant'] = df['user_id'].map(user_assignments)
        
        # Aggregate by variant
        results = {}
        
        for variant in ['control', 'treatment']:
            variant_data = df[df['variant'] == variant]
            
            variant_results = {
                'n_users': variant_data['user_id'].nunique(),
                'n_interactions': len(variant_data),
            }
            
            # Calculate each metric
            for metric in ['accuracy', 'latency', 'user_satisfaction']:
                if metric in variant_data.columns:
                    variant_results[f'{metric}_mean'] = variant_data[metric].mean()
                    variant_results[f'{metric}_std'] = variant_data[metric].std()
            
            results[variant] = variant_results
        
        return results
    
    def analyze_results(self, control_data, treatment_data, metric_name):
        """
        Analyze A/B test results with statistical testing.
        """
        control_values = np.array(control_data)
        treatment_values = np.array(treatment_data)
        
        # Difference in means
        diff = np.mean(treatment_values) - np.mean(control_values)
        
        # Two-sample t-test
        t_stat, p_value = stats.ttest_ind(treatment_values, control_values)
        
        # Confidence interval for difference
        pooled_se = np.sqrt(np.var(control_values)/len(control_values) + 
                           np.var(treatment_values)/len(treatment_values))
        ci_lower = diff - 1.96 * pooled_se
        ci_upper = diff + 1.96 * pooled_se
        
        # Effect size (Cohen's d)
        pooled_std = np.sqrt((np.var(control_values) + np.var(treatment_values)) / 2)
        cohens_d = diff / pooled_std
        
        print(f"\nA/B Test Analysis: {metric_name}")
        print(f"Control Mean: {np.mean(control_values):.4f}")
        print(f"Treatment Mean: {np.mean(treatment_values):.4f}")
        print(f"Difference: {diff:.4f}")
        print(f"95% CI: [{ci_lower:.4f}, {ci_upper:.4f}]")
        print(f"T-statistic: {t_stat:.4f}")
        print(f"P-value: {p_value:.6f}")
        print(f"Cohen's d: {cohens_d:.4f}")
        
        # Interpretation
        if p_value < 0.05:
            direction = "better" if diff > 0 else "worse"
            print(f"Result: Treatment is statistically significantly {direction}")
            
            if abs(cohens_d) < 0.2:
                print("Effect size: Small")
            elif abs(cohens_d) < 0.5:
                print("Effect size: Medium")
            else:
                print("Effect size: Large")
        else:
            print("Result: No statistically significant difference")
        
        return {
            'difference': diff,
            'p_value': p_value,
            'ci_lower': ci_lower,
            'ci_upper': ci_upper,
            'cohens_d': cohens_d,
            'significant': p_value < 0.05
        }
    
    def sequential_testing(self, daily_results, stopping_rule='pocock'):
        """
        Sequential A/B testing with early stopping.
        
        Allows monitoring and early stopping if results are clear. A fixed
        p < 0.01 threshold is used below as a simplified stopping boundary
        (a stand-in for a proper Pocock or O'Brien-Fleming correction).
        """
        cumulative_control = []
        cumulative_treatment = []
        
        decisions = []
        
        for day, day_data in enumerate(daily_results):
            cumulative_control.extend(day_data['control'])
            cumulative_treatment.extend(day_data['treatment'])
            
            # Daily analysis
            if len(cumulative_control) > 100 and len(cumulative_treatment) > 100:
                result = self.analyze_results(
                    cumulative_control, 
                    cumulative_treatment,
                    'primary_metric'
                )
                
                # Stopping rule
                if result['p_value'] < 0.01:  # Strong evidence
                    decision = 'STOP_EARLY'
                elif day >= len(daily_results) - 1:
                    decision = 'CONCLUDE'
                else:
                    decision = 'CONTINUE'
                
                decisions.append({
                    'day': day,
                    'decision': decision,
                    'p_value': result['p_value']
                })
                
                print(f"Day {day}: p={result['p_value']:.6f}, Decision: {decision}")
                
                if decision == 'STOP_EARLY':
                    print("Early stopping triggered!")
                    break
        
        return decisions
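
A condensed end-to-end sketch of the framework above. All variable names (current_model, candidate_model, user_ids, interactions, the per-variant metric arrays) are illustrative placeholders:

ab = ABTestingFramework("candidate_model_rollout")

# Check that the planned traffic gives enough statistical power
design = ab.design_experiment(
    control_model=current_model,
    treatment_model=candidate_model,
    sample_size=20000,
    duration_days=14,
    metrics=['accuracy', 'latency', 'user_satisfaction']
)

# Randomly split users, then log interactions during the experiment window
assignments = ab.assign_users(user_ids)
aggregated = ab.collect_metrics(interactions, assignments)

# Analyze the primary metric once the experiment concludes
outcome = ab.analyze_results(
    control_data=control_satisfaction_scores,
    treatment_data=treatment_satisfaction_scores,
    metric_name='user_satisfaction'
)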

Regression Testing for Models

import json
from datetime import datetime

class ModelRegressionTester:
    def __init__(self, baseline_model_path, test_suite_path):
        """
        Initialize regression testing framework.
        
        Assumes a `load_model(path)` helper is available for your
        checkpoint format (e.g., a thin wrapper around your model loader).
        """
        self.baseline_model = self.load_model(baseline_model_path)
        self.test_suite = self.load_test_suite(test_suite_path)
        self.baseline_results = self.run_baseline()
    
    def load_test_suite(self, path):
        """Load or create test suite."""
        try:
            with open(path, 'r') as f:
                test_suite = json.load(f)
        except FileNotFoundError:
            # Create default test suite
            test_suite = {
                'functional_tests': [],
                'edge_cases': [],
                'performance_tests': [],
                'bias_tests': []
            }
        
        return test_suite
    
    def add_functional_test(self, name, input_text, expected_output, 
                           tolerance=0.0):
        """Add a functional test case."""
        test_case = {
            'name': name,
            'input': input_text,
            'expected': expected_output,
            'tolerance': tolerance,
            'type': 'functional'
        }
        
        self.test_suite['functional_tests'].append(test_case)
        self.save_test_suite()
    
    def add_edge_case(self, name, input_text, expected_behavior):
        """Add an edge case test."""
        test_case = {
            'name': name,
            'input': input_text,
            'expected_behavior': expected_behavior,
            'type': 'edge_case'
        }
        
        self.test_suite['edge_cases'].append(test_case)
        self.save_test_suite()
    
    def save_test_suite(self):
        """Save test suite to file."""
        with open('model_test_suite.json', 'w') as f:
            json.dump(self.test_suite, f, indent=2)
    
    def run_baseline(self):
        """Run test suite on baseline model."""
        results = {}
        
        # Functional tests
        functional_results = []
        for test in self.test_suite['functional_tests']:
            prediction = self.baseline_model.predict([test['input']])[0]
            expected = test['expected']
            
            # Check if within tolerance
            if isinstance(expected, (int, float)):
                passed = abs(prediction - expected) <= test['tolerance']
            else:
                passed = prediction == expected
            
            functional_results.append({
                'name': test['name'],
                'passed': passed,
                'prediction': prediction,
                'expected': expected
            })
        
        results['functional'] = functional_results
        
        # Edge cases
        edge_results = []
        for test in self.test_suite['edge_cases']:
            try:
                prediction = self.baseline_model.predict([test['input']])[0]
                behavior = self.check_expected_behavior(prediction, test['expected_behavior'])
                
                edge_results.append({
                    'name': test['name'],
                    'passed': behavior,
                    'prediction': prediction
                })
            except Exception as e:
                edge_results.append({
                    'name': test['name'],
                    'passed': False,
                    'error': str(e)
                })
        
        results['edge_cases'] = edge_results
        
        # Save baseline results
        with open('baseline_regression_results.json', 'w') as f:
            json.dump(results, f, indent=2)
        
        return results
    
    def check_expected_behavior(self, prediction, expected_behavior):
        """Check if prediction matches expected behavior."""
        if expected_behavior == 'high_confidence':
            return prediction['confidence'] > 0.9
        elif expected_behavior == 'uncertain':
            return prediction['confidence'] < 0.6
        elif isinstance(expected_behavior, dict) and 'class' in expected_behavior:
            # Expected behavior specifies a required output class
            return prediction['class'] == expected_behavior['class']
        else:
            return False
    
    def run_regression_test(self, new_model_path):
        """
        Run regression tests on new model and compare to baseline.
        """
        new_model = self.load_model(new_model_path)
        
        regressions = []
        improvements = []
        
        # Compare functional tests
        for baseline_result in self.baseline_results['functional']:
            test_name = baseline_result['name']
            
            # Find corresponding test
            test = next(t for t in self.test_suite['functional_tests'] 
                       if t['name'] == test_name)
            
            # Run on new model
            new_prediction = new_model.predict([test['input']])[0]
            expected = test['expected']
            
            if isinstance(expected, (int, float)):
                new_passed = abs(new_prediction - expected) <= test['tolerance']
            else:
                new_passed = new_prediction == expected
            
            # Compare
            if baseline_result['passed'] and not new_passed:
                regressions.append({
                    'test': test_name,
                    'type': 'functional',
                    'baseline': baseline_result['prediction'],
                    'new': new_prediction,
                    'expected': expected
                })
            elif not baseline_result['passed'] and new_passed:
                improvements.append({
                    'test': test_name,
                    'type': 'functional',
                    'baseline': baseline_result['prediction'],
                    'new': new_prediction
                })
        
        # Report
        print(f"\n{'='*60}")
        print("REGRESSION TEST RESULTS")
        print(f"{'='*60}")
        print(f"Total Tests: {len(self.baseline_results['functional'])}")
        print(f"Regressions: {len(regressions)}")
        print(f"Improvements: {len(improvements)}")
        
        if regressions:
            print(f"\n⚠️  REGRESSIONS DETECTED:")
            for reg in regressions:
                print(f"  - {reg['test']}: {reg['baseline']} → {reg['new']}")
                print(f"    Expected: {reg['expected']}")
        
        if improvements:
            print(f"\n✅ IMPROVEMENTS:")
            for imp in improvements:
                print(f"  - {imp['test']}: {imp['baseline']} → {imp['new']}")
        
        return {
            'regressions': regressions,
            'improvements': improvements,
            'passed': len(regressions) == 0
        }
    
    def generate_regression_report(self, results):
        """Generate detailed regression report."""
        report = f"""
# Model Regression Test Report

## Summary
- **Date**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- **Total Tests**: {len(self.baseline_results['functional']) + len(self.baseline_results['edge_cases'])}
- **Regressions**: {len(results['regressions'])}
- **Improvements**: {len(results['improvements'])}
- **Status**: {'✅ PASS' if results['passed'] else '❌ FAIL'}

## Regressions
"""
        
        for reg in results['regressions']:
            report += f"""
### {reg['test']}
- Type: {reg['type']}
- Baseline: {reg['baseline']}
- New: {reg['new']}
- Expected: {reg['expected']}
"""
        
        return report
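
A brief usage sketch. Paths and test inputs are placeholders, and load_model is assumed to be implemented for your checkpoint format. Because run_baseline executes at construction time, in practice the test-suite file should be populated before instantiating the tester so the baseline covers every case:

tester = ModelRegressionTester(
    baseline_model_path="checkpoints/model_v1",      # placeholder path
    test_suite_path="model_test_suite.json"
)

# Register new cases; they are persisted to model_test_suite.json
tester.add_functional_test(
    name="positive_sentiment_canonical",
    input_text="This product exceeded my expectations!",
    expected_output="positive"
)
tester.add_edge_case(
    name="empty_input",
    input_text="",
    expected_behavior="uncertain"
)

# Gate a candidate model against the recorded baseline
result = tester.run_regression_test("checkpoints/model_v2")  # placeholder path
if not result['passed']:
    print(tester.generate_regression_report(result))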

Quality Gates and Release Criteria

class QualityGateChecker:
    def __init__(self, release_config):
        """
        Initialize quality gate checker with release criteria.
        
        release_config example:
        {
            'min_accuracy': 0.85,
            'max_fairness_disparity': 0.1,
            'min_robustness_consistency': 0.9,
            'max_calibration_ece': 0.05,
            'no_regressions': True,
            'min_test_coverage': 0.95
        }
        """
        self.config = release_config
    
    def check_all_gates(self, model_metrics):
        """
        Check all quality gates for release readiness.
        """
        gate_results = {}
        all_passed = True
        
        # Accuracy gate
        if 'min_accuracy' in self.config:
            passed = model_metrics['accuracy'] >= self.config['min_accuracy']
            gate_results['accuracy'] = {
                'passed': passed,
                'value': model_metrics['accuracy'],
                'threshold': self.config['min_accuracy']
            }
            if not passed:
                all_passed = False
        
        # Fairness gate
        if 'max_fairness_disparity' in self.config:
            passed = model_metrics['fairness_disparity'] <= self.config['max_fairness_disparity']
            gate_results['fairness'] = {
                'passed': passed,
                'value': model_metrics['fairness_disparity'],
                'threshold': self.config['max_fairness_disparity']
            }
            if not passed:
                all_passed = False
        
        # Robustness gate
        if 'min_robustness_consistency' in self.config:
            passed = model_metrics['robustness_consistency'] >= self.config['min_robustness_consistency']
            gate_results['robustness'] = {
                'passed': passed,
                'value': model_metrics['robustness_consistency'],
                'threshold': self.config['min_robustness_consistency']
            }
            if not passed:
                all_passed = False
        
        # Calibration gate
        if 'max_calibration_ece' in self.config:
            passed = model_metrics['calibration_ece'] <= self.config['max_calibration_ece']
            gate_results['calibration'] = {
                'passed': passed,
                'value': model_metrics['calibration_ece'],
                'threshold': self.config['max_calibration_ece']
            }
            if not passed:
                all_passed = False
        
        # Regression gate
        if 'no_regressions' in self.config and self.config['no_regressions']:
            passed = model_metrics.get('regressions', 0) == 0
            gate_results['regressions'] = {
                'passed': passed,
                'value': model_metrics.get('regressions', 0),
                'threshold': 0
            }
            if not passed:
                all_passed = False
        
        # Print report
        print(f"\n{'='*60}")
        print("QUALITY GATE CHECK")
        print(f"{'='*60}")
        
        for gate, result in gate_results.items():
            status = "✅ PASS" if result['passed'] else "❌ FAIL"
            print(f"{gate.upper():<20} {status}")
            print(f"  Value: {result['value']:.4f}, Threshold: {result['threshold']:.4f}")
        
        print(f"\n{'='*60}")
        overall_status = "✅ READY FOR RELEASE" if all_passed else "❌ NOT READY FOR RELEASE"
        print(f"OVERALL: {overall_status}")
        print(f"{'='*60}")
        
        return {
            'all_passed': all_passed,
            'gate_results': gate_results
        }
    
    def generate_release_report(self, model_info, metrics, gate_results):
        """Generate comprehensive release report."""
        report = f"""
# Model Release Report

## Model Information
- **Name**: {model_info['name']}
- **Version**: {model_info['version']}
- **Date**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- **Training Data**: {model_info['training_data']}
- **Architecture**: {model_info['architecture']}

## Performance Metrics
- **Accuracy**: {metrics['accuracy']:.4f}
- **Precision**: {metrics['precision']:.4f}
- **Recall**: {metrics['recall']:.4f}
- **F1 Score**: {metrics['f1']:.4f}
- **AUC-ROC**: {metrics['auc_roc']:.4f}

## Quality Attributes
- **Fairness Disparity**: {metrics['fairness_disparity']:.4f}
- **Robustness Consistency**: {metrics['robustness_consistency']:.4f}
- **Calibration ECE**: {metrics['calibration_ece']:.4f}
- **Regressions**: {metrics.get('regressions', 0)}

## Quality Gate Results
"""
        
        for gate, result in gate_results['gate_results'].items():
            status = "✅ PASS" if result['passed'] else "❌ FAIL"
            report += f"- {gate.upper()}: {status}\n"
        
        report += f"""
## Release Decision
{'✅ APPROVED FOR RELEASE' if gate_results['all_passed'] else '❌ RELEASE BLOCKED'}

## Notes
{model_info.get('notes', 'No additional notes')}
"""
        
        return report
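
A short sketch of gating a single candidate release and producing the report. The metric values and model info below are illustrative only:

release_config = {
    'min_accuracy': 0.85,
    'max_fairness_disparity': 0.10,
    'min_robustness_consistency': 0.90,
    'max_calibration_ece': 0.05,
    'no_regressions': True
}

candidate_metrics = {
    'accuracy': 0.88, 'precision': 0.87, 'recall': 0.86, 'f1': 0.865,
    'auc_roc': 0.93, 'fairness_disparity': 0.07,
    'robustness_consistency': 0.92, 'calibration_ece': 0.04, 'regressions': 0
}

checker = QualityGateChecker(release_config)
gates = checker.check_all_gates(candidate_metrics)

model_info = {
    'name': 'ntf-classifier', 'version': '1.3.0',
    'training_data': 'internal corpus (placeholder)', 'architecture': 'NTF Transformer'
}
print(checker.generate_release_report(model_info, candidate_metrics, gates))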

Complete Validation Pipeline Example

def complete_validation_pipeline(model, train_data, val_data, test_data, 
                                sensitive_attributes=None):
    """
    Run complete validation pipeline before model release.
    """
    print("="*70)
    print("COMPLETE MODEL VALIDATION PIPELINE")
    print("="*70)
    
    results = {}
    
    # 1. Basic Performance Evaluation
    print("\n[1/8] Basic Performance Evaluation")
    test_preds = model.predict(test_data['texts'])
    test_metrics = calculate_metrics(test_preds, test_data['labels'])
    results['basic_metrics'] = test_metrics
    
    # 2. Cross-Validation
    print("\n[2/8] Cross-Validation")
    cv_results, _ = k_fold_cross_validation(
        type(model), model.config, 
        train_data['texts'], train_data['labels'], 
        k=5
    )
    results['cross_validation'] = cv_results
    
    # 3. Robustness Testing
    print("\n[3/8] Robustness Testing")
    tester = RobustnessTester(model, model.tokenizer)
    robustness_results = tester.comprehensive_robustness_suite(
        test_data['texts'], test_data['labels']
    )
    results['robustness'] = robustness_results
    
    # 4. Fairness Audit (if sensitive attributes provided)
    if sensitive_attributes:
        print("\n[4/8] Fairness Audit")
        auditor = FairnessAuditor(test_preds, test_data['labels'], sensitive_attributes)
        fairness_report = auditor.generate_fairness_report()
        results['fairness'] = fairness_report
    else:
        print("\n[4/8] Fairness Audit: SKIPPED (no sensitive attributes)")
    
    # 5. Calibration Analysis
    print("\n[5/8] Calibration Analysis")
    calibrator = CalibrationAnalyzer(model)
    calibration_results = calibrator.plot_calibration_curve(
        test_data['texts'], test_data['labels']
    )
    results['calibration'] = calibration_results
    
    # 6. Domain Shift Detection (requires target-domain data)
    print("\n[6/8] Domain Shift Detection: SKIPPED (no target-domain data provided)")
    
    # 7. Statistical Significance (requires a baseline model)
    print("\n[7/8] Statistical Significance: SKIPPED (no baseline model provided)")
    
    # 8. Quality Gate Check
    print("\n[8/8] Quality Gate Check")
    
    # Prepare metrics for quality gates
    gate_metrics = {
        'accuracy': test_metrics['accuracy'],
        'fairness_disparity': max(
            [v['max_disparity'] for k, v in results.get('fairness', {}).items()]
        ) if 'fairness' in results else 0.0,
        'robustness_consistency': np.mean([
            v['consistency'] for v in robustness_results.values()
        ]),
        'calibration_ece': calibration_results['ece'],
        'regressions': 0  # Would check against baseline
    }
    
    release_config = {
        'min_accuracy': 0.80,
        'max_fairness_disparity': 0.15,
        'min_robustness_consistency': 0.85,
        'max_calibration_ece': 0.10,
        'no_regressions': True
    }
    
    gate_checker = QualityGateChecker(release_config)
    gate_results = gate_checker.check_all_gates(gate_metrics)
    results['quality_gates'] = gate_results
    
    # Final Summary
    print("\n" + "="*70)
    print("VALIDATION SUMMARY")
    print("="*70)
    print(f"Basic Accuracy: {test_metrics['accuracy']:.4f}")
    print(f"CV Accuracy: {cv_results['accuracy']['mean']:.4f} ± {cv_results['accuracy']['std']:.4f}")
    print(f"Avg Robustness: {gate_metrics['robustness_consistency']:.4f}")
    if 'fairness' in results:
        print(f"Max Fairness Disparity: {gate_metrics['fairness_disparity']:.4f}")
    print(f"Calibration ECE: {gate_metrics['calibration_ece']:.4f}")
    print(f"\nRelease Status: {'✅ APPROVED' if gate_results['all_passed'] else '❌ BLOCKED'}")
    
    return results

# Usage
# validation_results = complete_validation_pipeline(
#     model, train_data, val_data, test_data,
#     sensitive_attributes={'gender': gender_array, 'age': age_array}
# )

Best Practices Checklist

Pre-Release Validation Checklist

  • Data Splits: Proper train/val/test separation with no leakage
  • Cross-Validation: K-fold CV completed with consistent results
  • Statistical Power: Test set size sufficient for desired confidence
  • Performance Metrics: All primary metrics meet thresholds
  • Fairness Audit: No significant bias across protected groups
  • Robustness Testing: Model stable under perturbations
  • Calibration: Predictions well-calibrated (ECE < threshold)
  • Edge Cases: Critical edge cases handled correctly
  • Regression Tests: No regressions from baseline
  • Documentation: All validation results documented

Continuous Validation

  • Automated Testing: Validation suite runs on every commit (see the pytest sketch after this list)
  • Monitoring: Production performance tracked continuously
  • Drift Detection: Data drift monitored and alerted
  • Periodic Re-evaluation: Full validation quarterly
  • Incident Response: Process for handling validation failures
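
To make the automated-testing item concrete, a minimal pytest sketch that could run in CI is shown below. The metrics file path, the validation_utils module name, and the thresholds are hypothetical placeholders; QualityGateChecker refers to the class defined earlier in this tutorial:

# test_quality_gates.py -- illustrative CI sketch
import json

from validation_utils import QualityGateChecker  # hypothetical module holding the class above


def test_release_gates():
    # Metrics produced by a nightly evaluation job (hypothetical path)
    with open("latest_metrics.json") as f:
        metrics = json.load(f)

    checker = QualityGateChecker({
        'min_accuracy': 0.85,
        'max_calibration_ece': 0.05,
        'no_regressions': True
    })
    result = checker.check_all_gates(metrics)

    assert result['all_passed'], f"Quality gates failed: {result['gate_results']}"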

Next Steps

In the next tutorial, we'll cover:

  • Continual Learning: Strategies for updating models with new data
  • Catastrophic Forgetting Prevention: Techniques to retain old knowledge
  • Incremental Training: Efficient updates without full retraining
  • Version Management: Model versioning and rollback strategies
  • Production Deployment: Serving, scaling, and monitoring