基于PyTorch实现的MNIST手写数字识别神经网络笔记
神经网络配置
class Net(nn.Module):def __init__(self):super(Net, self).__init__()self.conv1 = nn.Conv2d(1, 32, 3, 1) # 输入通道1,输出32,3x3卷积,步长1self.conv2 = nn.Conv2d(32, 64, 3, 1) # 输入通道32,输出64,3x3卷积,步长1self.dropout1 = nn.Dropout(0.25) # 25%的dropoutself.dropout2 = nn.Dropout(0.5) # 50%的dropoutself.fc1 = nn.Linear(9216, 128) # 全连接层,9216输入,128输出self.fc2 = nn.Linear(128, 10) # 全连接层,128输入,10输出(10个数字)
1. 第一层卷积
self.conv1 = nn.Conv2d(1, 32, 3, 1)
1:输入通道数(黑白图片只有1个颜色通道)
32:输出通道数(产生32个不同的特征图)
3:卷积核大小(3x3的小窗口在图片上滑动)
1:步长(每次移动1个像素)
作用:从原始图片中提取基础特征,如边缘、线条等
2. 第二层卷积
self.conv2 = nn.Conv2d(32, 64, 3, 1)
32:输入通道数(接收上一层的32个特征图)
64:输出通道数(产生64个更复杂的特征图)
作用:提取更高级的特征,如形状、图案等
3. Dropout层(防止过拟合)
self.dropout1 = nn.Dropout(0.25) # 随机丢弃25%的神经元
self.dropout2 = nn.Dropout(0.5) # 随机丢弃50%的神经元
作用:像学生考试时不能只背答案一样,防止模型"死记硬背"训练数据
4. 全连接层
self.fc1 = nn.Linear(9216, 128) # 9216 → 128
self.fc2 = nn.Linear(128, 10) # 128 → 10
9216:经过卷积和池化后展平的特征数量
128:隐藏层神经元数量(自己来设定)
10:输出10个数字(0-9)的概率
9216的计算:
输出尺寸 = (输入尺寸 - 卷积核大小 + 2×填充) / 步长 + 1
第一层卷积后(1, 32, 3, 1):
输入尺寸:28,图片的大小28×28
输出形状:32×26×26(32个通道,每个26×26)
第二层卷积后(32, 64, 3, 1):
输出形状:64×24×24(64个通道,每个24×24)
池化层后:
x = F.max_pool2d(x, 2) # 2x2最大池化,步长默认为2
步长默认与池化窗口大小相同
输出尺寸 = (输入尺寸 + 2×填充 - 池化窗口大小) / 步长 + 1
输出形状:64×12×12(64个通道,每个12×12)
展平后:
64 × 12 × 12 = 9216
前向传播流程:
def forward(self, x):x = self.conv1(x)x = F.relu(x)x = self.conv2(x)x = F.relu(x)x = F.max_pool2d(x, 2)x = self.dropout1(x)x = torch.flatten(x, 1)x = self.fc1(x)x = F.relu(x)x = self.dropout2(x)x = self.fc2(x)output = F.log_softmax(x, dim=1)return output
def forward(self, x):# x 输入形状: [batch_size, 1, 28, 28] - 批次大小×1通道×28×28像素
x 输入形状: [batch_size, 1, 28, 28] - 批次大小×1通道×28×28像素
1. 第一层卷积 + 激活:
x = self.conv1(x) # 卷积操作,提取低级特征
x = F.relu(x) # ReLU激活函数,引入非线性
作用:从原始图片中检测边缘、线条等基础特征
2. 第二层卷积 + 激活:
x = self.conv2(x) # 进一步卷积,提取更复杂特征
x = F.relu(x) # 再次激活
作用:组合基础特征,检测更复杂的形状和图案
3. 池化层:
x = F.max_pool2d(x, 2) # 2×2最大池化
作用:
降低特征图尺寸(减少计算量)
保留最显著的特征
增强模型对位置变化的鲁棒性
4. 第一个Dropout:
x = self.dropout1(x) # 25%的神经元随机失活
作用:防止过拟合,让模型不过度依赖某些特定特征
5. 展平操作:
x = torch.flatten(x, 1) # 从第1维开始展平(保持批次维度)
作用:将二维特征图转换为一维向量,准备输入全连接层
6. 第一个全连接层 + 激活:
x = self.fc1(x) # 全连接层,9216 → 128
x = F.relu(x) # 激活函数
作用:进行高层次的特征组合和推理
7. 第二个Dropout:
x = self.dropout2(x) # 50%的神经元随机失活
作用:在全连接层进一步防止过拟合
8. 输出层:
x = self.fc2(x) # 全连接层,128 → 10
作用:输出10个数字类别的原始得分(logits)
9. 最终输出:
output = F.log_softmax(x, dim=1) # 沿类别维度计算log_softmax
return output
作用:
将原始得分转换为概率形式
使用log_softmax是为了数值稳定性
输出形状:
[batch_size, 10]
- 每个样本对应10个数字的概率行:
batch_size,列:10
训练函数
def train(args, model, device, train_loader, optimizer, epoch):model.train() # 设置为训练模式for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad() # 梯度清零output = model(data) # 前向传播loss = F.nll_loss(output, target) # 计算损失loss.backward() # 反向传播optimizer.step() # 更新参数# 打印训练进度if batch_idx % args.log_interval == 0:print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), len(train_loader.dataset),100. * batch_idx / len(train_loader), loss.item()))if args.dry_run:break
函数参数说明:
def train(args, model, device, train_loader, optimizer, epoch):
args
:训练参数(学习率、批次大小等)model
:要训练的神经网络模型device
:训练设备(CPU或GPU)train_loader
:训练数据加载器optimizer
:优化器(如SGD、Adam)epoch
:当前训练轮次
设置训练模式:
model.train()
作用:告诉模型现在是训练模式,这会:
启用Dropout层(让部分神经元随机失活)
启用BatchNorm层的统计量更新
让模型知道需要计算梯度
遍历训练数据:
for batch_idx, (data, target) in enumerate(train_loader):
train_loader
:每次提供一个批次(batch)的数据data
:图片数据,形状[batch_size, 1, 28, 28]
target
:真实标签,形状[batch_size]
,如[7, 2, 1, ..., 9]
batch_idx
:批次索引(0, 1, 2, ...)
数据转移到设备:
data, target = data.to(device), target.to(device)
作用:将数据移动到GPU或CPU上进行计算,加速训练。
梯度清零:
optimizer.zero_grad()
重要:在每次计算新梯度前,必须清空之前的梯度。
如果不清零,梯度会累积,导致训练不稳定
就像做数学题时,每次要擦掉黑板上的旧计算
前向传播:
output = model(data)
作用:让数据通过整个神经网络,得到预测结果。
输入:
data
(图片)输出:
output
(10个数字的概率),形状[batch_size, 10]
batch_size=64
输出形状: [64, 10]
[[-2.1, -1.3, -0.5, -3.2, -4.1, -5.0, -1.8, -2.9, -0.9, -1.1], ← 第1张图片的10个概率[-3.2, -0.2, -4.1, -5.0, -2.1, -1.8, -1.1, -2.9, -0.9, -1.3], ← 第2张图片的10个概率[-1.8, -2.1, -0.9, -3.2, -4.1, -5.0, -1.3, -2.9, -0.5, -1.1], ← 第3张图片的10个概率...... ← 第64张图片的10个概率
]
计算损失:
loss = F.nll_loss(output, target)
作用:计算预测值与真实值的差距。
output
:模型预测的概率[batch_size, 10]
target
:真实标签[batch_size]
nll_loss
:负对数似然损失,适合与log_softmax
配合使用
反向传播:
loss.backward()
关键步骤:自动计算所有参数的梯度。
从损失值开始,反向计算每个权重需要如何调整
PyTorch自动完成链式求导
结果:每个参数都有了对应的梯度值
参数更新:
optimizer.step()
作用:根据梯度更新模型参数。
使用优化算法(如SGD、Adam)来调整权重
公式大致为:
新权重 = 旧权重 - 学习率 × 梯度
循环每个批次:
1. 取数据 → 2. 清空梯度 → 3. 前向计算 → 4. 计算损失↓
5. 反向传播 → 6. 更新参数 → 7. 重复...
打印进度信息:
if batch_idx % args.log_interval == 0:
batch_idx
:当前批次的索引(0, 1, 2, ...)args.log_interval
:日志间隔,比如设置为10%
:取模运算符,计算余数作用:每处理
log_interval
个批次就打印一次日志
如果 log_interval = 10,那么:
batch_idx = 0 → 0 % 10 = 0 → 打印
batch_idx = 1 → 1 % 10 = 1 → 不打印
...
batch_idx = 10 → 10 % 10 = 0 → 打印
batch_idx = 20 → 20 % 10 = 0 → 打印
格式化打印训练信息:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), len(train_loader.dataset),100. * batch_idx / len(train_loader), loss.item()))
Train Epoch: {}
:当前训练轮次[{}/{}]
:已处理样本数 / 总样本数batch_idx * len(data)
:已处理样本数batch_idx
:已完成的批次数量len(data)
:每个批次的样本数(batch_size)结果:已处理的样本总数
len(train_loader.dataset)
:训练集总样本数
({:.0f}%)
:训练进度百分比100. * batch_idx / len(train_loader)
:已完成批次的百分比
Loss: {:.6f}
:当前批次的损失值,保留6位小数loss.item()
:从张量中提取数值
快速检查模式:
if args.dry_run:break
args.dry_run
:快速运行标志(通常用于调试)作用:如果启用dry_run模式,在第一次打印日志后就跳出训练循环
Train Epoch: 1 [0/60000 (0%)] Loss: 2.301245 ← 第0个批次
Train Epoch: 1 [640/60000 (1%)] Loss: 0.456123 ← 第10个批次 (10×64=640)
Train Epoch: 1 [1280/60000 (2%)] Loss: 0.234567 ← 第20个批次 (20×64=1280)
...
测试函数:
def test(model, device, test_loader):model.eval()test_loss = 0correct = 0with torch.no_grad():for data, target in test_loader:data, target = data.to(device), target.to(device)output = model(data)test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch losspred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probabilitycorrect += pred.eq(target.view_as(pred)).sum().item()test_loss /= len(test_loader.dataset)print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, len(test_loader.dataset),100. * correct / len(test_loader.dataset)))
设置模型为评估模式:
model.eval()
作用:
禁用Dropout层(使用所有神经元)
固定BatchNorm层的统计量
确保测试结果的一致性
初始化统计变量:
test_loss = 0 # 累计总损失
correct = 0 # 累计正确预测的样本数
关闭梯度计算:
with torch.no_grad():
重要作用:
大幅减少内存使用
加速计算过程
防止在测试时意外更新模型参数
遍历测试数据:
for data, target in test_loader:data, target = data.to(device), target.to(device)output = model(data)
test_loader
:每次提供一个批次的测试数据data
:图片数据,形状[batch_size, 1, 28, 28]
target
:真实标签,形状[batch_size]
output
:模型预测结果,形状[batch_size, 10]
计算并累加损失:
test_loss += F.nll_loss(output, target, reduction='sum').item()
F.nll_loss
:负对数似然损失reduction='sum'
:计算批次内所有样本的损失总和.item()
:将张量转换为Python数值+=
:累加到总损失中
# 假设有3个批次,每个批次的损失:
批次1损失:15.6
批次2损失:12.3
批次3损失:14.1
test_loss = 15.6 + 12.3 + 14.1 = 41.0
获取预测结果:
pred = output.argmax(dim=1, keepdim=True)
作用:找到每个样本预测概率最大的类别
# output 包含10个数字的概率
output = [[-2.1, -1.3, -0.5, -3.2, ...], # 样本1[-3.2, -0.2, -4.1, -5.0, ...]] # 样本2# argmax(dim=1) 找到每行最大值的索引
pred = [[2], # 样本1预测为数字2[1]] # 样本2预测为数字1
统计正确预测数:
correct += pred.eq(target.view_as(pred)).sum().item()
调整标签形状:
target.view_as(pred)
target
原始:[2]
→[2, 1]
(与pred形状一致)如:
[2, 1]
→[[2], [1]]
比较预测和真实值:
pred.eq(target.view_as(pred))
[[True], # 样本1:预测2 == 真实2
[True]] # 样本2:预测1 == 真实1
统计正确数量:
.sum().item() # 统计True的数量,转换为数值
主函数配置
def main():# Training settings# 创建参数解析器,用于处理命令行参数parser = argparse.ArgumentParser(description='PyTorch MNIST Example')# 加这一行,把 Jupyter 偷偷塞进来的 -f 接住并忽略(避免在Jupyter中运行时出错)parser.add_argument('-f', '--file', help='kernel json file for IPython')# 添加训练参数配置parser.add_argument('--batch-size', type=int, default=64, metavar='N',help='input batch size for training (default: 64)')parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',help='input batch size for testing (default: 1000)')parser.add_argument('--epochs', type=int, default=14, metavar='N',help='number of epochs to train (default: 14)')parser.add_argument('--lr', type=float, default=1.0, metavar='LR',help='learning rate (default: 1.0)')parser.add_argument('--gamma', type=float, default=0.7, metavar='M',help='Learning rate step gamma (default: 0.7)')# 添加功能开关参数(action='store_true'表示存在该参数即为True)parser.add_argument('--no-accel', action='store_true',help='disables accelerator') # 禁用GPU加速parser.add_argument('--dry-run', action='store_true',help='quickly check a single pass') # 快速测试模式(只跑一个批次)parser.add_argument('--seed', type=int, default=1, metavar='S',help='random seed (default: 1)') # 随机种子,保证结果可复现parser.add_argument('--log-interval', type=int, default=10, metavar='N',help='how many batches to wait before logging training status') # 日志打印间隔parser.add_argument('--save-model', action='store_true', help='For Saving the current Model') # 是否保存训练好的模型# 解析命令行参数args = parser.parse_args()# 判断是否使用加速器(GPU等)use_accel = not args.no_accel and torch.accelerator.is_available()# 设置随机种子,保证每次运行结果一致torch.manual_seed(args.seed)# 设置训练设备(GPU或CPU)if use_accel:device = torch.accelerator.current_accelerator() # 使用加速器else:device = torch.device("cpu") # 使用CPU# 配置训练和测试的数据加载参数train_kwargs = {'batch_size': args.batch_size}test_kwargs = {'batch_size': args.test_batch_size}# 如果使用加速器,添加额外的数据加载优化参数if use_accel:accel_kwargs = {'num_workers': 1, # 数据加载的进程数'persistent_workers': True, # 保持worker进程,避免重复创建'pin_memory': True, # 使用锁页内存,加速GPU数据传输'shuffle': True} # 打乱训练数据顺序train_kwargs.update(accel_kwargs) # 更新训练参数test_kwargs.update(accel_kwargs) # 更新测试参数# 定义数据预处理流程transform=transforms.Compose([transforms.ToTensor(), # 将PIL图像转换为Tensor,并归一化到[0,1]transforms.Normalize((0.1307,), (0.3081,)) # 使用MNIST数据集的均值和标准差进行标准化])# 加载MNIST数据集dataset1 = datasets.MNIST('../data', train=True, download=True,transform=transform) # 训练集dataset2 = datasets.MNIST('../data', train=False,transform=transform) # 测试集# 创建数据加载器train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs) # 训练数据加载器test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs) # 测试数据加载器# 初始化模型并移动到指定设备model = Net().to(device)# 定义优化器(Adadelta优化器)optimizer = optim.Adadelta(model.parameters(), lr=args.lr)# 定义学习率调度器(每个epoch后按gamma比例降低学习率)scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)# 开始训练循环for epoch in range(1, args.epochs + 1):train(args, model, device, train_loader, optimizer, epoch) # 训练一个epochtest(model, device, test_loader) # 在测试集上评估模型性能scheduler.step() # 更新学习率# 如果设置了保存模型参数,则保存模型权重if args.save_model:torch.save(model.state_dict(), "mnist_cnn.pt") # 只保存模型参数,不保存整个模型结构# 程序入口:当直接运行此脚本时执行main函数
if __name__ == '__main__':main()
全部程序
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLRclass Net(nn.Module):def __init__(self):super(Net, self).__init__()self.conv1 = nn.Conv2d(1, 32, 3, 1)self.conv2 = nn.Conv2d(32, 64, 3, 1)self.dropout1 = nn.Dropout(0.25)self.dropout2 = nn.Dropout(0.5)self.fc1 = nn.Linear(9216, 128)self.fc2 = nn.Linear(128, 10)def forward(self, x):x = self.conv1(x)x = F.relu(x)x = self.conv2(x)x = F.relu(x)x = F.max_pool2d(x, 2)x = self.dropout1(x)x = torch.flatten(x, 1)x = self.fc1(x)x = F.relu(x)x = self.dropout2(x)x = self.fc2(x)output = F.log_softmax(x, dim=1)return outputdef train(args, model, device, train_loader, optimizer, epoch):model.train()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = F.nll_loss(output, target)loss.backward()optimizer.step()if batch_idx % args.log_interval == 0:print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), len(train_loader.dataset),100. * batch_idx / len(train_loader), loss.item()))if args.dry_run:breakdef test(model, device, test_loader):model.eval()test_loss = 0correct = 0with torch.no_grad():for data, target in test_loader:data, target = data.to(device), target.to(device)output = model(data)test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch losspred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probabilitycorrect += pred.eq(target.view_as(pred)).sum().item()test_loss /= len(test_loader.dataset)print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, len(test_loader.dataset),100. * correct / len(test_loader.dataset)))def main():# Training settingsparser = argparse.ArgumentParser(description='PyTorch MNIST Example')# 加这一行,把 Jupyter 偷偷塞进来的 -f 接住并忽略parser.add_argument('-f', '--file', help='kernel json file for IPython')parser.add_argument('--batch-size', type=int, default=64, metavar='N',help='input batch size for training (default: 64)')parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',help='input batch size for testing (default: 1000)')parser.add_argument('--epochs', type=int, default=14, metavar='N',help='number of epochs to train (default: 14)')parser.add_argument('--lr', type=float, default=1.0, metavar='LR',help='learning rate (default: 1.0)')parser.add_argument('--gamma', type=float, default=0.7, metavar='M',help='Learning rate step gamma (default: 0.7)')parser.add_argument('--no-accel', action='store_true',help='disables accelerator')parser.add_argument('--dry-run', action='store_true',help='quickly check a single pass')parser.add_argument('--seed', type=int, default=1, metavar='S',help='random seed (default: 1)')parser.add_argument('--log-interval', type=int, default=10, metavar='N',help='how many batches to wait before logging training status')parser.add_argument('--save-model', action='store_true', help='For Saving the current Model')args = parser.parse_args()use_accel = not args.no_accel and torch.accelerator.is_available()torch.manual_seed(args.seed)if use_accel:device = torch.accelerator.current_accelerator()else:device = torch.device("cpu")train_kwargs = {'batch_size': args.batch_size}test_kwargs = {'batch_size': args.test_batch_size}if use_accel:accel_kwargs = {'num_workers': 1,'persistent_workers': True,'pin_memory': True,'shuffle': True}train_kwargs.update(accel_kwargs)test_kwargs.update(accel_kwargs)transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])dataset1 = datasets.MNIST('../data', train=True, download=True,transform=transform)dataset2 = datasets.MNIST('../data', train=False,transform=transform)train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)model = Net().to(device)optimizer = optim.Adadelta(model.parameters(), lr=args.lr)scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)for epoch in range(1, args.epochs + 1):train(args, model, device, train_loader, optimizer, epoch)test(model, device, test_loader)scheduler.step()if args.save_model:torch.save(model.state_dict(), "mnist_cnn.pt")if __name__ == '__main__':main()