| | """ |
| | Carlini & Wagner (C&W) L2 Attack |
| | Enterprise implementation with full error handling and optimization |
| | Reference: Carlini & Wagner, "Towards Evaluating the Robustness of Neural Networks" (2017) |
| | """ |
| |
|
| | import torch |
| | import torch.nn as nn |
| | import torch.nn.functional as F |
| | import numpy as np |
| | from typing import Optional, Dict, Any, Tuple |
| | import time |
| |
|
| |
|
class CarliniWagnerL2:
    """
    Carlini & Wagner L2 attack (Carlini & Wagner, 2017).

    Minimizes ``||x' - x||_2 + c * f(x')`` in tanh space so the box
    constraint ``[box_min, box_max]`` holds by construction.  The trade-off
    constant ``c`` is tuned per sample with a binary search.

    Features:
    - CPU-optimized with optional early abort when the loss plateaus
    - Per-sample binary search for the optimal c parameter
    - Confidence margin (kappa) thresholding
    - Success / perturbation metrics helper
    """

    def __init__(self, model: nn.Module, config: Optional[Dict[str, Any]] = None):
        """
        Initialize C&W attack.

        Args:
            model: PyTorch classifier to attack (moved to `device`, set to eval mode).
            config: Optional overrides:
                confidence (float): margin kappa added to the hinge loss (default 0.0).
                max_iterations (int): optimizer steps per binary-search round (default 100).
                learning_rate (float): Adam learning rate on the tanh variable (default 0.01).
                binary_search_steps (int): rounds of binary search over c (default 9).
                initial_const (float): starting value of c (default 1e-3).
                abort_early (bool): stop a run when the loss stops improving (default True).
                device (str): torch device string (default 'cpu').
                box_min / box_max (float): valid input value range (defaults 0.0 / 1.0).
        """
        self.model = model
        self.config = config or {}

        self.confidence = self.config.get('confidence', 0.0)
        self.max_iterations = self.config.get('max_iterations', 100)
        self.learning_rate = self.config.get('learning_rate', 0.01)
        self.binary_search_steps = self.config.get('binary_search_steps', 9)
        self.initial_const = self.config.get('initial_const', 1e-3)
        self.abort_early = self.config.get('abort_early', True)
        self.device = self.config.get('device', 'cpu')

        self.box_min = self.config.get('box_min', 0.0)
        self.box_max = self.config.get('box_max', 1.0)

        self.model.eval()
        self.model.to(self.device)

    def _tanh_space(self, x: torch.Tensor, boxmin: float, boxmax: float) -> torch.Tensor:
        """Transform to tanh space to handle box constraints."""
        return torch.tanh(x) * (boxmax - boxmin) / 2 + (boxmax + boxmin) / 2

    def _inverse_tanh_space(self, x: torch.Tensor, boxmin: float, boxmax: float) -> torch.Tensor:
        """Inverse transform from tanh space (clamped to keep atanh finite)."""
        return torch.atanh((2 * (x - boxmin) / (boxmax - boxmin) - 1).clamp(-1 + 1e-7, 1 - 1e-7))

    def _compute_loss(self,
                      adv_images: torch.Tensor,
                      images: torch.Tensor,
                      labels: torch.Tensor,
                      const: torch.Tensor,
                      targeted: bool = False) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Compute C&W loss components.

        Args:
            adv_images: candidate adversarial batch.
            images: clean batch (same shape as adv_images).
            labels: true labels (untargeted) or target labels (targeted).
            const: per-sample trade-off constants c, shape [batch].
            targeted: selects the direction of the hinge term.

        Returns:
            total_loss, distance_loss, classification_loss
        """
        l2_dist = torch.norm((adv_images - images).view(images.size(0), -1), p=2, dim=1)
        distance_loss = l2_dist.sum()

        logits = self.model(adv_images)

        label_logits = logits.gather(1, labels.unsqueeze(1)).squeeze(1)

        # Exclude the label class with a large negative offset.  (A 0/1
        # multiplicative mask is wrong when logits are negative: the zeroed
        # entry could win the max.)
        one_hot = torch.zeros_like(logits).scatter_(1, labels.unsqueeze(1), 1.0)
        other_logits = torch.max(logits - 1e4 * one_hot, dim=1)[0]

        if targeted:
            # Push the target-class logit above every other class by `confidence`.
            classification_loss = torch.clamp(other_logits - label_logits + self.confidence, min=0.0)
        else:
            # Push the true-class logit below the runner-up by `confidence`.
            # (The original code had this hinge reversed, so the untargeted
            # loss was zero exactly when the model was still correct.)
            classification_loss = torch.clamp(label_logits - other_logits + self.confidence, min=0.0)
        classification_loss = (const * classification_loss).sum()

        total_loss = distance_loss + classification_loss

        return total_loss, distance_loss, classification_loss

    def _optimize_single(self,
                         images: torch.Tensor,
                         labels: torch.Tensor,
                         const: float,
                         early_stop_threshold: float = 1e-4,
                         targeted: bool = False) -> Tuple[torch.Tensor, float, bool]:
        """
        Single optimization run for a fixed constant c.

        Args:
            images: input batch (generate() calls this with batch size 1).
            labels: true labels (untargeted) or target labels (targeted).
            const: trade-off constant for this run.
            early_stop_threshold: kept for interface compatibility (unused).
            targeted: success means predicting `labels`, not avoiding them.

        Returns:
            (best adversarial images,
             smallest successful L2 over the batch as a float (inf if none),
             attack_successful flag)
        """
        batch_size = images.size(0)

        # Optimize w in tanh space; start at the clean images.
        w = self._inverse_tanh_space(images, self.box_min, self.box_max).detach()
        w.requires_grad = True

        optimizer = torch.optim.Adam([w], lr=self.learning_rate)

        prev_loss = float('inf')
        # Track the smallest-L2 success per sample (not one scalar for the batch).
        best_l2 = torch.full((batch_size,), float('inf'), device=self.device)
        best_adv = images.clone()
        const_tensor = torch.full((batch_size,), const, device=self.device)

        attack_successful = False

        for iteration in range(self.max_iterations):
            adv_images = self._tanh_space(w, self.box_min, self.box_max)

            total_loss, distance_loss, classification_loss = self._compute_loss(
                adv_images, images, labels, const_tensor, targeted
            )

            with torch.no_grad():
                preds = self.model(adv_images).argmax(dim=1)
                # Targeted success = hit the target; untargeted = leave the true class.
                success_mask = (preds == labels) if targeted else (preds != labels)
                current_l2 = torch.norm((adv_images - images).view(batch_size, -1), p=2, dim=1)

                improved = success_mask & (current_l2 < best_l2)
                if improved.any():
                    best_l2 = torch.where(improved, current_l2, best_l2)
                    # Detach so the snapshot does not keep the autograd graph alive.
                    best_adv[improved] = adv_images[improved].detach()
                    attack_successful = True

            optimizer.zero_grad()
            total_loss.backward()
            optimizer.step()

            # Abort when the loss has stopped improving (checked every 10 steps,
            # as in the original C&W implementation).
            if self.abort_early and iteration % 10 == 0:
                if total_loss.item() > prev_loss * 0.9999:
                    break
                prev_loss = total_loss.item()

        return best_adv, best_l2.min().item(), attack_successful

    def generate(self,
                 images: torch.Tensor,
                 labels: torch.Tensor,
                 targeted: bool = False,
                 target_labels: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Generate adversarial examples using the C&W attack.

        Args:
            images: Clean images [batch, channels, height, width]
            labels: True labels for non-targeted attack
            targeted: Whether to perform targeted attack
            target_labels: Target labels for targeted attack

        Returns:
            Adversarial images (clean copies where no attack succeeded)
        """
        if targeted and target_labels is None:
            raise ValueError("target_labels required for targeted attack")

        images = images.clone().detach().to(self.device)
        labels = labels.clone().detach().to(self.device)

        if targeted:
            # From here on, `labels` carries the target classes.
            labels = target_labels.clone().detach().to(self.device)

        batch_size = images.size(0)

        # Per-sample binary-search state for c.
        const_lower_bound = torch.zeros(batch_size, device=self.device)
        const_upper_bound = torch.full((batch_size,), 1e10, device=self.device)
        const = torch.full((batch_size,), self.initial_const, device=self.device)

        best_l2 = torch.full((batch_size,), float('inf'), device=self.device)
        best_adv = images.clone()

        for binary_step in range(self.binary_search_steps):
            print(f" Binary search step {binary_step + 1}/{self.binary_search_steps}")

            for i in range(batch_size):
                const_i = const[i].item()
                adv_i, l2_i, success_i = self._optimize_single(
                    images[i:i+1], labels[i:i+1], const_i, targeted=targeted
                )

                if success_i:
                    # Success: shrink c to reduce the perturbation next round.
                    const_upper_bound[i] = min(const_upper_bound[i].item(), const_i)
                    if const_upper_bound[i] < 1e9:
                        const[i] = (const_lower_bound[i] + const_upper_bound[i]) / 2

                    if l2_i < best_l2[i]:
                        best_l2[i] = l2_i
                        best_adv[i] = adv_i[0]
                else:
                    # Failure: grow c (bisect once an upper bound exists).
                    const_lower_bound[i] = max(const_lower_bound[i].item(), const_i)
                    if const_upper_bound[i] < 1e9:
                        const[i] = (const_lower_bound[i] + const_upper_bound[i]) / 2
                    else:
                        const[i] = const[i] * 10

        return best_adv

    def attack_success_rate(self,
                            images: torch.Tensor,
                            labels: torch.Tensor,
                            adversarial_images: torch.Tensor) -> Dict[str, float]:
        """
        Calculate attack success metrics.

        Args:
            images: Original images
            labels: True labels
            adversarial_images: Generated adversarial images

        Returns:
            Dictionary of metrics (accuracies/rates in percent, norms and
            confidences as raw values).
        """
        images = images.to(self.device)
        labels = labels.to(self.device)
        adversarial_images = adversarial_images.to(self.device)

        with torch.no_grad():
            # Clean accuracy.
            orig_outputs = self.model(images)
            orig_preds = orig_outputs.argmax(dim=1)
            orig_accuracy = (orig_preds == labels).float().mean().item()

            # Untargeted success = prediction moved off the true label.
            adv_outputs = self.model(adversarial_images)
            adv_preds = adv_outputs.argmax(dim=1)
            success_rate = (adv_preds != labels).float().mean().item()

            # Perturbation norms per sample.
            perturbation = adversarial_images - images
            l2_norm = torch.norm(perturbation.view(perturbation.size(0), -1), p=2, dim=1)
            linf_norm = torch.norm(perturbation.view(perturbation.size(0), -1), p=float('inf'), dim=1)

            # Mean top-1 softmax confidence before/after the attack.
            orig_probs = F.softmax(orig_outputs, dim=1)
            adv_probs = F.softmax(adv_outputs, dim=1)
            orig_confidence = orig_probs.max(dim=1)[0].mean().item()
            adv_confidence = adv_probs.max(dim=1)[0].mean().item()

            # Norms restricted to successful attacks (0.0 when none succeeded).
            success_mask = (adv_preds != labels)
            if success_mask.any():
                successful_l2 = l2_norm[success_mask].mean().item()
                successful_linf = linf_norm[success_mask].mean().item()
            else:
                successful_l2 = 0.0
                successful_linf = 0.0

        return {
            'original_accuracy': orig_accuracy * 100,
            'attack_success_rate': success_rate * 100,
            'avg_l2_perturbation': l2_norm.mean().item(),
            'avg_linf_perturbation': linf_norm.mean().item(),
            'successful_l2_perturbation': successful_l2,
            'successful_linf_perturbation': successful_linf,
            'original_confidence': orig_confidence,
            'adversarial_confidence': adv_confidence,
            'confidence_threshold': self.confidence
        }

    def __call__(self, images: torch.Tensor, labels: torch.Tensor, **kwargs) -> torch.Tensor:
        """Callable interface; forwards to generate()."""
        return self.generate(images, labels, **kwargs)
| |
|
| |
|
class FastCarliniWagnerL2:
    """
    Faster untargeted C&W L2 variant for CPU: fixed constant c, no binary
    search, fewer iterations.  Assumes inputs lie in [0, 1].  Suitable for
    larger batches and quicker evaluations.
    """

    def __init__(self, model: nn.Module, config: Optional[Dict[str, Any]] = None):
        """
        Args:
            model: PyTorch classifier to attack (moved to `device`, set to eval).
            config: Optional overrides:
                const (float): fixed trade-off constant c (default 1.0).
                iterations (int): optimizer steps (default 50).
                learning_rate (float): Adam learning rate (default 0.01).
                device (str): torch device string (default 'cpu').
        """
        self.model = model
        self.config = config or {}

        self.const = self.config.get('const', 1.0)
        self.iterations = self.config.get('iterations', 50)
        self.learning_rate = self.config.get('learning_rate', 0.01)
        self.device = self.config.get('device', 'cpu')

        self.model.eval()
        self.model.to(self.device)

    def generate(self, images: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        """
        Fast untargeted C&W generation with a fixed constant.

        Args:
            images: Clean images in [0, 1], shape [batch, channels, H, W].
            labels: True labels.

        Returns:
            Detached adversarial images in [0, 1] (the final iterate; no
            best-so-far tracking in this fast variant).
        """
        images = images.clone().detach().to(self.device)
        labels = labels.clone().detach().to(self.device)

        batch_size = images.size(0)

        # Optimize in tanh space so adversarial pixels stay inside [0, 1];
        # initialize at the clean images (clamped to keep atanh finite).
        w = torch.atanh((2 * (images - 0.5)).clamp(-1 + 1e-7, 1 - 1e-7)).detach()
        w.requires_grad = True

        optimizer = torch.optim.Adam([w], lr=self.learning_rate)

        for _ in range(self.iterations):
            adv_images = torch.tanh(w) * 0.5 + 0.5

            l2_dist = torch.norm((adv_images - images).view(batch_size, -1), p=2, dim=1)

            logits = self.model(adv_images)
            true_logits = logits.gather(1, labels.unsqueeze(1)).squeeze(1)
            # Exclude the true class with a large negative offset (a 0/1
            # multiplicative mask fails when all other logits are negative).
            one_hot = torch.zeros_like(logits).scatter_(1, labels.unsqueeze(1), 1.0)
            other_logits = torch.max(logits - 1e4 * one_hot, dim=1)[0]

            # Untargeted hinge: drive the true-class logit below the runner-up.
            # (The original had the hinge reversed, giving zero gradient
            # whenever the model was still correct.)
            classification_loss = torch.clamp(true_logits - other_logits, min=0.0)

            loss = torch.mean(self.const * classification_loss + l2_dist)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Detach so callers do not receive a grad-tracked tensor.
        return (torch.tanh(w) * 0.5 + 0.5).detach()
| |
|
| |
|
| | |
def create_cw_attack(model: nn.Module, const: float = 1e-3, **kwargs) -> CarliniWagnerL2:
    """Build a :class:`CarliniWagnerL2` attack.

    `const` seeds the config's `initial_const`; any keyword argument
    (including an explicit `initial_const`) takes precedence over it.
    """
    cfg: Dict[str, Any] = {'initial_const': const}
    cfg.update(kwargs)
    return CarliniWagnerL2(model, cfg)
| |
|
def create_fast_cw_attack(model: nn.Module, const: float = 1.0, **kwargs) -> FastCarliniWagnerL2:
    """Build a :class:`FastCarliniWagnerL2` attack.

    `const` seeds the config's fixed trade-off constant; any keyword
    argument (including an explicit `const`) takes precedence over it.
    """
    cfg: Dict[str, Any] = {'const': const}
    cfg.update(kwargs)
    return FastCarliniWagnerL2(model, cfg)
| |
|