【vllm】源码解读:DeepSeekV2 DP Rank 专家加载与分配机制
1. 专家分配机制
静态分配算法
在 fused_moe/layer.py:818-846 中,vLLM 使用静态分配算法来确定每个 DP rank 负责哪些专家:
def get_expert_assignment(ep_rank, ep_size, global_num_experts, expert_placement_strategy="linear"):"""计算每个rank的专家分配"""# 1. 计算基础分配base_experts = global_num_experts // ep_sizeremainder = global_num_experts % ep_size# 2. 确定每个rank的专家数if ep_rank < remainder:local_num_experts = base_experts + 1else:local_num_experts = base_experts# 3. 创建专家映射表expert_map = torch.full((global_num_experts,), -1, dtype=torch.int32)# -1 表示专家不在本rank# 4. 分配策略if expert_placement_strategy == "linear":# 连续分配:rank 0 负责专家 0-31,rank 1 负责专家 32-63start_idx = ep_rank * base_experts + min(ep_rank, remainder)expert_map[start_idx:start_idx + local_num_experts] = torch.arange(0, local_num_experts, dtype=torch.int32)elif expert_placement_strategy == "round_robin":# 轮询分配:rank 0 负责专家 0,8,16,24...,rank 1 负责专家 1,9,17,25...local_log_experts = torch.arange(ep_rank, global_num_experts, ep_size, dtype=torch.int32)expert_map[local_log_experts] = torch.arange(0, local_num_experts, dtype=torch.int32)return (local_num_experts, expert_map)
DP=8 + EP=8 示例
假设总共有 256 个专家,DP=8,EP=8:
# 专家分配结果
DP Rank 0: Expert 0-31 (32个专家)
DP Rank 1: Expert 32-63 (32个专家)
DP Rank 2: Expert 64-95 (32个专家)
DP Rank 3: Expert 96-127 (32个专家)
DP Rank 4: Expert 128-159 (32个专家)
DP Rank 5: Expert 160-191 (32个专家)
DP Rank 6: Expert 192-223 (32个专家)
DP Rank 7: Expert 224-255 (32个专家)
2. 专家加载过程
初始化阶段
在 DeepSeekV2MoE.__init__() 中:
class DeepSeekV2MoE(nn.Module):def __init__(self, config, parallel_config, quant_config, prefix=""):# 1. 获取EP信息self.ep_group = get_ep_group().device_groupself.ep_rank = self.ep_group.rank()self.ep_size = self.ep_group.size()# 2. 计算专家分配self.n_local_physical_experts = self.n_physical_experts // self.ep_sizeself.physical_expert_start = self.ep_rank * self.n_local_physical_expertsself.physical_expert_end = self.physical_expert_start + self.n_local_physical_experts# 3. 创建FusedMoE,内部会调用get_expert_assignment()self.experts = FusedMoE(...)
权重加载阶段
在 FusedMoE.load_weights() 中:
def load_weights(self, weights, expert_id, weight_name, shard_id, return_success=False):# 1. 检查专家是否在本rankexpert_id = self._map_global_expert_id_to_local_expert_id(expert_id)if expert_id == -1:# 专家不在本rank,跳过加载return False if return_success else None# 2. 加载专家权重expert_data = param.data[expert_id]# ... 加载权重逻辑return True if return_success else None
3. 专家不在本rank的处理
All-to-All 通信机制
当需要的专家不在本rank时,vLLM 使用 All-to-All 通信来处理:
# 在 FusedMoE.forward_impl() 中
def forward_impl(self, hidden_states, router_logits):# 1. 专家选择(所有rank都执行)topk_weights, topk_ids = select_experts(...)# 2. All-to-All 分发if do_naive_dispatch_combine:hidden_states, router_logits = get_ep_group().dispatch(hidden_states, router_logits, self.is_sequence_parallel)# 3. 本地专家计算final_hidden_states = self.quant_method.apply(...)# 4. All-to-All 收集if do_naive_dispatch_combine:states = get_ep_group().combine(states, self.is_sequence_parallel)return final_hidden_states
All-to-All 通信步骤
步骤1:数据分发 (Dispatch)
def dispatch(hidden_states, router_logits, is_sequence_parallel):"""将token数据发送到有对应专家的rank"""# 1. 根据topk_ids确定目标ranktarget_ranks = topk_ids // experts_per_rank# 2. 将token数据发送到对应rankfor rank in target_ranks:send_data_to_rank(rank, token_data)# 3. 接收其他rank发送来的数据received_data = receive_data_from_other_ranks()return received_data
步骤2:本地计算
# 每个rank执行本地专家的前向传播
def local_expert_computation(received_data):"""使用本地存储的专家权重进行计算"""# 1. 根据topk_ids选择本地专家local_expert_ids = filter_local_experts(topk_ids)# 2. 执行本地专家的前向传播local_outputs = []for expert_id in local_expert_ids:expert_output = local_experts[expert_id](received_data)local_outputs.append(expert_output)return local_outputs
步骤3:结果收集 (Combine)
def combine(local_outputs, is_sequence_parallel):"""将计算结果发送回原始rank"""# 1. 将计算结果发送回原始rankfor rank in original_ranks:send_results_to_rank(rank, local_outputs)# 2. 接收所有专家的结果all_expert_outputs = receive_all_results()# 3. 根据topk_weights加权求和final_output = weighted_sum(all_expert_outputs, topk_weights)return final_output
4. 具体示例
假设 DP Rank 0 需要处理需要 Expert 5 和 Expert 50 的token:
# 1. 专家分配
DP Rank 0: Expert 0-31 (本地有Expert 5)
DP Rank 1: Expert 32-63 (本地有Expert 50)# 2. 路由计算(所有rank都执行)
router_logits = gate(hidden_states)
topk_ids = [5, 50] # 所有rank都得到相同结果# 3. All-to-All 分发
DP Rank 0: 发送token到DP Rank 1 (Expert 50)
DP Rank 1: 发送token到DP Rank 0 (Expert 5)# 4. 本地计算
DP Rank 0: 计算Expert 5
DP Rank 1: 计算Expert 50# 5. All-to-All 收集
DP Rank 0: 收集Expert 5和Expert 50的结果
根据topk_weights加权求和
5. 关键理解点
-
专家分配是静态的:
- 在初始化时确定,运行时不变
- 每个rank只加载自己负责的专家权重
-
路由计算是全局的:
- 所有rank都执行相同的路由计算
- 所有rank都得到相同的topk_ids
-
通信是动态的:
- 根据topk_ids动态决定数据流向
- 使用All-to-All进行数据交换
-
负载均衡:
- 专家均匀分布到各个rank
- 通过All-to-All实现负载均衡
-
容错机制:
- 如果某个rank失败,其他rank可以接管
- 通过EPLB实现动态负载均衡
6. 总结
在 deepseek_v2.py 中:
- 专家分配机制:通过
get_expert_assignment()静态分配专家到各个rank - 专家加载策略:每个rank只加载自己负责的专家权重,跳过不相关的专家
- 专家不在本rank的处理:通过All-to-All通信将数据发送到有对应专家的rank
- 通信流程:Dispatch → 本地计算 → Combine,实现跨rank的专家计算
- 负载均衡:专家均匀分布,通过All-to-All实现动态负载均衡
- 容错机制:支持EPLB动态负载均衡和rank故障恢复
这种机制确保了每个DP rank都知道自己要加载哪些专家,并且能够通过All-to-All通信处理专家不在本rank的情况,实现了高效的专家并行计算。
