Linux操作系统学习之---线程池
一.线程池概念 :
上一节里我们实现了日志这一组件 , 调试信息就有了保障 . 现在可以开始实现线程池了 , 那什么是线程池? 什么又是池化技术?
池化技术 :
池化技术是一个笼统的概念 , 具体的实现有好多好多 :进程池 / 线程池 / 内存池 … , 理解抽象的概念可以从具象化的生活实例入手 : 比如水库 !!!
-
没有水库之前 , 内陆的人们只能去湖泊河流里去打 .
-
这样做有两个弊端 : 一是遇上干季就没有水可以获取 , 二是每次打水都可能要千里迢迢取到湖泊河流边上,效率低下,“远水难救近渴”.
-
如果建立水库 , 就可以解决上述问题 : 解决问题一:干季来临前水库里还有存量,可以应急. 解决问题二: 可以建立管道统一从水库里调水到每家每户 , 集约化供水 , 效率高.
线程池 :
- 顾名思义 , 线程池就是建立一个存放线程的"水库" , 以满足效率的需求.
- 抽象的概念在具象化投入使用后会有更加具体的用途 , 如下 :
一个例子 : 你去餐馆吃饭 , 点了一个西红柿炒鸡蛋 . 厨师处理你这个请求 . 西红柿炒鸡蛋是任务 , 厨师是执行任务的线程
没有线程池时 : 厨师先是去超市里买西红柿和鸡蛋(创建线程) , 然后在回来做菜(执行任务).
有线程池时 : 厨师直接从冰箱里拿出一盘预制西红柿炒鸡蛋(线程早已创建) , 加热好后给你(执行任务)…
总之 , 线程池的核心思想就在于提前创建一定量的线程 , 然后在任务到来时直接执行 .
[!为啥一定要提前创建线程]-
- 创建线程本质就是像操作系统申请内存空间 , 涉及系统调用这个耗时大户 . 需要传递系统调用号/陷入内核/执行系统调用.
- 如果每次一来一个任务才创建一个线程 , 那处理五个任务就需要进行五次系统调用 , 效率崩坏!!!.
- 一次性创建多个线程只需要陷入内核执行一次系统调用函数 , 效率UP!!!
#Linux/线程/线程池的必要
线程池的种类:
线程池主要分为两种 :
- 固定线程数量的线程池 : 一次性创建固定数量的线程 , 线程各自开始循环获取任务和处理任务.
- 浮动线程池 : 主要是线程池运行期间线程的数量可以随需求来动态变化.
我们这里采用线程数固定的线程池.
二.线程池设计(生产者消费者) :
1.准备:
这里要设计的线程池是线程安全的 , 需要互斥和同步机制 , 并且辅以日志来输出调试信息.包含之前自己实现的各个简易组件.
#include "Log.hpp" //日志
#include "mutex.hpp" //互斥锁
#include "Cond.hpp" //条件变量
#include "Thread.hpp" //线程
2.成员变量 :
std::vector<Thread>:用来管理所有创建的线程(消费者).std::queue<T> _task:用于管理到来的任务(消费者)._num: 线程池中线程的个数.Mutex _mutex: 互斥锁 , 用于确保互斥.Cond _cond;: 条件变量 , 用于确保同步;bool _isRun: 用于标识线程池是否正在运行 .
template <typename T>
class ThreadPool
{
private:std::vector<Thread> _threads;std::queue<T> _task;int _num;bool _isRun;Mutex _mutex;Cond _cond;
}
3.基本成员函数(线程创建/启动/终止/回收):
- 构造函数
ThreadPool(int num = gnum)用于创建多个线程. - 函数
void Start()用于批量启动线程 , 并将线程池运行状态_isRun设置为true - 函数
void Stop()用于批量终止线程 , 并将线程池运行状态_isRun设置为false - 函数
void Join()用于回收线程 .
class ThreadPool
{
public:ThreadPool(int num = gnum): _isRun(false),_num(num){for (int i = 0; i < _num; i++){_threads.emplace_back([this](){//执行的任务});}}void Start(){if (!_isRun){for (int i = 0; i < _num; i++){_threads[i].Start();}_isRun = true;}}void Stop(){if (_isRun){for (int i = 0; i < _num; i++){_threads[i].Stop();}_isRun = false;}}void Join(){for (int i = 0; i < _num; i++){_threads[i].Join();}}
private://.....
}
4.线程的任务:
想要让线程执行到任务 , 流程应该是 :
- 将任务添加到任务队列
- 唤醒线程(如果在休眠的话)
- 线程执行任务
但在此之间肯定得先有任务 , 于是先设计一个任务类型,包含在头文件
Task.hpp里**
头文件Task.hpp
#pragma once#include<iostream>
#include<functional>
#include"Log.hpp"
using namespace LogModule;//1任务类型一 , 一个包装器
using func_t = std::function<void()>;void Download()
{LOG(LogLevel::DEBUG) << "我是一个下载任务...";
}//2任务类型二 , 类
class Task
{
public:Task(){}Task(int x, int y):_x(x), _y(y){}void Execute(){_result = _x + _y;}int X() { return _x; }int Y() { return _y; }int Result(){return _result;}
private:int _x;int _y;int _result;
};
5.成员函数 Enqueue(新增成员变量int _sleepNum;)
如果想要插入任务 , 首先要满足一个条件 : 任务队列未满.
- 而判断任务队列是否未满的语句需要加锁保护.(防止任务队列只有一个空间时两个线程都进去了).
一个代码细节 : 因为这是线程池的成员函数 , 聚焦的就是唤醒 , 因此将条件变量的
signal函数封装成私有的WakeUpOne()提升代码可读性
class ThreadPool
{
private:void WakeUpOne() //添加一个私有的函数{_cond.Signal();}
public:
//......bool Enqueue(const T &task){if (_isRun) //线程池还在运行时才能添加任务到任务队列{LockGuard lockguard(_mutex);;//插入任务_task.push(task);//尝试唤醒if(_sleepNum > 0) //如果还有睡眠的线程,则唤醒{WakeUpOne();}return true;}return false;}
//..............
}
6.成员函数Handler
有了任务后 , 再设计**成员函数
Handler**作为线程执行任务的逻辑.
bug版:
关键逻辑:
-
使用互斥锁
LockGuard lockguard确保线程获取任务的互斥性. -
任务队列有元素才能获取任务并执行
while (_task.empty()), 否则进行等待Wait();. -
关键点 : 线程执行任务的动作不用加锁 , 因为消费者生产者模型高效的地方就在于通过互斥机制独立的拿到任务后 , 可以让其他线程也去拿任务 , 从而并发执行各自的任务.
void Handler()
{while (true){T task;{LockGuard lockguard(_mutex);;while (_task.empty()) {_sleepNum++;Wait();_sleepNum--;}task = _task.front();_task.pop();}task(); // 线程获取到任务时就可以丢掉锁了}
}
修改版:
当线程池停止运行 , 但是任务队列里还有任务 , 这时候也应该让剩下的任务得到执行. 考虑到这种情况,
Handler函数里的while循环条件就需要微调.
- 一个线程本身是while的死循环 , 当线程池退出 如果任务队列里的任务全部处理完毕 , 线程池终止后也没有再生产任务 .
- 此时会导致线程在
while (_task.empty()&& _isRun)判断为假后继续试图从任务队列里拿取不存在的任务- 最后执行
task()时就会出错. 所以应该加上判断if(_task.empty() && !_isRun),即当任务队列为空且线程池停止时,break退出循环
void Handler()
{while (true){T task;{LockGuard lockguard(_mutex);;while (_task.empty()&& _isRun) //1. 队列为空&&线程池没有停止 , 才需要进行等待{_sleepNum++;Wait();_sleepNum--;}if(_task.empty() && !_isRun) //2.如果队列为空,线程池停止,则线程也退出{LOG(LogLevel::INFO) << "任务处理完毕 , 线程退出...";break;}task = _task.front();_task.pop();}task();}
}
7.优雅地退出逻辑 :
线程池的基本设计接近尾声 , 但是想要真正实现 线程池终止后线程还能吧剩下的任务处理完后自己乖乖退出还需要再修改一下
Stop()函数
-
此处的关键在于处理
_isRun标志位 -
Handler函数中 , 如果_isRun为假(线程池终止) , 则线程就不会在任务队列为空时等待.同时也能防止线程退出当前while循环后无脑获取不存在的任务 , 从而执行出现问题. -
但是存在一种特殊情况 : 当主线程执行
_isRun = false;时 , 子线程可能正在休眠 , 就没机会进行while (_task.empty()&& _isRun)的判断,从而休眠一辈子!!!(因为此时不会有新任务,也不会唤醒他) -
所以在执行
_isRun = false后 , 还需要唤醒一次所有线程WakeUpAll(), 让线程得以从睡眠中唤醒来执行while循环. -
isRun标志位是共享资源 , 不仅主线程要判断和修改 , 子线程也要判断 . 所以为了避免不稳定的多线程并发的时序问题 , 需要加锁!!!
//暴力终止子线程版....void Stop(){if (_isRun){for (int i = 0; i < _num; i++){_threads[i].Stop();}}}
```c++
```c++//通过修改标志位,让线程自己来决定退出时机.void Stop(){if (_isRun){LockGuard lockguard(_mutex);_isRun = false;WakeUpAll();}}
三.线程池的应用场景 :
线程池的优势在于同一时间存在多个可以快速响应任务的线程 ,
-
数量多的小任务 : 比如
Teinet连接请求 , 处理时间短 . 如果为此临时创建一个线程来处理就很不划算 , 线程池的设计就能实现多个线程低成本的及时并发响应多条请求. -
性能要求苛刻的应用: 如果服务器需要快速响应用户的请求 , 线程池里提前创建的线程就能避免在请求到来时才创建线程导致的效率低下.
-
突发的大量请求 : 如果突发很多请求 , 线程池里已创建的线程就可以及时处理.
其实很好理解 , 一个操作系统运行起来很肯定会有许多不停产生的待处理任务 , 提前创建适量的线程根本就是百利无一害的.
四.单例模式 :
单例模式 , 简单来说就是一个类在运行时只会存在一个对象 . 为了确保这一点 , 往往语法层面来约束对象的创建:
- 将对象的构造函数设为私有 .
- 禁用对象的拷贝构造和赋值构造 , 避免用户私自创建对象.
- 将成员变量设置为静态的 , 实现不用对象也能调用成员函数.
- 如果不是单例模式 , 我们这里的线程池就可以创建出很多对象 .
- 但是线程池是重量级资源 , 一个进程里包含很多线程 , 往往在全局只需要一个线程池就足够了 .
- 比如配置文件的加载就很适合单例模式 , 因为一次实例化全局通用.
1.饿汗的单例模式 :
饿汉模式的核心思想就是在程序运行之前时就实例化出这一个对象.
实现 : 定义一个静态的对象 , 如 static PthreadPool pp,这样在程序运行一开始就会实例化.
缺点 : 1 . 会拖慢程序加载的速度 ; 2 .对象不一定会马上被用到 , 因此过早地实例化为浪费内存资源 .
2.懒汉的单例模式 :
懒汉模式的核心思想是在程序使用到这个对象时才实例化.
实现: 定义一个静态的对象指针 , 如 static PthreadPool* pp , 这样在程序一开始运行时只会占用一个指针的大小 , 节省空间 . 之后使用的了在new空间来初始化.
优点 : 本质就是延迟加载技术 , 在真正使用到时再分配空间实例化 , 在此之前这块内存空间可以给其他进程使用!!!
注意 : 懒汉模式在操作系统的设计里无处不在 , 比如上层malloc一块内存空间 , 操作系统不会立马分配物理空间 , 而是使用虚拟地址为进程画个饼 , 在真正使用到时才通过MMU触发缺页中断从而分配物理内存.
3.单例模式(懒汉)的线程池 :
-
将构造函数私有化 / 禁用拷贝构造 / 禁用赋值构造
-
此时无法在类外实例化对象 , 所以需要一个静态成员指针
static ThreadPool *thread_pool, 并在类外初始化为nullptr. -
为了让上层调用线程池的函数 , 提供一个共有的
ThreadPool<T> *GetInstance()函数 , 返回静态对象(第一次调用时则初始化)供外部调用函数 . -
ThreadPool<T> *GetInstance()函数也得是静态函数 , 因为不能通过this指针调用静态对象static ThreadPool *thread_pool.
性能的优化 :
- 对于是否第一次调用对象的
if (thread_pool == nullptr)判断语句 , 为了防止多个线程同时进入然后创建多个对象 , 必须加锁维持互斥 .- 但如果外部每次调用
GetInstance函数都需要申请锁 , 就会造成效率的损耗(对象在程序运行过程中只会创建一次,理论上也只用加一次锁) , 因此考虑再加一层if (thread_pool == nullptr),就可以解决问题了.
两层锁的限制之下 : 如果两个线程同时进入判断内部 , 其中一个会被锁拦住 , 另一个成功创建对象 , 把_isTrue标志位修改 . 从此以后第一层if判断就能拦住了 , 避免了无谓的加锁.
class ThreadPool
{
private:
//............ThreadPool(int num = gnum) // 私有的构造函数: _isRun(false), _num(num){// 创建线程for (int i = 0; i < _num; i++){_threads.emplace_back([this](){ Handler(); });}}
public:
//......// 禁用拷贝构造ThreadPool(const ThreadPool &obj) = delete;// 禁用赋值构造ThreadPool &operator=(const ThreadPool &obj) = delete;// 对外提供的实例接口static ThreadPool<T> *GetInstance(){if (thread_pool == nullptr){LockGuard lockguard(_static_mutex);LOG(LogLevel::INFO) << "线程实例获取成功!!!";if (thread_pool == nullptr){thread_pool = new ThreadPool(gnum);LOG(LogLevel::INFO) << "线程实例创建成功!!!";thread_pool->Start();}}return thread_pool;}
private:
//.....static ThreadPool *thread_pool; // 静态对象指针static Mutex _static_mutex; // 静态锁(配合静态对象)
}
五.单例模式线程池的代码:
#pragma once
#include "Log.hpp"
#include "mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
#include "Task.hpp"#include <vector>
#include <queue>using namespace LogModule;
using namespace MyThreadModule;
using namespace CondModule;
using namespace mutex_module;namespace ThreadPoolModule
{static const int gnum = 5;template <typename T>class ThreadPool{private:void WakeUpOne(){_cond.Signal();}void WakeUpAll(){_cond.Broadcast();}void Wait(){_cond.Wait(_mutex);}ThreadPool(int num = gnum) // 私有的构造函数: _isRun(false), _num(num){// 创建线程for (int i = 0; i < _num; i++){_threads.emplace_back([this](){ Handler(); });}}public:// 禁用拷贝构造ThreadPool(const ThreadPool &obj) = delete;// 禁用赋值构造ThreadPool &operator=(const ThreadPool &obj) = delete;// 对外提供的实例接口static ThreadPool<T> *GetInstance(){if (thread_pool == nullptr){LockGuard lockguard(_static_mutex);LOG(LogLevel::INFO) << "线程实例获取成功!!!";if (thread_pool == nullptr){thread_pool = new ThreadPool();LOG(LogLevel::INFO) << "线程实例创建成功!!!";thread_pool->Start();}}return thread_pool;}void Handler(){while (true){T task;{LockGuard lockguard(_mutex);while (_task.empty() && _isRun) // 队列为空,且线程池没有停止才需要进行等待{_sleepNum++;Wait();_sleepNum--;}if (_task.empty() && !_isRun){LOG(LogLevel::INFO) << "任务处理完毕 , 线程退出...";break;}task = _task.front();_task.pop();}task(); // 线程获取到任务时就可以丢掉锁了}}bool Enqueue(const T &task){if (_isRun){LockGuard lockguard(_mutex);// 插入任务_task.push(task);// 尝试唤醒if (_sleepNum > 0) // 如果还有睡眠的线程,则唤醒{WakeUpOne();}return true;}return false;}void Start(){if (!_isRun){for (int i = 0; i < _num; i++){_threads[i].Start();}_isRun = true;}}void Stop(){if (_isRun){// for (int i = 0; i < _num; i++)// {// _threads[i].Stop();// }LockGuard lockguard(_mutex);_isRun = false;WakeUpAll();}}void Join(){for (int i = 0; i < _num; i++){_threads[i].Join();}}private:std::vector<Thread> _threads;std::queue<T> _task;int _num;int _sleepNum;bool _isRun;Mutex _mutex;Cond _cond;// 静态对象static ThreadPool *thread_pool;static Mutex _static_mutex;};// 类外初始化静态变量template <typename T>ThreadPool<T> *ThreadPool<T>::thread_pool = nullptr;//和静态变量配套一个静态锁template <typename T>Mutex ThreadPool<T>::_static_mutex;
}
