A2A + MCP Python Skeleton: a minimal runnable implementation
Purpose: demonstrate a minimal runnable skeleton that uses A2A (agent ↔ agent) as the orchestration backbone and MCP (agent ↔ tools/data) for tool access. It includes a Router, three sub-agents (ASR / Legal / TTS), and a mock MCP server (JSON-RPC style), with SSE streaming across the whole chain.
Directory layout
```text
.
├─ requirements.txt
├─ .env.sample
└─ src/
   ├─ common/
   │  ├─ a2a.py
   │  └─ mcp.py
   ├─ router.py
   ├─ agents/
   │  ├─ asr_agent.py
   │  ├─ legal_agent.py
   │  └─ tts_agent.py
   └─ tools/
      └─ mock_mcp_server.py
```
requirements.txt
```text
fastapi==0.111.0
uvicorn[standard]==0.30.0
httpx==0.27.2
pydantic==2.8.2
python-dotenv==1.0.1
```
.env.sample
```bash
# Addresses the Router uses to reach each agent
ASR_AGENT_URL=http://127.0.0.1:8011
LEGAL_AGENT_URL=http://127.0.0.1:8012
TTS_AGENT_URL=http://127.0.0.1:8013

# MCP server address (the example uses the mock)
MCP_ENDPOINT=http://127.0.0.1:8099/mcp
```
Copy this file to .env before running and adjust the values as needed.
src/common/a2a.py
```python
from __future__ import annotations

import uuid
from typing import Any, AsyncIterator, Dict

from pydantic import BaseModel, Field

A2A_MIME_SSE = "text/event-stream"


# === A2A tasks and events ===
class A2ATask(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    type: str  # e.g. "asr.transcribe", "legal.analyze", "tts.speak"
    payload: Dict[str, Any] = {}
    stream: bool = True  # whether streaming output is requested


class A2AEvent(BaseModel):
    task_id: str
    event: str  # e.g. "progress", "delta", "final", "error"
    data: Dict[str, Any] = {}


# === SSE encoding ===
def sse_encode(event: A2AEvent) -> bytes:
    """Encode an event as one SSE data block."""
    line = "data: " + event.model_dump_json() + "\n\n"
    return line.encode("utf-8")


async def sse_stream_generator(generator: AsyncIterator[A2AEvent]):
    async for ev in generator:
        yield sse_encode(ev)


# === Helpers ===
class A2AError(RuntimeError):
    pass
```
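For reference, each encoded event is a single `data:` line followed by a blank line. A small sketch of what `sse_encode` produces (the field values are illustrative):

```python
from src.common.a2a import A2AEvent, sse_encode

ev = A2AEvent(task_id="demo-task", event="delta", data={"token": "hello"})
print(sse_encode(ev).decode("utf-8"))
# data: {"task_id":"demo-task","event":"delta","data":{"token":"hello"}}
# (plus a trailing blank line, which is the SSE frame delimiter)
```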
src/common/mcp.py
```python
from __future__ import annotations

import uuid
from typing import Any, Dict

import httpx


class MCPError(RuntimeError):
    pass


class MCPClient:
    """Minimal MCP JSON-RPC client (HTTP transport).

    - list_tools(): JSON-RPC method="tools/list_tools"
    - call(tool, args): JSON-RPC method="tools/call"

    Note: real MCP deployments may use stdio / SSE transports; this HTTP
    JSON-RPC skeleton is meant to be quick to stand up and easy to replace.
    """

    def __init__(self, endpoint: str, timeout: float = 30.0):
        self.endpoint = endpoint.rstrip("/")
        self.timeout = timeout
        self._client = httpx.AsyncClient(timeout=timeout)

    async def _rpc(self, method: str, params: Dict[str, Any]) -> Any:
        req = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": method,
            "params": params,
        }
        r = await self._client.post(self.endpoint, json=req)
        r.raise_for_status()
        data = r.json()
        # Only treat a non-null "error" field as a failure; a null error on a
        # success response must not raise.
        if data.get("error") is not None:
            raise MCPError(str(data["error"]))
        return data.get("result")

    async def list_tools(self) -> Dict[str, Any]:
        return await self._rpc("tools/list_tools", {})

    async def call(self, tool: str, args: Dict[str, Any]) -> Dict[str, Any]:
        return await self._rpc("tools/call", {"tool": tool, "args": args})

    async def aclose(self):
        await self._client.aclose()
```
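A quick usage sketch, assuming the mock MCP server below is running on its default port:

```python
import asyncio

from src.common.mcp import MCPClient


async def main() -> None:
    mcp = MCPClient("http://127.0.0.1:8099/mcp")
    try:
        tools = await mcp.list_tools()
        print(tools)  # with the mock server: {"tools": [{"name": "asr.transcribe", ...}, ...]}
        result = await mcp.call("asr.transcribe", {"audio_url": "file:///tmp/a.wav"})
        print(result)  # {"text": "(mock asr) transcribed from file:///tmp/a.wav"}
    finally:
        await mcp.aclose()


asyncio.run(main())
```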
src/router.py
```python
from __future__ import annotations

import os
from typing import AsyncIterator

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse

from src.common.a2a import A2ATask

load_dotenv()

ASR_URL = os.getenv("ASR_AGENT_URL", "http://127.0.0.1:8011")
LEGAL_URL = os.getenv("LEGAL_AGENT_URL", "http://127.0.0.1:8012")
TTS_URL = os.getenv("TTS_AGENT_URL", "http://127.0.0.1:8013")

ROUTE_TABLE = {
    "asr.": ASR_URL,
    "legal.": LEGAL_URL,
    "tts.": TTS_URL,
}

app = FastAPI(title="A2A Router")


def _pick_agent(task_type: str) -> str:
    for prefix, url in ROUTE_TABLE.items():
        if task_type.startswith(prefix):
            return url
    raise HTTPException(status_code=400, detail=f"No agent for task type: {task_type}")


@app.post("/a2a/task")
async def route_task(task: A2ATask):
    agent_base = _pick_agent(task.type)
    agent_endpoint = f"{agent_base}/a2a/task"

    if task.stream:
        # Forward as SSE: open a streaming request to the agent and relay the
        # downstream SSE bytes to the caller unmodified (no buffering).
        client = httpx.AsyncClient(timeout=None)
        req = client.build_request(
            "POST",
            agent_endpoint,
            json=task.model_dump(),
            headers={"accept": "text/event-stream"},
        )
        resp = await client.send(req, stream=True)
        if resp.status_code != 200:
            detail = (await resp.aread()).decode("utf-8", "ignore")
            await resp.aclose()
            await client.aclose()
            raise HTTPException(status_code=resp.status_code, detail=detail)

        async def forward() -> AsyncIterator[bytes]:
            try:
                async for chunk in resp.aiter_bytes():
                    yield chunk
            finally:
                await resp.aclose()
                await client.aclose()

        return StreamingResponse(forward(), media_type="text/event-stream")

    # Non-streaming: plain request/response pass-through
    async with httpx.AsyncClient(timeout=None) as client:
        resp = await client.post(agent_endpoint, json=task.model_dump())
        return JSONResponse(status_code=resp.status_code, content=resp.json())
```
src/agents/asr_agent.py
```python
from __future__ import annotations

import asyncio
import os
from typing import AsyncIterator

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse

from src.common.a2a import A2ATask, A2AEvent, sse_stream_generator
from src.common.mcp import MCPClient

load_dotenv()
MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")

app = FastAPI(title="ASR Agent")


async def _asr_stream(mcp: MCPClient, audio_url: str, task_id: str) -> AsyncIterator[A2AEvent]:
    # Demonstrates "call the tool, then emit incremental output".
    # A real implementation could run chunked inference instead.
    yield A2AEvent(task_id=task_id, event="progress", data={"msg": "asr_started"})
    # Call the (mock) MCP tool
    result = await mcp.call("asr.transcribe", {"audio_url": audio_url})
    text = result.get("text", "")
    # Simulate token-by-token deltas
    for token in text.split():
        await asyncio.sleep(0.05)
        yield A2AEvent(task_id=task_id, event="delta", data={"token": token})
    yield A2AEvent(task_id=task_id, event="final", data={"text": text})


@app.post("/a2a/task")
async def handle(task: A2ATask):
    if task.type != "asr.transcribe":
        raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")
    audio_url = task.payload.get("audio_url")
    if not audio_url:
        raise HTTPException(status_code=400, detail="payload.audio_url required")

    mcp = MCPClient(MCP_ENDPOINT)
    if task.stream:
        gen = _asr_stream(mcp, audio_url, task.id)
        return StreamingResponse(sse_stream_generator(gen), media_type="text/event-stream")
    # Non-streaming: return the complete result in one response
    result = await mcp.call("asr.transcribe", {"audio_url": audio_url})
    return JSONResponse(result)
```
src/agents/legal_agent.py
```python
from __future__ import annotations

import asyncio
import os
from typing import AsyncIterator

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse

from src.common.a2a import A2ATask, A2AEvent, sse_stream_generator
from src.common.mcp import MCPClient

load_dotenv()
MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")

app = FastAPI(title="Legal Expert Agent")


async def _legal_stream(mcp: MCPClient, query: str, task_id: str) -> AsyncIterator[A2AEvent]:
    yield A2AEvent(task_id=task_id, event="progress", data={"msg": "legal_started"})

    # 1) RAG retrieval (example: call the MCP tool kb.search)
    kb = await mcp.call("kb.search", {"query": query, "top_k": 3})
    yield A2AEvent(task_id=task_id, event="progress", data={"kb_hits": kb.get("hits", [])})

    # 2) LLM summarisation (example: call the MCP tool llm.complete)
    prompt = f"根据以下材料做初步法律分析:\n{kb.get('context', '')}\n---\n问题:{query}\n"
    llm = await mcp.call("llm.complete", {"prompt": prompt, "temperature": 0.2})

    # Simulate chunked streaming of the answer
    text = llm.get("text", "")
    for chunk in [text[i:i + 60] for i in range(0, len(text), 60)]:
        await asyncio.sleep(0.05)
        yield A2AEvent(task_id=task_id, event="delta", data={"text": chunk})
    yield A2AEvent(task_id=task_id, event="final", data={"text": text})


@app.post("/a2a/task")
async def handle(task: A2ATask):
    if task.type != "legal.analyze":
        raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")
    query = task.payload.get("query")
    if not query:
        raise HTTPException(status_code=400, detail="payload.query required")

    mcp = MCPClient(MCP_ENDPOINT)
    if task.stream:
        gen = _legal_stream(mcp, query, task.id)
        return StreamingResponse(sse_stream_generator(gen), media_type="text/event-stream")
    # Non-streaming: retrieve, then summarise in one go
    kb = await mcp.call("kb.search", {"query": query, "top_k": 3})
    llm = await mcp.call("llm.complete", {"prompt": str(kb), "temperature": 0.2})
    return JSONResponse(llm)
```
Note: the first line must read `from __future__ import annotations`. Writing `from __future__ annotations` (without `import`) is a syntax error in every Python version, so the corrected form is used above.
src/agents/tts_agent.py
```python
from __future__ import annotations

import asyncio
import os
from typing import AsyncIterator

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse

from src.common.a2a import A2ATask, A2AEvent, sse_stream_generator
from src.common.mcp import MCPClient

load_dotenv()
MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")

app = FastAPI(title="TTS Agent")


async def _tts_stream(mcp: MCPClient, text: str, task_id: str) -> AsyncIterator[A2AEvent]:
    yield A2AEvent(task_id=task_id, event="progress", data={"msg": "tts_started"})
    # Call the MCP tool tts.speak (mock), which returns an array of fake audio chunks
    result = await mcp.call("tts.speak", {"text": text})
    chunks = result.get("chunks", [])
    for i, ch in enumerate(chunks):
        await asyncio.sleep(0.05)
        yield A2AEvent(task_id=task_id, event="delta", data={"seq": i, "audio_chunk": ch})
    yield A2AEvent(task_id=task_id, event="final", data={"ok": True})


@app.post("/a2a/task")
async def handle(task: A2ATask):
    if task.type != "tts.speak":
        raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")
    text = task.payload.get("text")
    if not text:
        raise HTTPException(status_code=400, detail="payload.text required")

    mcp = MCPClient(MCP_ENDPOINT)
    if task.stream:
        gen = _tts_stream(mcp, text, task.id)
        return StreamingResponse(sse_stream_generator(gen), media_type="text/event-stream")
    result = await mcp.call("tts.speak", {"text": text})
    return JSONResponse(result)
```
src/tools/mock_mcp_server.py
```python
from __future__ import annotations

import random
from typing import Any, Dict

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Mock MCP Server (JSON-RPC over HTTP)")

TOOLS = [
    {"name": "asr.transcribe", "args": {"audio_url": "str"}},
    {"name": "kb.search", "args": {"query": "str", "top_k": "int"}},
    {"name": "llm.complete", "args": {"prompt": "str", "temperature": "float"}},
    {"name": "tts.speak", "args": {"text": "str"}},
]


class JSONRPCRequest(BaseModel):
    jsonrpc: str
    id: str
    method: str
    params: Dict[str, Any] | None = None


class JSONRPCResponse(BaseModel):
    jsonrpc: str = "2.0"
    id: str
    result: Dict[str, Any] | None = None
    error: Dict[str, Any] | None = None


# exclude_none keeps successful responses from carrying an "error": null field
@app.post("/mcp", response_model_exclude_none=True)
async def mcp(req: JSONRPCRequest) -> JSONRPCResponse:
    try:
        if req.method == "tools/list_tools":
            return JSONRPCResponse(id=req.id, result={"tools": TOOLS})
        if req.method == "tools/call":
            params = req.params or {}
            tool = params.get("tool")
            args = params.get("args", {})
            if tool == "asr.transcribe":
                text = f"(mock asr) transcribed from {args.get('audio_url', '?')}"
                return JSONRPCResponse(id=req.id, result={"text": text})
            if tool == "kb.search":
                hits = [
                    {"id": f"doc{i}", "score": round(random.random(), 3), "snippet": f"snippet {i}"}
                    for i in range(1, args.get("top_k", 3) + 1)
                ]
                context = "\n".join(h["snippet"] for h in hits)
                return JSONRPCResponse(id=req.id, result={"hits": hits, "context": context})
            if tool == "llm.complete":
                prompt = args.get("prompt", "")
                return JSONRPCResponse(id=req.id, result={"text": f"(mock llm) summary for: {prompt[:60]}..."})
            if tool == "tts.speak":
                text = args.get("text", "")
                chunks = [f"audio-bytes-chunk-{i}" for i in range(5)]
                return JSONRPCResponse(id=req.id, result={"chunks": chunks, "desc": f"(mock) tts of {text[:20]}..."})
            return JSONRPCResponse(id=req.id, error={"message": f"unknown tool: {tool}"})
        return JSONRPCResponse(id=req.id, error={"message": f"unknown method: {req.method}"})
    except Exception as e:  # defensive catch-all for the mock
        return JSONRPCResponse(id=req.id, error={"message": str(e)})
```
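Once the mock server is running (startup commands below), it can be sanity-checked with a raw JSON-RPC request, bypassing `MCPClient`:

```python
import asyncio

import httpx


async def main() -> None:
    req = {
        "jsonrpc": "2.0",
        "id": "1",
        "method": "tools/call",
        "params": {"tool": "kb.search", "args": {"query": "severance pay", "top_k": 2}},
    }
    async with httpx.AsyncClient() as client:
        resp = await client.post("http://127.0.0.1:8099/mcp", json=req)
        print(resp.json())  # {"jsonrpc": "2.0", "id": "1", "result": {"hits": [...], "context": "..."}}


asyncio.run(main())
```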
How to run
1) Install dependencies
```bash
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.sample .env
```
2) Start the mock MCP server, the three agents, and the Router (one per terminal)
```bash
# Terminal 1: Mock MCP Server
uvicorn src.tools.mock_mcp_server:app --host 0.0.0.0 --port 8099 --reload

# Terminal 2: ASR Agent
uvicorn src.agents.asr_agent:app --host 0.0.0.0 --port 8011 --reload

# Terminal 3: Legal Agent
uvicorn src.agents.legal_agent:app --host 0.0.0.0 --port 8012 --reload

# Terminal 4: TTS Agent
uvicorn src.agents.tts_agent:app --host 0.0.0.0 --port 8013 --reload

# Terminal 5: Router
uvicorn src.router:app --host 0.0.0.0 --port 8000 --reload
```
Run these commands from the repository root so that the `src.*` imports resolve.
Test (SSE streaming)
For example, have the Router dispatch a task to the Legal Agent:
```bash
curl -N \
  -H 'Accept: text/event-stream' \
  -H 'Content-Type: application/json' \
  -d '{
        "type": "legal.analyze",
        "payload": {"query": "公司违法辞退赔偿怎么计算?"},
        "stream": true
      }' \
  http://127.0.0.1:8000/a2a/task
```
You should see events arrive one by one: `data: {"task_id": ..., "event": "progress" ...}`, followed by `delta` and `final` events.
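The same request can be issued from Python. A minimal consumer sketch built directly on httpx (no dedicated SSE client library assumed), parsing each `data:` line back into an event dict:

```python
import asyncio
import json

import httpx


async def main() -> None:
    task = {
        "type": "legal.analyze",
        "payload": {"query": "公司违法辞退赔偿怎么计算?"},
        "stream": True,
    }
    async with httpx.AsyncClient(timeout=None) as client:
        # Stream the Router's SSE response line by line
        async with client.stream(
            "POST",
            "http://127.0.0.1:8000/a2a/task",
            json=task,
            headers={"Accept": "text/event-stream"},
        ) as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[len("data: "):])
                    print(event["event"], event["data"])


asyncio.run(main())
```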
Architecture recap
- A2A: `/a2a/task` is the uniform entry point between agents; the Router dispatches on `task.type` and relays the SSE stream through unchanged (a client-side sketch of chaining the three agents follows this list).
- MCP: each agent calls tools through `MCPClient` (demonstrated here with `mock_mcp_server`), which can later be swapped for real MCP tools (vector stores, Redis memory, enterprise APIs, LLM, TTS/ASR services, and so on).
- Extensibility: adding an agent means standing up one more service and adding a prefix mapping to the Router's `ROUTE_TABLE`; adding a tool means registering a new `tool` name on the MCP side and calling it from an agent.
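The Router itself only dispatches single tasks, so chaining ASR → Legal → TTS is the caller's job. A hedged client-side sketch of that orchestration (non-streaming for brevity; the audio URL is illustrative):

```python
import asyncio

import httpx

ROUTER = "http://127.0.0.1:8000/a2a/task"


async def pipeline(audio_url: str) -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        # 1) ASR: audio -> transcript
        asr = (await client.post(ROUTER, json={
            "type": "asr.transcribe", "payload": {"audio_url": audio_url}, "stream": False,
        })).json()
        # 2) Legal analysis of the transcript
        legal = (await client.post(ROUTER, json={
            "type": "legal.analyze", "payload": {"query": asr["text"]}, "stream": False,
        })).json()
        # 3) TTS of the analysis
        tts = (await client.post(ROUTER, json={
            "type": "tts.speak", "payload": {"text": legal["text"]}, "stream": False,
        })).json()
        print(tts)


asyncio.run(pipeline("file:///tmp/consultation.wav"))  # illustrative audio URL
```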
Next steps (possible upgrades)
- Auth / tenant isolation: add Bearer/JWT checks uniformly at the Router and Agent layers; carry tenant_id / session_id in the task metadata.
- Retries and timeouts: give the Router circuit breaking, timeouts, and error propagation for downstream agents; add retry policies around MCP tool calls inside the agents.
- Truly streaming tools: switch the MCP side to SSE / chunked output and add an `acall_stream` to `MCPClient` for end-to-end streaming (see the sketch after this list).
- Memory: attach Redis through MCP tools (`memory.write/read/append`) and wrap conversation-level short-term memory at the Router/Agent layer.
- Discovery and registration: add a `/.well-known/agents` endpoint for A2A so agents can register dynamically and send heartbeats, or bring in a service-discovery component.
- Observability: unified instrumentation (OpenTelemetry), a task-state table (Postgres/ClickHouse), and an SSE log-mirroring channel for replay and quality checks.