当前位置: 首页 > news >正文

DeerFlow多智能体项目分析-依赖LangGraph实现条件路由和中断机制的源码解析

DeerFlow依赖LangGraph实现条件路由和中断机制的源码解析

概述

DeerFlow基于LangGraph构建了复杂的条件路由和中断机制,实现了智能的工作流控制和人机交互。本文档深入分析这两个核心机制的实现逻辑和关键代码。这部分也同样是langGraph的核心,通过使用LangGraph的特色功能,实现智能体交互中的中断和人工校准(辅助更精准的计划制定,其实也可以理解为任务拆解)。
项目地址:https://github.com/bytedance/deer-flow
LangGraph中文在线文档:https://github.langchain.ac.cn/langgraph/agents/agents/

1. 条件路由机制

1.1 条件路由架构设计

条件路由是DeerFlow工作流控制的核心,通过状态检查和业务逻辑判断来决定下一个执行节点。

1.1.1 路由决策函数

源码文件路径: src/graph/builder.py

def continue_to_running_research_team(state: State):"""研究团队路由决策函数 - 核心路由逻辑"""current_plan = state.get("current_plan")# 检查1: 计划是否存在if not current_plan or not current_plan.steps:return "planner"# 检查2: 所有步骤是否完成if all(step.execution_res for step in current_plan.steps):return "planner"  # 返回规划器生成报告# 检查3: 根据未完成步骤类型路由for step in current_plan.steps:if not step.execution_res:  # 找到第一个未完成步骤if step.step_type == StepType.RESEARCH:return "researcher"  # 路由到研究员if step.step_type == StepType.PROCESSING:return "coder"       # 路由到编程员return "planner"  # 默认返回规划器
1.1.2 条件边配置

源码文件路径: src/graph/builder.py

def _build_base_graph():"""构建基础状态图 - 条件边配置"""builder = StateGraph(State)# 核心条件边: 研究团队路由builder.add_conditional_edges("research_team",                    # 源节点continue_to_running_research_team,  # 路由决策函数["planner", "researcher", "coder"], # 可能的目标节点)# 协调器条件边: 支持澄清功能builder.add_conditional_edges("coordinator",lambda state: state.get("goto", "planner"),  # 动态路由["planner", "background_investigator", "coordinator", END],)return builder

1.2 协调器动态路由

1.2.1 澄清功能路由逻辑

源码文件路径: src/graph/nodes.py

def coordinator_node(state: State, config: RunnableConfig):"""协调器节点 - 复杂的条件路由实现"""enable_clarification = state.get("enable_clarification", False)# 路由分支1: 澄清功能禁用if not enable_clarification:# 直接路由到规划器goto = "planner"if state.get("enable_background_investigation"):goto = "background_investigator"# 路由分支2: 澄清功能启用else:clarification_rounds = state.get("clarification_rounds", 0)max_clarification_rounds = state.get("max_clarification_rounds", 3)# 子路由1: 继续澄清对话if (not response.tool_calls and response.content and clarification_rounds < max_clarification_rounds):goto = "__end__"  # 中断等待用户输入# 子路由2: 澄清完成或达到最大轮次else:goto = "planner"if state.get("enable_background_investigation"):goto = "background_investigator"# 返回Command对象控制路由return Command(update={"goto": goto,# ... 其他状态更新},goto=goto,)
1.2.2 澄清状态检查函数

源码文件路径: src/graph/nodes.py

def needs_clarification(state: dict) -> bool:"""澄清需求检查 - 路由决策辅助函数"""# 检查1: 澄清功能是否启用if not state.get("enable_clarification", False):return False# 检查2: 获取澄清状态clarification_rounds = state.get("clarification_rounds", 0)is_clarification_complete = state.get("is_clarification_complete", False)max_clarification_rounds = state.get("max_clarification_rounds", 3)# 检查3: 综合判断是否需要继续澄清return (clarification_rounds > 0                        # 已开始澄清and not is_clarification_complete               # 澄清未完成and clarification_rounds <= max_clarification_rounds  # 未超过最大轮次)

1.3 Command对象路由控制

1.3.1 规划器节点路由

源码文件路径: src/graph/nodes.py

def planner_node(state: State, config: RunnableConfig):"""规划器节点 - Command路由控制"""plan_iterations = state.get("plan_iterations", 0)# 路由条件1: 超过最大迭代次数if plan_iterations >= configurable.max_plan_iterations:return Command(goto="reporter")# 路由条件2: 计划包含足够上下文if isinstance(curr_plan, dict) and curr_plan.get("has_enough_context"):return Command(update={"messages": [AIMessage(content=full_response, name="planner")],"current_plan": new_plan,},goto="reporter",  # 直接生成报告)# 路由条件3: 需要人工反馈return Command(update={"messages": [AIMessage(content=full_response, name="planner")],"current_plan": full_response,},goto="human_feedback",  # 等待人工审核)
1.3.2 人工反馈路由

源码文件路径: src/graph/nodes.py

def human_feedback_node(state):"""人工反馈节点 - 基于反馈内容的路由"""auto_accepted_plan = state.get("auto_accepted_plan", False)if not auto_accepted_plan:feedback = interrupt("请审核计划。")# 路由分支1: 编辑计划if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):return Command(update={"messages": [HumanMessage(content=feedback, name="feedback")]},goto="planner",  # 返回规划器修改)# 路由分支2: 接受计划elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):goto = "research_team"  # 开始执行研究# 计划验证和路由try:new_plan = json.loads(repair_json_output(current_plan))return Command(update={"current_plan": Plan.model_validate(new_plan),"plan_iterations": plan_iterations + 1,},goto=goto,)except json.JSONDecodeError:# 错误处理路由if plan_iterations > 1:return Command(goto="reporter")else:return Command(goto="__end__")

2. 中断机制

2.1 中断机制架构

中断机制允许工作流在特定点暂停,等待外部输入或处理,是实现人机交互的关键。

2.1.1 基础中断实现

源码文件路径: src/graph/nodes.py

from langgraph.types import interruptdef human_feedback_node(state):"""基础中断机制 - 等待人工反馈"""auto_accepted_plan = state.get("auto_accepted_plan", False)if not auto_accepted_plan:# 核心中断调用feedback = interrupt("请审核计划。")# 中断后的处理逻辑if feedback:# 根据反馈内容决定后续流程if str(feedback).upper().startswith("[EDIT_PLAN]"):# 处理编辑请求return Command(update={"messages": [HumanMessage(content=feedback, name="feedback")]},goto="planner",)elif str(feedback).upper().startswith("[ACCEPTED]"):# 处理接受请求logger.info("用户接受了计划。")
2.1.2 澄清功能中断

源码文件路径: src/graph/nodes.py

def coordinator_node(state: State, config: RunnableConfig):"""澄清功能中断 - 复杂中断处理"""enable_clarification = state.get("enable_clarification", False)if enable_clarification:clarification_rounds = state.get("clarification_rounds", 0)max_clarification_rounds = state.get("max_clarification_rounds", 3)# 中断条件: LLM提出澄清问题if not response.tool_calls and response.content:if clarification_rounds < max_clarification_rounds:# 使用__interrupt__键实现中断return Command(update={"messages": state_messages,"clarification_rounds": clarification_rounds + 1,"is_clarification_complete": False,"__interrupt__": [("coordinator", response.content)],  # 中断标记},goto="__end__",  # 结束当前流程等待输入)else:# 达到最大轮次,不再中断logger.warning(f"达到最大澄清轮次({max_clarification_rounds})。移交给规划器。")goto = "planner"

2.2 中断恢复机制

2.2.1 澄清对话恢复

源码文件路径: src/workflow.py

async def run_agent_workflow_async(user_input: str, **kwargs):"""中断恢复 - 澄清对话继续"""# 执行工作流async for s in graph.astream(input=initial_state, config=config, stream_mode="values"):final_state = s# ... 处理流式输出# 检查是否需要澄清 - 中断恢复逻辑if final_state and isinstance(final_state, dict):from src.graph.nodes import needs_clarificationif needs_clarification(final_state):# 等待用户输入 - 中断点clarification_rounds = final_state.get("clarification_rounds", 0)max_clarification_rounds = final_state.get("max_clarification_rounds", 3)user_response = input(f"您的回复 ({clarification_rounds}/{max_clarification_rounds}): ").strip()if not user_response:logger.warning("空回复,结束澄清")return final_state# 恢复工作流 - 递归调用current_state = final_state.copy()current_state["messages"] = final_state["messages"] + [{"role": "user", "content": user_response}]# 递归恢复执行return await run_agent_workflow_async(user_input=user_response,initial_state=current_state,  # 传递中断状态**kwargs)
2.2.2 中断状态管理

源码文件路径: src/graph/nodes.py

def coordinator_node(state: State, config: RunnableConfig):"""中断状态管理"""# 澄清历史管理clarification_history = state.get("clarification_history", [])# 处理用户响应 - 中断恢复后的状态更新if clarification_rounds > 0:last_message = state.get("messages", [])[-1] if state.get("messages") else None# 提取用户响应内容if last_message:if isinstance(last_message, dict) and last_message.get("role") == "user":clarification_history.append(last_message["content"])elif hasattr(last_message, "content"):clarification_history.append(last_message.content)# 构建澄清上下文clarification_context = f"""继续澄清(第{clarification_rounds}/{max_clarification_rounds}轮):用户最新回复: {current_response}询问剩余缺失的维度。不要重复问题或开始新话题。"""messages.append({"role": "system", "content": clarification_context})

2.3 工具调用中断

2.3.1 工具定义

源码文件路径: src/graph/nodes.py

from langchain_core.tools import tool
from typing import Annotated@tool
def handoff_to_planner(research_topic: Annotated[str, "要移交的研究任务主题"],locale: Annotated[str, "用户检测到的语言区域设置"],
):"""移交给规划器智能体进行计划制定 - 工具中断"""# 工具不返回实际内容,仅用于信号传递return@tool  
def handoff_after_clarification(locale: Annotated[str, "用户检测到的语言区域设置"],
):"""澄清轮次完成后移交给规划器 - 澄清结束信号"""return
2.3.2 工具调用处理

源码文件路径: src/graph/nodes.py

def coordinator_node(state: State, config: RunnableConfig):"""工具调用中断处理"""# 绑定工具if enable_clarification:tools = [handoff_to_planner, handoff_after_clarification]else:tools = [handoff_to_planner]# 调用LLM with工具response = (get_llm_by_type(AGENT_LLM_MAP["coordinator"]).bind_tools(tools).invoke(messages))# 处理工具调用 - 中断信号检测if response.tool_calls:try:for tool_call in response.tool_calls:tool_name = tool_call.get("name", "")tool_args = tool_call.get("args", {})if tool_name in ["handoff_to_planner", "handoff_after_clarification"]:logger.info("移交给规划器")goto = "planner"# 提取工具参数if tool_args.get("locale") and tool_args.get("research_topic"):locale = tool_args.get("locale")research_topic = tool_args.get("research_topic")breakexcept Exception as e:logger.error(f"处理工具调用时出错: {e}")goto = "planner"

3. 路由和中断的协同工作

3.1 状态驱动的协同

源码文件路径: src/graph/nodes.py

def coordinator_node(state: State, config: RunnableConfig):"""路由和中断的协同工作示例"""# 状态检查enable_clarification = state.get("enable_clarification", False)clarification_rounds = state.get("clarification_rounds", 0)# 协同逻辑1: 中断条件检查影响路由if (enable_clarification and not response.tool_calls and response.content and clarification_rounds < max_clarification_rounds):# 中断: 暂停工作流return Command(update={"clarification_rounds": clarification_rounds + 1,"__interrupt__": [("coordinator", response.content)],},goto="__end__",  # 路由: 结束等待输入)# 协同逻辑2: 工具调用决定路由if response.tool_calls:goto = "planner"  # 路由: 移交给规划器else:goto = "__end__"  # 路由: 结束流程# 协同逻辑3: 背景调查路由修正if goto == "planner" and state.get("enable_background_investigation"):goto = "background_investigator"  # 路由: 先进行背景调查return Command(update={"goto": goto,"is_clarification_complete": goto != "coordinator",},goto=goto,)

3.2 错误处理中的路由

源码文件路径: src/graph/nodes.py

def human_feedback_node(state):"""错误处理中的路由决策"""try:# 尝试解析计划current_plan = repair_json_output(current_plan)new_plan = json.loads(current_plan)# 成功路由return Command(update={"current_plan": Plan.model_validate(new_plan)},goto="research_team",)except json.JSONDecodeError:logger.warning("规划器响应不是有效的JSON")# 错误路由决策if plan_iterations > 1:return Command(goto="reporter")    # 路由: 生成报告else:return Command(goto="__end__")     # 路由: 结束流程

4. 核心实现总结

4.1 条件路由核心要素

  1. 状态检查: 基于State对象的字段进行条件判断
  2. 业务逻辑: 根据计划完成度、步骤类型等业务状态路由
  3. Command控制: 使用Command对象精确控制路由和状态更新
  4. 条件边配置: 在图构建时定义条件边和路由函数

4.2 中断机制核心要素

  1. interrupt函数: LangGraph提供的中断原语
  2. __interrupt__键: 在Command更新中标记中断点
  3. 状态恢复: 通过递归调用和状态传递实现中断恢复
  4. 工具信号: 使用工具调用作为中断和恢复的信号

4.3 协同工作模式

  • 状态驱动: 路由决策基于状态,中断影响状态
  • 信号传递: 工具调用、LLM响应作为路由和中断信号
  • 递归恢复: 中断后通过递归调用恢复工作流
  • 错误处理: 异常情况下的路由降级和恢复策略

这种设计使得DeerFlow能够实现复杂的人机交互和智能工作流控制,同时保持系统的稳定性和可维护性。

http://www.dtcms.com/a/524049.html

相关文章:

  • 【JUnit实战3_10】第六章:关于测试的质量(上)
  • 容器编排大王Kubernetes——helm包管理工具(8)
  • 南皮县网站建设php网站开发接口开发
  • 【AOA定位与UKF例程】到达角度(AOA)定位后,使用无迹卡尔曼滤波(UKF)对轨迹滤波,MATLAB例程可下载
  • 拒绝笨重,一款轻量、极致简洁的开源CI/CD工具 - Arbess
  • JavaWeb--Servlet
  • 【机器学习】15.深度聚类(Deep Clustering)原理讲解与实战
  • Atom编辑器下载安装图文教程(附安装包)
  • 【基础复习1】ROC 与 AUC:逻辑回归二分类例子
  • 【Angular 】Angular 中的依赖注入
  • 做门户网站需要什么条件文化传播公司网站模版
  • 马斯克公司推出视频模型 Imagine v0.9,实测解析
  • 扶风做企业网站网站建设平台安全问题有哪些方面
  • 【AI4S】Motif-Driven Contrastive Learning of Graph Representations
  • Flutter路由使用指南
  • husky vs lefthook:新一代 Git Hooks 工具全面对比
  • Go Web 编程快速入门 07 - 模板(1):语法与最佳实践
  • 聊城网站建设信息兴义网站建设网站建设
  • 今年前三季度浙江进出口总值首破四万亿元
  • 【一文了解】八大排序-插入排序、希尔排序
  • n8n数据存储在postgres
  • 数据结构——冒泡排序
  • 医疗连续体机器人模块化控制界面设计(2025年更新版Python库)
  • 做网站服务器需要系统wordpress折腾怕了
  • 022数据结构之树状数组——算法备赛
  • 从 TypeScript 到 Java(4):访问修饰符与作用域 —— Java 的封装哲学
  • 做网站要有什么团队上海网站营销推广
  • 残差网络的介绍及ResNet-18的搭建(pytorch版)
  • WPF绘制界面常用功能
  • vbs笔记 【未完更】