当前位置: 首页 > news >正文

第三章:工作线程池

目录

第一节:介绍

第二节:代码编写

第三节:测试

下期预告:


第一节:介绍

        工作线程池的作用是缓存一些不着急完成的任务,这些任务由子线程来完成,以达到削峰填谷的作用。

        在消息队列中,客户端的信道发送请求时,会把请求抛入线程池,由子线程发送请求,主线程继续发送其他请求;服务器推送消息时,也会把推送任务抛入线程池,由子线程完成,主线程继续接受其它请求。

第二节:代码编写

        在mqcommon目录下添加名为mq_threadpool的文件,打开并防止重复包含、添加头文件、声明命名空间:

#ifndef __M_THREADPOOL_H__
#define __M_THREADPOOL_H__

#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <vector>
#include <condition_variable>

namespace zd
{};

#endif

        然后设置线程池要用的成员变量:

    class threadpool
    {
    private:
        // 函数类型:统一任务池的函数类型
        using Functor = std::function<void(void)>; // 无参数、返回值void
    public:
        using ptr = std::shared_ptr<threadpool>; // 方便外部使用线程池的智能指针
    private:
        std::atomic<bool> _stop;            // 线程池状态
        std::mutex _mtx;                    // 互斥锁
        std::condition_variable _cv;        // 条件变量
        std::vector<Functor> _taskpool;     // 共享任务池
        std::vector<std::thread> _threads;  // 工作线程池
    };

        构造函数只需要传入该线程池的线程数量即可:

        // thr_count:工作线程的数量
        threadpool(size_t thr_count = 2):
        _stop(false)
        {
            for(int i = 0;i < thr_count;i++)
            {
                _threads.emplace_back(&threadpool::entry,this);
            }
        }

        设置一个私有成员函数entry(),它是每个线程的执行函数,功能是死循环的从任务池拉取任务去执行,然后设置一个条件变量,只有当 线程池销毁 或者 线程池有任务 时才让线程继续执行,否则线程一直阻塞:

        // 取出任务的函数
        // 每个工作线程初始化时传入的执行函数
        void entry()
        {
            // 创建一个线程私有任务池,用于向共享任务池获取任务
            std::vector<Functor> thr_taskpool;
            
            // 调用stop函数,_stop设置为true,子线程就可以执行完毕了
            while(!_stop)
            {
                // 上锁
                std::unique_lock<std::mutex> lock(_mtx);
    
                // 解锁: 线程池销毁 || 有任务到来
                // 等待成功会自动上锁
                _cv.wait(lock,[this](){return _stop || !(_taskpool.empty());});
    
                // 该线程获得任务池的所有任务
                thr_taskpool.swap(_taskpool);
    
                // 解锁
                lock.unlock();
     
                // 该线程执行私有线程池的所有任务
                for(auto& task:thr_taskpool)
                {
                    task();
                }
                thr_taskpool.clear();
            }
        }

        设置一个对外的接口 stop(),它的作用是关闭这个线程池:

        // 终止线程池
        void stop()
        {   
            // 如果线程已经回收过了,则不需要再回收了
            if(_stop == true) return;
    
            _stop = true;     // 线程池标记为停止
            _cv.notify_all(); // 唤醒所有工作线程
    
            // 回收线程
            for(auto& thread:_threads)
            {
                thread.join();
            }
        }

        然后析构函数对 stop 进行一个调用,目的是防止线程池对象销毁之前没有回收线程:

        ~threadpool()
        {
            // 如果忘记回收线程,析构函数自动回收
            stop();
        }

        最后是最重要的 push() 接口,它的作用是向线程池抛入一个外部任务:

// func:用户要执行的函数
// ...args:该函数的参数
// func会由push封装成异步任务——package_task,并使用lambda生成可调用对象,抛入任务池中,等待工作线程执行
// 执行完成后将future返回,供外部获取执行结果
template<class F,class ...Args>
auto push(F&& func,Args&& ...args) -> std::future<decltype(func(args...))>
     //->decltype(func(args...)):推导func的返回值类型,同步给push的返回值类型auto
{
            // 得到func的返回值类型
            using return_type = decltype(func(args...)); 
            // 将func包装成无参函数——参数void
            auto f = std::bind(std::forward<F>(func),std::forward<Args>(args)...); 
    
            // 1.封装:函数->package_task
            auto ptask = std::make_shared<std::packaged_task<return_type()>>(f);
            std::future<return_type> ft = ptask->get_future();
    
            // 2.封装:package_task->lambda——返回值void-无参数
            auto task_func = [ptask](){(*ptask)();};
    
            // 访问共享资源任务池,上锁
            _mtx.lock();
    
            // 3.将lambda抛入任务池
            _taskpool.push_back(task_func);
    
            // 解锁
            _mtx.unlock();
    
            // 4.唤醒一个工作线程去执行任务
            _cv.notify_one();
    
            return ft;
}

        这里解释一下,上述代码中的 ft 是一个存放了外部任务执行结果的结构体,类型是:

std::future<结果类型>,它也是一个模板,但是push又需要返回对应的结构体类型(即std::future<结果类型>)。

        所以使用 ->std::future<decltype(func(args...))> 。decltype(func(args...)) 推导出外部任务的返回值类型后,将返回值类型(例如int),组合成std::future<int>,并将其同步给push的返回类型auto。

        如果只使用 auto,它会因为所推到的类型过于复杂而报错。

第三节:测试

        在mqtest中新建文件mq_pool_test.cc,打开并添加头文件:

#include "../mqcommon/mq_threadpool.hpp"
#include "../mqcommon/mq_logger.hpp"

        然后设置一个加法任务,不断地抛入线程池,加法任务不仅要打印结果,还要打印执行这个任务的子线程id:

int Add(int n1,int n2)
{
    LOG("%ld:%d",pthread_self(),n1+n2);
    return n1+n2;
}

int main()
{
    zd::threadpool thrpool(3);
    for(int i = 0;i < 20;i++)
    {
        std::future<int> ft = thrpool.push(Add,11,i);
        std::this_thread::sleep_for(std::chrono::seconds(1)); // 主线程休眠1s
    }

    thrpool.stop();
    return 0;
}

        执行结果:

                ​​​​​​​        ​​​​​​​        

        可以看到3个子线程都执行过任务。

下期预告:

        所有的辅助代码都完成后,将进入虚拟机模块的实现,首先是交换机管理模块

相关文章:

  • 【蓝桥杯】1.k倍区间
  • VoIP之音频3A技术
  • Lecture 2 - Python
  • 【前端】【功能函数】eachTree,封装一个通用的遍历树结构的模板
  • Java 大视界 -- 深入剖析 Java 大数据实时 ETL 中的数据质量保障策略(97)
  • GMII(Gigabit Media Independent Interface)详解
  • 登录-10.Filter-登录校验过滤器
  • Docker 2025/2/24
  • 互联网上门洗衣洗鞋小程序
  • Python+Flutter前后端分离开发跨平台待办事项APP实战
  • 微信小程序:完善购物车功能,购物车主页面展示,详细页面展示效果
  • R 语言科研绘图 --- 柱状图-汇总
  • CSS编程基础学习
  • 每日一题——顺时针旋转矩阵
  • 基于 GEE 计算并下载研究区年均叶面积指数 LAI 和光合有效辐射分量 FPAR
  • 量子计算在金融风险评估中的应用:革新与突破
  • dify本地部署
  • Python--函数高级(上)
  • tauri输入js脚本的方法和注意事项initialization_script
  • 什么是MySql的主从复制(主从同步)?
  • 做网站原型的简单工具/怎么搭建一个网站
  • 青岛网站制作价格/二十个优化
  • 为什么没有网站做图文小说/宁波seo在线优化方案公司
  • 网站建设常见问题处理/郑州seo网站有优化
  • 网站建设 m.ykn.cc/为什么中国禁止谷歌浏览器
  • windows搭建网站/优帮云查询数据云查询