当前位置: 首页 > news >正文

【Agent的革命之路——LangGraph】工作流中的 Command 模式

这篇文章我们将 LangGraph中的控制流(边)和状态更新(节点)结合起来使用。比如,我们希望同时执行状态更新并决定下一步要转到哪个节点,且这些操作在同一个节点中完成。而正好 LangGraph 提供了一种方法,可以通过从节点函数返回一个 Command 对象来实现这一点。

def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        # state update
        update={"foo": "bar"},
        # control flow
        goto="my_other_node"
    )

如果我们使用的是 subgraphs(子图) 我们还可以通过 Command 中指定 graph=Command.PARENT这种方式,实现节点从一个子图导航到其他子图。

def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        update={"foo": "bar"},
        goto="other_subgraph",  # where `other_subgraph` is a node in the parent graph
        graph=Command.PARENT
    )

当我们从子图节点向父图节点发送更新的时候,且更新的键同时存在于父图和子图的状态模式中时,我们必须为父图状态中正在更新的键定义一个 reducer。
下面我们开始我们的例子。

定义节点和State

定义 graph State:

class State(TypedDict):
    foo: str

定义节点A,

def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
    print("Called A")
    value = random.choice(["a", "b"])
    # 判断应该跳转到哪个节点
    if value == "a":
        goto = "node_b"
    else:
        goto = "node_c"

    # Command 允许我们同时更新图状态并路由到下一个节点。
    return Command(
        # 更新图中的状态
        update={"foo": value},
        # 替换edge
        goto=goto,
    )

定义节点b或者c:

def node_b(state: State):
    print("Called B")
    return {"foo": state["foo"] + "b"}


def node_c(state: State):
    print("Called C")
    return {"foo": state["foo"] + "c"}

定义好了上面节点之后,现在,我们可以使用上述节点创建 StateGraph。需要注意的是,该图没有定义条件的路由edge!这是因为控制流是用 Command 内部 node_a 定义的。

builder = StateGraph(State)
builder.add_edge(START, "node_a")
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(node_c)

graph = builder.compile()

不知道大家注意到没有,我们的 node_a 使用了 Command 作为返回类型注解,例如 Command[Literal[“node_b”, “node_c”]]。这一步对于图的渲染是非常必要的,它会告诉 LangGraph:node_a 可以导航到 node_b 和 node_c。
现在我们看看图的·1展示效果:

from IPython.display import display, Image

display(Image(graph.get_graph().draw_mermaid_png()))

在这里插入图片描述
我们来试试多次调用graph:

graph.invoke({"foo": ""})

将会看到,graph会根据节点A中的随机选择,采用不同的路径(A-> B或A-> C)。

导航到 parent graph 中的节点

现在,让我们演示如何从 subgraph 内部导航到 parent graph 中的不同节点。我们将通过将上述示例中的 node_a 转换为一个单节点图来实现这一点,然后将其作为子图添加到 parent graph 中。

import operator
from typing_extensions import Annotated


class State(TypedDict):
    # N定义一个reducer 自动附加消息
    foo: Annotated[str, operator.add]


def node_a(state: State):
    print("Called A")
    value = random.choice(["a", "b"])
    if value == "a":
        goto = "node_b"
    else:
        goto = "node_c"

    # Command 允许我们同时更新图状态并路由到下一个节点。
    return Command(
        update={"foo": value},
        goto=goto,
        # 这会告诉 LangGraph 导航到父图中的 node_b 或 node_c 这将导航到相对于子图的最近的父图
        graph=Command.PARENT,
    )

subgraph = StateGraph(State).add_node(node_a).add_edge(START, "node_a").compile()

def node_b(state: State):
    print("Called B")
    # 由于我们已经定义了一个 reducer,因此不需要手动将新字符附加到现有的 foo 值上。相反,reducer 会自动(通过 operator.add)将这些值附加到现有值中。
    return {"foo": "b"}


def node_c(state: State):
    print("Called C")
    return {"foo": "c"}

然后我们来构造graph:

builder = StateGraph(State)
builder.add_edge(START, "subgraph")
builder.add_node("subgraph", subgraph)
builder.add_node(node_b)
builder.add_node(node_c)

graph = builder.compile()

查看效果:

Called A
Called C
{'foo': 'bc'}

到这里我们的代码就结束了,我们通过这种方式展示了如何使用 LangGraph 的 Command 对象在子图中更新状态并导航到父图中的不同节点。并通过 reducer 简化状态管理,就是自动保存消息链。

相关文章:

  • 【C语言】(一)数据在计算机中的存储与表示
  • 解决Excel文件格式损坏问题:如何通过程序读取并复制内容
  • MySQL 中的索引数量是否越多越好?
  • hot100_139. 单词拆分
  • Linux-Ansible模块进阶
  • 【我的Android进阶之旅】Android Studio SDK Update Site 国内的腾讯云镜像配置指南
  • 【Arduino小项目】控制步进电机
  • 通俗易懂的DOM1级标准介绍
  • YOLOv8与DAttention机制的融合:复杂场景下目标检测性能的增强
  • 车载诊断架构 --- LIN节点路由转发注意事项
  • 中兴G7615AV5
  • 直角三角堰计算公式
  • AutoGen 技术博客系列 八:深入剖析 Swarm—— 智能体协作的新范式
  • Linux操作系统4-进程间通信5(共享内存实现两个进程通信)
  • 【多模态处理篇二】【深度揭秘:DeepSeek视频理解之时空注意力机制解析】
  • 2025年华为手机解锁BL的方法
  • 函数指针和函数名在内存中是如何表示的
  • 计算机专业知识【揭开汇编的神秘面纱:从基础概念到实际应用】
  • VMware虚拟机手动安装VMware Tools
  • 合并区间(56)
  • 中国难以承受高关税压力?外交部:任何外部冲击都改变不了中国经济基本面
  • 央行、证监会:科技创新债券含公司债券、企业债券、非金融企业债务融资工具等
  • 联合国秘书长吁印巴“最大程度克制”,特朗普:遗憾,希望尽快结束冲突
  • 上海市委常委会扩大会议传达学习习近平总书记考察上海重要讲话和在部分省区市“十五五”时期经济社会发展座谈会上的重要讲话精神
  • 被炒热的“高潮针”:超适应症使用,安全性和有效性存疑
  • 媒体:南京秦淮区卫健委回应一医院涉嫌违规提供试管婴儿服务