当前位置: 首页 > news >正文

谷歌ADK接入文件操作MCP

文章目录

  • MCP基础概念
  • 文件操作服务器
  • 文件操作MCP接入谷歌ADK
    • 项目创建
    • 多轮对话代码

MCP基础概念

  • MCP技术体系中,会将外部工具运行脚本称作服务器,而接入这些外部工具的大模型运行环境称作客户端。
    在这里插入图片描述
  • 一个客户端可以接入多个不同类型的服务器,但都要遵循MCP通信协议。
  • MCP服务器的输出内容是一种标准格式的内容,只能被MCP客户端所识别。在客户端和服务器都遵循MCP协议的时候,客户端就能够像Function calling中大模型调用外部工具一样,调用MCP服务器里面的工具。
    在这里插入图片描述
  • ADK作为客户端,接入MCP工具。核心MCPToolset类,该类是ADK连接到MCP服务器的桥梁。
  • 在实际过程中,ADK代理将执行以下操作MCPToolset
    1. 连接:与MCP服务器进程建立连接。该服务器可以是通过标准输入/输出 ( StdioServerParameters ) 进行通信的本地服务器,也可以是使用服务器发送事件 ( SseServerParams ) 的远程服务器。
    2. 发现:查询MCP服务器以获取可用工具(list_tools MCP 方法)。
    3. 适应:将MCP工具模式转换为ADK兼容BaseTool实例。
    4. 公开:将这些适配的工具呈现给ADK Agent
    5. 代理调用:当Agent决定使用其中一个工具时,发到MCP服务器并返回结果。MCPToolset将调用(call_tool MCP方法)转
    6. 管理连接:处理与MCP服务器进程的连接的生命周期,通常需要明确清理的周期(如进程结束后清理)。

文件操作服务器

  • Filesystem服务器是一个最基础同时也是最常用的MCP服务器,同时也是官方推荐的服务器,服务器项目地址。

文件操作MCP接入谷歌ADK

项目创建

  1. 项目框架搭建:
    # 初始化项目
    uv init adk_mcp
    # 进入项目目录
    cd adk_mcp
    # 创建虚拟环境
    uv venv
    # 激活虚拟环境
    .venv\Scripts\activate
    # 安装依赖
    uv add google-adk litellm
    
  2. 创建环境配置文件.env
    OPENAI_API_KEY=xxx
    OPENAI_API_BASE=https://api.openai-hk.com/v1
    MODEL=openai/gpt-4o-mini
    
  3. 创建main.py,代码内容如下:
    # 导入基础库
    import os
    import asyncio
    from dotenv import load_dotenv
    from google.genai import types
    from google.adk.agents import Agent
    from google.adk.models.lite_llm import LiteLlm
    from google.adk.agents.llm_agent import LlmAgent
    from google.adk.runners import Runner
    from google.adk.sessions import InMemorySessionService
    from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters,StdioConnectionParamsDS_API_KEY = os.getenv("OPENAI_API_KEY")
    DS_BASE_URL = os.getenv("OPENAI_API_BASE")
    DS_MODEL = os.getenv("MODEL")model = LiteLlm(model=DS_MODEL,api_base=DS_BASE_URL,api_key=DS_API_KEY
    )# --- Step 1: 创建导入MCP工具函数 ---
    async def get_tools_async():"""Gets tools from the File System MCP Server."""print("Attempting to connect to MCP Filesystem server...")# 确保目录存在test_dir = "D:\\Code\\adk_mcp\\test"if not os.path.exists(test_dir):os.makedirs(test_dir)print(f"Created directory: {test_dir}")try:# 创建MCP工具集实例mcp_toolset = MCPToolset(# 这里采用Studio通信方式进行调用connection_params=StdioConnectionParams(server_params=StdioServerParameters(command='npx',args=["-y", "@modelcontextprotocol/server-filesystem", test_dir],)))# 等待服务器启动await asyncio.sleep(2)# 获取工具列表tools = await mcp_toolset.get_tools()print("MCP Toolset created successfully.")print(f"Fetched {len(tools)} tools from MCP server.")# 返回工具和工具集对象用于清理return tools, mcp_toolsetexcept Exception as e:print(f"Error creating MCP tools: {e}")raise# --- Step 2: 创建ADK Agent --
    async def get_agent_async():"""Creates an ADK Agent equipped with tools from the MCP Server."""try:tools, mcp_toolset = await get_tools_async()root_agent = Agent(model=model,name='filesystem_assistant',instruction='Help user interact with the local filesystem using available tools.',tools=tools,  # 将MCP工具加载到Agent中)return root_agent, mcp_toolsetexcept Exception as e:print(f"Error creating agent: {e}")raise# --- Step 3: 执行主逻辑 ---
    async def async_main():mcp_toolset = Nonetry:# 创建会话管理器session_service = InMemorySessionService()session = await session_service.create_session(state={}, app_name='mcp_filesystem_app', user_id='user_fs')# 用户输入query = "请帮我查找目前文件夹里都有哪些文件?"print(f"User Query: '{query}'")content = types.Content(role='user', parts=[types.Part(text=query)])root_agent, mcp_toolset = await get_agent_async()runner = Runner(app_name='mcp_filesystem_app',agent=root_agent,session_service=session_service,)print("Running agent...")events_async = runner.run_async(session_id=session.id, user_id=session.user_id, new_message=content)async for event in events_async:print(f"Event received: {event}")except Exception as e:print(f"Error during main execution: {e}")raisefinally:# 运行完成后,关闭MCP服务器连接if mcp_toolset:print("Closing MCP server connection...")await mcp_toolset.close()print("Cleanup complete.")if __name__ == '__main__':try:asyncio.run(async_main())except Exception as e:print(f"An error occurred: {e}")
    
  • 执行结果如下:
    (adk_mcp) D:\Code\adk_mcp>uv run main.py
    D:\Code\adk_mcp\.venv\Lib\site-packages\pydantic\_internal\_fields.py:198: UserWarning: Field name "config_type" in "SequentialAgent" shadows an attribute in parent "BaseAgent"warnings.warn(
    User Query: '请帮我查找目前文件夹里都有哪些文件?'
    Attempting to connect to MCP Filesystem server...
    D:\Code\adk_mcp\.venv\Lib\site-packages\google\adk\tools\mcp_tool\mcp_tool.py:87: UserWarning: [EXPERIMENTAL] BaseAuthenticatedTool: This feature is experimental and may change or be removed in future versions without notice. It may introduce breaking changes at any time.super().__init__(
    auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
    MCP Toolset created successfully.
    Fetched 14 tools from MCP server.
    Running agent...
    Event received: content=Content(parts=[Part(text="""首先,我需要确定您当前的工作目录位置。由于您没有指定具体路径,我将先获取允许访问的目录列表:"""),Part(function_call=FunctionCall(args={},id='call_5201be00eeb24a989be18f',name='list_allowed_directories')),],role='model'
    ) grounding_metadata=None partial=False turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=GenerateContentResponseUsageMetadata(candidates_token_count=627,prompt_token_count=1806,total_token_count=2433
    ) live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=set() branch=None id='71634100-b10a-4a81-b779-f34f52df12eb' timestamp=1754979735.932518
    Event received: content=Content(parts=[Part(function_response=FunctionResponse(id='call_5201be00eeb24a989be18f',name='list_allowed_directories',response={'result': CallToolResult(content=[<... Max depth ...>,],isError=False)})),],role='user'
    ) grounding_metadata=None partial=None turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=None live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='00b7f823-5a66-493d-95bd-55ee93466fc2' timestamp=1754979761.37467
    Event received: content=Content(parts=[Part(text="""根据系统配置,允许访问的目录是 `D:\Code\adk_mcp\test`。我将列出该目录下的文件:"""),Part(function_call=FunctionCall(args={'path': 'D:\\Code\\adk_mcp\\test'},id='call_2f66909ac2054847b6f9d3',name='list_directory')),],role='model'
    ) grounding_metadata=None partial=False turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=GenerateContentResponseUsageMetadata(candidates_token_count=1105,prompt_token_count=1906,total_token_count=3011
    ) live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=set() branch=None id='39358200-064d-478e-90f6-ab0855f13ec4' timestamp=1754979761.380695
    Event received: content=Content(parts=[Part(function_response=FunctionResponse(id='call_2f66909ac2054847b6f9d3',name='list_directory',response={'result': CallToolResult(content=[<... Max depth ...>,],isError=False)})),],role='user'
    ) grounding_metadata=None partial=None turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=None live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='2d1197f7-c81d-41a5-9fc5-15652c50fa7e' timestamp=1754979803.754635
    Event received: content=Content(parts=[Part(text="""当前目录 `D:\Code\adk_mcp\test` 中的文件如下:- 📄 **系统架构设计师教程_带目录高清版.pdf**"""),],role='model'
    ) grounding_metadata=None partial=False turn_complete=None error_code=None error_message=None interrupted=None custom_metadata=None usage_metadata=GenerateContentResponseUsageMetadata(candidates_token_count=128,prompt_token_count=2021,total_token_count=2149
    ) live_session_resumption_update=None invocation_id='e-1710d865-f562-4895-ae06-4a3f0155c138' author='filesystem_assistant' actions=EventActions(skip_summarization=None, state_delta={}, artifact_delta={}, transfer_to_agent=None, escalate=None, requested_auth_configs={}) long_running_tool_ids=None branch=None id='15fdd91c-e645-46fe-a613-76e1f28323c5' timestamp=1754979803.761915
    Closing MCP server connection...
    Cleanup complete.
    

多轮对话代码

  • 以下代码是main.py的多轮对话的版本:
    # 导入基础库
    import os
    import asyncio
    from dotenv import load_dotenv
    from google.genai import types
    from google.adk.agents import Agent
    from google.adk.models.lite_llm import LiteLlm
    from google.adk.agents.llm_agent import LlmAgent
    from google.adk.runners import Runner
    from google.adk.sessions import InMemorySessionService
    from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters,StdioConnectionParamsDS_API_KEY = os.getenv("OPENAI_API_KEY")
    DS_BASE_URL = os.getenv("OPENAI_API_BASE")
    DS_MODEL = os.getenv("MODEL")model = LiteLlm(model=DS_MODEL,api_base=DS_BASE_URL,api_key=DS_API_KEY
    )# --- Step 1: 创建导入MCP工具函数 ---
    async def get_tools_async():"""Gets tools from the File System MCP Server."""print("Attempting to connect to MCP Filesystem server...")# 确保目录存在test_dir = "D:\\Code\\adk_mcp\\test"if not os.path.exists(test_dir):os.makedirs(test_dir)print(f"Created directory: {test_dir}")try:# 创建MCP工具集实例mcp_toolset = MCPToolset(# 这里采用Studio通信方式进行调用connection_params=StdioConnectionParams(server_params=StdioServerParameters(command='npx',  # Command to run the serverargs=["-y",  # Arguments for the command"@modelcontextprotocol/server-filesystem",test_dir],)))# 等待服务器启动await asyncio.sleep(2)# 获取工具列表tools = await mcp_toolset.get_tools()print("MCP Toolset created successfully.")print(f"Fetched {len(tools)} tools from MCP server.")# 返回工具和工具集对象用于清理return tools, mcp_toolsetexcept Exception as e:print(f"Error creating MCP tools: {e}")raise# --- Step 2: 创建ADK Agent --
    async def get_agent_async():"""Creates an ADK Agent equipped with tools from the MCP Server."""try:tools, mcp_toolset = await get_tools_async()root_agent = Agent(model=model,name='filesystem_assistant',instruction='Help user interact with the local filesystem using available tools.',tools=tools,  # 将MCP工具加载到Agent中)return root_agent, mcp_toolsetexcept Exception as e:print(f"Error creating agent: {e}")raise# 多轮对话函数
    async def chat_loop(runner, user_id, session_id) -> None:print("\n🤖ADK + MCP对话已启动!输入'quit'退出。")while True:query = input("\n你: ").strip()if query.lower() == "quit":breaktry:print(f"\n>>> User Query: {query}")# Prepare the user's message in ADK formatcontent = types.Content(role='user', parts=[types.Part(text=query)])final_response_text = "Agent did not produce a final response."  # Default# Key Concept: run_async executes the agent logic and yields Events.# We iterate through events to find the final answer.async for event in runner.run_async(user_id=user_id,session_id=session_id, new_message=content):# You can uncomment the line below to see *all* events during execution# print(f"  [Event] Author: {event.author}, Type:{type(event).__name__}, Final: {event.is_final_response()}, Content:{event.content}")# Key Concept: is_final_response() marks the concluding message for the turn.if event.is_final_response():if event.content and event.content.parts:# Assuming text response in the first partfinal_response_text = event.content.parts[0].textelif event.actions and event.actions.escalate:  # Handle potential errors / escalationsfinal_response_text = f"Agent escalated:{event.error_message or 'No specific message.'}"# Add more checks here if needed (e.g., specific error codes)breakprint(f"<<< Agent Response: {final_response_text}")except Exception as e:print(f"\n⚠调用过程出错: {e}")# --- Step 3: 执行主逻辑 ---
    async def async_main():mcp_toolset = Nonetry:# 创建会话管理器session_service = InMemorySessionService()session = await session_service.create_session(state={}, app_name='mcp_filesystem_app', user_id='user_fs')# 用户输入query = "请帮我查找目前文件夹里都有哪些文件?"print(f"User Query: '{query}'")content = types.Content(role='user', parts=[types.Part(text=query)])root_agent, mcp_toolset = await get_agent_async()runner = Runner(app_name='mcp_filesystem_app',agent=root_agent,session_service=session_service,)print("Running agent...")events_async = runner.run_async(session_id=session.id, user_id=session.user_id, new_message=content)async for event in events_async:print(f"Event received: {event}")# 启动多轮对话await chat_loop(runner, session.user_id, session.id)except Exception as e:print(f"Error during main execution: {e}")raisefinally:# 运行完成后,关闭MCP服务器连接if mcp_toolset:print("Closing MCP server connection...")await mcp_toolset.close()print("Cleanup complete.")if __name__ == '__main__':try:asyncio.run(async_main())except Exception as e:print(f"An error occurred: {e}")
    
http://www.dtcms.com/a/328239.html

相关文章:

  • Linux中Https配置与私有CA部署指南
  • Java 工厂方法模式
  • C++单继承虚函数表探索
  • 京东方 DV133FHM-NN1 FHD13.3寸 工业液晶模组技术档案
  • 玩转Docker | 使用Docker部署Radicale日历和联系人工具
  • [激光原理与应用-250]:理论 - 几何光学 - 透镜成像的优缺点,以及如克服缺点
  • 万物平台模型导入样例大全(实时更新中~)
  • SM4对称加密算法的加密模式介绍
  • JavaEE 初阶第十八期:叩开网络世界的大门(上)
  • ffmpeg-AVFilter 和 Filter Graph 使用指南
  • ffmpeg,ffplay, vlc,rtsp-simple-server,推拉流命令使用方法,及测试(二)
  • Stereolabs ZED相机 选型指南:双目 / 单目、短距 / 长距,如何为机器人视觉系统匹配最优方案?
  • 力扣-394.字符串解码
  • 【模型剪枝2】不同剪枝方法实现对 yolov5n 剪枝测试及对比
  • Homebrew 入门教程(2025 年最新版)
  • 获取虚谷数据库所有表名、表注释、字段名、字段类型、字段注释到word中
  • clickhouse基础概念及集群部署
  • 疏老师-python训练营-Day43复习日
  • Qwen-Image(阿里通义千问)技术浅析(一)
  • 谷歌 Web Guide 如何重塑搜索排名及其 SEO 影响
  • python技巧:控制转台的2个坑。
  • 从关键词到智能决策:孟庆涛如何用GEO重塑AI时代的搜索优化范式
  • 2025年受自适应差分进化-无人机路径规划的统一元启发式框架-附Matlab完整代码
  • 云计算核心技术
  • 附表B 正则表达式符号列表
  • Java缓冲流
  • Spring面试宝典
  • FPGA自学——FIFO缓存器
  • 游戏中角色持枪:玩家操控角色,角色转向时枪也要转向
  • 西门子PLC跨代通讯实战:S7-200通过以太网模块与S7-1500数据交互