Spring AI alibaba 智能体扩展
本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别。
文章目录
- 前言
- 一、循环执行检测与处理
- 1.1、定义is_stuck
- 1.2、定义handle_stuck_state
- 1.3、改造run方法
- 二、智能体交互式执行
- 1.1、AskHuman工具类
- 三、Open Manus 集成MCP
- 3.1、源码实现:initialize_mcp_servers
- 3.2、源码实现:connect_mcp_server
- 3.3、源码实现:disconnect_mcp_server
- 3.4、源码实现:cleanup
- 3.5、源码实现:think
前言
在前篇中,参照Open Manus的源码,利用Spring AI 框架,实现了一个JAVA版本具有主要功能的智能体应用。在本篇中,继续参照源码对智能体应用的功能进行一些扩展:
- 实现源码
base.py
中关于循环执行检测与处理
的相关逻辑,即is_stuck
和handle_stuck_state
- 实现源码中
AskHuman
工具的使用,即让AI自主决定什么时候让用户进行辅助输入查询。
最后还会看一下Open Manus中关于MCP集成的实现。
一、循环执行检测与处理
参照源码,在base.py
的run
方法的循环中,调用了两个方法:
- is_stuck是用于判断当前是否出现了循环执行的问题。
- handle_stuck_state则是对循环执行的问题进行处理。
is_stuck
方法的含义:
def is_stuck(self) -> bool:"""Check if the agent is stuck in a loop by detecting duplicate content"""# 检查消息数量是否足够进行循环检测(至少需要2条消息)if len(self.memory.messages) < 2:return False # 消息数量不足,无法检测循环,返回False# 获取最后一条消息(最新的助手回复)last_message = self.memory.messages[-1]# 检查最后一条消息是否有内容if not last_message.content:return False # 如果最后一条消息内容为空,返回False# 统计与最后一条消息内容相同的助手消息数量duplicate_count = sum(1 # 计数器,每找到一个重复消息就加1for msg in reversed(self.memory.messages[:-1]) # 遍历除最后一条外的所有历史消息(倒序)if msg.role == "assistant" and msg.content == last_message.content # 只统计助手角色且内容完全相同的消息)# 判断重复次数是否达到阈值,如果达到则认为智能体陷入循环return duplicate_count >= self.duplicate_threshold
handle_stuck_state
方法的主要作用,当智能体陷入循环时,系统会自动添加提示来引导它改变策略,避免继续重复无效的行为。
- 定义一个提示信息,告诉智能体检测到了重复响应,建议考虑新策略
- 将这个提示信息添加到现有的下一步提示前面,让智能体在下次执行时能看到这个提醒
- 记录警告日志,便于调试和监控智能体的状态
def handle_stuck_state(self):"""Handle stuck state by adding a prompt to change strategy"""# 定义检测到循环状态时的提示信息stuck_prompt = "\Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."# 将循环检测提示添加到现有的下一步提示前面,用换行符连接self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"# 记录警告日志,记录智能体检测到循环状态并添加了提示信息logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
1.1、定义is_stuck
我们在智能体的BaseAgent
中首先定义is_stuck
,并且参照源码的逻辑进行实现:
/*** 智能体循环检测阈值*/private Integer duplicateThreshold = 2;/*** 循环消息检测* @return*/public boolean isStuck() {
// 检查消息数量是否足够进行循环检测(至少需要2条消息)if (this.memoryMessages.size() < 2) {return false;}// 获取最后一条消息(最新的助手回复)Message lastMessage = CollUtil.getLast(this.memoryMessages);if (ObjUtil.isEmpty(lastMessage) || lastMessage.getText().isEmpty()){return false;}//统计与最后一条消息内容相同的助手消息数量int count = 0;//遍历除最后一条外的所有历史消息(倒序)for (int i = this.memoryMessages.size() - 2 ; i >= 0; i--) {Message message = memoryMessages.get(i);
// 只统计助手角色且内容完全相同的消息if (message instanceof AssistantMessage && message.getText().equals(lastMessage.getText())){count++;}}
// 判断重复次数是否达到阈值,如果达到则认为智能体陷入循环return count >= duplicateThreshold;}
1.2、定义handle_stuck_state
然后定义出现循环问题的解决方法,相对比较简单:
/*** 处理循环消息*/public void handleStuckState(){String stuckPrompt = "Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted.";this.nextStepPrompt = nextStepPrompt + "/n" + stuckPrompt;log.info("Agent detected stuck state. Added prompt: {}",stuckPrompt);}
1.3、改造run方法
在run方法的循环中,增加循环检测和处理:
//循环条件 小于最大轮次,并且状态不为结束for (int i = 0; i < maxStep && agentState != AgentState.FINISHED; i++) {//执行具体的操作,子类实现String stepResult = step();// 扩展点 加入循环检测的逻辑if (this.isStuck()){this.handleStuckState();}//.....}
二、智能体交互式执行
参照源码,在tool
目录下定义了一个AskHuman
工具,后续在自己的智能体项目中,需要将其转换为Spring AI工具类的形式。
# 定义人机交互工具类
class AskHuman(BaseTool):"""Add a tool to ask human for help."""# 定义工具的唯一标识符名称name: str = "ask_human"# 定义工具的功能描述,说明何时使用此工具description: str = "Use this tool to ask human for help."# 定义工具的参数结构,使用JSON Schema格式parameters: str = {"type": "object", # 参数类型为对象"properties": { # 定义对象的属性"inquire": { # 定义询问参数"type": "string", # 参数类型为字符串"description": "The question you want to ask human.", # 参数描述}},"required": ["inquire"], # 指定必需参数为inquire}# 定义异步执行方法,用于处理人机交互async def execute(self, inquire: str) -> str:# 使用input函数在控制台显示问题并等待用户输入,然后去除首尾空白字符return input(f"""Bot: {inquire}\n\nYou: """).strip()
1.1、AskHuman工具类
/*** 定义人机交互工具类*/
public class AskHuman {@Tool(description = "ask_human")public String askHuman(@ToolParam(description = "The question you want to ask human.") String inquire) {// 使用Scanner获取用户输入Scanner scanner = new Scanner(System.in);System.out.printf("Bot: %s%n%nYou: ", inquire);String userInput = scanner.nextLine();return userInput.trim();}
}
然后将其注册到ToolRegistration
中:
@Configuration
public class ToolRegistration {@Beanpublic ToolCallback[] availableTools(){FileOperationTool fileOperationTool = new FileOperationTool();PDFGenerationTool pdfGenerationTool = new PDFGenerationTool();ResourceDownloadTool resourceDownloadTool = new ResourceDownloadTool();TerminalOperationTool terminalOperationTool = new TerminalOperationTool();WebScrapingTool webScrapingTool = new WebScrapingTool();
// WebSearchTool webSearchTool = new WebSearchTool();TerminateTool terminateTool = new TerminateTool();//人机交互工具类AskHumanTool askHumanTool = new AskHumanTool();return ToolCallbacks.from(fileOperationTool,pdfGenerationTool,resourceDownloadTool,terminalOperationTool,webScrapingTool,terminateTool,askHumanTool);}
}
三、Open Manus 集成MCP
首先参照源码中的集成方式,在Manus.py
中,首先定义了两个属性,表示MCP客户端实例
和记录已连接的MCP服务器
:
其中还有一些关键的方法:
3.1、源码实现:initialize_mcp_servers
该方法主要是初始化所有配置的MCP服务器连接
:
- 遍历配置中的所有MCP服务器
- 支持SSE和stdio两种连接方式
- 包含错误处理和日志记录
async def initialize_mcp_servers(self) -> None:"""Initialize connections to configured MCP servers."""# 遍历配置中的所有MCP服务器for server_id, server_config in config.mcp_config.servers.items():try:# 检查服务器类型是否为SSE(Server-Sent Events)if server_config.type == "sse":# 如果配置了URL,则连接到SSE服务器if server_config.url:await self.connect_mcp_server(server_config.url, server_id)# 记录成功连接日志logger.info(f"Connected to MCP server {server_id} at {server_config.url}")# 检查服务器类型是否为stdio(标准输入输出)elif server_config.type == "stdio":# 如果配置了命令,则通过stdio连接到服务器if server_config.command:await self.connect_mcp_server(server_config.command,server_id,use_stdio=True,stdio_args=server_config.args,)# 记录成功连接日志logger.info(f"Connected to MCP server {server_id} using command {server_config.command}")except Exception as e:# 记录连接失败的错误日志logger.error(f"Failed to connect to MCP server {server_id}: {e}")
3.2、源码实现:connect_mcp_server
在上一个initialize_mcp_servers
方法中,根据服务器类型,最终执行的是connect_mcp_server
方法连接到服务器:
- 检查连接方式(stdio或SSE)
- 将自身添加到保存已连接MCP的集合中。
- 将新工具添加到可用工具集合
可以看到,在连接到服务器之后,会获取到MCP服务对应的工具,然后加入到一个可用工具集合
中。
最终在调用工具时,还是会遍历本地的可用工具集合并且执行,而不是每次都去MCP服务器上获取。
async def connect_mcp_server(self,server_url: str,server_id: str = "",use_stdio: bool = False,stdio_args: List[str] = None,) -> None:"""Connect to an MCP server and add its tools."""# 检查是否使用stdio连接方式if use_stdio:# 通过stdio连接到MCP服务器await self.mcp_clients.connect_stdio(server_url, stdio_args or [], server_id)# 将服务器信息添加到已连接服务器字典中self.connected_servers[server_id or server_url] = server_urlelse:# 通过SSE连接到MCP服务器await self.mcp_clients.connect_sse(server_url, server_id)# 将服务器信息添加到已连接服务器字典中self.connected_servers[server_id or server_url] = server_url# 更新可用工具集合,只添加来自此服务器的新工具new_tools = [tool for tool in self.mcp_clients.tools if tool.server_id == server_id]# 将新工具添加到可用工具集合中self.available_tools.add_tools(*new_tools)
3.3、源码实现:disconnect_mcp_server
该方法的作用是,断开MCP服务器连接并移除其工具:
- 断开服务器连接
- 清理已连接MCP的集合
- 重置可用工具集合
async def disconnect_mcp_server(self, server_id: str = "") -> None:"""Disconnect from an MCP server and remove its tools."""# 断开与MCP服务器的连接await self.mcp_clients.disconnect(server_id)# 如果指定了服务器ID,则只移除该服务器if server_id:self.connected_servers.pop(server_id, None)else:# 如果没有指定服务器ID,则清空所有已连接服务器self.connected_servers.clear()# 重新构建可用工具集合,移除已断开服务器的工具base_tools = [toolfor tool in self.available_tools.toolsif not isinstance(tool, MCPClientTool)]# 创建新的工具集合,只包含基础工具self.available_tools = ToolCollection(*base_tools)# 添加剩余的MCP客户端工具self.available_tools.add_tools(*self.mcp_clients.tools)
3.4、源码实现:cleanup
该方法的主要目的是清理MCP相关资源:
- 清理浏览器上下文
- 断开所有MCP服务器连接
async def cleanup(self):"""Clean up Manus agent resources."""# 清理浏览器上下文助手if self.browser_context_helper:await self.browser_context_helper.cleanup_browser()# 只有在已初始化的情况下才断开所有MCP服务器连接if self._initialized:await self.disconnect_mcp_server()self._initialized = False
3.5、源码实现:think
最后还对父类的think
方法做了增强:
- 检查初始化状态
- 自动初始化MCP服务器