当前位置: 首页 > news >正文

【A2A】管中窥豹,google源码python-demo介绍

在这里插入图片描述

前言

A2A(Agent2Agent)是 Google 推出的一项新协议,旨在解决多智能体(Multi-Agent)系统中跨平台、跨组织协作的难题。它为 AI 代理之间的通信、协作和任务分工提供了一个统一的标准,可以类比为网页世界的 HTTP 协议——即 AI 代理之间的“通用语言”

项目地址:https://github.com/google/A2A/tree/main/samples/python

一、项目架构介绍

1. 项目概述

基于 A2A (Agent-to-Agent) 协议的示例项目,展示了如何使用不同的 AI 框架来实现智能代理之间的通信和协作。

2. 项目结构

samples/python/
├── agents/          # 各种AI代理实现
│   ├── ag2/         # AG2框架实现的代理
│   ├── crewai/      # CrewAI框架实现的代理
│   ├── langgraph/   # LangGraph框架实现的代理
│   ├── google_adk/  # Google ADK框架实现的代理
│   ├── llama_index_file_chat/  # LlamaIndex实现的文件聊天代理
│   ├── marvin/      # Marvin框架实现的代理
│   ├── mindsdb/     # MindsDB实现的代理
│   └── semantickernel/  # Semantic Kernel实现的代理
├── hosts/           # 客户端实现
├── common/          # 公共代码
└── .vscode/         # VS Code配置

3. 技术特点

  1. 多框架支持

    • 支持多种流行的AI框架(LangGraph、CrewAI、AG2等)
    • 每个框架都有独立的代理实现
  2. 标准化通信

    • 使用A2A协议进行代理间通信
    • 基于HTTP的通信机制
    • 统一的请求/响应格式
  3. 客户端实现

    • 提供CLI命令行客户端
    • 支持与多个代理的交互
    • 包含任务编排功能

4. 运行环境要求

  • Python 3.13 或更高版本
  • UV 包管理器
  • 各框架所需的API密钥(如Google API Key等)

5. 使用流程

  1. 启动代理服务器:

    cd samples/python/agents/[agent_name]
    uv run .
    
  2. 启动客户端:

    cd samples/python/hosts/cli
    uv run .
    

6. 主要功能模块

6.1 代理实现(agents/)
  • AG2代理:基于AG2框架的代理实现
  • CrewAI代理:使用CrewAI框架的代理
  • LangGraph代理:基于LangGraph的代理
  • Google ADK代理:使用Google Agent Development Kit的代理
  • LlamaIndex代理:文件聊天功能
  • Marvin代理:基于Marvin框架的代理
  • MindsDB代理:数据库交互代理
  • Semantic Kernel代理:基于Microsoft Semantic Kernel的代理
6.2 客户端实现(hosts/)
  • CLI命令行界面
  • 支持多代理交互
  • 任务编排功能
6.3 公共模块(common/)
  • A2A协议实现
  • 共享工具类
  • 通用接口定义

7. 项目特点

  1. 模块化设计

    • 每个代理都是独立的模块
    • 可以单独运行和测试
  2. 标准化接口

    • 统一的A2A协议
    • 一致的通信格式
  3. 可扩展性

    • 易于添加新的代理实现
    • 支持不同的AI框架
  4. 示例性质

    • 主要用于演示A2A功能
    • 非生产级代码

二、场景介绍

1. 核心场景代理实现

1.1 LangGraph 货币转换代理
  • 功能:提供货币汇率转换服务
  • 技术特点
    • 使用 LangGraph 框架
    • 集成 Google Gemini 模型
    • 支持多轮对话
    • 实时流式响应
    • 使用 Frankfurter API 获取实时汇率
  • 通信流程
    Client Server Agent API 发送货币查询 转发查询 获取汇率数据 返回汇率 处理结果 返回结果 Client Server Agent API
1.2 CrewAI 图像生成代理
  • 功能:基于文本描述生成图像
  • 技术特点
    • 使用 CrewAI 框架
    • 集成 Google Gemini API
    • 支持图像修改
    • 缓存系统
  • 通信流程
    Client Server Agent Gemini 发送图像生成请求 转发请求 生成图像 返回图像 存储并返回ID 返回图像 Client Server Agent Gemini

2. 通信协议(A2A)

2.1 请求格式
{"jsonrpc": "2.0","id": "unique_id","method": "tasks/send","params": {"id": "task_id","sessionId": "session_id","acceptedOutputModes": ["text"],"message": {"role": "user","parts": [{"type": "text","text": "query"}]}}
}
2.2 响应格式
{"jsonrpc": "2.0","id": "unique_id","result": {"id": "task_id","status": {"state": "completed","timestamp": "timestamp"},"artifacts": [{"parts": [{"type": "text","text": "response"}],"index": 0}]}
}

三、Client代码实现

  1. 核心类:A2AClient

a) 初始化

def __init__(self, agent_card: AgentCard = None, url: str = None, timeout: TimeoutTypes = 60.0):# 支持两种初始化方式:# 1. 通过 AgentCard 初始化# 2. 直接通过 URL 初始化# 默认超时时间 60 秒

b) 主要方法

# 1. 发送任务
async def send_task(self, payload: dict[str, Any]) -> SendTaskResponse:# 同步任务发送# 返回任务响应# 2. 流式任务
async def send_task_streaming(self, payload: dict[str, Any]) -> AsyncIterable[SendTaskStreamingResponse]:# 支持 Server-Sent Events (SSE)# 返回流式响应# 3. 获取任务
async def get_task(self, payload: dict[str, Any]) -> GetTaskResponse:# 获取任务状态和结果# 4. 取消任务
async def cancel_task(self, payload: dict[str, Any]) -> CancelTaskResponse:# 取消正在执行的任务# 5. 设置回调
async def set_task_callback(self, payload: dict[str, Any]) -> SetTaskPushNotificationResponse:# 设置任务完成后的回调通知# 6. 获取回调
async def get_task_callback(self, payload: dict[str, Any]) -> GetTaskPushNotificationResponse:# 获取当前任务的回调配置
  1. 辅助类:A2ACardResolver
class A2ACardResolver:def __init__(self, base_url, agent_card_path='/.well-known/agent.json'):# 初始化解析器# 默认从 /.well-known/agent.json 获取代理卡片def get_agent_card(self) -> AgentCard:# 获取并解析代理卡片# 返回 AgentCard 对象
  1. 使用示例
# 1. 基本使用
client = A2AClient(url="http://agent-server")
response = await client.send_task({"message": "生成一张图片","sessionId": "session-123"
})# 2. 流式响应
async for event in client.send_task_streaming({"message": "生成一张图片","sessionId": "session-123"
}):print(event)# 3. 获取代理卡片
resolver = A2ACardResolver("http://agent-server")
card = resolver.get_agent_card()

客户端实现提供了:

  1. 完整的 A2A 协议支持
  2. 异步操作支持
  3. 类型安全
  4. 错误处理
  5. 流式响应
  6. 代理发现

四、Server实现

  1. 核心类:A2AServer

a) 初始化

def __init__(self, host='0.0.0.0', port=5000, endpoint='/', agent_card: AgentCard = None, task_manager: TaskManager = None):# 配置服务器参数# 初始化路由# 设置代理卡片和任务管理器

b) 主要功能

# 1. 启动服务器
def start(self):# 验证必要组件# 启动 uvicorn 服务器# 2. 处理请求
async def _process_request(self, request: Request):# 解析请求# 路由到对应的处理方法# 返回响应# 3. 获取代理卡片
def _get_agent_card(self, request: Request) -> JSONResponse:# 返回代理卡片信息
  1. 任务管理器:TaskManager

a) 抽象基类

class TaskManager(ABC):# 定义任务管理接口@abstractmethodasync def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse@abstractmethodasync def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse@abstractmethodasync def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse# ... 其他抽象方法

b) 内存实现

class InMemoryTaskManager(TaskManager):def __init__(self):# 初始化存储self.tasks = {}  # 任务存储self.push_notification_infos = {}  # 推送通知配置self.task_sse_subscribers = {}  # SSE 订阅者
  1. 关键功能实现

a) 任务管理

# 1. 获取任务
async def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse:# 获取任务状态和结果# 2. 取消任务
async def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse:# 取消正在执行的任务# 3. 更新任务
async def update_store(self, task_id: str, status: TaskStatus, artifacts: list[Artifact]) -> Task:# 更新任务状态和结果

b) 推送通知

# 1. 设置通知
async def set_push_notification_info(self, task_id: str, notification_config: PushNotificationConfig):# 配置任务完成通知# 2. 获取通知
async def get_push_notification_info(self, task_id: str) -> PushNotificationConfig:# 获取任务通知配置

c) SSE 支持

# 1. 设置 SSE 消费者
async def setup_sse_consumer(self, task_id: str, is_resubscribe: bool = False):# 创建 SSE 事件队列# 2. 事件入队
async def enqueue_events_for_sse(self, task_id, task_update_event):# 将事件发送给订阅者# 3. 事件出队
async def dequeue_events_for_sse(self, request_id, task_id, sse_event_queue):# 从队列获取事件并发送
  1. 错误处理
def _handle_exception(self, e: Exception) -> JSONResponse:# 处理不同类型的错误if isinstance(e, json.decoder.JSONDecodeError):json_rpc_error = JSONParseError()elif isinstance(e, ValidationError):json_rpc_error = InvalidRequestError()else:json_rpc_error = InternalError()

这个服务器实现提供了:

  1. 完整的 A2A 协议支持
  2. 异步操作支持
  3. 任务管理
  4. 推送通知
  5. 流式响应
  6. 错误处理

一个功能完整的 A2A 服务器实现,可以作为其他语言实现的参考。

五、LangGraph 货币转换Agent

/samples/python/agents/langgraph

  1. 整体架构
  • 采用三层架构:
    • __main__.py: 服务器入口点
    • agent.py: 核心代理实现
    • task_manager.py: 任务管理实现
  1. 核心组件详解

a) 入口点 (__main__.py)

- 配置服务器参数(host, port)
- 设置代理能力(streaming, pushNotifications)
- 定义代理技能(convert_currency)
- 创建代理卡片(AgentCard)
- 初始化服务器和任务管理器

b) 代理实现 (agent.py)

- 使用 LangGraph 框架实现 ReAct 模式
- 核心组件:1. 工具函数:get_exchange_rate- 调用 Frankfurter API 获取实时汇率- 支持指定日期查询- 错误处理和响应验证2. 响应格式:ResponseFormat- status: input_required/completed/error- message: 响应消息3. CurrencyAgent 类- 使用 Google Gemini 2.0 Flash 模型- 支持同步调用(invoke)和流式响应(stream)- 状态管理和会话追踪

c) 任务管理器 (task_manager.py)

- 继承自 InMemoryTaskManager
- 主要功能:1. 任务验证- 检查输出模式兼容性- 验证推送通知配置2. 任务处理- 同步任务处理(on_send_task)- 流式任务处理(on_send_task_subscribe)- 任务状态更新和通知3. 推送通知- 支持任务状态变更通知- 验证通知 URL 所有权
  1. 关键流程

a) 同步请求流程

1. 客户端发送请求
2. 任务管理器验证请求
3. 代理处理请求
4. 更新任务状态
5. 发送响应

b) 流式请求流程

1. 客户端订阅任务
2. 创建 SSE 事件队列
3. 异步处理代理响应
4. 实时更新任务状态
5. 发送流式事件

六、CrewAI 图像生成Agent

  1. 整体架构
  • 采用三层架构:
    • __main__.py: 服务器入口点
    • agent.py: 核心代理实现
    • task_manager.py: 任务管理实现
  1. 核心组件详解

a) 入口点 (__main__.py)

- 配置服务器参数(host, port)
- 设置代理能力(streaming=False- 定义代理技能(image_generator)
- 创建代理卡片(AgentCard)
- 初始化服务器和任务管理器

b) 代理实现 (agent.py)

- 使用 CrewAI 框架实现图像生成
- 核心组件:1. 数据模型:Imagedata- id: 图像唯一标识- name: 图像名称- mime_type: MIME类型- bytes: Base64编码的图像数据- error: 错误信息2. 工具函数:generate_image_tool- 使用 Google Gemini API 生成图像- 支持图像修改- 缓存管理- 错误处理3. ImageGenerationAgent 类- 支持文本和图像输入- 使用 CrewAI 框架- 图像生成和修改功能- 会话状态管理

c) 任务管理器 (task_manager.py)

- 继承自 InMemoryTaskManager
- 主要功能:1. 任务处理- 验证输出模式- 处理任务请求- 管理任务状态2. 图像处理- 获取图像数据- 处理图像响应- 错误处理
  1. 关键流程

a) 图像生成流程

1. 接收用户提示
2. 创建 CrewAI 任务
3. 调用 Gemini API
4. 处理生成的图像
5. 返回图像数据

b) 图像修改流程

1. 接收修改请求
2. 获取参考图像
3. 生成新图像
4. 更新缓存
5. 返回结果

相关文章:

  • 小程序消息订阅的整个实现流程
  • TOGAF 企业架构介绍(4A架构)
  • ADV7842KBCZ - 5 富利威长期稳定供应
  • 代理ARP与传统ARP在网络通信中的应用及区别研究
  • Linux快速入门
  • C++ - 函数重载
  • 深入解析多线程与多进程:从理论到Python实践
  • C语言—指针3
  • 若依定制pdf生成实战
  • gradle3.5的安装以及配置环境变量
  • PX4开始之旅(二)通过自定义 MAVLink 消息与 QGroundControl (QGC) 通信
  • 力扣题解:21.合并两个有序链表(C语言)
  • 2025数维杯数学建模C题完整分析参考论文(共36页)(含模型、可运行代码、数据)
  • 赛季7靶场 - Environment
  • Android 移动应用开发:页面跳转与数据传递功能
  • Android屏蔽通话功能和短信功能
  • MySQL(4)如何查看MySQL数据库的版本?
  • 『不废话』之Python 3.14 Beta版新特性
  • 【传感器】代码——DHT11温湿度传感器
  • 从0开始学linux韦东山教程第一三章问题小结(1)
  • 洗冤录|县令遇豪强:黄榦处理的一起地产纠纷案
  • 铲屎官花5万带猫狗旅行,宠旅生意有多赚?
  • 北上广深均宣布下调个人住房公积金贷款利率
  • 云南临沧一行贿案金额认定比受贿案多41万,重审时检方变更金额起诉
  • 习近平抵达莫斯科对俄罗斯进行国事访问并出席纪念苏联伟大卫国战争胜利80周年庆典
  • 视频丨习近平主席出席俄方在机场举行的迎宾仪式