Google A2A协议解析:构建分布式异构多Agent系统
一、A2A 是什么?有什么用?
1.1 A2A 是什么?
A2A(Agent-to-Agent Protocol)是Google最新推出的一项开源协议,旨在为AI智能体(Agents)提供标准化的通信方式。它允许不同框架(如LangChain、CrewAI)或不同供应商开发的智能体无缝协作,打破智能体间的隔离状态。A2A通过统一的通信格式和交互规则,让智能体像网络服务一样自由“对话”。
[图片来源于Google A2A官网文档]
核心目标:
- 互操作性:无论智能体基于何种技术栈,A2A都提供通用的通信“语言”。
- 模块化:支持智能体动态发现彼此并灵活组合,完成复杂任务。
- 安全性:内置认证和授权机制,确保通信安全。
- 开放性:A2A采用Apache 2.0许可,得到Atlassian、Salesforce、Cohere等50多家企业的支持。
1.2 A2A 有什么用?
A2A解决的是AI系统中智能体碎片化的问题。例如,在企业中,HR系统可能使用Salesforce智能体管理员工数据,而财务系统使用SAP智能体处理报销。如果这些智能体无法直接通信,用户需要手动传递数据,效率低下。
A2A的价值:
- 协作简化:智能体间直接交换数据和任务,例如HR智能体请求财务智能体验证报销状态。
- 任务自动化:支持任务委派和状态跟踪,减少人工干预。
- 生态扩展:开发者可构建专用智能体,通过A2A集成到更大生态中。
- 企业应用:支持实时通信、安全认证,适应复杂企业场景。
应用场景:
- 招聘流程:HR智能体筛选候选人,委托LinkedIn智能体搜索简历,再由日历智能体安排面试。
- 客户服务:客服智能体接收请求,调用库存智能体检查商品,再通过支付智能体完成交易。
1.3 A2A与 MCP的关系
什么是 MCP?
MCP(Model Context Protocol)是一个协议,专注于为 AI 代理提供对工具、数据和上下文的标准化访问。它主要作用于单个代理层面,通过集成外部资源来增强代理的内部能力。MCP 通常与基于语言模型的代理相关联,帮助它们更高效地执行任务。
维度 | A2A | MCP |
---|---|---|
目的 | 促进 AI 代理之间的通信和协作 | 为 AI 代理提供对工具、数据和上下文的标准化访问 |
关注点 | 代理之间的外部交互,强调多代理协同 | 代理的内部能力,提升资源利用效率 |
使用场景 | 代理需要相互配合的场景,如任务委派、信息共享或协调行动 | 代理需要调用外部工具或数据的场景,如处理文档或执行特定功能 |
适用范围 | 适用于任何类型的 AI 代理,具有通用性 | 主要针对基于语言模型的代理,应用范围更具体 |
模态支持 | 支持多种模态(文本、音频、视频等),适合多样化的通信需求 | 主要处理文本交互,未明确强调多模态通信 |
示例 | 代理与代理或用户通信,如请求信息、协调行动 | 代理调用具体工具,如执行物理操作或处理数据 |
互补关系 | 侧重于代理间的协作,适合多代理系统 | 侧重于代理的资源访问,增强单个代理的能力 |
建议应用程序将 A2A 智能体建模为 MCP 资源(由其智能体卡片表示)。然后,这些框架可以使用 A2A 与用户、远程智能体以及其他智能体进行通信。
A2A与MCP整体集成方式如下:
[图片来源于A2A官网说明]
总结
- A2A:专注于代理之间的通信和协作,适用于多代理协同工作的场景。
- MCP:专注于代理对工具和数据的访问,旨在提升单个代理的内部能力。
- 互补性:A2A 实现代理间的“对话”,MCP 提供代理“做事”的能力,两者可以结合使用以构建更强大的 AI 系统。
二、A2A 的核心原理
A2A基于HTTP+JSON协议,设计灵感来自分布式系统和微服务架构。以下是其核心概念和工作流程。
2.1 核心概念
-
Agent Card(智能体名片)
- 一个公开的JSON元数据文件(通常位于
/.well-known/agent.json
),描述智能体的名称、能力、端点URL和认证要求。 - 其他智能体通过Agent Card发现目标智能体的功能。
- 示例:
{ "name": "EchoAgent", "description": "A simple echo agent that repeats messages", "url": "http://localhost:5000", "authentication": {"schemes": ["none"]}, "capabilities": {"streaming": false, "pushNotifications": false} }
- 一个公开的JSON元数据文件(通常位于
-
A2A Server
- 智能体暴露的HTTP服务端点,实现A2A协议的方法(如
tasks/send
)。 - 负责接收请求、执行任务并返回结果。
- 智能体暴露的HTTP服务端点,实现A2A协议的方法(如
-
A2A Client
- 向A2A Server发送请求的应用或智能体,用于发起任务或对话。
- 通过Agent Card定位Server。
-
Task(任务)
- A2A的基本工作单元,通过
tasks/send
或tasks/sendSubscribe
发起。 - 任务有唯一ID,支持状态管理:
submitted
、working
、input-required
、completed
、failed
、canceled
。
- A2A的基本工作单元,通过
-
Message(消息)
- 客户端(
user
角色)与智能体(agent
角色)之间的单次通信。 - 由多个
Part
组成,支持文本、文件或结构化数据。
- 客户端(
2.2 工作流程
- 发现:客户端通过Agent Card了解目标智能体的能力。
- 任务发起:客户端发送
POST /tasks/send
请求,附带任务详情。 - 任务执行:Server处理任务,可能请求额外输入(
input-required
状态)。 - 状态更新:Server通过响应或推送通知更新任务状态。
- 结果返回:任务完成后,Server返回结果。
第三部分:A2A 实践教程 - 构建两个简单的Python智能体
本节参考Google A2A官方Python示例代码(https://github.com/google/A2A/tree/main/samples/python),详细构建一个客户端-服务器智能体系统。我们将创建一个“回声智能体”(Echo Agent),它接收消息并返回相同内容。通过分步构建、代码解读和执行结果分析,帮助读者掌握A2A的实践应用。
3.1 环境准备
-
安装Python
确保系统安装Python 3.8+。
安装依赖库:pip install flask requests
-
获取A2A示例代码
克隆Google A2A GitHub仓库:git clone https://github.com/google/A2A.git
示例代码位于
samples/python
目录。我们将基于此构建。
3.2 创建A2A Server(回声智能体)
我们使用Flask框架创建一个A2A Server,提供Agent Card和任务处理端点。
3.2.1 代码构建过程
步骤1:初始化Flask应用
创建一个名为server.py
的文件,导入Flask并初始化应用:
from flask import Flask, request, jsonify
app = Flask(__name__)
步骤2:实现Agent Card端点
添加/.well-known/agent.json
端点,返回智能体的元数据:
@app.route('/.well-known/agent.json')
def agent_card():
return jsonify({
"name": "EchoAgent",
"description": "A simple echo agent that repeats messages",
"url": "http://localhost:5000",
"authentication": {"schemes": ["none"]},
"capabilities": {
"streaming": false,
"pushNotifications": false
}
})
步骤3:实现任务处理端点
添加/tasks/send
端点,接收任务请求并返回回声响应:
@app.route('/tasks/send', methods=['POST'])
def handle_task():
data = request.json
task_id = data.get('taskId', 'task-001')
messages = data.get('messages', [])
# 解析用户消息并构造回声响应
for message in messages:
if message['role'] == 'user':
for part in message['parts']:
if part['type'] == 'text/plain':
user_text = part['data']
response = {
"taskId": task_id,
"state": "completed",
"messages": [{
"role": "agent",
"parts": [{
"type": "text/plain",
"data": user_text
}]
}]
}
return jsonify(response)
# 处理无效请求
return jsonify({"error": "Invalid request"}), 400
步骤4:运行服务
添加启动代码:
if __name__ == '__main__':
app.run(port=5000)
完整代码(server.py
):
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/.well-known/agent.json')
def agent_card():
return jsonify({
"name": "EchoAgent",
"description": "A simple echo agent that repeats messages",
"url": "http://localhost:5000",
"authentication": {"schemes": ["none"]},
"capabilities": {
"streaming": false,
"pushNotifications": false
}
})
@app.route('/tasks/send', methods=['POST'])
def handle_task():
data = request.json
task_id = data.get('taskId', 'task-001')
messages = data.get('messages', [])
for message in messages:
if message['role'] == 'user':
for part in message['parts']:
if part['type'] == 'text/plain':
user_text = part['data']
response = {
"taskId": task_id,
"state": "completed",
"messages": [{
"role": "agent",
"parts": [{
"type": "text/plain",
"data": user_text
}]
}]
}
return jsonify(response)
return jsonify({"error": "Invalid request"}), 400
if __name__ == '__main__':
app.run(port=5000)
3.2.2 代码解读
Agent Card端点
- 路径:
/.well-known/agent.json
- 功能:返回智能体的元数据,包括名称、描述、服务URL、认证方式和能力(如是否支持流式传输或推送通知)。
- 作用:这是A2A协议中智能体发现的基础,客户端通过此端点了解Server信息。
任务处理端点
- 路径:
/tasks/send
- 方法:POST
- 逻辑:
- 解析请求JSON,提取
taskId
和messages
。 - 遍历
messages
,找到role
为user
的消息。 - 检查消息的
parts
,提取text/plain
类型的data
(用户文本)。 - 构造响应,包含相同的文本,设置
state
为completed
,role
为agent
。 - 如果请求无效,返回400错误。
- 解析请求JSON,提取
3.2.3 运行与验证
运行Server:
python server.py
验证Agent Card:
访问http://localhost:5000/.well-known/agent.json
,预期输出:
{
"name": "EchoAgent",
"description": "A simple echo agent that repeats messages",
"url": "http://localhost:5000",
"authentication": {
"schemes": ["none"]
},
"capabilities": {
"streaming": false,
"pushNotifications": false
}
}
3.3 创建A2A Client
客户端将向Server发送消息并显示响应。
3.3.1 代码构建过程
步骤1:初始化客户端
创建client.py
,导入所需库并定义Server URL:
import requests
import json
SERVER_URL = "http://localhost:5000"
步骤2:获取Agent Card
添加函数以获取Server的Agent Card:
def get_agent_card():
response = requests.get(f"{SERVER_URL}/.well-known/agent.json")
return response.json()
步骤3:发送任务
添加函数构造并发送任务请求:
def send_task(message):
payload = {
"taskId": "task-001",
"messages": [{
"role": "user",
"parts": [{
"type": "text/plain",
"data": message
}]
}]
}
response = requests.post(f"{SERVER_URL}/tasks/send", json=payload)
return response.json()
步骤4:测试客户端
添加主函数,调用上述功能:
if __name__ == '__main__':
# 获取Agent Card
card = get_agent_card()
print("Agent Card:", json.dumps(card, indent=2))
# 发送测试消息
message = "Hello, A2A!"
result = send_task(message)
print("Task Result:", json.dumps(result, indent=2))
完整代码(client.py
):
import requests
import json
SERVER_URL = "http://localhost:5000"
def get_agent_card():
response = requests.get(f"{SERVER_URL}/.well-known/agent.json")
return response.json()
def send_task(message):
payload = {
"taskId": "task-001",
"messages": [{
"role": "user",
"parts": [{
"type": "text/plain",
"data": message
}]
}]
}
response = requests.post(f"{SERVER_URL}/tasks/send", json=payload)
return response.json()
if __name__ == '__main__':
card = get_agent_card()
print("Agent Card:", json.dumps(card, indent=2))
message = "Hello, A2A!"
result = send_task(message)
print("Task Result:", json.dumps(result, indent=2))
3.3.2 代码解读
获取Agent Card
- 函数:
get_agent_card
- 功能:通过GET请求获取Server的Agent Card,验证智能体信息。
发送任务
- 函数:
send_task
- 逻辑:
- 构造JSON payload,包含
taskId
和messages
。 messages
包含一个user
角色的消息,parts
为text/plain
类型的文本。- 发送POST请求到
/tasks/send
,返回Server响应。
- 构造JSON payload,包含
3.3.3 运行与结果分析
运行Client:
确保Server运行后,执行:
python client.py
预期输出:
Agent Card: {
"name": "EchoAgent",
"description": "A simple echo agent that repeats messages",
"url": "http://localhost:5000",
"authentication": {
"schemes": ["none"]
},
"capabilities": {
"streaming": false,
"pushNotifications": false
}
}
Task Result: {
"taskId": "task-001",
"state": "completed",
"messages": [
{
"role": "agent",
"parts": [
{
"type": "text/plain",
"data": "Hello, A2A!"
}
]
}
]
}
结果分析:
- Agent Card:客户端成功获取Server元数据,确认智能体名称、描述和能力。
- 任务执行:客户端发送消息"Hello, A2A!",Server返回相同内容,任务状态为
completed
。 - 通信流程:展示了A2A的发现(Agent Card)和任务处理(
/tasks/send
)完整过程。
3.4 扩展功能
-
添加认证
修改Agent Card支持OAuth2:"authentication": { "schemes": ["oauth2"], "credentials": "your-token-endpoint" }
在Server中验证请求头中的令牌。
-
支持推送通知
在Agent Card中启用pushNotifications: true
,使用WebSocket实现状态更新。 -
集成LLM
将Server与语言模型(如OpenAI)集成,生成动态响应而非简单回声。
第四部分:A2A 的进阶应用与注意事项
在掌握 A2A 协议的基础功能和初步实践后,进一步探索其在复杂场景中的应用以及实施时的注意事项,可以显著提升技术能力并充分发挥 A2A 的潜力。作为一种智能体协作协议,A2A 不仅是通信工具,更是构建高效、灵活的企业级 AI 系统的基石。本节将通过详细的场景分析和实现方法,深入讲解其进阶应用,并结合具体注意事项,确保系统在实际部署中的可靠性与高效性。
4.1 进阶应用
A2A 协议在多智能体协作、动态交互和企业级部署中展现了强大的能力。
4.1.1 多智能体协作
多智能体协作是 A2A 的核心优势之一,尤其在需要多个智能体协同完成复杂任务的场景中。例如,在企业招聘流程中,HR 智能体、LinkedIn 智能体和 Google Calendar 智能体可以通过 A2A 协议无缝协作,实现从候选人搜索到面试安排的全流程自动化。
实现过程:
设想一家公司需要招聘一名 Python 开发人员。流程从 HR 智能体开始:它首先生成一个唯一的任务 ID(如 task-001
),然后向 LinkedIn 智能体发送一个请求,内容可能是这样的 JSON 格式消息:
{
"task_id": "task-001",
"action": "search_candidates",
"parameters": {
"role": "Python developer",
"location": "remote",
"experience": "3-5 years"
}
}
LinkedIn 智能体接收到请求后,利用其内置的搜索功能在平台上查找匹配的候选人,并将结果整理为一个候选人列表,例如:
{
"task_id": "task-001",
"status": "completed",
"result": [
{"name": "Alice", "skills": ["Python", "Django"], "experience": "4 years"},
{"name": "Bob", "skills": ["Python", "Flask"], "experience": "3 years"}
]
}
HR 智能体接收到这个响应后,筛选出合适的候选人(例如 Alice),然后向 Google Calendar 智能体发送一个新请求:
{
"task_id": "task-002",
"action": "schedule_interview",
"parameters": {
"candidate": "Alice",
"date": "2023-11-10",
"time": "14:00-15:00"
}
}
Google Calendar 智能体检查日程安排,确认时间可用后,返回确认消息:
{
"task_id": "task-002",
"status": "completed",
"result": {"confirmation": "Interview scheduled for Alice on 2023-11-10, 14:00-15:00"}
}
应用价值:
这种协作模式通过任务状态(如“working”、“completed”)管理每个步骤,确保流程透明且可追溯。A2A 的消息传递机制让智能体之间能够清晰分工,类似人类团队中的协作。除了招聘,这种模式还可以扩展到客户服务(智能体分配工单)、库存管理(智能体协调供应链)等场景,大幅提升企业的自动化水平和运营效率。
4.1.2 表单交互
A2A 协议支持动态的用户交互,尤其适用于需要用户提供额外信息的场景。以企业报销流程为例,报销智能体可以在任务执行过程中请求用户填写费用详情,从而实现灵活的交互。
实现过程:
假设一名员工需要提交报销请求。他首先通过客户端发送一个初始请求:
{
"task_id": "expense-001",
"action": "submit_expense_report",
"user": "John"
}
报销智能体接收到请求后,发现需要更多信息,于是返回一个“input-required”状态的响应,并附带一个 HTML 表单:
{
"task_id": "expense-001",
"status": "input-required",
"form": "<form><label>Amount:</label><input name='amount'><label>Date:</label><input name='date' type='date'></form>"
}
客户端将这个表单渲染给用户,用户填写费用金额(如 $200
)和日期(如 2023-11-01
),然后提交一个新请求:
{
"task_id": "expense-001",
"action": "submit_form",
"data": {
"amount": 200,
"date": "2023-11-01"
}
}
报销智能体接收到数据后,验证信息无误,完成任务并返回确认:
{
"task_id": "expense-001",
"status": "completed",
"result": {"message": "Expense report submitted successfully"}
}
应用价值:
通过“input-required”状态和 HTML 表单,A2A 实现了智能体与用户之间的动态交互。这种机制非常适合需要用户输入的场景,如客户服务中的问题澄清、数据收集中的字段填写等。表单的灵活性还允许开发者根据任务需求调整字段,提升系统的适应性和用户体验。
4.1.3 企业级部署
A2A 协议支持将智能体部署到云端,实现高可用性和自动扩展。以 Google Cloud Run 为例,开发者可以将 A2A 智能体容器化并部署到云端,利用 Serverless 架构简化运维并支持高并发。
实现过程:
假设我们要部署一个处理招聘任务的 HR 智能体。首先,编写一个简单的 Dockerfile:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "hr_agent.py"]
接着,将智能体代码和依赖打包成 Docker 镜像,并在本地构建:
docker build -t hr-agent:latest .
然后,将镜像推送到 Google Container Registry(GCR):
docker tag hr-agent:latest gcr.io/my-project/hr-agent:latest
docker push gcr.io/my-project/hr-agent:latest
最后,通过 Google Cloud Run 部署镜像:
gcloud run deploy hr-agent \
--image gcr.io/my-project/hr-agent:latest \
--platform managed \
--region us-central1 \
--allow-unauthenticated
部署完成后,Cloud Run 会返回一个服务 URL(如 https://hr-agent-abc123-uc.a.run.app
),智能体即可通过此 URL 接收 A2A 请求。为了增强安全性,可以启用 OAuth2 认证,只允许授权用户访问。
应用价值:
Cloud Run 的 Serverless 特性让智能体能够根据请求量自动扩展,无需手动管理服务器。同时,Google Cloud 的负载均衡功能确保服务在高并发下的稳定性。这种部署方式简化了运维,支持快速迭代和多区域部署,非常适合全球化企业的需求。
4.2 注意事项
在实施 A2A 时,需要关注多个方面,以确保系统的安全性、性能和用户体验。以下是对每个注意事项的详细分析和建议。
4.2.1 安全性
安全性是企业级应用的重中之重。A2A 协议提供了多种机制来保护系统和数据。
例如,在招聘场景中,HR 智能体可能需要访问敏感的员工数据。为了防止未授权访问,可以通过 OAuth2 认证为每个智能体分配一个访问令牌。请求示例:
{
"task_id": "task-001",
"action": "search_candidates",
"auth_token": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}
此外,所有通信都应强制使用 HTTPS 协议,确保数据在传输过程中加密。权限管理也很关键:HR 智能体只能访问员工数据,而报销智能体只能处理财务信息,通过细粒度的角色控制防止数据滥用。
4.2.2 性能优化
在高并发场景下,性能优化尤为重要。以招聘流程为例,如果同时有数百个招聘任务,系统需要高效处理请求。
开发者可以使用 FastAPI 替代传统的 Flask,因为 FastAPI 支持异步处理,能显著提升并发能力。此外,对于频繁访问的静态数据(如 Agent Card),可以引入 Redis 缓存:
import redis
cache = redis.Redis(host='localhost', port=6379)
cache.set('agent_card', json.dumps(agent_card_data))
在部署时,可以结合 Google Cloud Load Balancer,将流量分发到多个智能体实例,确保系统稳定运行。
4.2.3 错误处理
良好的错误处理能提升系统的健壮性。例如,如果 LinkedIn 智能体因网络问题无法返回候选人列表,应返回详细错误信息:
{
"task_id": "task-001",
"status": "failed",
"error": {"code": 503, "message": "Service unavailable, please try again later"}
}
同时,使用 Python 的 logging
模块记录每次请求和响应,方便调试:
import logging
logging.basicConfig(level=logging.INFO)
logging.info(f"Task {task_id} failed: {error_message}")
客户端还可以实现指数退避重试策略,应对临时性网络问题。
4.2.4 兼容性
A2A 的兼容性直接影响多智能体协作的效果。开发者应使用 jsonschema
验证消息格式,例如:
from jsonschema import validate
schema = {"type": "object", "properties": {"task_id": {"type": "string"}}}
validate(instance=request, schema=schema)
在集成不同框架(如 LangChain、CrewAI)的智能体时,需进行互操作性测试,确保消息传递无误。此外,跟踪 A2A 协议的版本更新,及时适配新功能。
4.2.5 任务状态管理
任务状态管理是 A2A 的核心。例如,招聘任务可能经历“submitted”、“working”、“completed”等状态。开发者应设计清晰的状态机,并设置超时机制:
import time
if time.time() - task_start_time > 300: # 5分钟超时
task_status = "failed"
使用 SQLite 持久化状态,确保系统重启后任务进度不丢失。
4.2.6 消息设计
A2A 支持多种消息类型。例如,报销智能体可能需要传递图像(如收据扫描件),可以使用嵌套 JSON:
{
"task_id": "expense-001",
"data": {
"text": "Receipt for dinner",
"image": "..."
}
}
这种结构化设计确保数据完整且易于解析。
4.2.7 用户体验
优秀的用户体验能提升系统满意度。例如,通过 WebSocket 实时推送任务进度:
async def notify_user(websocket, task_id, status):
await websocket.send_json({"task_id": task_id, "status": status})
为“input-required”任务提供直观的 UI,并确保错误提示清晰易懂,如“金额必须为正数”。
第五部分:总结与下一步
5.1 总结
通过本教程,你可能了解了Google A2A:
- 基础:掌握A2A的用途、核心概念和工作原理。
- 实践:基于官方Python示例,构建并运行了回声智能体。
- 进阶:学习了多智能体协作、表单交互和企业部署。
- 注意事项:了解安全性、性能等关键实践。
A2A的开放性和标准化特性使其成为构建智能体生态的理想选择。
5.2 下一步如何学习
- 探索示例:尝试A2A仓库中的其他样本,如多轮对话智能体。
- 社区参与:加入Google A2A GitHub讨论。
- 企业集成:对接Salesforce等平台,验证真实场景。
- 持续学习:关注Google A2A 官网更新。