# File size: 7,339 Bytes
# b83d9ec
# In[1] Import the required packages
import torch
import torch.nn as nn
import torchvision
from torchvision import datasets, transforms
import time
from torch.nn import functional as F
from math import floor, ceil
from torch.utils.data import DataLoader,TensorDataset
# import torchvision.transforms as transforms
# Pick the compute device: CUDA when it is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

# In[1] Hyperparameters
num_epochs = 60        # passes over the training set
batch_size = 1000      # samples per mini-batch (training and test loaders)
learning_rate = 0.001  # initial Adam learning rate
# In[2] Load the MNIST training and test data
# NOTE(review): `transform` is not passed to either dataset below, so the
# mean/std normalization it describes is NOT applied. The images are only
# converted to float and divided by 255. It is kept so the name stays defined.
transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(
        (0.1307,), (0.3081,))
])

# The raw image tensors are wrapped in a TensorDataset instead of iterating the
# torchvision dataset; unsqueeze(1) adds the single channel dimension.
train_set = torchvision.datasets.MNIST(root='MNIST', train=True, download=True)
train_data = train_set.data.float().unsqueeze(1) / 255.0
train_labels = train_set.targets
train_dataset = TensorDataset(train_data, train_labels)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_set = torchvision.datasets.MNIST(root='MNIST', train=False, download=True)
test_data = test_set.data.float().unsqueeze(1) / 255.0
test_labels = test_set.targets
test_dataset = TensorDataset(test_data, test_labels)
# Accuracy does not depend on sample order, so the test set is not shuffled.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# In[1] 3x3 convolution factory
def conv3x3(in_channels, out_channels, stride=1):
    """Create a 3x3 ``nn.Conv2d`` with padding 1 and a bias term.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        stride: Convolution stride (default 1).

    Returns:
        The newly constructed ``nn.Conv2d`` layer.
    """
    conv_options = dict(kernel_size=3, stride=stride, padding=1, bias=True)
    return nn.Conv2d(in_channels, out_channels, **conv_options)
# In[1] Residual block
class ResidualBlock(nn.Module):
    """Two 3x3 convolution + batch-norm layers wrapped by a skip connection.

    Args:
        in_channels: Channels of the block input.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        downsample: Optional module applied to the block input to produce the
            skip branch. Needed whenever the main branch changes the channel
            count or spatial size, so that the two branches can be added.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_branch(x) + skip(x))``."""
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        # Compare against None explicitly: the truth value of a container
        # module such as nn.Sequential is its length, not "was one supplied".
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out
# In[1] Build the residual network
class ResNet(nn.Module):
    """Three-stage residual network for single-channel images.

    A 3x3 stem convolution (1 -> 16 channels) is followed by three stages of
    residual blocks with 16, 32 and 64 channels; the second and third stages
    use stride 2. The flattened feature map then goes through
    ``fc1 -> LayerNorm -> ReLU -> fc2``.

    Args:
        block: Residual block class, called as
            ``block(in_channels, out_channels, stride, downsample)``.
        layers: Sequence of three ints; ``layers[k]`` is the number of
            residual blocks in stage ``k``.
        num_classes: Number of output logits.
        input_size: Spatial size of the input images, either one int for
            square images or a ``(height, width)`` pair. The default 28
            yields the original 3136 (= 64 * 7 * 7) inputs to ``fc1``.
            The size computation assumes every stride-2 stage is built from
            3x3 / padding-1 convolutions, as ``conv3x3`` produces.
    """

    def __init__(self, block, layers, num_classes=10, input_size=28):
        super().__init__()
        self.in_channels = 16
        self.conv = conv3x3(1, 16)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        # Stage 1: 16 -> 16 channels at stride 1, so no projection is needed
        # on the skip branch.
        self.layer1 = self.make_layer(block, 16, layers[0], stride=1)
        # Stages 2 and 3 change the channel count and use stride 2, so their
        # first block gets a projection on the skip branch.
        # `layers` holds the number of residual blocks for each stage.
        self.layer2 = self.make_layer(block, 32, layers[1], 2)
        self.layer3 = self.make_layer(block, 64, layers[2], 2)
        # NOTE: not used by forward(); kept so the attribute stays available.
        self.avg_pool = nn.AvgPool2d(8)

        if isinstance(input_size, int):
            height = width = input_size
        else:
            height, width = input_size
        # A 3x3 convolution with padding 1 and stride 2 maps a side of n
        # pixels to ceil(n / 2); this happens once in stage 2 and once in
        # stage 3 (28 -> 14 -> 7 for the default input size).
        for _ in range(2):
            height = (height + 1) // 2
            width = (width + 1) // 2
        self.fc1 = nn.Linear(64 * height * width, 128)
        self.normfc12 = nn.LayerNorm((128), eps=1e-5)
        self.fc2 = nn.Linear(128, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage: ``blocks`` residual blocks with ``out_channels``.

        Only the first block may change the channel count or apply ``stride``;
        when it does, it receives a conv3x3 + batch-norm projection for its
        skip branch. The remaining blocks map ``out_channels`` to
        ``out_channels`` at stride 1.

        Side effect: sets ``self.in_channels`` to ``out_channels`` so the next
        stage starts from the right channel count.

        Returns:
            An ``nn.Sequential`` containing the stage's blocks.
        """
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels))
        stage = [block(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = out_channels
        for _ in range(1, blocks):
            stage.append(block(out_channels, out_channels))
        return nn.Sequential(*stage)

    def forward(self, x):
        """Compute class logits for a batch of single-channel images ``x``."""
        out = self.conv(x)
        out = self.bn(out)
        out = self.relu(out)
        # Three residual stages: 16, 32 and 64 channels.
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        # out = self.avg_pool(out)
        # Flatten everything except the batch dimension (out.size(0)).
        out = out.view(out.size(0), -1)
        out = self.fc1(out)
        out = self.normfc12(out)
        out = self.relu(out)
        out = self.fc2(out)
        return out
# In[1] Build the model, the loss function and the optimizer
# [2, 2, 2] is the number of residual blocks in each of the three stages.
model = ResNet(ResidualBlock, [2, 2, 2])
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
# In[1] Helper that changes the learning rate through the optimizer
def update_lr(optimizer, lr):
    """Set the learning rate of every parameter group of ``optimizer`` to ``lr``.

    Args:
        optimizer: Object exposing ``param_groups``, an iterable of dicts.
        lr: New learning rate written to each group's ``'lr'`` entry.
    """
    for group in optimizer.param_groups:
        group.update(lr=lr)
# In[1] Evaluation helper
def test(model, test_loader):
    """Evaluate ``model`` on ``test_loader`` and print its accuracy.

    The model is switched to evaluation mode for the duration of the call and
    put back into whatever mode it was in before, so calling this between
    training epochs does not leave the model in eval mode.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        test_loader: Iterable of ``(images, labels)`` tensor batches.

    Returns:
        Accuracy in percent as a float, or ``nan`` when the loader yields no
        samples.
    """
    # Run on the device the model's parameters live on; inputs are left where
    # they are for a model without parameters.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else None

    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for images, labels in test_loader:
                if target is not None:
                    images = images.to(target)
                    labels = labels.to(target)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        # Restore the caller's train/eval mode even if evaluation fails.
        model.train(was_training)

    accuracy = 100 * correct / total if total else float('nan')
    print('Accuracy of the model on the test images: {} %'.format(accuracy))
    return accuracy


# The name starts with "test"; mark the helper so test collectors such as
# pytest do not mistake it for a test case.
test.__test__ = False
# In[1] Train the model, decaying the learning rate along the way
total_step = len(train_loader)
curr_lr = learning_rate
for epoch in range(num_epochs):
    in_epoch = time.time()
    # The per-epoch evaluation below puts the model in eval mode; switch back
    # so BatchNorm normalizes with batch statistics and keeps updating its
    # running statistics during training.
    model.train()
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Log every 100 steps and on the last step of the epoch, so that an
        # epoch shorter than 100 steps still reports a loss.
        if (i + 1) % 100 == 0 or (i + 1) == total_step:
            print("Epoch [{}/{}], Step [{}/{}] Loss: {:.4f}"
                  .format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))

    # Evaluate on the test set after every epoch and report the epoch time.
    test(model, test_loader)
    out_epoch = time.time()
    print(f"use {(out_epoch - in_epoch) // 60}min{(out_epoch - in_epoch) % 60}s")

    # Divide the learning rate by 3 every 20 epochs.
    if (epoch + 1) % 20 == 0:
        curr_lr /= 3
        update_lr(optimizer, curr_lr)
# In[1] Final evaluation, then save the model weights
# Reuse the shared evaluation helper instead of repeating its loop here.
# eval() is called first so the model is in evaluation mode when it is saved.
model.eval()
test(model, test_loader)
torch.save(model.state_dict(), '../resnet.ckpt')
# End of script.