当前位置: 首页 > news >正文

python中单例模式应用

数据库连接池单例模式
 

1. 为什么使用单例模式

创建数据库连接是一个昂贵的过程(涉及网络通信、认证等)。单例模式的连接池可以在程序启动时初始化一组连接,并在整个生命周期中重用这些连接,而不是每次请求都新建连接。同时还可以控制连接数量,防止资源耗尽。


2.代码

下面这段代码实现了一个数据库连接池,并且通过单例模式确保整个程序中只有一个连接池实例,包含扩容机制。避免频繁创建和销毁数据库连接,通过单例复用连接池:

通过单例模式确保全局只有一个连接池实例。
使用线程锁和条件变量实现线程安全,处理多线程环境下的并发访问。
提供连接的创建、获取、归还和关闭功能。
通过多线程测试验证连接池的并发性能。

import pymysql
import threading
from pymysql import Error


class DatabasePool:
    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super().__new__(cls)
            # 数据库配置
            cls._instance.config = {
                'host': 'localhost',
                'user': 'root',
                'password': '111111',
                'database': 'test',
                'charset': 'utf8mb4',
                'cursorclass': pymysql.cursors.DictCursor
            }
            # 使用可重入锁和条件变量
            cls._instance.lock = threading.RLock()
            cls._instance.condition = threading.Condition(cls._instance.lock)
            cls._instance.max_connections = 10
            cls._instance.connections = []  # 未使用的连接
            cls._instance.in_use = set()   # 正在使用的连接
            cls._instance.init_pool()
        return cls._instance    # 返回唯一的连接池实例

    def init_pool(self):
        """初始化连接池(线程安全)"""
        with self.lock:
            for _ in range(self.max_connections):
                self.add_connection()

    def create_connection(self):
        """创建单个数据库连接(无需加锁)"""
        try:
            conn = pymysql.connect(**self.config)
            print(f"成功创建连接:{conn._sock.getsockname()}")
            return conn
        except Error as e:
            print(f"连接创建失败: {e}")
            return None

    def add_connection(self):
        """向连接池添加连接(线程安全)"""
        if len(self.connections) + len(self.in_use) < self.max_connections:
            conn = self.create_connection()
            if conn:
                self.connections.append(conn)

    def get_connection(self):
        """获取连接(线程安全)"""
        with self.condition:
            while not self.connections:
                if len(self.in_use) < self.max_connections:
                    self.add_connection()
                else:
                    print("连接池已满,等待连接归还...")
                    self.condition.wait()  # 等待连接归还
            conn = self.connections.pop()
            self.in_use.add(conn)
            print(f"获取连接:{conn._sock.getsockname()}")
            return conn

    def release_connection(self, conn):
        """归还连接(线程安全)"""
        with self.condition:
            if conn in self.in_use:
                self.in_use.remove(conn)
                if conn.open:
                    self.connections.append(conn)
                    print(f"归还连接:{conn._sock.getsockname()}")
                    self.condition.notify()  # 通知等待的线程
                else:
                    print("警告:连接已关闭,直接丢弃")

    def close_pool(self):
        """关闭所有连接(线程安全)"""
        with self.lock:
            for conn in self.connections + list(self.in_use):
                if conn.open:
                    conn.close()
            self.connections.clear()
            self.in_use.clear()
            print("所有数据库连接已关闭")

# 多线程测试示例
if __name__ == "__main__":
    import concurrent.futures

    def worker(thread_id):
        pool = DatabasePool()
        conn = pool.get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT SLEEP(1)")  # 模拟耗时操作
                print(f"线程 {thread_id} 执行查询")
        finally:
            pool.release_connection(conn)

    # 创建连接池
    pool = DatabasePool()

    # 使用15个线程并发测试,超过最大连接数
    with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
        futures = [executor.submit(worker, i) for i in range(15)]
        for future in concurrent.futures.as_completed(futures):
            future.result()

    pool.close_pool()

3. 核心:__new__ 方法

class DatabasePool:
    _instance = None  # 类变量,用于保存唯一的实例

    def __new__(cls):
        if not cls._instance:
            cls._instance = super().__new__(cls)
            # 数据库配置
            cls._instance.config = {
                'host': 'localhost',
                'user': 'root',
                'password': '111111',
                'database': 'test',
                'charset': 'utf8mb4',
                'cursorclass': pymysql.cursors.DictCursor
            }
            # 使用可重入锁和条件变量
            cls._instance.lock = threading.RLock()
            cls._instance.condition = threading.Condition(cls._instance.lock)
            cls._instance.max_connections = 10
            cls._instance.connections = []  # 未使用的连接
            cls._instance.in_use = set()   # 正在使用的连接
            cls._instance.init_pool()
        return cls._instance    # 返回唯一的连接池实例
  • 作用:确保无论创建多少次 DatabasePool(),都只会生成同一个实例
  • 示例
    pool1 = DatabasePool()  # 第一次创建,初始化连接池
    pool2 = DatabasePool()  # 直接返回 pool1 的实例
    print(pool1 is pool2)   # 输出 True

3. 总结

我们可以把连接池想象成一个共享铅笔盒(全班共享)

  • :管理员(锁)确保一次只有一个人能拿铅笔。
  • 连接:铅笔盒里的铅笔(初始10支)。
  • 动态扩容:当铅笔用完时,管理员临时制作新铅笔。
  • 归还机制:用完后必须归还,否则其他人无法使用。

相关文章:

  • DeepSeek掘金——DeepSeek R1驱动的PDF机器人
  • Linux:进程概念
  • 知识库功能测试难点
  • Windows PicPick Professional-v7.3.2-中文版
  • 植物大战僵尸金铲铲版 v1.1.6(windows+安卓)
  • DeepSeek使用操作指南:开发人员实战手册
  • WPF10-绑定属性
  • 近似最近邻(ANN)算法库实战
  • 从统计学视角看机器学习的训练与推理
  • 文字滚动效果组件和按钮组件
  • 第一章 “流程引擎启蒙课”
  • AI 驱动的智慧大脑:打造企业动态知识库,开启高效管理新时代
  • C语言-7.函数
  • AI工具导航平台功能模块之混合分类器功能说明文档
  • 2024年12月中国电子学会青少年软件编程(Python)等级考试试卷(三级)答案 + 解析
  • 一、对iic类模块分析与使用
  • Matlab 大量接单
  • 初出茅庐的小李博客之按键驱动库使用
  • 论文写作指南
  • 自一致性(Self-Consistency)方法:通过多数投票提升模型生成质量(代码实现)
  • 深圳兼职做网站/电商培训机构推荐
  • 做任务用手机号登录网站/品牌推广
  • 自己网站视频直播怎么做/广东东莞疫情最新消息
  • 建设马克思主义学院网站/中国企业500强排行榜
  • 深圳做二维码网站建设/天津的网络优化公司排名
  • 珠海网站建设公司有哪些/互联网营销怎么赚钱