import copy
import unittest

import torch
from torch import nn

# apex is optional; the tests below are skipped when it is unavailable.
try:
    import apex
except ImportError:
    HAS_APEX = False
else:
    HAS_APEX = True


class Model(torch.nn.Module):
    """A small LeNet-style CNN for 28x28 single-channel inputs."""

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(2)
        self.fc1 = nn.Linear(256, 120)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(120, 84)
        self.relu4 = nn.ReLU()
        self.fc3 = nn.Linear(84, 10)
        self.relu5 = nn.ReLU()

    def forward(self, x):
        y = self.conv1(x)
        y = self.relu1(y)
        y = self.pool1(y)
        y = self.conv2(y)
        y = self.relu2(y)
        y = self.pool2(y)
        # Flatten the pooled feature maps for the fully connected layers.
        y = y.reshape(y.shape[0], -1)
        y = self.fc1(y)
        y = self.relu3(y)
        y = self.fc2(y)
        y = self.relu4(y)
        y = self.fc3(y)
        y = self.relu5(y)
        return y
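
# Shape sketch for Model on a [N, 1, 28, 28] input (derivation only, not a test):
#   conv1 (5x5, no padding): [N, 6, 24, 24] -> pool1 (2x2): [N, 6, 12, 12]
#   conv2 (5x5, no padding): [N, 16, 8, 8]  -> pool2 (2x2): [N, 16, 4, 4]
#   flatten: [N, 16 * 4 * 4] = [N, 256], matching fc1's in_features; fc3: [N, 10]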


@unittest.skipIf(not HAS_APEX, "`apex` is not found.")
class AdamTest(unittest.TestCase):
    def setUp(self, seed=0):
        super().setUp()
        torch.manual_seed(seed)

        # Two identical models: self.model is updated by torch.optim.Adam
        # (the reference) and self.model_ by apex FusedAdam (under test).
        self.model = Model().cuda()
        self.model_ = Model().cuda()
        self.model_.load_state_dict(copy.deepcopy(self.model.state_dict()))

        self.lr = 0.00001
        params = [p for p in self.model.parameters() if p.requires_grad]
        self.optimizer = torch.optim.Adam(params, lr=self.lr)

    def testGradScaler(self):
        params_ = [p for p in self.model_.parameters() if p.requires_grad]
        optimizer_ = apex.optimizers.FusedAdam(params_, lr=self.lr, capturable=False)
        scaler = torch.cuda.amp.GradScaler(enabled=True)
        scaler_ = torch.cuda.amp.GradScaler(enabled=True)
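
        # Canonical GradScaler sequence used below: scale(loss).backward(),
        # then scaler.step(optimizer) (which unscales the gradients and skips
        # the step if any are inf/NaN), then scaler.update() to adjust the scale.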
        for _ in range(100):
            x = torch.rand([32, 1, 28, 28]).cuda().to(memory_format=torch.channels_last)
            x_ = x.clone()
            gt = torch.rand([32, 10]).cuda()
            gt_ = gt.clone()

            # Reference: torch.optim.Adam under AMP.
            with torch.cuda.amp.autocast(enabled=True):
                y = self.model(x)
                loss = ((gt - y) ** 2).mean()

            scaler.scale(loss).backward()
            scaler.step(self.optimizer)
            scaler.update()

            # Under test: apex FusedAdam under AMP, on identical inputs.
            with torch.cuda.amp.autocast(enabled=True):
                y_ = self.model_(x_)
                loss_ = ((gt_ - y_) ** 2).mean()

            scaler_.scale(loss_).backward()
            scaler_.step(optimizer_)
            scaler_.update()

            # Weights and gradients of the parameterized layers should stay close.
            for m, m_ in zip(self.model.modules(), self.model_.modules()):
                if isinstance(m, (nn.Conv2d, nn.Linear)):
                    torch.testing.assert_close(m.weight, m_.weight, atol=1e-3, rtol=1e-3, equal_nan=True)
                    torch.testing.assert_close(m.weight.grad, m_.weight.grad, atol=1e-3, rtol=1e-3, equal_nan=True)

            self.optimizer.zero_grad()
            optimizer_.zero_grad()

        # Re-sync the two models before leaving the test.
        self.model_.load_state_dict(copy.deepcopy(self.model.state_dict()))

    def testGradScalerCapturable(self):
        params_ = [p for p in self.model_.parameters() if p.requires_grad]
        optimizer_ = apex.optimizers.FusedAdam(params_, lr=self.lr, capturable=True)
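
        # capturable=True keeps FusedAdam's state (e.g. the step count) on-device
        # so the optimizer step can be captured in a CUDA graph; it is expected to
        # match the default path numerically, which this test checks against Adam.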
        scaler = torch.cuda.amp.GradScaler(enabled=True)
        scaler_ = torch.cuda.amp.GradScaler(enabled=True)

        for _ in range(100):
            x = torch.rand([32, 1, 28, 28]).cuda().to(memory_format=torch.channels_last)
            x_ = x.clone()
            gt = torch.rand([32, 10]).cuda()
            gt_ = gt.clone()

            # Reference: torch.optim.Adam under AMP.
            with torch.cuda.amp.autocast(enabled=True):
                y = self.model(x)
                loss = ((gt - y) ** 2).mean()

            scaler.scale(loss).backward()
            scaler.step(self.optimizer)
            scaler.update()

            # Under test: capturable FusedAdam under AMP, on identical inputs.
            with torch.cuda.amp.autocast(enabled=True):
                y_ = self.model_(x_)
                loss_ = ((gt_ - y_) ** 2).mean()

            scaler_.scale(loss_).backward()
            scaler_.step(optimizer_)
            scaler_.update()

            for m, m_ in zip(self.model.modules(), self.model_.modules()):
                if isinstance(m, (nn.Conv2d, nn.Linear)):
                    torch.testing.assert_close(m.weight, m_.weight, atol=1e-3, rtol=1e-3, equal_nan=True)
                    torch.testing.assert_close(m.weight.grad, m_.weight.grad, atol=1e-3, rtol=1e-3, equal_nan=True)

            self.optimizer.zero_grad()
            optimizer_.zero_grad()

        # Re-sync the two models before leaving the test.
        self.model_.load_state_dict(copy.deepcopy(self.model.state_dict()))

    def testNative(self):
        params_ = [p for p in self.model_.parameters() if p.requires_grad]
        optimizer_ = apex.optimizers.FusedAdam(params_, lr=self.lr, capturable=False)
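
        # No autocast or GradScaler here: plain fp32 training, comparing
        # FusedAdam directly against torch.optim.Adam.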
        for _ in range(100):
            x = torch.rand([32, 1, 28, 28]).cuda().to(memory_format=torch.channels_last)
            x_ = x.clone()
            gt = torch.rand([32, 10]).cuda()
            gt_ = gt.clone()

            # Reference: torch.optim.Adam.
            y = self.model(x)
            loss = ((gt - y) ** 2).mean()

            loss.backward()
            self.optimizer.step()

            # Under test: apex FusedAdam on identical inputs.
            y_ = self.model_(x_)
            loss_ = ((gt_ - y_) ** 2).mean()

            loss_.backward()
            optimizer_.step()

            for m, m_ in zip(self.model.modules(), self.model_.modules()):
                if isinstance(m, (nn.Conv2d, nn.Linear)):
                    torch.testing.assert_close(m.weight, m_.weight, atol=1e-3, rtol=1e-3, equal_nan=True)
                    torch.testing.assert_close(m.weight.grad, m_.weight.grad, atol=1e-3, rtol=1e-3, equal_nan=True)

            self.optimizer.zero_grad()
            optimizer_.zero_grad()

        # Re-sync the two models before leaving the test.
        self.model_.load_state_dict(copy.deepcopy(self.model.state_dict()))
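

# NOTE: running these tests end-to-end needs a CUDA device and an apex build
# that provides apex.optimizers.FusedAdam; when apex is missing entirely, the
# test class is skipped via HAS_APEX.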


if __name__ == '__main__':
    unittest.main()