Quickly Integrating MCP in Java and Managing It Dynamically with MySQL
Using AI4J to quickly connect an application to MCP services
This post shows how to use AI4J to quickly connect to MCP services.
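Introduction
Spring AI requires JDK 17 and Spring Boot 3, but many applications still run on JDK 8. This post therefore uses AI4J, which supports JDK 8, to access OpenAI large language models.
AI4J is a Java SDK for quickly integrating large language models into applications. It brings together models from multiple platforms, including OpenAI, Zhipu (ChatGLM), DeepSeek, Moonshot (Kimi), Tencent Hunyuan, and 01.AI (零一万物), and exposes a unified input/output format aligned with OpenAI's to smooth over their differences. It also streamlines function calling (Tool Call) and RAG calls, supports a vector database (Pinecone), and runs on JDK 1.8.
It supports the MCP protocol over STDIO, SSE, and Streamable HTTP; provides both an MCP Server and an MCP Client; and supports an MCP gateway, custom MCP data sources, and automatic MCP reconnection.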
AI4J-GitHub
AI4J-Gitee
Adding the AI4J dependency
<!-- Spring application -->
<dependency>
    <groupId>io.github.lnyo-cly</groupId>
    <artifactId>ai4j-spring-boot-stater</artifactId>
    <version>1.4.2</version>
</dependency>
Connecting to MCP services with MCP-Client
Connecting over stdio with MCP-Client
@Test
public void test_mcp_client_stdio() throws Exception {
    // 1. Build the transport layer
    McpTransport transport = McpTransportFactory.createTransport(
            "stdio", TransportConfig.stdio("npx", Arrays.asList("-y", "12306-mcp")));

    // 2. Build the MCP client
    McpClient mcpClient = new McpClient("12306-mcp", "1.0.0", transport);

    // 3. Open the connection
    mcpClient.connect().join();

    // 4. List the available tools
    List<McpToolDefinition> tools = mcpClient.getAvailableTools().join();
    System.out.println(tools);
}
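The code above connects to an MCP service over stdio. The 12306 MCP server must be installed locally before you run it. The test prints the list of tools that the server exposes.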
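For context, the MCP stdio transport exchanges JSON-RPC messages as newline-delimited lines over the child process's stdin and stdout. The sketch below illustrates only that framing, with in-memory streams standing in for a real process. `StdioFraming` and its methods are hypothetical names chosen for this example and are not part of AI4J.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing newline-delimited JSON framing (not AI4J code). */
final class StdioFraming {
    private StdioFraming() {}

    /** Writes one message followed by '\n'; messages with embedded newlines are rejected. */
    static void writeMessage(OutputStream out, String json) {
        if (json.indexOf('\n') >= 0 || json.indexOf('\r') >= 0) {
            throw new IllegalArgumentException("a framed message must not contain newlines");
        }
        try {
            out.write(json.getBytes(StandardCharsets.UTF_8));
            out.write('\n');
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads every non-blank line from the stream; each line is one message. */
    static List<String> readMessages(InputStream in) {
        List<String> messages = new ArrayList<>();
        try {
            BufferedReader reader =
                    new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.trim().isEmpty()) {
                    messages.add(line);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return messages;
    }

    public static void main(String[] args) {
        // A byte buffer stands in for the pipe to the child process.
        ByteArrayOutputStream pipe = new ByteArrayOutputStream();
        writeMessage(pipe, "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\"}");
        writeMessage(pipe, "{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"ping\"}");
        for (String message : readMessages(new ByteArrayInputStream(pipe.toByteArray()))) {
            System.out.println(message);
        }
    }
}
```

In a real client the two streams would come from a `Process` started with `ProcessBuilder`; the library handles that part when the "stdio" transport is selected.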
Connecting to an SSE service with MCP-Client
@Test
public void test_mcp_client_sse() throws Exception {
    // 1. Build the transport layer (replace YOUR_AMAP_KEY with the key you obtained from Amap)
    McpTransport transport = McpTransportFactory.createTransport(
            "sse", TransportConfig.sse("https://mcp.amap.com/sse?key=YOUR_AMAP_KEY"));

    // 2. Build the MCP client
    McpClient mcpClient = new McpClient("Amap MCP", "1.0.0", transport);

    // 3. Open the connection
    mcpClient.connect().join();

    // 4. List the available tools
    List<McpToolDefinition> tools = mcpClient.getAvailableTools().join();
    System.out.println(tools);

    // 5. Build the arguments
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("city", "北京"); // Beijing

    // 6. Call the tool
    String result = mcpClient.callTool("maps_weather", arguments).join();
    System.out.println(result);
}
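The code above connects to the Amap (高德) MCP service over SSE. Note that the Amap MCP service requires a key, which you must apply for on the Amap website.
The test then calls the maps_weather tool exposed by that service and prints the result.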
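To make the SSE transport less of a black box, here is a minimal sketch of how a Server-Sent Events stream is split into events: `event:` and `data:` fields accumulate until a blank line dispatches them. `SseParser` is a hypothetical class written for this illustration, and the `/messages?sessionId=abc` path in the sample is invented; the library's own SSE handling is not shown in this post.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified SSE parser for illustration (not AI4J code). */
final class SseParser {
    private SseParser() {}

    /** One dispatched event: its type and its data payload. */
    static final class Event {
        final String name;
        final String data;

        Event(String name, String data) {
            this.name = name;
            this.data = data;
        }
    }

    /** Parses a complete SSE text stream into events. */
    static List<Event> parse(String stream) {
        List<Event> events = new ArrayList<>();
        String name = "message";            // default event type when no "event:" field is sent
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        for (String line : stream.split("\r?\n", -1)) {
            if (line.isEmpty()) {
                // A blank line ends the current event.
                if (hasData) {
                    events.add(new Event(name, data.toString()));
                }
                name = "message";
                data.setLength(0);
                hasData = false;
                continue;
            }
            if (line.startsWith(":")) {
                continue;                   // comment line, often used as a keep-alive
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // a single leading space is not part of the value
            }
            if (field.equals("event")) {
                name = value;
            } else if (field.equals("data")) {
                if (hasData) {
                    data.append('\n');      // multiple data lines are joined with '\n'
                }
                data.append(value);
                hasData = true;
            }
        }
        return events;
    }

    public static void main(String[] args) {
        String sample = "event: endpoint\ndata: /messages?sessionId=abc\n\n"
                + ": keep-alive\n"
                + "data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{}}\n\n";
        for (Event event : parse(sample)) {
            System.out.println(event.name + " -> " + event.data);
        }
    }
}
```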
Connecting over Streamable HTTP with MCP-Client
The approach is the same:
@Test
public void test_mcp_client_http() throws Exception {
    // 1. Build the transport layer
    McpTransport transport = McpTransportFactory.createTransport(
            "http", TransportConfig.http("http://localhost:8080/mcp"));

    // 2. Build the MCP client
    McpClient mcpClient = new McpClient("Test-MCP", "1.0.0", transport);

    // 3. Open the connection
    mcpClient.connect().join();

    // 4. List the available tools
    List<McpToolDefinition> tools = mcpClient.getAvailableTools().join();
    System.out.println(tools);
}
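Whichever transport is used, the messages themselves are JSON-RPC 2.0; with Streamable HTTP the client POSTs them to the single MCP endpoint. The sketch below builds the request bodies that correspond to listing tools and calling one. `JsonRpcRequests` is a hypothetical helper that assumes string-valued arguments and hand-rolls the JSON to stay dependency-free; a real client would use a JSON library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical builder for MCP JSON-RPC request bodies (illustration only). */
final class JsonRpcRequests {
    private JsonRpcRequests() {}

    /** Returns s as a JSON string literal, escaping quotes, backslashes and control chars. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Body of a tools/list request. */
    static String toolsList(int id) {
        return "{\"jsonrpc\":\"2.0\",\"id\":" + id + ",\"method\":\"tools/list\"}";
    }

    /** Body of a tools/call request with string-valued arguments. */
    static String toolsCall(int id, String toolName, Map<String, String> arguments) {
        StringBuilder argsJson = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> entry : arguments.entrySet()) {
            if (!first) {
                argsJson.append(',');
            }
            argsJson.append(quote(entry.getKey())).append(':').append(quote(entry.getValue()));
            first = false;
        }
        argsJson.append('}');
        return "{\"jsonrpc\":\"2.0\",\"id\":" + id
                + ",\"method\":\"tools/call\",\"params\":{\"name\":" + quote(toolName)
                + ",\"arguments\":" + argsJson + "}}";
    }

    public static void main(String[] args) {
        Map<String, String> arguments = new LinkedHashMap<>();
        arguments.put("city", "北京");
        System.out.println(toolsList(1));
        System.out.println(toolsCall(2, "maps_weather", arguments));
    }
}
```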
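Connecting to MCP services through the MCP gateway
Configuring the gateway from a file
Create a configuration file named mcp-servers-config.json and declare the MCP servers in it: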
{
  "mcpServers": {
    "map-mcp": {
      "type": "sse",
      "url": "https://mcp.amap.com/sse?key=xxxxxxx"
    },
    "12306-mcp": {
      "type": "stdio",
      "command": "npx",
      "args": ["-y", "12306-mcp"]
    }
  }
}
@Test
public void test_mcp_gateway() throws Exception {
    // 1. Create and initialize the MCP gateway
    McpGateway gateway = new McpGateway();
    gateway.initialize("mcp-servers-config.json").join();

    // 2. List the available tools
    List<Tool.Function> tools = gateway.getAvailableTools().join();
    System.out.println(tools);

    // 3. Build the arguments
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("city", "北京"); // Beijing

    // 4. Call the tool
    String result = gateway.callTool("maps_weather", arguments).join();
    System.out.println(result);

    // 5. Shut down the gateway
    gateway.shutdown().join();
}
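Conceptually, a gateway like this aggregates the tools of several servers and routes each call to the server that owns the tool. The following sketch shows that routing idea with plain maps. It is not how `McpGateway` is implemented (the source does not show that); `ToolRouter`, its methods, the `get-tickets` tool name, and the first-registration-wins rule for duplicate tool names are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical tool router illustrating the gateway idea (not AI4J code). */
final class ToolRouter {
    // tool name -> id of the server that owns it (insertion order preserved)
    private final Map<String, String> toolOwner = new LinkedHashMap<>();
    // server id -> function that executes (toolName, arguments) on that server
    private final Map<String, BiFunction<String, Map<String, Object>, String>> backends =
            new HashMap<>();

    /** Registers a server and its tools; on a name clash the earlier server keeps the tool. */
    void register(String serverId, Collection<String> toolNames,
                  BiFunction<String, Map<String, Object>, String> backend) {
        backends.put(serverId, backend);
        for (String tool : toolNames) {
            toolOwner.putIfAbsent(tool, serverId);
        }
    }

    /** All tool names across every registered server. */
    List<String> availableTools() {
        return new ArrayList<>(toolOwner.keySet());
    }

    /** Dispatches a call to the server that owns the tool. */
    String call(String toolName, Map<String, Object> arguments) {
        String serverId = toolOwner.get(toolName);
        if (serverId == null) {
            throw new IllegalArgumentException("unknown tool: " + toolName);
        }
        return backends.get(serverId).apply(toolName, arguments);
    }

    public static void main(String[] args) {
        ToolRouter router = new ToolRouter();
        // Lambdas stand in for real MCP clients.
        router.register("map-mcp", Arrays.asList("maps_weather"),
                (tool, arguments) -> "weather for " + arguments.get("city"));
        router.register("12306-mcp", Arrays.asList("get-tickets"),
                (tool, arguments) -> "tickets via " + tool);

        Map<String, Object> arguments = new HashMap<>();
        arguments.put("city", "北京");
        System.out.println(router.availableTools());
        System.out.println(router.call("maps_weather", arguments));
    }
}
```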
Adding a server to the gateway with McpServerInfo
@Test
public void test_mcp_gateway_config() {
    McpGateway gateway = new McpGateway();
    // Initialize without a configuration file; servers are added manually
    gateway.initialize().join();

    // Add a server dynamically
    McpServerConfig.McpServerInfo serverInfo = new McpServerConfig.McpServerInfo();
    serverInfo.setName("12306-mcp");
    serverInfo.setType("stdio");
    serverInfo.setCommand("npx");
    serverInfo.setArgs(Arrays.asList("-y", "12306-mcp"));
    serverInfo.setEnabled(true);
    gateway.addMcpServer("12306-mcp", serverInfo).join();

    List<Tool.Function> tools = gateway.getAvailableTools().join();
    System.out.println(tools);

    gateway.shutdown().join();
}
Adding an existing client to the gateway
@Test
public void test_mcp_gateway_client() {
    McpGateway gateway = new McpGateway();
    gateway.initialize().join();

    // Create an MCP client directly and add it to the gateway
    McpTransport transport = McpTransportFactory.createTransport(
            "stdio", TransportConfig.stdio("npx", Arrays.asList("-y", "12306-mcp")));
    McpClient mcpClient = new McpClient("12306", "1.0.0", transport);
    gateway.addMcpClient("12306", mcpClient).join();

    List<Tool.Function> tools = gateway.getAvailableTools().join();
    System.out.println(tools);

    gateway.shutdown().join();
}
Using MCP services with OpenAI
@Test
public void test_mcp_gateway_chat() throws Exception {
    // 1. Create and initialize the MCP gateway
    McpGateway gateway = new McpGateway();
    gateway.initialize("mcp-servers-config.json").join();

    // 2. Check the gateway status
    Map<String, Object> status = gateway.getGatewayStatus();
    System.out.println("Gateway status: " + status);

    // 3. Get the chat service
    //    (aiService is not declared in this snippet; it is presumably an AI4J
    //    service bean injected into the test class)
    IChatService chatService = aiService.getChatService(PlatformType.OPENAI);

    // 4. Build the chat request, naming the MCP services the model may use
    String content = "Find today's high-speed trains from Beijing to Shanghai";
    ChatCompletion chatCompletion = ChatCompletion.builder()
            .model("gpt-4o-mini")
            .mcpServices("12306-mcp", "map-mcp")
            .message(ChatMessage.withUser(content))
            .build();

    // 5. Send the chat request
    ChatCompletionResponse chatCompletionResponse = chatService.chatCompletion(chatCompletion);
    System.out.println("Question: " + content);
    System.out.println("Answer: " + chatCompletionResponse.getChoices().get(0).getMessage().getContent());
    System.out.println("Total tokens: " + chatCompletionResponse.getUsage().getTotalTokens());

    // 6. Shut down the gateway
    gateway.shutdown().join();
}
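For an OpenAI-style model to use an MCP tool, the tool has to be presented in the function-calling format: an MCP tool carries `name`, `description`, and `inputSchema`, while an OpenAI chat tool is `{"type":"function","function":{name, description, parameters}}`. The sketch below shows that mapping between the two public formats. How AI4J performs this step internally is not shown in this post, so `ToolSchemaMapper` is purely a hypothetical illustration, and the sample description text is invented.

```java
/** Hypothetical mapper from an MCP tool definition to an OpenAI function tool (illustration only). */
final class ToolSchemaMapper {
    private ToolSchemaMapper() {}

    /** Minimal JSON string escaping for quotes, backslashes and newlines. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n") + "\"";
    }

    /**
     * Wraps an MCP tool as an OpenAI "function" tool.
     * inputSchemaJson must already be valid JSON Schema text; it becomes "parameters" unchanged.
     */
    static String toOpenAiTool(String name, String description, String inputSchemaJson) {
        return "{\"type\":\"function\",\"function\":{"
                + "\"name\":" + quote(name) + ","
                + "\"description\":" + quote(description) + ","
                + "\"parameters\":" + inputSchemaJson
                + "}}";
    }

    public static void main(String[] args) {
        String schema = "{\"type\":\"object\","
                + "\"properties\":{\"city\":{\"type\":\"string\"}},"
                + "\"required\":[\"city\"]}";
        System.out.println(toOpenAiTool("maps_weather", "Look up the weather for a city", schema));
    }
}
```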
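Dynamic MCP management with MySQL (using a custom data source)
Creating the database
The entity classes, mappers, and related services are omitted here to save space; write them to match your own schema.
Building a service that implements a custom data source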
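Since the entity classes are omitted, here is a hedged sketch of what `McpServiceConfig` might look like. The property names are inferred from the getters and setters that the service and controller code in this post call on it; the field types and the value conventions noted in the comments are assumptions, and the real table may contain more columns.

```java
/** Hypothetical entity inferred from the accessors used in this post; adapt to your schema. */
class McpServiceConfig {
    private Long id;              // primary key
    private String serviceId;     // unique business id, e.g. "12306-mcp"
    private String serviceName;   // display name
    private String transportType; // "stdio", "sse", "streamable_http" or "http"
    private String command;       // executable for stdio services, e.g. "npx"
    private String url;           // endpoint for sse / http services
    private String args;          // JSON array of command arguments, stored as text
    private String envVars;       // JSON object of default environment variables, stored as text
    private Integer requireAuth;  // assumed: 1 = needs per-user credentials, 0 = global service
    private Integer status;       // assumed: 1 = enabled, 0 = disabled
    private String createdBy;     // login id of the admin who created the record

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getServiceId() { return serviceId; }
    public void setServiceId(String serviceId) { this.serviceId = serviceId; }

    public String getServiceName() { return serviceName; }
    public void setServiceName(String serviceName) { this.serviceName = serviceName; }

    public String getTransportType() { return transportType; }
    public void setTransportType(String transportType) { this.transportType = transportType; }

    public String getCommand() { return command; }
    public void setCommand(String command) { this.command = command; }

    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }

    public String getArgs() { return args; }
    public void setArgs(String args) { this.args = args; }

    public String getEnvVars() { return envVars; }
    public void setEnvVars(String envVars) { this.envVars = envVars; }

    public Integer getRequireAuth() { return requireAuth; }
    public void setRequireAuth(Integer requireAuth) { this.requireAuth = requireAuth; }

    public Integer getStatus() { return status; }
    public void setStatus(Integer status) { this.status = status; }

    public String getCreatedBy() { return createdBy; }
    public void setCreatedBy(String createdBy) { this.createdBy = createdBy; }
}
```

Storing `args` and `envVars` as JSON text matches the way the service code parses them with `JSON.parseArray` and `JSON.parseObject`.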
@Service
@Slf4j
public class McpGatewayService {

    @Autowired
    private McpServiceConfigMapper mcpServiceConfigMapper;

    @Autowired
    private UserMcpServiceMapper userMcpServiceMapper;

    @Autowired
    private UserMcpAuthMapper userMcpAuthMapper;

    private McpGateway mcpGateway;
    private DatabaseMcpConfigSource databaseConfigSource;

    @PostConstruct
    public void init() {
        // Create the MCP gateway
        mcpGateway = new McpGateway();
        // Use the database-backed configuration source
        databaseConfigSource = new DatabaseMcpConfigSource();
        mcpGateway.setConfigSource(databaseConfigSource);
        // Initialize the gateway
        mcpGateway.initialize().join();
        log.info("MCP gateway initialized");
    }

    /**
     * Sets up MCP services for a user.
     */
    public CompletableFuture<Void> setupUserMcpServices(Long userId, List<String> serviceIds) {
        return CompletableFuture.runAsync(() -> {
            try {
                // Remove the user's previous MCP clients
                mcpGateway.clearUserMcpClients(String.valueOf(userId)).join();
                for (String serviceId : serviceIds) {
                    setupSingleUserMcpService(userId, serviceId);
                }
                log.info("User {}: set up {} MCP services", userId, serviceIds.size());
            } catch (Exception e) {
                log.error("User {}: failed to set up MCP services", userId, e);
                throw new RuntimeException("Failed to set up MCP services", e);
            }
        });
    }

    /**
     * Sets up a single MCP service for a user.
     */
    private void setupSingleUserMcpService(Long userId, String serviceId) {
        try {
            // 1. Load the service configuration
            McpServiceConfig serviceConfig = mcpServiceConfigMapper.selectByServiceId(serviceId);
            if (serviceConfig == null || serviceConfig.getStatus() != 1) {
                log.warn("User {}: requested service {} is unavailable", userId, serviceId);
                return;
            }

            // 2. Check that the user has added the service and that it is enabled
            UserMcpService userService = userMcpServiceMapper.selectByUserIdAndServiceId(userId, serviceId);
            if (userService == null || userService.getEnabled() != 1) {
                log.warn("User {}: service {} not added or disabled", userId, serviceId);
                return;
            }

            // 3. Build the MCP server configuration
            McpServerConfig.McpServerInfo mcpConfig = buildMcpConfig(serviceConfig, userId);

            // 4. Create the MCP client
            String clientId = "user_" + userId + "_service_" + serviceId;
            TransportConfig transportConfig = createTransportConfig(mcpConfig);
            McpClient client = new McpClient(clientId, "1.0.0",
                    McpTransportFactory.createTransport(serviceConfig.getTransportType(), transportConfig));

            // 5. Add it to the gateway (user scope)
            mcpGateway.addUserMcpClient(String.valueOf(userId), serviceId, client).join();
            log.info("User {}: MCP service {} set up successfully", userId, serviceId);
        } catch (Exception e) {
            log.error("User {}: failed to set up MCP service {}", userId, serviceId, e);
        }
    }

    /**
     * Builds the MCP configuration, including the user's authentication data.
     */
    private McpServerConfig.McpServerInfo buildMcpConfig(McpServiceConfig serviceConfig, Long userId) {
        McpServerConfig.McpServerInfo config = new McpServerConfig.McpServerInfo();
        config.setName(serviceConfig.getServiceName());
        config.setType(serviceConfig.getTransportType());
        config.setCommand(serviceConfig.getCommand());
        config.setUrl(serviceConfig.getUrl());

        // Parse the command arguments
        if (serviceConfig.getArgs() != null) {
            List<String> args = JSON.parseArray(serviceConfig.getArgs(), String.class);
            config.setArgs(args);
        }

        // Build the environment variables
        Map<String, String> env = new HashMap<>();

        // 1. Add the default environment variables
        if (serviceConfig.getEnvVars() != null) {
            Map<String, String> defaultEnv = JSON.parseObject(serviceConfig.getEnvVars(),
                    new TypeReference<Map<String, String>>() {});
            env.putAll(defaultEnv);
        }

        // 2. If the service requires authentication, add the user's credentials
        //    (they override defaults with the same key)
        if (serviceConfig.getRequireAuth() == 1) {
            UserMcpAuth userAuth = userMcpAuthMapper.selectByUserIdAndServiceId(userId, serviceConfig.getServiceId());
            if (userAuth != null && userAuth.getStatus() == 1) {
                Map<String, String> authData = JSON.parseObject(userAuth.getAuthData(),
                        new TypeReference<Map<String, String>>() {});
                env.putAll(authData);
                log.debug("User {}: added auth data for service {}", userId, serviceConfig.getServiceId());
            } else {
                log.warn("User {}: service {} requires auth but none is configured", userId, serviceConfig.getServiceId());
            }
        }

        config.setEnv(env);
        config.setEnabled(true);
        return config;
    }

    /**
     * Creates the transport configuration.
     */
    private TransportConfig createTransportConfig(McpServerConfig.McpServerInfo config) {
        String transportType = config.getType();
        switch (transportType.toLowerCase()) {
            case "stdio":
                return TransportConfig.stdio(config.getCommand(), config.getArgs(), config.getEnv());
            case "sse":
                return TransportConfig.sse(config.getUrl());
            case "streamable_http":
            case "http":
                return TransportConfig.streamableHttp(config.getUrl());
            default:
                throw new IllegalArgumentException("Unsupported transport type: " + transportType);
        }
    }

    /**
     * Returns the MCP tools available to a user.
     */
    public CompletableFuture<List<Tool.Function>> getUserAvailableTools(Long userId) {
        return mcpGateway.getUserAvailableTools(String.valueOf(userId));
    }

    /**
     * Calls one of the user's MCP tools.
     */
    public CompletableFuture<String> callUserTool(Long userId, String toolName, Object arguments) {
        return mcpGateway.callUserTool(String.valueOf(userId), toolName, arguments);
    }

    /**
     * Returns the gateway status.
     */
    public Map<String, Object> getGatewayStatus() {
        return mcpGateway.getGatewayStatus();
    }

    /**
     * Database-backed configuration source.
     */
    private class DatabaseMcpConfigSource implements io.github.lnyocly.ai4j.mcp.config.McpConfigSource {

        // CopyOnWriteArrayList (java.util.concurrent) instead of the original ArrayList,
        // so listeners can be added or removed while a refresh is iterating over them
        private final List<ConfigChangeListener> listeners = new CopyOnWriteArrayList<>();
        private volatile Map<String, McpServerConfig.McpServerInfo> cachedConfigs = new HashMap<>();

        public DatabaseMcpConfigSource() {
            loadConfigsFromDatabase();
        }

        @Override
        public Map<String, McpServerConfig.McpServerInfo> getAllConfigs() {
            return new HashMap<>(cachedConfigs);
        }

        @Override
        public McpServerConfig.McpServerInfo getConfig(String serverId) {
            return cachedConfigs.get(serverId);
        }

        @Override
        public void addConfigChangeListener(ConfigChangeListener listener) {
            listeners.add(listener);
        }

        @Override
        public void removeConfigChangeListener(ConfigChangeListener listener) {
            listeners.remove(listener);
        }

        /**
         * Loads the global configuration (services that need no authentication) from the database.
         */
        private void loadConfigsFromDatabase() {
            try {
                List<McpServiceConfig> configs = mcpServiceConfigMapper.selectByCondition(1, null);
                Map<String, McpServerConfig.McpServerInfo> newConfigs = new HashMap<>();
                for (McpServiceConfig config : configs) {
                    // Only services that need no authentication are loaded as global services
                    if (config.getRequireAuth() == 0) {
                        McpServerConfig.McpServerInfo serverInfo = convertToServerInfo(config);
                        newConfigs.put(config.getServiceId(), serverInfo);
                    }
                }
                this.cachedConfigs = newConfigs;
                log.info("Loaded {} global MCP service configs from the database", newConfigs.size());
            } catch (Exception e) {
                log.error("Failed to load MCP configs from the database", e);
            }
        }

        private McpServerConfig.McpServerInfo convertToServerInfo(McpServiceConfig config) {
            McpServerConfig.McpServerInfo serverInfo = new McpServerConfig.McpServerInfo();
            serverInfo.setName(config.getServiceName());
            serverInfo.setType(config.getTransportType());
            serverInfo.setCommand(config.getCommand());
            serverInfo.setUrl(config.getUrl());
            if (config.getArgs() != null) {
                List<String> args = JSON.parseArray(config.getArgs(), String.class);
                serverInfo.setArgs(args);
            }
            if (config.getEnvVars() != null) {
                Map<String, String> env = JSON.parseObject(config.getEnvVars(),
                        new TypeReference<Map<String, String>>() {});
                serverInfo.setEnv(env);
            }
            serverInfo.setEnabled(true);
            return serverInfo;
        }

        /**
         * Refreshes the configuration (call after an admin changes it).
         */
        public void refreshConfigs() {
            Map<String, McpServerConfig.McpServerInfo> oldConfigs = new HashMap<>(cachedConfigs);
            loadConfigsFromDatabase();
            // Detect changes and notify the listeners
            detectAndNotifyChanges(oldConfigs, cachedConfigs);
        }

        private void detectAndNotifyChanges(Map<String, McpServerConfig.McpServerInfo> oldConfigs,
                                            Map<String, McpServerConfig.McpServerInfo> newConfigs) {
            // Detect additions and updates
            newConfigs.forEach((serverId, config) -> {
                if (!oldConfigs.containsKey(serverId)) {
                    notifyConfigAdded(serverId, config);
                } else if (!configEquals(oldConfigs.get(serverId), config)) {
                    notifyConfigUpdated(serverId, config);
                }
            });
            // Detect removals
            oldConfigs.forEach((serverId, config) -> {
                if (!newConfigs.containsKey(serverId)) {
                    notifyConfigRemoved(serverId);
                }
            });
        }

        // Two configs are considered equal when their JSON serializations match
        private boolean configEquals(McpServerConfig.McpServerInfo config1, McpServerConfig.McpServerInfo config2) {
            if (config1 == null && config2 == null) return true;
            if (config1 == null || config2 == null) return false;
            try {
                String json1 = JSON.toJSONString(config1);
                String json2 = JSON.toJSONString(config2);
                return json1.equals(json2);
            } catch (Exception e) {
                log.warn("Error while comparing configs", e);
                return false;
            }
        }

        private void notifyConfigAdded(String serverId, McpServerConfig.McpServerInfo config) {
            listeners.forEach(listener -> {
                try {
                    listener.onConfigAdded(serverId, config);
                } catch (Exception e) {
                    log.error("Failed to notify config added: {}", serverId, e);
                }
            });
        }

        private void notifyConfigRemoved(String serverId) {
            listeners.forEach(listener -> {
                try {
                    listener.onConfigRemoved(serverId);
                } catch (Exception e) {
                    log.error("Failed to notify config removed: {}", serverId, e);
                }
            });
        }

        private void notifyConfigUpdated(String serverId, McpServerConfig.McpServerInfo config) {
            listeners.forEach(listener -> {
                try {
                    listener.onConfigUpdated(serverId, config);
                } catch (Exception e) {
                    log.error("Failed to notify config updated: {}", serverId, e);
                }
            });
        }
    }

    /**
     * Refreshes the global configuration.
     */
    public void refreshGlobalConfigs() {
        if (databaseConfigSource != null) {
            databaseConfigSource.refreshConfigs();
            log.info("Global MCP configuration refreshed");
        }
    }
}
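The change detection in `detectAndNotifyChanges` boils down to a three-way diff between the old and new configuration maps. The sketch below isolates that idea so it can be run on its own. `ConfigDiff` is a hypothetical class, and it compares values with `equals()` rather than by JSON serialization as the service above does; plain strings stand in for the server configuration objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical three-way diff between two config snapshots (illustration only). */
final class ConfigDiff {
    final Set<String> added = new TreeSet<>();
    final Set<String> updated = new TreeSet<>();
    final Set<String> removed = new TreeSet<>();

    private ConfigDiff() {}

    /** Classifies every key as added, updated or removed; unchanged keys are ignored. */
    static <V> ConfigDiff between(Map<String, V> oldConfigs, Map<String, V> newConfigs) {
        ConfigDiff diff = new ConfigDiff();
        for (Map.Entry<String, V> entry : newConfigs.entrySet()) {
            String key = entry.getKey();
            if (!oldConfigs.containsKey(key)) {
                diff.added.add(key);
            } else if (!Objects.equals(oldConfigs.get(key), entry.getValue())) {
                diff.updated.add(key);
            }
        }
        for (String key : oldConfigs.keySet()) {
            if (!newConfigs.containsKey(key)) {
                diff.removed.add(key);
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("map-mcp", "sse https://mcp.amap.com/sse");
        before.put("12306-mcp", "stdio npx -y 12306-mcp");

        Map<String, String> after = new HashMap<>();
        after.put("map-mcp", "sse https://mcp.amap.com/sse?v=2"); // changed
        after.put("test-mcp", "http http://localhost:8080/mcp");  // new

        ConfigDiff diff = between(before, after);
        System.out.println("added=" + diff.added
                + " updated=" + diff.updated
                + " removed=" + diff.removed);
    }
}
```

Each of the three sets would then drive the matching listener callback (`onConfigAdded`, `onConfigUpdated`, `onConfigRemoved`).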
Building a controller so that admins can add MCP services online
@RestController
@RequestMapping("/api/admin/mcp/services")
public class AdminMcpController extends BaseController {

    @Autowired
    private McpServiceConfigService mcpServiceConfigService;

    @Autowired
    private McpGatewayService mcpGatewayService;

    /**
     * Lists all MCP service configurations.
     */
    @GetMapping("/list")
    public AjaxResult list(McpServiceConfig mcpServiceConfig) {
        List<McpServiceConfig> services = mcpServiceConfigService.selectList(null);
        return AjaxResult.success(services);
    }

    /**
     * Lists MCP service configurations page by page.
     */
    @GetMapping("/page")
    public TableDataInfo page(McpServiceConfig mcpServiceConfig) {
        startPage();
        List<McpServiceConfig> services = mcpServiceConfigService.selectList(mcpServiceConfig);
        return getDataTable(services);
    }

    /**
     * Returns a single MCP service configuration.
     */
    @GetMapping("/{id}")
    public AjaxResult getService(@PathVariable Long id) {
        McpServiceConfig service = mcpServiceConfigService.getServiceById(id);
        if (service == null) {
            return AjaxResult.error("Service does not exist");
        }
        return AjaxResult.success(service);
    }

    /**
     * Adds an MCP service configuration.
     */
    @PostMapping
    public AjaxResult addService(@RequestBody McpServiceConfig config) {
        // Reject a serviceId that already exists
        if (mcpServiceConfigService.getServiceByServiceId(config.getServiceId()) != null) {
            return AjaxResult.error("Service ID already exists");
        }
        config.setCreatedBy(StpUtil.getLoginIdAsString());
        config.setStatus(1); // enabled by default
        boolean success = mcpServiceConfigService.addService(config);
        if (success) {
            // Refresh the MCP configuration automatically
            try {
                mcpGatewayService.refreshGlobalConfigs();
            } catch (Exception e) {
                // Log the failure without failing the request
                logger.error("Failed to refresh MCP configuration", e);
            }
            return AjaxResult.success();
        } else {
            return AjaxResult.error("Failed to add service");
        }
    }

    /**
     * Updates an MCP service configuration.
     */
    @PutMapping
    public AjaxResult updateService(@RequestBody McpServiceConfig config) {
        McpServiceConfig existingConfig = mcpServiceConfigService.getServiceById(config.getId());
        // Guard against an unknown id; the original dereferenced the result unchecked
        if (existingConfig == null) {
            return AjaxResult.error("Service does not exist");
        }
        // If the serviceId changed, make sure the new one is not already taken
        if (!existingConfig.getServiceId().equals(config.getServiceId())) {
            if (mcpServiceConfigService.getServiceByServiceId(config.getServiceId()) != null) {
                return AjaxResult.error("Service ID already exists");
            }
        }
        boolean success = mcpServiceConfigService.updateService(config);
        if (success) {
            return AjaxResult.success();
        } else {
            return AjaxResult.error("Failed to update service");
        }
    }

    /**
     * Publishes or unpublishes an MCP service.
     */
    @PutMapping("/{id}/status")
    public AjaxResult updateServiceStatus(@PathVariable Long id, @RequestParam Integer status) {
        if (status != 0 && status != 1) {
            return AjaxResult.error("Invalid status value");
        }
        boolean success = mcpServiceConfigService.updateServiceStatus(id, status);
        if (success) {
            return AjaxResult.success();
        } else {
            return AjaxResult.error("Failed to update status");
        }
    }

    /**
     * Deletes an MCP service configuration.
     */
    @DeleteMapping("/{id}")
    public AjaxResult deleteService(@PathVariable Long id) {
        boolean success = mcpServiceConfigService.deleteService(id);
        if (success) {
            return AjaxResult.success();
        } else {
            return AjaxResult.error("Failed to delete service");
        }
    }

    /**
     * Refreshes the MCP configuration (call after an admin changes it).
     */
    @PostMapping("/refresh")
    public AjaxResult refreshMcpConfigs() {
        try {
            // Option 1: publish an event through ApplicationEventPublisher
            // applicationEventPublisher.publishEvent(new McpConfigRefreshEvent());

            // Option 2: call the refresh method directly (requires exposing it)
            mcpGatewayService.refreshGlobalConfigs();
            return AjaxResult.success();
        } catch (Exception e) {
            return AjaxResult.error("Failed to refresh configuration: " + e.getMessage());
        }
    }
}
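One thing the add endpoint could check before saving is that the submitted configuration matches its transport type, since the service code later needs a command for stdio and a URL for sse/http. The sketch below is one possible validation, not part of the original controller; `ServiceConfigValidator` and its messages are hypothetical, and the accepted type names mirror the switch in `createTransportConfig`.

```java
import java.util.Locale;
import java.util.Optional;

/** Hypothetical pre-save validation for an MCP service config (illustration only). */
final class ServiceConfigValidator {
    private ServiceConfigValidator() {}

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Returns an error message, or Optional.empty() when the combination is usable. */
    static Optional<String> validate(String transportType, String command, String url) {
        if (isBlank(transportType)) {
            return Optional.of("transportType is required");
        }
        switch (transportType.toLowerCase(Locale.ROOT)) {
            case "stdio":
                // stdio services are launched as a local process
                return isBlank(command)
                        ? Optional.of("stdio services require a command")
                        : Optional.empty();
            case "sse":
            case "streamable_http":
            case "http":
                // remote services are reached through a URL
                return isBlank(url)
                        ? Optional.of(transportType + " services require a url")
                        : Optional.empty();
            default:
                return Optional.of("unsupported transport type: " + transportType);
        }
    }

    public static void main(String[] args) {
        System.out.println(validate("stdio", "npx", null).orElse("ok"));
        System.out.println(validate("sse", null, null).orElse("ok"));
        System.out.println(validate("grpc", "x", "y").orElse("ok"));
    }
}
```

In `addService`, a non-empty result could be returned through `AjaxResult.error(...)` before the record is written.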