from __future__ import annotations import math import torch import torch.nn as nn import torch.nn.functional as F _VGG_CFGS = { "vgg11": [64, "M", 128, "M", 256, 256, "M", 512, 512, "M", 512, 512, "M"], "vgg13": [64, 64, "M", 128, 128, "M", 256, 256, "M", 512, 512, "M", 512, 512, "M"], "vgg16": [64, 64, "M", 128, 128, "M", 256, 256, 256, "M", 512, 512, 512, "M", 512, 512, 512, "M"], "vgg19": [ 64, 64, "M", 128, 128, "M", 256, 256, 256, 256, "M", 512, 512, 512, 512, "M", 512, 512, 512, 512, "M", ], } class VGGFeatures(nn.Module): def __init__(self, cfg: list[int | str], batch_norm: bool = False, init_weights: bool = True): super().__init__() self.batch_norm = batch_norm self.kernel_sizes: list[int] = [] self.strides: list[int] = [] self.paddings: list[int] = [] self.features = self._make_layers(cfg, batch_norm) if init_weights: self._initialize_weights() def forward(self, x: torch.Tensor) -> torch.Tensor: return self.features(x) def _make_layers(self, cfg: list[int | str], batch_norm: bool) -> nn.Sequential: layers: list[nn.Module] = [] in_channels = 3 self.n_layers = 0 for item in cfg: if item == "M": layers.append(nn.MaxPool2d(kernel_size=2, stride=2)) self.kernel_sizes.append(2) self.strides.append(2) self.paddings.append(0) continue conv2d = nn.Conv2d(in_channels, item, kernel_size=3, padding=1) if batch_norm: layers.extend([conv2d, nn.BatchNorm2d(item), nn.ReLU(inplace=True)]) else: layers.extend([conv2d, nn.ReLU(inplace=True)]) self.n_layers += 1 self.kernel_sizes.append(3) self.strides.append(1) self.paddings.append(1) in_channels = item return nn.Sequential(*layers) def _initialize_weights(self) -> None: for module in self.modules(): if isinstance(module, nn.Conv2d): nn.init.kaiming_normal_(module.weight, mode="fan_out", nonlinearity="relu") if module.bias is not None: nn.init.constant_(module.bias, 0) elif isinstance(module, nn.BatchNorm2d): nn.init.constant_(module.weight, 1) nn.init.constant_(module.bias, 0) elif isinstance(module, nn.Linear): nn.init.normal_(module.weight, 0, 0.01) nn.init.constant_(module.bias, 0) def conv_info(self) -> tuple[list[int], list[int], list[int]]: return self.kernel_sizes, self.strides, self.paddings def __repr__(self) -> str: return f"VGG{self.n_layers + 3}, batch_norm={self.batch_norm}" def build_vgg_features(name: str) -> VGGFeatures: if name not in _VGG_CFGS: raise ValueError(f"Unsupported VGG architecture: {name}") return VGGFeatures(_VGG_CFGS[name], batch_norm=name.endswith("_bn")) def compute_layer_rf_info( layer_filter_size: int, layer_stride: int, layer_padding: int | str, previous_layer_rf_info: list[float], ) -> list[float]: n_in, j_in, r_in, start_in = previous_layer_rf_info if layer_padding == "SAME": n_out = math.ceil(float(n_in) / float(layer_stride)) if n_in % layer_stride == 0: pad = max(layer_filter_size - layer_stride, 0) else: pad = max(layer_filter_size - (n_in % layer_stride), 0) elif layer_padding == "VALID": n_out = math.ceil(float(n_in - layer_filter_size + 1) / float(layer_stride)) pad = 0 else: pad = layer_padding * 2 n_out = math.floor((n_in - layer_filter_size + pad) / layer_stride) + 1 pad_left = math.floor(pad / 2) j_out = j_in * layer_stride r_out = r_in + (layer_filter_size - 1) * j_in start_out = start_in + ((layer_filter_size - 1) / 2 - pad_left) * j_in return [n_out, j_out, r_out, start_out] def compute_proto_layer_rf_info_v2( img_size: int, layer_filter_sizes: list[int], layer_strides: list[int], layer_paddings: list[int], prototype_kernel_size: int, ) -> list[float]: if not ( len(layer_filter_sizes) == len(layer_strides) == len(layer_paddings) ): raise ValueError("Layer metadata length mismatch.") rf_info: list[float] = [img_size, 1, 1, 0.5] for filter_size, stride_size, padding_size in zip( layer_filter_sizes, layer_strides, layer_paddings, strict=True ): rf_info = compute_layer_rf_info( layer_filter_size=int(filter_size), layer_stride=stride_size, layer_padding=padding_size, previous_layer_rf_info=rf_info, ) return compute_layer_rf_info( layer_filter_size=prototype_kernel_size, layer_stride=1, layer_padding="VALID", previous_layer_rf_info=rf_info, ) class PPNet(nn.Module): def __init__( self, features: nn.Module, img_size: int, prototype_shape: tuple[int, int, int, int], proto_layer_rf_info: list[float], num_classes: int, init_weights: bool = True, prototype_activation_function: str = "log", add_on_layers_type: str = "bottleneck", ): super().__init__() self.img_size = img_size self.prototype_shape = prototype_shape self.num_prototypes = prototype_shape[0] self.num_classes = num_classes self.epsilon = 1e-4 self.prototype_activation_function = prototype_activation_function self.proto_layer_rf_info = proto_layer_rf_info self.features = features if self.num_prototypes % self.num_classes != 0: raise ValueError("Number of prototypes must be divisible by num_classes.") self.prototype_class_identity = torch.zeros(self.num_prototypes, self.num_classes) num_prototypes_per_class = self.num_prototypes // self.num_classes for idx in range(self.num_prototypes): self.prototype_class_identity[idx, idx // num_prototypes_per_class] = 1 features_name = str(self.features).upper() if features_name.startswith("VGG") or features_name.startswith("RES"): in_channels = [m for m in features.modules() if isinstance(m, nn.Conv2d)][-1].out_channels elif features_name.startswith("DENSE"): in_channels = [m for m in features.modules() if isinstance(m, nn.BatchNorm2d)][-1].num_features else: raise ValueError("Unsupported base architecture.") if add_on_layers_type == "bottleneck": add_on_layers: list[nn.Module] = [] current_in_channels = in_channels while current_in_channels > self.prototype_shape[1] or not add_on_layers: current_out_channels = max(self.prototype_shape[1], current_in_channels // 2) add_on_layers.append( nn.Conv2d(current_in_channels, current_out_channels, kernel_size=1) ) add_on_layers.append(nn.ReLU()) add_on_layers.append( nn.Conv2d(current_out_channels, current_out_channels, kernel_size=1) ) if current_out_channels > self.prototype_shape[1]: add_on_layers.append(nn.ReLU()) else: add_on_layers.append(nn.Sigmoid()) current_in_channels //= 2 self.add_on_layers = nn.Sequential(*add_on_layers) else: self.add_on_layers = nn.Sequential( nn.Conv2d(in_channels, self.prototype_shape[1], kernel_size=1), nn.ReLU(), nn.Conv2d(self.prototype_shape[1], self.prototype_shape[1], kernel_size=1), nn.Sigmoid(), ) self.prototype_vectors = nn.Parameter(torch.rand(self.prototype_shape), requires_grad=True) self.ones = nn.Parameter(torch.ones(self.prototype_shape), requires_grad=False) self.last_layer = nn.Linear(self.num_prototypes, self.num_classes, bias=False) if init_weights: self._initialize_weights() def conv_features(self, x: torch.Tensor) -> torch.Tensor: return self.add_on_layers(self.features(x)) def _l2_convolution(self, x: torch.Tensor) -> torch.Tensor: x2_patch_sum = F.conv2d(input=x**2, weight=self.ones) p2 = torch.sum(self.prototype_vectors**2, dim=(1, 2, 3)).view(-1, 1, 1) xp = F.conv2d(input=x, weight=self.prototype_vectors) distances = F.relu(x2_patch_sum - 2 * xp + p2) return distances def prototype_distances(self, x: torch.Tensor) -> torch.Tensor: return self._l2_convolution(self.conv_features(x)) def distance_2_similarity(self, distances: torch.Tensor) -> torch.Tensor: if self.prototype_activation_function == "log": return torch.log((distances + 1) / (distances + self.epsilon)) if self.prototype_activation_function == "linear": return -distances raise ValueError( f"Unsupported prototype activation function: {self.prototype_activation_function}" ) def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: distances = self.prototype_distances(x) min_distances = -F.max_pool2d( -distances, kernel_size=(distances.size(2), distances.size(3)) ) min_distances = min_distances.view(-1, self.num_prototypes) prototype_activations = self.distance_2_similarity(min_distances) logits = self.last_layer(prototype_activations) return logits, min_distances def set_last_layer_incorrect_connection(self, incorrect_strength: float) -> None: positive_locs = torch.t(self.prototype_class_identity) negative_locs = 1 - positive_locs self.last_layer.weight.data.copy_(positive_locs + incorrect_strength * negative_locs) def _initialize_weights(self) -> None: for module in self.add_on_layers.modules(): if isinstance(module, nn.Conv2d): nn.init.kaiming_normal_(module.weight, mode="fan_out", nonlinearity="relu") if module.bias is not None: nn.init.constant_(module.bias, 0) elif isinstance(module, nn.BatchNorm2d): nn.init.constant_(module.weight, 1) nn.init.constant_(module.bias, 0) self.set_last_layer_incorrect_connection(incorrect_strength=-0.5) def build_ppnet( *, base_architecture: str, img_size: int, prototype_shape: tuple[int, int, int, int], num_classes: int, prototype_activation_function: str, add_on_layers_type: str, ) -> PPNet: features = build_vgg_features(base_architecture) layer_filter_sizes, layer_strides, layer_paddings = features.conv_info() proto_layer_rf_info = compute_proto_layer_rf_info_v2( img_size=img_size, layer_filter_sizes=layer_filter_sizes, layer_strides=layer_strides, layer_paddings=layer_paddings, prototype_kernel_size=prototype_shape[2], ) return PPNet( features=features, img_size=img_size, prototype_shape=prototype_shape, proto_layer_rf_info=proto_layer_rf_info, num_classes=num_classes, init_weights=True, prototype_activation_function=prototype_activation_function, add_on_layers_type=add_on_layers_type, )