| import torch |
| import torchvision |
| import torchinfo |
|
|
| import typing |
| import requests |
| import os |
| import zipfile |
| import mlxtend.plotting |
| import torchmetrics |
| from pathlib import Path |
| from timeit import default_timer as timer |
| from tqdm.auto import tqdm |
| import matplotlib |
|
|
| matplotlib.use("TkAgg") |
| from matplotlib import pyplot as plt |
|
|
|
|
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| TRAIN_MODEL = False |
| BATCH_SIZE = 32 |
| LEARNING_RATE = 0.001 |
| NUM_EPOCH = 10 |
| MODEL_PATH = Path("models") |
| MODEL_NAME = "EfficientNet_B3_20percent.pth" |
| MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME |
|
|
| |
| data_path = Path("data/") |
| image_path = data_path / "pizza_steak_sushi_20_percent" |
|
|
| |
| if image_path.is_dir(): |
| print(f"{image_path} directory exists.") |
| else: |
| print(f"Did not find {image_path} directory, creating one...") |
| image_path.mkdir(parents=True, exist_ok=True) |
|
|
| |
| with open(data_path / "pizza_steak_sushi_20_percent.zip", "wb") as f: |
| request = requests.get( |
| "https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi_20_percent.zip" |
| ) |
| print("Downloading pizza, steak, sushi data...") |
| f.write(request.content) |
|
|
| |
| with zipfile.ZipFile( |
| data_path / "pizza_steak_sushi_20_percent.zip", "r" |
| ) as zip_ref: |
| print("Unzipping pizza, steak, sushi data...") |
| zip_ref.extractall(image_path) |
|
|
| |
| os.remove(data_path / "pizza_steak_sushi_20_percent.zip") |
|
|
| train_dir = image_path / "train" |
| test_dir = image_path / "test" |
|
|
| manual_transform = torchvision.transforms.Compose( |
| [ |
| torchvision.transforms.Resize((224, 224)), |
| torchvision.transforms.ToTensor(), |
| torchvision.transforms.Normalize( |
| mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225] |
| ), |
| ] |
| ) |
|
|
|
|
| def create_dataloaders( |
| train_dir: Path, |
| test_dir: Path, |
| batch_size: int, |
| num_workers: int, |
| transform: torchvision.transforms.Compose, |
| ) -> tuple[ |
| torch.utils.data.DataLoader, |
| torch.utils.data.DataLoader, |
| list[str], |
| torchvision.datasets.ImageFolder, |
| torchvision.datasets.ImageFolder, |
| ]: |
| train_data = torchvision.datasets.ImageFolder( |
| train_dir, |
| transform=transform, |
| ) |
| test_data = torchvision.datasets.ImageFolder( |
| test_dir, |
| transform=transform, |
| ) |
|
|
| class_names = train_data.classes |
|
|
| train_dataloader = torch.utils.data.DataLoader( |
| train_data, |
| batch_size=batch_size, |
| shuffle=True, |
| num_workers=num_workers, |
| pin_memory=True, |
| ) |
|
|
| test_dataloader = torch.utils.data.DataLoader( |
| test_data, |
| batch_size=batch_size, |
| num_workers=num_workers, |
| shuffle=False, |
| pin_memory=True, |
| ) |
|
|
| return ( |
| train_dataloader, |
| test_dataloader, |
| class_names, |
| train_data, |
| test_data, |
| ) |
|
|
|
|
| ( |
| train_dataloader_manual_transform, |
| test_dataloader_manual_transform, |
| class_names_manual_transform, |
| train_data, |
| test_data, |
| ) = create_dataloaders( |
| train_dir=train_dir, |
| test_dir=test_dir, |
| num_workers=os.cpu_count() or 0, |
| batch_size=BATCH_SIZE, |
| transform=manual_transform, |
| ) |
|
|
| weights = torchvision.models.EfficientNet_B3_Weights.DEFAULT |
|
|
| auto_transform = weights.transforms() |
|
|
| ( |
| train_dataloader, |
| test_dataloader, |
| class_names, |
| train_data, |
| test_data, |
| ) = create_dataloaders( |
| train_dir=train_dir, |
| test_dir=test_dir, |
| batch_size=BATCH_SIZE, |
| num_workers=os.cpu_count() or 0, |
| transform=auto_transform, |
| ) |
|
|
| model = torchvision.models.efficientnet_b3(weights=weights).to(device) |
|
|
| torchinfo.summary( |
| model=model, |
| input_size=(32, 3, 224, 224), |
| col_names=["input_size", "output_size", "num_params", "trainable"], |
| row_settings=["var_names"], |
| ) |
|
|
| for feature in model.features: |
| print(feature) |
|
|
| for param in model.features.parameters(): |
| param.requires_grad = False |
|
|
| print(f"Classifier part has (before changing):\n{model.classifier}") |
|
|
| torch.manual_seed(37) |
| torch.cuda.manual_seed(37) |
| output_shape = len(class_names) |
| model.classifier = torch.nn.Sequential( |
| torch.nn.Dropout(p=0.2, inplace=True), |
| torch.nn.Linear(in_features=1536, out_features=output_shape, bias=True), |
| ) |
|
|
| print(f"Classifier part has (after changing):\n{model.classifier}") |
|
|
| torchinfo.summary( |
| model=model, |
| input_size=(32, 3, 224, 224), |
| col_names=["input_size", "output_size", "num_params", "trainable"], |
| row_settings=["var_names"], |
| ) |
|
|
| loss_fn = torch.nn.CrossEntropyLoss() |
| optim = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE) |
|
|
|
|
| class Engine: |
| def __init__( |
| self, |
| train_dataloader: torch.utils.data.DataLoader, |
| test_dataloader: torch.utils.data.DataLoader, |
| model: torch.nn.Module, |
| optim: torch.optim.Optimizer, |
| loss_fn: torch.nn.Module, |
| device: typing.Literal["cuda", "cpu"], |
| num_epoch: int, |
| ): |
| self.train_dataloader = train_dataloader |
| self.test_dataloader = test_dataloader |
| self.optim = optim |
| self.loss_fn = loss_fn |
| self.device = device |
| self.num_epoch = num_epoch |
| self.model = model.to(device) |
|
|
| def _train_step(self) -> tuple[float, float]: |
| self.model.train() |
| loss_train = 0 |
| acc_train = 0 |
|
|
| for batch, (X, y) in enumerate(self.train_dataloader): |
| X, y = X.to(self.device), y.to(self.device) |
|
|
| train_pred = self.model(X) |
| loss = self.loss_fn(train_pred, y) |
|
|
| loss_train += loss.item() |
|
|
| optim.zero_grad() |
| loss.backward() |
| optim.step() |
|
|
| pred_class = torch.argmax(torch.softmax(train_pred, dim=1), dim=1) |
| acc = (pred_class == y).sum().item() / len(pred_class) |
|
|
| acc_train += acc |
|
|
| if batch % 2 == 0: |
| print(f"{batch} batches have been processed...") |
|
|
| loss_train = loss_train / len(self.train_dataloader) |
| acc_train = acc_train / len(self.train_dataloader) |
|
|
| return loss_train, acc_train |
|
|
| def _test_step(self) -> tuple[float, float]: |
| self.model.eval() |
| loss_test = 0 |
| acc_test = 0 |
|
|
| with torch.inference_mode(): |
| for batch, (X, y) in enumerate(self.test_dataloader): |
| X, y = X.to(self.device), y.to(self.device) |
|
|
| test_pred = self.model(X) |
| loss = self.loss_fn(test_pred, y) |
|
|
| loss_test += loss.item() |
|
|
| pred_class = torch.argmax(torch.softmax(test_pred, dim=1), dim=1) |
| acc = (pred_class == y).sum().item() / len(pred_class) |
| acc_test += acc |
|
|
| if batch % 2 == 0: |
| print(f"{batch} batches have been processed...") |
|
|
| loss_test = loss_test / len(self.test_dataloader) |
| acc_test = acc_test / len(self.test_dataloader) |
|
|
| return loss_test, acc_test |
|
|
| def train(self) -> tuple[list[float], list[float], list[float], list[float]]: |
| train_loss_list = [] |
| test_loss_list = [] |
| train_acc_list = [] |
| test_acc_list = [] |
| for epoch in tqdm(range(self.num_epoch)): |
| print(f"{'*' * 6} EPOCH NUM: {epoch} {'*' * 6}") |
|
|
| print("Starting the training...") |
| train_loss, train_acc = self._train_step() |
| print("Starting the testing...") |
| test_loss, test_acc = self._test_step() |
| print( |
| f"Train Loss: {train_loss:.3f} | Train Acc: {train_acc:.3f}" |
| f"Test Loss: {test_loss:.3f} | Test Acc: {test_acc:.3f}" |
| ) |
|
|
| train_loss_list.append(train_loss) |
| train_acc_list.append(train_acc) |
| test_loss_list.append(test_loss) |
| test_acc_list.append(test_acc) |
|
|
| return train_loss_list, train_acc_list, test_loss_list, test_acc_list |
|
|
|
|
| torch.manual_seed(37) |
| torch.cuda.manual_seed(37) |
| engine = Engine( |
| train_dataloader=train_dataloader, |
| test_dataloader=test_dataloader, |
| model=model, |
| optim=optim, |
| loss_fn=loss_fn, |
| num_epoch=NUM_EPOCH, |
| device=device, |
| ) |
|
|
|
|
| def plot_curves( |
| train_loss: list[float], |
| train_acc: list[float], |
| test_loss: list[float], |
| test_acc: list[float], |
| num_epoch: int, |
| ): |
| fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(12, 8)) |
|
|
| |
| ax[0].plot(range(num_epoch), train_loss, color="red", label="Train") |
| ax[0].plot(range(num_epoch), test_loss, color="blue", label="Test") |
| ax[0].set(xlabel="Epochs", ylabel="Loss", title="Train vs Test Loss") |
| ax[0].legend() |
|
|
| |
| ax[1].plot(range(num_epoch), train_acc, color="red", label="Train") |
| ax[1].plot(range(num_epoch), test_acc, color="blue", label="Test") |
| ax[1].set(xlabel="Epochs", ylabel="Accuracy", title="Train vs Test Accuracy") |
| ax[1].legend() |
|
|
| fig.suptitle("Loss and Accuracy Curve") |
| plt.savefig(f"{MODEL_NAME}_curves.png") |
| plt.show() |
|
|
|
|
| if TRAIN_MODEL: |
| start_time = timer() |
| train_loss, train_acc, test_loss, test_acc = engine.train() |
| end_time = timer() |
| print(f"INFO: Training process took {end_time - start_time:.3f} seconds.") |
|
|
| MODEL_PATH.mkdir(parents=True, exist_ok=True) |
| torch.save(obj=model.state_dict(), f=MODEL_SAVE_PATH) |
|
|
| plot_curves(train_loss, train_acc, test_loss, test_acc, NUM_EPOCH) |
|
|
| else: |
| model.load_state_dict( |
| torch.load(f=MODEL_SAVE_PATH, weights_only=True, map_location=device) |
| ) |
|
|
|
|
| |
| def give_predictions( |
| test_dataloader: torch.utils.data.DataLoader, |
| model: torch.nn.Module, |
| device: typing.Literal["cuda", "cpu"], |
| ) -> tuple[torch.Tensor, torch.Tensor]: |
| print("Starting the testing...") |
| model.to(device) |
|
|
| predictions = [] |
| logits_prob = [] |
| model.eval() |
| with torch.inference_mode(): |
| for X, y in tqdm(test_dataloader, desc="Doing Validation"): |
| X, y = X.to(device), y.to(device) |
|
|
| logits = model(X) |
|
|
| pred = torch.argmax(torch.softmax(logits, dim=1), dim=1) |
| logits_prob.append(torch.softmax(logits, dim=1).cpu()) |
|
|
| predictions.append(pred.cpu()) |
|
|
| return torch.cat(predictions), torch.cat(logits_prob) |
|
|
|
|
| |
| test_preds, logits_prob = give_predictions( |
| test_dataloader=test_dataloader, model=model, device=device |
| ) |
|
|
| confmat = torchmetrics.ConfusionMatrix(num_classes=len(class_names), task="multiclass") |
| confmat_tensor = confmat(preds=test_preds, target=torch.tensor(test_data.targets)) |
| fig, ax = mlxtend.plotting.plot_confusion_matrix( |
| conf_mat=confmat_tensor.numpy(), |
| class_names=class_names, |
| figsize=(10, 7), |
| ) |
| plt.savefig(f"{MODEL_NAME}_confusion_matrix.png") |
| plt.show() |
|
|
| |
| pred_wrong = [] |
| for i in range(len(test_preds)): |
| if test_preds[i] != test_data.targets[i]: |
| pred_wrong.append([test_data.targets[i], test_preds[i], logits_prob[i], i]) |
|
|
| pred_wrong.sort(key=lambda x: x[2][x[1]], reverse=True) |
|
|
| |
| |
| test_data_original = torchvision.datasets.ImageFolder( |
| test_dir, |
| transform=None, |
| ) |
|
|
| if len(pred_wrong) > 2: |
| nrows, ncols = len(pred_wrong) // 2 if len(pred_wrong) // 2 < 5 else 5, 2 |
| fig, ax = plt.subplots(nrows=nrows, ncols=ncols, figsize=(12, 8)) |
| for rows in range(nrows): |
| for cols in range(ncols): |
| index_1d = rows * ncols + cols |
| image, true_label_index = test_data_original[pred_wrong[index_1d][3]] |
| true_label = class_names[true_label_index] |
| pred_label_index = pred_wrong[index_1d][1] |
| pred_label = class_names[pred_label_index] |
| ax[rows][cols].imshow(image) |
| ax[rows][cols].set_title( |
| f"True: {true_label}:{pred_wrong[index_1d][2][true_label_index]:.2f} | Prediction: {pred_label}:{pred_wrong[index_1d][2][pred_label_index]:.2f}" |
| ) |
| ax[rows][cols].axis("off") |
| plt.savefig(f"{MODEL_NAME}_wrong_pred.png") |
| plt.show() |
| elif len(pred_wrong) == 1: |
| image, true_label_index = test_data_original[pred_wrong[0][3]] |
| true_label = class_names[true_label_index] |
| pred_label_index = pred_wrong[0][1] |
| pred_label = class_names[pred_label_index] |
| plt.imshow(image) |
|
|
| plt.title( |
| f"True: {true_label}:{pred_wrong[0][2][true_label_index]:.2f} | Prediction: {pred_label}:{pred_wrong[0][2][pred_label_index]:.2f}" |
| ) |
| plt.axis(False) |
| plt.savefig(f"{MODEL_NAME}_wrong_pred.png") |
| plt.show() |
|
|