当前位置: 首页 > news >正文

LangGraph(八)——LangGraph运行时

目录

  • 1. 引言
  • 2. Pregel(了解
    • 2.1 简介
      • 2.1.1 Actors
      • 2.1.2 Channels
    • 2.2 Pregel API示例(这里是源代码里的示例)
      • 2.2.1 单个节点
      • 2.2.2 多个节点
      • 2.2.3 Topic Channel
      • 2.2.4 BinaryOperatorAggregate Channel
      • 2.2.5 循环
  • 3. 高级API
    • 3.1 Graph API
    • 3.2 Functional API
  • 参考

1. 引言

  Pregel实现了LangGraph的运行时,管理LangGraph应用程序的执行。编译一个StateGraph或创建entrypoint都会生成一个Pregel实例,该实例可以接受输入后被调用。

2. Pregel(了解

2.1 简介

  在LangGraph中,Pregel将actors和channels合并为一个应用程序。Actors从channels读取数据并将其写入channels。Pregel将应用程序的执行组织成多个步骤,遵循Pregel算法/批量同步并行模型。
  每个步骤包含三个阶段:
  1. 计划:确定本步骤中需要执行的actors。例如,在第一步中选择订阅特定输入channel的actors,在后续步骤中选择订阅上一步更新channels的actors。
  2. 执行:并行执行所有选定的actors,直到所有actors完成、一个actor失败或达到超时为止。在该阶段,channels更新对actors是不可见的,直到下一步。
  3. 更新:使用actors在本步骤写入的值更新channels。
  重复执行,直到没有actors被选中执行,或者达到最大步骤数。

2.1.1 Actors

  一个actor是一个PregelNode。它订阅channels,从channels中读取数据,并将数据写入到channels。可以将其视为Pregel算法中的actor。PregelNode实现了LangChain的Runnable接口。

2.1.2 Channels

  channels用于在actors之间进行通信。每个channel都有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。channels可用于从一个链向另一个链发送数据,或从一个链在未来步骤中向自身发送数据。LangGraph提供了多种内置channels:
  1. LastValue:默认channel,存储发送到channel的最后一个值,适用于输入和输出值,或用于从一步向下一步发送数据。
  2. Topic:一个可配置的PubSub主题,适用于在actors之间发送多个值,或用于累积输出。可以配置为去重值或在多个步骤期间累积值。
  3. BinaryOperatorAggregate:存储一个持久值,通过将二进制运算符应用于当前值和发送到channel的每个更新来更新值,适用于在多个步骤中计算聚合。例如:total = BinaryOperatorAggregate(int, operator.add)

2.2 Pregel API示例(这里是源代码里的示例)

2.2.1 单个节点

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| Channel.write_to("b")
)app = Pregel(nodes={"node1": node1},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),},input_channels=["a"],output_channels=["b"],
)print(app.invoke({"a": "foo"}))

  运行结果为:

{'b': 'foofoo'}

2.2.2 多个节点

from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| Channel.write_to("b")
)node2 = (Channel.subscribe_to("b")| (lambda x: x + x)| Channel.write_to("c")
)app = Pregel(nodes={"node1": node1, "node2": node2},channels={"a": EphemeralValue(str),"b": LastValue(str),"c": EphemeralValue(str),},input_channels=["a"],output_channels=["b", "c"],
)app.invoke({"a": "foo"})

  运行结果为:

{'b': 'foofoo', 'c': 'foofoofoofoo'}

2.2.3 Topic Channel

from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| {"b": Channel.write_to("b"),"c": Channel.write_to("c")}
)node2 = (Channel.subscribe_to("b")| (lambda x: x + x)| {"c": Channel.write_to("c"),}
)app = Pregel(nodes={"node1": node1, "node2": node2},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),"c": Topic(str, accumulate=True),},input_channels=["a"],output_channels=["c"],
)app.invoke({"a": "foo"})

  运行结果为:

{'c': ['foofoo', 'foofoofoofoo']}

2.2.4 BinaryOperatorAggregate Channel

from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| {"b": Channel.write_to("b"),"c": Channel.write_to("c")}
)node2 = (Channel.subscribe_to("b")| (lambda x: x + x)| {"c": Channel.write_to("c"),}
)def reducer(current, update):if current:return current + " | " + updateelse:return updateapp = Pregel(nodes={"node1": node1, "node2": node2},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),"c": BinaryOperatorAggregate(str, operator=reducer),},input_channels=["a"],output_channels=["c"]
)app.invoke({"a": "foo"})

  运行结果为

{'c': 'foofoo | foofoofoofoo'}

2.2.5 循环

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel, ChannelWrite, ChannelWriteEntryexample_node = (Channel.subscribe_to("value")| (lambda x: x + x if len(x) < 10 else None)| ChannelWrite(writes=[ChannelWriteEntry(channel="value", skip_none=True)])
)app = Pregel(nodes={"example_node": example_node},channels={"value": EphemeralValue(str),},input_channels=["value"],output_channels=["value"]
)app.invoke({"value": "a"})

  运行结果为:

{'value': 'aaaaaaaaaaaaaaaa'}

3. 高级API

3.1 Graph API

  StateGraph是一种高级抽象,简化了Pregel应用程序的创建。它允许你定义一个由节点和边组成的图。当你编译图时,StateGraph API会自动为你创建Pregel应用程序。

from typing import TypedDict, Optionalfrom langgraph.constants import START
from langgraph.graph import StateGraphclass Essay(TypedDict):topic: strcontent: Optional[str]score: Optional[float]def write_essay(essay: Essay):return {"content": f"Essay about {essay['topic']}",}def score_essay(essay: Essay):return {"score": 10}builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")graph = builder.compile()
print("Nodes:")
print(graph.nodes)
print("Edges:")
print(graph.channels)

  运行结果:

Nodes:
{'__start__': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA000>, 'write_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA1E0>, 'score_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA360>}
Edges:
{'topic': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E0D40>, 'content': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E3EC0>, 'score': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E3B80>, '__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE4D0D40>, 'branch:to:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE50ECC0>, 'branch:to:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE50C7C0>}

3.2 Functional API

  在Functional API中,你可以使用entrypoint来创建一个Pregel应用程序。entrypoint装饰器允许你定义一个接受输入并返回输出的函数。

from typing import TypedDict, Optionalfrom langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypointclass Essay(TypedDict):topic: strcontent: Optional[str]score: Optional[float]checkpointer = InMemorySaver()@entrypoint(checkpointer=checkpointer)
def write_essay(essay: Essay):return {"content": f"Essay about {essay['topic']}",}print("Nodes: ")
print(write_essay.nodes)
print("Channels: ")
print(write_essay.channels)

  运行结果为:

Nodes: 
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA5A0>}
Channels: 
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE532140>, '__end__': <langgraph.channels.last_value.LastValue object at 0x000001FCEE310680>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x000001FCEE5B31C0>}

参考

https://langchain-ai.github.io/langgraph/concepts/low_level/

相关文章:

  • 博士论文写作笔记
  • 【大模型DA】Unified Language-driven Zero-shot Domain Adaptation
  • agent-zero: 打造你的AI专属AI助理
  • Canvas: trying to draw too large(256032000bytes) bitmap.
  • JavaScript 模块系统:CJS/AMD/UMD/ESM
  • QT/c++航空返修数据智能分析系统
  • Cocos 打包 APK 兼容环境表(Android API Level 10~15)
  • 【渲染】拆解《三国:谋定天下》场景渲染技术
  • 读《Go语言圣经记录》(二):深入理解Go语言的程序结构
  • 工作流引擎-06-流程引擎(Process Engine)对比 Flowable、Activiti 与 Camunda 全维度对比分析
  • 淘宝商品详情页有哪些常见的动态加载技术?
  • t018-高校宣讲会管理系统 【含源码!】
  • 大规模真实场景 WiFi 感知基准数据集
  • 子串题解——和为 K 的子数组【LeetCode】
  • C++11 智能指针:从原理到实现
  • 为什么badmin reconfig以后始终不能提交任务
  • C#语音录制:使用NAudio库实现语音录制功能详解
  • 【CBAP50技术手册】#32 Organizational Modelling(组织建模):BA(业务分析师)的“变革导航图”
  • Ubuntu取消开机用户自动登录
  • Practice 2025.6.1—— 二叉树进阶面试题(2)
  • 做网站为什么要用php框架/电脑上突然出现windows优化大师
  • php怎么做视频网站/网络优化工程师吃香吗
  • 中企动力网站/市场调研报告500字
  • 做网站游戏推广赚钱/东莞关键词优化推广
  • 建站申请/宁波seo推广如何收费
  • 明薇通网站建设/手机建立一个免费网站