LangGraph(八)——LangGraph运行时
目录
- 1. 引言
- 2. Pregel(了解)
- 2.1 简介
- 2.1.1 Actors
- 2.1.2 Channels
- 2.2 Pregel API示例(这里是源代码里的示例)
- 2.2.1 单个节点
- 2.2.2 多个节点
- 2.2.3 Topic Channel
- 2.2.4 BinaryOperatorAggregate Channel
- 2.2.5 循环
- 3. 高级API
- 3.1 Graph API
- 3.2 Functional API
- 参考
1. 引言
Pregel实现了LangGraph的运行时,管理LangGraph应用程序的执行。编译一个StateGraph或创建entrypoint都会生成一个Pregel实例,该实例可以接受输入后被调用。
2. Pregel(了解)
2.1 简介
在LangGraph中,Pregel将actors和channels合并为一个应用程序。Actors从channels读取数据并将其写入channels。Pregel将应用程序的执行组织成多个步骤,遵循Pregel算法/批量同步并行模型。
每个步骤包含三个阶段:
1. 计划:确定本步骤中需要执行的actors。例如,在第一步中选择订阅特定输入channel的actors,在后续步骤中选择订阅上一步更新channels的actors。
2. 执行:并行执行所有选定的actors,直到所有actors完成、一个actor失败或达到超时为止。在该阶段,channels更新对actors是不可见的,直到下一步。
3. 更新:使用actors在本步骤写入的值更新channels。
重复执行,直到没有actors被选中执行,或者达到最大步骤数。
2.1.1 Actors
一个actor是一个PregelNode。它订阅channels,从channels中读取数据,并将数据写入到channels。可以将其视为Pregel算法中的actor。PregelNode实现了LangChain的Runnable接口。
2.1.2 Channels
channels用于在actors之间进行通信。每个channel都有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。channels可用于从一个链向另一个链发送数据,或从一个链在未来步骤中向自身发送数据。LangGraph提供了多种内置channels:
1. LastValue:默认channel,存储发送到channel的最后一个值,适用于输入和输出值,或用于从一步向下一步发送数据。
2. Topic:一个可配置的PubSub主题,适用于在actors之间发送多个值,或用于累积输出。可以配置为去重值或在多个步骤期间累积值。
3. BinaryOperatorAggregate:存储一个持久值,通过将二进制运算符应用于当前值和发送到channel的每个更新来更新值,适用于在多个步骤中计算聚合。例如:total = BinaryOperatorAggregate(int, operator.add)
2.2 Pregel API示例(这里是源代码里的示例)
2.2.1 单个节点
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| Channel.write_to("b")
)app = Pregel(nodes={"node1": node1},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),},input_channels=["a"],output_channels=["b"],
)print(app.invoke({"a": "foo"}))
运行结果为:
{'b': 'foofoo'}
2.2.2 多个节点
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| Channel.write_to("b")
)node2 = (Channel.subscribe_to("b")| (lambda x: x + x)| Channel.write_to("c")
)app = Pregel(nodes={"node1": node1, "node2": node2},channels={"a": EphemeralValue(str),"b": LastValue(str),"c": EphemeralValue(str),},input_channels=["a"],output_channels=["b", "c"],
)app.invoke({"a": "foo"})
运行结果为:
{'b': 'foofoo', 'c': 'foofoofoofoo'}
2.2.3 Topic Channel
from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| {"b": Channel.write_to("b"),"c": Channel.write_to("c")}
)node2 = (Channel.subscribe_to("b")| (lambda x: x + x)| {"c": Channel.write_to("c"),}
)app = Pregel(nodes={"node1": node1, "node2": node2},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),"c": Topic(str, accumulate=True),},input_channels=["a"],output_channels=["c"],
)app.invoke({"a": "foo"})
运行结果为:
{'c': ['foofoo', 'foofoofoofoo']}
2.2.4 BinaryOperatorAggregate Channel
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, Channelnode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| {"b": Channel.write_to("b"),"c": Channel.write_to("c")}
)node2 = (Channel.subscribe_to("b")| (lambda x: x + x)| {"c": Channel.write_to("c"),}
)def reducer(current, update):if current:return current + " | " + updateelse:return updateapp = Pregel(nodes={"node1": node1, "node2": node2},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),"c": BinaryOperatorAggregate(str, operator=reducer),},input_channels=["a"],output_channels=["c"]
)app.invoke({"a": "foo"})
运行结果为
{'c': 'foofoo | foofoofoofoo'}
2.2.5 循环
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel, ChannelWrite, ChannelWriteEntryexample_node = (Channel.subscribe_to("value")| (lambda x: x + x if len(x) < 10 else None)| ChannelWrite(writes=[ChannelWriteEntry(channel="value", skip_none=True)])
)app = Pregel(nodes={"example_node": example_node},channels={"value": EphemeralValue(str),},input_channels=["value"],output_channels=["value"]
)app.invoke({"value": "a"})
运行结果为:
{'value': 'aaaaaaaaaaaaaaaa'}
3. 高级API
3.1 Graph API
StateGraph是一种高级抽象,简化了Pregel应用程序的创建。它允许你定义一个由节点和边组成的图。当你编译图时,StateGraph API会自动为你创建Pregel应用程序。
from typing import TypedDict, Optionalfrom langgraph.constants import START
from langgraph.graph import StateGraphclass Essay(TypedDict):topic: strcontent: Optional[str]score: Optional[float]def write_essay(essay: Essay):return {"content": f"Essay about {essay['topic']}",}def score_essay(essay: Essay):return {"score": 10}builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")graph = builder.compile()
print("Nodes:")
print(graph.nodes)
print("Edges:")
print(graph.channels)
运行结果:
Nodes:
{'__start__': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA000>, 'write_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA1E0>, 'score_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA360>}
Edges:
{'topic': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E0D40>, 'content': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E3EC0>, 'score': <langgraph.channels.last_value.LastValue object at 0x000001FCEE3E3B80>, '__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE4D0D40>, 'branch:to:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE50ECC0>, 'branch:to:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE50C7C0>}
3.2 Functional API
在Functional API中,你可以使用entrypoint来创建一个Pregel应用程序。entrypoint装饰器允许你定义一个接受输入并返回输出的函数。
from typing import TypedDict, Optionalfrom langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypointclass Essay(TypedDict):topic: strcontent: Optional[str]score: Optional[float]checkpointer = InMemorySaver()@entrypoint(checkpointer=checkpointer)
def write_essay(essay: Essay):return {"content": f"Essay about {essay['topic']}",}print("Nodes: ")
print(write_essay.nodes)
print("Channels: ")
print(write_essay.channels)
运行结果为:
Nodes:
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x000001FCEDFDA5A0>}
Channels:
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001FCEE532140>, '__end__': <langgraph.channels.last_value.LastValue object at 0x000001FCEE310680>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x000001FCEE5B31C0>}
参考
https://langchain-ai.github.io/langgraph/concepts/low_level/