【分布式训练】分布式训练中的资源管理分类
文章目录
- 分布式训练中的资源管理分类
- 一、每个进程必须独立创建的模块
- (一)模型实例
- 为什么需要独立创建模型实例?
- (二)优化器和学习率调度器
- 为什么需要独立创建优化器和学习率调度器?
- (三)数据采样器
- 为什么需要独立创建数据采样器?
- (四)数据加载器
- 为什么需要独立创建数据加载器?
- (五)GPU设备和CUDA上下文
- (六)梯度缩放器(混合精度训练)
- 为什么需要创建独立的梯度缩放器?
- (七)进程组通信
- 二、可以在多进程之间共享的模块
- (一)数据集对象
- 为什么数据集对象可以共享?
- (二)超参数配置
- (三)模型架构定义
- (四)文件路径配置
- (五)随机种子(初始设置)
- 三、可以只在主进程进行的
- (一)模型保存
- (二)日志记录和TensorBoard
- (三)验证集评估
- (四)控制台输出
- (五)检查点创建
- (六)数据预处理和划分
- (七)指标计算和报告
- (八)代码备份
- 四、需要特殊同步处理的
- (一)数据索引广播
- (二)模型权重初始化
- (三)指标收集
- 最佳实践
- 本文总结:
本篇博客总结了:
- 分布式训练中哪些资源是每个进程独立创建的?
- 分布式训练中哪些资源是可以共享的?
- 哪些操作可以只在主进程中进行?
- 为什么可以共享?为什么必须独立创建?
分布式训练中的资源管理分类
一、每个进程必须独立创建的模块
(一)模型实例
# 每个GPU都需要独立的模型副本
model = MyModel().to(device)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
为什么需要独立创建模型实例?
分布式训练每个进程在独立的GPU上进行
状态独立性
:每个GPU需要维护自己的模型状态(权重、BN统计、Dropout掩码)计算隔离
:每个GPU构建独立的计算图进行前向/反向传播内存隔离
:每个GPU有自己的显存空间,避免冲突并行效率
:独立的模型实例实现真正的数据并行架构要求
:分布式训练框架(如DDP)的设计前提
前向传播阶段:
GPU 0: [模型副本0] + [数据批次0] → 输出0 + 梯度0
GPU 1: [模型副本1] + [数据批次1] → 输出1 + 梯度1
GPU 2: [模型副本2] + [数据批次2] → 输出2 + 梯度2
GPU 3: [模型副本3] + [数据批次3] → 输出3 + 梯度3
梯度同步阶段:
# 所有GPU的梯度通过All-Reduce求平均
avg_gradient = (gradient0 + gradient1 + gradient2 + gradient3) / 4# 每个模型副本使用相同的平均梯度更新
model0.weight -= lr * avg_gradient
model1.weight -= lr * avg_gradient
model2.weight -= lr * avg_gradient
model3.weight -= lr * avg_gradient
注意:虽然每个进程在独立的GPU上进行,但是初始化状态以及参数更新等都是一样的。所以每个模型的参数完全一致!
分布式训练只是加速训练过程!并不一定能过提高模型的精度!
(二)优化器和学习率调度器
# 每个进程独立的优化器
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30)
为什么需要独立创建优化器和学习率调度器?
分布式训练每个进程在独立的GPU上进行
与模型实例类似:
- 状态独立性:每个优化器/调度器维护独立的状态(动量、步数、学习率历史等)
- 设备绑定:优化器状态需要与模型参数在同一GPU设备上
- 计算隔离:每个进程独立执行优化步骤,避免数据竞争
- 性能优化:本地内存访问比远程访问快得多
- 架构一致性:与DDP的设计哲学一致 - “复制一切,同步梯度”
从性能的角度上来分析:
# 独立优化器的内存访问模式:
# GPU0: 访问本地显存中的优化器状态 → 快速
# GPU1: 访问本地显存中的优化器状态 → 快速# 如果共享优化器:
# 所有GPU访问同一显存中的优化器状态 → 内存带宽瓶颈
优化器独立的主要原因:所有GPU访问同一显存中的优化器状态 → 内存带宽瓶颈
(三)数据采样器
# 每个进程使用不同的数据子集
train_sampler = DistributedSampler(dataset,num_replicas=world_size,rank=rank,shuffle=True
)
为什么需要独立创建数据采样器?
数据分区
:每个采样器负责不同的数据子集- 状态维护:采样器需要跟踪当前epoch和位置
- 随机性控制:每个进程需要独立的随机状态
负载均衡
:确保数据均匀分布到所有GPU
# 每个进程创建自己的采样器
sampler1 = DistributedSampler(dataset, rank=0, num_replicas=4)
sampler2 = DistributedSampler(dataset, rank=1, num_replicas=4)
sampler3 = DistributedSampler(dataset, rank=2, num_replicas=4)
sampler4 = DistributedSampler(dataset, rank=3, num_replicas=4)
采样器独立的技术原因
class DistributedSampler:def __init__(self, dataset, rank, num_replicas):self.rank = rank # 进程特定的rankself.num_replicas = num_replicasself.epoch = 0 # 进程特定的训练状态self.seed = 42 + rank # 进程特定的随机种子def __iter__(self):# 生成该进程专属的数据索引序列indices = indices[self.rank : self.total_size : self.num_replicas]return iter(indices)
(四)数据加载器
# 每个进程独立的数据加载器
dataloader = DataLoader(dataset,batch_size=batch_size,sampler=train_sampler,num_workers=4
)
为什么需要独立创建数据加载器?
# 每个进程创建自己的数据加载器
loader1 = DataLoader(dataset, sampler=sampler1, num_workers=4)
loader2 = DataLoader(dataset, sampler=sampler2, num_workers=4)
进程关联
:数据加载器与特定的worker进程绑定- 缓冲区管理:每个加载器维护自己的数据缓冲区
- 预取策略:独立的预取线程/进程
- 状态隔离:迭代状态不能在不同进程间共享
数据加载器独立的技术原因
# 每个数据加载器创建独立的子进程
class DataLoader:def __init__(self, dataset, sampler, num_workers):self.dataset = dataset # 共享的self.sampler = sampler # 独立的self.num_workers = num_workers# 每个加载器创建自己的worker进程池self.workers = [Process(target=self._worker_fn) for _ in range(num_workers)]
(五)GPU设备和CUDA上下文
# 每个进程绑定到不同的GPU
torch.cuda.set_device(rank)
device = torch.device(f"cuda:{rank}")
(六)梯度缩放器(混合精度训练)
# 每个进程独立的梯度缩放器
scaler = GradScaler()
为什么需要创建独立的梯度缩放器?
分布式训练每个进程在独立的GPU上进行
- 状态独立性:每个缩放器维护独立的状态(缩放因子、增长计数器等)
设备绑定
:缩放器状态需要与模型梯度在同一个GPU设备上本地决策
:基于本地梯度情况做出缩放决策,减少通信开销性能优化
:本地内存访问比远程访问快得多- 架构一致性:与分布式训练的设计模式一致
本地决策减少通信
# 独立缩放器的优势:
# - 梯度溢出检查在本地进行,无需额外通信
# - 基于同步后的梯度,检查结果自然一致
# - 避免了集中式决策的通信开销# 如果使用共享缩放器:
# - 需要额外的进程间通信来协调缩放决策
# - 可能成为性能瓶颈
内存访问优化
# 独立缩放器的内存访问:
# GPU0: 访问本地显存中的缩放器状态 → 快速
# GPU1: 访问本地显存中的缩放器状态 → 快速# 共享缩放器的内存访问:
# 所有GPU访问同一显存中的缩放器状态 → 内存带宽瓶颈
梯度缩放器独立的主要原因:所有GPU访问同一显存中的缩放器状态 → 内存带宽瓶颈
(七)进程组通信
# 每个进程独立的进程组
dist.init_process_group("nccl", rank=rank, world_size=world_size)
二、可以在多进程之间共享的模块
(一)数据集对象
dataset = MyDataset(data_path) # 所有进程共享同一个# 数据集的核心职责:
# 1. 定义如何读取原始数据
# 2. 实现 __getitem__ 方法
# 3. 定义数据预处理逻辑
# 4. 管理数据文件路径
为什么数据集对象可以共享?
- 只读性质:数据集在训练过程中通常不会被修改
- 数据一致性:所有进程需要看到相同的数据内容
- 内存效率:避免在多GPU上重复加载相同的数据
- 接口统一:确保所有进程的数据访问方式一致
(二)超参数配置
# 所有进程使用相同的配置对象
config = TrainingConfig()
(三)模型架构定义
# 模型类定义是共享的
class MyModel(nn.Module):def __init__(self):super().__init__()# 模型定义
(四)文件路径配置
# 所有进程使用相同的路径
model_save_path = "./checkpoints"
log_path = "./logs"
(五)随机种子(初始设置)
# 设置相同的随机种子
torch.manual_seed(42)
np.random.seed(42)
三、可以只在主进程进行的
(一)模型保存
if rank == 0:torch.save(model.state_dict(), "model.pth")
(二)日志记录和TensorBoard
if rank == 0:writer = SummaryWriter()writer.add_scalar('loss', loss.item(), global_step)
(三)验证集评估
if rank == 0:accuracy = evaluate(model, val_loader)print(f"Validation Accuracy: {accuracy}")
(四)控制台输出
if rank == 0:print(f"Epoch {epoch}, Loss: {loss.item()}")
(五)检查点创建
if rank == 0:checkpoint = {'model_state_dict': model.state_dict(),'optimizer_state_dict': optimizer.state_dict(),'epoch': epoch}torch.save(checkpoint, "checkpoint.pth")
(六)数据预处理和划分
if rank == 0:# 只在主进程进行数据划分train_indices, val_indices = split_dataset(dataset)# 然后广播到其他进程
(七)指标计算和报告
if rank == 0:# 收集所有进程的指标并计算平均值total_loss = torch.tensor([0.0]).cuda()dist.reduce(total_loss, dst=0)avg_loss = total_loss.item() / world_size
(八)代码备份
if rank == 0:copy_code(save_path) # 备份训练代码
四、需要特殊同步处理的
(一)数据索引广播
# 在主进程划分数据,然后广播到所有进程
if rank == 0:train_indices = np.random.permutation(len(dataset))[:train_size]
else:train_indices = None# 广播索引
train_indices = broadcast_data(train_indices, rank, world_size)
(二)模型权重初始化
# 确保所有进程的模型权重相同
if rank == 0:initial_weights = model.state_dict()
else:initial_weights = None# 广播初始权重
initial_weights = broadcast_data(initial_weights, rank, world_size)
model.load_state_dict(initial_weights)
(三)指标收集
# 收集所有进程的损失
loss_tensor = torch.tensor(loss.item()).cuda()
loss_list = [torch.zeros(1).cuda() for _ in range(world_size)]
dist.all_gather(loss_list, loss_tensor)
avg_loss = sum([l.item() for l in loss_list]) / world_size
最佳实践
类别 | 示例 | 处理方式 |
---|---|---|
必须独立 | 模型、优化器、数据加载器 | 每个进程创建自己的实例 |
可以共享 | 数据集、配置、路径 | 所有进程使用相同对象 |
主进程进行 | 保存、日志、验证 | 只在rank=0执行 |
需要同步 | 数据划分、权重初始化 | 主进程操作后广播 |
本文总结:
分布式训练的目的是高效快速的训练大模型/大数据,本文主要是总结了在大数据情境下的分布式训练。总结如下:
- 分布式训练只是在同样的数据/模型下让训练变得更快了。可以理解为原先是小明一个人(模型)在做十件事(数据);现在是小明和小明的9个影子一起在做十件事,现在是每人做一件;结果都是十件事全部做完了,区别在于如果小明自己做的话可能需要十天,现在加上九个影子可能只需要2天(因为创建影子、影子之间的通信也需要时间)
- 每个影子的必须要有做任务的能力——模型实例必须是独立创建的。
- 每个影子只处理一部分数据——数据采样器必须是独立创建的,与之对应的数据加载器也是独立创建的。
- 每个影子要有配套的基本设施,防止竞争抢资源——优化器、梯度缩放器等等也是独立创建的。
- 每个影子只处理一部分数据,所以整体的数据集只需要创建一次就可以,创建完毕后分配给每个影子。
- 要给每个影子布置相同的任务,所以超参数、文件路径等等都是共享的
- 影子只需要帮忙做任务,任务外的其他记录相关的事情,小明自己做就可以;所以模型保存、控制台输出、打印日志、模型评估、代码备份、数据预处理等等只在主进程进行即可。