LangChain【2】之专业术语
文章目录
- 参考网站
- LangFuse的安装和使用
- 一 LangChain中的专业术语
- 1.1 消息(Messge)
- 🧩 LangChain 中的消息类型
- 📦 Message 的结构
- 🔰简单使用
- 1.2 聊天模型(Chatmodels)
- 1.2.1 `ChatModel` 接口
- 1.2.2 `BaseMessage` 类型
- 1.2.3 核心组件与使用方式
- 1.2.3.1 创建模型
- 1.2.3.2 调用模型
- 1.2.4 支持的功能
- 1.2.4.1 上下文记忆(History)
- 1.2.4.2 流式输出(Streaming)
- 1.2.4.3 工具调用(Function Calling / Tool Calling)
- 1.3 提示词模板(Prompt templates)
- 1.3.1 PromptTemplate(适用于纯文本)
- 1.3.2 ChatPromptTemplate(适用于聊天式对话)
- 1.3.3 MessagesPlaceholder(适用于历史对话)
- 1.4 输出解析器(Output Parsers)
- 1.4.1 常见Output Parser 类型
- 1.4.2 `StrOutputParser`
- 1.4.3 `JsonOutputParser`
- 1.4.4 `PydanticOutputParser`
- 1.4.5 `CommaSeparatedListOutputParser`
- 1.4.6 `BooleanOutputParser`
- 1.5链式调用(Chains)
- 1.6 LangChain表达式 (LCEL)
- 1.6.1 💱LCEL 特性
- 1.6.2 🧩 使用 LCEL 的完整 LangChain 流程
- 1.7 可运行接口(Runable interface)
- 🧱 Runnable 接口的核心能力
- 🧩 常见的 Runnable 类型
- 📦 Runnable 的使用场景
- 🔄 Runnable 的核心特性
- 🛠️ 自定义 Runnable 示例
- 🧪 Runnable 的调试与测试
- 1.8 流式处理(Stream)
- 📌 特点与优势
- 🧠 流式处理的核心概念
- 💡 注意事项
- ✅ 示例:流式翻译中文诗句为英文
- 1.9 事件流(Stream events)
- ⏳事件流概述
- 👍核心概念
- 🔖使用场景
- 📘示例代码
参考网站
- langchain中文网站
LangFuse的安装和使用
- LangFuse:开源LLM工程平台的革新实践
一 LangChain中的专业术语
1.1 消息(Messge)
- LangChain 中的
Message
是用于构建与语言模型交互的历史记录和上下文的核心数据结构。它主要用于组织对话中的多轮交互,包括系统消息、用户消息、助手(AI)消息等。
🧩 LangChain 中的消息类型
LangChain 提供了以下几种常见的消息类型:
消息类型 | 类名 | 说明 |
---|---|---|
SystemMessage | langchain_core.messages.SystemMessage | 表示系统的指令或角色设定,如“你是一个翻译官”、“你是一个客服助手”等 |
HumanMessage | langchain_core.messages.HumanMessage | 表示用户输入的内容,即人类发出的消息 |
AIMessage | langchain_core.messages.AIMessage | 表示 AI 助手生成的回复内容 |
FunctionMessage | langchain_core.messages.FunctionMessage | 表示调用工具后返回的结果,通常用于函数调用流程中 |
📦 Message 的结构
每个 Message
对象都包含以下主要属性:
content
: 字符串,表示消息内容additional_kwargs
: 可选字典,用于存储额外信息(如角色、工具调用参数等)response_metadata
: 可选元数据,例如模型响应的 token 数量、调用耗时等
🔰简单使用
from langchain_core.messages import SystemMessage, HumanMessage, AIMessagesystem_msg = SystemMessage(content="You are a helpful assistant.")
human_msg = HumanMessage(content="Translate '无人扶我青云志' to English.")
ai_msg = AIMessage(content="No one supports my lofty ambitions.")print(system_msg)
# 输出: SystemMessage(content='You are a helpful assistant.', additional_kwargs={}, response_metadata={})
1.2 聊天模型(Chatmodels)
- LangChain 中的 聊天模型(Chat Models) 是一类专门用于处理对话交互的模型接口。与传统的语言模型不同,聊天模型更注重于理解并生成符合上下文的对话内容,适用于构建对话系统、聊天机器人等应用场景。
- 适用场景
场景 | 描述 |
---|---|
聊天机器人 | 构建客服、助手类对话系统 |
多轮对话 | 支持带上下文的历史对话 |
翻译/摘要/问答 | 利用模型进行结构化文本处理 |
Agent 应用 | 结合工具调用,构建智能代理系统 |
1.2.1 ChatModel
接口
在 LangChain 中,ChatModel
是一个抽象接口,定义了聊天模型的基本行为,如接收消息列表、返回响应消息等。
- 主要实现类:
ChatOpenAI
:对接 OpenAI 的 GPT 系列模型(如 gpt-3.5-turbo, gpt-4)ChatAnthropic
:对接 Anthropic 的 Claude 模型ChatGoogleGenerativeAI
:对接 Google Gemini 模型- 自定义模型:可通过继承
BaseChatModel
实现
1.2.2 BaseMessage
类型
- 聊天模型的输入和输出都是基于
BaseMessage
对象组成的列表:
类型 | 含义 |
---|---|
HumanMessage | 用户发出的消息 |
AIMessage | 模型回复的消息 |
SystemMessage | 系统指令或角色设定 |
FunctionMessage | 函数调用结果(用于工具调用) |
ToolMessage | 工具调用返回的内容 |
示例:
from langchain_core.messages import HumanMessage, SystemMessagemessages = [SystemMessage(content="你是一个翻译助手"),HumanMessage(content="请将'无人扶我青云志,我自踏雪至山巅'翻译为英文")
]
1.2.3 核心组件与使用方式
1.2.3.1 创建模型
以 ChatOpenAI
为例:
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
1.2.3.2 调用模型
使用 .invoke()
方法传入消息列表:
response = model.invoke(messages)
print(response.content)
输出示例:
No one supports my lofty ambitions; I will tread through the snow to reach the mountain peak on my own.
1.2.4 支持的功能
1.2.4.1 上下文记忆(History)
- 通过
MessagesPlaceholder
可以保留历史对话记录,让模型理解上下文。
from langchain_core.prompts import MessagesPlaceholderprompt = ChatPromptTemplate.from_messages([("system", "你是客服助手"),MessagesPlaceholder("history"),("user", "{input}")
])
1.2.4.2 流式输出(Streaming)
- 支持流式输出,逐字打印模型响应,提升用户体验。
for chunk in chain.stream({"system_message": "你是一个翻译助手","user_input": "无人扶我青云志,我自踏雪至山巅"
}):print(chunk, end="", flush=True)
1.2.4.3 工具调用(Function Calling / Tool Calling)
- 可用于调用外部 API 或执行特定功能。
from langchain_core.messages import ToolCalltool_call = ToolCall(name="search_internet", content={"query": "最新新闻"})
1.3 提示词模板(Prompt templates)
-
LangChain 的 提示词模板(Prompt Templates) 是构建与大语言模型交互时的核心组件之一。它允许你以结构化、可复用的方式定义输入提示,从而动态地生成最终的提示内容。
-
Prompt Template 是一种 参数化模板,用于在运行时将变量插入到固定的文本中,从而生成最终发送给 LLM 的提示。
-
提示词模板的输入是一个字典,其中每个键表示要填充的提示词模板中的变量。
-
提示词模板分为:PromptTemplate、ChatPromptTemplate、消息占位符
1.3.1 PromptTemplate(适用于纯文本)
- 字符串提示词模板:最基础的模板,用于生成字符串提示。
from langchain_core.prompts import PromptTemplate prompt_template = PromptTemplate.from_template("Tell me a joke about {topic}") prompt_template.invoke({"topic": "cats"})
StringPromptValue(text='Tell me a joke about cats')
1.3.2 ChatPromptTemplate(适用于聊天式对话)
- ChatPromptTemplate(适用于聊天式对话):专为支持角色(system/user/assistant)设计的模板,常用于 chat 模型
from langchain_core.prompts import ChatPromptTemplate prompt_template = ChatPromptTemplate.from_messages([("system", "You are a helpful assistant"),("user", "Tell me a joke about {topic}") ]) prompt_template.invoke({"topic": "cats"})
ChatPromptValue(messages=[SystemMessage(content='You are a helpful assistant', additional_kwargs={}, response_metadata={}), HumanMessage(content='Tell me a joke about cats', additional_kwargs={}, response_metadata={})])
1.3.3 MessagesPlaceholder(适用于历史对话)
-
MessagesPlaceholder:用于占位一组消息,常用于历史对话管理。
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.messages import HumanMessage prompt_template = ChatPromptTemplate.from_messages([("system", "You are a helpful assistant"),MessagesPlaceholder("msgs") ]) prompt_template.invoke({"msgs": [HumanMessage(content="hi!")]})
ChatPromptValue(messages=[SystemMessage(content='You are a helpful assistant', additional_kwargs={}, response_metadata={}), HumanMessage(content='hi!', additional_kwargs={}, response_metadata={})])
from langchain_core.messages import HumanMessage from langchain_core.prompts import MessagesPlaceholderprompt = ChatPromptTemplate.from_messages([("system", "You are a helpful assistant"),MessagesPlaceholder("history"),("user", "{input}") ])prompt.invoke({"history": [HumanMessage(content="Hi")],"input": "How are you?" })
ChatPromptValue(messages=[SystemMessage(content='You are a helpful assistant', additional_kwargs={}, response_metadata={}), HumanMessage(content='Hi', additional_kwargs={}, response_metadata={}), HumanMessage(content='How are you?', additional_kwargs={}, response_metadata={})])
1.4 输出解析器(Output Parsers)
- LangChain 的 输出解析器(Output Parsers) 是用于将大语言模型(LLM)的原始输出转换为结构化、可操作的数据格式的重要组件。它在 LangChain 流程中扮演着“翻译官”的角色,帮助开发者更方便地处理和使用模型生成的内容。
- LLM 输出通常是自由文本(字符串),但实际应用中我们往往希望得到:结构化的数据(如 JSON)、特定格式的结果(如布尔值、数字、列表等)、易于后续程序处理的形式。
- Output Parsers 正是为此设计的:它能将非结构化的文本结果转化为结构化对象或特定类型。
1.4.1 常见Output Parser 类型
解析器 | 描述 | 使用场景 |
---|---|---|
StrOutputParser | 直接返回模型输出的字符串内容 | 简单文本输出 |
JsonOutputParser | 将输出解析为 JSON 格式 | 需要结构化数据时 |
PydanticOutputParser | 使用 Pydantic 模型定义结构并解析输出 | 强类型校验与结构化输出 |
CommaSeparatedListOutputParser | 将输出按逗号分割成列表 | 输出多个选项或关键词 |
EnumOutputParser | 限制输出为指定枚举值 | 控制输出范围,如 yes/no、分类标签 |
BooleanOutputParser | 将输出解析为布尔值 | 判断是否满足条件 |
1.4.2 StrOutputParser
- 适用于直接获取模型输出的文本内容。
from langchain_core.output_parsers import StrOutputParserparser = StrOutputParser()
output = parser.invoke("Hello, world!")
# 输出: "Hello, world!"
1.4.3 JsonOutputParser
- 适用于模型输出 JSON 字符串的情况。
from langchain_core.output_parsers import JsonOutputParserparser = JsonOutputParser()
output = parser.invoke('{"name": "Alice", "age": 30}')
# 输出: {'name': 'Alice', 'age': 30}
1.4.4 PydanticOutputParser
适用于需要强类型约束的结构化输出。
- 首先定义一个 Pydantic 模型:
from pydantic import BaseModel
from langchain_core.output_parsers import PydanticOutputParserclass Person(BaseModel):name: strage: intparser = PydanticOutputParser(pydantic_object=Person)
output = parser.invoke("{'name': 'Bob', 'age': 25}")
# 输出: Person(name='Bob', age=25)
1.4.5 CommaSeparatedListOutputParser
- 适用于模型输出多个关键词或项目的情况。
from langchain_core.output_parsers import CommaSeparatedListOutputParserparser = CommaSeparatedListOutputParser()
output = parser.invoke("apple, banana, orange")
# 输出: ['apple', 'banana', 'orange']
1.4.6 BooleanOutputParser
- 适用于判断是否满足某个条件,如“是否包含敏感词”。
from langchain_core.output_parsers import BooleanOutputParserparser = BooleanOutputParser()
output = parser.invoke("Yes")
# 输出: True
import os
from langchain_openai import ChatOpenAI
from langfuse.callback import CallbackHandler
from langchain.chains import LLMChain # 导入链模块
from langchain_core.prompts import ChatPromptTemplate # 导入提示模板
from langchain_core.output_parsers import StrOutputParser
# 配置 API 易环境
os.environ["OPENAI_API_KEY"] = "hk-xxx" # 从API易后台获取
os.environ["OPENAI_API_BASE"] = "https://api.openai-hk.com/v1" # API易基础URL# 配置Langfuse
langfuse_handler = CallbackHandler(public_key="pk-lf-xxx",secret_key="sk-lf-xxx",host="http://192.168.1.21:3000"
)# 创建一个大语言模型
model = ChatOpenAI(model="gpt-3.5-turbo")# 定义提示模板
prompt = ChatPromptTemplate.from_messages([("system", "{system_message}"),("user", "{user_input}")
])# 创建链
chain = LLMChain(llm=model, prompt=prompt)# 定义传递给模型的消息内容
system_message = "把下面的语句翻译为英文。"
user_input = "无人扶我青云志,我自踏雪至山巅"# 调用链并打印结果
response = chain.invoke({"system_message": system_message, "user_input": user_input},config={"callbacks": [langfuse_handler]}
)parser=StrOutputParser()
print(parser.invoke(response)) # 输出模型生成的内容
{'system_message': '把下面的语句翻译为英文。', 'user_input': '无人扶我青云志,我自踏雪至山巅', 'text': 'No one supports my ambition to rise to the top, I will tread through the snow to reach the summit on my own.'}
1.5链式调用(Chains)
- LangChain 的链式调用方式(即 RunnableSequence 或 | 操作符),可以将提示模板、模型和输出解析器串联起来,形成一个完整的处理流程。这样代码更简洁、可读性更强。
#%% import os from langchain_openai import ChatOpenAI from langfuse.callback import CallbackHandler from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser# 配置 API 易环境 os.environ["OPENAI_API_KEY"] = "hk-xxx" os.environ["OPENAI_API_BASE"] = "https://api.openai-hk.com/v1"# 配置Langfuse langfuse_handler = CallbackHandler(public_key="pk-lf-xxx",secret_key="sk-lf-xxx",host="http://192.168.1.21:3000" )# 定义提示模板 prompt = ChatPromptTemplate.from_messages([("system", "{system_message}"),("user", "{user_input}") ])# 创建模型 model = ChatOpenAI(model="gpt-3.5-turbo")# 创建输出解析器 parser = StrOutputParser()# 使用链式调用构建完整流程 chain = prompt | model | parser# 定义输入内容 system_message = "把下面的语句翻译为英文。" user_input = "无人扶我青云志,我自踏雪至山巅"dian# 调用链并打印结果,同时传入回调处理器 response = chain.invoke({"system_message": system_message, "user_input": user_input},config={"callbacks": [langfuse_handler]} )print(response) # 输出模型生成的内容
No one supports my ambition to reach the sky; I will ascend to the mountain peak on my own through the snow.
1.6 LangChain表达式 (LCEL)
- LangChain Expression Language(LCEL)是 LangChain 提供的一种声明式方式,用于构建可组合、可扩展的链式调用流程。它支持同步和异步操作,并且可以轻松集成提示模板、模型、解析器、回调等组件。
1.6.1 💱LCEL 特性
功能 | 描述 |
---|---|
`prompt | model |
RunnableSequence | LangChain 内部自动创建的链结构,支持 .invoke() 和 .stream() |
config={"callbacks": [...]} | 支持回调机制,如 Langfuse、日志记录等 |
可扩展性强 | 可以加入中间处理步骤,例如过滤、重试、缓存等 |
1.6.2 🧩 使用 LCEL 的完整 LangChain 流程
使用 LCEL 构建一个完整的 LangChain 流程:
- 使用
ChatPromptTemplate
定义提示 - 使用
ChatOpenAI
调用大模型 - 使用
StrOutputParser
解析输出 - 使用
RunnableSequence
链接整个流程 - 支持
langfuse
回调追踪调用过程
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langfuse.callback import CallbackHandler# 配置 API 易环境
os.environ["OPENAI_API_KEY"] = "hk-xxx"
os.environ["OPENAI_API_BASE"] = "https://api.openai-hk.com/v1"# 配置 Langfuse 回调处理器
langfuse_handler = CallbackHandler(public_key="pk-lf-xxx",secret_key="sk-lf-xxx",host="http://192.168.1.21:3000"
)# 1. 定义 Prompt 模板
prompt = ChatPromptTemplate.from_messages([("system", "{system_message}"),("user", "{user_input}")
])# 2. 创建 LLM 模型
model = ChatOpenAI(model="gpt-3.5-turbo")# 3. 创建输出解析器
parser = StrOutputParser()# 4. 使用 LCEL 构建 RunnableSequence 链条
chain = prompt | model | parser# 5. 定义输入内容
input_data = {"system_message": "请将以下诗句翻译为英文:","user_input": "无人扶我青云志,我自踏雪至山巅"
}# 6. 调用链并获取结果
response = chain.invoke(input_data,config={"callbacks": [langfuse_handler]}
)# 7. 打印结果
print("翻译结果:", response)
翻译结果:No one supports my lofty ambitions; I will tread through the snow to reach the mountain peak on my own.
1.7 可运行接口(Runable interface)
- LangChain 的 Runnable Interface(可运行接口) 是 LangChain 提供的一组统一的、链式调用风格的接口,用于构建和组合语言模型应用。它使得各种组件(如提示模板、LLM、解析器、工具等)可以像函数一样被调用,并且支持同步和异步操作。
🧱 Runnable 接口的核心能力
Runnable
接口定义了几个关键方法,这些方法构成了LangChain 链式调用的基础:
方法 | 说明 |
---|---|
.invoke(input, config) | 同步调用,处理单个输入并返回结果 |
.batch(inputs, config) | 批量处理多个输入 |
.stream(input, config) | 流式处理,逐块输出结果(适用于聊天机器人、流式生成等场景) |
.astream(input, config) | 异步流式处理 |
.bind(**kwargs) | 绑定参数,用于部分应用(partial application) |
.with_config(config) | 设置配置(如回调、超时等) |
🧩 常见的 Runnable 类型
类型 | 说明 |
---|---|
RunnableLambda | 将任意 Python 函数封装为 Runnable |
RunnableMap | 并行执行多个 Runnable,常用于字段映射 |
RunnableBranch | 条件分支,根据条件选择不同的 Runnable 分支 |
RunnableWithMessageHistory | 支持带消息历史记录的 Runnable |
RunnableParallel | 并行执行多个 Runnable |
RunnableGenerator | 支持流式输出的 Runnable |
RunnableSerializable | 可序列化的 Runnable,可用于 API 服务中传输 |
📦 Runnable 的使用场景
- 构建 LLM 应用流水线:将提示模板、模型、解析器串联成一条完整的推理链。
chain = prompt | model | parser
- 添加中间处理逻辑:你可以插入自定义函数,进行数据清洗、格式转换等。
def add_prefix(text):return "Prefix: " + textchain = prompt | model | parser | add_prefix
- 流式输出:适用于需要逐步输出内容的场景,例如聊天机器人、长文本生成。
for chunk in chain.stream({"topic": "dogs"}):print(chunk, end="", flush=True)
- 带历史记录的对话系统:结合
RunnableWithMessageHistory
和ChatMessageHistory
实现记忆功能。
from langchain_core.runnables import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistorychain = prompt | model | parser
with_message_history = RunnableWithMessageHistory(chain,lambda session_id: message_history, # 获取历史记录input_messages_key="input",history_messages_key="history"
)response = with_message_history.invoke({"input": "What's the weather like today?", "session_id": "abc123"}
)
🔄 Runnable 的核心特性
特性 | 描述 |
---|---|
一致性 | 所有组件都遵循相同的接口(invoke/batch/stream),易于组合 |
可扩展性 | 支持自定义 Runnable,便于集成第三方服务 |
可组合性 | 多种组合方式(串行、并行、分支) |
可观察性 | 支持回调机制(如 Langfuse、日志记录) |
异步支持 | 支持 async/await 编程范式 |
🛠️ 自定义 Runnable 示例
- 将任意函数包装为
RunnableLambda
,从而加入到链中:
from langchain_core.runnables import RunnableLambdadef to_upper(s: str) -> str:return s.upper()chain = prompt | model | parser | RunnableLambda(to_upper)
🧪 Runnable 的调试与测试
由于每个 Runnable 都是独立的单元,你可以单独测试每个步骤:
# 单独测试 prompt
print(prompt.invoke({"topic": "cats"}))# 单独测试 model
print(model.invoke(prompt.invoke({"topic": "cats"})))# 整体调用
print(chain.invoke({"topic": "cats"}))
1.8 流式处理(Stream)
- LangChain 的 流式处理(Streaming) 是一种逐块(chunk-by-chunk)获取模型输出的能力,而不是等待整个响应生成完毕后一次性返回。这种机制非常适合构建实时交互场景,例如聊天机器人、问答系统等。
📌 特点与优势
特性 | 描述 |
---|---|
低延迟体验 | 用户无需等待完整响应,可以边生成边显示 |
适用于对话系统 | 在聊天界面中实现“打字效果” |
节省内存资源 | 不需要一次性加载全部结果 |
兼容性强 | 支持 LCEL、回调处理器(如 Langfuse)、中间处理函数等 |
🧠 流式处理的核心概念
stream()
方法
- LangChain提供
.stream()
方法用于启动流式调用。 - 它会逐步返回模型生成的每个 token 或文本片段。
- 支持与 LCEL(LangChain Expression Language)结合使用。
for chunk in chain.stream(input_data):print(chunk, end="", flush=True)
Runnable
接口
- 所有支持
.stream()
的对象都实现了Runnable
接口。 - 包括:
ChatPromptTemplate
,ChatOpenAI
,StrOutputParser
等。
💡 注意事项
- 并非所有模型都支持流式输出,需确认 API 是否开启 streaming 参数。
- OpenAI 默认支持流式 (
stream=True
)。 - 如果你使用的是本地模型或代理服务,确保其也支持流式响应格式。
- 流式输出在 Jupyter Notebook 中可能不会立即刷新,建议使用
flush=True
强制刷新输出缓冲区。
✅ 示例:流式翻译中文诗句为英文
- 🧩 结合 Langfuse 进行追踪,加入
langfuse_handler
回调。
import os
from langchain_openai import ChatOpenAI
from langfuse.callback import CallbackHandler
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser# 设置环境变量
os.environ["OPENAI_API_KEY"] = "hk-xxx"
os.environ["OPENAI_API_BASE"] = "https://api.openai-hk.com/v1"# 配置Langfuse
langfuse_handler = CallbackHandler(public_key="pk-xxx",secret_key="sk-xxx",host="http://192.168.1.21:3000"
)# 定义 Prompt 模板
prompt = ChatPromptTemplate.from_messages([("system", "你是一个专业的翻译官,请将以下内容翻译成英文:"),("user", "{input}")
])# 创建模型和解析器
model = ChatOpenAI(model="gpt-3.5-turbo")
parser = StrOutputParser()# 构建链
chain = prompt | model | parser# 输入数据
input_data = {"input": "无人扶我青云志,我自踏雪至山巅"}# 启动同步流式处理
print("流式输出:")
for chunk in chain.stream(input_data,config={"callbacks": [langfuse_handler]}):print(chunk, end="", flush=True) ## 启动异步流式处理
print("流式输出:")
async for chunk in chain.astream(input_data, config={"callbacks": [langfuse_handler]}):print(chunk, end="", flush=True)
- 异步调用的另外一种方式:
import asyncio# 定义一个异步函数来处理流式输出
async def stream_output():print("流式输出:")async for chunk in chain.astream(input_data, config={"callbacks": [langfuse_handler]}):print(chunk, end="", flush=True)# 调用异步函数
asyncio.run(stream_output())
1.9 事件流(Stream events)
- 在 LangChain 中,事件流(Event Stream)是处理长时间运行任务的一种方式,它可以让你以异步的方式接收来自模型或应用程序执行过程中的中间结果。
⏳事件流概述
- 定义:事件流是一种机制,允许开发者订阅并监听发生在 LangChain 应用程序中的不同类型的事件。
- 用途:通过事件流可以获取到模型生成文本的实时更新,比如逐词生成的结果;也可以用来监控应用的状态变化或者错误信息。
👍核心概念
- 事件类型:包括但不限于
on_llm_start
,on_llm_end
,on_llm_error
等等,这些事件对应了 LLM 调用的不同阶段。 - 回调函数:用户需要实现特定的回调方法来处理接收到的事件。例如,当一个新的 token 被生成时,会触发相应的回调。
🔖使用场景
- 实时显示模型输出给最终用户。
- 对模型响应进行日志记录以便后续分析。
- 在生成过程中插入自定义逻辑,如过滤敏感内容或添加额外的安全检查。
📘示例代码
- 简单例子展示如何使用事件流来捕获模型生成的信息。
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.callbacks.base import BaseCallbackHandler
from langchain.callbacks.manager import CallbackManager
from langfuse.callback import CallbackHandler as LangfuseCallbackHandler# 自定义回调处理器
class MyCustomHandler(BaseCallbackHandler):def on_llm_start(self, serialized, prompts, **kwargs):print("LLM 开始运行...")def on_llm_end(self, response, **kwargs):print("\nLLM 结束运行.")def on_llm_new_token(self, token: str, **kwargs):print(token, end="", flush=True)def on_llm_error(self, error: Exception, **kwargs):print(f"\n发生错误:{error}")# 设置环境变量
os.environ["OPENAI_API_KEY"] = "hk-xxx"
os.environ["OPENAI_API_BASE"] = "https://api.openai-hk.com/v1"# 配置 Langfuse 回调处理器
langfuse_handler = LangfuseCallbackHandler(public_key="pk-lf-xxx",secret_key="sk-lf-xxx",host="http://192.168.1.21:3000"
)# 自定义回调处理器
custom_handler = MyCustomHandler()# 创建回调管理器
callback_manager = CallbackManager([custom_handler, langfuse_handler])# 定义 Prompt 模板
prompt = ChatPromptTemplate.from_messages([("system", "你是一个专业的翻译官,请将以下内容翻译成英文:"),("user", "{input}")
])# 创建模型和解析器
model = ChatOpenAI(model="gpt-3.5-turbo", callback_manager=callback_manager)
parser = StrOutputParser()# 构建链
chain = prompt | model | parser# 输入数据
input_data = {"input": "无人扶我青云志,我自踏雪至山巅"}# 启动流式处理
print("流式输出:")
async for chunk in chain.astream_events(input_data, config={"callbacks": [custom_handler, langfuse_handler]}):print(chunk, end="", flush=True)