vLLM - LLMEngine
LLMEngine is the core of vLLM: it receives inputs, generates outputs with the language model, and returns the results to the client. Both the LLM class for offline batch inference and the AsyncLLMEngine for online serving wrap LLMEngine to expose inference services.
LLMEngine has three key components:
- Processor (Tokenizer): converts input text into tokens.
- OutputProcessor (Detokenizer): converts tokens back into text.
- EngineCoreClient: a proxy for the model core; it supports both language models and pooling models, as well as distributed deployment.
LLMEngine has two implementations, v0 and v1; v1 is now the mainstream one. Its source code lives at vllm/vllm/v1/engine/llm_engine.py.
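Before looking at the internals, here is a minimal sketch of driving LLMEngine by hand, i.e. the loop that the LLM wrapper automates. The model name and request id are placeholders; from_engine_args, add_request, step, and has_unfinished_requests are the public entry points, assumed stable across recent v1 releases:

from vllm import EngineArgs, SamplingParams
from vllm.v1.engine.llm_engine import LLMEngine

# from_engine_args builds the VllmConfig and selects the Executor class
# before calling __init__ (shown below).
engine = LLMEngine.from_engine_args(EngineArgs(model="facebook/opt-125m"))

# Queue one request, then drive decoding manually, one iteration per step().
engine.add_request("req-0", "Hello, my name is",
                   SamplingParams(max_tokens=16))

while engine.has_unfinished_requests():
    for out in engine.step():
        if out.finished:
            print(out.outputs[0].text)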
__init__
LLMEngine's __init__ does the following:
- Calls stateless_init_dp_group to initialize the data-parallel (DP) communication group (only when data_parallel_size > 1 and the engine runs in-process).
- Calls init_tokenizer_from_configs to initialize the Tokenizer.
- Creates the Processor, which converts Inputs into EngineCoreRequests (mainly tokenization).
- Creates the OutputProcessor, which converts EngineCoreOutputs into RequestOutputs (mainly detokenization).
- Calls EngineCoreClient.make_client to create the EngineCoreClient.
- Calls reset_mm_cache to reset the multi-modal cache.
class LLMEngine:

    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
        stat_loggers: Optional[list[StatLoggerFactory]] = None,
        mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
        use_cached_outputs: bool = False,
        multiprocess_mode: bool = False,
    ) -> None:
        ...
        # DP group is only created for in-process engines with DP > 1.
        parallel_config = vllm_config.parallel_config
        if not multiprocess_mode and parallel_config.data_parallel_size > 1:
            self.dp_group = parallel_config.stateless_init_dp_group()
        else:
            self.dp_group = None
        self.should_execute_dummy_batch = False

        self.tokenizer = init_tokenizer_from_configs(
            model_config=vllm_config.model_config,
            scheduler_config=vllm_config.scheduler_config,
            lora_config=vllm_config.lora_config)

        # Processor (converts Inputs --> EngineCoreRequests).
        self.processor = Processor(vllm_config=vllm_config,
                                   tokenizer=self.tokenizer,
                                   mm_registry=mm_registry)

        # OutputProcessor (converts EngineCoreOutputs --> RequestOutputs).
        self.output_processor = OutputProcessor(self.tokenizer,
                                                log_stats=self.log_stats)

        # EngineCore (takes EngineCoreRequests, yields EngineCoreOutputs).
        self.engine_core = EngineCoreClient.make_client(
            multiprocess_mode=multiprocess_mode,
            asyncio_mode=False,
            vllm_config=vllm_config,
            executor_class=executor_class,
            log_stats=self.log_stats,
        )

        if not multiprocess_mode:
            self.model_executor = self.engine_core.engine_core.model_executor

        self.reset_mm_cache()
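Note the multiprocess_mode branch at the end: with the default multiprocess_mode=False, make_client returns an in-process client, so the EngineCore and its model executor live in the same process and are reachable directly. A tiny sketch reusing the engine from the example above (the concrete executor type varies with the parallel configuration):

# With multiprocess_mode=False the EngineCore runs in this process,
# so LLMEngine keeps a direct handle to the executor.
print(type(engine.engine_core).__name__)     # an in-process client
print(type(engine.model_executor).__name__)  # e.g. a single-process executor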
add_request
add_request adds a new request:
- Calls self.processor.process_inputs to convert the Inputs into an EngineCoreRequest.
- In the sampling case, if SamplingParams requests n output sequences, it fans out into n child EngineCoreRequests (see the sketch after the code below); in the pooling case there is exactly one EngineCoreRequest.
- Adds every resulting EngineCoreRequest to both self.output_processor and self.engine_core for processing in subsequent step calls.
class LLMEngine:

    def add_request(
        self,
        request_id: str,
        prompt: PromptType,
        params: Union[SamplingParams, PoolingParams],
        arrival_time: Optional[float] = None,
        lora_request: Optional[LoRARequest] = None,
        tokenization_kwargs: Optional[dict[str, Any]] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
        priority: int = 0,
    ) -> None:
        # Process raw inputs into the request (mainly tokenization).
        prompt_str, request = self.processor.process_inputs(
            request_id, prompt, params, arrival_time, lora_request,
            tokenization_kwargs, trace_headers, prompt_adapter_request,
            priority)

        n = params.n if isinstance(params, SamplingParams) else 1

        if n == 1:
            # Make a new RequestState in the OutputProcessor and
            # hand the request to the EngineCore.
            self.output_processor.add_request(request, prompt_str, None, 0)
            self.engine_core.add_request(request)
            return

        # Fan out child requests (for n > 1).
        parent_req = ParentRequest(request_id, params)
        for idx in range(n):
            request_id, params = parent_req.get_child_info(idx)
            child_request = request if idx == n - 1 else copy(request)
            child_request.request_id = request_id
            child_request.sampling_params = params

            self.output_processor.add_request(child_request, prompt_str,
                                              parent_req, idx)
            self.engine_core.add_request(child_request)
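A hedged sketch of the n > 1 fan-out from the caller's perspective, reusing the engine from the first example (request id and prompt are placeholders). Internally this creates one ParentRequest and two child requests; the OutputProcessor reassembles them into a single RequestOutput carrying two completions:

from vllm import SamplingParams

# n=2 -> one ParentRequest, two child EngineCoreRequests.
engine.add_request(
    "req-parallel",
    "Write a haiku about GPUs",
    SamplingParams(n=2, temperature=0.8, max_tokens=32),
)

while engine.has_unfinished_requests():
    for out in engine.step():
        if out.finished:
            # The aggregated RequestOutput carries n completions.
            for completion in out.outputs:
                print(completion.text)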
abort_request
abort_request aborts requests:
- It aborts the corresponding requests in both self.output_processor and self.engine_core. Note that output_processor.abort_requests returns the ids it actually removed, and only those are forwarded to the engine core.
class LLMEngine:

    def abort_request(self, request_ids: list[str]) -> None:
        request_ids = self.output_processor.abort_requests(request_ids)
        self.engine_core.abort_requests(request_ids)
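A sketch of cancelling an in-flight request, reusing the engine and imports from the earlier examples (request id is a placeholder). After abort_request, neither component tracks the request, so later step() calls return nothing for it:

engine.add_request("req-cancel", "Tell me a very long story",
                   SamplingParams(max_tokens=2048))
engine.step()                       # let the scheduler pick it up once
engine.abort_request(["req-cancel"])
# The request is now gone from both OutputProcessor and EngineCore.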
step
step performs one decoding iteration:
- If a dummy batch is pending, it executes execute_dummy_batch and returns an empty list immediately (typically used to keep DP group ranks in lockstep).
- Calls self.engine_core.get_output() to fetch the list of EngineCoreOutputs.
- Calls self.output_processor.process_outputs to convert the EngineCoreOutput list into an OutputProcessorOutput.
- If any requests must be aborted (e.g. they finished on a stop string), calls self.engine_core.abort_requests on them.
- Returns processed_outputs.request_outputs, i.e. a list of RequestOutput/PoolingRequestOutput.
class LLMEngine:

    def step(self) -> Union[list[RequestOutput], list[PoolingRequestOutput]]:
        if self.should_execute_dummy_batch:
            self.should_execute_dummy_batch = False
            self.engine_core.execute_dummy_batch()
            return []

        # 1) Get EngineCoreOutput from the EngineCore.
        outputs = self.engine_core.get_output()

        # 2) Process EngineCoreOutputs.
        iteration_stats = IterationStats() if self.log_stats else None
        processed_outputs = self.output_processor.process_outputs(
            outputs.outputs,
            engine_core_timestamp=outputs.timestamp,
            iteration_stats=iteration_stats)

        # 3) Abort any reqs that finished due to stop strings.
        self.engine_core.abort_requests(processed_outputs.reqs_to_abort)
        ...
        return processed_outputs.request_outputs
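Since step() performs exactly one decode iteration, looping over it gives natural streaming. A sketch reusing the engine from the first example: with output_kind=RequestOutputKind.DELTA (defined in vllm.sampling_params), each iteration's RequestOutput carries only the newly decoded text:

from vllm import SamplingParams
from vllm.sampling_params import RequestOutputKind

engine.add_request(
    "req-stream", "Once upon a time,",
    SamplingParams(max_tokens=32, output_kind=RequestOutputKind.DELTA))

while engine.has_unfinished_requests():
    for out in engine.step():
        # DELTA mode: out.outputs[0].text holds only the new tokens.
        print(out.outputs[0].text, end="", flush=True)
print()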
The definition of OutputProcessorOutput:
@dataclass
class OutputProcessorOutput:

    request_outputs: list[Union[RequestOutput, PoolingRequestOutput]]
    reqs_to_abort: list[str]