30.线程的互斥与同步(四)
线程池
线程池V1(单生产者多消费者):
#pragma once#include <pthread.h> #include <vector> #include <queue> #include "Thread.hpp" #include "Mutex.hpp" #include "Cond.hpp" #include "Log.hpp"namespace threadpool {using namespace ThreadModule;using namespace MutexModule;using namespace CondModule;using namespace LogModule;static const int gnum = 5;template <typename T>class ThreadPool{private:void WakeUpAllThread(){LOG(LogLevel::DEBUG) << "唤醒全部线程";{MutexGuard mutexguard(_mutex);// 有线程休眠才唤醒if (_csleep_num > 0)_cond.Broadcast();}}public:ThreadPool(int num = gnum) : _num(num), _isrunning(false), _csleep_num(0){for (int i = 0; i < _num; i++){_threads.emplace_back([this](){HandlerTask();});}}void Start(){if (_isrunning)return;_isrunning = true;for (auto &thread : _threads){thread.Start();LOG(LogLevel::DEBUG) << "线程启动了, " << thread.GetName();}}void Stop(){if (!_isrunning)return;_isrunning = false;// 唤醒所有线程WakeUpAllThread();}void HandlerTask(){while (true){T t;{MutexGuard mutexguard(_mutex);while (_isrunning && _q.empty()){_csleep_num++;_cond.Wait(_mutex);_csleep_num--;}// running为false && 队列为空if (!_isrunning && _q.empty()){break;}t = _q.front();_q.pop();}t();}}// 生产者发任务void EnQueue(const T &data){if (!_isrunning)return;// 加锁:生产者消费者互斥,生产者和生产者互斥// 条件变量:生产者消费者同步{MutexGuard mutexguard(_mutex);_q.push(data);// 全部都在休眠,唤醒一个if (_csleep_num == _num)_cond.Signal();}}void Join(){if (_isrunning)return;for (auto &thread : _threads){thread.Join();LOG(LogLevel::DEBUG) << "线程 " << thread.GetName() << " 回收成功";}}private:std::vector<Thread> _threads; // 线程数组int _num; // 线程的数量std::queue<T> _q; // 任务队列Mutex _mutex; // 锁Cond _cond; // 为空的条件变量int _csleep_num; // 消费者休眠数量bool _isrunning; // 运行状态}; }
易错点1:
写的时候忘记初始化_csleep_num为0了,未初始化的值是一个很大的负数。
流程:
1.启动线程池后,队列为空,全部阻塞在条件变量处。
2.入10个任务进队列,每次判断_csleep_num == _num,加不到等于,还是全部阻塞
3.暂停线程池,_isrunning改为true,但_csleep_num>0为假,从而没有做唤醒工作,还是全部阻塞
线程就全部没退出了,全都在条件变量处阻塞。
细节1:
线程池停止运行条件:任务处理完了 && _isrunning==false
线程池运行条件:任务没处理完 && _isruning==true
细节2:
Thread的构造函数的参数类型是function<void()>的类型。
此处采用了绑定this指针这个参数的方式,来达到可以调用HandlerTask()
细节3:
停止线程池要唤醒所有正在休眠的线程,使其能够被终止。
且注意_csleep_num是临界资源,访问要加锁。
细节4:
生产者发任务,任务队列是临界资源,访问时加锁。
当全部线程都在休眠时,要唤醒一个线程使其执行任务。
条件变量的条件是:没有任务且线程池在运行就阻塞
线程安全的单例模式
单例模式的特点某些类, 只应该具有⼀个对象(实例), 就称之为单例.
饿汉实现方式和懒汉实现方式
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度.
懒汉方式实现单例模式
单例版本线程池(懒汉模式)
#pragma once#include <pthread.h> #include <vector> #include <queue> #include "Thread.hpp" #include "Mutex.hpp" #include "Cond.hpp" #include "Log.hpp"namespace threadpool {using namespace ThreadModule;using namespace MutexModule;using namespace CondModule;using namespace LogModule;static const int gnum = 5;template <typename T>class ThreadPool{private:void WakeUpAllThread(){LOG(LogLevel::DEBUG) << "唤醒全部线程";{MutexGuard mutexguard(_mutex);// 有线程休眠才唤醒if (_csleep_num > 0)_cond.Broadcast();}}// 构造函数私有ThreadPool(int num = gnum) : _num(num), _isrunning(false), _csleep_num(0){for (int i = 0; i < _num; i++){_threads.emplace_back([this](){HandlerTask();});}}// 拷贝构造和赋值重载禁用ThreadPool(const ThreadPool<T> &threadpool) = delete;ThreadPool &operator =(const ThreadPool<T> &threadpool) = delete;public:void Start(){if (_isrunning)return;_isrunning = true;for (auto &thread : _threads){thread.Start();LOG(LogLevel::DEBUG) << "线程启动了, " << thread.GetName();}}void Stop(){if (!_isrunning)return;_isrunning = false;// 唤醒所有线程WakeUpAllThread();}void HandlerTask(){while (true){T t;{MutexGuard mutexguard(_mutex);while (_isrunning && _q.empty()){_csleep_num++;_cond.Wait(_mutex);_csleep_num--;}// running为false && 队列为空if (!_isrunning && _q.empty()){break;}t = _q.front();_q.pop();}t();}}// 生产者发任务void EnQueue(const T &data){if (!_isrunning)return;// 加锁:生产者消费者互斥,生产者和生产者互斥// 条件变量:生产者消费者同步{MutexGuard mutexguard(_mutex);_q.push(data);// 全部都在休眠,唤醒一个if (_csleep_num == _num)_cond.Signal();}}void Join(){if (_isrunning)return;for (auto &thread : _threads){thread.Join();LOG(LogLevel::DEBUG) << "线程 " << thread.GetName() << " 回收成功";}}static ThreadPool<T> *GetInstance(){LOG(LogLevel::DEBUG) << "获取单例";if (_inst == nullptr){MutexGuard mutexguard(_mutex_inst);if (_inst == nullptr){LOG(LogLevel::DEBUG) << "首次使用单例,创建";_inst = new ThreadPool<T>();_inst->Start();}}return _inst;}private:std::vector<Thread> _threads; // 线程数组int _num; // 线程的数量std::queue<T> _q; // 任务队列Mutex _mutex; // 锁Cond _cond; // 为空的条件变量int _csleep_num; // 消费者休眠数量bool _isrunning; // 运行状态static ThreadPool<T> *_inst; // 单例指针static Mutex _mutex_inst; // 单例锁};// 静态成员属性类外初始化template <typename T>ThreadPool<T> *ThreadPool<T>::_inst = nullptr;template <typename T>Mutex ThreadPool<T>::_mutex_inst; }
核心部分:
构造函数私有,拷贝构造和赋值重载禁用。
单例的指针和锁是静态成员。初始化在类外指定初始化。
获取单例加锁是为了解决多线程并发访问时,同时拿到的都是nullptr,会导致创建两次线程池对象的问题。(创建好了,Start())
点睛之笔:在获取单例时,只需要创建一次,大部分时间都是访问,所以在外部再套一层判断 _inst == nullptr,这样可以保证除了第一次获取单例需要创建的情况要阻塞等待,后续多线程对_inst做判断时就可以并发访问,不用申请锁,提高了效率。
线程安全和重入问题
线程安全:就是多个线程在访问共享资源时,能够正确地执行,不会相互干扰或破坏彼此的执行结果。重入:同⼀个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。重入分为两种情况:1)多线程重入函数2)信号导致一个执行流重复进入函数例如:一个线程在持有锁后,被切走,从用户态到内核态,内核态检查,执行自定义捕捉方法,去申请同一把锁,这时就造成了死锁问题。结论:线程安全不一定是可重入的,可重入的一定是线程安全的。
死锁
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所占用不会释放的资源而处于的一种永久等待状态。死锁四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
破坏死锁的四个必要条件破坏循环等待条件问题,资源一次性分配, 使用超时机制、加锁顺序一致。避免死锁算法死锁检测算法(了解)银行家算法(了解)
不是. 原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响。而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶). 因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全。
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.对于 shared_ptr, 多个对象需要共用⼀个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数。
悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种放式:版本号机制和CAS操作。CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则⽤新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。自旋锁,读写锁,加餐课详细介绍