当前位置: 首页 > news >正文

LangGraph framework

目录

  • Agent 架构
    • Router(路由)
    • 工具调用代理(Tool-calling Agent)
      • 工具调用
      • 记忆机制(Memory)
      • 规划机制(Planning)
    • 自定义 Agent 架构
      • 人类参与(Human-in-the-loop)
      • 并行化(Parallelization)
      • 子图(Subgraphs)
      • 反思机制(Reflection)
    • 工作流与智能体(Workflows and Agents)
      • 构建模块:增强型大语言模型(Augmented LLM)
      • 提示链(Prompt chaining)
      • 并行处理(Parallelization)
      • 路由(Routing)
      • 协调者-执行者(Orchestrator-Worker)
      • 评估者-优化者(Evaluator-optimizer)
      • Agent(智能体)
  • Graphs
    • Graph API
      • StateGraph
    • State
      • Schema
      • Reducers
      • 图状态中的消息处理
        • 在图中使用消息
        • 序列化
        • MessagesState
    • 节点(Nodes)
      • START 节点
      • END 节点
      • 节点缓存(Node Caching)
    • 边(Edges)
      • 普通边(Normal Edges)
      • 条件边(Conditional Edges)
      • 入口点(Entry Point)
      • 条件入口点(Conditional Entry Point)
    • Send(发送)
    • Command(命令对象)
    • 配置
      • 递归限制
    • LangGraph 运行时
    • 如何使用 Graph API
      • 定义与更新状态
        • 定义状态
        • 更新状态
      • 使用 Reducer 处理状态更新
      • 定义输入与输出 Schema
      • 在节点之间传递私有状态
      • 使用 Pydantic 模型作为状态
      • 添加运行时配置
      • 添加重试策略(Add retry policies)
      • 添加节点缓存
      • 创建步骤序列
      • 创建分支
        • 并行运行图节点
        • 延迟节点执行
        • 条件分支
        • Map-Reduce 与 Send API
        • 创建和控制循环
          • 设置递归限制
      • 异步(Async)
      • 结合控制流和状态更新 —— 使用 Command
        • 在父图中导航到某个节点
        • 使用工具内部更新状态
      • 可视化你的图
        • Mermaid
        • PNG 格式
  • 流式传输(Streaming)
    • 流式输出API(Stream outputs)
    • 流式传输图状态(Stream graph state)
    • 子图支持(Subgraphs)
    • 调试模式(Debugging)
    • LLM Tokens(语言模型 Token)
      • 按 LLM 调用标签过滤(Filter by LLM invocation)
      • 按节点过滤(Filter by node)
    • 流式传输自定义数据(Stream custom data)
    • 使用任意 LLM(Use with any LLM)
    • 针对特定聊天模型禁用流式传输
  • 持久化
    • 概念
      • 线程
      • 检查点
        • 获取状态
        • 获取状态历史
      • Replay
        • 更新状态(Update state)
      • 内存存储(Memory Store)
      • Checkpointer 库
        • Checkpointer 接口
        • 序列化器(Serializer)
    • 添加持久化
      • 添加短期内存
        • 使用子图(Subgraphs)
        • 在 Functional API 中使用短期内存【可选】
        • 管理检查点
      • 添加长期内存
        • 生产环境中使用长期存储
        • 使用语义搜索
  • 内存(Memory)
    • 概念
      • 短期内存
        • 管理长对话历史
          • 编辑消息列表
          • 总结历史对话
        • 何时删除消息
      • 长期记忆
      • 记忆类型
        • 语义记忆
        • 情景记忆
        • 程序记忆
        • 写入记忆
    • 管理记忆(Manage memory)
      • 裁剪消息(Trim messages)
      • 摘要消息(Summarize messages)
      • 删除消息(Delete messages)
  • 人类参与环节(Human-in-the-loop)
    • 添加人类参与环节
      • interrupt 函数
      • 设计范式
        • 批准或拒绝
        • 审核并编辑状态
        • 工具调用审核
        • 验证人工输入
      • 使用 Command 原语继续执行
      • 常见误区
        • 副作用
        • 作为函数调用的子图
        • 使用多个 interrupt
  • 断点(Breakpoints)
    • 静态断点
    • 动态断点
    • 与子图一起使用
  • 时间旅行
  • 工具(Tools)
    • 使用工具(Use tools)
      • 创建工具(Create tools)
      • 访问配置(Access config)
      • 短期记忆(Short-term memory)
        • 读取状态(Read state)
        • 更新状态(Update state)
      • 长期记忆(Long-term memory)
        • 读取长期记忆
        • 更新长期记忆
      • 将工具附加到模型(Attach tools to a model)
      • 使用工具(Use tools)
      • 使用预构建 Agent(Use prebuilt agent)
      • 使用预设 ToolNode
        • 错误处理
      • 处理大量工具

Agent 架构

许多 LLM 应用在调用大语言模型前或后,会实现特定的控制流程。例如,RAG(检索增强生成)在模型调用前会检索与用户问题相关的文档,并将这些文档传递给 LLM,从而让模型的回答基于文档上下文。

有时我们不想写死固定的控制流程,而是希望 LLM 系统自行选择流程来解决更复杂的问题。这就是“代理(Agent)”的一种定义:代理是一个使用 LLM 决定应用控制流程的系统。LLM 控制应用的方式有很多:

  • LLM 可以在两个路径间进行选择
  • LLM 可以决定调用哪个工具
  • LLM 可以判断当前结果是否足够,是否需要进一步处理

因此,存在多种不同的代理架构,LLM 在其中拥有不同程度的控制权。

在这里插入图片描述

Router(路由)

Router 允许 LLM 在一组预定义选项中选择一个步骤。它体现了一种控制权相对有限的代理架构,因为模型只做出一次决策,并从一组有限的选项中选择输出。Router 通常依赖几个关键概念来实现。

结构化输出通过指定格式或模式,让 LLM 以特定方式作答。它与工具调用类似但更通用。结构化输出常见实现方式包括:

  • Prompt Engineering:通过提示词指引模型以特定格式作答
  • 输出解析器(Output Parsers):对模型输出进行后处理以提取结构化数据
  • 工具调用:使用支持结构化输入输出的 LLM 工具调用能力

结构化输出对于 Router 很关键,它确保模型决策可被系统可靠解析和执行。

工具调用代理(Tool-calling Agent)

相比 Router,工具调用代理扩展了模型的两个关键能力:

  • 多步骤决策能力:模型可以进行一系列连续决策
  • 工具访问能力:模型可调用多个不同的工具完成任务

ReAct 架构(论文:https://arxiv.org/abs/2210.03629)是一个流行的通用代理架构,它结合了以下三大核心概念:

  • 工具调用:允许模型选择并调用外部工具
  • 记忆能力(Memory):让代理保留并使用先前步骤中的信息
  • 规划能力(Planning):使模型能够制定并执行多步骤计划以达成目标

这种架构使代理行为更复杂灵活,能进行动态问题求解。现代代理基于消息流实现,借助 LLM 的工具调用能力。

在 LangGraph 中,你可以使用预构建的代理快速开始构建工具调用型代理。

工具调用

当代理需要与外部系统(如 API)交互时,工具就派上用场了。外部系统通常要求特定格式的输入,而非自然语言。当我们将某个 API 绑定为工具时,就为模型提供了其输入 schema 的认知。模型会根据用户的自然语言输入选择合适的工具,并生成符合格式要求的输入。

许多 LLM 提供商支持工具调用,LangChain 中的工具接口也很简单:你可以将任何 Python 函数传给 ChatModel.bind_tools(function) 即可。

在这里插入图片描述

记忆机制(Memory)

记忆对代理至关重要,使其能在多步骤任务中保持上下文。记忆可分为:

  • 短期记忆:记录当前会话中的中间信息
  • 长期记忆:存储跨会话的信息,如过往的消息历史

LangGraph 提供了对记忆机制的完整控制:

  • State:用户自定义的 schema,定义要保留的记忆结构
  • Checkpointer:在每一步中持久化状态,用于会话间同步
  • Store:用于存储用户级或应用级的全局数据

通过这些机制,你可以灵活实现所需的记忆功能。具体参考教程:如何为图添加记忆。

良好的记忆管理能提升代理的上下文保持能力,增强学习能力,并做出更明智的决策。

规划机制(Planning)

在工具调用型代理中,LLM 通常运行在一个 while 循环中。每一步模型都会:

  • 决定调用哪些工具
  • 给出这些工具的输入
  • 执行工具调用
  • 接收返回结果作为观察反馈,继续下一轮调用

这个循环会在模型判断“已有足够信息”时停止,不再继续调用工具。

自定义 Agent 架构

除了常见的 Router 与工具调用代理(如 ReAct),在具体任务中构建自定义代理架构常常能带来更优的性能。LangGraph 提供以下特性支持定制化开发。

人类参与(Human-in-the-loop)

人类参与对于提高代理可靠性至关重要,尤其在敏感任务中。人类可以:

  • 审核某些行为
  • 提供反馈以更新代理状态
  • 在复杂判断中提供指导

在无法完全自动化的场景中,这种模式尤为重要。详见:Human-in-the-loop 指南。

并行化(Parallelization)

并行处理对多代理系统和复杂任务非常重要。LangGraph 的 Send API 支持:

  • 并发处理多个状态
  • 实现 map-reduce 类型操作
  • 高效处理互不依赖的子任务

参考教程:Map-Reduce 架构实现

子图(Subgraphs)

在多代理系统中,子图对管理复杂架构很有帮助:

  • 为各个子代理提供独立状态管理
  • 实现代理团队的分层结构
  • 控制主图与子图之间的数据交换

子图通过共享状态 schema 的部分字段与主图通信,从而实现灵活、模块化的代理设计。可参考:子图使用指南。

反思机制(Reflection)

反思机制可提升代理的稳健性:

  • 检查任务是否完成、结果是否正确
  • 提供改进反馈,实现迭代优化
  • 实现自我纠错与学习

反思可以由 LLM 实现,也可以基于确定性机制。例如在编程任务中,可通过编译错误作为反馈信号。参见示例视频:LangGraph 自我纠错生成代码。

通过灵活使用这些功能,LangGraph 可构建出功能强大、任务专用的代理系统,支持复杂流程、代理协作、自我改进等高级行为。

工作流与智能体(Workflows and Agents)

本指南回顾了智能体系统(agentic systems)中的常见模式。在描述这些系统时,区分“工作流”与“智能体”是非常有帮助的。Anthropic 在其博客《Building Effective Agents》中很好地阐明了这一区别:

  • 工作流是指通过预定义的代码路径来编排 LLM 和工具的系统。
  • 智能体则是 LLM 动态引导其自身流程和工具使用的系统,具备对任务完成方式的控制能力。

以下是一个简单的方式来可视化这两者的区别:

在这里插入图片描述
构建智能体与工作流时,LangGraph 提供了以下优势:

  • 持久化(Persistence)
  • 流式处理(Streaming)
  • 调试支持(Debugging)
  • 部署能力(Deployment)

你可以使用任何支持结构化输出与工具调用的聊天模型。以下是使用 Anthropic 模型时的依赖安装、API Key 设置、结构化输出和工具调用测试流程。

安装依赖

pip install langchain langchain-anthropic

初始化 LLM

import os
import getpass
from langchain_anthropic import ChatAnthropicdef _set_env(var: str):if not os.environ.get(var):os.environ[var] = getpass.getpass(f"{var}: ")_set_env("ANTHROPIC_API_KEY")llm = ChatAnthropic(model="claude-3-5-sonnet-latest")

构建模块:增强型大语言模型(Augmented LLM)

大语言模型(LLM)拥有一些增强能力,可用于构建工作流和智能体。这些增强包括结构化输出(Structured Outputs)工具调用(Tool Calling),如下图所示,摘自 Anthropic 的博客《Building Effective Agents》:

在这里插入图片描述

# Schema for structured output
from pydantic import BaseModel, Fieldclass SearchQuery(BaseModel):search_query: str = Field(None, description="Query that is optimized web search.")justification: str = Field(None, description="Why this query is relevant to the user's request.")# Augment the LLM with schema for structured output
structured_llm = llm.with_structured_output(SearchQuery)# Invoke the augmented LLM
output = structured_llm.invoke("How does Calcium CT score relate to high cholesterol?")# Define a tool
def multiply(a: int, b: int) -> int:return a * b# Augment the LLM with tools
llm_with_tools = llm.bind_tools([multiply])# Invoke the LLM with input that triggers the tool call
msg = llm_with_tools.invoke("What is 2 times 3?")# Get the tool call
msg.tool_calls

提示链(Prompt chaining)

在提示链中,每次 LLM 调用都会处理上一步的输出。

正如 Anthropic 的博客《Building Effective Agents》中所指出的:

提示链将一个任务分解为一系列步骤,每一步由一个 LLM 调用完成,处理的是上一步的输出。你可以在任意中间步骤中添加编程式校验(参见下图中的“gate”)以确保流程仍在正确轨道上。

何时使用此工作流: 当任务可以被清晰地拆解为固定的子任务时,这种工作流非常适用。其主要目标是在牺牲一些延迟的情况下换取更高的准确率 —— 因为每一步 LLM 的处理任务都更简单。

在这里插入图片描述
图形 API(Graph API):

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display# Graph state
class State(TypedDict):topic: strjoke: strimproved_joke: strfinal_joke: str# Nodes
def generate_joke(state: State):"""First LLM call to generate initial joke"""msg = llm.invoke(f"Write a short joke about {state['topic']}")return {"joke": msg.content}def check_punchline(state: State):"""Gate function to check if the joke has a punchline"""# Simple check - does the joke contain "?" or "!"if "?" in state["joke"] or "!" in state["joke"]:return "Pass"return "Fail"def improve_joke(state: State):"""Second LLM call to improve the joke"""msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")return {"improved_joke": msg.content}def polish_joke(state: State):"""Third LLM call for final polish"""msg = llm.invoke(f"Add a surprising twist to this joke: {state['improved_joke']}")return {"final_joke": msg.content}# Build workflow
workflow = StateGraph(State)# Add nodes
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)# Add edges to connect nodes
workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges("generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)# Compile
chain = workflow.compile()# Show workflow
display(Image(chain.get_graph().draw_mermaid_png()))# Invoke
state = chain.invoke({"topic": "cats"})
print("Initial joke:")
print(state["joke"])
print("\n--- --- ---\n")
if "improved_joke" in state:print("Improved joke:")print(state["improved_joke"])print("\n--- --- ---\n")print("Final joke:")print(state["final_joke"])
else:print("Joke failed quality gate - no punchline detected!")

函数式 API(Functional API):

from langgraph.func import entrypoint, task# Tasks
@task
def generate_joke(topic: str):"""First LLM call to generate initial joke"""msg = llm.invoke(f"Write a short joke about {topic}")return msg.contentdef check_punchline(joke: str):"""Gate function to check if the joke has a punchline"""# Simple check - does the joke contain "?" or "!"if "?" in joke or "!" in joke:return "Fail"return "Pass"@task
def improve_joke(joke: str):"""Second LLM call to improve the joke"""msg = llm.invoke(f"Make this joke funnier by adding wordplay: {joke}")return msg.content@task
def polish_joke(joke: str):"""Third LLM call for final polish"""msg = llm.invoke(f"Add a surprising twist to this joke: {joke}")return msg.content@entrypoint()
def prompt_chaining_workflow(topic: str):original_joke = generate_joke(topic).result()if check_punchline(original_joke) == "Pass":return original_jokeimproved_joke = improve_joke(original_joke).result()return polish_joke(improved_joke).result()# Invoke
for step in prompt_chaining_workflow.stream("cats", stream_mode="updates"):print(step)print("\n")

Prompt chaining 的核心思想是把一个任务分解成多个步骤,每一步用一次 LLM 调用来完成,下一步的输入依赖于上一步的输出

就像在流水线上,每个工人(LLM)都只做一小部分工作,最后拼出完整的产品。

比如你想让 LLM 写一篇博客:

  1. 第一步:让 LLM 根据标题生成提纲
  2. 第二步:用提纲写段落草稿
  3. 第三步:润色草稿,生成最终版本

这就是一个 prompt chaining 流程。每个步骤是一个 LLM 调用,前一步的结果喂给下一步。

Anthropic 的博客强调了两点:

  1. 提示链可以让任务更容易处理
    一次性叫 LLM 写整篇文章难度很大,但把它拆成 3 步,每步任务都更简单,结果反而可能更好。

  2. 你可以在中间步骤插入程序逻辑做“检查”
    比如你可以在“提纲生成”后加个判断逻辑:提纲是否包含至少 3 个要点?不合格就让 LLM 重写。这种机制叫 gate(门)

什么时候适合用 Prompt Chaining?

  • 任务是 线性步骤(不能乱序)
  • 每步逻辑相对清晰
  • 希望提高 准确率或可靠性
  • 不介意多花一些延迟时间(因为多次 LLM 调用)

如果任务流程会分叉(比如调用不同工具、需要多轮决策),那就不适合用 prompt chaining,而需要更复杂的 agent 架构LangGraph 之类的图调度模型。

Prompt chaining 是最简单的 agent 架构,用多个 LLM 调用组成一个固定顺序的任务处理链,可以插入逻辑检查提升可靠性,适用于流程简单但对准确率有要求的任务。

并行处理(Parallelization)

通过并行处理,多个 LLM 可以同时协作完成任务:

在这里插入图片描述

LLM 有时可以同时处理一个任务,并通过程序将它们的输出聚合起来。这种工作流程被称为并行化(parallelization),主要有两种形式:

  • 任务分段(Sectioning):将任务拆解为相互独立的子任务,并行执行。
  • 投票机制(Voting):对同一个任务运行多次,获取多样化的输出

适用场景

  • 当子任务可以独立并行执行,以提高处理速度。
  • 当任务需要从多个角度出发,或需要多次尝试以获得更高置信度的结果。

对于需要多方面考虑的复杂任务,让每个方面分别由独立的 LLM 调用处理,往往能带来更好的表现,因为每次调用都能专注于某一个具体方面。

假设你想用 LLM 写一篇电影评论,但希望从剧情、表演、视觉效果这三个不同角度来分析。

传统顺序做法(串行)

  • 先让 LLM 写剧情点评
  • 再让 LLM 写表演点评
  • 最后写视觉效果点评
  • 把三部分合成一篇评论

并行处理做法

  • 把任务拆成3个独立子任务,分别给3个 LLM 同时写剧情、表演和视觉效果点评
  • 三个 LLM 同时工作,互不干扰,节省时间
  • 把3个输出合起来,组成完整评论

好处

  • 更快:因为三个部分同时写,不用等前一个完成才开始后一个
  • 更专注:每个 LLM 只关注一个角度,输出更精准

再举一个投票机制的例子:

你让 LLM 给“这部电影值得推荐吗?”这个问题回答3次,得到3个答案,分别是:

  • 是,很精彩
  • 还可以,有优点也有缺点
  • 不推荐,剧情无聊

然后你通过程序“投票”决定最终答案,比如取出现频率最高的那个,或者综合分析这三个答案的理由。

这样,并行处理就帮你同时多方面探索问题,最后综合结果,既提高效率,也提升了答案质量。

Graph API:

# Graph state
class State(TypedDict):topic: strjoke: strstory: strpoem: strcombined_output: str# Nodes
def call_llm_1(state: State):"""First LLM call to generate initial joke"""msg = llm.invoke(f"Write a joke about {state['topic']}")return {"joke": msg.content}def call_llm_2(state: State):"""Second LLM call to generate story"""msg = llm.invoke(f"Write a story about {state['topic']}")return {"story": msg.content}def call_llm_3(state: State):"""Third LLM call to generate poem"""msg = llm.invoke(f"Write a poem about {state['topic']}")return {"poem": msg.content}def aggregator(state: State):"""Combine the joke and story into a single output"""combined = f"Here's a story, joke, and poem about {state['topic']}!\n\n"combined += f"STORY:\n{state['story']}\n\n"combined += f"JOKE:\n{state['joke']}\n\n"combined += f"POEM:\n{state['poem']}"return {"combined_output": combined}# Build workflow
parallel_builder = StateGraph(State)# Add nodes
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)# Add edges to connect nodes
parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)
parallel_workflow = parallel_builder.compile()# Show workflow
display(Image(parallel_workflow.get_graph().draw_mermaid_png()))# Invoke
state = parallel_workflow.invoke({"topic": "cats"})
print(state["combined_output"])

函数式API

@task
def call_llm_1(topic: str):"""First LLM call to generate initial joke"""msg = llm.invoke(f"Write a joke about {topic}")return msg.content@task
def call_llm_2(topic: str):"""Second LLM call to generate story"""msg = llm.invoke(f"Write a story about {topic}")return msg.content@task
def call_llm_3(topic):"""Third LLM call to generate poem"""msg = llm.invoke(f"Write a poem about {topic}")return msg.content@task
def aggregator(topic, joke, story, poem):"""Combine the joke and story into a single output"""combined = f"Here's a story, joke, and poem about {topic}!\n\n"combined += f"STORY:\n{story}\n\n"combined += f"JOKE:\n{joke}\n\n"combined += f"POEM:\n{poem}"return combined# Build workflow
@entrypoint()
def parallel_workflow(topic: str):joke_fut = call_llm_1(topic)story_fut = call_llm_2(topic)poem_fut = call_llm_3(topic)return aggregator(topic, joke_fut.result(), story_fut.result(), poem_fut.result()).result()# Invoke
for step in parallel_workflow.stream("cats", stream_mode="updates"):print(step)print("\n")

我们注意到这里的状态机是没有messages字段的,为什么这个示例没有用 messages?

  • 这是一个基于函数式调用的示例,llm.invoke(prompt) 直接传入字符串 prompt,返回一个响应。
  • 它把每个步骤的输出直接存储在 State 里(如 joke/story/poem),并用状态图管理任务节点之间的依赖关系。
  • 这种方式更像是传统的“调用接口拿结果,存储状态”的工作流,不是基于对话上下文传递 messages

状态机是否必须有 messages 字段?

  • 不一定。
  • 有的系统或库设计上用 messages 作为状态,因为它是对话模型核心的上下文单元(特别是基于 ChatCompletion 的模型)。
  • 但也可以用纯结构化字段来管理状态,只要你调用 LLM 的接口支持纯文本 prompt,这种设计就行得通。

这里是两种不同思路的区别:

方面基于 messages 的状态机基于结构化 State 的工作流
状态表现通过 messages 数组体现对话历史State 的字段体现任务结果、数据
LLM 调用通常是 chat.completions.create(messages=...)直接调用 llm.invoke(prompt_string)
优势方便对话式上下文管理灵活处理非对话类任务,状态字段清晰,易编排
使用场景需要上下文连续、多轮对话独立任务节点结果处理,适合多任务并行和聚合

路由(Routing)

路由对输入进行分类,并将其引导到后续的专门任务。正如 Anthropic 在《构建高效代理》一文中所指出的:

路由会对输入进行分类,并将其引导到一个专门的后续任务。该工作流允许关注点分离,构建更专业化的提示语。没有这种工作流,为某一类输入进行优化可能会影响对其他输入的表现。

何时使用此工作流:路由适用于复杂任务中存在明显类别且这些类别更适合分别处理的情况,且分类能够被准确处理(无论是由大型语言模型还是传统的分类模型/算法完成)。

在这里插入图片描述

from typing_extensions import Literal
from langchain_core.messages import HumanMessage, SystemMessage# Schema for structured output to use as routing logic
class Route(BaseModel):step: Literal["poem", "story", "joke"] = Field(None, description="The next step in the routing process")# Augment the LLM with schema for structured output
router = llm.with_structured_output(Route)# State
class State(TypedDict):input: strdecision: stroutput: str# Nodes
def llm_call_1(state: State):"""Write a story"""result = llm.invoke(state["input"])return {"output": result.content}def llm_call_2(state: State):"""Write a joke"""result = llm.invoke(state["input"])return {"output": result.content}def llm_call_3(state: State):"""Write a poem"""result = llm.invoke(state["input"])return {"output": result.content}def llm_call_router(state: State):"""Route the input to the appropriate node"""# Run the augmented LLM with structured output to serve as routing logicdecision = router.invoke([SystemMessage(content="Route the input to story, joke, or poem based on the user's request."),HumanMessage(content=state["input"]),])return {"decision": decision.step}# Conditional edge function to route to the appropriate node
def route_decision(state: State):# Return the node name you want to visit nextif state["decision"] == "story":return "llm_call_1"elif state["decision"] == "joke":return "llm_call_2"elif state["decision"] == "poem":return "llm_call_3"# Build workflow
router_builder = StateGraph(State)# Add nodes
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)# Add edges to connect nodes
router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges("llm_call_router",route_decision,{  # Name returned by route_decision : Name of next node to visit"llm_call_1": "llm_call_1","llm_call_2": "llm_call_2","llm_call_3": "llm_call_3",},
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)# Compile workflow
router_workflow = router_builder.compile()# Show the workflow
display(Image(router_workflow.get_graph().draw_mermaid_png()))# Invoke
state = router_workflow.invoke({"input": "Write me a joke about cats"})
print(state["output"])

协调者-执行者(Orchestrator-Worker)

在协调者-执行者模式中,协调者将任务拆解,并将每个子任务分配给执行者。如 Anthropic 在《构建高效代理》一文中所述:

在协调者-执行者工作流中,中央的 LLM 动态拆分任务,将其委派给多个执行者 LLM,并综合它们的结果。

何时使用此工作流:该工作流非常适合于无法预先确定所需子任务的复杂任务(例如在编程中,需要修改的文件数量以及每个文件修改的内容通常依赖于具体任务)。虽然在结构上与并行化类似,但其关键区别在于灵活性——子任务不是预先定义的,而是由协调者根据具体输入动态决定。

在这里插入图片描述

from typing import Annotated, List
import operator# Schema for structured output to use in planning
class Section(BaseModel):name: str = Field(description="Name for this section of the report.",)description: str = Field(description="Brief overview of the main topics and concepts to be covered in this section.",)class Sections(BaseModel):sections: List[Section] = Field(description="Sections of the report.",)# Augment the LLM with schema for structured output
planner = llm.with_structured_output(Sections)

在 LangGraph 中创建执行者(Workers)

由于协调者-执行者工作流非常常见,LangGraph 提供了 Send API 来支持这一模式。它允许你动态创建执行者节点,并向每个执行者发送特定的输入。每个执行者拥有自己的状态,所有执行者的输出都会写入一个共享的状态键,协调者图可以访问该键。这使协调者能够获取所有执行者的输出,并将它们综合成最终结果。

from langgraph.constants import Send# Graph state
class State(TypedDict):topic: str  # Report topicsections: list[Section]  # List of report sectionscompleted_sections: Annotated[list, operator.add]  # 并行执行者写入这里final_report: str  # Final report# Worker state
class WorkerState(TypedDict):section: Sectioncompleted_sections: Annotated[list, operator.add]# Nodes
def orchestrator(state: State):"""Orchestrator that generates a plan for the report"""# Generate queries# 用一个带结构化输出能力的 LLM planner,基于主题规划报告章节(返回章节列表)。report_sections = planner.invoke([SystemMessage(content="Generate a plan for the report."),HumanMessage(content=f"Here is the report topic: {state['topic']}"),])return {"sections": report_sections.sections}def llm_call(state: WorkerState):"""Worker writes a section of the report"""# Generate sectionsection = llm.invoke([SystemMessage(content="Write a report section following the provided name and description. Include no preamble for each section. Use markdown formatting."),HumanMessage(content=f"Here is the section name: {state['section'].name} and description: {state['section'].description}"),])# Write the updated section to completed sectionsreturn {"completed_sections": [section.content]}def synthesizer(state: State):"""Synthesize full report from sections"""# List of completed sectionscompleted_sections = state["completed_sections"]# Format completed section to str to use as context for final sectionscompleted_report_sections = "\n\n---\n\n".join(completed_sections)return {"final_report": completed_report_sections}# Conditional edge function to create llm_call workers that each write a section of the report
def assign_workers(state: State):"""Assign a worker to each section in the plan"""# Kick off section writing in parallel via Send() APIreturn [Send("llm_call", {"section": s}) for s in state["sections"]]# Build workflow
orchestrator_worker_builder = StateGraph(State)# Add the nodes
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)# Add edges to connect nodes
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges("orchestrator", assign_workers, ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)# Compile the workflow
orchestrator_worker = orchestrator_worker_builder.compile()# Show the workflow
display(Image(orchestrator_worker.get_graph().draw_mermaid_png()))# Invoke
state = orchestrator_worker.invoke({"topic": "Create a report on LLM scaling laws"})from IPython.display import Markdown
Markdown(state["final_report"])

评估者-优化者(Evaluator-optimizer)

在评估者-优化者工作流中,一个 LLM 调用生成响应,另一个则在循环中提供评估和反馈:

何时使用该工作流:当我们拥有明确的评估标准,且迭代改进能带来可衡量的价值时,这种工作流特别有效。适用的两个标志是:第一,当人工明确表达反馈时,LLM 的响应可以明显提升;第二,LLM 本身能提供这种反馈。这类似于人类作者在撰写精炼文档时所经历的反复迭代过程。

在这里插入图片描述

# Graph state
class State(TypedDict):joke: strtopic: strfeedback: strfunny_or_not: str# Schema for structured output to use in evaluation
class Feedback(BaseModel):grade: Literal["funny", "not funny"] = Field(description="Decide if the joke is funny or not.",)feedback: str = Field(description="If the joke is not funny, provide feedback on how to improve it.",)# Augment the LLM with schema for structured output
evaluator = llm.with_structured_output(Feedback)# Nodes
def llm_call_generator(state: State):"""LLM generates a joke"""if state.get("feedback"):msg = llm.invoke(f"Write a joke about {state['topic']} but take into account the feedback: {state['feedback']}")else:msg = llm.invoke(f"Write a joke about {state['topic']}")return {"joke": msg.content}def llm_call_evaluator(state: State):"""LLM evaluates the joke"""grade = evaluator.invoke(f"Grade the joke {state['joke']}")return {"funny_or_not": grade.grade, "feedback": grade.feedback}# Conditional edge function to route back to joke generator or end based upon feedback from the evaluator
def route_joke(state: State):"""Route back to joke generator or end based upon feedback from the evaluator"""if state["funny_or_not"] == "funny":return "Accepted"elif state["funny_or_not"] == "not funny":return "Rejected + Feedback"# Build workflow
optimizer_builder = StateGraph(State)# Add the nodes
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)# Add edges to connect nodes
optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges("llm_call_evaluator",route_joke,{  # Name returned by route_joke : Name of next node to visit"Accepted": END,"Rejected + Feedback": "llm_call_generator",},
)# Compile the workflow
optimizer_workflow = optimizer_builder.compile()# Show the workflow
display(Image(optimizer_workflow.get_graph().draw_mermaid_png()))# Invoke
state = optimizer_workflow.invoke({"topic": "Cats"})
print(state["joke"])

代码逻辑解释:

  1. 状态结构体(State)
    包含当前笑话 (joke)、主题 (topic)、反馈 (feedback)、是否有趣 (funny_or_not) 等字段,贯穿整个流程。

  2. 生成节点(llm_call_generator)
    LLM 根据主题生成一个笑话。如果有反馈,会基于反馈重新生成改进版本。

  3. 评估节点(llm_call_evaluator)
    另一个 LLM 评估笑话是否“有趣”,并给出改进建议(反馈)。

  4. 条件路由(route_joke)
    如果评估结果是“有趣”,流程结束;否则,带着反馈回到生成节点,重新生成改进笑话。

  5. 流程控制

    • 初始从生成节点开始
    • 生成笑话后,进入评估节点
    • 评估结果决定是结束流程还是返回重新生成

这个示例想表达什么?

  • 这是一个迭代循环流程:生成 → 评估 → 反馈 → 改进 → 重新生成 → …,直到满意为止。

  • 两种角色分工

    • **生成者(Generator)**负责输出内容(比如笑话)
    • **评估者(Evaluator)**负责评价输出的质量,给出反馈

适用场景举例

  • 文本内容创作:写文章、故事、广告文案、代码片段等,生成后让模型评估质量并反馈改进建议。
  • 自动化调优:模型自我检测输出错误或不足,循环改进直到符合标准。
  • 复杂任务求解:每次输出一个方案,由另一个模块评价,保证结果逐步优化。
  • 教学或训练辅助:模型作为学生写答案,另一个模型作为老师给反馈,引导改进。

这个工作流适合明确有评判标准,且能通过多轮迭代改进输出质量的任务

Agent(智能体)

智能体通常是指基于大语言模型(LLM),在一个循环中根据环境反馈执行动作(通过调用工具)。正如 Anthropic 关于构建高效智能体的博客中所述:

智能体能够处理复杂任务,但其实现往往比较简单。它们通常只是利用工具,根据环境反馈循环调用 LLM。因此,设计工具集及其文档时,必须清晰且用心。

何时使用智能体:智能体适用于那些步骤数难以预测或无法硬编码固定路径的开放式问题。LLM 可能需要多轮操作,这时你需要对其决策能力有一定信任。智能体的自主性使它们非常适合在可信环境中扩展任务处理。

在这里插入图片描述

from langchain_core.tools import tool# Define tools
@tool
def multiply(a: int, b: int) -> int:"""Multiply a and b.Args:a: first intb: second int"""return a * b@tool
def add(a: int, b: int) -> int:"""Adds a and b.Args:a: first intb: second int"""return a + b@tool
def divide(a: int, b: int) -> float:"""Divide a and b.Args:a: first intb: second int"""return a / b# Augment the LLM with tools
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
llm_with_tools = llm.bind_tools(tools)

图API

from langgraph.graph import MessagesState
from langchain_core.messages import SystemMessage, HumanMessage, ToolMessage# Nodes
def llm_call(state: MessagesState):"""LLM decides whether to call a tool or not"""return {"messages": [llm_with_tools.invoke([SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")]+ state["messages"])]}def tool_node(state: dict):"""Performs the tool call"""result = []for tool_call in state["messages"][-1].tool_calls:tool = tools_by_name[tool_call["name"]]observation = tool.invoke(tool_call["args"])result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))return {"messages": result}# Conditional edge function to route to the tool node or end based upon whether the LLM made a tool call
def should_continue(state: MessagesState) -> Literal["environment", END]:"""Decide if we should continue the loop or stop based upon whether the LLM made a tool call"""messages = state["messages"]last_message = messages[-1]# If the LLM makes a tool call, then perform an actionif last_message.tool_calls:return "Action"# Otherwise, we stop (reply to the user)return END# Build workflow
agent_builder = StateGraph(MessagesState)# Add nodes
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("environment", tool_node)# Add edges to connect nodes
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges("llm_call",should_continue,{# Name returned by should_continue : Name of next node to visit"Action": "environment",END: END,},
)
agent_builder.add_edge("environment", "llm_call")# Compile the agent
agent = agent_builder.compile()# Show the agent
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))# Invoke
messages = [HumanMessage(content="Add 3 and 4.")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:m.pretty_print()

LangGraph 还提供了一个预构建的方法来创建上述定义的智能体(使用 create_react_agent 函数):

https://langchain-ai.github.io/langgraph/how-tos/create-react-agent/

from langgraph.prebuilt import create_react_agent# 传入:
# (1) 带工具增强的大语言模型(LLM)
# (2) 工具列表(用于创建工具节点)
pre_built_agent = create_react_agent(llm, tools=tools)# 显示智能体结构图
display(Image(pre_built_agent.get_graph().draw_mermaid_png()))# 调用智能体
messages = [HumanMessage(content="Add 3 and 4.")]
messages = pre_built_agent.invoke({"messages": messages})
for m in messages["messages"]:m.pretty_print()

Graphs

Graph API

LangGraph 核心是将智能体工作流建模为图结构。你通过三个关键组成部分定义智能体的行为:

  • State(状态):表示应用当前快照的共享数据结构。可以是任意 Python 类型,通常是 TypedDict 或 Pydantic BaseModel。

  • Nodes(节点):Python 函数,编码智能体的逻辑。它们接收当前 State 作为输入,执行计算或副作用,返回更新后的 State。

  • Edges(边):Python 函数,根据当前 State 决定下一个执行的节点。可以是条件分支或固定跳转。

通过组合节点和边,可以创建复杂且带有循环的工作流,使 State 随时间演化。LangGraph 的真正强大之处在于它如何管理 State。强调一点:节点和边本质上只是 Python 函数——它们可以包含大模型调用,也可以是普通的 Python 代码。

简而言之:

  • 节点负责执行任务,
  • 边负责指示下一步执行哪个节点。

LangGraph 底层的图算法使用消息传递来定义通用程序。节点完成操作后,沿一条或多条边发送消息到其他节点。接收节点执行其函数,将结果消息传递给下一组节点,循环往复。此机制受 Google Pregel 系统启发,程序以离散的“超级步”(super-step)进行。

超级步可以看作图节点的一次迭代。并行运行的节点属于同一超级步,顺序执行的节点属于不同超级步。执行开始时,所有节点处于非激活状态。节点在其任一入边(或“通道”)接收到新消息(状态)时被激活,执行函数并返回更新。每个超级步结束时,无入消息的节点投票停止,将自身标记为非激活。所有节点非激活且无消息传输时,图执行终止。

StateGraph

StateGraph 类是主要的图类,由用户定义的 State 对象参数化。

构建图时,先定义 State,再添加节点和边,最后进行编译。编译是什么,为什么需要?

编译过程很简单。它会对图结构做基本检查(例如无孤立节点等)。同时可在此指定运行时参数,如检查点和断点。调用 .compile 方法即可编译图:

graph = graph_builder.compile(...)

必须先编译图,才能使用它。

State

定义图的第一步是定义图的 State(状态)。State 包含图的 schema(结构)和 reducer 函数,用于指定如何将更新应用到状态。State 的 schema 是图中所有节点(Nodes)和边(Edges)的输入结构,通常是 TypedDict 或 Pydantic 模型。所有节点都会输出对 State 的更新,这些更新通过指定的 reducer 函数进行应用。

Schema

定义图的 schema 主要用 TypedDict,也支持使用 Pydantic BaseModel,以便添加默认值和额外的数据校验。

默认情况下,图的输入输出 schema 相同。如果需要区分输入输出,可以显式指定输入和输出 schema。这在键较多且部分键专门用于输入,部分专门用于输出时非常有用。

通常,图中所有节点使用单一 schema,意味着它们读写相同的状态通道。但有时需要更细致控制:

  • 内部节点可能传递图的输入/输出中不需要的信息。
  • 可能想为图定义不同的输入/输出 schema,比如输出只包含一个关键字段。
  • 节点可以写入图内部的私有状态通道,用于节点间内部通信。只需定义一个私有 schema,如 PrivateState,详见指南。
  • 也可为图定义显式的输入和输出 schema,同时定义一个“内部” schema,包含所有与图操作相关的键。输入和输出 schema 是“内部” schema的子集,用于限制输入输出。

示例代码:

class InputState(TypedDict):user_input: strclass OutputState(TypedDict):graph_output: strclass OverallState(TypedDict):foo: struser_input: strgraph_output: strclass PrivateState(TypedDict):bar: strdef node_1(state: InputState) -> OverallState:return {"foo": state["user_input"] + " name"}def node_2(state: OverallState) -> PrivateState:return {"bar": state["foo"] + " is"}def node_3(state: PrivateState) -> OutputState:return {"graph_output": state["bar"] + " Lance"}builder = StateGraph(OverallState, input=InputState, output=OutputState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", "node_3")
builder.add_edge("node_3", END)graph = builder.compile()
graph.invoke({"user_input": "My"})
# 返回 {'graph_output': 'My name is Lance'}

这里有两个要点:

  1. 输入 schema 给 node_1 是 InputState,但 node_1 写出了 foo(OverallState 的通道)。这是因为节点可以写入图状态中定义的任意通道,图状态是初始化时所有状态通道的并集,包括 OverallState 和 InputState、OutputState。
  2. 图初始化时未传入 PrivateState,node_2 依然可以写入 PrivateState,这是因为只要定义了状态 schema,节点就能声明额外状态通道,图也能访问它们。

Reducers

Reducer 是理解节点更新如何应用到 State 的关键。State 中每个键都有独立的 reducer 函数。若未显式指定 reducer,则默认更新会直接覆盖该键。

以下两个例子展示了如何使用默认的 reducer:

示例 A:

from typing_extensions import TypedDictclass State(TypedDict):foo: intbar: list[str]# 假设图输入 {"foo": 1, "bar": ["hi"]}
# 节点返回 {"foo": 2},更新后 State 为 {"foo": 2, "bar": ["hi"]}
# 下一个节点返回 {"bar": ["bye"]},更新后 State 为 {"foo": 2, "bar": ["bye"]}

示例 B:

from typing import Annotated
from typing_extensions import TypedDict
from operator import addclass State(TypedDict):foo: intbar: Annotated[list[str], add]# 输入同上
# 节点1返回 {"foo": 2},更新后 State 为 {"foo": 2, "bar": ["hi"]}
# 节点2返回 {"bar": ["bye"]},更新后 State 为 {"foo": 2, "bar": ["hi", "bye"]}
# 这里 bar 键的更新是列表合并(add操作)

图状态中的消息处理

为什么用消息?

现代大模型提供的聊天接口接受消息列表作为输入。LangChain 的 ChatModel 接受 Message 对象列表输入,消息类型包括 HumanMessage(用户输入)、AIMessage(模型响应)等。

在图中使用消息

通常需要在图状态中存储对话历史消息列表。为此,可以在状态中添加一个消息列表键(通道),并用 reducer 函数进行注解(例如示例中的 messages 键)。reducer 告诉图如何用节点的更新修改消息列表。若无 reducer,状态更新会覆盖消息列表。想要追加消息,可以用 operator.add。

但若希望手动更新消息(如人工干预),用 operator.add 会导致追加而非替换消息。为避免此问题,可以用预置的 add_messages 函数。该函数对新消息追加,对已有消息根据 ID 覆盖更新。

序列化

add_messages 函数会尝试将消息反序列化为 LangChain Message 对象,支持以下输入格式:

{"messages": [HumanMessage(content="message")]}
{"messages": [{"type": "human", "content": "message"}]}

消息总会被反序列化为 Message 对象,因此访问消息属性应用点操作符,如 state["messages"][-1].content

示例:

from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages
from typing import Annotated
from typing_extensions import TypedDictclass GraphState(TypedDict):messages: Annotated[list[AnyMessage], add_messages]
MessagesState

因为消息列表很常用,已有预构建状态 MessagesState,内含 messages 键(AnyMessage 列表,使用 add_messages reducer)。通常状态还有其他字段,用户可继承该状态扩展更多字段:

from langgraph.graph import MessagesStateclass State(MessagesState):documents: list[str]

节点(Nodes)

在 LangGraph 中,节点通常是 Python 函数(同步或异步),第一个位置参数为状态(state),可选的第二个位置参数是一个 “config”,包含可选的配置参数(如 thread_id 等)。

与 NetworkX 类似,你可以使用 add_node 方法将这些节点添加到图中:

API 参考RunnableConfig | StateGraph

from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraphbuilder = StateGraph(dict)def my_node(state: dict, config: RunnableConfig):print("In node: ", config["configurable"]["user_id"])return {"results": f"Hello, {state['input']}!"}# 第二个参数是可选的
def my_other_node(state: dict):return statebuilder.add_node("my_node", my_node)
builder.add_node("other_node", my_other_node)

在内部,这些函数会被转换为 RunnableLambda,它们为你的函数增加了批处理和异步支持,同时内置了跟踪和调试功能。

如果你添加节点时没有显式指定名称,它将使用函数名作为默认名称:

builder.add_node(my_node)
# 之后可以通过字符串 "my_node" 来连接该节点

START 节点

START 节点是一个特殊节点,表示用户输入流入图的入口节点。它的主要作用是确定图的起始节点:

from langgraph.graph import STARTgraph.add_edge(START, "node_a")

END 节点

END 节点是一个特殊节点,表示终止节点。你可以使用它来表示某些边在处理完后没有后续动作:

from langgraph.graph import ENDgraph.add_edge("node_a", END)

节点缓存(Node Caching)

LangGraph 支持基于输入的任务/节点缓存。使用缓存需要以下两个步骤:

  1. 在编译图(或指定入口点)时指定缓存;
  2. 为节点指定缓存策略(cache policy):

每个缓存策略支持以下内容:

  • key_func:用于根据节点输入生成缓存 key,默认使用 pickle 的哈希;
  • ttl:缓存的有效时间(秒)。如果不指定,缓存将永久有效。

例如:

import time
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicyclass State(TypedDict):x: intresult: intbuilder = StateGraph(State)def expensive_node(state: State) -> dict[str, int]:# 耗时计算time.sleep(2)return {"result": state["x"] * 2}builder.add_node("expensive_node", expensive_node, cache_policy=CachePolicy(ttl=3))
builder.set_entry_point("expensive_node")
builder.set_finish_point("expensive_node")graph = builder.compile(cache=InMemoryCache())print(graph.invoke({"x": 5}, stream_mode='updates'))  
# [{'expensive_node': {'result': 10}}]print(graph.invoke({"x": 5}, stream_mode='updates'))  
# [{'expensive_node': {'result': 10}, '__metadata__': {'cached': True}}]

builder.set_entry_point()builder.set_finish_point() 本质上就是在内部帮你设置了 STARTEND 节点的连接关系。它们是对 graph.add_edge(START, ...)graph.add_edge(..., END) 的简化封装。

具体来说:

builder.set_entry_point("node_a")

等价于手动添加:

graph.add_edge(START, "node_a")

表示:从特殊的 START 节点出发,首先执行 "node_a"

builder.set_finish_point("node_b")

等价于手动添加:

graph.add_edge("node_b", END)

表示:执行完 "node_b" 之后,图流程结束。

边(Edges)

边定义了图中逻辑的流转方式以及图何时停止。这是代理(agent)工作方式和不同节点之间通信的核心组成部分。主要有以下几种边的类型:

  • 普通边(Normal Edges):从一个节点直接跳转到下一个节点。
  • 条件边(Conditional Edges):通过调用函数判断接下来跳转到哪个节点。
  • 入口点(Entry Point):指定当用户输入到达时首先调用哪个节点。
  • 条件入口点(Conditional Entry Point):通过函数判断当用户输入到达时首先调用哪些节点。

一个节点可以拥有多个出边。如果一个节点有多个出边,所有目标节点会在下一个“超级步骤(superstep)”中并行执行。

普通边(Normal Edges)

如果你总是希望从节点 A 跳转到节点 B,可以直接使用 add_edge 方法:

graph.add_edge("node_a", "node_b")

条件边(Conditional Edges)

如果你希望有条件地跳转到一个或多个节点(或有条件地终止),可以使用 add_conditional_edges 方法。该方法接收一个节点名和一个“路由函数”,此函数会在该节点执行后被调用:

graph.add_conditional_edges("node_a", routing_function)

与节点类似,routing_function 接收图当前的 state,并返回一个值。

默认情况下,routing_function 的返回值会被当作下一个要跳转的节点名(可以是单个节点或多个节点的列表),这些节点会在下一个“超级步骤”中并行执行。

你也可以选择传入一个字典,手动将 routing_function 的返回值映射到下一个节点名:

graph.add_conditional_edges("node_a", routing_function, {True: "node_b", False: "node_c"})

💡 提示:
如果你希望在一个函数中同时进行状态更新和路由判断,可以使用 Command,而不是 conditional edges

入口点(Entry Point)

入口点是图开始执行时首先运行的节点。你可以使用虚拟的 START 节点通过 add_edge 指定入口:

from langgraph.graph import STARTgraph.add_edge(START, "node_a")

条件入口点(Conditional Entry Point)

条件入口点允许你根据自定义逻辑从不同节点开始执行。你可以通过从 START 添加条件边实现:

from langgraph.graph import STARTgraph.add_conditional_edges(START, routing_function)

同样,你也可以提供一个映射字典,将函数返回值映射为对应的下一个节点名:

graph.add_conditional_edges(START, routing_function, {True: "node_b", False: "node_c"})

Send(发送)

默认情况下,节点(Nodes)和边(Edges)都是预先定义好的,并且它们在同一个共享状态(State)上运行。然而,有些场景下:

  • 事先无法确定边的具体数量
  • 或者你希望在图中同时存在多个版本的状态(State)

一个典型的例子是 Map-Reduce 模式。在这种设计模式中,一个初始节点可能会生成一个对象列表,你可能希望对这些对象分别应用另一个节点的逻辑。

这时候存在两个问题:

  1. 你事先并不知道要生成多少个对象(也就不知道边的数量);
  2. 传递给下游节点的 State 应该是不同的(每个对象对应一个独立的 State)。

为支持这种模式,LangGraph 允许在条件边(conditional edges)中返回 Send 对象。

Send 接收两个参数:

  • 第一个参数是 目标节点名称
  • 第二个参数是要传递给该节点的 状态(state)

例如:

def continue_to_jokes(state: OverallState):return [Send("generate_joke", {"subject": s}) for s in state['subjects']]graph.add_conditional_edges("node_a", continue_to_jokes)

在这个例子中:

  • state['subjects'] 是一个主题列表;
  • 会为每个主题生成一个新的状态并发送给 generate_joke 节点;
  • 这样多个任务就可以并行执行,每个任务处理一个主题。

Send 是 LangGraph 中的一个核心机制,它允许你动态地控制数据如何流向图中的下一个节点,即:

它不是告诉图 “接下来去哪个节点”,而是告诉图 “去哪个节点” + “传什么状态过去”。

为什么需要 Send?

在普通的边(edge)中,我们这么写:

graph.add_edge("node_a", "node_b")

意思是:node_a 执行完后,结果会传给 node_b。传的内容就是 node_a 输出的那个 state,是统一的一份状态。

但是假如你要 Map 一堆数据,比如:

state = {"subjects": ["cat", "dog", "duck"]}

你想要分别给每个 subject 生成笑话,这样你需要对每个 subject 单独传一个状态给 generate_joke

{"subject": "cat"}
{"subject": "dog"}
{"subject": "duck"}

这时候如果还用普通的边是不行的,因为它一次只能传一个 state。而你现在要传 3 个不同的 state,还要都交给同一个节点执行。

Send 是 LangGraph 提供的一个机制,表示:

我要给这个节点发一个“定制的 state”。

结构上看就是:

Send("generate_joke", {"subject": "cat"})

意思是:把 {"subject": "cat"} 这个状态传给 generate_joke 节点。

你可以返回多个 Send,比如:

return [Send("generate_joke", {"subject": "cat"}),Send("generate_joke", {"subject": "dog"}),Send("generate_joke", {"subject": "duck"}),
]

这就相当于你动态地建立了 3 条边,每条边都指向 generate_joke,但传递的状态不同。

概念作用对比普通方式
Send("节点名", 状态)显式控制传输目标 + 传输数据普通边只控制目标,不能分发多个不同的数据
典型应用MapReduce / 多路并发 / fan-out 分发无法用普通 add_edge 实现

这类场景在多任务处理、MapReduce、动态任务分发中非常常见。下面列出几个典型场景:

✅ 场景 1:MapReduce 风格(批量并发任务)

需求: 你有一批数据(如多个产品 ID),每个都要并发做一个处理,比如调用大模型生成描述或摘要。

state = {"product_ids": [101, 102, 103]}

你希望每个 ID 都发给一个节点做处理:

# fan-out
return [Send("summarize_product", {"product_id": pid}) for pid in state["product_ids"]]

✅ 场景 2:并发生成(如多个提示词、多轮问答)

需求: 给定多个主题(subject),希望大模型为每个生成笑话:

state = {"subjects": ["cat", "dog", "duck"]}

你希望:

return [Send("generate_joke", {"subject": "cat"}),Send("generate_joke", {"subject": "dog"}),Send("generate_joke", {"subject": "duck"}),
]

每个 generate_joke 节点拿到的是独立状态、并行处理。


✅ 场景 3:多用户状态并发(每个用户独立执行)

需求: 有多个用户同时发消息给系统:

state = {"users": [{"id": 1, "input": "Hi"},{"id": 2, "input": "Help"},{"id": 3, "input": "Hello"},]
}

你希望为每个用户分发任务到 chat_handler

return [Send("chat_handler", user_state) for user_state in state["users"]]

这样每个用户有自己的独立处理流程。


✅ 场景 4:分支聚合前的 fan-out

配合之后的 join_node 使用:

  1. Send 分发多个子任务;
  2. 等所有任务完成后聚合结果;

非常类似 MapReduce 或流水线中的“并发执行 + 汇总”流程。


✅ 总结:你应该在这些时候使用 Send

使用场景是否适用 Send
动态 fan-out,数量不定✅ 非常适合
给多个对象应用相同逻辑✅ 适合
每个对象有独立状态✅ 适合
所有对象共享全局状态❌ 不适用(普通边即可)
固定流程节点、线性执行❌ 不需要 Send

Command(命令对象)

在实际应用中,我们常常希望在一个节点中同时完成状态更新和控制流跳转。例如:

  • 你可能希望在一个节点中既修改状态(如添加字段),又根据结果跳转到下一个节点。

LangGraph 提供了 Command 对象来支持这种写法 —— 你可以在节点函数中返回一个 Command 对象,来同时指定状态更新和跳转逻辑。

✅ 示例:

def my_node(state: State) -> Command[Literal["my_other_node"]]:return Command(update={"foo": "bar"},      # 状态更新goto="my_other_node"        # 跳转到下一个节点)

你也可以写动态跳转逻辑(和条件边的效果一样):

def my_node(state: State) -> Command[Literal["my_other_node"]]:if state["foo"] == "bar":return Command(update={"foo": "baz"}, goto="my_other_node")

⚠️ 注意:
使用 Command 时,必须在函数返回类型里写明所有可能跳转到的节点名,如:
Command[Literal["my_other_node"]]
这是为了让 LangGraph 能够正确生成图形结构。

什么时候用 Command 而不是条件边?

场景推荐方式
只需要根据状态跳转(无状态修改)使用 add_conditional_edges()
需要同时更新状态控制跳转使用 Command()

例如:在多智能体系统中,一个 agent 需要“交接任务”给另一个 agent,并传递中间状态,这种情况非常适合用 Command

跳转到父图节点(跨子图跳转)

如果你使用了子图(subgraph),而你希望从子图中的一个节点跳转到父图的其他节点,可以使用:

return Command(update={"foo": "bar"},goto="other_subgraph",     # 跳转目标是父图中的节点graph=Command.PARENT       # 显示声明跳出子图
)

⚠️ 注意:
如果你要更新的字段同时在子图和父图的状态定义中都存在,那么必须在父图中为该字段定义 reducer(状态合并函数)。

常见应用场景

📌 在工具内部使用 Command 更新状态

例如:在客服系统中,开局阶段通过用户输入的账号查找其基本信息,并更新对话状态。这类逻辑适合在工具内部调用后返回 Command 进行跳转和状态更新。

📌 人类参与流程(Human-in-the-loop)

在中断执行(如等待用户输入)时,LangGraph 提供 interrupt() 机制暂停流程,用户输入后使用:

Command(resume="用户的输入")

恢复执行。这是 Command 在 HITL 流程中的核心机制。

配置

创建图时,你可以标记图中的某些部分为可配置的。通常这样做是为了方便在不同模型或系统提示间切换。这允许你创建一个统一的“认知架构”(即图),但可以拥有多个不同的实例。

你可以在创建图时选择性地指定一个配置模式(config_schema)。

class ConfigSchema(TypedDict):llm: strgraph = StateGraph(State, config_schema=ConfigSchema)

然后可以通过 configurable 配置字段将配置传入图中。

config = {"configurable": {"llm": "anthropic"}}graph.invoke(inputs, config=config)

之后你可以在节点函数或条件边中访问并使用这个配置:

def node_a(state, config):llm_type = config.get("configurable", {}).get("llm", "openai")llm = get_llm(llm_type)...

Configuration(配置)在 LangGraph 里的作用其实主要是让同一个图结构(Graph)能够复用,不用改代码就能切换行为或底层实现。总结来说:

  • 不是单纯“传递一个配置”这么简单,而是给整个图提供一个“可配置的参数集合”(config_schema 定义了这些参数的结构和类型)。

  • 这个配置会被传递到图中每个节点(node)和条件边(conditional edge)里去使用,节点函数可以根据配置调整它们的行为,比如选用不同的模型、调用不同的接口,或者调整某些处理逻辑。

  • 也就是说,图的逻辑是固定的(节点和边的关系),但具体“做什么”和“用哪个模型”是动态的,通过配置来驱动。

  • 这样做的好处是:

    • 你维护一个“认知架构”图
    • 不用改代码,就能运行不同版本的模型(比如切换 OpenAI GPT、Anthropic,或者不同的参数)
    • 支持多实例,便于测试和线上灰度切换

简化理解:

Configuration = 图的“参数开关”,用于灵活控制图内部节点的具体执行行为和调用的资源,而不是重新写图或节点代码。

举个例子:

  • 你写了一个聊天对话图,里面有个节点调用语言模型生成回答。
  • 你定义了一个 llm 配置字段,可以是 "openai""anthropic"
  • 调用图时传入不同配置,节点里根据配置动态选择不同模型服务接口。
  • 图结构没变,但行为根据配置变了。

这样你就可以用一套代码跑多个模型,或者快速切换提示词、参数等,提升灵活性和复用性。

递归限制

递归限制定义图在单次执行过程中能执行的最大超步数(super-step)。一旦达到限制,LangGraph 会抛出 GraphRecursionError。默认限制为25步。

递归限制可以在运行时对任何图进行设置,通过传递 config 字典中的 recursion_limit 参数给 .invoke.stream 方法实现。

重要的是,recursion_limit 是独立的配置项,不应放在 configurable 键里(configurable 用于用户定义的配置)。示例如下:

graph.invoke(inputs, config={"recursion_limit": 5, "configurable": {"llm": "anthropic"}})

你应该直接把 recursion_limit 放在传给 graph.invoke 的顶层 config 字典里,和 configurable 同级,而不是放到 configurable 里。

举个例子:

graph.invoke(inputs,config={"recursion_limit": 5,           # 这是独立配置项"configurable": {"llm": "anthropic"}  # 这是用户定义的配置}
)

换句话说:

  • configurable 这个字段专门用来放你自定义的配置参数,供节点使用。
  • recursion_limit 是 LangGraph 自身的控制参数,属于框架级的配置,独立存在。

所以它们在同一个字典里,但互相是平级关系,不嵌套。

LangGraph 运行时

Pregel 实现了 LangGraph 的运行时,负责管理 LangGraph 应用程序的执行过程。

编译一个 StateGraph 或创建一个入口点(entrypoint)会生成一个 Pregel 实例,可以通过输入数据调用它。

本指南从高层次介绍该运行时,并提供了直接使用 Pregel 实现应用程序的说明。

注意:Pregel 运行时的命名来源于 Google 的 Pregel 算法,该算法是一种用于大规模并行图计算的高效方法。

在 LangGraph 中,Pregel 将参与者(actors)和通道(channels)结合为一个应用程序整体。参与者从通道中读取数据并向通道写入数据。

Pregel 依据 Pregel 算法(也称 BSP - Bulk Synchronous Parallel)模式将应用程序的执行划分为多个步骤。

每个步骤包含三个阶段:

  1. 计划(Plan):
    决定在本步骤中要执行哪些参与者。
    例如,在第一个步骤中选择订阅了特殊输入通道的参与者;在后续步骤中选择那些订阅了上一步中更新过的通道的参与者。

  2. 执行(Execution):
    并行执行所有选中的参与者,直到全部完成,或某个失败,或达到超时。
    注意:在这个阶段中,参与者无法看到本步骤内的通道更新,直到下一个步骤开始。

  3. 更新(Update):
    用本步骤内参与者写入的值来更新通道。

上述过程将持续,直到没有参与者被选中执行,或者达到设定的最大步骤数。

参与者(Actors)

参与者在 Pregel 中被称为 PregelNode
它们订阅通道,从中读取数据,并将数据写入通道。
可以将它类比为 Pregel 算法中的“计算节点”。
每个 PregelNode 都实现了 LangChain 的 Runnable 接口。

通道(Channels)

通道用于在参与者之间(即 PregelNode)进行通信。
每个通道具有以下属性:

  • 值类型(value type)
  • 更新类型(update type)
  • 更新函数(update function):接收多个更新操作并修改存储值。

通道可以用于:

  • 在不同链之间传递数据;
  • 或在未来的步骤中将数据返回给自身。

LangGraph 提供了一些内置通道类型:

  • LastValue:
    默认通道,保存最近一次写入的值。适用于输入/输出,或将数据从某一步传递给下一步。

  • Topic:
    可配置的发布-订阅主题。适合多个值在参与者之间传递或累积输出。可配置去重或累加策略。

  • BinaryOperatorAggregate:
    持久存储值,使用二元运算符(如加法)逐步更新,适合跨步骤聚合数据。
    例如:

    total = BinaryOperatorAggregate(int, operator.add)
    

示例

虽然大多数用户会通过 StateGraph APIentrypoint 装饰器使用 Pregel,但你也可以直接操作 Pregel 实例。

以下是几个使用 Pregel API 的示例:

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuildernode1 = (NodeBuilder().subscribe_only("a").do(lambda x: x + x).write_to("b")
)app = Pregel(nodes={"node1": node1},channels={"a": EphemeralValue(str),"b": EphemeralValue(str),},input_channels=["a"],output_channels=["b"],
)app.invoke({"a": "foo"})

高层 API LangGraph 提供了两个高级 API 用于构建 Pregel 应用:

  • StateGraph(图形 API)
  • 函数式 API

StateGraph(图形 API):

StateGraph 是一个更高级别的抽象,简化了 Pregel 应用的创建过程。
它允许你定义节点和边组成的图结构。
当你编译图结构时,StateGraph API 会自动为你创建 Pregel 应用。

from typing import TypedDict, Optional
from langgraph.constants import START
from langgraph.graph import StateGraphclass Essay(TypedDict):topic: strcontent: Optional[str]score: Optional[float]def write_essay(essay: Essay):return {"content": f"Essay about {essay['topic']}",}def score_essay(essay: Essay):return {"score": 10}builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")# 编译图结构,返回一个 Pregel 实例
graph = builder.compile()

你可以通过打印以下内容查看生成的节点和通道:

print(graph.nodes)

输出类似:

{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>
}
print(graph.channels)

你将看到类似这样的内容(具体内容视编译结果而定)。

{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,'__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}

如何使用 Graph API

本指南展示了 LangGraph 的 Graph API 的基本用法,涵盖了状态的使用,以及如何构建常见的图结构,如顺序执行、分支和循环。还介绍了 LangGraph 的控制功能,包括用于 map-reduce 工作流的 Send API,以及将状态更新与节点跳转相结合的 Command API。

安装 langgraph:

%pip install -qU langgraph

定义与更新状态

下面介绍如何在 LangGraph 中定义和更新状态。

  • 如何使用状态来定义图的 schema(结构)
  • 如何使用 reducer 控制状态更新的处理方式
定义状态

LangGraph 中的状态可以是 TypedDictPydantic 模型或 dataclass。以下我们使用 TypedDict,如需了解使用 Pydantic,请参考相关章节。

默认情况下,图的输入和输出使用相同的 schema,状态决定了该 schema。如需定义不同的输入与输出 schema,也有相应的支持。

我们来看一个简单的消息处理示例。这是许多 LLM 应用中通用的状态表示方式。更多细节请见概念页。

from langchain_core.messages import AnyMessage
from typing_extensions import TypedDictclass State(TypedDict):messages: list[AnyMessage]extra_field: int

这个状态结构跟踪消息对象的列表,以及一个额外的整数字段。

更新状态

构建一个包含单节点的简单图。节点是一个 Python 函数,读取并更新图的状态。其第一个参数始终是状态。

from langchain_core.messages import AIMessagedef node(state: State):messages = state["messages"]new_message = AIMessage("Hello!")return {"messages": messages + [new_message], "extra_field": 10}

该节点向消息列表添加一条新消息,并设置额外字段。

⚠️ 注意:节点应直接返回更新后的状态,而不是原地修改状态。

接下来定义包含此节点的图:

from langgraph.graph import StateGraphbuilder = StateGraph(State)
builder.add_node(node)
builder.set_entry_point("node")
graph = builder.compile()

可以可视化图结构:

from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

执行一次调用:

from langchain_core.messages import HumanMessageresult = graph.invoke({"messages": [HumanMessage("Hi")]})
result

输出结果包含完整的状态:

{'messages': [HumanMessage(...), AIMessage(...)], 'extra_field': 10}

消息内容可通过 .pretty_print() 打印:

for message in result["messages"]:message.pretty_print()

使用 Reducer 处理状态更新

状态中的每个字段都可以定义自己的 reducer 函数,控制如何合并来自不同节点的更新。未指定 reducer 时,默认更新会覆盖原值。

TypedDict 类型的状态定义 reducer 示例:

from typing_extensions import Annotateddef add(left, right):return left + rightclass State(TypedDict):messages: Annotated[list[AnyMessage], add]extra_field: int

此时节点函数可简化为:

def node(state: State):new_message = AIMessage("Hello!")return {"messages": [new_message], "extra_field": 10}

编译并运行:

from langgraph.graph import STARTgraph = StateGraph(State).add_node(node).add_edge(START, "node").compile()
result = graph.invoke({"messages": [HumanMessage("Hi")]})
================================ Human Message =================================Hi
================================== Ai Message ==================================Hello!

在实际应用中,更新消息列表时需要额外考虑以下几点:

  • 我们可能希望更新状态中已存在的消息
  • 我们可能希望接受简化的消息格式输入,例如 OpenAI 的格式

为此,LangGraph 提供了一个内置的 reducer —— add_messages,可以处理上述情况。

示例:

from langgraph.graph.message import add_messagesclass State(TypedDict):messages: Annotated[list[AnyMessage], add_messages]extra_field: int

输入消息支持简写形式:

input_message = {"role": "user", "content": "Hi"}

此外,LangGraph 还提供内建的 MessagesState 类型,可用于简化定义:

from langgraph.graph import MessagesStateclass State(MessagesState):extra_field: int

定义输入与输出 Schema

可为图指定不同的输入与输出 schema。节点之间仍使用内部 schema 进行通信。

示例:

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDictclass InputState(TypedDict):question: strclass OutputState(TypedDict):answer: strclass OverallState(InputState, OutputState):passdef answer_node(state: InputState):return {"answer": "bye", "question": state["question"]}builder = StateGraph(OverallState, input=InputState, output=OutputState)
builder.add_node(answer_node)
builder.add_edge(START, "answer_node")
builder.add_edge("answer_node", END)
graph = builder.compile()print(graph.invoke({"question": "hi"}))  # {'answer': 'bye'}

在节点之间传递私有状态

在某些情况下,你可能希望节点之间交换一些对中间逻辑非常关键的信息,但这些信息并不需要成为图的主要状态结构(主 schema)的一部分。这种私有数据与图的整体输入/输出无关,只应在特定的节点之间共享。

下面我们将创建一个由三个节点(node_1node_2node_3)组成的**顺序图(sequential graph)**示例,其中私有数据在前两个节点(node_1node_2)之间传递,而第三个节点(node_3)只能访问公共的整体状态。

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict# The overall state of the graph (this is the public state shared across nodes)
class OverallState(TypedDict):a: str# Output from node_1 contains private data that is not part of the overall state
class Node1Output(TypedDict):private_data: str# The private data is only shared between node_1 and node_2
def node_1(state: OverallState) -> Node1Output:output = {"private_data": "set by node_1"}print(f"Entered node `node_1`:\n\tInput: {state}.\n\tReturned: {output}")return output# Node 2 input only requests the private data available after node_1
class Node2Input(TypedDict):private_data: strdef node_2(state: Node2Input) -> OverallState:output = {"a": "set by node_2"}print(f"Entered node `node_2`:\n\tInput: {state}.\n\tReturned: {output}")return output# Node 3 only has access to the overall state (no access to private data from node_1)
def node_3(state: OverallState) -> OverallState:output = {"a": "set by node_3"}print(f"Entered node `node_3`:\n\tInput: {state}.\n\tReturned: {output}")return output# Connect nodes in a sequence
# node_2 accepts private data from node_1, whereas
# node_3 does not see the private data.
builder = StateGraph(OverallState).add_sequence([node_1, node_2, node_3])
builder.add_edge(START, "node_1")
graph = builder.compile()# Invoke the graph with the initial state
response = graph.invoke({"a": "set at start",}
)print()
print(f"Output of graph invocation: {response}")
Entered node `node_1`:Input: {'a': 'set at start'}.Returned: {'private_data': 'set by node_1'}
Entered node `node_2`:Input: {'private_data': 'set by node_1'}.Returned: {'a': 'set by node_2'}
Entered node `node_3`:Input: {'a': 'set by node_2'}.Returned: {'a': 'set by node_3'}Output of graph invocation: {'a': 'set by node_3'}

使用 Pydantic 模型作为状态

StateGraph 在初始化时接受一个 state_schema 参数,用于指定图中各个节点可以访问和更新的状态的“结构”(即状态的形状)。

在示例中,我们通常使用 Python 原生的 TypedDict 来作为 state_schema,但实际上 state_schema 可以是任意类型。

在这里,我们将看到如何使用 Pydantic 的 BaseModel 作为 state_schema,以便在运行时对输入进行校验

已知限制

  • 图的输出目前不会是 Pydantic 模型的实例
  • 运行时校验仅发生在节点的输入上,而不会校验节点的输出
  • Pydantic 报错信息中不会标明错误发生在哪个节点

示例:

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDictfrom pydantic import BaseModel# The overall state of the graph (this is the public state shared across nodes)
class OverallState(BaseModel):a: strdef node(state: OverallState):return {"a": "goodbye"}# Build the state graph
builder = StateGraph(OverallState)
builder.add_node(node)  # node_1 is the first node
builder.add_edge(START, "node")  # Start the graph with node_1
builder.add_edge("node", END)  # End the graph after node_1
graph = builder.compile()# Test the graph with a valid input
graph.invoke({"a": "hello"})
{'a': 'goodbye'}

如果输入非法(如 {"a": 123}),将抛出验证异常:

try:graph.invoke({"a": 123})  # Should be a string
except Exception as e:print("An exception was raised because `a` is an integer rather than a string.")print(e)
An exception was raised because `a` is an integer rather than a string.
1 validation error for OverallState
aInput should be a valid string [type=string_type, input_value=123, input_type=int]For further information visit https://errors.pydantic.dev/2.9/v/string_type

添加运行时配置

有时你希望在调用图时进行配置。例如,你可能希望在运行时指定使用哪个 LLM 或系统提示,而不希望用这些参数污染图的状态。

要添加运行时配置,请按以下步骤操作:

  1. 指定配置的 schema;
  2. 在节点或条件边的函数签名中添加配置;
  3. 将配置传入图中。
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, StateGraph, START
from typing_extensions import TypedDict# 1. 指定配置 schema
class ConfigSchema(TypedDict):my_runtime_value: str# 2. 定义一个在节点中访问配置的图
class State(TypedDict):my_state_value: strdef node(state: State, config: RunnableConfig):if config["configurable"]["my_runtime_value"] == "a":return {"my_state_value": 1}elif config["configurable"]["my_runtime_value"] == "b":return {"my_state_value": 2}else:raise ValueError("未知的值。")builder = StateGraph(State, config_schema=ConfigSchema)
builder.add_node(node)
builder.add_edge(START, "node")
builder.add_edge("node", END)graph = builder.compile()# 3. 在运行时传入配置:
print(graph.invoke({}, {"configurable": {"my_runtime_value": "a"}}))
print(graph.invoke({}, {"configurable": {"my_runtime_value": "b"}}))# 输出:
# {'my_state_value': 1}
# {'my_state_value': 2}

以下是一个更复杂的示例,展示如何同时在运行时配置两个参数:使用的 LLM 模型和系统提示词(system message)。

from typing import Optional
from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, MessagesState, StateGraph, START
from typing_extensions import TypedDict# 配置 schema
class ConfigSchema(TypedDict):model: Optional[str]system_message: Optional[str]MODELS = {"anthropic": init_chat_model("anthropic:claude-3-5-haiku-latest"),"openai": init_chat_model("openai:gpt-4.1-mini"),
}# 节点逻辑:支持系统提示词注入
def call_model(state: MessagesState, config: RunnableConfig):model = config["configurable"].get("model", "anthropic")model = MODELS[model]messages = state["messages"]if system_message := config["configurable"].get("system_message"):messages = [SystemMessage(system_message)] + messagesresponse = model.invoke(messages)return {"messages": [response]}# 构建图
builder = StateGraph(MessagesState, config_schema=ConfigSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)graph = builder.compile()

添加重试策略(Add retry policies)

有许多用例中,你可能希望你的节点拥有自定义的重试策略,比如调用 API、查询数据库或调用 LLM 等。LangGraph 允许你为节点添加重试策略。

要配置重试策略,将 retry 参数传递给 add_noderetry 参数接受一个 RetryPolicy 命名元组对象。下面示例中,我们用默认参数实例化一个 RetryPolicy 对象并将其关联到一个节点:

from langgraph.pregel import RetryPolicybuilder.add_node("node_name",node_function,retry=RetryPolicy(),
)

默认情况下,retry_on 参数使用 default_retry_on 函数,它会对除了以下异常之外的所有异常进行重试:

  • ValueError
  • TypeError
  • ArithmeticError
  • ImportError
  • LookupError
  • NameError
  • SyntaxError
  • RuntimeError
  • ReferenceError
  • StopIteration
  • StopAsyncIteration
  • OSError

此外,对于来自流行的 HTTP 请求库如 requestshttpx 的异常,它仅在状态码为 5xx 时重试。

下面我们向节点传递了两种不同的重试策略:

import sqlite3
from typing_extensions import TypedDictfrom langchain.chat_models import init_chat_modelfrom langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.pregel import RetryPolicy
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessagedb = SQLDatabase.from_uri("sqlite:///:memory:")model = init_chat_model("anthropic:claude-3-5-haiku-latest")def query_database(state: MessagesState):query_result = db.run("SELECT * FROM Artist LIMIT 10;")return {"messages": [AIMessage(content=query_result)]}def call_model(state: MessagesState):response = model.invoke(state["messages"])return {"messages": [response]}# 定义一个新的图
builder = StateGraph(MessagesState)
builder.add_node("query_database",query_database,retry=RetryPolicy(retry_on=sqlite3.OperationalError),
)
builder.add_node("model", call_model, retry=RetryPolicy(max_attempts=5))
builder.add_edge(START, "model")
builder.add_edge("model", "query_database")
builder.add_edge("query_database", END)graph = builder.compile()

添加节点缓存

节点缓存适用于你想避免重复执行某些操作的场景,比如执行耗时或高成本的操作。LangGraph 允许你为图中的节点添加个性化的缓存策略。

配置缓存策略时,将 cache_policy 参数传递给 add_node 函数。以下示例中,实例化了一个 CachePolicy 对象,设置生存时间(TTL)为 120 秒,并使用默认的 key_func 生成器。然后将该缓存策略与某个节点关联:

from langgraph.types import CachePolicybuilder.add_node("node_name",node_function,cache_policy=CachePolicy(ttl=120),
)

接着,为了启用图的节点级缓存,在编译图时设置 cache 参数。下面的例子使用 InMemoryCache 来搭建一个带有内存缓存的图,也可以使用 SqliteCache

from langgraph.cache.memory import InMemoryCachegraph = builder.compile(cache=InMemoryCache())

创建步骤序列

下面演示如何构建一个简单的步骤序列。内容包括:

  • 如何构建一个顺序图
  • 构建类似图的内置简写方法

我们使用图的 .add_node.add_edge 方法来添加一系列节点:

from langgraph.graph import START, StateGraphbuilder = StateGraph(State)# 添加节点
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)# 添加边
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")

也可以使用内置简写 .add_sequence

builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")

为什么用 LangGraph 将应用步骤拆分为序列?
举个端到端的例子,我们创建三个步骤的序列:

  1. 在状态的一个键中填充值
  2. 更新同一个值
  3. 填充不同的值

首先定义状态。状态决定了图的模式(schema),也可以指定如何应用更新。详见相关章节。

这里我们仅跟踪两个值:

from typing_extensions import TypedDictclass State(TypedDict):value_1: strvalue_2: int

节点即 Python 函数,读取图的状态并做更新。该函数的第一个参数始终是状态:

def step_1(state: State):return {"value_1": "a"}def step_2(state: State):current_value_1 = state["value_1"]return {"value_1": f"{current_value_1} b"}def step_3(state: State):return {"value_2": 10}

注意
在发出状态更新时,每个节点只需指定想更新的键对应的值。
默认行为是覆盖对应键的值。你也可以使用 reducer 控制更新的处理方式,比如将连续的更新追加到键上。详见相关章节。

最后,定义图。使用 StateGraph 创建基于此状态的图。

然后用 add_nodeadd_edge 填充图并定义控制流:

from langgraph.graph import START, StateGraphbuilder = StateGraph(State)# 添加节点
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)# 添加边
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")

指定自定义节点名称,可以通过 .add_node 指定节点自定义名称:

builder.add_node("my_node", step_1)

注意事项:

  • .add_edge 需要节点名称,函数默认名称为 node.__name__
  • 必须指定图的入口点,即从 START 节点连出一条边
  • 图在没有更多节点可执行时停止运行

接下来编译图。编译时会进行基本结构检查(如发现孤立节点)。如果程序有持久化需求,可以通过 checkpointer 传入。

graph = builder.compile()

LangGraph 提供了内置工具用于图形可视化。下面查看我们的序列,详见可视化指南:

from IPython.display import Image, displaydisplay(Image(graph.get_graph().draw_mermaid_png()))

简单调用示例:

graph.invoke({"value_1": "c"})

返回结果:

{'value_1': 'a b', 'value_2': 10}

说明:

  • 调用时传入了单个状态键的初始值,必须至少提供一个键的值
  • 第一个节点覆盖了传入的值
  • 第二个节点更新了该值
  • 第三个节点填充了不同的键值

langgraph>=0.2.46 包含内置简写 add_sequence 用于添加节点序列。可按如下方式编译相同的图:

builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")graph = builder.compile()graph.invoke({"value_1": "c"})

创建分支

节点的并行执行对于加快整体图的运行速度至关重要。LangGraph 原生支持节点的并行执行,这可以显著提升基于图的工作流性能。

这种并行化是通过扇出(fan-out)和扇入(fan-in)机制实现的,利用标准边(edges)和条件边(conditional_edges)。

以下是一些示例,展示如何添加并创建适合你的分支数据流。

在这里插入图片描述

并行运行图节点

在本示例中,我们从节点 A 扇出到节点 B 和 C,随后再扇入到节点 D。通过状态(state),我们指定了 reducer 的加法操作(add operation)。这会将特定键(key)对应的值进行合并或累积,而不是简单地覆盖原有值。对于列表类型来说,这意味着将新的列表与已有列表拼接。关于使用 reducer 更新状态的更多细节,请参见上文关于状态 reducer 的部分。

import operator
from typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDclass State(TypedDict):# operator.add 作为 reducer 函数,使其只能追加aggregate: Annotated[list, operator.add]def a(state: State):print(f'Adding "A" to {state["aggregate"]}')return {"aggregate": ["A"]}def b(state: State):print(f'Adding "B" to {state["aggregate"]}')return {"aggregate": ["B"]}def c(state: State):print(f'Adding "C" to {state["aggregate"]}')return {"aggregate": ["C"]}def d(state: State):print(f'Adding "D" to {state["aggregate"]}')return {"aggregate": ["D"]}builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()from IPython.display import Image, displaydisplay(Image(graph.get_graph().draw_mermaid_png()))

在这里插入图片描述

使用 reducer,可以看到每个节点中添加的值都会被累积。

graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})

执行结果:

Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']{'aggregate': ['A', 'B', 'C', 'D']}

注意
在上述示例中,节点 “b” 和 “c” 会在同一个超级步(superstep)中并发执行。由于它们处于同一步骤,节点 “d” 会在 “b” 和 “c” 都执行完毕后再执行。
需要注意的是,来自并行超级步的更新可能不会有一致的顺序。如果需要确定的顺序,应该将输出写入状态中的独立字段,并携带排序用的值。

LangGraph 在“超级步骤”(supersteps)中执行节点,这意味着虽然并行分支会同时执行,但整个超级步骤是事务性的。如果任何一个分支抛出异常,则该超级步骤内所有更新都不会应用到状态(整个超级步骤会报错)。

重要的是,当使用检查点(checkpointer)时,超级步骤内成功节点的结果会被保存,恢复时不会重复执行。

如果你的流程中有容易出错的部分(例如需要处理不稳定的 API 调用),LangGraph 提供了两种解决方案:

  • 你可以在节点中编写常规的 Python 代码来捕获和处理异常。
  • 你可以设置 retry_policy,让图在遇到特定类型异常时自动重试节点。只有失败的分支会被重试,不用担心重复工作。

这两种方式结合使用,可以让你实现并行执行的同时,完全掌控异常处理。

延迟节点执行

延迟节点执行(Deferring node execution)在你想要推迟某个节点的执行,直到所有其他待处理任务完成时非常有用。这在分支长度不一致的情况下尤为重要,比如在类似 Map-Reduce 的工作流中经常遇到。

上面的示例展示了当每条路径只有一步时如何实现扇出(fan-out)和扇入(fan-in)。但如果某个分支有多于一步呢?我们来给“b”分支添加一个名为“b_2”的节点:

import operator
from typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDclass State(TypedDict):aggregate: Annotated[list, operator.add]def a(state: State):print(f'Adding "A" to {state["aggregate"]}')return {"aggregate": ["A"]}def b(state: State):print(f'Adding "B" to {state["aggregate"]}')return {"aggregate": ["B"]}def b_2(state: State):print(f'Adding "B_2" to {state["aggregate"]}')return {"aggregate": ["B_2"]}def c(state: State):print(f'Adding "C" to {state["aggregate"]}')return {"aggregate": ["C"]}def d(state: State):print(f'Adding "D" to {state["aggregate"]}')return {"aggregate": ["D"]}builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
builder.add_node(d, defer=True)  # 延迟执行
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()from IPython.display import Image, displaydisplay(Image(graph.get_graph().draw_mermaid_png()))graph.invoke({"aggregate": []})

在这里插入图片描述

执行结果:

Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "B_2" to ['A', 'B', 'C']
Adding "D" to ['A', 'B', 'C', 'B_2']{'aggregate': ['A', 'B', 'C', 'B_2', 'D']}

在此示例中,节点 “b” 和 “c” 仍然在同一个超级步并发执行。我们对节点 d 设置了 defer=True,表示它会等到所有待执行任务完成后才执行。这里的意思是,节点 d 会等待整个 “b” 分支完成后才执行。

条件分支

如果你希望扇出路径在运行时基于状态动态变化,可以使用 add_conditional_edges,根据图状态选择一条或多条路径。下面示例中,节点 a 生成一个状态更新,决定后续执行哪个节点。

import operator
from typing import Annotated, Literalfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDclass State(TypedDict):aggregate: Annotated[list, operator.add]which: str  # 用于决定分支走向的状态字段def a(state: State):print(f'Adding "A" to {state["aggregate"]}')return {"aggregate": ["A"], "which": "c"}def b(state: State):print(f'Adding "B" to {state["aggregate"]}')return {"aggregate": ["B"]}def c(state: State):print(f'Adding "C" to {state["aggregate"]}')return {"aggregate": ["C"]}builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_edge(START, "a")
builder.add_edge("b", END)
builder.add_edge("c", END)def conditional_edge(state: State) -> Literal["b", "c"]:# 这里根据状态决定下一节点return state["which"]builder.add_conditional_edges("a", conditional_edge)graph = builder.compile()from IPython.display import Image, displaydisplay(Image(graph.get_graph().draw_mermaid_png()))result = graph.invoke({"aggregate": []})
print(result)

在这里插入图片描述

输出:

Adding "A" to []
Adding "C" to ['A']
{'aggregate': ['A', 'C'], 'which': 'c'}

提示
你的条件边可以路由到多个目标节点,例如:

def route_bc_or_cd(state: State) -> list[str]:if state["which"] == "cd":return ["c", "d"]return ["b", "c"]
Map-Reduce 与 Send API

默认情况下,节点(Nodes)和边(Edges)是在运行前预先定义好的,并且它们操作的是同一个共享状态(shared state)。然而,有些情况下,具体的边在运行前可能未知,或者你可能希望同时存在多个不同版本的状态。这种情况在 Map-Reduce 设计模式中很常见。在这种设计模式中,第一个节点可能会生成一个对象列表,你希望对这些对象分别应用另一个节点。对象的数量事先未知(意味着边的数量也未知),且下游节点接收到的输入状态应该是不同的——每个生成的对象对应一个输入状态。

为了支持这种设计模式,LangGraph 允许在条件边(conditional edges)中返回 Send 对象。Send 接受两个参数:第一个是节点的名称,第二个是传递给该节点的状态。

def continue_to_jokes(state: OverallState):return [Send("generate_joke", {"subject": s}) for s in state['subjects']]graph.add_conditional_edges("node_a", continue_to_jokes)

下面我们实现一个简单示例,模拟使用大语言模型(LLMs)来完成以下步骤:(1)生成一个主题列表(其长度事先未知),(2)并行生成笑话,和(3)选择“最佳”笑话。值得注意的是,扇出节点的输入状态不同于整个图的总体状态。

在这里插入图片描述

import operator
from typing import Annotated
from typing_extensions import TypedDictfrom langgraph.types import Send
from langgraph.graph import END, StateGraph, STARTclass OverallState(TypedDict):topic: strsubjects: listjokes: Annotated[list, operator.add]  # 累积所有生成的笑话best_selected_joke: strclass JokeState(TypedDict):subject: strdef generate_topics(state: OverallState):# 模拟 LLM 生成主题列表return {"subjects": ["lions", "elephants", "penguins"]}def generate_joke(state: JokeState):# 模拟 LLM 生成笑话joke_map = {"lions": "Why don't lions like fast food? Because they can't catch it!","elephants": "Why don't elephants use computers? They're afraid of the mouse!","penguins": "Why don’t penguins like talking to strangers at parties? Because they find it hard to break the ice.",}return {"jokes": [joke_map[state["subject"]]]}def continue_to_jokes(state: OverallState):# 为每个主题返回一个 Send 对象,指向 generate_joke 节点,传递对应状态return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]def best_joke(state: OverallState):return {"best_selected_joke": "penguins"}builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)
graph = builder.compile()from IPython.display import Image, displaydisplay(Image(graph.get_graph().draw_mermaid_png()))for step in graph.stream({"topic": "animals"}):print(step)

在这里插入图片描述

输出示例:

{'generate_topics': {'subjects': ['lions', 'elephants', 'penguins']}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ['Why don’t penguins like talking to strangers at parties? Because they find it hard to break the ice.']}}
{'best_joke': {'best_selected_joke': 'penguins'}}
创建和控制循环

当创建带有循环的图时,我们需要一种机制来终止执行。最常见的做法是添加一个条件边(conditional edge),当满足某个终止条件时,跳转到 END 节点。

调用或流式执行图时,你还可以设置图的递归限制(recursion limit)。递归限制规定图允许执行的超级步骤(supersteps)数量,超过后会抛出错误。关于递归限制的概念,可以参考相关文档。

让我们通过一个简单的带循环的图来更好地理解这些机制。

创建循环时,可以包含一个条件边来指定终止条件:

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)def route(state: State) -> Literal["b", END]:if termination_condition(state):return ENDelse:return "b"builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()

控制递归限制时,可以在配置中指定 "recursion_limit"。当超出限制时,会抛出 GraphRecursionError,你可以捕获并处理它:

from langgraph.errors import GraphRecursionErrortry:graph.invoke(inputs, {"recursion_limit": 3})
except GraphRecursionError:print("Recursion Error")

下面定义一个带简单循环的图。注意这里使用条件边实现了终止条件。

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, ENDclass State(TypedDict):# operator.add 作为 reducer 函数,使其只追加不删除aggregate: Annotated[list, operator.add]def a(state: State):print(f'Node A sees {state["aggregate"]}')return {"aggregate": ["A"]}def b(state: State):print(f'Node B sees {state["aggregate"]}')return {"aggregate": ["B"]}# 定义节点
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)# 定义边
def route(state: State) -> Literal["b", END]:if len(state["aggregate"]) < 7:return "b"else:return ENDbuilder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

在这里插入图片描述

该架构类似于 ReAct agent,其中节点 “a” 是调用工具的模型,节点 “b” 代表具体的工具。

在条件边 route 中,我们指定当状态中 “aggregate” 列表长度超过阈值时结束循环。

调用图时,可以看到节点 “a” 和 “b” 交替执行,直到达到终止条件才停止。

graph.invoke({"aggregate": []})

输出示例:

Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']{'aggregate': ['A', 'B', 'A', 'B', 'A', 'B', 'A']}
设置递归限制

在某些应用中,无法保证一定会达到终止条件。这时可以设置图的递归限制。超过限制后会抛出 GraphRecursionError,你可以捕获并处理该异常:

from langgraph.errors import GraphRecursionErrortry:graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:print("Recursion Error")

输出示例:

Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Recursion Error

注意这次执行在第四步时终止。默认递归限制为 25。

其次,我们可以不抛出 GraphRecursionError,而是在状态中引入一个新键,用于跟踪距离递归限制还有多少步。然后可以用这个键来判断是否应该结束运行。LangGraph 实现了一个特殊的注解 RemainingSteps。在底层,它会创建一个 ManagedValue 通道——一个仅在图运行期间存在的状态通道。

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import RemainingStepsclass State(TypedDict):# operator.add 作为 reducer 函数,使其只追加不删除aggregate: Annotated[list, operator.add]remaining_steps: RemainingStepsdef a(state: State):print(f'Node A sees {state["aggregate"]}')return {"aggregate": ["A"]}def b(state: State):print(f'Node B sees {state["aggregate"]}')return {"aggregate": ["B"]}# 定义节点
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)# 定义边
def route(state: State) -> Literal["b", END]:if state["remaining_steps"] <= 2:return ENDelse:return "b"builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()# 测试运行
result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
print(result)

运行输出:

Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
{'aggregate': ['A', 'B', 'A']}

为了更好地理解递归限制的工作原理,我们来看一个更复杂的例子。下面实现了一个循环,但其中一步分支成了两个节点:

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, ENDclass State(TypedDict):aggregate: Annotated[list, operator.add]def a(state: State):print(f'Node A sees {state["aggregate"]}')return {"aggregate": ["A"]}def b(state: State):print(f'Node B sees {state["aggregate"]}')return {"aggregate": ["B"]}def c(state: State):print(f'Node C sees {state["aggregate"]}')return {"aggregate": ["C"]}def d(state: State):print(f'Node D sees {state["aggregate"]}')return {"aggregate": ["D"]}# 定义节点
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)# 定义边
def route(state: State) -> Literal["b", END]:if len(state["aggregate"]) < 7:return "b"else:return ENDbuilder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "c")
builder.add_edge("b", "d")
builder.add_edge(["c", "d"], "a")
graph = builder.compile()from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

在这里插入图片描述

这个图看起来复杂,但可以理解为一个超级步骤的循环:

  1. 节点 A
  2. 节点 B
  3. 节点 C 和 D(并发执行)
  4. 节点 A
  5. ……

我们有一个由四个超级步骤组成的循环,其中节点 C 和 D 并行执行。像之前一样调用图,我们会看到在达到终止条件前,完成了两个完整的“圈”:

result = graph.invoke({"aggregate": []})Node A sees []
Node B sees ['A']
Node D sees ['A', 'B']
Node C sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Node B sees ['A', 'B', 'C', 'D', 'A']
Node D sees ['A', 'B', 'C', 'D', 'A', 'B']
Node C sees ['A', 'B', 'C', 'D', 'A', 'B']
Node A sees ['A', 'B', 'C', 'D', 'A', 'B', 'C', 'D']

但是如果我们将递归限制设置为 4,则只完成一圈,因为每圈包含四个超级步骤:

from langgraph.errors import GraphRecursionErrortry:result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:print("Recursion Error")Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Recursion Error

异步(Async)

使用异步编程范式在并发运行 IO 密集型代码时(例如,对聊天模型提供者发起并发 API 请求),可以显著提升性能。

要将同步实现的图转换为异步实现,你需要:

  • 将节点函数从 def 改为 async def
  • 在函数内部适当使用 await
  • 根据需求,使用 .ainvoke.astream 调用图。

由于许多 LangChain 对象实现了 Runnable 协议,且所有同步方法都有对应的异步版本,通常将同步图升级为异步图相对简单快捷。

以下示例演示了如何调用底层大语言模型(LLM)的异步接口,我们将以聊天模型为例:

安装命令:

pip install -U "langchain[openai]"

示例代码:

import os
from langchain.chat_models import init_chat_modelos.environ["OPENAI_API_KEY"] = "sk-..."llm = init_chat_model("openai:gpt-4.1")from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, StateGraphasync def node(state: MessagesState): new_message = await llm.ainvoke(state["messages"]) return {"messages": [new_message]}builder = StateGraph(MessagesState).add_node(node).set_entry_point("node")
graph = builder.compile()input_message = {"role": "user", "content": "Hello"}
result = await graph.ainvoke({"messages": [input_message]})

结合控制流和状态更新 —— 使用 Command

有时需要在同一个节点中既执行状态更新(state updates),又决定下一个要去的节点(控制流,edges)。LangGraph 提供了通过节点函数返回一个 Command 对象的方式实现这一点:

def my_node(state: State) -> Command[Literal["my_other_node"]]:return Command(# 状态更新update={"foo": "bar"},# 控制流goto="my_other_node")

下面给出一个端到端示例。我们创建一个包含 3 个节点(A、B、C)的简单图。先执行节点 A,然后根据节点 A 的输出决定下一步去节点 B 还是节点 C。

import random
from typing_extensions import TypedDict, Literalfrom langgraph.graph import StateGraph, START
from langgraph.types import Command# 定义图的状态
class State(TypedDict):foo: str# 定义节点
def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:print("Called A")value = random.choice(["a", "b"])if value == "a":goto = "node_b"else:goto = "node_c"# Command 允许同时更新状态并指定下一跳return Command(update={"foo": value},goto=goto,)def node_b(state: State):print("Called B")return {"foo": state["foo"] + "b"}def node_c(state: State):print("Called C")return {"foo": state["foo"] + "c"}builder = StateGraph(State)
builder.add_edge(START, "node_a")
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(node_c)
# 注意:A、B、C 之间没有边,因为路由逻辑在 node_a 里通过 Command 定义了graph = builder.compile()

重要说明
我们在 node_a 的返回类型中使用了 Command[Literal["node_b", "node_c"]],这对图的渲染和 LangGraph 知道 node_a 可以跳转到 node_b 或 node_c 很重要。

运行多次图时,会看到根据 node_a 中的随机选择,路径会不同(A -> B 或 A -> C):

在这里插入图片描述

graph.invoke({"foo": ""})Called A
Called C{'foo': 'bc'}
在父图中导航到某个节点

如果你使用子图(subgraphs),可能会希望从子图中的某个节点跳转到另一个子图(即父图中的另一个节点)。要实现这一点,可以在 Command 中指定 graph=Command.PARENT

def my_node(state: State) -> Command[Literal["my_other_node"]]:return Command(update={"foo": "bar"},goto="other_subgraph",  # 这里的 other_subgraph 是父图中的节点名graph=Command.PARENT)

下面用之前的例子演示这个用法。我们将例子中的 node_a 改成一个单节点子图,然后将这个子图作为父图的一个节点。

使用 Command.PARENT 进行状态更新:

当你从子图的节点向父图的节点发送更新时,如果更新的 key 在父图和子图的状态结构中共有,你必须在父图的状态定义中为该 key 定义一个reducer。

import operator
from typing_extensions import Annotatedclass State(TypedDict):# 这里定义了一个 reducerfoo: Annotated[str, operator.add]def node_a(state: State):print("Called A")value = random.choice(["a", "b"])# 这替代了条件边缘函数if value == "a":goto = "node_b"else:goto = "node_c"# Command 同时实现状态更新和跳转下一节点return Command(update={"foo": value},goto=goto,# 告诉 LangGraph 跳转到父图中的 node_b 或 node_c# 注意:这里跳转的是距离子图最近的父图graph=Command.PARENT,)subgraph = StateGraph(State).add_node(node_a).add_edge(START, "node_a").compile()def node_b(state: State):print("Called B")# 由于定义了 reducer,无需手动拼接,operator.add 会自动追加return {"foo": "b"}def node_c(state: State):print("Called C")return {"foo": "c"}builder = StateGraph(State)
builder.add_edge(START, "subgraph")
builder.add_node("subgraph", subgraph)
builder.add_node(node_b)
builder.add_node(node_c)graph = builder.compile()graph.invoke({"foo": ""})Called A
Called C{'foo': 'bc'}

这个例子中,状态 foo 在子图和父图都存在,并通过 operator.add 作为 reducer 自动拼接更新的值。
通过 graph=Command.PARENT,子图的节点可以控制跳转和更新父图中的节点状态。

使用工具内部更新状态

一个常见的用例是在工具内部更新图的状态。例如,在客户支持应用中,你可能希望在对话开始时,根据客户的账号或 ID 查询客户信息。要从工具中更新图状态,可以让工具返回 Command(update={"my_custom_key": "foo", "messages": [...]}),例如:

@tool
def lookup_user_info(tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig):"""用来查询用户信息,以更好地帮助回答用户的问题。"""user_info = get_user_info(config.get("configurable", {}).get("user_id"))return Command(update={# 更新状态中的键值"user_info": user_info,# 更新消息历史"messages": [ToolMessage("成功查询到用户信息", tool_call_id=tool_call_id)]})

当从工具返回 Command 时,必须在 Command.update 中包含 messages(或任何用于消息历史的状态键),并且 messages 列表中必须包含至少一个 ToolMessage 这是为了保证生成的消息历史有效(因为大多数大型语言模型提供商要求 AI 消息与工具调用必须紧接着工具结果消息)。

如果你使用的工具通过 Command 更新状态,建议使用预构建的 ToolNode,它会自动处理工具返回的 Command 对象并传播到图的状态中。若你写自定义节点调用工具,则需要手动将工具返回的 Command 对象作为该节点的状态更新进行传播。

可视化你的图

你可以可视化任意图结构,包括 StateGraph。我们来玩点有趣的,画分形图 😃.

import random
from typing import Annotated, Literal
from typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messagesclass State(TypedDict):messages: Annotated[list, add_messages]class MyNode:def __init__(self, name: str):self.name = namedef __call__(self, state: State):return {"messages": [("assistant", f"Called node {self.name}")]}def route(state) -> Literal["entry_node", "__end__"]:if len(state["messages"]) > 10:return "__end__"return "entry_node"def add_fractal_nodes(builder, current_node, level, max_level):if level > max_level:return# 本级生成的节点数量num_nodes = random.randint(1, 3)  # 可调节随机数范围for i in range(num_nodes):nm = ["A", "B", "C"][i]node_name = f"node_{current_node}_{nm}"builder.add_node(node_name, MyNode(node_name))builder.add_edge(current_node, node_name)# 递归添加更多节点r = random.random()if r > 0.2 and level + 1 < max_level:add_fractal_nodes(builder, node_name, level + 1, max_level)elif r > 0.05:builder.add_conditional_edges(node_name, route, node_name)else:# 结束节点builder.add_edge(node_name, "__end__")def build_fractal_graph(max_level: int):builder = StateGraph(State)entry_point = "entry_node"builder.add_node(entry_point, MyNode(entry_point))builder.add_edge(START, entry_point)add_fractal_nodes(builder, entry_point, 1, max_level)# 可选:如果需要,设置结束点builder.add_edge(entry_point, END)  # 或者任意特定节点return builder.compile()app = build_fractal_graph(3)
Mermaid

我们还可以把图转换成 Mermaid 语法。

print(app.get_graph().draw_mermaid())

示例 Mermaid 输出:

__start__

entry_node
node_entry_node_A
node_entry_node_B
node_node_entry_node_B_A
node_node_entry_node_B_B
node_node_entry_node_B_C

__end__

PNG 格式

如果你想要以 PNG 格式渲染图,有三种方案:

  • 使用 Mermaid.ink API(无需额外包)
  • 使用 Mermaid + Pyppeteer(需要安装 pyppeteer
  • 使用 Graphviz(需要安装 graphviz

使用 Mermaid.Ink

默认情况下,draw_mermaid_png() 使用 Mermaid.Ink API 生成图像。

from IPython.display import Image, displaydisplay(Image(app.get_graph().draw_mermaid_png()))

使用 Mermaid + Pyppeteer

pip install --quiet pyppeteer
pip install --quiet nest_asyncio
import nest_asyncio
nest_asyncio.apply()  # Jupyter Notebook 里运行异步函数必需display(Image(app.get_graph().draw_mermaid_png(curve_style=CurveStyle.LINEAR,node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"),wrap_label_n_words=9,output_file_path=None,draw_method=MermaidDrawMethod.PYPPETEER,background_color="white",padding=10,))
)

在这里插入图片描述

使用 Graphviz

pip install pygraphviz
try:display(Image(app.get_graph().draw_png()))
except ImportError:print("你可能需要安装 pygraphviz 的依赖,详情见 https://github.com/pygraphviz/pygraphviz/blob/main/INSTALL.txt")

在这里插入图片描述

流式传输(Streaming)

LangGraph 实现了一个流式系统,用于展示实时更新,从而提供响应迅速且透明的用户体验。

LangGraph 的流式系统可以将图执行过程中的实时反馈展示在你的应用中。你可以流式传输以下三类主要数据:

  • 工作流进度 — 每个图节点执行后获取状态更新。
  • LLM 生成的 Token — 随着语言模型生成内容实时流式传输 tokens。
  • 自定义更新 — 发出用户定义的信号(例如:“已获取 10/100 条记录”)。

LangGraph 流式传输的能力

  • 流式传输 LLM Tokens — 可在任意位置捕获 Token 流,包括节点内部、子图或工具中。

  • 从工具中发送进度通知 — 可直接从工具函数发送自定义更新或进度信号。

  • 支持子图中的流式传输 — 可同时包含父图和嵌套子图的输出。

  • 兼容任意 LLM — 即使不是 LangChain 模型,也可以通过自定义流式模式流式传输 tokens。

  • 支持多种流式模式 — 可选择以下几种模式:

    • values(完整状态)
    • updates(状态增量)
    • messages(LLM tokens + 元数据)
    • custom(任意用户数据)
    • debug(详细调试信息)

流式输出API(Stream outputs)

LangGraph 图提供了 .stream()(同步)和 .astream()(异步)方法,用于以迭代器方式输出流式数据。

基本使用示例:

  • 异步用法(Async)
async for chunk in graph.astream(inputs, stream_mode="updates"):print(chunk)
  • 同步用法(Async)
for chunk in graph.stream(inputs, stream_mode="updates"):print(chunk)

扩展示例:流式更新(Streaming Updates)

from typing import TypedDict
from langgraph.graph import StateGraph, START, ENDclass State(TypedDict):topic: strjoke: strdef refine_topic(state: State):return {"topic": state["topic"] + " and cats"}def generate_joke(state: State):return {"joke": f"This is a joke about {state['topic']}"}graph = (StateGraph(State).add_node(refine_topic).add_node(generate_joke).add_edge(START, "refine_topic").add_edge("refine_topic", "generate_joke").add_edge("generate_joke", END).compile()
)for chunk in graph.stream( {"topic": "ice cream"},stream_mode="updates", 
):print(chunk)

支持的流式模式(Supported stream modes)

模式(Mode)描述(Description)
values在图的每一步之后,流式传输完整状态值。
updates在图的每一步之后,流式传输状态的增量更新。如果同一步中有多个更新(例如多个节点执行),这些更新会分别流式传输。
custom从图节点内部流式传输自定义数据。
messages从调用 LLM 的图节点中,流式传输 (LLM token, 元数据) 的二元组。
debug在图执行期间尽可能多地流式传输调试信息。

你可以将多个模式作为列表传入 stream_mode 参数,从而同时流式传输多种类型的数据。

返回的数据将是 (mode, chunk) 的元组,其中 mode 表示当前流式模式的名称,chunk 是该模式输出的数据。

异步用法示例(Async):

async for mode, chunk in graph.astream(inputs, stream_mode=["updates", "custom"]):print(chunk)

同步用法示例(Async):

for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):print(chunk)

流式传输图状态(Stream graph state)

使用 updatesvalues 两种流式模式可以在图执行过程中实时传输其状态。

  • updates:在图每一步之后流式传输状态的 增量更新
  • values:在图每一步之后流式传输状态的 完整值
from typing import TypedDict
from langgraph.graph import StateGraph, START, ENDclass State(TypedDict):topic: strjoke: strdef refine_topic(state: State):return {"topic": state["topic"] + " and cats"}def generate_joke(state: State):return {"joke": f"This is a joke about {state['topic']}"}graph = (StateGraph(State).add_node(refine_topic).add_node(generate_joke).add_edge(START, "refine_topic").add_edge("refine_topic", "generate_joke").add_edge("generate_joke", END).compile()
)

使用 updates 模式,仅流式传输每个节点执行后返回的状态更新。输出内容包括节点名称及其对应的更新值。

for chunk in graph.stream({"topic": "ice cream"},stream_mode="updates",
):print(chunk)

使用values模式可在每一步执行后流式传输图的完整状态。

for chunk in graph.stream({"topic": "ice cream"},stream_mode="values",
):print(chunk)

子图支持(Subgraphs)

若要在流式输出中包含 子图的输出,可在父图的 .stream() 方法中设置 subgraphs=True
这将同时流式传输父图与任意子图的输出。

输出格式为 (namespace, data) 的元组,其中 namespace 是调用子图的节点路径,例如:

("parent_node:<task_id>", "child_node:<task_id>")
for chunk in graph.stream({"foo": "foo"},subgraphs=True, stream_mode="updates",
):print(chunk)

扩展示例:来自子图的流式传输

from langgraph.graph import START, StateGraph
from typing import TypedDict# Define subgraph
class SubgraphState(TypedDict):foo: str  # note that this key is shared with the parent graph statebar: strdef subgraph_node_1(state: SubgraphState):return {"bar": "bar"}def subgraph_node_2(state: SubgraphState):return {"foo": state["foo"] + state["bar"]}subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()# Define parent graph
class ParentState(TypedDict):foo: strdef node_1(state: ParentState):return {"foo": "hi! " + state["foo"]}builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()for chunk in graph.stream({"foo": "foo"},stream_mode="updates",subgraphs=True, 
):print(chunk)

调试模式(Debugging)

使用 debug 流式模式可在图执行过程中尽可能多地输出信息。
输出内容包括节点名称和完整的状态信息。

for chunk in graph.stream({"topic": "ice cream"},stream_mode="debug",
):print(chunk)

LLM Tokens(语言模型 Token)

使用 messages 流式模式,可以从图中的任意部分(包括节点、工具、子图或任务)逐个 token 地流式传输大型语言模型(LLM)的输出

messages 模式下,流式输出是一个二元组 (message_chunk, metadata),其中:

  • message_chunk:LLM 返回的 token 或消息片段。
  • metadata:包含图节点与 LLM 调用相关细节的字典。

如果你使用的 LLM 并非通过 LangChain 集成提供,可改用 custom 模式进行流式传输。详见「适用于任意 LLM」。

from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START@dataclass
class MyState:topic: strjoke: str = ""llm = init_chat_model(model="openai:gpt-4o-mini")def call_model(state: MyState):"""调用 LLM 生成关于话题的笑话"""llm_response = llm.invoke( [{"role": "user", "content": f"Generate a joke about {state.topic}"}])return {"joke": llm_response.content}graph = (StateGraph(MyState).add_node(call_model).add_edge(START, "call_model").compile()
)for message_chunk, metadata in graph.stream( {"topic": "ice cream"},stream_mode="messages",
):if message_chunk.content:print(message_chunk.content, end="|", flush=True)

按 LLM 调用标签过滤(Filter by LLM invocation)

你可以为 LLM 调用设置标签(tags),从而在流式传输中根据标签筛选特定 LLM 的 tokens。

from langchain.chat_models import init_chat_modelllm_1 = init_chat_model(model="openai:gpt-4o-mini", tags=['joke']) 
llm_2 = init_chat_model(model="openai:gpt-4o-mini", tags=['poem']) graph = ... # 定义使用这两个 LLM 的图async for msg, metadata in graph.astream(  {"topic": "cats"},stream_mode="messages",
):if metadata["tags"] == ["joke"]: print(msg.content, end="|", flush=True)

扩展示例:按标签进行过滤

from typing import TypedDictfrom langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraphjoke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"]) 
poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"]) class State(TypedDict):topic: strjoke: strpoem: strasync def call_model(state, config):topic = state["topic"]print("Writing joke...")# Note: Passing the config through explicitly is required for python < 3.11# Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasksjoke_response = await joke_model.ainvoke([{"role": "user", "content": f"Write a joke about {topic}"}],config, )print("\n\nWriting poem...")poem_response = await poem_model.ainvoke([{"role": "user", "content": f"Write a short poem about {topic}"}],config, )return {"joke": joke_response.content, "poem": poem_response.content}graph = (StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()
)async for msg, metadata in graph.astream({"topic": "cats"},stream_mode="messages", 
):if metadata["tags"] == ["joke"]: print(msg.content, end="|", flush=True)

按节点过滤(Filter by node)

若只希望从特定节点流式传输 tokens,可使用 stream_mode="messages",并通过流式元数据中的 langgraph_node 字段进行筛选:

for msg, metadata in graph.stream( inputs,stream_mode="messages",
):if msg.content and metadata["langgraph_node"] == "some_node_name": ...

扩展示例:从特定节点流式传输 LLM tokens

from typing import TypedDict
from langgraph.graph import START, StateGraph 
from langchain_openai import ChatOpenAImodel = ChatOpenAI(model="gpt-4o-mini")class State(TypedDict):topic: strjoke: strpoem: strdef write_joke(state: State):topic = state["topic"]joke_response = model.invoke([{"role": "user", "content": f"Write a joke about {topic}"}])return {"joke": joke_response.content}def write_poem(state: State):topic = state["topic"]poem_response = model.invoke([{"role": "user", "content": f"Write a short poem about {topic}"}])return {"poem": poem_response.content}graph = (StateGraph(State).add_node(write_joke).add_node(write_poem)# write both the joke and the poem concurrently.add_edge(START, "write_joke").add_edge(START, "write_poem").compile()
)for msg, metadata in graph.stream( {"topic": "cats"},stream_mode="messages",
):if msg.content and metadata["langgraph_node"] == "write_poem": print(msg.content, end="|", flush=True)

流式传输自定义数据(Stream custom data)

要从 LangGraph 的节点或工具内部发送自定义用户定义的数据,请按照以下步骤操作:

  1. 使用 get_stream_writer() 获取流写入器,然后通过它发送自定义数据。
  2. 在调用 .stream().astream() 时,将 stream_mode 设置为 "custom",以便在流中获取自定义数据。你也可以同时组合多种模式(例如 ["updates", "custom"]),但必须至少包含 "custom"

节点:

from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, STARTclass State(TypedDict):query: stranswer: strdef node(state: State):writer = get_stream_writer()writer({"custom_key": "Generating custom data inside node"})return {"answer": "some data"}graph = (StateGraph(State).add_node(node).add_edge(START, "node").compile()
)inputs = {"query": "example"}# 使用示例
for chunk in graph.stream(inputs, stream_mode="custom"):print(chunk)

工具:

from langchain_core.tools import tool
from langgraph.config import get_stream_writer@tool
def query_database(query: str) -> str:"""Query the database."""writer = get_stream_writer() writer({"data": "Retrieved 0/100 records", "type": "progress"}) # perform querywriter({"data": "Retrieved 100/100 records", "type": "progress"}) return "some-answer" graph = ... # define a graph that uses this toolfor chunk in graph.stream(inputs, stream_mode="custom"): print(chunk)

使用任意 LLM(Use with any LLM)

你可以使用 stream_mode="custom" 来从任何 LLM API 流式传输数据——即使该 API 并未实现 LangChain 聊天模型接口。

这让你可以集成原始的 LLM 客户端或提供自有流式接口的外部服务,使 LangGraph 在自定义环境下非常灵活。

from langgraph.config import get_stream_writerdef call_arbitrary_model(state):"""示例节点,调用任意模型并流式输出结果"""writer = get_stream_writer()# 假设你有一个流式客户端会持续产出数据块for chunk in your_custom_streaming_client(state["topic"]):writer({"custom_llm_chunk": chunk})return {"result": "completed"}graph = (StateGraph(State).add_node(call_arbitrary_model)# 根据需要添加其他节点和边.compile()
)for chunk in graph.stream({"topic": "cats"},stream_mode="custom",
):# chunk 包含了从 LLM 流式传输过来的自定义数据print(chunk)

扩展示例:流式传输任意聊天模型

import operator
import jsonfrom typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START
from openai import AsyncOpenAIopenai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"async def stream_tokens(model_name: str, messages: list[dict]):response = await openai_client.chat.completions.create(messages=messages, model=model_name, stream=True)role = Noneasync for chunk in response:delta = chunk.choices[0].deltaif delta.role is not None:role = delta.roleif delta.content:yield {"role": role, "content": delta.content}# 这是我们的工具函数
async def get_items(place: str) -> str:"""此工具用于列出你被问及地点中可能存在的物品"""writer = get_stream_writer()response = ""async for msg_chunk in stream_tokens(model_name,[{"role": "user","content": (f"你能告诉我在以下地点“{place}”可能找到哪些物品吗?""请至少列出三个,用逗号分隔,并附上简短描述。"),}],):response += msg_chunk["content"]writer(msg_chunk)return responseclass State(TypedDict):messages: Annotated[list[dict], operator.add]# 这是调用工具的图节点
async def call_tool(state: State):ai_message = state["messages"][-1]tool_call = ai_message["tool_calls"][-1]function_name = tool_call["function"]["name"]if function_name != "get_items":raise ValueError(f"不支持的工具 {function_name}")function_arguments = tool_call["function"]["arguments"]arguments = json.loads(function_arguments)function_response = await get_items(**arguments)tool_message = {"tool_call_id": tool_call["id"],"role": "tool","name": function_name,"content": function_response,}return {"messages": [tool_message]}graph = (StateGraph(State).add_node(call_tool).add_edge(START, "call_tool").compile()
)# 使用一个包含工具调用的 AI 消息调用图:
inputs = {"messages": [{"content": None,"role": "assistant","tool_calls": [{"id": "1","function": {"arguments": '{"place":"bedroom"}',"name": "get_items",},"type": "function",}],}]
}async for chunk in graph.astream(inputs,stream_mode="custom",
):print(chunk["content"], end="|", flush=True)

针对特定聊天模型禁用流式传输

如果你的应用同时使用支持流式传输和不支持流式传输的模型,可能需要显式地为不支持流式的模型禁用流式功能。

在初始化模型时,设置 disable_streaming=True 即可。

from langchain.chat_models import init_chat_modelmodel = init_chat_model("anthropic:claude-3-7-sonnet-latest",disable_streaming=True
)

chat模型接口下:

from langchain_openai import ChatOpenAIllm = ChatOpenAI(model="o1-preview", disable_streaming=True)

持久化

概念

LangGraph 内置了一个持久化层,通过检查点(checkpointers)来实现。当你使用检查点器编译图时,检查点器会在每个超步(super-step)保存图状态的检查点。这些检查点会保存到线程(thread)中,执行图后可以访问该线程。由于线程允许在执行后访问图的状态,因此实现了许多强大功能,包括人机交互(human-in-the-loop)、记忆、时间旅行和容错能力。

在这里插入图片描述

LangGraph API 会自动处理检查点:使用 LangGraph API 时,你无需手动实现或配置检查点器。API
会在后台自动处理所有持久化基础设施。

线程

线程是指由检查点器为每个保存的检查点分配的唯一 ID 或线程标识符。当使用检查点器调用图时,必须在配置的可配置部分中指定一个 thread_id

{"configurable": {"thread_id": "1"}}

在 asyncio 这种单线程异步环境下,不会像多线程那样有多个线程ID,所以 thread_id 作为键(key)时,更多是用来标识一个“会话”或“执行上下文”的唯一标识符,而不一定是操作系统意义上的线程ID。

换句话说,thread_id 更像是一个逻辑上的“线程”或者“任务”标识符,用来区分不同的持久化检查点序列。它的值可以由你自由指定,只要能唯一标识该执行流程即可,不一定非要是真实的线程ID。

如果你在同一个异步流程里,只用一个 thread_id 就足够了;如果有多个并发任务需要独立持久化,可以给它们不同的 thread_id

检查点

检查点是图状态在每个超级步骤保存的快照,由 StateSnapshot 对象表示,包含以下关键属性:

  • config:与该检查点关联的配置。
  • metadata:与该检查点关联的元数据。
  • values:此时刻状态通道的值。
  • next:一个元组,包含图中接下来要执行的节点名称。
  • tasks:一个 PregelTask 对象元组,包含接下来要执行任务的信息。如果该步骤之前尝试过,会包括错误信息。如果图在某个节点内部被动态中断,tasks 会包含与中断相关的额外数据。

下面展示当调用一个简单图时,会保存哪些检查点:

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from operator import addclass State(TypedDict):foo: strbar: Annotated[list[str], add]def node_a(state: State):return {"foo": "a", "bar": ["a"]}def node_b(state: State):return {"foo": "b", "bar": ["b"]}workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": ""}, config)

运行图后,我们预计会看到恰好4个检查点:

  1. 空检查点,下一节点是 START。
  2. 包含用户输入 {'foo': '', 'bar': []} 的检查点,下一节点是 node_a
  3. 包含 node_a 输出 {'foo': 'a', 'bar': ['a']} 的检查点,下一节点是 node_b
  4. 包含 node_b 输出 {'foo': 'b', 'bar': ['a', 'b']} 的检查点,没有后续节点执行。

注意,由于我们为 bar 通道设置了 reducer,bar 通道的值包含了两个节点的输出。

获取状态

在与保存的图状态交互时,必须指定线程标识符(thread identifier)。你可以通过调用 graph.get_state(config) 查看图的最新状态。此方法会返回一个 StateSnapshot 对象,表示与配置中提供的线程 ID 相关联的最新检查点,或者如果提供了检查点 ID,则返回该线程的对应检查点。

# 获取最新状态快照
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)# 获取指定 checkpoint_id 的状态快照
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)

示例中,get_state 的输出示例:

StateSnapshot(values={'foo': 'b', 'bar': ['a', 'b']},next=(),config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},created_at='2024-08-29T19:19:38.821749+00:00',parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)
获取状态历史

你可以调用 graph.get_state_history(config) 来获取某线程的完整图执行历史。此方法返回与配置中提供的线程 ID 相关的所有 StateSnapshot 对象列表。重要的是,检查点按时间顺序排列,列表中的第一个是最近的检查点。

config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))

示例中,get_state_history 的输出示例:

[StateSnapshot(values={'foo': 'b', 'bar': ['a', 'b']},next=(),config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},created_at='2024-08-29T19:19:38.821749+00:00',parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},tasks=(),),StateSnapshot(values={'foo': 'a', 'bar': ['a']}, next=('node_b',),config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},created_at='2024-08-29T19:19:38.819946+00:00',parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),),StateSnapshot(values={'foo': '', 'bar': []},next=('node_a',),config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},metadata={'source': 'loop', 'writes': None, 'step': 0},created_at='2024-08-29T19:19:38.817813+00:00',parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),),StateSnapshot(values={'bar': []},next=('__start__',),config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},created_at='2024-08-29T19:19:38.816205+00:00',parent_config=None,tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),)
]

在这里插入图片描述

Replay

你也可以对之前的图执行过程进行回放。如果在调用图时传入 thread_idcheckpoint_id,LangGraph 会重新回放在该 checkpoint_id 之前已经执行过的步骤,并仅执行该检查点之后的步骤。

  • thread_id 是线程的标识符
  • checkpoint_id 是指线程中的某个特定检查点的标识符

你需要在调用图时将这两个参数作为配置中 configurable 的一部分传入:

config = {"configurable": {"thread_id": "1", "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"}}
graph.invoke(None, config=config)

需要注意的是,LangGraph 能识别某个步骤是否已经执行过。如果某步骤已经执行过,LangGraph 会回放(re-play)该步骤,而不是重新执行,但这只适用于 checkpoint_id 之前的步骤。

所有 checkpoint_id 之后的步骤都会被重新执行(即重新分支执行),即使它们之前也曾被执行过。

在这里插入图片描述

更新状态(Update state)

除了从特定检查点回放图(graph)之外,我们还可以编辑图的状态,这可以通过 graph.update_state() 方法实现。该方法接受三个参数:

  1. config

    该配置中应包含 thread_id,用于指定要更新哪个线程。

    • 如果只提供 thread_id,则表示更新(或派生)当前状态
    • 如果还包含 checkpoint_id 字段,则会基于该检查点分叉状态。
  2. values

    这是用于更新状态的值。需要注意:

    • 这些值的处理方式与从节点(node)更新状态的方式完全一致;
    • 如果某个状态通道(channel)定义了 reducer 函数,这些值会传入 reducer 中;
    • 因此,update_state 不会自动覆盖所有通道的值,而是只对没有 reducer 的通道进行覆盖。

    来看一个示例:

    假设你为图定义的状态如下(完整示例见前文):

    from typing import Annotated
    from typing_extensions import TypedDict
    from operator import addclass State(TypedDict):foo: intbar: Annotated[list[str], add]
    

    假设当前图的状态为:

    {"foo": 1, "bar": ["a"]}
    

    现在我们执行:

    graph.update_state(config, {"foo": 2, "bar": ["b"]})
    

    图的新状态将是:

    {"foo": 2, "bar": ["a", "b"]}
    

    解释:

    • foo 没有定义 reducer,因此被直接覆盖
    • bar 定义了 add 作为 reducer,因此新值 "b"追加到原有的 ["a"] 中。
  3. as_node(可选)

    可以选择性地传入 as_node 参数,用来指示此次状态更新是来自哪个节点。

    • 如果不传入,系统将默认使用最后更新状态的节点(如果能明确判断);
    • 这非常关键,因为图中接下来要执行的节点,依赖于哪个节点最后更新了状态

    你可以通过这个参数来控制接下来由哪个节点执行

在这里插入图片描述

内存存储(Memory Store)

在这里插入图片描述
一个状态模式(state schema)定义了一组键(keys),这些键会在图执行的过程中被填充。正如前面所述,状态可以通过 checkpointer 在每个图步骤中写入到线程中,从而实现状态持久化。

但如果我们希望在不同线程之间保留某些信息呢?比如,在一个聊天机器人场景中,我们希望在与某个用户的所有聊天会话(即多个线程)中持续保留一些用户相关的信息!

仅依靠 checkpointer,我们无法在不同线程之间共享信息。这就是引入 Store 接口的动机。举个例子,我们可以定义一个 InMemoryStore,用于跨线程存储用户信息。我们只需像之前一样使用 checkpointer 来编译图,同时加上新的 in_memory_store 变量即可。

✅ 使用 LangGraph API 时,无需手动实现或配置存储功能。API 已自动为你处理所有底层存储逻辑。

基本用法(Basic Usage)

我们先在不使用 LangGraph 的前提下展示如何使用 InMemoryStore

from langgraph.store.memory import InMemoryStore
in_memory_store = InMemoryStore()

Memory 是通过“命名空间(namespace)”进行隔离的,命名空间是一个元组,在本例中为:

user_id = "1"
namespace_for_memory = (user_id, "memories")

使用 store.put 方法将记忆写入 store,需指定命名空间与 key-value 对:

memory_id = str(uuid.uuid4())
memory = {"food_preference": "I like pizza"}
in_memory_store.put(namespace_for_memory, memory_id, memory)

使用 store.search 可以读取某命名空间下的所有记忆:

memories = in_memory_store.search(namespace_for_memory)
memories[-1].dict()

输出示例:

{"value": {"food_preference": "I like pizza"},"key": "07e0caf4-1631-47b7-b15f-65515d4c1843","namespace": ["1", "memories"],"created_at": "2024-10-02T17:22:31.590602+00:00","updated_at": "2024-10-02T17:22:31.590605+00:00"
}

每条记忆是一个 Python Item 类的实例,包含以下属性:

  • value:记忆的值(字典)
  • key:在该命名空间下的唯一标识符
  • namespace:记忆的命名空间(字符串列表)
  • created_at:创建时间
  • updated_at:更新时间

语义搜索(Semantic Search)

除了普通检索,store 还支持语义搜索,可通过语义查询找到相关记忆。需要通过嵌入模型(embedding model)初始化 store:

from langchain.embeddings import init_embeddingsstore = InMemoryStore(index={"embed": init_embeddings("openai:text-embedding-3-small"),"dims": 1536,"fields": ["food_preference", "$"]}
)

然后使用自然语言查询记忆:

memories = store.search(namespace_for_memory,query="What does the user like to eat?",limit=3
)

你也可以通过 index 参数来控制哪些字段需要嵌入:

# 嵌入特定字段
store.put(namespace_for_memory,str(uuid.uuid4()),{"food_preference": "I love Italian cuisine","context": "Discussing dinner plans"},index=["food_preference"]
)# 不嵌入(可检索但不可语义搜索)
store.put(namespace_for_memory,str(uuid.uuid4()),{"system_info": "Last updated: 2024-01-01"},index=False
)

在 LangGraph 中使用(Using in LangGraph)

一切就绪后,我们便可以在 LangGraph 中使用 in_memory_store

in_memory_storecheckpointer 配合使用:如前所述,checkpointer 用于将状态保存到各个线程中,而 in_memory_store 则允许我们存储任意信息,以便在线程之间共享访问。

我们在编译图时,同时传入 checkpointerin_memory_store,如下所示:

from langgraph.checkpoint.memory import InMemorySavercheckpointer = InMemorySaver()graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)

调用时传入 thread_iduser_id

user_id = "1"
config = {"configurable": {"thread_id": "1", "user_id": user_id}}for update in graph.stream({"messages": [{"role": "user", "content": "hi"}]},config,stream_mode="updates"
):print(update)

我们可以在任意节点中通过将 store: BaseStoreconfig: RunnableConfig 作为参数传入,访问 in_memory_storeuser_id。以下是如何在节点中使用语义搜索查找相关记忆的示例:

def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):user_id = config["configurable"]["user_id"]namespace = (user_id, "memories")memory_id = str(uuid.uuid4())memory = {"memory": "example memory content"}store.put(namespace, memory_id, memory)

如上所示,我们也可以在任意节点中访问 store,并使用 store.search 方法获取记忆。请注意,返回的记忆是一个对象列表,这些对象可以转换为字典格式。

memories[-1].dict()
{'value': {'food_preference': 'I like pizza'},'key': '07e0caf4-1631-47b7-b15f-65515d4c1843','namespace': ['1', 'memories'],'created_at': '2024-10-02T17:22:31.590602+00:00','updated_at': '2024-10-02T17:22:31.590605+00:00'}

我们可以访问这些记忆,并在模型调用中使用它们。

def call_model(state: MessagesState, config: RunnableConfig, *, store: BaseStore):# Get the user id from the configuser_id = config["configurable"]["user_id"]# Namespace the memorynamespace = (user_id, "memories")# Search based on the most recent messagememories = store.search(namespace,query=state["messages"][-1].content,limit=3)info = "\n".join([d.value["memory"] for d in memories])# ... Use memories in the model call

如果我们创建一个新线程,只要 user_id 相同,仍然可以访问相同的记忆。

# Invoke the graph
config = {"configurable": {"thread_id": "2", "user_id": "1"}}# Let's say hi again
for update in graph.stream({"messages": [{"role": "user", "content": "hi, tell me about my memories"}]}, config, stream_mode="updates"
):print(update)

当我们使用 LangGraph 平台时,无论是在本地(例如通过 LangGraph Studio)还是通过 LangGraph 平台运行,基础存储(base store)默认可用,无需在图编译期间显式指定。

但如果要启用语义搜索,则需要在 langgraph.json 文件中配置索引设置。例如:

{..."store": {"index": {"embed": "openai:text-embeddings-3-small","dims": 1536,"fields": ["$"]}}
}

Checkpointer 库

在底层,状态检查点功能是通过符合 BaseCheckpointSaver 接口的 checkpointer 对象实现的。LangGraph 提供了多个 checkpointer 实现,它们都是独立的、可安装的库:

  • langgraph-checkpoint:checkpointer 保存器的基础接口库,包含 BaseCheckpointSaver 接口及序列化/反序列化接口(SerializerProtocol)。其中包含内存中的 checkpointer 实现(InMemorySaver),适用于实验性用途。LangGraph 默认自带该库。
  • langgraph-checkpoint-sqlite:使用 SQLite 数据库的 checkpointer 实现(SqliteSaver / AsyncSqliteSaver),非常适合本地开发与实验性工作流程。需要单独安装。
  • langgraph-checkpoint-postgres:使用 Postgres 数据库的高级 checkpointer(PostgresSaver / AsyncPostgresSaver),用于 LangGraph 平台,适合生产环境使用。需要单独安装。
Checkpointer 接口

每个 checkpointer 都符合 BaseCheckpointSaver 接口,并实现以下方法:

  • .put:保存带有配置和元数据的检查点;
  • .put_writes:保存与检查点相关的中间写入(即待提交的写入);
  • .get_tuple:根据给定配置(thread_idcheckpoint_id)获取检查点元组。用于 graph.get_state() 中填充 StateSnapshot
  • .list:列出符合配置和过滤条件的检查点。用于 graph.get_state_history() 中填充状态历史。

如果你在使用异步图执行(例如通过 .ainvoke, .astream, .abatch 执行图),则会使用以上方法的异步版本(.aput, .aput_writes, .aget_tuple, .alist)。

注意:
如果要异步运行图,你可以使用 InMemorySaver,或使用支持异步的 Sqlite/Postgres checkpointer(即 AsyncSqliteSaverAsyncPostgresSaver)。

序列化器(Serializer)

在保存图状态时,checkpointer 需要对状态中的 channel 值进行序列化。这个过程由序列化器对象完成。langgraph_checkpoint 定义了一个用于实现序列化器的协议,并提供了一个默认实现(JsonPlusSerializer),它可以处理多种类型,包括 LangChain 和 LangGraph 的原语、datetimeenum 等。

添加持久化

许多 AI 应用需要内存来跨多次交互共享上下文。LangGraph 支持两种构建对话代理的关键内存类型:

  • 短期内存:通过维护会话内的消息历史,跟踪当前对话进程。
  • 长期内存:跨会话存储用户特定或应用级数据。

在 LangGraph 中:

  • 短期内存也称为线程级内存(thread-level memory)。
  • 长期内存也称为跨线程内存(cross-thread memory)。
  • 一个线程(thread)代表由相同 thread_id 关联的一系列相关执行。

添加短期内存

短期内存(线程级持久化)使代理能够跟踪多轮对话。添加短期内存的步骤:

示例代码:

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.memory import InMemorySavermodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")def call_model(state: MessagesState):response = model.invoke(state["messages"])return {"messages": response}builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": "1"}
}for chunk in graph.stream({"messages": [{"role": "user", "content": "hi! I'm bob"}]},config,stream_mode="values",
):chunk["messages"][-1].pretty_print()for chunk in graph.stream({"messages": [{"role": "user", "content": "what's my name?"}]},config,stream_mode="values",
):chunk["messages"][-1].pretty_print()

对话示例:

================================ Human Message =================================hi! I'm bob
================================== Ai Message ==================================Hi Bob! How are you doing today? Is there anything I can help you with?
================================ Human Message =================================what's my name?
================================== Ai Message ==================================Your name is Bob.

生产环境中,建议使用数据库支持的 checkpointer,例如:

示例:使用 Postgres 检查点存储器

pip install -U "psycopg[binary,pool]" langgraph langgraph-checkpoint-postgres

首次使用 Postgres 检查点存储器时,需要调用 checkpointer.setup()

同步调用

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.postgres import PostgresSavermodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")DB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:# checkpointer.setup()def call_model(state: MessagesState):response = model.invoke(state["messages"])return {"messages": response}builder = StateGraph(MessagesState)builder.add_node(call_model)builder.add_edge(START, "call_model")graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": "1"}}for chunk in graph.stream({"messages": [{"role": "user", "content": "hi! I'm bob"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()for chunk in graph.stream({"messages": [{"role": "user", "content": "what's my name?"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()

异步调用

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.postgres.aio import AsyncPostgresSavermodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")DB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable"
async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:# await checkpointer.setup()async def call_model(state: MessagesState):response = await model.ainvoke(state["messages"])return {"messages": response}builder = StateGraph(MessagesState)builder.add_node(call_model)builder.add_edge(START, "call_model")graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": "1"}}async for chunk in graph.astream({"messages": [{"role": "user", "content": "hi! I'm bob"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()async for chunk in graph.astream({"messages": [{"role": "user", "content": "what's my name?"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()

示例:使用 MongoDB 检查点存储器

pip install -U pymongo langgraph langgraph-checkpoint-mongodb

要使用 MongoDB 检查点存储器,你需要一个 MongoDB 集群。

同步调用

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.mongodb import MongoDBSavermodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")DB_URI = "localhost:27017"
with MongoDBSaver.from_conn_string(DB_URI) as checkpointer:def call_model(state: MessagesState):response = model.invoke(state["messages"])return {"messages": response}builder = StateGraph(MessagesState)builder.add_node(call_model)builder.add_edge(START, "call_model")graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": "1"}}for chunk in graph.stream({"messages": [{"role": "user", "content": "hi! I'm bob"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()for chunk in graph.stream({"messages": [{"role": "user", "content": "what's my name?"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()

异步调用

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.mongodb.aio import AsyncMongoDBSavermodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")DB_URI = "localhost:27017"
async with AsyncMongoDBSaver.from_conn_string(DB_URI) as checkpointer:async def call_model(state: MessagesState):response = await model.ainvoke(state["messages"])return {"messages": response}builder = StateGraph(MessagesState)builder.add_node(call_model)builder.add_edge(START, "call_model")graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": "1"}}async for chunk in graph.astream({"messages": [{"role": "user", "content": "hi! I'm bob"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()async for chunk in graph.astream({"messages": [{"role": "user", "content": "what's my name?"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()

示例:使用 Redis 检查点存储器

pip install -U langgraph langgraph-checkpoint-redis

首次使用 Redis 检查点存储器时,需要调用 checkpointer.setup()

同步调用

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.redis import RedisSavermodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")DB_URI = "redis://localhost:6379"
with RedisSaver.from_conn_string(DB_URI) as checkpointer:# checkpointer.setup()def call_model(state: MessagesState):response = model.invoke(state["messages"])return {"messages": response}builder = StateGraph(MessagesState)builder.add_node(call_model)builder.add_edge(START, "call_model")graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": "1"}}for chunk in graph.stream({"messages": [{"role": "user", "content": "hi! I'm bob"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()for chunk in graph.stream({"messages": [{"role": "user", "content": "what's my name?"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()

异步调用

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.redis.aio import AsyncRedisSavermodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")DB_URI = "redis://localhost:6379"
async with AsyncRedisSaver.from_conn_string(DB_URI) as checkpointer:# await checkpointer.asetup()async def call_model(state: MessagesState):response = await model.ainvoke(state["messages"])return {"messages": response}builder = StateGraph(MessagesState)builder.add_node(call_model)builder.add_edge(START, "call_model")graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": "1"}}async for chunk in graph.astream({"messages": [{"role": "user", "content": "hi! I'm bob"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()async for chunk in graph.astream({"messages": [{"role": "user", "content": "what's my name?"}]},config,stream_mode="values"):chunk["messages"][-1].pretty_print()
使用子图(Subgraphs)

如果你的图包含子图,只需要在父图编译时提供 checkpointer,LangGraph 会自动将其传递给子图。

示例:

from langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import InMemorySaver
from typing import TypedDictclass State(TypedDict):foo: str# 子图定义
def subgraph_node_1(state: State):return {"foo": state["foo"] + "bar"}subgraph_builder = StateGraph(State)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph = subgraph_builder.compile()# 父图定义
def node_1(state: State):return {"foo": "hi! " + state["foo"]}builder = StateGraph(State)
builder.add_node("node_1", subgraph)
builder.add_edge(START, "node_1")checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

如果你希望子图拥有自己的记忆,可以在编译时将 checkpointer 设置为 True。这在多智能体系统中非常有用,比如当你希望每个智能体都能跟踪自己的内部消息历史时:

subgraph_builder = StateGraph(...)
subgraph = subgraph_builder.compile(checkpointer=True)

这在多代理系统中非常有用,能让各代理独立维护自己的消息历史。

在 Functional API 中使用短期内存【可选】

给 Functional API 的 LangGraph 工作流添加短期内存:

  • 通过 entrypoint() 装饰器传入 checkpointer 实例:
from langgraph.func import entrypoint@entrypoint(checkpointer=checkpointer)
def workflow(inputs):...
  • 可选地,在工作流函数签名中添加 previous 参数以访问上一次执行结果:
@entrypoint(checkpointer=checkpointer)
def workflow(inputs, *, previous):previous = previous or []combined_inputs = previous + inputsresult = do_something(combined_inputs)...
  • 可选地选择返回哪些值,并用 checkpointer 保存哪些为 previous
@entrypoint(checkpointer=checkpointer)
def workflow(inputs, *, previous):...result = do_something(...)return entrypoint.final(value=result, save=combine(inputs, result))
管理检查点

你可以查看和删除由 checkpointer 保存的信息:

查看线程状态(检查点)

config = {"configurable": {"thread_id": "1",# 可选:提供特定检查点的 ID,# 否则将显示最新的检查点# "checkpoint_id": "1f029ca3-1f5b-6704-8004-820c16b69a5a"}
}
graph.get_state(config)

返回示例:

StateSnapshot(values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today?'), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]},next=(),config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},metadata={'source': 'loop','writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}},'step': 4,'parents': {},'thread_id': '1'},created_at='2025-05-05T16:01:24.680462+00:00',parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},tasks=(),interrupts=()
)

查看线程的历史(检查点)

config = {"configurable": {"thread_id": "1"}
}
list(graph.get_state_history(config))

返回示例(列表中包含多个 StateSnapshot,每个代表一次状态快照):

[StateSnapshot(values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]},next=(),config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},metadata={'source': 'loop', 'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}}, 'step': 4, 'parents': {}, 'thread_id': '1'},created_at='2025-05-05T16:01:24.680462+00:00',parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},tasks=(),interrupts=()),StateSnapshot(values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?")]},next=('call_model',),config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},metadata={'source': 'loop', 'writes': None, 'step': 3, 'parents': {}, 'thread_id': '1'},created_at='2025-05-05T16:01:23.863421+00:00',parent_config={...},tasks=(PregelTask(id='8ab4155e-6b15-b885-9ce5-bed69a2c305c', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Your name is Bob.')}),),interrupts=()),# 其他 StateSnapshot 略
]

删除线程的所有检查点

thread_id = "1"
checkpointer.delete_thread(thread_id)

添加长期内存

长期内存(跨线程持久化)用于跨多次会话存储用户或应用相关数据,比如聊天机器人记住用户偏好等。

使用长期内存时,需要在创建图时提供存储:

示例:

import uuid
from typing_extensions import Annotated, TypedDictfrom langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.memory import InMemorySaverfrom langgraph.store.memory import InMemoryStore
from langgraph.store.base import BaseStoremodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")def call_model(state: MessagesState,config: RunnableConfig,*,store: BaseStore,
):user_id = config["configurable"]["user_id"]namespace = ("memories", user_id)memories = store.search(namespace, query=str(state["messages"][-1].content))info = "\n".join([d.value["data"] for d in memories])system_msg = f"You are a helpful assistant talking to the user. User info: {info}"# 如果用户请求记忆,存储新记忆last_message = state["messages"][-1]if "remember" in last_message.content.lower():memory = "User name is Bob"store.put(namespace, str(uuid.uuid4()), {"data": memory})response = model.invoke([{"role": "system", "content": system_msg}] + state["messages"])return {"messages": response}builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")checkpointer = InMemorySaver()
store = InMemoryStore()graph = builder.compile(checkpointer=checkpointer,store=store,
)config = {"configurable": {"thread_id": "1","user_id": "1",}
}
for chunk in graph.stream({"messages": [{"role": "user", "content": "Hi! Remember: my name is Bob"}]},config,stream_mode="values",
):chunk["messages"][-1].pretty_print()config = {"configurable": {"thread_id": "2","user_id": "1",}
}for chunk in graph.stream({"messages": [{"role": "user", "content": "what is my name?"}]},config,stream_mode="values",
):chunk["messages"][-1].pretty_print()

对话示例:

================================ Human Message =================================Hi! Remember: my name is Bob
================================== Ai Message ==================================Hi Bob! I'll remember that your name is Bob. How are you doing today?
================================ Human Message =================================what is my name?
================================== Ai Message ==================================Your name is Bob.
生产环境中使用长期存储

生产环境中建议使用数据库支持的存储:

from langgraph.checkpoint.postgres import PostgresSaverDB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable"
with PostgresStore.from_conn_string(DB_URI) as store:builder = StateGraph(...)graph = builder.compile(store=store)

Postgres 存储使用示例

pip install -U "psycopg[binary,pool]" langgraph langgraph-checkpoint-postgres

首次使用 Postgres 存储时,需要调用 store.setup() 进行初始化。

同步示例

from langchain_core.runnables import RunnableConfig
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore
from langgraph.store.base import BaseStore
import uuidmodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")DB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable"with (PostgresStore.from_conn_string(DB_URI) as store,PostgresSaver.from_conn_string(DB_URI) as checkpointer,
):# store.setup()# checkpointer.setup()def call_model(state: MessagesState,config: RunnableConfig,*,store: BaseStore,):user_id = config["configurable"]["user_id"]namespace = ("memories", user_id)memories = store.search(namespace, query=str(state["messages"][-1].content))info = "\n".join([d.value["data"] for d in memories])system_msg = f"You are a helpful assistant talking to the user. User info: {info}"# 如果用户请求记忆,则保存新记忆last_message = state["messages"][-1]if "remember" in last_message.content.lower():memory = "User name is Bob"store.put(namespace, str(uuid.uuid4()), {"data": memory})response = model.invoke([{"role": "system", "content": system_msg}] + state["messages"])return {"messages": response}builder = StateGraph(MessagesState)builder.add_node(call_model)builder.add_edge(START, "call_model")graph = builder.compile(checkpointer=checkpointer,store=store,)config = {"configurable": {"thread_id": "1","user_id": "1",}}for chunk in graph.stream({"messages": [{"role": "user", "content": "Hi! Remember: my name is Bob"}]},config,stream_mode="values",):chunk["messages"][-1].pretty_print()config = {"configurable": {"thread_id": "2","user_id": "1",}}for chunk in graph.stream({"messages": [{"role": "user", "content": "what is my name?"}]},config,stream_mode="values",):chunk["messages"][-1].pretty_print()

异步示例

from langchain_core.runnables import RunnableConfig
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.store.postgres.aio import AsyncPostgresStore
from langgraph.store.base import BaseStoremodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")DB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable"async with (AsyncPostgresStore.from_conn_string(DB_URI) as store,AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer,
):# await store.setup()# await checkpointer.setup()async def call_model(state: MessagesState,config: RunnableConfig,*,store: BaseStore,):user_id = config["configurable"]["user_id"]namespace = ("memories", user_id)memories = await store.asearch(namespace, query=str(state["messages"][-1].content))info = "\n".join([d.value["data"] for d in memories])system_msg = f"You are a helpful assistant talking to the user. User info: {info}"# Store new memories if the user asks the model to rememberlast_message = state["messages"][-1]if "remember" in last_message.content.lower():memory = "User name is Bob"await store.aput(namespace, str(uuid.uuid4()), {"data": memory})response = await model.ainvoke([{"role": "system", "content": system_msg}] + state["messages"])return {"messages": response}builder = StateGraph(MessagesState)builder.add_node(call_model)builder.add_edge(START, "call_model")graph = builder.compile(checkpointer=checkpointer,store=store,)config = {"configurable": {"thread_id": "1","user_id": "1",}}async for chunk in graph.astream({"messages": [{"role": "user", "content": "Hi! Remember: my name is Bob"}]},config,stream_mode="values",):chunk["messages"][-1].pretty_print()config = {"configurable": {"thread_id": "2","user_id": "1",}}async for chunk in graph.astream({"messages": [{"role": "user", "content": "what is my name?"}]},config,stream_mode="values",):chunk["messages"][-1].pretty_print()

Redis 存储使用示例

pip install -U langgraph langgraph-checkpoint-redis

首次使用 Redis 存储时,需要调用 store.setup() 进行初始化。

同步示例

from langchain_core.runnables import RunnableConfig
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.redis import RedisSaver
from langgraph.store.redis import RedisStore
from langgraph.store.base import BaseStore
import uuidmodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")DB_URI = "redis://localhost:6379"with (RedisStore.from_conn_string(DB_URI) as store,RedisSaver.from_conn_string(DB_URI) as checkpointer,
):store.setup()checkpointer.setup()def call_model(state: MessagesState,config: RunnableConfig,*,store: BaseStore,):user_id = config["configurable"]["user_id"]namespace = ("memories", user_id)memories = store.search(namespace, query=str(state["messages"][-1].content))info = "\n".join([d.value["data"] for d in memories])system_msg = f"You are a helpful assistant talking to the user. User info: {info}"# 如果用户请求记忆,则保存新记忆last_message = state["messages"][-1]if "remember" in last_message.content.lower():memory = "User name is Bob"store.put(namespace, str(uuid.uuid4()), {"data": memory})response = model.invoke([{"role": "system", "content": system_msg}] + state["messages"])return {"messages": response}builder = StateGraph(MessagesState)builder.add_node(call_model)builder.add_edge(START, "call_model")graph = builder.compile(checkpointer=checkpointer,store=store,)config = {"configurable": {"thread_id": "1","user_id": "1",}}for chunk in graph.stream({"messages": [{"role": "user", "content": "Hi! Remember: my name is Bob"}]},config,stream_mode="values",):chunk["messages"][-1].pretty_print()config = {"configurable": {"thread_id": "2","user_id": "1",}}for chunk in graph.stream({"messages": [{"role": "user", "content": "what is my name?"}]},config,stream_mode="values",):chunk["messages"][-1].pretty_print()

异步示例

from langchain_core.runnables import RunnableConfig
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from langgraph.store.redis.aio import AsyncRedisStore
from langgraph.store.base import BaseStoremodel = init_chat_model(model="anthropic:claude-3-5-haiku-latest")DB_URI = "redis://localhost:6379"async with (AsyncRedisStore.from_conn_string(DB_URI) as store,AsyncRedisSaver.from_conn_string(DB_URI) as checkpointer,
):# await store.setup()# await checkpointer.asetup()async def call_model(state: MessagesState,config: RunnableConfig,*,store: BaseStore,):user_id = config["configurable"]["user_id"]namespace = ("memories", user_id)memories = await store.asearch(namespace, query=str(state["messages"][-1].content))info = "\n".join([d.value["data"] for d in memories])system_msg = f"You are a helpful assistant talking to the user. User info: {info}"# Store new memories if the user asks the model to rememberlast_message = state["messages"][-1]if "remember" in last_message.content.lower():memory = "User name is Bob"await store.aput(namespace, str(uuid.uuid4()), {"data": memory})response = await model.ainvoke([{"role": "system", "content": system_msg}] + state["messages"])return {"messages": response}builder = StateGraph(MessagesState)builder.add_node(call_model)builder.add_edge(START, "call_model")graph = builder.compile(checkpointer=checkpointer,store=store,)config = {"configurable": {"thread_id": "1","user_id": "1",}}async for chunk in graph.astream({"messages": [{"role": "user", "content": "Hi! Remember: my name is Bob"}]},config,stream_mode="values",):chunk["messages"][-1].pretty_print()config = {"configurable": {"thread_id": "2","user_id": "1",}}async for chunk in graph.astream({"messages": [{"role": "user", "content": "what is my name?"}]},config,stream_mode="values",):chunk["messages"][-1].pretty_print()
使用语义搜索

你可以在图的内存存储中启用语义搜索,让图代理能按语义相似度搜索存储内容。

示例:

from langchain.embeddings import init_embeddings
from langgraph.store.memory import InMemoryStoreembeddings = init_embeddings("openai:text-embedding-3-small")
store = InMemoryStore(index={"embed": embeddings,"dims": 1536,}
)store.put(("user_123", "memories"), "1", {"text": "I love pizza"})
store.put(("user_123", "memories"), "2", {"text": "I am a plumber"})items = store.search(("user_123", "memories"), query="I'm hungry", limit=1
)

长期记忆与语义搜索示例

from typing import Optionalfrom langchain.embeddings import init_embeddings
from langchain.chat_models import init_chat_model
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from langgraph.graph import START, MessagesState, StateGraphllm = init_chat_model("openai:gpt-4o-mini")# 创建支持语义搜索的存储
embeddings = init_embeddings("openai:text-embedding-3-small")
store = InMemoryStore(index={"embed": embeddings,"dims": 1536,}
)store.put(("user_123", "memories"), "1", {"text": "I love pizza"})
store.put(("user_123", "memories"), "2", {"text": "I am a plumber"})def chat(state, *, store: BaseStore):# 基于用户最后一条消息进行搜索items = store.search(("user_123", "memories"), query=state["messages"][-1].content, limit=2)memories = "\n".join(item.value["text"] for item in items)memories = f"## Memories of user\n{memories}" if memories else ""response = llm.invoke([{"role": "system", "content": f"You are a helpful assistant.\n{memories}"},*state["messages"],])return {"messages": [response]}builder = StateGraph(MessagesState)
builder.add_node(chat)
builder.add_edge(START, "chat")
graph = builder.compile(store=store)for message, metadata in graph.stream(input={"messages": [{"role": "user", "content": "I'm hungry"}]},stream_mode="messages",
):print(message.content, end="")

内存(Memory)

概念

内存是一种认知功能,使人们能够存储、检索和使用信息,从而理解当前和未来的情况。想象一下,如果你和一个同事合作,但对方总是忘记你告诉他的所有内容,需要你不断重复,那会多么令人沮丧!随着 AI 代理承担越来越复杂的任务,涉及大量用户交互,为它们配备内存同样对提升效率和用户满意度至关重要。通过内存,代理可以从反馈中学习并适应用户偏好。

在这里插入图片描述

本指南介绍基于回忆范围的两种内存类型:

  • 短期内存(Short-term memory),也称为线程范围内存(thread-scoped memory),可以在与单个用户的单个对话线程中随时调用。LangGraph 将短期内存作为代理状态的一部分进行管理。状态通过检查点(checkpointer)持久化到数据库,因此线程可以随时恢复。短期内存在图调用或步骤完成时更新,每个步骤开始时都会读取状态。

  • 长期内存(Long-term memory)在多个对话线程之间共享。它可以在任何时间、任何线程中调用。记忆的作用域可以是任意自定义的命名空间,而不仅限于单个线程 ID。LangGraph 提供了存储(store)来帮助你保存和调用长期内存。

短期内存

短期内存让你的应用能够记住单个线程或对话中的先前交互。线程将一个会话中的多次交互组织起来,类似于电子邮件中将多条消息归为一个对话的方式。

LangGraph 将短期内存作为代理状态的一部分进行管理,通过线程范围的检查点(checkpoints)持久化保存。这个状态通常包含对话历史以及其他有状态数据,比如上传的文件、检索的文档或生成的产物。通过将这些信息存储在图的状态中,机器人能够访问给定对话的完整上下文,同时保持不同线程之间的独立性。

管理长对话历史

长对话对当前的大型语言模型(LLM)来说是一个挑战。完整的对话历史可能根本无法全部放入模型的上下文窗口中,导致无法恢复的错误。即使你的模型技术上支持完整的上下文长度,大多数模型在处理长上下文时表现仍然不佳。它们容易被陈旧或离题的内容“干扰”,同时响应速度变慢,成本也更高。

管理短期记忆需要在准确率与召回率之间,以及应用的其他性能需求(如延迟和成本)之间取得平衡。正如以往一样,关键是要批判性地思考如何为你的语言模型表示信息,并仔细分析你的数据。以下我们介绍几种常见的消息列表管理技术,帮助你为应用选择最佳的权衡方案:

  • 编辑消息列表:如何在传递给语言模型之前,考虑剪裁和过滤消息列表。
  • 总结过去的对话:当你不只是想过滤消息列表时,一种常用的技术。
编辑消息列表

聊天模型使用消息作为上下文,消息包括开发者提供的指令(系统消息)和用户输入(人类消息)。在聊天应用中,消息在用户输入和模型回复之间交替出现,消息列表随时间增长。由于上下文窗口有限且消息令牌数量多,很多应用都能受益于手动删除或忘记过时信息的技术。

在这里插入图片描述

最直接的方法是从列表中删除旧消息(类似于最近最少使用缓存,LRU)。

在 LangGraph 中,删除列表内容的典型方法是从某个节点返回一个更新,告诉系统删除列表中的某一部分。你可以自定义这个更新的格式,但常见的做法是返回一个对象或字典,指定需要保留的值。

def manage_list(existing: list, updates: Union[list, dict]):if isinstance(updates, list):# 正常情况,追加消息return existing + updateselif isinstance(updates, dict) and updates["type"] == "keep":# 你可以自定义格式# 比如只接受字符串 "DELETE",清空整个列表return existing[updates["from"]:updates["to"]]# 其他更新类型解释class State(TypedDict):my_list: Annotated[list, manage_list]def my_node(state: State):return {# 只保留索引从 -5 到末尾的元素,删除其余"my_list": {"type": "keep", "from": -5, "to": None}}

每当返回 "my_list" 键的更新时,LangGraph 都会调用 manage_list 这个“reducer”函数,里面定义了可接受的更新类型。通常消息会被追加,但也支持以字典形式“保留”部分状态,这样可以程序化地丢弃旧消息上下文。

另一种常见做法是返回一个“删除”对象列表,指定所有要删除消息的 ID。如果你使用 LangChain 消息和 LangGraph 的 add_messages reducer(或 MessagesState,底层逻辑相同),可以用 RemoveMessage 来删除消息。

from langchain_core.messages import RemoveMessage, AIMessage
from langgraph.graph import add_messagesclass State(TypedDict):messages: Annotated[list, add_messages]def my_node_1(state: State):# 向状态中的 messages 列表添加一条 AI 消息return {"messages": [AIMessage(content="Hi")]}def my_node_2(state: State):# 删除 messages 列表中除了最后两条以外的所有消息delete_messages = [RemoveMessage(id=m.id) for m in state['messages'][:-2]]return {"messages": delete_messages}

上例中,add_messages reducer 允许追加消息。遇到 RemoveMessage 时,会删除对应 ID 的消息(RemoveMessage 会被丢弃)。

总结历史对话

直接删除消息的缺点是可能丢失重要信息,因此有些应用会用聊天模型来总结消息历史,实现更复杂的管理。

在这里插入图片描述

简单的提示和编排逻辑即可实现这一点。例如,在 LangGraph 中,可以扩展 MessagesState,添加一个 summary 字段:

from langgraph.graph import MessagesState
class State(MessagesState):summary: str

然后用已有摘要作为上下文,为聊天历史生成新的摘要。summarize_conversation 节点在积累一定消息后调用:

def summarize_conversation(state: State):summary = state.get("summary", "")if summary:summary_message = (f"This is a summary of the conversation to date: {summary}\n\n""Extend the summary by taking into account the new messages above:")else:summary_message = "Create a summary of the conversation above:"messages = state["messages"] + [HumanMessage(content=summary_message)]response = model.invoke(messages)delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]return {"summary": response.content, "messages": delete_messages}
何时删除消息

大多数大型语言模型(LLM)对上下文窗口有最大支持限制(以 token 数量计)。决定何时截断消息的一种简单方法是统计消息历史中的 token 数量,当接近限制时就进行截断。

虽然简单截断很容易自己实现,但有一些“陷阱”需要注意。部分模型 API 对消息类型的顺序有额外限制(例如必须以用户消息开始,不能连续出现相同类型的消息等)。

如果你使用 LangChain,可以用它提供的 trim_messages 工具,指定要保留的 token 数量以及截断策略(比如保留最后的 max_tokens 个 token),来处理截断边界。

以下是示例代码:

from langchain_core.messages import trim_messagestrim_messages(messages,# 保留消息中最后不超过 n_count 个 tokenstrategy="last",# 根据你的模型调整计数器,或者传入自定义的 token_encodertoken_counter=ChatOpenAI(model="gpt-4"),# 根据期望的对话长度调整max_tokens=45,# 大多数聊天模型期望聊天历史以以下之一开始:# (1) 用户消息(HumanMessage)# (2) 系统消息(SystemMessage)后接用户消息start_on="human",# 大多数聊天模型期望聊天历史以以下之一结束:# (1) 用户消息(HumanMessage)# (2) 工具消息(ToolMessage)end_on=("human", "tool"),# 通常,如果原始历史中有系统消息,我们希望保留它# 系统消息通常包含给模型的重要指令include_system=True,
)

长期记忆

LangGraph 中的长期记忆允许系统跨不同对话或会话保留信息。与线程范围的短期记忆不同,长期记忆存储在自定义的“命名空间”中。

LangGraph 将长期记忆以 JSON 文档的形式存储在存储系统中。每条记忆归类到一个自定义命名空间(类似文件夹)和一个独特的键(类似文件名)下。命名空间通常包含用户 ID、组织 ID 或其他标签,便于组织信息。这种结构支持记忆的分层管理。通过内容过滤器支持跨命名空间的搜索。下面是示例代码:

from langgraph.store.memory import InMemoryStoredef embed(texts: list[str]) -> list[list[float]]:# 用真实的向量化函数或 LangChain embeddings 对象替换return [[1.0, 2.0] * len(texts)]# InMemoryStore 将数据保存到内存字典中,生产环境应使用数据库支持的存储
store = InMemoryStore(index={"embed": embed, "dims": 2})
user_id = "my-user"
application_context = "chitchat"
namespace = (user_id, application_context)
store.put(namespace,"a-memory",{"rules": ["User likes short, direct language","User only speaks English & python",],"my-key": "my-value",},
)
# 根据 ID 获取“记忆”
item = store.get(namespace, "a-memory")
# 在该命名空间内搜索“记忆”,通过内容过滤,按向量相似度排序
items = store.search(namespace, filter={"my-key": "my-value"}, query="language preferences"
)

长期记忆是一个复杂的问题,没有通用的解决方案。不过,以下几个问题可以作为结构化框架,帮助你理解和选择不同的技术:

记忆的类型是什么?

人类用记忆来保存事实、经历和规则。AI 代理也可以用记忆做类似的事情。例如,AI 代理可以利用记忆记住关于用户的具体信息,以完成某项任务。我们将在下面的章节中详细介绍几种记忆类型。

你希望何时更新记忆?

记忆可以作为代理应用逻辑的一部分进行更新(例如“热路径”中)。在这种情况下,代理通常会在回复用户之前决定记住哪些事实。另一种方式是将记忆更新作为后台任务执行(在后台或异步运行,生成记忆)。我们将在后续章节中解释这两种方法的权衡。

记忆类型

不同的应用场景需要不同类型的记忆。虽然这个类比并不完全准确,但参考人类记忆类型可以带来启发。一些研究(如 CoALA 论文)甚至将这些人类记忆类型映射到了 AI 代理使用的记忆类型。

记忆类型存储内容人类示例代理示例
语义记忆事实学校里学到的知识用户的事实信息
情景记忆经验做过的事情代理过去的操作
程序记忆指令本能或运动技能代理系统提示语
语义记忆

语义记忆,无论在人类还是 AI 代理中,都是指对具体事实和概念的记忆。人类的语义记忆包括在学校学到的信息以及对概念及其关系的理解。对于 AI 代理,语义记忆常用于通过记住过去交互中的事实或概念来实现个性化应用。

注意:语义记忆(semantic memory)不同于语义搜索(semantic search)。语义搜索是一种基于“意义”查找相似内容的技术(通常用嵌入向量实现),而语义记忆是心理学术语,指储存事实和知识;语义搜索是基于意义而非精确匹配的信息检索方法。

个人档案(Profile)

在这里插入图片描述

语义记忆可以通过不同方式进行管理。例如,记忆可以是一个单一的、持续更新的“档案”,该档案包含针对用户、组织或其他实体(包括代理自身)的范围明确且具体的信息。这个档案通常是一个 JSON 文档,里面包含你选择用来表示领域的各种键值对。

在保存档案时,需要确保每次都对档案进行更新。因此,你需要传入之前的档案,并请求模型生成一个新的档案(或者生成一些 JSON 补丁以应用到旧档案上)。随着档案体积变大,这种方式容易出错,可能需要将档案拆分成多个文档,或者在生成文档时采用严格解码,以确保记忆结构的有效性。

文档集合(Collection)

在这里插入图片描述

或者,记忆也可以是一组文档的集合,这些文档会随着时间不断更新和扩展。每条单独的记忆范围更为狭窄,生成起来也更容易,这意味着信息丢失的可能性更小。对于大型语言模型(LLM)来说,生成新的对象来存储新信息比将新信息与已有档案进行整合要容易得多。因此,文档集合往往能带来更高的召回率。

不过,这也增加了记忆更新的复杂性。模型现在必须删除或更新列表中已有的条目,这可能比较棘手。此外,有些模型可能默认会过度插入信息,有些则可能过度更新。可以参考 Trustcall 包来管理这种情况,同时考虑使用诸如 LangSmith 之类的工具来评估并调整这种行为。

使用文档集合还会将复杂度转移到对列表中记忆的搜索上。目前的存储系统支持语义搜索和基于内容的过滤。

最后,使用文档集合可能会使向模型提供全面的上下文变得更加困难。尽管单条记忆可能遵循特定的模式,但这种结构可能无法捕捉记忆之间的完整上下文或关系。因此,当使用这些记忆生成回复时,模型可能缺少在统一档案方法中更容易获得的重要上下文信息。

无论采用哪种管理方式,核心点是代理将用语义记忆作为回应的基础,从而实现更个性化、更相关的交互。

情景记忆

情节记忆(Episodic Memory)在人类和 AI 智能体中都涉及对过去事件或行为的回忆。《CoALA》论文对此做了很好的阐述:事实可以写入语义记忆,而经历则应写入情节记忆。对于 AI 智能体来说,情节记忆通常用于帮助其记住完成某项任务的过程。

在实践中,情节记忆常通过少样本示例提示(few-shot example prompting)实现,智能体通过学习过去的交互序列来正确执行任务。有时,“演示”比“说明”更有效,而大语言模型(LLMs)也擅长从示例中学习。少样本学习允许你通过更新提示词中的输入输出示例来“编程”大模型,以此传达期望的行为。虽然可以采用多种最佳实践生成示例,但关键挑战通常在于如何根据用户输入选择最相关的示例。

需要注意的是,记忆存储只是保存少样本示例的一种方式。如果你希望开发者参与更多,或将示例更紧密地结合到评估框架中,也可以使用 LangSmith Dataset 来存储数据。这样可以直接使用动态少样本示例选择器来达到相同目的。LangSmith 会为你索引数据集,并基于关键词相似度(使用类似 BM25 的算法)检索与用户输入最相关的少样本示例。

程序记忆

程序性记忆(Procedural Memory)在人类和 AI 智能体中,指的是记住完成任务所需的规则。在人类中,程序性记忆类似于内化的操作知识,比如通过基本的运动技能和平衡能力学会骑自行车。相比之下,情节记忆则是对特定经历的回忆,比如你第一次不借助辅助轮成功骑车,或一次风景优美的骑行经历。而对于 AI 智能体来说,程序性记忆则是模型权重、智能体代码以及智能体提示词的组合,这些共同决定了智能体的功能行为。

在实践中,智能体很少修改自己的模型权重或重写代码,但更常见的是修改其自身的提示词(prompts)。

一种有效改进智能体指令的方法是使用“反思”(Reflection)或元提示(meta-prompting)。这涉及将当前的指令(例如系统提示词)与近期的对话内容或用户的显式反馈一起作为输入,提示智能体对自身的提示进行改进。这种方法尤其适用于那些一开始难以明确指定指令的任务,因为它允许智能体通过交互过程不断学习与适应。

举个例子,我们曾构建一个用于生成推文的智能体,通过外部反馈与提示词重写,来生成高质量的论文摘要推文。在这种情况下,事先指定一个准确的摘要提示词很困难,但让用户点评生成的推文并提供改进意见却相对容易。

下面的伪代码展示了如何结合 LangGraph 的 memory store 实现这一流程:先使用存储器保存提示词,接着 update_instructions 节点读取当前提示词(以及保存在 state["messages"] 中的用户对话反馈),然后更新提示词并写回存储器。最后,call_model 节点从存储器读取更新后的提示词,并用其生成响应。

# 使用指令的节点
def call_model(state: State, store: BaseStore):namespace = ("agent_instructions", )instructions = store.get(namespace, key="agent_a")[0]# 应用逻辑prompt = prompt_template.format(instructions=instructions.value["instructions"])...# 更新指令的节点
def update_instructions(state: State, store: BaseStore):namespace = ("instructions",)current_instructions = store.search(namespace)[0]# 记忆逻辑prompt = prompt_template.format(instructions=instructions.value["instructions"], conversation=state["messages"])output = llm.invoke(prompt)new_instructions = output['new_instructions']store.put(("agent_instructions",), "agent_a", {"instructions": new_instructions})...

在这里插入图片描述

总结:这一章节讲了 AI 智能体的不同记忆类型及其应用方式,类比人类的记忆系统,详细介绍了四种关键记忆类型:

🧠 1. 语义记忆(Semantic Memory)

  • 作用:存储事实性信息,如用户的基本资料、组织信息等。

  • 实现方式

    • 一种方式是使用单一 JSON 文档,表示一个“用户画像”或“配置文件”,每次更新时替换整个或部分字段。

      • 优点:结构清晰。
      • 缺点:文档变大后难以更新,容易出错。
    • 另一种方式是使用多个文档组成的集合,每条记忆范围更小,便于追加和维护。

      • 优点:更容易保留信息、提升召回率。
      • 缺点:更新和去重变复杂;上下文提供时不如统一 profile 简洁。

🧠 2. 情节记忆(Episodic Memory)

  • 作用:记录过去发生的事件和交互过程,帮助智能体“记得怎么做某事”。

  • 实现方式

    • 多通过**few-shot prompt(少样本提示)**实现:把过去交互作为示例传入,指导模型行为。
    • 可结合工具(如 LangSmith Dataset)来动态检索相关示例,提高针对性。

🧠 3. 程序性记忆(Procedural Memory)

  • 作用:记住完成任务的规则或方法(类似“技能”或“操作流程”)。

  • 实现方式

    • 包括模型参数、代码和提示词,其中提示词是最常更新的部分。

    • 支持通过“反思(Reflection)”方式让模型自我优化提示词:

      • 输入当前提示词 + 最近对话/反馈
      • 模型生成改进版提示词,写回内存
    • 示例:生成 Twitter 推文摘要,通过用户反馈不断改进提示词

写入记忆

虽然人类通常在睡眠中形成长期记忆,AI 智能体则需要采用不同的方式。那么,智能体应在何时以及如何创建新的记忆呢?主要有两种写入记忆的方法:“热路径写入(on the hot path)”“后台写入(in the background)”

在这里插入图片描述
热路径写入(on the hot path)

在运行时创建记忆有其优点也有挑战:

  • 优点

    • 实时更新,使新记忆可以立即在后续交互中使用;
    • 具备可透明性,例如可以通知用户某条记忆被创建和存储。
  • 挑战

    • 如果智能体需要一个新工具来判断“是否应记录”某条信息,会增加系统复杂性;
    • 决定“保存什么”需要推理过程,这会影响响应延迟;
    • 智能体需要在处理当前任务的同时管理记忆创建,可能影响记忆的数量和质量。

例如:ChatGPT 使用 save_memories 工具将内容字符串写入或更新记忆,并在每条用户消息时动态决定是否使用该工具。可以参考我们的 memory-agent 模板了解实现方式。

后台写入(in the background)

将记忆创建作为一个独立的后台任务也有不少优势:

  • 优点

    • 不会影响主流程的延迟;
    • 应用逻辑与记忆管理解耦;
    • 智能体可以更专注于当前任务执行;
    • 更灵活地选择写入时机,避免重复或不必要的操作。
  • 挑战

    • 需要合理设定写入频率,更新太慢可能导致其他线程获取不到最新上下文;

    • 如何触发记忆写入也需要明确策略,例如:

      • 固定时间后触发(若有新事件则重置计时);
      • 使用 cron 定时任务;
      • 用户或应用逻辑主动触发。

管理记忆(Manage memory)

许多 AI 应用需要使用记忆来在多轮交互之间共享上下文。LangGraph 支持构建对话代理所需的两种基本记忆类型:

  • 短期记忆(Short-term memory):在一个会话中维护消息历史,用于跟踪正在进行的对话。
  • 长期记忆(Long-term memory):在多个会话之间存储用户级或应用级的数据。

启用短期记忆后,如果对话太长,可能会超出大语言模型(LLM)的上下文窗口。常见的解决方案包括:

  • 裁剪(Trimming):在调用 LLM 前,移除最前面或最后面的 N 条消息。
  • 摘要(Summarization):将较早的历史消息进行总结,用摘要替代原始内容。
  • 删除(Delete):从 LangGraph 的状态中永久删除某些消息。
  • 自定义策略:如消息过滤等。

这有助于代理在保持上下文的同时避免超出模型的上下文限制。

添加短期记忆示例:

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraphcheckpointer = InMemorySaver()builder = StateGraph(...)
graph = builder.compile(checkpointer=checkpointer)graph.invoke({"messages": [{"role": "user", "content": "hi! i am Bob"}]},{"configurable": {"thread_id": "1"}},
)

长期记忆用于跨会话保存用户特定或应用特定的数据,非常适合需要记住用户偏好等信息的应用(如聊天机器人)。

添加长期记忆示例:

from langgraph.store.memory import InMemoryStore
from langgraph.graph import StateGraphstore = InMemoryStore()builder = StateGraph(...)
graph = builder.compile(store=store)

裁剪消息(Trim messages)

使用 trim_messages 函数可以在调用模型前对消息历史进行裁剪。

from langchain_core.messages.utils import (trim_messages,count_tokens_approximately
)def call_model(state: MessagesState):messages = trim_messages(state["messages"],strategy="last",token_counter=count_tokens_approximately,max_tokens=128,start_on="human",end_on=("human", "tool"),)response = model.invoke(messages)return {"messages": [response]}

完整示例:裁剪消息(Full example: trim messages)

from langchain_core.messages.utils import (trim_messages,                    # 用于裁剪消息历史count_tokens_approximately        # 用于粗略估算消息的 token 数量
)
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START, MessagesState# 初始化聊天模型(例如 Claude 3.7 Sonnet)
model = init_chat_model("anthropic:claude-3-7-sonnet-latest")
# 绑定摘要模型,限制最多返回 128 个 token
summarization_model = model.bind(max_tokens=128)# 定义调用模型的函数,使用裁剪后的消息
def call_model(state: MessagesState):messages = trim_messages(state["messages"],                # 当前的消息列表strategy="last",                  # 裁剪策略:保留最新的消息token_counter=count_tokens_approximately,  # 用于估算 token 数max_tokens=128,                   # 限制最多 token 数start_on="human",                 # 从人类消息开始裁剪end_on=("human", "tool"),         # 结束条件:人类或工具消息)response = model.invoke(messages)    # 调用模型return {"messages": [response]}      # 返回响应# 设置短期记忆的保存器
checkpointer = InMemorySaver()# 创建图构建器并添加节点
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")    # 设置开始节点到模型调用节点的边# 编译图并绑定记忆保存器
graph = builder.compile(checkpointer=checkpointer)# 配置项:使用 thread_id 表示当前对话线程
config = {"configurable": {"thread_id": "1"}}# 连续调用图,模拟多轮对话
graph.invoke({"messages": "hi, my name is bob"}, config)
graph.invoke({"messages": "write a short poem about cats"}, config)
graph.invoke({"messages": "now do the same but for dogs"}, config)
final_response = graph.invoke({"messages": "what's my name?"}, config)# 打印最终响应
final_response["messages"][-1].pretty_print()

示例输出:

================================== Ai Message ==================================Your name is Bob, as you mentioned when you first introduced yourself.

摘要消息(Summarize messages)

当对话历史过长时,可以通过摘要策略将旧消息压缩为一段摘要信息。

from typing import Any, TypedDict
from langchain_core.messages import AnyMessage
from langchain_core.messages.utils import count_tokens_approximately
from langmem.short_term import SummarizationNode
from langgraph.graph import StateGraph, START, MessagesStateclass State(MessagesState):context: dict[str, Any]class LLMInputState(TypedDict):summarized_messages: list[AnyMessage]context: dict[str, Any]summarization_node = SummarizationNode(token_counter=count_tokens_approximately,model=summarization_model,max_tokens=512,max_tokens_before_summary=256,max_summary_tokens=256,
)def call_model(state: LLMInputState):response = model.invoke(state["summarized_messages"])return {"messages": [response]}builder = StateGraph(State)
builder.add_node(call_model)
builder.add_node("summarize", summarization_node)
builder.add_edge(START, "summarize")
builder.add_edge("summarize", "call_model")

完整示例:摘要消息(Full example: summarize messages)

from typing import Any, TypedDictfrom langchain.chat_models import init_chat_model
from langchain_core.messages import AnyMessage
from langchain_core.messages.utils import count_tokens_approximately
from langgraph.graph import StateGraph, START, MessagesState
from langgraph.checkpoint.memory import InMemorySaver
from langmem.short_term import SummarizationNode# 初始化主模型与摘要模型(最大 token 限制为 128)
model = init_chat_model("anthropic:claude-3-7-sonnet-latest")
summarization_model = model.bind(max_tokens=128)# 定义状态结构,包含消息历史与上下文(用于存放摘要)
class State(MessagesState):context: dict[str, Any]# 定义模型输入状态类型(类型提示)
class LLMInputState(TypedDict):summarized_messages: list[AnyMessage]  # 摘要后的消息列表context: dict[str, Any]                # 摘要内容会存放于 context 中# 构建摘要节点
summarization_node = SummarizationNode(token_counter=count_tokens_approximately,  # token 估算器model=summarization_model,                 # 用于摘要的模型max_tokens=256,                            # 每轮总 token 限制max_tokens_before_summary=256,             # 超过多少 token 就触发摘要max_summary_tokens=128,                    # 摘要长度限制
)# 定义实际调用 LLM 的逻辑(使用摘要后的对话历史)
def call_model(state: LLMInputState):response = model.invoke(state["summarized_messages"])return {"messages": [response]}# 初始化记忆保存器
checkpointer = InMemorySaver()# 构建对话流程图
builder = StateGraph(State)
builder.add_node(call_model)                 # 添加调用模型节点
builder.add_node("summarize", summarization_node)  # 添加摘要节点
builder.add_edge(START, "summarize")         # 从开始跳转到摘要节点
builder.add_edge("summarize", "call_model")  # 从摘要跳转到模型调用
graph = builder.compile(checkpointer=checkpointer)# 配置对话线程 ID
config = {"configurable": {"thread_id": "1"}}# 模拟多轮对话
graph.invoke({"messages": "hi, my name is bob"}, config)
graph.invoke({"messages": "write a short poem about cats"}, config)
graph.invoke({"messages": "now do the same but for dogs"}, config)# 提问并获取最终结果
final_response = graph.invoke({"messages": "what's my name?"}, config)# 打印模型回复
final_response["messages"][-1].pretty_print()# 打印摘要内容
print("\nSummary:", final_response["context"]["running_summary"].summary)

示例输出:

================================== Ai Message ==================================Your name is Bob, as you mentioned earlier.Summary: 
The user introduced themselves as Bob and requested poems about cats and dogs.

删除消息(Delete messages)

使用 RemoveMessage 可以从状态中删除消息。

删除指定消息:

from langchain_core.messages import RemoveMessagedef delete_messages(state):messages = state["messages"]if len(messages) > 2:# 删除最早的两条消息return {"messages": [RemoveMessage(id=m.id) for m in messages[:2]]}

删除全部消息:

from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langchain_core.messages import RemoveMessagedef delete_messages(state):return {"messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES)]}

要使用 RemoveMessage,状态必须基于带有 add_messages reducer 的键(例如 MessagesState

⚠️ 删除时注意事项:

  • 删除后必须保持消息历史合法。

  • 注意不同 LLM 服务商的限制,比如:

    • 某些模型要求历史必须以用户消息开头。
    • 如果 assistant 调用了工具,必须跟随返回相应的工具结果消息。

完整示例:删除消息(Full example: delete messages)

from langchain_core.messages import RemoveMessage# 删除消息逻辑
def delete_messages(state):messages = state["messages"]if len(messages) > 2:# 删除最早的两条消息return {"messages": [RemoveMessage(id=m.id) for m in messages[:2]]}# 模型调用逻辑
def call_model(state: MessagesState):response = model.invoke(state["messages"])return {"messages": response}# 构建流程图
builder = StateGraph(MessagesState)
builder.add_sequence([call_model, delete_messages])  # 顺序执行:模型调用 → 删除消息
builder.add_edge(START, "call_model")# 使用内存型检查点器
checkpointer = InMemorySaver()
app = builder.compile(checkpointer=checkpointer)# 第一次对话:用户介绍自己
for event in app.stream({"messages": [{"role": "user", "content": "hi! I'm bob"}]},config,stream_mode="values"
):print([(message.type, message.content) for message in event["messages"]])# 第二次对话:询问名字
for event in app.stream({"messages": [{"role": "user", "content": "what's my name?"}]},config,stream_mode="values"
):print([(message.type, message.content) for message in event["messages"]])

输出示例(每一步对话的消息状态):

  1. 用户发送第一条消息:
[('human', "hi! I'm bob")]
  1. 模型回复后,消息记录更新:
[('human', "hi! I'm bob"), ('ai', 'Hi Bob! How are you doing today? Is there anything I can help you with?')]
  1. 用户继续提问:
[('human', "hi! I'm bob"), ('ai', 'Hi Bob! How are you doing today? Is there anything I can help you with?'),('human', "what's my name?")]
  1. 模型继续回复:
[('human', "hi! I'm bob"), ('ai', 'Hi Bob! How are you doing today? Is there anything I can help you with?'),('human', "what's my name?"), ('ai', 'Your name is Bob.')]
  1. 删除最早两条消息后(保留后两条):
[('human', "what's my name?"), ('ai', 'Your name is Bob.')]

人类参与环节(Human-in-the-loop)

LangGraph 支持强大的人类参与工作流(HIL,Human-in-the-loop),允许在自动化流程的任意环节插入人工干预。这在大型语言模型(LLM)驱动的应用中尤为重要,因为模型输出可能需要验证、修正或补充上下文。

关键能力

  • 持久化执行状态
    LangGraph 会在每一步后保存图状态(checkpoint),允许在定义的节点无限期暂停执行,支持异步的人类审查或输入,无时间限制。

  • 灵活的集成点
    可以在工作流中的任意节点引入人类参与逻辑,实现有针对性的人工介入,例如批准 API 调用、修正输出或引导对话。

典型用例

  • 🛠️ 审查工具调用:人类可审查、编辑或批准 LLM 请求的工具调用,确保工具执行前符合要求。
  • 验证 LLM 输出:人类审查、修改或确认 LLM 生成的内容。
  • 💡 提供上下文:允许 LLM 明确请求人工输入以澄清信息、补充细节,或支持多轮对话。

实现机制

  • interrupt 函数
    在特定节点暂停执行,并呈现信息供人类审查。

  • Command 原语
    用于接收人工提供的值,并恢复执行。

添加人类参与环节

interrupt 函数

LangGraph 中的 interrupt 函数支持人类参与工作流(Human-in-the-loop),通过在图的特定节点暂停执行,向人类展示信息,并通过人类输入恢复图的运行。这对于审批、编辑或收集额外上下文等任务非常有用。

图的恢复是通过一个 Command 对象完成的,该对象包含了人类的响应。

from langgraph.types import interrupt, Commanddef human_node(state: State):value = interrupt({"text_to_revise": state["some_text"]})return {"some_text": value}graph = graph_builder.compile(checkpointer=checkpointer)# 运行图,直到遇到 interrupt。
config = {"configurable": {"thread_id": "some_id"}}
result = graph.invoke({"some_text": "original text"}, config=config)
print(result['__interrupt__'])
# > [
# >    Interrupt(
# >       value={'text_to_revise': 'original text'},
# >       resumable=True,
# >       ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']
# >    )
# > ]print(graph.invoke(Command(resume="Edited text"), config=config))
# > {'some_text': 'Edited text'}

扩展示例:使用 interrupt

以下是该代码的中文翻译说明:```python
from typing import TypedDict
import uuidfrom langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command# 定义状态类型,包含一个字符串字段 some_text
class State(TypedDict):some_text: str# 定义一个节点函数 human_node,使用 interrupt 暂停执行,等待人类输入
def human_node(state: State):value = interrupt({"text_to_revise": state["some_text"]  # 向人类展示当前文本以供修改})return {"some_text": value  # 返回人类修改后的文本}# 构建状态图
graph_builder = StateGraph(State)
graph_builder.add_node("human_node", human_node)  # 添加 human_node 节点
graph_builder.add_edge(START, "human_node")       # 连接起始点到 human_nodecheckpointer = InMemorySaver()  # 创建内存检查点保存器graph = graph_builder.compile(checkpointer=checkpointer)  # 编译图,启用检查点保存# 生成唯一的线程 ID,传入配置
config = {"configurable": {"thread_id": uuid.uuid4()}}# 运行图,直到遇到 interrupt 暂停
result = graph.invoke({"some_text": "original text"}, config=config)print(result['__interrupt__'])  
# 输出中断信息,包含展示给人类的文本和中断节点信息# 传入人类修改的文本继续恢复执行
print(graph.invoke(Command(resume="Edited text"), config=config))
# 输出修改后的状态 {"some_text": "Edited text"}

当图运行时遇到中断,会返回一个特殊键 __interrupt__。在 0.4.0 版本中,invokeainvoke 都支持返回 __interrupt__。如果你使用的是旧版本,只有通过 streamastream 才会在结果中看到 __interrupt__。你也可以通过 graph.get_state(thread_id) 获取中断的值。

注意事项

中断机制既强大又便捷,但请注意:

  • 它们在开发体验上类似 Python 的 input() 函数,但不会自动从中断点继续执行
  • 实际上,恢复执行时会重新运行包含 interrupt 的整个节点。
  • 因此,通常建议将中断放在节点开始处或单独节点中。

要在你的图中使用 interrupt,需要:

  • 指定一个检查点保存器(checkpointer),用于在每一步之后保存图的状态。
  • 在合适的位置调用 interrupt()。
  • 使用线程 ID 运行图,直到触发 interrupt。
  • 使用 invoke/ainvoke/stream/astream 等方法恢复执行。

设计范式

在人机交互(human-in-the-loop)工作流中,通常可以执行以下三种不同的操作:

  • 批准或拒绝(Approve or Reject):在关键步骤(例如 API 调用)之前暂停图,供人工审核和批准。如果操作被拒绝,可以阻止图执行该步骤,并可能执行替代操作。此模式通常涉及根据人工输入来路由图的执行路径。
  • 编辑图状态(Edit Graph State):暂停图以审查和编辑图的状态。适用于修正错误或用额外信息更新状态。此模式通常涉及使用人工输入来更新状态。
  • 获取输入(Get Input):在图的特定步骤明确请求人工输入。适合收集额外的信息或上下文,以辅助代理的决策过程。

下面展示了可以使用这些操作实现的不同设计范式。

批准或拒绝

在这里插入图片描述
根据人工的批准或拒绝,图可以继续执行该操作,或采取替代路径。

在关键步骤(例如 API 调用)之前暂停图,供人工审查并批准该操作。如果操作被拒绝,可以阻止图执行该步骤,并可能执行替代操作。

from typing import Literal
from langgraph.types import interrupt, Commanddef human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:is_approved = interrupt({"question": "Is this correct?",# 展示应由人工审查和批准的输出内容"llm_output": state["llm_output"]})if is_approved:return Command(goto="some_node")else:return Command(goto="another_node")# 在图中合适的位置添加该节点,并连接到相关节点。
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)# 运行图并触发中断后,图会暂停。
# 可以通过批准或拒绝来恢复执行。
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config)

扩展示例:通过中断实现批准或拒绝

from typing import Literal, TypedDict
import uuidfrom langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver# 定义共享的图状态
class State(TypedDict):llm_output: strdecision: str# 模拟生成 LLM 输出的节点
def generate_llm_output(state: State) -> State:return {"llm_output": "这是生成的输出内容。"}# 人工批准节点
def human_approval(state: State) -> Command[Literal["approved_path", "rejected_path"]]:decision = interrupt({"question": "你是否批准以下输出?","llm_output": state["llm_output"]})if decision == "approve":return Command(goto="approved_path", update={"decision": "approved"})else:return Command(goto="rejected_path", update={"decision": "rejected"})# 批准后的后续步骤
def approved_node(state: State) -> State:print("✅ 已通过批准路径。")return state# 拒绝后的替代路径
def rejected_node(state: State) -> State:print("❌ 已进入拒绝路径。")return state# 构建图
builder = StateGraph(State)
builder.add_node("generate_llm_output", generate_llm_output)
builder.add_node("human_approval", human_approval)
builder.add_node("approved_path", approved_node)
builder.add_node("rejected_path", rejected_node)builder.set_entry_point("generate_llm_output")
builder.add_edge("generate_llm_output", "human_approval")
builder.add_edge("approved_path", END)
builder.add_edge("rejected_path", END)checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)# 运行直到中断
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"])
# 输出示例:
# Interrupt(value={'question': '你是否批准以下输出?', 'llm_output': '这是生成的输出内容。'}, ...)# 模拟通过人工输入恢复执行
# 若要测试拒绝,将 resume="approve" 改为 resume="reject"
final_result = graph.invoke(Command(resume="approve"), config=config)
print(final_result)
审核并编辑状态

在这里插入图片描述

人类可以审核并编辑图的状态。这对于纠正错误或用附加信息更新状态非常有用。

from langgraph.types import interruptdef human_editing(state: State):...result = interrupt(# 提供给客户端的中断信息。# 可以是任何 JSON 可序列化的值。{"task": "请审核 LLM 生成的输出并做必要的编辑。","llm_generated_summary": state["llm_generated_summary"]})# 用编辑后的文本更新状态return {"llm_generated_summary": result["edited_text"]}# 在合适的位置将该节点添加到图中
# 并连接到相关节点。
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)...# 运行图直到中断,图将暂停。
# 使用编辑后的文本恢复执行。
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume={"edited_text": "编辑后的文本"}),config=thread_config
)

扩展示例:使用 interrupt 编辑状态

from typing import TypedDict
import uuidfrom langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver# 定义图的状态
class State(TypedDict):summary: str# 模拟 LLM 生成摘要
def generate_summary(state: State) -> State:return {"summary": "The cat sat on the mat and looked at the stars."}# 人工审核编辑节点
def human_review_edit(state: State) -> State:result = interrupt({"task": "请审核并在必要时编辑生成的摘要。","generated_summary": state["summary"]})return {"summary": result["edited_summary"]}# 模拟对编辑后摘要的后续使用
def downstream_use(state: State) -> State:print(f"✅ 使用编辑后的摘要: {state['summary']}")return state# 构建图
builder = StateGraph(State)
builder.add_node("generate_summary", generate_summary)
builder.add_node("human_review_edit", human_review_edit)
builder.add_node("downstream_use", downstream_use)builder.set_entry_point("generate_summary")
builder.add_edge("generate_summary", "human_review_edit")
builder.add_edge("human_review_edit", "downstream_use")
builder.add_edge("downstream_use", END)# 设置内存检查点支持 interrupt
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)# 调用图直到中断
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)# 输出中断内容
print(result["__interrupt__"])
# 示例输出:
# Interrupt(
#   value={
#     'task': '请审核并在必要时编辑生成的摘要。',
#     'generated_summary': 'The cat sat on the mat and looked at the stars.'
#   },
#   resumable=True,
#   ...
# )# 使用人工编辑后的输入恢复图执行
edited_summary = "The cat lay on the rug, gazing peacefully at the night sky."
resumed_result = graph.invoke(Command(resume={"edited_summary": edited_summary}),config=config
)
print(resumed_result)
工具调用审核

在这里插入图片描述

人类可以在继续执行之前审核并编辑来自大语言模型(LLM)的输出。这在 LLM 请求的工具调用可能敏感或需要人工监督的应用中特别重要。

def human_review_node(state) -> Command[Literal["call_llm", "run_tool"]]:# 这是我们通过 Command(resume=<human_review>) 提供的值human_review = interrupt({"question": "这是正确的吗?",# 显示需审核的工具调用内容"tool_call": tool_call})review_action, review_data = human_review# 批准工具调用并继续执行if review_action == "continue":return Command(goto="run_tool")# 手动修改工具调用后继续elif review_action == "update":...updated_msg = get_updated_msg(review_data)# 注意:修改现有消息时,需要传递具有匹配 ID 的消息return Command(goto="run_tool", update={"messages": [updated_msg]})# 给出自然语言反馈,然后传回给智能体elif review_action == "feedback":...feedback_msg = get_feedback_msg(review_data)return Command(goto="call_llm", update={"messages": [feedback_msg]})
验证人工输入

如果你需要在图的节点内部(而非客户端)验证人类提供的输入,可以通过在同一个节点内多次调用 interrupt 来实现。

from langgraph.types import interruptdef human_node(state: State):"""带验证的人类节点。"""question = "你几岁了?"while True:answer = interrupt(question)# 验证答案,如果答案无效则重新询问。if not isinstance(answer, int) or answer < 0:question = f"'{answer}' 不是有效的年龄。你几岁了?"answer = Nonecontinueelse:# 如果答案有效,则继续执行。breakprint(f"环中人类的年龄是 {answer} 岁。")return {"age": answer}

扩展示例:验证用户输入

from typing import TypedDict
import uuidfrom langgraph.constants import START, END
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver# 定义图的状态
class State(TypedDict):age: int# 询问人类输入并验证的节点
def get_valid_age(state: State) -> State:prompt = "请输入你的年龄(必须是非负整数)。"while True:user_input = interrupt(prompt)# 验证输入try:age = int(user_input)if age < 0:raise ValueError("年龄必须是非负数。")break  # 有效输入,退出循环except (ValueError, TypeError):prompt = f"'{user_input}' 无效。请输入一个非负整数作为年龄。"return {"age": age}# 使用有效输入的节点
def report_age(state: State) -> State:print(f"✅ 人类年龄为 {state['age']} 岁。")return state# 构建图
builder = StateGraph(State)
builder.add_node("get_valid_age", get_valid_age)
builder.add_node("report_age", report_age)builder.set_entry_point("get_valid_age")
builder.add_edge("get_valid_age", "report_age")
builder.add_edge("report_age", END)# 创建带内存检查点的图
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)# 运行图直到首次中断
config = {"configurable": {"thread_id": uuid.uuid4()}}
result = graph.invoke({}, config=config)
print(result["__interrupt__"])  # 首次提示:"请输入你的年龄..."# 模拟无效输入(例如输入字符串而非数字)
result = graph.invoke(Command(resume="not a number"), config=config)
print(result["__interrupt__"])  # 带验证提示的后续提示# 模拟第二次无效输入(例如负数)
result = graph.invoke(Command(resume="-10"), config=config)
print(result["__interrupt__"])  # 继续重试# 提供有效输入
final_result = graph.invoke(Command(resume="25"), config=config)
print(final_result)  # 应包含有效年龄

使用 Command 原语继续执行

当图中调用 interrupt 函数时,执行会在该点暂停,等待用户输入。

要恢复执行,可以使用 Command 原语,通过 invoke、ainvoke、stream 或 astream 方法传入。

提供对中断的响应:要继续执行,传入用户输入,形式为 Command(resume=value)。图从最初调用 interrupt(…) 的节点起始处恢复执行。这次 interrupt 函数将返回 Command(resume=value) 中提供的值,而不会再次暂停。

# 通过提供用户输入恢复图执行
graph.invoke(Command(resume={"age": "25"}), thread_config)

中断恢复是如何工作的?

⚠️ 警告:中断恢复与 Python 的 input() 函数不同,input() 恢复时会从调用处继续执行。使用 interrupt 时,关键点是:恢复执行时,图会从最后一次触发中断的节点开始重新执行。该节点从开始到 interrupt 的代码都会被重新执行。

使用 interrupt 的一个关键点是理解恢复执行的方式。恢复执行时,图的执行会从最后一次触发中断的节点的开头开始。

该节点从开始到 interrupt 调用处的所有代码都会被重新执行。

counter = 0
def node(state: State):# 当图恢复执行时,# 节点开始到 interrupt 处的代码会被重新执行。global countercounter += 1print(f"> Entered the node: {counter} # of times")# 暂停图执行,等待用户输入。answer = interrupt()print("The value of counter is:", counter)...

图恢复后,counter 会被第二次加一,输出示例:

> Entered the node: 2 # of times
The value of counter is: 2

如果任务队列中存在多个中断,可以通过传入一个字典,键为中断 id,值为对应恢复值,使用 Command.resume 一次调用 invoke / stream 来恢复多个中断。

例如,图被中断(理论上多次)并暂停后:

resume_map = {i.interrupt_id: f"human input for prompt {i.value}"for i in parent.get_state(thread_config).interrupts
}parent_graph.invoke(Command(resume=resume_map), config=thread_config)

常见误区

副作用

含有副作用的代码(比如 API 调用)应放在 interrupt 之后,以避免重复执行,因为每次节点恢复时都会重新执行这些代码。

示例:

以下代码中,API 调用会在节点从 interrupt 恢复时被重复执行。如果该 API 调用不是幂等的或执行成本高,就会引发问题。

from langgraph.types import interruptdef human_node(state: State):"""带校验的人类节点。"""api_call(...)  # 节点恢复时,这段代码会被重新执行answer = interrupt(question)
作为函数调用的子图

当以函数形式调用子图时,父图会从调用子图的节点开头开始恢复执行(该节点也触发了中断)。子图同样会从调用 interrupt() 的节点开头恢复。

示例:

def node_in_parent_graph(state: State):some_code()  # <-- 子图恢复时,这里会被重新执行# 以函数形式调用子图,子图中包含 interrupt 调用subgraph_result = subgraph.invoke(some_input)...

假设有一个父图包含 3 个节点:

父图:node_1 → node_2(调用子图)→ node_3

子图也有 3 个节点,其中第二个节点包含一个 interrupt:

子图:sub_node_1 → sub_node_2(interrupt)→ sub_node_3

恢复执行时,流程如下:

  • 跳过父图中的 node_1(已执行过,图状态已保存快照)
  • 从头开始重新执行父图中的 node_2
  • 跳过子图中的 sub_node_1(已执行过,状态已保存)
  • 从头开始重新执行子图中的 sub_node_2
  • 继续执行 sub_node_3 及之后的节点

下面是简化示例代码,演示子图与 interrupt 的工作方式。代码会统计每个节点被进入的次数并打印。

import uuid
from typing import TypedDictfrom langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaverclass State(TypedDict):"""图状态"""state_counter: intcounter_node_in_subgraph = 0
def node_in_subgraph(state: State):global counter_node_in_subgraphcounter_node_in_subgraph += 1  # 这段代码不会被重复执行print(f"Entered `node_in_subgraph` a total of {counter_node_in_subgraph} times")counter_human_node = 0
def human_node(state: State):global counter_human_nodecounter_human_node += 1  # 这段代码会被重复执行print(f"Entered human_node in sub-graph a total of {counter_human_node} times")answer = interrupt("what is your name?")print(f"Got an answer of {answer}")checkpointer = MemorySaver()subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)counter_parent_node = 0
def parent_node(state: State):"""父图中的节点,会调用子图"""global counter_parent_nodecounter_parent_node += 1  # 恢复时会再次执行print(f"Entered `parent_node` a total of {counter_parent_node} times")# 注意这里故意修改了图状态中的计数器,# 用来演示子图和父图状态更新不会冲突subgraph_state = subgraph.invoke(state)return subgraph_statebuilder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")# 必须启用检查点才能支持中断
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": uuid.uuid4(),}
}for chunk in graph.stream({"state_counter": 1}, config):print(chunk)print('--- Resuming ---')for chunk in graph.stream(Command(resume="35"), config):print(chunk)

输出结果示例:

Entered `parent_node` a total of 1 times
Entered `node_in_subgraph` a total of 1 times
Entered human_node in sub-graph a total of 1 times
{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['parent_node:4c3a0248-21f0-1287-eacf-3002bc304db4', 'human_node:2fe86d52-6f70-2a3f-6b2f-b1eededd6348'], when='during'),)}
--- Resuming ---
Entered `parent_node` a total of 2 times
Entered human_node in sub-graph a total of 2 times
Got an answer of 35
{'parent_node': {'state_counter': 1}}
使用多个 interrupt

在单个节点内使用多个 interrupt 有助于实现诸如人类输入校验的模式。但如果处理不当,可能导致不可预期的行为。

节点内有多个 interrupt 调用时,LangGraph 会为执行该节点的任务维护一个恢复值列表。每次恢复执行,都会从节点开头开始。对于遇到的每个 interrupt,LangGraph 会根据顺序(严格基于索引)查找对应的恢复值。中断调用的顺序非常关键。

为避免问题,应避免在不同执行间动态更改节点结构,包括添加、删除或重新排序 interrupt 调用,否则会导致索引错位。此类问题常因非常规模式引起,如通过 Command(resume=..., update=某状态变更) 变异状态,或依赖全局变量动态修改节点结构。

扩展示例:引入非确定性的错误代码

import uuid
from typing import TypedDict, Optionalfrom langgraph.graph import StateGraph
from langgraph.constants import START 
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaverclass State(TypedDict):"""图状态"""age: Optional[str]name: Optional[str]def human_node(state: State):if not state.get('name'):name = interrupt("what is your name?")else:name = "N/A"if not state.get('age'):age = interrupt("what is your age?")else:age = "N/A"print(f"Name: {name}. Age: {age}")return {"age": age,"name": name,}builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")# 必须启用检查点才能支持中断!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": uuid.uuid4(),}
}for chunk in graph.stream({"age": None, "name": None}, config):print(chunk)for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):print(chunk)

输出示例:

{'__interrupt__': (Interrupt(value='what is your name?', resumable=True, ns=['human_node:3a007ef9-c30d-c357-1ec1-86a1a70d8fba'], when='during'),)}
Name: N/A. Age: John
{'human_node': {'age': 'John', 'name': 'N/A'}}

这段代码引入了非确定性,因为节点中有多个 interrupt,且中断点的顺序和条件判断基于状态变化。由于恢复时从节点开头重新执行,interrupt 调用的顺序可能被打乱,导致输出与预期不符(例如姓名变成了“N/A”,年龄变成了“John”)。此外,通过 Command(resume=..., update=...) 动态修改状态,进一步加剧了这种不确定性。

断点(Breakpoints)

断点会在预设的位置暂停图的执行,让你可以逐步检查每个阶段。它们使用 LangGraph 的持久化层,在每一步之后保存图的状态。

通过断点,你可以在任意时刻查看图的状态和节点输入。执行会无限期暂停,直到你恢复,因为检查点会保留当前状态。

在这里插入图片描述

这是一个包含3个顺序步骤的示例图,断点设置在 step_3 之前。

要使用断点,你需要:

  • 指定一个检查点保存器(checkpointer),用于在每一步后保存图的状态。
  • 设置断点,指定执行应暂停的位置。
  • 以线程 ID 运行图,在断点处暂停执行。
  • 使用 invoke/ainvoke/stream/astream 恢复执行,传入 None 作为输入参数。

断点可以设置在两个位置:

  1. 在节点执行前或执行后,通过编译时或运行时设置断点,这称为静态断点(static breakpoints)。
  2. 在节点内部根据条件抛出 NodeInterrupt 异常,这称为动态断点(dynamic breakpoints)。

静态断点

静态断点会在节点执行前或执行后触发。你可以在编译阶段或运行阶段通过指定 interrupt_beforeinterrupt_after 来设置静态断点。

静态断点特别适合调试,如果你想一步步查看图的执行过程,或想在特定节点暂停图的执行。

编译时设置示例:

graph = graph_builder.compile( interrupt_before=["node_a"], interrupt_after=["node_b", "node_c"], checkpointer=checkpointer, 
)config = {"configurable": {"thread_id": "some_thread"}
}# Run the graph until the breakpoint
graph.invoke(inputs, config=thread_config) # Resume the graph
graph.invoke(None, config=thread_config)

设置静态断点

from IPython.display import Image, display
from typing_extensions import TypedDictfrom langgraph.checkpoint.memory import InMemorySaver 
from langgraph.graph import StateGraph, START, ENDclass State(TypedDict):input: strdef step_1(state):print("---步骤 1---")passdef step_2(state):print("---步骤 2---")passdef step_3(state):print("---步骤 3---")passbuilder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)# 设置检查点保存器
checkpointer = InMemorySaver()  # (1)!graph = builder.compile(checkpointer=checkpointer,      # (2)!interrupt_before=["step_3"]     # (3)!
)# 显示图示
display(Image(graph.get_graph().draw_mermaid_png()))# 输入
initial_input = {"input": "hello world"}# 线程配置
thread = {"configurable": {"thread_id": "1"}}# 运行图直到第一个断点
for event in graph.stream(initial_input, thread, stream_mode="values"):print(event)# 此时执行会暂停在断点处
# 可以获取此时图的状态
print(graph.get_state(thread))# 通过传入 None 作为输入继续执行图
for event in graph.stream(None, thread, stream_mode="values"):print(event)

运行时配置示例:

graph.invoke( inputs, interrupt_before=["node_a"], interrupt_after=["node_b", "node_c"] config={"configurable": {"thread_id": "some_thread"}}, 
)config = {"configurable": {"thread_id": "some_thread"}
}# Run the graph until the breakpoint
graph.invoke(inputs, config=config) # Resume the graph
graph.invoke(None, config=config)

注意:你不能在运行时为子图设置静态断点。如果有子图,必须在编译时设置断点。

动态断点

如果需要根据条件在节点内部中断图的执行,可以使用动态断点,方法是抛出 NodeInterrupt 异常。

示例:

from langgraph.errors import NodeInterruptdef step_2(state: State) -> State:if len(state["input"]) > 5:raise NodeInterrupt( f"Received input that is longer than 5 characters: {state['foo']}")return state

使用动态断点

from typing_extensions import TypedDict
from IPython.display import Image, displayfrom langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterruptclass State(TypedDict):input: strdef step_1(state: State) -> State:print("---Step 1---")return statedef step_2(state: State) -> State:# 如果输入长度超过5个字符,则主动抛出 NodeInterruptif len(state["input"]) > 5:raise NodeInterrupt(f"Received input that is longer than 5 characters: {state['input']}")print("---Step 2---")return statedef step_3(state: State) -> State:print("---Step 3---")return statebuilder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)# 设置内存检查点
memory = MemorySaver()# 编译图,带检查点支持
graph = builder.compile(checkpointer=memory)# 展示图形
display(Image(graph.get_graph().draw_mermaid_png()))

在这里插入图片描述

首先,用一个长度小于等于5的输入运行图,这时不会触发断点,图执行结束时返回原始输入。

initial_input = {"input": "hello"}
thread_config = {"configurable": {"thread_id": "1"}}for event in graph.stream(initial_input, thread_config, stream_mode="values"):print(event)

输出:

{'input': 'hello'}
---Step 1---
{'input': 'hello'}
---Step 2---
{'input': 'hello'}
---Step 3---
{'input': 'hello'}

如果我们在此时检查图的状态,可以看到没有剩余任务需要执行,说明图已完成执行。

state = graph.get_state(thread_config)
print(state.next)   # ()
print(state.tasks)  # ()

然后,用长度超过5的输入运行图,这会触发 step_2 节点中定义的动态断点,抛出 NodeInterrupt

initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "2"}}for event in graph.stream(initial_input, thread_config, stream_mode="values"):print(event)

输出:

{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}
{'__interrupt__': (Interrupt(value='Received input that is longer than 5 characters: hello world', resumable=False, ns=None),)}

我们可以看到图在执行 step_2 时停止了。如果我们此时检查图的状态,可以看到接下来将要执行的节点是 step_2,触发中断的节点也是 step_2,同时还可以查看关于该中断的更多信息。

state = graph.get_state(thread_config)
print(state.next)   # ('step_2',)
print(state.tasks)  # 包含触发中断的任务信息
('step_2',)
(PregelTask(id='bfc767e3-a6c4-c5af-dbbf-0d20ea64501e', name='step_2', path=('__pregel_pull', 'step_2'), error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', resumable=False, ns=None),), state=None, result=None),)

如果我们尝试从断点恢复图的执行,由于输入和图的状态没有发生变化,图会再次触发中断。

for event in graph.stream(None, thread_config, stream_mode="values"):print(event)

输出:

{'input': 'hello world'}
{'__interrupt__': (Interrupt(value='Received input that is longer than 5 characters: hello world', resumable=False, ns=None),)}

查看状态依旧:

state = graph.get_state(thread_config)
print(state.next)   # ('step_2',)
print(state.tasks)  # 断点任务信息

总结:

  • 动态断点通过在节点内部条件判断抛出 NodeInterrupt 实现。
  • 图在触发断点时暂停,等待用户操作恢复。
  • 恢复执行时若输入不变,断点会重复触发。

与子图一起使用

给子图添加断点,可以:

  • 在编译子图时指定静态断点。
  • 在子图节点内部使用动态断点。

给子图添加断点

from typing_extensions import TypedDictfrom langgraph.graph import START, StateGraph
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interruptclass State(TypedDict):foo: strdef subgraph_node_1(state: State):return {"foo": state["foo"]}# 构建子图
subgraph_builder = StateGraph(State)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_edge(START, "subgraph_node_1")# 编译子图时在 subgraph_node_1 前设置断点
subgraph = subgraph_builder.compile(interrupt_before=["subgraph_node_1"])# 主图中直接将子图作为节点添加
builder = StateGraph(State)
builder.add_node("node_1", subgraph)
builder.add_edge(START, "node_1")# 设置内存检查点
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)config = {"configurable": {"thread_id": "1"}}# 运行图,传入初始状态
graph.invoke({"foo": ""}, config)# 获取图状态,包括子图状态
print(graph.get_state(config, subgraphs=True).tasks[0].state)# 恢复子图执行
graph.invoke(None, config)

输出示例(状态快照):

StateSnapshot(values={'foo': ''}, next=('subgraph_node_1',), config={'configurable': {'thread_id': '1','checkpoint_ns': 'node_1:dfc321bb-7c91-ccfe-23b8-c2374ae3f1cc','checkpoint_id': '1f02a8d1-985a-6e2c-8000-77034088c0ce','checkpoint_map': {'': '1f02a8d1-9856-6264-8000-ed1534455427','node_1:dfc321bb-7c91-ccfe-23b8-c2374ae3f1cc': '1f02a8d1-985a-6e2c-8000-77034088c0ce'}}},metadata={ ... },created_at='2025-05-06T15:16:35.543192+00:00',parent_config={ ... },tasks=(PregelTask(id='33218e09-8747-5161-12b1-5dc705d30b51',name='subgraph_node_1',path=('__pregel_pull', 'subgraph_node_1'),error=None,interrupts=(),state=None,result=None),),interrupts=()
){'foo': ''}

时间旅行

在处理基于模型决策的非确定性系统(例如由大型语言模型驱动的代理)时,详细检查其决策过程可能非常有帮助:

  • 🤔 理解推理过程:分析导致成功结果的各个步骤。
  • 🐞 调试错误:识别错误发生的位置和原因。
  • 🔍 探索替代路径:测试不同路径以发现更优解。

LangGraph 提供了“时间旅行”功能来支持这些场景。具体来说,你可以从先前的检查点恢复执行 —— 可以选择重放相同的状态,或者修改它以探索其他可能性。

在所有情况下,从过去的某个点恢复执行都会在历史中生成一个新的分支(fork)。

在 LangGraph 中使用时光旅行的方法如下:

  1. 使用 invokestream API 运行图流程,传入初始输入。
  2. 识别一个已有线程中的检查点: 使用 get_state_history() 方法获取特定 thread_id 的执行历史,并定位所需的 checkpoint_id
  3. (可选)设置断点: 在希望中断执行的位置节点之前设置断点。然后可以找到最近一次记录的检查点。
  4. (可选)修改图状态: 使用 update_state 方法修改图在某个检查点的状态,并从这个修改后的状态恢复执行。
  5. 从检查点恢复执行: 使用 invokestream API,输入设置为 None,并传入包含适当 thread_idcheckpoint_id 的配置。

示例

本示例构建了一个简单的 LangGraph 工作流,使用 LLM 生成一个笑话主题并写一个笑话。它演示了如何运行图流程、检索历史检查点、(可选)修改状态,并从某个检查点恢复执行以探索不同结果。

首先需要安装所需依赖:

pip install --quiet -U langgraph langchain_anthropic

然后设置 Anthropic 的 API Key(我们使用它的 LLM):

import getpass
import osdef _set_env(var: str):if not os.environ.get(var):os.environ[var] = getpass.getpass(f"{var}: ")_set_env("ANTHROPIC_API_KEY")

构建图工作流:

import uuid
from typing_extensions import TypedDict, NotRequired
from langgraph.graph import StateGraph, START, END
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.memory import InMemorySaverclass State(TypedDict):topic: NotRequired[str]joke: NotRequired[str]llm = init_chat_model("anthropic:claude-3-7-sonnet-latest",temperature=0,
)def generate_topic(state: State):"""LLM 生成笑话主题"""msg = llm.invoke("Give me a funny topic for a joke")return {"topic": msg.content}def write_joke(state: State):"""LLM 基于主题写笑话"""msg = llm.invoke(f"Write a short joke about {state['topic']}")return {"joke": msg.content}# 构建工作流
workflow = StateGraph(State)# 添加节点
workflow.add_node("generate_topic", generate_topic)
workflow.add_node("write_joke", write_joke)# 添加边连接节点
workflow.add_edge(START, "generate_topic")
workflow.add_edge("generate_topic", "write_joke")
workflow.add_edge("write_joke", END)# 编译图
checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

在这里插入图片描述

运行图流程

config = {"configurable": {"thread_id": uuid.uuid4(),}
}
state = graph.invoke({}, config)print(state["topic"])
print()
print(state["joke"])

输出示例:

The Secret Life of Socks in the DryerI finally discovered where all my missing socks go after the dryer. Turns out they're not missing at all—they've just eloped with someone else's socks from the laundromat to start new lives together.My blue argyle is now living in Bermuda with a red polka dot, posting vacation photos on Sockstagram and sending me lint as alimony.

获取检查点:

# 状态按时间倒序返回
states = list(graph.get_state_history(config))for state in states:print(state.next)print(state.config["configurable"]["checkpoint_id"])print()

示例输出:

()
1f02ac4a-ec9f-6524-8002-8f7b0bbeed0e('write_joke',)
1f02ac4a-ce2a-6494-8001-cb2e2d651227('generate_topic',)
1f02ac4a-a4e0-630d-8000-b73c254ba748('__start__',)
1f02ac4a-a4dd-665e-bfff-e6c8c44315d9

选择倒数第二个状态(写笑话前的状态):

selected_state = states[1]
print(selected_state.next)
print(selected_state.values)

输出:

('write_joke',)
{'topic': 'How about "The Secret Life of Socks in the Dryer"? ...'}

修改状态(可选):

# update_state 会创建一个新的检查点
new_config = graph.update_state(selected_state.config, values={"topic": "chickens"})
print(new_config)

输出示例:

{'configurable': {'thread_id': 'c62e2e03-c27b-4cb6-8cea-ea9bfedae006','checkpoint_ns': '','checkpoint_id': '1f02ac4a-ecee-600b-8002-a1d21df32e4c'}
}

从检查点恢复执行:

graph.invoke(None, new_config)

输出示例:

{'topic': 'chickens','joke': 'Why did the chicken join a band?\n\nBecause it had excellent drumsticks'
}

工具(Tools)

许多 AI 应用程序会直接与人类互动。在这种情况下,模型以自然语言进行响应是合适的。但如果我们希望模型也能直接与系统交互,比如数据库或 API 呢?这些系统通常有特定的输入结构;例如,API 通常要求特定格式的请求负载。在这种情况下,你可以使用 工具调用(tool calling) 来让模型生成符合特定结构的响应。

工具是一种将函数及其输入结构封装起来的方法,可以传递给支持工具调用的聊天模型。这样,模型就可以根据需要请求执行某个函数并传入指定参数。

你可以传递工具给支持工具调用的聊天模型,模型会在合适的时候请求执行某个函数,并提供必要的输入。

你既可以创建自定义工具,也可以使用预构建的工具。

在这里插入图片描述

工具调用的关键原则是:模型根据输入内容是否相关来决定是否调用工具。模型并不总是需要调用工具。例如,对于一个与工具无关的输入,模型不会调用工具:

result = llm_with_tools.invoke("Hello world!")

此时,result 是一个 AIMessage,其中包含模型自然语言的响应(如 "Hello!")。但如果输入与工具相关,模型就会选择调用它:

result = llm_with_tools.invoke("What is 2 multiplied by 3?")

如前所述,输出仍然是一个 AIMessage,但如果工具被调用了,result 将包含 tool_calls 属性。该属性包含执行工具所需的一切信息,例如工具名和输入参数:

result.tool_calls
# {'name': 'multiply', 'args': {'a': 2, 'b': 3}, 'id': 'xxx', 'type': 'tool_call'}

执行工具(Execute tools)

LangGraph 提供了预构建的组件(如 ToolNodecreate_react_agent),可以代表用户调用工具执行。

预构建工具(Prebuilt tools)

LangChain 支持多种预构建工具集成,支持与 API、数据库、文件系统、网络数据等进行交互。这些工具扩展了代理(agent)的功能,使开发更加快捷。

你可以在 LangChain 集成目录 中浏览所有可用的工具集成。

一些常见的工具类别包括:

  • 搜索:Bing、SerpAPI、Tavily
  • 代码解释器:Python REPL、Node.js REPL
  • 数据库:SQL、MongoDB、Redis
  • 网页数据:网页抓取和浏览
  • API:OpenWeatherMap、NewsAPI 等

这些集成工具可以使用与上述示例中相同的 tools 参数配置并添加到代理中。

使用工具(Use tools)

工具是一种将函数及其输入结构(schema)封装起来的方法,便于传递给支持工具调用的聊天模型。这使得模型能够请求以指定参数执行这些函数。本文档将展示如何创建工具并在 LangGraph 中使用它们。

创建工具(Create tools)

你可以使用 @tool 装饰器或原始 Python 函数来创建工具:

from langchain_core.tools import tool@tool
def multiply(a: int, b: int) -> int:"""将两个数字相乘。"""return a * b

普通函数:

def multiply(a: int, b: int) -> int:"""Multiply two numbers."""return a * b

这需要使用 LangGraph 提供的预构建组件 ToolNode 或 agent,它们会自动将函数转换为 LangChain 工具。

自定义工具(Customize tools):如果你希望对工具行为进行更多控制,可以使用带参数的 @tool 装饰器:

from langchain_core.tools import tool@tool("multiply_tool", parse_docstring=True)
def multiply(a: int, b: int) -> int:"""将两个数字相乘。参数:a: 第一个操作数b: 第二个操作数"""return a * b

你还可以使用 Pydantic 定义自定义输入结构:

from pydantic import BaseModel, Fieldclass MultiplyInputSchema(BaseModel):"""将两个数字相乘"""a: int = Field(description="第一个操作数")b: int = Field(description="第二个操作数")@tool("multiply_tool", args_schema=MultiplyInputSchema)
def multiply(a: int, b: int) -> int:return a * b

有关更多定制方法,请参阅自定义工具指南。

对模型隐藏参数(Hide arguments from the model)

有些工具在运行时需要使用特定参数(如用户 ID 或会话上下文),这些参数不应该由模型控制。

你可以将这些参数放入 agent 的 stateconfig 中,并在工具内部访问这些信息:

from langchain_core.tools import tool
from langchain_core.runnables import RunnableConfig
from langgraph.prebuilt import InjectedState
from langgraph.graph import MessagesState@tool
def my_tool(# 由 LLM 提供tool_arg: str,# 获取 agent 中动态更新的信息state: Annotated[MessagesState, InjectedState],# 获取在调用 agent 时传入的静态配置config: RunnableConfig,
) -> str:"""我的工具。"""do_something_with_state(state["messages"])do_something_with_config(config)...

访问配置(Access config)

你可以在运行时向图(graph)提供静态信息,比如 user_id 或 API 凭证。这些信息可以通过特殊的参数注解 RunnableConfig 在工具内部访问:

from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool@tool
def get_user_info(config: RunnableConfig,
) -> str:"""查询用户信息。"""user_id = config["configurable"].get("user_id")return "User is John Smith" if user_id == "user_123" else "Unknown user"

在工具中访问 config:

from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agentdef get_user_info(config: RunnableConfig,
) -> str:"""查询用户信息。"""user_id = config["configurable"].get("user_id")return "User is John Smith" if user_id == "user_123" else "Unknown user"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_user_info],
)agent.invoke({"messages": [{"role": "user", "content": "look up user information"}]},config={"configurable": {"user_id": "user_123"}}
)

短期记忆(Short-term memory)

LangGraph 允许代理(agent)在工具内部访问并更新其短期记忆(state)。

读取状态(Read state)

要在工具中访问图状态(graph state),可以使用一个特殊的参数注解 —— InjectedState

from typing import Annotated
from langchain_core.tools import tool
from langgraph.prebuilt import InjectedStateclass CustomState(AgentState):user_id: str@tool
def get_user_info(state: Annotated[CustomState, InjectedState]
) -> str:"""查询用户信息。"""user_id = state["user_id"]return "User is John Smith" if user_id == "user_123" else "Unknown user"

在工具中访问状态:

from typing import Annotated
from langchain_core.tools import tool
from langgraph.prebuilt import InjectedState, create_react_agentclass CustomState(AgentState):user_id: str@tool
def get_user_info(state: Annotated[CustomState, InjectedState]
) -> str:"""查询用户信息。"""user_id = state["user_id"]return "User is John Smith" if user_id == "user_123" else "Unknown user"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_user_info],state_schema=CustomState,
)agent.invoke({"messages": "look up user information","user_id": "user_123"
})
更新状态(Update state)

你可以从工具中直接返回状态更新。这对于持久化中间结果,或在后续工具或提示中使用这些信息非常有用。

from langgraph.graph import MessagesState
from langgraph.types import Command
from langchain_core.tools import tool, InjectedToolCallId
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableConfigclass CustomState(MessagesState):user_name: str@tool
def update_user_info(tool_call_id: Annotated[str, InjectedToolCallId],config: RunnableConfig
) -> Command:"""查询并更新用户信息。"""user_id = config["configurable"].get("user_id")name = "John Smith" if user_id == "user_123" else "Unknown user"return Command(update={"user_name": name,"messages": [ToolMessage("Successfully looked up user information",tool_call_id=tool_call_id)]})

使用支持状态更新的工具,这是一个使用内置代理(agent)和支持更新图状态的工具的完整示例:

from typing import Annotated
from langchain_core.tools import tool, InjectedToolCallId
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import ToolMessage
from langgraph.prebuilt import InjectedState, create_react_agent
from langgraph.prebuilt.chat_agent_executor import AgentState
from langgraph.types import Commandclass CustomState(AgentState):user_name: str@tool
def update_user_info(tool_call_id: Annotated[str, InjectedToolCallId],config: RunnableConfig
) -> Command:"""查询并更新用户信息。"""user_id = config["configurable"].get("user_id")name = "John Smith" if user_id == "user_123" else "Unknown user"return Command(update={"user_name": name,"messages": [ToolMessage("Successfully looked up user information",tool_call_id=tool_call_id)]})def greet(state: Annotated[CustomState, InjectedState]
) -> str:"""一旦获取用户信息,就使用这个函数来问候用户。"""user_name = state["user_name"]return f"Hello {user_name}!"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[update_user_info, greet],state_schema=CustomState
)agent.invoke({"messages": [{"role": "user", "content": "greet the user"}]},config={"configurable": {"user_id": "user_123"}}
)

⚠️ 注意事项

如果你希望使用返回 Command 并更新图状态的工具,你可以:

  • 使用内置的 create_react_agentToolNode 组件;

  • 或者自己实现一个工具执行节点,该节点收集工具返回的 Command 对象并返回它们的列表,例如:

    def call_tools(state):...commands = [tools_by_name[tool_call["name"]].invoke(tool_call) for tool_call in tool_calls]return commands
    

什么时候你应该手动返回 Command?只有在你需要:

  • 显式更新 agent 状态(如 state["user_name"]);
  • 添加一条 ToolMessage 到消息历史;
  • 返回多个字段并参与下一步控制判断(如 if-else);

否则你完全可以像 TavilySearch 一样,返回普通字符串、数字或 dict。

长期记忆(Long-term memory)

长期记忆用于跨会话存储特定用户或应用的数据。这对于聊天机器人等应用非常有用,因为你可能希望记住用户偏好或其他信息。

要使用长期记忆,你需要:

  1. 配置一个数据存储(store),以在多次调用之间持久化数据;
  2. 在工具或提示词中通过 get_store 函数访问该存储。
读取长期记忆
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langgraph.graph import StateGraph
from langgraph.config import get_store@tool
def get_user_info(config: RunnableConfig) -> str:"""查询用户信息。"""store = get_store()user_id = config["configurable"].get("user_id")user_info = store.get(("users",), user_id)return str(user_info.value) if user_info else "未知用户"builder = StateGraph(...)
...
graph = builder.compile(store=store)

在实际中访问长期记忆:

from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langgraph.config import get_store
from langgraph.prebuilt import create_react_agent
from langgraph.store.memory import InMemoryStorestore = InMemoryStore()store.put(("users",),"user_123",{"name": "John Smith","language": "English",}
)@tool
def get_user_info(config: RunnableConfig) -> str:"""查询用户信息。"""store = get_store()user_id = config["configurable"].get("user_id")user_info = store.get(("users",), user_id)return str(user_info.value) if user_info else "未知用户"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[get_user_info],store=store
)agent.invoke({"messages": [{"role": "user", "content": "look up user information"}]},config={"configurable": {"user_id": "user_123"}}
)
更新长期记忆
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langgraph.graph import StateGraph
from langgraph.config import get_store@tool
def save_user_info(user_info: str, config: RunnableConfig) -> str:"""保存用户信息。"""store = get_store()user_id = config["configurable"].get("user_id")store.put(("users",), user_id, user_info)return "用户信息保存成功。"builder = StateGraph(...)
...
graph = builder.compile(store=store)

示例:保存用户信息

from typing_extensions import TypedDict
from langchain_core.tools import tool
from langgraph.config import get_store
from langgraph.prebuilt import create_react_agent
from langgraph.store.memory import InMemoryStorestore = InMemoryStore()class UserInfo(TypedDict):name: str@tool
def save_user_info(user_info: UserInfo, config: RunnableConfig) -> str:"""保存用户信息。"""store = get_store()user_id = config["configurable"].get("user_id")store.put(("users",), user_id, user_info)return "用户信息保存成功。"agent = create_react_agent(model="anthropic:claude-3-7-sonnet-latest",tools=[save_user_info],store=store
)agent.invoke({"messages": [{"role": "user", "content": "My name is John Smith"}]},config={"configurable": {"user_id": "user_123"}}
)# 你也可以直接访问 store 获取值:
store.get(("users",), "user_123").value

将工具附加到模型(Attach tools to a model)

要将工具的 schema 附加到聊天模型上,需要使用 model.bind_tools() 方法。

from langchain_core.tools import tool
from langchain.chat_models import init_chat_model@tool
def multiply(a: int, b: int) -> int:"""两个数字相乘。"""return a * bmodel = init_chat_model(model="claude-3-5-haiku-latest")
model_with_tools = model.bind_tools([multiply])model_with_tools.invoke("42 乘以 7 是多少?")

返回的 AIMessage(人工智能消息):

AIMessage(content=[{'text': "我将使用 multiply 函数帮你计算。", 'type': 'text'},{'id': 'toolu_01GhULkqytMTFDsNv6FsXy3Y', 'input': {'a': 42, 'b': 7}, 'name': 'multiply', 'type': 'tool_use'}],tool_calls=[{'name': 'multiply', 'args': {'a': 42, 'b': 7}, 'id': 'toolu_01GhULkqytMTFDsNv6FsXy3Y', 'type': 'tool_call'}]
)

使用工具(Use tools)

LangChain 中的工具遵循 Runnable 接口,因此你可以使用 .invoke().ainvoke() 方法来调用工具。

from langchain_core.tools import tool@tool
def multiply(a: int, b: int) -> int:"""两个数字相乘。"""return a * bmultiply.invoke({"a": 42, "b": 7})

输出:

294

如果你希望工具返回一个 ToolMessage,则需要用工具调用格式调用它:

tool_call = {"type": "tool_call","id": "1","args": {"a": 42, "b": 7}
}
multiply.invoke(tool_call)

输出:

ToolMessage(content='294', name='multiply', tool_call_id='1')

配合聊天模型使用工具

from langchain_core.tools import tool
from langchain.chat_models import init_chat_model@tool
def multiply(a: int, b: int) -> int:"""两个数字相乘。"""return a * bmodel = init_chat_model(model="claude-3-5-haiku-latest")
model_with_tools = model.bind_tools([multiply])response_message = model_with_tools.invoke("42 乘以 7 是多少?")
tool_call = response_message.tool_calls[0]multiply.invoke(tool_call)

返回:

ToolMessage(content='294', name='multiply', tool_call_id='toolu_0176DV4YKSD8FndkeuuLj36c')

使用预构建 Agent(Use prebuilt agent)

若你想创建一个会自动调用工具的 Agent,可使用预构建的 create_react_agent

from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent@tool
def multiply(a: int, b: int) -> int:"""两个数字相乘。"""return a * bagent = create_react_agent(model="anthropic:claude-3-7-sonnet",tools=[multiply]
)graph.invoke({"messages": [{"role": "user", "content": "42 乘以 7 是多少?"}]})

使用预设 ToolNode

ToolNode 是一个用于执行工具调用的 LangGraph 预构建节点。

为什么使用 ToolNode?

  • 支持同步与异步工具
  • 工具可并发执行
  • 工具执行过程中的错误处理(默认启用,可以通过 handle_tool_errors=True 开启/关闭)

ToolNode 作用于 MessagesState

  • 输入:最后一条消息为带有 tool_calls 参数的 AIMessage
  • 输出:带有工具调用结果的 ToolMessage

💡 Tip:ToolNode 与 LangGraph 的预设 agent 无缝配合,也可以与任何使用 MessagesState 的自定义 StateGraph 搭配使用。

from langgraph.prebuilt import ToolNodedef get_weather(location: str):"""Call to get the current weather."""if location.lower() in ["sf", "san francisco"]:return "It's 60 degrees and foggy."else:return "It's 90 degrees and sunny."def get_coolest_cities():"""Get a list of coolest cities"""return "nyc, sf"tool_node = ToolNode([get_weather, get_coolest_cities])
tool_node.invoke({"messages": [...]})

示例:单个工具调用

from langchain_core.messages import AIMessage
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool@tool
def get_weather(location: str):"""获取当前天气。"""if location.lower() in ["sf", "san francisco"]:return "旧金山现在是 60 华氏度,雾气弥漫。"else:return "现在是 90 华氏度,阳光明媚。"tool_node = ToolNode([get_weather])message = AIMessage(content="",tool_calls=[{"name": "get_weather","args": {"location": "sf"},"id": "tool_call_id","type": "tool_call",}],
)tool_node.invoke({"messages": [message]})

输出结果:

{'messages': [ToolMessage(content="旧金山现在是 60 华氏度,雾气弥漫。", name='get_weather', tool_call_id='tool_call_id')
]}

示例:多个工具调用

from langchain_core.messages import AIMessage
from langgraph.prebuilt import ToolNodedef get_weather(location: str):if location.lower() in ["sf", "san francisco"]:return "旧金山现在是 60 华氏度,雾气弥漫。"else:return "现在是 90 华氏度,阳光明媚。"def get_coolest_cities():return "纽约、旧金山"tool_node = ToolNode([get_weather, get_coolest_cities])message = AIMessage(content="",tool_calls=[{"name": "get_coolest_cities","args": {},"id": "tool_call_id_1","type": "tool_call",},{"name": "get_weather","args": {"location": "sf"},"id": "tool_call_id_2","type": "tool_call",},],
)tool_node.invoke({"messages": [message]})

输出:

{'messages': [ToolMessage(content='纽约、旧金山', name='get_coolest_cities', tool_call_id='tool_call_id_1'),ToolMessage(content="旧金山现在是 60 华氏度,雾气弥漫。", name='get_weather', tool_call_id='tool_call_id_2')]
}

搭配聊天模型使用

from langchain.chat_models import init_chat_model
from langgraph.prebuilt import ToolNodedef get_weather(location: str):if location.lower() in ["sf", "san francisco"]:return "旧金山现在是 60 华氏度,雾气弥漫。"else:return "现在是 90 华氏度,阳光明媚。"tool_node = ToolNode([get_weather])model = init_chat_model(model="claude-3-5-haiku-latest")
model_with_tools = model.bind_tools([get_weather])response = model_with_tools.invoke("what's the weather in sf?")
tool_node.invoke({"messages": [response]})

输出:

{'messages': [ToolMessage(content="旧金山现在是 60 华氏度,雾气弥漫。", name='get_weather', tool_call_id='toolu_01Pnkgw5JeTRxXAU7tyHT4UW')
]}

在工具调用 Agent 中使用

下面是使用 ToolNode 从零构建一个工具调用 Agent 的完整示例:

from langchain.chat_models import init_chat_model
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, MessagesState, START, ENDdef get_weather(location: str):if location.lower() in ["sf", "san francisco"]:return "旧金山现在是 60 华氏度,雾气弥漫。"else:return "现在是 90 华氏度,阳光明媚。"tool_node = ToolNode([get_weather])
model = init_chat_model(model="claude-3-5-haiku-latest")
model_with_tools = model.bind_tools([get_weather])def should_continue(state: MessagesState):last_message = state["messages"][-1]if last_message.tool_calls:return "tools"return ENDdef call_model(state: MessagesState):messages = state["messages"]response = model_with_tools.invoke(messages)return {"messages": [response]}builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("tools", tool_node)
builder.add_edge(START, "call_model")
builder.add_conditional_edges("call_model", should_continue, ["tools", END])
builder.add_edge("tools", "call_model")graph = builder.compile()
graph.invoke({"messages": [{"role": "user", "content": "what's the weather in sf?"}]})

输出结构:

{'messages': [HumanMessage(content="what's the weather in sf?"),AIMessage(content=[{'text': "I'll help you check the weather in San Francisco right now.", 'type': 'text'},{'id': 'toolu_01A4vwUEgBKxfFVc5H3v1CNs', 'input': {'location': 'San Francisco'}, 'name': 'get_weather', 'type': 'tool_use'}],tool_calls=[{'name': 'get_weather', 'args': {'location': 'San Francisco'}, 'id': 'toolu_01A4vwUEgBKxfFVc5H3v1CNs', 'type': 'tool_call'}]),ToolMessage(content="旧金山现在是 60 华氏度,雾气弥漫。"),AIMessage(content="旧金山目前的天气是 60 华氏度,雾气弥漫——典型的旧金山气候,著名的海洋层!")]
}
错误处理

默认情况下,ToolNode 会捕捉工具执行中抛出的所有异常,并将错误信息作为 ToolMessage 返回。你可以通过 handle_tool_errors 参数自定义行为:

启用错误处理(默认)

from langchain_core.messages import AIMessage
from langgraph.prebuilt import ToolNodedef multiply(a: int, b: int) -> int:"""两个数字相乘。"""if a == 42:raise ValueError("终极错误")return a * btool_node = ToolNode([multiply])# 使用默认的错误处理运行
message = AIMessage(content="",tool_calls=[{"name": "multiply","args": {"a": 42, "b": 7},"id": "tool_call_id","type": "tool_call",}],
)tool_node.invoke({"messages": [message]})

返回结果:

{'messages': [ToolMessage(content="Error: ValueError('The ultimate error')\n Please fix your mistakes.", name='multiply', tool_call_id='tool_call_id', status='error')]}

禁用错误处理

from langchain_core.messages import AIMessage
from langgraph.prebuilt import ToolNodedef multiply(a: int, b: int) -> int:"""两个数字相乘。"""if a == 42:raise ValueError("终极错误")return a * btool_node = ToolNode([multiply],handle_tool_errors=False  # 禁用错误处理
)message = AIMessage(content="",tool_calls=[{"name": "multiply","args": {"a": 42, "b": 7},"id": "tool_call_id","type": "tool_call",}],
)tool_node.invoke({"messages": [message]})

自定义错误处理

from langchain_core.messages import AIMessage
from langgraph.prebuilt import ToolNodedef multiply(a: int, b: int) -> int:"""两个数字相乘。"""if a == 42:raise ValueError("终极错误")return a * btool_node = ToolNode([multiply],handle_tool_errors=("不能用42作为第一个操作数,必须交换操作数!")
)tool_node.invoke({"messages": [message]})

返回结果:

{'messages': [ToolMessage(content="不能用42作为第一个操作数,必须交换操作数!", name='multiply', tool_call_id='tool_call_id', status='error')]}

处理大量工具

随着可用工具数量的增加,你可能希望限制大语言模型(LLM)选择的工具范围,以减少令牌消耗,并帮助管理 LLM 推理中的错误来源。

为了解决这个问题,你可以通过语义搜索在运行时动态检索相关工具,从而动态调整模型可用的工具集。

更多细节请参见 langgraph-bigtool 预构建库的现成实现。

相关文章:

  • FFmpeg移植教程(linux平台)
  • Webpack依赖
  • 【git-首次初始化本地项目、关联远程仓库】
  • 基于Qt的app开发的过渡期
  • 数据库系统概论(十六)数据库安全性(安全标准,控制,视图机制,审计与数据加密)
  • Linux运维笔记:服务器感染 netools 病毒案例
  • PostgreSQL不同的等级认证体系
  • 【Android】MT6835 + MT6631 WiFi进入Meta模式出现WiFi_HQA_OpenAdapter failed
  • BUUCTF[HCTF 2018]WarmUp 1题解
  • 【iOS】ARC 与 Autorelease
  • (未解决)日历清单-扩展屏壁纸显示问题
  • 代码随想录60期day54
  • 定制开发开源AI智能名片驱动下的海报工厂S2B2C商城小程序运营策略——基于社群口碑传播与子市场细分的实证研究
  • mysql数据库实现分库分表,读写分离中间件sharding-sphere
  • 【MySQL】视图与用户管理
  • 指挥中心系统建设与应用方案PPT(46页)
  • 渗透实战PortSwigger Labs AngularJS DOM XSS利用详解
  • AIGC工具平台-GPT-SoVITS-v4-TTS音频推理克隆
  • pikachu靶场通关笔记10 XSS关卡06-XSS之盲打
  • 「Python教案」字符串格式化操作
  • 如果制作个人网站/国内免费推广产品的网站
  • 网站建设 企炬江阴/正版google下载
  • 淘宝联盟网站备案/营销手段有哪些
  • 网站作弊/济南网站建设方案
  • 上海市建设网站/百度引流推广怎么收费
  • 网站右下角浮动效果如何做/求网址