当前位置: 首页 > news >正文

【Linux手册】生产消费者模型的多模式实践:阻塞队列、信号量与环形队列的并发设计


请添加图片描述


半桔:个人主页

 🔥 个人专栏: 《Linux手册》《手撕面试算法》《C++从入门到入土》

🔖简单是可靠的前提条件。

文章目录

  • 前言
  • 一. 计算机中的生产消费者模型
  • 二. 基于阻塞队列的生产消费者模型
  • 三. 信号量
  • 四. 基于环形队列的生产消费者模型

前言

在多线程并发编程的领域中,线程间的高效协作与资源协调是构建稳定、高效系统的核心难题。当多个线程共享资源并协同完成任务时,如何避免访问冲突、解决线程执行速度差异引发的冲突,以及实现安全的信息交互,成为开发者必须攻克的关键问题。

生产消费者模型是我们解决多线程同步互斥问题的经典模型,为上述挑战提供了合适的解决方案。该模型通过引入 “生产者”“消费者” 两种角色与共享缓冲区,将线程任务拆解为数据生成、存储与消费的闭环流程:生产者线程负责生成数据并放入缓冲区,消费者线程从缓冲区取出数据进行处理,而同步互斥机制则确保了缓冲区操作的原子性与顺序性,有效避免了数据不一致,临界资源访问冲突等问题。

深入剖析生产消费者模型的设计思想对我们进行多线程编程是百利而无一害的。

本文将从3方面解析生产消费者模型:

  1. 计算机中的生产消费者模型;
  2. 基于阻塞队列的生产消费者模型;
  3. 基于环形队列的生产消费者模型。

一. 计算机中的生产消费者模型

如下图是生产消费者模型示意图:
![[Pasted image 20250824100132.png]]

生产消费者模型主要解决了:生产者与消费者的强耦合关系,生产者在生产数据的时候不需要等消费者使用,而是直接将数据放到缓冲区中;同样消费者要使用数据也不会直接找生产者要,而是直接从缓冲区中进行读取。通过中间的缓冲区将生产者和消费者进行解耦

通过中间的缓冲区,生产消费者模型有了诸多优势:

  1. 将生产者与消费者解耦;
  2. 允许忙闲不均,生产者可以生产大量数据,而消费者只使用一点;
  3. 支持并发,此处的并发指的是:允许多个生产者一起生产数据,多个消费者一起消费数据;但是不论是向缓冲区写,还是向缓冲区中读,都要保证是线程安全的

各个任务自己之间的关系:

  1. 生产者与生产者:互斥,保证向缓冲区中写入是线程安全的;
  2. 消费者与消费者:互斥,保证从缓冲区中读入是线程安全的;
  3. 生产者与消费者:互斥 + 同步,保证生产者和消费者访问缓冲区是顺序的。

二. 基于阻塞队列的生产消费者模型

阻塞队列:实现生产消费者模型时最常用的一种数据结构。
将队列作为中间的缓冲区,生产者写入的数据放到队列中,消费者从队列中拿数据。

实现生产消费者模型:

我们将生产消费者模型进行封装,封装成一个类,向外提供接口让用户进行使用。

类的成员:

  • 我们为了保证对队列的访问是原子的,所以要使用互斥锁
  • 并且要保证生产者和消费者是同步的,因此需要使用条件变量,并且生产者和消费者不能在同一个条件变量下进行阻塞,所以要使用两个条件变量;
  • 最后还需要设置中间的缓冲区,因此队列是必须使用的;
  • 可以再设置一个队列的最大值,防止生产者一直生产,而消费者不使用;
  • 生产者在生产数据后要通知消费者使用数据,相反也一样;一次可以设置两个标准,分别是highlevel表示当队列中数据达到限制时就通知消费者使用,lowlevel表示当队列中数据低于限制时就通知生产者生产。
// 阻塞队列
template<class T>
class BlockQueue
{
public:
private:std::queue<T> q;pthread_mutex_t mutex;    // 互斥锁pthread_cond_t p_cond;    // 生产者的条件变量pthread_cond_t c_cond;    // 消费者的条件变量int capacity_;int highlevel_;int lowlevel_;
};

编写初始化和析构:

  • 初始化和析构只需要对锁和条件变量进行初始化和销毁即可。
    BlockQueue(const int capacity = defaultcapacity , const int highlevel = defaulthighlevel , const int lowlevel = defaultlowlevel){pthrea_mutex_init(&mutex, nullptr);pthread_cond_init(&p_cond , nullptr);pthread_cond_init(&c_cond , nullptr);}~BlockQueue(){pthread_mutex_destory(&mutex);pthread_cond_destory(&p_cond);pthread_cond_destory(&c_cond);}

向缓冲区中添加元素:

  1. 队列属于临界资源,因此访问之前要先加锁;
  2. 加锁后判断临界资源是否就绪,对于生产者来说只要队列没满就可以继续加入;对于消费者来说只要队列不空,就可以拿数据;
  3. 访问完临界资源,判断队列中的数据是否小于/超出临界值,如果满足条件就唤醒对方;
  4. 最后将锁归还。
// 向队列中添加元素void push(const T& data){// 要向队列中添加了元素,先加锁pthread_mutex_lock(&mutex);while(q.size() == capacity_)       // 如果队列已经满了,不能再继续先队列中加入;使用循环是为了防止消费者被误唤醒pthread_cond_wait(&p_cond , &mutex);// 添加数据q.push(data);// 通知消费者使用数据if(q.size() >= highlevel_) pthread_cond_broadcast(&c_cond);pthread_mutex_unlock(&mutex);       // 还锁}

将缓冲区从元素拿出,以上面原来相同:

    // 从队列中取出元素T pop(){// 先加锁pthread_mutex_lock(&mutex);while(q.size() < 1)                   // 队列为空,等待pthread_cond_wait(&c_cond, &mutex);// 取出元素T ret = q.front();q.pop();if(q.size() <= lowlevel_)pthread_cond_broadcast(&p_cond);pthread_mutex_unlock(&mutex);return ret;}

一行就是基于阻塞队列的生产消费者模型。

生产者的数据从哪里来?

一般生产者的数据都来源于网络,从网络中获取数据,将数据放到缓冲区中;消费者将缓冲区中的数据拿出来进行处理。

生产消费者模型好在那???

虽然先队列中添加元素,和从队列中拿出元素是互斥的。
但是当消费者在放入元素的时候,生产者可以对之前拿到的元素进行处理,同理,当生产者从缓冲区中拿数据的时候,消费者可以在从网络中获取数据

这样就保证了,数据的获取和使用是并行的。

在上面代码中,我们使用了while(q.size() < 1) ,来防止线程被伪唤醒,这个什么理解???

当有多个生产者和消费者的时候,我们为了更快的唤醒线程,使用pthread_cond_broadcast()来将所有在条件变量中的线程全部唤醒,但是这也就有可能导致,前面一部分的线程可以正常拿到数据,将数据都用完了,此时后面的线程就不应该在访问临界资源了,而是应该继续进入条件变量中进行等待。

所以,此时使用while循环来判断后面被唤醒的进程在使用临界资源的时候,资源是否满足条件。

三. 信号量

共享资源的使用是互斥的,但是可以通过信号量来预定共享资源,简单说就是:可使用的资源有多份,但是每一份都只能一个个的使用,通过信号量先将使用的共享资源的权利给要访问的线程,再让有权利的线程依次进行访问

信号量可以理解为对共享资源的预定机制,想要访问共享资源,想要拿到信号量,有信号量后续才能进行访问。

信号量作为一把计数器,用来描述临界资源的多少,把资源是否就绪放在临界区外进行判断,而不需要在内部进行判断和使用条件变量了

信号量的接口:

初始化信号量:int sem_init(sem_t *sem , int pshared , unsigned int value)

  1. 参数一:是要进行初始化的信号量;
  2. 参数二:选项,0表示在线程间共享信号量,非0表示在进程间共享;
  3. 参数三:信号量的初始值,即临界资源的初始大小。

申请信号量:int sem_wait(sem_t *sem)
归还信号量:int sem_post(sem_t *sem)

销毁信号量:int sem_destroy(sem_t *sem)

信号量也被所有线程共享,也数据临界资源,但是信号量的申请和释放是线程安全的。

四. 基于环形队列的生产消费者模型

基于环形队列的生产消费者模型,与上一个阻塞队列的不同指之处在于:

环形队列使用的是一个定长的数据,数组的长度是固定的。生产者在前面放数据,消费者在后面使用数据。

该模型需要使用到信号量,来记录生产者和消费者可以使用的空间多少。

  • 生产者只需要关注队列中是否还有数据,如果有就可以访问;
  • 消费者只需要关注队列是否还用空位置,如果有就可以放数据。

上面两种判断临界资源是否就绪使用信号量来进行判断,如果有信号量就表示有资源供使用。

下面可以开始写基于环形队列的生产消费者模型:

类的成员:

  • 依旧需要使用锁,因为多个不能让多个生产者同时放数据,也不能让多个消费者同时拿数据,但是允许生产者一边在前面放数据,消费者一边在后面拿数据,因此需要两把锁。
  • 需要两个信号量,来表示生产者能用多少资源,消费者能用多少资源;
  • 需要使用一个数组来模拟循环队列;
  • 需要保存数组的大小,来保证循环队列访问不越界;
  • 还需要记录生产者和消费者分别使用到数组的哪一个位置了。
template<class T>
class RingQueue
{
private:std::vector<T> ringqueue_;sem_t cdata_sem_;sem_t pdata_sem_;pthread_mutex_t pmutex_;pthread_mutex_t cmutex_;int capacity_;int cstep_ = 0;int pstep_ = 0;
};

初始化和销毁,只需要负责对信号量,锁的初始化和销毁即可:

    RingQueue(const int& capacity):ringqueue_(capacity) , capacity_(capacity){pthread_mutex_init(&pmutex_ , nullptr);pthread_mutex_init(&cmutex_ , nullptr);sem_init(&cdata_sem_ , 0 , 0);sem_init(&pdata_sem_ , 0 , capacity_);}~RingQueue(){pthread_mutex_destroy(&pmutex_);pthread_mutex_destroy(&cmutex_);sem_destroy(&pdata_sem_);sem_destroy(&cdata_sem_);}

生产者放入数据:

  1. 先申请信号量
  2. 申请到信号让才能访问临界资源,并且生产者不能同时放入数据
  3. 放入数据
  4. 解锁,将消费者的信号量增加
    void P(sem_t *sem){sem_wait(sem);}void V(sem_t *sem){sem_post(sem);}void Lock(pthread_mutex_t *mutex){pthread_mutex_lock(mutex);}void Unlock(pthread_mutex_t *mutex){pthread_mutex_unlock(mutex);}void push(const T &data){// 1. 先申请信号量// 2. 申请到信号让才能访问临界资源,并且生产者不能同时放入数据// 3. 放入数据// 4. 解锁,将消费者的信号量增加P(&pdata_sem_);Lock(&pmutex_);ringqueue_[pstep_++] = data;pstep_ %= capacity_;Unlock(&pmutex);V(&cdata_sem_);}

消费者拿出数据:

  1. 申请信号量
  2. 访问临界资源,保证生产者不能同时拿数据
  3. 拿数据
  4. 解锁,将生产者的信号量增加
    T pop(){// 1. 申请信号量// 2. 访问临界资源,保证生产者不能同时拿数据// 3. 拿数据// 4. 解锁,将生产者的信号量增加P(&cdata_sem_);Lock(&cmutex_);T ret = ringqueue_[cstep_++];cstep_ %= capacity_;Unlock(&cmutex_);V(&pdata_sem_);return ret;}

以上就是基于环形队列的生产消费者模型的全部实现。

http://www.dtcms.com/a/392110.html

相关文章:

  • Python + Flask + API Gateway + Lambda + EKS 实战
  • 【OpenGL】openGL常见矩阵
  • DeepSeek大模型混合专家模型,DeepSeekMoE 重构 MoE 训练逻辑
  • 450. 删除二叉搜索树中的节点
  • 实用工具:基于Python的图片定位导出小程序
  • 滚珠螺杆在工业机器人关节与线性模组的智能控制
  • 【AI】coze的简单入门构建智能体
  • Python数据分析:函数定义时的装饰器,好甜的语法糖。
  • Java数据结构——包装类和泛型
  • 【C++进阶】C++11的新特性 | 列表初始化 | 可变模板参数 | 新的类功能
  • 广东省省考备考(第一百零三天9.20)——言语(强化训练)
  • 面试编程题(四)
  • OpenHarmony之充电振动定制
  • 前端单元测试入门:使用 Vitest + Vue 测试组件逻辑与交互
  • 泛英国生物样本库全基因组关联分析
  • 【LeetCode 每日一题】2785. 将字符串中的元音字母排序
  • 游戏开发中的友好提示,错误信息,异常描述等数据管理的必要性
  • 总线协议(Bus Protocol)如何支持总线错误条件?
  • simuilink 中的引用模型(reference model)的作用? 它和子系统的区别? 如何生成引用模型?
  • HTML+JS实现table表格和鼠标移入移出效果
  • windows11用Qt6自带的mingw编译OSGEarth(自用记录)
  • 仓颉编程语言青少年基础教程:泛型(Generic)和区间(Range)类型
  • 原码反码补码------相关理解
  • 【Python】字典
  • 玩转deepseek之海报生成器
  • C++强制类型转换和I/O流深度解析
  • Transformer 和 MoE
  • Python基础 7》数据类型_元组(Tuple)
  • AI大模型入门第四篇:借助RAG实现精准用例自动生成!
  • leetcode 198 打家劫舍问题,两个dp数组->一个dp数组