DeerFlow多智能体项目分析-依赖LangGraph实现条件路由和中断机制的源码解析
DeerFlow依赖LangGraph实现条件路由和中断机制的源码解析
概述
DeerFlow基于LangGraph构建了复杂的条件路由和中断机制,实现了智能的工作流控制和人机交互。本文档深入分析这两个核心机制的实现逻辑和关键代码。这部分也同样是langGraph的核心,通过使用LangGraph的特色功能,实现智能体交互中的中断和人工校准(辅助更精准的计划制定,其实也可以理解为任务拆解)。
项目地址:https://github.com/bytedance/deer-flow
LangGraph中文在线文档:https://github.langchain.ac.cn/langgraph/agents/agents/
1. 条件路由机制
1.1 条件路由架构设计
条件路由是DeerFlow工作流控制的核心,通过状态检查和业务逻辑判断来决定下一个执行节点。
1.1.1 路由决策函数
源码文件路径: src/graph/builder.py
def continue_to_running_research_team(state: State):"""研究团队路由决策函数 - 核心路由逻辑"""current_plan = state.get("current_plan")# 检查1: 计划是否存在if not current_plan or not current_plan.steps:return "planner"# 检查2: 所有步骤是否完成if all(step.execution_res for step in current_plan.steps):return "planner" # 返回规划器生成报告# 检查3: 根据未完成步骤类型路由for step in current_plan.steps:if not step.execution_res: # 找到第一个未完成步骤if step.step_type == StepType.RESEARCH:return "researcher" # 路由到研究员if step.step_type == StepType.PROCESSING:return "coder" # 路由到编程员return "planner" # 默认返回规划器
1.1.2 条件边配置
源码文件路径: src/graph/builder.py
def _build_base_graph():"""构建基础状态图 - 条件边配置"""builder = StateGraph(State)# 核心条件边: 研究团队路由builder.add_conditional_edges("research_team", # 源节点continue_to_running_research_team, # 路由决策函数["planner", "researcher", "coder"], # 可能的目标节点)# 协调器条件边: 支持澄清功能builder.add_conditional_edges("coordinator",lambda state: state.get("goto", "planner"), # 动态路由["planner", "background_investigator", "coordinator", END],)return builder
1.2 协调器动态路由
1.2.1 澄清功能路由逻辑
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):"""协调器节点 - 复杂的条件路由实现"""enable_clarification = state.get("enable_clarification", False)# 路由分支1: 澄清功能禁用if not enable_clarification:# 直接路由到规划器goto = "planner"if state.get("enable_background_investigation"):goto = "background_investigator"# 路由分支2: 澄清功能启用else:clarification_rounds = state.get("clarification_rounds", 0)max_clarification_rounds = state.get("max_clarification_rounds", 3)# 子路由1: 继续澄清对话if (not response.tool_calls and response.content and clarification_rounds < max_clarification_rounds):goto = "__end__" # 中断等待用户输入# 子路由2: 澄清完成或达到最大轮次else:goto = "planner"if state.get("enable_background_investigation"):goto = "background_investigator"# 返回Command对象控制路由return Command(update={"goto": goto,# ... 其他状态更新},goto=goto,)
1.2.2 澄清状态检查函数
源码文件路径: src/graph/nodes.py
def needs_clarification(state: dict) -> bool:"""澄清需求检查 - 路由决策辅助函数"""# 检查1: 澄清功能是否启用if not state.get("enable_clarification", False):return False# 检查2: 获取澄清状态clarification_rounds = state.get("clarification_rounds", 0)is_clarification_complete = state.get("is_clarification_complete", False)max_clarification_rounds = state.get("max_clarification_rounds", 3)# 检查3: 综合判断是否需要继续澄清return (clarification_rounds > 0 # 已开始澄清and not is_clarification_complete # 澄清未完成and clarification_rounds <= max_clarification_rounds # 未超过最大轮次)
1.3 Command对象路由控制
1.3.1 规划器节点路由
源码文件路径: src/graph/nodes.py
def planner_node(state: State, config: RunnableConfig):"""规划器节点 - Command路由控制"""plan_iterations = state.get("plan_iterations", 0)# 路由条件1: 超过最大迭代次数if plan_iterations >= configurable.max_plan_iterations:return Command(goto="reporter")# 路由条件2: 计划包含足够上下文if isinstance(curr_plan, dict) and curr_plan.get("has_enough_context"):return Command(update={"messages": [AIMessage(content=full_response, name="planner")],"current_plan": new_plan,},goto="reporter", # 直接生成报告)# 路由条件3: 需要人工反馈return Command(update={"messages": [AIMessage(content=full_response, name="planner")],"current_plan": full_response,},goto="human_feedback", # 等待人工审核)
1.3.2 人工反馈路由
源码文件路径: src/graph/nodes.py
def human_feedback_node(state):"""人工反馈节点 - 基于反馈内容的路由"""auto_accepted_plan = state.get("auto_accepted_plan", False)if not auto_accepted_plan:feedback = interrupt("请审核计划。")# 路由分支1: 编辑计划if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):return Command(update={"messages": [HumanMessage(content=feedback, name="feedback")]},goto="planner", # 返回规划器修改)# 路由分支2: 接受计划elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):goto = "research_team" # 开始执行研究# 计划验证和路由try:new_plan = json.loads(repair_json_output(current_plan))return Command(update={"current_plan": Plan.model_validate(new_plan),"plan_iterations": plan_iterations + 1,},goto=goto,)except json.JSONDecodeError:# 错误处理路由if plan_iterations > 1:return Command(goto="reporter")else:return Command(goto="__end__")
2. 中断机制
2.1 中断机制架构
中断机制允许工作流在特定点暂停,等待外部输入或处理,是实现人机交互的关键。
2.1.1 基础中断实现
源码文件路径: src/graph/nodes.py
from langgraph.types import interruptdef human_feedback_node(state):"""基础中断机制 - 等待人工反馈"""auto_accepted_plan = state.get("auto_accepted_plan", False)if not auto_accepted_plan:# 核心中断调用feedback = interrupt("请审核计划。")# 中断后的处理逻辑if feedback:# 根据反馈内容决定后续流程if str(feedback).upper().startswith("[EDIT_PLAN]"):# 处理编辑请求return Command(update={"messages": [HumanMessage(content=feedback, name="feedback")]},goto="planner",)elif str(feedback).upper().startswith("[ACCEPTED]"):# 处理接受请求logger.info("用户接受了计划。")
2.1.2 澄清功能中断
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):"""澄清功能中断 - 复杂中断处理"""enable_clarification = state.get("enable_clarification", False)if enable_clarification:clarification_rounds = state.get("clarification_rounds", 0)max_clarification_rounds = state.get("max_clarification_rounds", 3)# 中断条件: LLM提出澄清问题if not response.tool_calls and response.content:if clarification_rounds < max_clarification_rounds:# 使用__interrupt__键实现中断return Command(update={"messages": state_messages,"clarification_rounds": clarification_rounds + 1,"is_clarification_complete": False,"__interrupt__": [("coordinator", response.content)], # 中断标记},goto="__end__", # 结束当前流程等待输入)else:# 达到最大轮次,不再中断logger.warning(f"达到最大澄清轮次({max_clarification_rounds})。移交给规划器。")goto = "planner"
2.2 中断恢复机制
2.2.1 澄清对话恢复
源码文件路径: src/workflow.py
async def run_agent_workflow_async(user_input: str, **kwargs):"""中断恢复 - 澄清对话继续"""# 执行工作流async for s in graph.astream(input=initial_state, config=config, stream_mode="values"):final_state = s# ... 处理流式输出# 检查是否需要澄清 - 中断恢复逻辑if final_state and isinstance(final_state, dict):from src.graph.nodes import needs_clarificationif needs_clarification(final_state):# 等待用户输入 - 中断点clarification_rounds = final_state.get("clarification_rounds", 0)max_clarification_rounds = final_state.get("max_clarification_rounds", 3)user_response = input(f"您的回复 ({clarification_rounds}/{max_clarification_rounds}): ").strip()if not user_response:logger.warning("空回复,结束澄清")return final_state# 恢复工作流 - 递归调用current_state = final_state.copy()current_state["messages"] = final_state["messages"] + [{"role": "user", "content": user_response}]# 递归恢复执行return await run_agent_workflow_async(user_input=user_response,initial_state=current_state, # 传递中断状态**kwargs)
2.2.2 中断状态管理
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):"""中断状态管理"""# 澄清历史管理clarification_history = state.get("clarification_history", [])# 处理用户响应 - 中断恢复后的状态更新if clarification_rounds > 0:last_message = state.get("messages", [])[-1] if state.get("messages") else None# 提取用户响应内容if last_message:if isinstance(last_message, dict) and last_message.get("role") == "user":clarification_history.append(last_message["content"])elif hasattr(last_message, "content"):clarification_history.append(last_message.content)# 构建澄清上下文clarification_context = f"""继续澄清(第{clarification_rounds}/{max_clarification_rounds}轮):用户最新回复: {current_response}询问剩余缺失的维度。不要重复问题或开始新话题。"""messages.append({"role": "system", "content": clarification_context})
2.3 工具调用中断
2.3.1 工具定义
源码文件路径: src/graph/nodes.py
from langchain_core.tools import tool
from typing import Annotated@tool
def handoff_to_planner(research_topic: Annotated[str, "要移交的研究任务主题"],locale: Annotated[str, "用户检测到的语言区域设置"],
):"""移交给规划器智能体进行计划制定 - 工具中断"""# 工具不返回实际内容,仅用于信号传递return@tool
def handoff_after_clarification(locale: Annotated[str, "用户检测到的语言区域设置"],
):"""澄清轮次完成后移交给规划器 - 澄清结束信号"""return
2.3.2 工具调用处理
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):"""工具调用中断处理"""# 绑定工具if enable_clarification:tools = [handoff_to_planner, handoff_after_clarification]else:tools = [handoff_to_planner]# 调用LLM with工具response = (get_llm_by_type(AGENT_LLM_MAP["coordinator"]).bind_tools(tools).invoke(messages))# 处理工具调用 - 中断信号检测if response.tool_calls:try:for tool_call in response.tool_calls:tool_name = tool_call.get("name", "")tool_args = tool_call.get("args", {})if tool_name in ["handoff_to_planner", "handoff_after_clarification"]:logger.info("移交给规划器")goto = "planner"# 提取工具参数if tool_args.get("locale") and tool_args.get("research_topic"):locale = tool_args.get("locale")research_topic = tool_args.get("research_topic")breakexcept Exception as e:logger.error(f"处理工具调用时出错: {e}")goto = "planner"
3. 路由和中断的协同工作
3.1 状态驱动的协同
源码文件路径: src/graph/nodes.py
def coordinator_node(state: State, config: RunnableConfig):"""路由和中断的协同工作示例"""# 状态检查enable_clarification = state.get("enable_clarification", False)clarification_rounds = state.get("clarification_rounds", 0)# 协同逻辑1: 中断条件检查影响路由if (enable_clarification and not response.tool_calls and response.content and clarification_rounds < max_clarification_rounds):# 中断: 暂停工作流return Command(update={"clarification_rounds": clarification_rounds + 1,"__interrupt__": [("coordinator", response.content)],},goto="__end__", # 路由: 结束等待输入)# 协同逻辑2: 工具调用决定路由if response.tool_calls:goto = "planner" # 路由: 移交给规划器else:goto = "__end__" # 路由: 结束流程# 协同逻辑3: 背景调查路由修正if goto == "planner" and state.get("enable_background_investigation"):goto = "background_investigator" # 路由: 先进行背景调查return Command(update={"goto": goto,"is_clarification_complete": goto != "coordinator",},goto=goto,)
3.2 错误处理中的路由
源码文件路径: src/graph/nodes.py
def human_feedback_node(state):"""错误处理中的路由决策"""try:# 尝试解析计划current_plan = repair_json_output(current_plan)new_plan = json.loads(current_plan)# 成功路由return Command(update={"current_plan": Plan.model_validate(new_plan)},goto="research_team",)except json.JSONDecodeError:logger.warning("规划器响应不是有效的JSON")# 错误路由决策if plan_iterations > 1:return Command(goto="reporter") # 路由: 生成报告else:return Command(goto="__end__") # 路由: 结束流程
4. 核心实现总结
4.1 条件路由核心要素
- 状态检查: 基于State对象的字段进行条件判断
- 业务逻辑: 根据计划完成度、步骤类型等业务状态路由
- Command控制: 使用Command对象精确控制路由和状态更新
- 条件边配置: 在图构建时定义条件边和路由函数
4.2 中断机制核心要素
- interrupt函数: LangGraph提供的中断原语
- __interrupt__键: 在Command更新中标记中断点
- 状态恢复: 通过递归调用和状态传递实现中断恢复
- 工具信号: 使用工具调用作为中断和恢复的信号
4.3 协同工作模式
- 状态驱动: 路由决策基于状态,中断影响状态
- 信号传递: 工具调用、LLM响应作为路由和中断信号
- 递归恢复: 中断后通过递归调用恢复工作流
- 错误处理: 异常情况下的路由降级和恢复策略
这种设计使得DeerFlow能够实现复杂的人机交互和智能工作流控制,同时保持系统的稳定性和可维护性。
