当前位置: 首页 > news >正文

开源Agent平台Dify源码剖析系列(四)核心模块core/agent之CotAgentRunner

每一篇文章都短小精悍,不啰嗦。

笔者寄语

本期介绍Dify框架的核心模块core/agent的CotAgentRunner。接下来我们一起深入剖析core/agent目录下的所有代码,并以通俗易懂的方式解释。我们需要先了解这个目录的完整结构,然后逐个分析关键文件,最后总结整个Agent框架的设计和工作原理。

首先,让我查看core/agent目录的完整结构:

dify/api/core/agent.├── base_agent_runner.py    # Agent框架的基础实现├── cot_agent_runner.py    # Chain of Thought (CoT) Agent Runner的实现├── cot_chat_agent_runner.py    # CoT Chat Agent Runner的实现├── cot_completion_agent_runner.py    # CoT Completion Agent Runner的实现├── entities.py    # 定义了Agent框架中的核心实体和数据结构├── fc_agent_runner.py    # CoT Completion Agent Runner的实现├── __init__.py├── output_parser│   └── cot_output_parser.py  # Chain of Thought输出解析器的实现└── prompt    └── template.py  # Agent提示模板的实现

我们已经对Dify的Agent系统架构有了初步了解。接下来我们要理解 CotAgentRunner 类,我们需要从「思维链(Chain of Thought, CoT)」的核心逻辑出发。这类代理的特点是:通过多轮「思考→调用工具→观察结果→再思考」的循环,逐步解决复杂问题,而不是一次性生成答案。下面我们结合代码,从「功能定位→核心流程→关键模块→实战场景」四个层面深入剖析。

一、功能定位:思维链代理的「迭代决策引擎」

CotAgentRunner 继承自 BaseAgentRunner,是基于思维链逻辑的代理运行器。它的核心使命是:让 AI 代理像人类一样「逐步思考」—— 面对复杂问题时,先分析需求,调用工具获取信息,根据结果调整策略,直到得出最终答案。

例如:当用户提问「分析近 3 个月公司产品销量 Top3 的地区,并预测下月趋势」时,普通代理可能直接回答(或失败),而 CotAgentRunner 会:

  1. 思考:“需要先查近 3 个月的销量数据(调用销售数据集工具)”;

  2. 调用工具:获取各地区销量数据;

  3. 观察结果:发现北京、上海、广州是 Top3;

  4. 再思考:“需要基于历史数据预测趋势(调用预测工具)”;

  5. 调用工具:获取预测结果;

  6. 最终整理:输出分析和预测。

这种「分步拆解」的能力,正是 CotAgentRunner 的核心价值。

二、核心流程:多轮迭代的「思考 - 行动 - 观察」循环

run 方法是整个类的核心,它实现了思维链的迭代逻辑。我们以「用户查询季度销量趋势」为例,拆解完整流程:

步骤 1:初始化与参数准备
def run(...):self._repack_app_generate_entity(app_generate_entity)  # 整理输入参数self._init_react_state(query)  # 初始化思维链状态(如记录思考步骤)# 处理模型stop词(确保模型输出在"Observation"处停止,避免输出混乱)if "Observation" not in app_generate_entity.model_conf.stop:app_generate_entity.model_conf.stop.append("Observation")# 初始化指令(填充用户输入到提示模板)self._instruction = self._fill_in_inputs_from_external_data_tools(instruction, inputs)# 初始化工具(同BaseAgentRunner,转换为模型可识别的格式)tool_instances, prompt_messages_tools = self._init_prompt_tools()

关键操作

  • 为模型添加 Observation 作为停止词:确保模型在生成工具调用结果(观察)前停止,避免输出冗余内容。

  • 初始化思维链状态:通过 _init_react_state 初始化 _agent_scratchpad(记录每步思考的列表),用于跟踪代理的「思考 - 行动 - 观察」过程。

步骤 2:多轮迭代循环(核心逻辑)

代理通过循环迭代(最多 max_iteration_steps 次)完成思考过程,每次迭代包含「生成思考→决定是否调用工具→执行工具→记录结果」四步:

iteration_step = 1
max_iteration_steps = min(app_config.agent.max_iteration, 99) + 1  # 限制最大迭代次数(避免无限循环)while function_call_state and iteration_step <= max_iteration_steps:# 1. 创建本轮思考记录(数据库中保存)agent_thought = self.create_agent_thought(...)# 2. 调用LLM生成思考或工具调用指令prompt_messages = self._organize_prompt_messages()  # 构建包含历史的提示chunks = model_instance.invoke_llm(...)  # 流式调用LLMreact_chunks = CotAgentOutputParser.handle_react_stream_output(chunks, usage_dict)  # 解析LLM输出为思维链结构# 3. 解析LLM输出,提取思考和工具调用指令scratchpad = AgentScratchpadUnit(...)  # 记录本轮思考的细节(思考内容、行动指令等)for chunk in react_chunks:if isinstance(chunk, AgentScratchpadUnit.Action):  # 若输出是工具调用指令scratchpad.action = chunk  # 记录工具名称和参数else:  # 若输出是自然语言思考scratchpad.thought += chunk  # 记录思考内容yield ...  # 流式返回思考过程(用户可实时看到代理"思考")# 4. 判断是否需要调用工具if scratchpad.is_final():  # 若代理决定输出最终答案final_answer = ...  # 提取答案function_call_state = False  # 结束循环else:  # 若需要调用工具# 调用工具并获取结果observation, tool_invoke_meta = self._handle_invoke_action(...)scratchpad.observation = observation  # 记录工具返回结果function_call_state = True  # 继续下一轮迭代iteration_step += 1  # 迭代次数+1

核心逻辑解析

  • 多轮迭代的必要性

    :复杂问题无法一次解决(如需要多次调用工具),循环确保代理能「逐步逼近答案」。

  • 流式输出

    :通过 yield 返回 LLMResultChunk,让用户实时看到代理的思考过程(如 “我现在需要查询销量数据…”),提升交互体验。

  • 状态记录

    AgentScratchpadUnit 类是核心状态容器,记录每轮的「思考内容(thought)→工具调用(action)→观察结果(observation)」,确保迭代过程可追溯。

步骤 3:工具调用与结果处理

当代理决定调用工具(如「销量查询工具」)时,_handle_invoke_action 方法负责执行调用并返回结果:

def _handle_invoke_action(...):tool_instance = tool_instances.get(tool_call_name)  # 获取工具实例# 解析工具参数(如将JSON字符串转为字典)if isinstance(tool_call_args, str):tool_call_args = json.loads(tool_call_args)# 调用工具tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke(...)# 发布工具返回的文件(如图表)for message_file_id in message_files:self.queue_manager.publish(QueueMessageFileEvent(...))return tool_invoke_response, tool_invoke_meta

关键操作

  • 工具参数兼容性处理:支持字符串或字典格式的参数,通过 json.loads 容错解析。

  • 工具调用结果实时发布:若工具返回文件(如销量趋势图),通过队列管理器实时推送给用户。

步骤 4:结束迭代与结果发布

当迭代达到最大次数或代理生成最终答案时,循环结束,发布最终结果:

# 输出最终答案yield LLMResultChunk(...)# 保存最后一轮思考记录self.save_agent_thought(...)# 发布结束事件(通知前端对话完成)self.queue_manager.publish(QueueMessageEndEvent(...))

三、关键模块:支撑思维链逻辑的 5 大核心组件

CotAgentRunner 的复杂逻辑依赖于多个模块的协同,以下是最关键的 5 个组件:

模块 / 类

作用

核心方法 / 属性

思维链解析器

将 LLM 的原始输出(字符串)解析为结构化的「思考 - 行动 - 观察」数据

CotAgentOutputParser.handle_react_stream_output
状态容器

记录每轮迭代的思考、工具调用和结果,是迭代的「记忆载体」

AgentScratchpadUnit

 类

工具调用引擎

执行工具调用,处理参数解析和结果返回

ToolEngine.agent_invoke

_handle_invoke_action

历史消息转换器

将历史对话转换为思维链格式,确保代理能基于历史继续思考

_organize_historic_prompt_messages
迭代控制器

管理迭代次数、判断是否终止循环(如达到最大步数或生成最终答案)

function_call_state

 变量、max_iteration_steps

四、实战场景:一次完整的思维链交互示例

假设用户提问:「查询本季度(2024Q3)产品销量 Top3 的地区,并生成趋势图表」,我们跟踪 CotAgentRunner 的执行过程:

  1. 初始化

    • 接收用户 query,初始化 _agent_scratchpad(空列表)。

    • 加载工具:「销量数据集检索工具」「图表生成工具」。

    • 设置最大迭代次数(如 5 次),避免无限循环。

  2. 第 1 轮迭代

    • 生成思考

      :调用 LLM,输入包含用户问题和工具列表的提示,LLM 输出:“我需要先查询 2024Q3 各地区的销量数据,使用销量数据集检索工具。”

    • 解析输出

      CotAgentOutputParser 将上述内容解析为 AgentScratchpadUnit,其中 action 为 {"action_name": "sales_dataset", "action_input": {"quarter": "2024Q3"}}

    • 调用工具

      _handle_invoke_action 调用「销量数据集检索工具」,返回结果:{"北京": 1200, "上海": 1000, "广州": 800}

    • 记录状态

      scratchpad.observation 保存工具返回结果,迭代次数变为 2。

  3. 第 2 轮迭代

    • 生成思考

      :LLM 基于历史(第 1 轮的思考和结果),输出:“已获取销量数据,Top3 为北京、上海、广州。需要生成趋势图表,使用图表生成工具。”

    • 解析输出

      action 为 {"action_name": "chart_generator", "action_input": {"data": {...}}}

    • 调用工具

      :调用「图表生成工具」,返回图表文件 ID(如 file_123)。

    • 发布文件

      :通过 QueueMessageFileEvent 将图表推送给用户,用户此时可看到图表预览。

  4. 第 3 轮迭代

    • 生成思考

      :LLM 基于图表结果,输出:“趋势显示北京销量持续上升,上海稳定,广州略有下降。可以得出最终结论。”

    • 解析输出

      is_final() 为 True,提取最终答案:“2024Q3 销量 Top3 地区为北京(1200)、上海(1000)、广州(800)…(附趋势图表)”。

  5. 结束迭代

    • 输出最终答案,发布 QueueMessageEndEvent 通知对话完成,保存所有思考记录到数据库。

五、设计亮点与技术细节

  1. 兼容性设计

    • 处理不同模型的特性:通过 model_schema.features 判断模型是否支持流式工具调用(stream_tool_call),动态调整输出方式。

    • 容错机制:工具参数解析时用 try-except 捕获 JSON 解码错误,确保程序稳定性。

  2. 可追溯性

    • 每轮思考过程通过 MessageAgentThought 记录到数据库,包含 token 使用量(llm_usage)、工具输入输出等,支持后续审计和计费。

  3. 用户体验优化

    • 流式思考输出:用户无需等待完整结果,可实时看到代理的 “思考过程”,增强信任感。

    • 工具结果即时推送:如图表生成后立即通过 QueueMessageFileEvent 推送,避免用户长时间等待。

六、总结:思维链代理的核心价值

CotAgentRunner 通过「多轮迭代 + 状态记录 + 工具调用」的设计,让 AI 代理具备了处理复杂问题的能力。它的本质是将大语言模型的 “黑箱推理” 转化为可拆解、可追溯的 “白箱步骤”,既提升了结果的可靠性,又增强了用户对 AI 决策过程的理解。

这种设计广泛应用于需要深度分析的场景(如数据分析、科研辅助、复杂规划),是构建 “智能代理” 的核心技术之一。

http://www.dtcms.com/a/283077.html

相关文章:

  • SMTPman,发送邮件服务器smtp的功能详解!
  • 统计功效是什么?
  • ST17H36 蓝牙Soc开发(4)—— 外设应用1
  • mac电脑无法阅读runc源码
  • 【网易云-header】
  • HarmonyOS从入门到精通:自定义组件开发指南(九):组件复合与组合模式探秘
  • S7-1200 数字量模块接线:从源型 / 漏型到信号板扩展全解析
  • 【Tools】Saleae Logic 16软件安装教程
  • 【人工智能99问】损失函数有哪些,如何选择?(6/99)
  • 道可云人工智能每日资讯|天津市人工智能(AI+信创)创新生态联盟成立
  • 手撕设计模式之消息推送系统——桥接模式
  • MyBatis详解以及在IDEA中的开发
  • TRAE + Milvus MCP:用自然语言 0 门槛玩转向量数据库
  • 第五章 OB 分布式事务高级技术
  • 【Unity基础】Unity中的Pivot vs Center 小实验步骤列表 + 截图指引
  • 股票基金量化开源平台对比
  • 用AI破解数据质量难题
  • 【前端】CSS类命名规范指南
  • 主流 TOP5 AI智能客服系统对比与推荐
  • 高效开发利器:用宝塔面板快速搭建 PHP 开发环境教程
  • Android开发知识点总结合集
  • 微服务引擎 MSE 及 API 网关 2025 年 4 月产品动态
  • Docker 安装和配置 MySQL 8.0.36 的详细步骤
  • @[TOC](斐波那契数列模型)
  • RHCSA(配置本地yum源仓库)
  • 【Canvas与文字】“浪急方舟静 山险马背平”中堂
  • 【Datawhale AI 夏令营】电商评论用户洞察
  • 亚马逊广告优化策略:如何通过预算管理提升投放效果?
  • @classmethod
  • 无细胞蛋白表达|线性DNA快速表达|高效体外合成系统