MNIST DDP 分布式数据并行
Distributed Data Parallel
转自我的个人博客:https://shar-pen.github.io/2025/05/04/torch-distributed-series/3.MNIST_DDP/
The difference between DistributedDataParallel and DataParallel is: DistributedDataParallel uses multiprocessing where a process is created for each GPU, while DataParallel uses multithreading.
By using multiprocessing, each GPU has its dedicated process, this avoids the performance overhead caused by GIL of Python interpreter.
- DDP vs DP 的并发模式
-
DDP使用的是多进程multiprocessing
- 每个 GPU 对应一个独立的 Python 进程。
- 各 GPU/进程之间通过通信(比如 NCCL)同步梯度。
- 进程之间可以并行,每个进程独占一个 GPU,自由度高、效率高。
-
DP 使用的是多线程(multithreading)
- 一个 Python 主进程控制多个线程,每个线程对应一个 GPU 上的模型副本。
- 所有线程共享同一个 Python 解释器(主进程中的 GIL 环境)。
- 在多线程环境下,同一时刻只能有一个线程执行 Python 字节码
- GIL 的性能问题
Python 有个限制叫 GIL(Global Interpreter Lock):
- 在 Python 中,进程之间可以并行,线程之间只能并发。
- 在多线程环境下,同一时刻只能有一个线程执行 Python 字节码。
- 这意味着虽然多个线程运行在不同 GPU 上,但只要你涉及到 Python 层的逻辑(如 forward 调度、数据调度),就会被 GIL 限制,造成瓶颈。
DDP 的多进程模式就天然绕开了 GIL,每个进程有独立的 Python 解释器和 GIL,不会互相争抢锁。所以执行速度更快、效率更高、更适合大模型和多 GPU 并行。
To use DistributedDataParallel on a host with N GPUs, you should spawn up N processes, ensuring that each process exclusively works on a single GPU from 0 to N-1.
总结下,DDP 用多进程给每个 GPU 配一个独立的进程,这样就不用多个线程去抢 Python 的 GIL,避免了 DataParallel 由于多线程带来的性能开销。
分布式数据并行时,模型(model parameters)/优化器(optimizer states)每张卡都会拷贝一份(replicas),在整个训练过程中 DDP 始终在卡间维持着模型参数和优化器状态的同步一致性;
DDP 将 batch input,通过 DistributedSampler split & 分发到不同的 gpus 上,此时虽然模型/optimizer 相同,但因为数据输入不同,导致 loss 不同,反向传播时计算到的梯度也会不同,如何保证卡间,model/optimizer 的同步一致性,之前 DP 用的 parameter server,而它的问题就在于通信压力都在 server,所以 DDP 对这方面的改进是 ring all-reduce algorithm,将 Server 上的通讯压力均衡转到各个 Worker 上
注意有两个核心概念:
- All to one:reduce
- one to All:broadcast
方法名 | 通信结构 | 通信路径 | 数据流向 | 聚合策略 | 通信瓶颈位置 | 通信效率 | 适合场景 |
---|---|---|---|---|---|---|---|
Parameter Server | 中心化(星型) | 所有 Worker ⇄ PS | 上传全部梯度 → 聚合 → 下发参数 | PS 聚合 | PS 带宽和计算压力 | ❌ 低(集中式瓶颈) | 小规模训练,原型实验 |
Tree All-Reduce | 层次化(树型) | 节点间按树结构上传/下传 | 层层上传聚合 → 再层层广播 | 层次加和 & 广播 | 上层节点(树根) | ✅ 中( log N \log N logN 轮次) | 多机多卡,合理拓扑连接 |
Broadcast + Reduce | 两阶段(集中) | 所有 → 主节点(reduce) → 所有 | 所有上传 → 中心聚合 → 广播下发 | 单节点聚合 | 主节点 | ❌ 低 | 小规模单机多卡 |
Ring All-Reduce | 环形(对称) | 相邻节点之间点对点传输 | 均匀传递/聚合,每轮处理一块数据 | 分块加和 & 拼接 | 无集中瓶颈 | ✅✅ 高(带宽最优) | 大规模 GPU 并行,主流方案 |
Parameter Server(PS)和 Broadcast + Reduce 在通信机制上本质相似,区别只在于:
- PS 是显式设计了专门的“参数服务器”角色;
- Broadcast + Reduce 是“隐式指定”某个节点承担聚合与广播任务。
ring all-reduce:
- Reduce-scatter:首先将 gradient 分为 n 块,在第 i 轮 (0<= i < n-1),每个 gpu j 把 第 (i+j) % n 块的数据传给下一个 gpu (j+1 % n),即每个 gpu 都把自己一个块给下一个做加法,在 n 轮结束后,每个 gpu 上都有一个块是完整的聚合了所有不同 gpu 的 gradient。
- All-gather: 将每个 gpu 上的完整聚合后的 gradient 依次传给下一个 gpu,再传递 n-1 次就使所有 gpu 的每块 gradient 都是完整聚合的数据。
虽然传递的数据量还是和 PS 一样,但传输压力平均到每个 gpu 上,不需要单个 worker 承担明显大的压力。
概念/参数名 | 中文含义 | 含义解释 | 示例(2节点 × 每节点4GPU) |
---|---|---|---|
world | 全局进程空间 | 指整个分布式系统中参与训练的所有进程总和 | 2 节点 × 4 GPU = 8 个进程 |
world_size | 全局进程数 | world 中的进程总数,参与通信、同步、梯度聚合的总 worker 数 | 8 |
rank | 全局进程编号 | 当前进程在 world 中的唯一编号,范围是 [ 0 , world_size − 1 ] [0, \text{world\_size} - 1] [0,world_size−1] | 第1节点是 0~3,第2节点是 4~7 |
node | 物理节点/机器 | 实际的服务器或物理机,每个节点运行多个进程,通常对应一台机器 | 2台服务器(假设每台4 GPU) |
node_rank | 节点编号 | 当前节点在所有节点中的编号,通常用于标识不同机器 | 第1台是 0,第2台是 1 |
local_rank | 本地GPU编号 | 当前进程在所在节点上的 GPU 编号,绑定 cuda(local_rank) | 每台机器上分别为 0~3 |
简洁点,world 代表所有服务器上的 gpu,rank 代表 world 视角下的 gpu 编号;node 代表某个具体的服务器,node_rank 代表 world 视角下的 node 编号,local_rank 代表 node 视角下的 gpu 编号。
引入 DDP 相关库
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader# 以下是分布式相关的
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler # 分发数据集
from torch.nn.parallel import DistributedDataParallel as DDP # 用 DDP 封装 module 以支持分布式训练
from torch.distributed import init_process_group, destroy_process_group # 初始化和销毁进程组,一个 process 代表一个 gpu 进程
ddp 对原始代码的修改
参数 | 作用说明 |
---|---|
MASTER_ADDR | 指定 主节点(rank=0 所在节点)的 IP 地址或主机名,作为所有进程连接的“服务器” |
MASTER_PORT | 指定主节点上用于通信监听的端口号,所有进程都通过这个端口进行连接与协调 |
为什么只需要指定主节点的地址和端口?所有进程必须“集合”在一起组成一个通信组(process group);这个过程需要一个 协调者,就像组织会议需要一个人发出会议链接一样;PyTorch DDP 把这个协调角色交给 rank == 0 的进程(主节点);其它进程只需要“知道去哪找这个协调者”就能完成初始化。
主节点负责协调组网,在 DDP 初始化时,所有节点主动连接主节点,每个节点都会告知主节点自己的地址和端口,主节点收集所有其他进程的网络信息,构建全局通信拓扑,将通信配置信息广播回每个进程,包括每个 rank 要连接哪些 peer,这样每个进程就可以进行后续的双向传输,而不再依赖主节点作为中转。
主节点(rank 0) | 工作节点(rank 1,2,…) |
---|---|
在 MASTER_PORT 启动一个监听服务(如 TCP server) | 主动连接 MASTER_ADDR:MASTER_PORT |
监听并接受连接,记录加入者信息 | 与主节点握手,注册自己的 rank 、地址等 |
构建通信拓扑,如 Ring 或 NCCL 分组等 | 一旦接入,就获得组网配置,与其他 worker 点对点通信 |
ddp 初始化和销毁进程
def ddp_setup(rank, world_size):"""Args:rank: Unique identifier of each processworld_size: Total number of processes"""# rank 0 processos.environ["MASTER_ADDR"] = "localhost"os.environ["MASTER_PORT"] = "12355"# nccl:NVIDIA Collective Communication Library # 分布式情况下的,gpus 间通信torch.cuda.set_device(rank)init_process_group(backend="nccl", rank=rank, world_size=world_size)
DDP 会在每个 GPU 上运行一个进程,每个进程中都有一套完全相同的 Trainer 副本(包括 model 和 optimizer),各个进程之间通过一个进程池进行通信。
ddp 包装 model
训练函数不需要多大的修改,使用 DistributedDataParallel 包装模型,这样模型才能在各个进程间同步参数。包装后 model 变成了一个 DDP 对象,要访问其参数得这样写 self.model.module.state_dict()
运行过程中单独控制某个进程进行某些操作,比如要想保存 ckpt,由于每张卡里都有完整的模型参数,所以只需要控制一个进程保存即可。需要注意的是:使用 DDP 改写的代码会在每个 GPU 上各自运行,因此需要在程序中获取当前 GPU 的 rank(gpu_id),这样才能对针对性地控制各个 GPU 的行为。
class Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int, ) -> None:self.gpu_id = gpu_idself.model = model.to(gpu_id)self.train_data = train_data self.optimizer = optimizerself.save_every = save_everyself.model = DDP(model, device_ids=[gpu_id]) # model 要用 DDP 包装一下def _run_batch(self, source, targets):self.optimizer.zero_grad()output = self.model(source)loss = F.cross_entropy(output, targets)loss.backward()self.optimizer.step()def _run_epoch(self, epoch):b_sz = len(next(iter(self.train_data))[0])print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")self.train_dataloader.sampler.set_epoch(epoch) # 注意需要在各 epoch 入口调用该 sampler 对象的 set_epoch() 方法,否则每个 epoch 加载的样本顺序都不变for source, targets in self.train_data:source = source.to(self.gpu_id)targets = targets.to(self.gpu_id)self._run_batch(source, targets)def _save_checkpoint(self, epoch):ckp = self.model.state_dict()PATH = "checkpoint.pt"torch.save(ckp, PATH)print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)if self.gpu_id == 0 and epoch % self.save_every == 0:self._save_checkpoint(epoch)
在程序入口初始化进程池;在程序出口销毁进程池
def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):# 初始化进程池ddp_setup(rank, world_size)# 进行训练dataset, model, optimizer = load_train_objs()train_data = prepare_dataloader(dataset, batch_size)trainer = Trainer(model, train_data, optimizer, rank, save_every)trainer.train(total_epochs)# 销毁进程池destroy_process_group()
DistributedSampler
构造 Dataloader 时使用 DistributedSampler 作为 sampler,这个采样器可以自动将数量为 batch_size 的数据分发到各个GPU上,并保证数据不重叠。理解是可以是这样的,但实际是根据 rank 让每个 gpu 能索引到的数据不一样,每个 gpu 上也是有重复的 Dataloader 的,但每个gpu 上 rank 设置不同,Dataloader sample 先根据 shuffle 打乱顺序,再控制不同 rank 能索引到的数据,以实现类似分发的效果。
Rank 0 sees: [4, 7, 3, 0, 6] Rank 1 sees: [1, 5, 9, 8, 2]
def prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=False, # 设置了新的 sampler,参数 shuffle 要设置为 False sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠)
set_epoch(epoch) 用于设置当前训练 epoch,以确保在分布式训练中 每个进程对数据的打乱顺序一致,从而保证每个 rank 分到的数据是互不重叠且可复现的。
当 DistributedSampler 的 shuffle=True 时,它在每个 epoch 会用 torch.Generator().manual_seed(seed) 生成新的随机索引顺序。
但:
- 如果不调用 set_epoch(),每个进程将使用相同的默认种子;
- 会导致每个 epoch 每个进程打乱后的样本索引相同 → 重复取样,每个 epoch 的训练数据都一样 → 训练不正确!
你确实可以不手动设置 rank 和 world_size,因为 DistributedSampler 会自动从环境变量中获取它们。
如果你不传入 rank 和 num_replicas,PyTorch 会调用:
- torch.distributed.get_world_size() # 获取 world_size
- torch.distributed.get_rank() # 获取当前进程 rank
import torch
from torch.utils.data import Dataset, DataLoader, DistributedSampler# 自定义一个简单的数据集:返回 [0, 1, ..., n-1]
class RangeDataset(Dataset):def __init__(self, n):self.data = list(range(n))def __len__(self):return len(self.data)def __getitem__(self, idx):return self.data[idx]# 模拟两张卡(进程)下的样本访问情况,并支持 set_epoch
def simulate_distributed_sampler(n=10, world_size=2, num_epochs=2):dataset = RangeDataset(n)for epoch in range(num_epochs):print(f"\nEpoch {epoch}")for rank in range(world_size):# 设置 shuffle=True 并调用 set_epochsampler = DistributedSampler(dataset,num_replicas=world_size,rank=rank,shuffle=True,)sampler.set_epoch(epoch) # 关键:确保每轮不同但在所有 rank 一致dataloader = DataLoader(dataset, batch_size=1, sampler=sampler)data_seen = [batch[0].item() for batch in dataloader]print(f"Rank {rank} sees: {data_seen}")simulate_distributed_sampler(n=10, world_size=2, num_epochs=2)
Epoch 0
Rank 0 sees: [4, 7, 3, 0, 6]
Rank 1 sees: [1, 5, 9, 8, 2]Epoch 1
Rank 0 sees: [5, 1, 0, 9, 7]
Rank 1 sees: [6, 2, 8, 3, 4]
multiprocessing.spawn 创建多卡进程
使用 torch.multiprocessing.spawn 方法将代码分发到各个 GPU 的进程中执行。在当前机器上启动 nprocs=world_size 个子进程,每个进程执行一次 main() 函数,并由 mp.spawn 自动赋值第一个参数(目的是执行 nprocs 个进程,第一个参数为 0 ~ nprocs-1)。
def start_process(i):# Each process is assigned a file to write tracebacks to. We# use the file being non-empty to indicate an exception# occurred (vs an expected shutdown). Note: this previously# used a multiprocessing.Queue but that can be prone to# deadlocks, so we went with a simpler solution for a one-shot# message between processes.tf = tempfile.NamedTemporaryFile(prefix="pytorch-errorfile-", suffix=".pickle", delete=False)tf.close()os.unlink(tf.name)process = mp.Process(target=_wrap,args=(fn, i, args, tf.name),daemon=daemon,)process.start()return i, process, tf.nameif not start_parallel:for i in range(nprocs):idx, process, tf_name = start_process(i)error_files[idx] = tf_nameprocesses[idx] = process
可以执行执行以下代码,它展现了 mp 创建进程的效果
import torch.multiprocessing as mpdef run(rank, message):print(f"[Rank {rank}] Received message: {message}")if __name__ == "__main__":world_size = 4 # 启动 4 个进程(模拟 4 个GPU)mp.spawn(fn=run,args=("hello world",), # 注意是 tuple 格式nprocs=world_size,join=True)
效果为:
[Rank 0] Received message: hello world
[Rank 3] Received message: hello world
[Rank 2] Received message: hello world
[Rank 1] Received message: hello world
# 利用 mp.spawn,在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法,并能设置要传入 fn 的参数 args
# 注意不需要传入 fn 的 rank 参数,它由 mp.spawn 自动分配
import multiprocessing as mp
world_size = torch.cuda.device_count()
mp.spawn(fn=main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size
)!CUDA_VISIBLE_DEIVES=0,1 python mnist_ddp.py
torchrun
torchrun 是 PyTorch 官方推荐的分布式训练启动工具,它用来 自动管理多进程启动、环境变量传递和通信初始化,替代早期的 torch.distributed.launch 工具。
- 它帮你在每个 GPU 上自动启动一个训练进程;
- 它设置好 DDP 所需的环境变量(如 RANK, WORLD_SIZE, LOCAL_RANK, MASTER_ADDR 等);
- 它会自动将这些参数传递给你的脚本中的 torch.distributed.init_process_group()。
torchrun == python -m torch.distributed.launch --use-env
参数名 | 类型 | 说明 |
---|---|---|
--nproc_per_node | int | 每台机器上启动的进程数(默认值为 1) |
--nnodes | int | 总节点(机器)数 |
--node_rank | int | 当前节点编号(范围:0 ~ nnodes-1 ) |
--rdzv_backend | str | rendezvous 后端(默认 c10d ,一般不改) |
--rdzv_endpoint | str | rendezvous 主地址和端口,格式如 "localhost:29500" |
--rdzv_id | str | 作业唯一标识,默认 "default" |
--rdzv_conf | str | 可选的 kv 参数,用逗号分隔,如 "key1=val1,key2=val2" |
--max_restarts | int | 失败时最多重启次数(默认 3) |
--monitor_interval | float | monitor 进程检查的间隔(秒) |
--run_path | str | 若脚本是模块路径形式,比如 my_module.train ,则用此代替 script |
--tee | str | 控制日志输出,可选值为 "stdout" 或 "stderr" |
--log_dir | str | 日志输出目录(默认当前目录) |
--redirects | str | 重定向日志,可选:all , none , rank ,如 all:stdout |
--no_python | flag | 若已是 Python 脚本(不用再次 python 调用),可加这个 flag |
以上的 rendezvous 是每个进程通过 rendezvous 找到主节点,然后加入。之后的通信阶段用 backend, 即 NCCL,在 init_process_group 设置。
最常见的几个参数的用法是
torchrun \--nproc_per_node=4 \--nnodes=1 \--node_rank=0 \--rdzv_endpoint=localhost:29500 \your_script.py
对比下是否使用 torchrun 时的行为差别
两种 DDP 启动模式的关键区别
对比项 | 不使用 torchrun (手动) | 使用 torchrun (推荐方式) |
---|---|---|
启动方式 | 使用 mp.spawn(fn, ...) | 使用 torchrun --nproc_per_node=N |
rank , world_size 设置方式 | 手动传入(通过 spawn 的参数) | 自动由 torchrun 设置环境变量 |
主节点地址 / 端口 | 你必须手动设置 MASTER_ADDR/PORT | torchrun 会自动设置这些环境变量 |
是否需控制进程数量 | 手动使用 spawn 创建 | 自动由 torchrun 创建 |
是否读取环境变量 | ❌ 默认不会 | ✅ 自动从环境变量中读取(如 RANK , LOCAL_RANK ) |
脚本能否直接运行(python train.py ) | ❌ 通常不行,需要多进程协调 | ✅ 支持直接 torchrun train.py 运行 |
是否适用于多机 | ❌ 手动处理跨节点逻辑 | ✅ 内建 --nnodes , --node_rank , 可跨机运行 |
init_process_group()
的行为
情况 | 说明 |
---|---|
手动传 rank 和 world_size | 常用于 mp.spawn 场景(你在代码里传了参数) |
不传,内部读取环境变量 | 如果你用的是 torchrun ,环境变量如 RANK 、WORLD_SIZE 自动设置了 |
不传又没用 torchrun | ❌ 报错:因为 init_process_group 找不到必要的通信信息 |
当你运行:
torchrun --nproc_per_node=4 --rdzv_endpoint=localhost:29500 train.py
它在后台自动设置了以下环境变量(对每个进程):
RANK=0 # 每个进程唯一编号
WORLD_SIZE=4 # 总进程数
LOCAL_RANK=0 # 当前进程在本节点内的编号
MASTER_ADDR=localhost
MASTER_PORT=29500
而 init_process_group(backend="nccl")
会自动从这些环境变量中解析配置,无需你显式传入。
非 torchrun 完整代码
import os
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from time import time
import argparse# 对 python 多进程的一个 pytorch 包装
import torch.multiprocessing as mp
# 用于收集一些用于汇总的数据
import torch.distributed as dist
# 这个 sampler 可以把采样的数据分散到各个 CPU 上
from torch.utils.data.distributed import DistributedSampler # 实现分布式数据并行的核心类
from torch.nn.parallel import DistributedDataParallel as DDP # DDP 在每个 GPU 上运行一个进程,其中都有一套完全相同的 Trainer 副本(包括model和optimizer)
# 各个进程之间通过一个进程池进行通信,这两个方法来初始化和销毁进程池
from torch.distributed import init_process_group, destroy_process_group def ddp_setup(rank, world_size):"""setup the distribution process groupArgs:rank: Unique identifier of each processworld_size: Total number of processes"""# MASTER Node(运行 rank0 进程,多机多卡时的主机)用来协调各个 Node 的所有进程之间的通信os.environ["MASTER_ADDR"] = "localhost" # 由于这里是单机实验所以直接写 localhostos.environ["MASTER_PORT"] = "12355" # 任意空闲端口init_process_group(backend="nccl", # Nvidia CUDA CPU 用这个 "nccl"rank=rank, world_size=world_size)torch.cuda.set_device(rank)class ConvNet(nn.Module):def __init__(self):super(ConvNet, self).__init__()self.features = nn.Sequential(nn.Conv2d(1, 32, 3, 1),nn.ReLU(),nn.Conv2d(32, 64, 3, 1),nn.ReLU(),nn.MaxPool2d(2),nn.Dropout(0.25))self.classifier = nn.Sequential(nn.Linear(9216, 128),nn.ReLU(),nn.Dropout(0.5),nn.Linear(128, 10))def forward(self, x):x = self.features(x)x = torch.flatten(x, 1)x = self.classifier(x)output = F.log_softmax(x, dim=1)return output# 自定义Dataset
class MyDataset(Dataset):def __init__(self, data):self.data = datadef __len__(self):return len(self.data)def __getitem__(self, idx):image, label = self.data[idx]return image, labelclass Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int,) -> None:self.gpu_id = gpu_idself.model = model.to(gpu_id)self.train_data = train_dataself.optimizer = optimizerself.save_every = save_every # 指定保存 ckpt 的周期self.model = DDP(model, device_ids=[gpu_id]) # model 要用 DDP 包装一下def _run_batch(self, source, targets):self.optimizer.zero_grad()output = self.model(source)loss = F.cross_entropy(output, targets)loss.backward()self.optimizer.step()# 分布式同步 lossreduced_loss = loss.detach()dist.all_reduce(reduced_loss, op=dist.ReduceOp.SUM)reduced_loss /= dist.get_world_size()return reduced_loss.item()def _run_epoch(self, epoch):b_sz = len(next(iter(self.train_data))[0])print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")self.train_data.sampler.set_epoch(epoch) # 在各个 epoch 入口调用 DistributedSampler 的 set_epoch 方法是很重要的,这样才能打乱每个 epoch 的样本顺序total_loss = 0.0num_batches = 0for source, targets in self.train_data: source = source.to(self.gpu_id)targets = targets.to(self.gpu_id)loss = self._run_batch(source, targets)total_loss += lossnum_batches += 1avg_loss = total_loss / num_batchesif self.gpu_id == 0:print(f"[GPU{self.gpu_id}] Epoch {epoch} | Avg Loss: {avg_loss:.4f}")def _save_checkpoint(self, epoch):ckp = self.model.module.state_dict() # 由于多了一层 DDP 包装,通过 .module 获取原始参数 PATH = "checkpoint.pt"torch.save(ckp, PATH)print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)# 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 ckpt 以免重复保存if self.gpu_id == 0 and epoch % self.save_every == 0:self._save_checkpoint(epoch)def prepare_dataset():transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])train_data = datasets.MNIST(root = './mnist',train=True, # 设置True为训练数据,False为测试数据transform = transform,# download=True # 设置True后就自动下载,下载完成后改为False即可)train_set = MyDataset(train_data)test_data = datasets.MNIST(root = './mnist',train=False, # 设置True为训练数据,False为测试数据transform = transform,)test_set = MyDataset(test_data)return train_set, test_setdef load_train_objs():train_set, test_set = prepare_dataset() # load your datasetmodel = ConvNet() # load your modeloptimizer = optim.Adam(model.parameters(), lr=1e-3)return train_set, test_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=False, # 设置了新的 sampler,参数 shuffle 要设置为 False sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠)def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):# 初始化进程池, 仅是单个进程 gpu rank 的初始化ddp_setup(rank, world_size)# 进行训练train_set, test_set, model, optimizer = load_train_objs()print(f"Train dataset size: {len(train_set)}")train_data = prepare_dataloader(train_set, batch_size)trainer = Trainer(model, train_data, optimizer, rank, save_every)trainer.train(total_epochs)# 销毁进程池destroy_process_group()def arg_parser():parser = argparse.ArgumentParser(description='MNIST distributed training job')parser.add_argument("--epochs", type=int, default=5, help="Number of training epochs")parser.add_argument("--batch_size", type=int, default=512, help="Batch size for training")parser.add_argument('--save_every', type=int, default=1, help='How often to save a snapshot')return parser.parse_args()r"""
README
执行命令: CUDA_VISIBLE_DEVICES=0,1 python mnist_ddp.py # 用 2 卡训练注意训练数据是60K条, 训练时输出:
[GPU0] Epoch 0 | Batchsize: 512 | Steps: 59
[GPU1] Epoch 0 | Batchsize: 512 | Steps: 59
512 * 59 = 30208 ~= 30K
排除掉有些 batch_size 不足的情况, 59个 batch 就是 30K, 两个 gpu 平分了数据"""if __name__ == "__main__":args = arg_parser()print(f"Training arguments: {args}")world_size = torch.cuda.device_count()print(f"Using {world_size} GPUs for training")# 利用 mp.spawn,在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法,并能设置要传入 fn 的参数 args# 注意不需要 fn 的 rank 参数,它由 mp.spawn 自动分配mp.spawn(fn=main, args=(world_size, args.save_every, args.epochs, args.batch_size), nprocs=world_size)
启动代码
CUDA_VISIBLE_DEVICES=0,1 python mnist_ddp.py
torchrun 完整代码
import os
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from time import time
import argparse# 对 python 多进程的一个 pytorch 包装
import torch.multiprocessing as mp
# 用于收集一些用于汇总的数据
import torch.distributed as dist
# 这个 sampler 可以把采样的数据分散到各个 CPU 上
from torch.utils.data.distributed import DistributedSampler # 实现分布式数据并行的核心类
from torch.nn.parallel import DistributedDataParallel as DDP # DDP 在每个 GPU 上运行一个进程,其中都有一套完全相同的 Trainer 副本(包括model和optimizer)
# 各个进程之间通过一个进程池进行通信,这两个方法来初始化和销毁进程池
from torch.distributed import init_process_group, destroy_process_group def ddp_setup():"""setup the distribution process groupArgs:rank: Unique identifier of each processworld_size: Total number of processes"""# 用torchrun 后台自动设置的环境变量init_process_group(backend="nccl")torch.cuda.set_device(int(os.environ['LOCAL_RANK']))class ConvNet(nn.Module):def __init__(self):super(ConvNet, self).__init__()self.features = nn.Sequential(nn.Conv2d(1, 32, 3, 1),nn.ReLU(),nn.Conv2d(32, 64, 3, 1),nn.ReLU(),nn.MaxPool2d(2),nn.Dropout(0.25))self.classifier = nn.Sequential(nn.Linear(9216, 128),nn.ReLU(),nn.Dropout(0.5),nn.Linear(128, 10))def forward(self, x):x = self.features(x)x = torch.flatten(x, 1)x = self.classifier(x)output = F.log_softmax(x, dim=1)return output# 自定义Dataset
class MyDataset(Dataset):def __init__(self, data):self.data = datadef __len__(self):return len(self.data)def __getitem__(self, idx):image, label = self.data[idx]return image, labelclass Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,save_every: int,) -> None:self.gpu_id = int(os.environ['LOCAL_RANK']) # gpu_id 由 torchrun 自动设置self.model = model.to(self.gpu_id)self.train_data = train_dataself.optimizer = optimizerself.save_every = save_every # 指定保存 ckpt 的周期self.model = DDP(model, device_ids=[self.gpu_id]) # model 要用 DDP 包装一下def _run_batch(self, source, targets):self.optimizer.zero_grad()output = self.model(source)loss = F.cross_entropy(output, targets)loss.backward()self.optimizer.step()# 分布式同步 lossreduced_loss = loss.detach()dist.all_reduce(reduced_loss, op=dist.ReduceOp.SUM)reduced_loss /= dist.get_world_size()return reduced_loss.item()def _run_epoch(self, epoch):b_sz = len(next(iter(self.train_data))[0])print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")self.train_data.sampler.set_epoch(epoch)total_loss = 0.0num_batches = 0for source, targets in self.train_data: source = source.to(self.gpu_id)targets = targets.to(self.gpu_id)loss = self._run_batch(source, targets)total_loss += lossnum_batches += 1avg_loss = total_loss / num_batchesif self.gpu_id == 0:print(f"[GPU{self.gpu_id}] Epoch {epoch} | Avg Loss: {avg_loss:.4f}")def _save_checkpoint(self, epoch):ckp = self.model.module.state_dict() # 由于多了一层 DDP 包装,通过 .module 获取原始参数 PATH = "checkpoint.pt"torch.save(ckp, PATH)print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)# 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 ckpt 以免重复保存if self.gpu_id == 0 and epoch % self.save_every == 0:self._save_checkpoint(epoch)def prepare_dataset():transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])train_data = datasets.MNIST(root = './mnist',train=True, # 设置True为训练数据,False为测试数据transform = transform,# download=True # 设置True后就自动下载,下载完成后改为False即可)train_set = MyDataset(train_data)test_data = datasets.MNIST(root = './mnist',train=False, # 设置True为训练数据,False为测试数据transform = transform,)test_set = MyDataset(test_data)return train_set, test_setdef load_train_objs():train_set, test_set = prepare_dataset() # load your datasetmodel = ConvNet() # load your modeloptimizer = optim.Adam(model.parameters(), lr=1e-3)return train_set, test_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=False, # 设置了新的 sampler,参数 shuffle 要设置为 False sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠)def main(save_every: int, total_epochs: int, batch_size: int):# 初始化进程池, 仅是单个进程 gpu rank 的初始化ddp_setup()# 进行训练train_set, test_set, model, optimizer = load_train_objs()print(f"Train dataset size: {len(train_set)}")train_data = prepare_dataloader(train_set, batch_size)trainer = Trainer(model, train_data, optimizer, save_every)trainer.train(total_epochs)# 销毁进程池destroy_process_group()def arg_parser():parser = argparse.ArgumentParser(description='MNIST distributed training job')parser.add_argument("--epochs", type=int, default=5, help="Number of training epochs")parser.add_argument("--batch_size", type=int, default=512, help="Batch size for training")parser.add_argument('--save_every', type=int, default=1, help='How often to save a snapshot')return parser.parse_args()r"""
README
执行命令: CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 mnist_ddp_torchrun.py
"""if __name__ == "__main__":args = arg_parser()print(f"Training arguments: {args}")world_size = torch.cuda.device_count()print(f"Using {world_size} GPUs for training")main(args.save_every, args.epochs, args.batch_size)
启动代码
CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 mnist_ddp_torchrun.py