当前位置: 首页 > news >正文

MNIST DDP 分布式数据并行

Distributed Data Parallel

转自我的个人博客:https://shar-pen.github.io/2025/05/04/torch-distributed-series/3.MNIST_DDP/

The difference between DistributedDataParallel and DataParallel is: DistributedDataParallel uses multiprocessing where a process is created for each GPU, while DataParallel uses multithreading.
By using multiprocessing, each GPU has its dedicated process, this avoids the performance overhead caused by GIL of Python interpreter.

  1. DDP vs DP 的并发模式
  • DDP使用的是多进程multiprocessing

    • 每个 GPU 对应一个独立的 Python 进程。
    • 各 GPU/进程之间通过通信(比如 NCCL)同步梯度
    • 进程之间可以并行,每个进程独占一个 GPU,自由度高、效率高。
  • DP 使用的是多线程(multithreading)

    • 一个 Python 主进程控制多个线程,每个线程对应一个 GPU 上的模型副本。
    • 所有线程共享同一个 Python 解释器(主进程中的 GIL 环境)。
    • 在多线程环境下,同一时刻只能有一个线程执行 Python 字节码
  1. GIL 的性能问题

Python 有个限制叫 GIL(Global Interpreter Lock)

  • 在 Python 中,进程之间可以并行,线程之间只能并发。
  • 在多线程环境下,同一时刻只能有一个线程执行 Python 字节码
  • 这意味着虽然多个线程运行在不同 GPU 上,但只要你涉及到 Python 层的逻辑(如 forward 调度、数据调度),就会被 GIL 限制,造成瓶颈。

DDP 的多进程模式就天然绕开了 GIL,每个进程有独立的 Python 解释器和 GIL,不会互相争抢锁。所以执行速度更快、效率更高、更适合大模型和多 GPU 并行

To use DistributedDataParallel on a host with N GPUs, you should spawn up N processes, ensuring that each process exclusively works on a single GPU from 0 to N-1.

总结下,DDP 用多进程给每个 GPU 配一个独立的进程,这样就不用多个线程去抢 Python 的 GIL,避免了 DataParallel 由于多线程带来的性能开销。

分布式数据并行时,模型(model parameters)/优化器(optimizer states)每张卡都会拷贝一份(replicas),在整个训练过程中 DDP 始终在卡间维持着模型参数和优化器状态的同步一致性

DDP 将 batch input,通过 DistributedSampler split & 分发到不同的 gpus 上,此时虽然模型/optimizer 相同,但因为数据输入不同,导致 loss 不同,反向传播时计算到的梯度也会不同,如何保证卡间,model/optimizer 的同步一致性,之前 DP 用的 parameter server,而它的问题就在于通信压力都在 server,所以 DDP 对这方面的改进是 ring all-reduce algorithm,将 Server 上的通讯压力均衡转到各个 Worker 上

注意有两个核心概念:

  • All to one:reduce
  • one to All:broadcast
方法名通信结构通信路径数据流向聚合策略通信瓶颈位置通信效率适合场景
Parameter Server中心化(星型)所有 Worker ⇄ PS上传全部梯度 → 聚合 → 下发参数PS 聚合PS 带宽和计算压力❌ 低(集中式瓶颈)小规模训练,原型实验
Tree All-Reduce层次化(树型)节点间按树结构上传/下传层层上传聚合 → 再层层广播层次加和 & 广播上层节点(树根)✅ 中( log ⁡ N \log N logN 轮次)多机多卡,合理拓扑连接
Broadcast + Reduce两阶段(集中)所有 → 主节点(reduce) → 所有所有上传 → 中心聚合 → 广播下发单节点聚合主节点❌ 低小规模单机多卡
Ring All-Reduce环形(对称)相邻节点之间点对点传输均匀传递/聚合,每轮处理一块数据分块加和 & 拼接无集中瓶颈✅✅ 高(带宽最优)大规模 GPU 并行,主流方案

Parameter Server(PS)和 Broadcast + Reduce 在通信机制上本质相似,区别只在于:

  • PS 是显式设计了专门的“参数服务器”角色;
  • Broadcast + Reduce 是“隐式指定”某个节点承担聚合与广播任务。

ring all-reduce:

  • Reduce-scatter:首先将 gradient 分为 n 块,在第 i 轮 (0<= i < n-1),每个 gpu j 把 第 (i+j) % n 块的数据传给下一个 gpu (j+1 % n),即每个 gpu 都把自己一个块给下一个做加法,在 n 轮结束后,每个 gpu 上都有一个块是完整的聚合了所有不同 gpu 的 gradient。
  • All-gather: 将每个 gpu 上的完整聚合后的 gradient 依次传给下一个 gpu,再传递 n-1 次就使所有 gpu 的每块 gradient 都是完整聚合的数据。

虽然传递的数据量还是和 PS 一样,但传输压力平均到每个 gpu 上,不需要单个 worker 承担明显大的压力。

概念/参数名中文含义含义解释示例(2节点 × 每节点4GPU)
world全局进程空间整个分布式系统中参与训练的所有进程总和2 节点 × 4 GPU = 8 个进程
world_size全局进程数world 中的进程总数,参与通信、同步、梯度聚合的总 worker 数8
rank全局进程编号当前进程在 world 中的唯一编号,范围是 [ 0 , world_size − 1 ] [0, \text{world\_size} - 1] [0,world_size1]第1节点是 0~3,第2节点是 4~7
node物理节点/机器实际的服务器或物理机,每个节点运行多个进程,通常对应一台机器2台服务器(假设每台4 GPU)
node_rank节点编号当前节点在所有节点中的编号,通常用于标识不同机器第1台是 0,第2台是 1
local_rank本地GPU编号当前进程在所在节点上的 GPU 编号,绑定 cuda(local_rank)每台机器上分别为 0~3

简洁点,world 代表所有服务器上的 gpu,rank 代表 world 视角下的 gpu 编号;node 代表某个具体的服务器,node_rank 代表 world 视角下的 node 编号,local_rank 代表 node 视角下的 gpu 编号。

引入 DDP 相关库

import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader# 以下是分布式相关的
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler # 分发数据集
from torch.nn.parallel import DistributedDataParallel as DDP # 用 DDP 封装 module 以支持分布式训练
from torch.distributed import init_process_group, destroy_process_group # 初始化和销毁进程组,一个 process 代表一个 gpu 进程

ddp 对原始代码的修改

参数作用说明
MASTER_ADDR指定 主节点(rank=0 所在节点)的 IP 地址或主机名,作为所有进程连接的“服务器”
MASTER_PORT指定主节点上用于通信监听的端口号,所有进程都通过这个端口进行连接与协调

为什么只需要指定主节点的地址和端口?所有进程必须“集合”在一起组成一个通信组(process group);这个过程需要一个 协调者,就像组织会议需要一个人发出会议链接一样;PyTorch DDP 把这个协调角色交给 rank == 0 的进程(主节点);其它进程只需要“知道去哪找这个协调者”就能完成初始化。

主节点负责协调组网,在 DDP 初始化时,所有节点主动连接主节点,每个节点都会告知主节点自己的地址和端口,主节点收集所有其他进程的网络信息,构建全局通信拓扑,将通信配置信息广播回每个进程,包括每个 rank 要连接哪些 peer,这样每个进程就可以进行后续的双向传输,而不再依赖主节点作为中转。

主节点(rank 0)工作节点(rank 1,2,…)
MASTER_PORT 启动一个监听服务(如 TCP server)主动连接 MASTER_ADDR:MASTER_PORT
监听并接受连接,记录加入者信息与主节点握手,注册自己的 rank、地址等
构建通信拓扑,如 Ring 或 NCCL 分组等一旦接入,就获得组网配置,与其他 worker 点对点通信
ddp 初始化和销毁进程
def ddp_setup(rank, world_size):"""Args:rank: Unique identifier of each processworld_size: Total number of processes"""# rank 0 processos.environ["MASTER_ADDR"] = "localhost"os.environ["MASTER_PORT"] = "12355"# nccl:NVIDIA Collective Communication Library # 分布式情况下的,gpus 间通信torch.cuda.set_device(rank)init_process_group(backend="nccl", rank=rank, world_size=world_size)

DDP 会在每个 GPU 上运行一个进程,每个进程中都有一套完全相同的 Trainer 副本(包括 model 和 optimizer),各个进程之间通过一个进程池进行通信。

ddp 包装 model

训练函数不需要多大的修改,使用 DistributedDataParallel 包装模型,这样模型才能在各个进程间同步参数。包装后 model 变成了一个 DDP 对象,要访问其参数得这样写 self.model.module.state_dict()

运行过程中单独控制某个进程进行某些操作,比如要想保存 ckpt,由于每张卡里都有完整的模型参数,所以只需要控制一个进程保存即可。需要注意的是:使用 DDP 改写的代码会在每个 GPU 上各自运行,因此需要在程序中获取当前 GPU 的 rank(gpu_id),这样才能对针对性地控制各个 GPU 的行为。

class Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int, ) -> None:self.gpu_id = gpu_idself.model = model.to(gpu_id)self.train_data = train_data self.optimizer = optimizerself.save_every = save_everyself.model = DDP(model, device_ids=[gpu_id])    # model 要用 DDP 包装一下def _run_batch(self, source, targets):self.optimizer.zero_grad()output = self.model(source)loss = F.cross_entropy(output, targets)loss.backward()self.optimizer.step()def _run_epoch(self, epoch):b_sz = len(next(iter(self.train_data))[0])print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")self.train_dataloader.sampler.set_epoch(epoch) # 注意需要在各 epoch 入口调用该 sampler 对象的 set_epoch() 方法,否则每个 epoch 加载的样本顺序都不变for source, targets in self.train_data:source = source.to(self.gpu_id)targets = targets.to(self.gpu_id)self._run_batch(source, targets)def _save_checkpoint(self, epoch):ckp = self.model.state_dict()PATH = "checkpoint.pt"torch.save(ckp, PATH)print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)if self.gpu_id == 0 and epoch % self.save_every == 0:self._save_checkpoint(epoch)

在程序入口初始化进程池;在程序出口销毁进程池

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):# 初始化进程池ddp_setup(rank, world_size)# 进行训练dataset, model, optimizer = load_train_objs()train_data = prepare_dataloader(dataset, batch_size)trainer = Trainer(model, train_data, optimizer, rank, save_every)trainer.train(total_epochs)# 销毁进程池destroy_process_group()
DistributedSampler

构造 Dataloader 时使用 DistributedSampler 作为 sampler,这个采样器可以自动将数量为 batch_size 的数据分发到各个GPU上,并保证数据不重叠。理解是可以是这样的,但实际是根据 rank 让每个 gpu 能索引到的数据不一样,每个 gpu 上也是有重复的 Dataloader 的,但每个gpu 上 rank 设置不同,Dataloader sample 先根据 shuffle 打乱顺序,再控制不同 rank 能索引到的数据,以实现类似分发的效果。

Rank 0 sees: [4, 7, 3, 0, 6] Rank 1 sees: [1, 5, 9, 8, 2]

def prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=False,                      # 设置了新的 sampler,参数 shuffle 要设置为 False sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠)

set_epoch(epoch) 用于设置当前训练 epoch,以确保在分布式训练中 每个进程对数据的打乱顺序一致,从而保证每个 rank 分到的数据是互不重叠且可复现的。

当 DistributedSampler 的 shuffle=True 时,它在每个 epoch 会用 torch.Generator().manual_seed(seed) 生成新的随机索引顺序。
但:

  • 如果不调用 set_epoch(),每个进程将使用相同的默认种子;
  • 会导致每个 epoch 每个进程打乱后的样本索引相同 → 重复取样,每个 epoch 的训练数据都一样 → 训练不正确!

你确实可以不手动设置 rank 和 world_size,因为 DistributedSampler 会自动从环境变量中获取它们。
如果你不传入 rank 和 num_replicas,PyTorch 会调用:

  • torch.distributed.get_world_size() # 获取 world_size
  • torch.distributed.get_rank() # 获取当前进程 rank
import torch
from torch.utils.data import Dataset, DataLoader, DistributedSampler# 自定义一个简单的数据集:返回 [0, 1, ..., n-1]
class RangeDataset(Dataset):def __init__(self, n):self.data = list(range(n))def __len__(self):return len(self.data)def __getitem__(self, idx):return self.data[idx]# 模拟两张卡(进程)下的样本访问情况,并支持 set_epoch
def simulate_distributed_sampler(n=10, world_size=2, num_epochs=2):dataset = RangeDataset(n)for epoch in range(num_epochs):print(f"\nEpoch {epoch}")for rank in range(world_size):# 设置 shuffle=True 并调用 set_epochsampler = DistributedSampler(dataset,num_replicas=world_size,rank=rank,shuffle=True,)sampler.set_epoch(epoch)  # 关键:确保每轮不同但在所有 rank 一致dataloader = DataLoader(dataset, batch_size=1, sampler=sampler)data_seen = [batch[0].item() for batch in dataloader]print(f"Rank {rank} sees: {data_seen}")simulate_distributed_sampler(n=10, world_size=2, num_epochs=2)
Epoch 0
Rank 0 sees: [4, 7, 3, 0, 6]
Rank 1 sees: [1, 5, 9, 8, 2]Epoch 1
Rank 0 sees: [5, 1, 0, 9, 7]
Rank 1 sees: [6, 2, 8, 3, 4]
multiprocessing.spawn 创建多卡进程

使用 torch.multiprocessing.spawn 方法将代码分发到各个 GPU 的进程中执行。在当前机器上启动 nprocs=world_size 个子进程,每个进程执行一次 main() 函数,并由 mp.spawn 自动赋值第一个参数(目的是执行 nprocs 个进程,第一个参数为 0 ~ nprocs-1)。

def start_process(i):# Each process is assigned a file to write tracebacks to.  We# use the file being non-empty to indicate an exception# occurred (vs an expected shutdown).  Note: this previously# used a multiprocessing.Queue but that can be prone to# deadlocks, so we went with a simpler solution for a one-shot# message between processes.tf = tempfile.NamedTemporaryFile(prefix="pytorch-errorfile-", suffix=".pickle", delete=False)tf.close()os.unlink(tf.name)process = mp.Process(target=_wrap,args=(fn, i, args, tf.name),daemon=daemon,)process.start()return i, process, tf.nameif not start_parallel:for i in range(nprocs):idx, process, tf_name = start_process(i)error_files[idx] = tf_nameprocesses[idx] = process

可以执行执行以下代码,它展现了 mp 创建进程的效果

import torch.multiprocessing as mpdef run(rank, message):print(f"[Rank {rank}] Received message: {message}")if __name__ == "__main__":world_size = 4  # 启动 4 个进程(模拟 4 个GPU)mp.spawn(fn=run,args=("hello world",),   # 注意是 tuple 格式nprocs=world_size,join=True)

效果为:

[Rank 0] Received message: hello world
[Rank 3] Received message: hello world
[Rank 2] Received message: hello world
[Rank 1] Received message: hello world
# 利用 mp.spawn,在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法,并能设置要传入 fn 的参数 args
# 注意不需要传入 fn 的 rank 参数,它由 mp.spawn 自动分配
import multiprocessing as mp
world_size = torch.cuda.device_count()
mp.spawn(fn=main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size
)!CUDA_VISIBLE_DEIVES=0,1 python mnist_ddp.py

torchrun

torchrun 是 PyTorch 官方推荐的分布式训练启动工具,它用来 自动管理多进程启动、环境变量传递和通信初始化,替代早期的 torch.distributed.launch 工具。

  • 它帮你在每个 GPU 上自动启动一个训练进程;
  • 它设置好 DDP 所需的环境变量(如 RANK, WORLD_SIZE, LOCAL_RANK, MASTER_ADDR 等);
  • 它会自动将这些参数传递给你的脚本中的 torch.distributed.init_process_group()。

torchrun == python -m torch.distributed.launch --use-env

参数名类型说明
--nproc_per_nodeint每台机器上启动的进程数(默认值为 1)
--nnodesint总节点(机器)数
--node_rankint当前节点编号(范围:0 ~ nnodes-1
--rdzv_backendstrrendezvous 后端(默认 c10d,一般不改)
--rdzv_endpointstrrendezvous 主地址和端口,格式如 "localhost:29500"
--rdzv_idstr作业唯一标识,默认 "default"
--rdzv_confstr可选的 kv 参数,用逗号分隔,如 "key1=val1,key2=val2"
--max_restartsint失败时最多重启次数(默认 3)
--monitor_intervalfloatmonitor 进程检查的间隔(秒)
--run_pathstr若脚本是模块路径形式,比如 my_module.train,则用此代替 script
--teestr控制日志输出,可选值为 "stdout""stderr"
--log_dirstr日志输出目录(默认当前目录)
--redirectsstr重定向日志,可选:all, none, rank,如 all:stdout
--no_pythonflag若已是 Python 脚本(不用再次 python 调用),可加这个 flag

以上的 rendezvous 是每个进程通过 rendezvous 找到主节点,然后加入。之后的通信阶段用 backend, 即 NCCL,在 init_process_group 设置。

最常见的几个参数的用法是

torchrun \--nproc_per_node=4 \--nnodes=1 \--node_rank=0 \--rdzv_endpoint=localhost:29500 \your_script.py

对比下是否使用 torchrun 时的行为差别

两种 DDP 启动模式的关键区别

对比项不使用 torchrun(手动)使用 torchrun(推荐方式)
启动方式使用 mp.spawn(fn, ...)使用 torchrun --nproc_per_node=N
rank, world_size 设置方式手动传入(通过 spawn 的参数)自动由 torchrun 设置环境变量
主节点地址 / 端口你必须手动设置 MASTER_ADDR/PORTtorchrun 会自动设置这些环境变量
是否需控制进程数量手动使用 spawn 创建自动由 torchrun 创建
是否读取环境变量❌ 默认不会✅ 自动从环境变量中读取(如 RANK, LOCAL_RANK
脚本能否直接运行(python train.py❌ 通常不行,需要多进程协调✅ 支持直接 torchrun train.py 运行
是否适用于多机❌ 手动处理跨节点逻辑✅ 内建 --nnodes, --node_rank, 可跨机运行

init_process_group() 的行为

情况说明
手动传 rankworld_size常用于 mp.spawn 场景(你在代码里传了参数)
不传,内部读取环境变量如果你用的是 torchrun,环境变量如 RANKWORLD_SIZE 自动设置了
不传又没用 torchrun❌ 报错:因为 init_process_group 找不到必要的通信信息

当你运行:

torchrun --nproc_per_node=4 --rdzv_endpoint=localhost:29500 train.py

它在后台自动设置了以下环境变量(对每个进程):

RANK=0         # 每个进程唯一编号
WORLD_SIZE=4   # 总进程数
LOCAL_RANK=0   # 当前进程在本节点内的编号
MASTER_ADDR=localhost
MASTER_PORT=29500

init_process_group(backend="nccl") 会自动从这些环境变量中解析配置,无需你显式传入。

非 torchrun 完整代码

import os
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from time import time
import argparse# 对 python 多进程的一个 pytorch 包装
import torch.multiprocessing as mp
# 用于收集一些用于汇总的数据
import torch.distributed as dist
# 这个 sampler 可以把采样的数据分散到各个 CPU 上                                      
from torch.utils.data.distributed import DistributedSampler     # 实现分布式数据并行的核心类        
from torch.nn.parallel import DistributedDataParallel as DDP         # DDP 在每个 GPU 上运行一个进程,其中都有一套完全相同的 Trainer 副本(包括model和optimizer)
# 各个进程之间通过一个进程池进行通信,这两个方法来初始化和销毁进程池
from torch.distributed import init_process_group, destroy_process_group def ddp_setup(rank, world_size):"""setup the distribution process groupArgs:rank: Unique identifier of each processworld_size: Total number of processes"""# MASTER Node(运行 rank0 进程,多机多卡时的主机)用来协调各个 Node 的所有进程之间的通信os.environ["MASTER_ADDR"] = "localhost" # 由于这里是单机实验所以直接写 localhostos.environ["MASTER_PORT"] = "12355"     # 任意空闲端口init_process_group(backend="nccl",                     # Nvidia CUDA CPU 用这个 "nccl"rank=rank,                          world_size=world_size)torch.cuda.set_device(rank)class ConvNet(nn.Module):def __init__(self):super(ConvNet, self).__init__()self.features = nn.Sequential(nn.Conv2d(1, 32, 3, 1),nn.ReLU(),nn.Conv2d(32, 64, 3, 1),nn.ReLU(),nn.MaxPool2d(2),nn.Dropout(0.25))self.classifier = nn.Sequential(nn.Linear(9216, 128),nn.ReLU(),nn.Dropout(0.5),nn.Linear(128, 10))def forward(self, x):x = self.features(x)x = torch.flatten(x, 1)x = self.classifier(x)output = F.log_softmax(x, dim=1)return output# 自定义Dataset
class MyDataset(Dataset):def __init__(self, data):self.data = datadef __len__(self):return len(self.data)def __getitem__(self, idx):image, label = self.data[idx]return image, labelclass Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int,) -> None:self.gpu_id = gpu_idself.model = model.to(gpu_id)self.train_data = train_dataself.optimizer = optimizerself.save_every = save_every                    # 指定保存 ckpt 的周期self.model = DDP(model, device_ids=[gpu_id])    # model 要用 DDP 包装一下def _run_batch(self, source, targets):self.optimizer.zero_grad()output = self.model(source)loss = F.cross_entropy(output, targets)loss.backward()self.optimizer.step()# 分布式同步 lossreduced_loss = loss.detach()dist.all_reduce(reduced_loss, op=dist.ReduceOp.SUM)reduced_loss /= dist.get_world_size()return reduced_loss.item()def _run_epoch(self, epoch):b_sz = len(next(iter(self.train_data))[0])print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")self.train_data.sampler.set_epoch(epoch)        # 在各个 epoch 入口调用 DistributedSampler 的 set_epoch 方法是很重要的,这样才能打乱每个 epoch 的样本顺序total_loss = 0.0num_batches = 0for source, targets in self.train_data: source = source.to(self.gpu_id)targets = targets.to(self.gpu_id)loss = self._run_batch(source, targets)total_loss += lossnum_batches += 1avg_loss = total_loss / num_batchesif self.gpu_id == 0:print(f"[GPU{self.gpu_id}] Epoch {epoch} | Avg Loss: {avg_loss:.4f}")def _save_checkpoint(self, epoch):ckp = self.model.module.state_dict()            # 由于多了一层 DDP 包装,通过 .module 获取原始参数 PATH = "checkpoint.pt"torch.save(ckp, PATH)print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)# 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 ckpt 以免重复保存if self.gpu_id == 0 and epoch % self.save_every == 0:self._save_checkpoint(epoch)def prepare_dataset():transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])train_data = datasets.MNIST(root = './mnist',train=True,       # 设置True为训练数据,False为测试数据transform = transform,# download=True  # 设置True后就自动下载,下载完成后改为False即可)train_set = MyDataset(train_data)test_data = datasets.MNIST(root = './mnist',train=False,       # 设置True为训练数据,False为测试数据transform = transform,)test_set = MyDataset(test_data)return train_set, test_setdef load_train_objs():train_set, test_set = prepare_dataset()  # load your datasetmodel = ConvNet()  # load your modeloptimizer = optim.Adam(model.parameters(), lr=1e-3)return train_set, test_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=False,                      # 设置了新的 sampler,参数 shuffle 要设置为 False sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠)def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):# 初始化进程池, 仅是单个进程 gpu rank 的初始化ddp_setup(rank, world_size)# 进行训练train_set, test_set, model, optimizer = load_train_objs()print(f"Train dataset size: {len(train_set)}")train_data = prepare_dataloader(train_set, batch_size)trainer = Trainer(model, train_data, optimizer, rank, save_every)trainer.train(total_epochs)# 销毁进程池destroy_process_group()def arg_parser():parser = argparse.ArgumentParser(description='MNIST distributed training job')parser.add_argument("--epochs", type=int, default=5, help="Number of training epochs")parser.add_argument("--batch_size", type=int, default=512, help="Batch size for training")parser.add_argument('--save_every', type=int, default=1, help='How often to save a snapshot')return parser.parse_args()r"""
README
执行命令: CUDA_VISIBLE_DEVICES=0,1 python mnist_ddp.py # 用 2 卡训练注意训练数据是60K条, 训练时输出:
[GPU0] Epoch 0 | Batchsize: 512 | Steps: 59
[GPU1] Epoch 0 | Batchsize: 512 | Steps: 59
512 * 59 = 30208 ~= 30K
排除掉有些 batch_size 不足的情况, 59个 batch 就是 30K, 两个 gpu 平分了数据"""if __name__ == "__main__":args = arg_parser()print(f"Training arguments: {args}")world_size = torch.cuda.device_count()print(f"Using {world_size} GPUs for training")# 利用 mp.spawn,在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法,并能设置要传入 fn 的参数 args# 注意不需要 fn 的 rank 参数,它由 mp.spawn 自动分配mp.spawn(fn=main, args=(world_size, args.save_every, args.epochs, args.batch_size), nprocs=world_size)

启动代码

CUDA_VISIBLE_DEVICES=0,1 python mnist_ddp.py

torchrun 完整代码

import os
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from time import time
import argparse# 对 python 多进程的一个 pytorch 包装
import torch.multiprocessing as mp
# 用于收集一些用于汇总的数据
import torch.distributed as dist
# 这个 sampler 可以把采样的数据分散到各个 CPU 上                                      
from torch.utils.data.distributed import DistributedSampler     # 实现分布式数据并行的核心类        
from torch.nn.parallel import DistributedDataParallel as DDP         # DDP 在每个 GPU 上运行一个进程,其中都有一套完全相同的 Trainer 副本(包括model和optimizer)
# 各个进程之间通过一个进程池进行通信,这两个方法来初始化和销毁进程池
from torch.distributed import init_process_group, destroy_process_group def ddp_setup():"""setup the distribution process groupArgs:rank: Unique identifier of each processworld_size: Total number of processes"""# 用torchrun 后台自动设置的环境变量init_process_group(backend="nccl")torch.cuda.set_device(int(os.environ['LOCAL_RANK']))class ConvNet(nn.Module):def __init__(self):super(ConvNet, self).__init__()self.features = nn.Sequential(nn.Conv2d(1, 32, 3, 1),nn.ReLU(),nn.Conv2d(32, 64, 3, 1),nn.ReLU(),nn.MaxPool2d(2),nn.Dropout(0.25))self.classifier = nn.Sequential(nn.Linear(9216, 128),nn.ReLU(),nn.Dropout(0.5),nn.Linear(128, 10))def forward(self, x):x = self.features(x)x = torch.flatten(x, 1)x = self.classifier(x)output = F.log_softmax(x, dim=1)return output# 自定义Dataset
class MyDataset(Dataset):def __init__(self, data):self.data = datadef __len__(self):return len(self.data)def __getitem__(self, idx):image, label = self.data[idx]return image, labelclass Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,save_every: int,) -> None:self.gpu_id = int(os.environ['LOCAL_RANK']) # gpu_id 由 torchrun 自动设置self.model = model.to(self.gpu_id)self.train_data = train_dataself.optimizer = optimizerself.save_every = save_every                    # 指定保存 ckpt 的周期self.model = DDP(model, device_ids=[self.gpu_id])    # model 要用 DDP 包装一下def _run_batch(self, source, targets):self.optimizer.zero_grad()output = self.model(source)loss = F.cross_entropy(output, targets)loss.backward()self.optimizer.step()# 分布式同步 lossreduced_loss = loss.detach()dist.all_reduce(reduced_loss, op=dist.ReduceOp.SUM)reduced_loss /= dist.get_world_size()return reduced_loss.item()def _run_epoch(self, epoch):b_sz = len(next(iter(self.train_data))[0])print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")self.train_data.sampler.set_epoch(epoch)total_loss = 0.0num_batches = 0for source, targets in self.train_data: source = source.to(self.gpu_id)targets = targets.to(self.gpu_id)loss = self._run_batch(source, targets)total_loss += lossnum_batches += 1avg_loss = total_loss / num_batchesif self.gpu_id == 0:print(f"[GPU{self.gpu_id}] Epoch {epoch} | Avg Loss: {avg_loss:.4f}")def _save_checkpoint(self, epoch):ckp = self.model.module.state_dict()            # 由于多了一层 DDP 包装,通过 .module 获取原始参数 PATH = "checkpoint.pt"torch.save(ckp, PATH)print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)# 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 ckpt 以免重复保存if self.gpu_id == 0 and epoch % self.save_every == 0:self._save_checkpoint(epoch)def prepare_dataset():transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])train_data = datasets.MNIST(root = './mnist',train=True,       # 设置True为训练数据,False为测试数据transform = transform,# download=True  # 设置True后就自动下载,下载完成后改为False即可)train_set = MyDataset(train_data)test_data = datasets.MNIST(root = './mnist',train=False,       # 设置True为训练数据,False为测试数据transform = transform,)test_set = MyDataset(test_data)return train_set, test_setdef load_train_objs():train_set, test_set = prepare_dataset()  # load your datasetmodel = ConvNet()  # load your modeloptimizer = optim.Adam(model.parameters(), lr=1e-3)return train_set, test_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_size=batch_size,pin_memory=True,shuffle=False,                      # 设置了新的 sampler,参数 shuffle 要设置为 False sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠)def main(save_every: int, total_epochs: int, batch_size: int):# 初始化进程池, 仅是单个进程 gpu rank 的初始化ddp_setup()# 进行训练train_set, test_set, model, optimizer = load_train_objs()print(f"Train dataset size: {len(train_set)}")train_data = prepare_dataloader(train_set, batch_size)trainer = Trainer(model, train_data, optimizer, save_every)trainer.train(total_epochs)# 销毁进程池destroy_process_group()def arg_parser():parser = argparse.ArgumentParser(description='MNIST distributed training job')parser.add_argument("--epochs", type=int, default=5, help="Number of training epochs")parser.add_argument("--batch_size", type=int, default=512, help="Batch size for training")parser.add_argument('--save_every', type=int, default=1, help='How often to save a snapshot')return parser.parse_args()r"""
README
执行命令: CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 mnist_ddp_torchrun.py
"""if __name__ == "__main__":args = arg_parser()print(f"Training arguments: {args}")world_size = torch.cuda.device_count()print(f"Using {world_size} GPUs for training")main(args.save_every, args.epochs, args.batch_size)

启动代码

CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 mnist_ddp_torchrun.py

相关文章:

  • Java泛型补充与理解
  • 虚幻引擎5-Unreal Engine笔记之常用核心类的继承关系
  • 【GESP真题解析】第 19 集 GESP 二级 2025 年 3 月编程题 1:等差矩阵
  • Scrapyd 详解:分布式爬虫部署与管理利器
  • C# 高效处理海量数据:解决嵌套并行的性能陷阱
  • 图片转ICO图标工具
  • 《Go小技巧易错点100例》第三十三篇
  • Flutter - UIKit开发相关指南 - 导航
  • 互联网大厂Java求职面试:电商商品推荐系统中的AI技术应用
  • 第31讲 循环缓冲区与命令解析
  • 【Tools】omnetpp5.6.2安装
  • 理解 Token 索引 vs 字符位置
  • DAY 17 训练
  • 【CTF】Linux Shell RCE绕过(bypass)技术总结
  • 低代码开发:开启软件开发的新篇章
  • 算法导论第9章思考题
  • 【c++】【数据结构】二叉搜索树详解
  • 数码管模块
  • 【Linux篇】高并发编程终极指南:线程池优化、单例模式陷阱与死锁避坑实战
  • M. Moving Both Hands(反向图+Dijkstra)
  • 俄官员说将适时宣布与乌克兰谈判代表
  • 夜读|尊重生命的棱角
  • 牛市早报|中美日内瓦经贸会谈联合声明公布
  • 训练孩子的科学思维,上海虹口推出“六个一百”旗舰工程
  • 重温经典|开播20周年,仙剑的那些幕后你知道吗?
  • 《尤物公园》连演8场:观众上台,每一场演出都独一无二