Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Utility Script containing functions to be used for training | |
| Author: Shilpaj Bhalerao | |
| """ | |
| # Standard Library Imports | |
| import math | |
| from typing import NoReturn | |
| # Third-Party Imports | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import torch | |
| from torchsummary import summary | |
| from torchvision import transforms | |
| from pytorch_grad_cam import GradCAM | |
| from pytorch_grad_cam.utils.image import show_cam_on_image | |
| import torch.optim as optim | |
| import torch.nn.functional as F | |
| from torch_lr_finder import LRFinder | |
def get_summary(model, input_size: tuple) -> None:
    """
    Print a layer-by-layer summary of the model architecture.

    Note: the return annotation was fixed from ``NoReturn`` to ``None`` —
    ``typing.NoReturn`` means "this function never returns" (e.g. always
    raises), which is not the case here.

    :param model: Object of model architecture class
    :param input_size: Input data shape (Channels, Height, Width)
    """
    # Run the summary on GPU when available, otherwise fall back to CPU
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    network = model.to(device)
    # torchsummary prints per-layer output shapes and parameter counts
    summary(network, input_size=input_size)
def get_misclassified_data(model, device, test_loader):
    """
    Run the model over the test set and collect the samples it gets wrong.

    :param model: Network Architecture
    :param device: CPU/GPU
    :param test_loader: DataLoader for test set
    :return: List of (image, label, prediction) tuples for wrong predictions
    """
    # Evaluation mode: disables dropout / freezes batch-norm statistics
    model.eval()
    model.to(device)

    wrong_samples = []

    # No gradients are needed while only evaluating
    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            # Stop once enough misclassified examples have been gathered
            if len(wrong_samples) > 40:
                break

            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            # Score one sample at a time so each stored image keeps a batch dim
            for img, lbl in zip(batch_images, batch_labels):
                img = img.unsqueeze(0)
                # argmax turns the class scores into a predicted class index
                prediction = model(img).argmax(dim=1, keepdim=True)
                if prediction != lbl:
                    wrong_samples.append((img, lbl, prediction))

    return wrong_samples
| # -------------------- GradCam -------------------- | |
def display_gradcam_output(data: list,
                           classes: list[str],
                           inv_normalize: transforms.Normalize,
                           model,
                           target_layers,
                           targets=None,
                           number_of_samples: int = 10,
                           transparency: float = 0.60):
    """
    Function to visualize GradCam output on the data
    :param data: List[Tuple(image, label, prediction)]
    :param classes: Name of classes in the dataset
    :param inv_normalize: Inverse-normalization transform for the dataset
    :param model: Model architecture
    :param target_layers: Layers on which GradCam should be executed
    :param targets: Classes to be focused on for GradCam
    :param number_of_samples: Number of images to print
    :param transparency: Weight of Normal image when mixed with activations
    """
    # Plot configuration: 5 images per row
    plt.figure(figsize=(10, 10))
    x_count = 5
    y_count = 1 if number_of_samples <= 5 else math.floor(number_of_samples / x_count)

    # Fix: only request CUDA when it is actually available, so the function
    # also works on CPU-only machines (hard-coded use_cuda=True crashed there)
    cam = GradCAM(model=model, target_layers=target_layers,
                  use_cuda=torch.cuda.is_available())

    # Iterate over number of specified images
    for i in range(number_of_samples):
        plt.subplot(y_count, x_count, i + 1)
        input_tensor = data[i][0]

        # Get the activations of the layer for the image
        grayscale_cam = cam(input_tensor=input_tensor, targets=targets)
        grayscale_cam = grayscale_cam[0, :]

        # Undo the dataset normalization to recover a displayable image (HWC)
        img = input_tensor.squeeze(0).to('cpu')
        img = inv_normalize(img)
        rgb_img = np.transpose(img, (1, 2, 0))
        rgb_img = rgb_img.numpy()

        # Overlay the activation heatmap on the original image
        visualization = show_cam_on_image(rgb_img, grayscale_cam, use_rgb=True,
                                          image_weight=transparency)
        plt.imshow(visualization)
        plt.title(r"Correct: " + classes[data[i][1].item()] + '\n' + 'Output: ' + classes[data[i][2].item()])
        plt.xticks([])
        plt.yticks([])
def get_optimizer(model, lr, momentum=0, weight_decay=0, optimizer_type='SGD'):
    """Method to get an optimizer object. Used to update weights.

    Args:
        model (Object): Neural Network model
        lr (float): Value of learning rate
        momentum (float): Value of momentum (used by SGD only)
        weight_decay (float): Value of weight decay
        optimizer_type (str): Type of optimizer, 'SGD' or 'ADAM'

    Returns:
        object: Object of optimizer class to update weights

    Raises:
        ValueError: If `optimizer_type` is neither 'SGD' nor 'ADAM'
    """
    if optimizer_type == 'SGD':
        # Fix: weight_decay was documented but silently ignored for SGD;
        # default of 0 keeps existing callers' behavior unchanged
        return optim.SGD(model.parameters(), lr=lr, momentum=momentum,
                         weight_decay=weight_decay)
    if optimizer_type == 'ADAM':
        return optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    # Fix: an unknown type previously raised a confusing UnboundLocalError
    raise ValueError(f"Unsupported optimizer_type: {optimizer_type!r}")
def get_StepLR_scheduler(optimizer, step_size, gamma):
    """Build a StepLR scheduler that decays the learning rate periodically.

    Args:
        optimizer (Object): Optimizer whose learning rate is scheduled
        step_size (int): Number of epochs between decays
        gamma (float): Multiplicative decay factor

    Returns:
        object: Object of StepLR class to update learning rate
    """
    # verbose=True prints the new learning rate on every decay
    return optim.lr_scheduler.StepLR(optimizer, step_size=step_size,
                                     gamma=gamma, verbose=True)
def get_ReduceLROnPlateau_scheduler(optimizer, factor, patience):
    """Build a ReduceLROnPlateau scheduler: cut the learning rate once a
    monitored metric has stopped improving.

    Args:
        optimizer (Object): Optimizer whose learning rate is scheduled
        factor (float): Number to multiply with learning rate
        patience (int): Number of epochs to wait before reducing

    Returns:
        object: Object of ReduceLROnPlateau class to update learning rate
    """
    # verbose=True prints a message whenever the learning rate is reduced
    return optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=factor,
                                                patience=patience, verbose=True)
def get_OneCycleLR_scheduler(optimizer, max_lr, epochs, steps_per_epoch,
                             max_at_epoch, anneal_strategy, div_factor,
                             final_div_factor):
    """Build a OneCycleLR scheduler: ramp the learning rate up to `max_lr`
    and anneal it back down over the course of training.

    Args:
        optimizer (Object): Optimizer whose learning rate is scheduled
        max_lr (float): Maximum learning rate to reach during training
        epochs (float): Total number of epochs
        steps_per_epoch (int): Total steps in an epoch
        max_at_epoch (int): Epoch at which to reach the maximum learning rate
        anneal_strategy (string): Strategy to interpolate between min and max lr
        div_factor (int): Divisive factor to calculate initial learning rate
        final_div_factor (int): Divisive factor to calculate minimum learning rate

    Returns:
        object: Object of OneCycleLR class to update learning rate
    """
    # Fraction of the whole cycle spent increasing the learning rate
    warmup_fraction = max_at_epoch / epochs
    return optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=max_lr,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        pct_start=warmup_fraction,
        anneal_strategy=anneal_strategy,
        div_factor=div_factor,
        final_div_factor=final_div_factor,
    )
def get_criterion(loss_type='cross_entropy'):
    """Method to get loss calculation criterion.

    Args:
        loss_type (str): Type of loss, 'nll_loss' or 'cross_entropy'

    Returns:
        object: Callable used to calculate loss

    Raises:
        ValueError: If `loss_type` is neither 'nll_loss' nor 'cross_entropy'
    """
    if loss_type == 'nll_loss':
        return F.nll_loss
    if loss_type == 'cross_entropy':
        return F.cross_entropy
    # Fix: an unknown type previously raised a confusing UnboundLocalError
    raise ValueError(f"Unsupported loss_type: {loss_type!r}")
def get_learning_rate(model, optimizer, criterion, trainloader):
    """Method to find a learning rate using the LR range test.

    Args:
        model (Object): Object of model
        optimizer (Object): Object of optimizer class
        criterion (Object): Loss function
        trainloader (Object): Object of dataloader class

    Returns:
        float: Learning rate suggested by the lr finder
    """
    # Run the range test: lr is swept up to end_lr over num_iter iterations
    finder = LRFinder(model, optimizer, criterion)
    finder.range_test(trainloader, end_lr=100, num_iter=100)

    # plot() renders the loss-vs-lr curve and returns the suggested lr
    _, suggested_lr = finder.plot()

    # Restore the model and optimizer to their pre-test state
    finder.reset()
    return suggested_lr