当前位置: 首页 > news >正文

【基础组件】手撕 MYSQL 连接池(C++ 版本)

文章目录

  • 介绍
  • 连接池的 C++ 实现(提供生产者的操作接口)
    • MYSQL 连接封装的 C++ 实现(提供消费者的操作接口)
    • MYSQL 的工作负责员—— MySQLWorker
    • SQLOperation 包装 MYSQL 操作语句与执行函数
    • QueryCallback——在查询任务结束后立刻生成的对象类
    • AsyncProcessor——异步的数据处理器
    • 测试代码
  • 总结

推荐一个零声教育学习教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习: https://github.com/0voice 链接。

介绍

我写过一篇文章介绍了 C++ 线程池的写法(链接 在这),也归纳了所谓的 “池” 必须有的一些性质:
1、池,总是有一些东西是可以复用的,不会被废弃。
2、池,当 “池” 没人使用的时候,它是可以维持自身的可用属性,并处于低功耗状态。
3、池,当 “池” 负载太大的时候,需要存储缓存新加入的消费者。

我也曾经写过 MYSQL 远程插入图片的代码(不过是 C 语言版本的,链接 在这),我们连接 MYSQL 的目的是查询结构化的数据,然后再根据这些数据来执行处理函数。我们是输入对应的 MYSQL 查询语句才能查询到数据。对于 MYSQL 连接池来说,这个 “池” 应该具有的性质是

  1. 本机对对应的 MYSQL 服务器的网络连接管道本身可以复用,往管道输入 MYSQL 查询语句的工作线程本身可以复用(MYSQL 连接池可以看成是另一种的线程池).
  2. 当无查询任务任务的时候,连接管道所对应线程会休眠
  3. 当任务负荷量过多时候,需要有一个缓冲涌入的任务的容器

总的来说,是需要在线程池的基础之上再做修正的,
1、没有所谓的任务函数了,所有任务函数一律转换成 MYSQL 的查询操作语句。那么就需要包装任务的类型了——Operation。
2、同时,也得重新包装工作线程了,因为一个工作线程是需要锁定一个 MYSQL 连接通道的。
3、由于涉及 MYSQL 的查询,一定会有查询结果的,生产者可以根据这些查询结果来进行数据处理。
4、我们在原来的队列、向量等容器上,需要加上同步的语句操作(比如互斥锁、原子操作),一个原本普普通通的容器才能变成具有多线程操作功能的特殊容器。线程池、连接池都是这种东西。我们需要考虑清楚

连接池的 C++ 实现(提供生产者的操作接口)

连接池的构造如下

#pragma once
#include <vector>
#include <memory>
#include <functional>
#include <cppconn/resultset.h>#include <unordered_map>#include "QueryCallback.h"namespace sql {class ResultSet;
}class MySQLConn;
template <typename T>
class BlockingQueuePro;class SQLOperation;class MySQLConnPool {
public:static MySQLConnPool *GetInstance(const std::string &db);       //  这是单例模式、全局唯一的对象,传入的参数是自己想要访问的那一个数据库void InitPool(const std::string &url, int pool_size);           //  资源标识符(长得像网址的东西)QueryCallback Query(const std::string &sql, std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb);//  QueryCallback 类的构造函数,这个类负责
private:MySQLConnPool(const std::string &db) : database_(db) {}~MySQLConnPool();std::string database_;      //  数据库std::vector<MySQLConn *> pool_;     //  若干个连接(一个连接绑定一个工作线程)//  instances_ 是一个 “全局哈希表”,用来把 不同数据库名 映射到 各自的单例连接池对象static std::unordered_map<std::string, MySQLConnPool *> instances_;     //  类似于hash表BlockingQueuePro<SQLOperation *> *task_queue_;     //  MYSQL 操作语句队列
};

首先,任务操作的装载容器是拥塞队列,具体可见我这篇 文章,此处不赘述。MYSQL 的任务操作 SQLOperation 是我们设定并且用于包装 MYSQL 语句的。

前置声明
template <typename T>
class BlockingQueuePro;连接池里的 private 
BlockingQueuePro<SQLOperation *> *task_queue_;     //  MYSQL 操作语句队列

这个线程池是全局唯一的,一个类型就只能有一个实例化,这是面向对象编程的单例模式,

public:static MySQLConnPool *GetInstance(const std::string &db);       //  这是单例模式、全局唯一的对象,传入的参数是自己想要访问的那一个数据库
private:MySQLConnPool(const std::string &db) : database_(db) {}~MySQLConnPool();

我们注意到单例模式的特点:
1、私有化构造函数,防止外部直接创建实例。 每一个 class 的 private 阻止的是“外部访问”,并不阻止“类外定义静态成员变量”;真正的访问仍只能在类内或友元中进行。
2、提供一个静态方法获取唯一实例
3、线程安全(在多线程环境下需要特别注意)

获取单例的函数如下:

//  单例模式,全局唯一个连接池,这个连接池是针对某个数据库 db 的
MySQLConnPool *MySQLConnPool::GetInstance(const std::string &db) {if (instances_.find(db) == instances_.end()) {instances_[db] = new MySQLConnPool(db);}return instances_[db];
}

在声明一个单例之后,我们得对这个线程池对象进行初始化。这包括了 url 初始化,连接到 MYSQL 服务器这个网络资源,并指定要连接那个数据库。

void MySQLConnPool::InitPool(const std::string &url, int pool_size) {task_queue_ = new BlockingQueuePro<SQLOperation *>();                      //  动态中创建for (int i = 0; i < pool_size; ++i) {MySQLConn *conn = new MySQLConn(url, database_, *task_queue_);      //  动态创建,而且这里是左值引用,连接池、连接、工作线程是共用一个拥塞队列conn->Open();pool_.push_back(conn);  //  pool_ 是一个连接向量}//  可以,MySQL 允许同一个用户并发创建多条连接到同一数据库。//  限制来自 MySQL 服务器端参数 和 用户级配额,而不是“同一用户 + 同一库”这一组合本身。
}

接下来就是提供生产者的接口

QueryCallback MySQLConnPool::Query(const std::string &sql, std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb) {SQLOperation *op = new SQLOperation(sql);       //  创建 MYSQL 语句操作对象auto future = op->GetFuture();task_queue_->Push(op);return QueryCallback(std::move(future), std::move(cb));
}

QueryCallback 是用来包装 MYSQL 查询结果与生产者对应的数据处理函数,这个函数接口是用来给生产者向 MYSQL 连接池推送 MYSQL 查询语句的,等待查询结果出来再做数据处理。推送到的地方是该单例的 MYSQL 任务操作拥塞队列。

BlockingQueuePro<SQLOperation *> *task_queue_;

读者可能会问,“连接池为什么现在还没有给出消费者的操作接口呢?即不断地 pop MySQL 查询语句。” 我说,其实是有点,不过相当之隐蔽,他就隐藏在 函数 InitPool 里面的这句话

MySQLConn *conn = new MySQLConn(url, database_, *task_queue_);

以及,MYSQL 连接池类型里面的这个成员

std::vector<MySQLConn *> pool_;     //  若干个连接(一个连接绑定一个工作线程)

MYSQL 连接封装的 C++ 实现(提供消费者的操作接口)

头文件决定了具体的架构

#pragma once#include "SQLOperation.h"
#include <string>namespace sql 
{class Driver;class Connection;class SQLException;class ResultSet;
}class MySQLWorker;//  源文件是不需要这样的,但是头文件这样做是为了避免头文件依赖
template <typename T>
class BlockingQueuePro;class SQLOperation;//  struct 也是一个类,只是所有成员都是 public
struct MySQLConnInfo {explicit MySQLConnInfo(const std::string &info, const std::string &db);//  这是 MYSQL 的账户信息std::string user;std::string password;std::string database;std::string url;
};//  这是代表网络连接本身,由 MYSQL 本身所驱动
class MySQLConn {
public:MySQLConn(const std::string &info, const std::string &db, BlockingQueuePro<SQLOperation *> &task_queue);~MySQLConn();int Open();void Close();sql::ResultSet* Query(const std::string &sql);private:void HandlerException(sql::SQLException &e);sql::Driver *driver_;sql::Connection *conn_;MySQLWorker *worker_;       //  一个连接绑定一个工作线程MySQLConnInfo info_;
};

struct MySQLConnInfo 就是我们所登陆的 MYSQL 数据库的账号、密码、远程地址等等。一个连接绑定一个工作者(线程+具体信息),这是绑定了工作线程的指针。sql::Connection *conn_ 是 MYSQL 的 API 接口所提供的一个类型,就是我们连接的本身

    sql::Driver *driver_;sql::Connection *conn_;MySQLWorker *worker_;       //  一个连接绑定一个工作线程

这个类型的构造函数在构造对象之初就已经在初始化工作线程了,而且这个 MySQLConn 是可以触碰到 MYSQL 任务操作拥塞队列的(它将会被左值引用)。

//  这是 MYSQL 连接的构造函数
MySQLConn::MySQLConn(const std::string &info, const std::string &db, BlockingQueuePro<SQLOperation *> &task_queue): info_(info, db)
{worker_ = new MySQLWorker(this, task_queue);    //  注意,这里是左值引用,连接池、连接、工作线程是共用一个拥塞队列worker_->Start();
}

紧接着就是提供消费者的操作函数的接口,createStatement()executeQuery(sql) 都是 MYSQL 提供的 API 接口函数,就是向服务器提交查询命令的函数组合。

//  这是要执行查询语句,并返回查询结果
sql::ResultSet* MySQLConn::Query(const std::string &sql)
{//  C++ 特有的纠错机制try {sql::Statement *stmt = conn_->createStatement();        //  声明将要写入 MYSQL 语句return stmt->executeQuery(sql);                         //  查询结果} catch (sql::SQLException &e) {HandlerException(e);}return nullptr;
}

但是,连接并不是直接工作的线程,连接 MySQLConn 无法主动使用这个接口函数,它必须是线程来触发这个查询任务。而且这个函数也不是完美的,因为执行它的是连接池里的工作线程,但做数据处理的还是生产者,故而我们需要把查询结果传递给生产者,在做一个包装函数,往外包装一层,做数据通知传递。

MYSQL 的工作负责员—— MySQLWorker

#pragma once#include <thread>class MySQLConn;template <typename T>
class BlockingQueuePro;        //  前置声明class SQLOperation;         //  前置声明//  一个 mysql 工作人员该有的东西
class MySQLWorker {
public://  双参的构造函数是无需显示构造MySQLWorker(MySQLConn *conn, BlockingQueuePro<SQLOperation *> &task_queue);~MySQLWorker(); //  析构函数void Start();void Stop();private:void Worker();          //  工作函数MySQLConn *conn_;std::thread worker_;BlockingQueuePro<SQLOperation *> &task_queue_;         //  mysql 操作指令拥塞队列//  必须注意到这是左值引用,在构造变量的时候,插入了的引用的正是连接池的队列
};

MySQL 工作负责员的构造函数说明,其可以触碰之前那个 MYSQL 操作语句的拥塞队列,因为这是左值引用。

MySQLWorker::MySQLWorker(MySQLConn *conn, BlockingQueuePro<SQLOperation *> &task_queue): conn_(conn), task_queue_(task_queue)
{
}

启动线程

void MySQLWorker::Start()
{worker_ = std::thread(&MySQLWorker::Worker, this);
}

线程的消费行为,我们注意到它是会消费 MYSQL 查询语句的拥塞队列的,它消费的方法是 op->Execute(conn_); 这个 op 就是 SQLOperation 类型的对象,因而我们再定义这个类型,里面就会有具体的消费方法.

//  从拥塞队列里面拿命令语句去执行
void MySQLWorker::Worker() {while (true) {SQLOperation *op = nullptr;if (!task_queue_.Pop(op)) {break;}op->Execute(conn_);delete op;}
}

停止工作线程

//  停止工作线程
void MySQLWorker::Stop()
{if (worker_.joinable()) {worker_.join();}
}

SQLOperation 包装 MYSQL 操作语句与执行函数

头文件决定架构

#pragma once#include <string>
#include <future>
#include <memory>
#include <cppconn/resultset.h>
namespace sql
{class ResultSet;
}class MySQLConn;    //  这是前置声明,避免头文件依赖。class SQLOperation {
public:explicit SQLOperation(const std::string &sql) : sql_(sql) {}        //  显式构造函数,将要执行的 sql 命令 void Execute(MySQLConn *conn);std::future<std::unique_ptr<sql::ResultSet>> GetFuture() {return promise_.get_future();}private:std::string sql_;       //  sql 命令std::promise<std::unique_ptr<sql::ResultSet>> promise_;     //  promise 类型,查询命令的返回结果用作承诺
};

需要注意到 MYSQL 命令只是一个字符串

std::string sql_;       //  sql 命令

还需注意到线程间内存变量的通信 promise-future 语法,promise 与 future 两个内存是会了解彼此的

std::promise<std::unique_ptr<sql::ResultSet>> promise_;std::future<std::unique_ptr<sql::ResultSet>> GetFuture() {return promise_.get_future();
}

它们的语法如下

2、promise 和 future 是两个类。可用于不同子线程之间的通信,跟 condition_variable 的 wait-notify 很类似(这个用于阻塞-唤醒线程),而 promise-future 就是传递变量,一个线程(秉持 promise)执行完一个阶段后,由另一个线程通过 future 去获取 promise 指向的数据。通常需要阻塞,以等待 future注意:promise/future 是“单程票”,用完即废;想来回通信,请换别的同步原语。即set_value 之后 promise 就被废了2.1 std::promise<T> 这是一个类作用:提供者,用于在一个线程中存储一个值(或异常),以便在另一个线程中通过与之关联的 std::future 对象来获取它。它代表了一个异步操作的“承诺”。重要成员函数:(好比期货,美式期权,到底什么时候截断卖出,它是握住开关的人)get_future(): 返回一个与 promise 共享状态的 std::future<T> 对象。每个 promise 只能调用一次。注意,只能一次!!!!set_value(const T&) / set_value(T&&): 设置共享状态的值,并令其就绪。立即把值塞进共享状态,唤醒等待的 futureset_exception(std::exception_ptr p): 在共享状态中存储一个异常,并令其就绪。当 future 调用 get() 时,这个异常会被重新抛出。立即把异常塞进共享状态,唤醒后 future.get() 会重抛set_value_at_thread_exit(...): 在当前线程退出时,自动设置值并令共享状态就绪。用于确保线程局部存储对象在 promise 设置值之前不会被销毁的特定场景。延迟到线程退出再塞值,确保线程局部对象生命周期完整2.2 std::future<T>作用:消费者,用于从某个异步操作提供者(如 std::promise 或 std::packaged_task)中获取结果。它代表了一个“未来”才会可用的值。重要成员函数:(它是期权的接收方,promise 设置了 set_value,就立马接受)get(): 最关键的函数。获取共享状态的结果。如果共享状态未就绪,则阻塞当前线程直到就绪。如果共享状态包含值,则返回该值(移动或引用)。注意:get() 只能调用一次,调用后 future 变为无效(valid() 返回 false)。如果共享状态包含异常,则重新抛出该异常。wait(): 阻塞当前线程,直到共享状态就绪。wait_for(std::chrono::duration): 阻塞当前线程一段时间,或直到共享状态就绪。返回一个 std::future_status 枚举值,表示是超时 (timeout)、就绪 (ready) 还是延迟 (deferred,与 std::async 相关)wait_until(std::chrono::time_point): 阻塞当前线程直到某个时间点,或直到共享状态就绪。同样返回 std::future_status。valid(): 检查 future 对象是否与一个共享状态关联。在调用 get() 后,此函数通常返回 false

MYSQL 查询语句的执行函数,绕了一个大圈。

void SQLOperation::Execute(MySQLConn *conn)
{auto result = conn->Query(sql_);        //  写 auto,编译器帮你填类型————“类型推导”promise_.set_value(std::unique_ptr<sql::ResultSet>(result));    //  通知 promise 的对家 future,值已经准备好//  把原始指针 result 交给 std::unique_ptr 接管,让智能指针自动管理 sql::ResultSet 对象的生命周期,无需手动 delete。
}

呼应了 conn 里面的查询执行函数,

//  这是要执行查询语句,并返回查询结果
sql::ResultSet* MySQLConn::Query(const std::string &sql)
{//  C++ 特有的纠错机制try {sql::Statement *stmt = conn_->createStatement();        //  声明将要写入 MYSQL 语句return stmt->executeQuery(sql);                         //  查询结果} catch (sql::SQLException &e) {HandlerException(e);}return nullptr;
}

QueryCallback——在查询任务结束后立刻生成的对象类

那么,生产者在消费者完成了任务后,生产者会用过 promise-future 语法获取查询结果

#pragma once#include <future>
#include <functional>
#include <memory>
#include <cppconn/resultset.h>namespace sql
{class ResultSet;
}//  QueryCallback 是用于集成查询结果、针对查询结果的操作函数
class QueryCallback {
public://  构造函数,并初始化两个 private 成员//  && 是右值引用,是用来保护传入参数的(之前的变量被废),搬运资源,原变量废掉,但变量内容被转移到 QueryCallbackQueryCallback(std::future<std::unique_ptr<sql::ResultSet>> &&future, std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb): future_(std::move(future)), cb_(std::move(cb)){}//  future “看一眼” 异步任务的 promise 是否已经就绪,就绪就立即把结果交给回调函数,执行新任务,否则什么都不做直接返回。bool InvokeIfReady() {if (future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {cb_(std::move(future_.get()));      //  到点了就处理return true;}return false;}
private:std::future<std::unique_ptr<sql::ResultSet>> future_;std::function<void(std::unique_ptr<sql::ResultSet>)> cb_;
};

必须要注意到,SQLOperation 里面的函数 void SQLOperation::Execute(MySQLConn *conn),在执行完查询任务后立马构造对象查询结果对象,并且通知传递给生产者。

void SQLOperation::Execute(MySQLConn *conn)
{auto result = conn->Query(sql_);        //  写 auto,编译器帮你填类型————“类型推导”promise_.set_value(std::unique_ptr<sql::ResultSet>(result));    //  通知 promise 的对家 future,值已经准备好//  把原始指针 result 交给 std::unique_ptr 接管,让智能指针自动管理 sql::ResultSet 对象的生命周期,无需手动 delete。
}

这个函数是给生产者的接口,它是可以多次调用的,用来查看查询任务的执行情况,如果刚完成了,那么就得立刻执行数据处理函数。

//  future “看一眼” 异步任务的 promise 是否已经就绪,就绪就立即把结果交给回调函数,执行新任务,否则什么都不做直接返回。bool InvokeIfReady() {if (future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {cb_(std::move(future_.get()));      //  到点了就处理return true;}return false;}

AsyncProcessor——异步的数据处理器

当生产者接收到查询结果,可以选择立即执行数据处理函数,这就是所谓的 “同步处理”。当然也可以做 “异步处理”,只要我们所调用的函数是一个向其他线程派发对应任务的函数就行了。AsyncProcessor 就是这个异步的数据处理器。

#pragma once#include <vector>
#include <mutex>//  有一个点:这个异步处理器实质上是主函数执行的,它统一管理某项资源,比如我们打印的屏幕,最好不要多个线程共同访问一个公共资源(这样不需要上锁),否则需要上锁
//  另外要说明的是,工作线程异步于主线程,那主线程又何尝不是异步于工作线程呢?主线程也可以看作是工作线程的工作线程。
//  QueryCallback 是用于集成查询结果、针对查询结果的操作函数  
class QueryCallback;//  这是一个静态类,无实例化
class AsyncProcessor
{
public:void AddQueryCallback(QueryCallback &&query_callback);void InvokeIfReady();private://  这是本地包装的查询结果的句柄std::vector<QueryCallback> pending_queries_;
};

这个异步处理器是可以堆积生产者发送过来的数据处理委托(观察类型里面有以下组分)

//  这是本地包装的查询结果的句柄std::vector<QueryCallback> pending_queries_;

生产者就是通过这个函数去把数据处理委托给异步处理器的(提供了以下的接口)

//  需要注意右值引用
void AsyncProcessor::AddQueryCallback(QueryCallback &&query_callback)
{pending_queries_.emplace_back(std::move(query_callback));   //  搬运资源,这是待处理的查询结果
}

这个异步处理器会代替生产者本身,去查询对应的MYSQL 查询任务执行完没有,有的话就立刻执行数据处理。

//  这是轮询唤醒,可以多次调用的,绝不阻塞
void AsyncProcessor::InvokeIfReady()
{//  使用迭代器遍历 std::vector<QueryCallback>,注意 循环体里手动调整迭代器,避免 erase 失效。for (auto it = pending_queries_.begin(); it != pending_queries_.end();){if (it->InvokeIfReady())    it = pending_queries_.erase(it);else++it;}
}
/*
InvokeIfReady() 是一个“轻量级事件循环”:扫描就绪任务→执行→擦除;未就绪任务继续留在队列等待下一次扫描。
*/

另外说一句,生产者对于主线程来说是 “其他线程”,那主线程对于生产者来说何尝不是 “其他线程” 呢?况且主线程就一直在休眠,我们可以把异步处理器的运行交给主线程。

我的另一个思考是,之所以委托给异步处理器这一个线程,而非多个线程,是因为单个线程访问专属于的内存资源是安全的,不用所谓的同步通知。故而以后遇到 K 个任务,我们可以多设计 K 个异步处理器,分别访问不同内存资源做对应的数据处理任务。那样就是无锁的了。

测试代码

生产者对于主线程来说是 “其他线程”,那主线程对于生产者来说何尝不是 “其他线程” 呢?况且主线程就一直在休眠,我们可以把异步处理器的运行交给主线程。

异步处理器被我全局变量化了。生产携带 MYSQL 的查询语句,以及对查询结果的数据处理函数,其中这个数据处理函数也只是打印结果而已。当然了,生产者在向 MYSQL 连接池发出查询请求后,他同时也会把对查询结果的数据处理委托给异步处理器,接下来的时间是把持异步处理器的主线程与连接池里的工作线程打交道,核心机制就是 C++ 里的 promise-future 语法。异步处理器会周期性的问询连接池是否查询完毕,其余时间都在睡觉,这大大减轻了 CPU 负担,毕竟查询结果何时结束是一个随机的事,不能太着急,得先睡上一觉再说。

#include "MySQLConnPool.h"
#include "AsyncProcessor.h"
#include <cppconn/resultset.h>
#include <iostream>
#include <thread>
#include <chrono>//  对 MYSQL 查询语句的处理函数
void HandleQueryResult(std::unique_ptr<sql::ResultSet> res)
{   //  逐行获取while (res->next()){std::cout << "lady_name: " << res->getString("U_NAME") <<  std::endl;}std::cout << "/n-------------------------------/n"  <<  std::endl;
}// 全局计数器,用于统计任务完成的数量
std::atomic<int> task_counter{0};//  全局变量化
AsyncProcessor response_handler;        //  异步处理器(轮询唤醒删除查询结果 + 添加查询结果(到列就删))//  生产者线程函数,请注意,pool 是指针
//  生产者线程所想要提交的任务 Task 被具象化为 MYSQL 查询语句(我们不用特地去写任务函数,我们只需要先编写好 MYSQL 命令的字符串数组)
//  生产者还需要根据查询结果作出处理(生产者线程假处理,因为他只是提交到主线程的异步处理器之中,真正处理的是主线程)
void Producer(MySQLConnPool* pool, int producer_id, int num_tasks) {for (int i = 0; i < num_tasks; ++i) {int task_id = producer_id * 1000 + i; // 生成唯一的任务IDstd::cout << "Producer id" << producer_id << " posted task " << task_id << std::endl;//  这是相当于线程池的 Push 任务操作,插入 SQL 命令任务序列之中,连接池的 conn 连接所对应的工作线程已经嗷嗷待哺(只负责查询)auto query_callback1 = pool->Query("SELECT U_NAME FROM TBL_USER", HandleQueryResult);  //  查询方案+处理方案,一同打包进 QueryCallback//  这是相当于线程池的 Push 任务操作,在 worker 执行完 MYSQL 语句后的查询结果还需要插入 QueryCallback 队列之中response_handler.AddQueryCallback(std::move(query_callback1));      //  实行异步处理//  主线程执行完后,还得回来画一个句号std::cout << "Producer id" << producer_id << " posted task " << task_id  << " has been completed. " << std::endl;}
}int main() {const int num_producers = 4;  // 生产者线程数量const int num_tasks_per_producer = 10; // 每个生产者提交的任务数量const int num_conn_in_pool = 2; // 连接池中的工作线程数量//  Initialize the connection pool for the first databaseMySQLConnPool *pool1 = MySQLConnPool::GetInstance("QM_DB");   //  单例模式,全局唯一的连接池//  初始化的时候,就已经启动连接的工作线程了,准备异步的执行查询语句(会等待生产者线程传来)pool1->InitPool("tcp://127.0.0.1:3306;admin;123456", num_conn_in_pool);        //  远程连接+用户+密码+连接数std::vector<std::thread> producers; // 生产者线程集合// 启动生产者线程for (int i = 0; i < num_producers; ++i) {producers.emplace_back(Producer, std::ref(pool1), i, num_tasks_per_producer);}//  Producer 函数的形参正是 std::ref(pool), i, num_tasks_per_producer 三者// 上面执行// Periodically invoke callbacks if they are readywhile (task_counter < num_producers * num_tasks_per_producer){//  这是主线程在执行针对查询结果的处理函数,response_handler.InvokeIfReady();std::this_thread::sleep_for(std::chrono::milliseconds(100));}std::cout << "All tasks completed. Total tasks executed: " << task_counter << std::endl;// 等待所有生产者线程完成for (auto& producer : producers) {producer.join();}return 0;
}

针对于本测试代码,我所访问的 MYSQL 数据库的结构化数据表如下

在这里插入图片描述
代码的执行如下

qiming@qiming:~/share/CTASK/BASIC-COMPONENT/3.1.3-mysqlconnpool$ g++ AsyncProcessor.cpp main.cpp MySQLConn.cpp MySQLConnPool.cpp MySQLWorker.cpp SQLOperation.cpp -o main -lpthread -lmysqlcppconn -std=c++17
qiming@qiming:~/share/CTASK/BASIC-COMPONENT/3.1.3-mysqlconnpool$ ./main 
Producer id2 posted task 2000
Producer idProducer id0 posted task 0
3 posted task 3000
Producer id3 posted task 3000 has been completed. 
Producer id3 posted task 3001
Producer id3 posted task 3001 has been completed. 
Producer id3 posted task 3002
Producer id3 posted task 3002 has been completed. 
Producer id3 posted task 3003
Producer id3 posted task 3003 has been completed. 
Producer id3 posted task 3004
Producer id3 posted task 3004 has been completed. 
Producer id3 posted task 3005
Producer id3 posted task 3005 has been completed. 
Producer id3 posted task 3006------------------  略去后面的长输出  -------------------------

最后,再给出这幅图,让大家再复习一下

在这里插入图片描述

总结

promise-future 是相当重要的语法。它是线程之间通信的基础。正因为有了他,生产者就好像不用干活一样,它可以只负责派活给 MYSQL 连接池(MYSQL 查询任务),而后立即把数据处理任务委托给异步处理器。异步处理器会根据 promise-future 的信号机制,与连接池里对应的工作线程打交道,时常问 “你准备好了没有?准备好的话我要处理了。”,发现都没准备好,那就先休眠。


文章转载自:

http://HncA1VYA.bmtyn.cn
http://t80xeY1n.bmtyn.cn
http://9TdfIEXN.bmtyn.cn
http://Z0zKZGUs.bmtyn.cn
http://z6fJAGgo.bmtyn.cn
http://hOHByY63.bmtyn.cn
http://CDH2z4hT.bmtyn.cn
http://AFsUuHta.bmtyn.cn
http://7cQsx6cm.bmtyn.cn
http://z2SmZU94.bmtyn.cn
http://NHzbtNCD.bmtyn.cn
http://nLae4VCn.bmtyn.cn
http://2lbrUiOm.bmtyn.cn
http://GDji1aL1.bmtyn.cn
http://OODyn3fE.bmtyn.cn
http://1KAAqThb.bmtyn.cn
http://PmFc9qkN.bmtyn.cn
http://Pign59q5.bmtyn.cn
http://9vatneZL.bmtyn.cn
http://ER3CUK7P.bmtyn.cn
http://AgEAEvru.bmtyn.cn
http://6dA7ccvz.bmtyn.cn
http://zQLLZp0S.bmtyn.cn
http://6gbxf9si.bmtyn.cn
http://b9TRSCyR.bmtyn.cn
http://cWhMlFiz.bmtyn.cn
http://IxiIfbJ3.bmtyn.cn
http://hPalrxeV.bmtyn.cn
http://WCKuNjcB.bmtyn.cn
http://oanMftoV.bmtyn.cn
http://www.dtcms.com/a/368433.html

相关文章:

  • 【FastDDS】Layer Transport ( 01-overview )
  • 算法备案全流程-纯干货
  • Linux 进程信号的产生
  • 【华为Mate XTs 非凡大师】麒麟芯片回归:Mate XTs搭载麒麟9020,鸿蒙5.1体验新境界
  • Swift 解题:LeetCode 372 超级次方(Super Pow)
  • 深入理解 JVM 字节码文件:从组成结构到 Arthas 工具实践
  • C# 阿里云 OSS 图片上传步骤及浏览器查看方法
  • JVM新生代和老生代比例如何设置?
  • 基于OpenGL封装摄像机类:视图矩阵与透视矩阵的实现
  • MySQL 8.0.36 主从复制完整实验
  • 无需bootloader,BootROM -> Linux Kernel 启动模式
  • 【Vue3+TypeScript】H5项目实现企业微信OAuth2.0授权登录完整指南
  • 为什么MySQL可重复读级别不能完全避免幻读
  • Gradle Task 进阶:Task 依赖关系、输入输出、增量构建原理
  • 串口通信基础知识
  • webshell及冰蝎双击无法打开?
  • Doris 数据仓库例子
  • 从零构建企业级LLMOps平台:LMForge——支持多模型、可视化编排、知识库与安全审核的全栈解决方案
  • 如何根据Excel数据表生成多个合同、工作证、录取通知书等word文件?
  • Highcharts 数据源常见问题解析:连接方式、格式处理与性能优化指南
  • T06_RNN示例
  • 【Android】Room数据库的使用
  • CoolGuard风控系统配置评分卡、权重策略|QLExpress脚本
  • 【FastDDS】Layer Transport ( 02-Transport API )
  • 确保 SQL Server 备份安全有效的最佳实践
  • 盘点完今年CoRL最火的VLA论文,发现最强的机器人,竟是用“假数据”喂大的
  • 新闻丨重庆两江新区党工委副书记、管委会主任许宏球一行莅临华院计算考察指导
  • 基于YOLO目标检测模型的视频推理GUI工具
  • latex公式符号与字体
  • SQL Server事务隔离级别