Dify-MCP服务创建案例
直接上代码:
import mysql.connector
import json
import os
import requests
from contextlib import contextmanager
from dotenv import load_dotenv
from fastmcp import FastMCP# 加载环境变量
load_dotenv()# 第三方API默认配置
DEFAULT_TIMEOUT = int(os.getenv("API_TIMEOUT", 10)) # 默认超时时间10秒
DEFAULT_HEADERS = {"Content-Type": "application/json","User-Agent": "FastMCP-Service/1.0"
}class DbManager:def __init__(self):self.connection_pool = mysql.connector.pooling.MySQLConnectionPool(pool_name="db_pool",pool_size=5,pool_reset_session=True,host=os.getenv("DB_HOST", "106.55.102.254"),port=os.getenv("DB_PORT", 3306), # 添加端口配置user=os.getenv("DB_USER", "root"),password=os.getenv("DB_PASSWD", "dh123456"),database=os.getenv("DB_NAME", "test01"),connect_timeout=30, # 连接超时设置charset='utf8mb4' # 字符集设置)@contextmanagerdef get_cursor(self):with self.connection_pool.get_connection() as connection:cursor = Nonetry:cursor = connection.cursor()yield cursorconnection.commit()except Exception as e:connection.rollback()raise efinally:if cursor:cursor.close()def execute_sql(self, sql: str) -> str:with self.get_cursor() as cursor:cursor.execute(sql)if cursor.description is not None:rows = cursor.fetchall()result = {"columns": [desc[0] for desc in cursor.description],"rows": rows,}return json.dumps(result, default=str)else:return f"row affected:{cursor.rowcount}"class ThirdPartyAPIManager:"""第三方接口请求管理器"""@staticmethoddef request_api(url: str,method: str = "GET",params: dict = None,data: dict = None,headers: dict = None,timeout: int = None) -> str:"""通用的第三方API请求方法参数:url: 接口URL地址method: 请求方法,如GET、POST等params: URL查询参数data: 请求体数据headers: 请求头timeout: 超时时间,秒返回:接口响应的JSON字符串"""try:# 处理默认参数timeout = timeout or DEFAULT_TIMEOUTrequest_headers = DEFAULT_HEADERS.copy()# 合并自定义请求头if headers:request_headers.update(headers)# 处理请求数据kwargs = {"url": url,"headers": request_headers,"timeout": timeout,"params": params or {}}# 根据请求方法处理数据method = method.upper()if method in ["POST", "PUT", "PATCH"]:kwargs["json"] = data or {} # 使用json参数自动处理Content-Type# 发送请求response = requests.request(method, **kwargs)# 检查响应状态码response.raise_for_status()# 尝试解析JSON响应try:result = {"status": "success","status_code": response.status_code,"data": response.json()}except json.JSONDecodeError:# 非JSON响应result = {"status": "success","status_code": response.status_code,"data": response.text,"message": "Response is not JSON format"}return json.dumps(result, default=str)except requests.exceptions.RequestException as e:# 处理请求异常error_info = {"status": "error","message": str(e),"type": type(e).__name__}return json.dumps(error_info, default=str)except Exception as e:# 处理其他异常error_info = {"status": "error","message": f"Unexpected error: {str(e)}"}return json.dumps(error_info, default=str)# 初始化实例
dbManager = DbManager()
apiManager = ThirdPartyAPIManager()# 初始化FastMCP服务器
mcp = FastMCP(name="查询Mysql服务")@mcp.tool(description="查询表结构")
def get_table_definition(table: str) -> str:"""get table definition"""return dbManager.execute_sql(f"show create table {table}")@mcp.tool(description="执行SQL语句")
def execute_sql(sql: str) -> str:"""execute sql"""return dbManager.execute_sql(sql)@mcp.tool(description="请求第三方API接口,支持GET、POST等方法")
def request_third_party_api(url: str,method: str = "GET",params: dict = None,data: dict = None,headers: dict = None,timeout: int = None
) -> str:"""请求第三方API接口参数:url: 必须,API接口的URL地址method: 可选,请求方法,默认为GET,可选项包括GET、POST、PUT、DELETE等params: 可选,URL查询参数,字典类型data: 可选,请求体数据,字典类型,用于POST、PUT等方法headers: 可选,请求头信息,字典类型timeout: 可选,超时时间,单位秒返回:接口响应结果的JSON字符串"""return apiManager.request_api(url=url,method=method,params=params,data=data,headers=headers,timeout=timeout)if __name__ == "__main__":# 使用SSE传输mcp.run(transport="sse", host="0.0.0.0", port=8520)
启动:
python3 my_mcp_01.py