Linux(生产消费者模型/线程池)
目录
一 生产消费者模型
1. 概念:
2. 基于阻塞队列的生产消费者模型:
1. 对锁封装
2. 对条件变量封装
二 信号量(posix)
1. 概念
2. API
3. 基于环形队列的生产消费者模型
三 线程池
1. 概念
2. 示例
四 补充字段
1. 可重入函数 VS 线程安全
2. 死锁的4个特征
一 生产消费者模型
1. 概念:
在现实生活中,工厂/超市/人,工厂给超市供货,超市缓存货物,人来取货物,避免了人与工厂直接交互,工厂和人只需要与超市进行交互,也就是变相的解耦。
这种模型典型的特征:
- 1种交易场所:超市
超市作为人和工厂进行交互的对象,人拿货物,工厂继续生产,工厂出货物,人处理货物,不存在工厂生产的时候,人去工厂拿货物,工厂停下生产提供货物,反向也是,所以超市是用来提高效率的。
- 2种角色:人和工厂
人就是消费者,工厂就是生产者。
- 三种关系:
工厂和工厂的关系:如果超市还差一个货物满了,工厂与工厂之间同时出货物,超市只能处理一个,剩下处理不了,所以工厂和工厂是互斥关系。
人和人的关系:同理上面,超市只剩一个货物,人与人同时拿,必定有一个拿不到,所以也是互斥关系。
工厂和人:工厂在提供货物的时候,提供到一半,人就拿走了,导致数据不一致问题,所以他们也是互斥关系。
超市有没有货物人怎么知道:生产者知道,生产者生产数据就会告诉消费者。
超市货物满没满生产者怎么知道:人知道,人拿货物就会告诉消费者。
所以工厂和人也有同步关系:没有数据人就等,告诉工厂去生产,反向一样。
2. 基于阻塞队列的生产消费者模型:
首先消费者和消费者之间需要互斥(生产者一样),消费者和生产者之间也需要互斥,交易场所没数据需要生产者通知消费者,反之数据满了消费者通知生产者生产数据。
1. 对锁封装
#include <pthread.h>class mymutex
{
public:// 初始化互斥锁mymutex(){pthread_mutex_init(&_mymutex, nullptr);}// 加锁void lock() { pthread_mutex_lock(&_mymutex); }// 解锁void unlock() { pthread_mutex_unlock(&_mymutex); }// 返回锁地址pthread_mutex_t *getrefmutex() { return &_mymutex; }// 释放锁~mymutex(){pthread_mutex_destroy(&_mymutex);}private:pthread_mutex_t _mymutex;private:// 锁不能拷贝mymutex(const mymutex ©) = delete;mymutex &operator=(const mymutex ©) = delete;
};// 让另一个对象来管理锁的初始化和释放
class mymutexguard
{
public:// 初始化/获取锁mymutexguard(mymutex *mtu) : _mtu(mtu){_mtu->lock();}// 解锁~mymutexguard() { _mtu->unlock(); }
private:mymutex *_mtu;
};
2. 对条件变量封装
#include <pthread.h>class mycond
{
public:// 初始化 条件变量mycond(){pthread_cond_init(&_mycond, nullptr);}// 唤醒条件变量队头的线程void signal_one(){pthread_cond_signal(&_mycond);}// 唤醒条件变量全部线程void signal_all(){pthread_cond_broadcast(&_mycond);}// 线程去条件变量队列中等待void wait(pthread_mutex_t *mymtu){pthread_cond_wait(&_mycond, mymtu);}// 释放条件变量~mycond(){pthread_cond_destroy(&_mycond);}private:pthread_cond_t _mycond;
};
3. 交易场所
#include "mycond.hpp"
#include "mymutex.hpp"// 交易场所
template <class T>
class Pro_co
{
public:// 队列大小为 sizePro_co(int size = 5) : _size(size) {}~Pro_co() {}// 生产数据void mypush(const T &val){// 自动管理锁的加锁和解锁,局部变量初始化自动加锁,出作用域自动调用析构解锁mymutexguard mg(&_mymutex);//_mymutex.lock();// 如果队列满了,去生产者的条件变量等待,等待前自动解锁,唤醒重新申请锁// 注意这里要 循环 判断// 如果采用 if() 如果数据还差一个满了,消费者唤醒一个生产者,新来的生产者比唤醒的先抢到锁,并放入数据,此时队列数据满,// 后续的消费者申请锁,如果被被唤醒的这个生产者也先抢到锁,生产数据,但数据已经满了,出问题// 如果同时唤醒,数据还差一个满了,生产数据出问题,同理上面while (isfull()){// 入生产者队列_mypro.wait(_mymutex.getrefmutex());}// 放入数据_q.push(val);std::cout << "生产了 :" << val << std::endl;//_mymutex.unlock();// 唤醒一个消费者_myco.signal_one();// 出作用域自动解锁}// 消费数据void mypop(T *val){mymutexguard mg(&_mymutex);// 如果队列满了,去消费者的条件变量等待,等待前自动解锁,唤醒重新申请锁// 注意这里要 循环 判断// 如果采用 if() 如果数据还有1个,条件变量里有多个,如果唤醒一个,锁被新来的消费者抢到,消费完数据变成1,// 此时被唤醒的消费者也申请到锁,比生产者先抢到(此时不在条件变量等待队列里,不管数据还有没有),都会执行消费数据,但数据为空,出问题// 如果全部唤醒,数据只有一个,同时消费数据,也会出问题while (empty()){// 去消费者队列等待_myco.wait(_mymutex.getrefmutex());}// 处理数据*val = _q.front();_q.pop();std::cout << "消费了 :" << *val << std::endl;// 唤醒一个生产者_mypro.signal_one();// 出作用域自动解锁}// 队列是否为空bool empty() { return _q.empty(); }// 队列是否为满bool isfull() { return _q.size() == _size; }private:mymutex _mymutex;mycond _mypro;mycond _myco;std::queue<T> _q;int _size;
};
当访问交易场所的时候,只能有一个线程能访问,纯串行访问,效率何在?
这里的效率指的是生产者生产数据和消费者消费数据都需要时间,仅放/拿数据是互斥的,放/拿数据和他们生产任务/处理任务不和交易场所直接关联,也就是通过交易场所进行解耦,互相不冲突达到并行效果,所以总体来说效率高,并且上面是把交易场所当整体使用了,也可以比如交易场所有10个数据,同时放10个消费者取数据,维护他们之间的互斥和同步,也能达到同时消费,进而提高效率,所以下面引入信号量。
二 信号量(posix)
1. 概念
信号量本质是一个计数器,自带同步互斥机制,P/V操作对应--/++,表示资源的数量,一般用来辅助进程/线程同步的。
2. API
#include <semaphore.h>int sem_init(sem_t *sem, // 信号量 int pshared, // 信号量属性unsigned int value // 初始值);
// P--操作
int sem_wait(sem_t *sem);// V++操作
int sem_post(sem_t *sem);// 释放信号量
int sem_destroy(sem_t *sem);
3. 基于环形队列的生产消费者模型
还是和之前一样,生产和消费者之间是互斥关系,但这里不一样,如果生产和消费者指向环形队列的同一个位置,表示队列为空或者满了,这时候只能有一个角色可以进入队列,互斥,但如果不空或者不满,他们指向的位置一定是不一样的,所以可以并行访问队列。
生产和生产者之间是互斥关系,因为STL容器本身不是线程安全的,多个生产者对同一个下标进行操作,有线程安全问题,消费者也是,所以需要锁同步。
1. 封装信号量
#include <semaphore.h>
#include <iostream>// 封装信号量
class mysem
{public:// 初始化信号量值mysem(int size){sem_init(&_mysem,0,size);}// 释放信号量~mysem(){sem_destroy(&_mysem);}// --操作void P(){sem_wait(&_mysem);}// ++操作void V(){sem_post(&_mysem);}private:sem_t _mysem;
};
2. 环形队列
#include "mymutex.hpp"
#include "sem.hpp"
#include <vector>
#include <iostream>// 环形队列
template <class T>
class Circular_queue
{
public:// 初始化信号量,缓冲区大小Circular_queue(int size): _v(size), _cap(size), _prosem(size), _co(0),_head(0),_tail(0){}~Circular_queue() {}// 生产任务void mypush(const T &val){// 信号量自带原子操作,资源不足自动阻塞_prosem.P();{// 生产者和生产者互斥关系mymutexguard guard(&_promutex);_v[_tail++] = val;// 保证环形队列的性质_tail %= _cap;}// 生产数据,告诉消费者有数据_co.V();}// 拿任务void mypop(T *val){// 信号量自带原子操作,资源不足自动阻塞_co.P();{// 消费者和消费者互斥关系mymutexguard guard(&_comutex);*val = _v[_head++];// 保证环形队列的性质_head = _head % _cap;}// 拿走数据,告诉生产者有空间_prosem.V();}private:// 缓冲区/大小std::vector<T> _v;int _cap;// 生产者的锁和信号量和下标mysem _prosem;mymutex _promutex;int _tail;// 消费者的锁和信号量和下标mysem _co;mymutex _comutex;int _head;
};
三 线程池
1. 概念
什么是线程池,顾名思义,池子就是预先预留一块对象,比如内存块,进程/线程....等,先创建出来,想要用直接用,省去自己创建的麻烦,也是提高效率的设计,比如Linux中的按需分配,写时拷贝,CPU缓存.....等等都是对效率有很大的提升。
而线程池就是预先创建一批线程,进行后续合理分配任务。
2. 示例
下面基于生产消费者模型实现的线程池:
#include <iostream>
#include <vector>
#include "thread.hpp"
#include <queue>
#include "mycond.hpp"
#include "mymutex.hpp"
#include <functional>template <class T>
class thread_pool
{
public:// 线程池线程个数thread_pool(int nums) : _nums(nums), _isrunning(false), _threads_nums(0){for (int i = 0; i < _nums; i++){// 创建线程池_v.push_back(mythread(std::to_string(i), std::bind(&thread_pool::mypop, this)));}}// 释放线程~thread_pool(){// for (auto &e : _v)// e.join();}// 启动线程void start(){// 如果线程池在运行直接返回if (_isrunning)return;_isrunning = true;// 启动线程for (auto &e : _v)e.start();}// 缓冲区是否为空bool empty() { return _q.empty(); }// 推送数据void mypush(const T &val){// 生产和消费者保持互斥关系。加锁mymutexguard guard(&_mtu);_q.push(val);// 推送任务,如果有线程在等待则唤醒if (_threads_nums > 0)_cond.signal_one();}// 线程拿任务void mypop(){while (true){T val;{// 消费和消费者是互斥关系,加锁mymutexguard guard(&_mtu);// 如果缓冲区为空并且线程池不停止进行等待while (empty() && _isrunning==true){_threads_nums++;_cond.wait(_mtu.getrefmutex());_threads_nums--;}// 如果缓冲区为空并且线程池停止,则退出if (empty() && !_isrunning){std::cout<<"线程退出"<<std::endl;return;}// 拿走数据val = _q.front();_q.pop();}// 处理任务val();}}// 停止线程void stop(){mymutexguard guard(&_mtu);if (!_isrunning)return;_isrunning=false;std::cout<<"cnt :"<<_threads_nums<<std::endl;_isrunning == false;if (_threads_nums > 0)_cond.signal_all();std::cout << "主线程停止" << std::endl;}private: // 线程池个数int _nums;std::vector<mythread> _v;// 缓冲区std::queue<T> _q;// 条件锁mycond _cond;// 互斥锁mymutex _mtu;// 线程池状态bool _isrunning;// 线程在条件变量等待的个数int _threads_nums;
};
四 补充字段
1. 可重入函数 VS 线程安全
可重入函数:
- 函数被多个执行流调用,出现问题,函数称为不可重入,反之可重入。
- 一般全局对象被多执行流共享,在函数内修改可能出现问题。
线程安全:
- 多个线程访问数据资源,可能因为相互影响造成数据不一致等问题,称为线程安全问题。
- 多个线程对同一个共享对象进行操作,可能异常。
1. 可重入函数一定是线程安全的,因为函数可以被多个执行流进入且不会出现问题。
2. 线程安全不一定可重入,加了锁再次进入该函数,比如递归,就死锁了。
3. 可重入函数强调的是函数有没有问题,线程安全强调的是线程之间会不会互相影响。
2. 死锁的4个特征
互斥条件:一个锁只能一个人使用。
请求与保持条件:我干任何事情(包括申请别人持有的锁),锁还是我的。
不可剥夺条件:你不能抢我的锁。
循环等待条件:我想要一个锁,但这个锁被你拿了,你想要一个锁,但这个锁被我拿了,互相卡住不放,其实是对请求与保持的补充。
避免死锁只需要破坏上述一个条件即可。