当前位置: 首页 > news >正文

使用Python实现MCP协议Streamable HTTP详细教程

使用Python实现MCP协议Streamable HTTP详细教程

目录

  1. 概念引入
  2. 原理讲解
  3. 场景应用
  4. 代码实现
  5. 问题解决
  6. 总结提炼

概念引入

什么是MCP协议?

MCP(Model Context Protocol)是专为模型交互设计的协议,它定义了模型服务器与客户端之间的通信标准。想象一下,MCP就像是不同语言的人之间的"通用翻译器",让各种AI模型和应用能够顺畅地交流。

什么是Streamable HTTP?

Streamable HTTP是MCP协议的一个重要新特性,它基于HTTP协议实现了流式数据传输。我们可以把它比作一个"智能水管系统":

  • 传统HTTP就像是一次性送水的桶装水服务,每次只能送一整桶
  • Streamable HTTP则像是自来水管道,可以持续不断地输送数据,而且可以双向流动

这种设计特别适合需要持续交换上下文或进行多轮对话的大模型应用场景。

原理讲解

Streamable HTTP的核心机制

Streamable HTTP基于以下技术实现:

  1. HTTP/1.1 Chunked Transfer EncodingHTTP/2 Streams:这些技术允许数据分块传输,而不需要等待整个响应完成。

  2. 双向流式处理:支持客户端和服务器同时发送数据,实现真正的双向通信。

  3. Web友好性:基于标准HTTP协议,天然兼容现有的Web基础设施,如反向代理、负载均衡器等。

工作流程

Streamable HTTP MCP的工作流程如下:

客户端请求 → 建立HTTP连接 → 流式数据交换 → 关闭连接

与传统HTTP不同,Streamable HTTP在连接建立后可以持续交换多个请求和响应,而不需要为每个请求建立新连接,大大提高了通信效率。

场景应用

适用场景

Streamable HTTP特别适合以下场景:

  1. 长对话应用:需要保持上下文的多轮对话场景
  2. 实时数据流处理:如实时翻译、代码生成辅助等
  3. 分布式模型服务:多个模型服务需要协同工作
  4. Web集成场景:需要与现有Web基础设施无缝集成

实际价值

  • 提高效率:减少连接建立和关闭的开销
  • 降低延迟:流式传输可以边接收边处理
  • 增强扩展性:更容易与现有Web技术栈集成
  • 改善用户体验:支持实时反馈和交互

代码实现

环境准备

首先,我们需要安装必要的依赖。推荐使用FastMCP框架,它是Python中实现MCP协议的最佳选择。

# 安装FastMCP框架
pip install fastmcp# 或者使用uv包管理器(推荐)
pip install uv
uv init mcp-streamable-http-demo
cd mcp-streamable-http-demo
uv venv
# Windows
.venv\Scripts\activate
# macOS/Linux
source .venv/bin/activate
uv add fastmcp

基础实现

下面是一个最简单的Streamable HTTP MCP服务器实现:

# server.py
from fastmcp import FastMCP# 初始化MCP服务器
mcp = FastMCP("Demo")@mcp.tool()
def add(a: int, b: int) -> int:"""Add two numbers"""return a + bif __name__ == "__main__":mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp")

这段代码只有8行,但已经实现了一个完整的Streamable HTTP MCP服务器。让我们逐行解释:

  1. 导入FastMCP框架
  2. 创建MCP服务器实例,命名为"Demo"
  3. 使用装饰器注册一个工具函数add
  4. 定义工具函数,实现两数相加
  5. 启动服务器,指定传输方式为"streamable-http"

进阶实现:获取公网IP

下面是一个更实用的例子,演示如何获取服务器的公网IP地址:

# advanced_server.py
import json
import requests
from fastmcp import FastMCP# 初始化MCP服务器
mcp = FastMCP("Network Tools")@mcp.tool()
def get_public_ip_address() -> str:"""获取服务器公网IP地址返回:str: 服务器的公网IP地址"""try:# 使用多个IP查询服务以提高可靠性services = ["https://api.ipify.org?format=json","https://httpbin.org/ip","https://ipinfo.io/json"]for service in services:try:response = requests.get(service, timeout=5)if response.status_code == 200:data = response.json()# 不同服务返回的JSON结构可能不同if "ip" in data:return data["ip"]elif "origin" in data:return data["origin"]except:continuereturn "无法获取公网IP地址"except Exception as e:return f"获取IP时发生错误: {str(e)}"@mcp.tool()
def get_ip_details(ip_address: str = None) -> str:"""获取IP地址的详细信息参数:ip_address (str, optional): 要查询的IP地址,如果不提供则查询当前公网IP返回:str: IP地址的详细信息"""try:# 如果没有提供IP地址,先获取当前公网IPif not ip_address:ip_address = get_public_ip_address()if "无法获取" in ip_address or "发生错误" in ip_address:return ip_address# 查询IP详细信息response = requests.get(f"https://ipinfo.io/{ip_address}/json", timeout=5)if response.status_code == 200:data = response.json()return json.dumps(data, indent=2, ensure_ascii=False)else:return f"查询IP {ip_address} 详细信息失败"except Exception as e:return f"查询IP详细信息时发生错误: {str(e)}"if __name__ == "__main__":mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp")

文生图服务实现

下面是一个更复杂的例子,演示如何实现一个文生图服务:

# image_generation_server.py
from fastmcp import FastMCP
from fastmcp.server.dependencies import get_http_request
import requests
from starlette.requests import Request
import json
import time# 初始化MCP服务器
mcp = FastMCP("Text to Image Service")# 模拟的图像生成API地址
# 在实际应用中,这里应该是真实的API地址
IMAGE_SYNTHESIS_URL = "https://api.example.com/image-synthesis"
TASKS_URL = "https://api.example.com/tasks"# 模拟的任务存储
task_storage = {}@mcp.tool(name="文生图 - 创建任务",description="提交文生图请求,返回任务ID(task_id)。\n""参数说明:\n""prompt: 提示词(中英文,≤800字符);\n""size: 图片尺寸(如512*512,默认1:1);\n""style: 图像风格(如写实、卡通、水彩等);\n""quality: 图像质量(如standard、high)"
)
def create_image_task(prompt: str, size: str = "512*512", style: str = "写实", quality: str = "standard"
) -> dict:"""创建图像生成任务参数:prompt (str): 图像描述提示词size (str): 图像尺寸,默认为"512*512"style (str): 图像风格,默认为"写实"quality (str): 图像质量,默认为"standard"返回:dict: 包含任务ID的响应"""# 生成唯一任务IDtask_id = f"img_{int(time.time())}_{hash(prompt) % 10000}"# 存储任务信息task_storage[task_id] = {"status": "processing","prompt": prompt,"size": size,"style": style,"quality": quality,"created_at": time.time(),"result_url": None}# 在实际应用中,这里应该调用真实的图像生成API# 这里我们模拟一个异步处理过程# 实际项目中可能需要使用异步任务队列如Celeryreturn {"task_id": task_id,"status": "submitted","message": "图像生成任务已提交,请使用task_id查询结果"}@mcp.tool(name="文生图 - 查询结果",description="根据task_id查询图像生成状态与结果\n""参数说明:\n""task_id: 图像生成任务的唯一标识符"
)
def get_image_result(task_id: str) -> dict:"""查询图像生成结果参数:task_id (str): 图像生成任务的ID返回:dict: 任务状态和结果"""# 检查任务是否存在if task_id not in task_storage:return {"error": f"任务ID {task_id} 不存在"}task = task_storage[task_id]# 模拟处理过程 - 在实际应用中,这里应该查询真实APIcurrent_time = time.time()elapsed_time = current_time - task["created_at"]# 模拟处理需要5秒if elapsed_time > 5 and task["status"] == "processing":# 模拟任务完成task["status"] = "completed"task["result_url"] = f"https://example.com/images/{task_id}.png"task["completed_at"] = current_timereturn {"task_id": task_id,"status": task["status"],"prompt": task["prompt"],"size": task["size"],"style": task["style"],"quality": task["quality"],"created_at": task["created_at"],"result_url": task["result_url"],"completed_at": task.get("completed_at")}@mcp.tool(name="文生图 - 批量查询",description="批量查询多个图像生成任务的状态\n""参数说明:\n""task_ids: 图像生成任务ID列表,用逗号分隔"
)
def batch_get_results(task_ids: str) -> dict:"""批量查询图像生成结果参数:task_ids (str): 逗号分隔的任务ID列表返回:dict: 所有任务的状态和结果"""id_list = [tid.strip() for tid in task_ids.split(",") if tid.strip()]results = {}for task_id in id_list:results[task_id] = get_image_result(task_id)return {"batch_results": results,"total": len(id_list),"timestamp": time.time()}if __name__ == "__main__":mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp")

客户端实现

下面是如何实现一个Streamable HTTP MCP客户端的示例:

# mcp_client.py
import requests
import json
from typing import Dict, Any, Optionalclass StreamableHTTPMCPClient:"""Streamable HTTP MCP客户端"""def __init__(self, base_url: str = "http://localhost:8000/mcp"):"""初始化MCP客户端参数:base_url (str): MCP服务器的基础URL"""self.base_url = base_urlself.session = requests.Session()def list_tools(self) -> Dict[str, Any]:"""获取可用工具列表"""response = self.session.post(f"{self.base_url}/tools/list",json={})return response.json()def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:"""调用指定工具参数:tool_name (str): 工具名称arguments (Dict[str, Any]): 工具参数返回:Dict[str, Any]: 工具执行结果"""payload = {"name": tool_name,"arguments": arguments}response = self.session.post(f"{self.base_url}/tools/call",json=payload)return response.json()def close(self):"""关闭客户端连接"""self.session.close()# 使用示例
if __name__ == "__main__":# 创建客户端实例client = StreamableHTTPMCPClient()try:# 获取可用工具列表tools = client.list_tools()print("可用工具:")for tool in tools.get("tools", []):print(f"- {tool.get('name')}: {tool.get('description')}")# 调用加法工具result = client.call_tool("add", {"a": 10, "b": 20})print("\n加法结果:", result)# 调用获取公网IP工具ip_result = client.call_tool("get_public_ip_address", {})print("\n公网IP:", ip_result)# 创建图像生成任务image_task = client.call_tool("文生图 - 创建任务", {"prompt": "一只可爱的小猫在花园里玩耍","size": "512*512","style": "卡通"})print("\n图像任务创建结果:", image_task)# 查询图像生成结果if "task_id" in image_task:task_id = image_task["task_id"]image_result = client.call_tool("文生图 - 查询结果", {"task_id": task_id})print("\n图像生成结果:", image_result)finally:# 关闭客户端连接client.close()

问题解决

常见问题1:连接超时

问题描述:客户端连接服务器时出现超时错误。

解决方案

  1. 检查服务器是否正在运行
  2. 确认端口号和IP地址是否正确
  3. 检查防火墙设置
# 增加超时设置和重试机制
import time
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retryclass RobustMCPClient(StreamableHTTPMCPClient):def __init__(self, base_url: str = "http://localhost:8000/mcp", max_retries: int = 3):super().__init__(base_url)# 配置重试策略retry_strategy = Retry(total=max_retries,backoff_factor=1,status_forcelist=[429, 500, 502, 503, 504],)adapter = HTTPAdapter(max_retries=retry_strategy)self.session.mount("http://", adapter)self.session.mount("https://", adapter)def call_tool(self, tool_name: str, arguments: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:"""调用工具,增加超时设置"""payload = {"name": tool_name,"arguments": arguments}try:response = self.session.post(f"{self.base_url}/tools/call",json=payload,timeout=timeout)response.raise_for_status()return response.json()except requests.exceptions.RequestException as e:return {"error": f"请求失败: {str(e)}"}

常见问题2:数据序列化错误

问题描述:传递复杂数据结构时出现序列化错误。

解决方案

  1. 确保所有参数都是JSON可序列化的
  2. 对于特殊对象,实现自定义序列化方法
import json
from datetime import datetimeclass CustomJSONEncoder(json.JSONEncoder):def default(self, obj):if isinstance(obj, datetime):return obj.isoformat()# 添加其他自定义类型的处理return super().default(obj)# 在客户端中使用自定义编码器
def call_tool_with_custom_serialization(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:"""使用自定义序列化调用工具"""try:payload = {"name": tool_name,"arguments": arguments}# 使用自定义编码器序列化数据json_data = json.dumps(payload, cls=CustomJSONEncoder)response = self.session.post(f"{self.base_url}/tools/call",data=json_data,headers={"Content-Type": "application/json"})return response.json()except Exception as e:return {"error": f"序列化错误: {str(e)}"}

常见问题3:并发访问限制

问题描述:高并发场景下服务器响应缓慢或拒绝连接。

解决方案

  1. 实现连接池
  2. 添加速率限制
  3. 使用异步客户端
import asyncio
import aiohttp
from typing import Listclass AsyncMCPClient:"""异步MCP客户端,支持高并发"""def __init__(self, base_url: str = "http://localhost:8000/mcp", max_connections: int = 100):self.base_url = base_urlself.connector = aiohttp.TCPConnector(limit=max_connections)async def call_tool(self, session: aiohttp.ClientSession, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:"""异步调用工具"""payload = {"name": tool_name,"arguments": arguments}async with session.post(f"{self.base_url}/tools/call",json=payload) as response:return await response.json()async def batch_call_tools(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:"""批量调用工具"""async with aiohttp.ClientSession(connector=self.connector) as session:tasks = [self.call_tool(session, req["tool_name"], req["arguments"])for req in requests]return await asyncio.gather(*tasks)async def close(self):"""关闭客户端"""await self.connector.close()# 使用示例
async def main():client = AsyncMCPClient()try:# 准备批量请求batch_requests = [{"tool_name": "add", "arguments": {"a": i, "b": i*2}},{"tool_name": "get_public_ip_address", "arguments": {}}for i in range(10)]# 执行批量请求results = await client.batch_call_tools(batch_requests)for i, result in enumerate(results):print(f"请求 {i+1} 结果:", result)finally:await client.close()if __name__ == "__main__":asyncio.run(main())

总结提炼

核心步骤

实现Python MCP协议Streamable HTTP的关键步骤:

  1. 环境准备:安装FastMCP框架和相关依赖
  2. 服务器创建:使用FastMCP创建MCP服务器实例
  3. 工具注册:使用@mcp.tool()装饰器注册工具函数
  4. 服务器启动:指定transport="streamable-http"启动服务器
  5. 客户端实现:创建HTTP客户端与服务器交互

适用场景

Streamable HTTP MCP特别适合:

  • 需要持续交互的应用:如聊天机器人、代码助手
  • 实时数据流处理:如实时翻译、数据分析
  • 分布式系统:多个服务需要协同工作
  • Web集成场景:需要与现有Web基础设施无缝集成

最佳实践

  1. 错误处理:实现完善的错误处理和重试机制
  2. 性能优化:使用连接池和异步处理提高性能
  3. 安全考虑:添加认证和授权机制保护服务
  4. 监控日志:实现详细的日志记录和监控
http://www.dtcms.com/a/523676.html

相关文章:

  • JMeter测试HTTP GET(附实例)
  • 保定网站建设系统wordpress 后台速度优化
  • 【OS笔记21】:处理机调度3-进程调度
  • Flutter中Key的作用以及应用场景
  • linux ubuntu 报错findfont: Font family ‘Times New Roman‘ not found.
  • 基于单片机的滴速液位输液报警系统
  • 如何通过 C# 高效读写 Excel 工作表
  • 【final、finally和 finalize的区别】
  • JVM直接内存和堆内存比例如何设置?
  • Spring Boot 启动时,JVM 是如何工作的?
  • 个性化网站建设开发李沧建网站公司
  • 益品康丰集团:以科技重塑康养未来,让健康触手可及
  • 华为Watch GT 6:运动与科技的完美融合
  • 微算法科技(NASDAQ MLGO)开发基于区块链的差分优化联邦增量学习算法,提高机器学习的性能与安全性
  • 《水龙吟》开播即热 李家豪化身“阳光侠客”点亮玄侠江湖
  • Linux基础 -- UBI模块之 leb_read_sanity_check函数说明
  • 深入解析 Transformer 模型:以 ChatGPT 为例从词嵌入到输出预测的大语言模型核心工作机制
  • 破局延时任务(上):为什么选择Spring Boot + DelayQueue来自研分布式延时队列组件?
  • 云手机是一种应用软件吗?
  • 工业无线通信突破!SG-Lora-TCP 模块,7 公里无线替代 TCP 布线
  • 网站建设 服务内容 费用上海有几个区最好
  • 现代前端状态管理深度剖析:从单一数据源到分布式状态
  • UART 串口协议详解与 STM32 实战实现
  • 【CMakeLists.txt】QtSvg 头文件包含配置详解
  • 调用Zlib库接口压缩、解压缩(C++源码)
  • flume的log4j日志无输出排查
  • 一个域名可以做两个网站吗天津人事考试网
  • whisper 模型处理音频办法与启示
  • linux rt任务调度器
  • 金融智能体技术解读:十大应用场景与AI Agent架构设计思路