# SecureMLAPI / protopnet.py
# yenslife's picture
# feat: integrate ppnet inference backend
# 896740b
from __future__ import annotations
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
# Layer recipes for the supported VGG variants.
# An integer is the output-channel count of a 3x3 convolution; "M" is a 2x2
# max-pool. Each row below is one pooling stage.
_VGG_CFGS = {
    "vgg11": [
        64, "M",
        128, "M",
        256, 256, "M",
        512, 512, "M",
        512, 512, "M",
    ],
    "vgg13": [
        64, 64, "M",
        128, 128, "M",
        256, 256, "M",
        512, 512, "M",
        512, 512, "M",
    ],
    "vgg16": [
        64, 64, "M",
        128, 128, "M",
        256, 256, 256, "M",
        512, 512, 512, "M",
        512, 512, 512, "M",
    ],
    "vgg19": [
        64, 64, "M",
        128, 128, "M",
        256, 256, 256, 256, "M",
        512, 512, 512, 512, "M",
        512, 512, 512, 512, "M",
    ],
}
class VGGFeatures(nn.Module):
    """Convolutional feature extractor assembled from a VGG-style layer list.

    Every integer in ``cfg`` adds a 3x3 convolution (padding 1) with that many
    output channels, optionally followed by batch normalisation, then an
    in-place ReLU. The string ``"M"`` adds a 2x2 max-pool with stride 2.

    While the stack is built, the kernel size, stride and padding of each
    convolution and pooling layer are recorded so callers can retrieve them
    through :meth:`conv_info`.
    """

    def __init__(self, cfg: list[int | str], batch_norm: bool = False, init_weights: bool = True):
        super().__init__()
        self.batch_norm = batch_norm
        # Geometry of every conv/pool layer, in forward order; filled in by
        # _make_layers.
        self.kernel_sizes: list[int] = []
        self.strides: list[int] = []
        self.paddings: list[int] = []
        self.features = self._make_layers(cfg, batch_norm)
        if init_weights:
            self._initialize_weights()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the convolutional stack."""
        return self.features(x)

    def _record_geometry(self, kernel_size: int, stride: int, padding: int) -> None:
        """Append one layer's kernel size, stride and padding to the bookkeeping lists."""
        self.kernel_sizes.append(kernel_size)
        self.strides.append(stride)
        self.paddings.append(padding)

    def _make_layers(self, cfg: list[int | str], batch_norm: bool) -> nn.Sequential:
        """Translate ``cfg`` into an ``nn.Sequential`` and record layer geometry.

        Also (re)sets ``self.n_layers`` to the number of convolutions created.
        """
        stack: list[nn.Module] = []
        channels = 3  # the first convolution is built for 3 input channels
        self.n_layers = 0
        for entry in cfg:
            if entry != "M":
                stack.append(nn.Conv2d(channels, entry, kernel_size=3, padding=1))
                if batch_norm:
                    stack.append(nn.BatchNorm2d(entry))
                stack.append(nn.ReLU(inplace=True))
                self.n_layers += 1
                self._record_geometry(3, 1, 1)
                channels = entry
            else:
                stack.append(nn.MaxPool2d(kernel_size=2, stride=2))
                self._record_geometry(2, 2, 0)
        return nn.Sequential(*stack)

    def _initialize_weights(self) -> None:
        """Kaiming-init convolutions, reset batch norms, small-normal-init linears."""
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode="fan_out", nonlinearity="relu")
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)
                continue
            if isinstance(layer, nn.BatchNorm2d):
                nn.init.constant_(layer.weight, 1)
                nn.init.constant_(layer.bias, 0)
                continue
            if isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight, 0, 0.01)
                nn.init.constant_(layer.bias, 0)

    def conv_info(self) -> tuple[list[int], list[int], list[int]]:
        """Return ``(kernel_sizes, strides, paddings)`` for all conv/pool layers.

        The returned lists are the instance's own lists, not copies.
        """
        return self.kernel_sizes, self.strides, self.paddings

    def __repr__(self) -> str:
        # The "+ 3" presumably counts the three fully-connected layers of a
        # complete VGG, which are not built here -- TODO confirm.
        return f"VGG{self.n_layers + 3}, batch_norm={self.batch_norm}"
def build_vgg_features(name: str) -> VGGFeatures:
    """Build the VGG feature extractor identified by ``name``.

    ``name`` is a key of ``_VGG_CFGS`` (for example ``"vgg16"``), optionally
    followed by ``"_bn"`` to request batch normalisation after every
    convolution (for example ``"vgg16_bn"``).

    Raises:
        ValueError: if the architecture, with any ``"_bn"`` suffix removed,
            has no entry in ``_VGG_CFGS``.
    """
    # The config table is keyed by the plain architecture name. Looking up the
    # raw name would reject every "*_bn" variant and leave the batch-norm
    # option unreachable, so the suffix is stripped before the lookup.
    use_batch_norm = name.endswith("_bn")
    base_name = name.removesuffix("_bn")
    if base_name not in _VGG_CFGS:
        raise ValueError(f"Unsupported VGG architecture: {name}")
    return VGGFeatures(_VGG_CFGS[base_name], batch_norm=use_batch_norm)
def compute_layer_rf_info(
    layer_filter_size: int,
    layer_stride: int,
    layer_padding: int | str,
    previous_layer_rf_info: list[float],
) -> list[float]:
    """Advance receptive-field bookkeeping through one conv/pool layer.

    Args:
        layer_filter_size: kernel size of the layer.
        layer_stride: stride of the layer.
        layer_padding: ``"SAME"``, ``"VALID"``, or the number of padded
            positions on one side.
        previous_layer_rf_info: ``[n, j, r, start]`` of the layer's input:
            spatial size, cumulative stride, receptive-field size, and the
            position of the first receptive-field centre.

    Returns:
        The updated ``[n, j, r, start]`` for the layer's output.
    """
    size_in, jump_in, rf_in, center_in = previous_layer_rf_info
    reach = layer_filter_size - 1

    if layer_padding == "SAME":
        size_out = math.ceil(float(size_in) / float(layer_stride))
        leftover = size_in % layer_stride
        # Total padding is the kernel size minus what the last stride step
        # already covers; never negative.
        covered = layer_stride if leftover == 0 else leftover
        total_pad = max(layer_filter_size - covered, 0)
    elif layer_padding == "VALID":
        size_out = math.ceil(float(size_in - layer_filter_size + 1) / float(layer_stride))
        total_pad = 0
    else:
        # Numeric padding is given per side, so the total is twice that.
        total_pad = layer_padding * 2
        size_out = math.floor((size_in - layer_filter_size + total_pad) / layer_stride) + 1

    left_pad = math.floor(total_pad / 2)
    return [
        size_out,
        jump_in * layer_stride,
        rf_in + reach * jump_in,
        center_in + (reach / 2 - left_pad) * jump_in,
    ]
def compute_proto_layer_rf_info_v2(
    img_size: int,
    layer_filter_sizes: list[int],
    layer_strides: list[int],
    layer_paddings: list[int],
    prototype_kernel_size: int,
) -> list[float]:
    """Compute receptive-field info at the prototype layer.

    Starting from ``[img_size, 1, 1, 0.5]``, the info is pushed through every
    described layer in order, then through one final stride-1 ``"VALID"``
    layer whose kernel size is ``prototype_kernel_size``.

    Raises:
        ValueError: if the three per-layer lists differ in length.
    """
    layer_count = len(layer_filter_sizes)
    if len(layer_strides) != layer_count or len(layer_paddings) != layer_count:
        raise ValueError("Layer metadata length mismatch.")

    state: list[float] = [img_size, 1, 1, 0.5]
    layer_specs = zip(layer_filter_sizes, layer_strides, layer_paddings, strict=True)
    for kernel, stride, padding in layer_specs:
        state = compute_layer_rf_info(
            layer_filter_size=int(kernel),
            layer_stride=stride,
            layer_padding=padding,
            previous_layer_rf_info=state,
        )

    # The prototype layer is treated as an unpadded, stride-1 convolution.
    return compute_layer_rf_info(
        layer_filter_size=prototype_kernel_size,
        layer_stride=1,
        layer_padding="VALID",
        previous_layer_rf_info=state,
    )
class PPNet(nn.Module):
    """Prototype-based classifier on top of a convolutional backbone.

    Data flow: ``features`` -> 1x1 "add-on" convolutions -> squared L2
    distance of every feature patch to every prototype -> minimum distance
    per prototype -> similarity score -> bias-free linear layer.

    Prototypes are assigned to classes in equal, contiguous groups, recorded
    as a one-hot matrix in ``prototype_class_identity``.
    """

    def __init__(
        self,
        features: nn.Module,
        img_size: int,
        prototype_shape: tuple[int, int, int, int],
        proto_layer_rf_info: list[float],
        num_classes: int,
        init_weights: bool = True,
        prototype_activation_function: str = "log",
        add_on_layers_type: str = "bottleneck",
    ):
        """Assemble the network.

        Args:
            features: backbone module; its ``str()`` must start with ``VGG``,
                ``RES`` or ``DENSE`` (case-insensitive).
            img_size: stored as given; not used by this class itself.
            prototype_shape: ``(num_prototypes, channels, height, width)``.
            proto_layer_rf_info: stored as given; not used by this class.
            num_classes: number of output classes; must divide the number of
                prototypes.
            init_weights: whether to run the custom initialisation.
            prototype_activation_function: ``"log"`` or ``"linear"``.
            add_on_layers_type: ``"bottleneck"`` for a channel-halving stack;
                any other value selects a fixed two-convolution stack.

        Raises:
            ValueError: if prototypes cannot be split evenly across classes,
                or the backbone is not recognised.
        """
        super().__init__()
        self.img_size = img_size
        self.prototype_shape = prototype_shape
        self.num_prototypes = prototype_shape[0]
        self.num_classes = num_classes
        self.epsilon = 1e-4
        self.prototype_activation_function = prototype_activation_function
        self.proto_layer_rf_info = proto_layer_rf_info
        self.features = features

        if self.num_prototypes % self.num_classes != 0:
            raise ValueError("Number of prototypes must be divisible by num_classes.")

        # One-hot (prototype, class) membership: prototype i belongs to class
        # i // prototypes_per_class.
        prototypes_per_class = self.num_prototypes // self.num_classes
        owner_class = torch.arange(self.num_classes).repeat_interleave(prototypes_per_class)
        self.prototype_class_identity = torch.zeros(self.num_prototypes, self.num_classes)
        self.prototype_class_identity[torch.arange(self.num_prototypes), owner_class] = 1

        # The backbone's output width is read from its last conv (VGG/ResNet)
        # or its last batch norm (DenseNet).
        backbone_name = str(self.features).upper()
        if backbone_name.startswith(("VGG", "RES")):
            marker_type, width_attr = nn.Conv2d, "out_channels"
        elif backbone_name.startswith("DENSE"):
            marker_type, width_attr = nn.BatchNorm2d, "num_features"
        else:
            raise ValueError("Unsupported base architecture.")
        last_marker = [m for m in features.modules() if isinstance(m, marker_type)][-1]
        in_channels = getattr(last_marker, width_attr)

        self.add_on_layers = self._make_add_on_layers(
            in_channels, self.prototype_shape[1], add_on_layers_type
        )

        self.prototype_vectors = nn.Parameter(torch.rand(self.prototype_shape), requires_grad=True)
        # Frozen all-ones kernel used to sum squared activations per patch.
        self.ones = nn.Parameter(torch.ones(self.prototype_shape), requires_grad=False)
        self.last_layer = nn.Linear(self.num_prototypes, self.num_classes, bias=False)

        if init_weights:
            self._initialize_weights()

    @staticmethod
    def _make_add_on_layers(
        in_channels: int, proto_channels: int, add_on_layers_type: str
    ) -> nn.Sequential:
        """Build the 1x1 convolution stack mapping backbone to prototype channels."""
        if add_on_layers_type != "bottleneck":
            return nn.Sequential(
                nn.Conv2d(in_channels, proto_channels, kernel_size=1),
                nn.ReLU(),
                nn.Conv2d(proto_channels, proto_channels, kernel_size=1),
                nn.Sigmoid(),
            )

        stack: list[nn.Module] = []
        width = in_channels
        # At least one stage is always built; further stages halve the width
        # until it is no larger than the prototype channel count.
        while True:
            next_width = max(proto_channels, width // 2)
            reached_target = not next_width > proto_channels
            stack += [
                nn.Conv2d(width, next_width, kernel_size=1),
                nn.ReLU(),
                nn.Conv2d(next_width, next_width, kernel_size=1),
                # Only the final stage ends in a sigmoid.
                nn.Sigmoid() if reached_target else nn.ReLU(),
            ]
            width //= 2
            if not width > proto_channels:
                break
        return nn.Sequential(*stack)

    def conv_features(self, x: torch.Tensor) -> torch.Tensor:
        """Return the add-on layer output for input ``x``."""
        backbone_out = self.features(x)
        return self.add_on_layers(backbone_out)

    def _l2_convolution(self, x: torch.Tensor) -> torch.Tensor:
        """Squared L2 distance between each patch of ``x`` and each prototype.

        Uses the expansion ``|x|^2 - 2*x.p + |p|^2`` with convolutions, then
        clamps at zero.
        """
        prototypes = self.prototype_vectors
        patch_sq_norm = F.conv2d(input=x**2, weight=self.ones)
        proto_sq_norm = torch.sum(prototypes**2, dim=(1, 2, 3)).view(-1, 1, 1)
        cross_term = F.conv2d(input=x, weight=prototypes)
        return F.relu(patch_sq_norm - 2 * cross_term + proto_sq_norm)

    def prototype_distances(self, x: torch.Tensor) -> torch.Tensor:
        """Distance maps from the features of ``x`` to every prototype."""
        latent = self.conv_features(x)
        return self._l2_convolution(latent)

    def distance_2_similarity(self, distances: torch.Tensor) -> torch.Tensor:
        """Turn distances into similarity scores using the configured activation.

        Raises:
            ValueError: if the configured activation is neither ``"log"`` nor
                ``"linear"``.
        """
        activation = self.prototype_activation_function
        if activation == "linear":
            return -distances
        if activation == "log":
            return torch.log((distances + 1) / (distances + self.epsilon))
        raise ValueError(
            f"Unsupported prototype activation function: {self.prototype_activation_function}"
        )

    def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Return ``(logits, min_distances)`` for a batch ``x``.

        ``min_distances`` has one column per prototype, holding the smallest
        distance found over all spatial positions.
        """
        distances = self.prototype_distances(x)
        # Global min-pooling, written as a negated max-pool over the full map.
        whole_map = (distances.size(2), distances.size(3))
        min_distances = -F.max_pool2d(-distances, kernel_size=whole_map)
        min_distances = min_distances.view(-1, self.num_prototypes)
        similarities = self.distance_2_similarity(min_distances)
        return self.last_layer(similarities), min_distances

    def set_last_layer_incorrect_connection(self, incorrect_strength: float) -> None:
        """Overwrite the last-layer weights with a fixed pattern.

        The weight is 1 where a prototype belongs to the class and
        ``incorrect_strength`` everywhere else.
        """
        own_class = torch.t(self.prototype_class_identity)
        other_class = 1 - own_class
        self.last_layer.weight.data.copy_(own_class + incorrect_strength * other_class)

    def _initialize_weights(self) -> None:
        """Initialise the add-on layers and reset the last-layer connections."""
        for layer in self.add_on_layers.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode="fan_out", nonlinearity="relu")
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)
                continue
            if isinstance(layer, nn.BatchNorm2d):
                nn.init.constant_(layer.weight, 1)
                nn.init.constant_(layer.bias, 0)
        self.set_last_layer_incorrect_connection(incorrect_strength=-0.5)
def build_ppnet(
    *,
    base_architecture: str,
    img_size: int,
    prototype_shape: tuple[int, int, int, int],
    num_classes: int,
    prototype_activation_function: str,
    add_on_layers_type: str,
) -> PPNet:
    """Construct a ``PPNet`` on a VGG backbone.

    Builds the backbone named by ``base_architecture``, derives the
    prototype-layer receptive-field info from the backbone's layer geometry
    and ``prototype_shape[2]``, and wires both into a ``PPNet`` with weight
    initialisation enabled. All arguments are keyword-only.
    """
    backbone = build_vgg_features(base_architecture)
    kernel_sizes, strides, paddings = backbone.conv_info()

    rf_info = compute_proto_layer_rf_info_v2(
        img_size=img_size,
        layer_filter_sizes=kernel_sizes,
        layer_strides=strides,
        layer_paddings=paddings,
        prototype_kernel_size=prototype_shape[2],
    )

    model = PPNet(
        features=backbone,
        img_size=img_size,
        prototype_shape=prototype_shape,
        proto_layer_rf_info=rf_info,
        num_classes=num_classes,
        init_weights=True,
        prototype_activation_function=prototype_activation_function,
        add_on_layers_type=add_on_layers_type,
    )
    return model