20250914-01: Langchain概念:流式传输(Streaming)
20250914-01: Langchain概念:流式传输(Streaming)
任务
🎯 学习目标
- 🔗 核心概念
流式传输
(Streaming)
概述
在大型语言模型应用程序中应该流式传输什么
- 1. 流式传输大型语言模型输出
- 2. 流式传输管道或工作流进度
- 流式传输自定义数据
流式传输 API
stream()
和
astream()
- 与聊天模型一起使用
- 与 LangGraph 一起使用【高阶知识-延后学习】
- 与 LCEL 一起使用
astream\_events
将自定义数据写入流
“自动流式传输”聊天模型
- 工作原理
异步编程
深化理解
如何理解流式传输的意义
核心概念:什么是“自定义数据”?
两种“写信”的方式
- 方法一:LangGraph 的
StreamWriter
(给LangGraph工作流用的)
- 方法二:
dispatch\_events
/
adispatch_events
(更通用,给LCEL链用的)
总结对比
任务
- 阅读 LCEL 官方文档
- 理解
|
操作符的底层是RunnableSequence
🎯 学习目标
理解 LangChain 的“运行时引擎”——所有组件如何被统一调度和编排。
🔗 核心概念
-
可运行接口
(Runnable) -
LangChain 表达式语言
(LCEL) -
回调
(Callbacks) -
追踪
(Tracing) -
流式传输
(Streaming) -
异步编程
(Async)
流式传输
(Streaming)
流式传输 | 🦜️🔗 LangChain 框架
流式传输对于增强基于大型语言模型(LLM)的应用程序的 响应能力 至关重要。通过 渐进式 显示输出,甚至在完整响应就绪之前,流式传输显著改善了 用户体验 (UX),尤其是在处理大型语言模型 延迟 时。
概述
从大型语言模型(LLM)生成完整响应通常会产生几秒钟的延迟,在涉及多次模型调用的复杂应用程序中,这种延迟会更加明显。
这种延迟会更加明显。幸运的是,大型语言模型以迭代方式生成响应,允许在生成过程中显示中间结果
。通过流式传输这些中间输出,LangChain 可以在基于大型语言模型的应用程序中实现更流畅的用户体验,并在其核心设计中提供了对流式传输的内置
支持。
在本指南中,我们将讨论大型语言模型应用程序中的流式传输,并探讨 LangChain 的流式传输 API 如何促进应用程序中各种组件的实时输出。
在大型语言模型应用程序中应该流式传输什么
在涉及大型语言模型的应用程序中,可以通过流式传输多种类型的数据
来减少
感知延迟
并提高透明度
,从而 改善用户体验 。这些包括
1. 流式传输大型语言模型输出
最常见和最关键的流式传输数据是大型语言模型自身生成的输出。这提供了即时反馈,并有助于减少用户的等待时间。
2. 流式传输管道或工作流进度
除了仅仅流式传输大型语言模型输出之外,流式传输通过更复杂工作流或管道的进度也很有用,让用户了解应用程序的整体进展情况。
这可能包括:
- 在
LangGraph 工作流
中: 使用LangGraph,工作流由表示各种步骤的节点和边组成。这里的 流式传输涉及跟踪 ** 图状态 的变化 ,因为单个节点**会请求更新。这允许对工作流中当前活动的节点进行更细粒度的监控,从而在工作流进展到不同阶段时提供实时状态更新。 - 在
LCEL 管道
中: 从LCEL管道流式传输更新涉及捕获单个子可运行组件的进度。例如,当管道的不同步骤或组件执行时,您可以流式传输当前正在运行的子可运行组件,提供对整个管道进度的实时洞察。
流式传输管道或工作流进度对于向用户提供应用程序在执行过程中所处位置的清晰视图至关重要。
流式传输自定义数据
在某些情况下,您可能需要流式传输自定义数据,这些数据超出了管道或工作流结构所提供的信息。这种自定义信息被注入到工作流中的特定步骤中,无论该步骤是工具还是 LangGraph 节点。
例如,您可以实时流式传输关于工具正在做什么的更新,或者通过 LangGraph 节点的进度。这种粒度数据直接从步骤内部发出,为工作流的执行提供了更详细的洞察,在需要更高可见性的复杂过程中尤其有用。
流式传输 API
LangChain 提供了两个主要的 API 用于实时流式传输输出。这些 API 受任何实现可运行接口的组件支持,包括大型语言模型(LLM)、已编译的 LangGraph 图,以及任何使用LCEL生成的可运行组件。
- 同步 stream 和异步 astream:用于在生成时流式传输 单个可运行组件 (例如,聊天模型)的输出,或者流式传输任何使用 LangGraph 创建的工作流。
- 仅限异步的 astream_events:使用此 API 可以访问完全使用LCEL构建的大型语言模型应用程序中的 自定义事件 和 中间输出 。请注意,此 API 可用,但在使用 LangGraph 时 并非必需 。
此外,还有一个遗留的异步 astream_log API。不推荐在新项目中使用此 API,因为它比其他流式传输 API 更复杂且功能较少。
stream()
和 astream()
stream()
方法返回一个 迭代器 ,该迭代器以 同步方式 在输出生成时 逐块 生成输出。您可以使用 for
循环实时处理每个块。
例如,在使用大型语言模型时,这允许输出在生成时增量流式传输,从而减少用户的等待时间。
stream()
和 astream()
方法生成的 块类型 取决于 正在进行 流式传输的 组件 。例如,当从大型语言模型流式传输时,每个组件都将是AIMessageChunk;但是,对于其他组件,块可能不同。
for chunk in component.stream(some_input):# IMPORTANT: Keep the processing of each chunk as efficient as possible.# While you're processing the current chunk, the upstream component is# waiting to produce the next one. For example, if working with LangGraph,# graph execution is paused while the current chunk is being processed.# In extreme cases, this could even result in timeouts (e.g., when llm outputs are# streamed from an API that has a timeout).print(chunk)
异步版本,即 astream()
,工作方式类似,但专为 非阻塞 工作流设计。您可以在异步代码中使用它来实现相同的实时流式传输行为。
与聊天模型一起使用
当与聊天模型一起使用 stream()
或 astream()
时,输出会以AIMessageChunk的形式流式传输,因为它们是由 大型语言模型生成 的。这允许您在大型语言模型输出生成时逐步呈现或处理它们,这在交互式应用程序或界面中特别有用。
与 LangGraph 一起使用【高阶知识-延后学习】
LangGraph 编译的图是可运行组件,并支持标准的流式传输 API。
当与 LangGraph 一起使用 stream 和 astream 方法时,您可以选择一种或多种流式传输模式,它们允许您控制流式传输的输出类型。可用的流式传输模式有:
- “values” :为每个步骤发出状态的所有值。
- “updates” :仅发出每个步骤后由节点返回的节点名称和更新。
- “debug” :为每个步骤发出调试事件。
- “messages” :逐词发出大型语言模型消息。
- “custom” :发出使用LangGraph 的 StreamWriter 写入的自定义输出。
欲了解更多信息,请参阅
- LangGraph 流式传输概念指南,了解在使用 LangGraph 时如何进行流式传输的更多信息。
- LangGraph 流式传输操作指南,了解 LangGraph 中流式传输的具体示例。
与 LCEL 一起使用
如果您使用LangChain 表达式语言 (LCEL)组合多个可运行组件,则 stream()
和 astream()
方法通常会流式传输链中 最后一步的输出 。这允许最终处理结果以增量方式流式传输。LCEL 尝试优化管道中的流式传输延迟,以便尽快获得最后一步的流式传输结果。
astream_events
使用
astream_events
API 访问完全使用LCEL构建的大型语言模型应用程序中的自定义数据和中间输出。尽管此 API 也可与LangGraph一起使用,但在使用 LangGraph 时通常不需要它,因为
stream
和astream
方法为 LangGraph 图提供了全面的流式传输功能。
对于使用LCEL构建的链,.stream()
方法仅流式传输链中最后一步的输出。
这对于某些应用程序可能已足够,但随着您构建包含多个大型语言模型调用的更复杂链,您可能希望将链的中间值与最终输出一起使用
。例如,在构建基于文档的聊天应用程序时,您可能希望在最终生成的同时返回源信息。
可以使用回调来完成此操作,或者通过构建您的链,使其能够通过类似链式.assign()
调用的方式将中间值传递到末尾,但 LangChain 还包含一个 .astream_events()
方法,它结合了回调的灵活性和 .stream()
的人体工程学优势。调用时,它会返回一个迭代器,该迭代器会生成各种类型的事件,您可以根据项目需求对其进行过滤和处理。
这是一个小例子,它只打印包含流式聊天模型输出的事件
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropicmodel = ChatAnthropic(model="claude-3-sonnet-20240229")prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser
#
async for event in chain.astream_events({"topic": "parrot"}):kind = event["event"]if kind == "on_chat_model_stream": # 过滤print(event, end="|", flush=True) #
您可以大致将其视为一个回调事件的迭代器(尽管格式有所不同)——并且您可以在几乎所有 LangChain 组件上使用它!
将自定义数据写入流
要将自定义数据写入流,您需要根据正在使用的组件选择以下方法之一:
- LangGraph 的StreamWriter 可用于写入自定义数据,这些数据在使用 LangGraph 时将通过stream 和 astream API 显示。重要提示:这是 LangGraph 的一个功能,因此在纯 LCEL 环境中不可用。参阅如何流式传输自定义数据了解更多信息。
- dispatch_events / adispatch_events 可用于写入自定义数据,这些数据将通过astream_events API 显示。参阅如何分派自定义回调事件了解更多信息。
“自动流式传输”聊天模型[](https://python.langchain.ac.cn/docs/concepts/streaming/#auto-streaming-chat-models “Direct link to “Auto-Streaming” Chat Models”)
LangChain 通过在某些情况下自动启用流式传输模式来简化聊天模型的流式传输,即使您没有显式调用流式传输方法。这在您使用非流式传输的 invoke
方法但仍希望流式传输整个应用程序(包括聊天模型的中间结果)时特别有用。
工作原理
当您在聊天模型上调用 invoke
(或 ainvoke
)方法时,如果 LangChain 检测到您正在尝试流式传输整个应用程序,它将自动切换到流式传输模式。
在底层,它将使 invoke
(或 ainvoke
)使用 stream
(或 astream
)方法来生成其输出。就使用 invoke
的代码而言,调用的结果将是相同的;然而,当聊天模型进行流式传输时,LangChain 将负责在 LangChain 的回调系统中调用 on_llm_new_token
事件。这些回调事件允许 LangGraph 的 stream
/astream
和 astream_events
实时显示聊天模型的输出。
异步编程
LangChain 提供了许多方法的同步( sync )和异步( async )版本。异步方法通常以字母“ a ”作为前缀(例如,ainvoke
、astream
)。在编写异步代码时,持续使用这些异步方法对于确保 非阻塞 行为和 最佳性能 至关重要。
如果流式传输数据未能实时显示,请确保您的工作流使用了正确的异步方法。
请参阅LangChain 中的异步编程指南,了解如何使用 LangChain 编写异步代码的更多信息。
复习知识点
流式传输 - 核心概念与价值
- 核心目标: 通过渐进式显示输出,显著改善基于 LLM 应用的响应能力和用户体验,对抗感知延迟。
- 工作原理: 利用 LLM 迭代生成响应的特性,在完整结果就绪前就传输和显示中间结果。
应流式传输的内容 (三类数据)
- LLM 输出流 (必备) : 流式传输模型生成的 Token 或 词,提供即时反馈,是改善用户体验最基础和关键的一环。
- 工作流进度流 (高级) : 流式传输复杂管道或工作流(如 LCEL 链或 LangGraph 图)的执行状态和进度,提高应用执行的透明度。
- 自定义数据流 (灵活) : 在特定步骤(如工具调用、节点内部)注入并流式传输任何对用户有价值的自定义信息,提供更详细的执行洞察。
流式传输 API (两大核心接口)
-
stream
/astream
: 同步/异步方法,返回一个迭代器,用于流式传输单个 Runnable 或工作流的最终输出。- 关键说明: 对于 LCEL 链,通常只流式传输最后一步的结果。
-
astream_events
(仅异步) : 更高级的 API,用于流式传输 LCEL 链执行过程中的自定义事件和中间步骤的输出。- 关键说明: 这是访问 LCEL 链中间值的首选方法,但与 LangGraph 结合使用时通常非必需。
API 使用场景与区别
stream
/****astream
用于 LCEL: 优化延迟,旨在尽快开始流式传输最终结果。astream_events
用于 LCEL: 当需要同时获取中间结果和最终输出时使用,提供了类似回调的灵活性但更易用。- LangGraph 的流式传输: 使用
stream
/astream
方法并选择流模式(如values
,updates
)来控制输出内容,功能全面,通常无需astream_events
。
写入自定义数据的方法
- LangGraph
StreamWriter
: 仅在 LangGraph 工作流节点内使用的专用机制,用于将自定义数据写入流。 dispatch_events
/adispatch_events
: 通用的标准方法,主要用于在 LCEL 链的任何组件中分发自定义事件,并通过astream_events
API 捕获。
重要机制与提示
- 自动流式传输: 当在启用流式传输的上下文(如正在调用
astream_events
)中调用聊天模型的invoke
时,LangChain 会自动将其切换为流式模式以优化体验。 - 异步优先: 要确保流式传输的实时性,必须始终使用异步方法(
ainvoke
,astream
,astream_events
)并遵循异步编程最佳实践。 - 处理效率: 在处理
stream()
返回的每个 chunk 时,必须保持高效,因为上游组件会等待当前 chunk 处理完毕后才产生下一个,低效处理可能导致超时。
深化理解
如何理解流式传输的意义
理解
流式传输的核心意义在于 将“处理时间”转化为“体验时间” 。它不仅仅是一项技术优化,更是一种用户体验哲学。
对抗感知延迟(Perceived Latency) :
- 非流式(阻塞式) :用户点击“发送”后,面对的是一个完全空白的屏幕或旋转的加载图标。这段时间是“死寂”的,用户不知道背后发生了什么,也无法判断是卡住了还是在处理。这种不确定性会放大等待的焦虑感,几秒钟的等待也感觉非常漫长。
- 流式:立即返回第一个词或第一个进度更新,将应用程序的“思考过程”外化。用户的注意力从“等待结果”转移到了“消费不断涌现的内容”上。大脑在处理不断流入的信息,感知上的等待时间大大缩短。
提高透明度和信任度:
- 流式传输工作流进度(如您提到的LangGraph或LCEL)相当于为应用程序打开了“后台任务的进度条”。用户清楚地知道“模型正在思考”、“正在调用搜索工具”、“正在生成最终答案”等步骤。这种透明度建立了用户对应用程序的信任,让他们感觉一切尽在掌握,而非在一个黑盒中盲目等待。
实现早期交互和修正:
- 在某些场景下,用户可能一看到开头的几个词就意识到回答方向错了,他们可以提前中断生成,修改问题,从而节省了时间和计算资源。
适应LLM的生成特性:
- LLM的本质是自回归地生成下一个Token(词元),这天然就是一个流式的过程。流式传输只是将这一内在过程暴露给用户,是最符合其工作原理的交互方式。
在大型语言模型应用程序中应该流式传输什么
您列举的三点非常全面,是构建优秀LLM应用的黄金法则。我将对每一项进行展开和举例说明。
1. 流式传输大型语言模型输出(Token流)
这是最基础、最必要的流式传输,是所有LLM应用的标配。
- 传输内容:以Token或词为最小单位的数据块。
- 技术实现:通常通过Server-Sent Events (SSE) 或WebSockets从服务器推送到前端。
- 用户体验:用户看到文字像一个非常快的打字员一样逐个词或逐句地出现。
- 示例:ChatGPT的对话回复、Copilot的代码生成。
2. 流式传输管道或工作流进度(元数据流)
这对于复杂的AI智能体(Agent)或多步骤应用至关重要,是专业级应用和玩具级Demo的关键区别。
传输内容:结构化的元数据(Metadata) ,用于描述当前状态。
LangGraph 工作流示例:
- 状态从
Agent
->Tools
->Agent
的变迁。- 可以流式传输:
{"node": "SearchWeb", "status": "started", "input": "今日美股行情"}
- 紧接着:
{"node": "SearchWeb", "status": "completed", "result": "成功获取到XX数据"}
- 前端可以根据这些信息更新UI,例如高亮当前正在执行的节点,或在一个侧边栏显示执行日志。
LCEL 管道示例:
- 一个RAG管道可能是:
Retriever
->Reranker
->PromptTemplate
->LLM
。- 可以流式传输:
{"component": "Retriever", "progress": "Fetched 10 documents from DB"}
- 紧接着:
{"component": "Reranker", "progress": "Ranking documents..."}
- 这让用户知道应用并非“卡住”,而是在进行后台检索和处理,大幅提升耐心和信任度。
3. 流式传输自定义数据(增强流)
这提供了最大的灵活性,允许开发者将任何认为有价值的信息实时传递给用户。
传输内容:任何开发者定义的、对用户有意义的字符串或JSON对象。
常见应用场景:
- 工具执行详情:当调用一个“执行Python代码”的工具时,除了最终结果,还可以流式传输代码的标准输出(stdout),让用户看到代码运行的中间打印结果。
- 思维链(Chain-of-Thought) :对于一些复杂推理问题,可以选择性地将模型的“内心独白”(“让我先计算A,再比较B…”)流式传输出来,展示其推理过程,这在教育类应用中非常有用。
- 中间答案或草稿:在生成过程中,实时显示模型认为的可能答案列表或草稿版本,最终再收敛到一个最佳答案。
- 置信度或不确定性:为生成的每个事实或语句流式传输一个置信度分数(例如,
[85%] 根据我的知识...
),让用户对信息的准确性有所判断。
核心概念:什么是“自定义数据”?
想象一下你点了一份外卖(向LLM应用提问)。普通的流式传输就像看着地图上外卖员的图标一点点靠近你(模型生成的文字一个个出来)。
而自定义数据,就是你想知道的额外信息,比如:
- “骑手已到店取餐”
- “厨师正在炒菜”
- “骑手正在等红灯”
这些信息不是“菜”本身,但能让你更清楚后台发生了什么,减少焦虑。在LLM中,这就是“模型在想什么”、“工具在做什么”。
两种“写信”的方式
现在,你想把这些“额外信息”(自定义数据)写进数据流里,送给前端用户。LangChain提供了两种主要的“写信”方式,适用于不同的场景。
方法一:LangGraph 的 StreamWriter
(给LangGraph工作流用的)
-
比喻:公司内部汇报系统
- 你在一个大公司(LangGraph)里,公司有很严格的规章制度和汇报流程。你想向上级(前端用户)汇报一个进度,你必须使用公司规定的专用报表(
StreamWriter
)来填写,然后通过公司的官方渠道发送。
- 你在一个大公司(LangGraph)里,公司有很严格的规章制度和汇报流程。你想向上级(前端用户)汇报一个进度,你必须使用公司规定的专用报表(
-
什么时候用?
- 只有当你在构建一个 LangGraph 类型的工作流时。
- LangGraph的工作流是由多个“节点”(Node)和“边”(Edge)组成的图(Graph)。在每个节点内部,你可以用这个
StreamWriter
来写信。
-
怎么用?(伪代码思路)
# 假设这是LangGraph中的一个节点(比如一个叫“research_tool”的工具节点) async def research_tool_node(state, stream_writer): # 注意!参数里有stream_writer# 1. 节点开始工作了,发一条自定义消息await stream_writer.write({"type": "info", "content": "正在上网搜索..."})# 2. 执行一些耗时操作,比如真的去搜索search_results = await google_search(state["question"])# 3. 操作完成,再发一条消息await stream_writer.write({"type": "info", "content": f"找到了{len(search_results)}条结果"})# 4. 返回结果,更新状态return {"results": search_results}
-
如何获取这个流?
- 你在前端调用时,使用LangGraph提供的
stream()
或astream()
方法来启动这个工作流,它就会自动把StreamWriter
写的信和模型生成的Token一起送出来。
- 你在前端调用时,使用LangGraph提供的
-
重要限制:
- 这是LangGraph的“家规”,只能在LangGraph里面用。如果你用的是传统的LCEL链(Chain),就用不了这个方法。
方法二:dispatch_events
/ adispatch_events
(更通用,给LCEL链用的)
-
比喻:公司公共广播系统
- 这家公司(LangChain)有一个全公司都能用的公共广播喇叭。任何部门、任何员工(任何组件)只要有需要,都可以跑去广播室,对着喇叭喊一句话(调用
dispatch_events
),全公司(整个应用流)都能听到。
- 这家公司(LangChain)有一个全公司都能用的公共广播喇叭。任何部门、任何员工(任何组件)只要有需要,都可以跑去广播室,对着喇叭喊一句话(调用
-
什么时候用?
- 当你在构建传统的 LCEL链(Chain) 时。这是更通用、更常见的方法。
- 其实在LangGraph的节点里也能用这个方法,但用
StreamWriter
更符合LangGraph的风格。
-
怎么用?(伪代码思路)
# 假设这是一个自定义的工具(Tool) from langchain_core.runnables import RunnableConfigasync def my_custom_tool(input: str, config: RunnableConfig = None):# 0. 先拿到“广播喇叭”event_dispatcher = config.get("callbacks").get_async_dispatcher() if config else None# 1. 工具开始工作了,用广播喇叭发一条消息if event_dispatcher:await event_dispatcher.emit({"type": "tool_start", "name": "my_tool", "input": input})# 2. 执行一些耗时操作result = await do_something_slow(input)# 3. 操作完成,再广播一条if event_dispatcher:await event_dispatcher.emit({"type": "tool_end", "name": "my_tool", "result": result})return result
-
如何获取这个流?
- 你在前端调用时,需要使用LCEL链的
astream_events()
方法来启动。这个方法专门负责监听全公司的“广播喇叭”,并把所有广播内容(自定义事件)和模型生成的Token一起送出来。
- 你在前端调用时,需要使用LCEL链的
-
优势:
- 通用:可以在几乎所有LangChain组件(工具、链、模型等)中使用。
- 统一:为LCEL链提供了流式传输自定义数据的标准方案。
总结对比
为了帮你更好地理解,我们把两者放在一个表格里:
特性 | LangGraph 的 StreamWriter | dispatch_events / adispatch_events |
---|---|---|
适用场景 | 仅限 LangGraph 工作流中的节点 | 通用,主要用于 LCEL 链 中的任何组件 |
比喻 | 公司内部汇报系统 | 公司公共广播系统 |
如何触发流 | 使用 LangGraph 的 stream() /astream() 方法 | 使用 Runnable 的 astream_events() 方法 |
灵活性 | 与LangGraph状态绑定,更结构化 | 非常灵活,可在任何地方调用 |
关键区别 | LangGraph的专属功能 | LCEL的标准功能 |
简单来说:
- 如果你在用 LangGraph 构建复杂的、有状态的多步骤智能体工作流,就在每个节点里使用
StreamWriter
。 - 如果你在用传统的 LCEL 方式组装链(
|
符号),那么就在你的自定义函数或工具里使用 dispatch_events
。