OpenAI Agents 并行化实现
OpenAI Agents 并行化实现
概述
OpenAI Agents 框架提供了强大的并行化能力,允许开发者同时运行多个Agent实例来提高处理效率。本文档基于 parallelization.py
示例,详细解释了如何在OpenAI Agents中实现并行化处理。
示例代码
import asynciofrom agents import Agent, ItemHelpers, Runner, traceimport os
from openai import AsyncOpenAI
from agents import Agent, OpenAIChatCompletionsModel, Runner
from dotenv import load_dotenv
load_dotenv()
QWEN_API_KEY = os.getenv("QWEN_API_KEY")
QWEN_BASE_URL = os.getenv("QWEN_BASE_URL")
QWEN_MODEL_NAME = os.getenv("QWEN_MODEL_NAME")
import base64
# Replace with your Langfuse keys.
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-276a7628-a121-43b1-a533-bc7c46bdb412"
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-2039c6c4-2af8-41cc-9508-7d01df1be3d3"
os.environ["LANGFUSE_HOST"] = "http://localhost:3000"# Build Basic Auth header.
LANGFUSE_AUTH = base64.b64encode(f"{os.environ.get('LANGFUSE_PUBLIC_KEY')}:{os.environ.get('LANGFUSE_SECRET_KEY')}".encode()
).decode()# Configure OpenTelemetry endpoint & headers
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = os.environ.get("LANGFUSE_HOST") + "/api/public/otel"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {LANGFUSE_AUTH}"
client = AsyncOpenAI(base_url=QWEN_BASE_URL, api_key=QWEN_API_KEY)
import logfirelogfire.configure(service_name='my_agent_service',send_to_logfire=False,
)logfire.instrument_openai_agents()spanish_agent = Agent(name="spanish_agent",instructions="You translate the user's message to Spanish",model=OpenAIChatCompletionsModel(model=QWEN_MODEL_NAME, openai_client=client),
)translation_picker = Agent(name="translation_picker",instructions="You pick the best Spanish translation from the given options.",model=OpenAIChatCompletionsModel(model="qwen-plus", openai_client=client),
)async def main():msg = input("Hi! Enter a message, and we'll translate it to Spanish.\n\n")# Ensure the entire workflow is a single tracewith trace("Parallel translation"):res_1, res_2, res_3 = await asyncio.gather(Runner.run(spanish_agent,msg,),Runner.run(spanish_agent,msg,),Runner.run(spanish_agent,msg,),)outputs = [ItemHelpers.text_message_outputs(res_1.new_items),ItemHelpers.text_message_outputs(res_2.new_items),ItemHelpers.text_message_outputs(res_3.new_items),]translations = "\n\n".join(outputs)print(f"\n\nTranslations:\n\n{translations}")best_translation = await Runner.run(translation_picker,f"Input: {msg}\n\nTranslations:\n{translations}",)print("\n\n-----")print(f"Best translation: {best_translation.final_output}")if __name__ == "__main__":asyncio.run(main())
运行结果
(openai-agents) PS D:\agent-llm2\openai-agents-python> uv run .\examples\agent_patterns\parallelization.py
Hi! Enter a message, and we'll translate it to Spanish.I love you!
23:16:48.280 OpenAI Agents trace: Parallel translation
23:16:48.282 Agent run: 'spanish_agent'
23:16:48.282 Agent run: 'spanish_agent'
23:16:48.283 Agent run: 'spanish_agent'
23:16:48.284 Chat completion with 'qwen-turbo' [LLM]
23:16:48.383 Chat completion with 'qwen-turbo' [LLM]
23:16:48.383 Chat completion with 'qwen-turbo' [LLM]Translations:Te quiero.Te quiero.Te quiero.
23:16:48.987 Agent run: 'translation_picker'
23:16:48.987 Chat completion with 'qwen-plus' [LLM]-----
Best translation: Te quiero.
OPENAI_API_KEY is not set, skipping trace export
langfuse在openai-agents框架中的设置可查看此链接:https://blog.csdn.net/qq_41472205/article/details/149128981
langfuse监控情况:
核心概念
1. 异步编程基础
OpenAI Agents 基于Python的异步编程模型构建,使用 asyncio
库来实现并发执行:
import asyncio
from agents import Agent, ItemHelpers, Runner, trace
2. Agent定义
在并行化场景中,我们通常会定义多个专门的Agent来处理不同的任务:
# 翻译Agent - 负责将文本翻译成西班牙语
spanish_agent = Agent(name="spanish_agent",instructions="You translate the user's message to Spanish",model=OpenAIChatCompletionsModel(model=QWEN_MODEL_NAME, openai_client=client),
)# 选择Agent - 负责从多个翻译结果中选择最佳的
translation_picker = Agent(name="translation_picker",instructions="You pick the best Spanish translation from the given options.",model=OpenAIChatCompletionsModel(model="qwen-plus", openai_client=client),
)
并行化实现机制
1. 使用 asyncio.gather()
实现并行执行
核心的并行化实现通过 asyncio.gather()
函数完成:
# 并行运行三个相同的翻译任务
res_1, res_2, res_3 = await asyncio.gather(Runner.run(spanish_agent, msg),Runner.run(spanish_agent, msg),Runner.run(spanish_agent, msg),
)
工作原理:
asyncio.gather()
同时启动多个异步任务- 每个
Runner.run()
调用都是一个独立的Agent执行实例 - 所有任务并行执行,等待全部完成后返回结果
2. 分布式追踪支持
使用 trace()
上下文管理器确保整个并行工作流被统一追踪:
with trace("Parallel translation"):# 并行执行的代码块res_1, res_2, res_3 = await asyncio.gather(...)# 后续处理best_translation = await Runner.run(translation_picker, ...)
追踪特性:
- 所有并行任务都在同一个追踪上下文中
- 可以监控每个Agent的执行时间和状态
- 支持与外部监控系统(如Langfuse)集成
3. 结果聚合与处理
并行执行完成后,需要对结果进行聚合处理:
# 提取每个Agent的输出文本
outputs = [ItemHelpers.text_message_outputs(res_1.new_items),ItemHelpers.text_message_outputs(res_2.new_items),ItemHelpers.text_message_outputs(res_3.new_items),
]# 合并所有翻译结果
translations = "\n\n".join(outputs)# 使用另一个Agent选择最佳翻译
best_translation = await Runner.run(translation_picker,f"Input: {msg}\n\nTranslations:\n{translations}",
)
配置与监控
1. 环境配置
示例中展示了完整的监控配置:
# Langfuse配置用于追踪
os.environ["LANGFUSE_PUBLIC_KEY"] = "your-public-key"
os.environ["LANGFUSE_SECRET_KEY"] = "your-secret-key"
os.environ["LANGFUSE_HOST"] = "http://localhost:3000"# OpenTelemetry配置
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = os.environ.get("LANGFUSE_HOST") + "/api/public/otel"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {LANGFUSE_AUTH}"
2. Logfire集成
import logfirelogfire.configure(service_name='my_agent_service',send_to_logfire=False,
)logfire.instrument_openai_agents()
执行流程分析
基于示例的执行日志,我们可以看到并行化的实际效果:
23:16:48.280 OpenAI Agents trace: Parallel translation
23:16:48.282 Agent run: 'spanish_agent' # 第一个Agent开始
23:16:48.282 Agent run: 'spanish_agent' # 第二个Agent开始
23:16:48.283 Agent run: 'spanish_agent' # 第三个Agent开始
23:16:48.284 Chat completion with 'qwen-turbo' [LLM] # 并行LLM调用
23:16:48.383 Chat completion with 'qwen-turbo' [LLM]
23:16:48.383 Chat completion with 'qwen-turbo' [LLM]
时间分析:
- 三个Agent几乎同时启动(时间差仅1-2毫秒)
- LLM调用并行执行,总耗时约100毫秒
- 相比串行执行,并行化显著提升了处理速度
性能优势
1. 时间效率
- 串行执行:总时间 = 单个任务时间 × 任务数量
- 并行执行:总时间 ≈ 最长单个任务时间
2. 资源利用
- 充分利用I/O等待时间
- 提高CPU和网络资源利用率
- 适合处理大量独立的AI推理任务
3. 可扩展性
- 易于水平扩展处理能力
- 支持动态调整并发数量
- 与云服务和容器化部署兼容
适用场景
- 批量文本处理:同时处理多个文档的翻译、摘要、分析
- 多模型对比:使用不同模型处理相同输入,比较结果质量
- 管道式处理:将复杂任务分解为多个并行的子任务
- A/B测试:并行运行不同版本的Agent进行效果对比
总结
OpenAI Agents的并行化机制通过Python的异步编程模型,提供了高效、可扩展的AI任务处理能力。结合完善的追踪和监控功能,开发者可以构建高性能的AI应用系统。
关键要点:
- 使用
asyncio.gather()
实现真正的并行执行 - 通过
trace()
确保完整的执行追踪 - 合理设计任务分解和结果聚合逻辑
- 注意错误处理和资源管理
这种并行化模式特别适合需要处理大量相似任务或需要多个AI模型协作的场景。