Spring AI将存量接口转化为MCP服务(附源码)
介绍
许多公司自己有存量 OpenAPI,怎么让自己的存量的 OpenAPI 转为 MCP Server 服务,从而可以被各种 Agent 调用,进行 AI 应用上的创新是一个不可阻挡的趋势和潮流。但传统 OpenAPI 向 MCP 协议迁移挑战,如何通过一个 OpenApi2MCP 适配器低成本实现存量 OpenAPI 一键转为 MCP Server 服务。
这篇文章以WebFlux Server SSE Transport方式实现0代码改动地实现将存量API转换成MCP 服务的能力,而且实现客户端到服务端的认证。
MCP Server端
重写WebFluxSseServerTransportProvider
McpServerTransport 是服务端传输层的标记接口,定义了服务端通信的基础功能
McpServerTransportProvider 是服务端传输层的核心接口,负责会话管理、消息广播和资源清理
该类是服务端实现的 MCP 传输层(内部类 WebFluxMcpSessionTransport 实现 McpServerTransport),基于 Spring WebFlux 框架,使用 SSE 协议实现双向通信。它负责管理客户端会话,处理消息的接收与发送,并提供可靠的消息广播功能,主要功能如下:
- SSE 连接管理:通过 SSE 建立服务端到客户端的实时消息通道
- 消息接收与处理:通过 HTTP POST 接收客户端发送的 JSON-RPC 消息
- 消息广播:支持将消息推送到所有活跃的客户端会话
- 会话管理:维护客户端会话的生命周期,支持资源清理和优雅关闭
- 线程安全:使用 ConcurrentHashMap 管理会话,确保多客户端连接的安全性
各字段含义
ObjectMapper objectMapper
:用于 JSON 序列化和反序列化的 ObjectMapper 实例String baseUrl
:消息端点的基础 URL,用于构建客户端发送消息的完整路径,默认为""String messageEndpoint
:客户端发送 JSON-RPC 消息的端点 URI,默认为”/mcp/message”String sseEndpoint
:服务端接收 SSE 连接的端点 URI,默认为”/sse”RouterFunction<?> routerFunction
:定义 HTTP 路由的 RouterFunction,包括 SSE 和消息端点McpServerSession.Factory sessionFactory
:会话工厂,用于创建新的服务端会话ConcurrentHashMap<String, McpServerSession> sessions
:存储活跃客户端会话的线程安全映射,键为会话 IDboolean isClosing
:标志传输是否正在关闭,防止关闭期间接受新连接
改动点
-
handleSseConnection方法 中添加请求头信息
private Mono<ServerResponse> handleSseConnection(ServerRequest request) {return this.isClosing ? ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down") : ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(Flux.create((FluxSink<ServerSentEvent<?>> sink) -> {WebFluxMcpSessionTransport sessionTransport = new WebFluxMcpSessionTransport(sink);McpServerSession session = this.sessionFactory.create(sessionTransport);String sessionId = session.getId();logger.debug("Created new SSE connection for session: {}", sessionId);this.sessions.put(sessionId, session);// 获取请求头Map<String, String> headers = request.headers().asHttpHeaders().toSingleValueMap();logger.debug("sessionId: {} with headers: {}", sessionId, headers);session2headers.put(sessionId, headers);logger.debug("Sending initial endpoint event to session: {}", sessionId);sink.next(ServerSentEvent.builder().event("endpoint").data(this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId).build());sink.onCancel(() -> {logger.debug("Session {} cancelled", sessionId);this.sessions.remove(sessionId);});}), ServerSentEvent.class);}
-
handleMessage方法中 工具触发消息,获取此时对应的工具信息,塞入对应的请求头信息
private Mono<ServerResponse> handleMessage(ServerRequest request) {if (this.isClosing) {return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");} else if (request.queryParam("sessionId").isEmpty()) {return ServerResponse.badRequest().bodyValue(new McpError("Session ID missing in message endpoint"));} else {McpServerSession session = (McpServerSession)this.sessions.get(request.queryParam("sessionId").get());return session == null ? ServerResponse.status(HttpStatus.NOT_FOUND).bodyValue(new McpError("Session not found: " + (String)request.queryParam("sessionId").get())) : request.bodyToMono(String.class).flatMap((body) -> {try {McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, body);if (message instanceof McpSchema.JSONRPCRequest) {String method = ((McpSchema.JSONRPCRequest) message).method();if (McpSchema.METHOD_TOOLS_CALL.equals(method)) {// 工具触发消息,获取此时对应的工具信息,塞入对应的请求头信息Map<String, String> headers = this.session2headers.get(session.getId());LinkedHashMap<String, String> params = (LinkedHashMap<String, String>) ((McpSchema.JSONRPCRequest) message).params();String toolName = params.get("name");Assert.notNull(toolName, "Tool name cannot be null");for (McpRestfulToolCallback toolCallback : (McpRestfulToolCallback[]) mcpRestfulToolCallbackProvider.getToolCallbacks()) {if (toolName.equals(toolCallback.getToolDefinition().name())) {toolCallback.setHeaders( headers);}}}}return session.handle(message).flatMap((response) -> ServerResponse.ok().build()).onErrorResume((error) -> {logger.error("Error processing message: {}", error.getMessage());return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).bodyValue(new McpError(error.getMessage()));});} catch (IOException | IllegalArgumentException e) {logger.error("Failed to deserialize message: {}", ((Exception)e).getMessage());return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));}});}}
自定义实现ToolCallbackProvider、ToolCallback、ToolDefinition
McpRestfulToolCallbackProvider
集成 MCP 同步客户端的 ToolCallbackProvider,负责从一个或多个 MCP 同步服务器(通过 McpSyncClient)自动发现、收集所有可用的工具,支持对工具进行过滤,确保工具名唯一
List<McpSyncClient> mcpClients
:存储所有需要集成的 MCP 同步客户端实例,用于从多个 MCP 服务器拉取工具列表BiPredicate<McpSyncClient, Tool> toolFilter
:工具过滤器,允许根据客户端和工具元数据自定义过滤逻辑,决定哪些工具被暴露
方法名称 | 描述 |
---|---|
McpRestfulToolCallbackProvider | 根据MCP同步客户端、工具过滤器等实现构造器 |
getToolCallbacks | 从所有 MCP 客户端拉取工具列表,应用过滤器,包装为 SyncMcpToolCallback,并校验工具名唯一性,最终返回所有可用工具的回调数组 |
McpRestfulToolCallback
MCP 同步工具适配为 SpringAI 中 ToolCallback 的桥接实现
McpSyncClient mcpClient
:MCP 同步客户端实例,负责与 MCP 服务器通信、发起工具调用Tool tool
:MCP 工具定义对象,包含工具的名称、描述、输入参数 schema 等元数据
方法名称 | 描述 |
---|---|
McpRestfulToolCallback | 根据MCP同步客户端、工具定义实现构造器 |
getToolDefinition | 将 MCP 工具定义转换为 Spring AI 的 ToolDefinition,包括名称(带前缀防止冲突)、描述、输入参数 schema(JSON 格式) |
call | 执行工具调用。将 JSON 字符串参数转为 Map,调用 MCP 工具,处理异常和错误,并将结果序列化为 JSON 字符串返回 |
-
在执行call方法时,使用WebClient执行restful方法同时设置请求头
public String call(String toolInput, @Nullable ToolContext toolContext) {Assert.hasText(toolInput, "toolInput cannot be null or empty");logger.debug("Starting execution of tool: {}", this.toolDefinition.name());Map<String, Object> toolArguments = JsonParser.fromJson(toolInput, new TypeReference<Map<String, Object>>() {});String result = "";if (HttpMethod.GET.equals(toolDefinition.httpMethod())) {StringBuilder uriBuilder = new StringBuilder().append(toolDefinition.path()).append("?");toolArguments.forEach((key, value) -> {uriBuilder.append(key).append("=").append(value).append("&");});String uri = uriBuilder.toString();result = WebClient.builder().build().get().uri(toolDefinition.url() + uri).headers(headers -> {this.headers.forEach(headers::add);}).retrieve().bodyToMono(String.class).block();} else if (HttpMethod.POST.equals(toolDefinition.httpMethod())) {result = WebClient.builder().build().post().uri(toolDefinition.url()).headers(headers -> {this.headers.forEach(headers::add);}).bodyValue(toolArguments).retrieve().bodyToMono(String.class).block();}logger.debug("Successful execution of tool: {}, result: {}", this.toolDefinition.name(), result);return result;}public void setHeaders(Map<String, String> headersMap) {this.headers = headersMap;}
RestfulToolDefinition
将 MCP 工具定义转换为 Spring AI 的 ToolDefinition,包括名称(带前缀防止冲突)、描述、输入参数 schema(JSON 格式) RestfulToolDefinition
服务端注册ToolCallbackProvider
-
restful api 接口信息转换为ToolCallback MCP信息
package com.lucifer.ai.mcp.server.parse; import com.lucifer.ai.mcp.server.model.Parameter; import com.lucifer.ai.mcp.server.model.RestfulModel; import com.lucifer.ai.mcp.server.util.JSONSchemaUtil; import org.springframework.ai.mcp.McpRestfulToolCallback; import org.springframework.ai.mcp.McpRestfulToolCallbackProvider; import org.springframework.ai.mcp.RestfulToolDefinition; import org.springframework.http.HttpMethod; import org.springframework.stereotype.Component;import java.util.ArrayList; import java.util.List;/*** @author lucifer* @date 2025/8/8 13:40*/ @Component public class ParseRestful {public McpRestfulToolCallbackProvider getRestfulToolCallbackProvider() {List<McpRestfulToolCallback> toolCallbacks = new ArrayList<>();getRestfulModels().forEach(restfulModel -> {RestfulToolDefinition restfulToolDefinition = RestfulToolDefinition.builder().name(restfulModel.name()).description(restfulModel.description()).inputSchema(restfulModel.inputSchema()).url(restfulModel.url()).method(restfulModel.method()).path(restfulModel.path()).httpMethod(restfulModel.httpMethod()).build();McpRestfulToolCallback mcpRestfulToolCallback = McpRestfulToolCallback.builder().toolDefinition(restfulToolDefinition).build();toolCallbacks.add(mcpRestfulToolCallback);});return McpRestfulToolCallbackProvider.builder().toolCallbacks(toolCallbacks.toArray(new McpRestfulToolCallback[0])).build();}public List<RestfulModel> getRestfulModels() {Parameter parameter = Parameter.builder().parameteNname("timeZoneId").description("time zone id, such as Asia/Shanghai").required(true).type("string").build();return List.of(new RestfulModel("getCityTime", "获取指定时区的时间", JSONSchemaUtil.getInputSchema(List.of(parameter)), "http://localhost:8001", "getCiteTimeMethod", "/time/city", HttpMethod.GET));}}
-
配置ToolCallbackProvider
package com.lucifer.ai.mcp; import com.lucifer.ai.mcp.server.parse.ParseRestful; import com.lucifer.ai.mcp.tools.WeatherService; import org.springframework.ai.tool.ToolCallbackProvider; import org.springframework.ai.tool.method.MethodToolCallbackProvider; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean;@SpringBootApplication public class LuciferAiMcpServer {public static void main(String[] args) {SpringApplication.run(LuciferAiMcpServer.class, args);}@Beanpublic ToolCallbackProvider mcpRestfulToolCallbackProvider(ParseRestful parseRestful) {return parseRestful.getRestfulToolCallbackProvider();} }
MCP Client端
重写SseWebFluxTransportAutoConfiguration
自动配置基于 WebFlux 的 SSE MCP 客户端传输能力,仅当满足以下条件时自动配置生效
- 类路径中有 WebFluxSseClientTransport
- 配置项 spring.ai.mcp.client.enabled=true(默认为 true)
对外提供 List的 Bean,逻辑如下
- 读取所有配置的 SSE 连接(如 server1、server2)
- 为每个连接克隆一个 WebClient.Builder,设置对应的 baseUrl
- 构建 WebFluxSseClientTransport 实例,设置端点和 JSON 处理器
- 封装为 NamedClientMcpTransport,加入列表
改动点
-
WebClient 中添加请求头信息
package org.springframework.ai.mcp.client.autoconfigure;import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport; import org.springframework.ai.mcp.client.autoconfigure.properties.McpClientCommonProperties; import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.web.reactive.function.client.WebClient;import java.util.ArrayList; import java.util.List; import java.util.Map;/*** @author lucifer* @date 2025/8/8 13:33*/ @AutoConfiguration @ConditionalOnClass({WebFluxSseClientTransport.class}) @EnableConfigurationProperties({McpSseClientProperties.class, McpClientCommonProperties.class}) @ConditionalOnProperty(prefix = "spring.ai.mcp.client",name = {"enabled"},havingValue = "true",matchIfMissing = true ) public class SseWebFluxTransportAutoConfiguration {@Beanpublic List<NamedClientMcpTransport> webFluxClientTransports(McpSseClientProperties sseProperties, ObjectProvider<WebClient.Builder> webClientBuilderProvider, ObjectProvider<ObjectMapper> objectMapperProvider) {List<NamedClientMcpTransport> sseTransports = new ArrayList();WebClient.Builder webClientBuilderTemplate = (WebClient.Builder)webClientBuilderProvider.getIfAvailable(WebClient::builder);ObjectMapper objectMapper = (ObjectMapper)objectMapperProvider.getIfAvailable(ObjectMapper::new);for(Map.Entry<String, McpSseClientProperties.SseParameters> serverParameters : sseProperties.getConnections().entrySet()) {WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(((McpSseClientProperties.SseParameters)serverParameters.getValue()).url())// 添加请求头.defaultHeaders((headers) ->{if (serverParameters.getValue().headers() != null) {serverParameters.getValue().headers().forEach(headers::add);}});String sseEndpoint = ((McpSseClientProperties.SseParameters)serverParameters.getValue()).sseEndpoint() != null ? ((McpSseClientProperties.SseParameters)serverParameters.getValue()).sseEndpoint() : "/sse";WebFluxSseClientTransport transport = WebFluxSseClientTransport.builder(webClientBuilder).sseEndpoint(sseEndpoint).objectMapper(objectMapper).build();sseTransports.add(new NamedClientMcpTransport((String)serverParameters.getKey(), transport));}return sseTransports;} }
配置文件中添加token信息
server:port: 8002
spring:application:name: lucifer-ai-mcp-clientmain:allow-bean-definition-overriding: trueai:openai:api-key: ${api-key}base-url: https://open.bigmodel.cn/api/paas/v4/chat:options:model: glm-4-flashtemperature: 0.7completions-path: /chat/completionsmcp:client:enabled: truename: lucifer-ai-mcp-clientversion: 1.0.0request-timeout: 30stype: ASYNCsse:connections:server1:url: http://localhost:8001headers:token: lucifertoolcallback:enabled: true
使用说明
先启动lucifer-ai-mcp-server
然后启动 lucifer-ai-mcp-client
然后在 lucifer-ai-mcp-client 客户端调用接口 GET http://localhost:8002/ai/chat?message=北京时间
此时会调用 lucifer-ai-mcp-server 服务端的 GET http://localhost:8001/time/city ,同时服务端刻可以接收到请求头信息token信息。
这里的mcp-server可以将存量api接口转换为mcp服务,而且同时可以获取客户端认证信息。
源码地址
lucifer-ai-mcp-server:https://github.com/luguangdong/lucifer-ai-mcp-server.git
lucifer-ai-mcp-client:https://github.com/luguangdong/lucifer-ai-mcp-client.git