当前位置: 首页 > news >正文

mysql_mcp_server quickstart

docker启动mysql服务

mysql服务,用于mcp server连接

docker run --name mysql-mcp-new -e MYSQL_ROOT_PASSWORD=123456 -e MYSQL_DATABASE=test_db -p 13306:3306 -d mysql:8.0

环境安装

git clone https://github.com/designcomputer/mysql_mcp_server.git
uv venv
source .venv/bin/activate
uv pip install -r requirements.txt 

完整代码

import asyncio
import logging
import os
import sys
from mysql.connector import connect, Error
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl# Configure logging
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mysql_mcp_server")def get_db_config():"""Get database configuration from environment variables."""config = {"host": os.getenv("MYSQL_HOST", "localhost"),"port": int(os.getenv("MYSQL_PORT", "3306")),"user": os.getenv("MYSQL_USER"),"password": os.getenv("MYSQL_PASSWORD"),"database": os.getenv("MYSQL_DATABASE"),# Add charset and collation to avoid utf8mb4_0900_ai_ci issues with older MySQL versions# These can be overridden via environment variables for specific MySQL versions"charset": os.getenv("MYSQL_CHARSET", "utf8mb4"),"collation": os.getenv("MYSQL_COLLATION", "utf8mb4_unicode_ci"),# Disable autocommit for better transaction control"autocommit": True,# Set SQL mode for better compatibility - can be overridden"sql_mode": os.getenv("MYSQL_SQL_MODE", "TRADITIONAL")}# Remove None values to let MySQL connector use defaults if not specifiedconfig = {k: v for k, v in config.items() if v is not None}if not all([config.get("user"), config.get("password"), config.get("database")]):logger.error("Missing required database configuration. Please check environment variables:")logger.error("MYSQL_USER, MYSQL_PASSWORD, and MYSQL_DATABASE are required")raise ValueError("Missing required database configuration")return config# Initialize server
app = Server("mysql_mcp_server")@app.list_resources()
async def list_resources() -> list[Resource]:"""List MySQL tables as resources."""config = get_db_config()try:logger.info(f"Connecting to MySQL with charset: {config.get('charset')}, collation: {config.get('collation')}")with connect(**config) as conn:logger.info(f"Successfully connected to MySQL server version: {conn.get_server_info()}")with conn.cursor() as cursor:cursor.execute("SHOW TABLES")tables = cursor.fetchall()logger.info(f"Found tables: {tables}")resources = []for table in tables:resources.append(Resource(uri=f"mysql://{table[0]}/data",name=f"Table: {table[0]}",mimeType="text/plain",description=f"Data in table: {table[0]}"))return resourcesexcept Error as e:logger.error(f"Failed to list resources: {str(e)}")logger.error(f"Error code: {e.errno}, SQL state: {e.sqlstate}")return []@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:"""Read table contents."""config = get_db_config()uri_str = str(uri)logger.info(f"Reading resource: {uri_str}")if not uri_str.startswith("mysql://"):raise ValueError(f"Invalid URI scheme: {uri_str}")parts = uri_str[8:].split('/')table = parts[0]try:logger.info(f"Connecting to MySQL with charset: {config.get('charset')}, collation: {config.get('collation')}")with connect(**config) as conn:logger.info(f"Successfully connected to MySQL server version: {conn.get_server_info()}")with conn.cursor() as cursor:cursor.execute(f"SELECT * FROM {table} LIMIT 100")columns = [desc[0] for desc in cursor.description]rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return "\n".join([",".join(columns)] + result)except Error as e:logger.error(f"Database error reading resource {uri}: {str(e)}")logger.error(f"Error code: {e.errno}, SQL state: {e.sqlstate}")raise RuntimeError(f"Database error: {str(e)}")@app.list_tools()
async def list_tools() -> list[Tool]:"""List available MySQL tools."""logger.info("Listing tools...")return [Tool(name="execute_sql",description="Execute an SQL query on the MySQL server",inputSchema={"type": "object","properties": {"query": {"type": "string","description": "The SQL query to execute"}},"required": ["query"]})]@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:"""Execute SQL commands."""config = get_db_config()logger.info(f"Calling tool: {name} with arguments: {arguments}")if name != "execute_sql":raise ValueError(f"Unknown tool: {name}")query = arguments.get("query")if not query:raise ValueError("Query is required")try:logger.info(f"Connecting to MySQL with charset: {config.get('charset')}, collation: {config.get('collation')}")with connect(**config) as conn:logger.info(f"Successfully connected to MySQL server version: {conn.get_server_info()}")with conn.cursor() as cursor:cursor.execute(query)# Special handling for SHOW TABLESif query.strip().upper().startswith("SHOW TABLES"):tables = cursor.fetchall()result = ["Tables_in_" + config["database"]]  # Headerresult.extend([table[0] for table in tables])return [TextContent(type="text", text="\n".join(result))]# Handle all other queries that return result sets (SELECT, SHOW, DESCRIBE etc.)elif cursor.description is not None:columns = [desc[0] for desc in cursor.description]try:rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]except Error as e:logger.warning(f"Error fetching results: {str(e)}")return [TextContent(type="text", text=f"Query executed but error fetching results: {str(e)}")]# Non-SELECT querieselse:conn.commit()return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {cursor.rowcount}")]except Error as e:logger.error(f"Error executing SQL '{query}': {e}")logger.error(f"Error code: {e.errno}, SQL state: {e.sqlstate}")return [TextContent(type="text", text=f"Error executing query: {str(e)}")]async def main():"""Main entry point to run the MCP server."""from mcp.server.stdio import stdio_server# Add additional debug outputprint("Starting MySQL MCP server with config:", file=sys.stderr)config = get_db_config()print(f"Host: {config['host']}", file=sys.stderr)print(f"Port: {config['port']}", file=sys.stderr)print(f"User: {config['user']}", file=sys.stderr)print(f"Database: {config['database']}", file=sys.stderr)logger.info("Starting MySQL MCP server...")logger.info(f"Database config: {config['host']}/{config['database']} as {config['user']}")async with stdio_server() as (read_stream, write_stream):try:await app.run(read_stream,write_stream,app.create_initialization_options())except Exception as e:logger.error(f"Server error: {str(e)}", exc_info=True)raiseif __name__ == "__main__":asyncio.run(main())

代码解释

导入模块和配置日志

import asyncio
import logging
import os
import sys
from mysql.connector import connect, Error
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mysql_mcp_server")

这部分导入了必要的模块:

  • asyncio:用于异步编程
  • mysql.connector:MySQL数据库连接器
  • mcp.servermcp.types:MCP协议相关的类和类型
  • 配置了日志系统,便于调试和监控

数据库配置函数

def get_db_config():"""从环境变量获取数据库配置。"""config = {"host": os.getenv("MYSQL_HOST", "localhost"),"port": int(os.getenv("MYSQL_PORT", "3306")),"user": os.getenv("MYSQL_USER"),"password": os.getenv("MYSQL_PASSWORD"),"database": os.getenv("MYSQL_DATABASE"),# 添加字符集和排序规则以避免utf8mb4_0900_ai_ci在旧版MySQL中的问题"charset": os.getenv("MYSQL_CHARSET", "utf8mb4"),"collation": os.getenv("MYSQL_COLLATION", "utf8mb4_unicode_ci"),# 启用自动提交以便更好地控制事务"autocommit": True,# 设置SQL模式以提高兼容性"sql_mode": os.getenv("MYSQL_SQL_MODE", "TRADITIONAL")}# 移除None值,让MySQL连接器使用默认值config = {k: v for k, v in config.items() if v is not None}if not all([config.get("user"), config.get("password"), config.get("database")]):logger.error("缺少必要的数据库配置。请检查环境变量:")logger.error("MYSQL_USER, MYSQL_PASSWORD, 和 MYSQL_DATABASE 是必需的")raise ValueError("缺少必要的数据库配置")return config

这个函数从环境变量中读取MySQL连接配置:

  • 支持配置主机、端口、用户名、密码、数据库名等
  • 设置了默认的字符集和排序规则,解决兼容性问题
  • 验证必要的配置项(用户名、密码、数据库名)是否存在

初始化MCP服务器

# 初始化服务器
app = Server("mysql_mcp_server")

创建了一个名为"mysql_mcp_server"的MCP服务器实例。

资源列表功能

@app.list_resources()
async def list_resources() -> list[Resource]:"""列出MySQL表作为资源。"""# 实现代码...

这个函数实现了MCP的资源列表功能:

  • 连接到MySQL数据库
  • 执行SHOW TABLES命令获取所有表
  • 将每个表转换为MCP资源格式返回
  • 每个资源包含URI、名称、MIME类型和描述

读取资源功能

@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:"""读取表内容。"""# 实现代码...

这个函数实现了读取资源的功能:

  • 解析URI获取表名
  • 连接到MySQL数据库
  • 执行SELECT * FROM {table} LIMIT 100查询
  • 将结果格式化为CSV格式的字符串返回

工具列表功能

@app.list_tools()
async def list_tools() -> list[Tool]:"""列出可用的MySQL工具。"""# 实现代码...

这个函数定义了MCP服务器提供的工具:

  • 返回一个名为execute_sql的工具
  • 该工具允许执行SQL查询
  • 定义了工具的输入模式(需要一个query参数)

工具调用功能

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:"""执行SQL命令。"""# 实现代码...

这个函数处理工具调用:

  • 验证工具名称是否为execute_sql
  • 从参数中获取SQL查询
  • 连接到MySQL数据库并执行查询
  • 根据查询类型处理结果:
    • 对于SHOW TABLES有特殊处理
    • 对于返回结果集的查询(如SELECT),格式化为CSV
    • 对于非查询操作(如INSERT),返回受影响的行数

主函数

async def main():"""运行MCP服务器的主入口点。"""# 实现代码...if __name__ == "__main__":asyncio.run(main())

主函数负责:

  • 输出调试信息,显示数据库配置
  • 初始化stdio服务器(标准输入输出通信)
  • 启动MCP服务器
  • 处理异常并记录错误

总结

这个代码实现了一个完整的MySQL MCP服务器,它允许AI模型通过标准化的MCP协议与MySQL数据库交互。服务器提供了以下功能:

  1. 列出数据库中的表作为资源
  2. 读取表的内容
  3. 执行任意SQL查询

inspector测试

npx @modelcontextprotocol/inspector

inspector配置

参数配置
在这里插入图片描述
环境变量配置
在这里插入图片描述

基础测试语句

SHOW TABLES;

这个语句会列出数据库中的所有表,是最基本的测试,可以验证连接是否正常。

创建表测试

CREATE TABLE test_users (id INT AUTO_INCREMENT PRIMARY KEY,username VARCHAR(50) NOT NULL,email VARCHAR(100) UNIQUE,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

插入数据测试

INSERT INTO test_users (username, email) VALUES 
('user1', 'user1@example.com'),
('user2', 'user2@example.com'),
('user3', 'user3@example.com');

查询数据测试

SELECT * FROM test_users;

在这里插入图片描述

更复杂的查询测试

SELECT username, email, DATE_FORMAT(created_at, '%Y-%m-%d') AS registration_date 
FROM test_users 
WHERE id > 1 
ORDER BY created_at DESC;

表结构查询测试

DESCRIBE test_users;

文章转载自:

http://FbgLAJZW.Ljjph.cn
http://SradSxvk.Ljjph.cn
http://Y1hx4zzi.Ljjph.cn
http://Dr5A1tSS.Ljjph.cn
http://op5eCG8d.Ljjph.cn
http://j5WPvRKp.Ljjph.cn
http://SVwfz2l7.Ljjph.cn
http://gkepmL1X.Ljjph.cn
http://3vQdi3kr.Ljjph.cn
http://wvbORSOH.Ljjph.cn
http://yCFUc9xm.Ljjph.cn
http://qFBMiZzl.Ljjph.cn
http://aydGFOXw.Ljjph.cn
http://Lq20xwJZ.Ljjph.cn
http://C0w0EHOc.Ljjph.cn
http://pE7NI4sn.Ljjph.cn
http://FPbGOn0D.Ljjph.cn
http://zTo5rfzI.Ljjph.cn
http://b4o44iQC.Ljjph.cn
http://yCwHZ1mj.Ljjph.cn
http://YdHnvU7Q.Ljjph.cn
http://D4LL04Oj.Ljjph.cn
http://E6USBbee.Ljjph.cn
http://6AChoo8d.Ljjph.cn
http://BTsywE6m.Ljjph.cn
http://PLpkHCta.Ljjph.cn
http://0Lw0GAg1.Ljjph.cn
http://J2aoqFv9.Ljjph.cn
http://CXeooiWP.Ljjph.cn
http://NOTMX9Cu.Ljjph.cn
http://www.dtcms.com/a/246998.html

相关文章:

  • sqlserver 计算周岁年龄的函数
  • 【web应用】若依框架:若依框架中的面包屑导航与顶部导航栏:设计与实现
  • 前端面试七之列表渲染和组件重用
  • 新书速览|CUDA并行编程与性能优化
  • Transformer、RNN (循环神经网络) 和 CNN (卷积神经网络)的区别
  • 消除品类洞察:头部稳固,新玩家如何创新突围手游市场?
  • Lavazza拉瓦萨再度牵手兰博基尼汽车 百年咖啡注入超跑速度
  • 算法导论第二章:递归与分治的数学艺术
  • 行为模式-命令模式
  • 【Zephyr 系列 21】OTA 升级与产测系统集成:远程配置、版本验证、自动回滚机制设计
  • 分块解密,,,
  • 报表工具顶尖对决系列 --- 文本数据源
  • C++内存管理与编译链接
  • 数据结构 散列表 学习 2025年6月12日15:30:48
  • SpringMVC与Struts2对比教学
  • Jetpack LiveData 深度解析
  • 武汉科技大学人工智能与演化计算实验室许志伟课题组参加IEEE CEC 2025
  • AI集成运维管理平台的架构与核心构成解析
  • Python训练打卡Day48
  • 开源PSS解析器
  • Linux部署bmc TrueSight 监控agent步骤
  • 股指期货入门基础知识
  • 智能体应用开发课程体系规划说明
  • vue组件对外属性类型错误接收问题
  • 打卡day52
  • Appium + Python 测试全流程
  • FFmpeg是什么?
  • 106.给AI回答添加点赞收藏功能
  • AI技术专题:电商AI专题
  • PERST#、Hot Reset、Link Disable