MCP的stdio和SSE通信方式使用案例
一、内容概述
这是一个基于 OpenAI API 的多服务器工具调用系统,主要功能包括:
1. **多服务器工具集成**
- 支持同时连接多个 MCP (Model-Controller-Provider) 服务器
- 目前集成了三个服务器:
- 天气服务 (weather)
- Python 执行服务 (PythonServer)
- SQL 查询服务 (SQLServer)
2. **两种运行模式**
- SSE (Server-Sent Events) 模式:通过 <mcfile name="client_sse.py"
- 标准 I/O 模式:通过 <mcfile name="client_stdio.py"
3. **基于 Function Calling 的工具调用**
- 使用 OpenAI 的最新 Function Calling 功能
- 系统会自动:
1. 收集各服务器提供的工具信息
2. 将工具信息转换为 OpenAI Function Calling 格式
3. 根据用户输入智能选择合适的工具
4. 执行工具调用并返回结果
4. **主要依赖**
- OpenAI API 用于 LLM 调用
- httpx 用于 HTTP 客户端
- pandas 用于数据处理
- pymysql 用于数据库操作
一个典型的 AI Agent 系统,通过 LLM 的能力来智能调用不同服务器提供的各种工具,实现更复杂的任务处理能力。作为一个模板样例便于以后学习。
主要文件如下所示:
二、依赖库
即requirements.txt
annotated-types==0.7.0
anyio==4.9.0
certifi==2025.1.31
cffi==1.17.1
charset-normalizer==3.4.1
click==8.1.8
cryptography==44.0.2
distro==1.9.0
exceptiongroup==1.2.2
h11==0.14.0
httpcore==1.0.8
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
jiter==0.9.0
mcp==1.6.0
numpy==2.2.4
openai==1.74.0
pandas==2.2.3
pycparser==2.22
pydantic==2.11.3
pydantic-core==2.33.1
pydantic-settings==2.8.1
pymysql==1.1.1
python-dateutil==2.9.0.post0
python-dotenv==1.1.0
pytz==2025.2
requests==2.32.3
six==1.17.0
sniffio==1.3.1
sse-starlette==2.2.1
starlette==0.46.2
tqdm==4.67.1
typing-extensions==4.13.2
typing-inspection==0.4.0
tzdata==2025.2
urllib3==2.4.0
uvicorn==0.34.1
三、主要代码
.env用于存放LLM的API密钥
BASE_URL=http://192.168.0.10:11434/v1/
MODEL=qwq:32b-fp16 # qwq:32b-fp16 qwen2.5-coder:14b
OPENAI_API_KEY=ollama# BASE_URL=https://api.siliconflow.cn/v1/
# MODEL=deepseek-ai/DeepSeek-R1-Distill-Qwen-7B
# OPENAI_API_KEY=sk-usbsjapwulwdmlpjseihldovpiyccpkadasylecgtriwsuwj
client_sse.py
import asyncio
import os
import json
from typing import Optional, Dict
from contextlib import AsyncExitStackfrom openai import OpenAI
from dotenv import load_dotenvfrom mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_clientload_dotenv()class MultiServerMCPClient:def __init__(self):"""管理多个 MCP 服务器的客户端"""self.exit_stack = AsyncExitStack()self.openai_api_key = os.getenv("OPENAI_API_KEY") self.base_url = os.getenv("BASE_URL") self.model = os.getenv("MODEL") if not self.openai_api_key:raise ValueError("❌ 未找到 OPENAI_API_KEY,请在 .env 文件中配置")# 初始化 OpenAI Clientself.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)# 存储 (server_name -> MCP ClientSession) 映射self.sessions: Dict[str, ClientSession] = {}# 存储工具信息self.tools_by_session: Dict[str, list] = {} # 每个 session 的 tools 列表self.all_tools = [] # 合并所有工具的列表async def connect_to_servers(self, servers: dict):"""同时启动多个服务器并获取工具servers: 形如 {"weather": "weather_server.py", "rag": "rag_server.py"}"""for server_name, script_path in servers.items():session = await self._start_one_server(script_path)self.sessions[server_name] = session# 列出此服务器的工具resp = await session.list_tools()self.tools_by_session[server_name] = resp.tools # 保存到 self.tools_by_sessionfor tool in resp.tools:# OpenAI Function Calling 格式修正function_name = f"{server_name}_{tool.name}"# print(tool.name)self.all_tools.append({"type": "function","function": {"name": function_name,"description": tool.description,"input_schema": tool.inputSchema}})# 转化function calling格式self.all_tools = await self.transform_json(self.all_tools)# print(self.all_tools)print("\n✅ 已连接到下列服务器:")for name in servers:print(f" - {name}: {servers[name]}")print("\n汇总的工具:")for t in self.all_tools:print(f" - {t['function']['name']}")async def transform_json(self, json2_data):"""将类似 json2 的格式转换为类似 json1 的格式,多余字段会被直接删除。:param json2_data: 一个可被解释为列表的 Python 对象(或已解析的 JSON 数据):return: 转换后的新列表"""result = []for item in json2_data:# 确保有 "type" 和 "function" 两个关键字段if not isinstance(item, dict) or "type" not in item or "function" not in item:continueold_func = item["function"]# 确保 function 下有我们需要的关键子字段if not isinstance(old_func, dict) or "name" not in old_func or "description" not in old_func:continue# 处理新 function 字段new_func = {"name": old_func["name"],"description": old_func["description"],"parameters": {}}# 读取 input_schema 并转成 parametersif "input_schema" in old_func and isinstance(old_func["input_schema"], dict):old_schema = old_func["input_schema"]# 新的 parameters 保留 type, properties, required 这三个字段new_func["parameters"]["type"] = old_schema.get("type", "object")new_func["parameters"]["properties"] = old_schema.get("properties", {})new_func["parameters"]["required"] = old_schema.get("required", [])new_item = {"type": item["type"],"function": new_func}result.append(new_item)return result async def _start_one_server(self, server_url: str) -> ClientSession:"""启动单个 MCP 服务器子进程,并返回 ClientSession"""# 创建 SSE 客户端连接上下文管理器sse_transport = await self.exit_stack.enter_async_context(sse_client(url=server_url))# 异步初始化 SSE 连接,获取数据流对象read_stream, write_stream = sse_transportsession = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))await session.initialize()return sessionasync def chat_base(self, messages: list) -> list:# messages = [{"role": "user", "content": query}]response = self.client.chat.completions.create(model=self.model,messages=messages,tools=self.all_tools)if response.choices[0].finish_reason == "tool_calls":max_number = 10 # 防止无限循环调用工具while max_number:messages = await self.create_function_response_messages(messages, response)response = self.client.chat.completions.create(model=self.model,messages=messages,tools=self.all_tools)if response.choices[0].finish_reason != "tool_calls":breakmax_number = max_number - 1# return response.choices[0].message.contentreturn responseasync def create_function_response_messages(self, messages, response):function_call_messages = response.choices[0].message.tool_callsmessages.append(response.choices[0].message.model_dump())for function_call_message in function_call_messages:tool_name = function_call_message.function.nametool_args = json.loads(function_call_message.function.arguments)# 运行外部函数function_response = await self._call_mcp_tool(tool_name, tool_args)# 拼接消息队列messages.append({"role": "tool","content": function_response,"tool_call_id": function_call_message.id,})return messages async def process_query(self, user_query: str) -> str:"""OpenAI 最新 Function Calling 逻辑:1. 发送用户消息 + tools 信息2. 若模型 `finish_reason == "tool_calls"`,则解析 toolCalls 并执行相应 MCP 工具3. 把调用结果返回给 OpenAI,让模型生成最终回答"""messages = [{"role": "user", "content": user_query}]# 第一次请求response = self.client.chat.completions.create(model=self.model,messages=messages,tools=self.all_tools)content = response.choices[0]print(content)print(self.all_tools)# 如果模型调用了 MCP 工具if content.finish_reason == "tool_calls":# 解析 tool_callstool_call = content.message.tool_calls[0]tool_name = tool_call.function.name # 形如 "weather_query_weather"tool_args = json.loads(tool_call.function.arguments)print(f"\n[ 调用工具: {tool_name}, 参数: {tool_args} ]\n")# 执行MCP工具result = await self._call_mcp_tool(tool_name, tool_args)# 把工具调用历史写进 messagesmessages.append(content.message.model_dump())messages.append({"role": "tool","content": result,"tool_call_id": tool_call.id,})# 第二次请求,让模型整合工具结果,生成最终回答response = self.client.chat.completions.create(model=self.model,messages=messages)return response.choices[0].message.content# 如果模型没调用工具,直接返回回答return content.message.contentasync def _call_mcp_tool(self, tool_full_name: str, tool_args: dict) -> str:"""根据 "serverName_toolName" 调用相应的服务器工具"""parts = tool_full_name.split("_", 1) # 拆分 "weather_query_weather" -> ["weather", "query_weather"]if len(parts) != 2:return f"无效的工具名称: {tool_full_name}"server_name, tool_name = partssession = self.sessions.get(server_name)if not session:return f"找不到服务器: {server_name}"# 执行 MCP 工具resp = await session.call_tool(tool_name, tool_args)print(resp)return resp.content if resp.content else "工具执行无输出"async def chat_loop(self):print("\n🤖 多服务器 MCP + 最新 Function Calling 客户端已启动!输入 'quit' 退出。")messages = []while True:query = input("\n你: ").strip()if query.lower() == "quit":breaktry:messages.append({"role": "user", "content": query})messages = messages[-20: ]# print(messages)response = await self.chat_base(messages)messages.append(response.choices[0].message.model_dump())result = response.choices[0].message.contentprint(f"\nAI: {result}")except Exception as e:print(f"\n⚠️ 调用过程出错: {e}")async def cleanup(self):# 关闭所有资源await self.exit_stack.aclose()async def main():# 服务器配置 以SSE 方式运行 MCP 服务器servers = {"weather": "http://192.168.0.10:8001/sse","PythonServer": "http://192.168.0.10:8002/sse","SQLServer": "http://192.168.0.10:8003/sse"}client = MultiServerMCPClient()try:await client.connect_to_servers(servers)await client.chat_loop()finally:await client.cleanup()if __name__ == "__main__":asyncio.run(main())
client_stdio.py
import asyncio
import os
import json
from typing import Optional, Dict
from contextlib import AsyncExitStackfrom openai import OpenAI
from dotenv import load_dotenvfrom mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_clientload_dotenv()class MultiServerMCPClient:def __init__(self):"""管理多个 MCP 服务器的客户端"""self.exit_stack = AsyncExitStack()self.openai_api_key = os.getenv("OPENAI_API_KEY") self.base_url = os.getenv("BASE_URL") self.model = os.getenv("MODEL") if not self.openai_api_key:raise ValueError("❌ 未找到 OPENAI_API_KEY,请在 .env 文件中配置")# 初始化 OpenAI Clientself.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)# 存储 (server_name -> MCP ClientSession) 映射self.sessions: Dict[str, ClientSession] = {}# 存储工具信息self.tools_by_session: Dict[str, list] = {} # 每个 session 的 tools 列表self.all_tools = [] # 合并所有工具的列表async def connect_to_servers(self, servers: dict):"""同时启动多个服务器并获取工具servers: 形如 {"weather": "weather_server.py", "rag": "rag_server.py"}"""for server_name, script_path in servers.items():session = await self._start_one_server(script_path)self.sessions[server_name] = session# 列出此服务器的工具resp = await session.list_tools()self.tools_by_session[server_name] = resp.tools # 保存到 self.tools_by_sessionfor tool in resp.tools:# OpenAI Function Calling 格式修正function_name = f"{server_name}_{tool.name}"# print(tool.name)self.all_tools.append({"type": "function","function": {"name": function_name,"description": tool.description,"input_schema": tool.inputSchema}})# 转化function calling格式self.all_tools = await self.transform_json(self.all_tools)# print(self.all_tools)print("\n✅ 已连接到下列服务器:")for name in servers:print(f" - {name}: {servers[name]}")print("\n汇总的工具:")for t in self.all_tools:print(f" - {t['function']['name']}")async def transform_json(self, json2_data):"""将类似 json2 的格式转换为类似 json1 的格式,多余字段会被直接删除。:param json2_data: 一个可被解释为列表的 Python 对象(或已解析的 JSON 数据):return: 转换后的新列表"""result = []for item in json2_data:# 确保有 "type" 和 "function" 两个关键字段if not isinstance(item, dict) or "type" not in item or "function" not in item:continueold_func = item["function"]# 确保 function 下有我们需要的关键子字段if not isinstance(old_func, dict) or "name" not in old_func or "description" not in old_func:continue# 处理新 function 字段new_func = {"name": old_func["name"],"description": old_func["description"],"parameters": {}}# 读取 input_schema 并转成 parametersif "input_schema" in old_func and isinstance(old_func["input_schema"], dict):old_schema = old_func["input_schema"]# 新的 parameters 保留 type, properties, required 这三个字段new_func["parameters"]["type"] = old_schema.get("type", "object")new_func["parameters"]["properties"] = old_schema.get("properties", {})new_func["parameters"]["required"] = old_schema.get("required", [])new_item = {"type": item["type"],"function": new_func}result.append(new_item)return result async def _start_one_server(self, script_path: str) -> ClientSession:"""启动单个 MCP 服务器子进程,并返回 ClientSession"""is_python = script_path.endswith(".py")is_js = script_path.endswith(".js")if not (is_python or is_js):raise ValueError("服务器脚本必须是 .py 或 .js 文件")command = "python" if is_python else "node"server_params = StdioServerParameters(command=command,args=[script_path],env=None)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read_stream, write_stream = stdio_transportsession = await self.exit_stack.enter_async_context(ClientSession(read_stream, write_stream))await session.initialize()return sessionasync def chat_base(self, messages: list) -> list:# messages = [{"role": "user", "content": query}]response = self.client.chat.completions.create(model=self.model,messages=messages,tools=self.all_tools)if response.choices[0].finish_reason == "tool_calls":max_number = 10 # 防止无限循环调用工具while max_number:messages = await self.create_function_response_messages(messages, response)response = self.client.chat.completions.create(model=self.model,messages=messages,tools=self.all_tools)if response.choices[0].finish_reason != "tool_calls":breakmax_number = max_number - 1# return response.choices[0].message.contentreturn responseasync def create_function_response_messages(self, messages, response):function_call_messages = response.choices[0].message.tool_callsmessages.append(response.choices[0].message.model_dump())for function_call_message in function_call_messages:tool_name = function_call_message.function.nametool_args = json.loads(function_call_message.function.arguments)# 运行外部函数function_response = await self._call_mcp_tool(tool_name, tool_args)# 拼接消息队列messages.append({"role": "tool","content": function_response,"tool_call_id": function_call_message.id,})return messages async def process_query(self, user_query: str) -> str:"""OpenAI 最新 Function Calling 逻辑:1. 发送用户消息 + tools 信息2. 若模型 `finish_reason == "tool_calls"`,则解析 toolCalls 并执行相应 MCP 工具3. 把调用结果返回给 OpenAI,让模型生成最终回答"""messages = [{"role": "user", "content": user_query}]# 第一次请求response = self.client.chat.completions.create(model=self.model,messages=messages,tools=self.all_tools)content = response.choices[0]print(content)print(self.all_tools)# 如果模型调用了 MCP 工具if content.finish_reason == "tool_calls":# 解析 tool_callstool_call = content.message.tool_calls[0]tool_name = tool_call.function.name # 形如 "weather_query_weather"tool_args = json.loads(tool_call.function.arguments)print(f"\n[ 调用工具: {tool_name}, 参数: {tool_args} ]\n")# 执行MCP工具result = await self._call_mcp_tool(tool_name, tool_args)# 把工具调用历史写进 messagesmessages.append(content.message.model_dump())messages.append({"role": "tool","content": result,"tool_call_id": tool_call.id,})# 第二次请求,让模型整合工具结果,生成最终回答response = self.client.chat.completions.create(model=self.model,messages=messages)return response.choices[0].message.content# 如果模型没调用工具,直接返回回答return content.message.contentasync def _call_mcp_tool(self, tool_full_name: str, tool_args: dict) -> str:"""根据 "serverName_toolName" 调用相应的服务器工具"""parts = tool_full_name.split("_", 1) # 拆分 "weather_query_weather" -> ["weather", "query_weather"]if len(parts) != 2:return f"无效的工具名称: {tool_full_name}"server_name, tool_name = partssession = self.sessions.get(server_name)if not session:return f"找不到服务器: {server_name}"# 执行 MCP 工具resp = await session.call_tool(tool_name, tool_args)print(resp)return resp.content if resp.content else "工具执行无输出"async def chat_loop(self):print("\n🤖 多服务器 MCP + 最新 Function Calling 客户端已启动!输入 'quit' 退出。")messages = []while True:query = input("\n你: ").strip()if query.lower() == "quit":breaktry:messages.append({"role": "user", "content": query})messages = messages[-20: ]# print(messages)response = await self.chat_base(messages)messages.append(response.choices[0].message.model_dump())result = response.choices[0].message.contentprint(f"\nAI: {result}")except Exception as e:print(f"\n⚠️ 调用过程出错: {e}")async def cleanup(self):# 关闭所有资源await self.exit_stack.aclose()async def main():# 服务器脚本 以标准 I/O 方式运行 MCP 服务器servers = {"weather": "weather_server.py","SQLServer":"sql_server.py","PythonServer":"python_server.py"}client = MultiServerMCPClient()try:await client.connect_to_servers(servers)await client.chat_loop()finally:await client.cleanup()if __name__ == "__main__":asyncio.run(main())
python_server.py
import json
from typing import Any
import csv
import numpy as np
import pandas as pd
import random
from mcp.server.fastmcp import FastMCP# 初始化 MCP 服务器
mcp = FastMCP("PythonServer", port=8002)
USER_AGENT = "Pythonserver-app/1.0"@mcp.tool()
async def python_inter(py_code):"""运行用户提供的 Python 代码,并返回执行结果。:param py_code: 字符串形式的 Python 代码:return: 代码运行的最终结果"""g = globals()try:# 若是表达式,直接运行并返回result = eval(py_code, g)return json.dumps(str(result), ensure_ascii=False)except Exception:global_vars_before = set(g.keys())try:exec(py_code, g)except Exception as e:return json.dumps(f"代码执行时报错: {e}", ensure_ascii=False)global_vars_after = set(g.keys())new_vars = global_vars_after - global_vars_beforeif new_vars:# 只返回可序列化的变量值safe_result = {}for var in new_vars:try:json.dumps(g[var]) # 尝试序列化,确保可以转换为 JSONsafe_result[var] = g[var]except (TypeError, OverflowError):safe_result[var] = str(g[var]) # 如果不能序列化,则转换为字符串return json.dumps(safe_result, ensure_ascii=False)else:return json.dumps("已经顺利执行代码", ensure_ascii=False)if __name__ == "__main__":# 以标准 I/O 方式运行 MCP 服务器# mcp.run(transport='stdio')# 使用 SSE 方式运行 MCP 服务器mcp.run(transport='sse')
sql_server.py
import json
import httpx
from typing import Any
import pymysql
import csv
from mcp.server.fastmcp import FastMCP# 初始化 MCP 服务器
mcp = FastMCP("SQLServer", port=8003)
USER_AGENT = "SQLserver-app/1.0"g_host_ip = "192.168.0.10"
# g_host_ip = "localhost"
g_user = "root"
g_passwd = "123000000"
g_db = "school"@mcp.tool()
async def sql_inter(sql_query):"""查询本地MySQL数据库,通过运行一段SQL代码来进行数据库查询。\:param sql_query: 字符串形式的SQL查询语句,用于执行对MySQL中school数据库中各张表进行查询,并获得各表中的各类相关信息:return:sql_query在MySQL中的运行结果。"""connection = pymysql.connect(host=g_host_ip, # 数据库地址user=g_user, # 数据库用户名passwd=g_passwd, # 数据库密码db=g_db, # 数据库名charset='utf8' # 字符集选择utf8)try:with connection.cursor() as cursor:# SQL查询语句sql = sql_querycursor.execute(sql)# 获取查询结果results = cursor.fetchall()finally:connection.close()return json.dumps(results)@mcp.tool()
async def export_table_to_csv(table_name, output_file):"""将 MySQL 数据库中的某个表导出为 CSV 文件。:param table_name: 需要导出的表名:param output_file: 输出的 CSV 文件路径"""# 连接 MySQL 数据库connection = pymysql.connect(host=g_host_ip, # 数据库地址user=g_user, # 数据库用户名passwd=g_passwd, # 数据库密码db=g_db, # 数据库名charset='utf8' # 字符集选择utf8)try:with connection.cursor() as cursor:# 查询数据表的所有数据query = f"SELECT * FROM {table_name};"cursor.execute(query)# 获取所有列名column_names = [desc[0] for desc in cursor.description]# 获取查询结果rows = cursor.fetchall()# 将数据写入 CSV 文件with open(output_file, mode='w', newline='', encoding='utf-8') as file:writer = csv.writer(file)# 写入表头writer.writerow(column_names)# 写入数据writer.writerows(rows)print(f"数据表 {table_name} 已成功导出至 {output_file}")except Exception as e:print(f"导出失败: {e}")finally:connection.close()if __name__ == "__main__":# 以标准 I/O 方式运行 MCP 服务器# mcp.run(transport='stdio')# 使用 SSE 方式运行 MCP 服务器mcp.run(transport='sse')
weather_server.py
import json
import httpx
from typing import Any, Union
from mcp.server.fastmcp import FastMCP
from datetime import datetimeimport requests# 初始化 MCP 服务器
mcp = FastMCP("WeatherServer", port=8001)# OpenWeather API 配置
OPENWEATHER_API_BASE = "https://key.wenwen-ai.com"
API_KEY = "sk-NYsoG3VBKDiTuvdtC969F95aFc4f45379aD3854a93602327" # 请替换为你自己的 OpenWeather API Key
USER_AGENT = "v1"# async def fetch_weather(city: str) -> dict[str, Any] | None:
# """
# 从 OpenWeather API 获取天气信息。
# :param city: 城市名称(需使用英文,如 Beijing)
# :return: 天气数据字典;若出错返回包含 error 信息的字典
# """
# params = {
# "q": city,
# "appid": API_KEY,
# "units": "metric",
# "lang": "zh_cn"
# }
# headers = {"User-Agent": USER_AGENT}
# async with httpx.AsyncClient() as client:
# try:
# response = await client.get(OPENWEATHER_API_BASE, params=params,headers=headers, timeout=60.0)
# response.raise_for_status()
# return response.json() # 返回字典类型
# except httpx.HTTPStatusError as e:
# return {"error": f"HTTP 错误: {e.response.status_code}"}
# except Exception as e:
# return {"error": f"请求失败: {str(e)}"}# def format_weather(data: dict[str, Any] | str) -> str:
# def format_weather(data: Union[dict[str, Any], str]) -> str:
# """
# 将天气数据格式化为易读文本。
# :param data: 天气数据(可以是字典或 JSON 字符串)
# :return: 格式化后的天气信息字符串
# """
# # 如果传入的是字符串,则先转换为字典
# if isinstance(data, str):
# try:
# data = json.loads(data)
# except Exception as e:
# return f"无法解析天气数据: {e}"# # 如果数据中包含错误信息,直接返回错误提示
# if "error" in data:
# return f"⚠️ {data['error']}"# # 提取数据时做容错处理
# city = data.get("place", "未知")
# temp = data.get("temperature", "N/A")
# humidity = data.get("humidity", "N/A")
# wind_speed = data.get("windSpeed", "N/A")
# description = data.get("weather1", "未知")
# current_time = datetime.now()
# format_time = current_time.strftime("%Y-%m-%d %H:%M:%S")# return (
# f"更新时间: {format_time}\n"
# f"🌍 {city}\n"
# f"🌡 温度: {temp}°C\n"
# f"💧 湿度: {humidity}%\n"
# f"🌬 风速: {wind_speed} m/s\n"
# f"🌤 天气: {description}\n"
# )# @mcp.tool()
# async def query_weather(city: str) -> str:
# """
# 输入指定城市的英文名称,返回今日天气查询结果。
# :param city: 城市名称(需使用英文)
# :return: 格式化后的天气信息
# """
# data = await fetch_weather(city)
# return format_weather(data)# 国内天气server
g_id = "88888888"
g_key = "88888888"
g_base_url = "https://cn.apihz.cn/api/tianqi/tqyb.php"async def get_weather(sheng:str, place: str) -> dict:"""获取指定地区的天气信息,比如place是南昌,则推出sheng是江西,最后获取南昌的天气信息并返回Args:sheng: 省级名称(如:江西等)place: 地区名称(如:南昌等)Returns:dict: 天气信息字典,包含温度、天气状况等信息"""# 构建请求参数params = {"id": g_id,"key": g_key,"place": place,"sheng": sheng}try:# 发送GET请求response = requests.get(g_base_url, params=params)response.raise_for_status() # 检查请求是否成功# 解析JSON响应weather_data = response.json()return weather_dataexcept requests.exceptions.RequestException as e:print(f"请求失败: {e}")return Noneexcept json.JSONDecodeError as e:print(f"JSON解析失败: {e}")return Nonedef format_weather(data: dict[str, Any] | str) -> str:"""将天气数据格式化为易读文本。:param data: 天气数据(可以是字典或 JSON 字符串):return: 格式化后的天气信息字符串"""# 如果传入的是字符串,则先转换为字典if isinstance(data, str):try:data = json.loads(data)except Exception as e:return f"无法解析天气数据: {e}"# 如果数据中包含错误信息,直接返回错误提示if "error" in data:return f"⚠️ {data['error']}"# 提取数据时做容错处理city = data.get("place", "未知")temp = data.get("temperature", "N/A")humidity = data.get("humidity", "N/A")wind_speed = data.get("windSpeed", "N/A")description = data.get("weather1", "未知")current_time = datetime.now()format_time = current_time.strftime("%Y-%m-%d %H:%M:%S")return (f"更新时间: {format_time}\n"f"🌍 {city}\n"f"🌡 温度: {temp}°C\n"f"💧 湿度: {humidity}%\n"f"🌬 风速: {wind_speed} m/s\n"f"🌤 天气: {description}\n")@mcp.tool()
async def query_weather(place:str, sheng:str) -> dict:"""获取指定地区的天气信息,比如place是南昌,则推出sheng是江西,最后获取南昌的天气信息并返回;如果输入的原本就是省级单位,则place为首府,比如sheng为江苏,则place为南京;如果输入的是直辖市,则sheng与place相同,比如place为北京,则sheng为北京。Args:sheng: 省级名称(如:江西等)place: 地区名称(如:南昌等):return: 格式化后的天气信息"""data = await get_weather(sheng, place)return format_weather(data)if __name__ == "__main__":# 以标准 I/O 方式运行 MCP 服务器# mcp.run(transport='stdio')# print(format_weather(get_weather("江西", "南昌")))# 使用 SSE 方式运行 MCP 服务器mcp.run(transport='sse')
四、启动
需要说明的是,SSE和stdio代码和启动都不一样,注意*_server.py文件里if __name__ == "__main__"下的注释,需要对应上。
4.1 SSE通信模型启动程序
每个*_server.py需要单独一个终端执行如下命令:
uv run sql_server.py
uv run pyhton_server.py
uv run weather_server.py
客户端启动
uv run client_sse.py
4.2 stdio通信模型启动程序
uv run client_stdio.py
五、效果图
SSE运行效果图:
stdio运行效果图: