开源Agent平台Dify源码剖析系列(四)核心模块core/agent之CotAgentRunner
每一篇文章都短小精悍,不啰嗦。
笔者寄语
本期介绍Dify框架的核心模块core/agent的CotAgentRunner。接下来我们一起深入剖析core/agent目录下的所有代码,并以通俗易懂的方式解释。我们需要先了解这个目录的完整结构,然后逐个分析关键文件,最后总结整个Agent框架的设计和工作原理。
首先,让我查看core/agent目录的完整结构:
dify/api/core/agent
.
├── base_agent_runner.py # Agent框架的基础实现
├── cot_agent_runner.py # Chain of Thought (CoT) Agent Runner的实现
├── cot_chat_agent_runner.py # CoT Chat Agent Runner的实现
├── cot_completion_agent_runner.py # CoT Completion Agent Runner的实现
├── entities.py # 定义了Agent框架中的核心实体和数据结构
├── fc_agent_runner.py # CoT Completion Agent Runner的实现
├── __init__.py
├── output_parser
│ └── cot_output_parser.py # Chain of Thought输出解析器的实现
└── prompt
└── template.py # Agent提示模板的实现
我们已经对Dify的Agent系统架构有了初步了解。接下来我们要理解 CotAgentRunner
类,我们需要从「思维链(Chain of Thought, CoT)」的核心逻辑出发。这类代理的特点是:通过多轮「思考→调用工具→观察结果→再思考」的循环,逐步解决复杂问题,而不是一次性生成答案。下面我们结合代码,从「功能定位→核心流程→关键模块→实战场景」四个层面深入剖析。
一、功能定位:思维链代理的「迭代决策引擎」
CotAgentRunner
继承自 BaseAgentRunner
,是基于思维链逻辑的代理运行器。它的核心使命是:让 AI 代理像人类一样「逐步思考」—— 面对复杂问题时,先分析需求,调用工具获取信息,根据结果调整策略,直到得出最终答案。
例如:当用户提问「分析近 3 个月公司产品销量 Top3 的地区,并预测下月趋势」时,普通代理可能直接回答(或失败),而 CotAgentRunner
会:
思考:“需要先查近 3 个月的销量数据(调用销售数据集工具)”;
调用工具:获取各地区销量数据;
观察结果:发现北京、上海、广州是 Top3;
再思考:“需要基于历史数据预测趋势(调用预测工具)”;
调用工具:获取预测结果;
最终整理:输出分析和预测。
这种「分步拆解」的能力,正是 CotAgentRunner
的核心价值。
二、核心流程:多轮迭代的「思考 - 行动 - 观察」循环
run
方法是整个类的核心,它实现了思维链的迭代逻辑。我们以「用户查询季度销量趋势」为例,拆解完整流程:
步骤 1:初始化与参数准备
def run(...):self._repack_app_generate_entity(app_generate_entity) # 整理输入参数self._init_react_state(query) # 初始化思维链状态(如记录思考步骤)# 处理模型stop词(确保模型输出在"Observation"处停止,避免输出混乱)if "Observation" not in app_generate_entity.model_conf.stop:app_generate_entity.model_conf.stop.append("Observation")# 初始化指令(填充用户输入到提示模板)self._instruction = self._fill_in_inputs_from_external_data_tools(instruction, inputs)# 初始化工具(同BaseAgentRunner,转换为模型可识别的格式)tool_instances, prompt_messages_tools = self._init_prompt_tools()
关键操作:
为模型添加
Observation
作为停止词:确保模型在生成工具调用结果(观察)前停止,避免输出冗余内容。初始化思维链状态:通过
_init_react_state
初始化_agent_scratchpad
(记录每步思考的列表),用于跟踪代理的「思考 - 行动 - 观察」过程。
步骤 2:多轮迭代循环(核心逻辑)
代理通过循环迭代(最多 max_iteration_steps
次)完成思考过程,每次迭代包含「生成思考→决定是否调用工具→执行工具→记录结果」四步:
iteration_step = 1
max_iteration_steps = min(app_config.agent.max_iteration, 99) + 1 # 限制最大迭代次数(避免无限循环)while function_call_state and iteration_step <= max_iteration_steps:# 1. 创建本轮思考记录(数据库中保存)agent_thought = self.create_agent_thought(...)# 2. 调用LLM生成思考或工具调用指令prompt_messages = self._organize_prompt_messages() # 构建包含历史的提示chunks = model_instance.invoke_llm(...) # 流式调用LLMreact_chunks = CotAgentOutputParser.handle_react_stream_output(chunks, usage_dict) # 解析LLM输出为思维链结构# 3. 解析LLM输出,提取思考和工具调用指令scratchpad = AgentScratchpadUnit(...) # 记录本轮思考的细节(思考内容、行动指令等)for chunk in react_chunks:if isinstance(chunk, AgentScratchpadUnit.Action): # 若输出是工具调用指令scratchpad.action = chunk # 记录工具名称和参数else: # 若输出是自然语言思考scratchpad.thought += chunk # 记录思考内容yield ... # 流式返回思考过程(用户可实时看到代理"思考")# 4. 判断是否需要调用工具if scratchpad.is_final(): # 若代理决定输出最终答案final_answer = ... # 提取答案function_call_state = False # 结束循环else: # 若需要调用工具# 调用工具并获取结果observation, tool_invoke_meta = self._handle_invoke_action(...)scratchpad.observation = observation # 记录工具返回结果function_call_state = True # 继续下一轮迭代iteration_step += 1 # 迭代次数+1
核心逻辑解析:
- 多轮迭代的必要性
:复杂问题无法一次解决(如需要多次调用工具),循环确保代理能「逐步逼近答案」。
- 流式输出
:通过
yield
返回LLMResultChunk
,让用户实时看到代理的思考过程(如 “我现在需要查询销量数据…”),提升交互体验。 - 状态记录
:
AgentScratchpadUnit
类是核心状态容器,记录每轮的「思考内容(thought)→工具调用(action)→观察结果(observation)」,确保迭代过程可追溯。
步骤 3:工具调用与结果处理
当代理决定调用工具(如「销量查询工具」)时,_handle_invoke_action
方法负责执行调用并返回结果:
def _handle_invoke_action(...):tool_instance = tool_instances.get(tool_call_name) # 获取工具实例# 解析工具参数(如将JSON字符串转为字典)if isinstance(tool_call_args, str):tool_call_args = json.loads(tool_call_args)# 调用工具tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke(...)# 发布工具返回的文件(如图表)for message_file_id in message_files:self.queue_manager.publish(QueueMessageFileEvent(...))return tool_invoke_response, tool_invoke_meta
关键操作:
工具参数兼容性处理:支持字符串或字典格式的参数,通过
json.loads
容错解析。工具调用结果实时发布:若工具返回文件(如销量趋势图),通过队列管理器实时推送给用户。
步骤 4:结束迭代与结果发布
当迭代达到最大次数或代理生成最终答案时,循环结束,发布最终结果:
# 输出最终答案yield LLMResultChunk(...)# 保存最后一轮思考记录self.save_agent_thought(...)# 发布结束事件(通知前端对话完成)self.queue_manager.publish(QueueMessageEndEvent(...))
三、关键模块:支撑思维链逻辑的 5 大核心组件
CotAgentRunner
的复杂逻辑依赖于多个模块的协同,以下是最关键的 5 个组件:
模块 / 类 | 作用 | 核心方法 / 属性 |
---|---|---|
思维链解析器 | 将 LLM 的原始输出(字符串)解析为结构化的「思考 - 行动 - 观察」数据 | CotAgentOutputParser.handle_react_stream_output |
状态容器 | 记录每轮迭代的思考、工具调用和结果,是迭代的「记忆载体」 | AgentScratchpadUnit 类 |
工具调用引擎 | 执行工具调用,处理参数解析和结果返回 | ToolEngine.agent_invoke 、 |
历史消息转换器 | 将历史对话转换为思维链格式,确保代理能基于历史继续思考 | _organize_historic_prompt_messages |
迭代控制器 | 管理迭代次数、判断是否终止循环(如达到最大步数或生成最终答案) | function_call_state 变量、 |
四、实战场景:一次完整的思维链交互示例
假设用户提问:「查询本季度(2024Q3)产品销量 Top3 的地区,并生成趋势图表」,我们跟踪 CotAgentRunner
的执行过程:
初始化:
接收用户 query,初始化
_agent_scratchpad
(空列表)。加载工具:「销量数据集检索工具」「图表生成工具」。
设置最大迭代次数(如 5 次),避免无限循环。
第 1 轮迭代:
- 生成思考
:调用 LLM,输入包含用户问题和工具列表的提示,LLM 输出:“我需要先查询 2024Q3 各地区的销量数据,使用销量数据集检索工具。”
- 解析输出
:
CotAgentOutputParser
将上述内容解析为AgentScratchpadUnit
,其中action
为{"action_name": "sales_dataset", "action_input": {"quarter": "2024Q3"}}
。 - 调用工具
:
_handle_invoke_action
调用「销量数据集检索工具」,返回结果:{"北京": 1200, "上海": 1000, "广州": 800}
。 - 记录状态
:
scratchpad.observation
保存工具返回结果,迭代次数变为 2。
- 生成思考
第 2 轮迭代:
- 生成思考
:LLM 基于历史(第 1 轮的思考和结果),输出:“已获取销量数据,Top3 为北京、上海、广州。需要生成趋势图表,使用图表生成工具。”
- 解析输出
:
action
为{"action_name": "chart_generator", "action_input": {"data": {...}}}
。 - 调用工具
:调用「图表生成工具」,返回图表文件 ID(如
file_123
)。 - 发布文件
:通过
QueueMessageFileEvent
将图表推送给用户,用户此时可看到图表预览。
- 生成思考
第 3 轮迭代:
- 生成思考
:LLM 基于图表结果,输出:“趋势显示北京销量持续上升,上海稳定,广州略有下降。可以得出最终结论。”
- 解析输出
:
is_final()
为True
,提取最终答案:“2024Q3 销量 Top3 地区为北京(1200)、上海(1000)、广州(800)…(附趋势图表)”。
- 生成思考
结束迭代:
输出最终答案,发布
QueueMessageEndEvent
通知对话完成,保存所有思考记录到数据库。
五、设计亮点与技术细节
兼容性设计:
处理不同模型的特性:通过
model_schema.features
判断模型是否支持流式工具调用(stream_tool_call
),动态调整输出方式。容错机制:工具参数解析时用
try-except
捕获 JSON 解码错误,确保程序稳定性。
可追溯性:
每轮思考过程通过
MessageAgentThought
记录到数据库,包含 token 使用量(llm_usage
)、工具输入输出等,支持后续审计和计费。
用户体验优化:
流式思考输出:用户无需等待完整结果,可实时看到代理的 “思考过程”,增强信任感。
工具结果即时推送:如图表生成后立即通过
QueueMessageFileEvent
推送,避免用户长时间等待。
六、总结:思维链代理的核心价值
CotAgentRunner
通过「多轮迭代 + 状态记录 + 工具调用」的设计,让 AI 代理具备了处理复杂问题的能力。它的本质是将大语言模型的 “黑箱推理” 转化为可拆解、可追溯的 “白箱步骤”,既提升了结果的可靠性,又增强了用户对 AI 决策过程的理解。
这种设计广泛应用于需要深度分析的场景(如数据分析、科研辅助、复杂规划),是构建 “智能代理” 的核心技术之一。