Spaces:
Sleeping
Sleeping
| import numpy as np | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.pipeline import make_pipeline | |
| from sklearn.metrics import roc_auc_score, brier_score_loss | |
| from sklearn.calibration import CalibratedClassifierCV | |
| from sklearn.covariance import EmpiricalCovariance | |
| from scipy.stats import entropy | |
| import joblib | |
class SimpleClassifier:
    """Binary real-vs-fake classifier with probability calibration and
    Mahalanobis-distance OOD flagging."""

    def __init__(self, feature_dropout_rate=0.0, model_type='logistic'):
        """
        Build the (uncalibrated) base pipeline and the OOD bookkeeping state.

        Args:
            feature_dropout_rate: Probability of zeroing each feature during
                classifier training (0.0 to 1.0).
            model_type: 'logistic' (default) or 'random_forest'.

        Raises:
            ValueError: If model_type is not one of the supported values.
        """
        self.model_type = model_type
        if model_type == 'logistic':
            estimator = LogisticRegression(
                random_state=42,
                solver='liblinear',
                class_weight='balanced',
            )
        elif model_type == 'random_forest':
            estimator = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                min_samples_split=5,
                min_samples_leaf=2,
                class_weight='balanced',
                random_state=42,
                n_jobs=-1,
            )
        else:
            raise ValueError(f"Unknown model_type: {model_type}. Use 'logistic' or 'random_forest'")
        # Both variants share the same scale-then-classify pipeline shape.
        self.model = make_pipeline(StandardScaler(), estimator)
        self.calibrated_model = None
        self.feature_dropout_rate = feature_dropout_rate
        # Per-class Gaussian OOD detectors (fitted in train()).
        self.ood_real = None
        self.ood_fake = None
        self.scaler_ood = StandardScaler()
        self.ood_threshold = None  # set during training
        self.ood_threshold_method = 'validation'  # 'validation' or 'training'
        self.ood_target_fpr = 0.01  # target false positive rate (1%)
| def _apply_feature_dropout(self, X): | |
| """Randomly zero out features to force robustness.""" | |
| if self.feature_dropout_rate <= 0: | |
| return X | |
| X_dropped = X.copy() | |
| n_samples, n_features = X.shape | |
| mask = np.random.binomial( | |
| 1, | |
| 1 - self.feature_dropout_rate, | |
| size=(n_samples, n_features) | |
| ) | |
| return X_dropped * mask | |
    def train(self, X, y, X_val=None, y_val=None, ood_threshold_method='validation',
              ood_target_fpr=0.01):
        """
        Train the model: fit the per-class OOD detectors, choose the OOD
        distance threshold, then fit the calibrated classifier.

        Args:
            X (np.ndarray): Feature matrix (training set).
            y (np.ndarray): Labels (training set, 0=real / 1=fake).
            X_val (np.ndarray, optional): Validation set features for uncertainty threshold tuning.
            y_val (np.ndarray, optional): Validation set labels (currently unused by this method).
            ood_threshold_method (str): Method for setting OOD detection threshold:
                - 'validation': Use validation set (recommended, prevents overfitting)
                - 'training': Use training set (legacy, may overfit)
            ood_target_fpr (float): Target false positive rate for OOD flagging (default: 0.01 = 1%)

        Note: The validation set is created from the same training data (same generators,
        same distribution), so OOD detection cannot be validated as true out-of-distribution
        detection without proper evaluation data. See OOD_EVALUATION_LIMITATION.md.
        """
        self.ood_threshold_method = ood_threshold_method
        self.ood_target_fpr = ood_target_fpr
        # Fit OOD detectors on scaled training features.
        # scaler_ood is deliberately separate from the classifier pipeline's scaler.
        self.scaler_ood.fit(X)
        X_scaled = self.scaler_ood.transform(X)
        # Fit one Gaussian (mean + covariance) per class on the training data
        self.ood_real = EmpiricalCovariance().fit(X_scaled[y == 0])
        self.ood_fake = EmpiricalCovariance().fit(X_scaled[y == 1])
        # Compute OOD detection threshold
        if ood_threshold_method == 'validation' and X_val is not None:
            # Use validation set to tune threshold (prevents overfitting)
            # This is the recommended approach per OOD detection best practices
            # Note: Validation set is in-distribution, so OOD detection cannot be validated
            # as true out-of-distribution detection without proper evaluation data
            X_val_scaled = self.scaler_ood.transform(X_val)
            dist_real_val = self.ood_real.mahalanobis(X_val_scaled)
            dist_fake_val = self.ood_fake.mahalanobis(X_val_scaled)
            # A sample's OOD score is its distance to the NEAREST class cluster.
            dist_min_val = np.minimum(dist_real_val, dist_fake_val)
            # Threshold at the (1 - target_fpr) quantile of validation distances,
            # so roughly target_fpr of in-distribution samples get flagged as OOD.
            # (np.quantile expects a fraction in [0, 1], hence the / 100.0.)
            threshold_percentile = (1.0 - ood_target_fpr) * 100
            self.ood_threshold = float(np.quantile(dist_min_val, threshold_percentile / 100.0))
            # Report actual FPR achieved (may differ slightly from the target
            # because the quantile falls between discrete sample values)
            actual_fpr = np.mean(dist_min_val > self.ood_threshold)
            print(f"OOD detection threshold (validation): {self.ood_threshold:.4f} "
                  f"(target FPR={ood_target_fpr:.1%}, actual FPR={actual_fpr:.1%})")
            print(f" Note: Validation set is in-distribution; OOD detection cannot be validated without proper evaluation data")
        else:
            # Fallback: use training set (legacy method, may overfit)
            dist_real_train = self.ood_real.mahalanobis(X_scaled)
            dist_fake_train = self.ood_fake.mahalanobis(X_scaled)
            dist_min_train = np.minimum(dist_real_train, dist_fake_train)
            threshold_percentile = (1.0 - ood_target_fpr) * 100
            self.ood_threshold = float(np.quantile(dist_min_train, threshold_percentile / 100.0))
            if ood_threshold_method == 'validation':
                # Caller asked for validation-based tuning but gave no X_val.
                print(f"⚠️ Warning: Validation set not provided, using training set for OOD detection threshold")
            print(f"OOD detection threshold (training): {self.ood_threshold:.4f} "
                  f"(target FPR={ood_target_fpr:.1%})")
        # Apply feature dropout for classifier training (optional; OOD detectors
        # above were fitted on the un-dropped features)
        X_train = self._apply_feature_dropout(X)
        # CalibratedClassifierCV handles calibration internally (sigmoid)
        self.calibrated_model = CalibratedClassifierCV(
            self.model,
            method='sigmoid',
            cv=3
        )
        self.calibrated_model.fit(X_train, y)
| def predict_proba(self, X): | |
| """ | |
| Predict probabilities of class 1 (fake). | |
| """ | |
| if self.calibrated_model is None: | |
| raise ValueError("Model not trained yet.") | |
| return self.calibrated_model.predict_proba(X)[:, 1] | |
| def predict_uncertainty(self, X): | |
| """ | |
| Predict with OOD detection and uncertainty estimation. | |
| Implements multiple uncertainty signals: | |
| 1. Mahalanobis distance-based OOD detection (geometric) | |
| 2. Entropy-based uncertainty (predictive) | |
| 3. Max probability uncertainty (confidence-based) | |
| Note: This attempts OOD detection (identifying samples far from training distribution), | |
| but cannot be validated as true out-of-distribution detection without proper | |
| evaluation data (see OOD_EVALUATION_LIMITATION.md). The validation set used for | |
| threshold tuning is in-distribution (same generators, same distribution). | |
| References: | |
| - Lee et al. "A Simple Unified Framework for Detecting Out-of-Distribution | |
| Samples and Adversarial Attacks" (NeurIPS 2018) - Mahalanobis distance | |
| - Hendrycks & Gimpel "A Baseline for Detecting Misclassified and | |
| Out-of-Distribution Examples in Neural Networks" (ICLR 2017) - Entropy | |
| Returns: | |
| dict with: | |
| 'probs' : np.ndarray of P(fake) | |
| 'dist_real' : Mahalanobis distance to real cluster | |
| 'dist_fake' : Mahalanobis distance to fake cluster | |
| 'dist_min' : min distance to either cluster | |
| 'is_ood' : boolean mask (True if high-uncertainty/anomalous by Mahalanobis) | |
| 'entropy' : Predictive entropy (higher = more uncertain) | |
| 'max_prob' : Maximum class probability (lower = more uncertain) | |
| 'uncertainty_score': Combined uncertainty score [0, 1] | |
| """ | |
| if self.ood_real is None: | |
| raise ValueError("Model not trained yet.") | |
| probs = self.predict_proba(X) | |
| X_scaled = self.scaler_ood.transform(X) | |
| # Mahalanobis distance-based OOD detection | |
| dist_real = self.ood_real.mahalanobis(X_scaled) | |
| dist_fake = self.ood_fake.mahalanobis(X_scaled) | |
| dist_min = np.minimum(dist_real, dist_fake) | |
| is_ood = None | |
| if self.ood_threshold is not None: | |
| is_ood = dist_min > self.ood_threshold | |
| # Entropy-based uncertainty (predictive uncertainty) | |
| # Entropy = -sum(p_i * log(p_i)) for binary classification | |
| # Higher entropy = more uncertain predictions | |
| probs_2d = np.column_stack([1 - probs, probs]) # [P(real), P(fake)] | |
| predictive_entropy = np.array([entropy(p, base=2) for p in probs_2d]) | |
| # Normalize to [0, 1] (max entropy for binary = log2(2) = 1.0) | |
| entropy_normalized = predictive_entropy / 1.0 # Already normalized for binary | |
| # Max probability uncertainty (confidence-based) | |
| # Lower max probability = more uncertain | |
| max_prob = np.maximum(probs, 1 - probs) # Max of P(fake) and P(real) | |
| uncertainty_from_max_prob = 1.0 - max_prob # Invert: low prob → high uncertainty | |
| # Combined uncertainty score (weighted average) | |
| # Combines geometric (Mahalanobis) and predictive (entropy) signals | |
| uncertainty_score = 0.5 * entropy_normalized + 0.5 * uncertainty_from_max_prob | |
| # Flag high-uncertainty samples (complementary to OOD detection) | |
| # Samples with high entropy OR low max prob are uncertain | |
| high_uncertainty = (entropy_normalized > 0.5) | (uncertainty_from_max_prob > 0.5) | |
| return { | |
| 'probs': probs, | |
| 'dist_real': dist_real, | |
| 'dist_fake': dist_fake, | |
| 'dist_min': dist_min, | |
| 'is_ood': is_ood, | |
| 'entropy': predictive_entropy, | |
| 'entropy_normalized': entropy_normalized, | |
| 'max_prob': max_prob, | |
| 'uncertainty_score': uncertainty_score, | |
| 'high_uncertainty': high_uncertainty | |
| } | |
    def evaluate_ood_detection(self, X, y_true, is_ood_true=None):
        """
        Evaluate OOD detection performance.

        Note: This attempts OOD detection but cannot be validated as true out-of-distribution
        detection without proper evaluation data (see OOD_EVALUATION_LIMITATION.md). The
        validation set used for threshold tuning is in-distribution (same generators, same distribution).

        Args:
            X: Feature matrix
            y_true: True labels (0=real, 1=fake). Currently unused by every metric below.
            is_ood_true: True OOD labels (optional, if available for proper evaluation).
                NOTE(review): assumed to be a boolean numpy array — a plain Python list
                would break the `is_ood_true == False` masking below; confirm at call sites.

        Returns:
            dict with OOD detection metrics (unvalidated)

        Raises:
            ValueError: If train() has not been called yet.
        """
        if self.ood_real is None:
            raise ValueError("Model not trained yet.")
        unc = self.predict_uncertainty(X)
        # is_ood_pred is None when no OOD threshold was set during training.
        is_ood_pred = unc['is_ood']
        metrics = {}
        if is_ood_true is not None:
            # If we have ground truth OOD labels, compute standard metrics.
            # NOTE(review): if is_ood_pred is None here these sklearn calls will
            # fail — this path assumes training set an OOD threshold; confirm.
            from sklearn.metrics import precision_score, recall_score, f1_score
            metrics['ood_precision'] = precision_score(is_ood_true, is_ood_pred, zero_division=0)
            metrics['ood_recall'] = recall_score(is_ood_true, is_ood_pred, zero_division=0)
            metrics['ood_f1'] = f1_score(is_ood_true, is_ood_pred, zero_division=0)
            # Fraction of truly in-distribution samples that were flagged OOD
            metrics['ood_fpr'] = np.mean(is_ood_pred[is_ood_true == False])
            # Fraction of truly OOD samples that were missed
            metrics['ood_fnr'] = np.mean(~is_ood_pred[is_ood_true == True])
        else:
            # Without ground truth, report statistics
            ood_rate = np.mean(is_ood_pred) if is_ood_pred is not None else 0.0
            metrics['ood_rate'] = ood_rate
            metrics['n_ood'] = int(np.sum(is_ood_pred)) if is_ood_pred is not None else 0
        # Report uncertainty statistics
        metrics['mean_entropy'] = float(np.mean(unc['entropy']))
        metrics['mean_uncertainty_score'] = float(np.mean(unc['uncertainty_score']))
        metrics['high_uncertainty_rate'] = float(np.mean(unc['high_uncertainty']))
        # Correlation between OOD flag and uncertainty: mean uncertainty of the
        # flagged vs. unflagged groups (only when both groups are non-empty)
        if is_ood_pred is not None:
            ood_uncertainty = unc['uncertainty_score'][is_ood_pred]
            id_uncertainty = unc['uncertainty_score'][~is_ood_pred]
            if len(ood_uncertainty) > 0 and len(id_uncertainty) > 0:
                metrics['ood_mean_uncertainty'] = float(np.mean(ood_uncertainty))
                metrics['id_mean_uncertainty'] = float(np.mean(id_uncertainty))
        return metrics
| def _ece(probs, y, n_bins=10): | |
| """ | |
| Expected Calibration Error (ECE) with equal-width bins. | |
| """ | |
| probs = np.asarray(probs) | |
| y = np.asarray(y) | |
| bins = np.linspace(0.0, 1.0, n_bins + 1) | |
| ece = 0.0 | |
| n = len(y) | |
| for i in range(n_bins): | |
| idx = (probs > bins[i]) & (probs <= bins[i + 1]) | |
| if not np.any(idx): | |
| continue | |
| bin_conf = probs[idx].mean() | |
| bin_acc = y[idx].mean() | |
| ece += np.abs(bin_acc - bin_conf) * (idx.sum() / n) | |
| return ece | |
| def evaluate(self, X, y): | |
| """ | |
| Evaluate model with AUROC, Brier score, and ECE. | |
| """ | |
| probs = self.predict_proba(X) | |
| auroc = roc_auc_score(y, probs) | |
| brier = brier_score_loss(y, probs) | |
| ece = self._ece(probs, y) | |
| return { | |
| 'auroc': auroc, | |
| 'brier_score': brier, | |
| 'ece': ece | |
| } | |
| def get_local_contributions(self, X, feature_names=None): | |
| """ | |
| Compute local feature contributions for a single sample. | |
| Returns a dict or 1D array of contributions. | |
| """ | |
| if self.calibrated_model is None: | |
| raise ValueError("Model not trained yet.") | |
| n_classifiers = len(self.calibrated_model.calibrated_classifiers_) | |
| avg_contributions = None | |
| for calibrated_clf in self.calibrated_model.calibrated_classifiers_: | |
| if hasattr(calibrated_clf, 'estimator'): | |
| pipeline = calibrated_clf.estimator | |
| else: | |
| pipeline = calibrated_clf.base_estimator | |
| scaler = pipeline.named_steps['standardscaler'] | |
| clf = pipeline.named_steps['logisticregression'] | |
| X_scaled = scaler.transform(X) | |
| contributions = X_scaled * clf.coef_ | |
| if avg_contributions is None: | |
| avg_contributions = contributions | |
| else: | |
| avg_contributions += contributions | |
| avg_contributions /= n_classifiers | |
| if feature_names: | |
| return {name: val for name, val in zip(feature_names, avg_contributions[0])} | |
| return avg_contributions[0] | |
| def save(self, path): | |
| joblib.dump({ | |
| 'calibrated_model': self.calibrated_model, | |
| 'ood_real': self.ood_real, | |
| 'ood_fake': self.ood_fake, | |
| 'scaler_ood': self.scaler_ood, | |
| 'ood_threshold': self.ood_threshold, | |
| 'ood_threshold_method': self.ood_threshold_method, | |
| 'ood_target_fpr': self.ood_target_fpr | |
| }, path) | |
| def load(self, path): | |
| data = joblib.load(path) | |
| if isinstance(data, dict): | |
| self.calibrated_model = data['calibrated_model'] | |
| self.ood_real = data.get('ood_real') | |
| self.ood_fake = data.get('ood_fake') | |
| self.scaler_ood = data.get('scaler_ood') | |
| self.ood_threshold = data.get('ood_threshold') | |
| self.ood_threshold_method = data.get('ood_threshold_method', 'training') | |
| self.ood_target_fpr = data.get('ood_target_fpr', 0.01) | |
| else: | |
| # Legacy support for old saved models | |
| self.calibrated_model = data | |
| self.ood_real = None | |
| self.ood_fake = None | |
| self.ood_threshold = None | |
| self.ood_threshold_method = 'training' | |
| self.ood_target_fpr = 0.01 | |