当前位置: 首页 > news >正文

Linux的生产者消费者模型

基于前文Linux的多线程-CSDN博客。

目录

1、生产者消费者模型

2、基于阻塞队列的生产者消费者模型

2.1 思路

2.2 代码

2.2.1 Main.cc

2.2.2 CondBlockingQueue.hpp

2.2.3 Cond.hpp

2.2.4 Mutex.hpp

3、基于环形阻塞队列的生产者消费者模型

3.1 思路

3.2 代码

3.2.1 Main.cc

3.2.2 RingQueue.hpp

3.2.3 Sem.hpp

3.2.4 Mutex.hpp

4、总结一下

5、一道有意思的面试题

5.1 思路

5.2 代码

5.2.1 信号量

5.2.2 锁+条件变量


1、生产者消费者模型

  • 1个交易场所(以特定结构构成的一种“内存”空间)。
  • 2种角色(生产者消费者,由线程承担)。
  • 3种关系(生产者之间,互斥,消费者之间,互斥,生产者与消费者之间,互斥+同步)。
  • 生产者与消费者模型的优点:
  1. 生产过程与消费过程解耦。
  2. 支持忙闲不均。
  3. 提高效率。不是体现在交易场所的出/入,而是体现在生产任务和处理任务是并发的(交易场所非空非满时)。

2、基于阻塞队列的生产者消费者模型

  • 阻塞队列,是一种常用于实现生产者和消费者模型的数据结构,当队列为空时,向队列里获取元素的操作会阻塞,当队列为满时,向队列里存放元素的操作会阻塞

画外音:管道的本质也是阻塞队列,为空时,读端阻塞,为满时,写端阻塞。

2.1 思路

  • 生产者与消费者之间采用锁+条件变量,实现互斥+同步
  • 生产者之间消费者之间,采用,实现互斥
  • 一个锁互斥用于生产者与消费者竞争(互斥),生产者之间竞争(互斥),消费者之间竞争(互斥)。
  • 两个条件变量同步分别控制生产者的阻塞和唤醒消费者的阻塞和唤醒
  • 两个sleep_num,分别用于记录因空阻塞的消费者和因满阻塞的生产者。
  • 要点:
  1. 阻塞队列的什么时候唤醒,是满或空,还是生成一个唤醒或消费一个唤醒?是生成一个唤醒或消费一个唤醒,为了更好的并发(生产者在生产,同时消费者在消费)。
  2. 唤醒一个还是全部?通常是唤醒一个(signal()),以避免惊群效应,提高效率
  3. 为什么while阻塞?因为当线程被唤醒时,线程切换,其他线程可能已经将状态改变,或者被误唤醒需重新判断条件

2.2 代码

2.2.1 Main.cc
#include <iostream>
#include <unistd.h>
#include <functional>
#include "BlockingQueue.hpp"void DownLoad()
{std::cout << "下载一个任务" << std::endl;sleep(3); // 假设任务的耗时。
}using task_t = std::function<void()>;void* Producer(void* arg)
{BlockingQueue<task_t>* bq = static_cast<BlockingQueue<task_t>*>(arg);while(true){bq->Push(DownLoad);std::cout << "生产了一个任务" << std::endl;sleep(3);}
}void* Consumer(void* arg)
{BlockingQueue<task_t>* bq = static_cast<BlockingQueue<task_t>*>(arg);while(true){task_t task = bq->Pop();task();sleep(3);}
}int main()
{BlockingQueue<task_t> bq;pthread_t p[2],c[2];pthread_create(p,nullptr,Producer,&bq);pthread_create(p+1,nullptr,Producer,&bq);pthread_create(c,nullptr,Consumer,&bq);pthread_create(c+1,nullptr,Consumer,&bq);pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);pthread_join(c[0],nullptr);pthread_join(c[2],nullptr);return 0;
}
2.2.2 CondBlockingQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include "Cond.hpp"
#include "Mutex.hpp"using namespace CondModule;
using namespace MutexModule;const int default_cap = 5;template <typename T>
class BlockingQueue
{bool isEmpty(){return bq.empty();}bool isFull(){return bq.size() >= _cap;}
public:BlockingQueue(int cap = default_cap): _cap(cap),_p_sleep_num(0),_c_sleep_num(0){}void Push(const T& in){// 生产者LockGuard lockguard(_mutex);while(isFull()){++_p_sleep_num;std::cout << "生产者因为满而阻塞: " << _p_sleep_num << std::endl;_full_cond.Wait(_mutex); // 生产者因为满而阻塞--_p_sleep_num;}bq.push(in);if(_c_sleep_num > 0){_empty_cond.Signal();// 唤醒一个因为空而阻塞的消费者std::cout << "唤醒一个因为空而阻塞的消费者" << std::endl;}}T Pop(){// 消费者LockGuard lockguard(_mutex);while(isEmpty()){++_c_sleep_num;std::cout << "消费者者因为空而阻塞: " << _c_sleep_num << std::endl;_empty_cond.Wait(_mutex); // 消费者者因为空而阻塞--_c_sleep_num;}T data = bq.front();bq.pop();if(_p_sleep_num > 0){_full_cond.Signal(); // 唤醒一个因为满而阻塞的生产者std::cout << "唤醒一个因为满而阻塞的生产者" << std::endl;}return data;}private:std::queue<T> bq;int _cap;Mutex _mutex;Cond _empty_cond;Cond _full_cond;int _p_sleep_num;int _c_sleep_num;
};
2.2.3 Cond.hpp
#pragma once#include <pthread.h>
#include "Mutex.hpp"namespace CondModule
{class Cond{public:// RAII,资源的初始化与释放与对象的生命周期绑定Cond(){pthread_cond_init(&_cond,nullptr);}void Wait(MutexModule::Mutex& mutex){pthread_cond_wait(&_cond,mutex.Get());}void Signal(){pthread_cond_signal(&_cond);}void Broadcast(){pthread_cond_broadcast(&_cond);}~Cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};
}
2.2.4 Mutex.hpp
#pragma once#include <pthread.h>namespace MutexModule
{class Mutex{// RAII,资源的初始化与释放与对象的生命周期绑定public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){pthread_mutex_lock(&_mutex);}void Unlock(){pthread_mutex_unlock(&_mutex);}pthread_mutex_t *Get(){return &_mutex;}~Mutex(){pthread_mutex_destroy(&_mutex);}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex): _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}

3、基于环形阻塞队列的生产者消费者模型

  • 环形队列,使用取模进行循环。和阻塞队列一样,空了,消费者阻塞,满了,生产者阻塞。

3.1 思路

  • 生产者与消费者之间采用信号量,实现互斥+同步
  • 生产者之间采用,实现互斥消费者之间采用,实现互斥
  • 两个信号量互斥,信号量的原子操作保证了计数的一致性;生产者和消费者不能操作同一个位置(满了或空了,处于同一个位置,不能同时操作)。同步,记录可用空位数量,控制生产者的阻塞和生产;记录已有数据数量,控制消费者的阻塞和消费
  • 两个锁互斥,分别用于生产者之间竞争(互斥),消费者之间竞争(互斥)。
  • 两个step,用于下标索引,分别用于生产者push,消费者pop。
  • 要点:
  1. 前面阻塞队列的queue是push,会自动扩容,但是环形队列使用下标索引,所以在初始化的时候,要先开辟空间。

3.2 代码

3.2.1 Main.cc
#include <iostream>
#include <unistd.h>
#include <functional>
#include "RingQueue.hpp"void DownLoad()
{std::cout << "下载一个任务" << std::endl;sleep(3); // 假设任务的耗时。
}using task_t = std::function<void()>;void* Producer(void* arg)
{RingQueue<task_t>* rq = static_cast<RingQueue<task_t>*>(arg);while(true){rq->Push(DownLoad);std::cout << "生产了一个任务" << std::endl;sleep(3);}
}void* Consumer(void* arg)
{RingQueue<task_t>* rq = static_cast<RingQueue<task_t>*>(arg);while(true){task_t task = rq->Pop();task();sleep(3);}
}int main()
{RingQueue<task_t> rq;pthread_t p[2],c[2];pthread_create(p,nullptr,Producer,&rq);pthread_create(p+1,nullptr,Producer,&rq);pthread_create(c,nullptr,Consumer,&rq);pthread_create(c+1,nullptr,Consumer,&rq);pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);pthread_join(c[0],nullptr);pthread_join(c[1],nullptr);return 0;
}
3.2.2 RingQueue.hpp
#pragma once#include <vector>
#include "Mutex.hpp"
#include "Sem.hpp"using namespace MutexModule;
using namespace SemModule;const int default_cap = 5;template<typename T>
class RingQueue
{
public:RingQueue(int cap = default_cap):_rq(cap),_cap(cap),_blank_sem(cap),_p_step(0),_data_sem(0),_c_step(0){}void Push(const T& in){_blank_sem.P();LockGuard lockguard(_p_mutex);_rq[_p_step] = in;++_p_step;_p_step %= _cap;_data_sem.V();}T& Pop(){_data_sem.P();LockGuard lockguard(_c_mutex);T& data = _rq[_c_step];++_c_step;_c_step %= _cap;_blank_sem.V();return data;}
private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem; // 空位数int _p_step;Mutex _p_mutex; // 生产者之间的互斥// 消费者Sem _data_sem; // 资源数int _c_step;Mutex _c_mutex; // 消费者之间的互斥
};
3.2.3 Sem.hpp
#pragma once#include <semaphore.h>namespace SemModule
{const unsigned int default_value = 1;class Sem{public:// RAII,资源的初始化与释放与对象的生命周期绑定Sem(unsigned int sem_value = default_value): _sem_value(sem_value){sem_init(&_sem, 0, _sem_value);}void P(){sem_wait(&_sem);}void V(){sem_post(&_sem);}~Sem(){sem_destroy(&_sem);}private:unsigned int _sem_value;sem_t _sem;};
}
3.2.4 Mutex.hpp
#pragma once#include <pthread.h>namespace MutexModule
{class Mutex{public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){pthread_mutex_lock(&_mutex);}void Unlock(){pthread_mutex_unlock(&_mutex);}pthread_mutex_t *Get(){return &_mutex;}~Mutex(){pthread_mutex_destroy(&_mutex);}private:pthread_mutex_t _mutex;};// RAII,资源的初始化与释放与对象的生命周期绑定class LockGuard{public:LockGuard(Mutex &mutex): _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}

4、总结一下

特性普通阻塞队列环形阻塞队列
内存分配可能频繁分配 / 释放内存(链表实现)一次性分配固定内存,无内存碎片
扩容机制支持动态扩容容量固定,无法动态扩容
访问效率队列头尾操作 O (1),但可能有缓存不友好问题数组连续存储,缓存局部性更好,访问效率更高
空间利用率动态大小,空间利用率灵活固定空间,利用率稳定但可能有浪费
实现复杂度相对简单稍复杂(需处理环形边界条件)
  • 一种整块临界资源 -> 锁+条件变量
  • 一种多块临界资源 -> 锁+信号量。显然,在生产者消费者模型中,临界资源是一种多块临界资源,锁+信号量的方式更好实现,如:逻辑上更清晰,信号量直接表达资源数量,不需要在唤醒时重新检查复杂条件等。

5、一道有意思的面试题

题目:char arr1[5] = "1234"; char arr2[5] = "abcd";,请打印出"1a2b3c4d";

5.1 思路

  • 基于前面的练习,如果是锁+条件变量,竞争锁,顺序是不确定的,而两个信号量刚好能控制顺序,一个为arr1_sem = 1,arr1_sem .P(),arr2_sem.V();一个arr2_sem = 0,arr2_sem .P(),arr1_sem.V()。
  • 如果非要用(同一时间只能打印一个),那需要一个条件变量,控制两个线程的打印阻塞与唤醒(同时只有一个会阻塞),还要一个标志,true为arr1打印,false为arr2打印。

5.2 代码

5.2.1 信号量
#include <iostream>
#include <pthread.h>
#include "Sem.hpp"using namespace SemModule;Sem sem1(1),sem2(0);char arr1[5] = "1234"; // 包含'/0'
char arr2[5] = "abcd";void *Func1(void *arg)
{for(const auto& e : arr1){sem1.P();std::cout << e;sem2.V();}return nullptr;
}void *Func2(void *arg)
{for(const auto& e : arr2){sem2.P();std::cout << e;sem1.V();}return nullptr;
}int main()
{pthread_t tid1, tid2;pthread_create(&tid1, nullptr, Func1, nullptr);pthread_create(&tid2, nullptr, Func2, nullptr);pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);std::cout << std::endl;return 0;
}
5.2.2 锁+条件变量
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"using namespace MutexModule;
using namespace CondModule;char arr1[5] = "1234"; // 包含'/0'
char arr2[5] = "abcd";Mutex mutex;
Cond cond;
bool flag = true; void *Func1(void *arg)
{for(const auto& e : arr1){LockGuard lockguard(mutex);while(!flag)cond.Wait(mutex);std::cout << e;flag = !flag;cond.Signal();}return nullptr;
}void *Func2(void *arg)
{for(const auto& e : arr2){LockGuard lockguard(mutex);while(flag)cond.Wait(mutex);std::cout << e;flag = !flag;cond.Signal();}return nullptr;
}int main()
{pthread_t tid1, tid2;pthread_create(&tid1, nullptr, Func1, nullptr);pthread_create(&tid2, nullptr, Func2, nullptr);pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);std::cout << std::endl;return 0;
}


文章转载自:

http://HPf4miZu.kLpwL.cn
http://fhcCyAQv.kLpwL.cn
http://XaC9yTrU.kLpwL.cn
http://mIXrwR9H.kLpwL.cn
http://zuqVqYkM.kLpwL.cn
http://9hVrwScG.kLpwL.cn
http://LzDspYLO.kLpwL.cn
http://bR3lGNLg.kLpwL.cn
http://6ogWUkck.kLpwL.cn
http://BYnl4j27.kLpwL.cn
http://20rAvhR0.kLpwL.cn
http://Jtt92qGW.kLpwL.cn
http://RIslGaYd.kLpwL.cn
http://a55vq0JL.kLpwL.cn
http://NVs52eXs.kLpwL.cn
http://oqXObUNa.kLpwL.cn
http://GUX8K8K6.kLpwL.cn
http://SViHGbA3.kLpwL.cn
http://lO8jHEsq.kLpwL.cn
http://Omd1nhhp.kLpwL.cn
http://rRytFsIO.kLpwL.cn
http://yCHAIofE.kLpwL.cn
http://Gvgl8tzg.kLpwL.cn
http://dhE8rTxD.kLpwL.cn
http://M3j9OYVE.kLpwL.cn
http://YqJpuqAw.kLpwL.cn
http://NbiBS5UF.kLpwL.cn
http://lMWwVVDg.kLpwL.cn
http://hhp8LHgd.kLpwL.cn
http://0xnesQn2.kLpwL.cn
http://www.dtcms.com/a/384953.html

相关文章:

  • 深度学习基础、pytorch使用①
  • 国产化PDF处理控件Spire.PDF教程:在 ASP.NET Core 中创建 PDF的分步指南
  • 某村通信网络改造:从痛点到解决方案的全景分析
  • Elastic APM 入门指南:快速设置应用性能监控
  • 流式响应的demo , 前端markdown格式显示, 打字机效果展示
  • 【免费体验】旗讯 OCR手写识别:破解工厂数据处理痛点,实现从 “人工录入” 到 “AI读单” 的升级
  • 远程开机wakeonlan
  • 健康有益:车载健康化系统推动智能汽车健康管理新变革
  • JavaWeb--day6--MySQL(补漏)
  • 手机群控平台的智能管控技术深度解析
  • 什么是手持采集终端PDA?智慧移动工作的数字基石!
  • C语言中的递归问题——爬楼梯问题
  • LeetCode:2.字母异位词分组
  • 计算机视觉案例分享之实时文档扫描
  • 提升PDF处理效率,Stirling-PDF带你探索全新体验!
  • 【React】闭包陷阱
  • 4.RocketMQ集群高级特性
  • 周选择日历组件
  • Golang引用类型
  • Go的Gob编码介绍与使用指南
  • Golang语言入门篇001_Golang简介
  • Kafka消息队列进阶:发送策略与分区算法优化指南
  • 台积电生态工程深度解析:从晶圆厂到蜂巢的系统架构迁移
  • 机器学习-网络架构搜索
  • 邪修实战系列(5)
  • 突破限制:Melody远程音频管理新体验
  • 深入解析Seata:一站式分布式事务解决方案
  • static_cast:C++类型系统的“正经翻译官”
  • Python面试题及详细答案150道(126-135) -- 数据库交互篇
  • 【新书预告】《大模型应用开发》