【AI总结】python连接MySQL(5)- 高级数据库配置与连接器设计
目录
- 高级数据库配置与连接器设计
- 一、优化后的配置架构设计
- 目录结构
- 二、基础配置与连接器设计
- 1. 基础配置类
- 2. 基础连接器类
- 三、MySQL专用配置与连接器
- 1. MySQL配置类
- 2. MySQL连接器
- 四、其他数据库配置与连接器
- 1. MongoDB配置与连接器
- 2. Redis配置与连接器
- 3. DuckDB配置与连接器
- 五、业务层统一使用接口
- 数据库管理器
- 业务服务使用示例
- 六、架构优势与最佳实践
- 设计优势
- 命名规范建议
- 最佳实践
高级数据库配置与连接器设计
摘要:本文将展示一个基于继承的数据库配置架构,通过基础配置类和专用连接器实现多数据库的统一管理,解决配置复用和命名一致性问题。
一、优化后的配置架构设计
目录结构
project_root/
├── configs/
│ ├── base_config.py # 基础配置类
│ ├── mysql_config.py # MySQL专用配置
│ ├── mongo_config.py # MongoDB专用配置
│ ├── redis_config.py # Redis专用配置
│ └── duckdb_config.py # DuckDB专用配置
├── connectors/
│ ├── base_connector.py # 连接器基类
│ ├── mysql_connector.py # MySQL连接器
│ ├── mongo_connector.py # MongoDB连接器
│ ├── redis_connector.py # Redis连接器
│ └── duckdb_connector.py # DuckDB连接器
└── services/ └── ... # 业务服务
二、基础配置与连接器设计
1. 基础配置类
# configs/base_config.py
class BaseDBConfig:"""数据库基础配置"""def __init__(self):# 通用连接参数self.timeout = 10 # 秒self.max_retries = 3self.auto_reconnect = True# 连接池参数self.pool_enabled = Trueself.pool_size = 5self.pool_timeout = 30 # 秒# 监控参数self.monitor_enabled = Trueself.monitor_interval = 60 # 秒def get_connection_params(self) -> dict:"""获取连接参数(子类必须实现)"""raise NotImplementedError("子类必须实现此方法")
2. 基础连接器类
# connectors/base_connector.py
from abc import ABC, abstractmethod
import logging
from configs.base_config import BaseDBConfiglogger = logging.getLogger(__name__)class BaseDBConnector(ABC):"""数据库连接器基类"""def __init__(self, config: BaseDBConfig):"""初始化连接器:param config: 数据库配置实例"""self.config = configself._connection = Noneself._connections = {} # 多实例连接缓存logger.info(f"初始化 {self.__class__.__name__} 连接器")@abstractmethoddef connect(self, config_name: str = "default"):"""建立数据库连接(子类实现)"""pass@abstractmethoddef close(self, config_name: str = "default"):"""关闭数据库连接(子类实现)"""pass@abstractmethoddef execute_query(self, query: str, params=None, config_name: str = "default"):"""执行查询操作(子类实现)"""pass@abstractmethoddef execute_command(self, command: str, params=None, config_name: str = "default"):"""执行写操作(子类实现)"""passdef health_check(self, config_name: str = "default") -> bool:"""健康检查(默认实现,子类可覆盖)"""try:self.connect(config_name)return Trueexcept Exception as e:logger.error(f"数据库健康检查失败: {e}")return Falsefinally:self.close(config_name)def __enter__(self):"""上下文管理器支持"""self.connect()return selfdef __exit__(self, exc_type, exc_val, exc_tb):"""退出时自动关闭连接"""self.close()
三、MySQL专用配置与连接器
1. MySQL配置类
# configs/mysql_config.py
from configs.base_config import BaseDBConfigclass MySQLConfig(BaseDBConfig):"""MySQL数据库专用配置"""def __init__(self):super().__init__()# MySQL特有参数self.charset = "utf8mb4"self.autocommit = Falseself.client_flag = 0self.ssl_disabled = Falsedef get_connection_params(self) -> dict:"""获取MySQL连接参数"""return {"host": self.host,"port": self.port,"user": self.user,"password": self.password,"database": self.database,"charset": self.charset,"autocommit": self.autocommit,"ssl_disabled": self.ssl_disabled}# 具体数据库实例配置
class MainMySQLConfig(MySQLConfig):"""主业务数据库配置"""def __init__(self):super().__init__()self.host = "db-main.example.com"self.port = 3306self.user = "app_user"self.password = "secure_pass"self.database = "main_db"self.pool_size = 10 # 更大的连接池class LogMySQLConfig(MySQLConfig):"""日志数据库配置"""def __init__(self):super().__init__()self.host = "log-db.example.com"self.port = 3307self.user = "log_user"self.password = "log_pass"self.database = "app_logs"self.autocommit = True # 日志自动提交
2. MySQL连接器
# connectors/mysql_connector.py
import mysql.connector
from mysql.connector import pooling, Error
from configs.mysql_config import MySQLConfig
from connectors.base_connector import BaseDBConnectorclass MySQLConnector(BaseDBConnector):"""MySQL数据库连接器"""def __init__(self, config: MySQLConfig):super().__init__(config)self._pools = {} # 连接池缓存def connect(self, config_name: str = "default"):"""建立数据库连接"""if config_name not in self._pools:# 创建新连接池params = self.config.get_connection_params()pool_config = {"pool_name": f"{config_name}_pool","pool_size": self.config.pool_size,**params}try:self._pools[config_name] = pooling.MySQLConnectionPool(**pool_config)self._connections[config_name] = self._pools[config_name].get_connection()except Error as e:raise ConnectionError(f"MySQL连接失败: {e}")return self._connections[config_name]def close(self, config_name: str = "default"):"""关闭数据库连接"""if config_name in self._connections:conn = self._connections.pop(config_name)conn.close()def execute_query(self, query: str, params=None, config_name: str = "default"):"""执行查询操作"""conn = self.connect(config_name)try:with conn.cursor(dictionary=True) as cursor:cursor.execute(query, params)return cursor.fetchall()except Error as e:raise RuntimeError(f"查询执行失败: {e}")def execute_command(self, command: str, params=None, config_name: str = "default"):"""执行写操作"""conn = self.connect(config_name)try:with conn.cursor() as cursor:cursor.execute(command, params)affected = cursor.rowcountif not self.config.autocommit:conn.commit()return affectedexcept Error as e:conn.rollback()raise RuntimeError(f"命令执行失败: {e}")
四、其他数据库配置与连接器
1. MongoDB配置与连接器
# configs/mongo_config.py
from configs.base_config import BaseDBConfigclass MongoDBConfig(BaseDBConfig):"""MongoDB专用配置"""def __init__(self):super().__init__()self.uri = ""self.db_name = ""self.tls = Falseself.replica_set = ""def get_connection_params(self) -> dict:return {"host": self.uri,"tls": self.tls,"replicaSet": self.replica_set}class UserProfileMongoConfig(MongoDBConfig):"""用户档案数据库配置"""def __init__(self):super().__init__()self.uri = "mongodb://user:pass@cluster0.example.com:27017"self.db_name = "user_profiles"self.tls = True
# connectors/mongo_connector.py
from pymongo import MongoClient
from configs.mongo_config import MongoDBConfig
from connectors.base_connector import BaseDBConnectorclass MongoDBConnector(BaseDBConnector):"""MongoDB数据库连接器"""def __init__(self, config: MongoDBConfig):super().__init__(config)self._clients = {} # 客户端缓存def connect(self, config_name: str = "default"):"""建立数据库连接"""if config_name not in self._clients:params = self.config.get_connection_params()try:self._clients[config_name] = MongoClient(**params)self._connections[config_name] = self._clients[config_name][self.config.db_name]except Exception as e:raise ConnectionError(f"MongoDB连接失败: {e}")return self._connections[config_name]def get_collection(self, collection_name: str, config_name: str = "default"):"""获取集合对象"""db = self.connect(config_name)return db[collection_name]# MongoDB不需要传统SQL查询方法def execute_query(self, query: dict, collection_name: str, config_name: str = "default"):"""执行MongoDB查询"""collection = self.get_collection(collection_name, config_name)return list(collection.find(query))def execute_command(self, command: dict, collection_name: str, config_name: str = "default"):"""执行MongoDB写操作"""# 这里简化处理,实际需要区分insert/update/deletecollection = self.get_collection(collection_name, config_name)if command.get("insert"):return collection.insert_many(command["data"])elif command.get("update"):return collection.update_many(command["filter"], command["update"])elif command.get("delete"):return collection.delete_many(command["filter"])else:raise ValueError("无效的MongoDB命令")
2. Redis配置与连接器
# configs/redis_config.py
from configs.base_config import BaseDBConfigclass RedisConfig(BaseDBConfig):"""Redis专用配置"""def __init__(self):super().__init__()self.host = "localhost"self.port = 6379self.db = 0self.decode_responses = Trueself.ssl = Falsedef get_connection_params(self) -> dict:return {"host": self.host,"port": self.port,"db": self.db,"decode_responses": self.decode_responses,"ssl": self.ssl}class SessionRedisConfig(RedisConfig):"""会话缓存配置"""def __init__(self):super().__init__()self.host = "cache.example.com"self.db = 1self.decode_responses = False # 原始字节数据
# connectors/redis_connector.py
import redis
from configs.redis_config import RedisConfig
from connectors.base_connector import BaseDBConnectorclass RedisConnector(BaseDBConnector):"""Redis数据库连接器"""def __init__(self, config: RedisConfig):super().__init__(config)def connect(self, config_name: str = "default"):"""建立Redis连接"""if config_name not in self._connections:params = self.config.get_connection_params()try:self._connections[config_name] = redis.Redis(**params)# 测试连接self._connections[config_name].ping()except Exception as e:raise ConnectionError(f"Redis连接失败: {e}")return self._connections[config_name]# Redis不需要传统SQL查询方法def execute_query(self, key: str, config_name: str = "default"):"""获取键值"""conn = self.connect(config_name)return conn.get(key)def execute_command(self, command: str, *args, config_name: str = "default"):"""执行Redis命令"""conn = self.connect(config_name)return conn.execute_command(command, *args)
3. DuckDB配置与连接器
# configs/duckdb_config.py
from configs.base_config import BaseDBConfigclass DuckDBConfig(BaseDBConfig):"""DuckDB专用配置"""def __init__(self):super().__init__()self.path = ":memory:" # 默认内存数据库self.read_only = Falseself.config = {} # 额外配置def get_connection_params(self) -> dict:return {"database": self.path,"read_only": self.read_only,"config": self.config}class AnalyticsDuckDBConfig(DuckDBConfig):"""分析数据库配置"""def __init__(self):super().__init__()self.path = "/data/analytics.duckdb"self.config = {"threads": 8} # 使用8线程
# connectors/duckdb_connector.py
import duckdb
from configs.duckdb_config import DuckDBConfig
from connectors.base_connector import BaseDBConnectorclass DuckDBConnector(BaseDBConnector):"""DuckDB数据库连接器"""def __init__(self, config: DuckDBConfig):super().__init__(config)def connect(self, config_name: str = "default"):"""建立DuckDB连接"""if config_name not in self._connections:params = self.config.get_connection_params()try:self._connections[config_name] = duckdb.connect(**params)# 应用额外配置for key, value in params["config"].items():self._connections[config_name].execute(f"SET {key} = {value}")except Exception as e:raise ConnectionError(f"DuckDB连接失败: {e}")return self._connections[config_name]def execute_query(self, query: str, config_name: str = "default"):"""执行查询返回DataFrame"""conn = self.connect(config_name)return conn.execute(query).fetchdf()def execute_command(self, command: str, config_name: str = "default"):"""执行DDL/DML语句"""conn = self.connect(config_name)return conn.execute(command)
五、业务层统一使用接口
数据库管理器
# connectors/db_manager.py
from configs import mysql_config, mongo_config, redis_config, duckdb_config
from connectors import mysql_connector, mongo_connector, redis_connector, duckdb_connectorclass DBManager:"""多数据库统一管理"""_instances = {} # 连接器实例缓存# 配置实例MYSQL_MAIN = mysql_config.MainMySQLConfig()MYSQL_LOG = mysql_config.LogMySQLConfig()MONGO_PROFILE = mongo_config.UserProfileMongoConfig()REDIS_SESSION = redis_config.SessionRedisConfig()DUCKDB_ANALYTICS = duckdb_config.AnalyticsDuckDBConfig()@classmethoddef get_mysql(cls, config: MySQLConfig = MYSQL_MAIN) -> mysql_connector.MySQLConnector:"""获取MySQL连接器"""key = f"mysql_{id(config)}"if key not in cls._instances:cls._instances[key] = mysql_connector.MySQLConnector(config)return cls._instances[key]@classmethoddef get_mongo(cls, config: MongoDBConfig = MONGO_PROFILE) -> mongo_connector.MongoDBConnector:"""获取MongoDB连接器"""key = f"mongo_{id(config)}"if key not in cls._instances:cls._instances[key] = mongo_connector.MongoDBConnector(config)return cls._instances[key]@classmethoddef get_redis(cls, config: RedisConfig = REDIS_SESSION) -> redis_connector.RedisConnector:"""获取Redis连接器"""key = f"redis_{id(config)}"if key not in cls._instances:cls._instances[key] = redis_connector.RedisConnector(config)return cls._instances[key]@classmethoddef get_duckdb(cls, config: DuckDBConfig = DUCKDB_ANALYTICS) -> duckdb_connector.DuckDBConnector:"""获取DuckDB连接器"""key = f"duckdb_{id(config)}"if key not in cls._instances:cls._instances[key] = duckdb_connector.DuckDBConnector(config)return cls._instances[key]
业务服务使用示例
# services/user_service.py
from connectors.db_manager import DBManagerclass UserService:"""用户服务 - 多数据库操作"""def __init__(self):# 初始化各数据库连接器self.mysql = DBManager.get_mysql()self.mongo = DBManager.get_mongo()self.redis = DBManager.get_redis()def get_user_data(self, user_id: int):"""获取用户完整数据"""# MySQL获取基本信息sql = "SELECT * FROM users WHERE id = %s"user_info = self.mysql.execute_query(sql, (user_id,))[0]# MongoDB获取档案user_profile = self.mongo.execute_query({"user_id": user_id}, collection_name="profiles")# Redis获取会话状态session_state = self.redis.execute_query(f"session:{user_id}")return {"info": user_info,"profile": user_profile,"session": session_state}def update_user(self, user_id: int, data: dict):"""更新用户信息"""# MySQL更新基本信息update_sql = """UPDATE users SET name = %s, email = %s WHERE id = %s"""self.mysql.execute_command(update_sql, (data["name"], data["email"], user_id))# MongoDB更新档案self.mongo.execute_command({"update": True},collection_name="profiles",command={"filter": {"user_id": user_id},"update": {"$set": data["profile"]}})# Redis更新缓存self.redis.execute_command("SET", f"user:{user_id}", data["cache_data"], "EX", 3600)
六、架构优势与最佳实践
设计优势
- 统一接口:所有数据库连接器继承自同一基类,提供一致API
- 配置继承:基础配置复用,专用配置扩展
- 类型安全:强类型配置和连接器,减少错误
- 连接复用:管理器确保连接器单例
- 多实例支持:同一数据库类型支持多个配置实例
- 开箱即用:预置主流数据库实现
- 可扩展性:轻松添加新数据库支持
命名规范建议
-
配置类:
[数据库类型]Config
+[用途]Config
(e.g.,MySQLConfig
,MainMySQLConfig
) -
连接器类:
[数据库类型]Connector
(e.g.,MySQLConnector
,MongoDBConnector
) -
配置实例:全大写描述性名称
(e.g.,MYSQL_MAIN
,REDIS_SESSION
) -
方法命名:
connect()
: 建立连接close()
: 关闭连接execute_query()
: 执行查询execute_command()
: 执行写操作
最佳实践
-
环境区分:使用环境变量切换配置
class MainMySQLConfig(MySQLConfig):def __init__(self):super().__init__()env = os.getenv("ENV", "dev")self.host = f"db-{env}.example.com"
-
配置加密:敏感数据加密存储
from cryptography.fernet import Fernetclass SecureConfig:@staticmethoddef decrypt(encrypted: str) -> str:cipher = Fernet(os.getenv("CONFIG_KEY"))return cipher.decrypt(encrypted.encode()).decode()# 在配置中使用 self.password = SecureConfig.decrypt(ENCRYPTED_PASSWORD)
-
连接监控:定期检查连接健康
import threading import timedef monitor_connections():while True:for name, connector in DBManager._instances.items():status = connector.health_check()print(f"{name} 状态: {'OK' if status else 'FAIL'}")time.sleep(DBManager.MYSQL_MAIN.monitor_interval)threading.Thread(target=monitor_connections, daemon=True).start()
-
资源清理:应用退出时关闭所有连接
import atexit@atexit.register def cleanup():for connector in DBManager._instances.values():connector.close_all()
结语:通过这种基于继承的配置架构和统一命名的连接器设计,我们实现了多数据库的高效管理。这种设计既保持了配置的灵活性,又提供了代码的一致性,是中型到大型项目的理想选择。
扩展方向:
- 添加异步IO支持
- 集成分布式配置中心
- 实现数据库迁移工具
- 添加ORM集成层
- 开发Web管理界面
安全提示:
- 生产环境禁用默认配置
- 定期轮换数据库凭证
- 使用网络隔离限制数据库访问
- 加密敏感配置数据
- 实施最小权限原则
关键词:
Python数据库架构
、多数据库管理
、配置继承模式
、数据库连接器设计
、企业级Python开发