# Tutorial 07: Model Validation, Testing & Quality Assurance
## Overview
This tutorial covers comprehensive model validation, testing strategies, and quality assurance processes essential for production-ready AI systems. We'll explore NTF's unified metrics utilities, statistical validation methods, bias detection, robustness testing, and systematic evaluation frameworks.
## Table of Contents
1. [Validation Fundamentals](#validation-fundamentals)
2. [NTF Metrics Utilities](#ntf-metrics-utilities)
3. [Train/Validation/Test Splits](#trainvalidationtest-splits)
4. [Cross-Validation Techniques](#cross-validation-techniques)
5. [Statistical Significance Testing](#statistical-significance-testing)
6. [Bias and Fairness Detection](#bias-and-fairness-detection)
7. [Robustness Testing](#robustness-testing)
8. [Adversarial Testing](#adversarial-testing)
9. [Domain Shift Detection](#domain-shift-detection)
10. [Calibration and Confidence Estimation](#calibration-and-confidence-estimation)
11. [A/B Testing Framework](#ab-testing-framework)
12. [Regression Testing for Models](#regression-testing-for-models)
13. [Quality Gates and Release Criteria](#quality-gates-and-release-criteria)
---
## Validation Fundamentals
### Why Validation Matters
Validation ensures your model:
- Generalizes to unseen data
- Doesn't overfit training distributions
- Meets performance requirements
- Behaves safely across edge cases
- Maintains consistency across versions
### Validation Pyramid
```
      Production Monitoring
               /\
              /  \
             /----\
            /  A/B \
           / Testing\
          /----------\
         /   Holdout  \
        /   Testing    \
       /----------------\
      /  Cross-Validation\
     /--------------------\
    /   Train/Val Split    \
   /------------------------\
```
**Key Principles:**
1. **Data Isolation**: Never leak test data into training
2. **Distribution Matching**: Test data should match production distribution
3. **Statistical Power**: Ensure sufficient sample sizes
4. **Multiple Metrics**: Evaluate across diverse dimensions
5. **Reproducibility**: Fixed seeds and documented procedures (see the seeding sketch below)
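The reproducibility principle is easy to operationalize. Below is a minimal, framework-agnostic sketch (not an NTF API) that fixes random seeds across the common libraries before any split or training run:

```python
import os
import random

import numpy as np
import torch


def set_global_seed(seed: int = 42) -> None:
    """Fix seeds for Python, NumPy, and PyTorch so splits and runs are repeatable."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # Optional: trade some speed for deterministic CUDA kernels
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    os.environ["PYTHONHASHSEED"] = str(seed)


set_global_seed(42)
```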
---
## NTF Metrics Utilities
### Using NTF's Unified Evaluation Interface
NTF provides comprehensive metrics utilities through `ntf.utils.metrics`. This replaces manual metric implementations with a unified, efficient interface.
```python
from ntf.utils.metrics import (
compute_perplexity,
compute_accuracy,
evaluate_model,
compare_models,
benchmark_throughput,
EvaluationResults
)
from torch.utils.data import DataLoader
import torch
# Load your model and tokenizer
model, tokenizer = load_model_and_tokenizer("path/to/model")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Prepare test dataloader
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# Comprehensive evaluation
results = evaluate_model(
model=model,
dataloader=test_dataloader,
device=device,
compute_generation_metrics=True,
tokenizer=tokenizer
)
print(f"Perplexity: {results.perplexity:.2f}")
print(f"Loss: {results.loss:.4f}")
print(f"Token Accuracy: {results.token_accuracy:.4f}")
print(f"BLEU Score: {results.bleu_score:.4f}")
print(f"ROUGE-L: {results.rouge_l:.4f}")
```
### Individual Metric Functions
For specific metric computation, NTF provides standalone functions:
```python
# Compute perplexity only
perplexity = compute_perplexity(model, test_dataloader, device)
print(f"Perplexity: {perplexity:.2f}")
# Compute accuracy only
accuracy = compute_accuracy(model, test_dataloader, device)
print(f"Accuracy: {accuracy:.4f}")
```
### Comparing Multiple Checkpoints
NTF makes it easy to compare different model checkpoints:
```python
from ntf.utils.metrics import compare_models
# Compare two model versions
comparison = compare_models(
model_a=model_v1,
model_b=model_v2,
dataloader=val_dataloader,
device=device
)
print(f"Model A Perplexity: {comparison['model_a']['perplexity']:.2f}")
print(f"Model B Perplexity: {comparison['model_b']['perplexity']:.2f}")
print(f"Improvement: {comparison['improvement']['perplexity']:.2f}%")
print(f"Accuracy Gain: {comparison['improvement']['accuracy']:.2f}%")
```
### Benchmarking Throughput
For production deployment, benchmark model throughput:
```python
throughput_results = benchmark_throughput(
model=model,
tokenizer=tokenizer,
device=device,
sequence_length=512,
batch_size=1,
num_iterations=10
)
print(f"Prefill Throughput: {throughput_results['prefill_throughput']:.2f} tokens/sec")
print(f"Decode Throughput: {throughput_results['decode_throughput']:.2f} tokens/sec")
```
### Metric Selection Guide
Different tasks require different evaluation metrics. Use this guide to select appropriate metrics:
| Task Type | Recommended Metrics | NTF Functions |
|-----------|---------------------|---------------|
| Text Generation | Perplexity, BLEU, ROUGE, BERTScore | `evaluate_model(compute_generation_metrics=True)` |
| Classification | Accuracy, F1, Precision, Recall | `compute_accuracy()` + custom F1 |
| Summarization | ROUGE, BERTScore | `evaluate_model()` with ROUGE |
| Translation | BLEU, chrF, COMET | `evaluate_model()` with BLEU |
| Question Answering | Exact Match, F1 | Custom implementation |
| Language Modeling | Perplexity | `compute_perplexity()` |
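The table marks Exact Match and F1 for question answering as "Custom implementation". A minimal, self-contained sketch of the standard SQuAD-style metrics (simple lowercasing and whitespace tokenization; article and punctuation stripping omitted for brevity) might look like this:

```python
from collections import Counter


def exact_match(prediction: str, reference: str) -> float:
    """1.0 if the normalized strings are identical, else 0.0."""
    return float(prediction.strip().lower() == reference.strip().lower())


def token_f1(prediction: str, reference: str) -> float:
    """Token-overlap F1 between a predicted answer and a reference answer."""
    pred_tokens = prediction.lower().split()
    ref_tokens = reference.lower().split()
    if not pred_tokens or not ref_tokens:
        return float(pred_tokens == ref_tokens)
    common = Counter(pred_tokens) & Counter(ref_tokens)
    num_same = sum(common.values())
    if num_same == 0:
        return 0.0
    precision = num_same / len(pred_tokens)
    recall = num_same / len(ref_tokens)
    return 2 * precision * recall / (precision + recall)


# Quick check
print(exact_match("Paris", "paris"))                  # 1.0
print(token_f1("the city of Paris", "Paris France"))  # partial token overlap
```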
### Checkpoint Comparison Workflow
Here's a complete workflow for comparing multiple checkpoints during development:
```python
from ntf.utils.metrics import evaluate_model
from pathlib import Path
import json
def compare_checkpoints(checkpoint_paths, eval_dataset, tokenizer, device):
"""Compare multiple checkpoints on the same evaluation dataset."""
from torch.utils.data import DataLoader
eval_dataloader = DataLoader(eval_dataset, batch_size=32, shuffle=False)
results = {}
for checkpoint_path in checkpoint_paths:
print(f"\nEvaluating {checkpoint_path}...")
# Load checkpoint
model, _ = load_model_and_tokenizer(checkpoint_path)
model.to(device)
model.eval()
# Evaluate
eval_results = evaluate_model(
model=model,
dataloader=eval_dataloader,
device=device,
compute_generation_metrics=True,
tokenizer=tokenizer
)
# Store results
results[checkpoint_path] = {
'perplexity': eval_results.perplexity,
'loss': eval_results.loss,
'accuracy': eval_results.accuracy,
'bleu': eval_results.bleu_score,
'rouge_l': eval_results.rouge_l
}
print(f" Perplexity: {eval_results.perplexity:.2f}")
print(f" Accuracy: {eval_results.accuracy:.4f}")
# Find best checkpoint
best_checkpoint = min(results.keys(), key=lambda k: results[k]['perplexity'])
print(f"\nBest checkpoint (lowest perplexity): {best_checkpoint}")
return results
# Usage
checkpoints = [
"./checkpoints/step_1000",
"./checkpoints/step_2000",
"./checkpoints/step_3000",
"./checkpoints/final"
]
all_results = compare_checkpoints(
checkpoints,
val_dataset,
tokenizer,
device
)
# Save results for tracking
with open("./evaluation_results.json", "w") as f:
json.dump(all_results, f, indent=2)
```
---
## Train/Validation/Test Splits
### Strategic Data Partitioning
```python
from sklearn.model_selection import train_test_split
import numpy as np
def strategic_data_split(data, labels, strategy='stratified'):
"""
Create train/val/test splits with proper stratification.
Args:
data: Input samples
labels: Corresponding labels
strategy: 'stratified', 'temporal', 'grouped'
Returns:
train, val, test splits
"""
if strategy == 'stratified':
# First split: train+val vs test (80/20)
X_train_val, X_test, y_train_val, y_test = train_test_split(
data, labels,
test_size=0.2,
stratify=labels, # Maintain class distribution
random_state=42
)
# Second split: train vs val (80/20 of remaining = 64/16 total)
X_train, X_val, y_train, y_val = train_test_split(
X_train_val, y_train_val,
test_size=0.2,
stratify=y_train_val,
random_state=42
)
elif strategy == 'temporal':
# Time-based split for temporal data
split_point_1 = int(len(data) * 0.6)
split_point_2 = int(len(data) * 0.8)
X_train = data[:split_point_1]
X_val = data[split_point_1:split_point_2]
X_test = data[split_point_2:]
y_train = labels[:split_point_1]
y_val = labels[split_point_1:split_point_2]
y_test = labels[split_point_2:]
elif strategy == 'grouped':
# Group-based split (e.g., by user, document, session)
from sklearn.model_selection import GroupShuffleSplit
groups = get_groups(data) # Your grouping logic
gss = GroupShuffleSplit(
n_splits=1,
test_size=0.2,
random_state=42
)
train_val_idx, test_idx = next(gss.split(data, groups=groups))
X_train_val, X_test = data[train_val_idx], data[test_idx]
y_train_val, y_test = labels[train_val_idx], labels[test_idx]
# Split train_val further
gss2 = GroupShuffleSplit(
n_splits=1,
test_size=0.2,
random_state=42
)
train_idx, val_idx = next(gss2.split(
X_train_val,
groups=groups[train_val_idx]
))
X_train, X_val = X_train_val[train_idx], X_train_val[val_idx]
y_train, y_val = y_train_val[train_idx], y_train_val[val_idx]
return {
'train': (X_train, y_train),
'val': (X_val, y_val),
'test': (X_test, y_test)
}
# Usage example
splits = strategic_data_split(texts, labels, strategy='stratified')
print(f"Train: {len(splits['train'][0])}, Val: {len(splits['val'][0])}, Test: {len(splits['test'][0])}")
```
### Split Ratio Guidelines
| Dataset Size | Train | Val | Test | Rationale |
|-------------|-------|-----|------|-----------|
| < 10K | 70% | 15% | 15% | Need more validation signal |
| 10K-100K | 80% | 10% | 10% | Balanced approach |
| 100K-1M | 90% | 5% | 5% | Large data, less validation needed |
| > 1M | 95% | 2.5% | 2.5% | Massive data, small holdouts sufficient |
### Common Splitting Mistakes
**Data Leakage**:
```python
# WRONG: Preprocessing before split
scaler.fit(data) # Fits on ALL data including test!
data_scaled = scaler.transform(data)
# Then split...
# CORRECT: Split first
X_train, X_test = train_test_split(data, test_size=0.2)
scaler.fit(X_train) # Fit only on train
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test) # Transform test with train stats
```
**Temporal Leakage**:
```python
# WRONG: Random shuffle on time-series
random.shuffle(time_series_data)
# CORRECT: Respect temporal order
cutoff = int(len(data) * 0.8)
train = data[:cutoff]
test = data[cutoff:]
```
---
## Cross-Validation Techniques
### K-Fold Cross-Validation
```python
from sklearn.model_selection import KFold, StratifiedKFold
import torch
from torch.utils.data import DataLoader
def k_fold_cross_validation(model_class, config, data, labels, k=5):
"""
Perform k-fold cross-validation for robust performance estimation.
Returns metrics for each fold and aggregate statistics.
"""
skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)
fold_metrics = []
for fold, (train_idx, val_idx) in enumerate(skf.split(data, labels)):
print(f"\n{'='*50}")
print(f"FOLD {fold + 1}/{k}")
print(f"{'='*50}")
# Prepare fold data
X_train, X_val = data[train_idx], data[val_idx]
y_train, y_val = labels[train_idx], labels[val_idx]
# Initialize fresh model for this fold
model = model_class(config)
# Train on this fold
trainer = Trainer(
model=model,
train_data=(X_train, y_train),
val_data=(X_val, y_val),
config=config
)
trainer.train()
# Evaluate
metrics = trainer.evaluate()
fold_metrics.append(metrics)
print(f"Fold {fold + 1} Accuracy: {metrics['accuracy']:.4f}")
print(f"Fold {fold + 1} F1: {metrics['f1']:.4f}")
# Aggregate results
aggregated = {}
for key in fold_metrics[0].keys():
values = [m[key] for m in fold_metrics]
aggregated[key] = {
'mean': np.mean(values),
'std': np.std(values),
'min': np.min(values),
'max': np.max(values),
'all': values
}
print(f"\n{'='*50}")
print("CROSS-VALIDATION SUMMARY")
print(f"{'='*50}")
for metric, stats in aggregated.items():
print(f"{metric}: {stats['mean']:.4f} ± {stats['std']:.4f}")
print(f" Range: [{stats['min']:.4f}, {stats['max']:.4f}]")
return aggregated, fold_metrics
# Usage
cv_results, fold_details = k_fold_cross_validation(
MyModel, config, texts, labels, k=5
)
```
### Nested Cross-Validation for Hyperparameter Tuning
```python
from sklearn.model_selection import ParameterGrid
def nested_cross_validation(model_class, base_config, data, labels,
param_grid, outer_k=5, inner_k=3):
"""
Nested CV: Outer loop for evaluation, inner loop for hyperparameter selection.
Prevents optimistic bias from hyperparameter tuning.
"""
outer_cv = StratifiedKFold(n_splits=outer_k, shuffle=True, random_state=42)
outer_scores = []
best_params_per_fold = []
for outer_fold, (outer_train_idx, outer_test_idx) in enumerate(
outer_cv.split(data, labels)
):
print(f"\n{'='*60}")
print(f"OUTER FOLD {outer_fold + 1}/{outer_k}")
print(f"{'='*60}")
# Outer split
X_outer_train = data[outer_train_idx]
X_outer_test = data[outer_test_idx]
y_outer_train = labels[outer_train_idx]
y_outer_test = labels[outer_test_idx]
# Inner CV for hyperparameter selection
inner_cv = StratifiedKFold(n_splits=inner_k, shuffle=True, random_state=42)
        # ParameterGrid yields dicts (unhashable), so key the scores by a hashable tuple
        param_scores = {tuple(sorted(p.items())): [] for p in ParameterGrid(param_grid)}
for inner_train_idx, inner_val_idx in inner_cv.split(
X_outer_train, y_outer_train
):
X_inner_train = X_outer_train[inner_train_idx]
X_inner_val = X_outer_train[inner_val_idx]
y_inner_train = y_outer_train[inner_train_idx]
y_inner_val = y_outer_train[inner_val_idx]
# Test each parameter combination
for params in ParameterGrid(param_grid):
config = {**base_config, **params}
model = model_class(config)
trainer = Trainer(
model=model,
train_data=(X_inner_train, y_inner_train),
val_data=(X_inner_val, y_inner_val),
config=config
)
trainer.train()
metrics = trainer.evaluate()
                param_scores[tuple(sorted(params.items()))].append(metrics['accuracy'])
# Select best parameters based on inner CV
        best_key = max(param_scores.keys(), key=lambda k: np.mean(param_scores[k]))
        best_params = dict(best_key)
        best_score = np.mean(param_scores[best_key])
print(f"Best params for outer fold {outer_fold + 1}: {best_params}")
print(f"Inner CV score: {best_score:.4f}")
# Train final model for this outer fold with best params
final_config = {**base_config, **best_params}
final_model = model_class(final_config)
final_trainer = Trainer(
model=final_model,
train_data=(X_outer_train, y_outer_train),
val_data=None, # Use all outer train data
config=final_config
)
final_trainer.train()
# Evaluate on held-out outer test set
test_metrics = final_trainer.evaluate_on_test(X_outer_test, y_outer_test)
outer_scores.append(test_metrics['accuracy'])
best_params_per_fold.append(best_params)
# Final aggregated results
print(f"\n{'='*60}")
print("NESTED CV FINAL RESULTS")
print(f"{'='*60}")
print(f"Test Accuracy: {np.mean(outer_scores):.4f} ± {np.std(outer_scores):.4f}")
print(f"Range: [{np.min(outer_scores):.4f}, {np.max(outer_scores):.4f}]")
return {
'mean_score': np.mean(outer_scores),
'std_score': np.std(outer_scores),
'scores': outer_scores,
'best_params_per_fold': best_params_per_fold
}
# Example usage
param_grid = {
'learning_rate': [1e-5, 3e-5, 5e-5],
'batch_size': [16, 32],
'num_layers': [6, 12]
}
nested_results = nested_cross_validation(
TransformerModel,
base_config,
data,
labels,
param_grid,
outer_k=5,
inner_k=3
)
```
### Leave-One-Out and Leave-P-Out
```python
from sklearn.model_selection import LeaveOneOut, LeavePOut
def leave_one_out_validation(model_class, config, data, labels, max_samples=1000):
"""
Leave-One-Out CV: Extremely thorough but computationally expensive.
Use only for small datasets (< 1000 samples).
"""
if len(data) > max_samples:
print(f"Warning: LOO is too expensive for {len(data)} samples.")
print(f"Consider using k-fold with k=10 instead.")
return None
loo = LeaveOneOut()
scores = []
for train_idx, test_idx in loo.split(data):
X_train, X_test = data[train_idx], data[test_idx]
y_train, y_test = labels[train_idx], labels[test_idx]
model = model_class(config)
trainer = Trainer(model, (X_train, y_train), None, config)
trainer.train()
metrics = trainer.evaluate_on_test(X_test, y_test)
scores.append(metrics['accuracy'])
return {'mean': np.mean(scores), 'std': np.std(scores), 'all': scores}
```
---
## Statistical Significance Testing
### Comparing Two Models
```python
from scipy import stats
import numpy as np
def paired_t_test(model_a_predictions, model_b_predictions, ground_truth):
"""
Paired t-test to compare two models on the same test set.
Tests if the difference in performance is statistically significant.
"""
# Calculate per-sample correctness
correct_a = (model_a_predictions == ground_truth).astype(int)
correct_b = (model_b_predictions == ground_truth).astype(int)
# Paired t-test on correctness scores
t_statistic, p_value = stats.ttest_rel(correct_a, correct_b)
acc_a = np.mean(correct_a)
acc_b = np.mean(correct_b)
print(f"Model A Accuracy: {acc_a:.4f}")
print(f"Model B Accuracy: {acc_b:.4f}")
print(f"Difference: {acc_b - acc_a:.4f}")
print(f"T-statistic: {t_statistic:.4f}")
print(f"P-value: {p_value:.6f}")
    if p_value < 0.05:
        direction = "better" if acc_b > acc_a else "worse"
        print(f"Result: Model B is significantly {direction} than Model A (p < 0.05)")
else:
print(f"Result: No significant difference (p >= 0.05)")
return {
't_statistic': t_statistic,
'p_value': p_value,
'significant': p_value < 0.05,
'accuracy_difference': acc_b - acc_a
}
def mcnemar_test(model_a_predictions, model_b_predictions, ground_truth):
"""
McNemar's test for paired nominal data.
More appropriate than t-test for classification accuracy comparison.
"""
# Build contingency table
both_correct = np.sum((model_a_predictions == ground_truth) &
(model_b_predictions == ground_truth))
a_correct_b_wrong = np.sum((model_a_predictions == ground_truth) &
(model_b_predictions != ground_truth))
a_wrong_b_correct = np.sum((model_a_predictions != ground_truth) &
(model_b_predictions == ground_truth))
both_wrong = np.sum((model_a_predictions != ground_truth) &
(model_b_predictions != ground_truth))
print("Contingency Table:")
print(f" Model B Correct | Model B Wrong")
print(f"Model A Correct {both_correct:6d} {a_correct_b_wrong:6d}")
print(f"Model A Wrong {a_wrong_b_correct:6d} {both_wrong:6d}")
# McNemar's test statistic (with continuity correction)
b = a_correct_b_wrong
c = a_wrong_b_correct
if b + c == 0:
print("Cannot perform test: no discordant pairs")
return None
chi2 = (abs(b - c) - 1) ** 2 / (b + c)
p_value = 1 - stats.chi2.cdf(chi2, 1)
print(f"\nMcNemar's Chi-squared: {chi2:.4f}")
print(f"P-value: {p_value:.6f}")
if p_value < 0.05:
winner = "Model B" if c > b else "Model A"
print(f"Result: {winner} is significantly better (p < 0.05)")
return {
'chi2': chi2,
'p_value': p_value,
'significant': p_value < 0.05,
'discordant_pairs': {'b': b, 'c': c}
}
# Bootstrap confidence intervals
def bootstrap_confidence_interval(predictions, ground_truth,
metric_fn, n_bootstrap=1000,
confidence_level=0.95):
"""
Estimate confidence intervals using bootstrapping.
"""
n_samples = len(predictions)
bootstrap_scores = []
for i in range(n_bootstrap):
# Sample with replacement
indices = np.random.choice(n_samples, size=n_samples, replace=True)
sampled_preds = predictions[indices]
sampled_true = ground_truth[indices]
score = metric_fn(sampled_preds, sampled_true)
bootstrap_scores.append(score)
# Calculate confidence interval
alpha = 1 - confidence_level
lower_percentile = alpha / 2 * 100
upper_percentile = (1 - alpha / 2) * 100
ci_lower = np.percentile(bootstrap_scores, lower_percentile)
ci_upper = np.percentile(bootstrap_scores, upper_percentile)
mean_score = np.mean(bootstrap_scores)
std_score = np.std(bootstrap_scores)
print(f"Bootstrap Results ({n_bootstrap} iterations):")
print(f"Mean Score: {mean_score:.4f}")
print(f"Std Dev: {std_score:.4f}")
print(f"{confidence_level*100}% CI: [{ci_lower:.4f}, {ci_upper:.4f}]")
return {
'mean': mean_score,
'std': std_score,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'bootstrap_scores': bootstrap_scores
}
# Usage example
from sklearn.metrics import accuracy_score
result = paired_t_test(preds_v1, preds_v2, labels)
mcnemar_result = mcnemar_test(preds_v1, preds_v2, labels)
ci_result = bootstrap_confidence_interval(preds_v2, labels, accuracy_score)
```
### Multiple Comparison Correction
```python
from statsmodels.stats.multitest import multipletests
def compare_multiple_models(model_predictions_list, ground_truth, method='fdr_bh'):
"""
Compare multiple models with correction for multiple comparisons.
Args:
model_predictions_list: List of (model_name, predictions) tuples
ground_truth: True labels
method: Correction method ('bonferroni', 'fdr_bh', 'holm', etc.)
"""
# Use first model as baseline
baseline_name, baseline_preds = model_predictions_list[0]
baseline_correct = (baseline_preds == ground_truth).astype(int)
p_values = []
model_names = []
for model_name, model_preds in model_predictions_list[1:]:
model_correct = (model_preds == ground_truth).astype(int)
# Paired t-test against baseline
_, p_value = stats.ttest_rel(model_correct, baseline_correct)
p_values.append(p_value)
model_names.append(model_name)
# Apply correction
reject, corrected_p_values, _, _ = multipletests(
p_values,
alpha=0.05,
method=method
)
print(f"Multiple Comparison Correction: {method}")
print(f"{'Model':<20} {'Raw P':<10} {'Corrected P':<12} {'Significant'}")
print("-" * 55)
for name, raw_p, corr_p, sig in zip(model_names, p_values, corrected_p_values, reject):
print(f"{name:<20} {raw_p:<10.6f} {corr_p:<12.6f} {'Yes' if sig else 'No'}")
return {
'model_names': model_names,
'raw_p_values': p_values,
'corrected_p_values': corrected_p_values,
'reject_null': reject
}
```
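A brief usage sketch; the prediction arrays and the label array are hypothetical placeholders assumed to come from the same held-out test set:

```python
model_runs = [
    ("baseline", preds_baseline),        # the first entry is treated as the baseline
    ("larger_lr", preds_larger_lr),
    ("more_layers", preds_more_layers),
]

comparison = compare_multiple_models(model_runs, test_labels, method='fdr_bh')
print(comparison['corrected_p_values'])
```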
---
## Bias and Fairness Detection
### Demographic Parity and Equalized Odds
```python
import pandas as pd
from sklearn.metrics import confusion_matrix
class FairnessAuditor:
def __init__(self, predictions, ground_truth, sensitive_attributes):
"""
Args:
predictions: Model predictions
ground_truth: True labels
sensitive_attributes: Dict of protected attributes
(e.g., {'gender': [...], 'race': [...]})
"""
self.predictions = np.array(predictions)
self.ground_truth = np.array(ground_truth)
self.sensitive_attributes = sensitive_attributes
def demographic_parity(self, attribute_name):
"""
Check if positive prediction rates are equal across groups.
Demographic parity: P(Ŷ=1|A=0) = P(Ŷ=1|A=1)
"""
attribute = self.sensitive_attributes[attribute_name]
unique_groups = np.unique(attribute)
positive_rates = {}
for group in unique_groups:
mask = attribute == group
positive_rate = np.mean(self.predictions[mask] == 1)
positive_rates[group] = positive_rate
# Calculate disparity
rates = list(positive_rates.values())
max_disparity = max(rates) - min(rates)
print(f"Demographic Parity for '{attribute_name}':")
print(f"{'Group':<15} {'Positive Rate':<15}")
print("-" * 30)
for group, rate in positive_rates.items():
print(f"{str(group):<15} {rate:.4f}")
print(f"\nMax Disparity: {max_disparity:.4f}")
# Rule of thumb: disparity < 0.1 is acceptable
passed = max_disparity < 0.1
print(f"Status: {'PASS' if passed else 'FAIL'} (threshold: 0.1)")
return {
'positive_rates': positive_rates,
'max_disparity': max_disparity,
'passed': passed
}
def equalized_odds(self, attribute_name):
"""
Check if TPR and FPR are equal across groups.
Equalized odds: P(Ŷ=1|Y=1,A=0) = P(Ŷ=1|Y=1,A=1)
and P(Ŷ=1|Y=0,A=0) = P(Ŷ=1|Y=0,A=1)
"""
attribute = self.sensitive_attributes[attribute_name]
unique_groups = np.unique(attribute)
tpr_by_group = {}
fpr_by_group = {}
for group in unique_groups:
mask = attribute == group
# True Positive Rate (Recall)
positive_mask = mask & (self.ground_truth == 1)
if np.sum(positive_mask) > 0:
tpr = np.mean(self.predictions[positive_mask] == 1)
else:
tpr = 0.0
# False Positive Rate
negative_mask = mask & (self.ground_truth == 0)
if np.sum(negative_mask) > 0:
fpr = np.mean(self.predictions[negative_mask] == 1)
else:
fpr = 0.0
tpr_by_group[group] = tpr
fpr_by_group[group] = fpr
# Calculate disparities
tpr_disparity = max(tpr_by_group.values()) - min(tpr_by_group.values())
fpr_disparity = max(fpr_by_group.values()) - min(fpr_by_group.values())
print(f"\nEqualized Odds for '{attribute_name}':")
print(f"{'Group':<10} {'TPR':<10} {'FPR':<10}")
print("-" * 30)
for group in unique_groups:
print(f"{str(group):<10} {tpr_by_group[group]:.4f} {fpr_by_group[group]:.4f}")
print(f"\nTPR Disparity: {tpr_disparity:.4f}")
print(f"FPR Disparity: {fpr_disparity:.4f}")
passed = (tpr_disparity < 0.1) and (fpr_disparity < 0.1)
print(f"Status: {'PASS' if passed else 'FAIL'} (threshold: 0.1)")
return {
'tpr_by_group': tpr_by_group,
'fpr_by_group': fpr_by_group,
'tpr_disparity': tpr_disparity,
'fpr_disparity': fpr_disparity,
'passed': passed
}
def predictive_parity(self, attribute_name):
"""
Check if precision is equal across groups.
Predictive parity: P(Y=1|Ŷ=1,A=0) = P(Y=1|Ŷ=1,A=1)
"""
attribute = self.sensitive_attributes[attribute_name]
unique_groups = np.unique(attribute)
precision_by_group = {}
for group in unique_groups:
mask = attribute == group
predicted_positive_mask = mask & (self.predictions == 1)
if np.sum(predicted_positive_mask) > 0:
precision = np.mean(self.ground_truth[predicted_positive_mask] == 1)
else:
precision = 0.0
precision_by_group[group] = precision
disparity = max(precision_by_group.values()) - min(precision_by_group.values())
print(f"\nPredictive Parity for '{attribute_name}':")
print(f"{'Group':<15} {'Precision':<15}")
print("-" * 30)
for group, prec in precision_by_group.items():
print(f"{str(group):<15} {prec:.4f}")
print(f"\nDisparity: {disparity:.4f}")
passed = disparity < 0.1
print(f"Status: {'PASS' if passed else 'FAIL'}")
return {
'precision_by_group': precision_by_group,
'disparity': disparity,
'passed': passed
}
def generate_fairness_report(self):
"""Generate comprehensive fairness report for all attributes."""
report = {}
for attr_name in self.sensitive_attributes.keys():
print(f"\n{'='*60}")
print(f"FAIRNESS AUDIT: {attr_name.upper()}")
print(f"{'='*60}")
report[attr_name] = {
'demographic_parity': self.demographic_parity(attr_name),
'equalized_odds': self.equalized_odds(attr_name),
'predictive_parity': self.predictive_parity(attr_name)
}
return report
# Usage example
auditor = FairnessAuditor(
predictions=model_preds,
ground_truth=true_labels,
sensitive_attributes={
'gender': gender_array,
'age_group': age_group_array,
'region': region_array
}
)
fairness_report = auditor.generate_fairness_report()
```
### Bias Mitigation Strategies
```python
class BiasMitigator:
def __init__(self, model, training_data, sensitive_attributes):
self.model = model
self.training_data = training_data
self.sensitive_attributes = sensitive_attributes
def reweighting(self, target_attribute):
"""
Reweight samples to balance representation across groups.
"""
attribute = self.sensitive_attributes[target_attribute]
unique_groups, counts = np.unique(attribute, return_counts=True)
# Calculate weights inversely proportional to group size
total_samples = len(attribute)
weights = np.zeros_like(attribute, dtype=float)
for group, count in zip(unique_groups, counts):
mask = attribute == group
# Weight = total_samples / (num_groups * count_in_group)
weights[mask] = total_samples / (len(unique_groups) * count)
# Normalize weights
weights = weights * len(attribute) / np.sum(weights)
print(f"Reweighting for '{target_attribute}':")
print(f"Weight range: [{weights.min():.4f}, {weights.max():.4f}]")
return weights
def adversarial_debiasing(self, debias_epochs=10):
"""
Train adversary to predict sensitive attribute from representations.
Update model to minimize adversary's success.
"""
# Implementation would add adversarial head to model
# and alternate between main task and adversarial training
pass
def threshold_optimization(self, val_predictions, val_labels,
sensitive_attributes, target_attribute):
"""
Optimize decision thresholds per group to equalize metrics.
"""
attribute = sensitive_attributes[target_attribute]
unique_groups = np.unique(attribute)
optimal_thresholds = {}
for group in unique_groups:
mask = attribute == group
group_preds = val_predictions[mask]
group_labels = val_labels[mask]
# Find threshold that equalizes TPR across groups
best_threshold = 0.5
best_metric = 0
for threshold in np.arange(0.1, 0.9, 0.05):
binarized_preds = (group_preds > threshold).astype(int)
tpr = np.sum((binarized_preds == 1) & (group_labels == 1)) / \
np.sum(group_labels == 1)
# Optimize for equal TPR (simplified)
if tpr > best_metric:
best_metric = tpr
best_threshold = threshold
optimal_thresholds[group] = best_threshold
print(f"Optimal thresholds for '{target_attribute}':")
for group, thresh in optimal_thresholds.items():
print(f" Group {group}: {thresh:.2f}")
return optimal_thresholds
```
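A usage sketch for the two implemented strategies. The attribute and validation arrays are placeholders for your own protected-attribute annotations, and the returned sample weights would typically feed a weighted loss during retraining:

```python
mitigator = BiasMitigator(
    model=model,
    training_data=train_texts,
    sensitive_attributes={'gender': gender_array},
)

# Per-sample weights that balance group representation
sample_weights = mitigator.reweighting('gender')

# Per-group decision thresholds chosen on held-out validation scores
thresholds = mitigator.threshold_optimization(
    val_predictions=val_probs,        # predicted positive-class probabilities
    val_labels=val_labels,
    sensitive_attributes={'gender': val_gender_array},
    target_attribute='gender',
)
```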
---
## Robustness Testing
### Perturbation Testing
```python
import random
import string
class RobustnessTester:
def __init__(self, model, tokenizer):
self.model = model
self.tokenizer = tokenizer
def character_level_perturbations(self, texts, perturbation_rate=0.1):
"""
Test robustness to character-level noise.
Types:
- Random character insertion
- Random character deletion
- Random character substitution
- Adjacent character swap
"""
perturbed_texts = []
for text in texts:
chars = list(text)
num_perturbations = max(1, int(len(chars) * perturbation_rate))
for _ in range(num_perturbations):
op = random.choice(['insert', 'delete', 'substitute', 'swap'])
idx = random.randint(0, len(chars) - 1)
if op == 'insert' and chars[idx].isalpha():
chars.insert(idx, random.choice(string.ascii_lowercase))
elif op == 'delete' and len(chars) > 1:
chars.pop(idx)
elif op == 'substitute' and chars[idx].isalpha():
chars[idx] = random.choice(string.ascii_lowercase)
elif op == 'swap' and idx < len(chars) - 1:
chars[idx], chars[idx + 1] = chars[idx + 1], chars[idx]
perturbed_texts.append(''.join(chars))
return perturbed_texts
def word_level_perturbations(self, texts, perturbation_rate=0.1):
"""
Test robustness to word-level noise.
Types:
- Random word deletion
- Random word insertion (synonyms or random)
- Word order shuffling (local)
"""
perturbed_texts = []
for text in texts:
words = text.split()
num_perturbations = max(1, int(len(words) * perturbation_rate))
for _ in range(num_perturbations):
op = random.choice(['delete', 'insert', 'shuffle'])
idx = random.randint(0, len(words) - 1)
if op == 'delete' and len(words) > 1:
words.pop(idx)
elif op == 'insert':
# Insert random common word
common_words = ['the', 'a', 'is', 'it', 'this', 'that']
words.insert(idx, random.choice(common_words))
elif op == 'shuffle' and idx < len(words) - 1:
words[idx], words[idx + 1] = words[idx + 1], words[idx]
perturbed_texts.append(' '.join(words))
return perturbed_texts
def synonym_replacement(self, texts, replacement_rate=0.1):
"""
Replace words with synonyms using WordNet or similar.
"""
try:
from nltk.corpus import wordnet
from nltk import word_tokenize, pos_tag
except ImportError:
print("NLTK not available. Install with: pip install nltk")
return texts
perturbed_texts = []
for text in texts:
words = text.split()
num_replacements = max(1, int(len(words) * replacement_rate))
for _ in range(num_replacements):
idx = random.randint(0, len(words) - 1)
word = words[idx]
# Get synonyms
synsets = wordnet.synsets(word)
if synsets:
synonyms = []
for synset in synsets:
for lemma in synset.lemmas():
synonym = lemma.name().replace('_', ' ')
if synonym.lower() != word.lower():
synonyms.append(synonym)
if synonyms:
words[idx] = random.choice(synonyms)
perturbed_texts.append(' '.join(words))
return perturbed_texts
def back_translation(self, texts, intermediate_lang='de'):
"""
Test robustness via back-translation (round-trip translation).
Translate to intermediate language and back.
"""
try:
from transformers import MarianMTModel, MarianTokenizer
# Load translation models
trans_to_tokenizer = MarianTokenizer.from_pretrained(
f'Helsinki-NLP/opus-mt-en-{intermediate_lang}'
)
trans_to_model = MarianMTModel.from_pretrained(
f'Helsinki-NLP/opus-mt-en-{intermediate_lang}'
)
trans_back_tokenizer = MarianTokenizer.from_pretrained(
f'Helsinki-NLP/opus-mt-{intermediate_lang}-en'
)
trans_back_model = MarianMTModel.from_pretrained(
f'Helsinki-NLP/opus-mt-{intermediate_lang}-en'
)
except Exception as e:
print(f"Translation models not available: {e}")
return texts
perturbed_texts = []
for text in texts:
# Translate to intermediate language
inputs_to = trans_to_tokenizer(text, return_tensors='pt', padding=True)
translated_to = trans_to_model.generate(**inputs_to)
intermediate = trans_to_tokenizer.decode(translated_to[0], skip_special_tokens=True)
# Translate back to English
inputs_back = trans_back_tokenizer(intermediate, return_tensors='pt', padding=True)
translated_back = trans_back_model.generate(**inputs_back)
back_translated = trans_back_tokenizer.decode(
translated_back[0],
skip_special_tokens=True
)
perturbed_texts.append(back_translated)
return perturbed_texts
def evaluate_robustness(self, original_texts, labels, perturbation_fn,
perturbation_name):
"""
Evaluate model performance on perturbed data.
"""
perturbed_texts = perturbation_fn(original_texts)
# Get predictions for original and perturbed
orig_preds = self.model.predict(original_texts)
pert_preds = self.model.predict(perturbed_texts)
# Calculate metrics
orig_accuracy = np.mean(orig_preds == labels)
pert_accuracy = np.mean(pert_preds == labels)
# Consistency: predictions unchanged despite perturbation
consistency = np.mean(orig_preds == pert_preds)
print(f"\nRobustness Test: {perturbation_name}")
print(f"Original Accuracy: {orig_accuracy:.4f}")
print(f"Perturbed Accuracy: {pert_accuracy:.4f}")
print(f"Accuracy Drop: {orig_accuracy - pert_accuracy:.4f}")
print(f"Prediction Consistency: {consistency:.4f}")
return {
'original_accuracy': orig_accuracy,
'perturbed_accuracy': pert_accuracy,
'accuracy_drop': orig_accuracy - pert_accuracy,
'consistency': consistency
}
def comprehensive_robustness_suite(self, texts, labels):
"""Run all robustness tests."""
results = {}
        tests = [
            ('Character-Level Noise', lambda t: self.character_level_perturbations(t, 0.1)),
            ('Word-Level Noise', lambda t: self.word_level_perturbations(t, 0.1)),
            ('Synonym Replacement', lambda t: self.synonym_replacement(t, 0.1)),
        ]
for test_name, pert_fn in tests:
results[test_name] = self.evaluate_robustness(
texts, labels, pert_fn, test_name
)
# Summary
print(f"\n{'='*60}")
print("ROBUSTNESS SUMMARY")
print(f"{'='*60}")
print(f"{'Test':<25} {'Acc Drop':<12} {'Consistency':<12}")
print("-" * 50)
for test_name, metrics in results.items():
print(f"{test_name:<25} {metrics['accuracy_drop']:.4f} {metrics['consistency']:.4f}")
return results
```
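As with the other utilities in this tutorial, the tester wraps a model exposing a `predict` method. A usage sketch, assuming a labeled evaluation set is already in scope (the example texts and labels here are illustrative):

```python
import numpy as np

tester = RobustnessTester(model, tokenizer)

eval_texts = ["the service was excellent", "terrible experience, never again"]
eval_labels = np.array([1, 0])

# Run the full perturbation suite and inspect the accuracy drop per test
robustness_report = tester.comprehensive_robustness_suite(eval_texts, eval_labels)

# Or run a single perturbation type with a heavier noise rate
heavy_noise = tester.evaluate_robustness(
    eval_texts,
    eval_labels,
    lambda t: tester.character_level_perturbations(t, perturbation_rate=0.3),
    "Heavy Character Noise",
)
```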
### Stress Testing
```python
class StressTester:
def __init__(self, model):
self.model = model
def length_stress_test(self, texts, labels):
"""Test performance across different input lengths."""
lengths = [len(text.split()) for text in texts]
# Bin by length
bins = [(0, 10), (10, 25), (25, 50), (50, 100), (100, float('inf'))]
results = {}
for min_len, max_len in bins:
mask = [(min_len <= l < max_len) for l in lengths]
bin_texts = [t for t, m in zip(texts, mask) if m]
bin_labels = [l for l, m in zip(labels, mask) if m]
if len(bin_labels) > 0:
preds = self.model.predict(bin_texts)
accuracy = np.mean(preds == bin_labels)
bin_name = f"{min_len}-{max_len if max_len != float('inf') else '∞'}"
results[bin_name] = {
'count': len(bin_labels),
'accuracy': accuracy
}
print(f"Length {bin_name}: {accuracy:.4f} (n={len(bin_labels)})")
return results
def rare_class_stress_test(self, texts, labels):
"""Test performance on rare/underrepresented classes."""
unique_classes, counts = np.unique(labels, return_counts=True)
results = {}
for cls, count in zip(unique_classes, counts):
mask = labels == cls
cls_texts = [t for t, m in zip(texts, mask) if m]
cls_labels = [l for l, m in zip(labels, mask) if m]
preds = self.model.predict(cls_texts)
accuracy = np.mean(preds == cls_labels)
frequency = 'rare' if count < len(labels) * 0.05 else 'common'
results[cls] = {
'count': count,
'frequency': frequency,
'accuracy': accuracy
}
print(f"Class {cls}: {accuracy:.4f} (n={count}, {frequency})")
return results
def edge_case_stress_test(self, edge_cases):
"""Test specific edge cases."""
results = {}
for case_name, (text, expected_label) in edge_cases.items():
pred = self.model.predict([text])[0]
correct = pred == expected_label
results[case_name] = {
'expected': expected_label,
'predicted': pred,
'correct': correct
}
status = "✓" if correct else "✗"
print(f"{status} {case_name}: Expected={expected_label}, Got={pred}")
return results
```
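A short usage sketch; the edge cases below are illustrative placeholders, and `model.predict` is assumed to return one label per input:

```python
import numpy as np

stress_tester = StressTester(model)

# Performance by input length and by class frequency
length_report = stress_tester.length_stress_test(eval_texts, eval_labels)
rare_class_report = stress_tester.rare_class_stress_test(eval_texts, np.array(eval_labels))

# Hand-written edge cases: name -> (input text, expected label)
edge_cases = {
    "empty_string": ("", 0),
    "all_punctuation": ("!!! ??? ...", 0),
    "very_long_repetition": ("good " * 500, 1),
    "mixed_language": ("the movie was très bien", 1),
}
edge_report = stress_tester.edge_case_stress_test(edge_cases)
```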
---
## Adversarial Testing
### TextFooler-Style Adversarial Attacks
```python
class AdversarialAttacker:
def __init__(self, model, tokenizer):
self.model = model
self.tokenizer = tokenizer
def calculate_importance_scores(self, text, label):
"""
Calculate importance score for each word by masking.
"""
words = text.split()
importance_scores = []
# Get original prediction probability
orig_probs = self.model.predict_proba([text])[0]
orig_prob = orig_probs[label]
for i, word in enumerate(words):
# Mask this word
masked_words = words[:i] + ['[MASK]'] + words[i+1:]
masked_text = ' '.join(masked_words)
# Get prediction with masked word
masked_probs = self.model.predict_proba([masked_text])[0]
masked_prob = masked_probs[label]
# Importance = drop in probability
importance = orig_prob - masked_prob
importance_scores.append(importance)
return importance_scores
def find_synonyms(self, word, top_k=10):
"""Find synonyms for a word."""
try:
from nltk.corpus import wordnet
except ImportError:
return [word]
synonyms = []
for synset in wordnet.synsets(word):
for lemma in synset.lemmas():
synonym = lemma.name().replace('_', ' ')
if synonym.lower() != word.lower() and synonym.isalpha():
synonyms.append(synonym)
return synonyms[:top_k]
def textfooler_attack(self, text, true_label, max_iterations=10):
"""
Implement TextFooler-style adversarial attack.
Strategy:
1. Identify important words
2. Replace with synonyms that change prediction
3. Ensure semantic similarity and grammaticality
"""
words = text.split()
current_text = text
attacked_words = set()
for iteration in range(max_iterations):
# Get current prediction
pred = self.model.predict([current_text])[0]
# Check if attack succeeded
if pred != true_label:
print(f"Attack succeeded at iteration {iteration + 1}")
return {
'success': True,
'adversarial_text': current_text,
'iterations': iteration + 1,
'attacked_words': attacked_words
}
# Calculate importance scores
importance_scores = self.calculate_importance_scores(
current_text, true_label
)
# Sort words by importance (descending)
sorted_indices = np.argsort(importance_scores)[::-1]
# Try to replace most important unattacked word
attacked = False
for idx in sorted_indices:
if idx in attacked_words:
continue
word = words[idx]
synonyms = self.find_synonyms(word)
# Try each synonym
for synonym in synonyms:
# Create candidate text
candidate_words = words.copy()
candidate_words[idx] = synonym
candidate_text = ' '.join(candidate_words)
# Check if prediction changes
new_pred = self.model.predict([candidate_text])[0]
if new_pred != true_label:
# Attack successful
current_text = candidate_text
words = candidate_words
attacked_words.add(idx)
attacked = True
break
# Also check if probability of true label decreases
orig_probs = self.model.predict_proba([current_text])[0]
new_probs = self.model.predict_proba([candidate_text])[0]
if new_probs[true_label] < orig_probs[true_label]:
# Accept if it reduces confidence
current_text = candidate_text
words = candidate_words
attacked_words.add(idx)
attacked = True
break
if attacked:
break
if not attacked:
print("No successful perturbation found")
break
return {
'success': False,
'adversarial_text': current_text,
'iterations': max_iterations,
'attacked_words': attacked_words
}
def generate_adversarial_dataset(self, texts, labels, attack_rate=0.3):
"""
Generate adversarial examples for a portion of the dataset.
"""
num_to_attack = int(len(texts) * attack_rate)
indices = np.random.choice(len(texts), num_to_attack, replace=False)
adversarial_examples = []
success_count = 0
for idx in indices:
text = texts[idx]
label = labels[idx]
result = self.textfooler_attack(text, label)
if result['success']:
success_count += 1
adversarial_examples.append({
'original': text,
'adversarial': result['adversarial_text'],
'label': label,
'iterations': result['iterations']
})
attack_success_rate = success_count / num_to_attack
print(f"\nAdversarial Attack Summary:")
print(f"Attempted: {num_to_attack}")
print(f"Successful: {success_count}")
print(f"Success Rate: {attack_success_rate:.4f}")
return adversarial_examples, attack_success_rate
```
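Usage sketch. The attack assumes the model exposes both `predict` and `predict_proba`, and that NLTK's WordNet corpus has been downloaded:

```python
import nltk
nltk.download('wordnet', quiet=True)  # needed once for synonym lookup

attacker = AdversarialAttacker(model, tokenizer)

# Attack a single example
single_result = attacker.textfooler_attack(
    "the plot was engaging and the acting superb",
    true_label=1,
    max_iterations=10,
)
if single_result['success']:
    print("Adversarial text:", single_result['adversarial_text'])

# Attack a slice of the evaluation set and report the success rate
adv_examples, success_rate = attacker.generate_adversarial_dataset(
    eval_texts, eval_labels, attack_rate=0.3
)
```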
### Adversarial Training
```python
class AdversarialTrainer:
def __init__(self, model, attacker, config):
self.model = model
self.attacker = attacker
self.config = config
def train_with_adversarial_examples(self, train_loader, adversarial_ratio=0.5):
"""
Train model augmented with adversarial examples.
Mix of clean and adversarial examples improves robustness.
"""
self.model.train()
for epoch in range(self.config.num_epochs):
total_loss = 0
for batch_idx, (texts, labels) in enumerate(train_loader):
# Get adversarial examples for this batch
adv_examples = []
adv_labels = []
for text, label in zip(texts, labels):
if random.random() < adversarial_ratio:
result = self.attacker.textfooler_attack(text, label, max_iterations=5)
if result['success']:
adv_examples.append(result['adversarial_text'])
adv_labels.append(label)
else:
adv_examples.append(text)
adv_labels.append(label)
else:
adv_examples.append(text)
adv_labels.append(label)
# Train on mixed batch
loss = self.model.train_step(adv_examples, adv_labels)
total_loss += loss
avg_loss = total_loss / len(train_loader)
print(f"Epoch {epoch + 1}: Loss = {avg_loss:.4f}")
return self.model
```
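Usage sketch; `config` is assumed to expose `num_epochs`, and the model is assumed to implement a `train_step(texts, labels)` method returning the batch loss, per the class above:

```python
attacker = AdversarialAttacker(model, tokenizer)
adv_trainer = AdversarialTrainer(model, attacker, config)

# Replace roughly half of each batch with adversarial variants during training
robust_model = adv_trainer.train_with_adversarial_examples(
    train_loader,
    adversarial_ratio=0.5,
)
```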
---
## Domain Shift Detection
### Distribution Comparison
```python
from scipy.spatial.distance import jensenshannon
from scipy.stats import wasserstein_distance
from scipy import stats
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
class DomainShiftDetector:
def __init__(self, source_data, target_data, model):
"""
Args:
source_data: Training distribution data
target_data: New/target distribution data
model: Trained model for extracting representations
"""
self.source_data = source_data
self.target_data = target_data
self.model = model
def extract_representations(self, texts):
"""Extract hidden representations from model."""
representations = []
for text in texts:
rep = self.model.get_hidden_representation(text)
representations.append(rep)
return np.array(representations)
def compare_label_distributions(self, source_labels, target_labels):
"""Compare label distribution between domains."""
# Get unique labels
all_labels = np.unique(np.concatenate([source_labels, target_labels]))
# Calculate distributions
source_dist = np.array([np.mean(source_labels == l) for l in all_labels])
target_dist = np.array([np.mean(target_labels == l) for l in all_labels])
# Jensen-Shannon divergence
js_div = jensenshannon(source_dist, target_dist)
# Total Variation Distance
tv_dist = 0.5 * np.sum(np.abs(source_dist - target_dist))
print(f"Label Distribution Comparison:")
print(f"Jensen-Shannon Divergence: {js_div:.4f}")
print(f"Total Variation Distance: {tv_dist:.4f}")
# Visualization
fig, ax = plt.subplots(figsize=(10, 6))
x = np.arange(len(all_labels))
width = 0.35
ax.bar(x - width/2, source_dist, width, label='Source', alpha=0.8)
        ax.bar(x + width/2, target_dist, width, label='Target', alpha=0.8)
ax.set_xlabel('Class')
ax.set_ylabel('Proportion')
ax.set_title('Label Distribution: Source vs Target')
ax.set_xticks(x)
ax.set_xticklabels(all_labels)
ax.legend()
plt.tight_layout()
plt.savefig('label_distribution_comparison.png')
plt.show()
return {
'js_divergence': js_div,
'tv_distance': tv_dist,
'source_distribution': source_dist,
'target_distribution': target_dist
}
def compare_feature_distributions(self):
"""Compare feature distributions using statistical tests."""
# Extract representations
print("Extracting source representations...")
source_reps = self.extract_representations(self.source_data)
print("Extracting target representations...")
target_reps = self.extract_representations(self.target_data)
# Per-feature comparison (Kolmogorov-Smirnov test)
n_features = source_reps.shape[1]
ks_statistics = []
ks_p_values = []
for i in range(min(n_features, 100)): # Sample features for efficiency
stat, p_val = stats.ks_2samp(source_reps[:, i], target_reps[:, i])
ks_statistics.append(stat)
ks_p_values.append(p_val)
# Summary statistics
mean_ks_stat = np.mean(ks_statistics)
frac_significant = np.mean([p < 0.05 for p in ks_p_values])
print(f"\nFeature Distribution Comparison:")
print(f"Mean KS Statistic: {mean_ks_stat:.4f}")
print(f"Fraction of Significant Features (p<0.05): {frac_significant:.4f}")
# Wasserstein distance on aggregated representations
source_means = np.mean(source_reps, axis=0)
target_means = np.mean(target_reps, axis=0)
wasserstein_dist = wasserstein_distance(source_means, target_means)
print(f"Wasserstein Distance (means): {wasserstein_dist:.4f}")
return {
'mean_ks_statistic': mean_ks_stat,
'frac_significant': frac_significant,
'wasserstein_distance': wasserstein_dist
}
def visualize_domain_shift(self):
"""Visualize domain shift using t-SNE."""
# Combine data
all_data = np.concatenate([self.source_data, self.target_data])
all_labels = ['Source'] * len(self.source_data) + \
['Target'] * len(self.target_data)
# Extract representations
reps = self.extract_representations(all_data)
# t-SNE visualization
tsne = TSNE(n_components=2, random_state=42, perplexity=30)
reps_2d = tsne.fit_transform(reps)
# Plot
fig, ax = plt.subplots(figsize=(12, 10))
source_mask = np.array(all_labels) == 'Source'
target_mask = np.array(all_labels) == 'Target'
ax.scatter(reps_2d[source_mask, 0], reps_2d[source_mask, 1],
alpha=0.5, label='Source', s=10)
ax.scatter(reps_2d[target_mask, 0], reps_2d[target_mask, 1],
alpha=0.5, label='Target', s=10)
ax.set_xlabel('t-SNE Dimension 1')
ax.set_ylabel('t-SNE Dimension 2')
ax.set_title('Domain Shift Visualization: Source vs Target')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('domain_shift_tsne.png')
plt.show()
def detect_covariate_shift(self):
"""Detect covariate shift using KLIEP-like method."""
# Simplified covariate shift detection
source_reps = self.extract_representations(self.source_data)
target_reps = self.extract_representations(self.target_data)
# Train classifier to distinguish source from target
X = np.concatenate([source_reps, target_reps])
y = np.concatenate([
np.zeros(len(source_reps)),
np.ones(len(target_reps))
])
from sklearn.linear_model import LogisticRegression
clf = LogisticRegression(random_state=42)
clf.fit(X, y)
# If classifier can easily distinguish, there's significant shift
accuracy = clf.score(X, y)
print(f"Covariate Shift Detection:")
print(f"Source/Target Classifier Accuracy: {accuracy:.4f}")
if accuracy > 0.7:
print("WARNING: Significant covariate shift detected!")
elif accuracy > 0.55:
print("MODERATE: Some covariate shift present")
else:
print("LOW: Minimal covariate shift")
return {
'classifier_accuracy': accuracy,
'shift_severity': 'high' if accuracy > 0.7 else
'moderate' if accuracy > 0.55 else 'low'
}
```
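Usage sketch; `model.get_hidden_representation` is the assumed interface from the class above, returning one fixed-size vector per input, and the source/target text and label arrays are placeholders:

```python
# source_texts: data the model was trained on; target_texts: new production data
detector = DomainShiftDetector(
    source_data=source_texts,
    target_data=target_texts,
    model=model,
)

# Compare label and feature distributions
label_shift = detector.compare_label_distributions(source_labels, target_labels)
feature_shift = detector.compare_feature_distributions()

# Visualize and quantify covariate shift
detector.visualize_domain_shift()
covariate_shift = detector.detect_covariate_shift()

if covariate_shift['shift_severity'] == 'high':
    print("Consider collecting labels from the target domain and fine-tuning.")
```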
---
## Calibration and Confidence Estimation
```python
from sklearn.calibration import calibration_curve
from scipy.special import softmax
import matplotlib.pyplot as plt
class CalibrationAnalyzer:
def __init__(self, model):
self.model = model
def get_predicted_probabilities(self, texts):
"""Get predicted probabilities from model."""
return self.model.predict_proba(texts)
def plot_calibration_curve(self, texts, labels, n_bins=10):
"""
Plot reliability diagram (calibration curve).
"""
probs = self.get_predicted_probabilities(texts)
# For binary classification
if probs.shape[1] == 2:
prob_positive = probs[:, 1]
else:
# Use max probability for multiclass
prob_positive = np.max(probs, axis=1)
# Calculate calibration curve
fraction_of_positives, mean_predicted_value = calibration_curve(
labels, prob_positive, n_bins=n_bins
)
# Plot
fig, ax = plt.subplots(figsize=(8, 8))
ax.plot(mean_predicted_value, fraction_of_positives, 's-',
label='Model', markersize=10)
ax.plot([0, 1], [0, 1], 'k--', label='Perfectly calibrated')
ax.set_xlabel('Mean Predicted Probability')
ax.set_ylabel('Fraction of Positives')
ax.set_title('Calibration Curve (Reliability Diagram)')
ax.legend(loc='upper left')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('calibration_curve.png')
plt.show()
# Calculate Expected Calibration Error (ECE)
ece = self.calculate_ece(labels, prob_positive, n_bins)
return {
'fraction_of_positives': fraction_of_positives,
'mean_predicted_value': mean_predicted_value,
'ece': ece
}
def calculate_ece(self, labels, probabilities, n_bins=10):
"""
Calculate Expected Calibration Error.
ECE measures the weighted average difference between predicted
confidence and actual accuracy across bins.
"""
bin_boundaries = np.linspace(0, 1, n_bins + 1)
ece = 0.0
for i in range(n_bins):
# Find samples in this bin
in_bin = (probabilities > bin_boundaries[i]) & \
(probabilities <= bin_boundaries[i + 1])
prop_in_bin = np.mean(in_bin)
if prop_in_bin > 0:
# Average confidence in bin
avg_confidence = np.mean(probabilities[in_bin])
# Actual accuracy in bin
avg_accuracy = np.mean(labels[in_bin] ==
(probabilities[in_bin] > 0.5).astype(int))
# Weighted difference
ece += np.abs(avg_accuracy - avg_confidence) * prop_in_bin
print(f"Expected Calibration Error (ECE): {ece:.4f}")
return ece
def temperature_scaling(self, val_texts, val_labels):
"""
Apply temperature scaling to improve calibration.
Find optimal temperature T that minimizes NLL on validation set.
"""
from scipy.optimize import minimize_scalar
logits = self.model.get_logits(val_texts)
def nll_loss(T):
scaled_logits = logits / T
probs = softmax(scaled_logits, axis=1)
# Negative log likelihood
nll = -np.mean(np.log(probs[np.arange(len(val_labels)), val_labels]))
return nll
# Find optimal temperature
result = minimize_scalar(nll_loss, bounds=(0.1, 10.0), method='bounded')
optimal_T = result.x
print(f"Optimal Temperature: {optimal_T:.4f}")
print(f"NLL before scaling: {nll_loss(1.0):.4f}")
print(f"NLL after scaling: {nll_loss(optimal_T):.4f}")
return optimal_T
def apply_temperature(self, texts, temperature):
"""Apply temperature scaling to predictions."""
logits = self.model.get_logits(texts)
scaled_logits = logits / temperature
calibrated_probs = softmax(scaled_logits, axis=1)
return calibrated_probs
```
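Usage sketch. Temperature scaling is fit on a held-out validation set and then applied at inference time; `model.get_logits` and `model.predict_proba` are the assumed interfaces from the class above:

```python
analyzer = CalibrationAnalyzer(model)

# Reliability diagram and ECE before calibration
calibration_before = analyzer.plot_calibration_curve(val_texts, val_labels, n_bins=10)
print(f"ECE before temperature scaling: {calibration_before['ece']:.4f}")

# Fit the temperature on the validation set, then apply it to new predictions
temperature = analyzer.temperature_scaling(val_texts, val_labels)
calibrated_probs = analyzer.apply_temperature(test_texts, temperature)
```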
---
## A/B Testing Framework
```python
import pandas as pd
from datetime import datetime, timedelta
class ABTestingFramework:
def __init__(self, experiment_name):
self.experiment_name = experiment_name
self.results = []
def design_experiment(self, control_model, treatment_model,
sample_size, duration_days, metrics):
"""
Design A/B test with proper power analysis.
"""
from statsmodels.stats.power import zt_ind_solve_power
# Power analysis
effect_size = 0.1 # Minimum detectable effect (10%)
alpha = 0.05
power = 0.8
required_n = zt_ind_solve_power(
effect_size=effect_size,
alpha=alpha,
power=power,
ratio=1 # Equal split
)
required_n = int(np.ceil(required_n))
print(f"A/B Test Design: {self.experiment_name}")
print(f"Required samples per variant: {required_n}")
print(f"Total required: {required_n * 2}")
print(f"Planned sample size: {sample_size}")
print(f"Duration: {duration_days} days")
if sample_size < required_n:
print(f"WARNING: Planned sample size may be underpowered!")
return {
'required_per_variant': required_n,
'planned_per_variant': sample_size // 2,
'effect_size': effect_size,
'alpha': alpha,
'power': power
}
def assign_users(self, user_ids, assignment_ratio=0.5):
"""
Randomly assign users to control or treatment.
"""
np.random.seed(42) # Reproducible assignment
assignments = np.random.rand(len(user_ids)) < assignment_ratio
user_assignments = {
uid: 'treatment' if assign else 'control'
for uid, assign in zip(user_ids, assignments)
}
# Verify balance
n_control = sum(1 for v in user_assignments.values() if v == 'control')
n_treatment = len(user_ids) - n_control
print(f"User Assignment:")
print(f"Control: {n_control} ({n_control/len(user_ids)*100:.1f}%)")
print(f"Treatment: {n_treatment} ({n_treatment/len(user_ids)*100:.1f}%)")
return user_assignments
def collect_metrics(self, interactions, user_assignments):
"""
Collect and aggregate metrics from user interactions.
"""
# Add assignment to interactions
df = pd.DataFrame(interactions)
df['variant'] = df['user_id'].map(user_assignments)
# Aggregate by variant
results = {}
for variant in ['control', 'treatment']:
variant_data = df[df['variant'] == variant]
variant_results = {
'n_users': variant_data['user_id'].nunique(),
'n_interactions': len(variant_data),
}
# Calculate each metric
for metric in ['accuracy', 'latency', 'user_satisfaction']:
if metric in variant_data.columns:
variant_results[f'{metric}_mean'] = variant_data[metric].mean()
variant_results[f'{metric}_std'] = variant_data[metric].std()
results[variant] = variant_results
return results
def analyze_results(self, control_data, treatment_data, metric_name):
"""
Analyze A/B test results with statistical testing.
"""
control_values = np.array(control_data)
treatment_values = np.array(treatment_data)
# Difference in means
diff = np.mean(treatment_values) - np.mean(control_values)
# Two-sample t-test
t_stat, p_value = stats.ttest_ind(treatment_values, control_values)
# Confidence interval for difference
pooled_se = np.sqrt(np.var(control_values)/len(control_values) +
np.var(treatment_values)/len(treatment_values))
ci_lower = diff - 1.96 * pooled_se
ci_upper = diff + 1.96 * pooled_se
# Effect size (Cohen's d)
pooled_std = np.sqrt((np.var(control_values) + np.var(treatment_values)) / 2)
cohens_d = diff / pooled_std
print(f"\nA/B Test Analysis: {metric_name}")
print(f"Control Mean: {np.mean(control_values):.4f}")
print(f"Treatment Mean: {np.mean(treatment_values):.4f}")
print(f"Difference: {diff:.4f}")
print(f"95% CI: [{ci_lower:.4f}, {ci_upper:.4f}]")
print(f"T-statistic: {t_stat:.4f}")
print(f"P-value: {p_value:.6f}")
print(f"Cohen's d: {cohens_d:.4f}")
# Interpretation
if p_value < 0.05:
direction = "better" if diff > 0 else "worse"
print(f"Result: Treatment is statistically significantly {direction}")
if abs(cohens_d) < 0.2:
print("Effect size: Small")
elif abs(cohens_d) < 0.5:
print("Effect size: Medium")
else:
print("Effect size: Large")
else:
print("Result: No statistically significant difference")
return {
'difference': diff,
'p_value': p_value,
'ci_lower': ci_lower,
'ci_upper': ci_upper,
'cohens_d': cohens_d,
'significant': p_value < 0.05
}
    def sequential_testing(self, daily_results, stopping_rule='pocock'):
"""
Sequential A/B testing with early stopping.
Allows monitoring and early stopping if results are clear.
"""
cumulative_control = []
cumulative_treatment = []
decisions = []
for day, day_data in enumerate(daily_results):
cumulative_control.extend(day_data['control'])
cumulative_treatment.extend(day_data['treatment'])
# Daily analysis
if len(cumulative_control) > 100 and len(cumulative_treatment) > 100:
result = self.analyze_results(
cumulative_control,
cumulative_treatment,
'primary_metric'
)
# Stopping rule
if result['p_value'] < 0.01: # Strong evidence
decision = 'STOP_EARLY'
elif day >= len(daily_results) - 1:
decision = 'CONCLUDE'
else:
decision = 'CONTINUE'
decisions.append({
'day': day,
'decision': decision,
'p_value': result['p_value']
})
print(f"Day {day}: p={result['p_value']:.6f}, Decision: {decision}")
if decision == 'STOP_EARLY':
print("Early stopping triggered!")
break
return decisions
```
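An end-to-end usage sketch; the model handles, user IDs, interaction records, and per-variant metric arrays are illustrative placeholders that would normally come from production logging:

```python
ab_test = ABTestingFramework("ntf-v2-rollout")

# 1. Power analysis and experiment design
design = ab_test.design_experiment(
    control_model=model_v1,
    treatment_model=model_v2,
    sample_size=4000,
    duration_days=14,
    metrics=['accuracy', 'latency', 'user_satisfaction'],
)

# 2. Assign users 50/50 to control and treatment
user_ids = [f"user_{i}" for i in range(4000)]
assignments = ab_test.assign_users(user_ids, assignment_ratio=0.5)

# 3. Aggregate logged interactions per variant
# Each interaction: {'user_id': ..., 'accuracy': ..., 'latency': ..., 'user_satisfaction': ...}
variant_summary = ab_test.collect_metrics(logged_interactions, assignments)

# 4. Statistical comparison on the primary metric
analysis = ab_test.analyze_results(
    control_data=control_accuracy_values,
    treatment_data=treatment_accuracy_values,
    metric_name='accuracy',
)
```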
---
## Regression Testing for Models
```python
import json
import hashlib
from datetime import datetime
class ModelRegressionTester:
def __init__(self, baseline_model_path, test_suite_path):
"""
Initialize regression testing framework.
"""
self.baseline_model = self.load_model(baseline_model_path)
self.test_suite = self.load_test_suite(test_suite_path)
self.baseline_results = self.run_baseline()
def load_test_suite(self, path):
"""Load or create test suite."""
try:
with open(path, 'r') as f:
test_suite = json.load(f)
except FileNotFoundError:
# Create default test suite
test_suite = {
'functional_tests': [],
'edge_cases': [],
'performance_tests': [],
'bias_tests': []
}
return test_suite
def add_functional_test(self, name, input_text, expected_output,
tolerance=0.0):
"""Add a functional test case."""
test_case = {
'name': name,
'input': input_text,
'expected': expected_output,
'tolerance': tolerance,
'type': 'functional'
}
self.test_suite['functional_tests'].append(test_case)
self.save_test_suite()
def add_edge_case(self, name, input_text, expected_behavior):
"""Add an edge case test."""
test_case = {
'name': name,
'input': input_text,
'expected_behavior': expected_behavior,
'type': 'edge_case'
}
self.test_suite['edge_cases'].append(test_case)
self.save_test_suite()
def save_test_suite(self):
"""Save test suite to file."""
with open('model_test_suite.json', 'w') as f:
json.dump(self.test_suite, f, indent=2)
def run_baseline(self):
"""Run test suite on baseline model."""
results = {}
# Functional tests
functional_results = []
for test in self.test_suite['functional_tests']:
prediction = self.baseline_model.predict([test['input']])[0]
expected = test['expected']
# Check if within tolerance
if isinstance(expected, (int, float)):
passed = abs(prediction - expected) <= test['tolerance']
else:
passed = prediction == expected
functional_results.append({
'name': test['name'],
'passed': passed,
'prediction': prediction,
'expected': expected
})
results['functional'] = functional_results
# Edge cases
edge_results = []
for test in self.test_suite['edge_cases']:
try:
prediction = self.baseline_model.predict([test['input']])[0]
behavior = self.check_expected_behavior(prediction, test['expected_behavior'])
edge_results.append({
'name': test['name'],
'passed': behavior,
'prediction': prediction
})
except Exception as e:
edge_results.append({
'name': test['name'],
'passed': False,
'error': str(e)
})
results['edge_cases'] = edge_results
# Save baseline results
with open('baseline_regression_results.json', 'w') as f:
json.dump(results, f, indent=2)
return results
    def check_expected_behavior(self, prediction, expected_behavior):
        """Check whether a prediction matches the expected behavior."""
        if expected_behavior == 'high_confidence':
            return prediction['confidence'] > 0.9
        elif expected_behavior == 'uncertain':
            return prediction['confidence'] < 0.6
        elif isinstance(expected_behavior, dict) and 'class' in expected_behavior:
            # The test specifies a particular class, e.g. {'class': 'positive'}
            return prediction['class'] == expected_behavior['class']
        else:
            return False
def run_regression_test(self, new_model_path):
"""
Run regression tests on new model and compare to baseline.
"""
new_model = self.load_model(new_model_path)
regressions = []
improvements = []
# Compare functional tests
for baseline_result in self.baseline_results['functional']:
test_name = baseline_result['name']
# Find corresponding test
test = next(t for t in self.test_suite['functional_tests']
if t['name'] == test_name)
# Run on new model
new_prediction = new_model.predict([test['input']])[0]
expected = test['expected']
if isinstance(expected, (int, float)):
new_passed = abs(new_prediction - expected) <= test['tolerance']
else:
new_passed = new_prediction == expected
# Compare
if baseline_result['passed'] and not new_passed:
regressions.append({
'test': test_name,
'type': 'functional',
'baseline': baseline_result['prediction'],
'new': new_prediction,
'expected': expected
})
elif not baseline_result['passed'] and new_passed:
improvements.append({
'test': test_name,
'type': 'functional',
'baseline': baseline_result['prediction'],
'new': new_prediction
})
# Report
print(f"\n{'='*60}")
print("REGRESSION TEST RESULTS")
print(f"{'='*60}")
print(f"Total Tests: {len(self.baseline_results['functional'])}")
print(f"Regressions: {len(regressions)}")
print(f"Improvements: {len(improvements)}")
if regressions:
print(f"\n⚠️ REGRESSIONS DETECTED:")
for reg in regressions:
print(f" - {reg['test']}: {reg['baseline']} → {reg['new']}")
print(f" Expected: {reg['expected']}")
if improvements:
print(f"\n✅ IMPROVEMENTS:")
for imp in improvements:
print(f" - {imp['test']}: {imp['baseline']} → {imp['new']}")
return {
'regressions': regressions,
'improvements': improvements,
'passed': len(regressions) == 0
}
def generate_regression_report(self, results):
"""Generate detailed regression report."""
report = f"""
# Model Regression Test Report
## Summary
- **Date**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- **Total Tests**: {len(self.baseline_results['functional']) + len(self.baseline_results['edge_cases'])}
- **Regressions**: {len(results['regressions'])}
- **Improvements**: {len(results['improvements'])}
- **Status**: {'✅ PASS' if results['passed'] else '❌ FAIL'}
## Regressions
"""
for reg in results['regressions']:
report += f"""
### {reg['test']}
- Type: {reg['type']}
- Baseline: {reg['baseline']}
- New: {reg['new']}
- Expected: {reg['expected']}
"""
return report
```
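A minimal usage sketch follows; the checkpoint paths and test cases below are illustrative placeholders, not values from this project.
```python
# Hypothetical usage sketch; paths, inputs, and expected outputs are placeholders.
tester = ModelRegressionTester(
    baseline_model_path="checkpoints/model-v1.2",
    test_suite_path="model_test_suite.json"
)

# Register test cases (persisted via save_test_suite), then refresh the baseline
# results so the new cases are included in future comparisons.
tester.add_functional_test(
    name="positive_review",
    input_text="The product exceeded my expectations.",
    expected_output="positive"
)
tester.add_edge_case(
    name="empty_input",
    input_text="",
    expected_behavior="uncertain"
)
tester.baseline_results = tester.run_baseline()

# Compare a release candidate against the stored baseline.
report = tester.run_regression_test("checkpoints/model-v1.3-rc1")
if not report['passed']:
    print(tester.generate_regression_report(report))
```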
---
## Quality Gates and Release Criteria
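Quality gates turn the metrics collected above into explicit, automated release criteria: the model is approved for release only when every configured threshold is satisfied.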
```python
from datetime import datetime

class QualityGateChecker:
def __init__(self, release_config):
"""
Initialize quality gate checker with release criteria.
release_config example:
{
'min_accuracy': 0.85,
'max_fairness_disparity': 0.1,
'min_robustness_consistency': 0.9,
'max_calibration_ece': 0.05,
'no_regressions': True,
'min_test_coverage': 0.95
}
"""
self.config = release_config
def check_all_gates(self, model_metrics):
"""
Check all quality gates for release readiness.
"""
gate_results = {}
all_passed = True
# Accuracy gate
if 'min_accuracy' in self.config:
passed = model_metrics['accuracy'] >= self.config['min_accuracy']
gate_results['accuracy'] = {
'passed': passed,
'value': model_metrics['accuracy'],
'threshold': self.config['min_accuracy']
}
if not passed:
all_passed = False
# Fairness gate
if 'max_fairness_disparity' in self.config:
passed = model_metrics['fairness_disparity'] <= self.config['max_fairness_disparity']
gate_results['fairness'] = {
'passed': passed,
'value': model_metrics['fairness_disparity'],
'threshold': self.config['max_fairness_disparity']
}
if not passed:
all_passed = False
# Robustness gate
if 'min_robustness_consistency' in self.config:
passed = model_metrics['robustness_consistency'] >= self.config['min_robustness_consistency']
gate_results['robustness'] = {
'passed': passed,
'value': model_metrics['robustness_consistency'],
'threshold': self.config['min_robustness_consistency']
}
if not passed:
all_passed = False
# Calibration gate
if 'max_calibration_ece' in self.config:
passed = model_metrics['calibration_ece'] <= self.config['max_calibration_ece']
gate_results['calibration'] = {
'passed': passed,
'value': model_metrics['calibration_ece'],
'threshold': self.config['max_calibration_ece']
}
if not passed:
all_passed = False
# Regression gate
if 'no_regressions' in self.config and self.config['no_regressions']:
passed = model_metrics.get('regressions', 0) == 0
gate_results['regressions'] = {
'passed': passed,
'value': model_metrics.get('regressions', 0),
'threshold': 0
}
if not passed:
all_passed = False
# Print report
print(f"\n{'='*60}")
print("QUALITY GATE CHECK")
print(f"{'='*60}")
for gate, result in gate_results.items():
status = "✅ PASS" if result['passed'] else "❌ FAIL"
print(f"{gate.upper():<20} {status}")
print(f" Value: {result['value']:.4f}, Threshold: {result['threshold']:.4f}")
print(f"\n{'='*60}")
overall_status = "✅ READY FOR RELEASE" if all_passed else "❌ NOT READY FOR RELEASE"
print(f"OVERALL: {overall_status}")
print(f"{'='*60}")
return {
'all_passed': all_passed,
'gate_results': gate_results
}
def generate_release_report(self, model_info, metrics, gate_results):
"""Generate comprehensive release report."""
report = f"""
# Model Release Report
## Model Information
- **Name**: {model_info['name']}
- **Version**: {model_info['version']}
- **Date**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- **Training Data**: {model_info['training_data']}
- **Architecture**: {model_info['architecture']}
## Performance Metrics
- **Accuracy**: {metrics['accuracy']:.4f}
- **Precision**: {metrics['precision']:.4f}
- **Recall**: {metrics['recall']:.4f}
- **F1 Score**: {metrics['f1']:.4f}
- **AUC-ROC**: {metrics['auc_roc']:.4f}
## Quality Attributes
- **Fairness Disparity**: {metrics['fairness_disparity']:.4f}
- **Robustness Consistency**: {metrics['robustness_consistency']:.4f}
- **Calibration ECE**: {metrics['calibration_ece']:.4f}
- **Regressions**: {metrics.get('regressions', 0)}
## Quality Gate Results
"""
for gate, result in gate_results['gate_results'].items():
status = "✅ PASS" if result['passed'] else "❌ FAIL"
report += f"- {gate.upper()}: {status}\n"
report += f"""
## Release Decision
{'✅ APPROVED FOR RELEASE' if gate_results['all_passed'] else '❌ RELEASE BLOCKED'}
## Notes
{model_info.get('notes', 'No additional notes')}
"""
return report
```
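A short usage sketch, with illustrative metric values standing in for real evaluation results:
```python
# Hypothetical usage sketch; metric values are illustrative only.
release_config = {
    'min_accuracy': 0.85,
    'max_fairness_disparity': 0.10,
    'min_robustness_consistency': 0.90,
    'max_calibration_ece': 0.05,
    'no_regressions': True
}
model_metrics = {
    'accuracy': 0.88,
    'fairness_disparity': 0.07,
    'robustness_consistency': 0.93,
    'calibration_ece': 0.04,
    'regressions': 0
}

checker = QualityGateChecker(release_config)
gate_results = checker.check_all_gates(model_metrics)
print("Ready for release:", gate_results['all_passed'])
```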
---
## Complete Validation Pipeline Example
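The pipeline below chains the pieces from this tutorial (basic evaluation, cross-validation, robustness, fairness, calibration, and quality gates) into a single pre-release check.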
```python
import numpy as np

def complete_validation_pipeline(model, train_data, val_data, test_data,
sensitive_attributes=None):
"""
Run complete validation pipeline before model release.
"""
print("="*70)
print("COMPLETE MODEL VALIDATION PIPELINE")
print("="*70)
results = {}
# 1. Basic Performance Evaluation
print("\n[1/8] Basic Performance Evaluation")
test_preds = model.predict(test_data['texts'])
test_metrics = calculate_metrics(test_preds, test_data['labels'])
results['basic_metrics'] = test_metrics
# 2. Cross-Validation
print("\n[2/8] Cross-Validation")
cv_results, _ = k_fold_cross_validation(
type(model), model.config,
train_data['texts'], train_data['labels'],
k=5
)
results['cross_validation'] = cv_results
# 3. Robustness Testing
print("\n[3/8] Robustness Testing")
tester = RobustnessTester(model, model.tokenizer)
robustness_results = tester.comprehensive_robustness_suite(
test_data['texts'], test_data['labels']
)
results['robustness'] = robustness_results
# 4. Fairness Audit (if sensitive attributes provided)
if sensitive_attributes:
print("\n[4/8] Fairness Audit")
auditor = FairnessAuditor(test_preds, test_data['labels'], sensitive_attributes)
fairness_report = auditor.generate_fairness_report()
results['fairness'] = fairness_report
else:
print("\n[4/8] Fairness Audit: SKIPPED (no sensitive attributes)")
# 5. Calibration Analysis
print("\n[5/8] Calibration Analysis")
calibrator = CalibrationAnalyzer(model)
calibration_results = calibrator.plot_calibration_curve(
test_data['texts'], test_data['labels']
)
results['calibration'] = calibration_results
    # 6. Domain Shift Detection (requires data from the target domain)
    print("\n[6/8] Domain Shift Detection: SKIPPED (no target-domain data provided)")
    # 7. Statistical Significance vs. Baseline (requires a baseline model)
    print("\n[7/8] Statistical Significance: SKIPPED (no baseline model provided)")
# 8. Quality Gate Check
print("\n[8/8] Quality Gate Check")
# Prepare metrics for quality gates
gate_metrics = {
'accuracy': test_metrics['accuracy'],
'fairness_disparity': max(
[v['max_disparity'] for k, v in results.get('fairness', {}).items()]
) if 'fairness' in results else 0.0,
'robustness_consistency': np.mean([
v['consistency'] for v in robustness_results.values()
]),
'calibration_ece': calibration_results['ece'],
'regressions': 0 # Would check against baseline
}
release_config = {
'min_accuracy': 0.80,
'max_fairness_disparity': 0.15,
'min_robustness_consistency': 0.85,
'max_calibration_ece': 0.10,
'no_regressions': True
}
gate_checker = QualityGateChecker(release_config)
gate_results = gate_checker.check_all_gates(gate_metrics)
results['quality_gates'] = gate_results
# Final Summary
print("\n" + "="*70)
print("VALIDATION SUMMARY")
print("="*70)
print(f"Basic Accuracy: {test_metrics['accuracy']:.4f}")
print(f"CV Accuracy: {cv_results['accuracy']['mean']:.4f} ± {cv_results['accuracy']['std']:.4f}")
print(f"Avg Robustness: {gate_metrics['robustness_consistency']:.4f}")
if 'fairness' in results:
print(f"Max Fairness Disparity: {gate_metrics['fairness_disparity']:.4f}")
print(f"Calibration ECE: {gate_metrics['calibration_ece']:.4f}")
print(f"\nRelease Status: {'✅ APPROVED' if gate_results['all_passed'] else '❌ BLOCKED'}")
return results
# Usage
# validation_results = complete_validation_pipeline(
# model, train_data, val_data, test_data,
# sensitive_attributes={'gender': gender_array, 'age': age_array}
# )
```
---
## Best Practices Checklist
### Pre-Release Validation Checklist
- [ ] **Data Splits**: Proper train/val/test separation with no leakage (see the leakage-check sketch after this list)
- [ ] **Cross-Validation**: K-fold CV completed with consistent results
- [ ] **Statistical Power**: Test set size sufficient for desired confidence
- [ ] **Performance Metrics**: All primary metrics meet thresholds
- [ ] **Fairness Audit**: No significant bias across protected groups
- [ ] **Robustness Testing**: Model stable under perturbations
- [ ] **Calibration**: Predictions well-calibrated (ECE < threshold)
- [ ] **Edge Cases**: Critical edge cases handled correctly
- [ ] **Regression Tests**: No regressions from baseline
- [ ] **Documentation**: All validation results documented
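Several of these checks are straightforward to automate. For the data-split item, for example, exact-duplicate leakage between train and test sets can be caught with a simple hash-based overlap check (a minimal sketch, assuming text inputs):
```python
import hashlib

def check_leakage(train_texts, test_texts):
    """Flag exact-duplicate examples shared between the train and test splits."""
    def fingerprint(text):
        # Normalize whitespace and case so trivial differences still count as duplicates.
        return hashlib.sha256(" ".join(text.lower().split()).encode("utf-8")).hexdigest()

    train_hashes = {fingerprint(t) for t in train_texts}
    overlap = [t for t in test_texts if fingerprint(t) in train_hashes]

    if overlap:
        print(f"⚠️ {len(overlap)} test examples also appear in the training set")
    else:
        print("✅ No exact-duplicate leakage detected")
    return overlap
```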
### Continuous Validation
- [ ] **Automated Testing**: Validation suite runs on every commit
- [ ] **Monitoring**: Production performance tracked continuously
- [ ] **Drift Detection**: Data drift monitored and alerted
- [ ] **Periodic Re-evaluation**: Full validation quarterly
- [ ] **Incident Response**: Process for handling validation failures
---
## Next Steps
In the next tutorial, we'll cover:
- **Continual Learning**: Strategies for updating models with new data
- **Catastrophic Forgetting Prevention**: Techniques to retain old knowledge
- **Incremental Training**: Efficient updates without full retraining
- **Version Management**: Model versioning and rollback strategies
- **Production Deployment**: Serving, scaling, and monitoring