【LINUX操作系统】线程池——线程部分综合运用并实现一个自己的线程池
目录
1. 认识线程池
线程池的运用场景
工作原理:
2. 实现自己的线程池
搭建框架
引入任务队列
传参————理清各个接口的关系
完成Stop模块
3. 再谈线程安全和重入问题
4. 饿汉懒汉 ---单例模式
5. 死锁与避免死锁的方法
1. 认识线程池
希望阅读本篇的读者都学习过本系列前面的文章,比如自己封装的Thread\Mutex\Cond\Log\生产消费者模型以及C++11等内容,本篇将通过利用自己封装的各个接口实现一个线程池。
线程池是⼀种线程使⽤模式。 就像进程池的池化技术一样,是多线程的调度模式。线程过多会带来调度开销,进⽽影响缓存局部性和整体性能。⽽线程池维护着多 个线程,等待着监督管理者分配可并发执⾏的任务。这避免了在处理短时间任务时创建与销毁线程的 代价。线程池不仅能够保证内核的充分利⽤,还能防⽌过分调度。可⽤线程数量应该取决于可⽤的并发处理器、处理器内核、内存、⽹络sockets等的数量。![]()
线程池的运用场景
线程池的应⽤场景:•需要⼤量的线程来完成的任务,且完成任务的时间⽐较短。 ⽐如WEB服务器完成⽹⻚请求这样的任 务,使⽤线程池技术是⾮常合适的。因为单个任务⼩,⽽任务数量巨⼤,你可以想象⼀个热⻔⽹站 的点击次数。 但对于⻓时间的任务,⽐如⼀个Telnet连接请求,线程池的优点就不明显了。因为 Telnet会话时间⽐线程的创建时间⼤多了。对性能要求苛刻的应⽤,⽐如要求服务器迅速响应客⼾请求。•接受突发性的⼤量请求,但不⾄于使服务器因此产⽣⼤量线程的应⽤。突发性⼤量客⼾请求,在没 有线程池情况下,将产⽣⼤量线程,虽然理论上⼤部分操作系统线程数⽬最⼤值不是问题,短时间内产⽣⼤量线程可能使内存到达极限,出现错误线程池分为:创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执⾏任务对象中的任务接⼝ ;浮动线程池,其他同上
工作原理:
- 核心组件:
- 任务队列:存储待执行的任务
- 工作线程:从任务队列中获取任务并执行。
- 管理机制:控制线程创建、销毁及任务分配。(ThreadPool类)
工作机制
- 初始化:创建固定数量的线程,进入空闲等待状态。
- 任务提交:用户将任务提交到任务队列。
- 任务执行:工作线程从队列中取出任务并执行。
- 线程复用:任务完成后,线程返回队列等待新任务,避免重复创建销毁。
2. 实现自己的线程池
搭建框架
首先完成框架的搭建,引入之前写的所有文件
#pragma once #include "Pthread.hpp" #include "Log.hpp" #include "Cond.hpp" #include "Mutex.hpp"namespace ThreadPoolModule {using namespace ThreadModule;using namespace LockMoudle;using namespace LogModule;using namespace CondModule;void DefaultTest(){ENABLE_CONSOLE_LOG;LOG(LogLevel::DEBUG) << "Lets GO!";}using thread_t = std::shared_ptr<Thread>;template <typename T>class ThreadPool{public:ThreadPool(){}~ThreadPool(){}private:vector<thread_t> _threads; // 用于存放创建的各个线程int _num; // 根据操作系统的具体情况确定的线程数目,此处我们手动设置即可。queue<func_t> _taskq; // 任务队列}; }
在主函数中,大概有以下四个模块需要我们来完成
第一步,先启动线程:
然后先完成任务调用和线程等待。
错误反思:博主第一次写的时候在Wait()下面调成e->Name了,导致主程序运行太快,没有等待子线程,Lets Go经常不能打完整。大家一定要记着Join子线程呀!另外,Log的年份写错了。
此时的完整代码:
#pragma once
#include "Pthread.hpp"
#include "Log.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"#include <vector>
#include <queue>
#include <iostream>namespace ThreadPoolModule
{using namespace ThreadModule;using namespace LockMoudle;using namespace LogModule;using namespace CondModule;void DefaultTest(){//ENABLE_CONSOLE_LOG;LOG(LogLevel::DEBUG) << "Lets GO!";}using thread_t = std::shared_ptr<Thread>;static int default_num = 5;template <typename T>class ThreadPool{public:ThreadPool(): _num(default_num){for (int i = 0; i < _num; i++){//_threads.emplace_back(Thread(DefaultTest));不用这么写,因为传一个指针进去就可以了_threads.push_back(std::make_shared<Thread>(DefaultTest));LOG(LogLevel::INFO)<<"构建线程"<<_threads.back()->Name();}}void Start(){for (auto &e : _threads){e->Start();LOG(LogLevel::INFO)<<"启动线程"<<e->Name();//sleep(1);}}void Enqueue(){}void Wait(){for (auto &e : _threads){e->Join();LOG(LogLevel::INFO)<<"等待"<<e->Name();}}void Stop(){}~ThreadPool(){}private:std::vector<thread_t> _threads; // 用于存放创建的各个线程int _num; // 根据操作系统的具体情况确定的线程数目,此处我们手动设置即可。std::queue<func_t> _taskq; // 任务队列};
}
使用我们写的测试函数,已经可以看到明显的现象了
引入任务队列
总不能一直用测试函数当任务吧?
任务队列,本质是一个生产者消费者模型中的共享资源,生产者应该是主线程(今天应该是由我们手动设置),所以在Threadpool中需要一个接口HandlerTask来承当消费者角色,从而去共享资源中获得我们传进去的任务,而ThreadPool只需要负责在:
的时候把这个HandlerTask构造进入_threads即可。(也就是在make_shared的时候当作Thread的参数即可)
HandlerTask
![]()
何时解锁呢?处理任务的时候是不需要加锁的,如果处理任务加锁那就真的把线程池变成一个串行任务接口,毫无效率可言;又因为MutexGuard是一个RAII风格的接口,所以:
再把这个Handler引入到ThreadPool的构造函数中:
介绍一下bind函数:
可是到底该用哪种语法?
c++语法规定,非静态的成员函数,就是说函数名+类域,是取不到函数地址的,必须强制加取地址符号,所以必须选第三种。
HandlerTasks还没有结束 。
现在需要给这些进入了Cond条件变量下等待的线程一个唤醒的机会。如果所有线程都去等待,还没有唤醒机制,那么整个线程池就无法工作。
只要在Enqueue下完成这个工作就可以了。如果等待队列中有线程,就唤醒一个前去工作。
void Enqueue(T && in){MutexGuard lockguard(_lock);_taskq.push(std::move(in));if(_wait_num>0){_cond.notify();}}
因为插入的函数是一个右值,所以Enqueue必须采用一个完美转发。
现在只差我们在主程序中取插入一个任务了
Push是单独写在Task.hpp的一个文件。注意,Push是一个右值
此时的完整代码:
#pragma once #include "Pthread.hpp" #include "Log.hpp" #include "Cond.hpp" #include "Mutex.hpp"#include <vector> #include <queue> #include <iostream>namespace ThreadPoolModule {using namespace ThreadModule;using namespace LockMoudle;using namespace LogModule;using namespace CondModule;void DefaultTest(){// ENABLE_CONSOLE_LOG;LOG(LogLevel::DEBUG) << "Lets GO!";}using thread_t = std::shared_ptr<Thread>;static int default_num = 5;template <typename T>class ThreadPool{public:bool IsQueueEmpty(){return _taskq.empty();}void HandlerTask()//func_t {while (true) // 一直不停的执行任务{T t;// 1.拿任务{MutexGuard lock_guard(_lock);while (IsQueueEmpty()){_wait_num++;_cond.wait(_lock);_wait_num--;}t = _taskq.front();_taskq.pop();}// 2.执行任务t();}}ThreadPool(): _num(default_num), _wait_num(0){for (int i = 0; i < _num; i++){//_threads.emplace_back(Thread(DefaultTest));不用这么写,因为传一个指针进去就可以了//_threads.push_back(std::make_shared<Thread>(DefaultTest));// _threads.push_back(std::make_shared<Thread>(std::bind(&HandlerTask,this)));// _threads.push_back(std::make_shared<Thread>(std::bind(HandlerTask,this)));_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask,this)));LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name();}}void Start(){for (auto &e : _threads){e->Start();LOG(LogLevel::INFO) << "启动线程" << e->Name();// sleep(1);}}void Enqueue(T && in){MutexGuard lockguard(_lock);_taskq.push(std::move(in));if(_wait_num>0){_cond.notify();}}void Wait(){for (auto &e : _threads){e->Join();LOG(LogLevel::INFO) << "等待" << e->Name();}}void Stop(){}~ThreadPool(){}private:std::vector<thread_t> _threads; // 用于存放创建的各个线程int _num; // 根据操作系统的具体情况确定的线程数目,此处我们手动设置即可。std::queue<T> _taskq; // 任务队列,t被实例化为task_t// 作为消费者,取任务的时候自然需要锁和条件变量Mutex _lock;Cond _cond;int _wait_num; //计数等待在条件变量中的线程数}; }
传参————理清各个接口的关系
现在想区分是谁打印出来的内容,希望给task_t进行一个传参。而给task_t传参,就需要给HandlerTask传参,HandlerTask被ThreadPool当作参数传到Thread里当作任务函数,HandlerTask这个func_t去调用_taskq里的task_t,所以两个变量类型都需要修改
这样就可以正常运行了。
其中的关键语句:
_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1)));
placeholders::_1 表示在bind函数中,this作为Handler的第一个参数,this被放在参数_1的位置
完成Stop模块
现在的问题如上图所示,整个线程还不会自行停止
我们明明写了wait模块,但是没有执行wait模块。
原因在于:在HandlerTask中,当线程池外部的主程序已经结束了推送任务后,所有的线程都会回到条件变量下等待,等待下一次推任务。就我们的主观意志来说,我们希望现在的线程池已经停止工作了,为了解决这个问题,还需要引入变量来细腻化的控制整个逻辑
void HandlerTask(std::string name)//func_t {while (true) // 一直不停的执行任务{T t;// 1.拿任务{MutexGuard lock_guard(_lock);while (IsQueueEmpty()){_wait_num++;_cond.wait(_lock);_wait_num--;}t = _taskq.front();_taskq.pop();}// 2.执行任务t(name);}}
用is_running来判断是否启动:
启动线程池的时候要设计成true
线程退出需要满足的条件:
1.线程自己退出(本来被卡在条件变量下的线程们得醒过来!)
2.历史任务被处理完了(任务队列为空)
因此,此时不能再入任务了,需要我们调整一下
在之前的代码中,当10次任务都被Equeue之后,Wait函数奉主函数main的命令,Join所有的slave线程,但是所有的slave线程都卡在条件变量之下,wait函数起不了作用。
实现stop函数:
如果池子在启动状态,任务队列又为空,并且所有的线程都在条件变量下等待,说明此时达到了停止线程池工作的时候了。
最后,因为卡在条件变量下的线程一直在while(true)下疯狂的检测while(IsQueueEmpty()),我们再加上如下控制:
第一种情况,对应的是还有任务,但是现在暂时没有了,整个池子不能停;
第二种情况,已经没有任务,并且池子打算停下来
大功告成!
3. 再谈线程安全和重入问题
线程安全:就是多个线程在访问共享资源时,能够正确地执⾏,不会相互⼲扰或破坏彼此的执⾏结果。⼀般⽽⾔,多个线程并发同⼀段只有局部变量的代码时,不会出现不同的结果。但是对全局变量或者静态变量进⾏操作,并且没有锁保护的情况下,容易出现该问题。重⼊:同⼀个函数被不同的执⾏流调⽤,当前⼀个流程还没有执⾏完,就有其他的执⾏流再次进⼊, 我们称之为重⼊。⼀个函数在重⼊的情况下,运⾏结果不会出现任何不同或者任何问题,则该函数被称为可重⼊函数,否则,是不可重⼊函数。学到现在,其实我们已经能理解重⼊其实可以分为两种情况•多线程重⼊函数•信号导致⼀个执⾏流重复进⼊函数
类别 情况 具体描述与示例 线程不安全场景 1. 不保护共享变量的函数 - 问题:多线程直接操作共享变量导致数据不一致。
- 示例:两个线程同时修改全局计数器。
- 解决:使用互斥锁(如pthread_mutex
)保护共享资源。2. 函数状态跨调用变化的函数 - 问题:函数内部状态依赖历史调用结果(如伪随机数生成器 rand()
)。
- 示例:srand(seed)
初始化后,rand()
结果依赖上次调用。
- 解决:使用线程局部存储(TLS)或包装函数加锁。3. 返回静态变量指针的函数 - 问题:静态变量生命周期贯穿程序运行,多线程访问导致竞争。
- 示例:char* get_static_str() { static char buf[] = "test"; return buf; }
。
- 解决:避免返回静态变量,或通过线程局部存储隔离。4. 调用线程不安全函数的函数 - 问题:即使函数本身线程安全,若调用其他不安全函数,仍需保护。
- 示例:函数A调用线程不安全的gethostbyname()
。
- 解决:使用互斥锁包裹调用,或改用可重入版本(如gethostbyname_r()
)。不可重入场景 1. 调用 malloc/free
- 问题: malloc
使用全局链表管理堆,多线程调用需加锁,但可能导致死锁。
- 示例:信号处理函数中调用malloc
可能破坏内存链表。
- 解决:使用线程安全分配器(如pthread_malloc
)或预先分配内存池。2. 调用标准I/O库函数 - 问题:标准I/O函数(如 printf
)常使用全局缓冲区,多线程调用导致输出混乱。
- 示例:两个线程同时调用printf
输出交叉。
- 解决:使用线程安全版本(如fprintf_unlocked
)或加锁。3. 函数内使用静态数据结构 - 问题:静态变量在多线程间共享,导致状态混乱。
- 示例:函数内静态计数器被多个线程修改。
- 解决:改用局部变量或线程局部存储。线程安全场景 1. 仅读全局/静态变量 - 问题:读操作不修改数据,天然线程安全。
- 示例:多线程读取配置文件中的常量。
- 注意:写操作需额外同步。2. 原子操作接口 - 问题:非原子操作可能被中断导致数据不一致。
- 示例:i++
非原子,需用原子操作(如atomic_fetch_add
)。
- 解决:使用原子类型或锁机制。3. 接口执行结果无二义性 - 问题:线程切换导致接口行为依赖执行顺序。
- 示例:银行账户转账接口需保证操作原子性。
- 解决:通过同步机制确保接口原子性。可重入场景 1. 仅使用局部变量 - 问题:局部变量在栈上分配,线程间独立。
- 示例:函数内所有变量均为局部变量。
- 优势:天然可重入,无需额外同步。2. 不调用不可重入函数 - 问题:嵌套调用不可重入函数导致资源竞争。
- 示例:函数内调用malloc
。
- 解决:仅调用可重入函数,或通过锁保护。3. 数据由调用者提供 - 问题:返回静态/全局数据导致共享。
- 示例:函数返回静态缓冲区指针。
- 解决:要求调用者提供缓冲区,或返回动态分配内存(需配合线程安全分配器)。4. 制作全局数据的本地拷贝 - 问题:直接操作全局数据导致竞争。
- 示例:多线程读取同一全局数组。
- 解决:复制全局数据到线程局部变量后再操作。线程安全不⼀定是可重⼊的,⽽可重⼊函数则⼀定是线程安全的。如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重⼊函数若锁还未释放则会产⽣死锁,因此是不可重⼊的。但是线程安全侧重说明线程访问公共资源的安全情况,表现的是并发线程的特点可重⼊描述的是⼀个函数是否能被重复进⼊,表⽰的是函数的特点
4. 饿汉懒汉 ---单例模式
某些类, 只应该具有⼀个对象(实例), 就称之为单例。在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要⽤⼀个单例的类来管理这些数据.
那什么是饿汉懒汉?
吃完饭 , ⽴刻洗碗 , 这种就是饿汉⽅式 . 因为下⼀顿吃的时候可以⽴刻拿着碗就能吃饭 .吃完饭 , 先把碗放下 , 然后下⼀顿饭⽤到这个碗了再洗碗 , 就是懒汉⽅式 .懒汉⽅式最核⼼的思想是 "延时加载". 从⽽能够优化服务器的启动速度,这样的加载方式在Linux中比比皆是(懒汉方式比饿汉方式更实用!),这样就能避免一开始就初始化一个很大的数据并且放在内存中,懒汉只需要申请一个指针即可。
比如用懒汉方式实现单例模式:
template <typename T> class Singleton {static T* inst;public:static T* GetInstance() {if (inst == NULL) {inst = new T();}return inst;} };
而用饿汉方式:
template <typename T> class Singleton { static T data; public: static T* GetInstance() { return &data; } };
只要通过 Singleton 这个包装类来使⽤ T 对象, 则⼀个进程中只有⼀个 T 对象的实例。使用的时候调用Get方法即可。但是懒汉方法虽然效率更高,但是有一个致命问题:线程不安全.第⼀次调⽤ GetInstance 的时候, 如果两个线程同时调⽤, 可能会创建出两份 T 对象的实例.但是后续再次调⽤, 就没有问题了.
将线程池代码改成懒汉单例模式:
单例模式不允许拷贝不允许赋值语句,自然也不允许普通的构造。
将构造函数都设计在private里面就能达到避免构造的目的。
用static修饰一个指针:并在类外初始化static修饰的函数。
并且实现一个GetInstance,用于获得单例模式下唯一的这个线程池
static ThreadPool<T>* GetInstance(){if(_tp==nullptr){_tp = new ThreadPool<T>();}return _tp;}
如何使用?
回归到懒汉的关键问题:在第一次创建的时候,有可能被重入进去,都new一次
如果直接在确定单例是否被创建之前就加锁,其实成本有点高。如果在多线程公用线程池的情况,就会每一个都去拿锁,降低效率。
所以:采用双判断。
最终代码:
#pragma once
#include "Pthread.hpp"
#include "Log.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"#include <vector>
#include <queue>
#include <iostream>namespace ThreadPoolModule
{using namespace ThreadModule;using namespace LockMoudle;using namespace LogModule;using namespace CondModule;void DefaultTest(){// ENABLE_CONSOLE_LOG;LOG(LogLevel::DEBUG) << "Lets GO!";}using thread_t = std::shared_ptr<Thread>;static int default_num = 5;template <typename T>class ThreadPool{public:bool IsQueueEmpty(){return _taskq.empty();}void HandlerTask(std::string name) // func_t{while (true) // 一直不停的执行任务{T t;// 1.拿任务{MutexGuard lock_guard(_lock);while (IsQueueEmpty() && _is_pool_running){_wait_num++;_cond.wait(_lock);_wait_num--;}if (IsQueueEmpty() && !_is_pool_running){break;}t = _taskq.front();_taskq.pop();}// 2.执行任务t(name);}}static ThreadPool<T> *GetInstance(){if(_tp == nullptr){MutexGuard mutex(_mutex);if (_tp == nullptr){_tp = new ThreadPool<T>();}}return _tp;}void Start(){if (_is_pool_running)return; // 假如整个线程池已经启动,就不用再启动了_is_pool_running = true;for (auto &e : _threads){e->Start();LOG(LogLevel::INFO) << "启动线程" << e->Name();// sleep(1);}}void Enqueue(T &&in){if (!_is_pool_running)return;MutexGuard lockguard(_lock);_taskq.push(std::move(in));if (_wait_num > 0){_cond.notify();}}void Wait(){for (auto &e : _threads){e->Join();LOG(LogLevel::INFO) << "等待" << e->Name();}}void Stop(){if (!_is_pool_running)return; // 线程池都停止了/没有启动还stop什么呢!_is_pool_running = false;if (IsQueueEmpty() && _wait_num == _num){_cond.notifyall();}}~ThreadPool(){}private:std::vector<thread_t> _threads; // 用于存放创建的各个线程int _num; // 根据操作系统的具体情况确定的线程数目,此处我们手动设置即可。std::queue<T> _taskq; // 任务队列,t被实例化为task_t// 作为消费者,取任务的时候自然需要锁和条件变量Mutex _lock;Cond _cond;int _wait_num; // 计数等待在条件变量中的线程数bool _is_pool_running; // 判断整个线程池的工作状态// 懒汉单例static ThreadPool<T> *_tp;static Mutex _mutex; // 只用来保护单例进程池ThreadPool(const ThreadPool &pool) = delete;ThreadPool &operator=(const ThreadPool &pool) = delete;ThreadPool(): _num(default_num), _wait_num(0), _is_pool_running(false){for (int i = 0; i < _num; i++){//_threads.emplace_back(Thread(DefaultTest));不用这么写,因为传一个指针进去就可以了//_threads.push_back(std::make_shared<Thread>(DefaultTest));// _threads.push_back(std::make_shared<Thread>(std::bind(&HandlerTask,this)));// _threads.push_back(std::make_shared<Thread>(std::bind(HandlerTask,this)));_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name();}}};template <typename T>ThreadPool<T> *ThreadPool<T>::_tp = nullptr;template <typename T>Mutex ThreadPool<T>::_mutex;
}
5. 死锁与避免死锁的方法
死锁是指在⼀组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站⽤不会释放的资源⽽处于的⼀种永久等待状态。![]()
死锁有四个必要条件:
1. 互斥条件,即一个锁资源一次只能被一个线程(执行流)使用
2.请求与保持条件:⼀个执⾏流因请求资源⽽阻塞时,对已获得的资源保持不放
3.不剥夺条件:⼀个执⾏流已获得的资源,在末使⽤完之前,不能强⾏剥夺
4.循环等待条件:若⼲执⾏流之间形成⼀种头尾相接的循环等待资源的关系(A找B要,B找C要。。。。。Z找A要)
在这里我们只提一种可能的解决方法:
std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);// 使⽤ std::lock 同时锁定两个互斥锁std::lock(lock1, lock2);
这是C++标准库中锁的用法,如何使用Lock破坏循环机制呢?就像上述代码,lock同时锁住两个锁,就完成了任务。
如何“原子性”的锁住两个锁?只要在lock内部再加一层锁不就行了!
举一反三,10个锁同步锁住,需要11个锁;100个锁同步锁住,需要101个锁。
常见死锁误区
智能指针是否是线程安全的?对于 unique_ptr, 由于只是在当前代码块范围内⽣效, 因此不涉及线程安全问题.对于 shared_ptr, 多个对象需要共⽤⼀个引⽤计数变量, 所以会存在线程安全问题. 但是标准库实现的时 候考虑到了这个问题, 基于原⼦操作(CAS)的⽅式保证 shared_ptr 能够⾼效, 原⼦的操作引⽤计数.因此,智能指针都是安全的。但是STL为了考虑效率,几乎都不是线程安全的。