| |
| """ |
| PyTorch Lightning for ResNet Architecture |
| Author: Shilpaj Bhalerao |
| """ |
| |
| import os |
| import math |
|
|
| |
| import numpy as np |
| import matplotlib.pyplot as plt |
| import albumentations as A |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| import torch.optim as optim |
| from torch.utils.data import DataLoader, random_split |
|
|
| from torchvision import transforms |
| from torchvision.datasets import CIFAR10 |
|
|
| from pytorch_lightning import LightningModule, Trainer |
| from torchmetrics import Accuracy |
|
|
| |
| from datasets import AlbumDataset |
| from utils import get_cifar_statistics |
| from visualize import visualize_cifar_augmentation, display_cifar_data_samples |
|
|
|
|
| class Layers: |
| """ |
| Class containing different types of Convolutional layer |
| """ |
|
|
| def __init__(self, groups=1): |
| """ |
| Constructor |
| """ |
| self.group = groups |
|
|
| @staticmethod |
| def standard_conv_layer(in_channels: int, |
| out_channels: int, |
| kernel_size: int = 3, |
| padding: int = 0, |
| stride: int = 1, |
| dilation: int = 1, |
| normalization: str = "batch", |
| last_layer: bool = False, |
| conv_type: str = "standard", |
| groups: int = 1): |
| """ |
| Method to return a standard convolution block |
| :param in_channels: Number of input channels |
| :param out_channels: Number of output channels |
| :param kernel_size: Size of the kernel used in the layer |
| :param padding: Padding used in the layer |
| :param stride: Stride used for convolution |
| :param dilation: Dilation for Atrous convolution |
| :param normalization: Type of normalization technique used |
| :param last_layer: Flag to indicate if the layer is last convolutional layer of the network |
| :param conv_type: Type of convolutional layer |
| :param groups: Number of Groups for Group Normalization |
| """ |
| |
| if normalization == "layer": |
| _norm_layer = nn.GroupNorm(1, out_channels) |
| elif normalization == "group": |
| if not groups: |
| raise ValueError("Value of group is not defined") |
| _norm_layer = nn.GroupNorm(groups, out_channels) |
| else: |
| _norm_layer = nn.BatchNorm2d(out_channels) |
|
|
| |
| if conv_type == "standard": |
| conv_layer = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, stride=stride, |
| kernel_size=kernel_size, bias=False, padding=padding) |
| elif conv_type == "depthwise": |
| conv_layer = Layers.depthwise_conv(in_channels=in_channels, out_channels=out_channels, stride=stride, |
| padding=padding) |
| elif conv_type == "dilated": |
| conv_layer = Layers.dilated_conv(in_channels=in_channels, out_channels=out_channels, stride=stride, |
| padding=padding, dilation=dilation) |
|
|
| |
| if last_layer: |
| return nn.Sequential(conv_layer) |
| return nn.Sequential( |
| conv_layer, |
| _norm_layer, |
| nn.ReLU(), |
| |
| ) |
|
|
| @staticmethod |
| def resnet_block(channels): |
| """ |
| Method to create a RESNET block |
| """ |
| return nn.Sequential( |
| nn.Conv2d(in_channels=channels, out_channels=channels, stride=1, kernel_size=3, bias=False, padding=1), |
| nn.BatchNorm2d(channels), |
| nn.ReLU(), |
| nn.Conv2d(in_channels=channels, out_channels=channels, stride=1, kernel_size=3, bias=False, padding=1), |
| nn.BatchNorm2d(channels), |
| nn.ReLU(), |
| ) |
|
|
| @staticmethod |
| def custom_block(input_channels, output_channels): |
| """ |
| Method to create a custom configured block |
| :param input_channels: Number of input channels |
| :param output_channels: Number of output channels |
| """ |
| return nn.Sequential( |
| nn.Conv2d(in_channels=input_channels, out_channels=output_channels, stride=1, kernel_size=3, bias=False, |
| padding=1), |
| nn.MaxPool2d(kernel_size=2, stride=2), |
| nn.BatchNorm2d(output_channels), |
| nn.ReLU(), |
| ) |
|
|
| @staticmethod |
| def depthwise_conv(in_channels, out_channels, stride=1, padding=0): |
| """ |
| Method to return the depthwise separable convolution layer |
| :param in_channels: Number of input channels |
| :param out_channels: Number of output channels |
| :param padding: Padding used in the layer |
| :param stride: Stride used for convolution |
| """ |
| return nn.Sequential( |
| nn.Conv2d(in_channels=in_channels, out_channels=in_channels, stride=stride, groups=in_channels, |
| kernel_size=3, bias=False, padding=padding), |
| nn.Conv2d(in_channels=in_channels, out_channels=out_channels, stride=stride, kernel_size=1, bias=False, |
| padding=0) |
| ) |
|
|
| @staticmethod |
| def dilated_conv(in_channels, out_channels, stride=1, padding=0, dilation=1): |
| """ |
| Method to return the dilated convolution layer |
| :param in_channels: Number of input channels |
| :param out_channels: Number of output channels |
| :param stride: Stride used for convolution |
| :param padding: Padding used in the layer |
| :param dilation: Dilation value for a kernel |
| """ |
| return nn.Sequential( |
| nn.Conv2d(in_channels=in_channels, out_channels=out_channels, stride=stride, kernel_size=3, bias=False, |
| padding=padding, dilation=dilation) |
| ) |
|
|
|
|
| class LITResNet(LightningModule, Layers): |
| """ |
| David's Model Architecture for Session-10 CIFAR10 dataset |
| """ |
|
|
| def __init__(self, class_names, data_dir='/data/'): |
| """ |
| Constructor |
| """ |
| |
| super().__init__() |
|
|
| |
| self.classes = class_names |
| self.data_dir = data_dir |
| self.num_classes = 10 |
| self._learning_rate = 0.03 |
| self.inv_normalize = transforms.Normalize( |
| mean=[-0.50 / 0.23, -0.50 / 0.23, -0.50 / 0.23], |
| std=[1 / 0.23, 1 / 0.23, 1 / 0.23] |
| ) |
| self.batch_size = 512 |
| self.epochs = 24 |
| self.accuracy = Accuracy(task='multiclass', |
| num_classes=10) |
| self.train_transforms = transforms.Compose([transforms.ToTensor()]) |
| self.test_transforms = transforms.Compose([transforms.ToTensor()]) |
| self.stats_train = None |
| self.stats_test = None |
| self.cifar10_train = None |
| self.cifar10_test = None |
| self.cifar10_val = None |
| self.misclassified_data = None |
|
|
| |
| self.prep_layer = None |
| self.custom_block1 = None |
| self.custom_block2 = None |
| self.custom_block3 = None |
| self.resnet_block1 = None |
| self.resnet_block3 = None |
| self.pool4 = None |
| self.fc = None |
| self.dropout_value = None |
|
|
| |
| self.model_layers() |
|
|
| |
| |
| |
| def model_layers(self): |
| """ |
| Method to initialize layers for the model |
| """ |
| |
| self.prep_layer = Layers.standard_conv_layer(in_channels=3, out_channels=64, kernel_size=3, padding=1, stride=1) |
|
|
| |
| self.custom_block1 = Layers.custom_block(input_channels=64, output_channels=128) |
| self.resnet_block1 = Layers.resnet_block(channels=128) |
|
|
| |
| self.custom_block2 = Layers.custom_block(input_channels=128, output_channels=256) |
|
|
| |
| self.custom_block3 = Layers.custom_block(input_channels=256, output_channels=512) |
| self.resnet_block3 = Layers.resnet_block(channels=512) |
|
|
| |
| self.pool4 = nn.MaxPool2d(kernel_size=4, stride=2) |
|
|
| |
| self.fc = nn.Linear(in_features=512, out_features=10, bias=False) |
|
|
| |
| self.dropout_value = 0.1 |
|
|
| def forward(self, x): |
| """ |
| Forward pass for model training |
| :param x: Input layer |
| :return: Model Prediction |
| """ |
| |
| x = self.prep_layer(x) |
|
|
| |
| x = self.custom_block1(x) |
| r1 = self.resnet_block1(x) |
| x = x + r1 |
|
|
| |
| x = self.custom_block2(x) |
|
|
| |
| x = self.custom_block3(x) |
| r2 = self.resnet_block3(x) |
| x = x + r2 |
|
|
| |
| x = self.pool4(x) |
|
|
| |
| x = x.view(-1, 512) |
| x = self.fc(x) |
|
|
| return F.log_softmax(x, dim=1) |
|
|
| |
| |
| |
|
|
| def configure_optimizers(self): |
| """ |
| Method to configure the optimizer and learning rate scheduler |
| """ |
| learning_rate = 0.03 |
| weight_decay = 1e-4 |
| optimizer = optim.Adam(self.parameters(), lr=learning_rate, weight_decay=weight_decay) |
|
|
| |
| scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, |
| max_lr=self._learning_rate, |
| steps_per_epoch=len(self.train_dataloader()), |
| epochs=self.epochs, |
| pct_start=5 / self.epochs, |
| div_factor=100, |
| three_phase=False, |
| final_div_factor=100, |
| anneal_strategy="linear" |
| ) |
| return [optimizer], [{'scheduler': scheduler, 'interval': 'step'}] |
|
|
| @property |
| def learning_rate(self) -> float: |
| """ |
| Method to get the learning rate value |
| """ |
| return self._learning_rate |
|
|
| @learning_rate.setter |
| def learning_rate(self, value: float): |
| """ |
| Method to set the learning rate value |
| :param value: Updated value of learning rate |
| """ |
| self._learning_rate = value |
|
|
| def set_training_confi(self, *, epochs, batch_size): |
| """ |
| Method to set parameters required for model training |
| :param epochs: Number of epochs for which model is to be trained |
| :param batch_size: Batch Size |
| """ |
| self.epochs = epochs |
| self.batch_size = batch_size |
|
|
| |
| |
| |
| def training_step(self, train_batch, batch_index): |
| """ |
| Method called on training dataset to train the model |
| :param train_batch: Batch containing images and labels |
| :param batch_index: Index of the batch |
| """ |
| x, y = train_batch |
| logits = self.forward(x) |
| loss = F.cross_entropy(logits, y) |
| preds = torch.argmax(logits, dim=1) |
| self.accuracy(preds, y) |
|
|
| self.log("train_loss", loss, prog_bar=True) |
| self.log("train_acc", self.accuracy, prog_bar=True) |
| return loss |
|
|
| def validation_step(self, batch, batch_idx): |
| """ |
| Method called on validation dataset to check if the model is learning |
| :param batch: Batch containing images and labels |
| :param batch_idx: Index of the batch |
| """ |
| x, y = batch |
| logits = self.forward(x) |
| loss = F.nll_loss(logits, y) |
| preds = torch.argmax(logits, dim=1) |
| self.accuracy(preds, y) |
|
|
| |
| self.log("val_loss", loss, prog_bar=True) |
| self.log("val_acc", self.accuracy, prog_bar=True) |
| return loss |
|
|
| def test_step(self, batch, batch_idx): |
| """ |
| Method called on test dataset to check model performance on unseen data |
| :param batch: Batch containing images and labels |
| :param batch_idx: Index of the batch |
| """ |
| |
| return self.validation_step(batch, batch_idx) |
|
|
| |
| |
| |
|
|
| def set_transforms(self, train_set_transforms: dict, test_set_transforms: dict): |
| """ |
| Method to set the transformations to be done on training and test datasets |
| :param train_set_transforms: Dictionary of transformations for training dataset |
| :param test_set_transforms: Dictionary of transformations for test dataset |
| """ |
| self.train_transforms = A.Compose(train_set_transforms.values()) |
| self.test_transforms = A.Compose(test_set_transforms.values()) |
|
|
| def prepare_data(self): |
| """ |
| Method to download the dataset |
| """ |
| self.stats_train = CIFAR10('./data', train=True, download=True, transform=transforms.ToTensor()) |
| self.stats_test = CIFAR10('./data', train=False, download=True, transform=transforms.ToTensor()) |
|
|
| def setup(self, stage=None): |
| """ |
| Method to create Split the dataset into train, test and val |
| """ |
| |
| if not self.cifar10_train and not self.cifar10_test and not self.cifar10_val: |
|
|
| |
| if stage == "fit" or stage is None: |
| cifar10_full = AlbumDataset(self.data_dir, train=True, download=True, transform=self.train_transforms) |
| self.cifar10_train, self.cifar10_val = random_split(cifar10_full, [45_000, 5_000]) |
|
|
| |
| if stage == "test" or stage is None: |
| self.cifar10_test = AlbumDataset(self.data_dir, train=False, download=True, |
| transform=self.test_transforms) |
|
|
| def train_dataloader(self): |
| """ |
| Method to return the DataLoader for Training set |
| """ |
| return DataLoader(self.cifar10_train, batch_size=self.batch_size, num_workers=os.cpu_count()) |
|
|
| def val_dataloader(self): |
| """ |
| Method to return the DataLoader for the Validation set |
| """ |
| return DataLoader(self.cifar10_val, batch_size=self.batch_size, num_workers=os.cpu_count()) |
|
|
| def test_dataloader(self): |
| """ |
| Method to return the DataLoader for the Test set |
| """ |
| return DataLoader(self.cifar10_test, batch_size=self.batch_size, num_workers=os.cpu_count()) |
|
|
| def get_statistics(self, data_set_type="Train"): |
| """ |
| Method to get the statistics for CIFAR10 dataset |
| """ |
| |
| if not self.stats_train and not self.stats_test: |
| self.prepare_data() |
|
|
| |
| if data_set_type == "Train": |
| get_cifar_statistics(self.stats_train) |
| else: |
| get_cifar_statistics(self.stats_test, data_set_type="Test") |
|
|
| def display_data_samples(self, dataset="train", num_of_images=20): |
| """ |
| Method to display data samples |
| """ |
| |
| try: |
| assert self.stats_train |
| except AttributeError: |
| self.prepare_data() |
|
|
| if dataset == "train": |
| display_cifar_data_samples(self.stats_train, num_of_images, self.classes) |
| else: |
| display_cifar_data_samples(self.stats_test, num_of_images, self.classes) |
|
|
| @staticmethod |
| def visualize_augmentation(aug_set_transforms: dict): |
| """ |
| Method to visualize augmentations |
| :param aug_set_transforms: Dictionary of transformations to be visualized |
| """ |
| aug_train = AlbumDataset('./data', train=True, download=True) |
| visualize_cifar_augmentation(aug_train, aug_set_transforms) |
|
|
| |
| |
| |
|
|
| def get_misclassified_data(self): |
| """ |
| Function to run the model on test set and return misclassified images |
| """ |
| if self.misclassified_data: |
| return self.misclassified_data |
|
|
| self.misclassified_data = [] |
| self.prepare_data() |
| self.setup() |
|
|
| test_loader = self.test_dataloader() |
|
|
| |
| with torch.no_grad(): |
| |
| for data, target in test_loader: |
|
|
| |
| data, target = data.to(self.device), target.to(self.device) |
|
|
| |
| for image, label in zip(data, target): |
|
|
| |
| image = image.unsqueeze(0) |
|
|
| |
| output = self.forward(image) |
|
|
| |
| pred = output.argmax(dim=1, keepdim=True) |
|
|
| |
| if pred != label: |
| self.misclassified_data.append((image, label, pred)) |
| return self.misclassified_data |
|
|
| def display_cifar_misclassified_data(self, number_of_samples: int = 10): |
| """ |
| Function to plot images with labels |
| :param number_of_samples: Number of images to print |
| """ |
| if not self.misclassified_data: |
| self.misclassified_data = self.get_misclassified_data() |
|
|
| fig = plt.figure(figsize=(10, 10)) |
|
|
| x_count = 5 |
| y_count = 1 if number_of_samples <= 5 else math.floor(number_of_samples / x_count) |
|
|
| for i in range(number_of_samples): |
| plt.subplot(y_count, x_count, i + 1) |
| img = self.misclassified_data[i][0].squeeze().to('cpu') |
| img = self.inv_normalize(img) |
| plt.imshow(np.transpose(img, (1, 2, 0))) |
| plt.title( |
| r"Correct: " + self.classes[self.misclassified_data[i][1].item()] + '\n' + 'Output: ' + self.classes[ |
| self.misclassified_data[i][2].item()]) |
| plt.xticks([]) |
| plt.yticks([]) |
|
|