# utils re-exports the common torch/torchvision handles used below
from utils import torch, nn, F, optim, datasets, transforms
import utils
from pytorch_lightning import LightningModule, Trainer
from torchmetrics import Accuracy
import os
from torch.utils.data import DataLoader, random_split

import albumentations as A
from albumentations.pytorch import ToTensorV2
|
|
class Cifar10SearchDataset(datasets.CIFAR10):
    """CIFAR-10 wrapper whose __getitem__ uses the albumentations keyword-based API."""

    def __init__(self, root="~/data", train=True, download=True, transform=None):
        super().__init__(root=root, train=train, download=download, transform=transform)

    def __getitem__(self, index):
        # self.data holds HWC uint8 numpy arrays, which is what albumentations expects
        image, label = self.data[index], self.targets[index]

        if self.transform is not None:
            transformed = self.transform(image=image)
            image = transformed["image"]

        return image, label
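
# A minimal usage sketch (illustrative; the transform below is an assumption,
# the full pipelines live in LitCustomResNet.setup()):
#
#   ds = Cifar10SearchDataset(root="./data", train=True, download=True,
#                             transform=A.Compose([A.Normalize(), ToTensorV2()]))
#   img, label = ds[0]   # img is a (3, 32, 32) torch.Tensor after ToTensorV2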

dropout_value = 0.1


class LitCustomResNet(LightningModule):
    def __init__(self, loss_criteria, learning_rate=1e-1):
        super().__init__()
        self.loss_criteria = loss_criteria
        self.learning_rate = learning_rate

        # Separate metric instances so train and validation accuracy
        # accumulate state independently
        self.train_accuracy = Accuracy(task="multiclass", num_classes=10)
        self.val_accuracy = Accuracy(task="multiclass", num_classes=10)

        # Prep layer: 3 -> 64 channels at full 32x32 resolution
        self.convblockPreparation = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=(3, 3), padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(dropout_value),
        )

        # Layer 1: 64 -> 128 channels, MaxPool halves 32x32 -> 16x16
        self.convblockL1X1 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3, 3), padding=1, bias=False),
            nn.MaxPool2d(2, 2),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(dropout_value),
        )

        # Residual block for Layer 1: two 3x3 convs at 128 channels
        self.convblockL1R1 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(dropout_value),

            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 3), padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(dropout_value),
        )

        # Layer 2: 128 -> 256 channels, 16x16 -> 8x8
        self.convblockL2X1 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=(3, 3), padding=1, bias=False),
            nn.MaxPool2d(2, 2),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout(dropout_value),
        )

        # Layer 3: 256 -> 512 channels, 8x8 -> 4x4
        self.convblockL3X1 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=(3, 3), padding=1, bias=False),
            nn.MaxPool2d(2, 2),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Dropout(dropout_value),
        )

        # Residual block for Layer 3: two 3x3 convs at 512 channels
        self.convblockL3R1 = nn.Sequential(
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=(3, 3), padding=1, bias=False),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Dropout(dropout_value),

            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=(3, 3), padding=1, bias=False),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Dropout(dropout_value),
        )

        # Final 4x4 max pool collapses the 4x4 feature map to 1x1
        self.FinalBlock = nn.Sequential(
            nn.MaxPool2d(4, 4),
        )

        # Classifier head: 512 features -> 10 classes
        self.FC = nn.Sequential(
            nn.Linear(512, 10),
        )

        self.dropout = nn.Dropout(dropout_value)  # not used in forward; kept for compatibility
|
    def forward(self, x):
        x = self.convblockPreparation(x)   # 3 -> 64, 32x32

        x = self.convblockL1X1(x)          # 64 -> 128, 16x16
        x = x + self.convblockL1R1(x)      # residual add

        x = self.convblockL2X1(x)          # 128 -> 256, 8x8

        x = self.convblockL3X1(x)          # 256 -> 512, 4x4
        x = x + self.convblockL3R1(x)      # residual add

        x = self.FinalBlock(x)             # 4x4 pool -> 512x1x1

        x = x.view(-1, 512)
        x = self.FC(x)

        return x.view(-1, 10)
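
    # forward() returns raw logits, so loss_criteria is expected to be a
    # logits-based criterion such as nn.CrossEntropyLoss() (an assumption;
    # the criterion is injected by the caller).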

    def training_step(self, batch, batch_idx):
        x, y = batch
        pred = self(x)

        loss = self.loss_criteria(pred, y)
        preds = torch.argmax(pred, dim=1)
        self.train_accuracy(preds, y)

        self.log("train_loss", loss, prog_bar=True)
        self.log("train_acc", self.train_accuracy, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        pred = self(x)

        loss = self.loss_criteria(pred, y)
        preds = torch.argmax(pred, dim=1)
        self.val_accuracy(preds, y)

        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", self.val_accuracy, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        # Reuse the validation logic (metrics are logged under the val_* keys)
        return self.validation_step(batch, batch_idx)

    def lr_finder(self, optimizer, criterion):
        # One-off LR range test via torch_lr_finder; assumes a CUDA device is available
        from torch_lr_finder import LRFinder
        lr_finder = LRFinder(self, optimizer, criterion, device="cuda")
        lr_finder.range_test(self.train_dataloader(), end_lr=10, num_iter=200, step_mode="exp")
        ax, suggested_lr = lr_finder.plot(suggest_lr=True)
        lr_finder.reset()  # restore the model and optimizer to their initial state
        return suggested_lr
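
    # Illustrative sketch of how the LR range test can produce the max_lr
    # hard-coded in configure_optimizers() (names here are examples only):
    #
    #   model = LitCustomResNet(nn.CrossEntropyLoss())
    #   model.prepare_data(); model.setup("fit")   # lr_finder needs train_dataloader()
    #   optimizer = optim.Adam(model.parameters(), lr=1e-4)
    #   suggested_lr = model.lr_finder(optimizer, nn.CrossEntropyLoss())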

    def configure_optimizers(self):
        from torch.optim.lr_scheduler import OneCycleLR
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)

        # max_lr found empirically with the LR range test above
        suggested_lr = 4.51e-02
        steps_per_epoch = len(self.train_dataloader())
        scheduler_dict = {
            "scheduler": OneCycleLR(
                optimizer,
                max_lr=suggested_lr,
                steps_per_epoch=steps_per_epoch,
                epochs=self.trainer.max_epochs,
                pct_start=5 / self.trainer.max_epochs,  # 5 warm-up epochs
                three_phase=False,
                div_factor=80,
                final_div_factor=550,
            ),
            "interval": "step",
        }
        return {"optimizer": optimizer, "lr_scheduler": scheduler_dict}

    def prepare_data(self):
        # Download the data once; transforms are attached in setup()
        Cifar10SearchDataset(root='./data', train=True, download=True)
        Cifar10SearchDataset(root='./data', train=False, download=True)
|
|
    def setup(self, stage=None):
        # CIFAR-10 per-channel statistics
        means = [0.4914, 0.4822, 0.4465]
        stds = [0.2470, 0.2435, 0.2616]

        train_transforms = A.Compose(
            [
                A.ColorJitter(brightness=0.10, contrast=0.1, saturation=0.10, hue=0.1, p=0.5),
                A.PadIfNeeded(min_height=40, min_width=40, p=1.0),
                A.RandomCrop(height=32, width=32, p=1.0),
                A.HorizontalFlip(),
                A.Normalize(mean=means, std=stds, p=1.0),
                # Cutout-style occlusion; note it runs after Normalize, so the
                # holes are filled with the raw channel means
                A.CoarseDropout(max_holes=1, max_height=8, max_width=8,
                                min_holes=1, min_height=8, min_width=8,
                                fill_value=means, p=1.0),
                ToTensorV2(),
            ]
        )

        test_transforms = A.Compose(
            [
                A.Normalize(mean=means, std=stds, p=1.0),
                ToTensorV2(),
            ]
        )

        if stage == "fit" or stage is None:
            cifar10_full = Cifar10SearchDataset(root='./data', train=True, download=True, transform=train_transforms)
            # Both splits come from the same transformed dataset, so the
            # validation split also sees the training augmentations
            self.cifar10_train, self.cifar10_val = random_split(cifar10_full, [45000, 5000])

        if stage == "test" or stage is None:
            self.cifar10_test = Cifar10SearchDataset(root='./data', train=False, download=True, transform=test_transforms)
|
|
    def train_dataloader(self):
        # shuffle=True so the training order changes every epoch
        return DataLoader(self.cifar10_train, batch_size=512, shuffle=True,
                          num_workers=4, persistent_workers=True, pin_memory=True)

    def val_dataloader(self):
        return DataLoader(self.cifar10_val, batch_size=512,
                          num_workers=4, persistent_workers=True, pin_memory=True)

    def test_dataloader(self):
        return DataLoader(self.cifar10_test, batch_size=512,
                          num_workers=4, persistent_workers=True, pin_memory=True)
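
    # The loader settings assume a CUDA host: pin_memory=True speeds up
    # host-to-device copies, and persistent_workers=True keeps the four
    # worker processes alive between epochs instead of respawning them.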

def getModel(loss_criteria, learning_rate=1e-1):
    return LitCustomResNet(loss_criteria, learning_rate).to(utils.getDevice())
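
# A minimal training sketch, assuming nn.CrossEntropyLoss as the criterion and
# that utils.getDevice() matches the Trainer's accelerator (both assumptions,
# not fixed by this module):
#
#   model = getModel(nn.CrossEntropyLoss())
#   trainer = Trainer(max_epochs=24, accelerator="auto", devices=1)
#   trainer.fit(model)
#   trainer.test(model)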
|
|