【MCP】为什么使用Streamable HTTP: 相比SSE的优势与实践指南
在现代Web开发中,实时通信已经成为许多应用的核心需求。从聊天应用到股票市场更新,从游戏服务器到AI模型通信,各种技术应运而生以满足这些需求。最近,Model Context Protocol (MCP) 引入了一种新的传输机制 —— Streamable HTTP,它为服务器到客户端的实时通信提供了更优雅的解决方案。本文将深入探讨Streamable HTTP相较于Server-Sent Events (SSE)的优势,并通过实际代码示例展示其实现。
实时通信技术的演进
在深入Streamable HTTP之前,我们先简要回顾一下Web实时通信技术的发展历程:
长轮询 (Long Polling)
长轮询是早期使用的一种"黑客"方式,用于在浏览器中通过HTTP实现服务器-客户端消息传递。客户端发送HTTP请求,服务器保持连接开放直到有新数据可用。一旦服务器发送响应,客户端立即发起新的请求。这种方法虽然简单,但效率低下且可能导致消息丢失。
// 长轮询的JavaScript客户端实现
function longPoll() {fetch('http://example.com/poll').then(response => response.json()).then(data => {console.log("接收到数据:", data);longPoll(); // 立即建立新的长轮询请求}).catch(error => {// 10秒后重试setTimeout(longPoll, 10000);});
}
longPoll(); // 启动长轮询
WebSockets
WebSockets提供了一种全双工通信机制,允许在单个长连接上双向传输数据。这项技术克服了HTTP请求-响应周期的开销,非常适合低延迟、高频率更新的场景。
// WebSocket的JavaScript客户端实现
const socket = new WebSocket('ws://example.com');socket.onopen = function(event) {console.log('连接已建立');// 向服务器发送消息socket.send('你好,服务器!');
};socket.onmessage = function(event) {console.log('来自服务器的消息:', event.data);
};
尽管WebSocket API基础使用简单,但在生产环境中处理连接断开、重连和心跳检测等问题是相当复杂的。
Server-Sent Events (SSE)
Server-Sent Events提供了一种标准方式,通过HTTP将服务器更新推送到客户端。与WebSockets不同,SSE专为单向通信(从服务器到客户端)设计,适用于新闻推送、体育比分更新等场景。
// SSE的JavaScript客户端实现
const evtSource = new EventSource("https://example.com/events");// 处理消息事件
evtSource.onmessage = event => {console.log('收到消息: ' + event.data);
};
SSE的优点是自动重连及使用标准HTTP协议,但它存在一些局限性,如:
- 需要维护长期连接
- 浏览器限制每个域名最多6个并发连接
- 在企业环境中可能受到代理和防火墙的限制
Streamable HTTP:新一代实时通信方案
Streamable HTTP是MCP协议在2025年3月引入的一种新传输机制,旨在取代之前的HTTP+SSE传输模式。它的设计理念是在保留SSE优点的同时克服其限制,特别是提供更好的可扩展性和企业环境兼容性。
Streamable HTTP的工作原理
Streamable HTTP的核心思想是提供一个统一的HTTP端点,同时支持POST和GET方法:
- POST方法:用于客户端向服务器发送请求和接收响应
- GET方法(可选):用于建立SSE流,接收服务器实时推送的消息
与传统HTTP+SSE不同,Streamable HTTP不要求维护单独的初始化连接和消息端点,简化了协议设计并提高了可靠性。
Streamable HTTP相比SSE的五大优势
1. 简化的通信模型
传统的HTTP+SSE方法需要两个不同的端点:一个用于建立连接,另一个用于发送消息。而Streamable HTTP提供了一个统一的端点,简化了客户端和服务器之间的交互。
传统SSE实现(两个端点):
// 服务器端(传统SSE)
router.get("/connect", async (req, res) => {const transport = new SSEServerTransport(POST_ENDPOINT, res);transports[transport.sessionId] = transport;await server.connect(transport);
});router.post(POST_ENDPOINT, async (req, res) => {const sessionId = req.query.sessionId;if (!transports[sessionId]) {res.status(400).send({ message: "无效的会话ID" });return;}await transports[sessionId].handlePostMessage(req, res, req.body);
});
Streamable HTTP实现(单一端点):
// 服务器端(Streamable HTTP)
app.post("/mcp", async (req, res) => {const sessionId = req.headers['mcp-session-id'];if (sessionId && transports[sessionId]) {// 使用现有传输await transports[sessionId].handleRequest(req, res, req.body);return;}if (!sessionId && isInitializeRequest(req.body)) {// 创建新传输const transport = new StreamableHTTPServerTransport({sessionIdGenerator: () => randomUUID(),});await server.connect(transport);await transport.handleRequest(req, res, req.body);const newSessionId = transport.sessionId;if (newSessionId) {transports[newSessionId] = transport;}return;}res.status(400).json({ error: "无效的请求" });
});app.get("/mcp", async (req, res) => {const sessionId = req.headers['mcp-session-id'];if (!sessionId || !transports[sessionId]) {res.status(400).json({ error: "无效的会话ID" });return;}await transports[sessionId].handleRequest(req, res);
});
2. 支持无状态模式
Streamable HTTP的一个重要创新是支持完全无状态操作。通过设置sessionIdGenerator: () => undefined
,服务器可以在不维护会话状态的情况下处理请求,非常适合无服务器环境。
// 无状态模式配置
const transport = new StreamableHTTPServerTransport({sessionIdGenerator: () => undefined, // 启用无状态模式
});
这种无状态模式特别适合:
- 部署在AWS Lambda、Azure Functions等无服务器环境
- 短暂交互而非长期连接的场景
- 需要最小化服务器内存使用的应用
3. 更好的可伸缩性
由于Streamable HTTP可以在无状态模式下运行,它非常适合容器化和自动扩展场景。服务器不需要维护长期连接,可以根据请求动态分配资源,显著提高可伸缩性。
这解决了SSE的一个主要问题:当有大量客户端时,每个客户端都需要维持一个长连接,可能导致服务器资源耗尽。使用Streamable HTTP的无状态模式,服务器只在处理请求时分配资源,处理完成后即可释放。
4. 提高的可靠性
Streamable HTTP的简化设计减少了出错机会:
- 会话管理:在有状态模式下,会话ID通过HTTP头而非查询参数传递,减少安全风险
- 重连处理:客户端可以在会话有效期内随时重连,无需复杂的重连逻辑
- 错误恢复:简化的协议使错误处理和恢复更加直观
5. 更好的企业环境兼容性
在企业环境中,代理服务器和防火墙常常会阻止非标准HTTP连接。Streamable HTTP使用标准HTTP通信,大大减少了这类问题:
- 使用标准HTTP POST和GET,无需特殊配置
- 不依赖长连接,减少代理超时问题
- 会话ID通过HTTP头传递,更符合企业安全要求
实现Streamable HTTP的最佳实践
以下是一个实现Streamable HTTP服务器的完整示例,包含了所有最佳实践:
import express from 'express';
import { Request, Response } from 'express';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { randomUUID } from 'crypto';// 创建MCP服务器
const server = new Server({name: "streamable-http-demo",version: "1.0.0"
}, {capabilities: {tools: {},logging: {}}
});// 创建Express应用
const app = express();
app.use(express.json());// 会话存储
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};// 单一MCP端点
const MCP_ENDPOINT = "/mcp";// 处理POST请求
app.post(MCP_ENDPOINT, async (req: Request, res: Response) => {const sessionId = req.headers['mcp-session-id'] as string | undefined;try {// 1. 重用现有会话if (sessionId && transports[sessionId]) {await transports[sessionId].handleRequest(req, res, req.body);return;}// 2. 创建新会话(初始化请求)if (!sessionId && isInitializeRequest(req.body)) {const transport = new StreamableHTTPServerTransport({sessionIdGenerator: () => randomUUID(),// 无状态模式: sessionIdGenerator: () => undefined});await server.connect(transport);await transport.handleRequest(req, res, req.body);// 存储新会话const newSessionId = transport.sessionId;if (newSessionId) {transports[newSessionId] = transport;console.log(`新会话创建: ${newSessionId}`);}return;}// 3. 处理错误情况res.status(400).json(createErrorResponse("无效的会话ID或请求方法"));} catch (error) {console.error('处理请求出错:', error);res.status(500).json(createErrorResponse("内部服务器错误"));}
});// 处理GET请求(SSE流)
app.get(MCP_ENDPOINT, async (req: Request, res: Response) => {const sessionId = req.headers['mcp-session-id'] as string | undefined;if (!sessionId || !transports[sessionId]) {res.status(400).json(createErrorResponse("无效的会话ID"));return;}try {const transport = transports[sessionId];await transport.handleRequest(req, res);console.log(`SSE流已为会话 ${sessionId} 建立`);} catch (error) {console.error('建立SSE流出错:', error);if (!res.headersSent) {res.status(500).json(createErrorResponse("建立SSE流出错"));}}
});// 辅助函数
function isInitializeRequest(body: any): boolean {return body && body.method === 'initialize';
}function createErrorResponse(message: string): any {return {jsonrpc: '2.0',error: {code: -32000,message: message,},id: null,};
}// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {console.log(`服务器运行在 http://localhost:${PORT}`);
});
在客户端,使用Streamable HTTP也非常直观:
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import { URL } from 'url';async function main() {// 1. 创建客户端const client = new Client({ name: "streamable-http-client", version: "1.0.0" });// 2. 创建传输实例const transport = new StreamableHTTPClientTransport(new URL('http://localhost:3000/mcp'));// 3. 连接到服务器await client.connect(transport);console.log('已连接到服务器');// 4. 设置通知处理transport.onmessage = (message) => {console.log('接收到消息:', message);};// 5. 调用工具示例const result = await client.callTool({name: 'example-tool',arguments: { param: 'value' },});console.log('工具调用结果:', result);
}main().catch(console.error);
处理Streamable HTTP的常见挑战
尽管Streamable HTTP提供了许多优势,但在实际应用中仍需注意一些挑战:
1. 会话管理
在有状态模式下,需要妥善管理会话资源,避免内存泄漏。一个好的实践是设置会话超时机制:
// 会话超时管理
function setupSessionTimeout(sessionId, timeoutMs = 30 * 60 * 1000) {const timeoutId = setTimeout(() => {if (transports[sessionId]) {console.log(`会话 ${sessionId} 已超时,正在清理`);delete transports[sessionId];}}, timeoutMs);// 存储超时ID以便可以取消sessionTimeouts[sessionId] = timeoutId;
}// 在活动时重置超时
function resetSessionTimeout(sessionId) {if (sessionTimeouts[sessionId]) {clearTimeout(sessionTimeouts[sessionId]);setupSessionTimeout(sessionId);}
}
2. 断线重连策略
客户端应实现断线重连策略,特别是在移动网络等不稳定环境中:
class MCPClient {// ...async connectWithRetry(url, maxRetries = 5, delayMs = 1000) {let retries = 0;while (retries < maxRetries) {try {await this.connect(url);console.log('连接成功');return;} catch (error) {retries++;console.log(`连接失败,第 ${retries} 次重试...`);await new Promise(resolve => setTimeout(resolve, delayMs));// 指数退避delayMs *= 1.5;}}throw new Error('连接失败,已达到最大重试次数');}
}
3. 处理未送达事件
当客户端断线重连时,可能会错过服务器发送的事件。一个解决方案是使用事件ID和断点续传:
// 服务器端
app.post("/mcp", async (req, res) => {// ...// 包含最后事件IDconst lastEventId = req.headers['last-event-id'];if (lastEventId && sessionId) {// 发送错过的事件await sendMissedEvents(transport, lastEventId);}
});// 客户端
let lastEventId = null;transport.onmessage = (message) => {if (message.id) {lastEventId = message.id;localStorage.setItem('lastEventId', lastEventId);}
};// 重连时包含最后事件ID
async function reconnect() {lastEventId = localStorage.getItem('lastEventId');const headers = {};if (lastEventId) {headers['last-event-id'] = lastEventId;}transport = new StreamableHTTPClientTransport(url, { headers });await client.connect(transport);
}
结论
Streamable HTTP代表了Web实时通信的一个重要进步,特别适合需要在各种环境中可靠工作的企业应用。与传统SSE相比,它提供了更简化的通信模型、可选的无状态模式、更好的可伸缩性、提高的可靠性以及更好的企业环境兼容性。
随着越来越多的服务采用MCP协议,Streamable HTTP有望成为构建实时通信应用的首选方法,特别是在AI工具集成和企业应用领域。
如果你正在考虑在项目中实现实时通信,Streamable HTTP绝对值得考虑,尤其是当你的应用需要在各种网络环境中可靠运行,或者部署在无服务器环境中时。
参考资料
- Model Context Protocol官方规范:https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http
- MCP TypeScript SDK:https://github.com/modelcontextprotocol/typescript-sdk
- RxDB博客 - WebSockets vs SSE vs 长轮询:https://rxdb.info/articles/websockets-sse-polling-webrtc-webtransport.html
- 使用SSE代替WebSockets的场景:https://blog.csdn.net/adcwa/article/details/146942148
- MCP服务器实现示例:https://medium.com/@itsuki.enjoy/mcp-server-and-client-with-sse-the-new-streamable-http-d860850d9d9d