当前位置: 首页 > news >正文

异步MySQL连接池实现

文章目录

  • 一、异步连接池
    • 1.1 设计思想
    • 1.2 future 和 promise
    • 1.3 统一封装
  • 二、模块划分
    • 2.1 异步回调处理
    • 2.2 SQL 操作任务
    • 2.3 MySQL 连接与工作线程
    • 2.4 连接池管理
    • 2.5 工具
  • 三、核心函数
    • 3.1 用户调用示例
    • 3.2 Query() 方法内部流程
    • 3.3 任务对象:SQLOperation
    • 3.4 MySQLWorker工作线程
    • 3.5 MySQLConn::Query方法
    • 3.6 AsyncProcessor实现
    • 3.7 QueryCallback实现
  • 四、核心组件
    • 4.1 BlockingQueue阻塞队列
    • 4.2 MySQLConn连接管理
  • 五、问题与优化

一、异步连接池

1.1 设计思想

MySQL 的 send(发送 SQL 请求)和 recv(接收执行结果)本质上是耗时操作,会阻塞线程。如果主线程直接去等待结果,程序就会卡住。解决办法是使用一个线程池(异步连接池),让这些耗时的数据库操作在后台线程执行,而主线程只需要等待结果即可继续执行其他逻辑。

实现异步连接池的目的是异步执行 SQL,并把耗时的网络或数据库操作放到后台线程去做,把结果在主线程以回调的方式拿回并处理。整体流程为:

  • 每个 MySQL 连接配一个后台工作线程(MySQLWorker),该线程从一个共享的任务队列(BlockingQueue<SQLOperation*>)取任务执行。

  • 查询以 任务(SQLOperation)+承诺(std::promise 的形式提交,连接池线程执行并用 promise 把结果(或异常)交还给主线程;

  • 主线程不直接阻塞等待结果,而是用 QueryCallback(内部持有 std::future + 用户回调)并由 AsyncProcessor 定期轮询 future 是否就绪,如果就绪则在主线程调用用户回调;

  • 连接池 MySQLConnPool 负责创建连接、创建工作线程、维护任务队列,并给用户一个简单的 Query(sql, callback) 接口。

简单流程图

用户调用|v
MySQLConnPool::Query(sql, cb)||--> 创建 SQLOperation(sql)|--> future + cb 封装成 QueryCallback|--> SQLOperation 入 BlockingQueue|v
MySQLWorker(后台线程)|--> Pop SQLOperation|--> MySQLConn::Query(sql)|--> promise.set_value(result)|v
QueryCallback(future+cb)|
AsyncProcessor::InvokeIfReady()|--> 检查 future 是否完成|--> cb(result)

1.2 future 和 promise

这是 C++11 引入的标准库机制,用来在 不同线程之间传递结果

  • promise(承诺):任务执行完后,线程池线程通过 promise 把结果交给主线程。
  • future(期望):主线程持有 future,可以在需要的时候调用 get() 来拿到结果。

工作流程

  • 发起发起 SQL 请求的时候,核心线程封装的任务包应该携带一个承诺 promise
  • 核心线程把任务 push 到队列时需要取到对应的 future
  • promise 从核心线程传到线程池线程中
  • 等线程池线程取得 MySQL 返回结果result后,通过promise去履行承诺,用 promise.set_value(result) 把结果传递出去。
  • 线程池线程在履行承诺后,核心线程通过 future.get() 拿到结果。

伪代码示例

// 发起请求,返回一个期望
auto future = promise.get_future();// 线程池线程执行完毕后,履行承诺,设置结果
promise.set_value(res);// 核心线程通过 future 取得结果
auto result = future.get();

1.3 统一封装

线程池负责执行数据库的耗时操作,每个请求任务都会占用一个线程;连接池则为每个数据库维护固定数量的连接,并提供异步查询接口,用户只需通过 MySQLConnPool 提交 SQL 请求,就能立即得到一个 future,稍后再通过 future.get() 获取结果,而无需关心底层线程池和 future/promise 的实现细节。

接口示例:

auto future = connPool.execute("SELECT * FROM users");
auto result = future.get();  // 等待并获取结果

二、模块划分

2.1 异步回调处理

  • QueryCallback

    • 保存 future(结果的占位符)和回调函数。
    • 提供 InvokeIfReady():检查 future 是否完成,如果完成 → 调用回调函数。
  • AsyncProcessor

    • 容器:保存一堆 QueryCallback
    • 提供 InvokeIfReady():遍历所有回调,调用已完成的,未完成的继续保留。
    • 定期由线程调用,确保用户的回调被执行。

2.2 SQL 操作任务

  • SQLOperation
    • 表示“一条 SQL 操作”。
    • 内部有 promise,执行时通过 promise.set_value(result) 把结果塞进去。
    • 用户这边拿到 future,异步等待结果。

2.3 MySQL 连接与工作线程

  • MySQLConn

    • 代表一个数据库连接。
    • 功能:Open() 连接,Close() 关闭,Query() 执行 SQL。
    • 每个连接都会创建一个 MySQLWorker,后台线程不断从队列取任务来执行。
  • MySQLWorker

    • 一个线程,循环从 BlockingQueue 里取 SQLOperation
    • 取到任务 -> 调用 op->Execute(conn) -> 执行 SQL。
    • 完成后结果进入 promise,最终触发回调。

2.4 连接池管理

  • MySQLConnPool
    • 单例模式:每个数据库一个连接池。

    • InitPool(url, pool_size):初始化多个 MySQLConn,放进池子。

    • Query(sql, cb)

      1. 封装 SQLOperation,丢进队列。
      2. 返回一个 QueryCallback,里面包含 future + 回调函数。
    • 关键点:它把 “用户请求” 转化为 “后台执行任务 + 异步回调”。

2.5 工具

  • BlockingQueue

    • 一个线程安全的队列,支持 PushPop
    • 保证多线程安全通信。
  • 辅助函数 Tokenize

    • 用来解析 url;user;password 这样的字符串。

三、核心函数

3.1 用户调用示例

// 用户发起查询
auto cb = pool->Query("SELECT * FROM user", [](auto result){// 回调:拿到查询结果后的处理逻辑while (result->next()) {std::cout << "User ID: " << result->getInt("id") << std::endl;}
});
processor.AddQueryCallback(std::move(cb));
  • 用户调用 pool->Query() 发送 SQL 语句。
  • 第一个参数是SQL查询语句,第二个参数是一个lambda函数,作为查询完成后的回调函数。
  • Query() 返回一个 QueryCallback 对象,其中保存了 future 和用户定义的回调函数。
  • 用户把 QueryCallback 交给 AsyncProcessor,由它定期检查查询是否完成。
  • 一旦查询完成,用户回调被自动触发,拿到 ResultSet 并处理结果。

用户视角非常简单:只要写 SQL 和回调函数,异步查询的复杂性全部被隐藏

3.2 Query() 方法内部流程

QueryCallback MySQLConnPool::Query(const std::string &sql, std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb) {// 创建 SQLOperation 对象,封装 SQL 查询和 promiseSQLOperation *op = new SQLOperation(sql);// 获取与 promise 关联的 future,用于后续获取结果auto future = op->GetFuture();// 将任务添加到阻塞队列中,等待工作线程处理task_queue_->Push(op);// 返回 QueryCallback 对象,封装 future 和用户回调函数return QueryCallback(std::move(future), std::move(cb));
}
  • 封装任务:将SQL查询语句和 promise 对象封装在一起,SQL 被封装为 SQLOperation 对象,内部自带一个 promise
  • 获取future对象:通过 promise 的 get_future() 方法获取关联的 future 对象
  • 任务入队:将SQLOperation对象添加到阻塞队列中,等待工作线程处理
  • 返回QueryCallback:将 future 和用户回调函数封装成 QueryCallback 对象返回

3.3 任务对象:SQLOperation

class SQLOperation {
public:explicit SQLOperation(const std::string &sql) : sql_(sql) {}void Execute(MySQLConn *conn) {// 执行 SQL 查询auto result = conn->Query(sql_);// 通过 promise 设置查询结果promise_.set_value(std::unique_ptr<sql::ResultSet>(result));}// 获取与 promise 关联的 futurestd::future<std::unique_ptr<sql::ResultSet>> GetFuture() {return promise_.get_future();}private:std::string sql_;  // SQL 查询语句std::promise<std::unique_ptr<sql::ResultSet>> promise_;  // 承诺对象,用于设置结果
};
  • 构造函数:接收SQL查询语句并保存。SQLOperation = SQL + promise
  • Execute方法:执行SQL查询并通过 promise 设置结果
    • 通过MySQLConn对象执行SQL查询
    • 使用 promise 的 set_value 方法设置查询结果
  • GetFuture方法:返回与 promise 关联的 future 对象
    • future对象用于异步获取查询结果

这里的 promise/future 就像一个 单向通信管道:后台线程写入结果,前台 future 负责读取。

3.4 MySQLWorker工作线程

// 线程的入口函数
void MySQLWorker::Worker() {while (true) {SQLOperation *op = nullptr;// 从阻塞队列中获取任务,如果队列为空则阻塞等待if (!task_queue_->Pop(op)) {break; // 如果 Pop 返回 false,退出线程}if (op) {// 执行 SQL 操作op->Execute(conn_);// 释放操作对象内存delete op;}}
}
  • 无限循环:工作线程持续运行,处理队列中的任务

  • 从队列获取任务:使用Pop方法从阻塞队列获取SQLOperation任务

    • 如果队列为空,线程会阻塞等待
    • 如果Pop返回false,表示需要退出线程
  • 执行SQL操作:调用 SQLOperation::Execute(),运行 SQL 并写入 promise

  • 释放内存:执行完成后删除SQLOperation对象,释放内存

3.5 MySQLConn::Query方法

sql::ResultSet* MySQLConn::Query(const std::string &sql) {try {// 创建 Statement 对象sql::Statement *stmt = conn_->createStatement();// 执行查询并返回结果集return stmt->executeQuery(sql);} catch (sql::SQLException &e) {// 处理异常,如连接断开则尝试重连HandlerException(e);printf("MySQLConn::Query sql exception: %s, code: %d\n", e.what(), e.getErrorCode());}return nullptr;
}
  • 创建Statement对象:通过数据库连接创建Statement对象
  • 执行查询:使用Statement对象的executeQuery方法执行SQL查询
  • 异常处理:捕获可能出现的SQLException异常
    • 调用HandlerException方法处理异常
    • 如果是连接断开错误,会自动尝试重连
  • 返回结果:返回查询结果集,如果出现异常则返回nullptr

3.6 AsyncProcessor实现

void AsyncProcessor::AddQueryCallback(QueryCallback &&query_callback) {// 将查询回调添加到待处理列表中pending_queries_.emplace_back(std::move(query_callback));
}void AsyncProcessor::InvokeIfReady() {// 遍历所有待处理的查询回调for (auto it = pending_queries_.begin(); it != pending_queries_.end();) {// 检查回调是否就绪(查询是否完成)if (it->InvokeIfReady()) {// 如果就绪,调用回调并从列表中移除it = pending_queries_.erase(it);} else {// 如果未就绪,继续保留在列表中++it;}}
}
  • AddQueryCallback方法:将QueryCallback对象添加到待处理列表中
  • InvokeIfReady方法:遍历所有待处理的查询回调
    • 检查每个回调是否就绪(查询是否完成)
    • 如果就绪,调用回调函数并从列表中移除
    • 如果未就绪,继续保留在列表中等待下次检查

3.7 QueryCallback实现

class QueryCallback {
public:QueryCallback(std::future<std::unique_ptr<sql::ResultSet>> &&future,std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb): future_(std::move(future)), cb_(std::move(cb)) {}// 检查并调用回调(如果就绪)bool InvokeIfReady() {// 检查 future 是否就绪(查询是否完成)if (future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {// 调用用户回调函数,传递查询结果cb_(std::move(future_.get()));return true; // 表示回调已处理}return false; // 表示回调未就绪}private:std::future<std::unique_ptr<sql::ResultSet>> future_; // 用于获取查询结果std::function<void(std::unique_ptr<sql::ResultSet>)> cb_; // 用户回调函数
};
  • 构造函数:接收future对象和用户回调函数
  • InvokeIfReady方法:检查future是否就绪
    • 使用wait_for方法检查future状态
    • 如果就绪,调用用户回调函数并传递查询结果
    • 返回true表示回调已处理,false表示未就绪

四、核心组件

4.1 BlockingQueue阻塞队列

template <typename T>
class BlockingQueue {
public:explicit BlockingQueue(bool nonblock = false) : nonblock_(nonblock) {}// 入队操作void Push(const T &value) {std::lock_guard<std::mutex> lock(mutex_);queue_.push(value);not_empty_.notify_one(); // 通知一个等待的线程}// 出队操作bool Pop(T &value) {std::unique_lock<std::mutex> lock(mutex_);// 等待条件:队列不为空或处于非阻塞模式not_empty_.wait(lock, [this] { return !queue_.empty() || nonblock_; });// 如果队列为空且处于非阻塞模式,返回 falseif (queue_.empty() && nonblock_) {return false;}// 获取队首元素并出队value = queue_.front();queue_.pop();return true;}// 取消阻塞void Cancel() {std::lock_guard<std::mutex> lock(mutex_);nonblock_ = true; // 设置为非阻塞模式not_empty_.notify_all(); // 唤醒所有等待的线程}private:bool nonblock_; // 是否为非阻塞模式std::queue<T> queue_; // 队列容器std::mutex mutex_; // 互斥锁std::condition_variable not_empty_; // 条件变量
};
  1. Push方法:添加元素到队列

    • 使用互斥锁保证线程安全
    • 添加元素后通知一个等待的线程
  2. Pop方法:从队列获取元素

    • 如果队列为空,线程会阻塞等待
    • 使用条件变量实现高效的等待/通知机制
    • 如果处于非阻塞模式且队列为空,返回false
  3. Cancel方法:取消阻塞

    • 设置非阻塞模式
    • 唤醒所有等待的线程,用于优雅关闭

4.2 MySQLConn连接管理

class MySQLConn {
public:MySQLConn(const std::string &info, const std::string &db, BlockingQueue<SQLOperation*> &task_queue): info_(info, db) {// 创建工作线程处理SQL操作worker_ = new MySQLWorker(this, task_queue);worker_->Start();}~MySQLConn() {if (worker_) {worker_->Stop();delete worker_;}if (conn_) {delete conn_;}}// 建立数据库连接int Open() {int err = 0;try {driver_ = get_driver_instance();conn_ = driver_->connect(info_.url, info_.user, info_.password);if (!conn_) {printf("MySQLConn::Open connect failed\n");return -1;}conn_->setSchema(info_.database); // 设置默认数据库} catch (sql::SQLException &e) {HandlerException(e); // 处理异常err = e.getErrorCode();}return err;}// 处理MySQL异常void HandlerException(sql::SQLException &e) {// 如果是连接断开错误,尝试重连if (e.getErrorCode() == 2006 || e.getErrorCode() == 2013) {printf("MySQLConn::HandlerException reconnecting...\n");Close();Open();} else {printf("MySQLConn::HandlerException sql exception: %s, code: %d\n", e.what(), e.getErrorCode());}}private:sql::Driver *driver_; // MySQL驱动sql::Connection *conn_; // MySQL连接MySQLWorker *worker_; // 工作线程MySQLConnInfo info_; // 连接信息
};
  1. 构造函数:初始化连接信息并创建工作线程

    • 每个MySQLConn对象都有一个关联的工作线程
  2. 析构函数:清理资源

    • 停止工作线程
    • 释放数据库连接
  3. Open方法:建立数据库连接

    • 获取MySQL驱动实例
    • 建立数据库连接
    • 设置默认数据库
  4. HandlerException方法:处理MySQL异常

    • 如果是连接断开错误(错误码2006或2013),自动尝试重连
    • 其他异常记录日志

五、问题与优化

1. 进一步封装

  • 对用户而言,只需要调用连接池提供的接口(如 execute)。
  • 底层线程池和连接管理完全隐藏。

2. 安全与稳定性

数据库连接池除了基本的执行,还需要考虑:

  • 断线重连机制

    • MySQL 如果长时间没有请求,会主动断开连接。
    • 需要检测连接是否有效,断开时自动重连。
  • 心跳检测机制

    • 定时发送一个简单 SQL(如 SELECT 1),保持连接活跃。
    • 避免被数据库服务器判定为闲置而断开。

3. 资源管理

  • 限制最大连接数,避免资源耗尽。
  • 使用智能指针(std::shared_ptrstd::unique_ptr)管理连接,避免内存泄漏。

4. 性能优化

  • 避免重复创建连接,复用已建立的连接。
  • 支持连接池参数可配置(最小连接数、最大连接数、超时策略)。

文章转载自:

http://R6yNqrhg.zqcsj.cn
http://VBa0t8UH.zqcsj.cn
http://2gBe56uZ.zqcsj.cn
http://H9D8VATS.zqcsj.cn
http://tkFwezaY.zqcsj.cn
http://Vvj79vl2.zqcsj.cn
http://q0ibtDmR.zqcsj.cn
http://lQJDQlU7.zqcsj.cn
http://acw8dnUR.zqcsj.cn
http://PSnVHgAE.zqcsj.cn
http://1r3zDfF8.zqcsj.cn
http://mLfojb33.zqcsj.cn
http://7CcWXvUt.zqcsj.cn
http://sw4SWpxZ.zqcsj.cn
http://EoALhgT1.zqcsj.cn
http://07JY4e4S.zqcsj.cn
http://1SK7Cq8M.zqcsj.cn
http://rAeGDZMk.zqcsj.cn
http://FbCudVVJ.zqcsj.cn
http://pZqWek6c.zqcsj.cn
http://ijfx0npW.zqcsj.cn
http://olygOHXe.zqcsj.cn
http://rWSczQeB.zqcsj.cn
http://BypIpsTb.zqcsj.cn
http://4nZHBFjY.zqcsj.cn
http://m31TUQy8.zqcsj.cn
http://FDOza0s6.zqcsj.cn
http://9nnF45To.zqcsj.cn
http://tyzV8LOw.zqcsj.cn
http://zRzTl6DJ.zqcsj.cn
http://www.dtcms.com/a/387461.html

相关文章:

  • 用Python 连接 MySQL数据库测试实战脚本(文中含源代码)
  • vue中下载文件保存格式和加密方式
  • typescript和vue和node项目的构建打包部署
  • Chat2DB+cpolar组合突破物理限制,成为数据库查询新解
  • Power BI 组件 AI Chart 技术解析:自然语言驱动的可视化革新
  • 【Linux网络】网络传输基本流程
  • 【开题答辩全过程】以 Boss直聘网站数据分析与可视化为例,包含答辩的问题和答案
  • 基于 Node.js 的后端框架:NestJS 和 Express(一)
  • Python 2025:现代Web开发与数据分析的融合新趋势
  • 数据可视化:点亮数据背后的价值
  • 微信小程序答题考试源码系统+独立部署教程 适配学校 / 企业 / 培训机构
  • Apache JMeter介绍(开源的性能测试工具,主要用于对软件系统、服务器、网络或对象进行压力测试和性能测试)
  • 叠衣服的最优解:机器人如何用语言指令完成复杂家务
  • jmeter 数据库连接配置 JDBC Connection Configuration
  • 神经网络与深度学习基础:从线性回归到分类模型
  • Jmeter 参数、设置相关
  • jmeter 提取变量设置为全局变量
  • open61499:重新定义工业编程,让复杂自动化变简单
  • 基于MATLAB的支持向量数据描述算法
  • 超越重命名:如何利用高级规则实现文件的精准自动化分类保存
  • Spring Cloud Gateway:一次不规范 URL 引发的路由转发404问题排查
  • C#开发常用方法汇总(类型转换)
  • 从踩坑到高效选型:基于 AI Ping 平台的 20+MaaS 供应商、220 + 模型服务性能(延迟 / 吞吐 / 可靠性):深度评测与大模型选型指南
  • LeetCode刷题记录----347.前K个高频元素(Medium)
  • Windows 部署hexo并启动自己的博客
  • 建议对下载的geo原始数据进行低表达基因过滤**,这是数据预处理的关键步骤之一,可提升后续分析(如差异表达、WGCNA)的准确性和可靠性
  • MySQL 数据库备份与恢复
  • SQLite 数据库简介
  • Java进阶教程,全面剖析Java多线程编程,线程的优先级,笔记07
  • YOLOv12目标检测:使用自定义数据集训练 YOLOv12 检测坑洞严重程度