当前位置: 首页 > news >正文

【Linux跬步积累】—— 线程池详解(有源代码)

文章目录

  • 一、如何实现一个线程
    • 1、基本结构
    • 2、实现成员函数
    • 3、演示
    • 4、代码总汇
      • `Thread.hpp`
      • `Main.cc`
  • 二、如何封装线程池
    • 1、设计成员变量
    • 2、构造函数与析构函数
    • 3、初始化
    • 4、启动与回收
    • 5、主线程放入任务
    • 6、子线程读取任务
    • 7、终止线程池
  • 三、测试
  • 四、线程池总代码
    • 1、`ThreadPool.hpp`
    • 2、`Main.cc`

一、如何实现一个线程

1、基本结构

Thread类是用来描述一个线程的,所以成员变量中需要有tid线程名。所以第一个成员变量就是==_tid==;

一个线程创建出来,肯定是用来执行对于的任务的,那么我们还需要一个成员变量来接收传递的函数,所以第三个成员变量就是==_func==,是一个void(T&)类型的参数,所以第四个成员变量就是传递过来的参数_data

template<class T>
using func_t = std::function<void(T&)>;//模版方法

template<class T>
class Thread
{
public:
    Thread(){}
    static void* ThreadRoutinue(){}
    void Start(){}
    void Join(){}
    void Detach(){}
    ~Thread(){}
private:
    pthread_t _tid;          //线程tid
    std::string _threadname; //线程名
    func_t<T> _func;         //线程执行的函数
    T _data;                 //需要处理的数据
};

2、实现成员函数

成员函数中我们需要注意的就是pthread_create()函数,它的第三个参数是一个参数为void *,返回值为void *的一个函数。

但是我们将这个函数ThreadRoutinue()定义在这个类里,他就是一个成员函数,成员函数的参数,会隐藏this指针,所以如果我们正常写,就会报错,因为这个函数不满足pthread_create()函数的条件。

但是只要让ThreadRoutinue()的参数中没有this指针就可以了,那么该如何实现呢?

在前面加上static就可以,变成静态成员变量,就不会有this指针了。那么问题又来了,没有了this指针,我们又该如何访问到成员变量呢?

别忘了这个函数可以传递一个返回值为void*的参数,我们只需要将this指针传递过去,在函数内部强转一下就可以了。

template<class T>
using func_t = std::function<void(T&)>;//模版方法

template<class T>
class Thread
{
public:
    //thread(func,5,"thread-1");
    Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
        :_func(func),_data(data),_threadname(name)
    {}
    //需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了
    static void* ThreadRoutinue(void* args)
    {
        //将传过来的this指针强转一下,然后就可以访问到_func和_data了
        Thread<T>* self = static_cast<Thread<T>*>(args);
        self->_func(self->_data);
        return nullptr;
    }
    void Start()
    {
        //创建线程
        int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);
        return ret==0;
    }
    void Join()
    {
        pthread_join(_tid,nullptr);
    }
    void Detach()
    {
        pthread_detach(_tid);
    }
    ~Thread(){}
private:
    pthread_t _tid;          //线程tid
    std::string _threadname; //线程名
    func_t<T> _func;         //线程执行的函数
    T _data;                 //需要处理的数据
};

3、演示

让我们写一段测试代码,来看一下效果:

#include"Thread.hpp"

void test(int x)
{
    while(true)
    {
        std::cout<<x<<std::endl;
        sleep(1);
    }
}

int main()
{
    MyThread::Thread<int> mt(test,2025,"thread-1");
    if(mt.Start() == true)
    {
        std::cout<<"MyThread start success!\n";
    }
    else
    {
        std::cout<<"MyThread start failed!\n";
    }
    mt.Join();
    return 0;
}

运行结果:

在这里插入图片描述

让我们使用ps -aL指令来查看一下是否真的创建了线程:

在这里插入图片描述

可以看到,程序运行之后,真的创建出了两个名为mythread的线程。

4、代码总汇

Thread.hpp

#pragma once
#include<string>
#include<pthread.h>
#include<unistd.h>
#include<iostream>
#include<functional>

namespace MyThread
{
    template<class T>
    using func_t = std::function<void(T&)>;//模版方法

    template<class T>
    class Thread
    {
    public:
        //thread(func,5,"thread-1");
        Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
            :_func(func),_data(data),_threadname(name)
        {}
        //需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了
        static void* ThreadRoutinue(void* args)
        {
            //将传过来的this指针强转一下,然后就可以访问到_func和_data了
            Thread<T>* self = static_cast<Thread<T>*>(args);
            self->_func(self->_data);
            return nullptr;
        }
        bool Start()
        {
            //创建线程
            int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);
            return ret==0;
        }
        void Join()
        {
            pthread_join(_tid,nullptr);
        }
        void Detach()
        {
            pthread_detach(_tid);
        }
        ~Thread(){}
    private:
        pthread_t _tid;          //线程tid
        std::string _threadname; //线程名
        func_t<T> _func;         //线程执行的函数
        T _data;                 //需要处理的数据
    };
}

Main.cc

#include"Thread.hpp"

void test(int x)
{
    while(true)
    {
        std::cout<<x<<std::endl;
        sleep(1);
    }
}

int main()
{
    MyThread::Thread<int> mt(test,2025,"thread-1");
    if(mt.Start() == true)
    {
        std::cout<<"MyThread start success!\n";
    }
    else
    {
        std::cout<<"MyThread start failed!\n";
    }
    mt.Join();
    return 0;
}

二、如何封装线程池

1、设计成员变量

线程池内部维护多个线程和一个任务队列,主线程将任务放入任务队列当中,然后子线程就从任务队列中拿取任务进行处理。

所以需要一个数组来管理多个线程:_threads

以及一个任务队列:_taskQueue

此外我们还需要知道一共有多少个线程:_threadNum

然后还可以设置一个变量来查看真在等待任务的线程数目:_waitNum

最后我们再设置一个变量来判断当前线程池是否运行,如果已经退出了,我们需要将任务队列中的任务处理完再退出:_isRunning

定义这些变量就够了吗?

我们忽略了一个多线程编程中最重要的问题:线程之间的互斥和同步

我们的任务是:主线程往任务队列中放入任务,子线程从任务队列中拿取任务。那么我们思考一下下面几个问题:

  1. 多个线程之间可以同时从任务队列中拿任务吗?

    答:不能,任务队列是临界资源,线程和线程之间要互斥,否则会出现不同的线程拿取同一个任务的情况。

  2. 主线程放入任务时,子线程可以同时拿取任务吗?

    答:不能,主线程和子线程之间也需要互斥。

因为他们都是竞争任务队列这一个资源,所以我们只要定一个一把锁就可以了。这样互斥的问题就解决了。

那么同步呢?

是不是只有任务队列中有任务时,子线程才能获取任务,所以需要主线程先放任务,子线程才能拿任务,这就需要一个条件变量来维护。

综上:

我们还需要两个成员变量:_mutex_cond

#include"Thread.hpp"
#include<vector>

template<class T>
class ThreadPool
{   
private:
    std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程
    std::queue<T> _taskQueue;//任务队列
    int _threadNum;//线程数
    int _waitNum;//等待的线程数
    bool _isRunning;//线程池是否在运行

    pthread_mutex_t _mutex;//互斥锁
    pthread_cond_t _cond;//条件变量
};

2、构造函数与析构函数

构造和析构的主要作用就是对_mutex_cond的初始化和销毁。

同时我们还需要知道这个线程池需要创建多少个线程,所以需要外部传递参数来告诉我们。

然后就是构造函数对其他成员变量进行初始化。

template<class T>
class ThreadPool
{   
public:
    ThreadPool(const int num = 5)
        :_threadNum(num),_waitNum(0),_isRunning(false)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
}

3、初始化

上面的构造函数只是创建了锁和条件变量,以及部分变量的初始化,并没有创建出线程对象。

我们可以定义一个ThreadInit()函数来创建线程。

让我们先回顾一下Thread的构造函数需要哪些变量:

Thread(func_t<T> func,const T &data,const std::string &name = "none-name")

func参数是需要调用的函数,data是这个函数需要处理的数据,name是线程名。

在此,我们让线程去执行一个叫做handerTask的函数,这个函数内部实现线程到任务队列中获取任务的过程。

handerTask的第一个参数也是线程的名字,以便在handerTask内部查看是哪个线程执行了任务。

这里我们使用bind函数来将HanderTask函数与this参数绑定在一起,并且将这个参数绑定到 HandleTask 的第一个参数位置。

void HanderTask(std::string)
{
    //执行任务队列的任务
}

void InitThread()
{
    for(int i=0;i<_threadNum;i++)
    {
        auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);
        std::string name = "Thread-"+std::to_string(i);
        //_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员
        _threads.emplace_back(func,name,name);
    }
    _isRunning = true;
}

4、启动与回收

我们已经创建出来了一批线程,接下来还需要启动这一批线程,并且回收。

因此还需要定义成员函数StartAllJoinAll来启动和等待这批线程。

void StartAll()
{
    for(auto& thread : _threads)
    {
        thread.Start();
    }
}
void JoinAll()
{
    for(auto& thread : _threads)
    {
        thread.Join();
    }
}

5、主线程放入任务

我们可以定义一个EnQueue,用来让主线程往任务队列中投放任务。

投放任务的要求:

  1. 访问队列时需要与其他线程互斥,即对_mutex加锁;
  2. 添加任务后,就可以唤醒在等待的线程了
void EnQueue(const T& task)
{
    pthread_mutex_lock(&_mutex);

    if(_isRunning)
    {
        _taskQueue.push(task);
        if(_waitNum > 0)
        {
            pthread_cond_signal(&_cond);
        }
    }

    pthread_mutex_unlock(&_mutex);
}

6、子线程读取任务

子线程读取任务的要求如下:

  1. 保持互斥,从任务队列获取数据前需要加锁,获取结束后解锁;
  2. 保持同步,如果任务队列中没有数据,就去_cond下等待,等待被唤醒。
void HanderTask(std::string name)
{
    //子线程需要一直处理,所以这里使用死循环
    while(true)
    {
        pthread_mutex_lock(&_mutex);
        while(_taskQueue.empty())//这里是while循环,不是if判断,避免伪唤醒
        {
            _waitNum++;
            pthread_cond_wait(&_cond,&_mutex);
            _waitNum--;
        }
        T task = _taskQueue.front();
        _taskQueue.pop();
        std::cout<<name<<"get a task..."<<std::endl;
        pthread_mutex_unlock(&_mutex);

        task();
    }
}

这里需要注意一点,判断当前任务队列是否为空时,使用的是while循环,而不是if语句,因为当前线程被主线程唤醒之后,可能会发生伪唤醒,其实任务队列中根本没有任务。所以还要进入下一次while判断,确保访问任务队列时,一定是有任务的。

但是目前还有一个问题,如果线程访问任务队列时,线程池被终止了怎么办?

我们可以通过_isRunning来判定,在执行任务时判断一下_isRunning的值:

  1. 如果为true:正常运行
  2. 如果为false:
    • 如果任务队列中还有任务:把任务执行完
    • 如果没有任务:当前线程退出

于是我们的代码改进为:

void HanderTask(std::string name)
{
    //子线程需要一直处理,所以这里使用死循环
    while(true)
    {
        pthread_mutex_lock(&_mutex);
        while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒
        {
            _waitNum++;
            pthread_cond_wait(&_cond,&_mutex);
            _waitNum--;
        }
        //线程池终止了,并且任务队列中没有任务 --> 线程退出
        if(_taskQueue.empty()&&!_isRunning)
        {
            pthread_mutex_unlock(&_mutex);
            std::coud<<name<<" quit..."<<std::endl;
            break;
        }

        //走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出
        T task = _taskQueue.front();
        _taskQueue.pop();
        std::cout<<name<<" get a task..."<<std::endl;
        pthread_mutex_unlock(&_mutex);

        task();
    }
}

7、终止线程池

终止线程池不仅仅是将_isRunning设置为false这么简单,需要考虑以下问题:

  1. 如果在Stop的时候,有线程正在调用HanderTask函数怎么办?

    答:此时多个线程访问变量_isRunning,就有可能会造成线程安全问题,所以访问_isRunning时也要加锁,由于之前所有的访问_isRunning的操作,都在_mutex锁中,所以和之前共用同一把锁就行。

  2. 如果Stop之后,还有线程在_cond下面等待怎么办?

    答:如果线程一直在_cond下面等待,就会导致无法退出,此时在_isRunning = false之后,还要通过pthread_cond_broadcast唤醒所有等待的线程,让他们重新执行HanderTask的逻辑,从而正常退出。

void Stop()
{
    pthread_mutex_lock(&_mutex);

    _isRunning = false;//终止线程池
    pthread_cond_broadcast(&_cond);//唤醒所有等待的线程

    pthread_mutex_unlock(&_mutex);
}

三、测试

我们可以用以下代码进行测试:

#include <iostream>
#include <vector>
#include <string>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include"ThreadPool.hpp"

int Add()
{
    int a = rand() % 100 + 1;
    int b = rand() % 100 + 1;
    std::cout<<a<<" + "<<b<<" = "<<a+b<<std::endl;
    return a+b;
}

int main()
{
    srand(static_cast<unsigned int>(time(nullptr)));

    ThreadPool<int(*)(void)> tp(3);

    tp.ThreadInit();
    tp.StartAll();

    for (int i = 0; i < 10; i++)
    {
        tp.EnQueue(Add);
        sleep(1);
    }

    tp.Stop();
    tp.JoinAll();
    return 0;
}

通过ThreadPool<int(*)(void)> tp(3);创建有三个线程的线程池,执行的任务类型为int(void),但是要注意,此处要传入可调用对象,C++的可调用对象有:函数指针,仿函数,lambda表达式。此处我用了函数指针int(*)(void)

接着ThreadInit初始化线程池,此时线程对象Thread已经创建出来了,但是还有没创建线程。随后调用StartAll,此时才真正创建了线程。

然后进入一个for循环,给任务队列派发任务,总共派发十个任务,都是函数Add,其中生成两个随机数的加法。

最后调用Stop终止退出线程池,此时线程也会一个个退出,然后调用JoinAll回收所有线程。

运行结果:

在这里插入图片描述

四、线程池总代码

1、ThreadPool.hpp

#include"Thread.hpp"
#include<vector>
#include<queue>
#include<string>
#include <unistd.h>
#include <pthread.h>

template<class T>
class ThreadPool
{   
public:
    ThreadPool(const int num = 5)
        :_threadNum(num),_waitNum(0),_isRunning(false)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
    }

    void HanderTask(std::string name)
    {
        //子线程需要一直处理,所以这里使用死循环
        while(true)
        {
            pthread_mutex_lock(&_mutex);
            while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒
            {
                _waitNum++;
                pthread_cond_wait(&_cond,&_mutex);
                _waitNum--;
            }
            //线程池终止了,并且任务队列中没有任务 --> 线程退出
            if(_taskQueue.empty()&&!_isRunning)
            {
                pthread_mutex_unlock(&_mutex);
                std::cout<<name<<" quit..."<<std::endl;
                break;
            }

            //走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出
            T task = _taskQueue.front();
            _taskQueue.pop();
            std::cout<<name<<" get a task..."<<std::endl;
            pthread_mutex_unlock(&_mutex);

            task();
        }
    }

    void ThreadInit()
    {
        for(int i=0;i<_threadNum;i++)
        {
            auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);
            std::string name = "Thread-"+std::to_string(i);
            //_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员
            _threads.emplace_back(func,name,name);
        }
        _isRunning = true;
    }

    void StartAll()
    {
        for(auto& thread : _threads)
        {
            thread.Start();
        }
    }
    void JoinAll()
    {
        for(auto& thread : _threads)
        {
            thread.Join();
        }
    }

    void EnQueue(const T& task)
    {
        pthread_mutex_lock(&_mutex);

        if(_isRunning)
        {
            _taskQueue.push(task);
            if(_waitNum > 0)
            {
                pthread_cond_signal(&_cond);
            }
        }

        pthread_mutex_unlock(&_mutex);
    }

    void Stop()
    {
        pthread_mutex_lock(&_mutex);

        _isRunning = false;//终止线程池
        pthread_cond_broadcast(&_cond);//唤醒所有等待的线程

        pthread_mutex_unlock(&_mutex);
    }

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程
    std::queue<T> _taskQueue;//任务队列
    int _threadNum;//线程数
    int _waitNum;//等待的线程数
    bool _isRunning;//线程池是否在运行

    pthread_mutex_t _mutex;//互斥锁
    pthread_cond_t _cond;//条件变量
};

2、Main.cc

#include <iostream>
#include <vector>
#include <string>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include"ThreadPool.hpp"

int Add()
{
    int a = rand() % 100 + 1;
    int b = rand() % 100 + 1;
    std::cout<<a<<" + "<<b<<" = "<<a+b<<std::endl;
    return a+b;
}

int main()
{
    srand(static_cast<unsigned int>(time(nullptr)));

    ThreadPool<int(*)(void)> tp(3);

    tp.ThreadInit();
    tp.StartAll();

    for (int i = 0; i < 10; i++)
    {
        tp.EnQueue(Add);
        sleep(1);
    }

    tp.Stop();
    tp.JoinAll();
    return 0;
}

相关文章:

  • 7.1 Hugging Face PEFT 快速入门:参数高效微调实战指南
  • tomcat的安装与配置(包含在idea中配置tomcat)
  • 20_simt_canonical
  • 依赖注入是什么?什么时候要用到依赖注入?为什么相较于使用@Resource或者@Autowired,spring官方更推荐使用构造函数进行依赖注入?
  • Day02-云服务器+小皮phpstudy一键部署建站
  • Pytorch实现之LSRGAN,轻量化SRGAN超分辨率SAR
  • setlocale()的参数,“zh_CN.UTF-8“, “chs“, “chinese-simplified“的差异。
  • postgresql
  • GaussDB自带诊断工具实战指南
  • 青训营:简易分布式爬虫
  • 【Spring Boot 应用开发】-04-02 自动配置-数据源-手撸一个最简持久层工具类
  • 【CF记录】贪心——A. Scrambled Scrabble
  • 计算机毕业设计SpringBoot+Vue.js教师工作量管理系统(源码+文档+PPT+讲解)
  • PHP之变量
  • 前端模拟数据调试的方法
  • io学习----->文件io
  • 什么是索引下推?
  • 机器视觉开发教程——封装Halcon通用模板匹配工具【含免费教程源码】
  • java 查找两个集合的交集部分数据
  • K8s 1.27.1 实战系列(一)准备工作
  • 贵州省总工会正厅级副主席梁伟被查,曾任贵州省纪委副书记
  • 综艺还有怎样的新可能?挖掘小众文化领域
  • 昆明一学校门外小吃摊占满人行道,城管:会在重点时段加强巡查处置
  • 印巴战火LIVE丨“快速接近战争状态”:印度袭击巴军事基地,巴启动反制军事行动
  • 泰特现代美术馆25年:那些瞬间,让艺术面向所有人
  • 国家主席习近平会见斯洛伐克总理菲佐