landmarkclassifier / src /train.py
mewhenmonkeyavatar's picture
real initial commit.
4b7c478
Raw
History Blame Contribute Delete
7.93 kB
import tempfile
import torch
import numpy as np
from livelossplot import PlotLosses
from livelossplot.outputs import MatplotlibPlot
from tqdm import tqdm
from src.helpers import after_subplot
def train_one_epoch(train_dataloader, model, optimizer, loss):
"""
Performs one train_one_epoch epoch
"""
if torch.cuda.is_available():
# MY CODE HERE: transfer the model to the GPU
# HINT: use .cuda()
model.cuda()
# MY CODE HERE: set the model to training mode
model.train()
train_loss = 0.0
for batch_idx, (data, target) in tqdm(
enumerate(train_dataloader),
desc="Training",
total=len(train_dataloader),
leave=True,
ncols=80,
):
# move data to GPU
if torch.cuda.is_available():
data, target = data.cuda(), target.cuda()
# MY CODE HERE:
# 1. clear the gradients of all optimized variables
optimizer.zero_grad()
# 2. forward pass: compute predicted outputs by passing inputs to the model
output = model.forward(data)
# 3. calculate the loss
loss_value = loss(output, target)
# 4. backward pass: compute gradient of the loss with respect to model parameters
loss_value.backward()
# 5. perform a single optimization step (parameter update)
optimizer.step()
# update average training loss
train_loss = train_loss + (
(1 / (batch_idx + 1)) * (loss_value.data.item() - train_loss)
)
return train_loss
def valid_one_epoch(valid_dataloader, model, loss):
"""
Validate at the end of one epoch
"""
with torch.no_grad():
# set the model to evaluation mode
# MY CODE HERE
model.eval()
if torch.cuda.is_available():
model.cuda()
valid_loss = 0.0
for batch_idx, (data, target) in tqdm(
enumerate(valid_dataloader),
desc="Validating",
total=len(valid_dataloader),
leave=True,
ncols=80,
):
# move data to GPU
if torch.cuda.is_available():
data, target = data.cuda(), target.cuda()
# 1. forward pass: compute predicted outputs by passing inputs to the model
output = model.forward(data)
# 2. calculate the loss
loss_value = loss(output, target)
# Calculate average validation loss
valid_loss = valid_loss + (
(1 / (batch_idx + 1)) * (loss_value.data.item() - valid_loss)
)
return valid_loss
def optimize(data_loaders, model, optimizer, loss, n_epochs, save_path, interactive_tracking=False):
# initialize tracker for minimum validation loss
if interactive_tracking:
liveloss = PlotLosses(outputs=[MatplotlibPlot(after_subplot=after_subplot)])
else:
liveloss = None
valid_loss_min = None
logs = {}
# Learning rate scheduler: setup a learning rate scheduler that
# reduces the learning rate when the validation loss reaches a
# plateau
# HINT: look here:
# https://pytorch.org/docs/stable/optim.html#how-to-adjust-learning-rate
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, n_epochs)
# print(f"lr: {scheduler.get_last_lr()}") # replacement for the deprecated 'verbose' param in the above function
for epoch in range(1, n_epochs + 1):
train_loss = train_one_epoch(
data_loaders["train"], model, optimizer, loss
)
valid_loss = valid_one_epoch(data_loaders["valid"], model, loss)
# print training/validation statistics
print(
"Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}".format(
epoch, train_loss, valid_loss
)
)
# If the validation loss decreases by more than 1%, save the model
if valid_loss_min is None or (
(valid_loss_min - valid_loss) / valid_loss_min > 0.01
):
print(f"New minimum validation loss: {valid_loss:.6f}. Saving model ...")
# Save the weights to save_path
# MY CODE HERE
torch.save(model.state_dict(), save_path)
valid_loss_min = valid_loss
# Update learning rate, i.e., make a step in the learning rate scheduler
# MY CODE HERE
scheduler.step(valid_loss_min)
# Log the losses and the current learning rate
if interactive_tracking:
logs["loss"] = train_loss
logs["val_loss"] = valid_loss
logs["lr"] = optimizer.param_groups[0]["lr"]
liveloss.update(logs)
liveloss.send()
def one_epoch_test(test_dataloader, model, loss):
# monitor test loss and accuracy
test_loss = 0.
correct = 0.
total = 0.
# set the module to evaluation mode
with torch.no_grad():
# set the model to evaluation mode
# MY CODE HERE
model.eval()
if torch.cuda.is_available():
model = model.cuda()
for batch_idx, (data, target) in tqdm(
enumerate(test_dataloader),
desc='Testing',
total=len(test_dataloader),
leave=True,
ncols=80
):
# move data to GPU
if torch.cuda.is_available():
data, target = data.cuda(), target.cuda()
# 1. forward pass: compute predicted outputs by passing inputs to the model
logits = model(data)
# 2. calculate the loss
loss_value = loss(logits, target)
# update average test loss
test_loss = test_loss + ((1 / (batch_idx + 1)) * (loss_value.data.item() - test_loss))
# convert logits to predicted class
# HINT: the predicted class is the index of the max of the logits
pred = torch.argmax(logits, dim=1)
# compare predictions to true label
correct += torch.sum(torch.squeeze(pred.eq(target.data.view_as(pred))).cpu())
total += data.size(0)
print('Test Loss: {:.6f}\n'.format(test_loss))
print('\nTest Accuracy: %2d%% (%2d/%2d)' % (
100. * correct / total, correct, total))
return test_loss
######################################################################################
# TESTS
######################################################################################
import pytest
@pytest.fixture(scope="session")
def data_loaders():
from .data import get_data_loaders
return get_data_loaders(batch_size=50, limit=200, valid_size=0.5, num_workers=0)
@pytest.fixture(scope="session")
def optim_objects():
from src.optimization import get_optimizer, get_loss
from src.model import MyModel
model = MyModel(50)
return model, get_loss(), get_optimizer(model)
def test_train_one_epoch(data_loaders, optim_objects):
model, loss, optimizer = optim_objects
for _ in range(2):
lt = train_one_epoch(data_loaders['train'], model, optimizer, loss)
assert not np.isnan(lt), "Training loss is nan"
def test_valid_one_epoch(data_loaders, optim_objects):
model, loss, optimizer = optim_objects
for _ in range(2):
lv = valid_one_epoch(data_loaders["valid"], model, loss)
assert not np.isnan(lv), "Validation loss is nan"
def test_optimize(data_loaders, optim_objects):
model, loss, optimizer = optim_objects
with tempfile.TemporaryDirectory() as temp_dir:
optimize(data_loaders, model, optimizer, loss, 2, f"{temp_dir}/hey.pt")
def test_one_epoch_test(data_loaders, optim_objects):
model, loss, optimizer = optim_objects
tv = one_epoch_test(data_loaders["test"], model, loss)
assert not np.isnan(tv), "Test loss is nan"