第三章:工作线程池
目录
第一节:介绍
第二节:代码编写
第三节:测试
下期预告:
第一节:介绍
工作线程池的作用是缓存一些不着急完成的任务,这些任务由子线程来完成,以达到削峰填谷的作用。
在消息队列中,客户端的信道发送请求时,会把请求抛入线程池,由子线程发送请求,主线程继续发送其他请求;服务器推送消息时,也会把推送任务抛入线程池,由子线程完成,主线程继续接受其它请求。
第二节:代码编写
在mqcommon目录下添加名为mq_threadpool的文件,打开并防止重复包含、添加头文件、声明命名空间:
#ifndef __M_THREADPOOL_H__ #define __M_THREADPOOL_H__ #include <iostream> #include <functional> #include <memory> #include <thread> #include <future> #include <vector> #include <condition_variable> namespace zd {}; #endif
然后设置线程池要用的成员变量:
class threadpool { private: // 函数类型:统一任务池的函数类型 using Functor = std::function<void(void)>; // 无参数、返回值void public: using ptr = std::shared_ptr<threadpool>; // 方便外部使用线程池的智能指针 private: std::atomic<bool> _stop; // 线程池状态 std::mutex _mtx; // 互斥锁 std::condition_variable _cv; // 条件变量 std::vector<Functor> _taskpool; // 共享任务池 std::vector<std::thread> _threads; // 工作线程池 };
构造函数只需要传入该线程池的线程数量即可:
// thr_count:工作线程的数量 threadpool(size_t thr_count = 2): _stop(false) { for(int i = 0;i < thr_count;i++) { _threads.emplace_back(&threadpool::entry,this); } }
设置一个私有成员函数entry(),它是每个线程的执行函数,功能是死循环的从任务池拉取任务去执行,然后设置一个条件变量,只有当 线程池销毁 或者 线程池有任务 时才让线程继续执行,否则线程一直阻塞:
// 取出任务的函数 // 每个工作线程初始化时传入的执行函数 void entry() { // 创建一个线程私有任务池,用于向共享任务池获取任务 std::vector<Functor> thr_taskpool; // 调用stop函数,_stop设置为true,子线程就可以执行完毕了 while(!_stop) { // 上锁 std::unique_lock<std::mutex> lock(_mtx); // 解锁: 线程池销毁 || 有任务到来 // 等待成功会自动上锁 _cv.wait(lock,[this](){return _stop || !(_taskpool.empty());}); // 该线程获得任务池的所有任务 thr_taskpool.swap(_taskpool); // 解锁 lock.unlock(); // 该线程执行私有线程池的所有任务 for(auto& task:thr_taskpool) { task(); } thr_taskpool.clear(); } }
设置一个对外的接口 stop(),它的作用是关闭这个线程池:
// 终止线程池 void stop() { // 如果线程已经回收过了,则不需要再回收了 if(_stop == true) return; _stop = true; // 线程池标记为停止 _cv.notify_all(); // 唤醒所有工作线程 // 回收线程 for(auto& thread:_threads) { thread.join(); } }
然后析构函数对 stop 进行一个调用,目的是防止线程池对象销毁之前没有回收线程:
~threadpool() { // 如果忘记回收线程,析构函数自动回收 stop(); }
最后是最重要的 push() 接口,它的作用是向线程池抛入一个外部任务:
// func:用户要执行的函数 // ...args:该函数的参数 // func会由push封装成异步任务——package_task,并使用lambda生成可调用对象,抛入任务池中,等待工作线程执行 // 执行完成后将future返回,供外部获取执行结果 template<class F,class ...Args> auto push(F&& func,Args&& ...args) -> std::future<decltype(func(args...))> //->decltype(func(args...)):推导func的返回值类型,同步给push的返回值类型auto { // 得到func的返回值类型 using return_type = decltype(func(args...)); // 将func包装成无参函数——参数void auto f = std::bind(std::forward<F>(func),std::forward<Args>(args)...); // 1.封装:函数->package_task auto ptask = std::make_shared<std::packaged_task<return_type()>>(f); std::future<return_type> ft = ptask->get_future(); // 2.封装:package_task->lambda——返回值void-无参数 auto task_func = [ptask](){(*ptask)();}; // 访问共享资源任务池,上锁 _mtx.lock(); // 3.将lambda抛入任务池 _taskpool.push_back(task_func); // 解锁 _mtx.unlock(); // 4.唤醒一个工作线程去执行任务 _cv.notify_one(); return ft; }
这里解释一下,上述代码中的 ft 是一个存放了外部任务执行结果的结构体,类型是:
std::future<结果类型>,它也是一个模板,但是push又需要返回对应的结构体类型(即std::future<结果类型>)。
所以使用 ->std::future<decltype(func(args...))> 。decltype(func(args...)) 推导出外部任务的返回值类型后,将返回值类型(例如int),组合成std::future<int>,并将其同步给push的返回类型auto。
如果只使用 auto,它会因为所推到的类型过于复杂而报错。
第三节:测试
在mqtest中新建文件mq_pool_test.cc,打开并添加头文件:
#include "../mqcommon/mq_threadpool.hpp" #include "../mqcommon/mq_logger.hpp"
然后设置一个加法任务,不断地抛入线程池,加法任务不仅要打印结果,还要打印执行这个任务的子线程id:
int Add(int n1,int n2) { LOG("%ld:%d",pthread_self(),n1+n2); return n1+n2; } int main() { zd::threadpool thrpool(3); for(int i = 0;i < 20;i++) { std::future<int> ft = thrpool.push(Add,11,i); std::this_thread::sleep_for(std::chrono::seconds(1)); // 主线程休眠1s } thrpool.stop(); return 0; }
执行结果:
可以看到3个子线程都执行过任务。
下期预告:
所有的辅助代码都完成后,将进入虚拟机模块的实现,首先是交换机管理模块