【LINUX操作系统】生产者消费者模型(下):封装、信号量与环形队列
1.封装、完善基于阻塞队列的productor-consumer module
前文中我们封装了自己的Mutex
【LINUX操作系统】线程同步与互斥-CSDN博客
按照老规矩,现在我们对同步与互斥的理解更进一步了,现在把这种面向过程的语言封装成面向对象的写法
1.1 封装条件变量
#pragma once#include <iostream> #include <string> #include <pthread.h> #include "Mutex.hpp"namespace CondModule {class Cond{public:Cond(){pthread_cond_init(&_cond,nullptr);}~Cond(){pthread_cond_destroy(&_cond);}void Wait(){}void notify(){pthread_cond_signal(&_cond);}void notifyall(){pthread_cond_broadcast(&_cond);}private:pthread_cond_t _cond;}; }
在上面的代码中,需要注意
wait
函数,因为让线程在条件变量下等待时,pthread_cond_wait
接口会释放线程所持有的锁,所以需要让该接口接收一个参数,用于pthread_cond_wait
的第二个参数
再由上一文中的对Mutex的封装:
【LINUX操作系统】线程同步与互斥-CSDN博客
1.2 在BlockQueue中使用自己封装的接口
完整的使用自己接口的第二版代码,将代码改成面向对象的风格
//version 2 引入自己的接口 #include <iostream> #include <string> #include <queue> #include <pthread.h> #include <unistd.h> #include "Mutex.hpp" #include "Cond.hpp"namespace BQModule {using namespace CondModule;using namespace LockMoudle;size_t gsize = 5;//整个队列最多装5个数据template<typename T>class BlockQueue{public:bool IsFull(){return _bq.size()==_capacity;}bool IsEmpty(){return _bq.empty();}BlockQueue(): _cwait_num(0),_pwait_num(0),_capacity(gsize){//自己封装的接口都是在定义时就初始化了// pthread_mutex_init(&_lock,nullptr);// pthread_cond_init(&_con_cond,nullptr);// pthread_cond_init(&_pro_cond,nullptr);}void Pop(T* p_data)//designed for Consumer{//pthread_mutex_lock(&_lock);MutexGuard mutexguard(_lock);while(IsEmpty()){_cwait_num++;//bq中已经没有数据,消费者线程需要进入条件变量去等待//pthread_cond_wait(&_con_cond,&_lock);//再次理解为什么wait必须释放锁_con_cond.wait(_lock);_cwait_num--;}*p_data = _bq.front();_bq.pop();//此处,刚刚减少数据,并且还没有解锁,bq中一定没有满,可以唤醒一个生产者线程if(_pwait_num)_pro_cond.notify();//pthread_mutex_unlock(&_lock);}void Enqueue(const T& data)//designed for Productor{//pthread_mutex_lock(&_lock);MutexGuard mutexguard(_lock);while(IsFull()){_pwait_num++;//bq中数据已经满了,生产者需要进入条件变量去等待_pro_cond.wait(_lock);_pwait_num--;}_bq.push(data);//此处,刚刚加入数据,并且还没有解锁,bq中一定有数据,可以去唤醒一个消费者条件变量中的线程if(_cwait_num)//pthread_cond_signal(&_con_cond);_con_cond.notify();//pthread_mutex_unlock(&_lock);}~BlockQueue(){//自己封装的类在析构时都会自动释放资源// pthread_mutex_destroy(&_lock);// pthread_cond_destroy(&_con_cond);// pthread_cond_destroy(&_pro_cond);}private:std::queue<T> _bq;Cond _pro_cond; //生产者的条件变量Cond _con_cond; //消费者的条件变量Mutex _lock;//一把需要被各生产者和消费者看到的锁,用于互斥size_t _capacity; //容量int _cwait_num;int _pwait_num;}; }
这样一来,所有的锁都不再需要手动释放,而是代码在RAII风格下自动释放锁。
1.3 实现多生产者多消费者版本
由于我们设计的本来就只有一把锁,消费者与消费者、生产者与生产者这两种关系先天就是互斥且同步的
测试结果:
1.4 传递任务的生产消费模型
交易场所不仅仅用来传递数据,也可以用来传递任务。
假设今天有个Task类,并且按照Task修改主函数中模板:
class Task { public:Task(int x = 1, int y = 1): _x(x), _y(y){}~Task() {}void Excute(){_res = _x+_y;}int res(){return _res;} private:int _x;int _y;int _res; };
using namespace BQModule; void *Consumer(void *arg) // 消费者 {BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){sleep(2);// 1.从bq中获得数据Task data;bq->Pop(&data); // 输出型参数// 2.处理数据data.Excute();std::cout << "consumer get: " << data.res() << std::endl;} }void *Productor(void *arg) // 生产者 {BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){// sleep(2);// 1.从外部获取数据//::data+=10;//+=操作不是原子的,可能发生进入了相同数据的情况int x = rand() % 5 + 1; //[1,5]int y = rand() % 9 + 1; //[1,9]Task t(x, y);// 2.将数据入队列bq->Enqueue(t);printf("productor asked %d+%d=? \n", x, y);} }int main() {srand(time(nullptr) ^ getpid());BlockQueue<Task> *bq = new BlockQueue<Task>;// pthread_t p1,p2,c1,c2,c3;pthread_t p1, c1;pthread_create(&p1, nullptr, Productor, (void *)bq);pthread_create(&c1, nullptr, Consumer, (void *)bq);pthread_join(p1, nullptr);pthread_join(c1, nullptr);delete bq;return 0; }
也可以使用using task_t =function<void()>来实现,直接把task_t当作BlockQueue的类型即可。
2. 进一步理解 生产消费者模型
互斥访问,何以高效?
生产消费模型都是在各种锁或者信号量的控制下进行的,那是否能说明模型中的各个线程都是串行执行的呢?串行执行难道不是有违设计线程的初衷,降低了整体效率吗?
由以上的传递任务的模型不难看出,生产消费模型并不是单纯的用于在阻塞队列中去传递数据(一些网络服务器虽然确实是做这个的),他还有很多其他运用场景,比如:玩卡牌游戏,一个线程用于获得键鼠等外设上的信息(生产者),另一个(可能是好几个)线程用于处理背后的逻辑(消费者),那其实对于整个进程来说,线程A外设上等待信息的时间和线程B处理背后逻辑的时间才是真正花时间的,而等待信息和处理逻辑的时候,显然两个线程是在并行。
超市中去放货取货只是整个进程中损耗很小的一部分,不应该由此担心是不是变成了串行。
3. POSIX信号量
信号量(Semaphore)是操作系统中用于进程或线程同步的核心工具,其本质是一个非负整数计数器,用于表示共享资源的剩余数量或访问权限的状态。通过计数器的增减操作,信号量可以协调多个进程/线程对临界资源的访问,避免竞争条件(Race Condition)和数据不一致问题。
在进程间通信的时候我们大致介绍过基于systemV的信号量,现在我们介绍POSIX信号量:
之前我们的mutex都是对于资源的整体预订,而信号量是一种不用去整体预定的策略。
信号量就像买电影票,买了电影票表示预定了这个座位:一个大数组,我不需要全部预定完,可以只使用一个空间。信号量就是对剩余座位的计数器,表示当下还有多少“部分资源”可供使用。
POSIX的信号量多用于满足线程间同步。
就像mutex的锁一样,信号量sem本身就是临界资源,所以信号量的--(P操作)或者++(V操作)本身必须是原子的。
所以对于锁来说(mutex),其实就是一个二元信号量去控制,也就是对于该信号量:非0即1。整体资源还存在、没被拿就是1,被拿了(在讲锁的原理的时候提到过,本质是swap,是原子性的汇编操作)就是0
- P操作(
sem_wait
):也叫等待信号量
减少信号量的值。若结果为负,则阻塞调用线程(或进程,取决于信号量类型),直到信号量非负。
- 函数原型:
int sem_wait(sem_t *sem);
- 行为:
- 若信号量值 > 0,则减1并继续执行。
- 若信号量值 = 0,则线程被挂起,直到其他线程调用
sem_post
增加信号量值。- V操作(
sem_post
):也叫发布信号量
增加信号量的值。若信号量原本为0(即有线程因sem_wait
被阻塞),则唤醒一个阻塞的线程。
- 函数原型:
int sem_post(sem_t *sem);
- 行为:
- 信号量值加1。
- 若存在因
sem_wait
阻塞的线程,则唤醒其中一个。
顺便再把semaphore中其他常用的接口也记录一下(semaphore头文件编译的时候也需要加上-lpthread)
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value); 参数: pshared:0表⽰线程间共享,⾮零表⽰进程间共享 value:信号量初始值销毁信号量 int sem_destroy(sem_t *sem);
value表示一开始有的“部分资源”的数量。比如这种情况就该设置为16,如果已经有一个被使用就设置成15。
中间的参数都默认设置成0即可。
4. 基于环形队列的⽣产消费模型
循环队列版本的生产消费模型本质就是利用了信号量实现并发处理,在前面的阻塞队列版本,不论是单生产、消费线程版本还是多生产、消费线程版本,都无法做到生产的同时消费,但是实际上如果可以以信号量来控制资源的使用逻辑就不影响了。
因此,今天我们选择用一个数组vector来作为存放数据的基本容器
循环队列原理
head是循环队列的头,tail是循环队列的尾。整个队列为空时,head和tail同时指向同一个位置(如图)。一旦开始插入数据,head指向下一个会被使用的数据(也就是整个队列的第一个数据,队列的头,消费者下一次就从head处拿出数据),tail指向下一个即将插入数据的位置(也就是整个队列的第一个空位置,生产者生产出数据就放在tail指向的位置),整个循环队列满的时候,head和tail也都指向同一个位置。可以用取模的方法保证tail会去转圈。
资源,分为数据、空间。
对于生产者来说,tail从0开始,对空间资源进行P操作(--),对数据资源进行V操作(++)
对于消费者来说,head从0开始,对空间资源进行V操作,对数据资源进行P操作
分析:刚开始没有数据的时候,head和tail指向同一个位置,难道不会让两个线程发生竞态条件吗?此时没有数据,数据资源的计数器----数据资源的信号量为0,该信号量自动不让消费者进入,保证生产者先进行原子性生产(对空间资源进行V操作)
最后数据满的时候,head和tail又指向同一个位置,此时没有空间了,所有的空间都有数据,空间资源的计数器----空间资源的信号量为0,该信号量自动不让生产者进入,保证消费者先进性原子性消费(对数据进行P操作)。
注意,至于每次等待的队列到底是FIFO还是随机争抢,这个取决于操作系统的实现方法
代码实现
借鉴下之前写BlockQueue的主程序,稍加改造
【LINUX操作系统】生产者消费者模型(上):概念与阻塞队列-CSDN博客
(记得再改改变量名,bq是blockqueue的缩写......)
构建ringbuffer类:
namespace RingBufferModule {template <typename T>class ringbuffer{public:ringbuffer(int init_sem_num=DEFAULTSENMVAL):_p_pos(0),_c_pos(0),_ring(SIZE),_size(SIZE){int n = sem_init(&_data_sem,0,init_sem_num);if(n<0){std::cerr<<"sem init error"<<std::endl;}int m = sem_init(&_space_sem,0,init_sem_num);if(m<0){std::cerr<<"sem init error"<<std::endl;}}void Enqueue(const T& in)//给生产者用{//1.获取数据//2.入环}void Pop(T* out)//给消费者用{}~ringbuffer(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);}ringbufferprivate:std::vector<T> _ring; //存储数据的基本容器,也是重点临界资源size_t _size; //整个环的大小sem_t _data_sem;//数据资源信号量sem_t _space_sem;//空间资源信号量int _p_pos;//生产者下标位置,即tail,表示下一次放数据的地方int _c_pos;//消费者下标位置,即head,表示下一次取数据的地方}; }
对于这个循环队列,需要有Enqueue和Pop的接口,分别给生产者和消费者用于放数据。
前几次都是先用最基本的C库接口封装,然后自己封装一个面向对象的类,再替换接口,这次直接先封装,再调接口。
就可以改变我们刚刚的构造函数
并且完成具体逻辑的函数,将各种sem_wait还有sem_post就都封装起来了。
在一开始数据为空的时候,不用担心消费者进来,因为此时_data_sem无法进行P操作,会阻塞在原地。
测试
这样,我们的循环队列保证在为空为满时都是进行“互斥与同步”,而在其余时刻,各自访问各自的信号量,提升了一丢丢的效率。
5. 多生产多消费的循环队列生产消费模型
再加入其他生产者消费者之后,关系又恢复成了需要控制:生产者与生产者、消费者与消费者,生产者与消费者。我们采取如下思路:给生产者们加一个锁,所有的生产者竞争出来一个线程去参加这次循环队列的sem的PV操作;给所有的消费者一个锁,所有的消费者竞争出来一个线程去参加这次循环队列的sem的PV操作。
依然是直接引进我们自己实现的RAII风格的锁。【LINUX操作系统】线程同步与互斥-CSDN博客
现在的问题是,锁在哪里加效率更高呢?
比如Pop,是在MutexGuard后使用data_sem.P()还是在MutexGuard之前呢?
两者都能完成只竞争出一个进入循环队列的栈,区别在于:
在1,先锁住,只有一个去进行数据信号量的“取电影票”,然后正常执行。
在2,先让可以取电影票的线程(消费者们)都取一张电影票,然后再锁住,依次让人进入电影院获取资源。自然,方案2效率更高。一次性的并发_data_sem.P()可以减少之后串行调用该接口的时间。
这样依然不担心生产者与消费者的互斥,因为能拿到票这一步已经天然的构成了一道屏障,只要拿到了票就一定不会冲突,可以各自使用各自的部分资源。
最终代码:
#define SIZE 10using namespace LockMoudle;namespace RingBufferModule
{template <typename T>class ringbuffer{public:ringbuffer(): _p_pos(0), _c_pos(0), _ring(SIZE), _size(SIZE), _data_sem(0), _space_sem(SIZE){}void Enqueue(const T &in) // 给生产者用{_space_sem.P();{MutexGuard lockguard(_p_mutex);_ring[_p_pos] = in;_p_pos++;_p_pos %= _size;}_data_sem.V();}void Pop(T *out) // 给消费者用{// MutexGuard lockguard(_c_mutex);//锁在这里1_data_sem.P();{MutexGuard lockguard(_c_mutex); // 锁在这里2*out = _ring[_c_pos];_c_pos++;_c_pos %= _size;}_space_sem.V();}~ringbuffer(){// 都自动销毁了}private:std::vector<T> _ring; // 存储数据的基本容器,也是重点临界资源size_t _size; // 整个环的大小int _p_pos; // 生产者下标位置,即tail,表示下一次放数据的地方int _c_pos; // 消费者下标位置,即head,表示下一次取数据的地方Sem _data_sem; // 数据资源信号量Sem _space_sem; // 空间资源信号量Mutex _p_mutex;Mutex _c_mutex;};
}