当前位置: 首页 > news >正文

langchain从入门到精通(六)——LCEL 表达式与 Runnable 可运行协议

1. 多组件 invoke 嵌套的缺点

prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
# 获取输出内容
content = parser.invoke(
llm.invoke(
prompt.invoke(
{"query": req.query.data}
)
)
)

这种写法虽然能实现对应的功能,但是存在很多缺陷:

  1. 嵌套式写法让程序的维护性与可阅读性大大降低,当需要修改某个组件时,变得异常困难。
  2. 没法得知每一步的具体结果与执行进度,出错时难以排查。
  3. 嵌套式写法没法集成大量的组件,组件越来越多时,代码会变成“一次性”代码

2. 手写一个"Chain"优化代码

观察发现,虽然 Prompt、Model、OutputParser 分别有自己独立的调用方式,例如:
Prompt 组件 :format、invoke、to_string、to_messages。
Model 组件 :generate、invoke、batch。
OutputParser 组件 :parse、invoke

但是有一个共同的调用方法:invoke,并且每一个组件的输出都是下一个组件的输入,是否可以将所有组件组装得到一个列表,然后循环依次调用 invoke 执行每一个组件,然后将当前组件的输出作为下一个组件的输入。

from typing import Any
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
class Chain: steps: list = []
def __init__(self, steps: list): self.steps = steps
def invoke(self, input: Any) -> Any: output: Any = input
for step in self.steps: output = step.invoke(output)
print(step)
print("执行结果:", output)
print("===============")
return output
chain = Chain([prompt, llm, parser])
print(chain.invoke({"query": "你好,你是?"}))

输出结果为:

input_variables=['query'] messages=
[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['query'],
template='{query}'))]
执行结果: messages=[HumanMessage(content='你好,你是?')] ===============
client=<openai.resources.chat.completions.Completions object at
0x000001C6BF694310> async_client=
<openai.resources.chat.completions.AsyncCompletions object at 0x000001C6BF695BD0> model_name='gpt-3.5-turbo-16k' openai_api_key=SecretStr('**********') openai_api_base='https://api.xty.app/v1' openai_proxy='' 执行结果: content='你好!我是 ChatGPT,一个由OpenAI开发的人工智能语言模型。我可以回答各种各样
的问题,帮助解决问题,提供信息和创意。有什么我可以帮助你的吗?' response_metadata=
{'token_usage': {'completion_tokens': 72, 'prompt_tokens': 13, 'total_tokens': 85}, 'model_name': 'gpt-3.5-turbo-16k', 'system_fingerprint': 'fp_b28b39ffa8',
'finish_reason': 'stop', 'logprobs': None} id='run-5bf9e183-4b28-4be9-bf65- ce0ad9590785-0' =============== 执行结果: 你好!我是 ChatGPT,一个由OpenAI开发的人工智能语言模型。我可以回答各种各样的问题,帮
助解决问题,提供信息和创意。有什么我可以帮助你的吗?
=============== 你好!我是 ChatGPT,一个由OpenAI开发的人工智

3. Runnable 简介与 LCEL 表达式

为了尽可能简化创建自定义链,LangChain 官方实现了一个 Runnable 协议,这个协议适用于
LangChain 中的绝大部分组件,并实现了大量的标准接口,涵盖:

  1. stream :将组件的响应块流式返回,如果组件不支持流式则会直接输出。
  2. invoke :调用组件并得到对应的结果。
  3. batch :批量调用组件并得到对应的结果。
  4. astream :stream 的异步版本。
  5. ainvoke :invoke 的异步版本。
  6. abatch :batch 的异步版本。
  7. astream_log :除了流式返回最终响应块之外,还会流式返回中间步骤。
    除此之外,在 Runnable 中还重写了 orror 方法,这是 Python 中 | 运算符的计算逻辑,所有的 Runnable 组件,均可以通过 | 或者 pipe() 的方式将多个组件拼接起来形成一条链。

在这里插入图片描述

组件输入类型输出类型
Prompt 提示Dict 字典PromptValue 提示值
ChatModel 聊天模型字符串、聊天消息列表、PromptValue提示值ChatMessage 聊天消息
LLM 大语言模型字符串、聊天消息列表、PromptValue提示值String 字符串
OutputParser 输出解析器LLM 或聊天模型的输出取决于解析器
Retriever 检索器单个字符串List of Document 文档列表
Tool 工具字符串、字典或取决于工具取决于工具

例如上面自定义"Chain"的写法等同于如下的代码:

from typing import Any
import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
prompt = ChatPromptTemplate.from_template("{query}")
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
parser = StrOutputParser()
chain = prompt | llm | parser # 等价于以下写法
composed_chain_with_pipe = (
RunnableParallel({"query": RunnablePassthrough()})
.pipe(prompt)
.pipe(llm)
.pipe(parser)
)
print(chain.invoke({"query": "你好,你是?"}))

Runnable 底层的运行逻辑本质上也是将每一个组件添加到列表中,然后按照顺序执行并返回最终结果,这里参考核心源码:

def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
from langchain_core.beta.runnables.context import config_with_context
# setup callbacks and context config = config_with_context(ensure_config(config), self.steps) callback_manager = get_callback_manager_for_config(config)
# start the root run
run_manager = callback_manager.on_chain_start(
dumpd(self),
input, name=config.get("run_name") or self.get_name(), run_id=config.pop("run_id", None),
)
# 调用所有步骤并逐个执行得到对应的输出,然后作为下一个的输入
try:
for i, step in enumerate(self.steps):
input = step.invoke(
input, # mark each step as a child run patch_config( config, callbacks=run_manager.get_child(f"seq:step:{i+1}")
),
)
# finish the root run
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(input)
return cast(Output, input)

4. 两个Runnable核心类的讲解与使用

1. RunnableParallel 并行运行

RunnableParallel 是 LangChain 中封装的支持运行多个 Runnable 的类,一般用于操作 Runnable 的输出,以匹配序列中下一个 Runnable 的输入,起到并行运行 Runnable 并格式化输出结构的作用。例如 RunnableParallel 可以让我们同时执行多条 Chain,然后以字典的形式返回各个 Chain 的结果,对比每一条链单独执行,效率会高很多。

import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
# 1.编排2个提示模板
joke_prompt = ChatPromptTemplate.from_template("请讲一个关于{subject}的冷笑话,尽可能
短")
poem_prompt = ChatPromptTemplate.from_template("请写一篇关于{subject}的诗,尽可能短")
# 2.创建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.构建两条链
joke_chain = joke_prompt | llm | parser poem_chain = poem_prompt | llm | parser
# 5.使用RunnableParallel创建并行可运行
map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
# 6.运行并行可运行组件得到响应结果
resp = map_chain.invoke({"subject": "程序员"})
print(resp)

输出内容:

{'joke': '为什么程序员总是用尺子测电脑屏幕?因为他们听说了“像素”是屏幕上的一种“尺寸”。',
'poem': '在代码的海洋里徜徉,\n程序员心怀梦想与创意。\n键盘敲击是旋律,\nbug 是诗歌的瑕疵。
\n\n算法如诗的韵律,\n逻辑是句子的构思。\n编程者如诗人般,\n创造出数字的奇迹。'}

除了并行执行,RunnableParallel 还可以用于操作 Runnable 的输出,用于产生符合下一个 Runnable 组件的数据。
例如:用户传递数据,并行执行检索策略得到上下文随后传递给 Prompt 组件,如下。

import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
def retrieval(query: str) -> str: """模拟一个检索器,传入查询query,输出文本""" print("执行检索:", query)
return "我叫老铁,是一名AI应用开发工程师。"
# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("""请根据用户的提问回答问题,可以参考对应的上下
文进行回复。
<context>
{context} <context> 用户的问题是: {query}""")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.编排链
chain = RunnableParallel( context=retrieval, query=RunnablePassthrough(),
) | prompt | llm | parser
# 5.调用链生成结果
content = chain.invoke("你好,我叫什么?")
print(content)

输出内容:

执行检索: 你好,我叫什么?
你好,你叫老铁。

在创建 RunnableParallel 的时候,支持传递字典、函数、映射、键值对数据等多种方式,
RunnableParallel 底层会执行检测并将数据统一转换为 Runnable,核心源码如下:

# langchain-core/runnables/base.py
def __init__( self, steps__: Optional[
Mapping[
str, Union[
Runnable[Input, Any], Callable[[Input], Any], Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
],
]
] = None, **kwargs: Union[
Runnable[Input, Any], Callable[[Input], Any], Mapping[str, Union[Runnable[Input, Any], Callable[[Input], Any]]],
],
) -> None: # 1.检测是否传递字典,如果传递,则提取字段内的所有键值对
merged = {**steps__} if steps__ is not None else {}
# 2.传递了键值对,则将键值对更新到merged进行合并
merged.update(kwargs) super().__init__( # type: ignore[call-arg]
# 3.循环遍历merged的所有键值对,并将每一个元素转换成Runnable
steps__={key: coerce_to_runnable(r) for key, r in merged.items()}
)

除此之外,在 Chains 中使用时,可以简写成字典的方式, orror 会自动将字典转换成
RunnableParallel,核心源码:

# langchain_core/runnables/base.py
def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]: """Coerce a runnable-like object into a Runnable.
Args:
thing: A runnable-like object.
Returns: A Runnable. """
if isinstance(thing, Runnable): return thing
elif is_async_generator(thing) or inspect.isgeneratorfunction(thing):
return RunnableGenerator(thing) elif callable(thing): return RunnableLambda(cast(Callable[[Input], Output], thing)) elif isinstance(thing, dict): # 如果类型为字典,使用字典创建RunnableParallel并转换成Runnable格式
return cast(Runnable[Input, Output], RunnableParallel(thing)) else:
raise TypeError(
f"Expected a Runnable, callable or dict."
f"Instead got an unsupported type: {type(thing)}"
)

2. RunnablePassthrough 传递数据

除了 RunnablePassthrough,在 LangChain 中,另外一个高频使用的 Runnable 类是RunnablePassthrough,这个类透传上游参数输入,简单来说,就是可以获取上游的数据,并保持不变或者新增额外的键。通常与 RunnableParallel 一起使用,将数据分配给映射中的新键。
例如:使用 RunnablePassthrough 来简化 invoke 的调用流程。

import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
# 1.编排prompt
prompt = ChatPromptTemplate.from_template("{query}")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo-16k")
# 3.创建链
chain = {"query": RunnablePassthrough()} | prompt | llm | StrOutputParser()
# 4.调用链并获取结果
content = chain.invoke("你好,你是")
print(content)

输出内容:

你好!是的,我是ChatGPT,你需要什么帮助吗?

RunnablePassthrough() 获取的是整个输入的内容(字符串或者字典),如果想获取字典内的某个部
分,可以使用 itemgetter 函数,并传入对应的字段名即可,如下:

from operator import itemgetter
chain = {"query": itemgetter("query")} | prompt | llm | StrOutputParser()
content = chain.invoke({"query": "你好,你是?"})

除此之外,如果想在传递的数据中添加数据,还可以使用 RunnablePassthrough.assign() 方法来实现快速添加。例如:为

import dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
dotenv.load_dotenv()
def retrieval(query: str) -> str: """模拟一个检索器,传入查询query,输出文本""" print("执行检索:", query)
return "我叫老铁,是一名AI应用开发工程师。"
# 1.编排Prompt
prompt = ChatPromptTemplate.from_template("""请根据用户的提问回答问题,可以参考对应的上下
文进行回复。
<context>
{context} <context> 用户的问题是: {query}""")
# 2.构建大语言模型
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 3.创建输出解析器
parser = StrOutputParser()
# 4.编排链,RunnablePassthrough.assign写法
chain = (
RunnablePassthrough.assign(context=lambda query: retrieval(query)) |
prompt |
llm |
parser
)
# 5.调用链生成结果
content = chain.invoke({"query": "你好,我叫什么?"})
print(content)

5. LangChain 中 Runnable 的进阶组合用法


1. 整体流程说明

一个典型的 AI 应用链可以分为以下几个模块,每个模块都是一个 Runnable

用户输入 → Prompt 模板 → LLM 模型 → 输出解析器 → 最终输出

2. 所需组件导入

from langchain_core.runnables import RunnableLambda, RunnableMap
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

3. 定义 Prompt 模板(PromptTemplate)

prompt = PromptTemplate.from_template("请用中文写一首关于{topic}的诗。")

这是一个可填充的提示模板,接收参数 topic


4. 初始化 LLM 模型(ChatOpenAI)

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)

你也可以替换为 Claude、Gemini、Qwen 等其他模型。


5. 定义输出解析器(StrOutputParser)

parser = StrOutputParser()

用于将 LLM 的结构化输出(如 JSON、Message)转换为纯字符串。


6. 构建完整的链(组合多个 Runnable)

# 将输入 topic 映射为 prompt 所需格式
input_mapper = RunnableLambda(lambda x: {"topic": x})# 构建完整链条:输入 → Prompt → LLM → 解析器
chain = input_mapper | prompt | llm | parser

7. 调用链条(invoke、batch、stream)

7.1 单次调用
result = chain.invoke("长城")
print(result)
7.2 批量调用
topics = ["长江", "黄山", "故宫"]
results = chain.batch(topics)
print(results)
7.3 流式输出(逐 token 打印)
for chunk in chain.stream("泰山"):print(chunk, end="")

8. 使用 RunnableMap 处理多输入字段

如果你的提示模板需要多个输入字段(如 productaudience),可以使用 RunnableMap

prompt = PromptTemplate.from_template("请写一个关于{product},面向{audience}的广告文案。")# 输入映射器:将用户输入拆分为多个字段
input_map = RunnableMap({"product": lambda x: x["product"],"audience": lambda x: x["audience"]
})# 构建链
chain = input_map | prompt | llm | parser# 调用
result = chain.invoke({"product": "智能手表","audience": "年轻上班族"
})
print(result)

9. 使用 OutputParser 解析 JSON 输出(可选)

如果你的 LLM 输出是结构化 JSON,可以使用 JsonOutputParser

from langchain.output_parsers import JsonOutputParserparser = JsonOutputParser()

然后在提示中引导模型输出 JSON 格式:

prompt = PromptTemplate.from_template("""
请以 JSON 格式回答以下问题:
问题:{question}
输出格式:
{{"answer": "...","confidence": "高/中/低"
}}
""")

10. 总结:链式组合的优势

特性描述
可组合各模块可自由组合、替换
可调试每个模块可单独测试
可扩展支持加入工具、检索器、函数调用等
可追踪可与 LangSmith 集成,进行链路追踪

完整组合链一览

from langchain_core.runnables import RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI# 模块定义
prompt = PromptTemplate.from_template("请写一首关于{topic}的七言绝句。")
llm = ChatOpenAI(model="gpt-3.5-turbo")
parser = StrOutputParser()# 输入映射
input_mapper = RunnableLambda(lambda x: {"topic": x})# 链式组合
chain = input_mapper | prompt | llm | parser# 调用链
print(chain.invoke("西湖"))

相关文章:

  • 同旺科技 USB TO SPI / I2C适配器(专业版)--EEPROM读写——B
  • 出现端口占用,关闭端口进程命令
  • LeetCode 第73题:矩阵置零
  • EngineAI 1. Start/Resume Training
  • 1.1 Linux 编译FFmpeg 4.4.1
  • Git详解:初学者完全指南
  • Git 工作流与版本管理策略
  • 日本生活:日语语言学校-日语作文-沟通无国界(1)-题目:假装写日记
  • YOLOv8分类的三种C++实现:opencv dnn/libtorch/onnxruntime
  • java 设计模式_行为型_16访问者模式
  • 深入解析ArrayList源码:从短链项目实战到底层原理
  • 2025年EAAI SCI1区TOP,贪婪策略粒子群算法GS-IPSO+无人机桥梁巡检覆盖路径规划,深度解析+性能实测
  • 【项目实训#08】HarmonyOS知识图谱前端可视化实现
  • 计算机网络-自顶向下—第一章概述重点复习笔记
  • XMLDecoder、LDAP 注入与修复
  • WebSocket与XMPP:即时通讯技术的本质区别与选择逻辑优雅草卓伊凡|片翼|许贝贝
  • [每周一更]-(第147期):使用 Go 语言实现 JSON Web Token (JWT)
  • 深度学习——基于卷积神经网络的MNIST手写数字识别详解
  • 大规模异步新闻爬虫的分布式实现
  • 【Jmeter】Jmeter 高并发性能压力测试
  • 杭州租房网站建设/百度移动
  • 重庆网站开发/推荐友情链接
  • 做网站还用注册商标吗/seo优化文章网站
  • 抚顺市城市建设档案馆网站/站长统计app最新版本2023
  • 烟台专业做网站公司哪家好/凡科网免费建站
  • 金华电子商务网站建设/四川网络推广推广机构