当前位置: 首页 > news >正文

Linux学习:基于环形队列的生产者消费者模型

目录

  • 1. 环形队列的概念与实现方法
    • 1.1 环形队列的概念
    • 1.2 环形队列的一般实现方法
  • 2. 多线程相关的信号量概念与接口
    • 2.1 信号量类型
    • 2.2 信号量的初始化与销毁
    • 2.3 信号量的P申请、V释放操作
  • 3. 基于环形队列实现p、c模型的设计方案
    • 3.1 环形队列(ringqueue)作为共享资源缓冲区的设计要求
    • 3.2 环形队列作为共享资源缓冲区的实现方法
  • 4. 代码实现
    • 4.1 线程类
    • 4.2 线程执行的任务
    • 4.3 RingQueue类
    • 4.4 主干代码

1. 环形队列的概念与实现方法

1.1 环形队列的概念

  • 环形队列:
      容量固定,首尾相连的一个队列(遵循先进先出),于逻辑上形状为一个环形。当存储数据超过容量上限后,再次给环形队列插入元素时,不会产生越界的情况。新的数据会回绕到数据首部进行插入,将空间中的旧数据覆盖
    在这里插入图片描述

1.2 环形队列的一般实现方法

  最常用于实现环形队列的数据结构为顺序表,其通过对下标的取模操作,就可以很好的实现存储空间逻辑上的环形要求,具体实现方案如下:


方案1:

  1. 开辟要求容量cap大小的顺序表

  2. 以变量begin记录队列中首个数据的下标,起始设置为0

  3. 以变量end记录队列中新数据插入位置的下标,起始设置为0
    在这里插入图片描述

  4. 以变量count记录队列中存储数据的个数,每次插入数据后进行++,每次删除数据后进行--。队列为空与为满时,beginend的位置相同,因此,需要以count中记录的值来做区分

  5. 插入元素时,向end记录的下标位置写入数据,然后再++

  6. 删除元素时,只需让begin进行++即可,beginend下标之间的空间中存储着队列中的数据

  7. beginend每次++后都要对其进行取模操作%= cap

方案2:
  方案2的大体实现思路与方案1相同,只是于队列空满判定实现上有所区别。此种方法,会将队列额外多开辟出一块空间,队列容量为cap,实际大小为cap + 1。此块多出来的空间不用来存放数据,而是用来标识环形队列的满状态(end + 1) % (cap + 1) == begin
在这里插入图片描述

2. 多线程相关的信号量概念与接口

2.1 信号量类型

#include <semaphore.h>
//原生线程库中的内容,编译时带-lpthread选项
sem_t sem;//同样为计数器

2.2 信号量的初始化与销毁

信号量的初始化:

int sem_init(sem_t *sem, int pshared, unsigned int value);
  • 参数1sem_t* 传入需要初始化的信号量地址
  • 参数2int pshared 用于父子进程之间共享设置为1,用于多线程之间共享设置为0
  • 参数3unsigned int value 用于设置信号量的大小
  • 返回值int 成功时,返回0;失败时,返回-1,并将错误码写入errno

信号量的销毁:

int sem_destroy(sem_t* sem);
  • 参数sem_t* sem 传入需要销毁的信号量的地址
  • 返回值int 成功时,返回0;失败时,返回-1,并将错误码写入errno

2.3 信号量的P申请、V释放操作

信号量的P操作:

int sem_wait(sem_t* sem);
  • 参数sem_t* sem 传入需要进行P操作的信号量地址
  • 返回值int 成功时,返回0;失败时,返回-1,并将错误码写入errno

信号量的V操作:

int sem_post(sem_t* sem);
  • 参数sem_t* sem 传入需要进行V操作的信号量地址
  • 返回值int 成功时,返回0;失败时,返回-1,并将错误码写入errno

  信号量不仅仅其申请与释放操作都是互斥的,而且本身还同时具有计数的作用。因此,在功能上,其就相当于是互斥锁与条件变量的结合,但其又只需要一条操作语句就可以实现互斥锁与条件变量的配合效果。所以,在代码编写角度,信号量比互斥锁 + 条件变量更简单也更简洁。

3. 基于环形队列实现p、c模型的设计方案

3.1 环形队列(ringqueue)作为共享资源缓冲区的设计要求

在这里插入图片描述
  生产者生产数据存入队列,消费者消费数据从队列中获取,此处使用方案1中的环形队列设计方式。

  • 生产者与消费者访问共享资源时,需要保持互斥且同步
  • 环形队列中有多块空间,只有队列为满或为空时,生产者与消费者才会访问到同一块空间,即访问共享资源。因此,除开队列为空为满的情况之外,其他场景中,消费者与生产者都可以并发地访问环形队列

3.2 环形队列作为共享资源缓冲区的实现方法

在这里插入图片描述

  • 对于生产者而言,其需要向队列中插入数据,所以,队列中空余的空间(room)是其所需的资源
  • 对于消费者而言,其需要从队列中获取数据,所以,队列空间中存储的数据(data)是其所需的资源

在这里插入图片描述
  定义两个信号量room_semdata_sem,分别作为生产者资源与消费者资源的计数器,每次在生产者、消费者线程访问环形队列之前,都要预先申请对应的信号量资源。变量productor_index记录生产者生产资源的放置位置,变量consumer_index记录消费者从环形队列中获取数据的位置。生产者之间、消费者之间它们对于环形队列的访问需要保持互斥性,此处实现可以使用一把锁来实现,但这样生产者、消费者之间大多数情况下的并发就会被影响,因此,我们定义分别定义两把锁,来分别实现同类之间的互斥,彼此之间并发。


  • 环形队列作为共享资源缓冲区的优势
      队列中拥有多块空间,生产者与消费者线程大多数情况下都不会访问同一块资源,因此,可以保证它们之间的并发访问,提高程序的效率。

4. 代码实现

4.1 线程类

namespace ThreadModule
{template<typename T>using func_t = function<void(T&, string)>;template<typename T>class Thread{public:Thread(func_t<T> func, T& data, string name = "none-thread"):_func(func), _data(data), _name(name), _stop(true){}~Thread(){}void Execute(){_func(_data, _name);}static void* threadroutine(void* arg){Thread<T>* ptd = static_cast<Thread<T>*>(arg);ptd->Execute();return nullptr;}bool start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(n){return false; }_stop = false;return true;}void join(){if(!_stop){pthread_join(_tid, nullptr);}}void detach(){if(!_stop){pthread_detach(_tid);}}string name(){return _name;}void stop(){_stop = true;}private:pthread_t _tid;string _name;func_t<T> _func;T& _data;bool _stop;};}

4.2 线程执行的任务

using task_t = function<void()>;void download()
{cout << "task is download" << endl;
}

4.3 RingQueue类

#ifndef RING_QUEUE
#define RING_QUEUE
#include <vector>
#include <semaphore.h>
#include <pthread.h>template<typename T>
class RingQueue
{
private:void P(sem_t* sem){sem_wait(sem);//wait申请}void V(sem_t* sem){sem_post(sem);//post单词译为放入,post操作为释放}void Lock(pthread_mutex_t* lock){pthread_mutex_lock(lock);}void Unlock(pthread_mutex_t* lock){pthread_mutex_unlock(lock);}public:RingQueue(int cap):_cap(cap), _ringbuffer(cap), _productor_index(0), _consumer_index(0){sem_init(&_room_sem, 0, _cap);//父子进程之间共享pshared: 1, 多线程之间共享pshared: 0sem_init(&_data_sem, 0, 0);pthread_mutex_init(&_productor_lock, nullptr);pthread_mutex_init(&_consumer_lock, nullptr);}void Enqueue(const T& data){P(&_room_sem);Lock(&_productor_lock);_ringbuffer[_productor_index++] = data;_productor_index %= _cap;Unlock(&_productor_lock);V(&_data_sem);}void Pop(T* data){P(&_data_sem);Lock(&_consumer_lock);*data = _ringbuffer[_consumer_index++];_consumer_index %= _cap;Unlock(&_consumer_lock);V(&_room_sem);}~RingQueue(){sem_destroy(&_room_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_productor_lock);pthread_mutex_destroy(&_consumer_lock);}private:vector<T> _ringbuffer;int _cap;int _productor_index;int _consumer_index;sem_t _room_sem;sem_t _data_sem;pthread_mutex_t _productor_lock;pthread_mutex_t _consumer_lock;
};#endif

4.4 主干代码

在这里插入图片描述

using ringqueue_t = RingQueue<task_t>;void ProductorRun(ringqueue_t& rq, string name)
{while(true){rq.Enqueue(download);cout << name << " is producted" << endl;}
}void ConsumerRun(ringqueue_t& rq, string name)
{while(true){sleep(1);task_t task;rq.Pop(&task);cout << name << " get : "; task();//bad_function_call: 尝试调用没有目标的function对象时会出现此异常}
}void InitComm(vector<Thread<ringqueue_t>>& threads, ringqueue_t& rq, int num, func_t<ringqueue_t> func, string who)
{for(int i = 0; i < num; i++){string name = who + '-' + to_string(i + 1);threads.emplace_back(func, rq, name);}
}void InitProductor(vector<Thread<ringqueue_t>>& threads, ringqueue_t& rq, int num)
{InitComm(threads, rq, num, ProductorRun, "productor");
}void InitConsumer(vector<Thread<ringqueue_t>>& threads, ringqueue_t& rq, int num)
{InitComm(threads, rq, num, ConsumerRun, "consumer");
}void StartAll(vector<Thread<ringqueue_t>>& threads)//vector必须用引用,存储线程tid
{for(auto& thread : threads){thread.start();}
}void WaitAllThread(vector<Thread<ringqueue_t>> threads)//尝试用拷贝对象是否能够正常回收进程
{for(auto thread : threads){thread.join();}
}int main()
{ringqueue_t rq(5);vector<Thread<ringqueue_t>> threads;InitProductor(threads, rq, 3);InitConsumer(threads, rq, 2);StartAll(threads);WaitAllThread(threads);return 0;
}

  此处设计时,将创建线程类与真正创建并启动线程分开执行。这是因为直接以threads.back().start()创建后调用启动线程,其中threads.back()迭代器可能会由于不断向threads中插入新的线程类对象,导致发生扩容,最终使得原迭代器失效


文章转载自:

http://TQK5z0rs.qjxkx.cn
http://1CCJkSKf.qjxkx.cn
http://zhPb6mpc.qjxkx.cn
http://jVafYZ5A.qjxkx.cn
http://6sSAzvvO.qjxkx.cn
http://ZpAljCka.qjxkx.cn
http://jmyhejcx.qjxkx.cn
http://aRaMHKf4.qjxkx.cn
http://AQp9lr5z.qjxkx.cn
http://zq0mYWPZ.qjxkx.cn
http://0lxfAZDs.qjxkx.cn
http://McvsosIo.qjxkx.cn
http://x6hR9J6a.qjxkx.cn
http://1DZtFqaI.qjxkx.cn
http://6mHMjuWD.qjxkx.cn
http://820P25RH.qjxkx.cn
http://rL0vNFaH.qjxkx.cn
http://2zvoeLMA.qjxkx.cn
http://cxyReec3.qjxkx.cn
http://sr7yZOwg.qjxkx.cn
http://72QwGZmX.qjxkx.cn
http://cEcNhj4N.qjxkx.cn
http://fXizRufT.qjxkx.cn
http://hpvTx9nX.qjxkx.cn
http://XBQPIcds.qjxkx.cn
http://X9HXdKYg.qjxkx.cn
http://PTKAuoKE.qjxkx.cn
http://0vbH06oM.qjxkx.cn
http://6AsyMLQs.qjxkx.cn
http://4QXMOERS.qjxkx.cn
http://www.dtcms.com/a/379442.html

相关文章:

  • size()和length()的区别
  • Windows系统下安装Dify
  • 企业云环境未授权访问漏洞 - 安全加固笔记
  • sv时钟块中default input output以及@(cb)用法总结
  • 广谱破局!芦康沙妥珠单抗覆罕见突变,一解“少数派”的用药困境
  • Guli Mall 25/08/12(高级上部分)
  • 彩笔运维勇闯机器学习--随机森林
  • Python 面向对象实战:私有属性与公有属性的最佳实践——用线段类举例
  • 使用deboor法计算三次B样条曲线在参数为u处的位置的方法介绍
  • 认识HertzBeat的第一天
  • AUTOSAR进阶图解==>AUTOSAR_EXP_ApplicationLevelErrorHandling
  • 线程同步:条件变量实战指南
  • OpenLayers数据源集成 -- 章节七:高德地图集成详解
  • AI助推下半年旺季,阿里国际站9月采购节超预期爆发
  • 电商平台拍立淘API接口调用全解析(基于淘宝/唯品会技术实践)
  • 9.11 Qt
  • 字节一面 面经(补充版)
  • 第二章 ELK安装部署与环境配置
  • I2C 总线
  • 设计模式——七大常见设计原则
  • 请创建一个视觉精美、交互流畅的进阶版贪吃蛇游戏
  • 利用美团龙猫添加xlsx的sheet.xml读取sharedStrings.xml中共享字符串输出到csv功能
  • 时序数据库:定义与基本特点
  • 【WorkManager】Android 后台任务调度的核心组件指南
  • python项目批量安装包和生成requirements.txt文件
  • 零部件力学测试系统参数
  • 3D Web轻量引擎HOOPS赋能BIM/工程施工:实现超大模型的轻量化加载与高效浏览!
  • Java Web应用的安全性与防护措施!
  • 填写简历信息
  • 优先算法——专题十一:字符串