6种A2A(智能体到智能体)的协议方案
1. A2A 协议架构概述
A2A 协议是一个标准化的通信框架,使得来自不同平台、组织和框架的智能体能够无缝通信。以下是完整的架构:
2. 核心 A2A 组件
2.1 智能体卡片(发现机制)
智能体卡片是描述智能体能力的标准化 JSON 文档:
{"name": "金融分析智能体","version": "1.2.0","description": "专注于金融数据分析和报告","protocol_version": "A2A/1.0","capabilities": [{"name": "analyze_financial_data","description": "分析金融数据集并生成洞察","parameters": {"dataset_url": {"type": "string", "required": true},"analysis_type": {"type": "string", "enum": ["trend", "forecast", "comparison"]},"time_period": {"type": "string", "format": "date-range"}},"returns": {"type": "object","properties": {"insights": {"type": "array"},"charts": {"type": "array"},"recommendations": {"type": "array"}}}},{"name": "generate_report","description": "生成格式化的财务报告","parameters": {"analysis_id": {"type": "string", "required": true},"format": {"type": "string", "enum": ["pdf", "html", "excel"]}}}],"endpoints": {"chat": "/api/v1/chat","tasks": "/api/v1/tasks","status": "/api/v1/status","health": "/health"},"authentication": {"methods": ["oauth2", "api_key", "mtls"],"oauth2": {"authorization_url": "https://auth.company.com/oauth/authorize","token_url": "https://auth.company.com/oauth/token","scopes": ["financial_analysis", "report_generation"]}},"security": {"encryption": "TLS 1.3","data_retention": "30 days","compliance": ["SOC2", "GDPR", "HIPAA"]},"metadata": {"organization": "FinTech Corp","contact": "api-support@fintech.com","documentation": "https://docs.fintech.com/agents/financial-analysis","pricing": "https://fintech.com/pricing/agents","rate_limits": {"requests_per_minute": 100,"concurrent_tasks": 5}} }
2.2 任务管理系统
A2A 使用结构化的任务对象处理复杂的工作流:
from dataclasses import dataclass from typing import Dict, List, Any, Optional from enum import Enum import uuid from datetime import datetimeclass TaskStatus(Enum):PENDING = "pending" # 挂起RUNNING = "running" # 运行中COMPLETED = "completed" # 已完成FAILED = "failed" # 失败CANCELLED = "cancelled" # 已取消class TaskPriority(Enum):LOW = "low" # 低NORMAL = "normal" # 普通HIGH = "high" # 高URGENT = "urgent" # 紧急@dataclass class A2ATask:"""用于 A2A 通信的结构化任务"""id: stragent_id: strcapability: strparameters: Dict[str, Any]status: TaskStatus = TaskStatus.PENDINGpriority: TaskPriority = TaskPriority.NORMALcreated_at: datetime = Nonestarted_at: Optional[datetime] = Nonecompleted_at: Optional[datetime] = Noneresult: Optional[Dict[str, Any]] = Noneerror: Optional[str] = Nonedependencies: List[str] = None # 此任务所依赖的任务 IDmetadata: Dict[str, Any] = Nonedef __post_init__(self):if self.id is None:self.id = str(uuid.uuid4())if self.created_at is None:self.created_at = datetime.utcnow()if self.dependencies is None:self.dependencies = []if self.metadata is None:self.metadata = {}# 任务工作流示例 class A2ATaskManager:def __init__(self):self.tasks: Dict[str, A2ATask] = {}self.task_dependencies: Dict[str, List[str]] = {}def create_task(self, agent_id: str, capability: str, parameters: Dict[str, Any], **kwargs) -> A2ATask:"""创建一个新的 A2A 任务"""task = A2ATask(id=str(uuid.uuid4()),agent_id=agent_id,capability=capability,parameters=parameters,**kwargs)self.tasks[task.id] = taskreturn taskdef create_workflow(self, workflow_definition: List[Dict]) -> List[A2ATask]:"""创建具有依赖关系的任务工作流"""tasks = []task_map = {}# 首先创建所有任务for step in workflow_definition:task = self.create_task(agent_id=step['agent_id'],capability=step['capability'],parameters=step['parameters'],priority=TaskPriority(step.get('priority', 'normal')))tasks.append(task)task_map[step['step_id']] = task.id# 设置依赖关系for i, step in enumerate(workflow_definition):if 'depends_on' in step:dependency_ids = [task_map[dep] for dep in step['depends_on']]tasks[i].dependencies = dependency_idsreturn tasks# 工作流定义示例 financial_analysis_workflow = [{"step_id": "data_collection", # 数据收集"agent_id": "data_collector_agent", # 数据收集器智能体"capability": "collect_financial_data", # 收集金融数据"parameters": {"sources": ["yahoo_finance", "bloomberg"], "symbols": ["AAPL", "GOOGL"]},"priority": "high" # 高},{"step_id": "data_analysis", # 数据分析"agent_id": "analysis_agent", # 分析智能体"capability": "analyze_financial_data", # 分析金融数据"parameters": {"analysis_type": "trend"}, # 趋势分析"depends_on": ["data_collection"] # 依赖于数据收集},{"step_id": "report_generation", # 报告生成"agent_id": "report_agent", # 报告智能体"capability": "generate_report", # 生成报告"parameters": {"format": "pdf"}, # PDF格式"depends_on": ["data_analysis"] # 依赖于数据分析} ]
2.3 安全与认证
A2A 提供具有多种认证方法的企业级安全性:
import jwt import httpx from typing import Optional from datetime import datetime, timedeltaclass A2ASecurityManager:"""处理 A2A 认证和授权"""def __init__(self):self.auth_methods = {'oauth2': self.oauth2_auth,'api_key': self.api_key_auth,'mtls': self.mtls_auth,'jwt': self.jwt_auth}async def authenticate_agent(self, agent_card: dict, credentials: dict) -> dict:"""与 A2A 智能体进行认证"""auth_config = agent_card.get('authentication', {})supported_methods = auth_config.get('methods', ['api_key'])# 按偏好顺序尝试认证方法for method in supported_methods:if method in credentials:try:auth_result = await self.auth_methods[method](agent_card, credentials[method])return auth_resultexcept Exception as e:print(f"{method} 认证失败: {e}")continueraise Exception("所有认证方法均失败")async def oauth2_auth(self, agent_card: dict, oauth_config: dict) -> dict:"""OAuth 2.0 认证流程"""auth_config = agent_card['authentication']['oauth2']# 步骤 1: 获取授权码 (简化版)auth_url = auth_config['authorization_url']client_id = oauth_config['client_id']redirect_uri = oauth_config['redirect_uri']scopes = ' '.join(auth_config.get('scopes', []))# 步骤 2: 用 code 交换 tokentoken_url = auth_config['token_url']token_data = {'grant_type': 'authorization_code','client_id': client_id,'client_secret': oauth_config['client_secret'],'code': oauth_config['authorization_code'],'redirect_uri': redirect_uri}async with httpx.AsyncClient() as client:response = await client.post(token_url, data=token_data)token_response = response.json()return {'method': 'oauth2','access_token': token_response['access_token'],'token_type': token_response.get('token_type', 'Bearer'),'expires_at': datetime.utcnow() + timedelta(seconds=token_response.get('expires_in', 3600))}async def api_key_auth(self, agent_card: dict, api_key: str) -> dict:"""API 密钥认证"""return {'method': 'api_key','api_key': api_key,'header_name': agent_card.get('authentication', {}).get('api_key_header', 'X-API-Key')}async def mtls_auth(self, agent_card: dict, mtls_config: dict) -> dict:"""双向 TLS (mTLS) 认证"""return {'method': 'mtls','cert_file': mtls_config['cert_file'],'key_file': mtls_config['key_file'],'ca_file': mtls_config.get('ca_file')}async def jwt_auth(self, agent_card: dict, jwt_config: dict) -> dict:"""JWT 令牌认证"""payload = {'agent_id': jwt_config['agent_id'],'iat': datetime.utcnow(),'exp': datetime.utcnow() + timedelta(hours=1),'aud': agent_card['name']}token = jwt.encode(payload, jwt_config['private_key'], algorithm='RS256')return {'method': 'jwt','token': token,'algorithm': 'RS256'}# 使用示例 security_manager = A2ASecurityManager()credentials = {'oauth2': {'client_id': 'your_client_id','client_secret': 'your_client_secret','authorization_code': 'auth_code_from_redirect','redirect_uri': 'https://your-app.com/callback'},'api_key': 'your_api_key_here' }# 与智能体进行认证 auth_result = await security_manager.authenticate_agent(agent_card, credentials)
3. A2A 通信模式
3.1 同步请求-响应
class A2ASyncClient:"""同步 A2A 通信"""async def call_agent_capability(self, agent_url: str, capability: str, parameters: dict, auth: dict) -> dict:"""同步调用特定的智能体能力"""headers = self.build_auth_headers(auth)request_payload = {"capability": capability,"parameters": parameters,"request_id": str(uuid.uuid4()),"timestamp": datetime.utcnow().isoformat(),"response_format": "json"}async with httpx.AsyncClient() as client:response = await client.post(f"{agent_url}/api/v1/capabilities/{capability}",json=request_payload,headers=headers,timeout=30.0)if response.status_code == 200:return response.json()else:raise Exception(f"智能体调用失败: {response.status_code} - {response.text}")def build_auth_headers(self, auth: dict) -> dict:"""构建认证头"""headers = {"Content-Type": "application/json"}if auth['method'] == 'oauth2':headers['Authorization'] = f"{auth['token_type']} {auth['access_token']}"elif auth['method'] == 'api_key':headers[auth['header_name']] = auth['api_key']elif auth['method'] == 'jwt':headers['Authorization'] = f"Bearer {auth['token']}"return headers
3.2 基于任务的异步通信
class A2AAsyncClient:"""具有任务管理的异步 A2A 通信"""async def submit_task(self, agent_url: str, task: A2ATask, auth: dict) -> str:"""提交任务以进行异步执行"""headers = self.build_auth_headers(auth)task_payload = {"task_id": task.id,"capability": task.capability,"parameters": task.parameters,"priority": task.priority.value,"callback_url": f"https://your-agent.com/callbacks/{task.id}","metadata": task.metadata}async with httpx.AsyncClient() as client:response = await client.post(f"{agent_url}/api/v1/tasks",json=task_payload,headers=headers)if response.status_code == 202: # 已接受 (Accepted)result = response.json()return result['task_id']else:raise Exception(f"任务提交失败: {response.status_code}")async def get_task_status(self, agent_url: str, task_id: str, auth: dict) -> dict:"""获取已提交任务的状态"""headers = self.build_auth_headers(auth)async with httpx.AsyncClient() as client:response = await client.get(f"{agent_url}/api/v1/tasks/{task_id}",headers=headers)return response.json()async def wait_for_task_completion(self, agent_url: str, task_id: str, auth: dict, timeout: int = 300) -> dict:"""通过轮询等待任务完成"""start_time = datetime.utcnow()while (datetime.utcnow() - start_time).seconds < timeout:status = await self.get_task_status(agent_url, task_id, auth)if status['status'] in ['completed', 'failed', 'cancelled']:return statusawait asyncio.sleep(5) # 每 5 秒轮询一次raise TimeoutError(f"任务 {task_id} 在 {timeout} 秒内未完成")
3.3 流式通信
class A2AStreamingClient:"""用于实时数据的流式 A2A 通信"""async def stream_capability(self, agent_url: str, capability: str, parameters: dict, auth: dict):"""从智能体能力流式传输结果"""headers = self.build_auth_headers(auth)headers['Accept'] = 'text/event-stream'request_payload = {"capability": capability,"parameters": parameters,"stream": True}async with httpx.AsyncClient() as client:async with client.stream('POST',f"{agent_url}/api/v1/capabilities/{capability}/stream",json=request_payload,headers=headers) as response:async for chunk in response.aiter_text():if chunk.startswith('data: '):data = chunk[6:] # 移除 'data: ' 前缀if data.strip():yield json.loads(data)
4. A2A 网络拓扑
4.1 中心辐射型(协调器模式)
class A2AOrchestrator:"""管理多个专业智能体的中央协调器"""def __init__(self):self.registered_agents = {}self.task_manager = A2ATaskManager()self.security_manager = A2ASecurityManager()async def register_agent(self, agent_url: str, credentials: dict):"""在网络中注册一个新智能体"""# 发现智能体能力agent_card = await self.discover_agent(agent_url)# 与智能体认证auth = await self.security_manager.authenticate_agent(agent_card, credentials)# 存储智能体信息self.registered_agents[agent_card['name']] = {'url': agent_url,'card': agent_card,'auth': auth,'last_seen': datetime.utcnow()}print(f"已注册智能体: {agent_card['name']}")async def execute_workflow(self, workflow_definition: List[Dict]) -> Dict[str, Any]:"""执行多智能体工作流"""# 创建工作流任务tasks = self.task_manager.create_workflow(workflow_definition)# 按照依赖关系执行任务results = {}completed_tasks = set()while len(completed_tasks) < len(tasks):for task in tasks:if task.id in completed_tasks:continue# 检查依赖关系是否满足if all(dep_id in completed_tasks for dep_id in task.dependencies):# 执行任务agent_info = self.registered_agents[task.agent_id]try:result = await self.execute_task(task, agent_info)results[task.id] = resultcompleted_tasks.add(task.id)task.status = TaskStatus.COMPLETEDtask.completed_at = datetime.utcnow()except Exception as e:task.status = TaskStatus.FAILEDtask.error = str(e)results[task.id] = {"error": str(e)}completed_tasks.add(task.id) # 即使失败也标记为已完成return resultsasync def execute_task(self, task: A2ATask, agent_info: dict) -> dict:"""在智能体上执行单个任务"""client = A2ASyncClient()result = await client.call_agent_capability(agent_url=agent_info['url'],capability=task.capability,parameters=task.parameters,auth=agent_info['auth'])return result
4.2 点对点(网状网络)
class A2APeerNetwork:"""智能体直接通信的点对点 A2A 网络"""def __init__(self, agent_id: str):self.agent_id = agent_idself.peer_registry = {}self.message_router = A2AMessageRouter()async def discover_peers(self, discovery_service_url: str):"""发现网络中的其他智能体"""async with httpx.AsyncClient() as client:response = await client.get(f"{discovery_service_url}/agents")agents = response.json()for agent in agents:if agent['id'] != self.agent_id:self.peer_registry[agent['id']] = {'url': agent['url'],'capabilities': agent['capabilities'],'last_seen': datetime.utcnow()}async def broadcast_message(self, message: dict, exclude_agents: List[str] = None):"""向所有对等点广播消息"""exclude_agents = exclude_agents or []for agent_id, peer_info in self.peer_registry.items():if agent_id not in exclude_agents:try:await self.send_message_to_peer(agent_id, message)except Exception as e:print(f"向 {agent_id} 发送消息失败: {e}")async def send_message_to_peer(self, peer_id: str, message: dict):"""向特定对等点发送消息"""peer_info = self.peer_registry.get(peer_id)if not peer_info:raise Exception(f"在注册表中未找到对等点 {peer_id}")message_payload = {"from_agent": self.agent_id,"to_agent": peer_id,"message": message,"timestamp": datetime.utcnow().isoformat(),"message_id": str(uuid.uuid4())}async with httpx.AsyncClient() as client:response = await client.post(f"{peer_info['url']}/api/v1/messages",json=message_payload)return response.json()class A2AMessageRouter:"""在对等网络中的智能体之间路由消息"""def __init__(self):self.routing_table = {}self.message_handlers = {}def register_handler(self, message_type: str, handler_func):"""为特定消息类型注册处理程序"""self.message_handlers[message_type] = handler_funcasync def route_message(self, message: dict) -> dict:"""将传入消息路由到相应的处理程序"""message_type = message.get('type', 'unknown')if message_type in self.message_handlers:handler = self.message_handlers[message_type]return await handler(message)else:return {"error": f"没有针对消息类型 {message_type} 的处理程序"}
5. A2A 生产部署
5.1 Kubernetes 部署
# a2a-agent-deployment.yaml apiVersion: apps/v1 kind: Deployment metadata:name: financial-analysis-agent # 金融分析智能体labels:app: financial-analysis-agentprotocol: a2a spec:replicas: 3 # 副本数selector:matchLabels:app: financial-analysis-agenttemplate:metadata:labels:app: financial-analysis-agentannotations: # 注解a2a.protocol/version: "1.0"a2a.agent/capabilities: "financial_analysis,report_generation" # 能力spec:containers:- name: agentimage: your-registry/financial-analysis-agent:v1.2.0 # 镜像ports:- containerPort: 8080name: http- containerPort: 8443name: httpsenv: # 环境变量- name: A2A_AGENT_IDvalue: "financial-analysis-agent"- name: A2A_PROTOCOL_VERSIONvalue: "1.0"- name: OAUTH2_CLIENT_IDvalueFrom:secretKeyRef:name: a2a-auth-secrets # 认证密钥key: oauth2-client-id- name: OAUTH2_CLIENT_SECRETvalueFrom:secretKeyRef:name: a2a-auth-secretskey: oauth2-client-secretvolumeMounts: # 卷挂载- name: tls-certs # TLS 证书mountPath: /etc/tlsreadOnly: true- name: agent-config # 智能体配置mountPath: /etc/agentreadOnly: truelivenessProbe: # 存活探针httpGet:path: /healthport: 8080initialDelaySeconds: 30periodSeconds: 10readinessProbe: # 就绪探针httpGet:path: /.well-known/agent.jsonport: 8080initialDelaySeconds: 5periodSeconds: 5volumes:- name: tls-certssecret:secretName: a2a-tls-certs- name: agent-configconfigMap:name: agent-config --- apiVersion: v1 kind: Service metadata:name: financial-analysis-agent-serviceannotations:a2a.discovery/register: "true" # 发现服务注册a2a.agent/card-url: "https://financial-analysis-agent/.well-known/agent.json" # 卡片URL spec:selector:app: financial-analysis-agentports:- name: httpport: 80targetPort: 8080- name: httpsport: 443targetPort: 8443type: LoadBalancer # 负载均衡器类型 --- apiVersion: v1 kind: ConfigMap # 配置映射 metadata:name: agent-config data:agent-card.json: |{"name": "Financial Analysis Agent","version": "1.2.0","description": "Specialized in financial data analysis and reporting","protocol_version": "A2A/1.0","capabilities": [{"name": "analyze_financial_data","description": "Analyze financial datasets and generate insights"}]}
5.2 服务网格集成
# istio-a2a-config.yaml apiVersion: networking.istio.io/v1beta1 kind: VirtualService # 虚拟服务 metadata:name: a2a-agent-routing spec:hosts:- financial-analysis-agenthttp:- match:- uri:prefix: "/.well-known/agent.json" # 智能体卡片路径route:- destination:host: financial-analysis-agentport:number: 80headers:response:add:a2a-protocol-version: "1.0"cache-control: "public, max-age=300" # 缓存控制- match:- uri:prefix: "/api/v1/" # API 路径route:- destination:host: financial-analysis-agentport:number: 80timeout: 30s # 超时retries: # 重试attempts: 3perTryTimeout: 10s --- apiVersion: security.istio.io/v1beta1 kind: AuthorizationPolicy # 授权策略 metadata:name: a2a-agent-authz spec:selector:matchLabels:app: financial-analysis-agentrules:- from:- source:principals: ["cluster.local/ns/a2a-system/sa/agent-caller"] # 调用者身份- to:- operation:methods: ["GET"] # GET 方法paths: ["/.well-known/agent.json", "/health"] # 允许的路径- to:- operation:methods: ["POST"] # POST 方法paths: ["/api/v1/*"] # API 路径when:- key: request.headers[authorization] # 授权头values: ["Bearer *"] # Bearer Token
这个全面的 A2A 方案为构建可扩展、安全且可互操作的多智能体系统提供了坚实的基础。该协议的灵活性允许各种部署模式,同时在不同智能体实现之间保持标准化。
资料来源
Open Protocols for Agent Interoperability Part 4: Inter-Agent Communication on A2A | AWS Open Source Blog (关于 A2A 智能体间通信的开放协议第 4 部分 | AWS 开源博客)
Shaping the future of telco operations with an agentic AI collaboration approach | AWS for Industries (通过智能体 AI 协作方法塑造电信运营的未来 | AWS 行业应用)
Agent-to-agent protocols - AWS Prescriptive Guidance (智能体到智能体协议 - AWS 最佳实践指南)
Open Protocols for Agent Interoperability Part 1: Inter-Agent Communication on MCP | AWS Open Source Blog (关于 MCP 智能体间通信的开放协议第 1 部分 | AWS 开源博客)
Agentic protocols - AWS Prescriptive Guidance (智能体协议 - AWS 最佳实践指南)