当前位置: 首页 > news >正文

agent设计模式:第三章节—并行化

1.1 并行化模式概述

       在前面的章节中,我们探讨了用于顺序工作流的 Prompt Chaining 以及用于动态决策和不同路径之间转换的 Routing。虽然这些模式至关重要,但许多复杂的代理任务涉及多个可以同时执行而不是一个接一个执行的子任务。这就是并行化模式变得关键的地方。
       并行化涉及同时执行多个组件,例如 LLM 调用、工具使用,甚至整个子代理(见图 1)。并行执行允许独立任务同时运行,而不是等待一个步骤完成后再开始下一个步骤,这显著减少了可以分解为独立部分的任务的整体执行时间
      考虑一个设计用于研究主题并总结其发现的代理。顺序方法可能如下:

  • 搜索来源 A。
  • 总结来源 A。
  • 搜索来源 B。
  • 总结来源 B。
  • 从总结 A 和 B 中综合出一个最终答案。

      一种并行方法可以:

  • 同时搜索源 A 和源 B。
  • 当两个搜索都完成后,同时总结源 A 和源 B。
  • 从总结 A 和 B 中综合出一个最终答案(这一步通常是顺序的,等待并行步骤完成)。

      核心思想是识别工作流程中不依赖于其他部分输出的部分,并执行并行操作。这在处理具有延迟的外部服务(如 API 或数据库)时特别有效,因为你可以同时发出多个请求。
      实现并行化通常需要支持异步执行或多线程/多进程的框架。现代代理框架在设计时考虑了异步操作,使您可以轻松定义可以并行运行的步骤。
在这里插入图片描述

      LangChain、LangGraph 和 Google ADK 等框架提供了并行执行的机制。在 LangChain 表达式语言(LCEL)中,您可以通过使用 |(用于顺序)等运算符组合可运行对象来实现并行执行,并通过构建您的链或图来使其具有可以并行执行的分支。LangGraph 通过其图结构,允许您定义可以从单个状态转换中执行的多个节点,有效地在工作流中启用并行分支。Google ADK 提供了强大的原生机制来促进和管理代理的并行执行,显著提高了复杂多代理系统的效率和可扩展性。ADK 框架中的这种内在能力使开发人员能够设计和实现解决方案,其中多个代理可以并行运行,而不是顺序运行。

1.2 实际应用与用例

      并行化是一种强大的模式,用于优化代理在不同应用中的性能:

1.2.1. 信息收集和研究

      同时从多个来源收集信息是一个经典用例。
      用例:一个正在研究某公司的代理。

  • 并行任务:同时搜索新闻文章、获取股票数据、检查社交媒体提及以及查询公司数据库。
  • 优势:比顺序查找更快地收集全面信息。

1.2.2. 数据处理和分析

      应用不同的分析技术或同时处理不同的数据段。
      用例:一个代理分析客户反馈。

  • 并行任务:在批量的反馈条目中同时运行情感分析、提取关键词、分类反馈和识别紧急问题。
  • 好处:能够快速进行多方面的分析。

1.2.3. 多 API 或工具交互

      调用多个独立的 API 或工具来收集不同类型的信息或执行不同的操作。
      用例:旅行规划代理。

  • 并行任务:同时检查航班价格、搜索酒店可用性、查询当地活动以及寻找餐厅推荐。
  • 优势:更快地呈现完整的旅行计划。

1.2.4. 多组件内容生成

      并行生成复杂内容的不同部分。
      用例:一个代理创建营销邮件。

  • 并行任务:同时生成主题行、起草邮件正文、寻找相关图片和创建行动号召按钮文本。
  • 优势:更高效地组装最终邮件。

1.2.5. 验证和确认

      同时执行多个独立的检查或验证。
      用例:代理验证用户输入。

  • 并行任务:同时检查邮箱格式、验证电话号码、在数据库中验证地址以及检查是否包含脏话。
  • 优势:提供更快的输入有效性反馈。

1.2.6. 多模态处理

      同时处理同一输入的不同模态(文本、图像、音频)。
      用例:一个代理分析带有文本和图像的社交媒体帖子。

  • 并行任务:同时分析文本的情感和关键词,以及分析图像中的物体和场景描述。
  • 优势:能更快地整合不同模态的见解。

1.2.7. A/B 测试或多种选项生成

      并行生成多个响应或输出变体,以选择最佳方案。
      应用场景:一个代理生成不同的创意文本选项。

  • 并行任务:同时使用略微不同的提示或模型为文章生成三个不同的标题。
  • 优点:能够快速比较并选择最佳选项。

      并行化是智能体设计中的一个基本优化技术,通过利用并发执行独立任务,使开发者能够构建性能更优、响应更快的应用程序。

1.3 动手代码示例(LangChain)

      LangChain 框架内的并行执行由 LangChain 表达式语言(LCEL)实现。主要方法涉及在字典或列表结构中组织多个可运行组件。当将此集合作为输入传递给链中的后续组件时,LCEL 运行时将并行执行其中包含的可运行组件。
      在 LangGraph 的上下文中,这一原则应用于图的拓扑结构。并行工作流通过构建图实现,使得多个无直接顺序依赖关系的节点可以从单个公共节点启动。这些并行路径独立执行,直到其结果在图中的后续汇聚点聚合。
      以下实现展示了使用 LangChain 框架构建的并行处理工作流。该工作流设计为对单个用户查询同时执行两个独立操作。这些并行过程被实例化为不同的链或函数,其各自的输出随后被聚合为统一的结果。
      此实现的先决条件包括安装必要的 Python 包,例如 langchain、langchain-community,以及像 langchain-openai 这样的模型提供库。此外,必须在本机环境中配置所选语言模型的合法 API 密钥以进行身份验证。


import osimport asynciofrom typing import Optionalfrom langchain_openai import ChatOpenAIfrom langchain_core.prompts import ChatPromptTemplatefrom langchain_core.output_parsers import StrOutputParserfrom langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough# --- Configuration ---# Ensure your API key environment variable is set (e.g., OPENAI_API_KEY)try:llm: Optional[ChatOpenAI] = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)except Exception as e:print(f"Error initializing language model: {e}")llm = None# --- Define Independent Chains ---# These three chains represent distinct tasks that can be executed in parallel.summarize_chain: Runnable = (ChatPromptTemplate.from_messages([("system", "Summarize the following topic concisely:"),("user", "{topic}")])| llm| StrOutputParser())questions_chain: Runnable = (ChatPromptTemplate.from_messages([("system", "Generate three interesting questions about the following topic:"),("user", "{topic}")])| llm| StrOutputParser())terms_chain: Runnable = (ChatPromptTemplate.from_messages([("system", "Identify 5-10 key terms from the following topic, separated by commas:"),("user", "{topic}")])| llm| StrOutputParser())# --- Build the Parallel + Synthesis Chain ---# 1. Define the block of tasks to run in parallel. The results of these,#    along with the original topic, will be fed into the next step.map_chain = RunnableParallel({"summary": summarize_chain,"questions": questions_chain,"key_terms": terms_chain,"topic": RunnablePassthrough(),  # Pass the original topic through})# 2. Define the final synthesis prompt which will combine the parallel results.synthesis_prompt = ChatPromptTemplate.from_messages([("system", """Based on the following information:Summary: {summary}Related Questions: {questions}Key Terms: {key_terms}Synthesize a comprehensive answer."""),("user", "Original topic: {topic}")])# 3. Construct the full chain by piping the parallel results directly#    into the synthesis prompt, followed by the LLM and output parser.full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()# --- Run the Chain ---async def run_parallel_example(topic: str) -> None:"""Asynchronously invokes the parallel processing chain with a specific topicand prints the synthesized result.Args:topic: The input topic to be processed by the LangChain chains."""if not llm:print("LLM not initialized. Cannot run example.")returnprint(f"\n--- Running Parallel LangChain Example for Topic: '{topic}' ---")try:# The input to `ainvoke` is the single 'topic' string,# then passed to each runnable in the `map_chain`.response = await full_parallel_chain.ainvoke(topic)print("\n--- Final Response ---")print(response)except Exception as e:print(f"\nAn error occurred during chain execution: {e}")if __name__ == "__main__":test_topic = "The history of space exploration"# In Python 3.7+, asyncio.run is the standard way to run an async function.asyncio.run(run_parallel_example(test_topic))

      提供的 Python 代码实现了一个 LangChain 应用程序,旨在通过并行执行来高效处理给定主题。请注意,asyncio 提供的是并发性而非并行性。它通过使用一个事件循环,在单个线程上智能地在任务之间切换,从而实现这一点——当一个任务处于空闲状态时(例如等待网络请求)。这创造了多个任务同时推进的效果,但代码本身仍然由一个线程执行,受 Python 的全局解释器锁(GIL)的限制。
      代码首先从 langchain_openai 和 langchain_core 中导入必要的模块,包括语言模型、提示、输出解析和可运行结构等组件。代码尝试初始化一个 ChatOpenAI 实例,特别使用"gpt-4o-mini"模型,并指定温度值以控制创造力。为了在语言模型初始化过程中增强鲁棒性,使用了 try-except 块。然后定义了三个独立的 LangChain"链",每个链设计用于对输入主题执行不同的任务。第一个链用于简洁地总结主题,使用系统消息和包含主题占位符的用户消息。第二个链配置为生成与主题相关的三个有趣问题。第三个链设置为从输入主题中识别 5 到 10 个关键词,并要求以逗号分隔。这些独立的链都包含一个针对其特定任务的 ChatPromptTemplate,随后是初始化的语言模型和 StrOutputParser 以将输出格式化为字符串。
      然后构建一个 RunnableParallel 块来捆绑这三个链,使它们能够同时执行。这个并行可运行块还包括一个 RunnablePassthrough,以确保原始输入主题在后续步骤中可用。为最终综合步骤定义了一个单独的 ChatPromptTemplate,它将摘要、问题、关键词和原始主题作为输入来生成全面的答案。通过将 map_chain(并行块)按顺序序列化到综合提示中,然后是语言模型和输出解析器,创建了完整的端到端处理链,命名为 full_parallel_chain。提供了一个异步函数 run_parallel_example 来演示如何调用这个 full_parallel_chain。该函数将主题作为输入,并使用 invoke 来运行异步链。最后,标准的 Python if name == “main”: 块展示了如何使用 asyncio.run 来管理异步执行,并使用示例主题 “太空探索的历史” 来执行 run_parallel_example。
      本质上,这段代码设置了一个工作流程,其中针对特定主题的多个 LLM 调用(用于摘要、提问和术语)同时进行,然后通过一个最终的 LLM 调用将它们的结果组合起来。这展示了使用 LangChain 在代理式工作流程中实现并行化的核心思想。

1.4 实际代码示例(Google ADK)

      好的,现在让我们关注 Google ADK 框架中一个具体示例,来说明这些概念。我们将考察 ADK 原语(如 ParallelAgent 和 SequentialAgent)如何应用于构建一个利用并发执行以提高效率的智能体流程。

from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgentfrom google.adk.tools import google_searchGEMINI_MODEL="gemini-2.0-flash"# --- 1. Define Researcher Sub-Agents (to run in parallel) ---# Researcher 1: Renewable Energyresearcher_agent_1 = LlmAgent(name="RenewableEnergyResearcher",model=GEMINI_MODEL,instruction="""You are an AI Research Assistant specializing in energy.Research the latest advancements in 'renewable energy sources'.Use the Google Search tool provided.Summarize your key findings concisely (1-2 sentences).Output *only* the summary.""",description="Researches renewable energy sources.",tools=[google_search],# Store result in state for the merger agentoutput_key="renewable_energy_result")# Researcher 2: Electric Vehiclesresearcher_agent_2 = LlmAgent(name="EVResearcher",model=GEMINI_MODEL,instruction="""You are an AI Research Assistant specializing in transportation.Research the latest developments in 'electric vehicle technology'.Use the Google Search tool provided.Summarize your key findings concisely (1-2 sentences).Output *only* the summary.""",description="Researches electric vehicle technology.",tools=[google_search],# Store result in state for the merger agentoutput_key="ev_technology_result")# Researcher 3: Carbon Captureresearcher_agent_3 = LlmAgent(name="CarbonCaptureResearcher",model=GEMINI_MODEL,instruction="""You are an AI Research Assistant specializing in climate solutions.Research the current state of 'carbon capture methods'.Use the Google Search tool provided.Summarize your key findings concisely (1-2 sentences).Output *only* the summary.""",description="Researches carbon capture methods.",tools=[google_search],# Store result in state for the merger agentoutput_key="carbon_capture_result")# --- 2. Create the ParallelAgent (Runs researchers concurrently) ---# This agent orchestrates the concurrent execution of the researchers.# It finishes once all researchers have completed and stored their results in state.parallel_research_agent = ParallelAgent(name="ParallelWebResearchAgent",sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3],description="Runs multiple research agents in parallel to gather information.")# --- 3. Define the Merger Agent (Runs *after* the parallel agents) ---# This agent takes the results stored in the session state by the parallel agents# and synthesizes them into a single, structured response with attributions.merger_agent = LlmAgent(name="SynthesisAgent",model=GEMINI_MODEL,  # Or potentially a more powerful model if needed for synthesisinstruction="""You are an AI Assistant responsible for combining research findings into a structured report.Your primary task is to synthesize the following research summaries, clearly attributing findings to their source areas. Structure your response using headings for each topic. Ensure the report is coherent and integrates the key points smoothly.**Crucially: Your entire response MUST be grounded *exclusively* on the information provided in the 'Input Summaries' below. Do NOT add any external knowledge, facts, or details not present in these specific summaries.****Input Summaries:***   **Renewable Energy:**{renewable_energy_result}*   **Electric Vehicles:**{ev_technology_result}*   **Carbon Capture:**{carbon_capture_result}**Output Format:**## Summary of Recent Sustainable Technology Advancements### Renewable Energy Findings(Based on RenewableEnergyResearcher's findings)[Synthesize and elaborate *only* on the renewable energy input summary provided above.]### Electric Vehicle Findings(Based on EVResearcher's findings)[Synthesize and elaborate *only* on the EV input summary provided above.]### Carbon Capture Findings(Based on CarbonCaptureResearcher's findings)[Synthesize and elaborate *only* on the carbon capture input summary provided above.]### Overall Conclusion[Provide a brief (1-2 sentence) concluding statement that connects *only* the findings presented above.]Output *only* the structured report following this format. Do not include introductory or concluding phrases outside this structure, and strictly adhere to using only the provided input summary content.""",description="Combines research findings from parallel agents into a structured, cited report, strictly grounded on provided inputs.",# No tools needed for merging# No output_key needed here, as its direct response is the final output of the sequence)# --- 4. Create the SequentialAgent (Orchestrates the overall flow) ---# This is the main agent that will be run. It first executes the ParallelAgent# to populate the state, and then executes the MergerAgent to produce the final output.sequential_pipeline_agent = SequentialAgent(name="ResearchAndSynthesisPipeline",# Run parallel research first, then mergesub_agents=[parallel_research_agent, merger_agent],description="Coordinates parallel research and synthesizes the results.")root_agent = sequential_pipeline_agent

      这段代码定义了一个多智能体系统,用于研究和综合可持续技术进步方面的信息。它设置了三个 LlmAgent 实例作为专门的研究人员。ResearcherAgent_1 专注于可再生能源,ResearcherAgent_2 研究电动汽车技术,ResearcherAgent_3 调查碳捕获方法。每个研究人员智能体配置为使用 GEMINI_MODEL 和 google_search 工具。他们被指示简洁地总结他们的发现(1-2 句话),并将这些总结存储在会话状态中,使用 output_key。
      然后创建了一个名为 ParallelWebResearchAgent 的 ParallelAgent 来并行运行这三个研究人员智能体。这允许研究并行进行,可能节省时间。ParallelAgent 在其所有子智能体(研究人员)完成并填充状态后完成其执行。
      接下来,定义了一个 MergerAgent(也是一个 LlmAgent),用于综合研究成果。该代理以并行研究人员在会话状态中存储的摘要作为输入。其指令强调输出必须严格仅基于提供的输入摘要,禁止添加外部知识。MergerAgent 被设计为将综合发现结构化为报告,每个主题都有标题,并附有简短的整体结论。
      最后,创建了一个名为 ResearchAndSynthesisPipeline 的 SequentialAgent 来协调整个工作流程。作为主要控制器,该主代理首先执行 ParallelAgent 进行研究。一旦 ParallelAgent 完成,SequentialAgent 接着执行 MergerAgent 来综合收集到的信息。sequential_pipeline_agent 被设置为 root_agent,代表运行这个多代理系统的入口点。整个过程被设计为高效地从多个来源并行收集信息,然后将其整合为一份结构化的报告。

1.5 一目了然

       是什么:许多代理式工作流涉及多个子任务,必须完成这些子任务才能实现最终目标。纯顺序执行,即每个任务都等待前一个任务完成,通常效率低下且速度缓慢。当任务依赖外部 I/O 操作时,这种延迟会成为一个显著瓶颈,例如调用不同的 API 或查询多个数据库。如果没有并发执行的机制,总处理时间就是所有单个任务持续时间的总和,这会阻碍系统的整体性能和响应能力。
       为什么:并行化模式通过支持独立任务的并发执行,提供了一种标准化的解决方案。它通过识别工作流中的组件(如工具使用或 LLM 调用),这些组件不依赖于彼此的即时输出。像 LangChain 和 Google ADK 这样的代理框架提供了内置结构来定义和管理这些并发操作。例如,主进程可以调用多个并行运行的子任务,并等待所有任务完成后才继续下一步。通过同时运行这些独立任务而不是依次执行,这种模式显著减少了总执行时间。
       经验法则:当工作流包含多个可以同时运行的独立操作时,使用此模式,例如从多个 API 获取数据、处理不同的数据块,或为后续合成生成多个内容片段。

       视觉总结
在这里插入图片描述

      关键要点

  • 并行化是一种执行独立任务以同时提高效率的模式。
  • 当任务涉及等待外部资源(如 API 调用)时,它特别有用。
  • 采用并发或并行架构会引入显著复杂性和成本,影响设计、调试和系统日志记录等关键开发阶段。
  • LangChain 和 Google ADK 等框架为定义和管理并行执行提供了内置支持。
  • 在 LangChain 表达式语言(LCEL)中,RunnableParallel 是运行多个可运行项并行处理的关键结构。
  • Google ADK 可以通过 LLM-Driven Delegation 促进并行执行,其中协调代理的 LLM 识别独立子任务,并触发由专业子代理的并发处理。
  • 并行化有助于降低整体延迟,并使智能体系统在处理复杂任务时更加响应迅速。

1.6 结论

      并行化模式是一种通过并发执行独立子任务来优化计算工作流的方法。这种方法可以减少整体延迟,特别是在涉及多个模型推理或调用外部服务的复杂操作中。
      框架提供了不同的机制来实现这种模式。在 LangChain 中,像 RunnableParallel 这样的结构被用来显式定义和同时执行多个处理链。相比之下,像 Google Agent Developer Kit(ADK)这样的框架可以通过多代理委托实现并行化,其中主要协调模型将不同的子任务分配给可以并发运行的专用代理。
      通过将并行处理与顺序(链式)和条件(路由)控制流相结合,可以构建出复杂、高性能的计算系统,这些系统能够高效地管理多样化和复杂的任务。

http://www.dtcms.com/a/511121.html

相关文章:

  • Rust语言特性深度解析:所有权、生命周期与模式匹配之我见
  • 利用DuckDB rusty_sheet插件0.2版在xlsx文件中测试tpch
  • 设计模式之:单例模式
  • 第一章 不可变的变量
  • AUTOSAR 中 Trusted Platform(可信平台)详解
  • 2510rs,rust清单2
  • PINN物理信息神经网络股票价格预测模型Matlab实现
  • 2510rs,rust清单3
  • 用ps做网站方法茂名建站模板搭建
  • 怎么建设vip电影网站wordpress轮播图设置
  • docker 更新layer
  • 基于卷积神经网络的香蕉成熟度识别系统,resnet50,vgg16,resnet34【pytorch框架,python代码】
  • 深度学习YOLO实战:6、通过视频案例,解析YOLO模型的能力边界与选型策略
  • C# 识别图片中是否有人
  • [Power BI] 漏斗图(Funnel Chart)
  • 做网站优化响应式网站 企业模版
  • 视觉学习篇——图像存储格式
  • GB28181视频服务wvp搭建(二)
  • Spring Boot安全配置全解析
  • EasyGBS如何通过流媒体技术提升安防监控效率?
  • 做展览的网站国家免费职业培训平台
  • 农业技术网站建设原则曲阜网站建设
  • 【python】基于 生活方式与健康数据预测数据集(Lifestyle and Health Risk Prediction)的可视化练习,附数据集源文件。
  • C#WPF如何实现登录页面跳转
  • 健康与生活方式数据库编程手册(Python方向教学2025年4月)
  • HarmonyOS测试与上架:单元测试、UI测试与App Gallery Connect发布实战
  • 以太网学习理解
  • 微算法科技(NASDAQ MLGO)标准化API驱动多联邦学习系统模型迁移技术
  • 【Redis】三种缓存问题(穿透、击穿、双删)的 Golang 实践
  • 第1部分-并发编程基础与线程模型