vLLM - Worker
Worker implements the WorkerBase interface and is the concrete GPU worker implementation; the code lives in vllm/v1/worker/gpu_worker.py.
init_device
init_device performs the following:
- Sets the current device: torch.device(f"cuda:{self.local_rank}").
- Takes a memory snapshot of the current state: MemorySnapshot().
- Checks that the GPU memory implied by the user-configured gpu_memory_utilization is actually free (e.g. gpu_memory_utilization=0.5 means vLLM may use 50% of the GPU's memory).
- Calls init_worker_distributed_environment to initialize the distributed environment.
- Creates GPUModelRunner(self.vllm_config, self.device), which runs the model's forward passes.
```python
class Worker(WorkerBase):

    def init_device(self):
        if self.device_config.device.type == "cuda":
            ...
            self.device = torch.device(f"cuda:{self.local_rank}")
            current_platform.set_device(self.device)
            ...
            self.init_snapshot = MemorySnapshot()
            self.requested_memory = (self.init_snapshot.total_memory *
                                     self.cache_config.gpu_memory_utilization)
            if self.init_snapshot.free_memory < self.requested_memory:
                GiB = lambda b: round(b / GiB_bytes, 2)
                raise ValueError(...)
        else:
            ...
        init_worker_distributed_environment(self.vllm_config, self.rank,
                                            self.distributed_init_method,
                                            self.local_rank,
                                            current_platform.dist_backend)
        self.model_runner: GPUModelRunner = GPUModelRunner(
            self.vllm_config, self.device)
```
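To make the budget check concrete, here is a minimal standalone sketch of the same arithmetic (not the vLLM implementation; check_memory_budget is a hypothetical helper, and torch.cuda.mem_get_info() stands in for MemorySnapshot):

```python
import torch

GiB_bytes = 1 << 30

def check_memory_budget(gpu_memory_utilization: float = 0.5) -> int:
    # torch.cuda.mem_get_info() returns (free, total) bytes
    # for the current device.
    free_memory, total_memory = torch.cuda.mem_get_info()
    requested_memory = int(total_memory * gpu_memory_utilization)
    if free_memory < requested_memory:
        raise ValueError(
            f"requested {requested_memory / GiB_bytes:.2f} GiB but only "
            f"{free_memory / GiB_bytes:.2f} GiB is free")
    return requested_memory
```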
get_model
get_model simply delegates to self.model_runner.get_model().
```python
class Worker(WorkerBase):

    def get_model(self) -> nn.Module:
        return self.model_runner.get_model()
```
load_model
load_model performs the following:
- If enable_sleep_mode is set, enters allocator.use_memory_pool(tag="weights") as a context; while it is active, torch allocates tensor memory from vLLM's custom memory pool and tags the allocations "weights", so vLLM can later offload (sleep) and restore (wake_up) the model weights.
- Calls self.model_runner.load_model to load the model.
```python
class Worker(WorkerBase):

    def load_model(self) -> None:
        if self.vllm_config.model_config.enable_sleep_mode:
            from vllm.device_allocator.cumem import CuMemAllocator
            allocator = CuMemAllocator.get_instance()
            context = allocator.use_memory_pool(tag="weights")
        else:
            from contextlib import nullcontext
            context = nullcontext()
        eep_scale_up = os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") == "1"
        with context:
            self.model_runner.load_model(eep_scale_up=eep_scale_up)
```
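The enable_sleep_mode branch is an instance of a common conditional-context pattern: pick either a real context manager or contextlib.nullcontext, then run the guarded code under a single with. A toy sketch (use_memory_pool below is a stand-in that only records the tag; the real CuMemAllocator routes allocations into a custom CUDA memory pool):

```python
from contextlib import contextmanager, nullcontext

@contextmanager
def use_memory_pool(tag: str):
    # Stand-in: a real implementation would redirect torch's CUDA
    # allocations into a tagged memory pool for the duration.
    print(f"allocations tagged {tag!r}")
    try:
        yield
    finally:
        print(f"tag {tag!r} released")

enable_sleep_mode = True
context = use_memory_pool("weights") if enable_sleep_mode else nullcontext()
with context:
    pass  # load_model() would run here
```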
execute_model
execute_model runs one forward pass of the model:
- If this is not the first PP rank, receive the intermediate tensors from the previous rank: get_pp_group().recv_tensor_dict(all_gather_group=get_tp_group()).
- Call self.model_runner.execute_model to run one forward pass.
- If this is not the last PP rank, send the forward results to the next rank: get_pp_group().send_tensor_dict(output.tensors, all_gather_group=get_tp_group()).
- If this is the driver_worker, return the result as a ModelRunnerOutput (on any rank other than the last PP rank this is an empty result).
```python
@torch.inference_mode()
def execute_model(
    self,
    scheduler_output: "SchedulerOutput",
) -> Optional[ModelRunnerOutput]:
    intermediate_tensors = None
    if not get_pp_group().is_first_rank:
        intermediate_tensors = IntermediateTensors(
            get_pp_group().recv_tensor_dict(
                all_gather_group=get_tp_group()))

    output = self.model_runner.execute_model(scheduler_output,
                                             intermediate_tensors)

    parallel_config = self.vllm_config.parallel_config
    if parallel_config.distributed_executor_backend != "external_launcher" \
            and not get_pp_group().is_last_rank:
        get_pp_group().send_tensor_dict(output.tensors,
                                        all_gather_group=get_tp_group())

        empty_output = EMPTY_MODEL_RUNNER_OUTPUT
        if output.finished_sending or output.finished_recving:
            empty_output = copy.copy(empty_output)
            empty_output.finished_sending = output.finished_sending
            empty_output.finished_recving = output.finished_recving
        output = empty_output

    return output if self.is_driver_worker else None
```
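The rank logic reduces to a simple pattern: only the first PP rank starts from real inputs, middle ranks relay intermediate tensors, and only the last rank holds a real result. A toy sketch, with plain callables standing in for get_pp_group()'s send/recv helpers:

```python
def pp_step(rank: int, world_size: int, recv, send, run_model):
    intermediate = None
    if rank != 0:                   # not the first PP rank: receive inputs
        intermediate = recv()
    output = run_model(intermediate)
    if rank != world_size - 1:      # not the last PP rank: relay onward
        send(output)
        return None                 # non-last ranks have no final result
    return output
```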
sleep
sleep puts the Worker instance to sleep and frees the corresponding GPU memory. There are two sleep levels, 1 and 2:
- level = 1: only the "weights" allocations are offloaded to CPU, for scenarios where only the model weights need to be restored.
- level = 2: additionally backs up model.named_buffers() to CPU, for scenarios where the complete model state must be restored.
```python
class Worker(WorkerBase):

    def sleep(self, level: int = 1) -> None:
        from vllm.device_allocator.cumem import CuMemAllocator
        ...
        # Save the buffers before level 2 sleep
        if level == 2:
            model = self.model_runner.model
            self._sleep_saved_buffers = {
                name: buffer.cpu().clone()
                for name, buffer in model.named_buffers()
            }

        allocator = CuMemAllocator.get_instance()
        allocator.sleep(offload_tags=("weights", ) if level == 1 else tuple())
        ...
```
wake_up
wake_up restores the model GPU memory that sleep released:
```python
class Worker(WorkerBase):

    def wake_up(self, tags: Optional[list[str]] = None) -> None:
        from vllm.device_allocator.cumem import CuMemAllocator
        allocator = CuMemAllocator.get_instance()
        allocator.wake_up(tags)

        # Restore the buffers after level 2 sleep
        if len(self._sleep_saved_buffers):
            model = self.model_runner.model
            for name, buffer in model.named_buffers():
                if name in self._sleep_saved_buffers:
                    buffer.data.copy_(self._sleep_saved_buffers[name].data)
            self._sleep_saved_buffers = {}
```
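The buffer backup/restore dance of sleep(level=2) and wake_up can be reproduced as a runnable toy with any module that registers buffers, e.g. BatchNorm's running statistics (illustrative only, on CPU for simplicity):

```python
import torch
import torch.nn as nn

model = nn.BatchNorm1d(4)
model(torch.randn(8, 4))  # mutates running_mean / running_var

# sleep(level=2): snapshot every buffer to CPU
saved = {name: buf.cpu().clone() for name, buf in model.named_buffers()}

# ... GPU memory is released; simulate losing the buffer contents ...
for _, buf in model.named_buffers():
    buf.zero_()

# wake_up: copy the snapshots back in place
for name, buf in model.named_buffers():
    if name in saved:
        buf.data.copy_(saved[name].data)
```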
determine_available_memory
determine_available_memory computes how much GPU memory the KV cache may use:
- Calls self.model_runner.profile_run() to run one forward pass with the maximum number of input tokens, profiling the peak GPU memory the model needs excluding the KV cache.
- Then computes the total GPU memory available for the KV cache: available_kv_cache_memory = self.requested_memory - profile_result.non_kv_cache_memory.
```python
class Worker(WorkerBase):

    @torch.inference_mode()
    def determine_available_memory(self) -> int:
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()
        GiB = lambda b: b / GiB_bytes

        with memory_profiling(
                self.init_snapshot,
                weights_memory=int(
                    self.model_runner.model_memory_usage)) as profile_result:
            self.model_runner.profile_run()

        free_gpu_memory = profile_result.after_profile.free_memory
        available_kv_cache_memory = self.requested_memory \
            - profile_result.non_kv_cache_memory
        ...
        return int(available_kv_cache_memory)
```
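A worked example with made-up numbers shows the arithmetic: on an 80 GiB GPU with gpu_memory_utilization=0.9, if profiling finds a 30 GiB non-KV-cache footprint (weights plus peak activations), 42 GiB remain for the KV cache:

```python
GiB = 1 << 30
total_memory = 80 * GiB
requested_memory = int(total_memory * 0.9)   # 72 GiB budget
non_kv_cache_memory = 30 * GiB               # measured by profile_run()
available_kv_cache_memory = requested_memory - non_kv_cache_memory
print(available_kv_cache_memory / GiB)       # 42.0 GiB for the KV cache
```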
initialize_from_config
initialize_from_config initializes the KV cache from the given kv_cache_config:
- If enable_sleep_mode is set, enters allocator.use_memory_pool(tag="kv_cache") as a context, so that during initialize_kv_cache torch allocates tensor memory from vLLM's custom memory pool and tags the allocations "kv_cache".
```python
class Worker(WorkerBase):

    def initialize_from_config(self,
                               kv_cache_config: KVCacheConfig) -> None:
        if self.vllm_config.model_config.enable_sleep_mode:
            from vllm.device_allocator.cumem import CuMemAllocator
            allocator = CuMemAllocator.get_instance()
            context = allocator.use_memory_pool(tag="kv_cache")
        else:
            from contextlib import nullcontext
            context = nullcontext()
        with context:
            self.model_runner.initialize_kv_cache(kv_cache_config)
```
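For intuition, the memory handed to initialize_kv_cache is ultimately carved into fixed-size blocks. A hedged sketch of the standard paged-KV sizing math (kv_cache_blocks is a hypothetical helper and all parameters are illustrative, not read from a real KVCacheConfig):

```python
def kv_cache_blocks(available_bytes: int,
                    block_size: int = 16,     # tokens per block
                    num_kv_heads: int = 8,
                    head_dim: int = 128,
                    num_layers: int = 32,
                    dtype_bytes: int = 2) -> int:
    # 2x for the K and V tensors of each token slot, summed over layers.
    bytes_per_block = (2 * block_size * num_kv_heads * head_dim *
                       dtype_bytes * num_layers)
    return available_bytes // bytes_per_block

print(kv_cache_blocks(42 * (1 << 30)))  # 21504 blocks fit in 42 GiB
```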
compile_or_warm_up_model
compile_or_warm_up_model compiles and warms up the model:
- Builds the warmup_sizes list (different batch sizes) from the configuration.
- Calls self.model_runner._dummy_run with each batch size to execute the model.
- Calls self.model_runner.capture_model() to capture the model as CUDA Graphs.
- If this is the last PP rank, calls self.model_runner._dummy_sampler_run (or _dummy_pooler_run for pooling models) to warm up the Sampler or Pooler.
- Calls set_random_seed(self.model_config.seed) to reset the random seed.
```python
class Worker(WorkerBase):

    def compile_or_warm_up_model(self) -> None:
        warmup_sizes = \
            self.vllm_config.compilation_config.compile_sizes.copy()
        if not self.model_config.enforce_eager:
            warmup_sizes = [
                x for x in warmup_sizes if x not in
                self.vllm_config.compilation_config.cudagraph_capture_sizes
            ]
        for size in sorted(warmup_sizes, reverse=True):
            self.model_runner._dummy_run(size, skip_eplb=True)

        if not self.model_config.enforce_eager:
            self.model_runner.capture_model()

        if get_pp_group().is_last_rank:
            max_num_reqs = min(self.scheduler_config.max_num_seqs,
                               self.scheduler_config.max_num_batched_tokens)
            hidden_states, last_hidden_states = \
                self.model_runner._dummy_run(
                    num_tokens=max_num_reqs,
                    skip_eplb=True,
                )
            if self.model_runner.is_pooling_model:
                self.model_runner._dummy_pooler_run(hidden_states)
            else:
                self.model_runner._dummy_sampler_run(
                    hidden_states=last_hidden_states)

        set_random_seed(self.model_config.seed)
```
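capture_model builds on PyTorch's CUDA Graph API. A minimal sketch of the underlying capture/replay pattern, using a toy Linear model (vLLM captures the full model forward per batch size):

```python
import torch

model = torch.nn.Linear(16, 16).cuda()
static_x = torch.randn(8, 16, device="cuda")

# Warm up before capture so one-time initialization is not recorded.
for _ in range(3):
    model(static_x)
torch.cuda.synchronize()

g = torch.cuda.CUDAGraph()
with torch.cuda.graph(g):
    static_y = model(static_x)  # all kernels are recorded into g

# Replay: refill the static input buffer, then launch the whole graph.
static_x.copy_(torch.randn(8, 16, device="cuda"))
g.replay()
print(static_y.sum().item())
```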
reinitialize_distributed
reinitialize_distributed reinitializes and reconfigures the distributed setup:
- Computes new_ep_size = DP size * TP size * PP size.
- If new_ep_size < old_ep_size, calls self._eplb_before_scale_down(old_ep_size, new_ep_size) to shrink the EPLB pool; otherwise calls self._eplb_after_scale_up(old_ep_size, new_ep_size, global_expert_load) to grow it.
- Calls self._reconfigure_parallel_config(reconfig_request) to reconfigure the parallel settings.
```python
class Worker(WorkerBase):

    def reinitialize_distributed(
            self, reconfig_request: ReconfigureDistributedRequest) -> None:
        old_ep_size = get_ep_group().world_size
        old_ep_rank = get_ep_group().rank
        new_ep_size = reconfig_request.new_data_parallel_size * \
            get_tp_group().world_size * get_pp_group().world_size
        if new_ep_size < old_ep_size:
            self._eplb_before_scale_down(old_ep_size, new_ep_size)

        cleanup_dist_env_and_memory()

        if reconfig_request.new_data_parallel_rank == \
                ReconfigureRankType.SHUTDOWN_CURRENT_RANK:
            assert old_ep_rank >= new_ep_size
            # shutdown
            return

        self._reconfigure_parallel_config(reconfig_request)

        with set_current_vllm_config(self.vllm_config):
            init_worker_distributed_environment(self.vllm_config, self.rank,
                                                self.distributed_init_method,
                                                self.local_rank)

        global_expert_load = self._reconfigure_moe(old_ep_size, new_ep_size)

        if new_ep_size > old_ep_size:
            assert global_expert_load is not None
            self._eplb_after_scale_up(old_ep_size, new_ep_size,
                                      global_expert_load)
```
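The scale-up/scale-down decision is pure arithmetic over the parallel sizes, e.g. with illustrative numbers:

```python
tp_size, pp_size = 2, 2
old_dp_size, new_dp_size = 4, 2

old_ep_size = old_dp_size * tp_size * pp_size   # 16
new_ep_size = new_dp_size * tp_size * pp_size   # 8
assert new_ep_size < old_ep_size                # -> _eplb_before_scale_down
```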