当前位置: 首页 > news >正文

MCP基础知识二(实战通信方式之Streamable HTTP)

介绍

MCP 使用 JSON-RPC 2.0 作为其传输格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输,并将接收到的 JSON-RPC 消息转换回 MCP 协议消息。其中SSE被废弃了(Server-Sent Events (SSE) - Deprecated)

SSE as a standalone transport is deprecated as of protocol version 2024-11-05. It has been replaced by Streamable HTTP, which incorporates SSE as an optional streaming mechanism. For backwards compatibility information, see the backwards compatibility section below.

自协议版本 2024-11-05 起,SSE 作为独立传输已被弃用。它已被流式 HTTP 取代,后者将 SSE 作为可选的流式机制。有关向后兼容性信息,请参阅下方的向后兼容性部分。

MCP官网:Transports - Model Context Protocol

实战

1.编写一个py代码

from __future__ import annotations
import argparse
import asyncio
import json
from typing import Any, AsyncIterator
import httpx
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import StreamingResponse
# ---------------------------------------------------------------------------
# Server constants
# ---------------------------------------------------------------------------
SERVER_NAME = "WeatherServer"
SERVER_VERSION = "1.0.0"
PROTOCOL_VERSION = "2024-11-05" # Cherry Studio current
# ---------------------------------------------------------------------------
# Weather helpers
# ---------------------------------------------------------------------------
OPENWEATHER_URL = "https://api.openweathermap.org/data/2.5/weather"
API_KEY: str | None = None
USER_AGENT = "weather-app/1.0"
async def fetch_weather(city: str) -> dict[str, Any]:if not API_KEY:return {"error": "API_KEY 未设置,请提供有效的 OpenWeather API Key。"}params = {"q": city, "appid": API_KEY, "units": "metric", "lang": "zh_cn"}headers = {"User-Agent": USER_AGENT}async with httpx.AsyncClient(timeout=30.0) as client:try:r = await client.get(OPENWEATHER_URL, params=params, headers=headers)r.raise_for_status()return r.json()except httpx.HTTPStatusError as exc:return {"error": f"HTTP 错误: {exc.response.status_code} - {exc.response.text}"}except httpx.RequestError as exc:return {"error": f"网络请求失败: {exc}"}except json.JSONDecodeError:return {"error": "OpenWeather 返回的数据不是有效的 JSON"}def format_weather(data: dict[str, Any]) -> str:if "error" in data:return data["error"]city = data.get("name", "未知")country = data.get("sys", {}).get("country", "未知")temp = data.get("main", {}).get("temp", "N/A")humidity = data.get("main", {}).get("humidity", "N/A")wind = data.get("wind", {}).get("speed", "N/A")desc = data.get("weather", [{}])[0].get("description", "未知")return (f"🌍 {city}, {country}\n"f"🌡 温度: {temp}°C\n"f"💧 湿度: {humidity}%\n"f"🌬 风速: {wind} m/s\n"f"🌤 天气: {desc}")async def stream_weather(city: str, req_id: int | str) -> AsyncIterator[bytes]:# 进度提示(不会出错的部分放在try外面)yield json.dumps({"jsonrpc": "2.0","id": req_id,"stream": f"查询{city}天气中…"}).encode() + b"\n"try:await asyncio.sleep(0.3)data = await fetch_weather(city)  # 主要可能出错点if "error" in data:yield json.dumps({"jsonrpc": "2.0","id": req_id,"error": {"code": -32002,  # 保留业务错误码"message": data["error"]}}).encode() + b"\n"return# 正常结果(json.dumps理论上不会出错)yield json.dumps({"jsonrpc": "2.0","id": req_id,"result": {"content": [{"type": "text", "text": format_weather(data)}],"isError": False}}).encode() + b"\n"except Exception as exc:# 系统级错误单独处理yield json.dumps({"jsonrpc": "2.0","id": req_id,"error": {"code": -32003,  # 保留系统错误码"message": f"服务器内部错误: {str(exc)}"}}).encode() + b"\n"# ---------------------------------------------------------------------------# FastAPI app# ---------------------------------------------------------------------------
app = FastAPI(title="WeatherServer HTTP-Stream v8")
TOOLS_REGISTRY = {"tools": [{"name": "get_weather","description": "用于进行天气信息查询的函数,输入城市英文名称,即可获得当前城市天气信息。","inputSchema": {"type": "object","properties": {"city": {"type": "string","description": "City name, e.g. 'Hangzhou'"}},"required": ["city"]}}],"nextCursor": "qinqing"
}
@app.get("/mcp")
async def mcp_initialize_via_get():
# GET 请求也执行了 initialize 方法return {"jsonrpc": "2.0","id": 0,"result": {"protocolVersion": PROTOCOL_VERSION,"capabilities": {"streaming": True,"tools": {"listChanged": True}},"serverInfo": {"name": SERVER_NAME,"version": SERVER_VERSION},"instructions": "Use the get_weather tool to fetch weather by cityname."}
}
@app.post("/mcp")
async def mcp_endpoint(request: Request):try:body = await request.json()# ✅ 打印客户端请求内容print("💡 收到请求:", json.dumps(body, ensure_ascii=False, indent=2))except Exception:return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700,"message": "Parse error"}}req_id = body.get("id", 1)method = body.get("method")# ✅ 打印当前方法类型print(f"🔧 方法: {method}")# 0) Ignore initialized notification (no response required)if method == "notifications/initialized":return Response(status_code=status.HTTP_204_NO_CONTENT)# 1) Activation probe (no method)if method is None:return {"jsonrpc": "2.0", "id": req_id, "result": {"status": "MCP serveronline."}}# 2) initializeif method == "initialize":return {"jsonrpc": "2.0","id": req_id,"result": {"protocolVersion": PROTOCOL_VERSION,"capabilities": {"streaming": True,"tools": {"listChanged": True}},"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},"instructions": "Use the get_weather tool to fetch weather bycity name."}}# 3) tools/listif method == "tools/list":print(json.dumps(TOOLS_REGISTRY, indent=2, ensure_ascii=False))return {"jsonrpc": "2.0", "id": req_id, "result": TOOLS_REGISTRY}# 4) tools/callif method == "tools/call":params = body.get("params", {})tool_name = params.get("name")args = params.get("arguments", {})if tool_name != "get_weather":return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32602,"message": "Unknown tool"}}city = args.get("city")if not city:return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32602,"message": "Missing city"}}# return StreamingResponse(stream_weather(city, req_id),media_type="application/json")data = await fetch_weather(city)return {"jsonrpc": "2.0","id": req_id,"result": {"content": [{"type": "text", "text": format_weather(data)}],"isError": False}}# 5) unknown methodreturn {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message":"Method not found"}}
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def main() -> None:parser = argparse.ArgumentParser(description="Weather MCP HTTP-Stream v8")parser.add_argument("--api_key", required=False,default='xxxx')parser.add_argument("--host", default="127.0.0.1")parser.add_argument("--port", type=int, default=8000)args = parser.parse_args()global API_KEYAPI_KEY = args.api_keyimport uvicornuvicorn.run(app, host=args.host, port=args.port, log_level="info")
if __name__ == "__main__":# npx -y @modelcontextprotocol/inspector uv run weather2.pymain()

其中api_key 需要自己申请即可,可以免费调用大概1000次(测试和学习基本够用了) 

2. 使用mcp的检查器进行测试

npx -y @modelcontextprotocol/inspector uv run xxxx.py

参考官网:Inspector - Model Context Protocol

3. 将py进行启动运行

4. 检查器启动成功之后进行测试

简单的例子Streamable HTTP就成功了 ~

http://www.dtcms.com/a/279326.html

相关文章:

  • 【CTF学习】PWN基础工具的使用(binwalk、foremost、Wireshark、WinHex)
  • ewdyfdfytty
  • LangChain教程——文本嵌入模型
  • 20250714让荣品RD-RK3588开发板在Android13下长按关机
  • Debezium日常分享系列之:提升Debezium性能
  • 制造业实战:数字化集采如何保障千种备件“不断供、不积压”?
  • 16.避免使用裸 except
  • MFC扩展库BCGControlBar Pro v36.2新版亮点:可视化设计器升级
  • 计算机毕业设计Java轩辕购物商城管理系统 基于 SpringBoot 的轩辕电商商城管理系统 Java 轩辕购物平台管理系统设计与实现
  • 面向对象的设计模式
  • 【数据结构】树(堆)·上
  • js的局部变量和全局变量
  • 测试驱动开发(TDD)实战:在 Spring 框架实现中践行 “红 - 绿 - 重构“ 循环
  • Bash vs PowerShell | 从 CMD 到跨平台工具:Bash 与 PowerShell 的全方位对比
  • vue3 服务端渲染时请求接口没有等到数据,但是客户端渲染是请求接口又可以得到数据
  • 7.14 map | 内存 | 二维dp | 二维前缀和
  • python+Request提取cookie
  • 电脑升级Experience
  • python transformers笔记(Trainer类)
  • 代码随想录算法训练营第三十五天|416. 分割等和子集
  • LLM表征工程还有哪些值得做的地方
  • 内部文件审计:企业文件服务器审计对网络安全提升有哪些帮助?
  • 防火墙技术概述
  • Qt轮廓分析设计+算法+避坑
  • Redis技术笔记-主从复制、哨兵与持久化实战指南
  • 第五章 uniapp实现兼容多端的树状族谱关系图,剩余组件
  • 学习C++、QT---25(QT中实现QCombobox库的介绍和用QCombobox设置编码和使用编码的讲解)
  • SQL ORM映射框架深度剖析:从原理到实战优化
  • 【Unity】MiniGame编辑器小游戏(十三)最强射手【Shooter】(下)
  • ElasticSearch重置密码