| # Tutorial 07: Model Validation, Testing & Quality Assurance |
|
|
| ## Overview |
|
|
| This tutorial covers comprehensive model validation, testing strategies, and quality assurance processes essential for production-ready AI systems. We'll explore NTF's unified metrics utilities, statistical validation methods, bias detection, robustness testing, and systematic evaluation frameworks. |
|
|
| ## Table of Contents |
|
|
| 1. [Validation Fundamentals](#validation-fundamentals) |
| 2. [NTF Metrics Utilities](#ntf-metrics-utilities) |
| 3. [Train/Validation/Test Splits](#trainvalidationtest-splits) |
| 4. [Cross-Validation Techniques](#cross-validation-techniques) |
| 5. [Statistical Significance Testing](#statistical-significance-testing) |
| 6. [Bias and Fairness Detection](#bias-and-fairness-detection) |
| 7. [Robustness Testing](#robustness-testing) |
| 8. [Adversarial Testing](#adversarial-testing) |
| 9. [Domain Shift Detection](#domain-shift-detection) |
| 10. [Calibration and Confidence Estimation](#calibration-and-confidence-estimation) |
| 11. [A/B Testing Framework](#ab-testing-framework) |
| 12. [Regression Testing for Models](#regression-testing-for-models) |
| 13. [Quality Gates and Release Criteria](#quality-gates-and-release-criteria) |
|
|
| --- |
|
|
| ## Validation Fundamentals |
|
|
| ### Why Validation Matters |
|
|
| Validation ensures your model: |
| - Generalizes to unseen data |
| - Doesn't overfit training distributions |
| - Meets performance requirements |
| - Behaves safely across edge cases |
| - Maintains consistency across versions |
|
|
| ### Validation Pyramid |
|
|
| ``` |
| Production Monitoring |
| /\ |
| / \ |
| / \ |
| /------\ |
| / A/B \ |
| / Testing \ |
| /------------\ |
| / Holdout \ |
| / Testing \ |
| /------------------\ |
| / Cross-Validation \ |
| /----------------------\ |
| / Train/Val Split \ |
| /--------------------------\ |
| ``` |
|
|
| **Key Principles:** |
| 1. **Data Isolation**: Never leak test data into training |
| 2. **Distribution Matching**: Test data should match production distribution |
| 3. **Statistical Power**: Ensure sufficient sample sizes |
| 4. **Multiple Metrics**: Evaluate across diverse dimensions |
| 5. **Reproducibility**: Fixed seeds and documented procedures |
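|
| Reproducibility in particular is easy to get wrong. A minimal seeding sketch for the PyTorch/NumPy stack used throughout this tutorial (the helper name is illustrative, not an NTF API):
|
| ```python |
| import os |
| import random |
|
| import numpy as np |
| import torch |
|
| def set_seed(seed: int = 42): |
|     """Fix all relevant RNGs so experiments are repeatable.""" |
|     random.seed(seed) |
|     np.random.seed(seed) |
|     torch.manual_seed(seed) |
|     torch.cuda.manual_seed_all(seed)  # No-op on CPU-only machines |
|     os.environ["PYTHONHASHSEED"] = str(seed) |
|
| set_seed(42) |
| ``` |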
|
|
| --- |
|
|
| ## NTF Metrics Utilities |
|
|
| ### Using NTF's Unified Evaluation Interface |
|
|
| NTF provides comprehensive metrics utilities through `ntf.utils.metrics`. This replaces manual metric implementations with a unified, efficient interface. |
|
|
| ```python |
| from ntf.utils.metrics import ( |
| compute_perplexity, |
| compute_accuracy, |
| evaluate_model, |
| compare_models, |
| benchmark_throughput, |
| EvaluationResults |
| ) |
| from torch.utils.data import DataLoader |
| import torch |
| |
| # Load your model and tokenizer |
| model, tokenizer = load_model_and_tokenizer("path/to/model") |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| model.to(device) |
| |
| # Prepare test dataloader |
| test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False) |
| |
| # Comprehensive evaluation |
| results = evaluate_model( |
| model=model, |
| dataloader=test_dataloader, |
| device=device, |
| compute_generation_metrics=True, |
| tokenizer=tokenizer |
| ) |
| |
| print(f"Perplexity: {results.perplexity:.2f}") |
| print(f"Loss: {results.loss:.4f}") |
| print(f"Token Accuracy: {results.token_accuracy:.4f}") |
| print(f"BLEU Score: {results.bleu_score:.4f}") |
| print(f"ROUGE-L: {results.rouge_l:.4f}") |
| ``` |
|
|
| ### Individual Metric Functions |
|
|
| For specific metric computation, NTF provides standalone functions: |
|
|
| ```python |
| # Compute perplexity only |
| perplexity = compute_perplexity(model, test_dataloader, device) |
| print(f"Perplexity: {perplexity:.2f}") |
| |
| # Compute accuracy only |
| accuracy = compute_accuracy(model, test_dataloader, device) |
| print(f"Accuracy: {accuracy:.4f}") |
| ``` |
|
|
| ### Comparing Multiple Checkpoints |
|
|
| NTF makes it easy to compare different model checkpoints: |
|
|
| ```python |
| from ntf.utils.metrics import compare_models |
| |
| # Compare two model versions |
| comparison = compare_models( |
| model_a=model_v1, |
| model_b=model_v2, |
| dataloader=val_dataloader, |
| device=device |
| ) |
| |
| print(f"Model A Perplexity: {comparison['model_a']['perplexity']:.2f}") |
| print(f"Model B Perplexity: {comparison['model_b']['perplexity']:.2f}") |
| print(f"Improvement: {comparison['improvement']['perplexity']:.2f}%") |
| print(f"Accuracy Gain: {comparison['improvement']['accuracy']:.2f}%") |
| ``` |
|
|
| ### Benchmarking Throughput |
|
|
| For production deployment, benchmark model throughput: |
|
|
| ```python |
| throughput_results = benchmark_throughput( |
| model=model, |
| tokenizer=tokenizer, |
| device=device, |
| sequence_length=512, |
| batch_size=1, |
| num_iterations=10 |
| ) |
| |
| print(f"Prefill Throughput: {throughput_results['prefill_throughput']:.2f} tokens/sec") |
| print(f"Decode Throughput: {throughput_results['decode_throughput']:.2f} tokens/sec") |
| ``` |
|
|
| ### Metric Selection Guide |
|
|
| Different tasks require different evaluation metrics. Use this guide to select appropriate metrics: |
|
|
| | Task Type | Recommended Metrics | NTF Functions | |
| |-----------|---------------------|---------------| |
| | Text Generation | Perplexity, BLEU, ROUGE, BERTScore | `evaluate_model(compute_generation_metrics=True)` | |
| | Classification | Accuracy, F1, Precision, Recall | `compute_accuracy()` + custom F1 | |
| | Summarization | ROUGE, BERTScore | `evaluate_model()` with ROUGE | |
| | Translation | BLEU, chrF, COMET | `evaluate_model()` with BLEU | |
| | Question Answering | Exact Match, F1 | Custom implementation | |
| | Language Modeling | Perplexity | `compute_perplexity()` | |
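|
| The table marks Question Answering as "custom implementation". A minimal SQuAD-style sketch of Exact Match and token-level F1, under the usual answer-normalization assumptions:
|
| ```python |
| import re |
| from collections import Counter |
|
| def normalize_answer(s: str) -> str: |
|     """Lowercase, drop articles and punctuation, collapse whitespace.""" |
|     s = re.sub(r'\b(a|an|the)\b', ' ', s.lower()) |
|     s = re.sub(r'[^\w\s]', ' ', s) |
|     return ' '.join(s.split()) |
|
| def exact_match(prediction: str, reference: str) -> float: |
|     return float(normalize_answer(prediction) == normalize_answer(reference)) |
|
| def token_f1(prediction: str, reference: str) -> float: |
|     pred_tokens = normalize_answer(prediction).split() |
|     ref_tokens = normalize_answer(reference).split() |
|     common = Counter(pred_tokens) & Counter(ref_tokens) |
|     num_same = sum(common.values()) |
|     if num_same == 0: |
|         return 0.0 |
|     precision = num_same / len(pred_tokens) |
|     recall = num_same / len(ref_tokens) |
|     return 2 * precision * recall / (precision + recall) |
| ``` |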
|
|
| ### Checkpoint Comparison Workflow |
|
|
| Here's a complete workflow for comparing multiple checkpoints during development: |
|
|
| ```python |
| from ntf.utils.metrics import evaluate_model |
| import json |
| |
| def compare_checkpoints(checkpoint_paths, eval_dataset, tokenizer, device): |
| """Compare multiple checkpoints on the same evaluation dataset.""" |
| |
| from torch.utils.data import DataLoader |
| eval_dataloader = DataLoader(eval_dataset, batch_size=32, shuffle=False) |
| |
| results = {} |
| |
| for checkpoint_path in checkpoint_paths: |
| print(f"\nEvaluating {checkpoint_path}...") |
| |
| # Load checkpoint |
| model, _ = load_model_and_tokenizer(checkpoint_path) |
| model.to(device) |
| model.eval() |
| |
| # Evaluate |
| eval_results = evaluate_model( |
| model=model, |
| dataloader=eval_dataloader, |
| device=device, |
| compute_generation_metrics=True, |
| tokenizer=tokenizer |
| ) |
| |
| # Store results |
| results[checkpoint_path] = { |
| 'perplexity': eval_results.perplexity, |
| 'loss': eval_results.loss, |
|             'accuracy': eval_results.token_accuracy, |
| 'bleu': eval_results.bleu_score, |
| 'rouge_l': eval_results.rouge_l |
| } |
| |
| print(f" Perplexity: {eval_results.perplexity:.2f}") |
| print(f" Accuracy: {eval_results.accuracy:.4f}") |
| |
| # Find best checkpoint |
| best_checkpoint = min(results.keys(), key=lambda k: results[k]['perplexity']) |
| print(f"\nBest checkpoint (lowest perplexity): {best_checkpoint}") |
| |
| return results |
| |
| # Usage |
| checkpoints = [ |
| "./checkpoints/step_1000", |
| "./checkpoints/step_2000", |
| "./checkpoints/step_3000", |
| "./checkpoints/final" |
| ] |
| |
| all_results = compare_checkpoints( |
| checkpoints, |
| val_dataset, |
| tokenizer, |
| device |
| ) |
| |
| # Save results for tracking |
| with open("./evaluation_results.json", "w") as f: |
| json.dump(all_results, f, indent=2) |
| ``` |
|
|
| --- |
|
|
| ## Train/Validation/Test Splits |
|
|
| ### Strategic Data Partitioning |
|
|
| ```python |
| from sklearn.model_selection import train_test_split |
| import numpy as np |
| |
| def strategic_data_split(data, labels, strategy='stratified'): |
| """ |
| Create train/val/test splits with proper stratification. |
| |
| Args: |
| data: Input samples |
| labels: Corresponding labels |
| strategy: 'stratified', 'temporal', 'grouped' |
| |
| Returns: |
| train, val, test splits |
| """ |
|     # Ensure NumPy-style fancy indexing works for every strategy below |
|     data = np.asarray(data) |
|     labels = np.asarray(labels) |
|
|     if strategy == 'stratified': |
| # First split: train+val vs test (80/20) |
| X_train_val, X_test, y_train_val, y_test = train_test_split( |
| data, labels, |
| test_size=0.2, |
| stratify=labels, # Maintain class distribution |
| random_state=42 |
| ) |
| |
| # Second split: train vs val (80/20 of remaining = 64/16 total) |
| X_train, X_val, y_train, y_val = train_test_split( |
| X_train_val, y_train_val, |
| test_size=0.2, |
| stratify=y_train_val, |
| random_state=42 |
| ) |
| |
| elif strategy == 'temporal': |
| # Time-based split for temporal data |
| split_point_1 = int(len(data) * 0.6) |
| split_point_2 = int(len(data) * 0.8) |
| |
| X_train = data[:split_point_1] |
| X_val = data[split_point_1:split_point_2] |
| X_test = data[split_point_2:] |
| |
| y_train = labels[:split_point_1] |
| y_val = labels[split_point_1:split_point_2] |
| y_test = labels[split_point_2:] |
| |
| elif strategy == 'grouped': |
| # Group-based split (e.g., by user, document, session) |
| from sklearn.model_selection import GroupShuffleSplit |
| |
| groups = get_groups(data) # Your grouping logic |
| |
| gss = GroupShuffleSplit( |
| n_splits=1, |
| test_size=0.2, |
| random_state=42 |
| ) |
| train_val_idx, test_idx = next(gss.split(data, groups=groups)) |
| |
| X_train_val, X_test = data[train_val_idx], data[test_idx] |
| y_train_val, y_test = labels[train_val_idx], labels[test_idx] |
| |
| # Split train_val further |
| gss2 = GroupShuffleSplit( |
| n_splits=1, |
| test_size=0.2, |
| random_state=42 |
| ) |
| train_idx, val_idx = next(gss2.split( |
| X_train_val, |
| groups=groups[train_val_idx] |
| )) |
| |
| X_train, X_val = X_train_val[train_idx], X_train_val[val_idx] |
| y_train, y_val = y_train_val[train_idx], y_train_val[val_idx] |
| |
| return { |
| 'train': (X_train, y_train), |
| 'val': (X_val, y_val), |
| 'test': (X_test, y_test) |
| } |
| |
| # Usage example |
| splits = strategic_data_split(texts, labels, strategy='stratified') |
| print(f"Train: {len(splits['train'][0])}, Val: {len(splits['val'][0])}, Test: {len(splits['test'][0])}") |
| ``` |
|
|
| ### Split Ratio Guidelines |
|
|
| | Dataset Size | Train | Val | Test | Rationale | |
| |-------------|-------|-----|------|-----------| |
| | < 10K | 70% | 15% | 15% | Need more validation signal | |
| | 10K-100K | 80% | 10% | 10% | Balanced approach | |
| | 100K-1M | 90% | 5% | 5% | Large data, less validation needed | |
| | > 1M | 95% | 2.5% | 2.5% | Massive data, small holdouts sufficient | |
|
|
| ### Common Splitting Mistakes |
|
|
| ❌ **Data Leakage**: |
| ```python |
| # WRONG: Preprocessing before split |
| scaler.fit(data) # Fits on ALL data including test! |
| data_scaled = scaler.transform(data) |
| # Then split... |
| |
| # CORRECT: Split first |
| X_train, X_test = train_test_split(data, test_size=0.2) |
| scaler.fit(X_train) # Fit only on train |
| X_train_scaled = scaler.transform(X_train) |
| X_test_scaled = scaler.transform(X_test) # Transform test with train stats |
| ``` |
|
|
| ❌ **Temporal Leakage**: |
| ```python |
| # WRONG: Random shuffle on time-series |
| random.shuffle(time_series_data) |
| |
| # CORRECT: Respect temporal order |
| cutoff = int(len(data) * 0.8) |
| train = data[:cutoff] |
| test = data[cutoff:] |
| ``` |
|
|
| --- |
|
|
| ## Cross-Validation Techniques |
|
|
| ### K-Fold Cross-Validation |
|
|
| ```python |
| from sklearn.model_selection import StratifiedKFold |
| import numpy as np |
| |
| def k_fold_cross_validation(model_class, config, data, labels, k=5): |
| """ |
| Perform k-fold cross-validation for robust performance estimation. |
| |
| Returns metrics for each fold and aggregate statistics. |
| """ |
| skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42) |
| |
| fold_metrics = [] |
| |
| for fold, (train_idx, val_idx) in enumerate(skf.split(data, labels)): |
| print(f"\n{'='*50}") |
| print(f"FOLD {fold + 1}/{k}") |
| print(f"{'='*50}") |
| |
| # Prepare fold data |
| X_train, X_val = data[train_idx], data[val_idx] |
| y_train, y_val = labels[train_idx], labels[val_idx] |
| |
| # Initialize fresh model for this fold |
| model = model_class(config) |
| |
| # Train on this fold |
| trainer = Trainer( |
| model=model, |
| train_data=(X_train, y_train), |
| val_data=(X_val, y_val), |
| config=config |
| ) |
| |
| trainer.train() |
| |
| # Evaluate |
| metrics = trainer.evaluate() |
| fold_metrics.append(metrics) |
| |
| print(f"Fold {fold + 1} Accuracy: {metrics['accuracy']:.4f}") |
| print(f"Fold {fold + 1} F1: {metrics['f1']:.4f}") |
| |
| # Aggregate results |
| aggregated = {} |
| for key in fold_metrics[0].keys(): |
| values = [m[key] for m in fold_metrics] |
| aggregated[key] = { |
| 'mean': np.mean(values), |
| 'std': np.std(values), |
| 'min': np.min(values), |
| 'max': np.max(values), |
| 'all': values |
| } |
| |
| print(f"\n{'='*50}") |
| print("CROSS-VALIDATION SUMMARY") |
| print(f"{'='*50}") |
| for metric, stats in aggregated.items(): |
| print(f"{metric}: {stats['mean']:.4f} ± {stats['std']:.4f}") |
| print(f" Range: [{stats['min']:.4f}, {stats['max']:.4f}]") |
| |
| return aggregated, fold_metrics |
| |
| # Usage |
| cv_results, fold_details = k_fold_cross_validation( |
| MyModel, config, texts, labels, k=5 |
| ) |
| ``` |
|
|
| ### Nested Cross-Validation for Hyperparameter Tuning |
|
|
| ```python |
| from sklearn.model_selection import ParameterGrid |
| |
| def nested_cross_validation(model_class, base_config, data, labels, |
| param_grid, outer_k=5, inner_k=3): |
| """ |
| Nested CV: Outer loop for evaluation, inner loop for hyperparameter selection. |
| |
| Prevents optimistic bias from hyperparameter tuning. |
| """ |
| outer_cv = StratifiedKFold(n_splits=outer_k, shuffle=True, random_state=42) |
| outer_scores = [] |
| best_params_per_fold = [] |
| |
| for outer_fold, (outer_train_idx, outer_test_idx) in enumerate( |
| outer_cv.split(data, labels) |
| ): |
| print(f"\n{'='*60}") |
| print(f"OUTER FOLD {outer_fold + 1}/{outer_k}") |
| print(f"{'='*60}") |
| |
| # Outer split |
| X_outer_train = data[outer_train_idx] |
| X_outer_test = data[outer_test_idx] |
| y_outer_train = labels[outer_train_idx] |
| y_outer_test = labels[outer_test_idx] |
| |
| # Inner CV for hyperparameter selection |
| inner_cv = StratifiedKFold(n_splits=inner_k, shuffle=True, random_state=42) |
| |
|         # Dict params are unhashable, so key the score table by a frozen view |
|         param_scores = {tuple(sorted(p.items())): [] |
|                         for p in ParameterGrid(param_grid)} |
| |
| for inner_train_idx, inner_val_idx in inner_cv.split( |
| X_outer_train, y_outer_train |
| ): |
| X_inner_train = X_outer_train[inner_train_idx] |
| X_inner_val = X_outer_train[inner_val_idx] |
| y_inner_train = y_outer_train[inner_train_idx] |
| y_inner_val = y_outer_train[inner_val_idx] |
| |
| # Test each parameter combination |
| for params in ParameterGrid(param_grid): |
| config = {**base_config, **params} |
| model = model_class(config) |
| |
| trainer = Trainer( |
| model=model, |
| train_data=(X_inner_train, y_inner_train), |
| val_data=(X_inner_val, y_inner_val), |
| config=config |
| ) |
| |
| trainer.train() |
| metrics = trainer.evaluate() |
| |
|                 param_scores[tuple(sorted(params.items()))].append( |
|                     metrics['accuracy'] |
|                 ) |
| |
| # Select best parameters based on inner CV |
|         best_key = max(param_scores.keys(), |
|                        key=lambda k: np.mean(param_scores[k])) |
|         best_params = dict(best_key) |
|         best_score = np.mean(param_scores[best_key]) |
| |
| print(f"Best params for outer fold {outer_fold + 1}: {best_params}") |
| print(f"Inner CV score: {best_score:.4f}") |
| |
| # Train final model for this outer fold with best params |
| final_config = {**base_config, **best_params} |
| final_model = model_class(final_config) |
| |
| final_trainer = Trainer( |
| model=final_model, |
| train_data=(X_outer_train, y_outer_train), |
| val_data=None, # Use all outer train data |
| config=final_config |
| ) |
| |
| final_trainer.train() |
| |
| # Evaluate on held-out outer test set |
| test_metrics = final_trainer.evaluate_on_test(X_outer_test, y_outer_test) |
| outer_scores.append(test_metrics['accuracy']) |
| best_params_per_fold.append(best_params) |
| |
| # Final aggregated results |
| print(f"\n{'='*60}") |
| print("NESTED CV FINAL RESULTS") |
| print(f"{'='*60}") |
| print(f"Test Accuracy: {np.mean(outer_scores):.4f} ± {np.std(outer_scores):.4f}") |
| print(f"Range: [{np.min(outer_scores):.4f}, {np.max(outer_scores):.4f}]") |
| |
| return { |
| 'mean_score': np.mean(outer_scores), |
| 'std_score': np.std(outer_scores), |
| 'scores': outer_scores, |
| 'best_params_per_fold': best_params_per_fold |
| } |
| |
| # Example usage |
| param_grid = { |
| 'learning_rate': [1e-5, 3e-5, 5e-5], |
| 'batch_size': [16, 32], |
| 'num_layers': [6, 12] |
| } |
| |
| nested_results = nested_cross_validation( |
| TransformerModel, |
| base_config, |
| data, |
| labels, |
| param_grid, |
| outer_k=5, |
| inner_k=3 |
| ) |
| ``` |
|
|
| ### Leave-One-Out and Leave-P-Out |
|
|
| ```python |
| from sklearn.model_selection import LeaveOneOut, LeavePOut |
| |
| def leave_one_out_validation(model_class, config, data, labels, max_samples=1000): |
| """ |
| Leave-One-Out CV: Extremely thorough but computationally expensive. |
| Use only for small datasets (< 1000 samples). |
| """ |
| if len(data) > max_samples: |
| print(f"Warning: LOO is too expensive for {len(data)} samples.") |
| print(f"Consider using k-fold with k=10 instead.") |
| return None |
| |
| loo = LeaveOneOut() |
| scores = [] |
| |
| for train_idx, test_idx in loo.split(data): |
| X_train, X_test = data[train_idx], data[test_idx] |
| y_train, y_test = labels[train_idx], labels[test_idx] |
| |
| model = model_class(config) |
| trainer = Trainer(model, (X_train, y_train), None, config) |
| trainer.train() |
| |
| metrics = trainer.evaluate_on_test(X_test, y_test) |
| scores.append(metrics['accuracy']) |
| |
| return {'mean': np.mean(scores), 'std': np.std(scores), 'all': scores} |
| ``` |
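|
| A hedged usage sketch, reusing the `MyModel`, `config`, and `Trainer` conventions from the k-fold example above:
|
| ```python |
| # Only sensible for small datasets (see the guard inside the function) |
| loo_results = leave_one_out_validation(MyModel, config, small_texts, small_labels) |
| if loo_results is not None: |
|     print(f"LOO Accuracy: {loo_results['mean']:.4f} ± {loo_results['std']:.4f}") |
| ``` |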
|
|
| --- |
|
|
| ## Statistical Significance Testing |
|
|
| ### Comparing Two Models |
|
|
| ```python |
| from scipy import stats |
| import numpy as np |
| |
| def paired_t_test(model_a_predictions, model_b_predictions, ground_truth): |
| """ |
| Paired t-test to compare two models on the same test set. |
| |
| Tests if the difference in performance is statistically significant. |
| """ |
| # Calculate per-sample correctness |
| correct_a = (model_a_predictions == ground_truth).astype(int) |
| correct_b = (model_b_predictions == ground_truth).astype(int) |
| |
| # Paired t-test on correctness scores |
| t_statistic, p_value = stats.ttest_rel(correct_a, correct_b) |
| |
| acc_a = np.mean(correct_a) |
| acc_b = np.mean(correct_b) |
| |
| print(f"Model A Accuracy: {acc_a:.4f}") |
| print(f"Model B Accuracy: {acc_b:.4f}") |
| print(f"Difference: {acc_b - acc_a:.4f}") |
| print(f"T-statistic: {t_statistic:.4f}") |
| print(f"P-value: {p_value:.6f}") |
| |
|     if p_value < 0.05: |
|         direction = "better" if acc_b > acc_a else "worse" |
|         print(f"Result: Model B is significantly {direction} than Model A (p < 0.05)") |
|     else: |
|         print("Result: No significant difference (p >= 0.05)") |
| |
| return { |
| 't_statistic': t_statistic, |
| 'p_value': p_value, |
| 'significant': p_value < 0.05, |
| 'accuracy_difference': acc_b - acc_a |
| } |
| |
| def mcnemar_test(model_a_predictions, model_b_predictions, ground_truth): |
| """ |
| McNemar's test for paired nominal data. |
| More appropriate than t-test for classification accuracy comparison. |
| """ |
| # Build contingency table |
| both_correct = np.sum((model_a_predictions == ground_truth) & |
| (model_b_predictions == ground_truth)) |
| a_correct_b_wrong = np.sum((model_a_predictions == ground_truth) & |
| (model_b_predictions != ground_truth)) |
| a_wrong_b_correct = np.sum((model_a_predictions != ground_truth) & |
| (model_b_predictions == ground_truth)) |
| both_wrong = np.sum((model_a_predictions != ground_truth) & |
| (model_b_predictions != ground_truth)) |
| |
| print("Contingency Table:") |
| print(f" Model B Correct | Model B Wrong") |
| print(f"Model A Correct {both_correct:6d} {a_correct_b_wrong:6d}") |
| print(f"Model A Wrong {a_wrong_b_correct:6d} {both_wrong:6d}") |
| |
| # McNemar's test statistic (with continuity correction) |
| b = a_correct_b_wrong |
| c = a_wrong_b_correct |
| |
| if b + c == 0: |
| print("Cannot perform test: no discordant pairs") |
| return None |
| |
| chi2 = (abs(b - c) - 1) ** 2 / (b + c) |
| p_value = 1 - stats.chi2.cdf(chi2, 1) |
| |
| print(f"\nMcNemar's Chi-squared: {chi2:.4f}") |
| print(f"P-value: {p_value:.6f}") |
| |
| if p_value < 0.05: |
| winner = "Model B" if c > b else "Model A" |
| print(f"Result: {winner} is significantly better (p < 0.05)") |
| |
| return { |
| 'chi2': chi2, |
| 'p_value': p_value, |
| 'significant': p_value < 0.05, |
| 'discordant_pairs': {'b': b, 'c': c} |
| } |
| |
| # Bootstrap confidence intervals |
| def bootstrap_confidence_interval(predictions, ground_truth, |
| metric_fn, n_bootstrap=1000, |
| confidence_level=0.95): |
| """ |
| Estimate confidence intervals using bootstrapping. |
| """ |
| n_samples = len(predictions) |
| bootstrap_scores = [] |
| |
| for i in range(n_bootstrap): |
| # Sample with replacement |
| indices = np.random.choice(n_samples, size=n_samples, replace=True) |
| sampled_preds = predictions[indices] |
| sampled_true = ground_truth[indices] |
| |
| score = metric_fn(sampled_preds, sampled_true) |
| bootstrap_scores.append(score) |
| |
| # Calculate confidence interval |
| alpha = 1 - confidence_level |
| lower_percentile = alpha / 2 * 100 |
| upper_percentile = (1 - alpha / 2) * 100 |
| |
| ci_lower = np.percentile(bootstrap_scores, lower_percentile) |
| ci_upper = np.percentile(bootstrap_scores, upper_percentile) |
| mean_score = np.mean(bootstrap_scores) |
| std_score = np.std(bootstrap_scores) |
| |
| print(f"Bootstrap Results ({n_bootstrap} iterations):") |
| print(f"Mean Score: {mean_score:.4f}") |
| print(f"Std Dev: {std_score:.4f}") |
| print(f"{confidence_level*100}% CI: [{ci_lower:.4f}, {ci_upper:.4f}]") |
| |
| return { |
| 'mean': mean_score, |
| 'std': std_score, |
| 'ci_lower': ci_lower, |
| 'ci_upper': ci_upper, |
| 'bootstrap_scores': bootstrap_scores |
| } |
| |
| # Usage example |
| from sklearn.metrics import accuracy_score |
|
| result = paired_t_test(preds_v1, preds_v2, labels) |
| mcnemar_result = mcnemar_test(preds_v1, preds_v2, labels) |
| ci_result = bootstrap_confidence_interval(preds_v2, labels, accuracy_score) |
| ``` |
|
|
| ### Multiple Comparison Correction |
|
|
| ```python |
| from statsmodels.stats.multitest import multipletests |
| |
| def compare_multiple_models(model_predictions_list, ground_truth, method='fdr_bh'): |
| """ |
| Compare multiple models with correction for multiple comparisons. |
| |
| Args: |
| model_predictions_list: List of (model_name, predictions) tuples |
| ground_truth: True labels |
| method: Correction method ('bonferroni', 'fdr_bh', 'holm', etc.) |
| """ |
| # Use first model as baseline |
| baseline_name, baseline_preds = model_predictions_list[0] |
| baseline_correct = (baseline_preds == ground_truth).astype(int) |
| |
| p_values = [] |
| model_names = [] |
| |
| for model_name, model_preds in model_predictions_list[1:]: |
| model_correct = (model_preds == ground_truth).astype(int) |
| |
| # Paired t-test against baseline |
| _, p_value = stats.ttest_rel(model_correct, baseline_correct) |
| p_values.append(p_value) |
| model_names.append(model_name) |
| |
| # Apply correction |
| reject, corrected_p_values, _, _ = multipletests( |
| p_values, |
| alpha=0.05, |
| method=method |
| ) |
| |
| print(f"Multiple Comparison Correction: {method}") |
| print(f"{'Model':<20} {'Raw P':<10} {'Corrected P':<12} {'Significant'}") |
| print("-" * 55) |
| |
| for name, raw_p, corr_p, sig in zip(model_names, p_values, corrected_p_values, reject): |
| print(f"{name:<20} {raw_p:<10.6f} {corr_p:<12.6f} {'Yes' if sig else 'No'}") |
| |
| return { |
| 'model_names': model_names, |
| 'raw_p_values': p_values, |
| 'corrected_p_values': corrected_p_values, |
| 'reject_null': reject |
| } |
| ``` |
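|
| A usage sketch, assuming `preds_baseline`, `preds_a`, `preds_b`, and `labels` are NumPy arrays from your own evaluation runs:
|
| ```python |
| models = [ |
|     ("baseline", preds_baseline),   # First entry is treated as the baseline |
|     ("variant_a", preds_a), |
|     ("variant_b", preds_b), |
| ] |
|
| correction_results = compare_multiple_models(models, labels, method='fdr_bh') |
| ``` |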
|
|
| --- |
|
|
| ## Bias and Fairness Detection |
|
|
| ### Demographic Parity and Equalized Odds |
|
|
| ```python |
| import numpy as np |
| |
| class FairnessAuditor: |
| def __init__(self, predictions, ground_truth, sensitive_attributes): |
| """ |
| Args: |
| predictions: Model predictions |
| ground_truth: True labels |
| sensitive_attributes: Dict of protected attributes |
| (e.g., {'gender': [...], 'race': [...]}) |
| """ |
| self.predictions = np.array(predictions) |
| self.ground_truth = np.array(ground_truth) |
| self.sensitive_attributes = sensitive_attributes |
| |
| def demographic_parity(self, attribute_name): |
| """ |
| Check if positive prediction rates are equal across groups. |
| |
| Demographic parity: P(Ŷ=1|A=0) = P(Ŷ=1|A=1) |
| """ |
| attribute = self.sensitive_attributes[attribute_name] |
| unique_groups = np.unique(attribute) |
| |
| positive_rates = {} |
| for group in unique_groups: |
| mask = attribute == group |
| positive_rate = np.mean(self.predictions[mask] == 1) |
| positive_rates[group] = positive_rate |
| |
| # Calculate disparity |
| rates = list(positive_rates.values()) |
| max_disparity = max(rates) - min(rates) |
| |
| print(f"Demographic Parity for '{attribute_name}':") |
| print(f"{'Group':<15} {'Positive Rate':<15}") |
| print("-" * 30) |
| for group, rate in positive_rates.items(): |
| print(f"{str(group):<15} {rate:.4f}") |
| print(f"\nMax Disparity: {max_disparity:.4f}") |
| |
| # Rule of thumb: disparity < 0.1 is acceptable |
| passed = max_disparity < 0.1 |
| print(f"Status: {'PASS' if passed else 'FAIL'} (threshold: 0.1)") |
| |
| return { |
| 'positive_rates': positive_rates, |
| 'max_disparity': max_disparity, |
| 'passed': passed |
| } |
| |
| def equalized_odds(self, attribute_name): |
| """ |
| Check if TPR and FPR are equal across groups. |
| |
| Equalized odds: P(Ŷ=1|Y=1,A=0) = P(Ŷ=1|Y=1,A=1) |
| and P(Ŷ=1|Y=0,A=0) = P(Ŷ=1|Y=0,A=1) |
| """ |
| attribute = self.sensitive_attributes[attribute_name] |
| unique_groups = np.unique(attribute) |
| |
| tpr_by_group = {} |
| fpr_by_group = {} |
| |
| for group in unique_groups: |
| mask = attribute == group |
| |
| # True Positive Rate (Recall) |
| positive_mask = mask & (self.ground_truth == 1) |
| if np.sum(positive_mask) > 0: |
| tpr = np.mean(self.predictions[positive_mask] == 1) |
| else: |
| tpr = 0.0 |
| |
| # False Positive Rate |
| negative_mask = mask & (self.ground_truth == 0) |
| if np.sum(negative_mask) > 0: |
| fpr = np.mean(self.predictions[negative_mask] == 1) |
| else: |
| fpr = 0.0 |
| |
| tpr_by_group[group] = tpr |
| fpr_by_group[group] = fpr |
| |
| # Calculate disparities |
| tpr_disparity = max(tpr_by_group.values()) - min(tpr_by_group.values()) |
| fpr_disparity = max(fpr_by_group.values()) - min(fpr_by_group.values()) |
| |
| print(f"\nEqualized Odds for '{attribute_name}':") |
| print(f"{'Group':<10} {'TPR':<10} {'FPR':<10}") |
| print("-" * 30) |
| for group in unique_groups: |
| print(f"{str(group):<10} {tpr_by_group[group]:.4f} {fpr_by_group[group]:.4f}") |
| |
| print(f"\nTPR Disparity: {tpr_disparity:.4f}") |
| print(f"FPR Disparity: {fpr_disparity:.4f}") |
| |
| passed = (tpr_disparity < 0.1) and (fpr_disparity < 0.1) |
| print(f"Status: {'PASS' if passed else 'FAIL'} (threshold: 0.1)") |
| |
| return { |
| 'tpr_by_group': tpr_by_group, |
| 'fpr_by_group': fpr_by_group, |
| 'tpr_disparity': tpr_disparity, |
| 'fpr_disparity': fpr_disparity, |
| 'passed': passed |
| } |
| |
| def predictive_parity(self, attribute_name): |
| """ |
| Check if precision is equal across groups. |
| |
| Predictive parity: P(Y=1|Ŷ=1,A=0) = P(Y=1|Ŷ=1,A=1) |
| """ |
| attribute = self.sensitive_attributes[attribute_name] |
| unique_groups = np.unique(attribute) |
| |
| precision_by_group = {} |
| |
| for group in unique_groups: |
| mask = attribute == group |
| predicted_positive_mask = mask & (self.predictions == 1) |
| |
| if np.sum(predicted_positive_mask) > 0: |
| precision = np.mean(self.ground_truth[predicted_positive_mask] == 1) |
| else: |
| precision = 0.0 |
| |
| precision_by_group[group] = precision |
| |
| disparity = max(precision_by_group.values()) - min(precision_by_group.values()) |
| |
| print(f"\nPredictive Parity for '{attribute_name}':") |
| print(f"{'Group':<15} {'Precision':<15}") |
| print("-" * 30) |
| for group, prec in precision_by_group.items(): |
| print(f"{str(group):<15} {prec:.4f}") |
| print(f"\nDisparity: {disparity:.4f}") |
| |
| passed = disparity < 0.1 |
| print(f"Status: {'PASS' if passed else 'FAIL'}") |
| |
| return { |
| 'precision_by_group': precision_by_group, |
| 'disparity': disparity, |
| 'passed': passed |
| } |
| |
| def generate_fairness_report(self): |
| """Generate comprehensive fairness report for all attributes.""" |
| report = {} |
| |
| for attr_name in self.sensitive_attributes.keys(): |
| print(f"\n{'='*60}") |
| print(f"FAIRNESS AUDIT: {attr_name.upper()}") |
| print(f"{'='*60}") |
| |
| report[attr_name] = { |
| 'demographic_parity': self.demographic_parity(attr_name), |
| 'equalized_odds': self.equalized_odds(attr_name), |
| 'predictive_parity': self.predictive_parity(attr_name) |
| } |
| |
| return report |
| |
| # Usage example |
| auditor = FairnessAuditor( |
| predictions=model_preds, |
| ground_truth=true_labels, |
| sensitive_attributes={ |
| 'gender': gender_array, |
| 'age_group': age_group_array, |
| 'region': region_array |
| } |
| ) |
| |
| fairness_report = auditor.generate_fairness_report() |
| ``` |
|
|
| ### Bias Mitigation Strategies |
|
|
| ```python |
| class BiasMitigator: |
| def __init__(self, model, training_data, sensitive_attributes): |
| self.model = model |
| self.training_data = training_data |
| self.sensitive_attributes = sensitive_attributes |
| |
| def reweighting(self, target_attribute): |
| """ |
| Reweight samples to balance representation across groups. |
| """ |
| attribute = self.sensitive_attributes[target_attribute] |
| unique_groups, counts = np.unique(attribute, return_counts=True) |
| |
| # Calculate weights inversely proportional to group size |
| total_samples = len(attribute) |
| weights = np.zeros_like(attribute, dtype=float) |
| |
| for group, count in zip(unique_groups, counts): |
| mask = attribute == group |
| # Weight = total_samples / (num_groups * count_in_group) |
| weights[mask] = total_samples / (len(unique_groups) * count) |
| |
| # Normalize weights |
| weights = weights * len(attribute) / np.sum(weights) |
| |
| print(f"Reweighting for '{target_attribute}':") |
| print(f"Weight range: [{weights.min():.4f}, {weights.max():.4f}]") |
| |
| return weights |
| |
| def adversarial_debiasing(self, debias_epochs=10): |
| """ |
| Train adversary to predict sensitive attribute from representations. |
| Update model to minimize adversary's success. |
| """ |
| # Implementation would add adversarial head to model |
| # and alternate between main task and adversarial training |
| pass |
| |
| def threshold_optimization(self, val_predictions, val_labels, |
| sensitive_attributes, target_attribute): |
| """ |
| Optimize decision thresholds per group to equalize metrics. |
| """ |
| attribute = sensitive_attributes[target_attribute] |
| unique_groups = np.unique(attribute) |
| |
|         # Target: the overall TPR at the default 0.5 threshold |
|         overall_preds = (val_predictions > 0.5).astype(int) |
|         n_pos = max(np.sum(val_labels == 1), 1) |
|         target_tpr = np.sum((overall_preds == 1) & (val_labels == 1)) / n_pos |
|
|         optimal_thresholds = {} |
| |
| for group in unique_groups: |
| mask = attribute == group |
| group_preds = val_predictions[mask] |
| group_labels = val_labels[mask] |
| |
|             # Pick the threshold whose group TPR is closest to the overall |
|             # target TPR (a simple post-hoc equalization heuristic) |
|             best_threshold = 0.5 |
|             best_gap = float('inf') |
|
|             for threshold in np.arange(0.1, 0.9, 0.05): |
|                 binarized_preds = (group_preds > threshold).astype(int) |
|                 group_pos = np.sum(group_labels == 1) |
|                 if group_pos == 0: |
|                     continue |
|                 tpr = np.sum((binarized_preds == 1) & (group_labels == 1)) / group_pos |
|
|                 gap = abs(tpr - target_tpr) |
|                 if gap < best_gap: |
|                     best_gap = gap |
|                     best_threshold = threshold |
| |
| optimal_thresholds[group] = best_threshold |
| |
| print(f"Optimal thresholds for '{target_attribute}':") |
| for group, thresh in optimal_thresholds.items(): |
| print(f" Group {group}: {thresh:.2f}") |
| |
| return optimal_thresholds |
| ``` |
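|
| The weights returned by `reweighting()` plug naturally into a PyTorch sampler. A sketch, assuming `train_dataset`, `train_data`, and `gender_array` come from your own pipeline:
|
| ```python |
| import torch |
| from torch.utils.data import DataLoader, WeightedRandomSampler |
|
| mitigator = BiasMitigator(model, train_data, {'gender': gender_array}) |
| weights = mitigator.reweighting('gender') |
|
| # Oversample underrepresented groups in proportion to their weights |
| sampler = WeightedRandomSampler( |
|     weights=torch.as_tensor(weights, dtype=torch.double), |
|     num_samples=len(weights), |
|     replacement=True |
| ) |
| train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler) |
| ``` |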
|
|
| --- |
|
|
| ## Robustness Testing |
|
|
| ### Perturbation Testing |
|
|
| ```python |
| import random |
| import string |
|
| import numpy as np |
| |
| class RobustnessTester: |
| def __init__(self, model, tokenizer): |
| self.model = model |
| self.tokenizer = tokenizer |
| |
| def character_level_perturbations(self, texts, perturbation_rate=0.1): |
| """ |
| Test robustness to character-level noise. |
| |
| Types: |
| - Random character insertion |
| - Random character deletion |
| - Random character substitution |
| - Adjacent character swap |
| """ |
| perturbed_texts = [] |
| |
| for text in texts: |
| chars = list(text) |
| num_perturbations = max(1, int(len(chars) * perturbation_rate)) |
| |
| for _ in range(num_perturbations): |
| op = random.choice(['insert', 'delete', 'substitute', 'swap']) |
| idx = random.randint(0, len(chars) - 1) |
| |
| if op == 'insert' and chars[idx].isalpha(): |
| chars.insert(idx, random.choice(string.ascii_lowercase)) |
| elif op == 'delete' and len(chars) > 1: |
| chars.pop(idx) |
| elif op == 'substitute' and chars[idx].isalpha(): |
| chars[idx] = random.choice(string.ascii_lowercase) |
| elif op == 'swap' and idx < len(chars) - 1: |
| chars[idx], chars[idx + 1] = chars[idx + 1], chars[idx] |
| |
| perturbed_texts.append(''.join(chars)) |
| |
| return perturbed_texts |
| |
| def word_level_perturbations(self, texts, perturbation_rate=0.1): |
| """ |
| Test robustness to word-level noise. |
| |
| Types: |
| - Random word deletion |
| - Random word insertion (synonyms or random) |
| - Word order shuffling (local) |
| """ |
| perturbed_texts = [] |
| |
| for text in texts: |
| words = text.split() |
| num_perturbations = max(1, int(len(words) * perturbation_rate)) |
| |
| for _ in range(num_perturbations): |
| op = random.choice(['delete', 'insert', 'shuffle']) |
| idx = random.randint(0, len(words) - 1) |
| |
| if op == 'delete' and len(words) > 1: |
| words.pop(idx) |
| elif op == 'insert': |
| # Insert random common word |
| common_words = ['the', 'a', 'is', 'it', 'this', 'that'] |
| words.insert(idx, random.choice(common_words)) |
| elif op == 'shuffle' and idx < len(words) - 1: |
| words[idx], words[idx + 1] = words[idx + 1], words[idx] |
| |
| perturbed_texts.append(' '.join(words)) |
| |
| return perturbed_texts |
| |
| def synonym_replacement(self, texts, replacement_rate=0.1): |
| """ |
| Replace words with synonyms using WordNet or similar. |
| """ |
|         try: |
|             # Requires the WordNet corpus: run nltk.download('wordnet') once |
|             from nltk.corpus import wordnet |
| except ImportError: |
| print("NLTK not available. Install with: pip install nltk") |
| return texts |
| |
| perturbed_texts = [] |
| |
| for text in texts: |
| words = text.split() |
| num_replacements = max(1, int(len(words) * replacement_rate)) |
| |
| for _ in range(num_replacements): |
| idx = random.randint(0, len(words) - 1) |
| word = words[idx] |
| |
| # Get synonyms |
| synsets = wordnet.synsets(word) |
| if synsets: |
| synonyms = [] |
| for synset in synsets: |
| for lemma in synset.lemmas(): |
| synonym = lemma.name().replace('_', ' ') |
| if synonym.lower() != word.lower(): |
| synonyms.append(synonym) |
| |
| if synonyms: |
| words[idx] = random.choice(synonyms) |
| |
| perturbed_texts.append(' '.join(words)) |
| |
| return perturbed_texts |
| |
| def back_translation(self, texts, intermediate_lang='de'): |
| """ |
| Test robustness via back-translation (round-trip translation). |
| |
| Translate to intermediate language and back. |
| """ |
| try: |
| from transformers import MarianMTModel, MarianTokenizer |
| |
| # Load translation models |
| trans_to_tokenizer = MarianTokenizer.from_pretrained( |
| f'Helsinki-NLP/opus-mt-en-{intermediate_lang}' |
| ) |
| trans_to_model = MarianMTModel.from_pretrained( |
| f'Helsinki-NLP/opus-mt-en-{intermediate_lang}' |
| ) |
| |
| trans_back_tokenizer = MarianTokenizer.from_pretrained( |
| f'Helsinki-NLP/opus-mt-{intermediate_lang}-en' |
| ) |
| trans_back_model = MarianMTModel.from_pretrained( |
| f'Helsinki-NLP/opus-mt-{intermediate_lang}-en' |
| ) |
| except Exception as e: |
| print(f"Translation models not available: {e}") |
| return texts |
| |
| perturbed_texts = [] |
| |
| for text in texts: |
| # Translate to intermediate language |
| inputs_to = trans_to_tokenizer(text, return_tensors='pt', padding=True) |
| translated_to = trans_to_model.generate(**inputs_to) |
| intermediate = trans_to_tokenizer.decode(translated_to[0], skip_special_tokens=True) |
| |
| # Translate back to English |
| inputs_back = trans_back_tokenizer(intermediate, return_tensors='pt', padding=True) |
| translated_back = trans_back_model.generate(**inputs_back) |
| back_translated = trans_back_tokenizer.decode( |
| translated_back[0], |
| skip_special_tokens=True |
| ) |
| |
| perturbed_texts.append(back_translated) |
| |
| return perturbed_texts |
| |
| def evaluate_robustness(self, original_texts, labels, perturbation_fn, |
| perturbation_name): |
| """ |
| Evaluate model performance on perturbed data. |
| """ |
| perturbed_texts = perturbation_fn(original_texts) |
| |
| # Get predictions for original and perturbed |
| orig_preds = self.model.predict(original_texts) |
| pert_preds = self.model.predict(perturbed_texts) |
| |
| # Calculate metrics |
| orig_accuracy = np.mean(orig_preds == labels) |
| pert_accuracy = np.mean(pert_preds == labels) |
| |
| # Consistency: predictions unchanged despite perturbation |
| consistency = np.mean(orig_preds == pert_preds) |
| |
| print(f"\nRobustness Test: {perturbation_name}") |
| print(f"Original Accuracy: {orig_accuracy:.4f}") |
| print(f"Perturbed Accuracy: {pert_accuracy:.4f}") |
| print(f"Accuracy Drop: {orig_accuracy - pert_accuracy:.4f}") |
| print(f"Prediction Consistency: {consistency:.4f}") |
| |
| return { |
| 'original_accuracy': orig_accuracy, |
| 'perturbed_accuracy': pert_accuracy, |
| 'accuracy_drop': orig_accuracy - pert_accuracy, |
| 'consistency': consistency |
| } |
| |
| def comprehensive_robustness_suite(self, texts, labels): |
| """Run all robustness tests.""" |
| results = {} |
| |
|         # Each perturbation helper already applies a random mix of its |
|         # operations, so one entry per noise type is sufficient |
|         tests = [ |
|             ('Character Noise', lambda t: self.character_level_perturbations(t, 0.1)), |
|             ('Word Noise', lambda t: self.word_level_perturbations(t, 0.1)), |
|             ('Synonym Replacement', lambda t: self.synonym_replacement(t, 0.1)), |
|         ] |
| |
| for test_name, pert_fn in tests: |
| results[test_name] = self.evaluate_robustness( |
| texts, labels, pert_fn, test_name |
| ) |
| |
| # Summary |
| print(f"\n{'='*60}") |
| print("ROBUSTNESS SUMMARY") |
| print(f"{'='*60}") |
| print(f"{'Test':<25} {'Acc Drop':<12} {'Consistency':<12}") |
| print("-" * 50) |
| |
| for test_name, metrics in results.items(): |
| print(f"{test_name:<25} {metrics['accuracy_drop']:.4f} {metrics['consistency']:.4f}") |
| |
| return results |
| ``` |
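|
| Running the full suite, assuming the wrapped model exposes a `predict()` method returning a NumPy array of labels:
|
| ```python |
| tester = RobustnessTester(model, tokenizer) |
| robustness_results = tester.comprehensive_robustness_suite(test_texts, test_labels) |
| ``` |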
|
|
| ### Stress Testing |
|
|
| ```python |
| class StressTester: |
| def __init__(self, model): |
| self.model = model |
| |
| def length_stress_test(self, texts, labels): |
| """Test performance across different input lengths.""" |
| lengths = [len(text.split()) for text in texts] |
| |
| # Bin by length |
| bins = [(0, 10), (10, 25), (25, 50), (50, 100), (100, float('inf'))] |
| results = {} |
| |
| for min_len, max_len in bins: |
| mask = [(min_len <= l < max_len) for l in lengths] |
| bin_texts = [t for t, m in zip(texts, mask) if m] |
| bin_labels = [l for l, m in zip(labels, mask) if m] |
| |
| if len(bin_labels) > 0: |
| preds = self.model.predict(bin_texts) |
| accuracy = np.mean(preds == bin_labels) |
| |
| bin_name = f"{min_len}-{max_len if max_len != float('inf') else '∞'}" |
| results[bin_name] = { |
| 'count': len(bin_labels), |
| 'accuracy': accuracy |
| } |
| |
| print(f"Length {bin_name}: {accuracy:.4f} (n={len(bin_labels)})") |
| |
| return results |
| |
| def rare_class_stress_test(self, texts, labels): |
| """Test performance on rare/underrepresented classes.""" |
| unique_classes, counts = np.unique(labels, return_counts=True) |
| |
| results = {} |
| |
| for cls, count in zip(unique_classes, counts): |
| mask = labels == cls |
| cls_texts = [t for t, m in zip(texts, mask) if m] |
| cls_labels = [l for l, m in zip(labels, mask) if m] |
| |
| preds = self.model.predict(cls_texts) |
| accuracy = np.mean(preds == cls_labels) |
| |
| frequency = 'rare' if count < len(labels) * 0.05 else 'common' |
| |
| results[cls] = { |
| 'count': count, |
| 'frequency': frequency, |
| 'accuracy': accuracy |
| } |
| |
| print(f"Class {cls}: {accuracy:.4f} (n={count}, {frequency})") |
| |
| return results |
| |
| def edge_case_stress_test(self, edge_cases): |
| """Test specific edge cases.""" |
| results = {} |
| |
| for case_name, (text, expected_label) in edge_cases.items(): |
| pred = self.model.predict([text])[0] |
| correct = pred == expected_label |
| |
| results[case_name] = { |
| 'expected': expected_label, |
| 'predicted': pred, |
| 'correct': correct |
| } |
| |
| status = "✓" if correct else "✗" |
| print(f"{status} {case_name}: Expected={expected_label}, Got={pred}") |
| |
| return results |
| ``` |
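|
| Edge cases are best maintained as a named, versioned dictionary so regressions are easy to spot. A hypothetical set for a binary sentiment classifier (labels are illustrative):
|
| ```python |
| # name -> (input text, expected label) |
| edge_cases = { |
|     'empty_string': ('', 0), |
|     'single_word': ('great', 1), |
|     'negation': ('not bad at all', 1), |
|     'all_caps': ('THIS IS TERRIBLE', 0), |
|     'very_long': ('fine ' * 500, 1), |
| } |
|
| stress_tester = StressTester(model) |
| edge_results = stress_tester.edge_case_stress_test(edge_cases) |
| ``` |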
|
|
| --- |
|
|
| ## Adversarial Testing |
|
|
| ### TextFooler-Style Adversarial Attacks |
|
|
| ```python |
| class AdversarialAttacker: |
| def __init__(self, model, tokenizer): |
| self.model = model |
| self.tokenizer = tokenizer |
| |
| def calculate_importance_scores(self, text, label): |
| """ |
| Calculate importance score for each word by masking. |
| """ |
| words = text.split() |
| importance_scores = [] |
| |
| # Get original prediction probability |
| orig_probs = self.model.predict_proba([text])[0] |
| orig_prob = orig_probs[label] |
| |
| for i, word in enumerate(words): |
| # Mask this word |
| masked_words = words[:i] + ['[MASK]'] + words[i+1:] |
| masked_text = ' '.join(masked_words) |
| |
| # Get prediction with masked word |
| masked_probs = self.model.predict_proba([masked_text])[0] |
| masked_prob = masked_probs[label] |
| |
| # Importance = drop in probability |
| importance = orig_prob - masked_prob |
| importance_scores.append(importance) |
| |
| return importance_scores |
| |
| def find_synonyms(self, word, top_k=10): |
| """Find synonyms for a word.""" |
| try: |
| from nltk.corpus import wordnet |
| except ImportError: |
| return [word] |
| |
| synonyms = [] |
| for synset in wordnet.synsets(word): |
| for lemma in synset.lemmas(): |
| synonym = lemma.name().replace('_', ' ') |
| if synonym.lower() != word.lower() and synonym.isalpha(): |
| synonyms.append(synonym) |
| |
| return synonyms[:top_k] |
| |
| def textfooler_attack(self, text, true_label, max_iterations=10): |
| """ |
| Implement TextFooler-style adversarial attack. |
| |
| Strategy: |
| 1. Identify important words |
| 2. Replace with synonyms that change prediction |
| 3. Ensure semantic similarity and grammaticality |
| """ |
| words = text.split() |
| current_text = text |
| attacked_words = set() |
| |
| for iteration in range(max_iterations): |
| # Get current prediction |
| pred = self.model.predict([current_text])[0] |
| |
| # Check if attack succeeded |
| if pred != true_label: |
| print(f"Attack succeeded at iteration {iteration + 1}") |
| return { |
| 'success': True, |
| 'adversarial_text': current_text, |
| 'iterations': iteration + 1, |
| 'attacked_words': attacked_words |
| } |
| |
| # Calculate importance scores |
| importance_scores = self.calculate_importance_scores( |
| current_text, true_label |
| ) |
| |
| # Sort words by importance (descending) |
| sorted_indices = np.argsort(importance_scores)[::-1] |
| |
| # Try to replace most important unattacked word |
| attacked = False |
| for idx in sorted_indices: |
| if idx in attacked_words: |
| continue |
| |
| word = words[idx] |
| synonyms = self.find_synonyms(word) |
| |
| # Try each synonym |
| for synonym in synonyms: |
| # Create candidate text |
| candidate_words = words.copy() |
| candidate_words[idx] = synonym |
| candidate_text = ' '.join(candidate_words) |
| |
| # Check if prediction changes |
| new_pred = self.model.predict([candidate_text])[0] |
| |
| if new_pred != true_label: |
| # Attack successful |
| current_text = candidate_text |
| words = candidate_words |
| attacked_words.add(idx) |
| attacked = True |
| break |
| |
| # Also check if probability of true label decreases |
| orig_probs = self.model.predict_proba([current_text])[0] |
| new_probs = self.model.predict_proba([candidate_text])[0] |
| |
| if new_probs[true_label] < orig_probs[true_label]: |
| # Accept if it reduces confidence |
| current_text = candidate_text |
| words = candidate_words |
| attacked_words.add(idx) |
| attacked = True |
| break |
| |
| if attacked: |
| break |
| |
| if not attacked: |
| print("No successful perturbation found") |
| break |
| |
| return { |
| 'success': False, |
| 'adversarial_text': current_text, |
| 'iterations': max_iterations, |
| 'attacked_words': attacked_words |
| } |
| |
| def generate_adversarial_dataset(self, texts, labels, attack_rate=0.3): |
| """ |
| Generate adversarial examples for a portion of the dataset. |
| """ |
| num_to_attack = int(len(texts) * attack_rate) |
| indices = np.random.choice(len(texts), num_to_attack, replace=False) |
| |
| adversarial_examples = [] |
| success_count = 0 |
| |
| for idx in indices: |
| text = texts[idx] |
| label = labels[idx] |
| |
| result = self.textfooler_attack(text, label) |
| |
| if result['success']: |
| success_count += 1 |
| adversarial_examples.append({ |
| 'original': text, |
| 'adversarial': result['adversarial_text'], |
| 'label': label, |
| 'iterations': result['iterations'] |
| }) |
| |
| attack_success_rate = success_count / num_to_attack |
| |
| print(f"\nAdversarial Attack Summary:") |
| print(f"Attempted: {num_to_attack}") |
| print(f"Successful: {success_count}") |
| print(f"Success Rate: {attack_success_rate:.4f}") |
| |
| return adversarial_examples, attack_success_rate |
| ``` |
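|
| A usage sketch, assuming the model exposes both `predict()` and `predict_proba()` as the class requires:
|
| ```python |
| attacker = AdversarialAttacker(model, tokenizer) |
|
| # Attack a 10% sample of the test set and measure the success rate |
| adv_examples, success_rate = attacker.generate_adversarial_dataset( |
|     test_texts, test_labels, attack_rate=0.1 |
| ) |
| ``` |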
|
|
| ### Adversarial Training |
|
|
| ```python |
| class AdversarialTrainer: |
| def __init__(self, model, attacker, config): |
| self.model = model |
| self.attacker = attacker |
| self.config = config |
| |
| def train_with_adversarial_examples(self, train_loader, adversarial_ratio=0.5): |
| """ |
| Train model augmented with adversarial examples. |
| |
| Mix of clean and adversarial examples improves robustness. |
| """ |
| self.model.train() |
| |
| for epoch in range(self.config.num_epochs): |
| total_loss = 0 |
| |
| for batch_idx, (texts, labels) in enumerate(train_loader): |
| # Get adversarial examples for this batch |
| adv_examples = [] |
| adv_labels = [] |
| |
| for text, label in zip(texts, labels): |
| if random.random() < adversarial_ratio: |
| result = self.attacker.textfooler_attack(text, label, max_iterations=5) |
| if result['success']: |
| adv_examples.append(result['adversarial_text']) |
| adv_labels.append(label) |
| else: |
| adv_examples.append(text) |
| adv_labels.append(label) |
| else: |
| adv_examples.append(text) |
| adv_labels.append(label) |
| |
| # Train on mixed batch |
| loss = self.model.train_step(adv_examples, adv_labels) |
| total_loss += loss |
| |
| avg_loss = total_loss / len(train_loader) |
| print(f"Epoch {epoch + 1}: Loss = {avg_loss:.4f}") |
| |
| return self.model |
| ``` |
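|
| Tying the two pieces together (a sketch; `config` and `train_loader` follow the conventions of earlier examples):
|
| ```python |
| attacker = AdversarialAttacker(model, tokenizer) |
| adv_trainer = AdversarialTrainer(model, attacker, config) |
|
| # A moderate ratio keeps clean-data accuracy from degrading too far |
| robust_model = adv_trainer.train_with_adversarial_examples( |
|     train_loader, adversarial_ratio=0.3 |
| ) |
| ``` |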
|
|
| --- |
|
|
| ## Domain Shift Detection |
|
|
| ### Distribution Comparison |
|
|
| ```python |
| from scipy.spatial.distance import jensenshannon |
| from scipy.stats import wasserstein_distance |
| from scipy import stats |
| from sklearn.manifold import TSNE |
| import matplotlib.pyplot as plt |
| |
| class DomainShiftDetector: |
| def __init__(self, source_data, target_data, model): |
| """ |
| Args: |
| source_data: Training distribution data |
| target_data: New/target distribution data |
| model: Trained model for extracting representations |
| """ |
| self.source_data = source_data |
| self.target_data = target_data |
| self.model = model |
| |
| def extract_representations(self, texts): |
| """Extract hidden representations from model.""" |
| representations = [] |
| |
| for text in texts: |
| rep = self.model.get_hidden_representation(text) |
| representations.append(rep) |
| |
| return np.array(representations) |
| |
| def compare_label_distributions(self, source_labels, target_labels): |
| """Compare label distribution between domains.""" |
| # Get unique labels |
| all_labels = np.unique(np.concatenate([source_labels, target_labels])) |
| |
| # Calculate distributions |
| source_dist = np.array([np.mean(source_labels == l) for l in all_labels]) |
| target_dist = np.array([np.mean(target_labels == l) for l in all_labels]) |
| |
| # Jensen-Shannon divergence |
| js_div = jensenshannon(source_dist, target_dist) |
| |
| # Total Variation Distance |
| tv_dist = 0.5 * np.sum(np.abs(source_dist - target_dist)) |
| |
| print(f"Label Distribution Comparison:") |
| print(f"Jensen-Shannon Divergence: {js_div:.4f}") |
| print(f"Total Variation Distance: {tv_dist:.4f}") |
| |
| # Visualization |
| fig, ax = plt.subplots(figsize=(10, 6)) |
| x = np.arange(len(all_labels)) |
| width = 0.35 |
| |
| ax.bar(x - width/2, source_dist, width, label='Source', alpha=0.8) |
|         ax.bar(x + width/2, target_dist, width, label='Target', alpha=0.8) |
| |
| ax.set_xlabel('Class') |
| ax.set_ylabel('Proportion') |
| ax.set_title('Label Distribution: Source vs Target') |
| ax.set_xticks(x) |
| ax.set_xticklabels(all_labels) |
| ax.legend() |
| |
| plt.tight_layout() |
| plt.savefig('label_distribution_comparison.png') |
| plt.show() |
| |
| return { |
| 'js_divergence': js_div, |
| 'tv_distance': tv_dist, |
| 'source_distribution': source_dist, |
| 'target_distribution': target_dist |
| } |
| |
| def compare_feature_distributions(self): |
| """Compare feature distributions using statistical tests.""" |
| # Extract representations |
| print("Extracting source representations...") |
| source_reps = self.extract_representations(self.source_data) |
| |
| print("Extracting target representations...") |
| target_reps = self.extract_representations(self.target_data) |
| |
| # Per-feature comparison (Kolmogorov-Smirnov test) |
| n_features = source_reps.shape[1] |
| ks_statistics = [] |
| ks_p_values = [] |
| |
|         for i in range(min(n_features, 100)):  # Limit to first 100 features for speed |
| stat, p_val = stats.ks_2samp(source_reps[:, i], target_reps[:, i]) |
| ks_statistics.append(stat) |
| ks_p_values.append(p_val) |
| |
| # Summary statistics |
| mean_ks_stat = np.mean(ks_statistics) |
| frac_significant = np.mean([p < 0.05 for p in ks_p_values]) |
| |
| print(f"\nFeature Distribution Comparison:") |
| print(f"Mean KS Statistic: {mean_ks_stat:.4f}") |
| print(f"Fraction of Significant Features (p<0.05): {frac_significant:.4f}") |
| |
| # Wasserstein distance on aggregated representations |
| source_means = np.mean(source_reps, axis=0) |
| target_means = np.mean(target_reps, axis=0) |
| |
| wasserstein_dist = wasserstein_distance(source_means, target_means) |
| print(f"Wasserstein Distance (means): {wasserstein_dist:.4f}") |
| |
| return { |
| 'mean_ks_statistic': mean_ks_stat, |
| 'frac_significant': frac_significant, |
| 'wasserstein_distance': wasserstein_dist |
| } |
| |
| def visualize_domain_shift(self): |
| """Visualize domain shift using t-SNE.""" |
| # Combine data |
| all_data = np.concatenate([self.source_data, self.target_data]) |
| all_labels = ['Source'] * len(self.source_data) + \ |
| ['Target'] * len(self.target_data) |
| |
| # Extract representations |
| reps = self.extract_representations(all_data) |
| |
| # t-SNE visualization |
| tsne = TSNE(n_components=2, random_state=42, perplexity=30) |
| reps_2d = tsne.fit_transform(reps) |
| |
| # Plot |
| fig, ax = plt.subplots(figsize=(12, 10)) |
| |
| source_mask = np.array(all_labels) == 'Source' |
| target_mask = np.array(all_labels) == 'Target' |
| |
| ax.scatter(reps_2d[source_mask, 0], reps_2d[source_mask, 1], |
| alpha=0.5, label='Source', s=10) |
| ax.scatter(reps_2d[target_mask, 0], reps_2d[target_mask, 1], |
| alpha=0.5, label='Target', s=10) |
| |
| ax.set_xlabel('t-SNE Dimension 1') |
| ax.set_ylabel('t-SNE Dimension 2') |
| ax.set_title('Domain Shift Visualization: Source vs Target') |
| ax.legend() |
| ax.grid(True, alpha=0.3) |
| |
| plt.tight_layout() |
| plt.savefig('domain_shift_tsne.png') |
| plt.show() |
| |
| def detect_covariate_shift(self): |
| """Detect covariate shift using KLIEP-like method.""" |
| # Simplified covariate shift detection |
| source_reps = self.extract_representations(self.source_data) |
| target_reps = self.extract_representations(self.target_data) |
| |
| # Train classifier to distinguish source from target |
| X = np.concatenate([source_reps, target_reps]) |
| y = np.concatenate([ |
| np.zeros(len(source_reps)), |
| np.ones(len(target_reps)) |
| ]) |
| |
| from sklearn.linear_model import LogisticRegression |
| clf = LogisticRegression(random_state=42) |
| clf.fit(X, y) |
| |
| # If classifier can easily distinguish, there's significant shift |
| accuracy = clf.score(X, y) |
| |
| print(f"Covariate Shift Detection:") |
| print(f"Source/Target Classifier Accuracy: {accuracy:.4f}") |
| |
| if accuracy > 0.7: |
| print("WARNING: Significant covariate shift detected!") |
| elif accuracy > 0.55: |
| print("MODERATE: Some covariate shift present") |
| else: |
| print("LOW: Minimal covariate shift") |
| |
| return { |
| 'classifier_accuracy': accuracy, |
| 'shift_severity': 'high' if accuracy > 0.7 else |
| 'moderate' if accuracy > 0.55 else 'low' |
| } |
| ``` |
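|
| A usage sketch, assuming the model exposes the `get_hidden_representation()` method the class relies on:
|
| ```python |
| detector = DomainShiftDetector(source_texts, target_texts, model) |
|
| label_shift = detector.compare_label_distributions(source_labels, target_labels) |
| feature_shift = detector.compare_feature_distributions() |
| covariate_shift = detector.detect_covariate_shift() |
| ``` |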
|
|
| --- |
|
|
| ## Calibration and Confidence Estimation |
|
|
| ```python |
| from sklearn.calibration import calibration_curve |
| from scipy.special import softmax |
| import numpy as np |
| import matplotlib.pyplot as plt |
| |
| class CalibrationAnalyzer: |
| def __init__(self, model): |
| self.model = model |
| |
| def get_predicted_probabilities(self, texts): |
| """Get predicted probabilities from model.""" |
| return self.model.predict_proba(texts) |
| |
| def plot_calibration_curve(self, texts, labels, n_bins=10): |
| """ |
| Plot reliability diagram (calibration curve). |
| """ |
| probs = self.get_predicted_probabilities(texts) |
| |
| # For binary classification |
| if probs.shape[1] == 2: |
| prob_positive = probs[:, 1] |
| else: |
| # Use max probability for multiclass |
| prob_positive = np.max(probs, axis=1) |
| |
| # Calculate calibration curve |
| fraction_of_positives, mean_predicted_value = calibration_curve( |
| labels, prob_positive, n_bins=n_bins |
| ) |
| |
| # Plot |
| fig, ax = plt.subplots(figsize=(8, 8)) |
| |
| ax.plot(mean_predicted_value, fraction_of_positives, 's-', |
| label='Model', markersize=10) |
| ax.plot([0, 1], [0, 1], 'k--', label='Perfectly calibrated') |
| |
| ax.set_xlabel('Mean Predicted Probability') |
| ax.set_ylabel('Fraction of Positives') |
| ax.set_title('Calibration Curve (Reliability Diagram)') |
| ax.legend(loc='upper left') |
| ax.grid(True, alpha=0.3) |
| |
| plt.tight_layout() |
| plt.savefig('calibration_curve.png') |
| plt.show() |
| |
| # Calculate Expected Calibration Error (ECE) |
| ece = self.calculate_ece(labels, prob_positive, n_bins) |
| |
| return { |
| 'fraction_of_positives': fraction_of_positives, |
| 'mean_predicted_value': mean_predicted_value, |
| 'ece': ece |
| } |
| |
| def calculate_ece(self, labels, probabilities, n_bins=10): |
| """ |
| Calculate Expected Calibration Error. |
| |
| ECE measures the weighted average difference between predicted |
| confidence and actual accuracy across bins. |
| """ |
| bin_boundaries = np.linspace(0, 1, n_bins + 1) |
| ece = 0.0 |
| |
| for i in range(n_bins): |
| # Find samples in this bin |
| in_bin = (probabilities > bin_boundaries[i]) & \ |
| (probabilities <= bin_boundaries[i + 1]) |
| |
| prop_in_bin = np.mean(in_bin) |
| |
| if prop_in_bin > 0: |
| # Average confidence in bin |
| avg_confidence = np.mean(probabilities[in_bin]) |
| |
| # Actual accuracy in bin |
| avg_accuracy = np.mean(labels[in_bin] == |
| (probabilities[in_bin] > 0.5).astype(int)) |
| |
| # Weighted difference |
| ece += np.abs(avg_accuracy - avg_confidence) * prop_in_bin |
| |
| print(f"Expected Calibration Error (ECE): {ece:.4f}") |
| |
| return ece |
| |
| def temperature_scaling(self, val_texts, val_labels): |
| """ |
| Apply temperature scaling to improve calibration. |
| |
| Find optimal temperature T that minimizes NLL on validation set. |
| """ |
| from scipy.optimize import minimize_scalar |
| |
| logits = self.model.get_logits(val_texts) |
| |
| def nll_loss(T): |
| scaled_logits = logits / T |
| probs = softmax(scaled_logits, axis=1) |
| |
| # Negative log likelihood |
| nll = -np.mean(np.log(probs[np.arange(len(val_labels)), val_labels])) |
| return nll |
| |
| # Find optimal temperature |
| result = minimize_scalar(nll_loss, bounds=(0.1, 10.0), method='bounded') |
| optimal_T = result.x |
| |
| print(f"Optimal Temperature: {optimal_T:.4f}") |
| print(f"NLL before scaling: {nll_loss(1.0):.4f}") |
| print(f"NLL after scaling: {nll_loss(optimal_T):.4f}") |
| |
| return optimal_T |
| |
| def apply_temperature(self, texts, temperature): |
| """Apply temperature scaling to predictions.""" |
| logits = self.model.get_logits(texts) |
| scaled_logits = logits / temperature |
| calibrated_probs = softmax(scaled_logits, axis=1) |
| |
| return calibrated_probs |
| ``` |
|
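| A quick way to exercise the analyzer end to end, as a minimal sketch: it assumes a `model` wrapper exposing the `predict_proba(texts)` and `get_logits(texts)` methods used above, plus held-out `val_texts`/`val_labels` and `test_texts` arrays (all names illustrative): |
|
| ```python |
| # Hypothetical model wrapper and held-out splits |
| analyzer = CalibrationAnalyzer(model) |
| |
| # Reliability diagram + ECE on the validation split |
| cal = analyzer.plot_calibration_curve(val_texts, val_labels, n_bins=10) |
| print(f"ECE before scaling: {cal['ece']:.4f}") |
| |
| # Fit a temperature on validation data, then calibrate new predictions |
| T = analyzer.temperature_scaling(val_texts, val_labels) |
| calibrated_probs = analyzer.apply_temperature(test_texts, T) |
| ``` |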
|
| --- |
|
|
| ## A/B Testing Framework |
|
|
| ```python |
| import numpy as np |
| import pandas as pd |
| from scipy import stats |
| |
| class ABTestingFramework: |
|     def __init__(self, experiment_name): |
|         self.experiment_name = experiment_name |
|         self.results = [] |
| |
|     def design_experiment(self, control_model, treatment_model, |
|                           sample_size, duration_days, metrics): |
|         """ |
|         Design A/B test with proper power analysis. |
|         """ |
|         from statsmodels.stats.power import zt_ind_solve_power |
| |
|         # Power analysis (effect size is in standardized units, i.e. Cohen's d) |
|         effect_size = 0.1  # Minimum detectable standardized effect |
|         alpha = 0.05 |
|         power = 0.8 |
| |
|         required_n = zt_ind_solve_power( |
|             effect_size=effect_size, |
|             alpha=alpha, |
|             power=power, |
|             ratio=1  # Equal split |
|         ) |
| |
|         required_n = int(np.ceil(required_n)) |
| |
|         print(f"A/B Test Design: {self.experiment_name}") |
|         print(f"Required samples per variant: {required_n}") |
|         print(f"Total required: {required_n * 2}") |
|         print(f"Planned sample size: {sample_size}") |
|         print(f"Duration: {duration_days} days") |
| |
|         # required_n is per variant, so compare against half the total |
|         if sample_size // 2 < required_n: |
|             print("WARNING: Planned sample size may be underpowered!") |
| |
|         return { |
|             'required_per_variant': required_n, |
|             'planned_per_variant': sample_size // 2, |
|             'effect_size': effect_size, |
|             'alpha': alpha, |
|             'power': power |
|         } |
| |
|     def assign_users(self, user_ids, assignment_ratio=0.5): |
|         """ |
|         Randomly assign users to control or treatment. |
| |
|         Note: this depends on the order of user_ids; production systems |
|         typically hash user IDs so each user's assignment is stable. |
|         """ |
|         np.random.seed(42)  # Reproducible assignment |
| |
|         assignments = np.random.rand(len(user_ids)) < assignment_ratio |
|         user_assignments = { |
|             uid: 'treatment' if assign else 'control' |
|             for uid, assign in zip(user_ids, assignments) |
|         } |
| |
|         # Verify balance |
|         n_control = sum(1 for v in user_assignments.values() if v == 'control') |
|         n_treatment = len(user_ids) - n_control |
| |
|         print("User Assignment:") |
|         print(f"Control: {n_control} ({n_control/len(user_ids)*100:.1f}%)") |
|         print(f"Treatment: {n_treatment} ({n_treatment/len(user_ids)*100:.1f}%)") |
| |
|         return user_assignments |
| |
|     def collect_metrics(self, interactions, user_assignments): |
|         """ |
|         Collect and aggregate metrics from user interactions. |
|         """ |
|         # Add assignment to interactions |
|         df = pd.DataFrame(interactions) |
|         df['variant'] = df['user_id'].map(user_assignments) |
| |
|         # Aggregate by variant |
|         results = {} |
| |
|         for variant in ['control', 'treatment']: |
|             variant_data = df[df['variant'] == variant] |
| |
|             variant_results = { |
|                 'n_users': variant_data['user_id'].nunique(), |
|                 'n_interactions': len(variant_data), |
|             } |
| |
|             # Calculate each metric |
|             for metric in ['accuracy', 'latency', 'user_satisfaction']: |
|                 if metric in variant_data.columns: |
|                     variant_results[f'{metric}_mean'] = variant_data[metric].mean() |
|                     variant_results[f'{metric}_std'] = variant_data[metric].std() |
| |
|             results[variant] = variant_results |
| |
|         return results |
| |
|     def analyze_results(self, control_data, treatment_data, metric_name): |
|         """ |
|         Analyze A/B test results with statistical testing. |
|         """ |
|         control_values = np.array(control_data) |
|         treatment_values = np.array(treatment_data) |
| |
|         # Difference in means |
|         diff = np.mean(treatment_values) - np.mean(control_values) |
| |
|         # Welch's two-sample t-test (no equal-variance assumption) |
|         t_stat, p_value = stats.ttest_ind(treatment_values, control_values, |
|                                           equal_var=False) |
| |
|         # 95% confidence interval for the difference (normal approximation, |
|         # sample variances with ddof=1) |
|         pooled_se = np.sqrt(np.var(control_values, ddof=1)/len(control_values) + |
|                             np.var(treatment_values, ddof=1)/len(treatment_values)) |
|         ci_lower = diff - 1.96 * pooled_se |
|         ci_upper = diff + 1.96 * pooled_se |
| |
|         # Effect size (Cohen's d, pooled standard deviation) |
|         pooled_std = np.sqrt((np.var(control_values, ddof=1) + |
|                               np.var(treatment_values, ddof=1)) / 2) |
|         cohens_d = diff / pooled_std |
| |
|         print(f"\nA/B Test Analysis: {metric_name}") |
|         print(f"Control Mean: {np.mean(control_values):.4f}") |
|         print(f"Treatment Mean: {np.mean(treatment_values):.4f}") |
|         print(f"Difference: {diff:.4f}") |
|         print(f"95% CI: [{ci_lower:.4f}, {ci_upper:.4f}]") |
|         print(f"T-statistic: {t_stat:.4f}") |
|         print(f"P-value: {p_value:.6f}") |
|         print(f"Cohen's d: {cohens_d:.4f}") |
| |
|         # Interpretation |
|         if p_value < 0.05: |
|             direction = "better" if diff > 0 else "worse" |
|             print(f"Result: Treatment is statistically significantly {direction}") |
| |
|             if abs(cohens_d) < 0.2: |
|                 print("Effect size: Small") |
|             elif abs(cohens_d) < 0.5: |
|                 print("Effect size: Medium") |
|             else: |
|                 print("Effect size: Large") |
|         else: |
|             print("Result: No statistically significant difference") |
| |
|         return { |
|             'difference': diff, |
|             'p_value': p_value, |
|             'ci_lower': ci_lower, |
|             'ci_upper': ci_upper, |
|             'cohens_d': cohens_d, |
|             'significant': p_value < 0.05 |
|         } |
| |
|     def sequential_testing(self, daily_results, stopping_rule='pocock'): |
|         """ |
|         Sequential A/B testing with early stopping. |
| |
|         Allows monitoring and early stopping if results are clear. The |
|         stricter p < 0.01 threshold below is a crude Pocock-style constant |
|         boundary: repeatedly peeking at p < 0.05 would inflate the |
|         false-positive rate. |
|         """ |
|         cumulative_control = [] |
|         cumulative_treatment = [] |
| |
|         decisions = [] |
| |
|         for day, day_data in enumerate(daily_results): |
|             cumulative_control.extend(day_data['control']) |
|             cumulative_treatment.extend(day_data['treatment']) |
| |
|             # Daily analysis |
|             if len(cumulative_control) > 100 and len(cumulative_treatment) > 100: |
|                 result = self.analyze_results( |
|                     cumulative_control, |
|                     cumulative_treatment, |
|                     'primary_metric' |
|                 ) |
| |
|                 # Stopping rule |
|                 if result['p_value'] < 0.01:  # Strong evidence |
|                     decision = 'STOP_EARLY' |
|                 elif day >= len(daily_results) - 1: |
|                     decision = 'CONCLUDE' |
|                 else: |
|                     decision = 'CONTINUE' |
| |
|                 decisions.append({ |
|                     'day': day, |
|                     'decision': decision, |
|                     'p_value': result['p_value'] |
|                 }) |
| |
|                 print(f"Day {day}: p={result['p_value']:.6f}, Decision: {decision}") |
| |
|                 if decision == 'STOP_EARLY': |
|                     print("Early stopping triggered!") |
|                     break |
| |
|         return decisions |
| ``` |
|
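| To sanity-check the framework before wiring it to real logs, you can feed it synthetic per-user scores. A minimal sketch, where the Gaussian values are stand-ins for logged metrics: |
|
| ```python |
| import numpy as np |
| |
| ab = ABTestingFramework("model_v2_rollout")  # illustrative experiment name |
| |
| # Synthetic metric values standing in for logged user interactions |
| rng = np.random.default_rng(0) |
| control = rng.normal(loc=0.80, scale=0.05, size=500) |
| treatment = rng.normal(loc=0.82, scale=0.05, size=500) |
| |
| analysis = ab.analyze_results(control, treatment, "user_satisfaction") |
| if analysis['significant']: |
|     print("Consider rolling out the treatment variant") |
| ``` |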
|
| --- |
|
|
| ## Regression Testing for Models |
|
|
| ```python |
| import json |
| from datetime import datetime |
| |
| class ModelRegressionTester: |
|     def __init__(self, baseline_model_path, test_suite_path): |
|         """ |
|         Initialize regression testing framework. |
|         """ |
|         self.test_suite_path = test_suite_path |
|         self.baseline_model = self.load_model(baseline_model_path) |
|         self.test_suite = self.load_test_suite(test_suite_path) |
|         self.baseline_results = self.run_baseline() |
| |
|     def load_model(self, path): |
|         """Load a model from disk. Placeholder: override with your |
|         framework's loader (see the usage sketch below).""" |
|         raise NotImplementedError |
| |
|     def load_test_suite(self, path): |
|         """Load or create test suite.""" |
|         try: |
|             with open(path, 'r') as f: |
|                 test_suite = json.load(f) |
|         except FileNotFoundError: |
|             # Create default test suite |
|             test_suite = { |
|                 'functional_tests': [], |
|                 'edge_cases': [], |
|                 'performance_tests': [], |
|                 'bias_tests': [] |
|             } |
| |
|         return test_suite |
| |
|     def add_functional_test(self, name, input_text, expected_output, |
|                             tolerance=0.0): |
|         """Add a functional test case.""" |
|         test_case = { |
|             'name': name, |
|             'input': input_text, |
|             'expected': expected_output, |
|             'tolerance': tolerance, |
|             'type': 'functional' |
|         } |
| |
|         self.test_suite['functional_tests'].append(test_case) |
|         self.save_test_suite() |
| |
|     def add_edge_case(self, name, input_text, expected_behavior): |
|         """Add an edge case test.""" |
|         test_case = { |
|             'name': name, |
|             'input': input_text, |
|             'expected_behavior': expected_behavior, |
|             'type': 'edge_case' |
|         } |
| |
|         self.test_suite['edge_cases'].append(test_case) |
|         self.save_test_suite() |
| |
|     def save_test_suite(self): |
|         """Persist the test suite back to its source file.""" |
|         with open(self.test_suite_path, 'w') as f: |
|             json.dump(self.test_suite, f, indent=2) |
| |
|     def run_baseline(self): |
|         """Run test suite on baseline model.""" |
|         results = {} |
| |
|         # Functional tests |
|         functional_results = [] |
|         for test in self.test_suite['functional_tests']: |
|             prediction = self.baseline_model.predict([test['input']])[0] |
|             expected = test['expected'] |
| |
|             # Check if within tolerance |
|             if isinstance(expected, (int, float)): |
|                 passed = abs(prediction - expected) <= test['tolerance'] |
|             else: |
|                 passed = prediction == expected |
| |
|             functional_results.append({ |
|                 'name': test['name'], |
|                 'passed': passed, |
|                 'prediction': prediction, |
|                 'expected': expected |
|             }) |
| |
|         results['functional'] = functional_results |
| |
|         # Edge cases |
|         edge_results = [] |
|         for test in self.test_suite['edge_cases']: |
|             try: |
|                 prediction = self.baseline_model.predict([test['input']])[0] |
|                 behavior = self.check_expected_behavior(prediction, test['expected_behavior']) |
| |
|                 edge_results.append({ |
|                     'name': test['name'], |
|                     'passed': behavior, |
|                     'prediction': prediction |
|                 }) |
|             except Exception as e: |
|                 edge_results.append({ |
|                     'name': test['name'], |
|                     'passed': False, |
|                     'error': str(e) |
|                 }) |
| |
|         results['edge_cases'] = edge_results |
| |
|         # Save baseline results |
|         with open('baseline_regression_results.json', 'w') as f: |
|             json.dump(results, f, indent=2) |
| |
|         return results |
| |
|     def check_expected_behavior(self, prediction, expected_behavior): |
|         """Check if prediction matches expected behavior.""" |
|         if expected_behavior == 'high_confidence': |
|             return prediction['confidence'] > 0.9 |
|         elif expected_behavior == 'uncertain': |
|             return prediction['confidence'] < 0.6 |
|         elif isinstance(expected_behavior, dict) and 'class' in expected_behavior: |
|             # e.g. {'class': 'positive'} pins the prediction to one class |
|             return prediction['class'] == expected_behavior['class'] |
|         else: |
|             return False |
| |
|     def run_regression_test(self, new_model_path): |
|         """ |
|         Run regression tests on new model and compare to baseline. |
|         """ |
|         new_model = self.load_model(new_model_path) |
| |
|         regressions = [] |
|         improvements = [] |
| |
|         # Compare functional tests |
|         for baseline_result in self.baseline_results['functional']: |
|             test_name = baseline_result['name'] |
| |
|             # Find corresponding test |
|             test = next(t for t in self.test_suite['functional_tests'] |
|                         if t['name'] == test_name) |
| |
|             # Run on new model |
|             new_prediction = new_model.predict([test['input']])[0] |
|             expected = test['expected'] |
| |
|             if isinstance(expected, (int, float)): |
|                 new_passed = abs(new_prediction - expected) <= test['tolerance'] |
|             else: |
|                 new_passed = new_prediction == expected |
| |
|             # Compare |
|             if baseline_result['passed'] and not new_passed: |
|                 regressions.append({ |
|                     'test': test_name, |
|                     'type': 'functional', |
|                     'baseline': baseline_result['prediction'], |
|                     'new': new_prediction, |
|                     'expected': expected |
|                 }) |
|             elif not baseline_result['passed'] and new_passed: |
|                 improvements.append({ |
|                     'test': test_name, |
|                     'type': 'functional', |
|                     'baseline': baseline_result['prediction'], |
|                     'new': new_prediction |
|                 }) |
| |
|         # Report |
|         print(f"\n{'='*60}") |
|         print("REGRESSION TEST RESULTS") |
|         print(f"{'='*60}") |
|         print(f"Total Tests: {len(self.baseline_results['functional'])}") |
|         print(f"Regressions: {len(regressions)}") |
|         print(f"Improvements: {len(improvements)}") |
| |
|         if regressions: |
|             print("\n⚠️ REGRESSIONS DETECTED:") |
|             for reg in regressions: |
|                 print(f"  - {reg['test']}: {reg['baseline']} → {reg['new']}") |
|                 print(f"    Expected: {reg['expected']}") |
| |
|         if improvements: |
|             print("\n✅ IMPROVEMENTS:") |
|             for imp in improvements: |
|                 print(f"  - {imp['test']}: {imp['baseline']} → {imp['new']}") |
| |
|         return { |
|             'regressions': regressions, |
|             'improvements': improvements, |
|             'passed': len(regressions) == 0 |
|         } |
| |
|     def generate_regression_report(self, results): |
|         """Generate detailed regression report.""" |
|         report = f""" |
| # Model Regression Test Report |
| |
| ## Summary |
| - **Date**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} |
| - **Total Tests**: {len(self.baseline_results['functional']) + len(self.baseline_results['edge_cases'])} |
| - **Regressions**: {len(results['regressions'])} |
| - **Improvements**: {len(results['improvements'])} |
| - **Status**: {'✅ PASS' if results['passed'] else '❌ FAIL'} |
| |
| ## Regressions |
| """ |
| |
|         for reg in results['regressions']: |
|             report += f""" |
| ### {reg['test']} |
| - Type: {reg['type']} |
| - Baseline: {reg['baseline']} |
| - New: {reg['new']} |
| - Expected: {reg['expected']} |
| """ |
| |
|         return report |
| ``` |
|
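| Wiring the tester into a workflow mostly means supplying a real loader. A minimal sketch, assuming the `load_model_and_tokenizer` helper from earlier in the tutorial and hypothetical model paths: |
|
| ```python |
| class MyRegressionTester(ModelRegressionTester): |
|     def load_model(self, path): |
|         # Plug in whatever loader your stack uses |
|         model, _tokenizer = load_model_and_tokenizer(path) |
|         return model |
| |
| tester = MyRegressionTester( |
|     baseline_model_path="models/v1.3",         # hypothetical paths |
|     test_suite_path="model_test_suite.json" |
| ) |
| tester.add_functional_test( |
|     name="positive_sentiment_smoke", |
|     input_text="This product exceeded my expectations!", |
|     expected_output="positive" |
| ) |
| |
| report = tester.run_regression_test("models/v1.4-rc1") |
| assert report['passed'], "Regressions detected - block the release" |
| ``` |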
|
| --- |
|
|
| ## Quality Gates and Release Criteria |
|
|
| ```python |
| from datetime import datetime |
| |
| class QualityGateChecker: |
|     def __init__(self, release_config): |
|         """ |
|         Initialize quality gate checker with release criteria. |
| |
|         release_config example: |
|         { |
|             'min_accuracy': 0.85, |
|             'max_fairness_disparity': 0.1, |
|             'min_robustness_consistency': 0.9, |
|             'max_calibration_ece': 0.05, |
|             'no_regressions': True, |
|             'min_test_coverage': 0.95 |
|         } |
|         """ |
|         self.config = release_config |
| |
|     def check_all_gates(self, model_metrics): |
|         """ |
|         Check all quality gates for release readiness. |
|         """ |
|         gate_results = {} |
|         all_passed = True |
| |
|         # Accuracy gate |
|         if 'min_accuracy' in self.config: |
|             passed = model_metrics['accuracy'] >= self.config['min_accuracy'] |
|             gate_results['accuracy'] = { |
|                 'passed': passed, |
|                 'value': model_metrics['accuracy'], |
|                 'threshold': self.config['min_accuracy'] |
|             } |
|             if not passed: |
|                 all_passed = False |
| |
|         # Fairness gate |
|         if 'max_fairness_disparity' in self.config: |
|             passed = model_metrics['fairness_disparity'] <= self.config['max_fairness_disparity'] |
|             gate_results['fairness'] = { |
|                 'passed': passed, |
|                 'value': model_metrics['fairness_disparity'], |
|                 'threshold': self.config['max_fairness_disparity'] |
|             } |
|             if not passed: |
|                 all_passed = False |
| |
|         # Robustness gate |
|         if 'min_robustness_consistency' in self.config: |
|             passed = model_metrics['robustness_consistency'] >= self.config['min_robustness_consistency'] |
|             gate_results['robustness'] = { |
|                 'passed': passed, |
|                 'value': model_metrics['robustness_consistency'], |
|                 'threshold': self.config['min_robustness_consistency'] |
|             } |
|             if not passed: |
|                 all_passed = False |
| |
|         # Calibration gate |
|         if 'max_calibration_ece' in self.config: |
|             passed = model_metrics['calibration_ece'] <= self.config['max_calibration_ece'] |
|             gate_results['calibration'] = { |
|                 'passed': passed, |
|                 'value': model_metrics['calibration_ece'], |
|                 'threshold': self.config['max_calibration_ece'] |
|             } |
|             if not passed: |
|                 all_passed = False |
| |
|         # Regression gate |
|         if self.config.get('no_regressions'): |
|             passed = model_metrics.get('regressions', 0) == 0 |
|             gate_results['regressions'] = { |
|                 'passed': passed, |
|                 'value': model_metrics.get('regressions', 0), |
|                 'threshold': 0 |
|             } |
|             if not passed: |
|                 all_passed = False |
| |
|         # Print report |
|         print(f"\n{'='*60}") |
|         print("QUALITY GATE CHECK") |
|         print(f"{'='*60}") |
| |
|         for gate, result in gate_results.items(): |
|             status = "✅ PASS" if result['passed'] else "❌ FAIL" |
|             print(f"{gate.upper():<20} {status}") |
|             print(f"  Value: {result['value']:.4f}, Threshold: {result['threshold']:.4f}") |
| |
|         print(f"\n{'='*60}") |
|         overall_status = "✅ READY FOR RELEASE" if all_passed else "❌ NOT READY FOR RELEASE" |
|         print(f"OVERALL: {overall_status}") |
|         print(f"{'='*60}") |
| |
|         return { |
|             'all_passed': all_passed, |
|             'gate_results': gate_results |
|         } |
| |
|     def generate_release_report(self, model_info, metrics, gate_results): |
|         """Generate comprehensive release report.""" |
|         report = f""" |
| # Model Release Report |
| |
| ## Model Information |
| - **Name**: {model_info['name']} |
| - **Version**: {model_info['version']} |
| - **Date**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} |
| - **Training Data**: {model_info['training_data']} |
| - **Architecture**: {model_info['architecture']} |
| |
| ## Performance Metrics |
| - **Accuracy**: {metrics['accuracy']:.4f} |
| - **Precision**: {metrics['precision']:.4f} |
| - **Recall**: {metrics['recall']:.4f} |
| - **F1 Score**: {metrics['f1']:.4f} |
| - **AUC-ROC**: {metrics['auc_roc']:.4f} |
| |
| ## Quality Attributes |
| - **Fairness Disparity**: {metrics['fairness_disparity']:.4f} |
| - **Robustness Consistency**: {metrics['robustness_consistency']:.4f} |
| - **Calibration ECE**: {metrics['calibration_ece']:.4f} |
| - **Regressions**: {metrics.get('regressions', 0)} |
| |
| ## Quality Gate Results |
| """ |
| |
|         for gate, result in gate_results['gate_results'].items(): |
|             status = "✅ PASS" if result['passed'] else "❌ FAIL" |
|             report += f"- {gate.upper()}: {status}\n" |
| |
|         report += f""" |
| ## Release Decision |
| {'✅ APPROVED FOR RELEASE' if gate_results['all_passed'] else '❌ RELEASE BLOCKED'} |
| |
| ## Notes |
| {model_info.get('notes', 'No additional notes')} |
| """ |
| |
|         return report |
| ``` |
|
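| Putting the checker in front of a release is then a few lines. The metric values below are illustrative stand-ins for numbers produced by the earlier evaluation steps: |
|
| ```python |
| release_config = { |
|     'min_accuracy': 0.85, |
|     'max_fairness_disparity': 0.10, |
|     'min_robustness_consistency': 0.90, |
|     'max_calibration_ece': 0.05, |
|     'no_regressions': True |
| } |
| |
| model_metrics = {          # illustrative values from earlier steps |
|     'accuracy': 0.88, |
|     'fairness_disparity': 0.07, |
|     'robustness_consistency': 0.93, |
|     'calibration_ece': 0.04, |
|     'regressions': 0 |
| } |
| |
| checker = QualityGateChecker(release_config) |
| verdict = checker.check_all_gates(model_metrics) |
| if not verdict['all_passed']: |
|     raise SystemExit("Release blocked by quality gates") |
| ``` |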
|
| --- |
|
|
| ## Complete Validation Pipeline Example |
|
|
| ```python |
| import numpy as np |
| |
| def complete_validation_pipeline(model, train_data, val_data, test_data, |
|                                  sensitive_attributes=None): |
|     """ |
|     Run complete validation pipeline before model release. |
|     """ |
|     print("="*70) |
|     print("COMPLETE MODEL VALIDATION PIPELINE") |
|     print("="*70) |
| |
|     results = {} |
| |
|     # 1. Basic Performance Evaluation |
|     print("\n[1/8] Basic Performance Evaluation") |
|     test_preds = model.predict(test_data['texts']) |
|     test_metrics = calculate_metrics(test_preds, test_data['labels']) |
|     results['basic_metrics'] = test_metrics |
| |
|     # 2. Cross-Validation |
|     print("\n[2/8] Cross-Validation") |
|     cv_results, _ = k_fold_cross_validation( |
|         type(model), model.config, |
|         train_data['texts'], train_data['labels'], |
|         k=5 |
|     ) |
|     results['cross_validation'] = cv_results |
| |
|     # 3. Robustness Testing |
|     print("\n[3/8] Robustness Testing") |
|     tester = RobustnessTester(model, model.tokenizer) |
|     robustness_results = tester.comprehensive_robustness_suite( |
|         test_data['texts'], test_data['labels'] |
|     ) |
|     results['robustness'] = robustness_results |
| |
|     # 4. Fairness Audit (if sensitive attributes provided) |
|     if sensitive_attributes: |
|         print("\n[4/8] Fairness Audit") |
|         auditor = FairnessAuditor(test_preds, test_data['labels'], sensitive_attributes) |
|         fairness_report = auditor.generate_fairness_report() |
|         results['fairness'] = fairness_report |
|     else: |
|         print("\n[4/8] Fairness Audit: SKIPPED (no sensitive attributes)") |
| |
|     # 5. Calibration Analysis |
|     print("\n[5/8] Calibration Analysis") |
|     calibrator = CalibrationAnalyzer(model) |
|     calibration_results = calibrator.plot_calibration_curve( |
|         test_data['texts'], test_data['labels'] |
|     ) |
|     results['calibration'] = calibration_results |
| |
|     # 6. Domain Shift Detection (requires target-domain data) |
|     print("\n[6/8] Domain Shift Detection: SKIPPED (no target data)") |
| |
|     # 7. Statistical Significance (requires a baseline model) |
|     print("\n[7/8] Statistical Significance: SKIPPED (no baseline)") |
| |
|     # 8. Quality Gate Check |
|     print("\n[8/8] Quality Gate Check") |
| |
|     # Prepare metrics for quality gates |
|     gate_metrics = { |
|         'accuracy': test_metrics['accuracy'], |
|         'fairness_disparity': max( |
|             v['max_disparity'] for v in results['fairness'].values() |
|         ) if 'fairness' in results else 0.0, |
|         'robustness_consistency': np.mean([ |
|             v['consistency'] for v in robustness_results.values() |
|         ]), |
|         'calibration_ece': calibration_results['ece'], |
|         'regressions': 0  # Would check against baseline |
|     } |
| |
|     release_config = { |
|         'min_accuracy': 0.80, |
|         'max_fairness_disparity': 0.15, |
|         'min_robustness_consistency': 0.85, |
|         'max_calibration_ece': 0.10, |
|         'no_regressions': True |
|     } |
| |
|     gate_checker = QualityGateChecker(release_config) |
|     gate_results = gate_checker.check_all_gates(gate_metrics) |
|     results['quality_gates'] = gate_results |
| |
|     # Final Summary |
|     print("\n" + "="*70) |
|     print("VALIDATION SUMMARY") |
|     print("="*70) |
|     print(f"Basic Accuracy: {test_metrics['accuracy']:.4f}") |
|     print(f"CV Accuracy: {cv_results['accuracy']['mean']:.4f} ± {cv_results['accuracy']['std']:.4f}") |
|     print(f"Avg Robustness: {gate_metrics['robustness_consistency']:.4f}") |
|     if 'fairness' in results: |
|         print(f"Max Fairness Disparity: {gate_metrics['fairness_disparity']:.4f}") |
|     print(f"Calibration ECE: {gate_metrics['calibration_ece']:.4f}") |
|     print(f"\nRelease Status: {'✅ APPROVED' if gate_results['all_passed'] else '❌ BLOCKED'}") |
| |
|     return results |
| |
| # Usage |
| # validation_results = complete_validation_pipeline( |
| #     model, train_data, val_data, test_data, |
| #     sensitive_attributes={'gender': gender_array, 'age': age_array} |
| # ) |
| ``` |
|
|
| --- |
|
|
| ## Best Practices Checklist |
|
|
| ### Pre-Release Validation Checklist |
|
|
| - [ ] **Data Splits**: Proper train/val/test separation with no leakage |
| - [ ] **Cross-Validation**: K-fold CV completed with consistent results |
| - [ ] **Statistical Power**: Test set size sufficient for desired confidence |
| - [ ] **Performance Metrics**: All primary metrics meet thresholds |
| - [ ] **Fairness Audit**: No significant bias across protected groups |
| - [ ] **Robustness Testing**: Model stable under perturbations |
| - [ ] **Calibration**: Predictions well-calibrated (ECE < threshold) |
| - [ ] **Edge Cases**: Critical edge cases handled correctly |
| - [ ] **Regression Tests**: No regressions from baseline |
| - [ ] **Documentation**: All validation results documented |
|
|
| ### Continuous Validation |
|
|
| - [ ] **Automated Testing**: Validation suite runs on every commit (see the pytest sketch after this list) |
| - [ ] **Monitoring**: Production performance tracked continuously |
| - [ ] **Drift Detection**: Data drift monitored and alerted |
| - [ ] **Periodic Re-evaluation**: Full validation quarterly |
| - [ ] **Incident Response**: Process for handling validation failures |
|
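| One lightweight way to automate the suite is a pytest gate that CI runs on every commit, failing the build when any release criterion fails. A minimal sketch, assuming the `QualityGateChecker` defined above is importable and `evaluate_release_candidate()` is a hypothetical helper that runs the pipeline and returns the gate metrics (module names are illustrative): |
|
| ```python |
| # test_release_gates.py - run by CI on every commit (e.g. `pytest -q`) |
| from quality_gates import QualityGateChecker         # assumed module layout |
| from pipeline import evaluate_release_candidate      # hypothetical helper |
| |
| def test_quality_gates(): |
|     metrics = evaluate_release_candidate() |
|     checker = QualityGateChecker({ |
|         'min_accuracy': 0.85, |
|         'max_calibration_ece': 0.05, |
|         'no_regressions': True |
|     }) |
|     verdict = checker.check_all_gates(metrics) |
|     failed = [g for g, r in verdict['gate_results'].items() if not r['passed']] |
|     assert verdict['all_passed'], "Failed gates: " + ", ".join(failed) |
| ``` |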
|
| --- |
|
|
| ## Next Steps |
|
|
| In the next tutorial, we'll cover: |
| - **Continual Learning**: Strategies for updating models with new data |
| - **Catastrophic Forgetting Prevention**: Techniques to retain old knowledge |
| - **Incremental Training**: Efficient updates without full retraining |
| - **Version Management**: Model versioning and rollback strategies |
| - **Production Deployment**: Serving, scaling, and monitoring |
|
|