Spaces:
Runtime error
Runtime error
| import torch | |
| import torch.nn as nn | |
| from torch.nn import functional as F | |
| import torch.optim as optim | |
| from torch.optim import lr_scheduler | |
| import torch.backends.cudnn as cudnn | |
| import numpy as np | |
| import torchvision | |
| from torchvision import datasets, models, transforms | |
| import matplotlib.pyplot as plt | |
| import time | |
| import os | |
| from PIL import Image | |
| from tempfile import TemporaryDirectory | |
| import streamlit as st | |
| cudnn.benchmark = True | |
| plt.ion() # interactive mode | |
| class classifier(): | |
| def __init__(self): | |
| self.data_transforms = None | |
| self.data_dir = None | |
| self.image_datasets = None | |
| self.dataloaders = None | |
| self.dataset_sizes = None | |
| self.class_names = None | |
| self.device = None | |
| self.num_classes = None | |
| def data_loader(self,path,batch_size=4): | |
| # Data augmentation and normalization for training | |
| # Just normalization for validation | |
| self.data_transforms = { | |
| 'train': transforms.Compose([ | |
| transforms.RandomResizedCrop(224), | |
| transforms.RandomHorizontalFlip(), | |
| transforms.ToTensor(), | |
| transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) | |
| ]), | |
| 'val': transforms.Compose([ | |
| transforms.Resize(256), | |
| transforms.CenterCrop(224), | |
| transforms.ToTensor(), | |
| transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) | |
| ]), | |
| 'test': transforms.Compose([ | |
| transforms.Resize(256), | |
| transforms.CenterCrop(224), | |
| transforms.ToTensor(), | |
| transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) | |
| ]) | |
| } | |
| self.data_dir = path | |
| self.image_datasets = {x: datasets.ImageFolder(os.path.join(self.data_dir, x), | |
| self.data_transforms[x]) | |
| for x in ['train', 'val','test']} | |
| self.dataloaders = {x: torch.utils.data.DataLoader(self.image_datasets[x], batch_size=batch_size, | |
| shuffle=True, num_workers=4) | |
| for x in ['train', 'val','test']} | |
| self.dataset_sizes = {x: len(self.image_datasets[x]) for x in ['train', 'val','test']} | |
| self.class_names = self.image_datasets['train'].classes | |
| self.num_classes = len(self.class_names) | |
| self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
| def train(self,model, criterion, optimizer, scheduler, num_epochs=25): | |
| since = time.time() | |
| # Create a temporary directory to save training checkpoints | |
| with TemporaryDirectory() as tempdir: | |
| best_model_params_path = os.path.join(tempdir, 'best_model_params.pt') | |
| torch.save(model.state_dict(), best_model_params_path) | |
| best_acc = 0.0 | |
| for epoch in range(num_epochs): | |
| print(f'Epoch {epoch+1}/{num_epochs}') | |
| print('-' * 10) | |
| st.sidebar.subheader(f':blue[Epoch {epoch+1}/{num_epochs}]', divider='blue') | |
| # st.sidebar.code('-' * 10) | |
| # Each epoch has a training and validation phase | |
| for phase in ['train', 'val']: | |
| if phase == 'train': | |
| model.train() # Set model to training mode | |
| else: | |
| model.eval() # Set model to evaluate mode | |
| running_loss = 0.0 | |
| running_corrects = 0 | |
| # Iterate over data. | |
| for inputs, labels in self.dataloaders[phase]: | |
| inputs = inputs.to(self.device) | |
| labels = labels.to(self.device) | |
| # zero the parameter gradients | |
| optimizer.zero_grad() | |
| # forward | |
| # track history if only in train | |
| with torch.set_grad_enabled(phase == 'train'): | |
| outputs = model(inputs) | |
| _, preds = torch.max(outputs, 1) | |
| loss = criterion(outputs, labels) | |
| # backward + optimize only if in training phase | |
| if phase == 'train': | |
| loss.backward() | |
| optimizer.step() | |
| # statistics | |
| running_loss += loss.item() * inputs.size(0) | |
| running_corrects += torch.sum(preds == labels.data) | |
| if phase == 'train': | |
| scheduler.step() | |
| epoch_loss = running_loss / self.dataset_sizes[phase] | |
| epoch_acc = running_corrects.double() / self.dataset_sizes[phase] | |
| print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}') | |
| st.sidebar.caption(f':blue[{phase[0].upper() + phase[1:]} Loss:] {epoch_loss:.4f} :blue[ Accuracy:] {epoch_acc:.4f}') | |
| # deep copy the model | |
| if phase == 'val' and epoch_acc > best_acc: | |
| best_acc = epoch_acc | |
| torch.save(model.state_dict(), best_model_params_path) | |
| print() | |
| time_elapsed = time.time() - since | |
| print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s') | |
| print(f'Best val Accuracy: {best_acc:4f}') | |
| st.sidebar.caption(f':green[Training complete in] {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s') | |
| st.sidebar.subheader(f':blue[Best val Accuracy:] {best_acc:4f}') | |
| # load best model weights | |
| model.load_state_dict(torch.load(best_model_params_path)) | |
| return model | |
| def train_model(self,model_name,epochs): | |
| num_classes = self.num_classes | |
| if model_name == 'EfficientNet_B0': | |
| model = models.efficientnet_b0(pretrained=True) | |
| model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes) | |
| # model.classifier[1].out_features = num_classes | |
| optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'EfficientNet_B1': | |
| model = models.efficientnet_b1(pretrained=True) | |
| model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes) | |
| # model.classifier[1].out_features = num_classes | |
| optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'MnasNet0_5': | |
| model = models.mnasnet0_5(pretrained=True) | |
| model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes) | |
| # model.classifier[1].out_features = num_classes | |
| optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'MnasNet0_75': | |
| model = models.mnasnet0_75(pretrained=True) | |
| model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes) | |
| # model.classifier[1].out_features = num_classes | |
| optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'MnasNet1_0': | |
| model = models.mnasnet1_0(pretrained=True) | |
| model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes) | |
| # model.classifier[1].out_features = num_classes | |
| optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'MobileNet_v2': | |
| model = models.mobilenet_v2(pretrained=True) | |
| model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes) | |
| # model.classifier[1].out_features = num_classes | |
| optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'MobileNet_v3_small': | |
| model = models.mobilenet_v3_small(pretrained=True) | |
| model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes) | |
| # model.classifier[3].out_features = num_classes | |
| optimizer = torch.optim.SGD(model.classifier[3].parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'MobileNet_v3_large': | |
| model = models.mobilenet_v3_large(pretrained=True) | |
| model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes) | |
| # model.classifier[3].out_features = num_classes | |
| optimizer = torch.optim.SGD(model.classifier[3].parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'RegNet_y_400mf': | |
| model = models.regnet_y_400mf(pretrained=True) | |
| model.fc = nn.Linear(model.fc.in_features, num_classes) | |
| # model.fc.out_features = num_classes | |
| optimizer = torch.optim.SGD(model.fc.parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'ShuffleNet_v2_x0_5': | |
| model = models.shufflenet_v2_x0_5(pretrained=True) | |
| model.fc = nn.Linear(model.fc.in_features, num_classes) | |
| # model.fc.out_features = num_classes | |
| optimizer = torch.optim.SGD(model.fc.parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'ShuffleNet_v2_x1_0': | |
| model = models.shufflenet_v2_x1_0(pretrained=True) | |
| model.fc = nn.Linear(model.fc.in_features, num_classes) | |
| # model.fc.out_features = num_classes | |
| optimizer = torch.optim.SGD(model.fc.parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'ShuffleNet_v2_x1_5': | |
| model = models.shufflenet_v2_x1_5(pretrained=True) | |
| model.fc = nn.Linear(model.fc.in_features, num_classes) | |
| # model.fc.out_features = num_classes | |
| optimizer = torch.optim.SGD(model.fc.parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'SqueezeNet 1_0': | |
| model = models.squeezenet1_0(pretrained=True) | |
| model.classifier[1] = nn.Conv2d(model.classifier[1].in_channels, num_classes,model.classifier[1].kernel_size, model.classifier[1].stride) | |
| # model.classifier[1].out_channels = num_classes | |
| optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=0.001, momentum=0.9) | |
| elif model_name == 'SqueezeNet 1_1': | |
| model = models.squeezenet1_1(pretrained=True) | |
| model.classifier[1] = nn.Conv2d(model.classifier[1].in_channels, num_classes,model.classifier[1].kernel_size, model.classifier[1].stride) | |
| # model.classifier[1].out_channels = num_classes | |
| optimizer = torch.optim.SGD(model.classifier[1].parameters(), lr=0.001, momentum=0.9) | |
| exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1) | |
| criterion = nn.CrossEntropyLoss() | |
| model_ft = self.train(model, criterion, optimizer, exp_lr_scheduler, | |
| num_epochs=epochs) | |
| torch.save(model.state_dict(), 'model.pt') | |
| return model_ft | |
| def imshow(self,inp, title=None): | |
| """Display image for Tensor.""" | |
| inp = inp.numpy().transpose((1, 2, 0)) | |
| mean = np.array([0.485, 0.456, 0.406]) | |
| std = np.array([0.229, 0.224, 0.225]) | |
| inp = std * inp + mean | |
| inp = np.clip(inp, 0, 1) | |
| plt.imshow(inp) | |
| if title is not None: | |
| plt.title(title) | |
| plt.pause(0.001) | |
| def visualize_model(self,model, num_images=6): | |
| was_training = model.training | |
| model.eval() | |
| images_so_far = 0 | |
| fig = plt.figure() | |
| with torch.no_grad(): | |
| for i, (inputs, labels) in enumerate(self.dataloaders['val']): | |
| inputs = inputs.to(self.device) | |
| labels = labels.to(self.device) | |
| outputs = model(inputs) | |
| _, preds = torch.max(outputs, 1) | |
| for j in range(inputs.size()[0]): | |
| images_so_far += 1 | |
| ax = plt.subplot(num_images//2, 2, images_so_far) | |
| ax.axis('off') | |
| ax.set_title(f'predicted: {self.class_names[preds[j]]}') | |
| self.imshow(inputs.cpu().data[j]) | |
| if images_so_far == num_images: | |
| model.train(mode=was_training) | |
| return | |
| model.train(mode=was_training) | |
| def pytorch_predict(self,model): | |
| ''' | |
| Make prediction from a pytorch model | |
| ''' | |
| # set model to evaluate model | |
| model.eval() | |
| y_true = torch.tensor([], dtype=torch.long, device=self.device) | |
| all_outputs = torch.tensor([], device=self.device) | |
| # deactivate autograd engine and reduce memory usage and speed up computations | |
| with torch.no_grad(): | |
| for data in self.dataloaders['test']: | |
| inputs = [i.to(self.device) for i in data[:-1]] | |
| labels = data[-1].to(self.device) | |
| outputs = model(*inputs) | |
| y_true = torch.cat((y_true, labels), 0) | |
| all_outputs = torch.cat((all_outputs, outputs), 0) | |
| y_true = y_true.cpu().numpy() | |
| _, y_pred = torch.max(all_outputs, 1) | |
| y_pred = y_pred.cpu().numpy() | |
| y_pred_prob = F.softmax(all_outputs, dim=1).cpu().numpy() | |
| return y_true, y_pred, y_pred_prob | |