
vLLM - EngineCoreClient

EngineCoreClient is the base class for interacting with EngineCore:

  • The API is defined in two versions, synchronous and asynchronous.

    class EngineCoreClient(ABC):

        @abstractmethod
        def shutdown(self):
            ...

        def get_output(self) -> EngineCoreOutputs:
            raise NotImplementedError

        def add_request(self, request: EngineCoreRequest) -> None:
            raise NotImplementedError

        ...

        def collective_rpc(self,
                           method: Union[str, Callable[..., _R]],
                           timeout: Optional[float] = None,
                           args: tuple = (),
                           kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
            raise NotImplementedError

        def dp_engines_running(self) -> bool:
            raise NotImplementedError

        async def scale_elastic_ep(self, new_data_parallel_size: int) -> None:
            raise NotImplementedError

        async def get_output_async(self) -> EngineCoreOutputs:
            raise NotImplementedError

        async def add_request_async(self, request: EngineCoreRequest) -> None:
            raise NotImplementedError

        ...

        async def collective_rpc_async(
                self,
                method: Union[str, Callable[..., _R]],
                timeout: Optional[float] = None,
                args: tuple = (),
                kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
            raise NotImplementedError
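
To illustrate the split between the two API halves, here is a minimal sketch, not vLLM code. ToyClient and ToySyncClient are hypothetical names. It assumes, as the base class above suggests, that a subclass overrides only the half it supports and leaves the other half raising NotImplementedError.

```python
import asyncio
from abc import ABC, abstractmethod


class ToyClient(ABC):
    """Hypothetical base class with a sync and an async variant of one call."""

    @abstractmethod
    def shutdown(self) -> None:
        ...

    def get_output(self) -> list[str]:
        raise NotImplementedError

    async def get_output_async(self) -> list[str]:
        raise NotImplementedError


class ToySyncClient(ToyClient):
    """Overrides only the synchronous half; the async half stays unimplemented."""

    def shutdown(self) -> None:
        pass

    def get_output(self) -> list[str]:
        return ["token"]


client = ToySyncClient()
sync_result = client.get_output()

try:
    asyncio.run(client.get_output_async())
    async_supported = True
except NotImplementedError:
    async_supported = False
```

Because the non-abstract methods have default bodies that raise, a synchronous client can be instantiated without providing any coroutine at all.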

InprocClient

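InprocClient is the single-process subclass of EngineCoreClient, used mainly for V0-style usage:

  • It calls EngineCore methods directly in the same process, with no IPC.
  • This avoids communication overhead, but each call blocks the main thread while the engine step runs.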

    class InprocClient(EngineCoreClient):

        def __init__(self, *args, **kwargs):
            self.engine_core = EngineCore(*args, **kwargs)

        def get_output(self) -> EngineCoreOutputs:
            outputs, _ = self.engine_core.step()
            return outputs.get(0) or EngineCoreOutputs()

        def add_request(self, request: EngineCoreRequest) -> None:
            self.engine_core.add_request(request)

        ...
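
The in-process pattern can be sketched with toy stand-ins. ToyEngineCore, ToyOutputs and ToyInprocClient are hypothetical, and the step() return shape (a dict of outputs plus a flag) is only modelled on the snippet above. The sketch shows why get_output blocks the caller: the engine step runs on the caller's own thread.

```python
from dataclasses import dataclass, field


@dataclass
class ToyOutputs:
    """Hypothetical stand-in for EngineCoreOutputs."""
    finished: list[str] = field(default_factory=list)


class ToyEngineCore:
    """Hypothetical engine: each step() finishes at most one queued request."""

    def __init__(self) -> None:
        self.waiting: list[str] = []

    def add_request(self, request_id: str) -> None:
        self.waiting.append(request_id)

    def step(self) -> tuple[dict[int, ToyOutputs], bool]:
        if not self.waiting:
            return {}, False  # nothing was scheduled, so there are no outputs
        return {0: ToyOutputs([self.waiting.pop(0)])}, True


class ToyInprocClient:
    def __init__(self) -> None:
        self.engine_core = ToyEngineCore()

    def add_request(self, request_id: str) -> None:
        self.engine_core.add_request(request_id)

    def get_output(self) -> ToyOutputs:
        # The engine step runs on the caller's thread; no IPC is involved.
        outputs, _ = self.engine_core.step()
        # Fall back to an empty result when key 0 has no outputs.
        return outputs.get(0) or ToyOutputs()


client = ToyInprocClient()
client.add_request("req-1")
first = client.get_output()
second = client.get_output()
```

The second call returns an empty ToyOutputs rather than None, which mirrors the `outputs.get(0) or EngineCoreOutputs()` fallback.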

MPClient

MPClient is the multiprocess subclass of EngineCoreClient:

  • It runs EngineCore in a background process.
  • It pushes EngineCoreRequests through input_socket.
  • It pulls EngineCoreOutputs through output_socket.

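MPClient.__init__

  • Creates the encoder and decoder, which serialize EngineCoreRequests and deserialize EngineCoreOutputs.
  • Creates a zmq.Context or zmq.asyncio.Context: the context through which EngineCoreClient and EngineCore communicate.
  • Creates BackgroundResources, which is responsible for releasing resources (such as the zmq.Context).
  • Calls weakref.finalize to bind BackgroundResources to self, so that BackgroundResources.__call__ runs automatically when the self object is destroyed.
  • If client_addresses is configured, uses the externally created EngineCore; otherwise calls launch_core_engines to create the EngineCore, together with its engine_manager and coordinator.
  • Calls make_zmq_socket to create the sockets through which EngineCoreClient pushes to and pulls from EngineCore.
  • Assigns a ZMQ identity to each rank in engine_ranks.
  • Uses input_socket to receive the initial message sent by each rank in engine_ranks; once all of them have arrived, every EngineCore has been initialized correctly.
  • Creates pending_messages to track socket sends, so that the corresponding resources (such as PyTorch tensor memory) are released only after the data has been sent.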

    class MPClient(EngineCoreClient):

        def __init__(
            self,
            asyncio_mode: bool,
            vllm_config: VllmConfig,
            executor_class: type[Executor],
            log_stats: bool,
            client_addresses: Optional[dict[str, str]] = None,
        ):
            self.vllm_config = vllm_config
            self.encoder = MsgpackEncoder()
            self.decoder = MsgpackDecoder(EngineCoreOutputs)

            sync_ctx = zmq.Context(io_threads=2)
            self.ctx = zmq.asyncio.Context(sync_ctx) if asyncio_mode else sync_ctx

            self.resources = BackgroundResources(ctx=sync_ctx)
            self._finalizer = weakref.finalize(self, self.resources)
            success = False
            try:
                self.engines_running = False
                self.stats_update_address: Optional[str] = None
                if client_addresses is not None:
                    # Engines are managed externally to this client.
                    input_address = client_addresses["input_address"]
                    output_address = client_addresses["output_address"]
                    self.stats_update_address = client_addresses.get(
                        "stats_update_address")
                else:
                    with launch_core_engines(vllm_config, executor_class,
                                             log_stats) as (engine_manager,
                                                            coordinator,
                                                            addresses):
                        self.resources.coordinator = coordinator
                        self.resources.engine_manager = engine_manager

                    (input_address, ) = addresses.inputs
                    (output_address, ) = addresses.outputs
                    self.stats_update_address = (
                        addresses.frontend_stats_publish_address)
                    if coordinator is not None:
                        assert self.stats_update_address == (
                            coordinator.get_stats_publish_address())

                self.input_socket = self.resources.input_socket = make_zmq_socket(
                    self.ctx, input_address, zmq.ROUTER, bind=True)
                self.resources.output_socket = make_zmq_socket(
                    self.ctx, output_address, zmq.PULL)
                ...
                engine_ranks = [dp_rank] if (offline_mode
                                             or external_dp_lb) else range(dp_size)
                self.core_engines: list[EngineIdentity] = [
                    index.to_bytes(2, "little") for index in engine_ranks
                ]

                identities = set(self.core_engines)
                sync_input_socket = zmq.Socket.shadow(self.input_socket)
                while identities:
                    if not sync_input_socket.poll(timeout=600_000):
                        raise TimeoutError("Timed out waiting for engines to send"
                                           "initial message on input socket.")
                    identity, _ = sync_input_socket.recv_multipart()
                    identities.remove(identity)

                self.core_engine: EngineIdentity = self.core_engines[0]
                self.utility_results: dict[int, AnyFuture] = {}
                self.pending_messages = deque[tuple[zmq.MessageTracker, Any]]()
                success = True
            finally:
                if not success:
                    self._finalizer()
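
Two ideas from this constructor can be tried in isolation with the standard library alone: the weakref.finalize cleanup (including the early call on a failed startup) and the identity handshake. The sketch below is an illustration under assumptions, not the vLLM implementation. ToyResources and ToyClient are hypothetical, and the "received" identities are hard-coded instead of being read from a socket.

```python
import gc
import weakref


class ToyResources:
    """Hypothetical stand-in for BackgroundResources: calling it releases everything."""

    def __init__(self) -> None:
        self.closed = False

    def __call__(self) -> None:
        self.closed = True


class ToyClient:
    def __init__(self, resources: ToyResources, fail: bool = False) -> None:
        self.resources = resources
        # The callback must not reference self, or self could never be collected.
        self._finalizer = weakref.finalize(self, self.resources)
        success = False
        try:
            if fail:
                raise RuntimeError("startup failed")
            success = True
        finally:
            if not success:
                self._finalizer()  # release immediately when __init__ fails


# Case 1: cleanup runs when the client object goes away.
live = ToyResources()
client = ToyClient(live)
closed_while_alive = live.closed
del client
gc.collect()  # not needed on CPython, harmless elsewhere
closed_after_del = live.closed

# Case 2: cleanup runs right away when construction fails.
failed = ToyResources()
try:
    ToyClient(failed, fail=True)
except RuntimeError:
    pass

# Identity handshake: wait until every expected 2-byte identity has checked in.
identities = {rank.to_bytes(2, "little") for rank in range(3)}
for identity in (b"\x01\x00", b"\x00\x00", b"\x02\x00"):  # arrival order may vary
    identities.remove(identity)
all_ready = not identities
```

A finalizer created by weakref.finalize runs at most once, so calling it early in the failure path does not cause a second release later.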

SyncMPClient

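SyncMPClient is the synchronous-I/O subclass of MPClient:

  • It implements all the synchronous I/O APIs defined in EngineCoreClient.
  • It uses a queue.Queue to implement synchronous I/O.

SyncMPClient.__init__:

  • Creates a thread that receives the messages EngineCore sends to output_socket and puts them on self.outputs_queue.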

    class SyncMPClient(MPClient):

        def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor],
                     log_stats: bool):
            super().__init__(
                asyncio_mode=False,
                vllm_config=vllm_config,
                executor_class=executor_class,
                log_stats=log_stats,
            )
            ...
            self.outputs_queue = queue.Queue[Union[EngineCoreOutputs, Exception]]()

            ctx = self.ctx
            out_socket = self.resources.output_socket
            decoder = self.decoder
            utility_results = self.utility_results
            outputs_queue = self.outputs_queue

            shutdown_path = get_open_zmq_inproc_path()
            resources = self.resources
            resources.shutdown_path = shutdown_path

            def process_outputs_socket():
                shutdown_socket = ctx.socket(zmq.PAIR)
                try:
                    shutdown_socket.bind(shutdown_path)
                    poller = zmq.Poller()
                    poller.register(shutdown_socket, zmq.POLLIN)
                    poller.register(out_socket, zmq.POLLIN)
                    while True:
                        socks = poller.poll()
                        ...
                        frames = out_socket.recv_multipart(copy=False)
                        resources.validate_alive(frames)
                        outputs: EngineCoreOutputs = decoder.decode(frames)
                        if outputs.utility_output:
                            _process_utility_output(outputs.utility_output,
                                                    utility_results)
                        else:
                            outputs_queue.put_nowait(outputs)
                except Exception as e:
                    ...

            self.output_queue_thread = Thread(target=process_outputs_socket,
                                              name="EngineCoreOutputQueueThread",
                                              daemon=True)
            self.output_queue_thread.start()
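
The shape of that reader thread, polling a shutdown channel and a data channel together, can be approximated without ZMQ. The sketch below swaps in the standard library's socket.socketpair and selectors for zmq.PAIR and zmq.Poller, so it is an analogy rather than the real transport. How the elided part of the loop reacts to a shutdown signal is an assumption here: the sketch simply exits.

```python
import queue
import selectors
import socket
import threading

# Stand-ins for the shutdown socket and the output socket.
shutdown_rx, shutdown_tx = socket.socketpair()
out_rx, out_tx = socket.socketpair()
outputs_queue: "queue.Queue[bytes]" = queue.Queue()


def process_outputs() -> None:
    sel = selectors.DefaultSelector()
    sel.register(shutdown_rx, selectors.EVENT_READ)
    sel.register(out_rx, selectors.EVENT_READ)
    try:
        while True:
            ready = [key.fileobj for key, _ in sel.select()]
            if shutdown_rx in ready:
                break  # assumption: a shutdown signal ends the loop
            # Otherwise the data socket is readable: hand the payload over.
            outputs_queue.put_nowait(out_rx.recv(1024))
    finally:
        sel.close()


thread = threading.Thread(target=process_outputs, daemon=True)
thread.start()

out_tx.send(b"outputs-1")
received = outputs_queue.get(timeout=5)

shutdown_tx.send(b"x")  # wake the poll loop and ask it to stop
thread.join(timeout=5)
stopped = not thread.is_alive()

for sock in (shutdown_rx, shutdown_tx, out_rx, out_tx):
    sock.close()
```

Polling both channels in one call lets the thread sleep until there is work, yet still be woken promptly for shutdown instead of being stuck in a blocking receive.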

SyncMPClient interacts with EngineCore in three main ways:

  • add_request: adds an EngineCoreRequest
  • call_utility: remotely invokes an EngineCore method
  • get_output: fetches EngineCoreOutputs

SyncMPClient.add_request

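  • Serializes the EngineCoreRequest, including any auxiliary buffers inside it (such as PyTorch tensors).
  • Uses input_socket to send the message to EngineCore: (Identity, RequestType, EngineCoreRequest (serialized), Auxiliary Buffers)
  • If there are auxiliary buffers, adds the message to self.pending_messages, so that the corresponding memory (such as PyTorch tensors) is released only after the send has completed.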

    class SyncMPClient(MPClient):

        def add_request(self, request: EngineCoreRequest) -> None:
            if self.is_dp:
                self.engines_running = True
            self._send_input(EngineCoreRequestType.ADD, request)

        def _send_input(self, request_type: EngineCoreRequestType, request: Any):
            self.ensure_alive()
            self.free_pending_messages()
            # (Identity, RequestType, SerializedRequest)
            msg = (self.core_engine, request_type.value,
                   *self.encoder.encode(request))

            if len(msg) <= 3:
                # No auxiliary buffers => no tensor backing buffers in request.
                self.input_socket.send_multipart(msg, copy=False)
                return

            tracker = self.input_socket.send_multipart(msg, copy=False, track=True)
            self.add_pending_message(tracker, request)
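
The framing rule (three frames means no auxiliary buffers, more than three means the request must be kept alive) can be sketched without ZMQ. Everything below is hypothetical: toy_encode stands in for the msgpack encoder, ToyTracker for zmq.MessageTracker, and the body of free_pending is an assumption about what free_pending_messages does, since the source does not show it.

```python
import pickle
from collections import deque
from dataclasses import dataclass
from typing import Any


@dataclass
class ToyTracker:
    """Hypothetical stand-in for zmq.MessageTracker; done flips when the send completes."""
    done: bool = False


def toy_encode(request: dict[str, Any]) -> list[bytes]:
    """Hypothetical encoder: raw bytes payloads travel as separate frames."""
    aux = [v for v in request.values() if isinstance(v, bytes)]
    header = {k: v for k, v in request.items() if not isinstance(v, bytes)}
    return [pickle.dumps(header), *aux]


class ToySender:
    def __init__(self) -> None:
        self.sent: list[tuple] = []
        self.pending: deque[tuple[ToyTracker, Any]] = deque()

    def free_pending(self) -> None:
        # Assumption: drop requests whose frames are fully sent, oldest first.
        while self.pending and self.pending[0][0].done:
            self.pending.popleft()

    def send(self, identity: bytes, request_type: bytes, request: dict) -> None:
        self.free_pending()
        msg = (identity, request_type, *toy_encode(request))
        self.sent.append(msg)
        if len(msg) <= 3:
            return  # one payload frame only: nothing needs to be kept alive
        # Keep a reference to the request until its tracker reports completion.
        self.pending.append((ToyTracker(), request))


sender = ToySender()
engine = (0).to_bytes(2, "little")
sender.send(engine, b"\x00", {"prompt": "hi"})
sender.send(engine, b"\x00", {"prompt": "hi", "pixels": b"\x01\x02"})
pending_before = len(sender.pending)

sender.pending[0][0].done = True  # pretend the transport finished the send
sender.free_pending()
pending_after = len(sender.pending)
```

Holding the request in the deque matters for zero-copy sends: the transport may still be reading the buffer after the send call returns, so the owner must not free it early.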

SyncMPClient.call_utility

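Taking profile as an example:

  • Generates a call_id
  • Registers a Future under that call_id in self.utility_results
  • Uses input_socket to send the message to EngineCore: (Identity, RequestType, (0, call_id, method, args) (serialized))
  • Blocks on future.result() until the reply for that call_id arrives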

    class SyncMPClient(MPClient):

        def profile(self, is_start: bool = True) -> None:
            self.call_utility("profile", is_start)

        def call_utility(self, method: str, *args) -> Any:
            call_id = uuid.uuid1().int >> 64
            future: Future[Any] = Future()
            self.utility_results[call_id] = future
            self._send_input(EngineCoreRequestType.UTILITY,
                             (0, call_id, method, args))
            return future.result()
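
The call_id and Future pairing is a general request/reply correlation pattern, and it can be exercised with the standard library alone. In this sketch toy_engine is a hypothetical stand-in for the remote side plus the output thread, and the reply string is invented for the demonstration.

```python
import threading
import uuid
from concurrent.futures import Future
from typing import Any

utility_results: dict[int, Future] = {}


def toy_engine(call_id: int, method: str, args: tuple) -> None:
    """Hypothetical remote side: compute a reply and resolve the matching future."""
    result = f"{method}{args}"
    utility_results.pop(call_id).set_result(result)


def call_utility(method: str, *args: Any) -> Any:
    call_id = uuid.uuid1().int >> 64  # upper 64 bits of a time-based UUID
    future: Future = Future()
    utility_results[call_id] = future
    # Stand-in for sending the request; the reply arrives on another thread.
    threading.Thread(target=toy_engine, args=(call_id, method, args)).start()
    return future.result(timeout=5)  # blocks the caller until the reply is set


reply = call_utility("profile", True)
```

Registering the future before sending the request avoids a race in which the reply arrives before anything is waiting for it.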

SyncMPClient.get_output

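SyncMPClient.__init__ has already started a dedicated thread that receives and decodes the messages from output_socket and puts them on self.outputs_queue (a queue.Queue).
So SyncMPClient.get_output only needs to call self.outputs_queue.get() to get synchronous I/O: while no data has arrived, the call blocks the main thread.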
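
A minimal sketch of that blocking read, using only the standard library. The queue's declared element type in the constructor above is Union[EngineCoreOutputs, Exception]. Re-raising an Exception taken from the queue is an assumption about how get_output uses that type, not something the source shows. reader is a hypothetical stand-in for the output thread.

```python
import queue
import threading
from typing import Union

outputs_queue: "queue.Queue[Union[str, Exception]]" = queue.Queue()


def reader(messages: list) -> None:
    """Hypothetical stand-in for the thread that drains output_socket."""
    for message in messages:
        outputs_queue.put_nowait(message)


def get_output() -> str:
    item = outputs_queue.get()  # blocks until the reader thread delivers
    if isinstance(item, Exception):
        raise item  # assumption: failures travel through the same queue
    return item


thread = threading.Thread(
    target=reader,
    args=(["step-1", RuntimeError("engine died")],),
    daemon=True,
)
thread.start()

first = get_output()
try:
    get_output()
    error = None
except RuntimeError as exc:
    error = str(exc)
```

Sending errors through the same queue means a caller blocked in get() is woken up when the background thread fails, instead of waiting forever.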

AsyncMPClient

TODO: to be added.

