当前位置: 首页 > news >正文

【vLLM】源码解读:高性能大语言模型推理引擎的工程设计与实现

vLLM:高性能大语言模型推理引擎的工程设计与实现

在这里插入图片描述

引言

vLLM(Very Large Language Model)是一个专为大语言模型推理优化的高性能引擎,其设计目标是在保证推理质量的前提下,最大化吞吐量并优化内存使用效率。本文将从工程角度深入分析vLLM的设计思路、核心架构以及执行流程,帮助读者理解这个优秀开源项目的技术精髓。

1. 整体框架架构原理图

vLLM采用了清晰的分层架构设计,每一层都有明确的职责和接口。让我们首先通过整体架构图来理解系统的全貌:

1.1 系统整体架构图

在这里插入图片描述

架构层次说明:

  • 用户接口层 (LLM):提供用户友好的API接口,支持多种任务类型
  • 引擎协调层 (LLMEngine):核心协调者,管理EngineCoreClient和EngineCore
  • 调度层 (Scheduler):智能调度系统,实现连续批处理和PagedAttention
  • 执行层 (Executor):分布式执行抽象,支持多进程和Ray分布式
  • 工作器层 (Worker):具体的工作进程,负责设备管理和模型加载
  • 模型运行层 (ModelRunner):模型推理执行器,处理实际的模型计算

1.2 核心设计理念

1.2.1 以LLM类为入口的用户友好设计

vLLM的设计从用户体验出发,以LLM类作为整个系统的统一入口点:

# 简单的使用示例
llm = LLM(model="meta-llama/Llama-2-7b-hf")
outputs = llm.generate(prompts, sampling_params)

LLM类的设计体现了几个重要的工程原则:

  1. 单一职责原则:LLM类专注于提供高层API,将复杂的引擎管理委托给LLMEngine
  2. 依赖注入:通过EngineArgs配置对象注入各种参数,实现了配置与实现的解耦
  3. 适配器模式:支持多种任务类型(生成、嵌入、分类、评分等),通过统一接口适配不同的使用场景
1.2.2 分层架构的优势

vLLM的分层架构设计带来了以下优势:

  1. 职责分离:每一层都有明确的职责边界,便于维护和扩展
  2. 接口标准化:层与层之间通过标准接口通信,降低耦合度
  3. 可插拔性:支持不同层次的组件替换和扩展
  4. 可测试性:每层都可以独立测试和验证
  5. 可扩展性:新功能可以在合适的层次添加,不影响其他层次

2. 模型初始化阶段

模型初始化阶段是vLLM系统启动的关键环节,它负责设置和加载模型,准备推理环境。让我们通过详细的流程图来理解这个过程:

2.1 初始化阶段流程图

在这里插入图片描述

2.2 初始化阶段详细解析

初始化阶段是vLLM系统启动的关键环节,整个过程可以分为以下几个主要步骤:

2.2.1 用户接口层初始化

1. 用户创建LLM实例

llm = LLM(model="meta-llama/Llama-2-7b-hf",tensor_parallel_size=4,gpu_memory_utilization=0.9
)

2. 解析EngineArgs配置

  • 参数验证和类型检查
  • 配置聚合和默认值处理
  • 兼容性验证
2.2.2 引擎协调层初始化

1. 创建LLMEngine

self.llm_engine = LLMEngine.from_engine_args(engine_args=engine_args, usage_context=UsageContext.LLM_CLASS
)

2. 初始化核心组件

  • Tokenizer初始化:分词器配置和加载
  • EngineCoreClient创建:多模式执行抽象
  • Processor创建:输入处理模块
  • OutputProcessor创建:输出处理模块
2.2.3 核心系统初始化

1. 创建EngineCore

self.engine_core = EngineCore(vllm_config=vllm_config,executor_class=executor_class,log_stats=log_stats
)

2. 初始化KV缓存系统

  • 内存分析和配置
  • KV缓存块分配
  • 预热模型执行

3. 初始化调度器

  • 智能调度系统设置
  • 连续批处理配置
  • PagedAttention初始化
2.2.4 执行层初始化

1. 创建Executor

self.model_executor = executor_class(vllm_config)

2. 初始化Worker进程

  • 多进程创建和管理
  • 进程间通信设置
  • 消息队列初始化
2.2.5 模型层初始化

1. 设备初始化

  • CUDA设备设置
  • 分布式环境初始化
  • 设备内存管理

2. 创建ModelRunner

self.model_runner = GPUModelRunner(vllm_config, device
)

3. 加载模型权重

  • 模型实例化
  • 权重加载和验证
  • CUDA图初始化
2.2.6 初始化完成

系统完成初始化后,所有组件都准备就绪,可以开始处理推理请求。

初始化关键特性:

  • 延迟加载:模型和KV缓存按需初始化
  • 并行初始化:多个组件可并行创建
  • 错误恢复:初始化失败时的回滚机制
  • 资源管理:内存和GPU资源的智能分配
  • 配置验证:参数完整性和兼容性检查

初始化性能指标:

  • 模型加载:3-10秒(取决于模型大小)
  • KV缓存初始化:1-3秒
  • Worker进程创建:0.5-2秒
  • 总初始化时间:5-15秒
  • 内存占用:模型大小+缓存空间

3. 模型推理阶段

模型推理阶段是vLLM性能优化的核心,它需要将多个不同的prompt高效地组织成batch并执行推理。这个过程涉及复杂的调度、内存管理和并行计算。

3.1 推理阶段流程图

在这里插入图片描述

3.2 推理阶段详细解析

推理阶段描述了从用户调用generate开始,到最终得出结果的完整流程:

3.2.1 请求处理阶段

1. 用户调用generate

async for output in llm.generate(prompts, sampling_params):print(output)

2. AsyncLLM.generate处理

  • 启动output_handler后台循环
  • 验证截断参数
  • 调用add_request添加请求

3. add_request处理

  • 创建输出收集器
  • 处理n>1的情况(生成多个候选)
  • 转换输入为请求格式

4. EngineCore.add_request_async

  • 验证request_id类型
  • 验证池化参数
  • 检查KV传输参数

5. Scheduler.add_request

  • 设置WAITING状态
  • 添加到等待队列
  • 记录统计信息
3.2.2 调度执行阶段

1. output_handler后台循环

  • 持续从EngineCore拉取输出
  • 处理输出并推送到相应队列
  • 异步非阻塞处理

2. EngineCore.step_async

  • 检查调度器中是否有请求
  • 调度请求,创建SchedulerOutput
  • 执行模型推理
  • 更新调度器状态

3. Scheduler.schedule

  • 智能调度请求
  • 创建批处理
  • KV缓存分配
  • 抢占机制处理

4. ModelExecutor.execute_model

  • RPC调用Worker
  • 聚合输出结果
  • 处理分布式执行

5. Worker.execute_model

  • 调用ModelRunner
  • 处理模型推理
  • 返回推理结果

6. ModelRunner.execute_model

  • 预处理输入数据
  • 执行模型前向传播
  • 后处理输出
3.2.3 结果处理阶段

1. 模型推理执行

  • Transformer计算
  • 注意力机制
  • 输出生成

2. 后处理输出

  • 采样解码
  • 结果生成
  • 格式化输出

3. Scheduler.update_from_output

  • 更新请求状态
  • 清理完成请求
  • 释放资源

4. 返回EngineCoreOutputs

  • 推理结果封装
  • 状态信息
  • 错误处理

5. OutputProcessor.process_outputs

  • 处理引擎输出
  • 推送到队列
  • 清理资源

6. 用户获得结果

  • AsyncGenerator流式输出
  • 实时返回生成结果
  • 完成推理流程

3.3 推理阶段关键特性

推理阶段关键特性:

  • 异步处理:完全异步的API设计
  • 连续批处理:动态组织不同长度序列
  • PagedAttention:高效内存管理
  • 推测解码:并行生成多个token
  • 流式输出:实时返回生成结果

推理性能指标:

  • 首token延迟:50-200ms
  • 生成速度:20-100 tokens/s
  • 批处理效率:2-4倍提升
  • 内存利用率:80-95%
  • 并发处理:支持数百个请求

技术实现细节:

  • 后台循环:output_handler持续处理
  • RPC通信:ZMQ高效进程间通信
  • 批处理优化:动态调整batch大小
  • CUDA图:减少CPU-GPU同步开销
  • 内存管理:智能KV缓存分配

优化策略:

  • 连续批处理:消除等待时间
  • 抢占机制:内存不足时智能抢占
  • 流水线并行:重叠计算和通信
  • 推测解码:提前生成候选token
  • 内存预分配:减少动态分配开销

4. 核心组件深度解析

4.1 vLLM架构层次深度解析

根据vLLM的整体架构设计,系统可以分为五个主要层次,每个层次都有明确的职责和接口。让我们从顶层到底层逐一分析:

4.1.1 架构层次概览

vLLM采用了清晰的分层架构设计,每一层都有明确的职责边界:

LLM层 - 用户接口
LLMEngine层 - 引擎协调
Executor层 - 执行抽象
Worker层 - 工作进程
ModelRunner层 - 模型运行
最底层模型处理

各层次职责:

  • LLM层:用户友好的API接口,参数验证和结果处理
  • LLMEngine层:引擎协调,包含EngineCoreClient和EngineCore
  • Executor层:分布式执行抽象,支持多种执行模式
  • Worker层:具体的工作进程,设备管理和模型加载
  • ModelRunner层:模型运行器,实际的推理执行
  • 最底层:模型权重和计算内核
4.1.2 系统执行流程:从初始化到推理

根据架构图,vLLM的执行流程可以分为两个主要阶段:初始化阶段推理阶段。让我们详细分析这两个阶段的数据流和控制流:

初始化阶段(Initialization Phase)

初始化阶段负责设置和加载模型,准备推理环境:

初始化阶段
LLM层
EngineArgs
LLMEngine
Tokenizer
model_executor
init_kv_cache
scheduler
init_worker
create_worker
init_device
load_model
ModelRunner
get_model
最底层模型处理

初始化阶段的关键步骤:

  1. LLM层初始化

    • 解析用户参数,创建EngineArgs配置对象
    • 初始化Tokenizer分词器
  2. LLMEngine层初始化

    • 创建model_executor(实际是EngineCore)
    • 初始化KV缓存系统(init_kv_cache)
    • 设置调度器(scheduler)
  3. Executor层初始化

    • 通过init_worker初始化工作进程
  4. Worker层初始化

    • create_worker:创建工作进程
    • init_device:初始化GPU设备
    • load_model:加载模型权重
  5. ModelRunner层初始化

    • 创建ModelRunner实例
    • 完成设备初始化和模型加载
  6. 最底层模型处理

    • get_model:获取模型实例
    • 模型准备就绪,可以开始推理
推理阶段(Inference Phase)

推理阶段描述了接收请求、调度、执行模型推理并返回结果的完整流程:

推理阶段
generate
llm_engine/add_request
llm_engine/run_engine
SequenceGroup
Scheduler/add_seq_group
step
executer_model
execute_model
model_runner
最底层模型处理

推理阶段的关键步骤:

  1. LLM层推理

    • generate:用户调用生成接口
  2. LLMEngine层推理

    • add_request:将请求添加到引擎
    • run_engine:运行引擎执行推理
    • 请求被封装成SequenceGroup
  3. 调度和执行

    • Scheduler/add_seq_group:调度器添加序列组
    • step:执行一步推理(EngineCore的核心方法)
  4. Executor层执行

    • executer_model:执行器处理模型执行
  5. Worker层执行

    • execute_model:实际的模型执行操作
  6. ModelRunner层执行

    • model_runner:模型运行器完成推理任务
  7. 最底层处理

    • 返回推理结果
4.1.3 EngineCore和EngineCoreClient在架构中的位置

根据架构图,我们可以清楚地看到EngineCore和EngineCoreClient在整个系统中的位置:

EngineCoreClient的位置

  • 位于LLMEngine层,作为LLMEngine与EngineCore交互的接口
  • 封装了与底层执行逻辑的通信细节
  • 支持多种执行模式(进程内、多进程、异步多进程)

EngineCore的位置

  • 位于LLMEngine层与Executor层之间,作为协调者
  • 在初始化阶段:协调init_kv_cache和scheduler的初始化
  • 在推理阶段:协调Scheduler和executer_model的工作
  • 通过step方法驱动整个推理流程
4.1.4 模块间协作机制
用户请求
Processor输入处理
EngineCore核心执行
OutputProcessor输出处理
StatLoggerManager统计记录
返回结果给用户
VllmConfig配置管理
Tokenizer分词器
4.1.5 设计优势
  1. 模块化设计:每个模块职责单一,易于维护和扩展
  2. 松耦合架构:模块间通过标准接口通信,降低依赖
  3. 可插拔组件:支持自定义处理器、执行器等组件
  4. 统一配置:通过VllmConfig统一管理所有配置
  5. 可观测性:内置完整的监控和统计功能

4.2 LLM层:用户接口的艺术

LLM类是整个系统的门面,其设计充分体现了"易用性"和"功能完整性"的平衡:

4.2.1 核心参数详解

LLM类的构造函数提供了丰富的参数配置,涵盖了模型加载、内存管理、并行策略等各个方面:

class LLM:def __init__(self,# 基础模型配置model: str,                                    # 模型名称或路径tokenizer: Optional[str] = None,               # 分词器路径tokenizer_mode: TokenizerMode = "auto",        # 分词器模式dtype: ModelDType = "auto",                    # 数据类型quantization: Optional[QuantizationMethods] = None,  # 量化方法# 并行策略配置tensor_parallel_size: int = 1,                 # 张量并行大小pipeline_parallel_size: int = 1,               # 流水线并行大小data_parallel_size: int = 1,                   # 数据并行大小# 内存管理配置gpu_memory_utilization: float = 0.9,           # GPU内存利用率swap_space: float = 4,                         # CPU交换空间(GB)cpu_offload_gb: float = 0,                     # CPU卸载大小(GB)kv_cache_memory_bytes: Optional[int] = None,   # KV缓存内存大小# 执行控制配置enforce_eager: bool = False,                   # 强制eager执行seed: Optional[int] = None,                    # 随机种子# 安全配置trust_remote_code: bool = False,               # 信任远程代码allowed_local_media_path: str = "",            # 允许的本地媒体路径allowed_media_domains: Optional[list[str]] = None,  # 允许的媒体域名# 高级配置hf_token: Optional[Union[bool, str]] = None,   # HuggingFace令牌hf_overrides: Optional[HfOverrides] = None,    # HuggingFace配置覆盖compilation_config: Optional[Union[int, dict, CompilationConfig]] = None,  # 编译配置**kwargs: Any,  # 其他EngineArgs参数):
4.1.2 参数分类说明

1. 模型基础配置

  • model: 指定要加载的模型,支持HuggingFace模型名称或本地路径
  • tokenizer: 自定义分词器路径,默认使用模型自带的分词器
  • dtype: 模型权重和激活的数据类型,支持float32float16bfloat16
  • quantization: 量化方法,支持awqgptqfp8

2. 并行策略配置

  • tensor_parallel_size: 张量并行度,将模型权重分布到多个GPU
  • pipeline_parallel_size: 流水线并行度,将模型层分布到多个GPU
  • data_parallel_size: 数据并行度,复制模型到多个GPU处理不同批次

3. 内存管理配置

  • gpu_memory_utilization: GPU内存利用率,控制KV缓存大小
  • swap_space: CPU交换空间,用于存储被抢占的序列状态
  • cpu_offload_gb: CPU卸载大小,将模型权重卸载到CPU内存
  • kv_cache_memory_bytes: 精确控制KV缓存内存大小

4. 执行控制配置

  • enforce_eager: 强制使用eager模式,禁用CUDA图优化
  • seed: 随机种子,确保结果可重现
  • compilation_config: 编译优化配置,支持不同级别的优化

5. 安全与权限配置

  • trust_remote_code: 是否信任远程代码执行
  • allowed_local_media_path: 允许访问的本地媒体文件路径
  • allowed_media_domains: 允许的多模态输入域名白名单
4.1.3 使用示例
# 基础使用 - 单GPU推理
llm = LLM(model="meta-llama/Llama-2-7b-hf",dtype="float16",gpu_memory_utilization=0.8
)# 多GPU张量并行
llm = LLM(model="meta-llama/Llama-2-70b-hf",tensor_parallel_size=4,  # 使用4个GPU进行张量并行dtype="bfloat16",gpu_memory_utilization=0.9
)# 量化模型推理
llm = LLM(model="meta-llama/Llama-2-7b-hf",quantization="awq",  # 使用AWQ量化dtype="float16"
)# 内存优化配置
llm = LLM(model="meta-llama/Llama-2-13b-hf",gpu_memory_utilization=0.7,swap_space=8,  # 8GB CPU交换空间cpu_offload_gb=4,  # 4GB CPU卸载enforce_eager=True  # 禁用CUDA图以节省内存
)# 多模态模型配置
llm = LLM(model="microsoft/Phi-3-vision-128k-instruct",allowed_local_media_path="/path/to/images",allowed_media_domains=["example.com"],mm_processor_kwargs={"num_crops": 4} # num_crops 参数控制多模态输入的裁剪数量
)
4.2.4 多模态任务支持
class LLM:def generate(self, prompts, sampling_params=None, **kwargs):"""文本生成任务"""def embed(self, prompts, **kwargs):"""嵌入向量生成"""def classify(self, prompts, **kwargs):"""分类任务"""def score(self, data_1, data_2, **kwargs):"""相似度评分"""
4.2.5 LLM类内部逻辑深度解析

LLM类的设计体现了复杂系统的优雅抽象,其内部逻辑可以分为几个关键阶段:

1. 初始化阶段:系统组装
class LLM:def __init__(self, model: str, **kwargs):# 配置对象构建,将所有参数对象化engine_args = EngineArgs(model=model,tensor_parallel_size=tensor_parallel_size,gpu_memory_utilization=gpu_memory_utilization,# ... 其他参数**kwargs,)# 引擎创建(自动选择V0或V1引擎)self.llm_engine = LLMEngine.from_engine_args(engine_args=engine_args, usage_context=UsageContext.LLM_CLASS)# 任务支持检测self.supported_tasks = self.llm_engine.get_supported_tasks()# 插件系统初始化self.io_processor = get_io_processor(self.llm_engine.vllm_config, io_processor_plugin)

初始化逻辑的关键特点:

  • 自动引擎选择:根据配置自动选择V0或V1引擎
  • 任务能力检测:动态检测模型支持的任务类型
  • 插件系统集成:支持IO处理器等扩展插件
2. 请求处理阶段:智能调度
def generate(self, prompts, sampling_params=None, **kwargs):# 1. 模型类型验证if runner_type != "generate":raise ValueError("LLM.generate() is only supported for generative models")# 2. 默认参数处理if sampling_params is None:sampling_params = self.get_default_sampling_params()# 3. 多模态LoRA处理lora_request = self._get_modality_specific_lora_reqs(prompts, lora_request)# 4. 请求验证和添加self._validate_and_add_requests(prompts=prompts,params=sampling_params,lora_request=lora_request,priority=priority,)# 5. 引擎执行outputs = self._run_engine(use_tqdm=use_tqdm)return self.engine_class.validate_outputs(outputs, RequestOutput)
3. 请求验证和添加:质量控制
class LLM:def _validate_and_add_requests(self, prompts, params, **kwargs):# ...输入标准化,参数长度验证,多模态数据验证# 逐个添加请求for i, prompt in enumerate(prompts):self._add_request(prompt,params[i] if isinstance(params, Sequence) else params,lora_request=lora_request[i] if isinstance(lora_request, Sequence) else lora_request,priority=priority[i] if priority else 0,)def _add_request(self,prompt: PromptType,params: Union[SamplingParams, PoolingParams],lora_request: Optional[LoRARequest] = None,priority: int = 0) -> None:# ... 请求ID生成,参数校验self.llm_engine.add_request(request_id,engine_request,params,lora_request=lora_request,tokenization_kwargs=tokenization_kwargs,priority=priority,prompt_text=prompt_text,)
4. 引擎执行阶段:核心推理循环
class LLM:def _run_engine(self, use_tqdm=True):# 1. 进度条初始化if use_tqdm:num_requests = self.llm_engine.get_num_unfinished_requests()pbar = tqdm(total=num_requests, desc="Processed prompts")# 2. 主执行循环outputs = []while self.llm_engine.has_unfinished_requests():# 执行一个推理步骤step_outputs = self.llm_engine.step() # 可以获取到每个 prompt 的执行情况# 收集完成的输出for output in step_outputs:if output.finished:outputs.append(output)# 更新进度条和统计信息if use_tqdm:self._update_progress_bar(pbar, output)# 3. 结果排序和返回return sorted(outputs, key=lambda x: int(x.request_id))
4.2.6 EngineArgs:配置管理的核心

EngineArgs是vLLM中最重要的配置类,它整合了所有子系统的配置参数,实现了统一的参数管理:

@dataclass
class EngineArgs:"""Arguments for vLLM engine."""# 模型相关配置model: str = ModelConfig.modeltokenizer: Optional[str] = ModelConfig.tokenizerdtype: ModelDType = ModelConfig.dtypemax_model_len: Optional[int] = ModelConfig.max_model_len# 并行策略配置tensor_parallel_size: int = ParallelConfig.tensor_parallel_sizepipeline_parallel_size: int = ParallelConfig.pipeline_parallel_sizedata_parallel_size: int = ParallelConfig.data_parallel_size# 内存管理配置gpu_memory_utilization: float = CacheConfig.gpu_memory_utilizationswap_space: float = CacheConfig.swap_spacecpu_offload_gb: float = CacheConfig.cpu_offload_gb# 调度器配置max_num_seqs: Optional[int] = SchedulerConfig.max_num_seqsmax_num_batched_tokens: Optional[int] = SchedulerConfig.max_num_batched_tokens# 量化配置quantization: Optional[QuantizationMethods] = ModelConfig.quantization# LoRA配置enable_lora: bool = Falsemax_loras: int = LoRAConfig.max_loras

EngineArgs的设计体现了几个重要的工程原则:

  1. 配置聚合:将分散在各个配置类中的参数统一到一个入口点
  2. 类型安全:使用dataclass和类型注解确保参数类型正确性
  3. 默认值管理:通过get_field()函数从各个配置类中获取默认值
  4. 向后兼容:支持参数的渐进式演进和废弃处理

这种设计使得用户可以通过一个统一的接口配置整个vLLM引擎,而不需要了解底层的复杂配置结构。

4.3 EngineCoreClient:多模式执行架构的核心

EngineCoreClient 是vLLM V1架构中最关键的设计之一,它实现了多种执行模式的统一抽象,为不同的使用场景提供了灵活的解决方案。

4.3.1 设计理念与架构

EngineCoreClient 采用了抽象基类设计,通过工厂模式根据不同的配置参数自动选择合适的实现:

class EngineCoreClient(ABC):"""EngineCoreClient: 子类处理不同的推送和拉取方法用于asyncio / 多进程的EngineCore子类:* InprocClient: 进程内EngineCore (用于V0风格的LLMEngine)* SyncMPClient: ZMQ + 后台进程EngineCore (用于LLM)* AsyncMPClient: ZMQ + 后台进程EngineCore w/ asyncio (用于AsyncLLM)"""@staticmethoddef make_client(multiprocess_mode: bool,asyncio_mode: bool,vllm_config: VllmConfig,executor_class: type[Executor],log_stats: bool,) -> "EngineCoreClient":"""工厂方法,根据配置自动选择合适的客户端实现"""if multiprocess_mode and asyncio_mode:return EngineCoreClient.make_async_mp_client(vllm_config, executor_class, log_stats)if multiprocess_mode and not asyncio_mode:return SyncMPClient(vllm_config, executor_class, log_stats)return InprocClient(vllm_config, executor_class, log_stats)
4.3.2 三种核心实现模式
1. InprocClient:进程内执行模式

InprocClient 是最简单的实现,适用于传统的同步推理场景:

class InprocClient(EngineCoreClient):"""InprocClient: 进程内EngineCore客户端用于LLMEngine的V0风格add_request()和step()EngineCore设置在此进程中(无忙循环)* 直接将EngineCoreRequest推送到EngineCore* 通过步进EngineCore拉取EngineCoreOutputs"""def __init__(self, *args, **kwargs):self.engine_core = EngineCore(*args, **kwargs)def get_output(self) -> EngineCoreOutputs:outputs, _ = self.engine_core.step_fn()return outputs and outputs.get(0) or EngineCoreOutputs()def add_request(self, request: EngineCoreRequest) -> None:req, request_wave = self.engine_core.preprocess_add_request(request)self.engine_core.add_request(req, request_wave)

特点:

  • 零延迟通信:直接方法调用,无网络开销
  • 简单调试:所有代码在同一进程中,便于调试
  • 内存共享:与主进程共享内存空间
  • 适用场景:单机推理、开发测试、小规模部署
2. SyncMPClient:同步多进程模式

SyncMPClient 使用ZMQ进行进程间通信,支持多进程并行执行:

class SyncMPClient(MPClient):"""同步多进程EngineCore客户端"""def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor],log_stats: bool):super().__init__(asyncio_mode=False,vllm_config=vllm_config,executor_class=executor_class,log_stats=log_stats,)# 创建输出队列和后台处理线程self.outputs_queue = queue.Queue[Union[EngineCoreOutputs, Exception]]()self._start_output_processing_thread()def _start_output_processing_thread(self):"""启动后台线程处理EngineCore输出"""def process_outputs_socket():while True:frames = out_socket.recv_multipart(copy=False)outputs: EngineCoreOutputs = decoder.decode(frames)if outputs.utility_output:_process_utility_output(outputs.utility_output, utility_results)else:outputs_queue.put_nowait(outputs)self.output_queue_thread = Thread(target=process_outputs_socket,name="EngineCoreOutputQueueThread",daemon=True)self.output_queue_thread.start()def get_output(self) -> EngineCoreOutputs:"""同步获取输出结果"""outputs = self.outputs_queue.get()if isinstance(outputs, Exception):raise self._format_exception(outputs) from Nonereturn outputs

特点:

  • 进程隔离:EngineCore运行在独立进程中,提高稳定性
  • 异步处理:后台线程处理输出,主线程不阻塞
  • 高吞吐量:支持多进程并行,提高整体吞吐量
  • 适用场景:生产环境、高并发服务、多GPU部署
3. AsyncMPClient:异步多进程模式

AsyncMPClient 结合了多进程和异步编程的优势:

class AsyncMPClient(MPClient):"""异步多进程EngineCore客户端"""def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor],log_stats: bool, client_addresses: Optional[dict[str, str]] = None,client_count: int = 1, client_index: int = 0):super().__init__(asyncio_mode=True,vllm_config=vllm_config,executor_class=executor_class,log_stats=log_stats,client_addresses=client_addresses,)self.outputs_queue = asyncio.Queue[Union[EngineCoreOutputs, Exception]]()self._ensure_output_queue_task()def _ensure_output_queue_task(self):"""确保输出队列任务正在运行"""if self.resources.output_queue_task is not None:returnasync def process_outputs_socket():try:while True:frames = await output_socket.recv_multipart(copy=False)outputs: EngineCoreOutputs = decoder.decode(frames)if outputs.utility_output:_process_utility_output(outputs.utility_output, utility_results)continueif outputs.outputs or outputs.scheduler_stats:outputs_queue.put_nowait(outputs)except Exception as e:outputs_queue.put_nowait(e)self.resources.output_queue_task = asyncio.create_task(process_outputs_socket(), name="EngineCoreOutputQueueTask")async def get_output_async(self) -> EngineCoreOutputs:"""异步获取输出结果"""outputs = await self.outputs_queue.get()if isinstance(outputs, Exception):raise self._format_exception(outputs) from Nonereturn outputs

特点:

  • 异步非阻塞:完全异步的API,支持高并发
  • 事件驱动:基于asyncio事件循环,资源利用率高
  • 流式处理:支持流式输出和实时响应
  • 适用场景:Web服务、API服务器、实时应用

4.4 EngineCore:推理引擎的核心协调者

EngineCore 是vLLM V1架构中最核心的组件,它作为调度器和模型执行器之间的协调者,负责整个推理流程的编排和执行。

4.4.1 设计理念与核心职责

EngineCore的设计遵循"协调者模式",它不直接执行具体的推理任务,而是协调各个子系统的工作:

class EngineCore:"""Inner loop of vLLM's Engine."""def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor], log_stats: bool, executor_fail_callback: Optional[Callable] = None):# 1. 插件系统初始化from vllm.plugins import load_general_pluginsload_general_plugins()# 2. 模型执行器初始化self.model_executor = executor_class(vllm_config)# 3. KV缓存初始化num_gpu_blocks, num_cpu_blocks, kv_cache_config = \self._initialize_kv_caches(vllm_config)# 4. 调度器初始化self.scheduler: SchedulerInterface = Scheduler(vllm_config=vllm_config,kv_cache_config=kv_cache_config,structured_output_manager=self.structured_output_manager,include_finished_set=vllm_config.parallel_config.data_parallel_size > 1,log_stats=self.log_stats,)# 5. 批处理队列设置(用于流水线并行)self.batch_queue_size = self.model_executor.max_concurrent_batchesif self.batch_queue_size > 1:self.batch_queue = deque(maxlen=self.batch_queue_size)# 6. 选择执行策略self.step_fn = (self.step if self.batch_queue is None else self.step_with_batch_queue)
4.4.2 核心组件初始化流程
1. KV缓存系统初始化

EngineCore首先需要初始化KV缓存系统,这是整个推理性能的关键:

def _initialize_kv_caches(self, vllm_config: VllmConfig) -> tuple[int, int, KVCacheConfig]:"""初始化KV缓存系统"""start = time.time()# 1. 获取模型所需的KV缓存规格kv_cache_specs = self.model_executor.get_kv_cache_specs()# 2. 确定可用GPU内存if has_kv_cache:if os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") == "1":# 弹性扩展模式:同步KV缓存内存大小self.available_gpu_memory_for_kv_cache = \ParallelConfig.sync_kv_cache_memory_size(dp_group, -1)else:# 标准模式:分析模型峰值内存使用available_gpu_memory = self.model_executor.determine_available_memory()self.available_gpu_memory_for_kv_cache = available_gpu_memory[0]# 3. 生成KV缓存配置kv_cache_configs = get_kv_cache_configs(vllm_config, kv_cache_specs, available_gpu_memory)scheduler_kv_cache_config = generate_scheduler_kv_cache_config(kv_cache_configs)# 4. 初始化KV缓存并预热模型self.model_executor.initialize_from_config(kv_cache_configs)elapsed = time.time() - startlogger.info("init engine (profile, create kv cache, warmup model) took %.2f seconds", elapsed)return num_gpu_blocks, num_cpu_blocks, scheduler_kv_cache_config

KV缓存初始化关键步骤:

  • 内存分析:通过模型执行器分析峰值内存使用情况
  • 配置生成:根据可用内存和模型需求生成KV缓存配置
  • 预热执行:通过预热运行确保模型和缓存系统正常工作
  • 分布式同步:在数据并行模式下同步KV缓存配置
2. 调度器系统初始化

调度器是EngineCore的核心组件,负责请求的智能调度:

# 调度器初始化
if isinstance(vllm_config.scheduler_config.scheduler_cls, str):Scheduler = resolve_obj_by_qualname(vllm_config.scheduler_config.scheduler_cls)
else:Scheduler = vllm_config.scheduler_config.scheduler_cls# 检查是否支持分块预填充
if len(kv_cache_config.kv_cache_groups) == 0:logger.info("Disabling chunked prefill for model without KVCache")vllm_config.scheduler_config.chunked_prefill_enabled = Falseself.scheduler: SchedulerInterface = Scheduler(vllm_config=vllm_config,kv_cache_config=kv_cache_config,structured_output_manager=self.structured_output_manager,include_finished_set=vllm_config.parallel_config.data_parallel_size > 1,log_stats=self.log_stats,
)
3. 批处理队列系统

对于支持流水线并行的模型,EngineCore会设置批处理队列:

# 批处理队列设置
self.batch_queue_size = self.model_executor.max_concurrent_batches
self.batch_queue: Optional[deque[tuple[Future[ModelRunnerOutput], SchedulerOutput]]] = Noneif self.batch_queue_size > 1:logger.info("Batch queue is enabled with size %d", self.batch_queue_size)self.batch_queue = deque(maxlen=self.batch_queue_size)

批处理队列的作用:

  • 异步调度:允许异步调度和执行批次
  • 流水线优化:消除流水线并行中的气泡
  • 并发控制:控制最大并发批次数
4.4.3 核心推理循环

EngineCore的核心是推理循环,它协调调度器和模型执行器的工作:

1. 标准推理循环(step)
def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:"""调度、执行和生成输出Returns:tuple: (输出字典, 模型是否执行的标志)"""# 1. 检查是否有待处理的请求if not self.scheduler.has_requests():return {}, False# 2. 调度器选择要执行的请求scheduler_output = self.scheduler.schedule()# 3. 执行模型推理model_output = self.execute_model_with_error_logging(self.model_executor.execute_model,scheduler_output)# 4. 更新调度器状态engine_core_outputs = self.scheduler.update_from_output(scheduler_output, model_output)return (engine_core_outputs, scheduler_output.total_num_scheduled_tokens > 0)

标准推理循环的关键步骤:

  1. 请求检查:检查调度器中是否有待处理的请求
  2. 智能调度:调度器根据当前状态选择最优的请求组合
  3. 模型执行:将调度结果传递给模型执行器进行推理
  4. 状态更新:根据模型输出更新调度器状态
  5. 结果返回:返回推理结果和执行状态
2. 批处理队列推理循环(step_with_batch_queue)

对于支持流水线并行的模型,EngineCore使用更复杂的批处理队列循环:

def step_with_batch_queue(self) -> tuple[Optional[dict[int, EngineCoreOutputs]], bool]:"""使用批处理队列进行调度和执行执行流程:1. 如果批处理队列未满,尝试调度新批次2. 如果没有新批次可调度,等待队列中的批次完成3. 更新调度器状态"""batch_queue = self.batch_queueassert batch_queue is not Nonemodel_executed = False# 1. 尝试调度新批次(如果队列未满)if self.scheduler.has_requests():scheduler_output = self.scheduler.schedule()future = self.model_executor.execute_model(scheduler_output, non_block=True)batch_queue.appendleft((future, scheduler_output))model_executed = scheduler_output.total_num_scheduled_tokens > 0# 如果队列未满且最后一个批次未完成,不阻塞if model_executed and len(batch_queue) < self.batch_queue_size \and not batch_queue[-1][0].done():return None, True# 2. 等待下一个结果可用future, scheduler_output = batch_queue.pop()model_output = self.execute_model_with_error_logging(lambda _: future.result(), scheduler_output)# 3. 更新调度器状态engine_core_outputs = self.scheduler.update_from_output(scheduler_output, model_output)return engine_core_outputs, model_executed

批处理队列循环的优势:

  • 异步执行:调度和执行可以并行进行
  • 流水线优化:消除流水线并行中的等待时间
  • 资源利用:最大化GPU和CPU的利用率

4.5 Scheduler:智能调度的艺术

调度器是vLLM性能优化的关键组件,实现了多种先进的调度策略:

4.5.1 连续批处理(Continuous Batching)

vLLM使用连续批处理技术,允许不同长度的序列在同一个batch中处理:

def _organize_batch(self, scheduler_output: SchedulerOutput):"""组织批处理,处理不同长度的序列"""# 1. 收集所有请求的tokenall_tokens = []token_to_request = {}  # token索引到请求ID的映射for req_data in scheduler_output.scheduled_new_reqs:if req_data.prompt_token_ids:for i, token_id in enumerate(req_data.prompt_token_ids):all_tokens.append(token_id)token_to_request[len(all_tokens) - 1] = req_data.req_idfor i, req_id in enumerate(scheduler_output.scheduled_cached_reqs.req_ids):num_tokens = scheduler_output.num_scheduled_tokens[req_id]for j in range(num_tokens):all_tokens.append(0)  # 占位符,实际值在推理时填充token_to_request[len(all_tokens) - 1] = req_id# 2. 创建注意力maskattention_mask = self._create_attention_mask(scheduler_output, token_to_request)# 3. 创建位置编码positions = self._create_positions(scheduler_output, token_to_request)return all_tokens, attention_mask, positions, token_to_request
4.5.2 PagedAttention优化
  • 【vLLM】核心技术PagedAttention,调度原理

PagedAttention允许高效处理不同长度的序列:

def _create_attention_mask(self, scheduler_output: SchedulerOutput, token_to_request: dict[int, str]):"""创建注意力mask,支持PagedAttention"""batch_size = len(scheduler_output.scheduled_new_reqs) + \len(scheduler_output.scheduled_cached_reqs.req_ids)# 创建块级别的注意力maskattention_mask = torch.zeros((batch_size, self.max_num_tokens), dtype=torch.bool, device=self.device)# 为每个请求设置有效的token位置for i, req_data in enumerate(scheduler_output.scheduled_new_reqs):num_tokens = len(req_data.prompt_token_ids) if req_data.prompt_token_ids else 0attention_mask[i, :num_tokens] = True# 处理缓存请求cached_start_idx = len(scheduler_output.scheduled_new_reqs)for i, req_id in enumerate(scheduler_output.scheduled_cached_reqs.req_ids):num_tokens = scheduler_output.num_scheduled_tokens[req_id]attention_mask[cached_start_idx + i, :num_tokens] = Truereturn attention_mask

4.6 Executor:分布式执行的抽象

执行器层提供了统一的分布式执行抽象,负责管理Worker进程的创建、初始化和协调。

4.6.1 MultiprocExecutor初始化流程
class MultiprocExecutor(Executor):def _init_executor(self) -> None:"""初始化执行器,创建和管理Worker进程"""# 1. 设置多进程环境set_multiprocessing_worker_envs()# 2. 配置分布式通信distributed_init_method = get_distributed_init_method(get_loopback_ip(), get_open_port())# 3. 创建消息队列用于Worker通信max_chunk_bytes = envs.VLLM_MQ_MAX_CHUNK_BYTES_MB * 1024 * 1024self.rpc_broadcast_mq = MessageQueue(self.world_size,self.world_size,max_chunk_bytes=max_chunk_bytes)scheduler_output_handle = self.rpc_broadcast_mq.export_handle()# 4. 创建Worker进程context = get_mp_context()shared_worker_lock = context.Lock()unready_workers: list[UnreadyWorkerProcHandle] = []for rank in range(self.world_size):unready_workers.append(WorkerProc.make_worker_process(vllm_config=self.vllm_config,local_rank=rank,rank=rank,distributed_init_method=distributed_init_method,input_shm_handle=scheduler_output_handle,shared_worker_lock=shared_worker_lock,))# 5. 等待所有Worker准备就绪self.workers = WorkerProc.wait_for_ready(unready_workers)# 6. 确保消息队列准备就绪self.rpc_broadcast_mq.wait_until_ready()for w in self.workers:w.worker_response_mq.wait_until_ready()# 7. 启动Worker监控self.start_worker_monitor()

4.7 ModelRunner:模型执行的核心

ModelRunner负责实际的模型推理执行,它是Worker中最重要的组件之一。

4.7.1 ModelRunner的创建流程

ModelRunner的创建发生在Worker的初始化过程中:

class Worker(WorkerBase):def __init__(self, vllm_config: VllmConfig, local_rank: int, rank: int, ...):super().__init__(vllm_config=vllm_config, ...)# 在Worker初始化过程中创建ModelRunner# 这发生在init_device()方法中self.init_device()def init_device(self):"""初始化设备并创建ModelRunner"""# 1. 设置CUDA设备self.device = torch.device(f"cuda:{self.local_rank}")torch.cuda.set_device(self.device)# 2. 初始化分布式环境init_distributed_environment()ensure_model_parallel_initialized()# 3. 创建ModelRunner实例self.model_runner: GPUModelRunner = GPUModelRunner(self.vllm_config, self.device)
4.7.2 GPUModelRunner的初始化

GPUModelRunner是ModelRunner的具体实现,它的初始化过程非常复杂:

class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):def __init__(self, vllm_config: VllmConfig, device: torch.device):"""初始化GPUModelRunner"""# 1. 保存配置信息self.vllm_config = vllm_configself.model_config = vllm_config.model_configself.cache_config = vllm_config.cache_configself.compilation_config = vllm_config.compilation_configself.lora_config = vllm_config.lora_configself.load_config = vllm_config.load_configself.parallel_config = vllm_config.parallel_configself.scheduler_config = vllm_config.scheduler_configself.speculative_config = vllm_config.speculative_configself.observability_config = vllm_config.observability_config# 2. 设置CPU卸载和批处理不变性from vllm.model_executor.models.utils import set_cpu_offload_max_bytesset_cpu_offload_max_bytes(int(self.cache_config.cpu_offload_gb * 1024**3))from vllm.model_executor.layers.batch_invariant import init_batch_invarianceinit_batch_invariance()# 3. 设置设备和数据类型self.device = deviceself.pin_memory = is_pin_memory_available()self.dtype = self.model_config.dtypeif cache_config.cache_dtype == "auto":self.kv_cache_dtype = self.dtypeelse:self.kv_cache_dtype = STR_DTYPE_TO_TORCH_DTYPE[cache_config.cache_dtype]# 4. 设置模型相关参数self.is_pooling_model = (model_config.runner_type == 'pooling')self.enable_prompt_embeds = model_config.enable_prompt_embedsself.is_multimodal_raw_input_only_model = model_config.is_multimodal_raw_input_only_modelself.max_model_len = model_config.max_model_lenself.max_num_tokens = scheduler_config.max_num_batched_tokensself.max_num_reqs = scheduler_config.max_num_seqs# 5. 设置注意力相关参数self.num_query_heads = model_config.get_num_attention_heads(parallel_config)self.hidden_size = model_config.get_hidden_size()self.attention_chunk_size = model_config.attention_chunk_sizeself.use_alibi = check_use_alibi(model_config)self.cascade_attn_enabled = not self.model_config.disable_cascade_attn# 6. 设置多模态支持self.mm_registry = MULTIMODAL_REGISTRYself.uses_mrope = model_config.uses_mropeself.supports_mm_inputs = self.mm_registry.supports_multimodal_inputs(model_config)# 7. 设置编码器-解码器模型参数if self.model_config.is_encoder_decoder:self.max_encoder_len = scheduler_config.max_num_encoder_input_tokenselse:self.max_encoder_len = 0# 8. 初始化采样器self.sampler = Sampler(logprobs_mode=self.model_config.logprobs_mode)# 9. 初始化专家并行负载均衡器状态self.eplb_state: Optional[EplbState] = None# 10. 初始化延迟加载的组件self.kv_caches: list[torch.Tensor] = []self.attn_groups: list[list[AttentionGroup]] = []# self.model: nn.Module  # 在load_model()中设置# self.kv_cache_config: KVCacheConfig  # 在initialize_kv_cache中初始化

5. 总结与展望

vLLM的成功在于其优雅的工程设计和对性能的极致追求:

5.1 设计优势

  1. 分层架构:清晰的职责分离,易于维护和扩展
  2. 内存创新:PagedAttention革命性地解决了内存碎片问题
  3. 性能优化:从算法到实现的全方位优化
  4. 用户友好:简洁的API设计,降低使用门槛

5.2 技术创新

  1. PagedAttention:将虚拟内存概念引入Attention计算
  2. 动态批处理:智能的请求调度和批处理策略
  3. 多级缓存:GPU-CPU协同的内存管理
  4. 内核优化:深度定制的CUDA/Triton内核

5.3 未来发展

vLLM的设计为未来发展奠定了坚实基础:

  • 更多硬件支持:扩展到更多AI加速器
  • 模型格式支持:支持更多模型架构和格式
  • 推理优化:持续的算法和系统优化
  • 生态建设:更丰富的工具链和插件生态

vLLM不仅是一个高性能的推理引擎,更是现代AI系统工程的典范。它展示了如何通过精心的架构设计、创新的算法思想和工程最佳实践,构建一个既高效又易用的复杂系统。对于AI工程师而言,深入理解vLLM的设计思路和实现细节,将有助于在自己的项目中应用这些宝贵的经验和技术。


本文基于vLLM开源项目的代码分析,旨在从工程角度解析其设计思路和实现细节。随着项目的持续发展,部分实现细节可能会有所变化,建议读者结合最新的源代码进行学习。

http://www.dtcms.com/a/462466.html

相关文章:

  • 网站seo优化有哪些陕西锦宇建设有限公司网站
  • Looper、MessageQueue、Message及Handler的关系是什么?如何保证MessageQueue的并发访问安全?
  • ELK运维之路(Elasticsearch7集群组建-7.17.24)
  • 网站建设管理工作总结室外平台设计
  • OpenShift Virtualization - 为使用 ovn-k8s-cni-overlay 类型网络的 VM 自动分配 IP 地址
  • 投资建设个什么网站好网络设计实践课程报告
  • 柳州住房和城乡建设局网站在深圳注册公司需要什么条件
  • Java 并发编程中的 CLH 队列
  • 客服AI软件如何成为电商店铺的“隐形增长官“
  • 世纪龙科技-汽车玻璃升降器更换及车门调整仿真教学软件介绍
  • 友达G156HAN04.0工业宽温液晶模组技术摘要
  • 推荐西安知名的集团门户网站建设公司南京调查公司网站
  • 国际化(货币单位、时间、数字)
  • next项目如何实现不同页面使用不同的布局结构,比如login不使用全局的layout
  • dnf游戏币交易网站建设网站app简单做
  • 服务器操作手册(四)nacos搭建+redis搭建+nexus搭建
  • LoadRunner2022 社区版下载及安装教程 + 中文版教程,内附安装包
  • CC防护的实时监控与响应机制
  • 长沙网站柯林建站程序
  • 唐山网站搭建wordpress 分页按钮 显示文章数
  • html怎么做网站地图卓越职业院校建设专题网站
  • Debezium日常分享系列之:使用 Debezium 添加新表:最佳实践和陷阱
  • flash个人网站源码西安是哪个省属于哪个市
  • 东莞h5网站开发网站内部链接是怎么做的
  • 做网站每年包多少流量网站开发专业简历
  • 台州市临海建设局网站四川集团网站建设
  • 第十八篇:变量作用域(Local, Enclosing, Global, Built-in)与global、nonlocal关键字
  • 寻梦数据空间 | 内核篇:新一代数据空间的四大核心能力解析
  • 企业网站定制开发流程wordpress淘宝客 瀑布流
  • 一款简单易用的机器人流程自动化桌面软件