在本地部署mcp服务器实现自然语言操作mysql数据库,轻松实现数据表的增~ 删~ 改~ 查~
1.将写好的mcp_server代码放在本地任意盘!
import asyncio
import logging
import os
import sys
from mysql.connector import connect, Error
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl# Configure logging
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mysql_mcp_server")def get_db_config():"""Get database configuration from environment variables."""config = {"host": os.getenv("MYSQL_HOST", "localhost"),"port": int(os.getenv("MYSQL_PORT", "3306")),"user": os.getenv("MYSQL_USER"),"password": os.getenv("MYSQL_PASSWORD"),"database": os.getenv("MYSQL_DATABASE"),# Add charset and collation to avoid utf8mb4_0900_ai_ci issues with older MySQL versions# These can be overridden via environment variables for specific MySQL versions"charset": os.getenv("MYSQL_CHARSET", "utf8mb4"),"collation": os.getenv("MYSQL_COLLATION", "utf8mb4_unicode_ci"),# Disable autocommit for better transaction control"autocommit": True,# Set SQL mode for better compatibility - can be overridden"sql_mode": os.getenv("MYSQL_SQL_MODE", "TRADITIONAL")}# Remove None values to let MySQL connector use defaults if not specifiedconfig = {k: v for k, v in config.items() if v is not None}if not all([config.get("user"), config.get("password"), config.get("database")]):logger.error("Missing required database configuration. Please check environment variables:")logger.error("MYSQL_USER, MYSQL_PASSWORD, and MYSQL_DATABASE are required")raise ValueError("Missing required database configuration")return config# Initialize server
app = Server("mysql_mcp_server")@app.list_resources()
async def list_resources() -> list[Resource]:"""List MySQL tables as resources."""config = get_db_config()try:logger.info(f"Connecting to MySQL with charset: {config.get('charset')}, collation: {config.get('collation')}")with connect(**config) as conn:logger.info(f"Successfully connected to MySQL server version: {conn.get_server_info()}")with conn.cursor() as cursor:cursor.execute("SHOW TABLES")tables = cursor.fetchall()logger.info(f"Found tables: {tables}")resources = []for table in tables:resources.append(Resource(uri=f"mysql://{table[0]}/data",name=f"Table: {table[0]}",mimeType="text/plain",description=f"Data in table: {table[0]}"))return resourcesexcept Error as e:logger.error(f"Failed to list resources: {str(e)}")logger.error(f"Error code: {e.errno}, SQL state: {e.sqlstate}")return []@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:"""Read table contents."""config = get_db_config()uri_str = str(uri)logger.info(f"Reading resource: {uri_str}")if not uri_str.startswith("mysql://"):raise ValueError(f"Invalid URI scheme: {uri_str}")parts = uri_str[8:].split('/')table = parts[0]try:logger.info(f"Connecting to MySQL with charset: {config.get('charset')}, collation: {config.get('collation')}")with connect(**config) as conn:logger.info(f"Successfully connected to MySQL server version: {conn.get_server_info()}")with conn.cursor() as cursor:cursor.execute(f"SELECT * FROM {table} LIMIT 100")columns = [desc[0] for desc in cursor.description]rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return "\n".join([",".join(columns)] + result)except Error as e:logger.error(f"Database error reading resource {uri}: {str(e)}")logger.error(f"Error code: {e.errno}, SQL state: {e.sqlstate}")raise RuntimeError(f"Database error: {str(e)}")@app.list_tools()
async def list_tools() -> list[Tool]:"""List available MySQL tools."""logger.info("Listing tools...")return [Tool(name="execute_sql",description="Execute an SQL query on the MySQL server",inputSchema={"type": "object","properties": {"query": {"type": "string","description": "The SQL query to execute"}},"required": ["query"]})]@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:"""Execute SQL commands."""config = get_db_config()logger.info(f"Calling tool: {name} with arguments: {arguments}")if name != "execute_sql":raise ValueError(f"Unknown tool: {name}")query = arguments.get("query")if not query:raise ValueError("Query is required")try:logger.info(f"Connecting to MySQL with charset: {config.get('charset')}, collation: {config.get('collation')}")with connect(**config) as conn:logger.info(f"Successfully connected to MySQL server version: {conn.get_server_info()}")with conn.cursor() as cursor:cursor.execute(query)# Special handling for SHOW TABLESif query.strip().upper().startswith("SHOW TABLES"):tables = cursor.fetchall()result = ["Tables_in_" + config["database"]] # Headerresult.extend([table[0] for table in tables])return [TextContent(type="text", text="\n".join(result))]# Handle all other queries that return result sets (SELECT, SHOW, DESCRIBE etc.)elif cursor.description is not None:columns = [desc[0] for desc in cursor.description]try:rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]except Error as e:logger.warning(f"Error fetching results: {str(e)}")return [TextContent(type="text", text=f"Query executed but error fetching results: {str(e)}")]# Non-SELECT querieselse:conn.commit()return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {cursor.rowcount}")]except Error as e:logger.error(f"Error executing SQL '{query}': {e}")logger.error(f"Error code: {e.errno}, SQL state: {e.sqlstate}")return [TextContent(type="text", text=f"Error executing query: {str(e)}")]async def main():"""Main entry point to run the MCP server."""from mcp.server.stdio import stdio_server# Add additional debug outputprint("Starting MySQL MCP server with config:", file=sys.stderr)config = get_db_config()print(f"Host: {config['host']}", file=sys.stderr)print(f"Port: {config['port']}", file=sys.stderr)print(f"User: {config['user']}", file=sys.stderr)print(f"Database: {config['database']}", file=sys.stderr)logger.info("Starting MySQL MCP server...")logger.info(f"Database config: {config['host']}/{config['database']} as {config['user']}")async with stdio_server() as (read_stream, write_stream):try:await app.run(read_stream,write_stream,app.create_initialization_options())except Exception as e:logger.error(f"Server error: {str(e)}", exc_info=True)raiseif __name__ == "__main__":asyncio.run(main())
2.再cherry studio中导入json配置参数
{"mcpServers": {"mysql": {"command": "uv","args": ["--directory","path/to/mysql_mcp_server","run","mysql_mcp_server"],"env": {"MYSQL_HOST": "localhost","MYSQL_PORT": "3306","MYSQL_USER": "your_username","MYSQL_PASSWORD": "your_password","MYSQL_DATABASE": "your_database"}}}
}
其中args中的参数说明:~
“–directory”, # 目录参数
“path/to/mysql_mcp_server.py”, # mcpserver端程序放置绝对路径!
“run”, # uv启动python脚本!
“mysql_mcp_server.py” # mcpserver代码 !
其中env中为本地或者远程数据库配置参数:~
“MYSQL_HOST”: “localhost”, #mysql主机IP
“MYSQL_PORT”: “3306”, #mysql服务端口
“MYSQL_USER”: “your_username”, #数据库用户名
“MYSQL_PASSWORD”: “your_password”, #数据库密码
“MYSQL_DATABASE”: “your_database” #数据库名
3.再cherry studio中新建会话,选择mysql mcp服务器
即可使用自然语言对数据表进行增删改查!