【A2A】管中窥豹,google源码python-demo介绍
前言
A2A(Agent2Agent)是 Google 推出的一项新协议,旨在解决多智能体(Multi-Agent)系统中跨平台、跨组织协作的难题。它为 AI 代理之间的通信、协作和任务分工提供了一个统一的标准,可以类比为网页世界的 HTTP 协议——即 AI 代理之间的“通用语言”
项目地址:https://github.com/google/A2A/tree/main/samples/python
一、项目架构介绍
1. 项目概述
基于 A2A (Agent-to-Agent) 协议的示例项目,展示了如何使用不同的 AI 框架来实现智能代理之间的通信和协作。
2. 项目结构
samples/python/
├── agents/ # 各种AI代理实现
│ ├── ag2/ # AG2框架实现的代理
│ ├── crewai/ # CrewAI框架实现的代理
│ ├── langgraph/ # LangGraph框架实现的代理
│ ├── google_adk/ # Google ADK框架实现的代理
│ ├── llama_index_file_chat/ # LlamaIndex实现的文件聊天代理
│ ├── marvin/ # Marvin框架实现的代理
│ ├── mindsdb/ # MindsDB实现的代理
│ └── semantickernel/ # Semantic Kernel实现的代理
├── hosts/ # 客户端实现
├── common/ # 公共代码
└── .vscode/ # VS Code配置
3. 技术特点
-
多框架支持:
- 支持多种流行的AI框架(LangGraph、CrewAI、AG2等)
- 每个框架都有独立的代理实现
-
标准化通信:
- 使用A2A协议进行代理间通信
- 基于HTTP的通信机制
- 统一的请求/响应格式
-
客户端实现:
- 提供CLI命令行客户端
- 支持与多个代理的交互
- 包含任务编排功能
4. 运行环境要求
- Python 3.13 或更高版本
- UV 包管理器
- 各框架所需的API密钥(如Google API Key等)
5. 使用流程
-
启动代理服务器:
cd samples/python/agents/[agent_name] uv run .
-
启动客户端:
cd samples/python/hosts/cli uv run .
6. 主要功能模块
6.1 代理实现(agents/)
- AG2代理:基于AG2框架的代理实现
- CrewAI代理:使用CrewAI框架的代理
- LangGraph代理:基于LangGraph的代理
- Google ADK代理:使用Google Agent Development Kit的代理
- LlamaIndex代理:文件聊天功能
- Marvin代理:基于Marvin框架的代理
- MindsDB代理:数据库交互代理
- Semantic Kernel代理:基于Microsoft Semantic Kernel的代理
6.2 客户端实现(hosts/)
- CLI命令行界面
- 支持多代理交互
- 任务编排功能
6.3 公共模块(common/)
- A2A协议实现
- 共享工具类
- 通用接口定义
7. 项目特点
-
模块化设计:
- 每个代理都是独立的模块
- 可以单独运行和测试
-
标准化接口:
- 统一的A2A协议
- 一致的通信格式
-
可扩展性:
- 易于添加新的代理实现
- 支持不同的AI框架
-
示例性质:
- 主要用于演示A2A功能
- 非生产级代码
二、场景介绍
1. 核心场景代理实现
1.1 LangGraph 货币转换代理
- 功能:提供货币汇率转换服务
- 技术特点:
- 使用 LangGraph 框架
- 集成 Google Gemini 模型
- 支持多轮对话
- 实时流式响应
- 使用 Frankfurter API 获取实时汇率
- 通信流程:
1.2 CrewAI 图像生成代理
- 功能:基于文本描述生成图像
- 技术特点:
- 使用 CrewAI 框架
- 集成 Google Gemini API
- 支持图像修改
- 缓存系统
- 通信流程:
2. 通信协议(A2A)
2.1 请求格式
{"jsonrpc": "2.0","id": "unique_id","method": "tasks/send","params": {"id": "task_id","sessionId": "session_id","acceptedOutputModes": ["text"],"message": {"role": "user","parts": [{"type": "text","text": "query"}]}}
}
2.2 响应格式
{"jsonrpc": "2.0","id": "unique_id","result": {"id": "task_id","status": {"state": "completed","timestamp": "timestamp"},"artifacts": [{"parts": [{"type": "text","text": "response"}],"index": 0}]}
}
三、Client代码实现
- 核心类:A2AClient
a) 初始化
def __init__(self, agent_card: AgentCard = None, url: str = None, timeout: TimeoutTypes = 60.0):# 支持两种初始化方式:# 1. 通过 AgentCard 初始化# 2. 直接通过 URL 初始化# 默认超时时间 60 秒
b) 主要方法
# 1. 发送任务
async def send_task(self, payload: dict[str, Any]) -> SendTaskResponse:# 同步任务发送# 返回任务响应# 2. 流式任务
async def send_task_streaming(self, payload: dict[str, Any]) -> AsyncIterable[SendTaskStreamingResponse]:# 支持 Server-Sent Events (SSE)# 返回流式响应# 3. 获取任务
async def get_task(self, payload: dict[str, Any]) -> GetTaskResponse:# 获取任务状态和结果# 4. 取消任务
async def cancel_task(self, payload: dict[str, Any]) -> CancelTaskResponse:# 取消正在执行的任务# 5. 设置回调
async def set_task_callback(self, payload: dict[str, Any]) -> SetTaskPushNotificationResponse:# 设置任务完成后的回调通知# 6. 获取回调
async def get_task_callback(self, payload: dict[str, Any]) -> GetTaskPushNotificationResponse:# 获取当前任务的回调配置
- 辅助类:A2ACardResolver
class A2ACardResolver:def __init__(self, base_url, agent_card_path='/.well-known/agent.json'):# 初始化解析器# 默认从 /.well-known/agent.json 获取代理卡片def get_agent_card(self) -> AgentCard:# 获取并解析代理卡片# 返回 AgentCard 对象
- 使用示例
# 1. 基本使用
client = A2AClient(url="http://agent-server")
response = await client.send_task({"message": "生成一张图片","sessionId": "session-123"
})# 2. 流式响应
async for event in client.send_task_streaming({"message": "生成一张图片","sessionId": "session-123"
}):print(event)# 3. 获取代理卡片
resolver = A2ACardResolver("http://agent-server")
card = resolver.get_agent_card()
客户端实现提供了:
- 完整的 A2A 协议支持
- 异步操作支持
- 类型安全
- 错误处理
- 流式响应
- 代理发现
四、Server实现
- 核心类:A2AServer
a) 初始化
def __init__(self, host='0.0.0.0', port=5000, endpoint='/', agent_card: AgentCard = None, task_manager: TaskManager = None):# 配置服务器参数# 初始化路由# 设置代理卡片和任务管理器
b) 主要功能
# 1. 启动服务器
def start(self):# 验证必要组件# 启动 uvicorn 服务器# 2. 处理请求
async def _process_request(self, request: Request):# 解析请求# 路由到对应的处理方法# 返回响应# 3. 获取代理卡片
def _get_agent_card(self, request: Request) -> JSONResponse:# 返回代理卡片信息
- 任务管理器:TaskManager
a) 抽象基类
class TaskManager(ABC):# 定义任务管理接口@abstractmethodasync def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse@abstractmethodasync def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse@abstractmethodasync def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse# ... 其他抽象方法
b) 内存实现
class InMemoryTaskManager(TaskManager):def __init__(self):# 初始化存储self.tasks = {} # 任务存储self.push_notification_infos = {} # 推送通知配置self.task_sse_subscribers = {} # SSE 订阅者
- 关键功能实现
a) 任务管理
# 1. 获取任务
async def on_get_task(self, request: GetTaskRequest) -> GetTaskResponse:# 获取任务状态和结果# 2. 取消任务
async def on_cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse:# 取消正在执行的任务# 3. 更新任务
async def update_store(self, task_id: str, status: TaskStatus, artifacts: list[Artifact]) -> Task:# 更新任务状态和结果
b) 推送通知
# 1. 设置通知
async def set_push_notification_info(self, task_id: str, notification_config: PushNotificationConfig):# 配置任务完成通知# 2. 获取通知
async def get_push_notification_info(self, task_id: str) -> PushNotificationConfig:# 获取任务通知配置
c) SSE 支持
# 1. 设置 SSE 消费者
async def setup_sse_consumer(self, task_id: str, is_resubscribe: bool = False):# 创建 SSE 事件队列# 2. 事件入队
async def enqueue_events_for_sse(self, task_id, task_update_event):# 将事件发送给订阅者# 3. 事件出队
async def dequeue_events_for_sse(self, request_id, task_id, sse_event_queue):# 从队列获取事件并发送
- 错误处理
def _handle_exception(self, e: Exception) -> JSONResponse:# 处理不同类型的错误if isinstance(e, json.decoder.JSONDecodeError):json_rpc_error = JSONParseError()elif isinstance(e, ValidationError):json_rpc_error = InvalidRequestError()else:json_rpc_error = InternalError()
这个服务器实现提供了:
- 完整的 A2A 协议支持
- 异步操作支持
- 任务管理
- 推送通知
- 流式响应
- 错误处理
一个功能完整的 A2A 服务器实现,可以作为其他语言实现的参考。
五、LangGraph 货币转换Agent
/samples/python/agents/langgraph
- 整体架构
- 采用三层架构:
__main__.py
: 服务器入口点agent.py
: 核心代理实现task_manager.py
: 任务管理实现
- 核心组件详解
a) 入口点 (__main__.py
)
- 配置服务器参数(host, port)
- 设置代理能力(streaming, pushNotifications)
- 定义代理技能(convert_currency)
- 创建代理卡片(AgentCard)
- 初始化服务器和任务管理器
b) 代理实现 (agent.py
)
- 使用 LangGraph 框架实现 ReAct 模式
- 核心组件:1. 工具函数:get_exchange_rate- 调用 Frankfurter API 获取实时汇率- 支持指定日期查询- 错误处理和响应验证2. 响应格式:ResponseFormat- status: input_required/completed/error- message: 响应消息3. CurrencyAgent 类- 使用 Google Gemini 2.0 Flash 模型- 支持同步调用(invoke)和流式响应(stream)- 状态管理和会话追踪
c) 任务管理器 (task_manager.py
)
- 继承自 InMemoryTaskManager
- 主要功能:1. 任务验证- 检查输出模式兼容性- 验证推送通知配置2. 任务处理- 同步任务处理(on_send_task)- 流式任务处理(on_send_task_subscribe)- 任务状态更新和通知3. 推送通知- 支持任务状态变更通知- 验证通知 URL 所有权
- 关键流程
a) 同步请求流程
1. 客户端发送请求
2. 任务管理器验证请求
3. 代理处理请求
4. 更新任务状态
5. 发送响应
b) 流式请求流程
1. 客户端订阅任务
2. 创建 SSE 事件队列
3. 异步处理代理响应
4. 实时更新任务状态
5. 发送流式事件
六、CrewAI 图像生成Agent
- 整体架构
- 采用三层架构:
__main__.py
: 服务器入口点agent.py
: 核心代理实现task_manager.py
: 任务管理实现
- 核心组件详解
a) 入口点 (__main__.py
)
- 配置服务器参数(host, port)
- 设置代理能力(streaming=False)
- 定义代理技能(image_generator)
- 创建代理卡片(AgentCard)
- 初始化服务器和任务管理器
b) 代理实现 (agent.py
)
- 使用 CrewAI 框架实现图像生成
- 核心组件:1. 数据模型:Imagedata- id: 图像唯一标识- name: 图像名称- mime_type: MIME类型- bytes: Base64编码的图像数据- error: 错误信息2. 工具函数:generate_image_tool- 使用 Google Gemini API 生成图像- 支持图像修改- 缓存管理- 错误处理3. ImageGenerationAgent 类- 支持文本和图像输入- 使用 CrewAI 框架- 图像生成和修改功能- 会话状态管理
c) 任务管理器 (task_manager.py
)
- 继承自 InMemoryTaskManager
- 主要功能:1. 任务处理- 验证输出模式- 处理任务请求- 管理任务状态2. 图像处理- 获取图像数据- 处理图像响应- 错误处理
- 关键流程
a) 图像生成流程
1. 接收用户提示
2. 创建 CrewAI 任务
3. 调用 Gemini API
4. 处理生成的图像
5. 返回图像数据
b) 图像修改流程
1. 接收修改请求
2. 获取参考图像
3. 生成新图像
4. 更新缓存
5. 返回结果