spikingjelly:使用单层全连接 SNN 识别 MNIST
brian2跑大型 snn 网络的效率属实是一言难尽。最近在学习 spikingjelly,希望一切顺利🙏。
spikingjelly 建议在官方文档学习使用单层全连接SNN识别MNIST — spikingjelly alpha 文档
补:有一点大家要注意,windows 的朋友们要把 device 改成 cpu 或者 cuda!
import os.path
import sys
import time
import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
from torch.cuda import amp
import argparse
import torchvision.datasets
from spikingjelly.activation_based import monitor, neuron, functional, layer, surrogate, encoding
from torch.utils.checkpoint import checkpoint
from torch.utils.tensorboard import SummaryWriter
from urllib3.filepost import writer
import numpy as np
class SNN(nn.Module):
    def __init__(self,tau):
        super().__init__()
        # surrogate_function是替代函数
        self.layer = nn.Sequential(
            layer.Flatten(),
            layer.Linear(28*28,10,bias=False),
            neuron.LIFNode(tau=tau,surrogate_function=surrogate.ATan())
        )
    def forward(self,x:torch.Tensor):
        return self.layer(x)
def main():
    parser = argparse.ArgumentParser(description='LIF MNIST Training')
    parser.add_argument('-T',default=100,type=int,help='步长')
    parser.add_argument('-device',default='mps',help='设备')
    parser.add_argument('-b', default=64, type=int, help='batch size')
    parser.add_argument('-epochs', default=100, type=int, metavar='N',
                        help='训练轮数')
    parser.add_argument('-j', default=4, type=int, metavar='N',
                        help='number of data loading workers (default: 4)')
    # /Users/Shared/开发/数据集,MNIST数据读取器只用到上一级目录
    parser.add_argument('-data-dir', type=str, help=' MNIST 路径')
    parser.add_argument('-out-dir', type=str, default='./logs', help='checkpoints 和 logs 输出路径')
    parser.add_argument('-resume', type=str, help='从检查点恢复路径')
    parser.add_argument('-opt', type=str, choices=['sgd', 'adam'], default='adam',
                        help='使用 sgd 或adam 优化器')
    parser.add_argument('-momentum', default=0.9, type=float, help='设置 sgd 的动量参数')
    parser.add_argument('-lr', default=1e-3, type=float, help='学习率')
    parser.add_argument('-tau', default=2.0, type=float, help=' tau')
    args = parser.parse_args()
    print(args)
    net = SNN(tau=args.tau)
    print(net)
    net.to(args.device)
    # 初始化数据加载器
    # transform后像素值从 0-255归一化到 0-1
    train_dataset = torchvision.datasets.MNIST(
        root=args.data_dir,
        train=True,
        transform=torchvision.transforms.ToTensor(),
        download=False
    )
    test_dataset = torchvision.datasets.MNIST(
        root=args.data_dir,
        train=False,
        transform=torchvision.transforms.ToTensor(),
        download=False
    )
    # pin_memory=True将数据加载到固定内存(页锁定内存)中,这
    # 样可以加快数据从 CPU 到 GPU 的传输速度,适用于使用 GPU 进行训练的情况。
    train_data_loader = data.DataLoader(
        dataset=train_dataset,
        batch_size=args.b,
        shuffle=True,
        drop_last=True,
        num_workers=args.j,
        pin_memory=True
    )
    test_data_loader = data.DataLoader(
        dataset=test_dataset,
        batch_size=args.b,
        shuffle=False,
        drop_last=False,
        num_workers=args.j,
        pin_memory=True
    )
    print('数据读取成功')
    start_epoch = 0
    max_test_acc = -1
    optimizer = None
    if args.opt == 'sgd':
        optimizer = torch.optim.SGD(net.parameters(),lr=args.lr,momentum=args.momentum)
    elif args.opt == 'adam':
        optimizer = torch.optim.Adam(net.parameters(),lr=args.lr)
    else:
        raise NotImplementedError(args.opt)
    if args.resume:
        # map_location='cpu'指定将加载的数据映射到哪个设备上
        checkpoint = torch.load(args.resume,map_location='cpu')
        net.load_state_dict(checkpoint['net'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        start_epoch = checkpoint['epoch']+1
        # 从检查点文件中加载的之前达到的最高测试准确率
        max_text_acc = checkpoint['max_test_acc']
    out_dir = os.path.join(args.out_dir,f'T{args.T}_b{args.b}_lr{args.lr}')
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
        print(f'Mkdir{out_dir}')
    # SummaryWriter:是 torch.utils.tensorboard.SummaryWriter 类的实例,
    # 用于将训练过程中的各种信息(如损失值、准确率等)写入 TensorBoard 日志文件,方便后续可视化分析。
    writer = SummaryWriter(out_dir,purge_step=start_epoch)
    with open(os.path.join(out_dir,'args.txt'),'w',encoding='utf-8') as args_txt:
        args_txt.write(str(args))
        args_txt.write('\n')
        args_txt.write(' '.join(sys.argv))
    encoder = encoding.PoissonEncoder()
    for epoch in range(start_epoch,args.epochs):
        start_time = time.time()
        # 将网络设置为训练模式
        net.train()
        train_loss = 0
        train_acc = 0
        train_samples = 0
        for img,label in train_data_loader:
            optimizer.zero_grad()
            img = img.to(args.device)
            label = label.to(args.device)
            # 将标签转为 ont-hot 编码,方便后续计算损失
            label_onehot = F.one_hot(label,10).float()
            # 计算损失并进行反向传播
            out_fr = 0.
            for t in range(args.T):
                encoded_img = encoder(img)
                out_fr += net(encoded_img)
            out_fr = out_fr/args.T
            loss = F.mse_loss(out_fr,label_onehot)
            loss.backward()
            optimizer.step()
            # label.numel()是当前批次的样本数量
            # loss.item() 是当前批次的损失值,将其乘以样本数量并累加到 train_loss 中。
            # (out_fr.argmax(1) == label).float().sum().item()
            # 计算当前批次的正确预测数量,将其累加到 train_acc 中。
            train_samples += label.numel()
            train_loss += loss.item() * label.numel()
            train_acc += (out_fr.argmax(1) == label).float().sum().item()
            # 重置模型状态
            functional.reset_net(net)
        train_time = time.time()
        train_speed = train_samples / (train_time - start_time)
        train_loss /= train_samples
        train_acc /= train_samples
        writer.add_scalar('train_loss', train_loss, epoch)
        writer.add_scalar('train_acc', train_acc, epoch)
        net.eval()
        test_loss = 0
        test_acc = 0
        test_samples = 0
        with torch.no_grad():
            for img, label in test_data_loader:
                img = img.to(args.device)
                label = label.to(args.device)
                label_onehot = F.one_hot(label, 10).float()
                out_fr = 0.
                for t in range(args.T):
                    encoded_img = encoder(img)
                    out_fr += net(encoded_img)
                out_fr = out_fr / args.T
                loss = F.mse_loss(out_fr, label_onehot)
                test_samples += label.numel()
                test_loss += loss.item() * label.numel()
                test_acc += (out_fr.argmax(1) == label).float().sum().item()
                functional.reset_net(net)
        test_time = time.time()
        test_speed = test_samples / (test_time - train_time)
        test_loss /= test_samples
        test_acc /= test_samples
        writer.add_scalar('test_loss', test_loss, epoch)
        writer.add_scalar('test_acc', test_acc, epoch)
        save_max = False
        if test_acc > max_test_acc:
            max_test_acc = test_acc
            save_max = True
        checkpoint = {
            'net': net.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': epoch,
            'max_test_acc': max_test_acc
        }
        if save_max:
            torch.save(checkpoint, os.path.join(out_dir, 'checkpoint_max.pth'))
        print(args)
        print(out_dir)
        print(
            f'epoch ={epoch}, train_loss ={train_loss: .4f}, train_acc ={train_acc: .4f}, test_loss ={test_loss: .4f}, test_acc ={test_acc: .4f}, max_test_acc ={max_test_acc: .4f}')
        print(f'train speed ={train_speed: .4f} images/s, test speed ={test_speed: .4f} images/s')
        print(
            f'escape time = {(datetime.datetime.now() + datetime.timedelta(seconds=(time.time() - start_time) * (args.epochs - epoch))).strftime("%Y-%m-%d %H:%M:%S")}\n')
if __name__ == '__main__':
    main() 
                