Langchain流式自定义生成器函数
https://python.langchain.com.cn/docs/expression_language/how_to/generators
这篇文档其实是教你用 LangChain 的 LCEL 语法做“流式数据处理”,核心就是把 AI 输出的流式内容(比如一句一句蹦出来的文字),按你的需求拆成想要的格式(比如逗号分隔的列表),还不丢流式的实时性。我帮你拆成“大白话+步骤拆解”,保证好懂。
一、先搞懂:这东西到底用来干嘛?
简单说,它解决一个常见问题:
AI 输出是“流式”的(比如 ChatGPT 一样,字一个一个出来),如果想实时处理这些输出(比如把“狮子,老虎,狼”拆成 ['狮子','老虎','狼']
),直接用普通函数会等所有内容都出来才处理,失去了“流式”的意义。
而这里的“生成器函数”,能 一边接 AI 的流式输出,一边实时处理、实时返回结果,比如 AI 刚输出“狮子,”,它就立刻返回 ['狮子']
,不用等后面的“老虎”。
它的2个最实用场景:
- 做“自定义输出解析”:比如把 AI 给的逗号/换行分隔内容,拆成列表、字典。
- 改流式输出内容:比如把 AI 输出的英文实时翻译成中文,同时保持流式。
二、必须记住的2个基础规则
不管写同步还是异步生成器,都要遵守:
- 函数输入输出都是“流式”:输入是 AI 流式输出的每一小块内容(比如“lion,”“tiger,”),输出也是你处理后的每一小块(比如
['lion']
['tiger']
)。 - 用
yield
而不是return
:return
会等所有内容处理完才一次性返回,yield
是处理一块就立刻输出一块,这才是“流式”的关键。
三、同步版:最常用,一步步带你看懂
同步版就是处理“普通流式数据”(比如 AI 同步返回的流式内容),文档里的例子是“把 AI 输出的逗号分隔动物名,拆成列表”,我拆成3步讲:
步骤1:先搭个“基础 AI 链”
先让 AI 能输出“狮子,老虎,狼,大猩猩,熊猫”这种内容,代码拆解:
# 1. 装必要的库(LangChain 核心 + OpenAI 模型)
%pip install --upgrade --quiet langchain langchain-openai# 2. 导入需要的工具
from typing import Iterator, List # 用来定义“流式”类型
from langchain.prompts.chat import ChatPromptTemplate # 写提示词的模板
from langchain_core.output_parsers import StrOutputParser # 把 AI 输出转成字符串
from langchain_openai import ChatOpenAI # 调用 OpenAI 模型# 3. 写提示词:让 AI 输出“5个类似{animal}的动物,用逗号分隔”
prompt = ChatPromptTemplate.from_template("Write a comma-separated list of 5 animals similar to: {animal}")# 4. 加载 AI 模型(temperature=0.0 表示输出固定,不随机)
model = ChatOpenAI(temperature=0.0)# 5. 搭基础链:提示词 → 模型 → 输出转字符串
str_chain = prompt | model | StrOutputParser()
这时调用 str_chain.stream({"animal": "bear"})
,会实时输出:lion, tiger, wolf, gorilla, panda
(字一个一个蹦)。
步骤2:写“自定义生成器函数”
核心是用 buffer
(暂存区)存还没处理完的内容,遇到逗号就拆,代码拆解:
# 函数作用:接收 AI 流式输出的字符串,输出流式的列表(比如把“lion,”拆成['lion'])
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:buffer = "" # 暂存区:存还没遇到逗号的内容(比如“lion”“tiger”)for chunk in input: # 循环接收 AI 输出的每一小块(比如先接“lion,”,再接“tiger,”)buffer += chunk # 把当前小块加到暂存区(比如 buffer 变成“lion,”)# 只要暂存区里有逗号,就拆!while "," in buffer:comma_index = buffer.index(",") # 找逗号的位置(比如“lion,”里逗号在4的位置)yield [buffer[:comma_index].strip()] # 输出逗号前的内容(“lion”),转成列表buffer = buffer[comma_index + 1 :] # 暂存区更新:只留逗号后面的内容(比如清空,等下一个“tiger,”)# 循环结束后,处理最后一块没有逗号的内容(比如“panda”)yield [buffer.strip()]
步骤3:把生成器加到链里,看效果
# 基础链 + 自定义生成器 = 最终的链
list_chain = str_chain | split_into_list# 1. 流式调用(实时输出每一块)
for chunk in list_chain.stream({"animal": "bear"}):print(chunk, flush=True)
# 输出结果:['lion'] → ['tiger'] → ['wolf'] → ['gorilla'] → ['panda'](一块一块蹦)# 2. 非流式调用(等所有内容处理完再输出)
list_chain.invoke({"animal": "bear"})
# 输出结果:['lion', 'tiger', 'wolf', 'gorilla', 'panda'](一个完整列表)
四、异步版:区别在哪里?
异步版是处理“异步流式数据”(比如用异步方式调用 AI 模型,更省资源),核心区别就3点,其他和同步版几乎一样:
- 函数定义加
async def
(表示异步函数); - 循环接收输入用
async for
(异步循环,不阻塞其他操作); - 调用时用
async for
循环,且要加await
(异步调用关键字)。
关键代码拆解:
# 1. 异步生成器函数
from typing import AsyncIteratorasync def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:buffer = ""async for chunk in input: # 这里变了:async for 接收异步流式输入buffer += chunkwhile "," in buffer:comma_index = buffer.index(",")yield [buffer[:comma_index].strip()]buffer = buffer[comma_index + 1 :]yield [buffer.strip()]# 2. 异步调用(必须在 async 函数里用)
async for chunk in list_chain.astream({"animal": "bear"}): # 这里变了:astream 是异步流式调用print(chunk, flush=True)# 3. 异步非流式调用
await list_chain.ainvoke({"animal": "bear"}) # 这里变了:ainvoke + await
五、一句话总结重点
- 想实时处理 AI 流式输出 → 用“生成器函数”;
- 同步用
def + for + yield
,调用用stream + for
; - 异步用
async def + async for + yield
,调用用astream + async for + await
; buffer
是暂存数据的“小仓库”,遇到目标符号(比如逗号)就拆,拆完就清空等下一波。
为了让你更方便上手,要不要我帮你整理一份 “简化版代码模板”?里面会去掉多余注释,只保留核心步骤,还会标注出“哪里可以改”(比如把逗号分隔改成换行分隔),你直接填参数就能用。