
A Complete Guide to MCP Transport Mechanisms: Stdio, SSE, and Streamable HTTP (Deep Dive, Hands-On Examples, and Overall Comparison)

In AI application development, choosing the right communication method is like choosing the best vehicle for different transportation needs. MCP (Model Context Protocol) offers three main transport mechanisms: Stdio, SSE, and Streamable HTTP, each with its own strengths and suitable scenarios. This article explains how these three protocols work and how to implement them, with Python code examples to help you apply them in real projects.

1. The Stdio Protocol: Efficient Local Data Exchange

How It Works

The Stdio (Standard Input/Output) protocol is built on the operating system's standard input and output streams and is the most direct, simplest form of inter-process communication. The server receives data on standard input and returns results on standard output, and the entire exchange stays on the local machine.
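
To make the exchange concrete, the sketch below pipes a single newline-delimited JSON request into a child process and reads one JSON response back. It is only an illustration of the stdio plumbing under simplified assumptions: the field names follow the example server defined later in this section, and stdio_server.py is that script's assumed filename, not part of the actual MCP message schema.

# Minimal sketch of the stdio plumbing (simplified; not the real MCP schema).
import json
import subprocess

request = {"id": 1, "method": "get_user_info", "params": {"user_id": "42"}}

# Spawn the server script (placeholder name, defined later in this article),
# write one JSON request per line to its stdin, and read its stdout.
proc = subprocess.run(
    ["python", "stdio_server.py"],
    input=json.dumps(request) + "\n",
    capture_output=True,
    text=True,
)
print(proc.stdout.strip())  # one JSON response per line, e.g. {"id": 1, "result": ...}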

Key Advantages

  • Zero configuration: no complicated network setup or third-party libraries required
  • Ultra-low latency: data travels directly through in-memory pipes
  • Strong privacy: data never leaves the local machine, giving the best possible privacy protection
  • Broad compatibility: natively supported by virtually every programming language and operating system

Limitations

  • Relatively low throughput: when handling large volumes of data, frequent input/output operations easily become a performance bottleneck.
  • No concurrency: it cannot handle multiple requests at the same time, so it is unsuitable for large-scale applications.
  • Local only: it works only on the same machine and cannot meet remote data-exchange needs (no access to other machines or cloud resources).

Typical Use Cases

Integrating small local tools, working with private personal data, and quickly building demos to try out functionality.

Python Implementation Example

Server (MCP Server)
import sys
import json
import logging


class StdioMCPServer:
    def __init__(self):
        self.methods = {
            "get_user_info": self.get_user_info,
            "process_data": self.process_data
        }

    def get_user_info(self, params):
        """Return basic user information."""
        user_id = params.get("user_id", "default")
        return {
            "user_id": user_id,
            "name": f"User_{user_id}",
            "status": "active"
        }

    def process_data(self, params):
        """Process a list of numbers."""
        data = params.get("data", [])
        result = sum(data) if isinstance(data, list) else 0
        return {"result": result, "count": len(data)}

    def handle_request(self, request):
        """Dispatch a single request to the registered method."""
        try:
            method = request.get("method")
            params = request.get("params", {})

            if method in self.methods:
                result = self.methods[method](params)
                return {"id": request.get("id"), "result": result, "error": None}
            else:
                return {"id": request.get("id"), "result": None,
                        "error": f"Method {method} not found"}
        except Exception as e:
            return {"id": request.get("id"), "result": None, "error": str(e)}

    def run(self):
        """Read newline-delimited JSON requests from stdin and write responses to stdout."""
        for line in sys.stdin:
            try:
                request = json.loads(line.strip())
                response = self.handle_request(request)
                print(json.dumps(response), flush=True)
            except json.JSONDecodeError:
                error_response = {"id": None, "result": None, "error": "Invalid JSON"}
                print(json.dumps(error_response), flush=True)


if __name__ == "__main__":
    server = StdioMCPServer()
    server.run()
Client (MCP Client)
import subprocess
import json
import uuid


class StdioMCPClient:
    def __init__(self, server_script):
        self.process = subprocess.Popen(
            ["python", server_script],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=0
        )

    def call_method(self, method, params=None):
        """Call a remote method on the server."""
        request = {
            "id": str(uuid.uuid4()),
            "method": method,
            "params": params or {}
        }

        # Send the request
        request_json = json.dumps(request) + "\n"
        self.process.stdin.write(request_json)
        self.process.stdin.flush()

        # Receive the response
        response_line = self.process.stdout.readline()
        response = json.loads(response_line.strip())

        if response.get("error"):
            raise Exception(f"Server error: {response['error']}")
        return response.get("result")

    def close(self):
        """Close the connection and terminate the server process."""
        self.process.stdin.close()
        self.process.stdout.close()
        self.process.terminate()


# Usage example
if __name__ == "__main__":
    client = StdioMCPClient("stdio_server.py")
    try:
        # Fetch user information
        user_info = client.call_method("get_user_info", {"user_id": "123"})
        print(f"User info: {user_info}")

        # Process data
        result = client.call_method("process_data", {"data": [1, 2, 3, 4, 5]})
        print(f"Processing result: {result}")
    finally:
        client.close()

2. The SSE Protocol: A Powerful Tool for Real-Time Data Push

How It Works

SSE (Server-Sent Events) is a one-way, HTTP-based communication protocol in which the server actively pushes data to the client. The client opens a persistent HTTP connection, and the server keeps sending event data over that connection.
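
Before the full Flask example below, it helps to see what SSE actually puts on the wire: the response is a long-lived HTTP body with Content-Type text/event-stream, and each event is one or more "data:" lines terminated by a blank line. The helper below is only a sketch of that framing; the payload field names are made up for illustration.

# Sketch of the SSE wire format: "id:" / "data:" lines, blank line ends an event.
import json

def format_sse_event(payload, event_id=None):
    """Serialize a payload dict into one SSE event frame."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")           # lets clients resume after a dropped connection
    lines.append(f"data: {json.dumps(payload)}")  # the event body
    lines.append("")                              # blank line terminates the event
    return "\n".join(lines) + "\n"

print(format_sse_event({"type": "notification", "text": "hello"}, event_id=1), end="")
# Output on the wire:
# id: 1
# data: {"type": "notification", "text": "hello"}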

Key Advantages

  • Real-time: the server can push updates to the client immediately
  • Network-friendly: built on HTTP, so it passes easily through firewalls and proxies
  • Simple to implement: more lightweight than WebSocket
  • Automatic reconnection: browsers handle dropped connections and reconnects automatically

Limitations

  • One-way transport: the client cannot send data to the server over the same channel, which limits its use in bidirectional scenarios.
  • Server load: every client holds a long-lived connection open, so a large number of clients puts heavy pressure on the server.
  • Compatibility: browser support varies, and some older browsers may not work correctly.
  • Being phased out: the protocol maintainers consider there to be a better alternative and plan to replace SSE with Streamable HTTP, the third mechanism covered below.

Typical Use Cases

Remote services where the server needs to push real-time notifications to clients.

Python Implementation Example

Server (Flask SSE Server)
from flask import Flask, Response, request
import json
import time
import threading
import queue

app = Flask(__name__)


class SSEMCPServer:
    def __init__(self):
        self.clients = {}  # One message queue per connected client
        self.message_queue = queue.Queue()
        self.running = True

        # Start the message-processing thread
        self.message_thread = threading.Thread(target=self._process_messages)
        self.message_thread.daemon = True
        self.message_thread.start()

    def _process_messages(self):
        """Drain the message queue and broadcast each message."""
        while self.running:
            try:
                message = self.message_queue.get(timeout=1)
                self._broadcast_message(message)
                self.message_queue.task_done()
            except queue.Empty:
                continue

    def _broadcast_message(self, message):
        """Broadcast a message to every connected client."""
        disconnected_clients = []
        for client_id, client_queue in self.clients.items():
            try:
                client_queue.put(message)
            except Exception:
                disconnected_clients.append(client_id)

        # Clean up disconnected clients
        for client_id in disconnected_clients:
            del self.clients[client_id]

    def add_client(self, client_id):
        """Register a client and return its message queue."""
        self.clients[client_id] = queue.Queue()
        return self.clients[client_id]

    def remove_client(self, client_id):
        """Remove a client."""
        if client_id in self.clients:
            del self.clients[client_id]

    def push_message(self, message_type, data):
        """Queue a message for broadcasting."""
        message = {
            "type": message_type,
            "data": data,
            "timestamp": time.time()
        }
        self.message_queue.put(message)


# Global server instance
sse_server = SSEMCPServer()


@app.route('/events')
def events():
    """SSE endpoint."""
    client_id = request.args.get('client_id', 'default')
    client_queue = sse_server.add_client(client_id)

    def event_stream():
        try:
            # Send a connection acknowledgement
            yield f"data: {json.dumps({'type': 'connected', 'client_id': client_id})}\n\n"

            while True:
                try:
                    # Wait for a message (1-second timeout)
                    message = client_queue.get(timeout=1)
                    yield f"data: {json.dumps(message)}\n\n"
                    client_queue.task_done()
                except queue.Empty:
                    # Send a heartbeat
                    yield f"data: {json.dumps({'type': 'heartbeat', 'timestamp': time.time()})}\n\n"
                except GeneratorExit:
                    break
        finally:
            sse_server.remove_client(client_id)

    return Response(
        event_stream(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Access-Control-Allow-Origin': '*'
        }
    )


@app.route('/push', methods=['POST'])
def push_message():
    """Message-push API."""
    data = request.get_json()
    message_type = data.get('type', 'message')
    message_data = data.get('data', {})

    sse_server.push_message(message_type, message_data)
    return {"status": "success", "message": "Message pushed"}


@app.route('/status')
def status():
    """Server status."""
    return {
        "status": "running",
        "connected_clients": len(sse_server.clients),
        "timestamp": time.time()
    }


if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000, threaded=True)
Client
import requests
import sseclient
import json
import threading
import time


class SSEMCPClient:
    def __init__(self, server_url, client_id=None):
        self.server_url = server_url
        self.client_id = client_id or f"client_{int(time.time())}"
        self.event_handlers = {}
        self.running = False
        self.sse_thread = None

    def on(self, event_type, handler):
        """Register an event handler."""
        self.event_handlers[event_type] = handler

    def _handle_message(self, message):
        """Dispatch a received message to the matching handler."""
        try:
            data = json.loads(message.data)
            event_type = data.get('type')

            if event_type in self.event_handlers:
                self.event_handlers[event_type](data)
            elif 'default' in self.event_handlers:
                self.event_handlers['default'](data)
        except json.JSONDecodeError:
            print(f"Invalid JSON received: {message.data}")

    def _sse_listener(self):
        """SSE listener thread."""
        url = f"{self.server_url}/events?client_id={self.client_id}"

        while self.running:
            try:
                response = requests.get(url, stream=True)
                client = sseclient.SSEClient(response)

                for message in client.events():
                    if not self.running:
                        break
                    self._handle_message(message)
            except Exception as e:
                print(f"SSE connection error: {e}")
                if self.running:
                    time.sleep(5)  # Wait 5 seconds before reconnecting

    def start(self):
        """Start the client."""
        self.running = True
        self.sse_thread = threading.Thread(target=self._sse_listener)
        self.sse_thread.daemon = True
        self.sse_thread.start()
        print(f"SSE client started, ID: {self.client_id}")

    def stop(self):
        """Stop the client."""
        self.running = False
        if self.sse_thread:
            self.sse_thread.join()
        print("SSE client stopped")

    def send_message(self, message_type, data):
        """Send a message to the server (via HTTP POST)."""
        url = f"{self.server_url}/push"
        payload = {"type": message_type, "data": data}
        response = requests.post(url, json=payload)
        return response.json()


# Usage example
if __name__ == "__main__":
    client = SSEMCPClient("http://localhost:5000")

    # Register event handlers
    def on_connected(data):
        print(f"Connected to server: {data}")

    def on_message(data):
        print(f"Message received: {data}")

    def on_heartbeat(data):
        print(".", end="", flush=True)

    client.on('connected', on_connected)
    client.on('message', on_message)
    client.on('heartbeat', on_heartbeat)

    # Start the client
    client.start()

    try:
        # Send a test message
        time.sleep(2)
        client.send_message('test', {'content': 'Hello from client!'})

        # Keep the connection alive
        time.sleep(30)
    except KeyboardInterrupt:
        pass
    finally:
        client.stop()

3. The Streamable HTTP Protocol: The First Choice for Modern Distributed Systems

How It Works

The Streamable HTTP protocol builds streaming data transfer on top of HTTP, allowing data to flow between client and server as a stream. It is especially well suited to large volumes of data and to scenarios where data must be processed while it is still being transferred.
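
The server example below streams its responses, so as a complement here is a small sketch of the other direction: with the requests library, passing a generator as the request body makes the client upload data chunk by chunk (chunked transfer encoding) instead of buffering everything first. The /upload endpoint is hypothetical and is not implemented by the FastAPI server in this article.

# Sketch of a streamed (chunked) request body with requests; /upload is hypothetical.
import json
import time
import requests

def record_stream():
    """Yield newline-delimited JSON records as they are produced."""
    for i in range(5):
        yield (json.dumps({"seq": i, "ts": time.time()}) + "\n").encode("utf-8")
        time.sleep(0.2)  # simulate data arriving over time

# Passing an iterable body makes requests use Transfer-Encoding: chunked.
resp = requests.post("http://localhost:8000/upload", data=record_stream(), timeout=30)
print(resp.status_code)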

Key Advantages

  • Efficient streaming: handles large files and continuous data streams
  • Friendly to modern architectures: works well with serverless functions, load balancers, and similar infrastructure
  • Bidirectional communication: supports streaming in both the request and the response
  • Resource-efficient: uses memory more economically

Limitations

  • Heavier dependence on network quality: on an unstable network, stalls and buffering can break the continuity and smoothness of the data flow.
  • Somewhat more complex: because data is transferred as a stream, integrity checking and error recovery are harder; once data is lost or corrupted, repair is difficult and may require extra work from the developer.

Typical Use Cases

Thanks to its flexible and efficient data transfer, Streamable HTTP is an ideal choice for remote communication in serverless functions (such as AWS Lambda), distributed systems that auto-scale with load, and stateless service architectures.

Python Implementation Example

Server (FastAPI Streaming Server)
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import StreamingResponse
import json
import asyncio
import time
from typing import Generator, AsyncGenerator
import uuid

app = FastAPI(title="Streamable HTTP MCP Server")


class StreamableMCPServer:
    def __init__(self):
        self.active_streams = {}
        self.data_processors = {
            "process_large_dataset": self.process_large_dataset,
            "real_time_analysis": self.real_time_analysis,
            "file_processing": self.file_processing
        }

    async def process_large_dataset(self, params) -> AsyncGenerator[str, None]:
        """Process a large dataset in batches."""
        dataset_size = params.get("size", 1000)
        batch_size = params.get("batch_size", 100)

        for i in range(0, dataset_size, batch_size):
            batch_data = {
                "batch_id": i // batch_size,
                "start_index": i,
                "end_index": min(i + batch_size, dataset_size),
                "processed_count": min(batch_size, dataset_size - i),
                "timestamp": time.time()
            }

            # Simulate processing time
            await asyncio.sleep(0.1)
            yield f"data: {json.dumps(batch_data)}\n\n"

        # Send a completion signal
        completion_data = {
            "status": "completed",
            "total_processed": dataset_size,
            "timestamp": time.time()
        }
        yield f"data: {json.dumps(completion_data)}\n\n"

    async def real_time_analysis(self, params) -> AsyncGenerator[str, None]:
        """Analyze a data stream in real time."""
        duration = params.get("duration", 30)  # Run time in seconds
        interval = params.get("interval", 1)   # Interval between data points in seconds

        start_time = time.time()
        counter = 0

        while time.time() - start_time < duration:
            counter += 1
            analysis_result = {
                "id": counter,
                "timestamp": time.time(),
                "value": counter * 1.5 + (time.time() % 10),
                "trend": "increasing" if counter % 3 == 0 else "stable",
                "alerts": [] if counter % 5 != 0 else ["threshold_exceeded"]
            }

            yield f"data: {json.dumps(analysis_result)}\n\n"
            await asyncio.sleep(interval)

        # Send an analysis summary
        summary = {
            "status": "analysis_completed",
            "total_points": counter,
            "duration": time.time() - start_time,
            "average_value": counter * 0.75
        }
        yield f"data: {json.dumps(summary)}\n\n"

    async def file_processing(self, params) -> AsyncGenerator[str, None]:
        """Stream progress for a (simulated) file-processing job."""
        file_size = params.get("file_size", 10000)  # Simulated file size
        chunk_size = params.get("chunk_size", 1024)

        processed_bytes = 0
        chunk_count = 0

        while processed_bytes < file_size:
            current_chunk_size = min(chunk_size, file_size - processed_bytes)
            processed_bytes += current_chunk_size
            chunk_count += 1

            progress_data = {
                "chunk_id": chunk_count,
                "processed_bytes": processed_bytes,
                "total_bytes": file_size,
                "progress_percent": (processed_bytes / file_size) * 100,
                "processing_rate": f"{current_chunk_size} bytes/chunk",
                "timestamp": time.time()
            }

            yield f"data: {json.dumps(progress_data)}\n\n"
            await asyncio.sleep(0.05)  # Simulate processing time

        # Send completion information
        completion_info = {
            "status": "file_processed",
            "total_chunks": chunk_count,
            "total_bytes": processed_bytes,
            "processing_time": "simulated",
            "timestamp": time.time()
        }
        yield f"data: {json.dumps(completion_info)}\n\n"


# Global server instance
streaming_server = StreamableMCPServer()


@app.get("/stream/{method}")
async def stream_method(method: str, size: int = 1000, batch_size: int = 100,
                        duration: int = 30, interval: int = 1,
                        file_size: int = 10000, chunk_size: int = 1024):
    """Invoke a method as a stream."""
    if method not in streaming_server.data_processors:
        raise HTTPException(status_code=404, detail=f"Method {method} not found")

    # Build the parameter set
    params = {
        "size": size,
        "batch_size": batch_size,
        "duration": duration,
        "interval": interval,
        "file_size": file_size,
        "chunk_size": chunk_size
    }

    # Generate a unique stream ID
    stream_id = str(uuid.uuid4())

    async def generate_stream():
        try:
            streaming_server.active_streams[stream_id] = True

            # Send a stream-start signal
            start_signal = {
                "stream_id": stream_id,
                "method": method,
                "status": "stream_started",
                "params": params,
                "timestamp": time.time()
            }
            yield f"data: {json.dumps(start_signal)}\n\n"

            # Run the streaming processor
            processor = streaming_server.data_processors[method]
            async for chunk in processor(params):
                if stream_id not in streaming_server.active_streams:
                    break
                yield chunk
        except Exception as e:
            error_data = {
                "stream_id": stream_id,
                "status": "error",
                "error": str(e),
                "timestamp": time.time()
            }
            yield f"data: {json.dumps(error_data)}\n\n"
        finally:
            # Clean up the stream state
            streaming_server.active_streams.pop(stream_id, None)

            # Send a stream-end signal
            end_signal = {
                "stream_id": stream_id,
                "status": "stream_ended",
                "timestamp": time.time()
            }
            yield f"data: {json.dumps(end_signal)}\n\n"

    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Stream-ID": stream_id
        }
    )


@app.get("/status")
async def get_status():
    """Return server status."""
    return {
        "status": "running",
        "active_streams": len(streaming_server.active_streams),
        "available_methods": list(streaming_server.data_processors.keys()),
        "timestamp": time.time()
    }


@app.delete("/stream/{stream_id}")
async def stop_stream(stream_id: str):
    """Stop a specific stream."""
    if stream_id in streaming_server.active_streams:
        del streaming_server.active_streams[stream_id]
        return {"status": "stream_stopped", "stream_id": stream_id}
    else:
        raise HTTPException(status_code=404, detail="Stream not found")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Client
import requests
import json
import time
import threading
from typing import Callable, Optional


class StreamableHTTPClient:
    def __init__(self, server_url: str):
        self.server_url = server_url.rstrip('/')
        self.active_requests = {}

    def stream_call(self, method: str, params: dict = None,
                    on_data: Callable = None, on_complete: Callable = None,
                    on_error: Callable = None) -> threading.Thread:
        """Start a streaming call and return the worker thread."""
        # Build the request URL
        url = f"{self.server_url}/stream/{method}"

        # Append query parameters
        if params:
            param_strs = []
            for key, value in params.items():
                param_strs.append(f"{key}={value}")
            if param_strs:
                url += "?" + "&".join(param_strs)

        def stream_worker():
            stream_id = None
            try:
                response = requests.get(url, stream=True)
                response.raise_for_status()

                stream_id = response.headers.get('X-Stream-ID', 'unknown')
                self.active_requests[stream_id] = response

                for line in response.iter_lines(decode_unicode=True):
                    if line.startswith('data: '):
                        try:
                            data = json.loads(line[6:])  # Strip the 'data: ' prefix

                            # Route the message by status
                            status = data.get('status')
                            if status == 'error' and on_error:
                                on_error(data)
                            elif status in ['stream_ended', 'completed',
                                            'analysis_completed', 'file_processed']:
                                if on_complete:
                                    on_complete(data)
                            elif on_data:
                                on_data(data)
                        except json.JSONDecodeError:
                            if on_error:
                                on_error({"error": "Invalid JSON in stream", "raw_data": line})
            except requests.exceptions.RequestException as e:
                if on_error:
                    on_error({"error": str(e), "type": "request_error"})
            finally:
                # Clean up
                if stream_id in self.active_requests:
                    del self.active_requests[stream_id]

        # Start the streaming worker thread
        thread = threading.Thread(target=stream_worker)
        thread.daemon = True
        thread.start()
        return thread

    def get_status(self) -> dict:
        """Get server status."""
        response = requests.get(f"{self.server_url}/status")
        return response.json()

    def stop_stream(self, stream_id: str) -> dict:
        """Stop a specific stream."""
        response = requests.delete(f"{self.server_url}/stream/{stream_id}")
        return response.json()


# Usage example
if __name__ == "__main__":
    client = StreamableHTTPClient("http://localhost:8000")

    print("Server status:", client.get_status())

    # Example 1: process a large dataset
    def on_dataset_data(data):
        if 'batch_id' in data:
            print(f"Processing batch {data['batch_id']}: {data['processed_count']} items")
        elif data.get('status') == 'completed':
            print(f"Dataset processed! {data['total_processed']} items in total")

    def on_complete(data):
        print(f"Stream finished: {data}")

    def on_error(data):
        print(f"Error: {data}")

    print("\nStarting large-dataset processing...")
    dataset_thread = client.stream_call(
        "process_large_dataset",
        {"size": 500, "batch_size": 50},
        on_data=on_dataset_data,
        on_complete=on_complete,
        on_error=on_error
    )

    # Give it a moment and watch the output
    time.sleep(3)

    # Example 2: real-time analysis
    print("\nStarting real-time analysis...")

    def on_analysis_data(data):
        if 'value' in data:
            alerts = ", ".join(data.get('alerts', [])) or "none"
            print(f"Point {data['id']}: value={data['value']:.2f}, "
                  f"trend={data['trend']}, alerts={alerts}")

    analysis_thread = client.stream_call(
        "real_time_analysis",
        {"duration": 10, "interval": 1},
        on_data=on_analysis_data,
        on_complete=on_complete,
        on_error=on_error
    )

    # Wait for the analysis to finish
    time.sleep(12)

    # Example 3: file processing
    print("\nStarting file processing...")

    def on_file_data(data):
        if 'progress_percent' in data:
            print(f"File progress: {data['progress_percent']:.1f}% "
                  f"({data['processed_bytes']}/{data['total_bytes']} bytes)")

    file_thread = client.stream_call(
        "file_processing",
        {"file_size": 5000, "chunk_size": 512},
        on_data=on_file_data,
        on_complete=on_complete,
        on_error=on_error
    )

    # Wait for all tasks to complete
    time.sleep(15)
    print("\nAll examples completed!")

Overall Comparison

| Feature | Stdio | SSE | Streamable HTTP |
| --- | --- | --- | --- |
| Deployment complexity | ⭐⭐⭐⭐⭐ Minimal | ⭐⭐⭐ Moderate | ⭐⭐ More involved |
| Performance | ⭐⭐⭐⭐⭐ Very fast | ⭐⭐⭐ Moderate | ⭐⭐⭐⭐ Fast |
| Network support | ❌ Local only | ✅ Remote | ✅ Remote |
| Communication direction | ↔️ Bidirectional | ⬇️ One-way push | ↔️ Bidirectional streaming |
| Concurrency | ❌ Single process | ⭐⭐⭐ Multiple clients | ⭐⭐⭐⭐⭐ High concurrency |
| Data volume | ⭐⭐ Small to medium | ⭐⭐⭐ Medium | ⭐⭐⭐⭐⭐ Large scale |
| Real-time capability | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐⭐ Very good | ⭐⭐⭐⭐ Very good |
| Resource consumption | ⭐⭐⭐⭐⭐ Very low | | |
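
As a rough rule of thumb, the trade-offs in the table can be collapsed into a few questions. The helper below is just this article's comparison expressed as code, not an official decision procedure, and the criteria names are my own shorthand.

# The comparison above as a rough rule of thumb (illustrative only).
def pick_transport(remote: bool, need_bidirectional: bool, large_streams: bool) -> str:
    if not remote:
        return "stdio"              # local only: zero setup, lowest latency, best privacy
    if need_bidirectional or large_streams:
        return "streamable_http"    # remote, bidirectional streaming, scales well
    return "sse"                    # remote, simple one-way push over plain HTTP

print(pick_transport(remote=True, need_bidirectional=True, large_streams=True))
# streamable_http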
