开源Agent平台Dify源码剖析系列(六)核心模块core/agent之CotCompletionAgentRunner
每一篇文章都短小精悍,不啰嗦。
笔者寄语
本期介绍Dify框架的核心模块core/agent。接下来我们一起深入剖析core/agent目录下的所有代码,并以通俗易懂的方式解释。我们需要先了解这个目录的完整结构,然后逐个分析关键文件,最后总结整个Agent框架的设计和工作原理。
首先,让我查看core/agent目录的完整结构:
dify/api/core/agent
.
├── base_agent_runner.py # Agent框架的基础实现
├── cot_agent_runner.py # Chain of Thought (CoT) Agent Runner的实现
├── cot_chat_agent_runner.py # CoT Chat Agent Runner的实现
├── cot_completion_agent_runner.py # CoT Completion Agent Runner的实现
├── entities.py # 定义了Agent框架中的核心实体和数据结构
├── fc_agent_runner.py # CoT Completion Agent Runner的实现
├── __init__.py
├── output_parser
│ └── cot_output_parser.py # Chain of Thought输出解析器的实现
└── prompt
└── template.py # Agent提示模板的实现
CotCompletionAgentRunner 类承自 CotAgentRunner
,是专为完成式(Completion)场景优化的思维链代理实现。与聊天场景不同,完成式场景更注重将所有上下文整合为单个提示,引导模型生成连贯的回答。下面我们从架构定位、核心功能、设计模式和技术细节四个层面进行剖析。
一、架构定位:完成式场景的思维链代理
CotCompletionAgentRunner
在继承体系中的位置如下:
AppRunner (基础层)
↑
BaseAgentRunner (中间层)
↑
CotAgentRunner (CoT基础实现)
↑
├─ CotChatAgentRunner (聊天场景特化)
└─ CotCompletionAgentRunner (完成式场景特化)
与 CotChatAgentRunner
的对比:
- 聊天场景
以对话形式交互,维护多轮消息列表(如
[System, User, Assistant, User, ...]
) - 完成式场景
将所有上下文整合为单个提示,更接近传统的「指令 - 回答」模式
这种设计遵循了「单一职责原则」,通过继承实现功能复用,同时针对不同场景进行特化优化。
二、核心功能:三大提示构建器
类中定义了三个核心方法,分别负责构建指令提示、历史提示和整合所有提示组件:
1. _organize_instruction_prompt()
:构建指令提示
def_organize_instruction_prompt(self)->str:
assert self.app_config.agent and self.app_config.agent.promptfirst_prompt = self.app_config.agent.prompt.first_prompt# 填充模板变量system_prompt =(first_prompt.replace("{{instruction}}", self._instruction)
.replace("{{tools}}", json.dumps(jsonable_encoder(self._prompt_messages_tools)))
.replace("{{tool_names}}",", ".join([tool.name for tool in self._prompt_messages_tools]))
)return system_prompt
关键操作:
从配置中获取基础提示模板(如
first_prompt
)动态填充三个关键变量:
{{instruction}}
用户指令(如「分析销量趋势」)
{{tools}}
可用工具列表的 JSON 字符串
{{tool_names}}
工具名称的逗号分隔列表
与聊天场景的区别:
直接返回字符串而非
SystemPromptMessage
对象不区分系统消息和用户消息,全部整合为单个提示
2. _organize_historic_prompt()
:构建历史提示
def_organize_historic_prompt(self, current_session_messages=None)->str:historic_prompt_messages = self._organize_historic_prompt_messages(current_session_messages)historic_prompt =""for message in historic_prompt_messages:
ifisinstance(message, UserPromptMessage):historic_prompt +=f"Question: {message.content}\n\n"
elifisinstance(message, AssistantPromptMessage):
ifisinstance(message.content,str):historic_prompt += message.content +"\n\n"
elifisinstance(message.content,list):
for content in message.content:
ifisinstance(content, TextPromptMessageContent):historic_prompt += content.datareturn historic_prompt
关键操作:
将历史消息转换为特定格式(如
Question: ...
和Answer: ...
)处理不同类型的消息内容(字符串或列表)
过滤非文本内容(如图片、文件)
设计亮点:
使用
Question:
和Answer:
标签明确区分用户问题和 AI 回答支持嵌套内容(如多模态消息中的文本部分)
通过
_organize_historic_prompt_messages
方法复用历史消息处理逻辑
3. _organize_prompt_messages()
:整合所有提示组件
def_organize_prompt_messages(self)->list[PromptMessage]:
# 1. 构建系统提示system_prompt = self._organize_instruction_prompt()# 2. 构建历史提示historic_prompt = self._organize_historic_prompt()# 3. 构建当前助手消息(思维链)assistant_prompt =""
for unit in self._agent_scratchpad or[]:
if unit.is_final():assistant_prompt +=f"Final Answer: {unit.agent_response}"
else:assistant_prompt +=f"Thought: {unit.thought}\n\n"
if unit.action_str:assistant_prompt +=f"Action: {unit.action_str}\n\n"
if unit.observation:assistant_prompt +=f"Observation: {unit.observation}\n\n"# 4. 构建当前查询query_prompt =f"Question: {self._query}"# 5. 整合所有提示prompt =(system_prompt.replace("{{historic_messages}}", historic_prompt)
.replace("{{agent_scratchpad}}", assistant_prompt)
.replace("{{query}}", query_prompt)
)return[UserPromptMessage(content=prompt)]
关键操作:
将系统提示、历史对话、当前思考步骤和用户查询整合为单个字符串
通过模板替换(
{{historic_messages}}
,{{agent_scratchpad}}
,{{query}}
)动态组合内容返回包含完整提示的单个
UserPromptMessage
与聊天场景的区别:
不区分消息类型(系统 / 用户 / 助手),全部整合为用户消息
使用额外的模板变量(如
{{historic_messages}}
)来组织更复杂的提示结构
三、设计模式分析
1. 模板方法模式
与 CotChatAgentRunner
类似,CotCompletionAgentRunner
重写了父类的几个关键方法:
def_organize_instruction_prompt(self)->str:
# 特化实现...def_organize_historic_prompt(self, current_session_messages=None)->str:
# 特化实现...def_organize_prompt_messages(self)->list[PromptMessage]:
# 特化实现...
模式应用:
父类
CotAgentRunner
定义算法骨架子类重写特定步骤以适应完成式场景
整体流程保持一致,但具体实现不同
2. 策略模式
在处理不同类型的历史消息时,代码使用了策略模式的变体:
for message in historic_prompt_messages:
ifisinstance(message, UserPromptMessage):
# 处理用户消息的策略
elifisinstance(message, AssistantPromptMessage):
# 处理助手消息的策略
设计优势:
可扩展性:若需支持新的消息类型(如系统通知),只需添加新的条件分支
单一职责:每种消息类型的处理逻辑分离
松耦合:消息类型与处理逻辑解耦
3. 组合模式
在组织最终提示时,代码使用了组合模式:
prompt =(system_prompt.replace("{{historic_messages}}", historic_prompt)
.replace("{{agent_scratchpad}}", assistant_prompt)
.replace("{{query}}", query_prompt)
)
设计特点:
不同类型的提示组件(系统提示、历史对话、思维链)被组合成一个整体
组件之间松耦合,可独立变化
整体提示对 LLM 呈现为统一的输入
四、技术细节与设计亮点
1. 提示模板的灵活性
代码通过多层模板替换实现高度灵活的提示构建:
第一层:
_organize_instruction_prompt
替换{{instruction}}
,{{tools}}
,{{tool_names}}
第二层:
_organize_prompt_messages
替换{{historic_messages}}
,{{agent_scratchpad}}
,{{query}}
这种设计允许通过配置文件自定义提示结构,无需修改代码。
2. 思维链的显式化
与 CotChatAgentRunner
类似,代码将 Agent 的思考过程显式化:
assistant_prompt +=f"Thought: {unit.thought}\n\n"
assistant_prompt +=f"Action: {unit.action_str}\n\n"
assistant_prompt +=f"Observation: {unit.observation}\n\n"
技术优势:
可解释性:用户和开发者可追溯 Agent 的决策过程
调试便利:便于定位思维链中的问题
一致性:强制模型遵循特定的思考格式
3. 类型安全与健壮性
代码通过类型检查和断言确保健壮性:
if isinstance(message.content,str):historic_prompt += message.content +"\n\n"
elif isinstance(message.content,list):
for content in message.content:
ifisinstance(content, TextPromptMessageContent):historic_prompt += content.data
关键保障:
处理不同类型的消息内容(字符串或列表)
过滤非文本内容,避免 LLM 无法处理的格式
通过断言确保必要的配置项已设置
五、总结:完成式场景的思维链优化
CotCompletionAgentRunner
的设计针对完成式场景进行了以下优化:
- 提示整合
将系统指令、历史对话、当前思考步骤和用户查询整合为单个提示
- 格式统一
使用
Question:
和Answer:
标签明确区分不同类型的内容 - 模板扩展
通过多层模板替换支持更复杂的提示结构
- 思维链延续
保留了思维链的显式化表示,支持多步骤推理
这种设计使 Agent 在完成式场景中能够:
更好地理解复杂指令
保持长距离上下文连贯性
遵循结构化的思考模式
生成更符合预期的回答
对于开发者而言,这段代码展示了如何通过继承和设计模式,在保持核心功能的同时,针对不同场景进行高效优化。