当前位置: 首页 > news >正文

【AI总结】python连接MySQL(5)- 高级数据库配置与连接器设计

目录

  • 高级数据库配置与连接器设计
  • 一、优化后的配置架构设计
    • 目录结构
  • 二、基础配置与连接器设计
    • 1. 基础配置类
    • 2. 基础连接器类
  • 三、MySQL专用配置与连接器
    • 1. MySQL配置类
    • 2. MySQL连接器
  • 四、其他数据库配置与连接器
    • 1. MongoDB配置与连接器
    • 2. Redis配置与连接器
    • 3. DuckDB配置与连接器
  • 五、业务层统一使用接口
    • 数据库管理器
    • 业务服务使用示例
  • 六、架构优势与最佳实践
    • 设计优势
    • 命名规范建议
    • 最佳实践

高级数据库配置与连接器设计

摘要:本文将展示一个基于继承的数据库配置架构,通过基础配置类和专用连接器实现多数据库的统一管理,解决配置复用和命名一致性问题。


一、优化后的配置架构设计

目录结构

project_root/
├── configs/  
│   ├── base_config.py      # 基础配置类  
│   ├── mysql_config.py     # MySQL专用配置  
│   ├── mongo_config.py     # MongoDB专用配置  
│   ├── redis_config.py     # Redis专用配置  
│   └── duckdb_config.py    # DuckDB专用配置  
├── connectors/  
│   ├── base_connector.py   # 连接器基类  
│   ├── mysql_connector.py  # MySQL连接器  
│   ├── mongo_connector.py  # MongoDB连接器  
│   ├── redis_connector.py  # Redis连接器  
│   └── duckdb_connector.py # DuckDB连接器  
└── services/  └── ...                 # 业务服务  

二、基础配置与连接器设计

1. 基础配置类

# configs/base_config.py
class BaseDBConfig:"""数据库基础配置"""def __init__(self):# 通用连接参数self.timeout = 10  # 秒self.max_retries = 3self.auto_reconnect = True# 连接池参数self.pool_enabled = Trueself.pool_size = 5self.pool_timeout = 30  # 秒# 监控参数self.monitor_enabled = Trueself.monitor_interval = 60  # 秒def get_connection_params(self) -> dict:"""获取连接参数(子类必须实现)"""raise NotImplementedError("子类必须实现此方法")

2. 基础连接器类

# connectors/base_connector.py
from abc import ABC, abstractmethod
import logging
from configs.base_config import BaseDBConfiglogger = logging.getLogger(__name__)class BaseDBConnector(ABC):"""数据库连接器基类"""def __init__(self, config: BaseDBConfig):"""初始化连接器:param config: 数据库配置实例"""self.config = configself._connection = Noneself._connections = {}  # 多实例连接缓存logger.info(f"初始化 {self.__class__.__name__} 连接器")@abstractmethoddef connect(self, config_name: str = "default"):"""建立数据库连接(子类实现)"""pass@abstractmethoddef close(self, config_name: str = "default"):"""关闭数据库连接(子类实现)"""pass@abstractmethoddef execute_query(self, query: str, params=None, config_name: str = "default"):"""执行查询操作(子类实现)"""pass@abstractmethoddef execute_command(self, command: str, params=None, config_name: str = "default"):"""执行写操作(子类实现)"""passdef health_check(self, config_name: str = "default") -> bool:"""健康检查(默认实现,子类可覆盖)"""try:self.connect(config_name)return Trueexcept Exception as e:logger.error(f"数据库健康检查失败: {e}")return Falsefinally:self.close(config_name)def __enter__(self):"""上下文管理器支持"""self.connect()return selfdef __exit__(self, exc_type, exc_val, exc_tb):"""退出时自动关闭连接"""self.close()

三、MySQL专用配置与连接器

1. MySQL配置类

# configs/mysql_config.py
from configs.base_config import BaseDBConfigclass MySQLConfig(BaseDBConfig):"""MySQL数据库专用配置"""def __init__(self):super().__init__()# MySQL特有参数self.charset = "utf8mb4"self.autocommit = Falseself.client_flag = 0self.ssl_disabled = Falsedef get_connection_params(self) -> dict:"""获取MySQL连接参数"""return {"host": self.host,"port": self.port,"user": self.user,"password": self.password,"database": self.database,"charset": self.charset,"autocommit": self.autocommit,"ssl_disabled": self.ssl_disabled}# 具体数据库实例配置
class MainMySQLConfig(MySQLConfig):"""主业务数据库配置"""def __init__(self):super().__init__()self.host = "db-main.example.com"self.port = 3306self.user = "app_user"self.password = "secure_pass"self.database = "main_db"self.pool_size = 10  # 更大的连接池class LogMySQLConfig(MySQLConfig):"""日志数据库配置"""def __init__(self):super().__init__()self.host = "log-db.example.com"self.port = 3307self.user = "log_user"self.password = "log_pass"self.database = "app_logs"self.autocommit = True  # 日志自动提交

2. MySQL连接器

# connectors/mysql_connector.py
import mysql.connector
from mysql.connector import pooling, Error
from configs.mysql_config import MySQLConfig
from connectors.base_connector import BaseDBConnectorclass MySQLConnector(BaseDBConnector):"""MySQL数据库连接器"""def __init__(self, config: MySQLConfig):super().__init__(config)self._pools = {}  # 连接池缓存def connect(self, config_name: str = "default"):"""建立数据库连接"""if config_name not in self._pools:# 创建新连接池params = self.config.get_connection_params()pool_config = {"pool_name": f"{config_name}_pool","pool_size": self.config.pool_size,**params}try:self._pools[config_name] = pooling.MySQLConnectionPool(**pool_config)self._connections[config_name] = self._pools[config_name].get_connection()except Error as e:raise ConnectionError(f"MySQL连接失败: {e}")return self._connections[config_name]def close(self, config_name: str = "default"):"""关闭数据库连接"""if config_name in self._connections:conn = self._connections.pop(config_name)conn.close()def execute_query(self, query: str, params=None, config_name: str = "default"):"""执行查询操作"""conn = self.connect(config_name)try:with conn.cursor(dictionary=True) as cursor:cursor.execute(query, params)return cursor.fetchall()except Error as e:raise RuntimeError(f"查询执行失败: {e}")def execute_command(self, command: str, params=None, config_name: str = "default"):"""执行写操作"""conn = self.connect(config_name)try:with conn.cursor() as cursor:cursor.execute(command, params)affected = cursor.rowcountif not self.config.autocommit:conn.commit()return affectedexcept Error as e:conn.rollback()raise RuntimeError(f"命令执行失败: {e}")

四、其他数据库配置与连接器

1. MongoDB配置与连接器

# configs/mongo_config.py
from configs.base_config import BaseDBConfigclass MongoDBConfig(BaseDBConfig):"""MongoDB专用配置"""def __init__(self):super().__init__()self.uri = ""self.db_name = ""self.tls = Falseself.replica_set = ""def get_connection_params(self) -> dict:return {"host": self.uri,"tls": self.tls,"replicaSet": self.replica_set}class UserProfileMongoConfig(MongoDBConfig):"""用户档案数据库配置"""def __init__(self):super().__init__()self.uri = "mongodb://user:pass@cluster0.example.com:27017"self.db_name = "user_profiles"self.tls = True
# connectors/mongo_connector.py
from pymongo import MongoClient
from configs.mongo_config import MongoDBConfig
from connectors.base_connector import BaseDBConnectorclass MongoDBConnector(BaseDBConnector):"""MongoDB数据库连接器"""def __init__(self, config: MongoDBConfig):super().__init__(config)self._clients = {}  # 客户端缓存def connect(self, config_name: str = "default"):"""建立数据库连接"""if config_name not in self._clients:params = self.config.get_connection_params()try:self._clients[config_name] = MongoClient(**params)self._connections[config_name] = self._clients[config_name][self.config.db_name]except Exception as e:raise ConnectionError(f"MongoDB连接失败: {e}")return self._connections[config_name]def get_collection(self, collection_name: str, config_name: str = "default"):"""获取集合对象"""db = self.connect(config_name)return db[collection_name]# MongoDB不需要传统SQL查询方法def execute_query(self, query: dict, collection_name: str, config_name: str = "default"):"""执行MongoDB查询"""collection = self.get_collection(collection_name, config_name)return list(collection.find(query))def execute_command(self, command: dict, collection_name: str, config_name: str = "default"):"""执行MongoDB写操作"""# 这里简化处理,实际需要区分insert/update/deletecollection = self.get_collection(collection_name, config_name)if command.get("insert"):return collection.insert_many(command["data"])elif command.get("update"):return collection.update_many(command["filter"], command["update"])elif command.get("delete"):return collection.delete_many(command["filter"])else:raise ValueError("无效的MongoDB命令")

2. Redis配置与连接器

# configs/redis_config.py
from configs.base_config import BaseDBConfigclass RedisConfig(BaseDBConfig):"""Redis专用配置"""def __init__(self):super().__init__()self.host = "localhost"self.port = 6379self.db = 0self.decode_responses = Trueself.ssl = Falsedef get_connection_params(self) -> dict:return {"host": self.host,"port": self.port,"db": self.db,"decode_responses": self.decode_responses,"ssl": self.ssl}class SessionRedisConfig(RedisConfig):"""会话缓存配置"""def __init__(self):super().__init__()self.host = "cache.example.com"self.db = 1self.decode_responses = False  # 原始字节数据
# connectors/redis_connector.py
import redis
from configs.redis_config import RedisConfig
from connectors.base_connector import BaseDBConnectorclass RedisConnector(BaseDBConnector):"""Redis数据库连接器"""def __init__(self, config: RedisConfig):super().__init__(config)def connect(self, config_name: str = "default"):"""建立Redis连接"""if config_name not in self._connections:params = self.config.get_connection_params()try:self._connections[config_name] = redis.Redis(**params)# 测试连接self._connections[config_name].ping()except Exception as e:raise ConnectionError(f"Redis连接失败: {e}")return self._connections[config_name]# Redis不需要传统SQL查询方法def execute_query(self, key: str, config_name: str = "default"):"""获取键值"""conn = self.connect(config_name)return conn.get(key)def execute_command(self, command: str, *args, config_name: str = "default"):"""执行Redis命令"""conn = self.connect(config_name)return conn.execute_command(command, *args)

3. DuckDB配置与连接器

# configs/duckdb_config.py
from configs.base_config import BaseDBConfigclass DuckDBConfig(BaseDBConfig):"""DuckDB专用配置"""def __init__(self):super().__init__()self.path = ":memory:"  # 默认内存数据库self.read_only = Falseself.config = {}  # 额外配置def get_connection_params(self) -> dict:return {"database": self.path,"read_only": self.read_only,"config": self.config}class AnalyticsDuckDBConfig(DuckDBConfig):"""分析数据库配置"""def __init__(self):super().__init__()self.path = "/data/analytics.duckdb"self.config = {"threads": 8}  # 使用8线程
# connectors/duckdb_connector.py
import duckdb
from configs.duckdb_config import DuckDBConfig
from connectors.base_connector import BaseDBConnectorclass DuckDBConnector(BaseDBConnector):"""DuckDB数据库连接器"""def __init__(self, config: DuckDBConfig):super().__init__(config)def connect(self, config_name: str = "default"):"""建立DuckDB连接"""if config_name not in self._connections:params = self.config.get_connection_params()try:self._connections[config_name] = duckdb.connect(**params)# 应用额外配置for key, value in params["config"].items():self._connections[config_name].execute(f"SET {key} = {value}")except Exception as e:raise ConnectionError(f"DuckDB连接失败: {e}")return self._connections[config_name]def execute_query(self, query: str, config_name: str = "default"):"""执行查询返回DataFrame"""conn = self.connect(config_name)return conn.execute(query).fetchdf()def execute_command(self, command: str, config_name: str = "default"):"""执行DDL/DML语句"""conn = self.connect(config_name)return conn.execute(command)

五、业务层统一使用接口

数据库管理器

# connectors/db_manager.py
from configs import mysql_config, mongo_config, redis_config, duckdb_config
from connectors import mysql_connector, mongo_connector, redis_connector, duckdb_connectorclass DBManager:"""多数据库统一管理"""_instances = {}  # 连接器实例缓存# 配置实例MYSQL_MAIN = mysql_config.MainMySQLConfig()MYSQL_LOG = mysql_config.LogMySQLConfig()MONGO_PROFILE = mongo_config.UserProfileMongoConfig()REDIS_SESSION = redis_config.SessionRedisConfig()DUCKDB_ANALYTICS = duckdb_config.AnalyticsDuckDBConfig()@classmethoddef get_mysql(cls, config: MySQLConfig = MYSQL_MAIN) -> mysql_connector.MySQLConnector:"""获取MySQL连接器"""key = f"mysql_{id(config)}"if key not in cls._instances:cls._instances[key] = mysql_connector.MySQLConnector(config)return cls._instances[key]@classmethoddef get_mongo(cls, config: MongoDBConfig = MONGO_PROFILE) -> mongo_connector.MongoDBConnector:"""获取MongoDB连接器"""key = f"mongo_{id(config)}"if key not in cls._instances:cls._instances[key] = mongo_connector.MongoDBConnector(config)return cls._instances[key]@classmethoddef get_redis(cls, config: RedisConfig = REDIS_SESSION) -> redis_connector.RedisConnector:"""获取Redis连接器"""key = f"redis_{id(config)}"if key not in cls._instances:cls._instances[key] = redis_connector.RedisConnector(config)return cls._instances[key]@classmethoddef get_duckdb(cls, config: DuckDBConfig = DUCKDB_ANALYTICS) -> duckdb_connector.DuckDBConnector:"""获取DuckDB连接器"""key = f"duckdb_{id(config)}"if key not in cls._instances:cls._instances[key] = duckdb_connector.DuckDBConnector(config)return cls._instances[key]

业务服务使用示例

# services/user_service.py
from connectors.db_manager import DBManagerclass UserService:"""用户服务 - 多数据库操作"""def __init__(self):# 初始化各数据库连接器self.mysql = DBManager.get_mysql()self.mongo = DBManager.get_mongo()self.redis = DBManager.get_redis()def get_user_data(self, user_id: int):"""获取用户完整数据"""# MySQL获取基本信息sql = "SELECT * FROM users WHERE id = %s"user_info = self.mysql.execute_query(sql, (user_id,))[0]# MongoDB获取档案user_profile = self.mongo.execute_query({"user_id": user_id}, collection_name="profiles")# Redis获取会话状态session_state = self.redis.execute_query(f"session:{user_id}")return {"info": user_info,"profile": user_profile,"session": session_state}def update_user(self, user_id: int, data: dict):"""更新用户信息"""# MySQL更新基本信息update_sql = """UPDATE users SET name = %s, email = %s WHERE id = %s"""self.mysql.execute_command(update_sql, (data["name"], data["email"], user_id))# MongoDB更新档案self.mongo.execute_command({"update": True},collection_name="profiles",command={"filter": {"user_id": user_id},"update": {"$set": data["profile"]}})# Redis更新缓存self.redis.execute_command("SET", f"user:{user_id}", data["cache_data"], "EX", 3600)

六、架构优势与最佳实践

设计优势

  1. 统一接口:所有数据库连接器继承自同一基类,提供一致API
  2. 配置继承:基础配置复用,专用配置扩展
  3. 类型安全:强类型配置和连接器,减少错误
  4. 连接复用:管理器确保连接器单例
  5. 多实例支持:同一数据库类型支持多个配置实例
  6. 开箱即用:预置主流数据库实现
  7. 可扩展性:轻松添加新数据库支持

命名规范建议

  1. 配置类[数据库类型]Config + [用途]Config
    (e.g., MySQLConfig, MainMySQLConfig)

  2. 连接器类[数据库类型]Connector
    (e.g., MySQLConnector, MongoDBConnector)

  3. 配置实例:全大写描述性名称
    (e.g., MYSQL_MAIN, REDIS_SESSION)

  4. 方法命名

    • connect(): 建立连接
    • close(): 关闭连接
    • execute_query(): 执行查询
    • execute_command(): 执行写操作

最佳实践

  1. 环境区分:使用环境变量切换配置

    class MainMySQLConfig(MySQLConfig):def __init__(self):super().__init__()env = os.getenv("ENV", "dev")self.host = f"db-{env}.example.com"
    
  2. 配置加密:敏感数据加密存储

    from cryptography.fernet import Fernetclass SecureConfig:@staticmethoddef decrypt(encrypted: str) -> str:cipher = Fernet(os.getenv("CONFIG_KEY"))return cipher.decrypt(encrypted.encode()).decode()# 在配置中使用
    self.password = SecureConfig.decrypt(ENCRYPTED_PASSWORD)
    
  3. 连接监控:定期检查连接健康

    import threading
    import timedef monitor_connections():while True:for name, connector in DBManager._instances.items():status = connector.health_check()print(f"{name} 状态: {'OK' if status else 'FAIL'}")time.sleep(DBManager.MYSQL_MAIN.monitor_interval)threading.Thread(target=monitor_connections, daemon=True).start()
    
  4. 资源清理:应用退出时关闭所有连接

    import atexit@atexit.register
    def cleanup():for connector in DBManager._instances.values():connector.close_all()
    

结语:通过这种基于继承的配置架构和统一命名的连接器设计,我们实现了多数据库的高效管理。这种设计既保持了配置的灵活性,又提供了代码的一致性,是中型到大型项目的理想选择。

扩展方向

  1. 添加异步IO支持
  2. 集成分布式配置中心
  3. 实现数据库迁移工具
  4. 添加ORM集成层
  5. 开发Web管理界面

安全提示

  • 生产环境禁用默认配置
  • 定期轮换数据库凭证
  • 使用网络隔离限制数据库访问
  • 加密敏感配置数据
  • 实施最小权限原则

关键词:Python数据库架构多数据库管理配置继承模式数据库连接器设计企业级Python开发

http://www.dtcms.com/a/318294.html

相关文章:

  • go语言变量2
  • 开疆智能ModbusTCP转Profinet网关连接安川YRC1000机器人配置案例
  • 嵌入式处理器指令系统:精简指令集RISC与复杂指令集CISC的简介,及区别
  • Cervantes:面向渗透测试人员和红队的开源协作平台
  • 勇芳字体查看器 v1.0 免费版
  • 当前就业形势下,软件测试工程师职业发展与自我提升的必要性
  • Kubesphere搜索镜像问题
  • 深度解析|资源位管理工具如何重构媒体商业化效率?
  • 飞书对接E签宝完整方案
  • AI浪潮下,FPGA如何实现自我重塑与行业变革
  • 动态代理常用的两种方式?
  • 开发教育全链路管理系统 + 微信小程序,为各类教育主体注入数字化动力!
  • LeetCode 面试经典 150_数组/字符串_O(1)时间插入、删除和获取随机元素(12_380_C++_中等)(哈希表)
  • Conda虚拟环境安装包
  • 信号处理:信号产生
  • 2025年WiFi技术白皮书:全球物联网无线通信的关键创新
  • Codeforces Round 987 (Div. 2)
  • [特殊字符]海尔考察游学 | 解码人才培养秘籍
  • 长时间面对电脑屏幕需要使用防晒霜吗?
  • js中的 slice、splice、split、substring、substr
  • 面试题:使用class类来写工个五子棋
  • spring-dubbo
  • Post-train 入门(1):SFT / DPO / Online RL 概念理解和分类
  • C++与C语言实现Stack的对比分析
  • sqli-labs通关笔记-第34关POST宽字符注入(单引号闭合 手工注入+脚本注入两种方法)
  • Verilog 仿真问题:打拍失败
  • FPGA学习笔记——VGA简介
  • Excel单元格设置下拉框、选项背景
  • 20250806给PRO-RK3566开发板在Buildroot系统下扩大rootfs分区2GB
  • 实习文档背诵