【SpringAI】9.创建本地mcp服务(演示通过mcp实现联网搜索)
前言
上一篇演示了接入公网的高德地图sse服务,有人说我贴的代码不全,确实有些自定义工具类我不可能全部复制过来,复杂的功能大家一般也都会拆分开避免单文件过大,要查看完整代码还是去看完整项目的好,
这篇文章接入本地/内网的mcp服务实现联网搜索
1,先看最终效果
2,新建一个mcp服务项目或模块
引入pom依赖
<dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-starter-mcp-server-webmvc</artifactId><version>${spring-ai.version}</version></dependency><!-- Spring AI MCP 核心包 (手动实现SSE) --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-mcp</artifactId><version>${spring-ai.version}</version></dependency><!-- Spring AI Model (基础接口) --><dependency><groupId>org.springframework.ai</groupId><artifactId>spring-ai-model</artifactId><version>${spring-ai.version}</version></dependency>
application.yml添加必要的mcp服务信息,
服务端口假设9099
server:port: 9099
spring:ai:mcp:server:enabled: truetype: SYNCname: "LocalMcpServer"version: "1.0.0"stdio: falsesse-message-endpoint: "/mcp/message"sse-endpoint: "/sse"# MCP 服务能力配置capabilities:tool: trueresource: trueprompt: truecompletion: falseroots: falsesampling: false
3,创建sse接收端点
private final ConcurrentMap<String, SseEmitter> clients = new ConcurrentHashMap<>();/*** SSE端点 - 匹配MCP客户端期望的路径*/@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter sse(@RequestParam(value = "clientId", defaultValue = "default") String clientId) {logger.info("🔗 新SSE连接: clientId={}", clientId);SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // 无超时clients.put(clientId, emitter);// 发送初始事件try {emitter.send(SseEmitter.event().name("connect").data("SSE连接已建立 - clientId: " + clientId));emitter.send(SseEmitter.event().name("mcp-ready").data("{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\",\"params\":{}}"));} catch (IOException e) {logger.error("发送初始SSE事件失败: {}", e.getMessage());clients.remove(clientId);emitter.complete();}// 设置完成和错误处理emitter.onCompletion(() -> {logger.info("🔌 SSE连接关闭: clientId={}", clientId);clients.remove(clientId);});emitter.onError((ex) -> {logger.error("❌ SSE连接错误: clientId={}, error={}", clientId, ex.getMessage());clients.remove(clientId);});emitter.onTimeout(() -> {logger.warn("⏰ SSE连接超时: clientId={}", clientId);clients.remove(clientId);});return emitter;}
4,获取联网搜索的key
https://www.searchapi.io/
登录注册即可拿到一个key,实际使用时直接
curl --get https://www.searchapi.io/api/v1/search \-d engine="baidu" \-d q="ERNIE Bot"
api_key可以放请求头,实测也可以直接当普通参数与q一起拼接到url后面,
更多的可选参数可以参考:https://www.searchapi.io/docs/baidu#api-parameters-search-query
将拿到的key配置到yml中随时读取
5,添加联网搜索工具类
/*** 联网搜索工具配置类,提供基于SearchAPI的搜索功能。*/
@Configuration
public class SearchToolsConfiguration {private static final Logger logger = LoggerFactory.getLogger(SearchToolsConfiguration.class);private static final int DEFAULT_NUM_RESULTS = 50;private static final int MAX_NUM_RESULTS = 100;@Value("${search.url}")private String searchApiUrl;private final RestTemplate restTemplate = new RestTemplate();private final ObjectMapper objectMapper = new ObjectMapper();/*** 搜索工具集合:包含所有联网搜索相关的工具。*/@Beanpublic List<McpServerFeatures.SyncToolSpecification> searchTools() {return List.of(createSearchWebTool());}/*** 创建联网搜索工具。*/private McpServerFeatures.SyncToolSpecification createSearchWebTool() {logger.info("创建联网搜索工具,API URL: {}", searchApiUrl);String schemaJson = """{"type": "object","properties": {"q": {"type": "string","description": "搜索查询内容,必填参数"},"num": {"type": "integer","description": "返回结果数量,可选参数,默认50,最大100","minimum": 1,"maximum": 100,"default": 50}},"required": ["q"]}""";McpSchema.Tool tool = new McpSchema.Tool("search_web","执行联网搜索并返回分页的结构化搜索结果",schemaJson);return new McpServerFeatures.SyncToolSpecification(tool,(exchange, arguments) -> {try {String query = (String) arguments.get("q");if (query == null || query.trim().isEmpty()) {return new McpSchema.CallToolResult("错误:搜索查询内容不能为空", true);}Integer num = DEFAULT_NUM_RESULTS;if (arguments.containsKey("num")) {Object numValue = arguments.get("num");if (numValue instanceof Number) {num = ((Number) numValue).intValue();} else if (numValue instanceof String) {try {num = Integer.parseInt((String) numValue);} catch (NumberFormatException e) {logger.warn("无效的num参数值: {}, 使用默认值: {}", numValue, DEFAULT_NUM_RESULTS);}}// 限制结果数量在合理范围内num = Math.max(1, Math.min(num, MAX_NUM_RESULTS));}logger.info("执行联网搜索: query='{}', num={}", query, num);// 构建完整的搜索URL,使用UriComponents处理特殊字符UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(searchApiUrl);String url = builder.toUriString();// 手动构建查询参数,确保正确编码StringBuilder queryString = new StringBuilder();queryString.append("q=").append(UriUtils.encode(query, "UTF-8"));queryString.append("&num=").append(num);// 如果URL已有查询参数,追加到现有参数String fullUrl = url;if (url.contains("?")) {fullUrl = url + "&" + queryString.toString();} else {fullUrl = url + "?" + queryString.toString();}URI searchUri = URI.create(fullUrl);logger.debug("搜索API请求URL: {}", searchUri);// 调用搜索APIString response = restTemplate.getForObject(searchUri, String.class);if (response == null || response.trim().isEmpty()) {return new McpSchema.CallToolResult("错误:搜索API返回空结果", true);}// 解析并格式化结果try {Map<String, Object> searchResult = objectMapper.readValue(response, Map.class);// 检查搜索结果状态if (searchResult.containsKey("search_metadata")) {Map<String, Object> metadata = (Map<String, Object>) searchResult.get("search_metadata");String status = (String) metadata.get("status");if (!"Success".equals(status)) {logger.warn("搜索API返回非成功状态: {}", status);return new McpSchema.CallToolResult("搜索失败,API返回状态: " + status, true);}}// 提取有用的搜索结果List<Map<String, Object>> organicResults = (List<Map<String, Object>>) searchResult.get("organic_results");if (organicResults == null || organicResults.isEmpty()) {return new McpSchema.CallToolResult("无内容", false);}// 直接返回organic_results的原始内容String organicResultsJson = objectMapper.writeValueAsString(organicResults);return new McpSchema.CallToolResult(organicResultsJson, false);} catch (Exception e) {logger.error("解析搜索结果失败", e);// 如果无法解析JSON,直接返回原始响应return new McpSchema.CallToolResult(response, false);}} catch (Exception e) {logger.error("联网搜索失败", e);return new McpSchema.CallToolResult("搜索错误: " + e.getMessage(), true);}});}
}
6,实际调用
配置与前一章节的类似,在前端的mcp管理页面新增mcp,表单json填入:
{"mcpServers": {"LocalMcpServer": {"url": "http://127.0.0.1:9099","type": "sse","sseEndpoint": "/sse"}}
}
后端对所有管理的模型做了维护,实际调用是从工厂取出再决定要不要使用mcp工具
/*** 输出处理后端流式结果* 优化流式输出处理逻辑:* 1. 在开始阶段或空字符串输出时,状态保持为"think",表示模型正在思考* 2. 只有当检测到有意义的内容(中文、数字、字母等)时,才将状态改为"running"* 3. 在流结束时明确添加一个"stop"状态的消息,确保前端能正确处理结束状态** @param messageList 模型消息,包括系统提示词、用户提示词、历史对话和媒体文件* @param myModel 指定模型对象* @param body 用户请求参数* @return 处理后的FluxVO流*/private Flux<FluxVO> getFluxVOFlux(List<Message> messageList, AiModel myModel, QuestionVO body) {Prompt prompt = new Prompt(messageList);AtomicBoolean inThinking = new AtomicBoolean(false);StringBuffer outputText = body.getMemory() ? new StringBuffer() : null;ChatClient chatModel = myModel.getChatClient();// 1. 先构造 Publisher<ChatResponse>Flux<ChatResponse> publisher;if (body.getUseTools()) {List<ToolCallback> toolCallbacks = dynamicMcpClientManager.getAvailableToolCallbacks();publisher = chatModel.prompt(prompt).toolCallbacks(toolCallbacks).stream().chatResponse();} else {publisher = chatModel.prompt(prompt).stream().chatResponse();}// 主动推送一条“处理中”消息Flux<FluxVO> proactiveMsg = Flux.just(FluxVO.builder().text("").status("before").build());Flux<FluxVO> resp = Flux.from(publisher).doFirst(() -> {System.out.println("-------------开始输出");if (body.getMemory()) {chatMemoryService.saveMessage(body);}}).map(response -> {String text = response.getResult().getOutput().getText();if (text == null) {text = "";}// 处理工具使用信息if (!response.getResult().getOutput().getToolCalls().isEmpty()) {for (AssistantMessage.ToolCall toolCall : response.getResult().getOutput().getToolCalls()) {System.out.println("==================调用mcp工具====================");System.out.println(toolCall.name());}}if ("<think>".equals(text)) {inThinking.set(true);} else if ("</think>".equals(text)) {inThinking.set(false);}boolean isStop = response.getResult().getMetadata().getFinishReason() != null && !response.getResult().getMetadata().getFinishReason().isEmpty();String status = inThinking.get() ? "think" : (isStop ? "stop" : "running");if (outputText != null) {outputText.append(text);}return FluxVO.builder().text(text).status(status).build();}).doFinally(signalType -> {System.out.println("-------------流式处理结束");if (body.getMemory() && outputText != null) {chatMemoryService.saveMessage(body.getSessionId(), "ASSISTANT", outputText.toString(), body.getModel());}}).onErrorResume(error -> {System.err.println("流式处理异常: " + error.getMessage());return Flux.just(FluxVO.builder().text("AI服务异常,请稍后重试").status("stop").build());});// 先推 proactiveMsg,再推 publisherreturn Flux.concat(proactiveMsg, resp);}
7,其他
后端的动态mcp管理类做了重大优化,支持服务运行时动态添加mcp服务、定期健康检查、定时重连等,还是推荐去看我的完整目录
https://gitee.com/luckylanyu/springai-novel