Spaces:
Build error
Build error
| # Copyright (c) 2018-2022 Samet Akcay, Durham University, UK | |
| # SPDX-License-Identifier: MIT | |
| # | |
| # Copyright (C) 2020-2022 Intel Corporation | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| """Torch models defining encoder, decoder, Generator and Discriminator. | |
| Code adapted from https://github.com/samet-akcay/ganomaly. | |
| """ | |
| import math | |
| from typing import Tuple | |
| import torch | |
| from torch import Tensor, nn | |
| from anomalib.data.utils.image import pad_nextpow2 | |
| class Encoder(nn.Module): | |
| """Encoder Network. | |
| Args: | |
| input_size (Tuple[int, int]): Size of input image | |
| latent_vec_size (int): Size of latent vector z | |
| num_input_channels (int): Number of input channels in the image | |
| n_features (int): Number of features per convolution layer | |
| extra_layers (int): Number of extra layers since the network uses only a single encoder layer by default. | |
| Defaults to 0. | |
| """ | |
| def __init__( | |
| self, | |
| input_size: Tuple[int, int], | |
| latent_vec_size: int, | |
| num_input_channels: int, | |
| n_features: int, | |
| extra_layers: int = 0, | |
| add_final_conv_layer: bool = True, | |
| ): | |
| super().__init__() | |
| self.input_layers = nn.Sequential() | |
| self.input_layers.add_module( | |
| f"initial-conv-{num_input_channels}-{n_features}", | |
| nn.Conv2d(num_input_channels, n_features, kernel_size=4, stride=2, padding=4, bias=False), | |
| ) | |
| self.input_layers.add_module(f"initial-relu-{n_features}", nn.LeakyReLU(0.2, inplace=True)) | |
| # Extra Layers | |
| self.extra_layers = nn.Sequential() | |
| for layer in range(extra_layers): | |
| self.extra_layers.add_module( | |
| f"extra-layers-{layer}-{n_features}-conv", | |
| nn.Conv2d(n_features, n_features, kernel_size=3, stride=1, padding=1, bias=False), | |
| ) | |
| self.extra_layers.add_module(f"extra-layers-{layer}-{n_features}-batchnorm", nn.BatchNorm2d(n_features)) | |
| self.extra_layers.add_module(f"extra-layers-{layer}-{n_features}-relu", nn.LeakyReLU(0.2, inplace=True)) | |
| # Create pyramid features to reach latent vector | |
| self.pyramid_features = nn.Sequential() | |
| pyramid_dim = min(*input_size) // 2 # Use the smaller dimension to create pyramid. | |
| while pyramid_dim > 4: | |
| in_features = n_features | |
| out_features = n_features * 2 | |
| self.pyramid_features.add_module( | |
| f"pyramid-{in_features}-{out_features}-conv", | |
| nn.Conv2d(in_features, out_features, kernel_size=4, stride=2, padding=1, bias=False), | |
| ) | |
| self.pyramid_features.add_module(f"pyramid-{out_features}-batchnorm", nn.BatchNorm2d(out_features)) | |
| self.pyramid_features.add_module(f"pyramid-{out_features}-relu", nn.LeakyReLU(0.2, inplace=True)) | |
| n_features = out_features | |
| pyramid_dim = pyramid_dim // 2 | |
| # Final conv | |
| if add_final_conv_layer: | |
| self.final_conv_layer = nn.Conv2d( | |
| n_features, | |
| latent_vec_size, | |
| kernel_size=4, | |
| stride=1, | |
| padding=0, | |
| bias=False, | |
| ) | |
| def forward(self, input_tensor: Tensor): | |
| """Return latent vectors.""" | |
| output = self.input_layers(input_tensor) | |
| output = self.extra_layers(output) | |
| output = self.pyramid_features(output) | |
| if self.final_conv_layer is not None: | |
| output = self.final_conv_layer(output) | |
| return output | |
| class Decoder(nn.Module): | |
| """Decoder Network. | |
| Args: | |
| input_size (Tuple[int, int]): Size of input image | |
| latent_vec_size (int): Size of latent vector z | |
| num_input_channels (int): Number of input channels in the image | |
| n_features (int): Number of features per convolution layer | |
| extra_layers (int): Number of extra layers since the network uses only a single encoder layer by default. | |
| Defaults to 0. | |
| """ | |
| def __init__( | |
| self, | |
| input_size: Tuple[int, int], | |
| latent_vec_size: int, | |
| num_input_channels: int, | |
| n_features: int, | |
| extra_layers: int = 0, | |
| ): | |
| super().__init__() | |
| self.latent_input = nn.Sequential() | |
| # Calculate input channel size to recreate inverse pyramid | |
| exp_factor = math.ceil(math.log(min(input_size) // 2, 2)) - 2 | |
| n_input_features = n_features * (2**exp_factor) | |
| # CNN layer for latent vector input | |
| self.latent_input.add_module( | |
| f"initial-{latent_vec_size}-{n_input_features}-convt", | |
| nn.ConvTranspose2d( | |
| latent_vec_size, | |
| n_input_features, | |
| kernel_size=4, | |
| stride=1, | |
| padding=0, | |
| bias=False, | |
| ), | |
| ) | |
| self.latent_input.add_module(f"initial-{n_input_features}-batchnorm", nn.BatchNorm2d(n_input_features)) | |
| self.latent_input.add_module(f"initial-{n_input_features}-relu", nn.ReLU(True)) | |
| # Create inverse pyramid | |
| self.inverse_pyramid = nn.Sequential() | |
| pyramid_dim = min(*input_size) // 2 # Use the smaller dimension to create pyramid. | |
| while pyramid_dim > 4: | |
| in_features = n_input_features | |
| out_features = n_input_features // 2 | |
| self.inverse_pyramid.add_module( | |
| f"pyramid-{in_features}-{out_features}-convt", | |
| nn.ConvTranspose2d( | |
| in_features, | |
| out_features, | |
| kernel_size=4, | |
| stride=2, | |
| padding=1, | |
| bias=False, | |
| ), | |
| ) | |
| self.inverse_pyramid.add_module(f"pyramid-{out_features}-batchnorm", nn.BatchNorm2d(out_features)) | |
| self.inverse_pyramid.add_module(f"pyramid-{out_features}-relu", nn.ReLU(True)) | |
| n_input_features = out_features | |
| pyramid_dim = pyramid_dim // 2 | |
| # Extra Layers | |
| self.extra_layers = nn.Sequential() | |
| for layer in range(extra_layers): | |
| self.extra_layers.add_module( | |
| f"extra-layers-{layer}-{n_input_features}-conv", | |
| nn.Conv2d(n_input_features, n_input_features, kernel_size=3, stride=1, padding=1, bias=False), | |
| ) | |
| self.extra_layers.add_module( | |
| f"extra-layers-{layer}-{n_input_features}-batchnorm", nn.BatchNorm2d(n_input_features) | |
| ) | |
| self.extra_layers.add_module( | |
| f"extra-layers-{layer}-{n_input_features}-relu", nn.LeakyReLU(0.2, inplace=True) | |
| ) | |
| # Final layers | |
| self.final_layers = nn.Sequential() | |
| self.final_layers.add_module( | |
| f"final-{n_input_features}-{num_input_channels}-convt", | |
| nn.ConvTranspose2d( | |
| n_input_features, | |
| num_input_channels, | |
| kernel_size=4, | |
| stride=2, | |
| padding=1, | |
| bias=False, | |
| ), | |
| ) | |
| self.final_layers.add_module(f"final-{num_input_channels}-tanh", nn.Tanh()) | |
| def forward(self, input_tensor): | |
| """Return generated image.""" | |
| output = self.latent_input(input_tensor) | |
| output = self.inverse_pyramid(output) | |
| output = self.extra_layers(output) | |
| output = self.final_layers(output) | |
| return output | |
| class Discriminator(nn.Module): | |
| """Discriminator. | |
| Made of only one encoder layer which takes x and x_hat to produce a score. | |
| Args: | |
| input_size (Tuple[int,int]): Input image size. | |
| num_input_channels (int): Number of image channels. | |
| n_features (int): Number of feature maps in each convolution layer. | |
| extra_layers (int, optional): Add extra intermediate layers. Defaults to 0. | |
| """ | |
| def __init__(self, input_size: Tuple[int, int], num_input_channels: int, n_features: int, extra_layers: int = 0): | |
| super().__init__() | |
| encoder = Encoder(input_size, 1, num_input_channels, n_features, extra_layers) | |
| layers = [] | |
| for block in encoder.children(): | |
| if isinstance(block, nn.Sequential): | |
| layers.extend(list(block.children())) | |
| else: | |
| layers.append(block) | |
| self.features = nn.Sequential(*layers[:-1]) | |
| self.classifier = nn.Sequential(layers[-1]) | |
| self.classifier.add_module("Sigmoid", nn.Sigmoid()) | |
| def forward(self, input_tensor): | |
| """Return class of object and features.""" | |
| features = self.features(input_tensor) | |
| classifier = self.classifier(features) | |
| classifier = classifier.view(-1, 1).squeeze(1) | |
| return classifier, features | |
| class Generator(nn.Module): | |
| """Generator model. | |
| Made of an encoder-decoder-encoder architecture. | |
| Args: | |
| input_size (Tuple[int,int]): Size of input data. | |
| latent_vec_size (int): Dimension of latent vector produced between the first encoder-decoder. | |
| num_input_channels (int): Number of channels in input image. | |
| n_features (int): Number of feature maps in each convolution layer. | |
| extra_layers (int, optional): Extra intermediate layers in the encoder/decoder. Defaults to 0. | |
| add_final_conv_layer (bool, optional): Add a final convolution layer in the decoder. Defaults to True. | |
| """ | |
| def __init__( | |
| self, | |
| input_size: Tuple[int, int], | |
| latent_vec_size: int, | |
| num_input_channels: int, | |
| n_features: int, | |
| extra_layers: int = 0, | |
| add_final_conv_layer: bool = True, | |
| ): | |
| super().__init__() | |
| self.encoder1 = Encoder( | |
| input_size, latent_vec_size, num_input_channels, n_features, extra_layers, add_final_conv_layer | |
| ) | |
| self.decoder = Decoder(input_size, latent_vec_size, num_input_channels, n_features, extra_layers) | |
| self.encoder2 = Encoder( | |
| input_size, latent_vec_size, num_input_channels, n_features, extra_layers, add_final_conv_layer | |
| ) | |
| def forward(self, input_tensor): | |
| """Return generated image and the latent vectors.""" | |
| latent_i = self.encoder1(input_tensor) | |
| gen_image = self.decoder(latent_i) | |
| latent_o = self.encoder2(gen_image) | |
| return gen_image, latent_i, latent_o | |
| class GanomalyModel(nn.Module): | |
| """Ganomaly Model. | |
| Args: | |
| input_size (Tuple[int,int]): Input dimension. | |
| num_input_channels (int): Number of input channels. | |
| n_features (int): Number of features layers in the CNNs. | |
| latent_vec_size (int): Size of autoencoder latent vector. | |
| extra_layers (int, optional): Number of extra layers for encoder/decoder. Defaults to 0. | |
| add_final_conv_layer (bool, optional): Add convolution layer at the end. Defaults to True. | |
| wadv (int, optional): Weight for adversarial loss. Defaults to 1. | |
| wcon (int, optional): Image regeneration weight. Defaults to 50. | |
| wenc (int, optional): Latent vector encoder weight. Defaults to 1. | |
| """ | |
| def __init__( | |
| self, | |
| input_size: Tuple[int, int], | |
| num_input_channels: int, | |
| n_features: int, | |
| latent_vec_size: int, | |
| extra_layers: int = 0, | |
| add_final_conv_layer: bool = True, | |
| wadv: int = 1, | |
| wcon: int = 50, | |
| wenc: int = 1, | |
| ) -> None: | |
| super().__init__() | |
| self.generator: Generator = Generator( | |
| input_size=input_size, | |
| latent_vec_size=latent_vec_size, | |
| num_input_channels=num_input_channels, | |
| n_features=n_features, | |
| extra_layers=extra_layers, | |
| add_final_conv_layer=add_final_conv_layer, | |
| ) | |
| self.discriminator: Discriminator = Discriminator( | |
| input_size=input_size, | |
| num_input_channels=num_input_channels, | |
| n_features=n_features, | |
| extra_layers=extra_layers, | |
| ) | |
| self.weights_init(self.generator) | |
| self.weights_init(self.discriminator) | |
| self.loss_enc = nn.SmoothL1Loss() | |
| self.loss_adv = nn.MSELoss() | |
| self.loss_con = nn.L1Loss() | |
| self.loss_bce = nn.BCELoss() | |
| self.wadv = wadv | |
| self.wcon = wcon | |
| self.wenc = wenc | |
| def weights_init(module: nn.Module): | |
| """Initialize DCGAN weights. | |
| Args: | |
| module (nn.Module): [description] | |
| """ | |
| classname = module.__class__.__name__ | |
| if classname.find("Conv") != -1: | |
| nn.init.normal_(module.weight.data, 0.0, 0.02) | |
| elif classname.find("BatchNorm") != -1: | |
| nn.init.normal_(module.weight.data, 1.0, 0.02) | |
| nn.init.constant_(module.bias.data, 0) | |
| def get_discriminator_loss(self, images: Tensor) -> Tensor: | |
| """Calculates loss for discriminator. | |
| Args: | |
| images (Tensor): Input images. | |
| Returns: | |
| Tensor: Discriminator loss. | |
| """ | |
| fake, _, _ = self.generator(images) | |
| pred_real, _ = self.discriminator(images) | |
| pred_fake, _ = self.discriminator(fake.detach()) | |
| error_discriminator_real = self.loss_bce( | |
| pred_real, torch.ones(size=pred_real.shape, dtype=torch.float32, device=pred_real.device) | |
| ) | |
| error_discriminator_fake = self.loss_bce( | |
| pred_fake, torch.zeros(size=pred_fake.shape, dtype=torch.float32, device=pred_fake.device) | |
| ) | |
| loss_discriminator = (error_discriminator_fake + error_discriminator_real) * 0.5 | |
| return loss_discriminator | |
| def get_generator_loss(self, images: Tensor) -> Tensor: | |
| """Calculates loss for generator. | |
| Args: | |
| images (Tensor): Input images. | |
| Returns: | |
| Tensor: Generator loss. | |
| """ | |
| fake, latent_i, latent_o = self.generator(images) | |
| pred_real, _ = self.discriminator(images) | |
| pred_fake, _ = self.discriminator(fake) | |
| error_enc = self.loss_enc(latent_i, latent_o) | |
| error_con = self.loss_con(images, fake) | |
| error_adv = self.loss_adv(pred_real, pred_fake) | |
| loss_generator = error_adv * self.wadv + error_con * self.wcon + error_enc * self.wenc | |
| return loss_generator | |
| def forward(self, batch: Tensor) -> Tensor: | |
| """Get scores for batch. | |
| Args: | |
| batch (Tensor): Images | |
| Returns: | |
| Tensor: Regeneration scores. | |
| """ | |
| padded_batch = pad_nextpow2(batch) | |
| self.generator.eval() | |
| _, latent_i, latent_o = self.generator(padded_batch) | |
| return torch.mean(torch.pow((latent_i - latent_o), 2), dim=1).view(-1) # convert nx1x1 to n | |