[智能体设计模式]第3章 并行化
目录
并行化模式:智能体任务优化核心方案
实践应用与场景
1. 信息收集与调研
2. 数据处理与分析
3. 多API或工具交互
4. 多组件内容生成
5. 验证与校验
6. 多模态处理
7. A/B测试或多方案生成
实战代码示例(LangChain)
代码解析
并行化模式:智能体任务优化核心方案
在前几章中,我们已经介绍了用于顺序流程的提示链(PromptChaining),以及用于动态决策和路径切换的路由(Routing)。虽然这些模式非常重要,但许多复杂的智能体任务其实包含多个可以同时执行的子任务,而不是一个接一个地串行处理。这时,并行化设计模式就变得至关重要。
并行化指的是同时执行多个组件,比如LLM调用、工具使用,甚至整个子智能体(见图1)。与等待上一步完成再开始下一步不同,并行执行允许独立任务同时运行,大幅缩短可拆分为独立部分的任务的整体执行时间。
核心思想是识别流程中彼此无依赖的部分,并将它们并行执行。尤其在涉及外部服务(如API或数据库)有延迟时,可以同时发起多个请求,显著提升效率。
实现并行化通常需要支持异步执行或多线程/多进程的框架。现代智能体框架普遍支持异步操作,允许你轻松定义可并行运行的步骤。
实践应用与场景
并行化是优化智能体性能的强大模式,适用于多种场景:
1. 信息收集与调研
同时从多个来源收集信息是典型用例。
- 应用场景:智能体调研某公司
- 并行任务:同时搜索新闻、拉取股票数据、检查社交媒体、查询公司数据库
- 优势:比串行查找更快获得全面视角
2. 数据处理与分析
并行应用不同分析方法或处理不同数据片段。
- 应用场景:智能体分析客户反馈
- 并行任务:同时进行情感分析、关键词提取、分类、紧急问题识别
- 优势:快速获得多维度分析结果
3. 多API或工具交互
并行调用多个独立API或工具,获取不同信息或执行不同操作。
- 应用场景:旅行规划智能体
- 并行任务:同时查机票、酒店、当地活动、餐厅推荐
- 优势:更快生成完整旅行方案
4. 多组件内容生成
并行生成复杂内容的不同部分。
- 应用场景:智能体创建营销邮件
- 并行任务:同时生成主题、正文、图片、CTA按钮文案
- 优势:更高效地组装最终邮件
5. 验证与校验
并行执行多个独立校验任务。
- 应用场景:智能体验证用户输入
- 并行任务:同时检查邮箱格式、手机号、地址数据库校验、敏感词检测
- 优势:更快反馈输入有效性
6. 多模态处理
并行处理同一输入的不同模态(文本、图片、音频)。
- 应用场景:智能体分析带图片的社交媒体帖子
- 并行任务:同时分析文本情感与关键词、图片中的物体与场景
- 优势:更快整合多模态洞察
7. A/B测试或多方案生成
并行生成多个响应或输出,便于选择最佳方案。
- 应用场景:智能体生成多种创意文案
- 并行任务:同时用不同prompt或模型生成三种标题
- 优势:快速对比并选出最佳选项
实战代码示例(LangChain)
在LangChain框架中,并行执行由LangChain Expression Language(LCEL)实现。主要方法是将多个runnable组件结构化为字典或列表,当这些集合被传递给链中的下一个组件时,LCEL运行时会并发执行其中的runnable。
以下代码演示了用LangChain构建的并行处理工作流。该流程针对单一用户查询,同时并发执行两个独立操作,并在最后聚合结果。
import os
import asyncio
from typing import Optionalfrom langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough# --- 配置 ---
# 确保环境变量已设置 API key(如 OPENAI_API_KEY)
try:llm: Optional[ChatOpenAI] = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)except Exception as e:print(f"初始化语言模型出错:{e}")llm = None# --- 定义独立链 ---
# 三个链分别执行不同任务,可并行运行summarize_chain: Runnable = (ChatPromptTemplate.from_messages([("system", "请简明扼要地总结以下主题:"),("user", "{topic}")])| llm| StrOutputParser()
)questions_chain: Runnable = (ChatPromptTemplate.from_messages([("system", "请针对以下主题生成三个有趣的问题:"),("user", "{topic}")])| llm| StrOutputParser()
)terms_chain: Runnable = (ChatPromptTemplate.from_messages([("system", "请从以下主题中提取 5-10 个关键词,用逗号分隔:"),("user", "{topic}")])| llm| StrOutputParser()
)# --- 构建并行 + 汇总链 ---# 1. 定义并行任务块,结果与原始 topic 一起传递到下一步
map_chain = RunnableParallel({"summary": summarize_chain,"questions": questions_chain,"key_terms": terms_chain,"topic": RunnablePassthrough(), # 传递原始 topic}
)# 2. 定义最终汇总 prompt,整合并行结果
synthesis_prompt = ChatPromptTemplate.from_messages([("system", """根据以下信息:
摘要:{summary}
相关问题:{questions}
关键词:{key_terms}
请综合生成完整答案。"""),("user", "原始主题:{topic}")
])# 3. 构建完整链,将并行结果直接传递给汇总 prompt,再由 LLM 和输出解析器处理
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()# --- 运行链 ---
async def run_parallel_example(topic: str) -> None:"""异步调用并行处理链,输出综合结果。Args:topic: 传递给 LangChain 的主题输入"""if not llm:print("LLM 未初始化,无法运行示例。")returnprint(f"\n--- 并行 LangChain 示例,主题:'{topic}' ---")try:# `ainvoke` 的输入是单个 topic 字符串,# 会传递给 map_chain 中的每个 runnableresponse = await full_parallel_chain.ainvoke(topic)print("\n--- 最终响应 ---")print(response)except Exception as e:print(f"\n 链执行出错:{e}")if __name__ == "__main__":test_topic = "太空探索的历史"# Python 3.7+ 推荐用 asyncio.run 执行异步函数asyncio.run(run_parallel_example(test_topic))
代码解析
上述Python代码实现了一个LangChain应用,通过并行执行提升主题处理效率。注意asyncio提供的是并发而非真正的并行:它通过事件循环在任务空闲(如等待网络请求)时智能切换,实现多个任务“同时”推进,但实际仍在单线程下受GIL限制。
代码首先导入langchain_openai和langchain_core的核心模块,包括模型、prompt、输出解析和runnable结构。通过try-except块初始化ChatOpenAI实例,指定模型和温度。
随后定义三个独立的LangChain“链”,分别用于主题摘要、问题生成和关键词提取,每个链由定制的ChatPromptTemplate、LLM和输出解析器组成。
接着用RunnableParallel将三条链打包,实现并行执行,并用RunnablePassthrough保留原始输入。再定义一个汇总prompt,整合summary、questions、key_terms和topic,生成综合答案。
最终构建完整处理链full_parallel_chain,并提供异步函数run_parallel_example演示如何调用。主程序用asyncio.run执行示例主题“太空探索的历史”。
本质上,该代码实现了针对单一主题的多路LLM并发调用(摘要、问题、关键词),并在最后用LLM汇总结果,充分展示了智能体工作流中的并行化核心思想。
