当前位置: 首页 > news >正文

Agent2Agent(谷歌A2A)协议原理讲解

文章目录

  • 核心概念
  • 典型工作流程
  • Agent发现
    • 发现 Agent Card
        • 🌐 开放发现(Open Discovery)
        • 📚 筛选式发现(Curated Discovery,基于注册表)
        • 🔐 私有发现(Private Discovery,基于 API)
      • 保护 Agent Card(Securing Agent Cards)
  • 企业级就绪性 (Enterprise Readiness)
      • 传输层安全(Transport Level Security)
      • 服务器身份验证(Server Identity)
      • 客户端和用户身份(Client and User Identity)
      • 客户端认证(Authenticating Clients)
      • 授权与数据隐私(Authorization and Data Privacy)
      • 可观测性与追踪(Tracing and Observability)
  • 与MCP协议的互补关系
  • 案例实战:基于A2A协议实现MindsDB企业数据agent
  • REF

谷歌在25年4月初发布了A2A协议,作为MCP协议的补充。Agent2Agent协议致力于促进独立agent间的通信,帮助不同生态系统的agent沟通和协作。
在这里插入图片描述

核心概念

  • Agent Card:一个公共元数据文件(通常位于 /.well-known/agent.json),用于描述代理的能力、技能、端点 URL 以及认证要求。客户端通过它来发现Agent。
    Agent Card通常包括agent的技能说明、供应商、身份验证、链接、名称等:
// An AgentCard conveys key information:
// - Overall details (version, name, description, uses)
// - Skills: A set of capabilities the agent can perform
// - Default modalities/content types supported by the agent.
// - Authentication requirements
interface AgentCard {// Human readable name of the agent.// (e.g. "Recipe Agent")name: string;// A human-readable description of the agent. Used to assist users and// other agents in understanding what the agent can do.// (e.g. "Agent that helps users with recipes and cooking.")description: string;// A URL to the address the agent is hosted at.url: string;// The service provider of the agentprovider?: {organization: string;url: string;};// The version of the agent - format is up to the provider. (e.g. "1.0.0")version: string;// A URL to documentation for the agent.documentationUrl?: string;// Optional capabilities supported by the agent.capabilities: {streaming?: boolean; // true if the agent supports SSEpushNotifications?: boolean; // true if the agent can notify updates to clientstateTransitionHistory?: boolean; //true if the agent exposes status change history for tasks};// Authentication requirements for the agent.// Intended to match OpenAPI authentication structure.authentication: {schemes: string[]; // e.g. Basic, Bearercredentials?: string; //credentials a client should use for private cards};// The set of interaction modes that the agent// supports across all skills. This can be overridden per-skill.defaultInputModes: string[]; // supported mime types for inputdefaultOutputModes: string[]; // supported mime types for output// Skills are a unit of capability that an agent can perform.skills: {id: string; // unique identifier for the agent's skillname: string; //human readable name of the skill// description of the skill - will be used by the client or a human// as a hint to understand what the skill does.description: string;// Set of tag words describing classes of capabilities for this specific// skill (e.g. "cooking", "customer support", "billing")tags: string[];// The set of example scenarios that the skill can perform.// Will be used by the client as a hint to understand how the skill can be// used. (e.g. "I need a recipe for bread")examples?: string[]; // example prompts for tasks// The set of interaction modes that the skill supports// (if different than the default)inputModes?: string[]; // supported mime types for inputoutputModes?: string[]; // supported mime types for output}[];
}
  • A2A Server:一个实现了 A2A 协议方法(在 JSON 规范中定义)的 HTTP 端点,由Agent公开,用于接收请求并管理任务执行。
  • A2A Client:消费 A2A 服务的应用程序或其他代理。它向 A2A Server 的 URL 发送请求(如 tasks/send)。
  • Task(任务):工作的核心单元。客户端通过发送消息(tasks/sendtasks/sendSubscribe)来发起任务。任务拥有唯一 ID,并依次经历不同状态(submittedworkinginput-requiredcompletedfailedcanceled)。
  • Message(消息):表示客户端(role: "user")与Agent(role: "agent")之间的交互轮次。消息由多个 Part 组成。
  • Part(消息部分):消息或工件中的基本内容单元,可为 TextPartFilePart(内联字节或 URI)或 DataPart(结构化 JSON,例如表单)。
  • Artifact(工件):代理在任务执行过程中生成的输出,例如生成的文件或最终结构化数据。工件同样由多个 Part 组成。
  • Streaming(流式传输):对于长时间运行的任务,支持流式功能的服务器可使用 tasks/sendSubscribe。客户端会接收包含 TaskStatusUpdateEventTaskArtifactUpdateEvent 的 Server-Sent Events(SSE),以获取实时进度。
  • Push Notifications(推送通知):支持推送通知的服务器可以主动将任务更新发送到客户端提供的 webhook URL,该 URL 可通过 tasks/pushNotification/set 进行配置。

典型工作流程

  1. 发现:客户端从服务器的 well‑known URL(通常是 https://DOMAIN/.well-known/agent.json)获取 Agent Card。
  2. 发起:客户端发送包含初始用户消息和唯一任务 ID 的 tasks/sendtasks/sendSubscribe 请求。
  3. 处理
    • 流式:服务器在任务进展过程中,通过 Server‑Sent Events(SSE)发送状态更新或 Artifact 更新事件。
    • 非流式:服务器同步处理任务,并在 HTTP 响应中返回最终的 Task 对象。
  4. 交互(可选):如果任务进入 input-required 状态,客户端可使用相同的任务 ID,通过 tasks/sendtasks/sendSubscribe 发送后续消息以提供所需输入。
  5. 完成:任务最终达到终止状态之一:completedfailedcanceled

Agent发现

发现 Agent Card

🌐 开放发现(Open Discovery)

我们建议企业将 Agent Card 托管在一个约定路径上,例如:
https://DOMAIN/.well-known/agent.json
客户端通过 DNS 解析域名,向该路径发送 GET 请求,即可获取 Agent Card。
这允许网页爬虫和应用程序轻松发现已知或配置的代理,实质上将“发现代理”简化为“找到域名”。

📚 筛选式发现(Curated Discovery,基于注册表)

企业可能通过目录界面提供代理注册表,供公司或团队内部使用。这种方式便于由管理员进行管理与筛选。
官方正在考虑将注册表支持加入协议标准,如果你对此有看法或需求,欢迎反馈。

🔐 私有发现(Private Discovery,基于 API)

某些代理可能托管于私有“代理商店”或通过自定义 API 交换 Agent Card。
A2A 协议目前不打算将私有发现 API 纳入标准,但欢迎社区反馈你的看法和使用场景。


保护 Agent Card(Securing Agent Cards)

Agent Card 可能包含敏感信息,实现方可根据需要为其设置身份验证和访问控制。
例如,即使是放在 .well-known 路径上的 Agent Card,也可以通过 mTLS(双向 TLS) 进行限制,仅允许特定客户端访问。注册表和私有 API 更应强制认证,并根据不同身份返回不同内容。

⚠️ 注意:Agent Card 中可能会包含认证信息(如 API Key),强烈建议此类内容不得在未验证身份的情况下公开访问。


企业级就绪性 (Enterprise Readiness)

A2A 协议旨在支持企业级需求,与现有企业基础设施无缝集成。它将 Agent 看作标准的基于 HTTP 的企业应用,因此依赖现有的认证、安全、隐私、追踪与监控机制。

传输层安全(Transport Level Security)

A2A 基于 HTTP,生产环境必须使用 HTTPS,并启用现代 TLS 加密套件。

服务器身份验证(Server Identity)

服务器应使用由受信任证书颁发机构签发的数字证书,客户端需验证证书以确认身份。

客户端和用户身份(Client and User Identity)

A2A 协议本身不定义客户端或用户标识,而是通过 Agent Card 指明所需的认证方式。客户端需通过外部认证机制(如 OAuth)获取凭证,并通过 HTTP 头传输。

多身份联合登录仍是开放议题,如某任务需同时访问 A 系统和 B 系统,用户需同时提供两个系统的认证凭据。

客户端认证(Authenticating Clients)

A2A 服务器应在 Agent Card 中公布其支持的认证协议,如 API Key、OAuth、OIDC 等,并要求客户端每次请求都进行认证。认证失败将返回标准 HTTP 401 或 403 错误码。

授权与数据隐私(Authorization and Data Privacy)

A2A 建议基于 技能(skills)工具(tools) 两个维度进行授权管理:

  • 技能授权:如通过 OAuth 范围控制访问特定技能;
  • 工具授权:如通过 API 管理工具限制访问敏感操作或数据。

可观测性与追踪(Tracing and Observability)

A2A 基于 HTTP,因此可直接使用企业现有的日志、追踪与监控工具(如 OpenTracing)。客户端和服务器应插入必要的追踪头,并记录日志与事件。


与MCP协议的互补关系

构建agent应用需要同时使用A2A和MCP协议。agent通过MCP连接工具,通过A2A协议连接其他agent。

  • MCP(Model Context Protocol) 是正在兴起的标准,用于连接大模型与工具、数据等资源,正在统一“函数调用”接口,降低了工具接入的复杂度,已被多个框架和平台采纳。

  • A2A 则聚焦另一个层面 —— 它是一个应用层协议,允许agent之间以“agent”的方式进行自然交流(而非简单工具调用)。我们希望 A2A 能与 MCP 形成互补,共同推动智能代理生态的发展。

在这里插入图片描述
例如一个汽车修理店,员工使用各种专用工具(如升降机、电压表、扳手)来诊断和修理汽车问题。他们经常处理以前没见过的故障,过程中可能还需与客户交谈、查资料、与零件供应商合作。
现在把这些员工抽象为Agent:

MCP协议用来连接agent与结构化工具(例如:“升高平台2米”、“扳手右转4毫米”)。
A2A协议用来让最终用户或其他代理与员工交流(例如:“我的车有异响”)。A2A 支持多轮对话和任务计划的演进(如:“拍张左车轮的照片”、“我看到有液体泄漏,这种情况多久了?”),也让修理工可以与其他代理(如零件供应商)协作。

案例实战:基于A2A协议实现MindsDB企业数据agent

通过A2A协议实现自然语言查询和跨多个企业数据源的数据分析,基座模型使用Gemini 2.5 Flash 模型,MindsDB提供文本到SQL的能力,A2A 协议:作为通信协议,实现智能体之间的协作和任务分发。
具体流程为:
自然语言输入:用户通过智能体输入自然语言查询,例如:“查询过去三个月的销售数据”。
任务解析与分发:智能体将用户的查询解析为任务,并通过 A2A 协议将任务分发给适当的执行代理。
数据查询与分析:执行代理接收到任务后,使用 MindsDB 的文本到 SQL(Text-to-SQL)技能,将自然语言查询转换为 SQL 查询,并在多个数据源上执行这些查询。
结果整合与反馈:执行代理将查询结果整合,并通过 A2A 协议将结果反馈给用户。
在这里插入图片描述
Agent的实现:

import json
import aiohttp
import os
from typing import Any, AsyncIterable, Dict, Optional
from dotenv import load_dotenv# 加载 .env 文件中的环境变量
load_dotenv()class MindsDBAgent:"""An agent that data requests from any database, datawarehouse, app."""# 支持的内容类型列表,仅文本SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]# MindsDB API 的基础 URLAPI_URL = "https://ai.staging.mindsdb.com/chat/completions"def __init__(self):# 从环境变量获取 MindsDB API Keyself.api_key = os.getenv('MINDS_API_KEY')if not self.api_key:raise ValueError("MINDS_API_KEY environment variable is not set")# 从环境变量获取默认的 Mind 名称self.model = os.getenv('MIND_NAME')if not self.model:# 如果未设置,则使用示例中默认的 Mindself.model = "Sales_Data_Expert_Demo_Mind"# HTTP 请求头,包含认证信息self.headers = {"Content-Type": "application/json","Authorization": f"Bearer {self.api_key}"}def invoke(self, query, session_id) -> str:# 同步调用接口占位,提示使用流式获取真实结果return {"content": "Use stream method to get the results!"}async def stream(self, query, session_id) -> AsyncIterable[Dict[str, Any]]:"""流式方法:针对长时任务返回 SSE 流,每次 yield 一个消息段或完成事件"""# 构造请求的 JSON 负载,包含 system 和 user 消息,以及 stream 标志payload = {"model": self.model,"messages": [{"role": "system", "content": "You are a helpful assistant."},{"role": "user", "content": query}],"stream": True}# 使用 aiohttp 发送异步 HTTP POST 请求async with aiohttp.ClientSession() as session:async with session.post(self.API_URL, headers=self.headers, json=payload) as response:# 解析响应内容为字节流,按行处理 SSEasync for line in response.content:if not line:continue  # 跳过空行line = line.decode('utf-8').strip()if not line.startswith("data: "):continue  # 只处理带 "data: " 前缀的行# 去除前缀并尝试解析 JSONjson_str = line[6:]try:data = json.loads(json_str)except json.JSONDecodeError:continue  # 跳过无法解析的行# 处理 choices 数组,通常只有一个元素if "choices" in data:choice = data["choices"][0]delta = choice.get("delta", {})content = delta.get("content")role = delta.get("role", "")# 构造基本的消息部分列表parts = [{"type": "text", "text": content}]# 判断是否已完成if choice.get("finish_reason") == "stop":yield {"is_task_complete": True,"parts": parts}continue# 默认子类型为推理subtype = "analysis"tool_calls = delta.get("tool_calls", [])# 如果是助手角色发言,转换为 ack 类型if role == "assistant":subtype = "acknowledge"# 处理工具调用示例(例如 SQL 查询)if tool_calls:tool_call = tool_calls[0]function = tool_call.get("function", {})function_name = function.get("name")arguments = function.get("arguments", {})if function_name == "sql_db_query":subtype = "execute_query"# 将查询参数也作为一个 message partparts.append({"type": "text","text": str(arguments)})# 返回中间事件,is_task_complete 为 Falseyield {"is_task_complete": False,"parts": parts,"metadata": {"type": "reasoning", "subtype": subtype}}continue

task manager的实现:

import json
from typing import AsyncIterable, Union
import logging# 引入 A2A 协议相关类型
from common.types import (SendTaskRequest,TaskSendParams,Message,TaskStatus,Artifact,TaskStatusUpdateEvent,TaskArtifactUpdateEvent,TextPart,TaskState,Task,SendTaskResponse,InternalError,JSONRPCResponse,SendTaskStreamingRequest,SendTaskStreamingResponse,
)# 引入任务管理基类(内存实现)
from common.server.task_manager import InMemoryTaskManager
# 引入具体的 MindsDBAgent 实现
from agent import MindsDBAgent
# 通用工具函数,用于检查内容类型兼容性等
import common.server.utils as utilslogger = logging.getLogger(__name__)class AgentTaskManager(InMemoryTaskManager):"""A2A 协议任务管理器:继承自 InMemoryTaskManager,处理 tasks/send 与 tasks/sendSubscribe。调用 MindsDBAgent 执行实际任务,并将结果按 A2A 事件流式推送。"""def __init__(self, agent: MindsDBAgent):super().__init__()# 注入具体的代理实例self.agent = agentasync def _stream_generator(self, request: SendTaskStreamingRequest) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:"""实现流式订阅:接收 A2A 客户端的 tasks/sendSubscribe 请求并将代理 stream() 方法的输出转换为 A2A 事件流。"""# 从请求中获取 Task 参数task_send_params: TaskSendParams = request.paramsquery = self._get_user_query(task_send_params)try:# 调用代理的 SSE 流方法async for item in self.agent.stream(query, task_send_params.sessionId):is_complete = item["is_task_complete"]parts = item["parts"]if not is_complete:# 未完成:状态为 WORKINGtask_state = TaskState.WORKINGmetadata = item.get("metadata")# 构造 A2A 消息message = Message(role="agent", parts=parts, metadata=metadata)task_status = TaskStatus(state=task_state, message=message)# 更新内存任务状态await self._update_store(task_send_params.id, task_status, [])# 构造 TaskStatusUpdateEventupdate_event = TaskStatusUpdateEvent(id=task_send_params.id,status=task_status,final=False)# 推送给客户端yield SendTaskStreamingResponse(id=request.id, result=update_event)else:# 完成:状态为 COMPLETEDtask_state = TaskState.COMPLETED# 构造 Artifact(仅一段 parts)artifact = Artifact(parts=parts, index=0, append=False)# 首先发送工件更新事件yield SendTaskStreamingResponse(id=request.id,result=TaskArtifactUpdateEvent(id=task_send_params.id,artifact=artifact))# 更新内存中的任务状态与工件列表task_status = TaskStatus(state=task_state)await self._update_store(task_send_params.id, task_status, [artifact])# 最终发送状态完成事件yield SendTaskStreamingResponse(id=request.id,result=TaskStatusUpdateEvent(id=task_send_params.id,status=TaskStatus(state=task_state),final=True))except Exception as e:# 异常处理:返回 JSON-RPC 错误logger.error(f"Streaming error: {e}")yield JSONRPCResponse(id=request.id,error=InternalError(message="Error streaming response"))def _validate_request(self, request: Union[SendTaskRequest, SendTaskStreamingRequest]) -> Optional[JSONRPCResponse]:"""验证客户端请求的输出模式与代理支持的内容类型是否兼容。不兼容时返回 JSON-RPC 错误响应。"""params: TaskSendParams = request.paramsif not utils.are_modalities_compatible(params.acceptedOutputModes,MindsDBAgent.SUPPORTED_CONTENT_TYPES):logger.warning("Incompatible modes: %s", params.acceptedOutputModes)return utils.new_incompatible_types_error(request.id)return Noneasync def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:"""处理 tasks/send 同步请求,返回最终 Task 或 INPUT_REQUIRED。"""error = self._validate_request(request)if error:return error# 插入或更新任务初始记录await self.upsert_task(request.params)# 调用同步 invoke 实现return await self._invoke(request)async def on_send_task_subscribe(self, request: SendTaskStreamingRequest) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:"""处理 tasks/sendSubscribe 流式请求,返回一个事件流。"""error = self._validate_request(request)if error:return errorawait self.upsert_task(request.params)return self._stream_generator(request)async def _update_store(self, task_id: str, status: TaskStatus, artifacts: list[Artifact]) -> Task:"""更新内存中任务的状态和工件列表,用于后续查询与状态回放。"""async with self.lock:task = self.tasks.get(task_id)if not task:logger.error(f"Task {task_id} not found")raise ValueError(f"Task {task_id} not found")task.status = statusif artifacts:task.artifacts = (task.artifacts or []) + artifactsreturn taskasync def _invoke(self, request: SendTaskRequest) -> SendTaskResponse:"""同步调用代理的 invoke,处理简单或需要额外输入的场景。"""params: TaskSendParams = request.paramsquery = self._get_user_query(params)try:result = self.agent.invoke(query, params.sessionId)except Exception as e:logger.error(f"Invoke error: {e}")raise ValueError(f"Error invoking agent: {e}")# 将返回内容封装为 TextPartparts = [{"type": "text", "text": result}]# 根据返回结果判断是否需要更多输入state = TaskState.INPUT_REQUIRED if "MISSING_INFO:" in result else TaskState.COMPLETED# 更新存储并返回完整 Tasktask = await self._update_store(params.id,TaskStatus(state=state, message=Message(role="agent", parts=parts)),[Artifact(parts=parts)])return SendTaskResponse(id=request.id, result=task)def _get_user_query(self, params: TaskSendParams) -> str:"""从 TaskSendParams.message.parts 中提取用户文本查询,目前仅支持 TextPart。"""part = params.message.parts[0]if not isinstance(part, TextPart):raise ValueError("Only text parts are supported")return part.text

REF

https://developers.googleblog.com/en/a2a-a-new-era-of-agent-interoperability/
https://google.github.io/A2A/#a2a-unlock-collaborative-agent-to-agent-scenarios-with-a-new-open-protocol
https://github.com/google/A2A
https://github.com/google/A2A/blob/main/specification/json/a2a.json
https://github.com/google/A2A/blob/main/samples/python/agents/mindsdb/README.md

相关文章:

  • CatBoost算法原理及Python实现
  • 牛客 Wall Builder II 题解
  • DeepSeek-提示词工程
  • 形式化数学——Lean的介绍与安装
  • Python异步编程入门:从同步到异步的思维转变
  • 链表操作练习
  • 【C++】WSL常用语法
  • 电子商务商家后台运营专员模板
  • Android工厂模式
  • 设一个测试情境,新用户注册后显示的名字不完整,测试思路是怎么样的?
  • 【C/C++】inline关键词
  • 用网页显示工控仪表
  • 精益数据分析(40/126):移动应用商业模式的关键指标与盈利策略
  • 常见小模型的实现原理及使用示例:Android端
  • 黑马点评day02(缓存)
  • 常用CPU、GPU、NPU、DSP、ASIC等芯片区别介绍
  • IL2CPP 技术深度解析
  • 三星SMT贴片机选型与效能提升指南
  • 学习路线(c++)
  • 使用Mathematica绘制Sierpinski地毯
  • 农村青年寻路纪|劳动者的书信⑤
  • 中小企业数字化转型的破局之道何在?
  • 习近平将对俄罗斯进行国事访问并出席纪念苏联伟大卫国战争胜利80周年庆典
  • 短剧迷|《权宠》一出,《名不虚传》
  • 韩国代总统、国务总理韩德洙宣布辞职,将择期宣布参选总统
  • 女冰队长于柏巍,拒绝被年龄定义