当前位置: 首页 > news >正文

MCP 传输层代码分析

MCP 传输层代码分析

MCP 整体架构说明

引用官方文档原文:Model Context Protocol (MCP) 构建在一个灵活且可扩展的架构上,使 LLM 应用和集成之间的无缝通信成为可能。具体架构细节可以参考文档(核心架构 - MCP 中文文档)。MCP 采用分层架构,分为协议层与传输层,官方实现的传输层支持两种实现:Stdio 传输通过 HTTP 的 SSE 传输。本文旨在通过官方提供的 Python 库代码,来分析 通过 HTTP 的 SSE 传输 方式的实现。

Python 用到的技术

  • AnyIOanyio 是 Python 的一个异步编程库,它提供了统一的 API 来编写兼容多种异步运行时(如 asynciotrio)的代码。
  • StarletteStarlette 是一个轻量级的 ASGI(异步服务器网关接口) Web 框架,专为 Python 异步编程设计,具有高性能、灵活和极简的特点。

细节说明

在此我会列出以上两个库用到的最主要的几个方法,特别是第一个方法。

  • anyio.create_memory_object_stream:此方法返回一个输入流和一个输出流,可以将其看成是一个水管,返回的参数可以看成是水管的两端。从一个流输入数据,可以从另一个流输出数据。代码中总共用到了三组通道(即输入流和输出流)。传输层与协议层之间是通过流来交流数据的。
  • anyio.create_task_group:用于创建任务组。
  • EventSourceResponse:这是 StarletteFastAPI 中用于实现 Server-Sent Events (SSE) 的响应类。注意代码中用到的参数 content,用于接收输入流,data_sender_callable,用于发送数据的方法(我没有找到相关的文档)。

整体流程

在这里插入图片描述

对于协议层的封装在 lowlevel.server.Server 中,其中 run 方法接收一个输入流和一个输出流。函数签名如下:

async def run(self,read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception],write_stream: MemoryObjectSendStream[types.JSONRPCMessage],initialization_options: InitializationOptions,# When False, exceptions are returned as messages to the client.# When True, exceptions are raised, which will cause the server to shut down# but also make tracing exceptions much easier during testing and when using# in-process servers.raise_exceptions: bool = False,
):

传输层的主要工作是初始化一个输入流和输出流,用于传输客户端请求命令及服务端的响应数据。

SseServerTransport 类为传输层的实现类,该类对客户端暴露了两个端点:

  • /sse
    • 作用:创建一个 SSE 通道,初始化输入输出流,创建 sessionId,传输服务端响应的数据(从 read_stream 读取数据),启动协议层的服务。
    • 状态:有状态。
  • /message
    • 作用:接收客户端的 POST 请求,将请求传递给 write_stream
    • 状态:无状态。
starlette_app = Starlette(debug=self.settings.debug,routes=[Route("/sse", endpoint=handle_sse),Mount("/messages/", app=sse.handle_post_message),],
)

初始化 SSE 连接的代码

@asynccontextmanager
async def connect_sse(self, scope: Scope, receive: Receive, send: Send):if scope["type"] != "http":logger.error("connect_sse received non-HTTP request")raise ValueError("connect_sse can only handle HTTP requests")logger.debug("Setting up SSE connection")read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception]read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception]write_stream: MemoryObjectSendStream[types.JSONRPCMessage]write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage]read_stream_writer, read_stream = anyio.create_memory_object_stream(0)write_stream, write_stream_reader = anyio.create_memory_object_stream(0)session_id = uuid4()session_uri = f"{quote(self._endpoint)}?session_id={session_id.hex}"self._read_stream_writers[session_id] = read_stream_writerlogger.debug(f"Created new session with ID: {session_id}")sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, Any]](0)async def sse_writer():logger.debug("Starting SSE writer")async with sse_stream_writer, write_stream_reader:await sse_stream_writer.send({"event": "endpoint", "data": session_uri})logger.debug(f"Sent endpoint event: {session_uri}")async for message in write_stream_reader:logger.debug(f"Sending message via SSE: {message}")await sse_stream_writer.send({"event": "message","data": message.model_dump_json(by_alias=True, exclude_none=True),})async with anyio.create_task_group() as tg:response = EventSourceResponse(content=sse_stream_reader, data_sender_callable=sse_writer)logger.debug("Starting SSE response task")tg.start_soon(response, scope, receive, send)logger.debug("Yielding read and write streams")yield (read_stream, write_stream)

以上代码中初始化了三个通道,其中 sse_stream_writersse_stream_reader 没有对外暴露。它们用于将数据传给 EventSourceResponse。三个通道之间的数据流向较为复杂,建议在阅读代码时结合流程图来分析,以便更好地理解数据的走向。注意 read_stream_writer 被添加进了一个字典中,这是因为 read_stream_writer 是在 handle_post_message 中传输用户请求的。由于 POST 请求是无状态的,为了找到要发送给哪个 read_stream_writer,必须通过 session_id 去查找。

对于不习惯协程编程范式的人来说,理解这几段代码可能还有些困难。有时间我会继续修改这篇文章,将整个流程说明白。

相关文章:

  • 什么是建行财资云,招行CBS,光大跨行通
  • 什么是 ANR 如何避免它
  • 电池单元和电极性能
  • 何人传来空指针-GDB调试
  • Linux文件编程——open函数
  • MySQL 数据操纵与数据库优化
  • PostGreSQL:数据表被锁无法操作
  • Spark 中RDD、Job,stage,task的关系
  • c++STL-string的使用
  • 接口的基础定义与属性约束
  • Nginx 使用 Keepalived 搭建 nginx 高可用
  • (十二)Java枚举类深度解析:从基础到高级应用
  • 数据分析预备篇---NumPy数组
  • ARP协议的工作原理
  • JavaScript学习教程,从入门到精通,jQuery Mobile 移动页面开发语法知识点及案例代码(42)
  • 【Beat Saber 节奏光剑】全身动捕直播搭建指南
  • 销售管理系统使用全攻略:从基础配置到数据分析
  • 《Go小技巧易错点100例》第三十二篇
  • 实战项目1(02)
  • 《AI大模型应知应会100篇》第55篇:大模型本地开发环境搭建
  • 他站在当代思想的地平线上,眺望浪漫主义的余晖
  • 国务院新闻办公室发布《新时代的中国国家安全》白皮书
  • 民企老板被错羁212天续:申请国赔千万余元,要求恢复名誉赔礼道歉
  • 首映丨纪录电影《滚烫年华》:献给所有奋斗者
  • 巴基斯坦称成功拦截印度导弹,空军所有资产安全
  • 三星“七天机”质保期内屏幕漏液被要求自费维修,商家:系人为损坏