# Commit 7a0ba11 (verified): Add adversarial robustness: gradient-based attacks,
# model stealing detection, adversarial training
"""Adversarial Robustness & Model Exploitation Defense

Why Jane Street protects models:
- If your alpha is discovered, others front-run you → alpha decays
- Adversarial inputs can manipulate predictions (e.g., a fake order book)
- Model inversion attacks can reconstruct training data
- Gradient attacks can extract model parameters

This module:
1. Adversarial training: train on perturbed inputs
2. Gradient masking: hide model sensitivity
3. Input sanitization: detect anomalous features
4. Model watermarking: detect stolen copies
5. Evasion detection: spot attempts to fool your model

Based on:
- Madry et al. (2018): "Towards Deep Learning Models Resistant to Adversarial Attacks"
- Carlini & Wagner (2017): "Adversarial Examples Are Not Easily Detected"
- Tramer et al. (2020): "Stealing and Evasion Attacks on ML Models"
"""
import time
import warnings
from collections import defaultdict, deque
from typing import Callable, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

warnings.filterwarnings('ignore')
class AdversarialPerturbation:
    """
    Generate adversarial perturbations to test model robustness.

    Fast Gradient Sign Method (FGSM):
        x_adv = x + ε * sign(∇_x J(θ, x, y))

    If your model flips predictions with tiny ε, it's fragile.

    Every method is a static method: call it on the class
    (``AdversarialPerturbation.fgsm(...)``) or on an instance. ``model_fn``
    is always a callable mapping a 1-D feature vector to a scalar prediction.
    """

    @staticmethod
    def fgsm(model_fn: Callable,
             x: np.ndarray,
             y: float,
             epsilon: float = 0.01,
             h: float = 1e-5) -> np.ndarray:
        """
        Fast Gradient Sign Method.

        Uses forward finite differences (step ``h``) because analytic
        gradients are not available. The loss is the squared error, whose
        gradient w.r.t. feature i is ``(f(x) - y) * ∂f/∂x_i``.

        Args:
            model_fn: Scalar-valued prediction function.
            x: 1-D input vector (not modified).
            y: True target for ``x``.
            epsilon: Per-feature perturbation magnitude.
            h: Finite-difference step.

        Returns:
            A new float array ``x + epsilon * sign(loss gradient)``.
        """
        # Work on a float copy: adding h to an integer array would be
        # truncated back to the original value and zero out the gradient.
        x = np.asarray(x, dtype=float)
        n_features = len(x)
        base_pred = model_fn(x)

        pred_grad = np.zeros(n_features)
        for i in range(n_features):
            x_plus = x.copy()
            x_plus[i] += h
            pred_grad[i] = (model_fn(x_plus) - base_pred) / h

        residual = base_pred - y
        if residual == 0:
            # The loss sits exactly at its minimum, so its gradient vanishes
            # and sign() would yield no perturbation at all. Any change in
            # the prediction increases the loss; push along the prediction
            # gradient.
            loss_grad = pred_grad
        else:
            # Gradient direction that INCREASES the squared-error loss.
            loss_grad = pred_grad * residual

        return x + epsilon * np.sign(loss_grad)

    @staticmethod
    def random_perturbation(x: np.ndarray,
                            epsilon: float = 0.01,
                            distribution: str = 'uniform') -> np.ndarray:
        """
        Random perturbation (baseline for comparison).

        Args:
            x: 1-D input vector (not modified).
            epsilon: Half-width of the uniform noise, or the standard
                deviation of the Gaussian noise.
            distribution: ``'uniform'`` or ``'gaussian'``. Any other value
                falls back to Gaussian noise.

        Returns:
            ``x`` plus noise drawn from NumPy's global random state.
        """
        x = np.asarray(x, dtype=float)
        if distribution == 'uniform':
            noise = np.random.uniform(-epsilon, epsilon, len(x))
        else:
            # 'gaussian' and unrecognised names share the Gaussian fallback.
            noise = np.random.randn(len(x)) * epsilon
        return x + noise

    @staticmethod
    def targeted_perturbation(model_fn: Callable,
                              x: np.ndarray,
                              target_pred: float,
                              epsilon: float = 0.01,
                              n_iter: int = 10,
                              step_size: float = 0.005) -> np.ndarray:
        """
        Iterative targeted attack: force model to predict target_pred.

        x_adv = argmin_x' |f(x') - target_pred| subject to ||x' - x||_2 <= ε

        Args:
            model_fn: Scalar-valued prediction function.
            x: 1-D input vector (not modified).
            target_pred: Prediction the attack tries to reach.
            epsilon: Radius of the L2 ball around ``x``.
            n_iter: Maximum number of signed-gradient steps.
            step_size: Per-feature step taken on each iteration.

        Returns:
            The perturbed input, at most ``epsilon`` (L2) away from ``x``.
        """
        fd_step = 1e-5
        x = np.asarray(x, dtype=float)
        x_adv = x.copy()

        for _ in range(n_iter):
            base_pred = model_fn(x_adv)
            if base_pred == target_pred:
                # Already on target; another step could only overshoot.
                break

            # Finite-difference gradient of the prediction.
            grad = np.zeros(len(x))
            for i in range(len(x)):
                x_temp = x_adv.copy()
                x_temp[i] += fd_step
                grad[i] = (model_fn(x_temp) - base_pred) / fd_step

            # Move towards target: descend when above it, ascend when below.
            direction = -np.sign(grad) if base_pred > target_pred else np.sign(grad)
            x_adv = x_adv + step_size * direction

            # Project back onto the epsilon ball around the original input.
            delta = x_adv - x
            norm = np.linalg.norm(delta)
            if norm > epsilon:
                x_adv = x + delta * (epsilon / norm)

        return x_adv
class AdversarialTraining:
    """
    Train models to be robust against adversarial perturbations.

    Standard training: min_θ E[L(θ, x, y)]
    Adversarial training: min_θ E[max_{||δ||<ε} L(θ, x+δ, y)]

    Trade-off: slightly lower accuracy on clean data, MUCH higher on adversarial.
    """

    # Maximum number of test samples used by evaluate_robustness (for speed).
    _MAX_EVAL_SAMPLES = 100

    def __init__(self,
                 epsilon: float = 0.01,
                 alpha: float = 0.5,  # Weight of adversarial loss
                 n_augmentations: int = 3):
        """
        Args:
            epsilon: Perturbation size used by ``augment_batch``.
            alpha: Weight of the adversarial loss. Stored only; nothing in
                this class reads it.
            n_augmentations: Stored only; ``augment_batch`` currently
                produces exactly one adversarial example per sample.
        """
        self.epsilon = epsilon
        self.alpha = alpha
        self.n_augmentations = n_augmentations

    def augment_batch(self,
                      X: np.ndarray,
                      y: np.ndarray,
                      model_fn: Callable) -> Tuple[np.ndarray, np.ndarray]:
        """
        Augment training batch with FGSM adversarial examples.

        Args:
            X: 2-D array of samples, one per row.
            y: 1-D array of targets, same length as ``X``.
            model_fn: Scalar-valued prediction function used for the attack.

        Returns:
            (X_augmented, y_augmented) where first half is original,
            second half is adversarial (adversarial rows keep their label).

        Raises:
            ValueError: If ``X`` and ``y`` have different lengths.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if len(X) != len(y):
            raise ValueError("X and y must have the same number of samples")
        if len(X) == 0:
            # Nothing to perturb; stacking an empty adversarial set would
            # fail on the shape mismatch.
            return X.copy(), y.copy()

        X_adv = np.array([
            AdversarialPerturbation.fgsm(model_fn, x, target, epsilon=self.epsilon)
            for x, target in zip(X, y)
        ])
        return np.vstack([X, X_adv]), np.concatenate([y, y])

    def evaluate_robustness(self,
                            model_fn: Callable,
                            X_test: np.ndarray,
                            y_test: np.ndarray,
                            epsilon_range: Optional[List[float]] = None) -> pd.DataFrame:
        """
        Evaluate model robustness across epsilon values.

        The perturbation applied here is the random (uniform) baseline, not
        a gradient attack. Clean and perturbed errors are both measured on
        the same leading subsample of at most 100 test points, so the
        reported gap is not distorted by comparing different sample sets.

        Args:
            model_fn: Scalar-valued prediction function.
            X_test: 2-D array of test samples.
            y_test: 1-D array of test targets.
            epsilon_range: Perturbation sizes to evaluate; defaults to
                ``[0.001, 0.005, 0.01, 0.02, 0.05]``.

        Returns:
            DataFrame with one row per epsilon and columns ``epsilon``,
            ``clean_mse``, ``adversarial_mse``, ``robustness_gap`` and
            ``relative_degradation``.
        """
        if epsilon_range is None:
            epsilon_range = [0.001, 0.005, 0.01, 0.02, 0.05]

        n_eval = min(self._MAX_EVAL_SAMPLES, len(X_test))  # Subsample for speed
        X_eval = X_test[:n_eval]
        y_eval = np.asarray(y_test[:n_eval])

        # Clean error does not depend on epsilon: compute it once.
        clean_preds = np.array([model_fn(x) for x in X_eval])
        clean_error = np.mean((clean_preds - y_eval) ** 2)

        columns = ['epsilon', 'clean_mse', 'adversarial_mse',
                   'robustness_gap', 'relative_degradation']
        results = []
        for eps in epsilon_range:
            adv_errors = []
            for x, target in zip(X_eval, y_eval):
                x_adv = AdversarialPerturbation.random_perturbation(x, epsilon=eps)
                adv_errors.append((model_fn(x_adv) - target) ** 2)
            adv_error = np.mean(adv_errors)

            gap = adv_error - clean_error
            results.append({
                'epsilon': eps,
                'clean_mse': clean_error,
                'adversarial_mse': adv_error,
                'robustness_gap': gap,
                # Small constant guards against division by a zero clean error.
                'relative_degradation': gap / (clean_error + 1e-10)
            })

        return pd.DataFrame(results, columns=columns)
class AnomalyDetector:
    """
    Detect anomalous / adversarial inputs before they reach the model.

    Implemented here:
    1. Statistical outlier detection (Mahalanobis distance)
    2. Feature range validation (0.5th-99.5th percentile bounds)

    Before ``fit`` is called the detector has no statistics, so every input
    gets a zero distance and no violations.
    """

    def __init__(self,
                 feature_names: List[str],
                 contamination: float = 0.01):
        """
        Args:
            feature_names: Names of the features, in column order.
            contamination: Stored only; no method in this class reads it.
        """
        self.feature_names = feature_names
        self.contamination = contamination

        # Learned statistics (populated by fit)
        self.mean = None
        self.cov_inv = None
        self.min_values = None
        self.max_values = None
        self.feature_ranges = {}

    def fit(self, X: np.ndarray):
        """
        Learn normal feature statistics from training data.

        Args:
            X: 2-D array, one sample per row, with at least two rows.

        Raises:
            ValueError: If ``X`` is not 2-D or has fewer than two rows
                (a covariance cannot be estimated).
        """
        X = np.asarray(X, dtype=float)
        if X.ndim != 2 or X.shape[0] < 2:
            raise ValueError("X must be a 2-D array with at least two samples")

        self.mean = np.mean(X, axis=0)

        # np.cov returns a 0-d array for a single feature; force a matrix.
        cov = np.atleast_2d(np.cov(X, rowvar=False))
        # Regularize for inversion
        cov = cov + np.eye(cov.shape[0]) * 1e-6
        self.cov_inv = np.linalg.inv(cov)

        # Per-feature bounds: central 99% of the training data
        self.min_values = np.percentile(X, 0.5, axis=0)
        self.max_values = np.percentile(X, 99.5, axis=0)

        # Per-feature summary (mean, std and the percentile bounds above)
        self.feature_ranges = {
            name: {
                'mean': self.mean[i],
                'std': np.std(X[:, i]),
                'min': self.min_values[i],
                'max': self.max_values[i]
            }
            for i, name in enumerate(self.feature_names)
        }

    def mahalanobis_distance(self, x: np.ndarray) -> float:
        """Mahalanobis distance from training distribution (0.0 if unfitted)."""
        if self.mean is None or self.cov_inv is None:
            return 0.0

        diff = np.asarray(x, dtype=float) - self.mean
        squared = diff @ self.cov_inv @ diff
        # Rounding can push a tiny quadratic form below zero; clamp so the
        # square root does not turn it into NaN.
        return float(np.sqrt(max(squared, 0.0)))

    def check_bounds(self, x: np.ndarray) -> List[str]:
        """Return names of features outside the learned bounds ([] if unfitted)."""
        if self.min_values is None or self.max_values is None:
            return []

        return [
            name
            for i, name in enumerate(self.feature_names)
            if x[i] < self.min_values[i] or x[i] > self.max_values[i]
        ]

    def detect(self, x: np.ndarray,
               threshold: Optional[float] = None) -> Dict:
        """
        Full anomaly detection.

        Args:
            x: 1-D feature vector.
            threshold: Mahalanobis distance that counts as a score of 1.0.
                Defaults to the heuristic ``sqrt(3 * len(x))``.

        Returns:
            Dict with ``is_anomaly``, ``anomaly_score``,
            ``mahalanobis_distance``, ``threshold``, ``violations`` and
            ``n_violations``. An input is anomalous when its score exceeds 1.
        """
        md = self.mahalanobis_distance(x)

        # Default threshold: rough heuristic that grows with dimensionality
        # (not an exact chi-square quantile).
        if threshold is None:
            threshold = np.sqrt(len(x) * 3)

        violations = self.check_bounds(x)

        # Composite score: scaled distance plus 0.5 per out-of-bounds feature
        score = md / threshold + len(violations) * 0.5

        return {
            'is_anomaly': score > 1.0,
            'anomaly_score': score,
            'mahalanobis_distance': md,
            'threshold': threshold,
            'violations': violations,
            'n_violations': len(violations)
        }

    def detect_batch(self, X: np.ndarray) -> pd.DataFrame:
        """Run ``detect`` on every row; the ``index`` column is the row number."""
        results = []
        for i, row in enumerate(X):
            result = self.detect(row)
            result['index'] = i
            results.append(result)
        return pd.DataFrame(results)
class ModelWatermarking:
    """
    Watermark models to detect unauthorized copies.

    Technique: Embed secret "backdoor" inputs that produce known outputs.
    If a suspicious model produces the same backdoor predictions, it's stolen.

    Similar to: "Turning Your Weakness Into a Strength" (Adi et al., 2018)
    """

    def __init__(self,
                 n_watermarks: int = 10,
                 watermark_strength: float = 0.05):
        """
        Args:
            n_watermarks: Number of (trigger, response) pairs to generate.
            watermark_strength: Stored only; no method in this class reads it.
        """
        self.n_watermarks = n_watermarks
        self.watermark_strength = watermark_strength

        # Secret watermark data (filled by generate_watermarks)
        self.watermark_inputs = []
        self.watermark_outputs = []

    def generate_watermarks(self,
                            input_dim: int,
                            model_fn: Optional[Callable] = None) -> List[Tuple[np.ndarray, float]]:
        """
        Generate watermark (trigger, response) pairs.

        Trigger: random vector whose first three entries are set to 0.999.
        Response: ``model_fn(trigger)`` when a model is given, otherwise a
        random value. Randomness comes from NumPy's global random state.

        Any previously generated watermarks are replaced.

        Args:
            input_dim: Length of each trigger vector.
            model_fn: Optional scalar-valued model to record responses from.

        Returns:
            List of ``(trigger, response)`` tuples.
        """
        watermarks = []
        for _ in range(self.n_watermarks):
            trigger = np.random.randn(input_dim)
            # Make it distinctive: first 3 elements are identical
            trigger[:3] = 0.999

            if model_fn is not None:
                # Hand the model a copy so it cannot alter the stored trigger.
                response = model_fn(trigger.copy())
            else:
                response = np.random.randn()

            watermarks.append((trigger, response))

        self.watermark_inputs = [trigger for trigger, _ in watermarks]
        self.watermark_outputs = [response for _, response in watermarks]
        return watermarks

    def verify_ownership(self,
                         suspect_model_fn: Callable,
                         tolerance: float = 0.1,
                         match_threshold: float = 0.7) -> Dict:
        """
        Check if suspect model is a copy of watermarked model.

        Args:
            suspect_model_fn: Scalar-valued model under suspicion.
            tolerance: A trigger matches when the absolute difference
                between suspect and recorded response is below this value.
            match_threshold: Fraction of matching triggers that must be
                exceeded for the model to be flagged as a likely copy.

        Returns:
            Dict with ``match_rate``, ``avg_error``, ``is_likely_copy``,
            ``confidence`` (equal to the match rate), ``n_watermarks`` and
            ``n_matches``.

        Raises:
            ValueError: If no watermarks have been generated.
        """
        if not self.watermark_inputs:
            raise ValueError("Must generate watermarks first")

        errors = [
            # Copy so a mutating suspect cannot corrupt the secret triggers.
            abs(suspect_model_fn(trigger.copy()) - expected)
            for trigger, expected in zip(self.watermark_inputs, self.watermark_outputs)
        ]
        matches = sum(1 for error in errors if error < tolerance)
        match_rate = matches / len(self.watermark_inputs)

        return {
            'match_rate': match_rate,
            'avg_error': np.mean(errors),
            'is_likely_copy': match_rate > match_threshold,
            'confidence': match_rate,
            'n_watermarks': len(self.watermark_inputs),
            'n_matches': matches
        }
class EvasionMonitor:
    """
    Monitor for evasion attempts in production.

    Detects:
    1. Sudden distribution shift (batch of similar adversarial inputs)
    2. Query patterns consistent with model stealing
    3. Repeated small perturbations (gradient estimation)

    Requires ``time`` plus ``collections.deque`` and
    ``collections.defaultdict`` to be imported at module level.
    """

    # Number of queries per window when testing for distribution shift.
    _SHIFT_WINDOW = 20

    def __init__(self,
                 window_size: int = 100,
                 query_threshold: int = 1000,
                 similarity_threshold: float = 0.95):
        """
        Args:
            window_size: Number of most recent queries (and similarity
                scores) retained.
            query_threshold: Per-source query count above which the source
                is reported.
            similarity_threshold: Mean cosine similarity of consecutive
                queries above which a gradient-estimation probe is reported.
        """
        self.window_size = window_size
        self.query_threshold = query_threshold
        self.similarity_threshold = similarity_threshold

        self.query_history = deque(maxlen=window_size)
        # Counts are cumulative; they are not trimmed with the window.
        self.query_sources = defaultdict(int)
        self.similarity_scores = deque(maxlen=window_size)

    def log_query(self,
                  query_input: np.ndarray,
                  source_id: str = 'default',
                  timestamp: Optional[float] = None):
        """
        Log a model query.

        Args:
            query_input: Feature vector sent to the model; a copy is stored.
            source_id: Identifier of the caller.
            timestamp: Query time; the current time is used when omitted.
        """
        # Compare against None explicitly: 0.0 is a legitimate timestamp.
        ts = time.time() if timestamp is None else timestamp
        stored_input = np.array(query_input)  # independent copy

        previous = self.query_history[-1]['input'] if self.query_history else None

        self.query_history.append({
            'input': stored_input,
            'source': source_id,
            'timestamp': ts
        })
        self.query_sources[source_id] += 1

        # Track similarity to the immediately preceding query.
        if previous is not None:
            self.similarity_scores.append(
                self._cosine_similarity(stored_input, previous)
            )

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity between two vectors (0.0 if either is all zeros)."""
        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return np.dot(a, b) / (norm_a * norm_b)

    def detect_threats(self) -> List[Dict]:
        """
        Detect potential attack patterns.

        Returns:
            List of threat dicts, each with a ``type`` of
            ``'excessive_queries'``, ``'gradient_estimation'`` or
            ``'distribution_shift'``, a ``severity``, and type-specific
            details.
        """
        threats = []

        # 1. Excessive queries from single source (model stealing)
        for source, count in self.query_sources.items():
            if count > self.query_threshold:
                threats.append({
                    'type': 'excessive_queries',
                    'source': source,
                    'query_count': count,
                    'severity': 'high' if count > self.query_threshold * 2 else 'medium'
                })

        # 2. Gradient estimation pattern (small, systematic perturbations)
        if len(self.similarity_scores) >= 10:
            avg_sim = np.mean(list(self.similarity_scores)[-10:])
            if avg_sim > self.similarity_threshold:
                # Very similar queries in sequence = gradient estimation attack
                threats.append({
                    'type': 'gradient_estimation',
                    'avg_similarity': avg_sim,
                    'severity': 'medium'
                })

        # 3. Distribution shift between the oldest and newest retained queries
        n_queries = len(self.query_history)
        if n_queries >= self._SHIFT_WINDOW:
            history = list(self.query_history)
            # Keep the two windows disjoint: with fewer than two full
            # windows of history, compare the first half against the second
            # rather than overlapping (or identical) slices.
            span = min(self._SHIFT_WINDOW, n_queries // 2)
            older_mean = np.mean(np.array([q['input'] for q in history[:span]]), axis=0)
            recent_mean = np.mean(np.array([q['input'] for q in history[-span:]]), axis=0)
            shift = np.linalg.norm(recent_mean - older_mean)

            if shift > 2.0:  # Threshold depends on data scale
                threats.append({
                    'type': 'distribution_shift',
                    'shift_magnitude': shift,
                    'severity': 'medium'
                })

        return threats
if __name__ == '__main__':
    # Demo script: exercises each defence in turn against a toy linear model
    # and prints the results.
    import time

    rule = "=" * 70
    print(rule)
    print(" ADVERSARIAL ROBUSTNESS & MODEL DEFENSE")
    print(rule)

    np.random.seed(42)

    # Toy linear model used as the attack target throughout the demo
    weights = np.array([0.5, -0.3, 0.8, -0.2, 0.1])

    def simple_model(x):
        return np.dot(x, weights)

    # Test set whose labels are the model's own (noise-free) predictions
    n_samples = 100
    X_test = np.random.randn(n_samples, 5)
    y_test = np.array([simple_model(row) for row in X_test])

    # 1. Single FGSM attack on the first test point
    print("\n1. ADVERSARIAL PERTURBATIONS")
    x, y_true = X_test[0], y_test[0]
    x_adv = AdversarialPerturbation.fgsm(simple_model, x, y_true, epsilon=0.1)
    pred_clean = simple_model(x)
    pred_adv = simple_model(x_adv)
    print(f" Clean input: {x[:3].round(3)}...")
    print(f" Clean prediction: {pred_clean:.4f}")
    print(f" True value: {y_true:.4f}")
    print(f" Adversarial pred: {pred_adv:.4f}")
    print(f" Perturbation: {np.linalg.norm(x_adv - x):.4f}")

    # 2. Robustness evaluation on the first 20 test points
    print("\n2. ROBUSTNESS EVALUATION")
    adv_training = AdversarialTraining(epsilon=0.01, alpha=0.5)
    robustness = adv_training.evaluate_robustness(
        simple_model, X_test[:20], y_test[:20]
    )
    print(robustness.to_string(index=False))

    # 3. Anomaly detection: one in-distribution and one extreme input
    print("\n3. ANOMALY DETECTION")
    feature_names = [f'f{i}' for i in range(5)]
    detector = AnomalyDetector(feature_names)
    detector.fit(X_test)

    result_normal = detector.detect(X_test[0])
    print(f" Normal input: anomaly={result_normal['is_anomaly']}, "
          f"score={result_normal['anomaly_score']:.3f}")

    result_anom = detector.detect(np.array([100.0, 0, 0, 0, 0]))
    print(f" Anomalous: anomaly={result_anom['is_anomaly']}, "
          f"score={result_anom['anomaly_score']:.3f}, "
          f"violations={result_anom['violations']}")

    # 4. Model watermarking: verify against the same and a perturbed model
    print("\n4. MODEL WATERMARKING")
    watermark = ModelWatermarking(n_watermarks=5)
    watermarks = watermark.generate_watermarks(5, simple_model)

    same_check = watermark.verify_ownership(simple_model, tolerance=0.5)
    print(f" Match rate: {same_check['match_rate']*100:.0f}%")
    print(f" Likely copy: {same_check['is_likely_copy']}")

    different_weights = weights + np.random.randn(5) * 0.1

    def different_model(x):
        return np.dot(x, different_weights)

    other_check = watermark.verify_ownership(different_model, tolerance=0.5)
    print(f" Different model match rate: {other_check['match_rate']*100:.0f}%")
    print(f" Different model likely copy: {other_check['is_likely_copy']}")

    # 5. Evasion monitoring: 50 benign queries, then 20 near-duplicates
    print("\n5. EVASION MONITORING")
    monitor = EvasionMonitor()

    for _ in range(50):
        monitor.log_query(np.random.randn(5))

    # Near-identical queries imitate a finite-difference gradient probe
    base = np.random.randn(5)
    for _ in range(20):
        monitor.log_query(base + np.random.randn(5) * 0.001)

    threats = monitor.detect_threats()
    print(f" Queries logged: {len(monitor.query_history)}")
    print(f" Threats detected: {len(threats)}")
    for threat in threats:
        print(f" {threat['type']}: severity={threat['severity']}")

    takeaways = (
        "\n KEY TAKEAWAYS:",
        " - Adversarial training: robust models survive attacks",
        " - Anomaly detection: stop bad inputs before they hit the model",
        " - Watermarking: prove ownership if model is stolen",
        " - Evasion monitoring: detect systematic probing in production",
        " - Jane Street protects IP like state secrets",
    )
    for line in takeaways:
        print(line)