MCP 传输层代码分析
MCP 传输层代码分析
MCP 整体架构说明
引用官方文档原文:Model Context Protocol (MCP) 构建在一个灵活且可扩展的架构上,使 LLM 应用和集成之间的无缝通信成为可能。具体架构细节可以参考文档(核心架构 - MCP 中文文档)。MCP 采用分层架构,分为协议层与传输层,官方实现的传输层支持两种实现:Stdio 传输 和 通过 HTTP 的 SSE 传输。本文旨在通过官方提供的 Python 库代码,来分析 通过 HTTP 的 SSE 传输 方式的实现。
Python 用到的技术
- AnyIO:
anyio
是 Python 的一个异步编程库,它提供了统一的 API 来编写兼容多种异步运行时(如asyncio
和trio
)的代码。 - Starlette:
Starlette
是一个轻量级的 ASGI(异步服务器网关接口) Web 框架,专为 Python 异步编程设计,具有高性能、灵活和极简的特点。
细节说明
在此我会列出以上两个库用到的最主要的几个方法,特别是第一个方法。
anyio.create_memory_object_stream
:此方法返回一个输入流和一个输出流,可以将其看成是一个水管,返回的参数可以看成是水管的两端。从一个流输入数据,可以从另一个流输出数据。代码中总共用到了三组通道(即输入流和输出流)。传输层与协议层之间是通过流来交流数据的。anyio.create_task_group
:用于创建任务组。EventSourceResponse
:这是 Starlette 和 FastAPI 中用于实现 Server-Sent Events (SSE) 的响应类。注意代码中用到的参数content
,用于接收输入流,data_sender_callable
,用于发送数据的方法(我没有找到相关的文档)。
整体流程
对于协议层的封装在 lowlevel.server.Server
中,其中 run
方法接收一个输入流和一个输出流。函数签名如下:
async def run(self,read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception],write_stream: MemoryObjectSendStream[types.JSONRPCMessage],initialization_options: InitializationOptions,# When False, exceptions are returned as messages to the client.# When True, exceptions are raised, which will cause the server to shut down# but also make tracing exceptions much easier during testing and when using# in-process servers.raise_exceptions: bool = False,
):
传输层的主要工作是初始化一个输入流和输出流,用于传输客户端请求命令及服务端的响应数据。
SseServerTransport
类为传输层的实现类,该类对客户端暴露了两个端点:
/sse
:- 作用:创建一个 SSE 通道,初始化输入输出流,创建
sessionId
,传输服务端响应的数据(从read_stream
读取数据),启动协议层的服务。 - 状态:有状态。
- 作用:创建一个 SSE 通道,初始化输入输出流,创建
/message
:- 作用:接收客户端的 POST 请求,将请求传递给
write_stream
。 - 状态:无状态。
- 作用:接收客户端的 POST 请求,将请求传递给
starlette_app = Starlette(debug=self.settings.debug,routes=[Route("/sse", endpoint=handle_sse),Mount("/messages/", app=sse.handle_post_message),],
)
初始化 SSE 连接的代码
@asynccontextmanager
async def connect_sse(self, scope: Scope, receive: Receive, send: Send):if scope["type"] != "http":logger.error("connect_sse received non-HTTP request")raise ValueError("connect_sse can only handle HTTP requests")logger.debug("Setting up SSE connection")read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception]read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception]write_stream: MemoryObjectSendStream[types.JSONRPCMessage]write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage]read_stream_writer, read_stream = anyio.create_memory_object_stream(0)write_stream, write_stream_reader = anyio.create_memory_object_stream(0)session_id = uuid4()session_uri = f"{quote(self._endpoint)}?session_id={session_id.hex}"self._read_stream_writers[session_id] = read_stream_writerlogger.debug(f"Created new session with ID: {session_id}")sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, Any]](0)async def sse_writer():logger.debug("Starting SSE writer")async with sse_stream_writer, write_stream_reader:await sse_stream_writer.send({"event": "endpoint", "data": session_uri})logger.debug(f"Sent endpoint event: {session_uri}")async for message in write_stream_reader:logger.debug(f"Sending message via SSE: {message}")await sse_stream_writer.send({"event": "message","data": message.model_dump_json(by_alias=True, exclude_none=True),})async with anyio.create_task_group() as tg:response = EventSourceResponse(content=sse_stream_reader, data_sender_callable=sse_writer)logger.debug("Starting SSE response task")tg.start_soon(response, scope, receive, send)logger.debug("Yielding read and write streams")yield (read_stream, write_stream)
以上代码中初始化了三个通道,其中 sse_stream_writer
和 sse_stream_reader
没有对外暴露。它们用于将数据传给 EventSourceResponse
。三个通道之间的数据流向较为复杂,建议在阅读代码时结合流程图来分析,以便更好地理解数据的走向。注意 read_stream_writer
被添加进了一个字典中,这是因为 read_stream_writer
是在 handle_post_message
中传输用户请求的。由于 POST 请求是无状态的,为了找到要发送给哪个 read_stream_writer
,必须通过 session_id
去查找。
对于不习惯协程编程范式的人来说,理解这几段代码可能还有些困难。有时间我会继续修改这篇文章,将整个流程说明白。