当前位置: 首页 > news >正文

OpenAI Agents 并行化实现

OpenAI Agents 并行化实现

概述

OpenAI Agents 框架提供了强大的并行化能力,允许开发者同时运行多个Agent实例来提高处理效率。本文档基于 parallelization.py 示例,详细解释了如何在OpenAI Agents中实现并行化处理。

示例代码

import asynciofrom agents import Agent, ItemHelpers, Runner, traceimport os
from openai import AsyncOpenAI
from agents import Agent, OpenAIChatCompletionsModel, Runner
from dotenv import load_dotenv
load_dotenv()
QWEN_API_KEY = os.getenv("QWEN_API_KEY")
QWEN_BASE_URL = os.getenv("QWEN_BASE_URL")
QWEN_MODEL_NAME = os.getenv("QWEN_MODEL_NAME")
import base64
# Replace with your Langfuse keys.
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-276a7628-a121-43b1-a533-bc7c46bdb412"
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-2039c6c4-2af8-41cc-9508-7d01df1be3d3" 
os.environ["LANGFUSE_HOST"] = "http://localhost:3000"# Build Basic Auth header.
LANGFUSE_AUTH = base64.b64encode(f"{os.environ.get('LANGFUSE_PUBLIC_KEY')}:{os.environ.get('LANGFUSE_SECRET_KEY')}".encode()
).decode()# Configure OpenTelemetry endpoint & headers
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = os.environ.get("LANGFUSE_HOST") + "/api/public/otel"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {LANGFUSE_AUTH}"
client = AsyncOpenAI(base_url=QWEN_BASE_URL, api_key=QWEN_API_KEY)
import logfirelogfire.configure(service_name='my_agent_service',send_to_logfire=False,
)logfire.instrument_openai_agents()spanish_agent = Agent(name="spanish_agent",instructions="You translate the user's message to Spanish",model=OpenAIChatCompletionsModel(model=QWEN_MODEL_NAME, openai_client=client),
)translation_picker = Agent(name="translation_picker",instructions="You pick the best Spanish translation from the given options.",model=OpenAIChatCompletionsModel(model="qwen-plus", openai_client=client),
)async def main():msg = input("Hi! Enter a message, and we'll translate it to Spanish.\n\n")# Ensure the entire workflow is a single tracewith trace("Parallel translation"):res_1, res_2, res_3 = await asyncio.gather(Runner.run(spanish_agent,msg,),Runner.run(spanish_agent,msg,),Runner.run(spanish_agent,msg,),)outputs = [ItemHelpers.text_message_outputs(res_1.new_items),ItemHelpers.text_message_outputs(res_2.new_items),ItemHelpers.text_message_outputs(res_3.new_items),]translations = "\n\n".join(outputs)print(f"\n\nTranslations:\n\n{translations}")best_translation = await Runner.run(translation_picker,f"Input: {msg}\n\nTranslations:\n{translations}",)print("\n\n-----")print(f"Best translation: {best_translation.final_output}")if __name__ == "__main__":asyncio.run(main())

运行结果

(openai-agents) PS D:\agent-llm2\openai-agents-python> uv run .\examples\agent_patterns\parallelization.py
Hi! Enter a message, and we'll translate it to Spanish.I love you!
23:16:48.280 OpenAI Agents trace: Parallel translation
23:16:48.282   Agent run: 'spanish_agent'
23:16:48.282   Agent run: 'spanish_agent'
23:16:48.283   Agent run: 'spanish_agent'
23:16:48.284     Chat completion with 'qwen-turbo' [LLM]
23:16:48.383     Chat completion with 'qwen-turbo' [LLM]
23:16:48.383     Chat completion with 'qwen-turbo' [LLM]Translations:Te quiero.Te quiero.Te quiero.
23:16:48.987   Agent run: 'translation_picker'
23:16:48.987     Chat completion with 'qwen-plus' [LLM]-----
Best translation: Te quiero.
OPENAI_API_KEY is not set, skipping trace export

langfuse在openai-agents框架中的设置可查看此链接:https://blog.csdn.net/qq_41472205/article/details/149128981
langfuse监控情况:
在这里插入图片描述

核心概念

1. 异步编程基础

OpenAI Agents 基于Python的异步编程模型构建,使用 asyncio 库来实现并发执行:

import asyncio
from agents import Agent, ItemHelpers, Runner, trace

2. Agent定义

在并行化场景中,我们通常会定义多个专门的Agent来处理不同的任务:

# 翻译Agent - 负责将文本翻译成西班牙语
spanish_agent = Agent(name="spanish_agent",instructions="You translate the user's message to Spanish",model=OpenAIChatCompletionsModel(model=QWEN_MODEL_NAME, openai_client=client),
)# 选择Agent - 负责从多个翻译结果中选择最佳的
translation_picker = Agent(name="translation_picker",instructions="You pick the best Spanish translation from the given options.",model=OpenAIChatCompletionsModel(model="qwen-plus", openai_client=client),
)

并行化实现机制

1. 使用 asyncio.gather() 实现并行执行

核心的并行化实现通过 asyncio.gather() 函数完成:

# 并行运行三个相同的翻译任务
res_1, res_2, res_3 = await asyncio.gather(Runner.run(spanish_agent, msg),Runner.run(spanish_agent, msg),Runner.run(spanish_agent, msg),
)

工作原理:

  • asyncio.gather() 同时启动多个异步任务
  • 每个 Runner.run() 调用都是一个独立的Agent执行实例
  • 所有任务并行执行,等待全部完成后返回结果

2. 分布式追踪支持

使用 trace() 上下文管理器确保整个并行工作流被统一追踪:

with trace("Parallel translation"):# 并行执行的代码块res_1, res_2, res_3 = await asyncio.gather(...)# 后续处理best_translation = await Runner.run(translation_picker, ...)

追踪特性:

  • 所有并行任务都在同一个追踪上下文中
  • 可以监控每个Agent的执行时间和状态
  • 支持与外部监控系统(如Langfuse)集成

3. 结果聚合与处理

并行执行完成后,需要对结果进行聚合处理:

# 提取每个Agent的输出文本
outputs = [ItemHelpers.text_message_outputs(res_1.new_items),ItemHelpers.text_message_outputs(res_2.new_items),ItemHelpers.text_message_outputs(res_3.new_items),
]# 合并所有翻译结果
translations = "\n\n".join(outputs)# 使用另一个Agent选择最佳翻译
best_translation = await Runner.run(translation_picker,f"Input: {msg}\n\nTranslations:\n{translations}",
)

配置与监控

1. 环境配置

示例中展示了完整的监控配置:

# Langfuse配置用于追踪
os.environ["LANGFUSE_PUBLIC_KEY"] = "your-public-key"
os.environ["LANGFUSE_SECRET_KEY"] = "your-secret-key"
os.environ["LANGFUSE_HOST"] = "http://localhost:3000"# OpenTelemetry配置
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = os.environ.get("LANGFUSE_HOST") + "/api/public/otel"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {LANGFUSE_AUTH}"

2. Logfire集成

import logfirelogfire.configure(service_name='my_agent_service',send_to_logfire=False,
)logfire.instrument_openai_agents()

执行流程分析

基于示例的执行日志,我们可以看到并行化的实际效果:

23:16:48.280 OpenAI Agents trace: Parallel translation
23:16:48.282   Agent run: 'spanish_agent'  # 第一个Agent开始
23:16:48.282   Agent run: 'spanish_agent'  # 第二个Agent开始
23:16:48.283   Agent run: 'spanish_agent'  # 第三个Agent开始
23:16:48.284     Chat completion with 'qwen-turbo' [LLM]  # 并行LLM调用
23:16:48.383     Chat completion with 'qwen-turbo' [LLM]
23:16:48.383     Chat completion with 'qwen-turbo' [LLM]

时间分析:

  • 三个Agent几乎同时启动(时间差仅1-2毫秒)
  • LLM调用并行执行,总耗时约100毫秒
  • 相比串行执行,并行化显著提升了处理速度

性能优势

1. 时间效率

  • 串行执行:总时间 = 单个任务时间 × 任务数量
  • 并行执行:总时间 ≈ 最长单个任务时间

2. 资源利用

  • 充分利用I/O等待时间
  • 提高CPU和网络资源利用率
  • 适合处理大量独立的AI推理任务

3. 可扩展性

  • 易于水平扩展处理能力
  • 支持动态调整并发数量
  • 与云服务和容器化部署兼容

适用场景

  1. 批量文本处理:同时处理多个文档的翻译、摘要、分析
  2. 多模型对比:使用不同模型处理相同输入,比较结果质量
  3. 管道式处理:将复杂任务分解为多个并行的子任务
  4. A/B测试:并行运行不同版本的Agent进行效果对比

总结

OpenAI Agents的并行化机制通过Python的异步编程模型,提供了高效、可扩展的AI任务处理能力。结合完善的追踪和监控功能,开发者可以构建高性能的AI应用系统。

关键要点:

  • 使用 asyncio.gather() 实现真正的并行执行
  • 通过 trace() 确保完整的执行追踪
  • 合理设计任务分解和结果聚合逻辑
  • 注意错误处理和资源管理

这种并行化模式特别适合需要处理大量相似任务或需要多个AI模型协作的场景。

http://www.dtcms.com/a/461862.html

相关文章:

  • CNN卷计计算
  • 腾讯云服务器做网站可以吗徐州网站建设
  • 上市公司协会网站建设汇报wordpress接入qq互联
  • 前端 = [...this.orderList] (深拷贝)和this.orderList (引用赋值)
  • 部门管理|“删除部门”功能实现(Django5零基础Web平台)
  • 从 0 到 1 搭建 Python 语言 Web UI自动化测试学习系列 12--日志模块设计
  • 服务器网站源码在哪七牛云配置wordpress
  • SQL-多对多关系
  • PostgreSQL 18 异步 I/O(AIO)调优指南
  • 购物网站名字大全云虚拟主机 多个网站
  • 使用DuckDB SQL求三阶六角幻方
  • 电子商务网站建设一般流程无忧代理 在线
  • 一文了解Function Calling、MCP、Agent联系与区别
  • 存储芯片核心产业链主营产品:兆易创新、北京君正、澜起科技、江波龙、长电科技、佰维存储,6家龙头公司主营产品深度数据
  • Git 常用命令完整指南
  • 网站维护入口房子装修设计软件
  • MySQL 延时从库的作用与意义
  • h5网站价格wordpress footer.php添加qq悬浮
  • 【脚本升级】银河麒麟V10一键安装MySQL9.3.0
  • android pdf框架-15,mupdf工具与其它
  • 前端通用文件下载方案:从 Blob 流处理到实际业务落地
  • 箭头函数的this指向问题
  • 【Vue】——生命周期、ref属性、hooks
  • 网站服务器如何维护小米商城wordpress主题
  • 寻梦数据空间 | 架构篇:从概念到落地的技术实践与突破性创新
  • PySide6 文本编辑器(QPlainTextEdit)实现查找对话功能(匹配完整单词,区分大小写)——重构版本
  • golang面经——GMP相关
  • 谷歌英文网站简单的网站php开发教程
  • 免费一键自助建站官网域名及对应网站
  • AI编程Cursor最强竞争对手来了,CodeX三种操作系统喂饭级安装教程!