alphaforge-quant-system / adversarial_defense.py
Premchan369's picture
Add adversarial robustness: gradient-based attacks, model stealing detection, adversarial training
7a0ba11 verified
"""Adversarial Robustness & Model Exploitation Defense
Why Jane Street protects models:
- If your alpha is discovered, others front-run you → alpha decays
- Adversarial inputs can manipulate predictions (e.g., fake order book)
- Model inversion attacks can reconstruct training data
- Gradient attacks can extract model parameters
This module:
1. Adversarial training: train on perturbed inputs
2. Gradient masking: hide model sensitivity
3. Input sanitization: detect anomalous features
4. Model watermarking: detect stolen copies
5. Evasion detection: spot attempts to fool your model
Based on:
- Madry et al. (2018): "Towards Deep Learning Models Resistant to Adversarial Attacks"
- Carlini & Wagner (2017): "Adversarial Examples Are Not Easily Detected"
- Tramèr et al. (2016): "Stealing Machine Learning Models via Prediction APIs"
"""
import time
import warnings
from collections import defaultdict, deque
from typing import Dict, List, Tuple, Optional, Callable

import numpy as np
import pandas as pd
# NOTE(review): called without a category or module filter, so this suppresses
# every warning process-wide as an import-time side effect; consider narrowing.
warnings.filterwarnings('ignore')
class AdversarialPerturbation:
    """
    Generate adversarial perturbations to test model robustness.

    Fast Gradient Sign Method (FGSM):
        x_adv = x + ε * sign(∇_x J(θ, x, y))

    If your model flips predictions with tiny ε, it's fragile.

    Every method is static and treats ``model_fn`` as a black box that maps a
    1-D feature vector to a scalar prediction. Gradients are estimated with
    forward finite differences, and the caller's ``x`` is never modified.
    """

    @staticmethod
    def _forward_gradient(model_fn: Callable,
                          x: np.ndarray,
                          h: float) -> Tuple[float, np.ndarray]:
        """Estimate d model_fn / d x at ``x`` by forward differences.

        ``x`` must already be a float array. Returns ``(base_pred, gradient)``
        and costs ``len(x) + 1`` calls to ``model_fn``.
        """
        base_pred = model_fn(x)
        gradient = np.zeros(len(x))
        for i in range(len(x)):
            x_step = x.copy()
            x_step[i] += h
            gradient[i] = (model_fn(x_step) - base_pred) / h
        return base_pred, gradient

    @staticmethod
    def fgsm(model_fn: Callable,
             x: np.ndarray,
             y: float,
             epsilon: float = 0.01,
             h: float = 1e-5) -> np.ndarray:
        """
        Fast Gradient Sign Method.

        Uses finite differences, so no analytic gradients are required.

        Args:
            model_fn: Callable mapping a feature vector to a scalar prediction.
            x: Input feature vector (any numeric dtype; coerced to float).
            y: True target for ``x``.
            epsilon: Size of the step applied to each feature.
            h: Finite-difference step.

        Returns:
            A new float array ``x + epsilon * sign(grad)``, where ``grad`` is
            the gradient of the squared-error loss ``0.5 * (f(x) - y) ** 2``.
            If the prediction already equals ``y`` the loss gradient is zero
            and the result equals ``x``.
        """
        # Without the float coercion an integer array would swallow the `+ h`
        # step (truncation on assignment) and yield an all-zero gradient.
        x = np.asarray(x, dtype=float)
        base_pred, pred_grad = AdversarialPerturbation._forward_gradient(
            model_fn, x, h
        )
        # Chain rule: dL/dx = (f(x) - y) * df/dx, the direction that INCREASES loss
        loss_grad = pred_grad * (base_pred - y)
        return x + epsilon * np.sign(loss_grad)

    @staticmethod
    def random_perturbation(x: np.ndarray,
                            epsilon: float = 0.01,
                            distribution: str = 'uniform') -> np.ndarray:
        """
        Random perturbation (baseline for comparison).

        Args:
            x: Input feature vector.
            epsilon: Half-width of the uniform noise, or the standard
                deviation of the Gaussian noise.
            distribution: ``'uniform'`` draws from ``[-epsilon, epsilon]``;
                ``'gaussian'`` and any other value draw Gaussian noise.

        Returns:
            ``x`` plus noise drawn from NumPy's global random state.
        """
        if distribution == 'uniform':
            noise = np.random.uniform(-epsilon, epsilon, len(x))
        else:
            # 'gaussian' and unrecognised names share the Gaussian fallback.
            noise = np.random.randn(len(x)) * epsilon
        return x + noise

    @staticmethod
    def targeted_perturbation(model_fn: Callable,
                              x: np.ndarray,
                              target_pred: float,
                              epsilon: float = 0.01,
                              n_iter: int = 10,
                              step_size: float = 0.005,
                              h: float = 1e-5) -> np.ndarray:
        """
        Iterative targeted attack: force model to predict target_pred.

        x_adv = argmin_x' |f(x') - target_pred| subject to |x' - x| <= ε

        Args:
            model_fn: Callable mapping a feature vector to a scalar prediction.
            x: Starting feature vector (any numeric dtype; coerced to float).
            target_pred: Prediction the attack tries to reach.
            epsilon: Radius of the L2 ball around ``x`` the result stays in.
            n_iter: Number of signed-gradient steps.
            step_size: Per-feature size of each step.
            h: Finite-difference step used for the gradient estimate.

        Returns:
            A new float array within L2 distance ``epsilon`` of ``x``.
        """
        x = np.asarray(x, dtype=float)
        x_adv = x.copy()
        for _ in range(n_iter):
            base_pred, grad = AdversarialPerturbation._forward_gradient(
                model_fn, x_adv, h
            )
            # Step downhill when the prediction is above the target, uphill otherwise
            if base_pred > target_pred:
                direction = -np.sign(grad)
            else:
                direction = np.sign(grad)
            x_adv += step_size * direction
            # Project back onto the epsilon ball (L2 norm)
            delta = x_adv - x
            norm = np.linalg.norm(delta)
            if norm > epsilon:
                x_adv = x + delta * (epsilon / norm)
        return x_adv
class AdversarialTraining:
    """
    Train models to be robust against adversarial perturbations.

    Standard training: min_θ E[L(θ, x, y)]
    Adversarial training: min_θ E[max_{||δ||<ε} L(θ, x+δ, y)]

    Trade-off: slightly lower accuracy on clean data, MUCH higher on adversarial.

    This class does not fit a model itself; it builds FGSM-augmented batches
    and measures how a prediction function degrades under input noise.
    """

    def __init__(self,
                 epsilon: float = 0.01,
                 alpha: float = 0.5,  # Weight of adversarial loss
                 n_augmentations: int = 3):
        self.epsilon = epsilon
        # alpha and n_augmentations are stored for callers; no method of this
        # class reads them.
        self.alpha = alpha
        self.n_augmentations = n_augmentations

    def augment_batch(self,
                      X: np.ndarray,
                      y: np.ndarray,
                      model_fn: Callable) -> Tuple[np.ndarray, np.ndarray]:
        """
        Augment training batch with adversarial examples.

        Args:
            X: Batch of feature vectors, one row per sample.
            y: Targets aligned with the rows of ``X``.
            model_fn: Callable mapping one feature vector to a scalar prediction.

        Returns:
            (X_augmented, y_augmented) where the first half is the original
            batch and the second half is one FGSM example per row, labelled
            with the original target. An empty batch is returned as a copy.
        """
        adversarial = []
        targets = []
        for i, x in enumerate(X):
            target = y[i]
            adversarial.append(
                AdversarialPerturbation.fgsm(
                    model_fn, x, target, epsilon=self.epsilon
                )
            )
            targets.append(target)
        if not adversarial:
            # np.vstack cannot stack an (0, d) batch with an empty 1-D array.
            return np.array(X, copy=True), np.array(y, copy=True)
        X_augmented = np.vstack([X, np.array(adversarial)])
        y_augmented = np.concatenate([y, np.array(targets)])
        return X_augmented, y_augmented

    def evaluate_robustness(self,
                            model_fn: Callable,
                            X_test: np.ndarray,
                            y_test: np.ndarray,
                            epsilon_range: Optional[List[float]] = None,
                            max_samples: int = 100) -> pd.DataFrame:
        """
        Evaluate model robustness across epsilon values.

        The perturbation is the random-noise baseline
        (``AdversarialPerturbation.random_perturbation``), not a gradient
        attack, so results depend on NumPy's global random state.

        Args:
            model_fn: Callable mapping one feature vector to a scalar prediction.
            X_test: Test feature vectors, one row per sample.
            y_test: Targets aligned with ``X_test``.
            epsilon_range: Noise magnitudes to evaluate; defaults to
                ``[0.001, 0.005, 0.01, 0.02, 0.05]``.
            max_samples: Only the first ``max_samples`` rows are evaluated
                (subsample for speed).

        Returns:
            One row per epsilon with columns ``epsilon``, ``clean_mse``,
            ``adversarial_mse``, ``robustness_gap`` and
            ``relative_degradation``. Clean and perturbed errors are computed
            on the same rows so the gap compares like with like.
        """
        if epsilon_range is None:
            epsilon_range = [0.001, 0.005, 0.01, 0.02, 0.05]

        n_eval = min(max_samples, len(X_test))
        X_eval = X_test[:n_eval]
        y_eval = np.asarray(y_test[:n_eval])

        # The clean error does not depend on epsilon, so compute it once.
        clean_preds = np.array([model_fn(x) for x in X_eval])
        clean_error = np.mean((clean_preds - y_eval) ** 2)

        results = []
        for eps in epsilon_range:
            adv_errors = []
            for x, target in zip(X_eval, y_eval):
                x_adv = AdversarialPerturbation.random_perturbation(
                    x, epsilon=eps
                )
                adv_errors.append((model_fn(x_adv) - target) ** 2)
            adv_error = np.mean(adv_errors)
            gap = adv_error - clean_error
            results.append({
                'epsilon': eps,
                'clean_mse': clean_error,
                'adversarial_mse': adv_error,
                'robustness_gap': gap,
                # 1e-10 guards against division by a zero clean error
                'relative_degradation': gap / (clean_error + 1e-10)
            })
        return pd.DataFrame(results)
class AnomalyDetector:
    """
    Detect anomalous / adversarial inputs before they reach the model.

    Implemented here:
    1. Statistical outlier detection (Mahalanobis distance)
    2. Feature range validation (0.5th-99.5th percentile bounds)

    Call ``fit`` on representative training data first. Before fitting,
    ``mahalanobis_distance`` returns 0.0 and ``check_bounds`` returns no
    violations.
    """

    def __init__(self,
                 feature_names: List[str],
                 contamination: float = 0.01):
        self.feature_names = feature_names
        # Stored for callers; no method of this class reads it.
        self.contamination = contamination
        # Learned statistics (populated by fit)
        self.mean = None
        self.cov_inv = None
        self.min_values = None
        self.max_values = None
        self.feature_ranges = {}

    def fit(self, X: np.ndarray):
        """Learn normal feature statistics from training data.

        Args:
            X: 2-D array, one row per sample and one column per entry of
                ``feature_names``.
        """
        X = np.asarray(X, dtype=float)
        self.mean = np.mean(X, axis=0)
        # np.cov squeezes a single-feature result to 0-d; restore the matrix shape.
        cov = np.atleast_2d(np.cov(X.T))
        # Regularize for inversion
        cov = cov + np.eye(cov.shape[0]) * 1e-6
        self.cov_inv = np.linalg.inv(cov)
        # Per-feature bounds
        self.min_values = np.percentile(X, 0.5, axis=0)
        self.max_values = np.percentile(X, 99.5, axis=0)
        # Per-feature summary: mean, std and the percentile bounds above
        self.feature_ranges = {
            name: {
                'mean': self.mean[i],
                'std': np.std(X[:, i]),
                'min': self.min_values[i],
                'max': self.max_values[i]
            }
            for i, name in enumerate(self.feature_names)
        }

    def mahalanobis_distance(self, x: np.ndarray) -> float:
        """Mahalanobis distance from training distribution (0.0 if unfitted)."""
        if self.mean is None or self.cov_inv is None:
            return 0.0
        diff = np.asarray(x, dtype=float) - self.mean
        squared = float(diff @ self.cov_inv @ diff)
        # Clamp tiny negative round-off so the square root never yields NaN.
        return float(np.sqrt(max(squared, 0.0)))

    def check_bounds(self, x: np.ndarray) -> List[str]:
        """Return the names of features outside the learned percentile bounds.

        Returns an empty list when the detector has not been fitted, matching
        the unfitted behaviour of ``mahalanobis_distance``.
        """
        if self.min_values is None or self.max_values is None:
            return []
        return [
            name
            for i, name in enumerate(self.feature_names)
            if x[i] < self.min_values[i] or x[i] > self.max_values[i]
        ]

    def detect(self, x: np.ndarray,
               threshold: Optional[float] = None) -> Dict:
        """
        Full anomaly detection.

        Args:
            x: Feature vector to score.
            threshold: Mahalanobis distance treated as the anomaly boundary;
                defaults to ``sqrt(3 * len(x))``.

        Returns:
            Dict with ``is_anomaly``, ``anomaly_score``,
            ``mahalanobis_distance``, ``threshold``, ``violations`` and
            ``n_violations``.
        """
        md = self.mahalanobis_distance(x)
        # Default threshold: sqrt(3 * d), a heuristic stand-in for a
        # chi-square quantile rather than an exact one.
        if threshold is None:
            threshold = np.sqrt(len(x) * 3)
        violations = self.check_bounds(x)
        # Composite score: distance relative to the threshold, plus 0.5 per
        # out-of-bounds feature; above 1.0 counts as an anomaly.
        score = md / threshold + len(violations) * 0.5
        return {
            'is_anomaly': score > 1.0,
            'anomaly_score': score,
            'mahalanobis_distance': md,
            'threshold': threshold,
            'violations': violations,
            'n_violations': len(violations)
        }

    def detect_batch(self, X: np.ndarray) -> pd.DataFrame:
        """Run ``detect`` on every row; adds an ``index`` column with the row position."""
        rows = [{**self.detect(x), 'index': i} for i, x in enumerate(X)]
        return pd.DataFrame(rows)
class ModelWatermarking:
    """
    Watermark models to detect unauthorized copies.

    Technique: Embed secret "backdoor" inputs that produce known outputs.
    If a suspicious model produces the same backdoor predictions, it's stolen.

    Similar to: "Turning Your Weakness Into a Strength" (Adi et al., 2018)
    """

    def __init__(self,
                 n_watermarks: int = 10,
                 watermark_strength: float = 0.05):
        self.n_watermarks = n_watermarks
        # Stored for callers; no method of this class reads it.
        self.watermark_strength = watermark_strength
        # Secret watermark data (filled by generate_watermarks)
        self.watermark_inputs = []
        self.watermark_outputs = []

    def generate_watermarks(self,
                            input_dim: int,
                            model_fn: Optional[Callable] = None,
                            rng: Optional[np.random.Generator] = None
                            ) -> List[Tuple[np.ndarray, float]]:
        """
        Generate watermark (trigger, response) pairs.

        Trigger: random vector whose first three entries are set to 0.999.
        Response: ``model_fn(trigger)`` when a model is given, otherwise a
        random draw.

        Args:
            input_dim: Length of each trigger vector.
            model_fn: Optional model whose outputs become the responses.
            rng: Optional NumPy ``Generator`` to draw from, so the triggers do
                not depend on (or disturb) the global random state. When
                omitted, NumPy's global random state is used.

        Returns:
            List of ``(trigger, response)`` pairs. The pairs are also stored
            on the instance, replacing any previously generated watermarks.
        """
        draw = np.random.standard_normal if rng is None else rng.standard_normal
        watermarks = []
        for _ in range(self.n_watermarks):
            trigger = draw(input_dim)
            # Make it distinctive: first 3 elements are identical
            trigger[:3] = 0.999
            response = model_fn(trigger) if model_fn is not None else draw()
            watermarks.append((trigger, response))
        self.watermark_inputs = [trigger for trigger, _ in watermarks]
        self.watermark_outputs = [response for _, response in watermarks]
        return watermarks

    def verify_ownership(self,
                         suspect_model_fn: Callable,
                         tolerance: float = 0.1) -> Dict:
        """
        Check if suspect model is a copy of watermarked model.

        Args:
            suspect_model_fn: Model to query with every stored trigger.
            tolerance: Largest absolute output difference counted as a match.

        Returns:
            Dict with ``match_rate``, ``avg_error``, ``is_likely_copy``
            (match rate above 0.7), ``confidence``, ``n_watermarks`` and
            ``n_matches``.

        Raises:
            ValueError: If no watermarks have been generated yet.
        """
        if not self.watermark_inputs:
            raise ValueError("Must generate watermarks first")
        errors = [
            abs(suspect_model_fn(trigger) - expected)
            for trigger, expected in zip(self.watermark_inputs,
                                         self.watermark_outputs)
        ]
        matches = sum(1 for error in errors if error < tolerance)
        match_rate = matches / len(self.watermark_inputs)
        return {
            'match_rate': match_rate,
            'avg_error': np.mean(errors),
            'is_likely_copy': match_rate > 0.7,  # 70% match threshold
            'confidence': match_rate,
            'n_watermarks': len(self.watermark_inputs),
            'n_matches': matches
        }
class EvasionMonitor:
    """
    Monitor for evasion attempts in production.

    Detects:
    1. Sudden distribution shift (batch of similar adversarial inputs)
    2. Query patterns consistent with model stealing
    3. Repeated small perturbations (gradient estimation)

    Only the last ``window_size`` queries and similarity scores are retained;
    per-source query counts are running totals that are never reset.
    """

    def __init__(self,
                 window_size: int = 100,
                 query_threshold: int = 1000,
                 similarity_threshold: float = 0.95):
        self.window_size = window_size
        self.query_threshold = query_threshold
        self.similarity_threshold = similarity_threshold
        self.query_history = deque(maxlen=window_size)
        self.query_sources = defaultdict(int)
        self.similarity_scores = deque(maxlen=window_size)

    def log_query(self,
                  query_input: np.ndarray,
                  source_id: str = 'default',
                  timestamp: Optional[float] = None):
        """Log a model query.

        Args:
            query_input: Feature vector that was sent to the model.
            source_id: Identifier of the querying client.
            timestamp: Time of the query; the current time is used only when
                this is ``None`` (a timestamp of 0 is kept as given).
        """
        ts = time.time() if timestamp is None else timestamp
        self.query_history.append({
            'input': query_input.copy(),
            'source': source_id,
            'timestamp': ts
        })
        self.query_sources[source_id] += 1
        # Compare with the immediately preceding query, whatever its source
        if len(self.query_history) >= 2:
            previous = self.query_history[-2]['input']
            self.similarity_scores.append(
                self._cosine_similarity(query_input, previous)
            )

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity between two vectors (0.0 if either has zero norm)."""
        norm_a = np.linalg.norm(a)
        norm_b = np.linalg.norm(b)
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return np.dot(a, b) / (norm_a * norm_b)

    def detect_threats(self) -> List[Dict]:
        """Detect potential attack patterns.

        Returns:
            A list of threat dicts, each with a ``type``
            (``'excessive_queries'``, ``'gradient_estimation'`` or
            ``'distribution_shift'``), a ``severity`` and type-specific details.
        """
        threats = []

        # 1. Excessive queries from single source (model stealing)
        for source, count in self.query_sources.items():
            if count > self.query_threshold:
                threats.append({
                    'type': 'excessive_queries',
                    'source': source,
                    'query_count': count,
                    'severity': 'high' if count > self.query_threshold * 2 else 'medium'
                })

        # 2. Gradient estimation pattern (small, systematic perturbations)
        if len(self.similarity_scores) >= 10:
            avg_sim = np.mean(list(self.similarity_scores)[-10:])
            if avg_sim > self.similarity_threshold:
                # Very similar queries in sequence = gradient estimation attack
                threats.append({
                    'type': 'gradient_estimation',
                    'avg_similarity': avg_sim,
                    'severity': 'medium'
                })

        # 3. Distribution shift in recent queries: mean of the newest 20
        #    retained inputs vs. the oldest 20. With fewer than 40 retained
        #    queries the two windows overlap.
        if len(self.query_history) >= 20:
            inputs = [q['input'] for q in self.query_history]
            recent_mean = np.mean(np.array(inputs[-20:]), axis=0)
            older_mean = np.mean(np.array(inputs[:20]), axis=0)
            shift = np.linalg.norm(recent_mean - older_mean)
            if shift > 2.0:  # Threshold depends on data scale
                threats.append({
                    'type': 'distribution_shift',
                    'shift_magnitude': shift,
                    'severity': 'medium'
                })
        return threats
if __name__ == '__main__':
    # Demo / smoke test: exercises each defence on a small linear model.
    print("=" * 70)
    print(" ADVERSARIAL ROBUSTNESS & MODEL DEFENSE")
    print("=" * 70)
    np.random.seed(42)

    # Simple model to attack
    weights = np.array([0.5, -0.3, 0.8, -0.2, 0.1])

    def simple_model(x):
        return np.dot(x, weights)

    # Generate test data. The labels carry observation noise: with labels
    # equal to the model output the squared-error gradient is exactly zero,
    # so FGSM would return the input unchanged and the clean MSE would be 0.
    n_samples = 100
    X_test = np.random.randn(n_samples, 5)
    y_test = X_test @ weights + 0.1 * np.random.randn(n_samples)

    print("\n1. ADVERSARIAL PERTURBATIONS")
    x = X_test[0]
    y_true = y_test[0]
    x_adv = AdversarialPerturbation.fgsm(simple_model, x, y_true, epsilon=0.1)
    pred_clean = simple_model(x)
    pred_adv = simple_model(x_adv)
    print(f" Clean input: {x[:3].round(3)}...")
    print(f" Clean prediction: {pred_clean:.4f}")
    print(f" True value: {y_true:.4f}")
    print(f" Adversarial pred: {pred_adv:.4f}")
    print(f" Perturbation: {np.linalg.norm(x_adv - x):.4f}")

    # 2. Robustness evaluation
    print("\n2. ROBUSTNESS EVALUATION")
    adv_training = AdversarialTraining(epsilon=0.01, alpha=0.5)
    robustness = adv_training.evaluate_robustness(
        simple_model, X_test[:20], y_test[:20]
    )
    print(robustness.to_string(index=False))

    # 3. Anomaly detection
    print("\n3. ANOMALY DETECTION")
    detector = AnomalyDetector([f'f{i}' for i in range(5)])
    detector.fit(X_test)
    # Normal input
    normal = X_test[0]
    result_normal = detector.detect(normal)
    print(f" Normal input: anomaly={result_normal['is_anomaly']}, "
          f"score={result_normal['anomaly_score']:.3f}")
    # Anomalous input
    anomalous = np.array([100.0, 0, 0, 0, 0])
    result_anom = detector.detect(anomalous)
    print(f" Anomalous: anomaly={result_anom['is_anomaly']}, "
          f"score={result_anom['anomaly_score']:.3f}, "
          f"violations={result_anom['violations']}")

    # 4. Model watermarking
    print("\n4. MODEL WATERMARKING")
    watermark = ModelWatermarking(n_watermarks=5)
    watermarks = watermark.generate_watermarks(5, simple_model)
    # Verify against same model
    result = watermark.verify_ownership(simple_model, tolerance=0.5)
    print(f" Match rate: {result['match_rate']*100:.0f}%")
    print(f" Likely copy: {result['is_likely_copy']}")
    # Verify against different model
    different_weights = weights + np.random.randn(5) * 0.1

    def different_model(x):
        return np.dot(x, different_weights)

    result2 = watermark.verify_ownership(different_model, tolerance=0.5)
    print(f" Different model match rate: {result2['match_rate']*100:.0f}%")
    print(f" Different model likely copy: {result2['is_likely_copy']}")

    # 5. Evasion monitoring
    print("\n5. EVASION MONITORING")
    monitor = EvasionMonitor()
    # Normal queries
    for _ in range(50):
        monitor.log_query(np.random.randn(5))
    # Simulated gradient estimation attack: tiny perturbations of one point
    base = np.random.randn(5)
    for _ in range(20):
        perturbed = base + np.random.randn(5) * 0.001
        monitor.log_query(perturbed)
    threats = monitor.detect_threats()
    print(f" Queries logged: {len(monitor.query_history)}")
    print(f" Threats detected: {len(threats)}")
    for t in threats:
        print(f" {t['type']}: severity={t['severity']}")

    print("\n KEY TAKEAWAYS:")
    print(" - Adversarial training: robust models survive attacks")
    print(" - Anomaly detection: stop bad inputs before they hit the model")
    print(" - Watermarking: prove ownership if model is stolen")
    print(" - Evasion monitoring: detect systematic probing in production")
    print(" - Jane Street protects IP like state secrets")