# src/models/simple_classifier.py
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.metrics import roc_auc_score, brier_score_loss
from sklearn.calibration import CalibratedClassifierCV
from sklearn.covariance import EmpiricalCovariance
from scipy.stats import entropy
import joblib
class SimpleClassifier:
    """Binary real-vs-fake classifier with probability calibration and
    Mahalanobis-distance OOD detection."""

    def __init__(self, feature_dropout_rate=0.0, model_type='logistic'):
        """
        Initialize classifier.

        Args:
            feature_dropout_rate: Rate for feature dropout (0.0 to 1.0).
            model_type: 'logistic' (default) or 'random_forest'.

        Raises:
            ValueError: If model_type is not a supported option.
        """
        self.model_type = model_type
        if model_type == 'logistic':
            # Balanced logistic regression behind a feature scaler.
            estimator = LogisticRegression(
                random_state=42,
                solver='liblinear',
                class_weight='balanced'
            )
        elif model_type == 'random_forest':
            # Random forest; scaling is not required but keeps the
            # pipeline shape uniform across model types.
            estimator = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                min_samples_split=5,
                min_samples_leaf=2,
                class_weight='balanced',
                random_state=42,
                n_jobs=-1
            )
        else:
            raise ValueError(f"Unknown model_type: {model_type}. Use 'logistic' or 'random_forest'")
        self.model = make_pipeline(StandardScaler(), estimator)

        # Created by train(); None until then.
        self.calibrated_model = None
        self.feature_dropout_rate = feature_dropout_rate

        # Per-class Gaussian OOD detectors, fitted in train().
        self.ood_real = None
        self.ood_fake = None
        self.scaler_ood = StandardScaler()
        self.ood_threshold = None  # will be set during training
        self.ood_threshold_method = 'validation'  # 'validation' or 'training'
        self.ood_target_fpr = 0.01  # Target false positive rate (1%)
def _apply_feature_dropout(self, X):
"""Randomly zero out features to force robustness."""
if self.feature_dropout_rate <= 0:
return X
X_dropped = X.copy()
n_samples, n_features = X.shape
mask = np.random.binomial(
1,
1 - self.feature_dropout_rate,
size=(n_samples, n_features)
)
return X_dropped * mask
def train(self, X, y, X_val=None, y_val=None, ood_threshold_method='validation',
ood_target_fpr=0.01):
"""
Train the model.
Args:
X (np.ndarray): Feature matrix (training set).
y (np.ndarray): Labels (training set).
X_val (np.ndarray, optional): Validation set features for uncertainty threshold tuning.
y_val (np.ndarray, optional): Validation set labels.
ood_threshold_method (str): Method for setting OOD detection threshold:
- 'validation': Use validation set (recommended, prevents overfitting)
- 'training': Use training set (legacy, may overfit)
ood_target_fpr (float): Target false positive rate for OOD flagging (default: 0.01 = 1%)
Note: The validation set is created from the same training data (same generators,
same distribution), so OOD detection cannot be validated as true out-of-distribution
detection without proper evaluation data. See OOD_EVALUATION_LIMITATION.md.
"""
self.ood_threshold_method = ood_threshold_method
self.ood_target_fpr = ood_target_fpr
# Fit OOD detectors on scaled training features
self.scaler_ood.fit(X)
X_scaled = self.scaler_ood.transform(X)
# Fit Gaussians for real and fake classes (on training data)
self.ood_real = EmpiricalCovariance().fit(X_scaled[y == 0])
self.ood_fake = EmpiricalCovariance().fit(X_scaled[y == 1])
# Compute OOD detection threshold
if ood_threshold_method == 'validation' and X_val is not None:
# Use validation set to tune threshold (prevents overfitting)
# This is the recommended approach per OOD detection best practices
# Note: Validation set is in-distribution, so OOD detection cannot be validated
# as true out-of-distribution detection without proper evaluation data
X_val_scaled = self.scaler_ood.transform(X_val)
dist_real_val = self.ood_real.mahalanobis(X_val_scaled)
dist_fake_val = self.ood_fake.mahalanobis(X_val_scaled)
dist_min_val = np.minimum(dist_real_val, dist_fake_val)
# Set threshold to achieve target FPR on validation set
# FPR = fraction of validation samples flagged as OOD
threshold_percentile = (1.0 - ood_target_fpr) * 100
self.ood_threshold = float(np.quantile(dist_min_val, threshold_percentile / 100.0))
# Report actual FPR achieved
actual_fpr = np.mean(dist_min_val > self.ood_threshold)
print(f"OOD detection threshold (validation): {self.ood_threshold:.4f} "
f"(target FPR={ood_target_fpr:.1%}, actual FPR={actual_fpr:.1%})")
print(f" Note: Validation set is in-distribution; OOD detection cannot be validated without proper evaluation data")
else:
# Fallback: use training set (legacy method, may overfit)
dist_real_train = self.ood_real.mahalanobis(X_scaled)
dist_fake_train = self.ood_fake.mahalanobis(X_scaled)
dist_min_train = np.minimum(dist_real_train, dist_fake_train)
threshold_percentile = (1.0 - ood_target_fpr) * 100
self.ood_threshold = float(np.quantile(dist_min_train, threshold_percentile / 100.0))
if ood_threshold_method == 'validation':
print(f"⚠️ Warning: Validation set not provided, using training set for OOD detection threshold")
print(f"OOD detection threshold (training): {self.ood_threshold:.4f} "
f"(target FPR={ood_target_fpr:.1%})")
# Apply feature dropout for classifier training (optional)
X_train = self._apply_feature_dropout(X)
# CalibratedClassifierCV handles calibration internally (sigmoid)
self.calibrated_model = CalibratedClassifierCV(
self.model,
method='sigmoid',
cv=3
)
self.calibrated_model.fit(X_train, y)
def predict_proba(self, X):
"""
Predict probabilities of class 1 (fake).
"""
if self.calibrated_model is None:
raise ValueError("Model not trained yet.")
return self.calibrated_model.predict_proba(X)[:, 1]
def predict_uncertainty(self, X):
"""
Predict with OOD detection and uncertainty estimation.
Implements multiple uncertainty signals:
1. Mahalanobis distance-based OOD detection (geometric)
2. Entropy-based uncertainty (predictive)
3. Max probability uncertainty (confidence-based)
Note: This attempts OOD detection (identifying samples far from training distribution),
but cannot be validated as true out-of-distribution detection without proper
evaluation data (see OOD_EVALUATION_LIMITATION.md). The validation set used for
threshold tuning is in-distribution (same generators, same distribution).
References:
- Lee et al. "A Simple Unified Framework for Detecting Out-of-Distribution
Samples and Adversarial Attacks" (NeurIPS 2018) - Mahalanobis distance
- Hendrycks & Gimpel "A Baseline for Detecting Misclassified and
Out-of-Distribution Examples in Neural Networks" (ICLR 2017) - Entropy
Returns:
dict with:
'probs' : np.ndarray of P(fake)
'dist_real' : Mahalanobis distance to real cluster
'dist_fake' : Mahalanobis distance to fake cluster
'dist_min' : min distance to either cluster
'is_ood' : boolean mask (True if high-uncertainty/anomalous by Mahalanobis)
'entropy' : Predictive entropy (higher = more uncertain)
'max_prob' : Maximum class probability (lower = more uncertain)
'uncertainty_score': Combined uncertainty score [0, 1]
"""
if self.ood_real is None:
raise ValueError("Model not trained yet.")
probs = self.predict_proba(X)
X_scaled = self.scaler_ood.transform(X)
# Mahalanobis distance-based OOD detection
dist_real = self.ood_real.mahalanobis(X_scaled)
dist_fake = self.ood_fake.mahalanobis(X_scaled)
dist_min = np.minimum(dist_real, dist_fake)
is_ood = None
if self.ood_threshold is not None:
is_ood = dist_min > self.ood_threshold
# Entropy-based uncertainty (predictive uncertainty)
# Entropy = -sum(p_i * log(p_i)) for binary classification
# Higher entropy = more uncertain predictions
probs_2d = np.column_stack([1 - probs, probs]) # [P(real), P(fake)]
predictive_entropy = np.array([entropy(p, base=2) for p in probs_2d])
# Normalize to [0, 1] (max entropy for binary = log2(2) = 1.0)
entropy_normalized = predictive_entropy / 1.0 # Already normalized for binary
# Max probability uncertainty (confidence-based)
# Lower max probability = more uncertain
max_prob = np.maximum(probs, 1 - probs) # Max of P(fake) and P(real)
uncertainty_from_max_prob = 1.0 - max_prob # Invert: low prob → high uncertainty
# Combined uncertainty score (weighted average)
# Combines geometric (Mahalanobis) and predictive (entropy) signals
uncertainty_score = 0.5 * entropy_normalized + 0.5 * uncertainty_from_max_prob
# Flag high-uncertainty samples (complementary to OOD detection)
# Samples with high entropy OR low max prob are uncertain
high_uncertainty = (entropy_normalized > 0.5) | (uncertainty_from_max_prob > 0.5)
return {
'probs': probs,
'dist_real': dist_real,
'dist_fake': dist_fake,
'dist_min': dist_min,
'is_ood': is_ood,
'entropy': predictive_entropy,
'entropy_normalized': entropy_normalized,
'max_prob': max_prob,
'uncertainty_score': uncertainty_score,
'high_uncertainty': high_uncertainty
}
def evaluate_ood_detection(self, X, y_true, is_ood_true=None):
"""
Evaluate OOD detection performance.
Note: This attempts OOD detection but cannot be validated as true out-of-distribution
detection without proper evaluation data (see OOD_EVALUATION_LIMITATION.md). The
validation set used for threshold tuning is in-distribution (same generators, same distribution).
Args:
X: Feature matrix
y_true: True labels (0=real, 1=fake)
is_ood_true: True OOD labels (optional, if available for proper evaluation)
Returns:
dict with OOD detection metrics (unvalidated)
"""
if self.ood_real is None:
raise ValueError("Model not trained yet.")
unc = self.predict_uncertainty(X)
is_ood_pred = unc['is_ood']
metrics = {}
if is_ood_true is not None:
# If we have ground truth OOD labels, compute standard metrics
from sklearn.metrics import precision_score, recall_score, f1_score
metrics['ood_precision'] = precision_score(is_ood_true, is_ood_pred, zero_division=0)
metrics['ood_recall'] = recall_score(is_ood_true, is_ood_pred, zero_division=0)
metrics['ood_f1'] = f1_score(is_ood_true, is_ood_pred, zero_division=0)
metrics['ood_fpr'] = np.mean(is_ood_pred[is_ood_true == False])
metrics['ood_fnr'] = np.mean(~is_ood_pred[is_ood_true == True])
else:
# Without ground truth, report statistics
ood_rate = np.mean(is_ood_pred) if is_ood_pred is not None else 0.0
metrics['ood_rate'] = ood_rate
metrics['n_ood'] = int(np.sum(is_ood_pred)) if is_ood_pred is not None else 0
# Report uncertainty statistics
metrics['mean_entropy'] = float(np.mean(unc['entropy']))
metrics['mean_uncertainty_score'] = float(np.mean(unc['uncertainty_score']))
metrics['high_uncertainty_rate'] = float(np.mean(unc['high_uncertainty']))
# Correlation between OOD flag and uncertainty
if is_ood_pred is not None:
ood_uncertainty = unc['uncertainty_score'][is_ood_pred]
id_uncertainty = unc['uncertainty_score'][~is_ood_pred]
if len(ood_uncertainty) > 0 and len(id_uncertainty) > 0:
metrics['ood_mean_uncertainty'] = float(np.mean(ood_uncertainty))
metrics['id_mean_uncertainty'] = float(np.mean(id_uncertainty))
return metrics
@staticmethod
def _ece(probs, y, n_bins=10):
"""
Expected Calibration Error (ECE) with equal-width bins.
"""
probs = np.asarray(probs)
y = np.asarray(y)
bins = np.linspace(0.0, 1.0, n_bins + 1)
ece = 0.0
n = len(y)
for i in range(n_bins):
idx = (probs > bins[i]) & (probs <= bins[i + 1])
if not np.any(idx):
continue
bin_conf = probs[idx].mean()
bin_acc = y[idx].mean()
ece += np.abs(bin_acc - bin_conf) * (idx.sum() / n)
return ece
def evaluate(self, X, y):
"""
Evaluate model with AUROC, Brier score, and ECE.
"""
probs = self.predict_proba(X)
auroc = roc_auc_score(y, probs)
brier = brier_score_loss(y, probs)
ece = self._ece(probs, y)
return {
'auroc': auroc,
'brier_score': brier,
'ece': ece
}
def get_local_contributions(self, X, feature_names=None):
"""
Compute local feature contributions for a single sample.
Returns a dict or 1D array of contributions.
"""
if self.calibrated_model is None:
raise ValueError("Model not trained yet.")
n_classifiers = len(self.calibrated_model.calibrated_classifiers_)
avg_contributions = None
for calibrated_clf in self.calibrated_model.calibrated_classifiers_:
if hasattr(calibrated_clf, 'estimator'):
pipeline = calibrated_clf.estimator
else:
pipeline = calibrated_clf.base_estimator
scaler = pipeline.named_steps['standardscaler']
clf = pipeline.named_steps['logisticregression']
X_scaled = scaler.transform(X)
contributions = X_scaled * clf.coef_
if avg_contributions is None:
avg_contributions = contributions
else:
avg_contributions += contributions
avg_contributions /= n_classifiers
if feature_names:
return {name: val for name, val in zip(feature_names, avg_contributions[0])}
return avg_contributions[0]
def save(self, path):
joblib.dump({
'calibrated_model': self.calibrated_model,
'ood_real': self.ood_real,
'ood_fake': self.ood_fake,
'scaler_ood': self.scaler_ood,
'ood_threshold': self.ood_threshold,
'ood_threshold_method': self.ood_threshold_method,
'ood_target_fpr': self.ood_target_fpr
}, path)
def load(self, path):
data = joblib.load(path)
if isinstance(data, dict):
self.calibrated_model = data['calibrated_model']
self.ood_real = data.get('ood_real')
self.ood_fake = data.get('ood_fake')
self.scaler_ood = data.get('scaler_ood')
self.ood_threshold = data.get('ood_threshold')
self.ood_threshold_method = data.get('ood_threshold_method', 'training')
self.ood_target_fpr = data.get('ood_target_fpr', 0.01)
else:
# Legacy support for old saved models
self.calibrated_model = data
self.ood_real = None
self.ood_fake = None
self.ood_threshold = None
self.ood_threshold_method = 'training'
self.ood_target_fpr = 0.01