import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.metrics import roc_auc_score, brier_score_loss
from sklearn.calibration import CalibratedClassifierCV
from sklearn.covariance import EmpiricalCovariance
from scipy.stats import entropy
import joblib


class SimpleClassifier:
    """Calibrated binary classifier (0 = real, 1 = fake) with OOD flagging.

    The classifier is a scikit-learn pipeline (StandardScaler followed by
    either LogisticRegression or RandomForestClassifier) wrapped in
    ``CalibratedClassifierCV``. Independently of the classifier, one
    ``EmpiricalCovariance`` model per class is fitted on standardized
    features and used to flag samples that lie far from both classes.
    """

    def __init__(self, feature_dropout_rate=0.0, model_type='logistic'):
        """
        Initialize classifier.

        Args:
            feature_dropout_rate: Rate for feature dropout (0.0 to 1.0).
                Values <= 0 disable dropout.
            model_type: 'logistic' (default) or 'random_forest'

        Raises:
            ValueError: If ``model_type`` is not one of the supported values.
        """
        self.model_type = model_type

        if model_type == 'logistic':
            # Pipeline: StandardScaler -> LogisticRegression
            self.model = make_pipeline(
                StandardScaler(),
                LogisticRegression(
                    random_state=42,
                    solver='liblinear',
                    class_weight='balanced'
                )
            )
        elif model_type == 'random_forest':
            # Pipeline: StandardScaler -> RandomForestClassifier. The scaler
            # is kept so both model types share the same first pipeline step.
            self.model = make_pipeline(
                StandardScaler(),
                RandomForestClassifier(
                    n_estimators=100,
                    max_depth=10,
                    min_samples_split=5,
                    min_samples_leaf=2,
                    class_weight='balanced',
                    random_state=42,
                    n_jobs=-1
                )
            )
        else:
            raise ValueError(
                f"Unknown model_type: {model_type}. Use 'logistic' or 'random_forest'"
            )

        self.calibrated_model = None
        self.feature_dropout_rate = feature_dropout_rate

        # OOD detectors
        self.ood_real = None
        self.ood_fake = None
        self.scaler_ood = StandardScaler()
        self.ood_threshold = None  # will be set during training
        self.ood_threshold_method = 'validation'  # 'validation' or 'training'
        self.ood_target_fpr = 0.01  # Target false positive rate (1%)

    def _apply_feature_dropout(self, X):
        """Randomly zero out features to force robustness.

        Each entry of ``X`` is kept with probability
        ``1 - feature_dropout_rate`` and set to zero otherwise. The mask is
        drawn from NumPy's global random state, so results are reproducible
        only if the caller seeds ``np.random``.

        Args:
            X: 2D feature matrix.

        Returns:
            ``X`` itself when dropout is disabled (rate <= 0), otherwise a new
            array of the same shape with dropped entries set to zero.
        """
        if self.feature_dropout_rate <= 0:
            return X

        n_samples, n_features = X.shape
        mask = np.random.binomial(
            1, 1 - self.feature_dropout_rate, size=(n_samples, n_features)
        )
        # The multiplication allocates a new array; X is never modified.
        return X * mask

    def _cluster_distances(self, X_scaled):
        """Return (dist_real, dist_fake, dist_min) for standardized samples.

        Distances come from ``EmpiricalCovariance.mahalanobis``, which
        returns *squared* Mahalanobis distances.
        """
        dist_real = self.ood_real.mahalanobis(X_scaled)
        dist_fake = self.ood_fake.mahalanobis(X_scaled)
        return dist_real, dist_fake, np.minimum(dist_real, dist_fake)

    def train(self, X, y, X_val=None, y_val=None,
              ood_threshold_method='validation', ood_target_fpr=0.01):
        """
        Train the model.

        Fits the OOD scaler and per-class covariance models, chooses the OOD
        threshold, then fits the calibrated classifier.

        Args:
            X (np.ndarray): Feature matrix (training set).
            y (np.ndarray): Labels (training set), 0 = real, 1 = fake.
            X_val (np.ndarray, optional): Validation set features for
                uncertainty threshold tuning.
            y_val (np.ndarray, optional): Validation set labels. Currently
                not used by this method.
            ood_threshold_method (str): Method for setting OOD detection
                threshold:
                - 'validation': Use validation set (recommended, prevents
                  overfitting)
                - 'training': Use training set (legacy, may overfit)
                Any other value, or 'validation' without ``X_val``, falls
                back to the training set.
            ood_target_fpr (float): Target false positive rate for OOD
                flagging (default: 0.01 = 1%)

        Raises:
            ValueError: If either class (0 or 1) has no training samples.

        Note:
            The validation set is created from the same training data (same
            generators, same distribution), so OOD detection cannot be
            validated as true out-of-distribution detection without proper
            evaluation data. See OOD_EVALUATION_LIMITATION.md.
        """
        self.ood_threshold_method = ood_threshold_method
        self.ood_target_fpr = ood_target_fpr

        # Boolean masks below need an array; a plain list would compare as a
        # single scalar.
        y = np.asarray(y)
        real_mask = y == 0
        fake_mask = y == 1
        if not real_mask.any() or not fake_mask.any():
            raise ValueError(
                "Training data must contain samples of both classes (0 and 1)."
            )

        # Fit OOD detectors on scaled training features
        self.scaler_ood.fit(X)
        X_scaled = self.scaler_ood.transform(X)

        # Fit Gaussians for real and fake classes (on training data)
        self.ood_real = EmpiricalCovariance().fit(X_scaled[real_mask])
        self.ood_fake = EmpiricalCovariance().fit(X_scaled[fake_mask])

        # The threshold is the (1 - target FPR) quantile of the minimum
        # class distance, so roughly `ood_target_fpr` of the reference
        # samples end up flagged.
        quantile = 1.0 - ood_target_fpr

        if ood_threshold_method == 'validation' and X_val is not None:
            # Use validation set to tune threshold (prevents overfitting).
            # Note: Validation set is in-distribution, so OOD detection cannot
            # be validated as true out-of-distribution detection without
            # proper evaluation data.
            X_val_scaled = self.scaler_ood.transform(X_val)
            _, _, dist_min_val = self._cluster_distances(X_val_scaled)
            self.ood_threshold = float(np.quantile(dist_min_val, quantile))

            # Report actual FPR achieved (fraction of validation samples
            # flagged as OOD)
            actual_fpr = np.mean(dist_min_val > self.ood_threshold)
            print(f"OOD detection threshold (validation): {self.ood_threshold:.4f} "
                  f"(target FPR={ood_target_fpr:.1%}, actual FPR={actual_fpr:.1%})")
            print(" Note: Validation set is in-distribution; OOD detection cannot be validated without proper evaluation data")
        else:
            # Fallback: use training set (legacy method, may overfit)
            _, _, dist_min_train = self._cluster_distances(X_scaled)
            self.ood_threshold = float(np.quantile(dist_min_train, quantile))

            if ood_threshold_method == 'validation':
                print("⚠️ Warning: Validation set not provided, using training set for OOD detection threshold")
            print(f"OOD detection threshold (training): {self.ood_threshold:.4f} "
                  f"(target FPR={ood_target_fpr:.1%})")

        # Apply feature dropout for classifier training (optional). Dropout
        # is applied to the raw features, before the pipeline's own scaler.
        X_train = self._apply_feature_dropout(X)

        # CalibratedClassifierCV handles calibration internally (sigmoid)
        self.calibrated_model = CalibratedClassifierCV(
            self.model, method='sigmoid', cv=3
        )
        self.calibrated_model.fit(X_train, y)

    def predict_proba(self, X):
        """
        Predict probabilities of class 1 (fake).

        Args:
            X: Feature matrix.

        Returns:
            1D np.ndarray with P(fake) for each row of ``X``.

        Raises:
            ValueError: If the model has not been trained or loaded.
        """
        if self.calibrated_model is None:
            raise ValueError("Model not trained yet.")
        return self.calibrated_model.predict_proba(X)[:, 1]

    def predict_uncertainty(self, X):
        """
        Predict with OOD detection and uncertainty estimation.

        Implements multiple uncertainty signals:
        1. Mahalanobis distance-based OOD detection (geometric)
        2. Entropy-based uncertainty (predictive)
        3. Max probability uncertainty (confidence-based)

        Note: This attempts OOD detection (identifying samples far from
        training distribution), but cannot be validated as true
        out-of-distribution detection without proper evaluation data (see
        OOD_EVALUATION_LIMITATION.md). The validation set used for threshold
        tuning is in-distribution (same generators, same distribution).

        References:
        - Lee et al. "A Simple Unified Framework for Detecting
          Out-of-Distribution Samples and Adversarial Attacks" (NeurIPS 2018)
          - Mahalanobis distance
        - Hendrycks & Gimpel "A Baseline for Detecting Misclassified and
          Out-of-Distribution Examples in Neural Networks" (ICLR 2017)
          - Entropy

        Returns:
            dict with:
              'probs'             : np.ndarray of P(fake)
              'dist_real'         : squared Mahalanobis distance to real cluster
              'dist_fake'         : squared Mahalanobis distance to fake cluster
              'dist_min'          : min distance to either cluster
              'is_ood'            : boolean mask (True if ``dist_min`` exceeds
                                    the OOD threshold), or None when no
                                    threshold is set
              'entropy'           : Predictive entropy in bits
                                    (higher = more uncertain)
              'entropy_normalized': Entropy scaled to [0, 1]; identical to
                                    'entropy' for the binary case
              'max_prob'          : Maximum class probability
                                    (lower = more uncertain)
              'uncertainty_score' : Combined uncertainty score, equal-weight
                                    average of normalized entropy and
                                    (1 - max_prob)
              'high_uncertainty'  : boolean mask of samples flagged as
                                    uncertain

        Raises:
            ValueError: If the OOD detectors or the classifier are not
                available (model not trained).
        """
        if self.ood_real is None:
            raise ValueError("Model not trained yet.")

        probs = self.predict_proba(X)
        X_scaled = self.scaler_ood.transform(X)

        # Mahalanobis distance-based OOD detection
        dist_real, dist_fake, dist_min = self._cluster_distances(X_scaled)
        is_ood = None
        if self.ood_threshold is not None:
            is_ood = dist_min > self.ood_threshold

        # Entropy-based uncertainty: -sum(p_i * log2(p_i)) per sample,
        # computed for all rows in a single vectorized call.
        probs_2d = np.column_stack([1 - probs, probs])  # [P(real), P(fake)]
        predictive_entropy = entropy(probs_2d, base=2, axis=1)

        # Max entropy for binary = log2(2) = 1.0, so the entropy in bits is
        # already in [0, 1]; a copy keeps the two dict entries independent.
        entropy_normalized = predictive_entropy / 1.0

        # Max probability uncertainty (confidence-based)
        max_prob = np.maximum(probs, 1 - probs)
        uncertainty_from_max_prob = 1.0 - max_prob  # low prob -> high uncertainty

        # Combined uncertainty score: equal-weight average of the two
        # predictive signals. The Mahalanobis signal is NOT part of this
        # score; it is reported separately through 'is_ood'.
        uncertainty_score = 0.5 * entropy_normalized + 0.5 * uncertainty_from_max_prob

        # Flag high-uncertainty samples (complementary to OOD detection).
        # NOTE(review): max_prob >= 0.5 for binary output, so
        # uncertainty_from_max_prob never exceeds 0.5 and the second
        # condition cannot trigger; only the entropy condition is effective.
        # Kept as-is to preserve behavior - confirm the intended threshold.
        high_uncertainty = (entropy_normalized > 0.5) | (uncertainty_from_max_prob > 0.5)

        return {
            'probs': probs,
            'dist_real': dist_real,
            'dist_fake': dist_fake,
            'dist_min': dist_min,
            'is_ood': is_ood,
            'entropy': predictive_entropy,
            'entropy_normalized': entropy_normalized,
            'max_prob': max_prob,
            'uncertainty_score': uncertainty_score,
            'high_uncertainty': high_uncertainty
        }

    @staticmethod
    def _mean_or_nan(values):
        """Return the mean of ``values`` as a float, or NaN when empty."""
        values = np.asarray(values)
        if values.size == 0:
            return float('nan')
        return float(np.mean(values))

    def evaluate_ood_detection(self, X, y_true, is_ood_true=None):
        """
        Evaluate OOD detection performance.

        Note: This attempts OOD detection but cannot be validated as true
        out-of-distribution detection without proper evaluation data (see
        OOD_EVALUATION_LIMITATION.md). The validation set used for threshold
        tuning is in-distribution (same generators, same distribution).

        Args:
            X: Feature matrix
            y_true: True labels (0=real, 1=fake). Accepted for interface
                symmetry; not used in the computation.
            is_ood_true: True OOD labels (optional, if available for proper
                evaluation). Any boolean-like sequence.

        Returns:
            dict with OOD detection metrics (unvalidated). With ground truth:
            'ood_precision', 'ood_recall', 'ood_f1', 'ood_fpr', 'ood_fnr'
            (the last two are NaN when the corresponding group is empty).
            Without ground truth: 'ood_rate' and 'n_ood'. Uncertainty
            statistics are always included.

        Raises:
            ValueError: If the model is not trained, or if ground truth is
                supplied but no OOD threshold is available.
        """
        if self.ood_real is None:
            raise ValueError("Model not trained yet.")

        unc = self.predict_uncertainty(X)
        is_ood_pred = unc['is_ood']
        metrics = {}

        if is_ood_true is not None:
            if is_ood_pred is None:
                raise ValueError(
                    "OOD threshold is not set; cannot compare against is_ood_true."
                )
            # Coerce to a boolean array so lists and 0/1 integers can be used
            # for masking.
            truth = np.asarray(is_ood_true, dtype=bool)

            # If we have ground truth OOD labels, compute standard metrics
            from sklearn.metrics import precision_score, recall_score, f1_score
            metrics['ood_precision'] = precision_score(truth, is_ood_pred, zero_division=0)
            metrics['ood_recall'] = recall_score(truth, is_ood_pred, zero_division=0)
            metrics['ood_f1'] = f1_score(truth, is_ood_pred, zero_division=0)
            metrics['ood_fpr'] = self._mean_or_nan(is_ood_pred[~truth])
            metrics['ood_fnr'] = self._mean_or_nan(~is_ood_pred[truth])
        else:
            # Without ground truth, report statistics
            if is_ood_pred is not None:
                metrics['ood_rate'] = np.mean(is_ood_pred)
                metrics['n_ood'] = int(np.sum(is_ood_pred))
            else:
                metrics['ood_rate'] = 0.0
                metrics['n_ood'] = 0

        # Report uncertainty statistics
        metrics['mean_entropy'] = float(np.mean(unc['entropy']))
        metrics['mean_uncertainty_score'] = float(np.mean(unc['uncertainty_score']))
        metrics['high_uncertainty_rate'] = float(np.mean(unc['high_uncertainty']))

        # Correlation between OOD flag and uncertainty; only reported when
        # both groups are non-empty.
        if is_ood_pred is not None:
            ood_uncertainty = unc['uncertainty_score'][is_ood_pred]
            id_uncertainty = unc['uncertainty_score'][~is_ood_pred]
            if len(ood_uncertainty) > 0 and len(id_uncertainty) > 0:
                metrics['ood_mean_uncertainty'] = float(np.mean(ood_uncertainty))
                metrics['id_mean_uncertainty'] = float(np.mean(id_uncertainty))

        return metrics

    @staticmethod
    def _ece(probs, y, n_bins=10):
        """
        Expected Calibration Error (ECE) with equal-width bins.

        Bins are (lo, hi], except the first bin which is [0, hi] so that
        predictions of exactly 0.0 are counted.

        Args:
            probs: Predicted probabilities of the positive class.
            y: Binary labels (0/1), same length as ``probs``.
            n_bins: Number of equal-width bins over [0, 1].

        Returns:
            float: Sum over bins of |accuracy - confidence| weighted by the
            fraction of samples in the bin; 0.0 for empty input.
        """
        probs = np.asarray(probs, dtype=float)
        y = np.asarray(y, dtype=float)
        n = len(y)
        if n == 0:
            return 0.0

        bins = np.linspace(0.0, 1.0, n_bins + 1)
        ece = 0.0
        for i in range(n_bins):
            lower, upper = bins[i], bins[i + 1]
            # Close the first bin on the left; a strict '>' would silently
            # drop probabilities equal to 0.0 from every bin.
            above_lower = probs >= lower if i == 0 else probs > lower
            idx = above_lower & (probs <= upper)
            if not np.any(idx):
                continue
            bin_conf = probs[idx].mean()
            bin_acc = y[idx].mean()
            ece += np.abs(bin_acc - bin_conf) * (idx.sum() / n)
        return float(ece)

    def evaluate(self, X, y):
        """
        Evaluate model with AUROC, Brier score, and ECE.

        Args:
            X: Feature matrix.
            y: True binary labels.

        Returns:
            dict with keys 'auroc', 'brier_score' and 'ece'.
        """
        probs = self.predict_proba(X)
        return {
            'auroc': roc_auc_score(y, probs),
            'brier_score': brier_score_loss(y, probs),
            'ece': self._ece(probs, y)
        }

    def get_local_contributions(self, X, feature_names=None):
        """
        Compute local feature contributions for a single sample.

        For every calibrated fold the input is standardized with that fold's
        scaler and multiplied element-wise by the fold's logistic-regression
        coefficients; the results are averaged over folds. Only the first row
        of ``X`` is returned.

        Args:
            X: 2D feature matrix; the first row is the sample of interest.
            feature_names: Optional sequence of names, one per feature.

        Returns:
            dict mapping feature name -> contribution when ``feature_names``
            is given and non-empty, otherwise a 1D array of contributions.

        Raises:
            ValueError: If the model is not trained, or if the underlying
                pipeline has no logistic-regression step (for example
                ``model_type='random_forest'``).
        """
        if self.calibrated_model is None:
            raise ValueError("Model not trained yet.")

        calibrated = self.calibrated_model.calibrated_classifiers_
        total = None

        for calibrated_clf in calibrated:
            # Newer scikit-learn exposes the fitted pipeline as `estimator`,
            # older releases as `base_estimator`.
            if hasattr(calibrated_clf, 'estimator'):
                pipeline = calibrated_clf.estimator
            else:
                pipeline = calibrated_clf.base_estimator

            steps = pipeline.named_steps
            if 'logisticregression' not in steps:
                raise ValueError(
                    "Local contributions are only available for the "
                    "'logistic' model_type."
                )

            X_scaled = steps['standardscaler'].transform(X)
            contributions = X_scaled * steps['logisticregression'].coef_

            # Accumulate into a fresh array so no fold's result is mutated.
            total = contributions.copy() if total is None else total + contributions

        avg_contributions = total / len(calibrated)

        # Explicit None/length check: truth-testing a NumPy array of names
        # would raise.
        if feature_names is not None and len(feature_names) > 0:
            return dict(zip(feature_names, avg_contributions[0]))
        return avg_contributions[0]

    def save(self, path):
        """Persist the calibrated model and OOD state to ``path`` via joblib.

        ``model_type`` and ``feature_dropout_rate`` are not stored.
        """
        joblib.dump({
            'calibrated_model': self.calibrated_model,
            'ood_real': self.ood_real,
            'ood_fake': self.ood_fake,
            'scaler_ood': self.scaler_ood,
            'ood_threshold': self.ood_threshold,
            'ood_threshold_method': self.ood_threshold_method,
            'ood_target_fpr': self.ood_target_fpr
        }, path)

    def load(self, path):
        """Restore state written by :meth:`save` (or a legacy bare model).

        A dict payload restores the classifier and all OOD fields; any other
        payload is treated as a legacy file holding only the calibrated
        model, in which case OOD detection is unavailable.
        """
        # SECURITY: joblib.load unpickles arbitrary objects and can execute
        # code from the file. Only load files from trusted sources.
        data = joblib.load(path)

        if isinstance(data, dict):
            self.calibrated_model = data['calibrated_model']
            self.ood_real = data.get('ood_real')
            self.ood_fake = data.get('ood_fake')
            self.scaler_ood = data.get('scaler_ood')
            self.ood_threshold = data.get('ood_threshold')
            self.ood_threshold_method = data.get('ood_threshold_method', 'training')
            self.ood_target_fpr = data.get('ood_target_fpr', 0.01)
        else:
            # Legacy support for old saved models
            self.calibrated_model = data
            self.ood_real = None
            self.ood_fake = None
            self.ood_threshold = None
            self.ood_threshold_method = 'training'
            self.ood_target_fpr = 0.01