Megatron 中的 TensorParallel, PipelineParallel, ContextParallel,ExpertParallel
Tensor Parallel
Megatron-LM/megatron/core/tensor_parallel
张量并行核心功能摘要
此文件夹中的代码为在 Megatron-LM 中实现张量模型并行(TP)提供了基础构建模块。它允许将模型权重、激活值和梯度分片到多个 GPU 上,从而能够训练那些无法装入单个 GPU 内存的模型。其核心功能可分为几个关键领域:并行层、通信映射、损失函数、数据处理和随机数生成器(RNG)管理。
文件名 | 功能描述 |
---|---|
layers.py | 包含了核心的张量并行神经网络层,如 ColumnParallelLinear 、RowParallelLinear 和 VocabParallelEmbedding 。这些层自动处理权重分区以及在前向和反向传播过程中的必要通信(如 all-reduce)。 |
cross_entropy.py | 实现了 vocab_parallel_cross_entropy ,这是一个专门的损失函数,用于当输入 logits 沿词汇表维度分布在多个 GPU 上时,能正确计算交叉熵。 |
mappings.py | 定义了底层的通信原语(如 scatter , gather , reduce ),这些是并行层的基础,并明确了前向和反向传播中的通信模式。 |
random.py | 管理随机数生成器(RNG)状态,以确保并行环境中的正确性和可复现性,特别是对于 Dropout 等操作。它还提供了能正确保存和恢复并行 RNG 状态的检查点功能。 |
data.py | 提供在张量并行设置中处理数据的工具。其核心功能 broadcast_data 确保一个张量并行组中的所有 GPU 都接收到完全相同的输入数据批次。 |
utils.py | 包含张量并行中常用的张量操作辅助函数,例如沿特定维度切分张量以及在全局和局部词汇索引之间进行映射的工具。 |
1. 并行层 (layers.py
)
这些模块是 torch.nn.Module
的子类,用其张量并行等价物替换了标准的 PyTorch 层。它们在内部处理权重的分片和必要的通信原语。
-
ColumnParallelLinear
:实现一个线性层(Y = XA + b),其中权重矩阵A
沿其列(output_size
维度)进行分区。- 前向传播:输入
X
被广播到所有 TP rank。每个 rank 使用其本地权重分片A_i
计算部分输出Y_i = XA_i
。如果gather_output=False
(常见情况),则输出Y_i
保持分布式。如果gather_output=True
,则执行 all-gather 操作,使完整的输出Y
在所有 rank 上都可用。 - 反向传播:梯度
dY
被分散(如果在前向传播中被收集),每个 rank 计算其本地的权重梯度dX_i
和dA_i
。然后通过对部分梯度dX_i
进行 all-reduce 操作来计算输入梯度dX
。
- 前向传播:输入
-
RowParallelLinear
:实现一个线性层,其中权重矩阵A
沿其行(input_size
维度)进行分区。该层通常跟在ColumnParallelLinear
层之后。- 前向传播:输入
X
被假定为分布式(沿其最后一个维度分区)。每个 rank 使用其本地输入X_i
和权重分片A_i
计算部分输出Y_i = X_iA_i
。最终输出Y
是通过 all-reduce 操作获得的,该操作对来自所有 TP rank 的部分输出Y_i
求和。 - 反向传播:梯度
dY
被传递给所有 rank。每个 rank 计算其本地梯度dX_i
和dA_i
。输入梯度dX_i
保持在每个 rank 的本地,为前一个ColumnParallelLinear
层的反向传播做好准备。
- 前向传播:输入
-
VocabParallelEmbedding
:实现一个并行嵌入层,其中嵌入权重矩阵沿词汇表维度(逐行)进行分区。- 前向传播:每个 TP rank 持有完整嵌入表的一个不同分片。对于给定的 token ID 输入序列,每个 rank 查找其分配的词汇表范围内的 token 的嵌入。对于超出其范围的 token,它会生成一个零向量。最终的嵌入输出是通过 all-reduce 操作获得的,该操作对来自所有 rank 的部分结果求和。
-
linear_with_grad_accumulation_and_async_allreduce
:这不是一个层,而是一个为线性层提供支持的核心torch.autograd.Function
。它通过以下方式提供了高度优化的反向传播:- 融合梯度累积:它可以将权重梯度直接累积到
.main_grad
缓冲区中,避免了在梯度计算后需要一个额外的加法核。 - 异步通信:它异步地启动输入梯度(
dgrad
)的 all-reduce,与权重梯度(wgrad
)的计算同时进行。这将通信与计算重叠,隐藏了通信延迟并提高了性能。这需要设置环境变量CUDA_DEVICE_MAX_CONNECTIONS=1
。
- 融合梯度累积:它可以将权重梯度直接累积到
2. 通信映射 (mappings.py
)
这些是作为 torch.autograd.Function
实现的核心通信原语。它们定义了张量并行所需的特定前向和反向通信模式。
-
恒等 / All-Reduce 对:
copy_to_tensor_model_parallel_region
:前向是恒等操作。反向执行 all-reduce。这用于为并行区域准备输入激活。reduce_from_tensor_model_parallel_region
:前向执行 all-reduce。反向是恒等操作。这在RowParallelLinear
层的末尾使用。
-
切分 / 收集对:
scatter_to_tensor_model_parallel_region
:前向沿最后一个(隐藏)维度 切分 张量,每个 rank 保留其对应的块。反向执行 all-gather。gather_from_tensor_model_parallel_region
:前向从所有 rank all-gather 张量,并沿最后一个维度连接它们。反向执行 切分。
-
序列并行原语:这些是优化措施,它沿序列维度(第一维度)而不是隐藏维度对激活进行分片,从而显著减少了激活内存。
scatter_to_sequence_parallel_region
:沿 第一维度 切分张量。gather_from_sequence_parallel_region
:沿 第一维度 收集。其反向传播执行reduce-scatter
,而不仅仅是切分,因为后续的计算也是并行的。reduce_scatter_to_sequence_parallel_region
:沿 第一维度 执行reduce-scatter
。当启用序列并行时,在RowParallelLinear
层的输出处使用。
3. 损失计算 (cross_entropy.py
)
vocab_parallel_cross_entropy
:一个自定义的交叉熵损失函数,设计用于处理在 TP rank 间分片的 logits(由模型末端的ColumnParallelLinear
层产生)。- 过程:为了正确计算 softmax 的分母
sum(exp(logits))
,它需要通信。- 为了数值稳定性,它首先通过 all-reduce 计算所有 TP rank 上的
max(logits)
。 - 每个 rank 在其本地的词汇表分片上计算部分
sum_exp_logits
。 - 这些部分和通过 all-reduce 相加,以获得正确的全局分母。
- 然后计算最终的损失。反向传播被仔细实现,以确保梯度正确地流向分布式的 logits。
- 为了数值稳定性,它首先通过 all-reduce 计算所有 TP rank 上的
- 过程:为了正确计算 softmax 的分母
4. 随机数生成器 (RNG) 管理 (random.py
)
在并行环境中确保正确性需要仔细管理 RNG 状态,特别是对于像 dropout 这样的操作。
CudaRNGStatesTracker
:一个按名称跟踪多个 RNG 状态的类。model_parallel_cuda_manual_seed
:初始化 RNG 的主要函数。它设置了三个不同的 RNG 状态:- 数据并行状态:在 TP 组内的所有 rank 上相同,但在不同的数据并行组之间不同。用于 TP 区域之外的操作(例如,在非张量并行的 MLP 中的 dropout)。
- 张量模型并行状态:在 TP 组内的每个 rank 上都不同。这确保了像 dropout 这样的并行区域中的操作对每个分片应用不同的掩码,这是期望的行为。
- 专家模型并行状态:用于混合专家(MoE)层的单独状态。
checkpoint
:torch.utils.checkpoint
(激活检查点)的自定义实现。它至关重要,因为它不仅正确保存和恢复标准的 CUDA RNG 状态,还正确处理由CudaRNGStatesTracker
管理的命名状态。这保证了在反向传播期间重新计算激活时的位对位可复现性。
5. 数据处理 (data.py
)
broadcast_data
:一个实用函数,用于将数据(例如,来自数据加载器的批次)从张量并行组的 rank 0 GPU 广播到该组中的所有其他 GPU。这确保了 TP 组中的所有 rank 都以相同的输入数据开始。
核心并行层详解
VocabParallelEmbedding
详解
VocabParallelEmbedding
层是标准嵌入层的一个特殊版本,旨在通过在张量并行组中的多个 GPU 上对嵌入表进行分区来处理极大的词汇表。在现代大型语言模型中,嵌入表(将词汇表中的每个 token 映射到一个密集向量)可能占模型总大小的很大一部分。例如,一个拥有 100,000 个 token 词汇表和 4096 隐藏大小的模型,其嵌入表大小约为 100,000 * 4096 * 4 bytes/fp32 ≈ 1.64 GB
。对于更大的词汇表和隐藏大小,这可能成为内存瓶颈。
VocabParallelEmbedding
通过让每个 GPU 只拥有完整嵌入表的一个切片来解决这个问题。
1. 初始化 (__init__
)
设置过程对于确保分区被正确处理至关重要。
A. 词汇表分区:
当 VocabParallelEmbedding
层初始化时,第一步是在张量并行组中的可用 GPU 之间划分整个词汇表。
- 总词汇表大小 (
num_embeddings
) 除以张量并行世界大小 (tp_group.size()
)。 - 使用
VocabUtility.vocab_range_from_global_vocab_size
,每个 GPU 计算其分配到的词汇表范围。这为每个 rank 提供了self.vocab_start_index
和self.vocab_end_index
。例如,对于一个 50,000-token 的词汇表和 2 个 GPU:- GPU 0 获得范围
[0, 25000)
。 - GPU 1 获得范围
[25000, 50000)
。
- GPU 0 获得范围
- 然后,每个 GPU 为其本地的嵌入表分区
self.weight
分配内存,其形状为[num_embeddings_per_partition, embedding_dim]
。
B. 权重初始化:
正确初始化这些分布式权重至关重要。不能简单地让每个 GPU 独立初始化其切片,因为这会导致模型权重不一致。
- 初始化逻辑(例如
_initialize_affine_weight_cpu
)在概念上于 CPU 上创建完整的主权重张量。 - 然后,它沿着词汇表维度(维度 0)将这个主张量分割成与每个 GPU 分区相对应的块。
- 每个 GPU 将其分配到的块从主权重张量复制到其本地的
self.weight
参数中。 - 这确保了如果你收集所有 GPU 上的所有
self.weight
张量,你将完美地重建一个单一、一致初始化的嵌入表。
2. 前向传播 (forward
)
前向传播是分布式查找和通信核心逻辑发生的地方。让我们追踪一个 input_
张量(一批 token ID)在该层中的过程。
步骤 1:掩码和本地化输入 ID
- 包含全局 token ID 的输入张量到达张量并行组中的每个 GPU。
- 每个 GPU 必须首先识别它负责哪些传入的 token。它通过创建一个布尔掩码
input_mask
来实现这一点,对于任何落在其分配的[vocab_start_index, vocab_end_index)
范围之外的 token ID,该掩码为True
。 - 接下来,全局 token ID 被转换为用于嵌入查找的本地索引。这是通过从 token ID 中减去
self.vocab_start_index
来完成的。对于超出范围的 token(input_mask
为True
的地方),本地索引被设置为 0。这是一个占位符;这些 token 的输出稍后将被置零,所以只要它是一个有效的本地索引,它们查找到什么都无所谓。
步骤 2:本地嵌入查找
- 现在,每个 GPU 使用其本地的、分片的
self.weight
参数和masked_input
(本地化的 token ID)执行嵌入查找。 - 结果
output_parallel
是一个张量,其中每个 GPU 都拥有它所拥有的 token 的正确嵌入向量。对于所有其他 token,它有一些垃圾向量(来自占位符查找)。
步骤 3:将范围外的嵌入置零
- 现在将步骤 1 中的
input_mask
应用于output_parallel
张量。 output_parallel
中任何对应于超出范围的 token 的行都被乘以 0。- 此时,每个 GPU 都持有一个张量,其中包含其词汇表分区的正确嵌入,而所有其他 token 的嵌入都为零。
步骤 4:通信 (All-Reduce)
- 最后一步是将这些部分结果组合成完整的嵌入张量。这是通过 all-reduce 操作实现的,该操作由
reduce_from_tensor_model_parallel_region
函数包装。 - 由于每个 GPU 都拥有其 token 的正确值,而在其他地方都为零,因此简单地在所有 GPU 上对这些张量求和(
ReduceOp.SUM
)即可得到最终的、完整的嵌入张量。 - 在 all-reduce 之后,张量并行组中的每个 GPU 都拥有相同、完整的嵌入输出,准备好进入模型的下一层。
3. 序列并行优化
VocabParallelEmbedding
层还支持一种称为序列并行的优化,通过设置 reduce_scatter_embeddings=True
来启用。
- 在标准的前向传播中(如上所述),最终的 all-reduce 使完整的激活张量在所有 GPU 上都可用。这会消耗大量内存。
- 使用序列并行时,执行的是 reduce-scatter 操作(
reduce_scatter_to_sequence_parallel_region
),而不是 all-reduce。 - 该操作仍然对部分结果求和,但随后立即沿序列维度分散结果。结果是每个 GPU 最终只得到最终激活张量的一个切片(沿序列长度分片),从而显著减少了峰值内存使用。这个分片的激活然后由后续的序列并行层处理。
ColumnParallelLinear
详解
ColumnParallelLinear
层是实现 Transformer 模型中张量并行的基本组件,特别是在前馈网络(FFN)或 MLP 块中。它取代了标准的 torch.nn.Linear
层,并将其权重矩阵划分到多个 GPU 上,以处理单个设备无法容纳的大规模网络层。
该层实现了线性变换 Y = XA + b
。其“列并行”的名称来源于权重矩阵 A
是垂直切分的,即沿其列(输出特征维度)切分。
1. 初始化 (__init__
)
ColumnParallelLinear
的设置确保了权重和偏置在张量并行(TP)组中被正确地分区和初始化。
A. 权重和偏置分区:
-
权重矩阵
A
:逻辑形状为[input_size, output_size]
的权重矩阵,沿其第二维度(output_size
)进行分区。如果你考虑 PyTorch 中存储的权重参数,其形状为[output_size, input_size]
,那么它是在维度 0 上进行分区的。- 总
output_size
除以张量并行世界大小。 - TP 组中的每个 GPU
i
分配一个本地权重分片self.weight
,形状为[output_size_per_partition, input_size]
。 - 例如,对于一个输入大小为 4096、输出大小为 16384 的 MLP,分布在 4 个 GPU 上:
- 逻辑矩阵
A
是[4096, 16384]
。 - 存储的
self.weight
参数是[16384, 4096]
。 - 4 个 GPU 中的每一个都将持有一个形状为
[4096, 4096]
的本地self.weight
分片。
- 逻辑矩阵
- 总
-
偏置向量
b
:逻辑形状为[output_size]
的偏置向量b
也以与权重矩阵列相同的方式进行分区。- 每个 GPU
i
分配一个本地偏置分片self.bias
,形状为[output_size_per_partition]
。
- 每个 GPU
B. 一致性初始化:
- 与
VocabParallelEmbedding
一样,在 CPU 上概念性地创建了完整权重和偏置的主副本。 - 然后,这个主权重被按列切分,每个 GPU 复制其对应的切片。
- 这保证了整个模型被一致地初始化,就好像它是一个单一的、非并行的层一样。
C. 关键配置参数:
gather_output
:一个布尔值,控制前向传播的输出。如果为False
(默认且在 MLP 中最常见的设置),输出保持分区状态。如果为True
,则执行 all-gather,使完整的输出在所有 GPU 上都可用。skip_bias_add
:一项优化,如果为True
,则前向传播会单独返回偏置项而不是加上它。这允许偏置加法与后续操作(例如 GELU)融合,从而节省一次核函数启动并提高性能。
2. 前向传播 (forward
)
前向传播以分布式方式计算 Y = XA + b
。
步骤 1:输入处理
- 输入张量
X
应该是完整、未分区的激活张量。 - 该输入对张量并行组中的每个 GPU 都可用。这里使用了
copy_to_tensor_model_parallel_region
函数。在前向传播中,此函数充当恒等操作——它只是简单地传递输入。其真正的目的是为反向传播设置正确的梯度钩子。
步骤 2:本地矩阵乘法
- 每个 GPU
i
使用完整的输入X
和其本地的权重分片A_i
(并加上其本地偏置b_i
)执行本地矩阵乘法。 output_parallel_i = X @ A_i^T + b_i
- 每个 GPU 上的结果
output_parallel_i
是完整输出的一个切片,形状为[batch_size, sequence_length, output_size_per_partition]
。
步骤 3:输出处理 (由 gather_output
控制)
-
情况 1:
gather_output=False
(默认)- 在前向传播结束时没有通信发生。
- 该层的输出
output_parallel
在 GPU 之间保持分区状态。 - 这是张量并行 MLP 块内部的标准行为,因为后续的
RowParallelLinear
层被设计为接受这种分区输入。
-
情况 2:
gather_output=True
- 调用
gather_from_tensor_model_parallel_region
函数。 - 这会执行一个 all-gather 操作,从所有 GPU 收集
output_parallel_i
切片,并沿最后一个维度将它们连接起来。 - 结果是完整的输出张量
Y
,现在在 TP 组中的每个 GPU 上都是相同的。当网络中的下一层不是张量并行层并期望完整的激活张量时,这是必需的。
- 调用
3. 反向传播 (核心复杂性)
反向传播是发生最关键的通信和优化的地方。它由底层的 linear_with_grad_accumulation_and_async_allreduce
函数处理。
A. 关于权重的梯度 (wgrad
)
- 为了计算其本地权重分片
A_i
的梯度,每个 GPU 需要完整的输入X
(它有)和其部分输出的梯度grad_output_i
。 - 计算
dA_i = grad_output_i^T @ X
可以完全在本地执行,无需通信。
B. 关于输入的梯度 (dgrad
)
- 这是需要通信的部分。
- 每个 GPU
i
可以使用其本地权重分片计算一个部分输入梯度:dX_i = grad_output_i @ A_i
。 - 为了得到完整的输入梯度
dX
(ColumnParallelLinear
之前的层需要它),这些部分梯度必须相加在一起:dX = dX_0 + dX_1 + ... + dX_{p-1}
。 - 这个求和是通过 all-reduce 操作执行的。
C. 异步 All-Reduce 优化
- 一个简单的实现会先计算
dX_i
,然后执行 all-reduce,然后再计算dA_i
。 ColumnParallelLinear
使用了一种更高效的方法。来自前向传播的copy_to_tensor_model_parallel_region
的反向钩子是触发输入梯度dX
的 all-reduce 的原因。linear_with_grad_accumulation_and_async_allreduce
函数巧妙地调度 CUDA 操作以重叠通信和计算:- 它启动输入梯度
dX
的 all-reduce 并立即返回。 - 当 all-reduce 在 GPU 的通信硬件上后台进行时,它继续在计算核心上计算权重梯度
dA_i
。
- 它启动输入梯度
- 当
wgrad
计算完成时,dgrad
的 all-reduce 通常也已完成。这有效地隐藏了通信延迟,是张量并行训练中性能提升的主要来源。
4. 在 Transformer 块中的作用
ColumnParallelLinear
是一个构建块。其作用在 Transformer 的标准 FFN/MLP 块的上下文中最好理解,该块通常由两个线性层组成:
- 上投影 (列并行):一个
ColumnParallelLinear
层接收输入X
(隐藏大小H
)并将其投影到一个更宽的中间大小(例如4H
)。输出在 GPU 之间是分区的。 - 下投影 (行并行):一个
RowParallelLinear
层接收分区的中间激活,并将其投影回原始的隐藏大小H
。该层在内部执行 all-reduce 以产生最终的、完整的输出。
这个 ColumnParallelLinear -> RowParallelLinear
序列是实现张量并行 MLP 的经典模式。它确保了进入和退出 MLP 块的激活是完整的(未分区的),而大的中间激活和权重保持分布式,从而节省内存并支持大规模模型。
vocab_parallel_cross_entropy
详解
vocab_parallel_cross_entropy
函数是训练使用张量并行语言模型的关键组件。在标准模型中,最后一层是一个线性投影,产生 logits,这是一个形状为 [batch_size, sequence_length, vocab_size]
的张量。然而,在张量并行模型中,这个最终的线性层通常是一个 ColumnParallelLinear
层。这意味着输出的 logits 沿着词汇表维度被分片了。张量并行组中的每个 GPU 只持有 logits 的一个切片,对应于其词汇表的分区。
标准的 torch.nn.CrossEntropyLoss
无法处理这些分布式的 logits,因为计算 softmax 和最终损失需要访问每个 token 的完整 logits 集合。vocab_parallel_cross_entropy
函数实现了必要的通信,以在这种分布式设置中正确计算交叉熵损失。
挑战:在分布式数据上计算 Softmax
交叉熵损失的核心是 LogSoftmax 函数。对于单个 token,词汇项 j
的概率是:
P(j) = exp(logit_j) / sum_k(exp(logit_k))
损失则是 -log(P(target_label))
。
主要的挑战在于分母 sum_k(exp(logit_k))
需要对整个词汇表进行求和。由于每个 GPU 只拥有 logit_k
值的一部分,它们必须进行通信来计算这个全局总和。
前向传播 (_VocabParallelCrossEntropy.forward
)
前向传播是一系列精心编排的本地计算和 GPU 间通信。
步骤 1:寻找全局最大 Logit (为了数值稳定性)
- 直接为大的 logit 值计算
exp(logit)
可能会导致浮点溢出(结果为inf
)。为了防止这种情况,一个标准的技巧是在取指数之前从所有 logits 中减去最大 logit 值:exp(logit_j - max(logits))
。 - 由于 logits 是分布式的,每个 GPU 首先在其本地分片内找到最大值:
local_max = torch.max(vocab_parallel_logits, dim=-1)[0]
。 - 然后,在所有 TP 组的 GPU 上执行一次 all-reduce 操作(
torch.distributed.all_reduce
使用op=torch.distributed.ReduceOp.MAX
)来找到真正的全局最大值logits_max
。
步骤 2:计算部分 sum_exp_logits
- 每个 GPU 从其本地的 logit 切片中减去
logits_max
。 - 然后,它计算这些移位后的 logits 的指数:
exp_logits = torch.exp(vocab_parallel_logits - logits_max)
。 - 最后,它沿着其词汇表分片对这些指数化后的值求和,得到一个部分和:
sum_exp_logits = exp_logits.sum(dim=-1)
。
步骤 3:计算全局 sum_exp_logits
(Softmax 分母)
- 在
sum_exp_logits
张量上执行另一次 all-reduce 操作(这次使用op=torch.distributed.ReduceOp.SUM
)。 - 此后,每个 GPU 都拥有批次中每个 token 的正确且完整的 softmax 分母。
步骤 4:计算目标类别的 Logit
- 损失计算需要
logit[target]
这一项。然而,正确target
token 的词汇表 ID 可能位于另一个 GPU 的分片上。 - 每个 GPU 计算落在其词汇表范围内的目标 ID 的
predicted_logits
。对于超出其范围的目标,该值为 0。 - 在这些
predicted_logits
上执行第三次 all-reduce (op=torch.distributed.ReduceOp.SUM
)。由于对于任何给定的目标,只有一个 GPU 会有非零值,这个求和操作有效地将正确的predicted_logit
值广播给所有其他 GPU。
步骤 5:计算最终损失
- 所有部分都准备好后,每个 GPU 现在可以为每个 token 计算最终损失:
loss = torch.log(sum_exp_logits) - predicted_logits
- 请注意,
predicted_logits
项在此过程中也隐式地减去了logits_max
,因此数学上仍然是正确的。
反向传播 (_VocabParallelCrossEntropy.backward
)
反向传播计算关于输入 vocab_parallel_logits
的梯度。交叉熵损失的梯度就是 (softmax_probabilities - one_hot_target)
。
- 前向传播保存了计算出的 softmax 概率(
exp_logits
除以全局sum_exp_logits
)。 - 反向函数接收这个概率分布
softmax
,并从对应于目标标签的概率中减去 1.0。 - 这是在每个 GPU 上本地完成的。对于给定的 token,其分片包含目标标签的 GPU 将执行减法操作。所有其他 GPU 将简单地将其各自的 softmax 概率作为其分片的梯度传递。
- 得到的
grad_input
是 logits 本地切片的梯度,然后可以反向传播到产生它的ColumnParallelLinear
层。在反向传播本身中不需要额外的通信,因为所有必要的全局信息都已在前向传播期间被整合到 softmax 概率中。
PipelineParallel
/Megatron-LM/megatron/core/pipeline_parallel
对于这个主要需要了解 forward_backward_pipelining_with_interleaving 函数
forward_backward_pipelining_with_interleaving
详解
forward_backward_pipelining_with_interleaving
函数可以说是 Megatron-LM 核心中最复杂但功能最强大的部分之一。它实现了一种高度优化的流水线并行调度,称为 交错式1F1B(Interleaved 1F1B, One-Forward, One-Backward)。
1. 根本问题:流水线“气泡”
在简单的流水线并行中,你有一个“预热”阶段(前向传播填充流水线),一个“稳定”阶段,以及一个“冷却”阶段(后向传播排空流水线)。
- 预热:阶段0激活,然后是阶段0和1,接着是0、1和2,依此类推。在早期,后面的阶段是空闲的。
- 冷却:阶段0首先完成其工作,然后是阶段1,等等。在后期,前面的阶段是空闲的。
这段空闲时间被称为流水线气泡(pipeline bubble),它代表了显著的低效率和浪费的GPU周期。流水线越长(GPU越多),气泡就越大。
2. 解决方案:使用虚拟流水线进行交错
forward_backward_pipelining_with_interleaving
函数极大地减少了这种气泡。其核心思想是将每个GPU(每个物理流水线阶段)上的模型分解成多个更小的部分,称为模型块(model chunks)或虚拟流水线阶段(virtual pipeline stages)。
想象一下你有4个GPU(PP大小=4)。与其让每个GPU持有一大块模型,你可以让每个GPU持有,比如说,2个模型块。现在你总共有8个虚拟阶段。
这使得单个GPU可以在处理一个模型块的前向传播的同时,为另一个模型块进行后向传播。这种交错方式能更有效地保持GPU的繁忙状态。
3. 函数的执行流程:
函数中的关键术语:
num_model_chunks
:每个物理阶段的虚拟阶段数量。microbatch_id
:正在为特定模型块处理的微批次的索引。virtual_microbatch_id
:整个流水线调度中的全局步骤或“节拍”。这是驱动整个过程的主计数器。schedule_table
:一个预先计算的查找表,将virtual_microbatch_id
映射到特定的model_chunk_id
和microbatch_id
。这定义了执行顺序,是调度的“可调”部分。
阶段 1:预热 (for k in range(num_warmup_microbatches)
)
目标: 用跨越不同模型块的前向传播来填充流水线。这里不发生后向传播。
- 调度:对于预热中的每一步
k
,schedule_table
决定了哪个model_chunk_id
应该运行其前向传播。 - 通信:
- 一个阶段从前一个物理阶段接收必要的输入张量(除非它是第一个阶段)。这个输入是为未来的前向传播准备的。
recv_tensor_from_previous_stage
辅助函数根据交错调度的复杂依赖关系来确定是否需要接收。
- 计算:
- 它为被调度的模型块执行
forward_step_helper
。 - 产生的输出张量(激活值)被存储在一个缓冲区(
output_tensors
)中。这一点至关重要,因为这个张量在很久之后的相应后向传播中会需要。
- 它为被调度的模型块执行
- 通信(发送):
- 输出张量被发送到下一个物理阶段。
- 为了节省内存,已发送张量的
.data
会通过deallocate_output_tensor
立即被释放。张量对象本身被保留,但仅为其梯度函数(.grad_fn
),这对于自动求导至关重要。
在预热结束时,流水线中“充满”了正在处理中的激活值,系统已为高效的稳定状态做好准备。
阶段 2:稳定状态 - 1F1B 的心跳 (for k in range(num_microbatches_remaining)
)
目标: 在每个时钟节拍,精确地执行一次前向传播和一次后向传播,以最大化GPU利用率。
对于此循环中的每次迭代 k
,函数主要做两件事:
-
一次前向传播:
- 它计算
forward_k
虚拟微批次ID。 - 使用
schedule_table
,它为新的前向传播识别正确的model_chunk_id
和microbatch_id
。 - 它运行
forward_step_helper
,为这个新的微批次计算激活值。 - 它将产生的激活值发送到下一个阶段,并为未来的前向传播接收激活值。
- 它计算
-
一次后向传播:
- 它计算
backward_k
虚拟微批次ID。 - 它为旧的后向传播识别
model_chunk_id
。 - 它检索在许多步骤前的前向传播中保存的相应
input_tensor
和output_tensor
。 - 它从下一个阶段接收梯度(
output_tensor_grad
)。 - 它运行
backward_step_helper
,执行后向计算,计算关于输入的梯度(input_tensor_grad
)。 - 它将这个
input_tensor_grad
发送到前一个阶段。
- 它计算
这个1F1B过程是关键。GPU没有在等待;它在不断地为一个微批次/块的前向计算和另一个的后向计算之间切换。
阶段 3:冷却 (if not forward_only: ... for k in range(...)
)
目标: 将流水线中所有剩余的后向传播排空。不启动新的前向传播。
- 调度:循环继续,但现在只执行后向逻辑。
- 计算:它为那些已经完成前向传播但尚未完成其后向传播的剩余微批次调用
backward_step_helper
。 - 通信:它继续从下一个阶段接收梯度,并将计算出的梯度发送到前一个阶段,直到所有微批次都被完全处理。
4. 关键实现细节(“需谨慎”之处)
- 梯度同步 (
no_sync_func
):为避免通信开销,数据并行(DDP)的梯度同步通常在整个流水线执行期间被禁用。梯度在每个模型块本地累积。它们只在步骤的最后,或者当一个模型块完成了其所有微批次时才进行同步(all-reduce)。这由enable_grad_sync
和disable_grad_sync
辅助函数管理。 - 张量缓冲 (
input_tensors
,output_tensors
):该函数严重依赖列表的列表来缓冲张量。output_tensors[model_chunk_id]
存储了一个块的前向传播的激活输出,等待其相应的后向传播。这虽然消耗大量内存,但对于解耦前向和后向执行流是必要的。 - 通信重叠 (
overlap_p2p_comm
):为了获得更高性能,该函数可以使用非阻塞的点对点通信(isend
/irecv
)。这允许通信(发送/接收张量)在专用硬件上后台进行,与GPU的计算重叠,从而隐藏通信延迟。 - 内存释放 (
deallocate_output_tensor
):这是一个至关重要的优化。在一个激活张量被发送到下一个阶段后,当前GPU上就不再需要它的数据,直到后向传播。通过释放底层存储,它极大地减少了峰值内存占用,允许使用更大的模型或批处理大小。
使用方法
第一部分:模型切分在哪里决定?
切分点在两个地方定义:
1. 用户配置(命令行参数)
你告诉 Megatron 要将模型切分成多少块。主要的训练脚本(例如 pretrain_gpt.py
)为此提供了参数:
--pipeline-model-parallel-size <N>
:这是物理流水线阶段的数量,几乎总是对应于你用来切分模型的 GPU 数量。如果设置为 4,模型将被分布在 4 个 GPU 上。--virtual-pipeline-model-parallel-size <M>
:这是每个物理阶段被分割成的模型块(或虚拟阶段)的数量。这是forward_backward_pipelining_with_interleaving
调度的关键。
流水线引擎看到的“块”的总数是 N * M
。如果未设置此参数,模型将使用更简单的非交错式调度。
2. 模型架构(Python 代码)
在这里,你定义这些块是什么。流水线引擎不会自动切分一个标准的 nn.Module
。你,作为模型设计者,必须明确地将模型定义为一个模块列表。
forward_backward_pipelining_with_interleaving
函数期望 model
参数是一个 torch.nn.ModuleList
。此列表中的每个元素都被视为一个独立的“模型块”或虚拟流水线阶段。
示例:一个 GPT 风格的模型
一个典型的 GPT 模型在概念上可以分为三个部分:
- 一个嵌入层。
- 一系列 Transformer 解码器层。
- 一个最终的层归一化和输出层。
在模型的 __init__
方法中,你会将这些定义为独立的模块,然后将它们组合成一个 ModuleList
。
# 在你的模型的 __init__ 方法中...# 1. 定义模型的逻辑部分
self.embedding = Embedding(...)
self.decoder = TransformerBlockList(...) # 一个包含所有 transformer 层的模块
self.post_process = PostProcessLayer(...) # 例如,最终的 LayerNorm 和输出投影# 2. 为流水线引擎显式创建模型块列表
self.model_chunks = torch.nn.ModuleList([self.embedding,self.decoder,self.post_process
])
当你将此模型传递给训练框架时,流水线引擎会收到这个 ModuleList
。如果你设置了 --virtual-pipeline-model-parallel-size 3
,引擎会将 self.embedding
映射到第一个虚拟阶段,self.decoder
映射到第二个,self.post_process
映射到第三个。
第二部分:需要引入哪些操作?
为了使标准模型适应流水线并行,你需要引入几个关键的架构模式和方法。这不仅仅是切分层;你必须改变数据在模型中的流动方式。
以下是必要的操作和更改:
1. 继承自 megatron.core.models.MegatronModule
你的顶层模型类应该继承自 MegatronModule
,而不仅仅是 torch.nn.Module
。这个基类提供了框架其余部分所依赖的必要钩子和属性。
2. 将模型定义为 torch.nn.ModuleList
如上所述,这是最关键的结构性变化。流水线引擎不会遍历一个标准的 forward
方法;它会迭代你提供的 ModuleList
,一次执行一个块。
3. 实现 set_input_tensor(self, tensor)
流水线引擎需要一种方式将数据送入你的模型块。
- 第一个块从数据加载器接收数据。
- 每个后续的块接收来自不同 GPU 上前一个块输出的激活张量。
引擎通过在你的模型上调用一个 set_input_tensor
方法来实现这一点。你的模型必须实现此方法来存储传入的张量,该张量随后将在 forward
传导中使用。
class MyGPTModel(MegatronModule):def __init__(self, ...):# ... 模型定义 ...self._input_tensor = Nonedef set_input_tensor(self, tensor):"""从前一个流水线阶段接收张量。"""self._input_tensor = tensor
4. 实现一个专门的 forward
方法
你的模型块的 forward
方法会发生显著变化:
- 输入:它不再直接从数据加载器接收
data
作为参数。相反,它使用由流水线引擎刚刚设置的self._input_tensor
。 - 输出:它必须返回两样东西:
output_tensor
:前向计算的结果(要传递给下一阶段的激活值)。loss_func
:一个函数,当给定output_tensor
时,计算最终损失。这只与最后一个流水线阶段相关,但签名必须保持一致。
这是一个简化的概念示例:
# 在一个模型块模块中...
def forward(self, data_iterator, model_chunk): # 流水线引擎传递这些参数# 'data_iterator' 仅由第一阶段使用。# 所有其他阶段都使用由 `set_input_tensor` 设置的张量。# 实际的前向计算output = model_chunk(self._input_tensor) # 定义将由最后阶段使用的损失函数def loss_func(output_tensor):# ... 使用最终输出和来自 data_iterator 的标签计算损失 ...loss = F.cross_entropy(output_tensor, labels)return loss, {'lm_loss': loss}return output, loss_func
5. 与张量并行结合
对于一个真正庞大的模型,每个块内部的层也必须被并行化。这就是张量并行发挥作用的地方。
- 你需要将模型块内部的标准
torch.nn.Linear
层替换为megatron.core.tensor_parallel.ColumnParallelLinear
和RowParallelLinear
。 - 你需要将
torch.nn.Embedding
替换为megatron.core.tensor_parallel.VocabParallelEmbedding
。
流水线并行和张量并行协同工作:
- 流水线并行 垂直切分模型(跨层)。例如,第1-10层在GPU 0上,第11-20层在GPU 1上。
- 张量并行 水平切分模型(在单层内部)。例如,第1层的权重矩阵被切分到GPU 0和GPU 1上。
一个完整的实现包括将你的模型定义为一个 ModuleList
(用于PP),其中该列表中的模块本身是使用张量并行层构建的(用于TP)
SequenceParalle
对于序列非常长的情况, SequenceParalle是降低显存的首要选项.Megatron-LM的Sequence-Paralle的实现是直接调用了Transformer_Engine的TEDotProductAttention 函数, 同时做了一些参数和通信的管理.具体可以参考 : https://www.mltalks.com/posts/1017283893/#more.
目前主流实现CP的方式应该是RingAttention, 之前比较获得DeepSpeed-Ulysses 因为和TP存在一定的冲突而逐渐被淘汰. CP对于用户来说只需要在训练的时候指定context_parallel_size
.如果要自定义函数, 则需要调用TE的TEDotProductAttention ,或者研究TE相关代码.
- 核心配置参数
主要配置位置:
/megatron/core/model_parallel_config.py:44-53 - 基础配置参数
/megatron/core/transformer/transformer_config.py:534-548 - 详细通信配置
配置参数:
context_parallel_size: int = 1 # CP组大小
hierarchical_context_parallel_sizes: Optional[list[int]] = None # 分层CP大小
cp_comm_type: Optional[Union[str, List[str]]] = None # 通信类型配置
- 通信类型 (transformer_config.py:534-548)
四种通信模式:
- “p2p” - P2P通信
- 在环形拓扑中交换KV chunks
- 异步通信,可与attention计算重叠 - “all_gather” - All-gather通信
- 在attention前all-gather获取完整KV序列
- 非异步,无法重叠 - “a2a” - All-to-All通信 (类似DeepSpeed Ulysses)
- 将attention heads分散到CP组中
- gather获取完整QKV序列 - “a2a+p2p” - 分层实现
- 低级CP组使用A2A (如NVLink)
- 高级CP组使用P2P (如IBLink)
All-to-All转换 (mamba_context_parallel.py:285-357):# CP2HP: [seq/cp_size, batch, hidden] -> [seq, batch, hidden/cp_size]def _all_to_all_cp2hp(input_, cp_group)# HP2CP: [seq, batch, hidden/cp_size] -> [seq/cp_size, batch, hidden]def _all_to_all_hp2cp(input_, cp_group)Load Balancing (mamba_context_parallel.py:360-389):- _undo_attention_load_balancing() - 撤销负载均衡- _redo_attention_load_balancing() - 重新进行负载均衡
- 限制和要求
兼容性限制 (dot_product_attention.py:52-58):assert self.config.context_parallel_size == 1, \"Context parallelism is only supported by TEDotProductAttention!"assert self.config.window_size is None, \"Sliding Window Attention is only supported by TEDotProductAttention!"验证逻辑 (transformer_config.py:1230-1239):if self.context_parallel_size > 1 and self.cp_comm_type is not None:if isinstance(self.cp_comm_type, list):assert len(self.cp_comm_type) == self.num_layerselse:assert isinstance(self.cp_comm_type, str)
ExpertParallel
EP 是指将MOE的不同专家划分到不同的设备上.