File size: 5,203 Bytes
4ec6f12 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
from torchvision.transforms import ToTensor
from torchvision.transforms import v2
from torchvision import transforms
import matplotlib.pyplot as plt
from time import time
from torch import nn
import pandas as pd
import numpy as np
import torch, os
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm
# Going to be using keras
input_shape = (224, 224, 3)
device = (
"cuda"
if torch.cuda.is_available()
else "mps"
if torch.backends.mps.is_available()
else "cpu"
)
class MakiAlexNet(nn.Module):
def __init__(self, num_classes=2):
super(MakiAlexNet, self).__init__()
self.num_classes = num_classes
self.conv1 = nn.Conv2d(in_channels=3, out_channels=96, kernel_size=11, stride=4, padding=1) # LazyConv2d determine the input channels automatically.
self.conv2 = nn.Conv2d(in_channels=96, out_channels=256, kernel_size=5, padding=2)
self.conv3 = nn.Conv2d(in_channels=256, out_channels=384, kernel_size=3, padding=1) # 256, 384
self.conv4 = nn.Conv2d(in_channels=384, out_channels=384, kernel_size=3, padding=1) # 384,384
self.conv5 = nn.Conv2d(in_channels=384, out_channels=256, kernel_size=3, padding=1) # 384, 256
self.activation = nn.ReLU()
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2)
self.dropout = nn.Dropout(p=0.5)
self.f_linear = nn.Linear(256, self.num_classes)
# Replace Flatten with GlobalAvgPool2d
self.gap = nn.AvgPool2d(5) # Adjust output size if needed
# In this case LazyLinear is really useful after flattening,
# such that abstraction is made from the initial output layer and the linear layer nodes.
# Create an empty dictionary to store layer outputs
self.layer_outputs = {}
# Register hooks for desired layers
self.conv5.register_forward_hook(self._save_layer_output)
self.f_linear.register_forward_hook(self._save_layer_output)
def _save_to_output_weights(self, module, input, output):
self.layer_outputs[module.__class__.__name__] = {"input": input, "output": output, "weights": module.weight.data}
def _save_layer_output(self, module, input, output):
self.layer_outputs[module.__class__.__name__] = output
def forward(self, x):
"""Defined forward pass of AlexNet for learning left or right prediction."""
x = self.conv1(x) # wider
x = self.activation(x)
x = self.maxpool(x) # down sample.
x = self.conv2(x) # wider.
x = self.activation(x)
x = self.maxpool(x) # down sample.
x = self.conv3(x) # wider.
x = self.activation(x)
x = self.conv4(x)
x = self.activation(x)
x = self.conv5(x)
x = self.activation(x)
x = self.maxpool(x) # down sample.
x = self.gap(x).squeeze(-1).squeeze(-1)
# x = self.activation(x)
x = self.dropout(x)
x = self.f_linear(x)
return x
if __name__ == "__main__":
from dataset_creation import test_loader, train_loader # Initiate the custom dataloaders and datasets here.
# Running the model to learn, also introducing good features to make it learn better like a cosine scheduler for the learning rate.
EPOCH = 35
model = MakiAlexNet()
model.to(device)
print(model)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.00001 * 5, weight_decay=0.0001, momentum=0.9)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)
if os.path.exists("best_model_2.0.txt"):
with open("best_model_2.0.txt", "r") as file:
best_accuracy = float(file.read())
else:
best_accuracy = 0.0
# create a new file
with open("best_model_2.0.txt", "w") as file:
file.write(f"{best_accuracy}")
for epoch in tqdm(range(EPOCH), desc="Training Epoch Cycle"):
model.train() # Set model to training mode
running_loss = 0.0
for i, data in enumerate(train_loader, 0):
if i % 10 == 0:
print(f"Internal Loop of batches: {i}")
inputs, labels = data
# print(type(labels), labels)
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
train_loss = running_loss / len(train_loader)
print(f'Epoch [{epoch + 1}] training loss: {train_loss:.3f}')
# Validation phase
model.eval() # Set model to evaluation mode
val_running_loss = 0.0
val_correct = 0
val_total = 0
with torch.no_grad():
for data in test_loader: # Assuming test_loader is used as a validation loader
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
loss = criterion(outputs, labels)
val_running_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
val_total += labels.size(0)
val_correct += (predicted == labels).sum().item()
val_loss = val_running_loss / len(test_loader)
val_accuracy = 100 * val_correct / val_total
print(f'Epoch [{epoch + 1}] validation loss: {val_loss:.3f}, accuracy: {val_accuracy:.2f}%')
if val_accuracy > best_accuracy:
best_accuracy = val_accuracy
torch.save(model.state_dict(), "alexnet_2.0.pth")
with open("best_model_2.0.txt", "w") as file:
file.write(f"{best_accuracy}")
# Update the LR scheduler with validation loss
scheduler.step(val_loss)
|