LangGraph 源码学习总结 1-Graph结构
PregelProtocol、Pregel、CompiledGraph 与 CompiledStateGraph 的架构解析
-
PregelProtocol
角色:最底层“通信协议”
功能:- 定义了
invoke/stream/astream/get_state/update_state
等抽象接口,相当于“图必须长什么样”。 - 本身不带任何实现,只规定“图必须能跑、能流式输出、能读写状态”。
- 定义了
-
Pregel
角色:通用图执行引擎
功能:- 完整实现了 PregelProtocol,用“channel + node”模型跑任意 DAG。
- 节点就是普通 Runnable,边通过 Channel 隐式连接,数据以消息形式在 Channel 中流动。
- 无状态概念,输入/输出都是 dict,内部状态只在一次 invoke 内有效。
差异: - 最灵活,但写起来像“拼乐高”,需手动声明 channel 和节点。
-
CompiledGraph
角色:把“声明式图”编译成可执行 Pregel 的中间层
功能:- 由
Graph.compile()
返回,内部把“边+节点”翻译成 Pregel 所需的 channel 与 PregelNode。 - 对用户仍暴露 PregelProtocol 接口(invoke/stream…),但隐藏了 channel 细节。
差异: - 你不再手写 Channel,只需
.add_node
/.add_edge
,编译器帮你生成底层 Pregel。
- 由
-
CompiledStateGraph
角色:带“状态模式”的 CompiledGraph
功能:- 在 CompiledGraph 基础上额外实现“状态读写”与“状态模式校验”。
- 要求节点函数签名
NodeFunc(State) -> State|UpdateDict
,自动做状态合并/写回。 - 支持
get_state/update_state
回溯、断点续跑、人机交互。
差异: - 比 CompiledGraph 多一个“状态模式(schema)”,所有节点共享同一份结构化状态,适合多步对话、Agent 循环等场景。
一句话总结
- PregelProtocol:接口合同
- Pregel:最通用、无状态、手动连 channel
- CompiledGraph:帮你把“图”翻译成 Pregel,省掉 channel 细节
- CompiledStateGraph:在 CompiledGraph 之上再加“共享状态模式”,支持状态持久化与断点续跑
Channel
Channel 是“带类型的、可订阅的、可写的、可缓存的”数据槽。
- 类型:声明它只能存什么(str/int/list…)。
- 可订阅:节点用
Channel.subscribe_to("槽名")
表示“我要读这个槽”。 - 可写:节点用
Channel.write_to("槽名")
表示“我把结果写进这个槽”。 - 可缓存:一次 [invoke](file://src/pregel/p_1.py#19#4) 里,槽里的值会一直留着,供后续节点反复读。
传统 DAG 要显式画边:
A ──edge──> B
在 Pregel 里,边被“槽”取代:
- 节点 A 把结果
write_to("x")
- 节点 B 用
subscribe_to("x")
于是 A→B 的依赖就“隐式”成立了——不需要在代码里写add_edge("A","B")
,只要它们读写同一个槽名即可。
Channel类
Channel 是 Pregel 框架中实现 数据流驱动执行 的核心工具类,通过声明式订阅(subscribe_to)和写入(write_to)机制,隐式连接节点(PregelNode)与数据流。
通过订阅和写入机制实现节点间松耦合通信,开发者无需手动管理数据流,只需声明依赖关系即可构建动态 DAG
class Channel:@overload@classmethoddef subscribe_to(cls,channels: str,*,key: str | None = None,tags: list[str] | None = None,) -> PregelNode: ...@overload@classmethoddef subscribe_to(cls,channels: Sequence[str],*,key: None = None,tags: list[str] | None = None,) -> PregelNode: ...@classmethoddef subscribe_to(cls,channels: str | Sequence[str],*,key: str | None = None,tags: list[str] | None = None,) -> PregelNode:"""Runs process.invoke() each time channels are updated,with a dict of the channel values as input."""if not isinstance(channels, str) and key is not None:raise ValueError("Can't specify a key when subscribing to multiple channels")return PregelNode(channels=cast(Union[list[str], Mapping[str, str]],({key: channels}if isinstance(channels, str) and key is not Noneelse ([channels]if isinstance(channels, str)else {chan: chan for chan in channels})),),triggers=[channels] if isinstance(channels, str) else channels,tags=tags,)@classmethoddef write_to(cls,*channels: str,**kwargs: WriteValue,) -> ChannelWrite:"""Writes to channels the result of the lambda, or None to skip writing."""return ChannelWrite([ChannelWriteEntry(c) for c in channels]+ [(ChannelWriteEntry(k, mapper=v)if callable(v)else ChannelWriteEntry(k, value=v))for k, v in kwargs.items()])
Channel.subscribe_to(“a”) 返回的确实是一个 PregelNode 实例,而不是“Channel”本身。
它只是用 “槽名” 去构造了一个 可运行节点(PregelNode),该节点在运行时会:
- 从名为 “a” 的 真实 Channel 实例 里读当前值;
把读到的值喂给后面的 Runnable(这里是 lambda x: x + x);
最终通过 Channel.write_to(“b”) 把结果写到名为 “b” 的 Channel 里。
所以“Channel”在这里只是 工厂类,负责生成“能读写 Channel 的节点”,真正的数据槽仍是你在 Pregel(…, channels={…}) 里声明的那些 BaseChannel 对象。
使用Pregel创建单结点图
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, Channel, ChannelWriteEntrynode1 = (Channel.subscribe_to("a")| (lambda x: x + x)| Channel.write_to("b")
)app = Pregel(nodes={"node1": node1},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),},input_channels=["a"],output_channels=["b"],
)result = app.invoke({"a": "foo"})
print(result)
代码解释
# 初始槽状态:a="foo", b=None
node1 订阅 a,读到 "foo" → 计算得 "foofoo" → 写回 b
# 槽状态:a="foo", b="foofoo"
没有节点再订阅 b,流程结束,输出 b 的值 "foofoo"
整个过程中,数据就像“消息”一样从槽 a 流出,经过节点加工,再流入槽 b——这就是“在 Channel 中流动”。