Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import math | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
# Layer plans for the supported VGG variants. An integer entry is the number
# of output channels of a convolution; the string "M" marks a max-pooling step.
_VGG_CFGS: dict[str, list[int | str]] = {
    "vgg11": [64, "M", 128, "M", 256, 256, "M", 512, 512, "M", 512, 512, "M"],
    "vgg13": [64, 64, "M", 128, 128, "M", 256, 256, "M", 512, 512, "M", 512, 512, "M"],
    "vgg16": [64, 64, "M", 128, 128, "M", 256, 256, 256, "M", 512, 512, 512, "M", 512, 512, 512, "M"],
    # One row per stage (a run of convolutions closed by a pooling step).
    "vgg19": [
        64, 64, "M",
        128, 128, "M",
        256, 256, 256, 256, "M",
        512, 512, 512, 512, "M",
        512, 512, 512, 512, "M",
    ],
}
class VGGFeatures(nn.Module):
    """Stack of VGG-style convolution / pooling layers built from a layer plan.

    While the stack is assembled, the kernel size, stride and padding of every
    convolution and pooling layer are recorded in layer order; ``conv_info``
    hands those three lists back to the caller.

    Args:
        cfg: Layer plan. An integer adds a 3x3 convolution (padding 1) with
            that many output channels, followed by an optional batch norm and
            a ReLU; the string ``"M"`` adds a 2x2 max-pool with stride 2.
        batch_norm: Insert ``nn.BatchNorm2d`` after every convolution.
        init_weights: Run ``_initialize_weights`` after construction.
    """

    def __init__(self, cfg: list[int | str], batch_norm: bool = False, init_weights: bool = True):
        super().__init__()
        self.batch_norm = batch_norm
        # Per-layer geometry; filled in by _make_layers, one entry per
        # convolution or pooling layer.
        self.kernel_sizes: list[int] = []
        self.strides: list[int] = []
        self.paddings: list[int] = []
        self.features = self._make_layers(cfg, batch_norm)
        if init_weights:
            self._initialize_weights()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the layer stack to ``x`` and return its output."""
        return self.features(x)

    def _record_geometry(self, kernel_size: int, stride: int, padding: int) -> None:
        """Append one layer's kernel size, stride and padding to the records."""
        self.kernel_sizes.append(kernel_size)
        self.strides.append(stride)
        self.paddings.append(padding)

    def _make_layers(self, cfg: list[int | str], batch_norm: bool) -> nn.Sequential:
        """Translate ``cfg`` into an ``nn.Sequential`` and record layer geometry.

        Also sets ``self.n_layers`` to the number of convolutions created.
        """
        stack: list[nn.Module] = []
        self.n_layers = 0
        channels = 3  # the first convolution expects 3 input channels
        for spec in cfg:
            if spec != "M":
                conv_block: list[nn.Module] = [nn.Conv2d(channels, spec, kernel_size=3, padding=1)]
                if batch_norm:
                    conv_block.append(nn.BatchNorm2d(spec))
                conv_block.append(nn.ReLU(inplace=True))
                stack += conv_block
                self.n_layers += 1
                self._record_geometry(3, 1, 1)
                channels = spec
            else:
                stack.append(nn.MaxPool2d(kernel_size=2, stride=2))
                self._record_geometry(2, 2, 0)
        return nn.Sequential(*stack)

    def _initialize_weights(self) -> None:
        """Re-initialise convolution, batch-norm and linear sub-modules in place."""
        for layer in self.modules():
            if isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight, 0, 0.01)
                nn.init.constant_(layer.bias, 0)
            elif isinstance(layer, nn.BatchNorm2d):
                nn.init.constant_(layer.weight, 1)
                nn.init.constant_(layer.bias, 0)
            elif isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode="fan_out", nonlinearity="relu")
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)

    def conv_info(self) -> tuple[list[int], list[int], list[int]]:
        """Return the recorded ``(kernel_sizes, strides, paddings)`` lists."""
        return self.kernel_sizes, self.strides, self.paddings

    def __repr__(self) -> str:
        # The name is the convolution count plus 3, so e.g. 13 convolutions
        # print as "VGG16".
        return f"VGG{self.n_layers + 3}, batch_norm={self.batch_norm}"
def build_vgg_features(name: str) -> VGGFeatures:
    """Build the VGG feature extractor registered under ``name``.

    A trailing ``"_bn"`` (for example ``"vgg16_bn"``) selects the same layer
    plan with batch normalisation enabled.

    Args:
        name: Architecture name, optionally suffixed with ``"_bn"``.

    Returns:
        A freshly constructed ``VGGFeatures`` module.

    Raises:
        ValueError: If the name, without the ``"_bn"`` suffix, is not a key of
            ``_VGG_CFGS``.
    """
    use_batch_norm = name.endswith("_bn")
    # The configuration table is keyed by the plain architecture name, so the
    # suffix must be stripped before the lookup; otherwise every "*_bn" name
    # is rejected and the batch-norm variant can never be requested.
    base_name = name.removesuffix("_bn")
    if base_name not in _VGG_CFGS:
        raise ValueError(f"Unsupported VGG architecture: {name}")
    return VGGFeatures(_VGG_CFGS[base_name], batch_norm=use_batch_norm)
def compute_layer_rf_info(
    layer_filter_size: int,
    layer_stride: int,
    layer_padding: int | str,
    previous_layer_rf_info: list[float],
) -> list[float]:
    """Propagate receptive-field information through one layer.

    Args:
        layer_filter_size: Kernel size of the layer.
        layer_stride: Stride of the layer.
        layer_padding: ``"SAME"``, ``"VALID"``, or any other value, which is
            treated as the amount of padding applied on one side.
        previous_layer_rf_info: Four values ``[n, j, r, start]`` describing
            the layer's input.

    Returns:
        ``[n_out, j_out, r_out, start_out]`` for the layer's output, where
        ``j_out = j * stride``, ``r_out = r + (filter_size - 1) * j`` and
        ``start_out`` is ``start`` shifted by half the kernel extent minus the
        leading padding, measured in units of ``j``.
    """
    size_in, jump_in, rf_in, start_in = previous_layer_rf_info

    if layer_padding == "VALID":
        total_pad = 0
        size_out = math.ceil(float(size_in - layer_filter_size + 1) / float(layer_stride))
    elif layer_padding == "SAME":
        size_out = math.ceil(float(size_in) / float(layer_stride))
        leftover = size_in % layer_stride
        # When the input divides evenly, the last window consumes a full stride.
        consumed = layer_stride if leftover == 0 else leftover
        total_pad = max(layer_filter_size - consumed, 0)
    else:
        # Anything else is treated as the per-side padding amount.
        total_pad = layer_padding * 2
        size_out = math.floor((size_in - layer_filter_size + total_pad) / layer_stride) + 1

    leading_pad = math.floor(total_pad / 2)
    kernel_reach = layer_filter_size - 1
    return [
        size_out,
        jump_in * layer_stride,
        rf_in + kernel_reach * jump_in,
        start_in + (kernel_reach / 2 - leading_pad) * jump_in,
    ]
def compute_proto_layer_rf_info_v2(
    img_size: int,
    layer_filter_sizes: list[int],
    layer_strides: list[int],
    layer_paddings: list[int],
    prototype_kernel_size: int,
) -> list[float]:
    """Compute receptive-field information at the prototype layer.

    Starting from ``[img_size, 1, 1, 0.5]``, the information is pushed through
    every listed layer with ``compute_layer_rf_info`` and finally through the
    prototype layer itself, which is modelled as a stride-1 ``"VALID"`` layer
    with kernel size ``prototype_kernel_size``.

    Args:
        img_size: Size of the input image.
        layer_filter_sizes: Kernel size of each layer, in order.
        layer_strides: Stride of each layer, in order.
        layer_paddings: Padding of each layer, in order.
        prototype_kernel_size: Kernel size of the prototype layer.

    Returns:
        The four-element list produced by the final ``compute_layer_rf_info``
        call.

    Raises:
        ValueError: If the three per-layer lists differ in length.
    """
    expected_len = len(layer_filter_sizes)
    if len(layer_strides) != expected_len or len(layer_paddings) != expected_len:
        raise ValueError("Layer metadata length mismatch.")

    state: list[float] = [img_size, 1, 1, 0.5]
    geometry = zip(layer_filter_sizes, layer_strides, layer_paddings, strict=True)
    for kernel, stride, padding in geometry:
        state = compute_layer_rf_info(int(kernel), stride, padding, state)

    return compute_layer_rf_info(prototype_kernel_size, 1, "VALID", state)
class PPNet(nn.Module):
    """Prototype-based classifier on top of a convolutional feature extractor.

    The forward pass runs ``features`` and the 1x1-convolution add-on layers,
    computes the squared L2 distance between every feature patch and every
    prototype, keeps the smallest distance per prototype, converts it to a
    similarity score and feeds the scores to a bias-free linear layer.

    Args:
        features: Feature extractor. Its ``str()`` (upper-cased) must start
            with ``VGG``, ``RES`` or ``DENSE``; that prefix decides how the
            extractor's output channel count is read.
        img_size: Stored on the instance; not otherwise used by this class.
        prototype_shape: ``(num_prototypes, channels, height, width)`` of the
            prototype tensor.
        proto_layer_rf_info: Stored on the instance; not otherwise used by
            this class.
        num_classes: Number of output classes; must divide the number of
            prototypes.
        init_weights: Run ``_initialize_weights`` after construction.
        prototype_activation_function: ``"log"`` or ``"linear"``.
        add_on_layers_type: ``"bottleneck"`` halves the channel count step by
            step; any other value builds a fixed two-convolution stack.

    Raises:
        ValueError: If the prototype count is not divisible by ``num_classes``
            or the feature extractor is not recognised.
    """

    def __init__(
        self,
        features: nn.Module,
        img_size: int,
        prototype_shape: tuple[int, int, int, int],
        proto_layer_rf_info: list[float],
        num_classes: int,
        init_weights: bool = True,
        prototype_activation_function: str = "log",
        add_on_layers_type: str = "bottleneck",
    ):
        super().__init__()
        self.img_size = img_size
        self.prototype_shape = prototype_shape
        self.num_prototypes = prototype_shape[0]
        self.num_classes = num_classes
        self.epsilon = 1e-4
        self.prototype_activation_function = prototype_activation_function
        self.proto_layer_rf_info = proto_layer_rf_info
        self.features = features

        if self.num_prototypes % self.num_classes != 0:
            raise ValueError("Number of prototypes must be divisible by num_classes.")

        # One-hot map from prototype to class: consecutive runs of
        # `per_class` prototypes belong to the same class.
        per_class = self.num_prototypes // self.num_classes
        identity = torch.zeros(self.num_prototypes, self.num_classes)
        for class_idx in range(self.num_classes):
            identity[class_idx * per_class:(class_idx + 1) * per_class, class_idx] = 1
        self.prototype_class_identity = identity

        # The extractor's printed name decides which layer type carries the
        # output channel count.
        backbone_tag = str(self.features).upper()
        if backbone_tag.startswith(("VGG", "RES")):
            convs = [m for m in features.modules() if isinstance(m, nn.Conv2d)]
            in_channels = convs[-1].out_channels
        elif backbone_tag.startswith("DENSE"):
            norms = [m for m in features.modules() if isinstance(m, nn.BatchNorm2d)]
            in_channels = norms[-1].num_features
        else:
            raise ValueError("Unsupported base architecture.")

        self.add_on_layers = self._build_add_on_layers(
            in_channels, self.prototype_shape[1], add_on_layers_type
        )

        self.prototype_vectors = nn.Parameter(torch.rand(self.prototype_shape), requires_grad=True)
        # Constant all-ones kernel used to sum squared activations per patch.
        self.ones = nn.Parameter(torch.ones(self.prototype_shape), requires_grad=False)
        self.last_layer = nn.Linear(self.num_prototypes, self.num_classes, bias=False)

        if init_weights:
            self._initialize_weights()

    @staticmethod
    def _build_add_on_layers(
        in_channels: int, proto_channels: int, add_on_layers_type: str
    ) -> nn.Sequential:
        """Build the 1x1-convolution stack mapping features to prototype channels."""
        if add_on_layers_type != "bottleneck":
            return nn.Sequential(
                nn.Conv2d(in_channels, proto_channels, kernel_size=1),
                nn.ReLU(),
                nn.Conv2d(proto_channels, proto_channels, kernel_size=1),
                nn.Sigmoid(),
            )

        stack: list[nn.Module] = []
        width = in_channels
        # Always emit at least one stage, then keep halving until the
        # prototype channel count is reached.
        while width > proto_channels or not stack:
            narrowed = max(proto_channels, width // 2)
            is_last_stage = narrowed <= proto_channels
            stack += [
                nn.Conv2d(width, narrowed, kernel_size=1),
                nn.ReLU(),
                nn.Conv2d(narrowed, narrowed, kernel_size=1),
                nn.Sigmoid() if is_last_stage else nn.ReLU(),
            ]
            width //= 2
        return nn.Sequential(*stack)

    def conv_features(self, x: torch.Tensor) -> torch.Tensor:
        """Run the feature extractor followed by the add-on layers."""
        extracted = self.features(x)
        return self.add_on_layers(extracted)

    def _l2_convolution(self, x: torch.Tensor) -> torch.Tensor:
        """Return squared L2 distances between patches of ``x`` and each prototype.

        Uses ``|x - p|^2 = |x|^2 - 2 x.p + |p|^2`` with both patch-wise terms
        computed by convolution; the result is clamped at zero.
        """
        patch_sq_norms = F.conv2d(input=x**2, weight=self.ones)
        proto_sq_norms = (self.prototype_vectors**2).sum(dim=(1, 2, 3)).view(-1, 1, 1)
        cross_terms = F.conv2d(input=x, weight=self.prototype_vectors)
        return F.relu(patch_sq_norms - 2 * cross_terms + proto_sq_norms)

    def prototype_distances(self, x: torch.Tensor) -> torch.Tensor:
        """Return prototype distances for the convolutional features of ``x``."""
        return self._l2_convolution(self.conv_features(x))

    def distance_2_similarity(self, distances: torch.Tensor) -> torch.Tensor:
        """Convert distances to similarity scores using the configured activation.

        Raises:
            ValueError: If the configured activation is neither ``"log"`` nor
                ``"linear"``.
        """
        mode = self.prototype_activation_function
        if mode == "linear":
            return -distances
        if mode != "log":
            raise ValueError(
                f"Unsupported prototype activation function: {self.prototype_activation_function}"
            )
        return torch.log((distances + 1) / (distances + self.epsilon))

    def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Return ``(logits, min_distances)`` for the input batch ``x``."""
        distances = self.prototype_distances(x)
        # Max-pooling the negated map over its whole spatial extent yields the
        # minimum distance per prototype.
        whole_map = (distances.size(2), distances.size(3))
        closest = -F.max_pool2d(-distances, kernel_size=whole_map)
        min_distances = closest.view(-1, self.num_prototypes)
        similarities = self.distance_2_similarity(min_distances)
        return self.last_layer(similarities), min_distances

    def set_last_layer_incorrect_connection(self, incorrect_strength: float) -> None:
        """Overwrite the last layer's weights from the prototype-class map.

        A prototype's connection to its own class is set to 1 and every other
        connection to ``incorrect_strength``.
        """
        own_class = torch.t(self.prototype_class_identity)
        other_class = 1 - own_class
        self.last_layer.weight.data.copy_(own_class + incorrect_strength * other_class)

    def _initialize_weights(self) -> None:
        """Initialise the add-on layers and reset the last layer's weights."""
        for layer in self.add_on_layers.modules():
            if isinstance(layer, nn.BatchNorm2d):
                nn.init.constant_(layer.weight, 1)
                nn.init.constant_(layer.bias, 0)
            elif isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode="fan_out", nonlinearity="relu")
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)
        self.set_last_layer_incorrect_connection(incorrect_strength=-0.5)
def build_ppnet(
    *,
    base_architecture: str,
    img_size: int,
    prototype_shape: tuple[int, int, int, int],
    num_classes: int,
    prototype_activation_function: str,
    add_on_layers_type: str,
) -> PPNet:
    """Assemble a ``PPNet`` on top of a VGG feature extractor.

    The feature extractor is built with ``build_vgg_features``; its recorded
    layer geometry is passed to ``compute_proto_layer_rf_info_v2`` together
    with ``prototype_shape[2]`` as the prototype kernel size, and the result is
    handed to ``PPNet`` along with the remaining arguments. Weights are always
    initialised (``init_weights=True``).

    All arguments are keyword-only and are forwarded unchanged.
    """
    backbone = build_vgg_features(base_architecture)
    kernel_sizes, strides, paddings = backbone.conv_info()
    rf_info = compute_proto_layer_rf_info_v2(
        img_size=img_size,
        layer_filter_sizes=kernel_sizes,
        layer_strides=strides,
        layer_paddings=paddings,
        prototype_kernel_size=prototype_shape[2],
    )
    ppnet_kwargs = dict(
        features=backbone,
        img_size=img_size,
        prototype_shape=prototype_shape,
        proto_layer_rf_info=rf_info,
        num_classes=num_classes,
        init_weights=True,
        prototype_activation_function=prototype_activation_function,
        add_on_layers_type=add_on_layers_type,
    )
    return PPNet(**ppnet_kwargs)