卷积神经网络构建

5407 字 · 416 阅读 · 2023 年 05 月 29 日

本文已更新,你可以访问 Hands-on AI 以获得更好的阅读体验。
本篇文章需 特别授权许可,内容版权归作者所有,未经授权,禁止转载。

介绍

前面,我们已经学习了卷积神经网络的原理,尤其对卷积层和池化层等关键组件进行了详细说明。这篇文章会关注于如何利用深度学习框架构建卷积神经网络,并完成模型训练。

知识点

  • TensorFlow 高阶 API 构建
  • TensorFlow 低阶 API 构建
  • PyTorch 低阶 API 构建
  • PyTorch 高阶 API 构建

前面的实验已经学习了如何使用 TensorFlow 和 PyTorch 搭建人工神经网络并完成训练。实际上,构建卷积神经网络的流程和全连接神经网络别无二致,区别仅在于向神经网络中添加卷积神经网络所需的特殊组件。

接下来,实验将尝试使用前面学过的 4 种方法来搭建一个卷积神经网络,其分别是:TensorFlow 低阶 tf.nn 模块和高阶 tf.keras 模块,以及 PyTorch 模块 nn.Modulenn.Sequential

实验仍然使用大家熟悉的 MNIST 数据集,该数据集样本数量适中,非常适合用于练习。神经网络则选择 Yann LeCun 在 1998 年发明的 LeNet-5 经典卷积神经网络结构。

数据预处理

对于 MNIST 数据集,我们已经分别使用 TensorFlow 和 PyTorch 加载过。实际上,深度学习框架加载数据集的 API 是直接从 LeCun 的网站 上下载相应的数据集。这里,我们从镜像服务器上下载镜像文件。

# 直接运行下载数据文件
wget -nc "http://labfile.oss.aliyuncs.com/courses/1081/MNIST_data.zip"
!unzip -o "MNIST_data.zip"

数据集一共包含四个文件,分别是:

  • 训练样本:train-images-idx3-ubyte.gz
  • 训练标签:train-labels-idx1-ubyte.gz
  • 测试样本:t10k-images-idx3-ubyte.gz
  • 测试标签:t10k-labels-idx1-ubyte.gz

数据使用 IDX 文件格式存储,首先需要将其读取出来。

# 此段数据读取代码无需掌握
import gzip
import numpy as np

def read_mnist(images_path, labels_path):
    with gzip.open("MNIST_data/" + labels_path, 'rb') as labelsFile:
        y = np.frombuffer(labelsFile.read(), dtype=np.uint8, offset=8)

    with gzip.open("MNIST_data/" + images_path, 'rb') as imagesFile:
        X = np.frombuffer(imagesFile.read(), dtype=np.uint8, offset=16) \
            .reshape(len(y), 784) \
            .reshape(len(y), 28, 28, 1)

    return X, y

train = {}
test = {}

train['X'], train['y'] = read_mnist(
    'train-images-idx3-ubyte.gz', 'train-labels-idx1-ubyte.gz')
test['X'], test['y'] = read_mnist(
    't10k-images-idx3-ubyte.gz', 't10k-labels-idx1-ubyte.gz')

train['X'].shape, train['y'].shape, test['X'].shape, test['y'].shape

可以看到,训练集 60000 个,图像为 $28 \times 28$ 大小,其中灰度图像保留色彩通道 1。测试集包含 10000 个样本。我们可以可视化第一个样本。

from matplotlib import pyplot as plt
%matplotlib inline

plt.imshow(train['X'][0].reshape(28, 28), cmap=plt.cm.gray_r)

返回上面 LeNet-5 网络结构示意图,你会看到原始的 LeNet-5 结构输入图像尺寸为 32x32x1。所以,我们这里的数据预处理尚未结束,可以通过向 28x28x1 的图像外圈各填充 2 个 0 值来达到效果,这样就可以分毫不差地实现 LeNet-5 卷积神经网络了。

同时,由于 LeNet-5 最终是 Softmax 输出,所以我们需要对标签进行独热编码处理,这一步已经在前面的实验中学习过了。

# 样本 padding 填充
X_train = np.pad(train['X'], ((0, 0), (2, 2), (2, 2), (0, 0)), 'constant')
X_test = np.pad(test['X'], ((0, 0), (2, 2), (2, 2), (0, 0)), 'constant')
# 标签独热编码
y_train = np.eye(10)[train['y'].reshape(-1)]
y_test = np.eye(10)[test['y'].reshape(-1)]

X_train.shape, X_test.shape, y_train.shape, y_test.shape

我们可以再次通过可视化确认修改。

from matplotlib import pyplot as plt
%matplotlib inline

plt.imshow(X_train[0].reshape(32, 32), cmap=plt.cm.gray_r)

TensorFlow 高阶 tf.keras 构建

由于高阶 API 使用起来更加简单,所以我们首先使用 TensorFlow 提供的 Keras 来构建 LeNet-5 卷积神经网络。在这之前,需要先了解需要用到的卷积层和池化层。

其中,tf.keras.layers 下包含的卷积层类有:

  • tf.keras.layers.Conv1D :一般用于文本或时间序列上的一维卷积。
  • tf.keras.layers.Conv2D :一般用于图像空间上的二维卷积。
  • tf.keras.layers.Conv3D 。一般用于处理视频等包含多帧图像上的三维卷积。

关于 3 者详细区别可以阅读 Intuitive understanding... 实际上,我们在图像处理方面一般只会用到 tf.keras.layers.Conv2D

卷积层类包含的参数很多,不过我们一般用到的无外乎就是:

  • filters: 卷积核数量,整数。
  • kernel_size: 卷积核尺寸,元组。
  • strides: 卷积步长,元组。
  • padding: "valid""same"

特别注意,如果卷积层在模型第一层时,需要提供 input_shape 参数,例如,input_shape=(128, 128, 3) 表示 128x128 RGB 图像。此时,会关联上一个默认参数 data_format="channels_last",也就是色彩通道数量 3 在最后。当然,如果你的 input_shape=(3, 128, 128),就需要指定参数 data_format="channels_first"

其次,需要说明的是 padding 支持两个参数,分别是 "valid""same"(大小写敏感)。下面说一下这两个参数在 TensorFlow 中的含义。假设,输入矩阵的一行如下所示,包含 13 个单位。

当参数为 "valid" 时(默认),无法被卷积的像素将被丢弃。也就是不会对边距进行扩充,相当于不需要 Padding 操作。

当参数为 "same" 时,将通过 0 填补保证每一个输入像素都能被卷积。

如上所示,通过添加 3 个为 0 的格子保证每一个输入像素都能被卷积。添加时,如果遇到上面这种奇数的情况,默认在右边会更多。

除此之外,池化层在图像处理时一般也只会用到带 2D 的类,例如 LeNet-5 网络中需要的平均池化 tf.keras.layers.AveragePooling2D

池化层中常用的 2 个参数:pool_sizestridepool_size 池化窗口默认为 (2, 2),也就是将张量缩减为原来尺寸的一半大小。stride 表示缩小比例的因数,例如,2 会使得输入张量缩小一半。 如果是 None,那么默认值是 pool_size

下面,我们使用 TensorFlow 提供的 Keras 高阶 API 来构建 LeNet-5 卷积神经网络。除了如图所示的已知参数以外,像卷积核数量,激活函数等参考原论文提供的数据指定。

import tensorflow as tf

model = tf.keras.Sequential()  # 构建顺序模型

# 卷积层,6 个 5x5 卷积核,步长为 1,relu 激活,第一层需指定 input_shape
model.add(tf.keras.layers.Conv2D(filters=6, kernel_size=(5, 5), strides=(1, 1),
                                 activation='relu', input_shape=(32, 32, 1)))
# 平均池化,池化窗口默认为 2
model.add(tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2))
# 卷积层,16 个 5x5 卷积核,步为 1,relu 激活
model.add(tf.keras.layers.Conv2D(filters=16, kernel_size=(
    5, 5), strides=(1, 1), activation='relu'))
# 平均池化,池化窗口默认为 2
model.add(tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2))
# 需展平后才能与全连接层相连
model.add(tf.keras.layers.Flatten())
# 全连接层,输出为 120,relu 激活
model.add(tf.keras.layers.Dense(units=120, activation='relu'))
# 全连接层,输出为 84,relu 激活
model.add(tf.keras.layers.Dense(units=84, activation='relu'))
# 全连接层,输出为 10,Softmax 激活
model.add(tf.keras.layers.Dense(units=10, activation='softmax'))
# 查看网络结构
model.summary()

你可以看到,网络结构与图示完成一致。接下来,我们就可以编译模型并完成训练和评估了。

# 编译模型,Adam 优化器,多分类交叉熵损失函数,准确度评估
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
# 模型训练及评估
model.fit(X_train, y_train, batch_size=64, epochs=2, validation_data=(X_test, y_test))

小贴士

模型训练时间较长,你可以点击环境上方 即可强制中断。中断后无需重复运行上面的单元格,继续正常学习即可。

TensorFlow 低阶 tf.nn 构建

TensorFlow 低阶 API 主要会用到 tf.nn 模块,通过构建前向计算图后,再建立会话完成模型训练。其中,卷积函数为 tf.nn.conv2d ,平均池化函数为 tf.nn.avg_pool 。值得注意的是,虽然大部分参数含义上和 Keras 相似,但是用法上却不同,大家需要根据下面的示例代码并结合官方文档学习。

当我们使用 tf.nn 实现时,就需要像先前的全连接神经网络一样自行初始化权值(卷积核),这里可以利用 random_normal 按照给定任意的矩阵大小,随机生成权重数值。

class Model(object):
    def __init__(self):
        # 随机初始化张量参数
        self.conv_W1 = tf.Variable(tf.random.normal(shape=(5, 5, 1, 6)))
        self.conv_W2 = tf.Variable(tf.random.normal(shape=(5, 5, 6, 16)))
        self.fc_W1 = tf.Variable(tf.random.normal(shape=(5*5*16, 120)))
        self.fc_W2 = tf.Variable(tf.random.normal(shape=(120, 84)))
        self.out_W = tf.Variable(tf.random.normal(shape=(84, 10)))

        self.fc_b1 = tf.Variable(tf.zeros(120))
        self.fc_b2 = tf.Variable(tf.zeros(84))
        self.out_b = tf.Variable(tf.zeros(10))

    def __call__(self, x):
        x = tf.cast(x, tf.float32)  # 转换输入数据类型
        # 卷积层 1: Input = 32x32x1. Output = 28x28x6.
        conv1 = tf.nn.conv2d(x, self.conv_W1, strides=[1, 1, 1, 1], padding='VALID')
        # RELU 激活
        conv1 = tf.nn.relu(conv1)
        # 池化层 1: Input = 28x28x6. Output = 14x14x6.
        pool1 = tf.nn.avg_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')
        # 卷积层 2: Input = 14x14x6. Output = 10x10x16.
        conv2 = tf.nn.conv2d(pool1, self.conv_W2, strides=[1, 1, 1, 1], padding='VALID')
        # RELU 激活
        conv2 = tf.nn.relu(conv2)
        # 池化层 2: Input = 10x10x16. Output = 5x5x16.
        pool2 = tf.nn.avg_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')
        # 展平. Input = 5x5x16. Output = 400.
        flatten = tf.reshape(pool2, [-1, 5*5*16])
        # 全连接层
        fc1 = tf.nn.relu(tf.add(tf.matmul(flatten, self.fc_W1), self.fc_b1))
        fc2 = tf.nn.relu(tf.add(tf.matmul(fc1, self.fc_W2), self.fc_b2))
        outs = tf.add(tf.matmul(fc2, self.out_W), self.out_b)
        return outs

接下来的流程和之前使用 TensorFlow 完成 DIGITS 分类非常相似了。分别定义损失函数和准确度计算函数。

def loss_fn(model, x, y):
    preds = model(x)
    return tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=preds, labels=y))
def accuracy_fn(logits, labels):
    preds = tf.argmax(logits, axis=1)  # 取值最大的索引,正好对应字符标签
    labels = tf.argmax(labels, axis=1)
    return tf.reduce_mean(tf.cast(tf.equal(preds, labels), tf.float32))

如果我们使用 TensorFlow 低阶 API 构建神经网络,那么就需要自行实现小批量训练过程,以防止一次传入数据太多导致内存爆掉。同样,这部分代码在先前的实验中也已经实现过了,所以我们直接拿过来修改合适的迭代次数和小批量大小即可。

from sklearn.model_selection import KFold
from tqdm.notebook import tqdm

EPOCHS = 2  # 迭代此时
BATCH_SIZE = 64  # 每次迭代的批量大小
LEARNING_RATE = 0.001  # 学习率

model = Model()  # 实例化模型类

for epoch in range(EPOCHS):  # 设定全数据集迭代次数
    indices = np.arange(len(X_train))  # 生成训练数据长度规则序列
    np.random.shuffle(indices)  # 对索引序列进行打乱,保证为随机数据划分
    batch_num = int(len(X_train)/BATCH_SIZE)  # 根据批量大小求得要划分的 batch 数量
    kf = KFold(n_splits=batch_num)  # 将数据分割成 batch 数量份
    # KFold 划分打乱后的索引序列,然后依据序列序列从数据中抽取 batch 样本
    for _, index in tqdm(kf.split(indices), desc='Training'):
        X_batch = X_train[indices[index]]  # 按打乱后的序列取出数据
        y_batch = y_train[indices[index]]

        with tf.GradientTape() as tape:  # 追踪梯度
            loss = loss_fn(model, X_batch, y_batch)

        trainable_variables = [model.conv_W1, model.conv_W2,
                               model.fc_W1, model.fc_W2, model.out_W,
                               model.fc_b1, model.fc_b2, model.out_b]  # 需优化参数列表
        grads = tape.gradient(loss, trainable_variables)  # 计算梯度

        optimizer = tf.optimizers.Adam(learning_rate=LEARNING_RATE)  # Adam 优化器
        optimizer.apply_gradients(zip(grads, trainable_variables))  # 更新梯度

    # 每一次 Epoch 执行小批量测试,防止内存不足
    indices_test = np.arange(len(X_test))
    batch_num_test = int(len(X_test)/BATCH_SIZE)
    kf_test = KFold(n_splits=batch_num_test)
    test_acc = 0
    for _, index in tqdm(kf_test.split(indices_test), desc='Testing'):
        X_test_batch = X_test[indices_test[index]]
        y_test_batch = y_test[indices_test[index]]
        batch_acc = accuracy_fn(model(X_test_batch), y_test_batch)  # 计算准确度
        test_acc += batch_acc  # batch 准确度求和

    accuracy = test_acc / batch_num_test  # 测试集准确度
    print(f'Epoch [{epoch+1}/{EPOCHS}], Accuracy: [{accuracy:.2f}], Loss: [{loss:.4f}]')

你可能会发现同样的网络结构我们使用 Keras 得到的效果会更好一些。实际上,这就是使用高阶 API 的好处,其在学习率,权重初始化等细节方面已经得到充分优化。所以,一般情况下能用 TensorFlow 高阶 API 搭建的网络,我们就不会使用低阶 API 自找麻烦。不过低阶 API 又更灵活,可以满足高级开发者或者学术研究人员的需要。

PyTorch 低阶 nn.Module 构建

我们已经使用 TensorFlow 提供的 2 种常用流程构建了 LeNet-5 卷积神经网络完成 MNIST 训练。接下来,实验使用 PyTorch 提供的 API 来重构 LeNet-5 的整个过程。

首先,我们基于 nn.Module 来构建神经网络。还记得吗?这里需要继承 nn.Module 基类来实现自定义神经网络结构。

import torch
import torch.nn as nn
import torch.nn.functional as F


class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        # 卷积层 1
        self.conv1 = nn.Conv2d(
            in_channels=1, out_channels=6, kernel_size=(5, 5), stride=1)
        # 池化层 1
        self.pool1 = nn.AvgPool2d(kernel_size=(2, 2))
        # 卷积层 2
        self.conv2 = nn.Conv2d(
            in_channels=6, out_channels=16, kernel_size=(5, 5), stride=1)
        # 池化层 2
        self.pool2 = nn.AvgPool2d(kernel_size=(2, 2))
        # 全连接层
        self.fc1 = nn.Linear(in_features=5*5*16, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = x.reshape(-1, 5*5*16)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.softmax(self.fc3(x), dim=1)
        return x

值得注意的是,PyTorch 中卷积层参数与 TensorFlow 有别。PyTorch 没有 TensorFlow 中卷积核数量的参数,取而代之需要设定 out_channels 来代之卷积核数量。其实也很好理解,in_channels 个通道的图像输出传出 out_channels 个通道,自然也是通过 out_channels 个卷积核进行了处理。

池化层和全连接层应该很好理解。特别地,PyTorch 中没有 Flatten 展平层相关类,所以我们在网络中使用 x.reshape(-1, 5*5*16) 将张量转换为我们需要的形状。

下面,实例化自定义模型类,打印出模型结构。

model = LeNet()
model

很遗憾,PyTorch 并无法像 TensorFlow 那样打印出模型间张量尺寸的变化,不过你可以借助 Model summary in PyTorch 等第三方工具实现。实验中不再尝试。

下面,我们对模型进行测试,输入一个样本张量看一下能否正常在自定义模型间进行计算,且输出形状是否符合预期。当我们取出第一个样本 X_train[0] 时,其形状为 (32, 32, 1)。这并不符合 PyTorch 样本传入形状。我们需要按照 PyTorch 的特性,将样本转换为网络能接受的形状。

model(torch.Tensor(X_train[0]).reshape(-1, 1, 32, 32))

上面的示例中,X_train[0] 首先从 NumPy 数组转换为 PyTorch 张量,然后再被 reshape(-1, 1, 32, 32)。其中,-1 是为了适应后面多个样本的数量,这也和我们先前使用 -1 的场景一致。1 表示一个通道,其与网络卷积层 in_channels 对应,后面的 32 当然是样本张量的尺寸。

最终,单个样本输出为 torch.Size([1, 10]) 这也与我们预期的 Softmax 输出尺寸一致。

如今有了网络结构,接下来自然就是定义数据集并开始训练了。还记得前面介绍过的 PyTorch 中的 DataLoader 吗?其可以使我们很方便地读取小批量数据。如今,我们也可以将自定义的 NumPy 数组转换为 DataLoader 加载器。

制作自定义 DataLoader 数据加载器分为两步。首先,使用 torch.utils.data.TensorDataset(images, labels) 将 PyTorch 张量转换为数据集。

import torch.utils.data

# 依次传入样本和标签张量,制作训练数据集和测试数据集
train_data = torch.utils.data.TensorDataset(torch.Tensor(
    X_train), torch.Tensor(train['y']))
test_data = torch.utils.data.TensorDataset(torch.Tensor(
    X_test), torch.Tensor(test['y']))

train_data, test_data

上面的代码中,样本传入了由 X_trainX_test 转换后的 PyTorch 张量。标签则传入了未被独热编码前的 train['y']test['y'],而不是上面 TensorFlow 使用的独热编码后的 y_trainy_test。原因在于,PyTorch 的损失函数无需标签为独热编码的形式,原本的数值标签即可处理。

然后,我们使用 DataLoader 数据加载器来加载数据集,设定好 batch_size。一般,训练数据会被打乱,而测试数据无需打乱。

train_loader = torch.utils.data.DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=64, shuffle=False)

train_loader, test_loader

定义交叉熵损失函数和 Adam 优化器。接下来的流程和和前面 PyTorch 构建神经网络实验非常相似。

loss_fn = nn.CrossEntropyLoss()  # 交叉熵损失函数
opt = torch.optim.Adam(model.parameters(), lr=0.001)  # Adam 优化器

最后,定义训练函数。这里直接将前面的代码拿过来使用,只需要注意 images 需要被 reshape(-1, 1, 32, 32) 的形状。而 labels 需要被转换为 torch.LongTensor 类型防止报错。

def fit(epochs, model, opt):
    # 全数据集迭代 epochs 次
    print('================ Start Training =================')
    for epoch in range(epochs):
        # 从数据加载器中读取 Batch 数据开始训练
        for i, (images, labels) in enumerate(train_loader):

            images = images.reshape(-1, 1, 32, 32)  # 对特征数据展平,变成 784
            labels = labels.type(torch.LongTensor)  # 真实标签

            outputs = model(images)  # 前向传播
            loss = loss_fn(outputs, labels)  # 传入模型输出和真实标签

            opt.zero_grad()  # 优化器梯度清零,否则会累计
            loss.backward()  # 从最后 loss 开始反向传播
            opt.step()  # 优化器迭代

            # 自定义训练输出样式
            if (i+1) % 100 == 0:
                print('Epoch [{}/{}], Batch [{}/{}], Train loss: [{:.3f}]'
                      .format(epoch+1, epochs, i+1, len(train_loader), loss.item()))

        # 每个 Epoch 执行一次测试
        correct = 0
        total = 0
        for images, labels in test_loader:
            images = images.reshape(-1, 1, 32, 32)
            labels = labels.type(torch.LongTensor)

            outputs = model(images)
            # 得到输出最大值 _ 及其索引 predicted
            _, predicted = torch.max(outputs.data, 1)

            correct += (predicted == labels).sum().item()  # 如果预测结果和真实值相等则计数 +1
            total += labels.size(0)  # 总测试样本数据计数

        print('============= Test accuracy: {:.3f} =============='.format(
            correct / total))

现在就可以开始训练了:

fit(epochs=2, model=model, opt=opt)

PyTorch 高阶 nn.Sequential 构建

在 PyTorch 中,使用 nn.Module 构建的网络一般也可以使用 nn.Sequential 进行重构。由于 PyTorch 中没有 Flatten 类,所以下面需要先实现一个 Flatten 再完成转换。

class Flatten(nn.Module):
    def forward(self, input):
        return input.reshape(input.size(0), -1)


# 构建 Sequential 容器结构
model_s = nn.Sequential(
    nn.Conv2d(1, 6, (5, 5), 1),
    nn.ReLU(),
    nn.AvgPool2d((2, 2)),
    nn.Conv2d(6, 16, (5, 5), 1),
    nn.ReLU(),
    nn.AvgPool2d((2, 2)),
    Flatten(),
    nn.Linear(5*5*16, 120),
    nn.ReLU(),
    nn.Linear(120, 84),
    nn.ReLU(),
    nn.Linear(84, 10),
    nn.Softmax(dim=1)
)

model_s

和前面的实验一样,我们需要重写优化器代码以实现对新的模型参数进行优化,并开始训练。

opt_s = torch.optim.Adam(model_s.parameters(), lr=0.001)  # Adam 优化器
fit(epochs=2, model=model_s, opt=opt_s)

小结

这篇文章,我们学习了使用 TensorFlow 和 PyTorch 搭建经典的 LeNet-5 卷积神经网络结构。虽然 LeNet-5 的结构十分简单,但是卷积神经网络常用的组件都包含在内,完全达到了练习的目的。大家务必要结合 TensorFlow 和 PyTorch 先前的讲解实验,搞清楚这篇文章所用到的 4 种不同方法。这对于后面能够看懂和独立搭建更为复杂的深度神经网络很有帮助。

本篇文章需 特别授权许可,内容版权归作者所有,未经授权,禁止转载。

系列文章