当前位置: 首页 > news >正文

Spring AI alibaba 智能体扩展

本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别。

文章目录

  • 前言
  • 一、循环执行检测与处理
    • 1.1、定义is_stuck
    • 1.2、定义handle_stuck_state
    • 1.3、改造run方法
  • 二、智能体交互式执行
    • 1.1、AskHuman工具类
  • 三、Open Manus 集成MCP
    • 3.1、源码实现:initialize_mcp_servers
    • 3.2、源码实现:connect_mcp_server
    • 3.3、源码实现:disconnect_mcp_server
    • 3.4、源码实现:cleanup
    • 3.5、源码实现:think


前言

  在前篇中,参照Open Manus的源码,利用Spring AI 框架,实现了一个JAVA版本具有主要功能的智能体应用。在本篇中,继续参照源码对智能体应用的功能进行一些扩展:

  • 实现源码base.py中关于循环执行检测与处理的相关逻辑,即is_stuckhandle_stuck_state
  • 实现源码中AskHuman工具的使用,即让AI自主决定什么时候让用户进行辅助输入查询。

  最后还会看一下Open Manus中关于MCP集成的实现。

一、循环执行检测与处理

  参照源码,在base.pyrun方法的循环中,调用了两个方法:

  1. is_stuck是用于判断当前是否出现了循环执行的问题。
  2. handle_stuck_state则是对循环执行的问题进行处理。

在这里插入图片描述
  is_stuck方法的含义:

def is_stuck(self) -> bool:"""Check if the agent is stuck in a loop by detecting duplicate content"""# 检查消息数量是否足够进行循环检测(至少需要2条消息)if len(self.memory.messages) < 2:return False  # 消息数量不足,无法检测循环,返回False# 获取最后一条消息(最新的助手回复)last_message = self.memory.messages[-1]# 检查最后一条消息是否有内容if not last_message.content:return False  # 如果最后一条消息内容为空,返回False# 统计与最后一条消息内容相同的助手消息数量duplicate_count = sum(1  # 计数器,每找到一个重复消息就加1for msg in reversed(self.memory.messages[:-1])  # 遍历除最后一条外的所有历史消息(倒序)if msg.role == "assistant" and msg.content == last_message.content  # 只统计助手角色且内容完全相同的消息)# 判断重复次数是否达到阈值,如果达到则认为智能体陷入循环return duplicate_count >= self.duplicate_threshold

  handle_stuck_state方法的主要作用,当智能体陷入循环时,系统会自动添加提示来引导它改变策略,避免继续重复无效的行为。

  • 定义一个提示信息,告诉智能体检测到了重复响应,建议考虑新策略
  • 将这个提示信息添加到现有的下一步提示前面,让智能体在下次执行时能看到这个提醒
  • 记录警告日志,便于调试和监控智能体的状态
def handle_stuck_state(self):"""Handle stuck state by adding a prompt to change strategy"""# 定义检测到循环状态时的提示信息stuck_prompt = "\Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."# 将循环检测提示添加到现有的下一步提示前面,用换行符连接self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"# 记录警告日志,记录智能体检测到循环状态并添加了提示信息logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")

1.1、定义is_stuck

  我们在智能体的BaseAgent中首先定义is_stuck,并且参照源码的逻辑进行实现:

/*** 智能体循环检测阈值*/private Integer duplicateThreshold = 2;/*** 循环消息检测* @return*/public boolean isStuck() {
//        检查消息数量是否足够进行循环检测(至少需要2条消息)if (this.memoryMessages.size() < 2) {return false;}// 获取最后一条消息(最新的助手回复)Message lastMessage = CollUtil.getLast(this.memoryMessages);if (ObjUtil.isEmpty(lastMessage) || lastMessage.getText().isEmpty()){return false;}//统计与最后一条消息内容相同的助手消息数量int count = 0;//遍历除最后一条外的所有历史消息(倒序)for (int i = this.memoryMessages.size() - 2 ; i >=  0; i--) {Message message = memoryMessages.get(i);
//            只统计助手角色且内容完全相同的消息if (message instanceof AssistantMessage && message.getText().equals(lastMessage.getText())){count++;}}
//        判断重复次数是否达到阈值,如果达到则认为智能体陷入循环return count >= duplicateThreshold;}

1.2、定义handle_stuck_state

  然后定义出现循环问题的解决方法,相对比较简单:

    /*** 处理循环消息*/public void handleStuckState(){String stuckPrompt = "Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted.";this.nextStepPrompt = nextStepPrompt + "/n" + stuckPrompt;log.info("Agent detected stuck state. Added prompt: {}",stuckPrompt);}

1.3、改造run方法

  在run方法的循环中,增加循环检测和处理:

					 //循环条件 小于最大轮次,并且状态不为结束for (int i = 0; i < maxStep && agentState != AgentState.FINISHED; i++) {//执行具体的操作,子类实现String stepResult = step();// 扩展点 加入循环检测的逻辑if (this.isStuck()){this.handleStuckState();}//.....}

二、智能体交互式执行

  参照源码,在tool目录下定义了一个AskHuman工具,后续在自己的智能体项目中,需要将其转换为Spring AI工具类的形式。

# 定义人机交互工具类
class AskHuman(BaseTool):"""Add a tool to ask human for help."""# 定义工具的唯一标识符名称name: str = "ask_human"# 定义工具的功能描述,说明何时使用此工具description: str = "Use this tool to ask human for help."# 定义工具的参数结构,使用JSON Schema格式parameters: str = {"type": "object",  # 参数类型为对象"properties": {  # 定义对象的属性"inquire": {  # 定义询问参数"type": "string",  # 参数类型为字符串"description": "The question you want to ask human.",  # 参数描述}},"required": ["inquire"],  # 指定必需参数为inquire}# 定义异步执行方法,用于处理人机交互async def execute(self, inquire: str) -> str:# 使用input函数在控制台显示问题并等待用户输入,然后去除首尾空白字符return input(f"""Bot: {inquire}\n\nYou: """).strip()

1.1、AskHuman工具类

/*** 定义人机交互工具类*/
public class AskHuman {@Tool(description = "ask_human")public String askHuman(@ToolParam(description = "The question you want to ask human.") String inquire) {// 使用Scanner获取用户输入Scanner scanner = new Scanner(System.in);System.out.printf("Bot: %s%n%nYou: ", inquire);String userInput = scanner.nextLine();return userInput.trim();}
}

  然后将其注册到ToolRegistration中:

@Configuration
public class ToolRegistration {@Beanpublic ToolCallback[] availableTools(){FileOperationTool fileOperationTool = new FileOperationTool();PDFGenerationTool pdfGenerationTool = new PDFGenerationTool();ResourceDownloadTool resourceDownloadTool = new ResourceDownloadTool();TerminalOperationTool terminalOperationTool = new TerminalOperationTool();WebScrapingTool webScrapingTool = new WebScrapingTool();
//        WebSearchTool webSearchTool = new WebSearchTool();TerminateTool terminateTool = new TerminateTool();//人机交互工具类AskHumanTool askHumanTool = new AskHumanTool();return ToolCallbacks.from(fileOperationTool,pdfGenerationTool,resourceDownloadTool,terminalOperationTool,webScrapingTool,terminateTool,askHumanTool);}
}

三、Open Manus 集成MCP

  首先参照源码中的集成方式,在Manus.py中,首先定义了两个属性,表示MCP客户端实例记录已连接的MCP服务器
在这里插入图片描述

  其中还有一些关键的方法:

3.1、源码实现:initialize_mcp_servers

  该方法主要是初始化所有配置的MCP服务器连接

  • 遍历配置中的所有MCP服务器
  • 支持SSE和stdio两种连接方式
  • 包含错误处理和日志记录
    async def initialize_mcp_servers(self) -> None:"""Initialize connections to configured MCP servers."""# 遍历配置中的所有MCP服务器for server_id, server_config in config.mcp_config.servers.items():try:# 检查服务器类型是否为SSE(Server-Sent Events)if server_config.type == "sse":# 如果配置了URL,则连接到SSE服务器if server_config.url:await self.connect_mcp_server(server_config.url, server_id)# 记录成功连接日志logger.info(f"Connected to MCP server {server_id} at {server_config.url}")# 检查服务器类型是否为stdio(标准输入输出)elif server_config.type == "stdio":# 如果配置了命令,则通过stdio连接到服务器if server_config.command:await self.connect_mcp_server(server_config.command,server_id,use_stdio=True,stdio_args=server_config.args,)# 记录成功连接日志logger.info(f"Connected to MCP server {server_id} using command {server_config.command}")except Exception as e:# 记录连接失败的错误日志logger.error(f"Failed to connect to MCP server {server_id}: {e}")

3.2、源码实现:connect_mcp_server

  在上一个initialize_mcp_servers方法中,根据服务器类型,最终执行的是connect_mcp_server方法连接到服务器:

  • 检查连接方式(stdio或SSE)
  • 将自身添加到保存已连接MCP的集合中。
  • 将新工具添加到可用工具集合

  可以看到,在连接到服务器之后,会获取到MCP服务对应的工具,然后加入到一个可用工具集合中。
最终在调用工具时,还是会遍历本地的可用工具集合并且执行,而不是每次都去MCP服务器上获取。

    async def connect_mcp_server(self,server_url: str,server_id: str = "",use_stdio: bool = False,stdio_args: List[str] = None,) -> None:"""Connect to an MCP server and add its tools."""# 检查是否使用stdio连接方式if use_stdio:# 通过stdio连接到MCP服务器await self.mcp_clients.connect_stdio(server_url, stdio_args or [], server_id)# 将服务器信息添加到已连接服务器字典中self.connected_servers[server_id or server_url] = server_urlelse:# 通过SSE连接到MCP服务器await self.mcp_clients.connect_sse(server_url, server_id)# 将服务器信息添加到已连接服务器字典中self.connected_servers[server_id or server_url] = server_url# 更新可用工具集合,只添加来自此服务器的新工具new_tools = [tool for tool in self.mcp_clients.tools if tool.server_id == server_id]# 将新工具添加到可用工具集合中self.available_tools.add_tools(*new_tools)

3.3、源码实现:disconnect_mcp_server

  该方法的作用是,断开MCP服务器连接并移除其工具:

  • 断开服务器连接
  • 清理已连接MCP的集合
  • 重置可用工具集合
    async def disconnect_mcp_server(self, server_id: str = "") -> None:"""Disconnect from an MCP server and remove its tools."""# 断开与MCP服务器的连接await self.mcp_clients.disconnect(server_id)# 如果指定了服务器ID,则只移除该服务器if server_id:self.connected_servers.pop(server_id, None)else:# 如果没有指定服务器ID,则清空所有已连接服务器self.connected_servers.clear()# 重新构建可用工具集合,移除已断开服务器的工具base_tools = [toolfor tool in self.available_tools.toolsif not isinstance(tool, MCPClientTool)]# 创建新的工具集合,只包含基础工具self.available_tools = ToolCollection(*base_tools)# 添加剩余的MCP客户端工具self.available_tools.add_tools(*self.mcp_clients.tools)

3.4、源码实现:cleanup

  该方法的主要目的是清理MCP相关资源:

  • 清理浏览器上下文
  • 断开所有MCP服务器连接
    async def cleanup(self):"""Clean up Manus agent resources."""# 清理浏览器上下文助手if self.browser_context_helper:await self.browser_context_helper.cleanup_browser()# 只有在已初始化的情况下才断开所有MCP服务器连接if self._initialized:await self.disconnect_mcp_server()self._initialized = False

3.5、源码实现:think

  最后还对父类的think方法做了增强:

  • 检查初始化状态
  • 自动初始化MCP服务器

在这里插入图片描述

http://www.dtcms.com/a/449713.html

相关文章:

  • leetcode 130 被围绕的区域
  • AiCube图形化程序自动生成【SPI,SPI-DMA,I2C,I2C-DMA】代码,驱动OLED-12864
  • Java 变量类型
  • 怎么修改网站源文件高明网站设计多少钱
  • 第14节-增强表结构-Renaming-columns
  • 网站开发长沙免费国内linux服务器
  • 276-基于Python的爱奇艺视频数据可视化分析系统
  • Kubernetes容器运行时:cri-docker vs containerd
  • 购物网站图片的放大怎么做的wordpress表格布局插件
  • 【Canvas与机械】铜制螺帽
  • HarmonyOS ArkTS 深度解析:装饰器与状态管理机制
  • 关于建设网站的合作合同范本中国住房城乡建设部网站首页
  • 松江新城建设投资有限公司网站电子商务营销策略分析
  • 做零售的外贸网站老薛主机卸载wordpress
  • 【Android】Android 的三种动画(帧动画、View 动画、属性动画)
  • 南阳提高网站排名做网站租服务器多少钱
  • Dify本地初始化后安装模型供应商瞬间失败控制台报错401
  • Prometheus接入“飞书“实现自动化告警
  • 现在asp做网站网站改版后百度不收录
  • 做外贸要做什么网站搭建一个网站教程
  • 织梦做的网站怎么会被黑建设项目环评在什么网站公示
  • Python @装饰器用法详解
  • 模电基础:差分放大电路
  • SSM--day2--Spring(二)--核心容器注解开发Spring整合
  • 正规的装饰行业网站建设公司北京新浪网站制作公司
  • Verilog可综合电路设计:重要语法细节指南
  • 力扣hot100做题整理(41-50)
  • 大连模板网站制作公司电话创业平台有哪些
  • 实战指南:文生图模型 Animagine XL 4.0
  • Linux中fcntl系统调用的实现