LangGraph底层API学习
文章目录
- 说明
- 一 LangGraph图结构对象创建
- 1.1 LangChain图结构概念
- 1.2 手动构建图流程
- 1.3 借助Pydantic对象创建状态
- 1.4 创建条件分支图
- 1.5 创建条件循环图
- 二 搭建多轮对话问答机器人
- 2.1 LangGraph中多轮对话实现方法
- 2.2 接入大模型
- 2.3 搭建对话机器人
- 2.4 一次对话
- 2.5 多次对话
- 2.6 最终代码
- 2.7 搭建多轮对话机器人
- 2.8 LangGraph流式打印
说明
- 本文学自赋范社区,仅供学习和交流!
- 采用底层API构建智能体要求开发者掌握更加复杂的构建图的语法,但借助底层API,能够更加灵活的完成各类智能体的开发,而且在某些场景下,如实现人在闭环(Human in the loop)或者搭建多智能体(Multi Agent)系统时,必须要使用更加底层的图结构API才能够完成。
一 LangGraph图结构对象创建
pip install langgraph
pip show langgraph
Name: langgraph
Version: 0.5.4
Summary: Building stateful, multi-actor applications with LLMs
Home-page:
Author:
Author-email:
License-Expression: MIT
Location: C:\Users\kongyue\.conda\envs\langgraph\Lib\site-packages
Requires: langchain-core, langgraph-checkpoint, langgraph-prebuilt, langgraph-sdk, pydantic, xxhash
Required-by:
Note: you may need to restart the kernel to use updated packages.
1.1 LangChain图结构概念
-
在以图构建的框架中,任何可执行的功能都可以作为对话、代理或程序的启动点。这个启动点可以是大模型的 API 接口、基于大模型构建的 AI Agent,通过 LangChain 或其他技术建立的线性序列等等,即下图中的 “Start” 圆圈所示。无论哪种形式,它都首先处理用户的输入,并决定接下来要做什么。
-
在 LangGraph 概念下,最基本的一种代理模型:👇
-
在启动点定义的可运行功能会根据收到的输入决定是否进行检索以及如何响应。 如在执行过程中,如果需要检索信息,则可以利用搜索工具来实现,如Web Search(网络搜索)、Query Database(查询数据库)、RAG等获取必要的信息(图中的 “Action” )。接下来,再使用一个大语言模型(“LLM”)处理工具提供的信息,结合用户最初传入的初始查询,生成最终的响应(图中的 “LLMs” ),最终,这个响应被传递至终点节点(图中的 “End” )。
-
上图在LangGraph框架中一个非常简单的代理构成形式。每个圆圈代表一个“节点”(Nodes),每个箭头表示一条“边”(Edges)。**在 LangGraph 中,无论代理的构建是简单还是复杂,它最终都是由节点和边通过特定的组合形成的图。**这样的构建形式形成的工作流原理就是:当每个节点完成工作后,通过边告诉下一步该做什么。
-
LangGraph的底层图算法就是在使用消息传递来定义通用程序。当节点完成其操作时,它会沿着一条或多条边向其他节点发送消息。然后,这些接收节点执行其功能,将结果消息传递给下一组节点,然后该过程继续。如此循环往复。
- LangGraph框架是通过组合Nodes和Edges去创建复杂的循环工作流程,通过消息传递的方式串联所有的节点形成一个通路。维持消息能够及时的更新并向该去的地方传递,则依赖langGraph构建的State概念。 在LangGraph构建的流程中,每次执行都会启动一个状态,图中的节点在处理时会传递和修改该状态。这个状态不仅仅是一组静态数据,而是由每个节点的输出动态更新,然后影响循环内的后续操作。
1.2 手动构建图流程
- 图的State表示会随着图计算的进行而维护和更新的上下文或记忆。它用来确保图中的每个步骤都可以访问先前步骤的相关信息,从而可以根据整个过程中积累的数据进行动态决策。这个过程通过状态图StateGraph类实现,它是由LangGraph框架提供的核心类之一,专门用来创建state状态。
- 构建state的方法非简单,可以将图的状态设计为一个字典,用于在不同节点间共享和修改数据,然后使用StateGraph类进行图的实例化。
from langgraph.graph import StateGraph# 使用 stategraph 接收一个字典
builder = StateGraph(dict)
- builder也是后面要用到的图构建器(Graph Builder)对象,用于逐步添加节点、边、控制流逻辑,最终编译成可执行的 LangGraph 图。而这个图构建器需要通过带入一个状态对象来创建。
- addition节点是一个加法逻辑,接收当前状态StateGraph(dict),将字典中x的值增加1,并返回新的状态。而subtraction节点是一个减法逻辑,接收从addition节点传来的状态StateGraph(dict),从字典中的x值减去2,创建并返回Cinderella状态。
def addition(state):# 接收到的是初始状态print(f"init_state: {state}")return {"x": state["x"] + 1}def subtraction(state):# 接收到的是上一个节点的状态print(f"addition_state: {state}")return {"x": state["x"] - 2}
- 图的构建
# START 和 END 是两个特殊的节点,分别表示图的开始和结束。
from langgraph.graph import START, END# 向图中添加两个节点
builder.add_node("addition", addition)
builder.add_node("subtraction", subtraction)# 构建节点之间的边
builder.add_edge(START, "addition")
builder.add_edge("addition", "subtraction")
builder.add_edge("subtraction", END)# 通过调用compile()方法将这些设置编译成一个可执行的图
graph = builder.compile()
- LangGraph还提供了多种内置的图形可视化方法,能够将任何Graph以图形的形式展示出来。可视化最大的好处是:直接从代码中生成图形化的表示,可以检查图的执行逻辑是否符合构建的预期。
pip install pyppeteer ipython
from IPython.display import Image, displaydisplay(Image(graph.get_graph(xray=True).draw_mermaid_png()))
# 定义一个初始化的状态
initial_state = {"x":10}
graph.invoke(initial_state)
init_state: {'x': 10}
addition_state: {'x': 11}
{'x': 9}
- LangGraph 的执行模型并不强制要求图中必须有 END 节点。只要执行路径在某个节点“无后继边”(即到达“终点”),那么该节点就被视为隐式终点。也就是说,将一个字典作为状态对象带入到图中,即可进行图的实际运行。
- 在图的执行过程中,每个节点的函数会被调用,并且接收到前一个节点返回的状态作为输入。每个函数处理完状态后,会输出一个新的状态,传递给下一个节点。
- 节点函数不需要返回整个状态,而是仅返回它们更新的部分。 在每个节点的函数内部逻辑中,需要使用和更新哪些State中的参数中,只需要在return的时候指定即可,不必担心未在当前节点处理的State中的其他值会丢失,因为LangGraph的内部机制已经自动处理了状态的合并和维护。
- 状态在任何给定时间只包含来自一个节点的更新信息。这意味着当节点处理状态时,它只能访问与其特定操作直接相关的数据,从而确保每个节点的逻辑是隔离和集中的。
- 使用字典作为状态模式非常简单,由于缺乏预定义的模式,节点可以在没有严格类型约束的情况下自由地读取和写入状态,这样的灵活性有利于动态数据处理。 开发者在整个图的执行过程中保持对键和值的一致性管理。因为如果在任何节点中尝试访问State中不存在的键,会直接中断整个图的运行状态。
1.3 借助Pydantic对象创建状态
- Pydantic 是一个用于创建“数据模型”的 Python 库,它可以自动校验数据类型,并将字典数据转换为结构化对象。它就像是给字典加了一个“类型安全 + 自动验证”的外壳,是现代 Python 项目中最主流的“数据结构定义工具”。
from pydantic import BaseModelclass MyState(BaseModel):x: inty: str = "default" # 设置默认值# 自动校验
state = MyState(x=1)
print(state.x) # 输出 1
print(state.y) # 输出 default# 错误类型会报错
# state = MyState(x="abc") # ❌ 会抛出 ValidationError
- 使用Pydantic的BaseModel定义结构化状态模型,实现自动校验数据类型,避免不必要的数据类型错误。
from pydantic import BaseModel from langgraph.graph import StateGraph, START, END# ✅ 1. 定义结构化状态模型 class CalcState(BaseModel):x: int# ✅ 2. 定义节点函数,接收并返回 CalcState def addition(state: CalcState) -> CalcState:print(f"[addition] 初始状态: {state}")return CalcState(x=state.x + 1)def subtraction(state: CalcState) -> CalcState:print(f"[subtraction] 接收到状态: {state}")return CalcState(x=state.x - 2)# ✅ 3. 构建图 builder = StateGraph(CalcState)builder.add_node("addition", addition) builder.add_node("subtraction", subtraction)builder.add_edge(START, "addition") builder.add_edge("addition", "subtraction") builder.add_edge("subtraction", END)graph = builder.compile()# ✅ 4. 执行图:传入结构化状态对象 initial_state = CalcState(x=10) final_state = graph.invoke(initial_state)# ✅ 5. 打印最终结果 # 无论输入端输入什么结构的对象,最终图计算返回结果是一个字典类型对象。 print("\n[最终结果] ->", final_state)
[addition] 初始状态: x=10 [subtraction] 接收到状态: x=11[最终结果] -> {'x': 9}
1.4 创建条件分支图
- 基于
LangGraph
框架构建了一个简单的有状态条件分支图,用于演示如何使用结构化状态(通过Pydantic
模型定义)在多步骤的决策流程中进行状态传递与条件控制。 - 首先定义一个名为
MyState
的Pydantic
模型,用于描述图中每个节点共享的上下文状态信息。该状态包含两个字段:x
表示输入数值,result
表示最终处理结果。通过使用Pydantic
,能够显著增强状态管理的类型安全性、可读性和扩展性。 - 整体流程
START → check_x → [判断 x 是否为偶数]├─ True → handle_even → END└─ False → handle_odd → END
from typing import Optional from pydantic import BaseModel from langgraph.graph import StateGraph, START, END# ✅ 定义结构化状态 class MyState(BaseModel):x: int# 表示一个可选的字符串,常用于延迟赋值或非必要字段result: Optional[str] = None # ✅ 定义各节点处理逻辑(接受 MyState,返回 MyState) # 当前执行完的节点名称(分支判断起点) def check_x(state: MyState) -> MyState:print(f"[check_x] Received state: {state}")return state # 一个接收状态 state 并返回布尔值的函数,用于判断分支条件 def is_even(state: MyState) -> bool:return state.x % 2 == 0def handle_even(state: MyState) -> MyState:print("[handle_even] x 是偶数")return MyState(x=state.x, result="even")def handle_odd(state: MyState) -> MyState:print("[handle_odd] x 是奇数")return MyState(x=state.x, result="odd")# ✅ 构建图 builder = StateGraph(MyState)builder.add_node("check_x", check_x) builder.add_node("handle_even", handle_even) builder.add_node("handle_odd", handle_odd)# ✅ 添加条件分支 builder.add_conditional_edges("check_x", is_even, {True: "handle_even",False: "handle_odd" })# ✅ 衔接起始和结束 builder.add_edge(START, "check_x") builder.add_edge("handle_even", END) builder.add_edge("handle_odd", END)# ✅ 编译图 graph = builder.compile()# ✅ 执行测试 print("\n✅ 测试 x=4(偶数)") graph.invoke(MyState(x=4))print("\n✅ 测试 x=3(奇数)") graph.invoke(MyState(x=3))from IPython.display import display, Image display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
✅ 测试 x=4(偶数) [check_x] Received state: x=4 result=None [handle_even] x 是偶数✅ 测试 x=3(奇数) [check_x] Received state: x=3 result=None [handle_odd] x 是奇数 {'x': 3, 'result': 'odd'}
1.5 创建条件循环图
- 条件循环图中,有关条件判断的边不用手动添加,自动在条件判断时生成。
from pydantic import BaseModel
from langgraph.graph import StateGraph, START, END# ✅ 1. 定义结构化状态模型
class LoopState(BaseModel):x: int# ✅ 2. 定义节点逻辑
def increment(state: LoopState) -> LoopState:print(f"[increment] 当前 x = {state.x}")return LoopState(x=state.x + 1)def is_done(state: LoopState) -> bool:return state.x > 10# ✅ 3. 构建图
builder = StateGraph(LoopState)
builder.add_node("increment", increment)# ✅ 4. 设置循环控制:is_done 为 True 则结束,否则继续
builder.add_conditional_edges("increment", is_done, {True: END,False: "increment"
})builder.add_edge(START, "increment")
graph = builder.compile()# ✅ 5. 测试执行
print("\n✅ 执行循环直到 x > 10")
final_state = graph.invoke(LoopState(x=6))
print(f"[最终结果] -> x = {final_state['x']}")
- 执行结果:
✅ 执行循环直到 x > 10 [increment] 当前 x = 6 [increment] 当前 x = 7 [increment] 当前 x = 8 [increment] 当前 x = 9 [increment] 当前 x = 10 [最终结果] -> x = 11
二 搭建多轮对话问答机器人
2.1 LangGraph中多轮对话实现方法
- 大模型应用都是接受消息列表作为输入,需要接收Message对象列表作为输入。这些消息有多种形式,例如HumanMessage (用户输入)或AIMessage ( 大模型响应)。
- 对于消息序列格式的state,一种更简单的方法就是使用LangGraph预构建的add_messages函数,这个更高级的状态所实现的是:对于全新的消息,它会附加到现有列表,同时它也会正确处理现有消息的更新。
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messagesclass State(TypedDict):messages: Annotated[list, add_messages]graph_builder = StateGraph(State)
元素 | 含义 | 是什么? |
---|---|---|
list | 字段的数据类型 | ✅ Python 内置的类型(列表类型) |
add_messages | 字段的“附加语义” | ✅ LangGraph 提供的特殊函数(合并器/reducer),不是 list 的方法 |
-
add_messages的核心逻辑是合并两个消息列表,按 ID 更新现有消息。默认情况下,状态为“仅附加”,当新消息与现有消息具有相同的 ID时,进行更新。合并逻辑则是:如果right的消息与left的消息具有相同的 ID,则right的消息将替换left的消息,否则作为一条新的消息进行追加。具体参数是:
- left ( Messages ) – 消息的基本列表。
- right ( Messages ) – 要合并到基本列表中的消息列表(或单个消息)。
-
使用add_messages函数来进行快速验证。 如果消息的ID不一样,则会进行追加。
from langgraph.graph.message import add_messages from langchain_core.messages import AIMessage, HumanMessagemsgs1 = [HumanMessage(content="你好。", id="1")] msgs2 = [AIMessage(content="你好,很高兴认识你。", id="2")]add_messages(msgs1, msgs2)msgs1 = [HumanMessage(content="你好。", id="1")] msgs2 = [HumanMessage(content="你好呀。", id="1")]add_messages(msgs1, msgs2)
[HumanMessage(content='你好。', additional_kwargs={}, response_metadata={}, id='1'),AIMessage(content='你好,很高兴认识你。', additional_kwargs={}, response_metadata={}, id='2')]
[HumanMessage(content='你好呀。', additional_kwargs={}, response_metadata={}, id='1')]
-
注意:不能直接在普通 Python 代码中测试 add_messages 的合并功能。因为✅ add_messages 并不会在你创建 State 字典时自动生效, ⛔ 它只会在 LangGraph 的内部状态更新系统中被识别和调用。
2.2 接入大模型
from langchain_openai import ChatOpenAI
OPENAI_API_KEY="sk-xxx"
BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"
# 初始化ChatOpenAI实例
chat = ChatOpenAI(model="deepseek-r1", # 这里直接使用模型名称openai_api_key=OPENAI_API_KEY,openai_api_base=BASE_URL
)# 调用generate方法生成回答
response = chat.invoke([{"role": "system", "content": "你是乐于助人的助手,请根据用户的问题给出回答"},{"role": "user", "content": "你好,请你介绍一下你自己。"},
])# 打印响应结果
print(response.content)
- 输入如下:
你好!很高兴认识你!😊
我是由深度求索(DeepSeek)研发的智能助手,名字叫 DeepSeek-R1。你可以把我当作一个知识丰富、乐于助人、随时待命的“全能小帮手”!无论你是:
- 📚 想学知识(比如历史、科技、语文、数学……)
- 💼 需要工作支持(写报告、写简历、做PPT、数据分析……)
- 🧠 有创意需求(写文案、编故事、起名字、写剧本……)
- 🌍 好奇世界(旅行推荐、时事热点、冷门知识……)
- 🤔 想聊聊天(情感倾诉、脑洞大开、解闷放松……)
我都可以陪着你,尽力解答!
✨目前我能阅读和处理文字信息(支持上传文件:PDF、Word、Excel等),虽然暂时不能“听”或“看”图像视频,但我非常擅长文字理解和推理,能帮你高效解决问题!
你可以随时跟我说话,有问必答,永不疲倦~🌟
现在有什么我可以帮你的吗?
- 仅仅通过两行代码,我们便可以在LangChain中顺利调用qwen-plus模型,并得到模型的响应结果。使用LangChain调用模型无疑是更加简单的。
from langchain_openai import ChatOpenAI
model="qwen-plus" # deepseek-r1-0528
OPENAI_API_KEY="sk-xxx"
BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"# 初始化ChatOpenAI实例
chat = ChatOpenAI(model=model, openai_api_key=OPENAI_API_KEY,openai_api_base=BASE_URL
)from langgraph.prebuilt import create_react_agent
from langchain.chat_models import init_chat_model
import os
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["OPENAI_BASE_URL"] = BASE_URL
model=init_chat_model(model=model, model_provider="openai")response = model.invoke("你好,请介绍下你自己。")
print(response.content)
2.3 搭建对话机器人
- 接下来可以直接把
langChain
的Chat Model
接入LangGraph
中作为一个图节点(Node
), 并使用LangGraph
的StateGraph
来管理消息序列。
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messagesclass State(TypedDict):messages: Annotated[list, add_messages]def chatbot(state: State):return {"messages": [model.invoke(state["messages"])]}graph_builder = StateGraph(State)# 添加节点
graph_builder.add_node("chatbot", chatbot)# 添加边
graph_builder.add_edge(START, "chatbot")graph = graph_builder.compile()from IPython.display import Image, displaydisplay(Image(graph.get_graph().draw_mermaid_png()))
- 图的基本逻辑是:第一个节点调用大模型并生成一个输出,该输出是一个AIMessage对象类型,然后,第二个节点直接将前一个节点的 AIMessage 提取为具体的JSON格式,完成JSON的解析。
2.4 一次对话
- 一次对话
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
from typing import TypedDict, AnnotatedOPENAI_API_KEY="hk-xxx"
BASE_URL="https://api.openai-hk.com/v1"# Initialize your language model first
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o",openai_api_key=OPENAI_API_KEY,openai_api_base=BASE_URL)class State(TypedDict):messages: Annotated[list, add_messages]def chatbot(state: State):return {"messages": [model.invoke(state["messages"])]}graph_builder = StateGraph(State)# Add nodes
graph_builder.add_node("chatbot", chatbot)# Set entry point
graph_builder.set_entry_point("chatbot")graph = graph_builder.compile()final_state = graph.invoke({"messages": ["你好,我叫陈明,好久不见。"]})
print(final_state)
print(final_state['messages'][0].content)
print(final_state['messages'][1].content)
- 输出结果
{'messages': [HumanMessage(content='你好,我叫陈明,好久不见。', additional_kwargs={}, response_metadata={}, id='493116a4-b7f2-4662-9530-d45346f88ff6'), AIMessage(content='你好,陈明!很高兴再次见到你。最近怎么样?有什么新鲜事或需要帮助的地方吗?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 28, 'prompt_tokens': 17, 'total_tokens': 45, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_ee1d74bde0', 'id': 'chatcmpl-BywFOleGTaxACjuUyTLysAmxFoVFp', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--73f1500a-740a-4674-a981-be84725c15e0-0', usage_metadata={'input_tokens': 17, 'output_tokens': 28, 'total_tokens': 45, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}
你好,我叫陈明,好久不见。
你好,陈明!很高兴再次见到你。最近怎么样?有什么新鲜事或需要帮助的地方吗?
2.5 多次对话
- 列表消息代码
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
from typing import TypedDict, Annotated
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
OPENAI_API_KEY="hk-xxx"
BASE_URL="https://api.openai-hk.com/v1"# Initialize your language model first
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o",openai_api_key=OPENAI_API_KEY,openai_api_base=BASE_URL)class State(TypedDict):messages: Annotated[list, add_messages]def chatbot(state: State):return {"messages": [model.invoke(state["messages"])]}graph_builder = StateGraph(State)# Add nodes
graph_builder.add_node("chatbot", chatbot)# Set entry point
graph_builder.set_entry_point("chatbot")graph = graph_builder.compile()# final_state = graph.invoke({"messages": ["你好,我叫陈明,好久不见。"]})
# print(final_state)
# print("\n")
# print(final_state['messages'][0].content)
# print("\n")
# print(final_state['messages'][1].content)messages_list = [HumanMessage(content="你好,我叫陈明,好久不见。"),AIMessage(content="你好呀!我是小智,一名乐于助人的AI助手。很高兴认识你!"),HumanMessage(content="请问,你还记得我叫什么名字么?"),
]final_state = graph.invoke({"messages": messages_list})
#print(final_state)
print(final_state['messages'][-1].content)
- 运行结果
{'messages': [HumanMessage(content='你好,我叫陈明,好久不见。', additional_kwargs={}, response_metadata={}, id='9c6f3ffd-d31a-4bd2-9427-958055733214'), AIMessage(content='你好呀!我是小智,一名乐于助人的AI助手。很高兴认识你!', additional_kwargs={}, response_metadata={}, id='a5dac7a3-fdb2-4038-b213-069aa98870a7'), HumanMessage(content='请问,你还记得我叫什么名字么?', additional_kwargs={}, response_metadata={}, id='d7dfa503-4052-4b8c-82e1-f596672d86a3'), AIMessage(content='你刚才告诉我你的名字叫陈明。很高兴再次和你交流!如果有任何问题或者需要帮助,请随时告诉我。', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 33, 'prompt_tokens': 57, 'total_tokens': 90, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_ee1d74bde0', 'id': 'chatcmpl-BywIchMMBKmNBorj6SYitO0YMm7bY', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--02a5f659-5aba-4822-9fd0-0a76f9d71258-0', usage_metadata={'input_tokens': 57, 'output_tokens': 33, 'total_tokens': 90, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}
'你刚才告诉我你的名字叫陈明。很高兴再次和你交流!如果有任何问题或者需要帮助,请随时
2.6 最终代码
- 据此,我直接调用这个编译好的图graph,即可实现一个可交互式的聊天机器人。
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
from typing import TypedDict, Annotated
from langchain_core.messages import HumanMessage
OPENAI_API_KEY="hk-xxx"
BASE_URL="https://api.openai-hk.com/v1"# Initialize your language model first
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o",openai_api_key=OPENAI_API_KEY,openai_api_base=BASE_URL)class State(TypedDict):messages: Annotated[list, add_messages]def chatbot(state: State):return {"messages": [model.invoke(state["messages"])]}graph_builder = StateGraph(State)# Add nodes
graph_builder.add_node("chatbot", chatbot)# Set entry point
graph_builder.set_entry_point("chatbot")graph = graph_builder.compile()messages_list = []while True:try:user_input = input("用户提问: ")if user_input.lower() in ["exit"]: print("下次再见!")breakmessages_list.append(HumanMessage(content=user_input))final_state = graph.invoke({"messages": messages_list})print("🤖 小智:", final_state['messages'][-1].content)messages_list.append(final_state['messages'][-1])messages_list = messages_list[-50:]except:break
用户提问: 你好,我叫陈明,好久不见
🤖 小智: 你好,陈明!好久不见,你最近过得怎么样?有什么新鲜事或想分享的事情吗?
用户提问: 我大学毕业了
🤖 小智: 恭喜你大学毕业!这是一个重要的里程碑,你一定为此付出了很多努力。你对未来有什么计划或者目标吗?准备继续深造还是开始工作呢?
用户提问: exit
下次再见!
2.7 搭建多轮对话机器人
🧠 MemorySaver 的核心功能
- 短期记忆(线程级记忆):
MemorySaver
为每个thread_id
保存和恢复对话状态(State),实现在同一会话中的历史上下文记忆。 - 状态持久化:在每个节点运行后,State 会自动存储;再次调用时,如果使用相同的
thread_id
,MemorySaver
会恢复此前保存的状态,无需手动传递历史信息 。 - 多会话隔离:通过不同的
thread_id
可实现会话隔离,允许多个用户并发交互且各自的对话互不干扰。 - 图状态快照与恢复:不仅包括对话历史,还保存整个工作流状态,可用于错误恢复、时间旅行、断点续跑、
Human‑in‑the‑loop
等高级场景 。
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.memory import MemorySaver
import osmodel="deepseek-r1-0528"
model="qwen-plus"
OPENAI_API_KEY="sk-xxx"
BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["OPENAI_BASE_URL"] = BASE_URL# 1. 定义状态类(会自动合并 messages)
class State(TypedDict):messages: Annotated[list, add_messages]# 2. 初始化模型
model = init_chat_model(model=model, model_provider="openai")# 3. 定义聊天节点
def chatbot(state: State) -> State:reply = model.invoke(state["messages"])return {"messages": [reply]}# 4. 构建带 MemorySaver 的图
builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)display(Image(graph.get_graph().draw_mermaid_png()))# 5. 运行多轮对话,使用相同 thread_id 实现记忆
thread_config = {"configurable": {"thread_id": "session_10"}}# 第一轮对话
state1 = graph.invoke({"messages": [{"role":"user","content":"你好,好久不见,我叫陈明。"}]}, config=thread_config)
print(state1['messages'][-1].content)
# 第二轮对话
state2 = graph.invoke({"messages":[{"role":"user","content":"你好呀,你还记得我的名字吗?"}]}, config=thread_config)
print(state2['messages'][-1].content)
# 第三轮对话
latest = graph.get_state(thread_config)
print(latest.values["messages"]) # 包含全部轮次对话
你好,陈明!很高兴再次见到你。有什么我可以帮你的吗?😊
当然记得啦,你叫陈明!👋 很高兴你回来找我聊天。有什么新故事或者问题想和我分享吗?✨
[HumanMessage(content='你好,好久不见,我叫陈明。', additional_kwargs={}, response_metadata={}, id='9106fe77-2fda-4181-8695-06a93d08f246'), AIMessage(content='你好,陈明!很高兴再次见到你。有什么我可以帮你的吗?😊', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 22, 'total_tokens': 39, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_name': 'qwen-plus', 'system_fingerprint': None, 'id': 'chatcmpl-685a3054-ad08-97e4-956c-9dcdc13ff475', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--4b42fdb8-a6fe-4b03-89f3-e1944229439b-0', usage_metadata={'input_tokens': 22, 'output_tokens': 17, 'total_tokens': 39, 'input_token_details': {'cache_read': 0}, 'output_token_details': {}}), HumanMessage(content='你好呀,你还记得我的名字吗?', additional_kwargs={}, response_metadata={}, id='f28c17d7-853a-4852-b4bc-ad3cfb8579ea'), AIMessage(content='当然记得啦,你叫陈明!👋 很高兴你回来找我聊天。有什么新故事或者问题想和我分享吗?✨', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 31, 'prompt_tokens': 58, 'total_tokens': 89, 'completion_tokens_details': None, 'prompt_tokens_details': {'audio_tokens': None, 'cached_tokens': 0}}, 'model_name': 'qwen-plus', 'system_fingerprint': None, 'id': 'chatcmpl-e13d2763-991e-9b42-985e-9eb340f52879', 'service_tier': None, 'finish_reason': 'stop', 'logprobs': None}, id='run--7812a200-7cf1-4de8-941c-1b554219909e-0', usage_metadata={'input_tokens': 58, 'output_tokens': 31, 'total_tokens': 89, 'input_token_details': {'cache_read': 0}, 'output_token_details': {}})]
2.8 LangGraph流式打印
- 在实际应用中,流式输出尤其适用于需要快速反馈的业务场景,典型的应用场景就是聊天机器人。因为大语言模型可能需要几秒钟才能生成对查询的完整响应,这远远慢于应用程序对最终用户的响应速度约为 200-300 毫秒的阈值,如果是涉及多个大模型调用的复杂应用程序,这种延时会变得更加明显。让应用程序感觉响应更快的关键策略是显示中间进度,即通过 token 流式传输大模型Token的输出,以此来显著提升用户体验。而在开发阶段,利用流式输出功能可以准确追踪到事件的具体执行阶段,并捕获相关数据,从而接入不同逻辑的数据处理和决策流程。
- 流式输出,这种方式允许客户端逐渐接收到大模型生成的每一部分内容,而不是等待整个响应完成后一次性接收。
LangGraph
框架中的工作流中由各个步骤的节点和边组成。这流式传输涉及在各个节点请求更新时跟踪图状态的变化,可以更精细地监控工作流中当前处于活动状态的节点,并在工作流经过不同阶段时提供有关工作流状态的实时更新。其实现方式也是和LangChain
一样通过.stream
和.astream
方法执行流式输出,只不过适配到了图结构中。调用.stream
和.astream
方法时可以指定几种不同的模式,即:- values :在图中的每个步骤之后流式传输状态的完整值。
- updates :在图中的每个步骤之后将更新流式传输到状态。如果在同一步骤中进行多个更新(例如运行多个节点),则这些更新将单独流式传输。
- debug :在整个图的执行过程中流式传输尽可能多的信息,主要用于调试程序。
- messages:记录每个
messages
中的增量token
。 - custom:自定义流,通过
LangGraph 的 StreamWriter
方法实现。
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain.chat_models import init_chat_model
from langgraph.checkpoint.memory import MemorySaver
import os
from langchain_core.messages import HumanMessagemodel="deepseek-r1-0528"
model="qwen-plus"
OPENAI_API_KEY="sk-xxx"
BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["OPENAI_BASE_URL"] = BASE_URL# 1. 定义状态类(会自动合并 messages)
class State(TypedDict):messages: Annotated[list, add_messages]# 2. 初始化模型
model = init_chat_model(model=model, model_provider="openai")# 3. 定义聊天节点
def chatbot(state: State) -> State:reply = model.invoke(state["messages"])return {"messages": [reply]}# 4. 构建带 MemorySaver 的图
builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END) # 添加结束边
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# 显示图形
display(Image(graph.get_graph().draw_mermaid_png()))# 5. 运行多轮对话,使用相同 thread_id 实现记忆
thread_config = {"configurable": {"thread_id": "session_1"}}# 初始化对话
input_msg = HumanMessage(content="你好,请你详细的介绍一下你自己")async for msg, metadata in graph.astream({"messages": [input_msg]},stream_mode="messages",config=thread_config # 添加 thread_config
):if msg.content and not isinstance(msg, HumanMessage):print(msg.content, end="", flush=True)