【Linux手册】生产消费者模型的多模式实践:阻塞队列、信号量与环形队列的并发设计

文章目录
- 前言
- 一. 计算机中的生产消费者模型
- 二. 基于阻塞队列的生产消费者模型
- 三. 信号量
- 四. 基于环形队列的生产消费者模型
前言
在多线程并发编程的领域中,线程间的高效协作与资源协调是构建稳定、高效系统的核心难题。当多个线程共享资源并协同完成任务时,如何避免访问冲突、解决线程执行速度差异引发的冲突,以及实现安全的信息交互,成为开发者必须攻克的关键问题。
生产消费者模型是我们解决多线程同步互斥问题的经典模型,为上述挑战提供了合适的解决方案。该模型通过引入 “生产者”“消费者” 两种角色与共享缓冲区,将线程任务拆解为数据生成、存储与消费的闭环流程:生产者线程负责生成数据并放入缓冲区,消费者线程从缓冲区取出数据进行处理,而同步互斥机制则确保了缓冲区操作的原子性与顺序性,有效避免了数据不一致,临界资源访问冲突等问题。
深入剖析生产消费者模型的设计思想对我们进行多线程编程是百利而无一害的。
本文将从3方面解析生产消费者模型:
- 计算机中的生产消费者模型;
- 基于阻塞队列的生产消费者模型;
- 基于环形队列的生产消费者模型。
一. 计算机中的生产消费者模型
如下图是生产消费者模型示意图:
生产消费者模型主要解决了:生产者与消费者的强耦合关系,生产者在生产数据的时候不需要等消费者使用,而是直接将数据放到缓冲区中;同样消费者要使用数据也不会直接找生产者要,而是直接从缓冲区中进行读取。通过中间的缓冲区将生产者和消费者进行解耦。
通过中间的缓冲区,生产消费者模型有了诸多优势:
- 将生产者与消费者解耦;
- 允许忙闲不均,生产者可以生产大量数据,而消费者只使用一点;
- 支持并发,此处的并发指的是:允许多个生产者一起生产数据,多个消费者一起消费数据;但是不论是向缓冲区写,还是向缓冲区中读,都要保证是线程安全的。
各个任务自己之间的关系:
- 生产者与生产者:互斥,保证向缓冲区中写入是线程安全的;
- 消费者与消费者:互斥,保证从缓冲区中读入是线程安全的;
- 生产者与消费者:互斥 + 同步,保证生产者和消费者访问缓冲区是顺序的。
二. 基于阻塞队列的生产消费者模型
阻塞队列:实现生产消费者模型时最常用的一种数据结构。
将队列作为中间的缓冲区,生产者写入的数据放到队列中,消费者从队列中拿数据。
实现生产消费者模型:
我们将生产消费者模型进行封装,封装成一个类,向外提供接口让用户进行使用。
类的成员:
- 我们为了保证对队列的访问是原子的,所以要使用互斥锁。
- 并且要保证生产者和消费者是同步的,因此需要使用条件变量,并且生产者和消费者不能在同一个条件变量下进行阻塞,所以要使用两个条件变量;
- 最后还需要设置中间的缓冲区,因此队列是必须使用的;
- 可以再设置一个队列的最大值,防止生产者一直生产,而消费者不使用;
- 生产者在生产数据后要通知消费者使用数据,相反也一样;一次可以设置两个标准,分别是
highlevel
表示当队列中数据达到限制时就通知消费者使用,lowlevel
表示当队列中数据低于限制时就通知生产者生产。
// 阻塞队列
template<class T>
class BlockQueue
{
public:
private:std::queue<T> q;pthread_mutex_t mutex; // 互斥锁pthread_cond_t p_cond; // 生产者的条件变量pthread_cond_t c_cond; // 消费者的条件变量int capacity_;int highlevel_;int lowlevel_;
};
编写初始化和析构:
- 初始化和析构只需要对锁和条件变量进行初始化和销毁即可。
BlockQueue(const int capacity = defaultcapacity , const int highlevel = defaulthighlevel , const int lowlevel = defaultlowlevel){pthrea_mutex_init(&mutex, nullptr);pthread_cond_init(&p_cond , nullptr);pthread_cond_init(&c_cond , nullptr);}~BlockQueue(){pthread_mutex_destory(&mutex);pthread_cond_destory(&p_cond);pthread_cond_destory(&c_cond);}
向缓冲区中添加元素:
- 队列属于临界资源,因此访问之前要先加锁;
- 加锁后判断临界资源是否就绪,对于生产者来说只要队列没满就可以继续加入;对于消费者来说只要队列不空,就可以拿数据;
- 访问完临界资源,判断队列中的数据是否小于/超出临界值,如果满足条件就唤醒对方;
- 最后将锁归还。
// 向队列中添加元素void push(const T& data){// 要向队列中添加了元素,先加锁pthread_mutex_lock(&mutex);while(q.size() == capacity_) // 如果队列已经满了,不能再继续先队列中加入;使用循环是为了防止消费者被误唤醒pthread_cond_wait(&p_cond , &mutex);// 添加数据q.push(data);// 通知消费者使用数据if(q.size() >= highlevel_) pthread_cond_broadcast(&c_cond);pthread_mutex_unlock(&mutex); // 还锁}
将缓冲区从元素拿出,以上面原来相同:
// 从队列中取出元素T pop(){// 先加锁pthread_mutex_lock(&mutex);while(q.size() < 1) // 队列为空,等待pthread_cond_wait(&c_cond, &mutex);// 取出元素T ret = q.front();q.pop();if(q.size() <= lowlevel_)pthread_cond_broadcast(&p_cond);pthread_mutex_unlock(&mutex);return ret;}
一行就是基于阻塞队列的生产消费者模型。
生产者的数据从哪里来?
一般生产者的数据都来源于网络,从网络中获取数据,将数据放到缓冲区中;消费者将缓冲区中的数据拿出来进行处理。
生产消费者模型好在那???
虽然先队列中添加元素,和从队列中拿出元素是互斥的。
但是当消费者在放入元素的时候,生产者可以对之前拿到的元素进行处理,同理,当生产者从缓冲区中拿数据的时候,消费者可以在从网络中获取数据;
这样就保证了,数据的获取和使用是并行的。
在上面代码中,我们使用了
while(q.size() < 1)
,来防止线程被伪唤醒,这个什么理解???
当有多个生产者和消费者的时候,我们为了更快的唤醒线程,使用pthread_cond_broadcast()
来将所有在条件变量中的线程全部唤醒,但是这也就有可能导致,前面一部分的线程可以正常拿到数据,将数据都用完了,此时后面的线程就不应该在访问临界资源了,而是应该继续进入条件变量中进行等待。
所以,此时使用while
循环来判断后面被唤醒的进程在使用临界资源的时候,资源是否满足条件。
三. 信号量
共享资源的使用是互斥的,但是可以通过信号量来预定共享资源,简单说就是:可使用的资源有多份,但是每一份都只能一个个的使用,通过信号量先将使用的共享资源的权利给要访问的线程,再让有权利的线程依次进行访问。
信号量可以理解为对共享资源的预定机制,想要访问共享资源,想要拿到信号量,有信号量后续才能进行访问。
信号量作为一把计数器,用来描述临界资源的多少,把资源是否就绪放在临界区外进行判断,而不需要在内部进行判断和使用条件变量了。
信号量的接口:
初始化信号量:int sem_init(sem_t *sem , int pshared , unsigned int value)
:
- 参数一:是要进行初始化的信号量;
- 参数二:选项,0表示在线程间共享信号量,非0表示在进程间共享;
- 参数三:信号量的初始值,即临界资源的初始大小。
申请信号量:int sem_wait(sem_t *sem)
;
归还信号量:int sem_post(sem_t *sem)
;
销毁信号量:int sem_destroy(sem_t *sem)
。
信号量也被所有线程共享,也数据临界资源,但是信号量的申请和释放是线程安全的。
四. 基于环形队列的生产消费者模型
基于环形队列的生产消费者模型,与上一个阻塞队列的不同指之处在于:
环形队列使用的是一个定长的数据,数组的长度是固定的。生产者在前面放数据,消费者在后面使用数据。
该模型需要使用到信号量,来记录生产者和消费者可以使用的空间多少。
- 生产者只需要关注队列中是否还有数据,如果有就可以访问;
- 消费者只需要关注队列是否还用空位置,如果有就可以放数据。
上面两种判断临界资源是否就绪使用信号量来进行判断,如果有信号量就表示有资源供使用。
下面可以开始写基于环形队列的生产消费者模型:
类的成员:
- 依旧需要使用锁,因为多个不能让多个生产者同时放数据,也不能让多个消费者同时拿数据,但是允许生产者一边在前面放数据,消费者一边在后面拿数据,因此需要两把锁。
- 需要两个信号量,来表示生产者能用多少资源,消费者能用多少资源;
- 需要使用一个数组来模拟循环队列;
- 需要保存数组的大小,来保证循环队列访问不越界;
- 还需要记录生产者和消费者分别使用到数组的哪一个位置了。
template<class T>
class RingQueue
{
private:std::vector<T> ringqueue_;sem_t cdata_sem_;sem_t pdata_sem_;pthread_mutex_t pmutex_;pthread_mutex_t cmutex_;int capacity_;int cstep_ = 0;int pstep_ = 0;
};
初始化和销毁,只需要负责对信号量,锁的初始化和销毁即可:
RingQueue(const int& capacity):ringqueue_(capacity) , capacity_(capacity){pthread_mutex_init(&pmutex_ , nullptr);pthread_mutex_init(&cmutex_ , nullptr);sem_init(&cdata_sem_ , 0 , 0);sem_init(&pdata_sem_ , 0 , capacity_);}~RingQueue(){pthread_mutex_destroy(&pmutex_);pthread_mutex_destroy(&cmutex_);sem_destroy(&pdata_sem_);sem_destroy(&cdata_sem_);}
生产者放入数据:
- 先申请信号量
- 申请到信号让才能访问临界资源,并且生产者不能同时放入数据
- 放入数据
- 解锁,将消费者的信号量增加
void P(sem_t *sem){sem_wait(sem);}void V(sem_t *sem){sem_post(sem);}void Lock(pthread_mutex_t *mutex){pthread_mutex_lock(mutex);}void Unlock(pthread_mutex_t *mutex){pthread_mutex_unlock(mutex);}void push(const T &data){// 1. 先申请信号量// 2. 申请到信号让才能访问临界资源,并且生产者不能同时放入数据// 3. 放入数据// 4. 解锁,将消费者的信号量增加P(&pdata_sem_);Lock(&pmutex_);ringqueue_[pstep_++] = data;pstep_ %= capacity_;Unlock(&pmutex);V(&cdata_sem_);}
消费者拿出数据:
- 申请信号量
- 访问临界资源,保证生产者不能同时拿数据
- 拿数据
- 解锁,将生产者的信号量增加
T pop(){// 1. 申请信号量// 2. 访问临界资源,保证生产者不能同时拿数据// 3. 拿数据// 4. 解锁,将生产者的信号量增加P(&cdata_sem_);Lock(&cmutex_);T ret = ringqueue_[cstep_++];cstep_ %= capacity_;Unlock(&cmutex_);V(&pdata_sem_);return ret;}
以上就是基于环形队列的生产消费者模型的全部实现。