RunnableParallel
在 LangChain 的 LCEL(LangChain Expression Language)中,RunnableParallel
是一个非常实用的组件,它的核心作用是 “并行执行多个独立的可运行对象(Runnable)”,并将它们的结果合并成一个字典返回。
简单说,如果你需要同时处理多个任务(比如同时获取两个不同的数据源、并行调用两个模型、或者同时处理输入的不同部分),RunnableParallel
可以让这些任务“同时跑”,而不是“一个接一个跑”,从而提高效率。
一、基本概念:什么是“可运行对象(Runnable)”?
在 LangChain 中,Runnable
是所有可执行组件的基础接口(比如模型、提示词模板、输出解析器、检索器等),它们都支持 invoke()
(同步调用)、ainvoke()
(异步调用)、stream()
(流式输出)等方法。
RunnableParallel
本身也是一个 Runnable
,但它的特殊之处在于:它可以包裹多个子 Runnable
,让它们并行执行。
二、核心用法:并行执行,合并结果
RunnableParallel
的使用非常直观,只需传入一个“键-值”字典:
- 键:自定义的名称(用于标识每个子任务的结果);
- 值:要并行执行的子
Runnable
(比如模型、检索器、提示词模板等)。
调用 RunnableParallel
时,所有子 Runnable
会同时执行,最终返回一个字典,键是你定义的名称,值是对应子 Runnable
的执行结果。
示例 1:最基础的并行执行
比如同时运行两个简单的“字符串处理”任务:
from langchain_core.runnables import RunnableParallel, RunnablePassthrough# 定义两个子任务(这里用 RunnablePassthrough 简单处理输入)
# 任务1:给输入加前缀"Hello, "
task1 = RunnablePassthrough() | (lambda x: f"Hello, {x}")
# 任务2:给输入加后缀"!"
task2 = RunnablePassthrough() | (lambda x: f"{x}!")# 用 RunnableParallel 包裹,并行执行
parallel = RunnableParallel(greeting=task1, # 键为"greeting",对应任务1emphasis=task2 # 键为"emphasis",对应任务2
)# 调用并行任务(输入"World")
result = parallel.invoke("World")
print(result)
# 输出:
# {
# 'greeting': 'Hello, World', # 任务1的结果
# 'emphasis': 'World!' # 任务2的结果
# }
可以看到,两个任务同时处理输入“World”,结果被合并到一个字典中,用我们定义的“greeting”和“emphasis”作为键。
示例 2:在检索问答链中并行获取上下文和问题
这是 RunnableParallel
最常用的场景之一:在构建检索增强生成(RAG)链时,需要同时做两件事:
- 用检索器获取上下文(context);
- 传递用户的原始问题(question)。
用 RunnableParallel
可以让这两件事并行执行,然后将结果传给提示词模板:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_community.vectorstores import FAISS
from langchain_core.embeddings import FakeEmbeddings# 1. 准备一个简单的检索器(示例用 FakeEmbeddings 模拟)
vectorstore = FAISS.from_texts(["Harrison worked at Kensho"], embedding=FakeEmbeddings())
retriever = vectorstore.as_retriever()# 2. 定义并行任务:同时获取 context 和 question
parallel = RunnableParallel(context=retriever, # 任务1:用检索器获取上下文question=RunnablePassthrough() # 任务2:直接传递用户问题(不处理)
)# 3. 定义提示词模板(接收 context 和 question 作为输入)
prompt = ChatPromptTemplate.from_template("Answer the question based on the context: {context}\nQuestion: {question}"
)# 4. 搭建完整链:并行任务 → 提示词 → 模型 → 输出解析
chain = parallel | prompt | ChatOpenAI() | StrOutputParser()# 调用链(输入问题)
result = chain.invoke("Where did Harrison work?")
print(result) # 输出:Harrison worked at Kensho.
这里的关键是:parallel
会同时执行 retriever
(获取上下文)和 RunnablePassthrough()
(传递问题),然后将两者的结果(context
和 question
)合并成字典,传给后续的提示词模板。
三、与串行执行的区别
在 LCEL 中,|
符号表示串行执行(前一个的输出作为后一个的输入),而 RunnableParallel
表示并行执行(所有子任务共享同一个输入,同时执行)。
比如:
A | B
:先执行 A,再用 A 的输出执行 B(串行);RunnableParallel(a=A, b=B)
:A 和 B 同时接收相同的输入,各自执行,结果合并为{"a": A的结果, "b": B的结果}
(并行)。
四、异步与流式支持
RunnableParallel
同样支持异步调用(ainvoke()
)和流式输出(stream()
):
- 异步调用:
await parallel.ainvoke(input)
,子任务会异步并行执行,更节省资源; - 流式输出:
for chunk in parallel.stream(input)
,会实时返回每个子任务的流式结果(以字典形式)。
总结
RunnableParallel
是 LCEL 中用于并行执行多个独立任务的工具,核心价值在于:
- 提高效率:避免串行等待,同时处理多个任务;
- 简化代码:用直观的“键-值”结构组织多个子任务,结果自动合并;
- 灵活集成:可与任何
Runnable
(模型、检索器、函数等)结合,构建复杂链。
最典型的应用场景是 RAG 中的“并行获取上下文和问题”,以及需要同时处理多个数据源的场景。