当前位置: 首页 > news >正文

Linux:基于阻塞队列的生产者消费模型

文章目录

  • 一、生产者消费者模型的基本原则
    • 💕💕生产者-消费者模型的 321 原则💕💕
  • 二、为何要使用生产者消费者模型
    • 1. 解耦
    • 2. 支持并发 (提高效率)
    • 3. 忙闲不均的支持
  • 三、基于 BlockingQueue 的生产者消费者模型
    • 1. 阻塞队列的特点
    • 2. C++ 模拟实现
    • 3. 封装更精细的版本
  • 五、总结


一、生产者消费者模型的基本原则

在多线程编程中,生产者消费者模型是一种常见的并发设计模式。它通过一个**缓冲区(阻塞队列)**来解耦生产者和消费者,从而解决两者之间的强耦合问题。生产者只需要负责将数据放入队列,而消费者只需要负责从队列中取数据。

这样设计的好处在于,生产者在完成数据生成后无需等待消费者处理,可以立即返回继续生成;而消费者也无需关心数据来自哪里,只需要从队列里取出任务并处理即可。这种方式既保证了系统的高效性,也增强了并发性。

可以把 321 原则用层次化结构来表达,比表格更直观:


💕💕生产者-消费者模型的 321 原则💕💕

3 个关系

  1. 生产者之间互斥:多个生产者不能同时写入队列,否则会破坏数据一致性。
  2. 消费者之间互斥:多个消费者不能同时读取同一数据,否则会出现重复消费。
  3. 生产者与消费者之间互斥与同步:队列为空时消费者等待,队列满时生产者等待,二者既互斥又必须协同。

2 个角色

  1. 生产者:负责不断产生数据并放入队列。
  2. 消费者:负责从队列中取出数据并进行处理。

1 个交易场所

阻塞队列 / 环形队列:作为共享缓冲区,承载生产与消费的衔接。


二、为何要使用生产者消费者模型

1. 解耦

生产者和消费者之间没有直接依赖关系,它们通过阻塞队列完成数据交互,降低了耦合度。


2. 支持并发 (提高效率)

生产者和消费者可以并发执行,充分利用 CPU 资源。这里提高效率并不是说生产者消费模型可以更快的派发任务,而是通过一个串行的交易场所,可以将任务派发给不同的线程,也可以由不同的线程同时生产,在生产者和消费者自己之间可以做到并发执行,因此提高了效率,毕竟将来派发任务只占有一点点的份额,执行任务才是大头。


3. 忙闲不均的支持

如果生产者生成速度快于消费者处理速度,多余的数据会存放在阻塞队列中;反之,消费者可以在数据不足时自动等待。这样有效平衡了生产和消费之间的速度差异。


三、基于 BlockingQueue 的生产者消费者模型

1. 阻塞队列的特点

在普通队列中,如果取数据时队列为空会直接返回错误,而阻塞队列则会让取数据的线程阻塞等待直到有数据为止;同样地,如果存放数据时队列已满,线程也会被阻塞,直到有空余位置。这种机制天生适合生产者消费者模型。

在这里插入图片描述

在这里插入图片描述


2. C++ 模拟实现

BlockQueue.hpp

// 阻塞队列的实现
#pragma once#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>const int defaultcap = 5; // for testtemplate <typename T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = defaultcap) : _cap(cap), _csleep_num(0), _psleep_num(0){pthread_mutex_init(&_mutex, NULL);pthread_cond_init(&_full_cond, NULL);pthread_cond_init(&_empty_cond, NULL);}void Equeue(const T &in){pthread_mutex_lock(&_mutex);while (IsFull()){_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" << _psleep_num << std::endl;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}// 走到这里一定有空间了_q.push(in);if (_csleep_num > 0){pthread_cond_signal(&_empty_cond);std::cout << "唤醒消费者..." << std::endl;}pthread_mutex_unlock(&_mutex);}T Pop(){// 消费者调用pthread_mutex_lock(&_mutex);while (IsEmpty()){_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T data = _q.front();_q.pop();if (_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "唤醒消费者" << std::endl;}// pthread_cond_signal(&_full_cond);pthread_mutex_unlock(&_mutex);return data;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q;int _cap;pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠的个数int _psleep_num; // 生产者休眠的个数
};

3. 封装更精细的版本

Cond.hpp

// 阻塞队列的实现
#pragma once#include <iostream>
#include <string>
#include <queue>
#include "Mutex.hpp"
#include "Cond.hpp"const int defaultcap = 10; // for testusing namespace MutexModule;
using namespace CondModule;template <typename T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = defaultcap): _cap(cap), _csleep_num(0), _psleep_num(0){}void Equeue(const T &in){{LockGuard lockguard(_mutex);// 生产者调用while (IsFull()){// 应该让生产者线程进行等待// 重点1:pthread_cond_wait调用成功,挂起当前线程之前,要先自动释放锁!!// 重点2:当线程被唤醒的时候,默认就在临界区内唤醒!要从pthread_cond_wait// 成功返回,需要当前线程,重新申请_mutex锁!!!// 重点3:如果我被唤醒,但是申请锁失败了??我就会在锁上阻塞等待!!!_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" << _psleep_num << std::endl;// 问题1: pthread_cond_wait是函数吗?有没有可能失败?pthread_cond_wait立即返回了// 问题2:pthread_cond_wait可能会因为,条件其实不满足,pthread_cond_wait 伪唤醒_full_cond.Wait(_mutex);_psleep_num--;}// 100%确定:队列有空间_q.push(in);// 临时方案// v2if (_csleep_num > 0){_empty_cond.Signal();std::cout << "唤醒消费者..." << std::endl;}}}T Pop(){T data;{// 消费者调用LockGuard lockguard(_mutex);while (IsEmpty()){_csleep_num++;_empty_cond.Wait(_mutex);_csleep_num--;}data = _q.front();_q.pop();if (_psleep_num > 0){_full_cond.Signal();std::cout << "唤醒生产者" << std::endl;}}return data;}~BlockQueue(){}private:std::queue<T> _q; // 临界资源!!!int _cap;         // 容量大小Mutex _mutex;Cond _full_cond;Cond _empty_cond;int _csleep_num; // 消费者休眠的个数int _psleep_num; // 生产者休眠的个数
};

五、总结

生产者消费者模型是一种经典的多线程设计模式。它利用阻塞队列实现生产与消费的解耦,不仅提高了系统的并发能力,还能平衡两者之间的处理速度差异。在 C++ 中,我们可以通过 queue + pthread + 条件变量 实现一个简易的阻塞队列,再结合任务封装,实现灵活的生产者消费者模型。

未来在实际工程中,如果使用 C++11 及以上版本,可以考虑用 std::mutexstd::condition_variable 替代 pthread,写法会更简洁。



文章转载自:

http://vlgq5mhF.hxwrs.cn
http://UObpzQgq.hxwrs.cn
http://B6SjLieC.hxwrs.cn
http://HzLvypYf.hxwrs.cn
http://8H3DkT4v.hxwrs.cn
http://Pb53VzUV.hxwrs.cn
http://4sDbMcKx.hxwrs.cn
http://si58jcjJ.hxwrs.cn
http://v2o3bqTx.hxwrs.cn
http://34m5Yojb.hxwrs.cn
http://IlovLAIB.hxwrs.cn
http://0x356VAH.hxwrs.cn
http://KnwMO6g3.hxwrs.cn
http://iuyJdDYw.hxwrs.cn
http://RyR11Io3.hxwrs.cn
http://PO9RzqZG.hxwrs.cn
http://lE8I1FnZ.hxwrs.cn
http://WQuBkswl.hxwrs.cn
http://V8ulvCTJ.hxwrs.cn
http://3vUOwlk2.hxwrs.cn
http://hII5SfFU.hxwrs.cn
http://7a8unK7u.hxwrs.cn
http://sCWpwR3X.hxwrs.cn
http://kRtCyHfX.hxwrs.cn
http://4nRegIqp.hxwrs.cn
http://h3xB2n9k.hxwrs.cn
http://86ShS03b.hxwrs.cn
http://zs3T3AbQ.hxwrs.cn
http://ZEo6cBgq.hxwrs.cn
http://eCiCk2G7.hxwrs.cn
http://www.dtcms.com/a/386323.html

相关文章:

  • springboot+vue (ruoyi-vue前后端分离)集成钉钉登录
  • 从单一辅助到深度协作!GPT-5-Codex 改写软件开发工作流
  • JavaScript——document对象
  • 图观 流渲染场景编辑器
  • 探索大语言模型(LLM):Windows系统与Linux系统下的Ollama高级配置(修改模型地址、Service服务以及多卡均衡调用)
  • PowerBI实战-制作带有同比及趋势线的双柱状图
  • Spring 介绍
  • 使用爱思助手(版本8.16)将ipa安装到ios
  • 大模型应用开发2-SpringAI实战
  • 【面板数据】上市公司校企合作论文发表数据集(2002-2025年)
  • MySQL的底层数据结构:B+树
  • 【Linux】LRU缓存(C++模拟实现)
  • 冲击成本敏感度曲线驱动的拆单频率参数动态调优机制
  • Typera+Gitee+PicGo 配置markdown专用图床
  • 正则化:机器学习泛化能力的守护神
  • GCKontrol对嵌入式设备FPGA设计流程的高效优化
  • vue2+vue3-自定义指令
  • Vue基础知识点(接上篇案例)
  • 动物排队+分手厨房?合作模拟《Pao Pao》登录steam
  • 易境通货代系统:如何实现全流程自动化报关管理?
  • OpenCV:答题卡识别
  • leetcode HOT100 个人理解及解析
  • 深入落地“人工智能+”,如何构建安全、高效的算力基础设施?
  • 无人出租车(Robotaxi)还有哪些技术瓶颈?
  • 安全开发生命周期管理
  • 用住宿楼模型彻底理解Kubernetes架构(运行原理视角)
  • 【大模型】minimind2 1: ubuntu24.04安装部署 web demo
  • 扩散模型之(八)Rectified Flow
  • Facebook主页变现功能被封?跨境玩家该如何申诉和预防
  • 《Java接入支付宝沙箱支付全流程详解》