# File size: 7,339 Bytes
# b83d9ec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# In[1] Import the required packages
import torch
import torch.nn as nn
import torchvision
from torchvision import datasets, transforms
import time
from torch.nn import functional as F
from math import floor, ceil
from torch.utils.data import DataLoader,TensorDataset
# import torchvision.transforms as transforms
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# In[1] Hyperparameters
num_epochs = 60
batch_size = 1000
learning_rate = 0.001

# In[2] Load the training and test data

# NOTE(review): this transform is built but never handed to the datasets
# below, so the Normalize((0.1307,), (0.3081,)) step is not applied to the
# tensors the loaders yield. It is left untouched so the model's input scaling
# does not change; confirm whether normalization was intended.
transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(
        (0.1307,), (0.3081,))
])


# The dataset's raw image tensor is used directly: cast to float, insert a
# channel axis at dim 1 and divide by 255.
train_set = torchvision.datasets.MNIST(root='MNIST', train=True, download=True)
train_data = train_set.data.float().unsqueeze(1) / 255.0
train_labels = train_set.targets
train_dataset = TensorDataset(train_data, train_labels)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


test_set = torchvision.datasets.MNIST(root='MNIST', train=False, download=True)
test_data = test_set.data.float().unsqueeze(1) / 255.0
test_labels = test_set.targets
test_dataset = TensorDataset(test_data, test_labels)
# Accuracy does not depend on sample order, so the evaluation data is not
# shuffled; this keeps evaluation passes reproducible.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)



# In[1] 3x3 convolution helper
def conv3x3(in_channels, out_channels, stride=1):
    """Build a 3x3 ``nn.Conv2d`` with padding 1 and a bias term.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        stride: Convolution stride (default 1).

    Returns:
        The newly constructed ``nn.Conv2d`` layer.
    """
    conv_options = dict(kernel_size=3, stride=stride, padding=1, bias=True)
    return nn.Conv2d(in_channels, out_channels, **conv_options)


# In[1] Residual block
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers wrapped by a skip connection.

    Args:
        in_channels: Number of channels of the block input.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution; the second always uses 1.
        downsample: Optional module applied to the block input to produce the
            skip branch. When ``None`` the input itself is added back.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # One in-place ReLU module is reused for both activations.
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + skip(x))``."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        # Compare against None explicitly: truth-testing a module falls back
        # to __len__ for containers, so it is not a reliable "was a module
        # supplied?" check.
        if self.downsample is not None:
            residual = self.downsample(x)
        else:
            residual = x
        out += residual
        out = self.relu(out)
        return out


# In[1] Residual network
class ResNet(nn.Module):
    """Small ResNet: a 3x3 stem, three residual stages and a two-layer head.

    Args:
        block: Callable invoked as
            ``block(in_channels, out_channels, stride, downsample)`` to build
            one residual block.
        layers: Sequence whose first three entries give the number of blocks
            in stages 1, 2 and 3.
        num_classes: Number of outputs of the final linear layer.
    """

    def __init__(self, block, layers, num_classes=10):
        super().__init__()
        # Channel count fed into the next stage; make_layer() advances it.
        self.in_channels = 16
        # Stem: single-channel input -> 16 feature maps.
        self.conv = conv3x3(1, 16)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        # Stage 1 keeps 16 channels at stride 1, so its first block gets no
        # downsample branch.
        self.layer1 = self.make_layer(block, 16, layers[0], stride=1)
        # Stages 2 and 3 change the channel count and use stride 2, so their
        # first block receives a downsample branch.
        self.layer2 = self.make_layer(block, 32, layers[1], stride=2)
        self.layer3 = self.make_layer(block, 64, layers[2], stride=2)
        # Constructed for completeness; forward() does not apply it.
        self.avg_pool = nn.AvgPool2d(8)
        # 3136 = 64 channels * 7 * 7, which matches a 28x28 input after the
        # two stride-2 stages.
        self.fc1 = nn.Linear(3136, 128)
        self.normfc12 = nn.LayerNorm(128, eps=1e-5)
        self.fc2 = nn.Linear(128, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks as an ``nn.Sequential``.

        Only the first block may change the channel count or stride; it is
        given a conv3x3 + batch-norm downsample branch whenever either one
        changes. The remaining blocks map ``out_channels -> out_channels``
        with default stride. ``self.in_channels`` is updated to
        ``out_channels`` as a side effect.
        """
        shape_changes = stride != 1 or self.in_channels != out_channels
        downsample = None
        if shape_changes:
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels),
            )
        stage = [block(self.in_channels, out_channels, stride, downsample)]
        # Every later block (and the next stage) starts from out_channels.
        self.in_channels = out_channels
        stage.extend(block(out_channels, out_channels) for _ in range(1, blocks))
        return nn.Sequential(*stage)

    def forward(self, x):
        """Run the stem, the three stages and the classifier head on ``x``."""
        out = self.relu(self.bn(self.conv(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            out = stage(out)
        # Flatten everything except the leading (batch) dimension.
        out = out.view(out.size(0), -1)
        out = self.relu(self.normfc12(self.fc1(out)))
        return self.fc2(out)


# In[1] Model, loss function and optimizer
# [2, 2, 2] is the number of residual blocks in each of the three stages.
model = ResNet(ResidualBlock, [2, 2, 2])
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)


# In[1] Helper that sets the learning rate through the optimizer
def update_lr(optimizer, lr):
    """Set the learning rate of every parameter group of ``optimizer`` to ``lr``.

    Args:
        optimizer: Object exposing ``param_groups``, an iterable of dicts.
        lr: New learning rate written to each group's ``'lr'`` entry.
    """
    for group in optimizer.param_groups:
        group.update(lr=lr)


# In[1] Evaluation helper
def test(model, test_loader):
    """Print the classification accuracy of ``model`` over ``test_loader``.

    The model is put in eval mode for the duration of the pass and its
    previous training/eval mode is restored afterwards, so calling this in the
    middle of training does not leave the model stuck in eval mode.

    Args:
        model: Module mapping a batch of images to per-class scores.
        test_loader: Iterable yielding ``(images, labels)`` batches.
    """
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for images, labels in test_loader:
                images = images.to(device)
                labels = labels.to(device)
                outputs = model(images)
                # Predicted class = index of the largest score in each row.
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        # Hand the model back in whatever mode the caller had it in.
        model.train(was_training)

    # An empty loader has no defined accuracy; report NaN instead of raising
    # ZeroDivisionError.
    accuracy = 100 * correct / total if total else float('nan')
    print('Accuracy of the model on the test images: {} %'.format(accuracy))


# In[1] Train the model, lowering the learning rate on a fixed schedule
total_step = len(train_loader)
curr_lr = learning_rate
for epoch in range(num_epochs):
    in_epoch = time.time()
    # The per-epoch evaluation below puts the model in eval mode, so switch
    # back to training mode explicitly; otherwise every epoch after the first
    # would train with BatchNorm using its frozen running statistics.
    model.train()
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # NOTE(review): the loss is logged every 100 steps; if an epoch has
        # fewer than 100 batches this never prints - confirm the interval.
        if (i + 1) % 100 == 0:
            print("Epoch [{}/{}], Step [{}/{}] Loss: {:.4f}"
                  .format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))
    # Report test accuracy and wall-clock time for this epoch.
    test(model, test_loader)
    out_epoch = time.time()
    print(f"use {(out_epoch - in_epoch) // 60}min{(out_epoch - in_epoch) % 60}s")
    # Divide the learning rate by 3 after every 20th epoch.
    if (epoch + 1) % 20 == 0:
        curr_lr /= 3
        update_lr(optimizer, curr_lr)
# In[1] Evaluate the trained model and save its weights
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        # Index 1 of torch.max(..., dim) holds the arg-max class per sample.
        predicted = torch.max(outputs.data, 1)[1]
        batch_hits = (predicted == labels).sum().item()
        total += labels.size(0)
        correct += batch_hits

print('Accuracy of the model on the test images: {} %'.format(100 * correct / total))

# Persist only the parameters and buffers (state dict), not the module object.
torch.save(model.state_dict(), '../resnet.ckpt')