当前位置: 首页 > news >正文

LLM之Agent(二十五)| 使用 A2A Agents和 MCP Server构建一个Demo Multi-Agent系统

       学习某事的最好方法是通过动手练习。今天将分享一下使用 A2A Agents与 MCP server集成起来开发一个多智能体 AI 系统。

一、Agentic AI系统架构

Image

       该系统提供了两个交互式应用程序:一个带有 Streamlit UI,另一个通过CLI命令行。两者都可以向主机代理(Host Agent)发送消息。

       主机代理充当业务流程协调程序,它负责调用适当的工具或其他代理,并将生成的结果返回给用户。同时,它足够灵活,可以决定何时不需要额外的工具或代理;如果它可以自行解决任务,它会将结果直接返回给最终用户。

      下面将按照顺序介绍一下每个组件。

二、创建Math MCP Server

       模型上下文协议(MCP)服务器提供了一个服务器端应用程序,支持多个用户访问其工具、工件和其他资源。它目前支持多种传输协议:stdio、SSE 和 streamable-http。stdio 仅允许连接到本地运行的 MCP 服务器。SSE(通过 HTTP 发送的服务器发送事件)仅支持单向流式处理。streamable-http 支持双向流,比 SSE 应用更广泛。

       可以使用不同的编程语言开发 MCP 服务器,只需确保它符合 MCP 规范,以便其他 LLM 代理可以发现和使用其工具。

       在本演示中,我将使用 Python FastMCP 库快速构建 MCP 服务器。math-mcp-server 重新定义了基本的数学运算,以便返回的值同时包括计算结果和更具描述性的表达式。

       您可能想知道:既然 LLM 已经可以执行此类任务,为什么还要费心使用 MCP 服务器呢?请记住,这只是一个演示项目。该示例本身可能看起来微不足道,但在实际场景中,MCP 服务器可以处理更有意义的任务,例如发送电子邮件、将文件保存为 PDF 或执行计算机程序可以执行的几乎任何操作。

        FastMCP 是使用 MCP 的标准框架,它负责复杂的协议细节和服务器管理,使您能够专注于构建有用的 MCP 工具。以下是定义一个简单的演示 MCP 服务器所需的所有代码。

# mcp/math_mcp_server.pyimport logging
import osfrom mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
from config import configlogger = logging.getLogger(__name__)
logging.basicConfig(format="[%(levelname)s]: %(message)s", level=logging.INFO)class ArithmaticInput(BaseModel):a: float = Field(..., description="The first number.")    b: float = Field(..., description="The second number.")    class ArithmaticOutput(BaseModel):result: float = Field(..., description="The result of the operation.")          expression: str = Field(..., description="The expression that was evaluated.")mcp = FastMCP("math_server",host ="localhost",               port = config.MATH_MCP_SERVER_PORT,               stateless_http = True)@mcp.tool(name = "add_two_numbers")
def add_numbers(input: ArithmaticInput) -> ArithmaticOutput:"""Use this to add two numbers together.        Args:    input(ArithmaticInput): The input containing two numbers to add.        Returns:    output(ArithmaticOutput): The output containing the result and expression.    """    result = input.a + input.b    expression = f"🏃‍♀️‍➡️🏃‍♀️‍➡️🏃‍♀️‍➡️ {input.a} + {input.b} = {result}"    logger.info(f">>> Tool: 'add' called with numbers '{input.a}' and '{input.b}'")    return ArithmaticOutput(result=result, expression=expression)@mcp.tool(name = "subtract_two_numbers")
def subtract_numbers(input: ArithmaticInput) -> ArithmaticOutput:    """Use this to subtract two numbers.        Args:    input(ArithmaticInput): The input containing two numbers to subtract.    Returns:    output(ArithmaticOutput): The output containing the result and expression.    """    result = input.a - input.b    expression = f"👏👏👏 {input.a} - {input.b} = {result}"    logger.info(f">>> Tool: 'subtract' called with numbers '{input.a}' and '{input.b}'")    return ArithmaticOutput(result=result, expression=expression)@mcp.tool(name = "multiply_two_numbers")
def multiply_numbers(input: ArithmaticInput) -> ArithmaticOutput:... (skipped)@mcp.tool(name = "divide_two_numbers")
def divide_numbers(input: ArithmaticInput) -> ArithmaticOutput:... (skipped)if __name__ == "__main__":logger.info(f"MCP server started on port {os.getenv('PORT', int(config.MATH_MCP_SERVER_PORT))}")    mcp.run(transport="streamable-http")

       在 FastMCP 中,tools可以将常规 Python 函数转换为 LLM可以调用的功能。通过使用 @mcp.tool 装饰器,其他代理可以自动发现 MCP 服务器,而无需显式注册它们。

       请注意,其他代理依赖于每个工具的文档字符串(docstring)来了解其功能。这意味着编写清晰准确的文档字符串对于代理人工智能系统正确识别和使用正确的工具至关重要。

       如果输入和输出是使用 Pydantic BaseModel 定义的(如本例所示),则也应该在文档字符串中描述它们。这确保了由 LLM 提供支持的其他代理可以以正确的格式提供参数。

       代码准备就绪后,运行 Python 文件以启动服务器。如果一切设置正确,您将看到如下所示显示的运行状态。

Image

三、创建PostDesignAgent

       演示代理称为 PostDesignAgent,它的作用是生成帖子的标题、副标题和高亮。这里有多个框架可以构建代理,在此示例中,我将使用 Google Agent Development Kit(google-adk),原因是其灵活的模块化框架使开发和部署 AI 代理变得容易。

       众所周知,代理人工智能系统的核心是LLM——没有它,应用程序就只是传统软件。在 PostDesignAgent 类中,通过指定模型 (gemini-2.5-flash) 以及指导 LLM 的描述和说明来创建一个 LlmAgent 对象。

       尽管 google-adk 针对 Gemini 和更广泛的 Google 生态系统进行了优化,但它也与其他框架和 LLM 兼容。例如,如果您更喜欢 OpenAI、Anthropic 或其他模型,则可以使用 LiteLlm 类包装它们并将它们传递给 LlmAgent。

       除了 LlmAgent 之外,Runner 对象是系统的另一个核心组件。当 LlmAgent 推理、使用工具、更新内存或会话以及生成输出时,它管理复杂的多步骤过程。

       调用 invoke 方法时,Runner 使用 session_id 在内部跟踪不同的用户会话,确保每个客户端的会话、记忆和工件(文件、结构化对象等)保持隔离。

       从代码中我们可以看到,在将 user_id、session_id 和 new_message 传递给 Runner 后,它会发出事件——每个事件都是包含相关信息的 JSON 结构。

       正如前面在 streamable-http 简介中提到的,它支持双向流,允许数据双向流动。invoke 方法中的事件循环还可能产生其他 JSON 消息,这些消息将返回给调用方函数。

       在实践中,代理可以应用其他控制措施来提高可靠性和安全性,例如超时、身份验证或重试。但是,对于此演示,代码仅根据 event.is_final_response 是否为 True 生成信息。然后, 调用方法的调用方可以捕获此信息并执行进一步的处理。​​​​​​​

# agemts/post_dsign_agent/agent.pyimport os 
from utilities.common.file_loader import load_instruction_file
from google.adk.agents import LlmAgent
from google.adk import Runner
from google.adk.artifacts import InMemoryArtifactService
from google.adk.sessions import InMemorySessionService
from google.adk.memory import InMemoryMemoryService
from collections.abc import AsyncIterable
from google.genai import types 
from dotenv import load_dotenvload_dotenv()class PostDesignAgent:def __init__(self):        """         A simple website builder agent which can create a basic website page and is built with Google's Agent Development Kit.        """  file_path= os.path.dirname(__file__)        self.SYSTEM_INSTRUCTION = load_instruction_file(file_path + "/instructions.txt")        self.DESCRIPTION = load_instruction_file(file_path + "/description.txt")        self.agent = self.build_agent()        self.user_id = "post_design_agent"        self.runner = Runner(      app_name=self.agent.name,            agent = self.agent,            artifact_service=InMemoryArtifactService(),            session_service=InMemorySessionService(),            memory_service=InMemoryMemoryService())            def build_agent(self) -> LlmAgent:    return LlmAgent(       name = "PostDesignAgent",            model="gemini-2.5-flash",            instruction=self.SYSTEM_INSTRUCTION,            description=self.DESCRIPTION,        )            async def invoke(self, query: str, session_id: str = None) -> AsyncIterable[dict]:    """         Invoke the agent with the given query and seesion_id and return the response.         Return a stream of updates back to the caller as the agent processes the request.                {       "is_task_complete": bool,            "updates": str,            "content": str        }        """                session  = await self.runner.session_service.get_session(       app_name=self.agent.name,            user_id=self.user_id,            session_id=session_id            )                if not session:       session = await self.runner.session_service.create_session(           app_name=self.agent.name,                user_id=self.user_id,                session_id=session_id            )                        user_content = types.Content(        role="user",            parts = [types.Part.from_text(text=query)]            )                async for event in self.runner.run_async(        user_id=self.user_id,            session_id=session_id,            new_message=user_content        ):        if event.is_final_response:           final_response = ""                if event.content and event.content.parts and event.content.parts[-1].text:               final_response = event.content.parts[-1].text                                    yield {                "is_task_complete": True,                    "content": final_response                 }            else:            yield {               "is_task_complete": False,                    "updates": "The agent is still working on your request..."                }      

agent_executor.py

       PostDesignAgentExecutor 使用 a2a 框架调用 PostDesignAgent 对象并与外部客户端(用户)交互。

       区分 google-adk 和 a2a 框架的作用非常重要。Google-adk(之前使用)提供了 Runner,它运行代理并在执行过程中生成事件。但是,它不管理代理如何跨网络或在不同进程之间进行通信。

       这就是 a2a 库的用武之地。我们用它来设计 PostDesignAgentExecutor,它继承自抽象类 AgentExecutor。关键方法是 execute,它接受几个参数:RequestContext 包装传入的用户查询和当前任务信息。EventQueue 和 TaskUpdater 用于排队并将任务更新发送回客户端。execute 方法调用代理对象的 invoke 方法(如上所述)并与代理通信。​​​​​​​

# agents/post_design_agent/agent_executor.pyfrom a2a.server.agent_execution import AgentExecutor,  RequestContext
from a2a.server.events import EventQueue
from a2a.utils import new_task, new_agent_text_message
from agents.post_design_agent.agent import PostDesignAgent
from a2a.server.tasks import TaskUpdater
from a2a.types import TaskStateimport asyncio
from a2a.types import Messageclass PostDesignAgentExecutor(AgentExecutor):def __init__(self):    """        An executor to run the PostDesignAgent with the provided tools.        """        self.agent = PostDesignAgent()                    async def execute(self, request_context: RequestContext, event_queue: EventQueue) -> str:           query = request_context.get_user_input()        task = request_context.current_task        if not task:         task = new_task(request_context.message)            await event_queue.enqueue_event(task)                updater = TaskUpdater(event_queue, task.id, task.context_id)                try:         async for item in self.agent.invoke(query, task.context_id):            is_task_complete = item.get("is_task_complete", False)                                # C: those returned updates are decided by the agent's invoke method design                if not is_task_complete:               message = item.get("updates", "The agent is still working on your request...")                    await updater.update_status(TaskState.working, new_agent_text_message(message, task.context_id, task.id))                else:                final_result = item.get("content", "no result is received.")                    await updater.update_status(TaskState.completed, new_agent_text_message(final_result, task.context_id, task.id))                    return final_result                                    await asyncio.sleep(0.1)  # Yield control to the event loop                        except Exception as e:        error_message = f"Error during agent execution: {e}"            await updater.update_status(TaskState.failed, new_agent_text_message(error_message, task.context_id, task.id))            return "No result produced by agent."            async def cancel(self):    """ Handle any cleanup if the execution is cancelled. """        pass

__main__.py

       现在我们已经定义了 PostDesignAgent 和 PostDesignAgentExecutor,我们需要一个简单的应用程序来包装它们并使代理可访问。A2A 代理需要“卡片”来定义其功能、技能、描述、版本、URL 和默认输入/输出模式。

       我们还需要提供一个 DefaultRequestHandler,它通过 RequestContext 接收和解析来自其他代理的传入请求,将它们分派到 AgentExecutor,并将事件流式传输回请求者,直到任务完成。

       最后,A2AStarletteApplication 充当 A2A 提供的服务器端 Web 应用程序包装器。它建立在 Starlette(FastAPI 内部使用的 ASGI 框架)之上,并公开代理,以便其他 A2A 代理可以与其交互。​​​​​​​

from a2a.types import AgentSkill, AgentCard, AgentCapabilities
import click
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from agents.post_design_agent.agent_executor import PostDesignAgentExecutor
from a2a.server.apps import A2AStarletteApplication
import uvicorn
from config import config@click.command()
@click.option('--host', default='localhost', help='Host for the agent server.')
@click.option('--port', default=int(config.POST_DESIGN_AGENT_PORT), help='Port for the agent server.')
def main(host: str, port: int):skill = AgentSkill(    name="PostDesignAgent",        id="post_design_agent",        description="A simple post design agent which can create a post title, subtitle, highlight and is built with Google's Agent Development Kit.",        tags =["post design", "google adk", "llm agent"],        examples=["""create a post for a school activity which organize primary school students to attend coding lessons""",                """create a post for a travel blog about visiting Japan"""],    )    card = AgentCard(     name="PostDesignAgent",        description="A simple post design agent which can create a post title, subtitle, highlight and is built with Google's Agent Development Kit.",        skills=[skill],        capabilities=AgentCapabilities(streaming=True, multi_turn=True),        url=f"http://{host}:{port}/",        version="1.0",        default_input_modes=["text"],        default_output_modes=["text"]    )        request_handler = DefaultRequestHandler(    agent_executor=PostDesignAgentExecutor(),        task_store=InMemoryTaskStore(),    )        server = A2AStarletteApplication(    agent_card=card,        http_handler=request_handler    ) uvicorn.run(server.build(), host=host, port=port)    
if __name__ == "__main__":main()

运行此文件以启动代理。如果一切设置正确,它应该如下所示运行。

Image

四、创建主机代理

       与其他代理不同,hostagent负责发现其他代理和 MCP 服务器,以及委派和编排任务。构建主机代理的方法有很多种,例如使用 LangChain、AutoGPT 或自定义编排系统。在此示例中,我将继续使用 Google-adk 和 A2A 协议来创建主机代理。

       HostAgent类提供了一个 list_agents 方法来列出所有现有代理。此方法在内部调用 self.agent_discovery.list_agent_cards() 以检索代理卡信息。list_agent_cards 的定义可以在项目文件夹中找到它 utilities/a2a/agent_discovery.py,这里不做详细介绍。

       HostAgent还通过调用 self.mcp_servers.get_tools()方法收集所有可用的 MCP 服务器。此外,它还需要一个 delegate_task 方法,该方法定义hostagent应如何使用其代理卡将任务委托给特定代理。准备好所有这些信息后,主机代理将创建一个 LlmAgent 对象并传入所有工具。

       具体而言, FunctionTool(self.delegate_task) 在hostagent无法处理用户请求的情况下,将委派逻辑发送给 LLM。 FunctionTool(self.list_agents) 允许代理发现其他可用的代理,使 LLM 能够决定将任务委派给哪个代理。*mcp_tools 扩展了从 MCP 服务器发现的工具列表。

       通过将所有这些工具传递给 Google-adk 的 LlmAgent,对象可以调用工具、调用函数、管理记忆和会话,并提供异步流式反馈。因此,作为 Google-adk 的核心组件之一,LlmAgent 对象处理的是代理编排,而不仅仅是原始的 LLM 推理。

       HostAgent 的 invoke 方法与其他代理的工作方式相同,所以我不会在这里讨论。​​​​​​​

import os 
from utilities.common.file_loader import load_instruction_file
from google.adk.agents import LlmAgent
from google.adk import Runner
from google.adk.artifacts import InMemoryArtifactService
from google.adk.sessions import InMemorySessionService
from google.adk.memory import InMemoryMemoryService
from google.adk.tools.function_tool import FunctionTool
from collections.abc import AsyncIterable
from google.genai import types
from utilities.mcp import mcp_connect
from utilities.a2a import agent_discovery, agent_connector
from dotenv import load_dotenv
from a2a.types import AgentCard
from uuid import uuid4load_dotenv()class HostAgent:"""    Orchestrator agent.        - Discover A2A agent via agent discovery    - Discover MCP servers via MCP connectors and load the MCP tools     - Route the user query by picking the correct agents/tools    """    def __init__(self):    """         A simple website builder agent which can create a basic website page and is built with Google's Agent Development Kit.        """   file_path = os.path.dirname(__file__)        self.SYSTEM_INSTRUCTION = load_instruction_file(file_path + "/instructions.txt")        self.DESCRIPTION = load_instruction_file(file_path + "/description.txt")                self.mcp_connector = mcp_connect.MCPConnector()        self.agent_discovery = agent_discovery.AgentDiscovery()                async def create(self):    self.agent = await self.build_agent()        self.user_id = "host_agent"        self.runner = Runner(        app_name=self.agent.name,            agent = self.agent,            artifact_service=InMemoryArtifactService(),            session_service=InMemorySessionService(),            memory_service=InMemoryMemoryService())        async def list_agents(self) -> list[AgentCard]:     """        A2A tool: return the list of agent card of A2A registered agents        Returns:        list[AgentCard]:        """        cards: list[AgentCard] = await self.agent_discovery.list_agent_cards()        cards_info = [card.model_dump(exclude_none = True)['name'] for card in cards]        print("🌴🌴🌴Identified agent cards:", cards_info)        return  [card.model_dump(exclude_none = True) for card in cards]            async def delegate_task(self, agent_name: str, message: str) -> str:    cards: list[AgentCard] = await self.agent_discovery.list_agent_cards()                matched_card = None        for card in cards:        if card.name.lower() == agent_name.lower():            matched_card = card                 break                 if matched_card is None:       return "Agent not found"                connector = agent_connector.AgentConnector(matched_card)        return await connector.send_task(message=message,                         session_id=str(uuid4()))                            async def build_agent(self) -> LlmAgent:    mcp_tools = await self.mcp_connector.get_tools()        await self.list_agents()                return LlmAgent(       name = "HostAgent",            model="gemini-2.5-flash",            instruction=self.SYSTEM_INSTRUCTION,            description=self.DESCRIPTION,            tools =[FunctionTool(self.delegate_task),                    FunctionTool(self.list_agents),                    *mcp_tools]        )            async def invoke(self, query: str, session_id: str = None) -> AsyncIterable[dict]:     ... (skipped)

如果一切正常,则主机代理可用,如下所示。

Image

五、创建 UI 以调用主机代理

       上述hostagent可以由多个客户端访问,无论是通过命令行应用程序还是其他后端服务。在这个例子中,我创建了一个简单的 Streamlit UI,允许用户与主机代理进行交互。为了简洁起见,省略了一些与 UI 相关的代码。

       当 Streamlit App 想要与主机代理进行通信时,首先通过调用 retrieve_host_card 函数来检索主机代理的卡片信息。然后,此卡信息用于创建 agent_connector.AgentConnector 对象。有了这个 host_connector,我们就可以使用异步方式向 asyncio.run(host_connector.send_task(prompt)) 主机代理发送用户查询并等待响应。​​​​​​​

# app/chat_ai.pyimport streamlit as st
from utilities.a2a import agent_connector, agent_discovery
from utilities.mcp import mcp_discovery
from a2a.client import A2ACardResolver
import httpx
import asyncio
from config import config    async def retrieve_host_card(host_agent_url: str) -> agent_connector.AgentConnector:    # C: the first step is to find host_agent_card    host_agent_card = None    async with httpx.AsyncClient(timeout=300) as httpx_client:    try:        resolver = A2ACardResolver(base_url=host_agent_url, httpx_client=httpx_client)            host_agent_card  = await resolver.get_agent_card()            if host_agent_card:            print(f"🌳🌳🌳 Discovered agent: {host_agent_card.name} at {host_agent_url}")            else:            print(f"⭐️⭐️⭐️ No AgentCard found at {host_agent_url}")        except Exception as e:        print(f"💥💥💥 Error retrieving AgentCard from {host_agent_url}: {e}")        # C: connect to the host agent    return host_agent_card
... (skipped)card = asyncio.run(retrieve_host_card(f"http://localhost:{config.HOST_AGENT_PORT}"))
host_connector = agent_connector.AgentConnector(card)# Display chat history
for msg in st.session_state.messages:with st.chat_message(msg["role"]):    st.markdown(msg["content"], unsafe_allow_html=True)# Chat input
if prompt := st.chat_input("Type your message...", key="prompt"):# Add user message    st.session_state.messages.append({"role": "user", "content": prompt})    with st.chat_message("user"):    st.markdown(prompt)    # bot reply    response = asyncio.run(host_connector.send_task(prompt))    st.session_state.messages.append({"role": "assistant", "content": response})            with st.chat_message("assistant"):     st.markdown(response)

       下面是 Streamlit UI 的屏幕截图。它显示主机代理调用 PostDesignAgent。由于此任务相对简单,主机代理可以直接响应用户。如果我没有明确指定“使用 PostDesignAgent 设计...”在我的查询中,输出格式将是随机的,而不是遵循 PostDesignAgent 定义的结构化格式。这个实验表明,在这个过程中确实调用了后期设计代理!

Image

六、运行快捷方式

       您可以使用提供的 Makefile 轻松运行和停止系统。使用 make run-all 启动所有客户端和 MCP 服务器,并使用 make stop-all 或 make kill-all 停止所有服务。

参考链接:

[1] https://gofastmcp.com/getting-started/welcome

http://www.dtcms.com/a/558078.html

相关文章:

  • 【30】船舶数据集(有v5/v8模型)/YOLO船舶检测
  • 全网视频合集网站建设制作相册图片合集
  • 网站建设建议书嘉兴手机网站建设
  • 公司网站建设 阜阳四川城乡建设网站证件查询
  • MySQL操作库
  • 免费vip影视网站怎么做的重庆网站建设与制作
  • React Hooks 实现表单验证
  • 李宏毅机器学习笔记38
  • 本网站建设优秀个人网站欣赏
  • 基于springboot的河南传统文化展示与推荐系统
  • 宁波专业网站建设公司百度网络营销的概念和含义
  • 《P2656 采蘑菇》
  • 做网站每年需付费吗河南省建设人才信息网官网
  • 做网站要用到数据库吗文网文网站建设
  • 网站策划流程专业做婚庆的网站
  • 中国住房建设部网站wordpress后台拿shell
  • day96—双指针—长按键入(LeetCode-925)
  • 本地部署 Spring AI 完全指南:从环境搭建到实战落地
  • 外贸服装网站建设高邮网站建设
  • 建设电子商务网站期末考试网站开发需求表
  • 如何利用QuickAPI管理企业数据库的API生命周期并提升数据安全
  • 做自媒体网站开发番禺网站开发设计
  • Verilog和FPGA的自学笔记9——呼吸灯
  • @RestController注解
  • 门户网站英文郑州网站模板建设
  • LVS负载均衡集群理论
  • 关于高校网站建设论文的总结网络优化基础知识
  • 规则引擎Drools语法要点
  • 柘林网站建设公司推广做哪个网站
  • 校园网站建设情况统计表logo标志