MCP 传输机制(Streamable HTTP)
MCP 概念基础
MCP 传输机制
MCP 传输机制(Streamable HTTP)
文章目录
- 1. Streamable HTTP 优势
- 1.1. HTTP + SSE 存在的问题
- 1.2 Streamable HTTP 的改进
- 2. Streamable HTTP 传输模式
- 2.1. 服务器实例(FastMCP + Starlette)
- 2.1.1. 服务器代码编写
- 2.1.2. 客户端代码编写
- 2.1.3. 测试运行
- 2.1.4. 运行 Inspector
- 2.1.5. 测试运行(curl)
- 2.2. 服务器实例(fastapi + curl)
- 2.2.1. 服务端代码编写
- 2.2.2. 测试运行(curl)
- 2.3. 服务器实例(requests + Starlette)
- 2.3.1. 服务器代码编写
- 2.3.2. 编写客户端代码
- 2.3.3. 测试运行
HTTP + SSE 存在的问题 和 Streamable HTTP 的改进:
https://developer.aliyun.com/article/1661971
1. Streamable HTTP 优势
1.1. HTTP + SSE 存在的问题
HTTP + SSE 的传输过程实现中,客户端和服务器通过两个主要渠道进行通信:
(1)HTTP 请求/响应:客户端通过标准的 HTTP 请求向服务器发送消息。
(2)服务器发送事件(SSE):服务器通过专门的 /sse 端点向客户端推送消息。
这就导致存在下面三个问题:
• 服务器必须维护长连接,在高并发情况下会导致显著的资源消耗。
• 服务器消息只能通过 SSE 传递,造成了不必要的复杂性和开销。
• 基础架构兼容性,许多现有的网络基础架构可能无法正确处理长期的 SSE 连接。企业防火墙可能会强制终止超时连接,导致服务不可靠。
1.2 Streamable HTTP 的改进
Streamable HTTP 是 MCP 协议的一次重要升级,通过下面的改进解决了原有 HTTP + SSE 传输方式的多个关键问题:
• 统一端点:移除了专门建立连接的 /sse 端点,将所有通信整合到统一的端点。
• 按需流式传输:服务器可以灵活选择返回标准 HTTP 响应或通过 SSE 流式返回。
• 状态管理:引入 session 机制以支持状态管理和恢复。
2. Streamable HTTP 传输模式
2.1. 服务器实例(FastMCP + Starlette)
参考:
https://blog.csdn.net/supingemail/article/details/148151787
https://www.cnblogs.com/xiao987334176/p/18872195
2.1.1. 服务器代码编写
服务器代码编写 streamable_server.py
import argparse
import uvicorn
import requests
import jsonfrom mcp.server.fastmcp import FastMCP
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.server.streamable_http import StreamableHTTPServerTransport
from mcp.server.streamable_http_manager import StreamableHTTPSessionManagerfrom starlette.requests import Request
from starlette.routing import Mount, Route
from starlette.applications import Starlette# 主要参考:
# https://blog.csdn.net/supingemail/article/details/148151787mcp = FastMCP("StreamableHttpServer",)@mcp.tool()
def get_public_ip_address() -> str:"""获取服务器公网 IP 地址返回: str: 当前网络的公网 IP 地址"""try:response = requests.get("http://ip-api.com/json")response.raise_for_status() # 检查 HTTP 请求是否成功content = json.loads(response.text)return content.get("query", "Unknown IP") # 提供默认值以防字段缺失except requests.RequestException as e:print(f"请求错误: {e}")return "Request Failed"except json.JSONDecodeError as e:print(f"JSON 解码错误: {e}")return "Invalid Response"# 创建 Starlette 应用
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:# 创建会话管理器session_manager = StreamableHTTPSessionManager(app=mcp_server,json_response=True,stateless=False)return Starlette(debug=True,routes=[Mount("/mcp", app=session_manager.handle_request),],lifespan=lambda app: session_manager.run(),)# 主程序入口
if __name__ == "__main__":mcp_server = mcp._mcp_server# 解析命令行参数parser = argparse.ArgumentParser(description='Run MCP SSE-based server')parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')parser.add_argument('--port', type=int, default=18082, help='Port to listen on')args = parser.parse_args()# 创建并运行 Starlette 应用starlette_app = create_starlette_app(mcp_server, debug=True)uvicorn.run(starlette_app, host=args.host, port=args.port)
2.1.2. 客户端代码编写
客户端代码编写 streamable_client.py
import asyncio
import os
import json
import sys
from typing import Optional
from contextlib import AsyncExitStackfrom openai import OpenAI
from dotenv import load_dotenvfrom mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client# 加载 .env 文件,确保 API Key 受到保护
load_dotenv()class MCPClient:def __init__(self):"""初始化 MCP 客户端"""self.exit_stack = AsyncExitStack()self.openai_api_key = os.getenv("OPENAI_API_KEY") # 读取 OpenAI API Keyself.base_url = os.getenv("BASE_URL") # 读取 BASE YRLself.model = os.getenv("MODEL") # 读取 modelif not self.openai_api_key:raise ValueError("❌ 未找到 OpenAI API Key,请在 .env 文件中设置 OPENAI_API_KEY")self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url) # 创建 OpenAI clientself.session: Optional[ClientSession] = None # 用于保存 MCP 的客户端会话,默认是 None,稍后通过 connect_to_server 进行连接。async def connect_to_streamable_server(self, server_url: str):"""Connect to an MCP server running with SSE transport"""# # 代码块 1# # 创建 SSE 客户端连接上下文管理器# self._streams_context = streamablehttp_client(url=server_url)# # 异步初始化 SSE 连接,获取数据流对象# read_stream, write_stream, get_session_id = await self._streams_context.__aenter__()# # 使用数据流创建 MCP 客户端会话上下文# self._session_context = ClientSession(read_stream, write_stream)# # 初始化客户端会话对象# self.session: ClientSession = await self._session_context.__aenter__()# 代码块 2sse_transport = await self.exit_stack.enter_async_context(streamablehttp_client(url=server_url, sse_read_timeout=300))self.stdio, self.write, get_session_id = sse_transport"""创建 MCP 客户端会话,与服务器交互。"""self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))# 代码块 结束# 执行 MCP 协议初始化握手await self.session.initialize()session_id = get_session_id()print("session_id", session_id)# 列出 MCP 服务器上的工具"""向 MCP 服务器请求所有已注册的工具(用 @mcp.tool() 标记)。"""response = await self.session.list_tools()tools = response.toolsprint("\n已连接到服务器,支持以下工具:", [tool.name for tool in tools])async def process_query(self, query: str) -> str:"""使用大模型处理查询并调用可用的 MCP 工具 (Function Calling)"""messages = [{"role": "user", "content": query}]response = await self.session.list_tools()"""获取服务器上的工具,再转换成 available_tools 的格式。"""available_tools = [{"type": "function","function": {"name": tool.name,"description": tool.description,"input_schema": tool.inputSchema}} for tool in response.tools]# print(available_tools)"""方法发送请求:model=self.model:比如 "gpt-4o" 或 "deepseek-chat"messages=messages:聊天上下文tools=available_tools:让模型知道有哪些可调用的「函数」。这是你自定义的**“Function Calling”**协议(非官方 JSON schema)。"""response = self.client.chat.completions.create(model=self.model, messages=messages,tools=available_tools )# 处理返回的内容content = response.choices[0]if content.finish_reason == "tool_calls":# 如何是需要使用工具,就解析工具。"""它会在 content.message.tool_calls 列表中声明要用哪个函数、参数是什么。这是你自定义的一种函数调用机制,和官方 function_call 格式略有不同,但逻辑相似。"""tool_call = content.message.tool_calls[0]tool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)# 执行工具"""取出工具名 tool_name 和参数 tool_args,再调用 self.session.call_tool(tool_name, tool_args) 执行 MCP 工具。"""result = await self.session.call_tool(tool_name, tool_args)print(f"\n\n[Calling tool {tool_name} with args {tool_args}]\n\n")# 将模型返回的调用哪个工具数据和工具执行完成后的数据都存入 messages 中"""把工具调用结果以「role=tool」的形式写入 messages。这样相当于把“函数调用结果”再喂给模型。"""messages.append(content.message.model_dump())messages.append({"role": "tool","content": result.content[0].text,"tool_call_id": tool_call.id,})# 将上面的结果再返回给大模型用于生产最终的结果response = self.client.chat.completions.create(model=self.model,messages=messages,)return response.choices[0].message.content# 如果没有要调用工具,直接返回 content.message.content(模型的文本回答)。return content.message.contentasync def chat_loop(self):"""运行交互式聊天循环"""print("\n🤖 MCP 客户端已启动!输入 'quit' 退出")while True:try:query = input("\n你: ").strip()if query.lower() == 'quit':breakresponse = await self.process_query(query) # 发送用户输入到 OpenAI APIprint(f"\n🤖 OpenAI: {response}")except Exception as e:print(f"\n⚠️ 发生错误: {str(e)}")async def cleanup(self):"""清理资源"""await self.exit_stack.aclose()async def main():if len(sys.argv) < 2:print("Usage: python client.py <path_to_server_script>")sys.exit(1)client = MCPClient()try:await client.connect_to_streamable_server(sys.argv[1])await client.chat_loop()finally:await client.cleanup()if __name__ == "__main__":import sysasyncio.run(main())
2.1.3. 测试运行
# 运行服务器
uv run .\src\streamable_server.py# 运行客户端
python.exe .\src\streamable_client.py http://localhost:18082/mcp/
2.1.4. 运行 Inspector
# 启动 Inspector(不知道为啥还要指定 服务器源码路径)
# 启动过程需要等一会
mcp dev .\src\streamable_server.py# 启动 MCP Server
uv run .\src\streamable_server.py
2.1.5. 测试运行(curl)
尚未完全调试通过,可以直接跳过该节。
https://blog.csdn.net/xinjichenlibing/article/details/148150901
- initialize 能力协商请求
注意 首先将请求头中加入 Content-Type: application/json
curl -X POST http://127.0.0.1:18082/mcp/ \
-H 'Content-Type: application/json' \
-H 'Accept:application/json,text/event-stream' \
-d '{"method": "initialize","params": {"protocolVersion": "2025-06-18","capabilities": {"sampling": {},"roots": {"listChanged": true}},"clientInfo": {"name": "curl","version": "0.15.0"}},"jsonrpc": "2.0","id": 0
}'curl.exe -X POST http://127.0.0.1:18082/mcp/ -H 'Content-Type: application/json' -H 'Accept:application/json,text/event-stream' -d '{\"method\":\"initialize\",\"params\":{\"protocolVersion\":\"2025-06-18\",\"capabilities\":{\"sampling\":{},\"roots\":{\"listChanged\":true}},\"clientInfo\":{\"name\":\"mcp-inspector\",\"version\":\"0.15.0\"}},\"jsonrpc\":\"2.0\",\"id\":0}'curl.exe -N -X POST http://127.0.0.1:18082/mcp/ -H 'Content-Type: application/json' -H 'Accept:application/json,text/event-stream' -d '{\"method\":\"initialize\",\"params\":{\"protocolVersion\":\"2025-06-18\",\"capabilities\":{\"sampling\":{},\"roots\":{\"listChanged\":true}},\"clientInfo\":{\"name\":\"mcp-inspector\",\"version\":\"0.15.0\"}},\"jsonrpc\":\"2.0\",\"id\":0}'
返回结果:
{"jsonrpc": "2.0","id": 0,"result": {"protocolVersion": "2025-03-26","capabilities": {"experimental": {},"prompts": {"listChanged": false},"resources": {"subscribe": false,"listChanged": false},"tools": {"listChanged": false}},"serverInfo": {"name": "StreamableHttpServer","version": "1.9.4"}}
}
以下尚未调试通过
- notifications/initialized 通知
确认客户端成功连接服务器,因为只是通知类型,服务器会返回状态码为 204 的空包:
curl -X POST http://127.0.0.1:18082/mcp \
-H 'Content-Type: application/json' \
-d '{"jsonrpc": "2.0","id": 0,"method": "notifications/initialized"
}'curl.exe -X POST http://127.0.0.1:18082/mcp -H 'Content-Type: application/json' -d '{\"jsonrpc\": \"2.0\",\"id\": 0,\"method\": \"notifications/initialized\"}'curl.exe -N -X POST http://127.0.0.1:18082/mcp -H 'Content-Type: application/json' -H "Mcp-Session-Id:2d95c8770a5b4d559f8e2a87614beca5" -d '{\"jsonrpc\": \"2.0\",\"id\": 0,\"method\": \"notifications/initialized\"}'
返回结果:
无
- tools/list 请求
获取工具注册表。我们期望可以获得 get_weather 工具结构体和 json schema 的工具注册表
curl -X POST http://127.0.0.1:18082/mcp \
-H 'Content-Type: application/json' \
-d '{"jsonrpc": "2.0","id": 0,"method": "tools/list"
}'curl.exe -X POST http://127.0.0.1:18082/mcp -H 'Content-Type: application/json' -d '{\"jsonrpc\": \"2.0\",\"id\": 0,\"method\": \"tools/list\"}'curl.exe -N -X POST http://127.0.0.1:18082/mcp/ -H 'Content-Type: application/json' -H 'Accept:application/json,text/event-stream' -H "Mcp-Session-Id:2d95c8770a5b4d559f8e2a87614beca5" -d '{\"jsonrpc\": \"2.0\",\"id\": 0,\"method\": \"tools/list\"}'curl.exe -N -X POST http://127.0.0.1:18082/mcp/ -H 'Content-Type: application/json' -H 'Accept:application/json,text/event-stream' -H "Mcp-Session-Id:2d95c8770a5b4d559f8e2a87614beca5" -d '{\"method\":\"tools/list\",\"params\":{\"_meta\":{\"progressToken\": 1}},\"jsonrpc\":\"2.0\",\"id\":0}'
返回结果:
{"jsonrpc": "2.0","id": 0,"result": {"tools": [{"name": "get_weather","description": "用于进行天气信息查询的函数,输入城市英文名称,即可获得当前城市天气信息。","inputSchema": {"type": "object","properties": {"city": {"type": "string","description": "City name, e.g. 'Hangzhou'"}},"required": ["city"]}}],"nextCursor": null}
}
2.2. 服务器实例(fastapi + curl)
参考:
https://zhuanlan.zhihu.com/p/1911039530187855274
Streamable HTTP 请求响应顺序
首先,客户端启动与服务器建立连接,需要 3 步握手(前三步),这时用户还没有输入信息;
当用户第一次提问时,模型判断要使用工具,客户端向服务端发起工具调用请求(执行4、5步)
时刻 | HTTP | JSON-RPC method | 作用 | 服务器典型响应 |
---|---|---|---|---|
① | POST /mcp | initialize | 协商协议版本 & 能力 | result.protocolVersion = 协议版本号 result.capabilities.tools.listChanged = true |
② | POST /mcp | notifications/initialized | 客户端告诉服务器“我已就绪” | 服务器只回 204 No Content HTTP 204 无包体 |
③ | POST /mcp | tools/list | 客户端请求工具清单 | result.tools 数组 + nextCursor(下一流式点) |
④ | POST /mcp | tools/call | 调用对应工具 | params.name = get_weather params.arguments.city 或 location |
⑤ | 流式响应 | stream / result | 流式返回工具执行结果 | 服务器推送: 成功 result.content[] |
在收到 第5步 的 result.content 后,会把文本回填到大模型对话记录中,大模型再输出给终端,这样就可以看到 MCP 服务器执行的结果啦!
将上述流程按顺序简写后如下:
POST /mcp → 200 initialize
POST /mcp → 204 notifications/initialized
POST /mcp → 200 tools/list
——等待用户——
POST /mcp → 200 /stream tools/call (服务器保持连接,逐行推流)
如果有多次工具调用,步骤 ④, ⑤ 会重复,每次 id 都会改变。
JSON-RPC 协议格式
类型 | 字段 | 说明 |
---|---|---|
请求 | jsonrpc | 固定为 “2.0” |
id | 请求编号,用于对应请求与响应 | |
method | 要调用的方法名(如: “tools/call”) | |
params | 方法参数(可以是对象或数组) | |
响应 | jsonrpc | 也要写 “2.0” |
id | 与请求的 ID 一致 | |
result | 成功返回值(只需 result) | |
error | 如果出错则返回 error 对象 |
2.2.1. 服务端代码编写
import argparse
import asyncio
import json
import uvicorn
import requestsfrom fastapi import FastAPI, Request, Response, status
from fastapi.responses import StreamingResponseSERVER_NAME = "WeatherServer" # 定义服务器名称
SERVER_VERSION = "1.0.0" # 定义服务器版本
PROTOCOL_VERSION = "2025-05-16" # 定义协议版本号
TOOLS_REGISTRY = {"tools": [{"name": "get_weather","description": "用于进行天气信息查询的函数,输入城市英文名称,即可获得当前城市天气信息。","inputSchema": {"type": "object","properties": {"city": {"type": "string","description": "City name, e.g. 'Hangzhou'"}},"required": ["city"]}}],"nextCursor": None
}# 编写请求天气函数
async def fetch_weather(city: str):try:url="https://api.seniverse.com/v3/weather/now.json"params={"key": "你注册的心知天气api", # 无"location": city,"language": "zh-Hans","unit": "c"}response = requests.get(url, params=params)temperature = response.json()['results'][0]['now']except Exception:return "error"return json.dumps(temperature)# 使用生成器将请求天气改写为流传输的形式
async def stream_weather(city: str, req_id: int | str):yield json.dumps({"jsonrpc": "2.0", "id": req_id, "stream": f"查询 {city} 天气中…"}).encode() + b"\n"await asyncio.sleep(0.3)data = await fetch_weather(city)if data == "error":yield json.dumps({"jsonrpc": "2.0", "id": req_id, "error": {"code": -32000, "message": data["error"]}}).encode() + b"\n"returnyield json.dumps({"jsonrpc": "2.0", "id": req_id,"result": {"content": [{"type": "text", "text": data}],"isError": False}}).encode() + b"\n"app = FastAPI(title=SERVER_NAME)@app.get("/mcp")
async def mcp_initialize_via_get():# GET 请求也执行了 initialize 方法, 对应步骤1return {"jsonrpc": "2.0","id": 0,"result": {"protocolVersion": PROTOCOL_VERSION,"capabilities": {"streaming": True,"tools": {"listChanged": True}},"serverInfo": {"name": SERVER_NAME,"version": SERVER_VERSION},"instructions": "Use the get_weather tool to fetch weather by city name."}}@app.post("/mcp")
async def mcp_endpoint(request: Request):try:body = await request.json()# 打印客户端的请求内容print("收到请求:\n", json.dumps(body, ensure_ascii=False, indent=2))except Exception as ex:print("异常请求", str(ex))return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}}req_id = body.get("id", 1)method = body.get("method")# 打印当前方法类型print(f"方法: {method}")if not method:return {"jsonrpc": "2.0", "id": req_id, "result": {"status": "MCP server online."}}if method == "initialize": # 对应步骤 1,请求建立连接return {"jsonrpc": "2.0", "id": req_id,"result": {"protocolVersion": PROTOCOL_VERSION,"capabilities": {"streaming": True,"tools": {"listChanged": True}},"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},"instructions": "Use the get_weather tool to fetch weather by city name."}}elif method == "notifications/initialized": # 对应步骤 2,连接建立初始化return Response(status_code=status.HTTP_204_NO_CONTENT)elif method == "tools/list": # 对应步骤 3,向服务器请求函数工具清单print(json.dumps(TOOLS_REGISTRY, indent=2, ensure_ascii=False))return {"jsonrpc": "2.0", "id": req_id, "result": TOOLS_REGISTRY}elif method == "tools/call": # 对应步骤 4和步骤 5,客户端发送工具调用请求params = body.get("params", {})tool_name = params.get("name")args = params.get("arguments", {})if tool_name != "get_weather":return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32602, "message": "Unknown tool"}}city = args.get("city")if not city:return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32602, "message": "Missing city"}}return StreamingResponse(stream_weather(city, req_id), media_type="application/json")return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": "Method not found"}}if __name__ == "__main__":parser = argparse.ArgumentParser(description="Weather MCP HTTP-Stream")parser.add_argument("--host", default="127.0.0.1")parser.add_argument("--port", type=int, default=8000)args = parser.parse_args()uvicorn.run(app, host=args.host, port=args.port, log_level="info")
2.2.2. 测试运行(curl)
- 启动服务器
uv run .\src\streamable_server1.py
- initialize 能力协商请求
注意 首先将请求头中加入 Content-Type: application/json
curl -X POST http://127.0.0.1:8000/mcp \
-H 'Content-Type: application/json' \
-d '{"jsonrpc": "2.0","id": 0,"method": "initialize","params": {"protocolVersion": "2024-11-05"}
}'curl.exe -X POST http://127.0.0.1:8000/mcp -H 'Content-Type: application/json' -d '{\"jsonrpc\": \"2.0\",\"id\": 0,\"method\": \"initialize\",\"params\": {\"protocolVersion\": \"2024-11-05\"}}'
返回结果:
{"jsonrpc": "2.0","id": 0,"result": {"protocolVersion": "2025-05-16","capabilities": {"streaming": true,"tools": {"listChanged": true}},"serverInfo": {"name": "WeatherServer","version": "1.0.0"},"instructions": "Use the get_weather tool to fetch weather by city name."}
}
- notifications/initialized 通知
确认客户端成功连接服务器,因为只是通知类型,服务器会返回状态码为 204 的空包:
curl -X POST http://127.0.0.1:8000/mcp \
-H 'Content-Type: application/json' \
-d '{"jsonrpc": "2.0","id": 0,"method": "notifications/initialized"
}'curl.exe -X POST http://127.0.0.1:8000/mcp -H 'Content-Type: application/json' -d '{\"jsonrpc\": \"2.0\",\"id\": 0,\"method\": \"notifications/initialized\"}'
返回结果:
无
- tools/list 请求
获取工具注册表。我们期望可以获得 get_weather 工具结构体和 json schema 的工具注册表
curl -X POST http://127.0.0.1:8000/mcp \
-H 'Content-Type: application/json' \
-d '{"jsonrpc": "2.0","id": 0,"method": "tools/list"
}'curl.exe -X POST http://127.0.0.1:8000/mcp -H 'Content-Type: application/json' -d '{\"jsonrpc\": \"2.0\",\"id\": 0,\"method\": \"tools/list\"}'
返回结果:
{"jsonrpc": "2.0","id": 0,"result": {"tools": [{"name": "get_weather","description": "用于进行天气信息查询的函数,输入城市英文名称,即可获得当前城市天气信息。","inputSchema": {"type": "object","properties": {"city": {"type": "string","description": "City name, e.g. 'Hangzhou'"}},"required": ["city"]}}],"nextCursor": null}
}
2.3. 服务器实例(requests + Starlette)
https://blog.csdn.net/shanxuanang/article/details/146905413
https://zhuanlan.zhihu.com/p/1911039530187855274
2.3.1. 服务器代码编写
import json
import uuid
from datetime import datetime
import asyncio
import aiofiles
import randomfrom starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Route
from starlette.responses import JSONResponse, StreamingResponse
from starlette.middleware.cors import CORSMiddlewarefrom typing import Dict, Any# 存储会话ID和对应的任务队列
sessions: Dict[str, Dict[str, Any]] = {}# 添加CORS支持
app = Starlette()
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],expose_headers=["Mcp-Session-Id"],
)@app.route('/message', methods=["POST", "GET"])
async def handle_message(request: Request):"""处理POST和GET请求。"""session_id = request.headers.get("Mcp-Session-Id") or request.query_params.get("Mcp-Session-Id")print("Message Session ID", request.method, session_id)if request.method == "POST":try:data = await request.json()if data.get("method") == "initialize":# 初始化会话session_id = str(uuid.uuid4())sessions[session_id] = {"initialized": True, "task_queue": asyncio.Queue()}response = JSONResponse(content={"jsonrpc": "2.0", "id": data.get("id"),"result": {"serverInfo": {"name": "MCP Server", "version": "1.0"},"capabilities": {},},})response.headers["Mcp-Session-Id"] = session_idreturn responseelif session_id and sessions.get(session_id, {}).get("initialized"):# 处理已初始化的请求if data.get("method") == "get_file":try:# 异步读取文件内容content = await async_read_file(data.get("params", {}).get("path", ""))return JSONResponse(content={"jsonrpc": "2.0","id": data.get("id"),"result": content,})except Exception as e:return JSONResponse(content={"jsonrpc": "2.0", "id": data.get("id"),"error": f"Error reading file: {str(e)}",})else:return JSONResponse(content={"error": "Unknown method"})else:return JSONResponse(content={"error": "Session not initialized"}, status_code=400)except Exception as e:return JSONResponse(content={"error": f"Internal server error: {str(e)}"}, status_code=500)elif request.method == "GET":# 处理 SSE 流请求if not session_id or session_id not in sessions:return JSONResponse(content={"error": "Session not found"}, status_code=404)async def event_generator(session_id):while True:try:message = await asyncio.wait_for(sessions[session_id]["task_queue"].get(), timeout=10) # 超时时间10秒yield f"data: {json.dumps(message)}\n\n"except asyncio.TimeoutError as e:yield f"data: {e}\n\n" # 发送空数据作为心跳包,防止超时断开return StreamingResponse(event_generator(session_id), media_type="text/event-stream")async def async_read_file(path: str="") -> str:"""异步读取文件内容。"""try:async with aiofiles.open(path, "r", encoding="utf-8") as file:content = await file.read()return contentexcept Exception as e:raise Exception(f"Error reading file: {str(e)}")async def background_task(session_id: str, task: Dict[str, Any]):"""后台任务处理。"""# 模拟耗时操作await asyncio.sleep(1)# 将结果放入任务队列sessions[session_id]["task_queue"].put_nowait(task)@app.on_event("startup")
async def startup_event():print("Message startup_event")async def push_test_messages():while True:sp = random.randint(1, 3)await asyncio.sleep(sp) # 每5秒推送一个消息for session_id in sessions.keys():if sessions[session_id]["initialized"]:sessions[session_id]["task_queue"].put_nowait({"message": f"Hello from server!", "sleep": sp,"datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")})asyncio.create_task(push_test_messages()) # 创建后台任务if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
2.3.2. 编写客户端代码
import httpx
import json
import asyncio
import aiofilesclass MCPClient:def __init__(self, server_url: str):self.server_url = server_urlself.session_id = Noneself.headers = {"Content-Type": "application/json","Accept": "text/event-stream, application/json"}print(f"Server Url: {self.server_url}")async def initialize(self):"""初始化会话。"""async with httpx.AsyncClient() as client:try:response = await client.post(f"{self.server_url}/message",headers=self.headers,json={"jsonrpc": "2.0","method": "initialize","params": {"clientInfo": {"name": "MCP Client", "version": "1.0"},"capabilities": {},},},)response.raise_for_status()self.session_id = response.headers.get("Mcp-Session-Id")print(f"Session ID: {self.session_id}")return self.session_idexcept Exception as e:print(f"Failed to initialize session: {e}")return Noneasync def send_message(self, method: str, params: dict = None):"""发送消息。"""if not self.session_id:await self.initialize()async with httpx.AsyncClient() as client:try:# print("Headers", {"Mcp-Session-Id": self.session_id, **self.headers})response = await client.post(f"{self.server_url}/message",headers={"Mcp-Session-Id": self.session_id, **self.headers},json={"jsonrpc": "2.0","id": 1,"method": method,"params": params or {},},)response.raise_for_status()return response.json()except Exception as e:print(f"Failed to send message: {e}")return Noneasync def listen_sse(self):if not self.session_id:await self.initialize()async with httpx.AsyncClient(timeout=None) as client: # 取消超时限制try:async with client.stream("GET",f"{self.server_url}/message",headers={"Mcp-Session-Id": self.session_id, **self.headers},) as response:async for line in response.aiter_lines():if line.strip(): # 避免空行print(f"SSE Message: {line}")except Exception as e:print(f"Failed to listen SSE: {e}")await self.reconnect()async def reconnect(self):"""断线重连。"""print("Attempting to reconnect...")await asyncio.sleep(5) # 等待5秒后重试await self.initialize()await self.listen_sse()async def main():client = MCPClient("http://localhost:8000")await client.initialize()# params = {"path": "./src/streamable_server.py"}params = {"path": "./src/test_file.pdf"}response = await client.send_message(method="get_file", params=params)print(f"Response: {response}")await client.listen_sse()if __name__ == "__main__":asyncio.run(main())
2.3.3. 测试运行
首先安装依赖
uv pip install aiofiles
测试运行
# 启动服务端
uv run .\src\streamable_server2.py# 启动客户端
uv run .\src\streamable_client2.py