Spaces:
Sleeping
Sleeping
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torchvision import datasets, transforms, models | |
| from torch.utils.data import DataLoader | |
| import os | |
| import copy | |
| from torch.optim.lr_scheduler import ReduceLROnPlateau | |
| from torchvision.models import resnet50, ResNet50_Weights | |
| import ssl | |
# SECURITY: certificate verification used to be switched off unconditionally
# here (presumably to get the pretrained-weight download past a local
# certificate error). That removes all protection against tampering for every
# HTTPS request this process makes, so it is now opt-in: verification stays on
# unless ALLOW_INSECURE_HTTPS=1 is set explicitly in the environment.
if os.environ.get("ALLOW_INSECURE_HTTPS") == "1":
    ssl._create_default_https_context = ssl._create_unverified_context
# Per-channel mean / std shared by both pipelines (the values conventionally
# used with ImageNet-pretrained torchvision models).
_NORMALIZE_MEAN = [0.485, 0.456, 0.406]
_NORMALIZE_STD = [0.229, 0.224, 0.225]

# Training pipeline: random crop / flip / rotation / colour jitter for
# augmentation, followed by tensor conversion and normalisation.
_train_steps = [
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORMALIZE_MEAN, std=_NORMALIZE_STD),
]
train_transform = transforms.Compose(_train_steps)

# Validation / test pipeline: deterministic resize and centre crop only, so
# that evaluation is repeatable.
_eval_steps = [
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORMALIZE_MEAN, std=_NORMALIZE_STD),
]
val_test_transform = transforms.Compose(_eval_steps)
class ResNetLungCancer(nn.Module):
    """ResNet-50 backbone followed by a small two-layer classification head.

    The backbone's own ``fc`` layer is swapped for ``nn.Identity`` so that it
    hands over the features that layer used to consume; ``self.fc`` then maps
    those features to ``num_classes`` logits.
    """

    def __init__(self, num_classes, use_pretrained=True):
        """Build the backbone and the classification head.

        Args:
            num_classes: Number of logits produced by the head.
            use_pretrained: When true the backbone is created with
                ``ResNet50_Weights.IMAGENET1K_V1``; otherwise no pretrained
                weights are requested.
        """
        super().__init__()

        backbone_weights = ResNet50_Weights.IMAGENET1K_V1 if use_pretrained else None
        backbone = resnet50(weights=backbone_weights)

        # Remember the width of the features before dropping the original
        # final fully connected layer.
        feature_dim = backbone.fc.in_features
        backbone.fc = nn.Identity()
        self.resnet = backbone

        head_layers = [
            nn.Linear(feature_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return the head's logits for the input batch ``x``."""
        features = self.resnet(x)
        logits = self.fc(features)
        return logits
def train_model(model, train_loader, valid_loader, criterion, optimizer, scheduler, num_epochs=50, device='cuda'):
    """Train ``model`` and return it loaded with its best-validation weights.

    Each epoch runs a training pass over ``train_loader`` followed by a
    validation pass over ``valid_loader``. The validation accuracy is fed to
    ``scheduler.step`` and the weights with the highest validation accuracy
    seen so far are snapshotted.

    Loss and accuracy are averaged over the samples the loader actually
    yielded, so they stay correct when the loader drops a partial batch or
    only visits a subset of its dataset. A phase that yields no samples
    reports ``0.0`` for both metrics.

    Args:
        model: Network to optimise; it is modified in place.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        valid_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``;
            its result is treated as a per-batch mean.
        optimizer: Optimiser stepped once per training batch.
        scheduler: Scheduler stepped once per epoch with the validation
            accuracy.
        num_epochs: Number of epochs to run.
        device: Device every batch is moved to.

    Returns:
        The same ``model`` object, with the best snapshot loaded.
    """
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase, dataloader in (('train', train_loader), ('valid', valid_loader)):
            is_training = phase == 'train'
            model.train(is_training)  # train(False) is the same as eval()

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            for inputs, labels in dataloader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                batch_size = inputs.size(0)

                # Gradients are only needed (and only cleared) while training.
                if is_training:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    if is_training:
                        loss.backward()
                        optimizer.step()

                # Accumulate plain Python numbers so the epoch metrics are
                # floats rather than device tensors.
                running_loss += loss.item() * batch_size
                running_corrects += (preds == labels).sum().item()
                samples_seen += batch_size

            if samples_seen:
                epoch_loss = running_loss / samples_seen
                epoch_acc = running_corrects / samples_seen
            else:
                epoch_loss = 0.0
                epoch_acc = 0.0

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'valid':
                scheduler.step(epoch_acc)
                # Only the first parameter group's rate is reported.
                current_lr = optimizer.param_groups[0]['lr']
                print(f'Learning rate: {current_lr}')
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())

        print()

    print(f'Best val Acc: {best_acc:.4f}')
    model.load_state_dict(best_model_wts)
    return model
def evaluate_model(model, test_loader, device='cuda'):
    """Measure, print and return the classification accuracy of ``model``.

    The model is switched to evaluation mode and run without gradient
    tracking over every batch of ``test_loader``. Accuracy is the fraction of
    samples whose highest-scoring output index equals the label, averaged
    over the samples the loader actually yielded (not ``len(dataset)``), so
    it stays correct when the loader drops a partial batch or samples only a
    subset.

    Args:
        model: Network to evaluate; left in evaluation mode.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device every batch is moved to.

    Returns:
        The accuracy as a float in ``[0, 1]``; ``0.0`` if the loader yields
        no samples.
    """
    model.eval()
    correct = 0
    samples_seen = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            # Plain ints keep the result a Python float and avoid calling
            # tensor methods on the initial 0 when there are no batches.
            correct += (preds == labels).sum().item()
            samples_seen += labels.size(0)

    test_acc = correct / samples_seen if samples_seen else 0.0
    print(f'Test Acc: {test_acc:.4f}')
    return test_acc
def main():
    """Train, evaluate and export the classifier.

    Reads the ``train`` / ``valid`` / ``test`` image folders under
    ``Processed_Data``, fine-tunes ``ResNetLungCancer``, reports test
    accuracy, and writes both a ``.pth`` state dict and an ``.onnx`` export
    to the working directory.

    Raises:
        ValueError: If the validation or test folder maps class names to
            different label indices than the training folder.
    """
    # device
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # data
    data_dir = 'Processed_Data'
    train_dataset = datasets.ImageFolder(os.path.join(data_dir, 'train'), transform=train_transform)
    valid_dataset = datasets.ImageFolder(os.path.join(data_dir, 'valid'), transform=val_test_transform)
    test_dataset = datasets.ImageFolder(os.path.join(data_dir, 'test'), transform=val_test_transform)

    # Each ImageFolder derives its label indices from its own sub-directories,
    # so a missing or extra class folder in one split would silently shift the
    # labels and make every reported metric meaningless.
    for split_name, split in (('valid', valid_dataset), ('test', test_dataset)):
        if split.class_to_idx != train_dataset.class_to_idx:
            raise ValueError(
                f"Class folders in '{split_name}' do not match 'train': "
                f"{split.class_to_idx} != {train_dataset.class_to_idx}"
            )

    # dataloaders
    batch_size = 32
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    print(f"Number of training images: {len(train_dataset)}")
    print(f"Number of validation images: {len(valid_dataset)}")
    print(f"Number of test images: {len(test_dataset)}")

    # initialize model, loss, and optimizer
    num_classes = len(train_dataset.classes)
    model = ResNetLungCancer(num_classes)
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()

    # The backbone gets a 10x smaller learning rate than the new head.
    pretrained_params = list(model.resnet.parameters())
    new_params = list(model.fc.parameters())
    optimizer = optim.Adam([
        {'params': pretrained_params, 'lr': 1e-5},
        {'params': new_params, 'lr': 1e-4},
    ], weight_decay=1e-6)
    # mode='max' because train_model steps the scheduler with an accuracy.
    scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=7)

    # train the model
    trained_model = train_model(model, train_loader, valid_loader, criterion, optimizer, scheduler, num_epochs=50, device=device)

    # eval the model
    evaluate_model(trained_model, test_loader, device=device)

    # save the model weights
    torch.save(trained_model.state_dict(), 'lung_cancer_detection_model.pth')

    # save the model in ONNX format
    trained_model.eval()
    dummy_input = torch.randn(1, 3, 224, 224).to(device)
    torch.onnx.export(
        trained_model,
        dummy_input,
        "lung_cancer_detection_model.onnx",
        input_names=['input'],
        output_names=['output'],
        # Without this the exported graph only accepts the dummy input's
        # batch size of 1.
        dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}},
    )
    print("Training completed. Model saved.")


if __name__ == "__main__":
    main()