异步MySQL连接池实现
文章目录
- 一、异步连接池
- 1.1 设计思想
- 1.2 future 和 promise
- 1.3 统一封装
- 二、模块划分
- 2.1 异步回调处理
- 2.2 SQL 操作任务
- 2.3 MySQL 连接与工作线程
- 2.4 连接池管理
- 2.5 工具
- 三、核心函数
- 3.1 用户调用示例
- 3.2 Query() 方法内部流程
- 3.3 任务对象:SQLOperation
- 3.4 MySQLWorker工作线程
- 3.5 MySQLConn::Query方法
- 3.6 AsyncProcessor实现
- 3.7 QueryCallback实现
- 四、核心组件
- 4.1 BlockingQueue阻塞队列
- 4.2 MySQLConn连接管理
- 五、问题与优化
一、异步连接池
1.1 设计思想
MySQL 的 send
(发送 SQL 请求)和 recv
(接收执行结果)本质上是耗时操作,会阻塞线程。如果主线程直接去等待结果,程序就会卡住。解决办法是使用一个线程池(异步连接池),让这些耗时的数据库操作在后台线程执行,而主线程只需要等待结果即可继续执行其他逻辑。
实现异步连接池的目的是异步执行 SQL,并把耗时的网络或数据库操作放到后台线程去做,把结果在主线程以回调的方式拿回并处理。整体流程为:
-
每个 MySQL 连接配一个后台工作线程(
MySQLWorker
),该线程从一个共享的任务队列(BlockingQueue<SQLOperation*>
)取任务执行。 -
查询以 任务(
SQLOperation
)+承诺(std::promise
) 的形式提交,连接池线程执行并用promise
把结果(或异常)交还给主线程; -
主线程不直接阻塞等待结果,而是用
QueryCallback
(内部持有std::future
+ 用户回调)并由AsyncProcessor
定期轮询future
是否就绪,如果就绪则在主线程调用用户回调; -
连接池
MySQLConnPool
负责创建连接、创建工作线程、维护任务队列,并给用户一个简单的Query(sql, callback)
接口。
简单流程图:
用户调用|v
MySQLConnPool::Query(sql, cb)||--> 创建 SQLOperation(sql)|--> future + cb 封装成 QueryCallback|--> SQLOperation 入 BlockingQueue|v
MySQLWorker(后台线程)|--> Pop SQLOperation|--> MySQLConn::Query(sql)|--> promise.set_value(result)|v
QueryCallback(future+cb)|
AsyncProcessor::InvokeIfReady()|--> 检查 future 是否完成|--> cb(result)
1.2 future 和 promise
这是 C++11 引入的标准库机制,用来在 不同线程之间传递结果。
- promise(承诺):任务执行完后,线程池线程通过
promise
把结果交给主线程。 - future(期望):主线程持有
future
,可以在需要的时候调用get()
来拿到结果。
工作流程:
- 发起发起 SQL 请求的时候,核心线程封装的任务包应该携带一个承诺
promise
- 核心线程把任务
push
到队列时需要取到对应的future
。 promise
从核心线程传到线程池线程中- 等线程池线程取得 MySQL 返回结果
result
后,通过promise
去履行承诺,用promise.set_value(result)
把结果传递出去。 - 线程池线程在履行承诺后,核心线程通过
future.get()
拿到结果。
伪代码示例:
// 发起请求,返回一个期望
auto future = promise.get_future();// 线程池线程执行完毕后,履行承诺,设置结果
promise.set_value(res);// 核心线程通过 future 取得结果
auto result = future.get();
1.3 统一封装
线程池负责执行数据库的耗时操作,每个请求任务都会占用一个线程;连接池则为每个数据库维护固定数量的连接,并提供异步查询接口,用户只需通过 MySQLConnPool
提交 SQL 请求,就能立即得到一个 future
,稍后再通过 future.get()
获取结果,而无需关心底层线程池和 future/promise
的实现细节。
接口示例:
auto future = connPool.execute("SELECT * FROM users");
auto result = future.get(); // 等待并获取结果
二、模块划分
2.1 异步回调处理
-
QueryCallback
- 保存
future
(结果的占位符)和回调函数。 - 提供
InvokeIfReady()
:检查 future 是否完成,如果完成 → 调用回调函数。
- 保存
-
AsyncProcessor
- 容器:保存一堆
QueryCallback
。 - 提供
InvokeIfReady()
:遍历所有回调,调用已完成的,未完成的继续保留。 - 定期由线程调用,确保用户的回调被执行。
- 容器:保存一堆
2.2 SQL 操作任务
SQLOperation
- 表示“一条 SQL 操作”。
- 内部有
promise
,执行时通过promise.set_value(result)
把结果塞进去。 - 用户这边拿到
future
,异步等待结果。
2.3 MySQL 连接与工作线程
-
MySQLConn
- 代表一个数据库连接。
- 功能:
Open()
连接,Close()
关闭,Query()
执行 SQL。 - 每个连接都会创建一个
MySQLWorker
,后台线程不断从队列取任务来执行。
-
MySQLWorker
- 一个线程,循环从
BlockingQueue
里取SQLOperation
。 - 取到任务 -> 调用
op->Execute(conn)
-> 执行 SQL。 - 完成后结果进入
promise
,最终触发回调。
- 一个线程,循环从
2.4 连接池管理
MySQLConnPool
-
单例模式:每个数据库一个连接池。
-
InitPool(url, pool_size)
:初始化多个MySQLConn
,放进池子。 -
Query(sql, cb)
:- 封装
SQLOperation
,丢进队列。 - 返回一个
QueryCallback
,里面包含 future + 回调函数。
- 封装
-
关键点:它把 “用户请求” 转化为 “后台执行任务 + 异步回调”。
-
2.5 工具
-
BlockingQueue
- 一个线程安全的队列,支持
Push
和Pop
。 - 保证多线程安全通信。
- 一个线程安全的队列,支持
-
辅助函数
Tokenize
- 用来解析
url;user;password
这样的字符串。
- 用来解析
三、核心函数
3.1 用户调用示例
// 用户发起查询
auto cb = pool->Query("SELECT * FROM user", [](auto result){// 回调:拿到查询结果后的处理逻辑while (result->next()) {std::cout << "User ID: " << result->getInt("id") << std::endl;}
});
processor.AddQueryCallback(std::move(cb));
- 用户调用
pool->Query()
发送 SQL 语句。 - 第一个参数是SQL查询语句,第二个参数是一个lambda函数,作为查询完成后的回调函数。
Query()
返回一个 QueryCallback 对象,其中保存了future
和用户定义的回调函数。- 用户把
QueryCallback
交给AsyncProcessor
,由它定期检查查询是否完成。 - 一旦查询完成,用户回调被自动触发,拿到
ResultSet
并处理结果。
用户视角非常简单:只要写 SQL 和回调函数,异步查询的复杂性全部被隐藏
3.2 Query() 方法内部流程
QueryCallback MySQLConnPool::Query(const std::string &sql, std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb) {// 创建 SQLOperation 对象,封装 SQL 查询和 promiseSQLOperation *op = new SQLOperation(sql);// 获取与 promise 关联的 future,用于后续获取结果auto future = op->GetFuture();// 将任务添加到阻塞队列中,等待工作线程处理task_queue_->Push(op);// 返回 QueryCallback 对象,封装 future 和用户回调函数return QueryCallback(std::move(future), std::move(cb));
}
- 封装任务:将SQL查询语句和 promise 对象封装在一起,SQL 被封装为
SQLOperation
对象,内部自带一个promise
- 获取future对象:通过 promise 的 get_future() 方法获取关联的 future 对象
- 任务入队:将SQLOperation对象添加到阻塞队列中,等待工作线程处理
- 返回QueryCallback:将 future 和用户回调函数封装成 QueryCallback 对象返回
3.3 任务对象:SQLOperation
class SQLOperation {
public:explicit SQLOperation(const std::string &sql) : sql_(sql) {}void Execute(MySQLConn *conn) {// 执行 SQL 查询auto result = conn->Query(sql_);// 通过 promise 设置查询结果promise_.set_value(std::unique_ptr<sql::ResultSet>(result));}// 获取与 promise 关联的 futurestd::future<std::unique_ptr<sql::ResultSet>> GetFuture() {return promise_.get_future();}private:std::string sql_; // SQL 查询语句std::promise<std::unique_ptr<sql::ResultSet>> promise_; // 承诺对象,用于设置结果
};
- 构造函数:接收SQL查询语句并保存。
SQLOperation
= SQL + promise - Execute方法:执行SQL查询并通过 promise 设置结果
- 通过MySQLConn对象执行SQL查询
- 使用 promise 的 set_value 方法设置查询结果
- GetFuture方法:返回与 promise 关联的 future 对象
- future对象用于异步获取查询结果
这里的 promise/future
就像一个 单向通信管道:后台线程写入结果,前台 future
负责读取。
3.4 MySQLWorker工作线程
// 线程的入口函数
void MySQLWorker::Worker() {while (true) {SQLOperation *op = nullptr;// 从阻塞队列中获取任务,如果队列为空则阻塞等待if (!task_queue_->Pop(op)) {break; // 如果 Pop 返回 false,退出线程}if (op) {// 执行 SQL 操作op->Execute(conn_);// 释放操作对象内存delete op;}}
}
-
无限循环:工作线程持续运行,处理队列中的任务
-
从队列获取任务:使用Pop方法从阻塞队列获取SQLOperation任务
- 如果队列为空,线程会阻塞等待
- 如果Pop返回false,表示需要退出线程
-
执行SQL操作:调用
SQLOperation::Execute()
,运行 SQL 并写入promise
-
释放内存:执行完成后删除SQLOperation对象,释放内存
3.5 MySQLConn::Query方法
sql::ResultSet* MySQLConn::Query(const std::string &sql) {try {// 创建 Statement 对象sql::Statement *stmt = conn_->createStatement();// 执行查询并返回结果集return stmt->executeQuery(sql);} catch (sql::SQLException &e) {// 处理异常,如连接断开则尝试重连HandlerException(e);printf("MySQLConn::Query sql exception: %s, code: %d\n", e.what(), e.getErrorCode());}return nullptr;
}
- 创建Statement对象:通过数据库连接创建Statement对象
- 执行查询:使用Statement对象的executeQuery方法执行SQL查询
- 异常处理:捕获可能出现的SQLException异常
- 调用HandlerException方法处理异常
- 如果是连接断开错误,会自动尝试重连
- 返回结果:返回查询结果集,如果出现异常则返回nullptr
3.6 AsyncProcessor实现
void AsyncProcessor::AddQueryCallback(QueryCallback &&query_callback) {// 将查询回调添加到待处理列表中pending_queries_.emplace_back(std::move(query_callback));
}void AsyncProcessor::InvokeIfReady() {// 遍历所有待处理的查询回调for (auto it = pending_queries_.begin(); it != pending_queries_.end();) {// 检查回调是否就绪(查询是否完成)if (it->InvokeIfReady()) {// 如果就绪,调用回调并从列表中移除it = pending_queries_.erase(it);} else {// 如果未就绪,继续保留在列表中++it;}}
}
- AddQueryCallback方法:将QueryCallback对象添加到待处理列表中
- InvokeIfReady方法:遍历所有待处理的查询回调
- 检查每个回调是否就绪(查询是否完成)
- 如果就绪,调用回调函数并从列表中移除
- 如果未就绪,继续保留在列表中等待下次检查
3.7 QueryCallback实现
class QueryCallback {
public:QueryCallback(std::future<std::unique_ptr<sql::ResultSet>> &&future,std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb): future_(std::move(future)), cb_(std::move(cb)) {}// 检查并调用回调(如果就绪)bool InvokeIfReady() {// 检查 future 是否就绪(查询是否完成)if (future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {// 调用用户回调函数,传递查询结果cb_(std::move(future_.get()));return true; // 表示回调已处理}return false; // 表示回调未就绪}private:std::future<std::unique_ptr<sql::ResultSet>> future_; // 用于获取查询结果std::function<void(std::unique_ptr<sql::ResultSet>)> cb_; // 用户回调函数
};
- 构造函数:接收future对象和用户回调函数
- InvokeIfReady方法:检查future是否就绪
- 使用wait_for方法检查future状态
- 如果就绪,调用用户回调函数并传递查询结果
- 返回true表示回调已处理,false表示未就绪
四、核心组件
4.1 BlockingQueue阻塞队列
template <typename T>
class BlockingQueue {
public:explicit BlockingQueue(bool nonblock = false) : nonblock_(nonblock) {}// 入队操作void Push(const T &value) {std::lock_guard<std::mutex> lock(mutex_);queue_.push(value);not_empty_.notify_one(); // 通知一个等待的线程}// 出队操作bool Pop(T &value) {std::unique_lock<std::mutex> lock(mutex_);// 等待条件:队列不为空或处于非阻塞模式not_empty_.wait(lock, [this] { return !queue_.empty() || nonblock_; });// 如果队列为空且处于非阻塞模式,返回 falseif (queue_.empty() && nonblock_) {return false;}// 获取队首元素并出队value = queue_.front();queue_.pop();return true;}// 取消阻塞void Cancel() {std::lock_guard<std::mutex> lock(mutex_);nonblock_ = true; // 设置为非阻塞模式not_empty_.notify_all(); // 唤醒所有等待的线程}private:bool nonblock_; // 是否为非阻塞模式std::queue<T> queue_; // 队列容器std::mutex mutex_; // 互斥锁std::condition_variable not_empty_; // 条件变量
};
-
Push方法:添加元素到队列
- 使用互斥锁保证线程安全
- 添加元素后通知一个等待的线程
-
Pop方法:从队列获取元素
- 如果队列为空,线程会阻塞等待
- 使用条件变量实现高效的等待/通知机制
- 如果处于非阻塞模式且队列为空,返回false
-
Cancel方法:取消阻塞
- 设置非阻塞模式
- 唤醒所有等待的线程,用于优雅关闭
4.2 MySQLConn连接管理
class MySQLConn {
public:MySQLConn(const std::string &info, const std::string &db, BlockingQueue<SQLOperation*> &task_queue): info_(info, db) {// 创建工作线程处理SQL操作worker_ = new MySQLWorker(this, task_queue);worker_->Start();}~MySQLConn() {if (worker_) {worker_->Stop();delete worker_;}if (conn_) {delete conn_;}}// 建立数据库连接int Open() {int err = 0;try {driver_ = get_driver_instance();conn_ = driver_->connect(info_.url, info_.user, info_.password);if (!conn_) {printf("MySQLConn::Open connect failed\n");return -1;}conn_->setSchema(info_.database); // 设置默认数据库} catch (sql::SQLException &e) {HandlerException(e); // 处理异常err = e.getErrorCode();}return err;}// 处理MySQL异常void HandlerException(sql::SQLException &e) {// 如果是连接断开错误,尝试重连if (e.getErrorCode() == 2006 || e.getErrorCode() == 2013) {printf("MySQLConn::HandlerException reconnecting...\n");Close();Open();} else {printf("MySQLConn::HandlerException sql exception: %s, code: %d\n", e.what(), e.getErrorCode());}}private:sql::Driver *driver_; // MySQL驱动sql::Connection *conn_; // MySQL连接MySQLWorker *worker_; // 工作线程MySQLConnInfo info_; // 连接信息
};
-
构造函数:初始化连接信息并创建工作线程
- 每个MySQLConn对象都有一个关联的工作线程
-
析构函数:清理资源
- 停止工作线程
- 释放数据库连接
-
Open方法:建立数据库连接
- 获取MySQL驱动实例
- 建立数据库连接
- 设置默认数据库
-
HandlerException方法:处理MySQL异常
- 如果是连接断开错误(错误码2006或2013),自动尝试重连
- 其他异常记录日志
五、问题与优化
1. 进一步封装
- 对用户而言,只需要调用连接池提供的接口(如
execute
)。 - 底层线程池和连接管理完全隐藏。
2. 安全与稳定性
数据库连接池除了基本的执行,还需要考虑:
-
断线重连机制:
- MySQL 如果长时间没有请求,会主动断开连接。
- 需要检测连接是否有效,断开时自动重连。
-
心跳检测机制:
- 定时发送一个简单 SQL(如
SELECT 1
),保持连接活跃。 - 避免被数据库服务器判定为闲置而断开。
- 定时发送一个简单 SQL(如
3. 资源管理
- 限制最大连接数,避免资源耗尽。
- 使用智能指针(
std::shared_ptr
、std::unique_ptr
)管理连接,避免内存泄漏。
4. 性能优化
- 避免重复创建连接,复用已建立的连接。
- 支持连接池参数可配置(最小连接数、最大连接数、超时策略)。