当前位置: 首页 > news >正文

【vLLM】源码解读:vllm 模型加载到推理全流程

🚀 vLLM + Qwen2 (tp=2) 完整流程详解

基于多进程架构 (MultiprocExecutor)


📋 目录

  1. 系统初始化流程
  2. 子进程创建与初始化
  3. 模型加载与权重分片
  4. 推理请求处理流程
  5. 模型前向传播详解
  6. 通信机制详解
  7. 完整时序图

一、系统初始化流程

1.1 用户代码启动

# 用户代码
from vllm import LLM, SamplingParamsllm = LLM(model="Qwen/Qwen2-7B",tensor_parallel_size=2,  # tp=2dtype="bfloat16",gpu_memory_utilization=0.9,trust_remote_code=True
)

1.2 LLM 类初始化

# vllm/entrypoints/llm.pyclass LLM:def __init__(self, model, tensor_parallel_size=2, **kwargs):"""主进程入口"""# 1. 创建 EngineArgs (封装所有配置参数)engine_args = EngineArgs(model="Qwen/Qwen2-7B",tensor_parallel_size=2,dtype="bfloat16",gpu_memory_utilization=0.9,**kwargs)# 2. 从 EngineArgs 创建 LLMEngineself.llm_engine = LLMEngine.from_engine_args(engine_args=engine_args,usage_context=UsageContext.LLM_CLASS)# 这会触发整个初始化链# 3. 获取 tokenizer (用于主进程)self.tokenizer = self.llm_engine.get_tokenizer()

1.3 LLMEngine 初始化

# vllm/v1/engine/llm_engine.pyclass LLMEngine:def __init__(self, vllm_config, executor_class, ...):"""主进程的引擎"""self.vllm_config = vllm_configself.model_config = vllm_config.model_config# 1. 初始化 Tokenizer (主进程)self.tokenizer = init_tokenizer_from_configs(model_config=vllm_config.model_config)# 2. 创建 Processor (处理输入,转换为引擎请求)self.processor = Processor(vllm_config=vllm_config,tokenizer=self.tokenizer)# 3. 创建 OutputProcessor (处理输出,转换为用户格式)self.output_processor = OutputProcessor(self.tokenizer,log_stats=self.log_stats)# 4. 创建 EngineCore (核心调度和执行)self.engine_core = EngineCoreClient.make_client(multiprocess_mode=True,  # 使用多进程模式vllm_config=vllm_config,executor_class=executor_class  # MultiprocExecutor)

1.4 MultiprocExecutor 初始化

# vllm/v1/executor/multiproc_executor.pyclass MultiprocExecutor(Executor):def _init_executor(self):"""主进程中的 Executor负责创建和管理子进程"""# 1. 计算 world_sizeself.world_size = 2  # tp=2tensor_parallel_size = 2pipeline_parallel_size = 1# 2. 设置分布式初始化方法distributed_init_method = get_distributed_init_method(get_loopback_ip(),  # "127.0.0.1"get_open_port()     # 随机可用端口,如 "29500")# 结果: "tcp://127.0.0.1:29500"# 3. 创建共享内存消息队列 (主进程 -> 子进程)self.rpc_broadcast_mq = MessageQueue(num_writers=self.world_size,  # 2num_readers=self.world_size   # 2)scheduler_output_handle = self.rpc_broadcast_mq.export_handle()# 4. 创建子进程锁 (用于同步)context = get_mp_context()  # 'spawn' 或 'fork'shared_worker_lock = context.Lock()# 5. 为每个 rank 创建子进程unready_workers = []for rank in range(self.world_size):  # rank: 0, 1unready_worker = WorkerProc.make_worker_process(vllm_config=self.vllm_config,local_rank=rank,        # GPU id: 0, 1rank=rank,              # 全局 rank: 0, 1distributed_init_method=distributed_init_method,input_shm_handle=scheduler_output_handle,shared_worker_lock=shared_worker_lock)unready_workers.append(unready_worker)# 6. 等待所有子进程就绪self.workers = WorkerProc.wait_for_ready(unready_workers)# 此时所有子进程已经:# - 初始化分布式环境# - 加载模型# - 等待主进程的指令# 7. 确保消息队列就绪self.rpc_broadcast_mq.wait_until_ready()for worker in self.workers:worker.worker_response_mq.wait_until_ready()# 8. 启动监控线程 (监控子进程存活状态)self.start_worker_monitor()

二、子进程创建与初始化

2.1 创建子进程

# vllm/v1/executor/multiproc_executor.pyclass WorkerProc:@staticmethoddef make_worker_process(vllm_config, rank, ...):"""主进程调用,创建一个新的子进程"""context = get_mp_context()  # 通常是 'spawn'# 1. 创建 Pipe 用于就绪通知reader, writer = context.Pipe(duplex=False)# 2. 创建死亡检测 Pipedeath_reader, death_writer = context.Pipe(duplex=False)# 3. 创建进程proc = context.Process(target=WorkerProc.worker_main,  # 子进程入口函数kwargs={"vllm_config": vllm_config,"local_rank": rank,"rank": rank,"distributed_init_method": "tcp://127.0.0.1:29500","input_shm_handle": scheduler_output_handle,"ready_pipe": (reader, writer),"death_pipe": death_reader,"shared_worker_lock": shared_worker_lock},name=f"VllmWorker-{rank}",  # "VllmWorker-0" / "VllmWorker-1"daemon=True)# 4. 启动子进程proc.start()return UnreadyWorkerProcHandle(proc, rank, reader, death_writer)

2.2 子进程主函数

class WorkerProc:@staticmethoddef worker_main(vllm_config, local_rank, rank, distributed_init_method, input_shm_handle,ready_pipe, death_pipe, shared_worker_lock):"""每个子进程的入口函数在独立的进程空间中运行"""try:# 1. 创建 WorkerProc 实例worker_proc = WorkerProc(vllm_config=vllm_config,local_rank=local_rank,  # 0 或 1rank=rank,              # 0 或 1distributed_init_method=distributed_init_method,input_shm_handle=input_shm_handle,shared_worker_lock=shared_worker_lock)# 2. 通知主进程已就绪reader, writer = ready_pipereader.close()writer.send({"status": "READY", "worker_response_mq_handle": ...})writer.close()# 3. 进入消息循环worker_proc.run_busy_loop(death_pipe)except Exception as e:# 错误处理logger.exception(f"Worker {rank} failed")writer.send({"status": "FAILED", "error"
http://www.dtcms.com/a/487632.html

相关文章:

  • Keil MDK系列:(四)SCT文件编写教程
  • 如何熟悉网站项目的逻辑做班级网站的实训报告
  • 前端 TypeScript 项目中的“守护者”:Zod 实战使用心得与最佳实践
  • 1.n8n 的搭建与使用
  • 公司网站SEO优化哪个做得好永久免费可联网的进销存软件
  • qq官方网站登录入口做本地网站怎么挣钱
  • 睢县做网站怎样查找自己建设的网站
  • 【开题答辩全过程】以 便利店库存管理系统为例,包含答辩的问题和答案
  • 天津企业做网站多少钱wordpress 附件预览
  • 最好的html5画廊显示质量html5的网站成品网站开发
  • ETH Gas Used
  • Golang + OpenSSL 实现 TLS 安全通信:从私有 CA 到动态证书加载
  • 扩展-docker-ovs编译
  • 什么网站可以免费发布招聘信息鳌江网站建设
  • 门户网站 架构网站怎样快速排名
  • OpenLayers的过滤器 -- 章节二:包含过滤器详解
  • 【题解】B2609【深基1.习1】清扫教室
  • 西安市城乡建设网官方网站免费咨询医生回答在线
  • 【完整源码+数据集+部署教程】 口腔疾病图像分割系统源码&数据集分享 [yolov8-seg等50+全套改进创新点发刊_一键训练教程_Web前端展示]
  • 尤溪网站开发开发一款电商app需要多少钱
  • python单元测试 unittest.mock.patch (一)
  • 一般网站开发好的框架都有哪些网站关闭了域名备案
  • 做自行车车队网站的名字大全做论文查重网站代理能赚到钱吗
  • 华为Asend NPU 大模型W8A8量化调优
  • C#拆箱/装箱(性能优化)
  • 深圳市做网站建设wordpress 获取子分类
  • 网站推广排名平台做网站常州
  • 企业配电柜里的“防火卫士”——ATE800无线测温传感器,让设备更安全!
  • 如何使用云手机进行游戏挂机?
  • 网站自适应手机代码百度网盘资源搜索引擎搜索