当前位置: 首页 > news >正文

【LINUX操作系统】生产者消费者模型(下):封装、信号量与环形队列

1.封装、完善基于阻塞队列的productor-consumer module

前文中我们封装了自己的Mutex 

【LINUX操作系统】线程同步与互斥-CSDN博客

按照老规矩,现在我们对同步与互斥的理解更进一步了,现在把这种面向过程的语言封装成面向对象的写法

 1.1 封装条件变量

#pragma once#include <iostream>
#include <string>
#include <pthread.h>
#include "Mutex.hpp"namespace CondModule
{class Cond{public:Cond(){pthread_cond_init(&_cond,nullptr);}~Cond(){pthread_cond_destroy(&_cond);}void Wait(){}void notify(){pthread_cond_signal(&_cond);}void notifyall(){pthread_cond_broadcast(&_cond);}private:pthread_cond_t _cond;};
}

在上面的代码中,需要注意wait函数,因为让线程在条件变量下等待时,pthread_cond_wait接口会释放线程所持有的锁,所以需要让该接口接收一个参数,用于pthread_cond_wait的第二个参数

                        

再由上一文中的对Mutex的封装:

【LINUX操作系统】线程同步与互斥-CSDN博客

                

1.2 在BlockQueue中使用自己封装的接口

完整的使用自己接口的第二版代码,将代码改成面向对象的风格

//version 2 引入自己的接口
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "Mutex.hpp"
#include "Cond.hpp"namespace BQModule
{using namespace CondModule;using namespace LockMoudle;size_t gsize = 5;//整个队列最多装5个数据template<typename T>class BlockQueue{public:bool IsFull(){return _bq.size()==_capacity;}bool IsEmpty(){return _bq.empty();}BlockQueue(): _cwait_num(0),_pwait_num(0),_capacity(gsize){//自己封装的接口都是在定义时就初始化了// pthread_mutex_init(&_lock,nullptr);// pthread_cond_init(&_con_cond,nullptr);// pthread_cond_init(&_pro_cond,nullptr);}void Pop(T* p_data)//designed for Consumer{//pthread_mutex_lock(&_lock);MutexGuard mutexguard(_lock);while(IsEmpty()){_cwait_num++;//bq中已经没有数据,消费者线程需要进入条件变量去等待//pthread_cond_wait(&_con_cond,&_lock);//再次理解为什么wait必须释放锁_con_cond.wait(_lock);_cwait_num--;}*p_data = _bq.front();_bq.pop();//此处,刚刚减少数据,并且还没有解锁,bq中一定没有满,可以唤醒一个生产者线程if(_pwait_num)_pro_cond.notify();//pthread_mutex_unlock(&_lock);}void Enqueue(const T& data)//designed for Productor{//pthread_mutex_lock(&_lock);MutexGuard mutexguard(_lock);while(IsFull()){_pwait_num++;//bq中数据已经满了,生产者需要进入条件变量去等待_pro_cond.wait(_lock);_pwait_num--;}_bq.push(data);//此处,刚刚加入数据,并且还没有解锁,bq中一定有数据,可以去唤醒一个消费者条件变量中的线程if(_cwait_num)//pthread_cond_signal(&_con_cond);_con_cond.notify();//pthread_mutex_unlock(&_lock);}~BlockQueue(){//自己封装的类在析构时都会自动释放资源// pthread_mutex_destroy(&_lock);// pthread_cond_destroy(&_con_cond);// pthread_cond_destroy(&_pro_cond);}private:std::queue<T> _bq;Cond  _pro_cond; //生产者的条件变量Cond _con_cond; //消费者的条件变量Mutex _lock;//一把需要被各生产者和消费者看到的锁,用于互斥size_t _capacity; //容量int _cwait_num;int _pwait_num;};
}

这样一来,所有的锁都不再需要手动释放,而是代码在RAII风格下自动释放锁。 

1.3 实现多生产者多消费者版本

由于我们设计的本来就只有一把锁,消费者与消费者、生产者与生产者这两种关系先天就是互斥且同步的

测试结果:

1.4 传递任务的生产消费模型

交易场所不仅仅用来传递数据,也可以用来传递任务。

假设今天有个Task类,并且按照Task修改主函数中模板:

class Task
{
public:Task(int x = 1, int y = 1): _x(x), _y(y){}~Task() {}void Excute(){_res = _x+_y;}int res(){return _res;}
private:int _x;int _y;int _res;
};
using namespace BQModule;
void *Consumer(void *arg) // 消费者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){sleep(2);// 1.从bq中获得数据Task data;bq->Pop(&data); // 输出型参数// 2.处理数据data.Excute();std::cout << "consumer get: " << data.res() << std::endl;}
}void *Productor(void *arg) // 生产者
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(arg);while (true){// sleep(2);//  1.从外部获取数据//::data+=10;//+=操作不是原子的,可能发生进入了相同数据的情况int x = rand() % 5 + 1; //[1,5]int y = rand() % 9 + 1; //[1,9]Task t(x, y);// 2.将数据入队列bq->Enqueue(t);printf("productor asked  %d+%d=? \n", x, y);}
}int main()
{srand(time(nullptr) ^ getpid());BlockQueue<Task> *bq = new BlockQueue<Task>;// pthread_t p1,p2,c1,c2,c3;pthread_t p1, c1;pthread_create(&p1, nullptr, Productor, (void *)bq);pthread_create(&c1, nullptr, Consumer, (void *)bq);pthread_join(p1, nullptr);pthread_join(c1, nullptr);delete bq;return 0;
}

也可以使用using task_t =function<void()>来实现,直接把task_t当作BlockQueue的类型即可。


2. 进一步理解 生产消费者模型

互斥访问,何以高效?

生产消费模型都是在各种锁或者信号量的控制下进行的,那是否能说明模型中的各个线程都是串行执行的呢?串行执行难道不是有违设计线程的初衷,降低了整体效率吗?

由以上的传递任务的模型不难看出,生产消费模型并不是单纯的用于在阻塞队列中去传递数据(一些网络服务器虽然确实是做这个的),他还有很多其他运用场景,比如:玩卡牌游戏,一个线程用于获得键鼠等外设上的信息(生产者),另一个(可能是好几个)线程用于处理背后的逻辑(消费者),那其实对于整个进程来说,线程A外设上等待信息的时间和线程B处理背后逻辑的时间才是真正花时间的,而等待信息和处理逻辑的时候,显然两个线程是在并行。

超市中去放货取货只是整个进程中损耗很小的一部分,不应该由此担心是不是变成了串行。


3. POSIX信号量

        信号量(Semaphore)是操作系统中用于进程或线程同步的核心工具,其本质是一个非负整数计数器,用于表示共享资源的剩余数量或访问权限的状态。通过计数器的增减操作,信号量可以协调多个进程/线程对临界资源的访问,避免竞争条件(Race Condition)和数据不一致问题。

        在进程间通信的时候我们大致介绍过基于systemV的信号量,现在我们介绍POSIX信号量:

        之前我们的mutex都是对于资源的整体预订,而信号量是一种不用去整体预定的策略。

信号量就像买电影票,买了电影票表示预定了这个座位:一个大数组,我不需要全部预定完,可以只使用一个空间。信号量就是对剩余座位的计数器,表示当下还有多少“部分资源”可供使用。

                        

POSIX的信号量多用于满足线程间同步。

就像mutex的锁一样,信号量sem本身就是临界资源,所以信号量的--(P操作)或者++(V操作)本身必须是原子的。

所以对于锁来说(mutex),其实就是一个二元信号量去控制,也就是对于该信号量:非0即1。整体资源还存在、没被拿就是1,被拿了(在讲锁的原理的时候提到过,本质是swap,是原子性的汇编操作)就是0

  • P操作(sem_wait:也叫等待信号量
    减少信号量的值。若结果为负,则阻塞调用线程(或进程,取决于信号量类型),直到信号量非负。
    • 函数原型:
      int sem_wait(sem_t *sem);
    • 行为:
      • 若信号量值 > 0,则减1并继续执行。
      • 若信号量值 = 0,则线程被挂起,直到其他线程调用sem_post增加信号量值。
  • V操作(sem_post:也叫发布信号量
    增加信号量的值。若信号量原本为0(即有线程因sem_wait被阻塞),则唤醒一个阻塞的线程。
    • 函数原型:
      int sem_post(sem_t *sem);
    • 行为:
      • 信号量值加1。
      • 若存在因sem_wait阻塞的线程,则唤醒其中一个。

顺便再把semaphore中其他常用的接口也记录一下(semaphore头文件编译的时候也需要加上-lpthread)

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表⽰线程间共享,⾮零表⽰进程间共享
value:信号量初始值销毁信号量
int sem_destroy(sem_t *sem);

 value表示一开始有的“部分资源”的数量。比如这种情况就该设置为16,如果已经有一个被使用就设置成15。

                

中间的参数都默认设置成0即可。


4. 基于环形队列的⽣产消费模型

        循环队列版本的生产消费模型本质就是利用了信号量实现并发处理,在前面的阻塞队列版本,不论是单生产、消费线程版本还是多生产、消费线程版本,都无法做到生产的同时消费,但是实际上如果可以以信号量来控制资源的使用逻辑就不影响了。

        因此,今天我们选择用一个数组vector来作为存放数据的基本容器

循环队列原理

head是循环队列的头,tail是循环队列的尾。整个队列为空时,head和tail同时指向同一个位置(如图)。一旦开始插入数据,head指向下一个会被使用的数据(也就是整个队列的第一个数据,队列的头,消费者下一次就从head处拿出数据),tail指向下一个即将插入数据的位置(也就是整个队列的第一个空位置,生产者生产出数据就放在tail指向的位置),整个循环队列满的时候,head和tail也都指向同一个位置。可以用取模的方法保证tail会去转圈。

                ​​​​​​​        

资源,分为数据空间。

对于生产者来说,tail从0开始,对空间资源进行P操作(--),对数据资源进行V操作(++)

对于消费者来说,head从0开始,对空间资源进行V操作,对数据资源进行P操作

        ​​​​​​​        

分析:刚开始没有数据的时候,head和tail指向同一个位置,难道不会让两个线程发生竞态条件吗?此时没有数据,数据资源的计数器----数据资源的信号量为0,该信号量自动不让消费者进入,保证生产者先进行原子性生产(对空间资源进行V操作)

最后数据满的时候,head和tail又指向同一个位置,此时没有空间了,所有的空间都有数据,空间资源的计数器----空间资源的信号量为0,该信号量自动不让生产者进入,保证消费者先进性原子性消费(对数据进行P操作)。

注意,至于每次等待的队列到底是FIFO还是随机争抢,这个取决于操作系统的实现方法 

代码实现

借鉴下之前写BlockQueue的主程序,稍加改造

【LINUX操作系统】生产者消费者模型(上):概念与阻塞队列-CSDN博客

(记得再改改变量名,bq是blockqueue的缩写......)

构建ringbuffer类:

 

namespace RingBufferModule
{template <typename T>class ringbuffer{public:ringbuffer(int init_sem_num=DEFAULTSENMVAL):_p_pos(0),_c_pos(0),_ring(SIZE),_size(SIZE){int n = sem_init(&_data_sem,0,init_sem_num);if(n<0){std::cerr<<"sem init error"<<std::endl;}int m = sem_init(&_space_sem,0,init_sem_num);if(m<0){std::cerr<<"sem init error"<<std::endl;}}void Enqueue(const T& in)//给生产者用{//1.获取数据//2.入环}void Pop(T* out)//给消费者用{}~ringbuffer(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);}ringbufferprivate:std::vector<T> _ring; //存储数据的基本容器,也是重点临界资源size_t _size; //整个环的大小sem_t _data_sem;//数据资源信号量sem_t _space_sem;//空间资源信号量int _p_pos;//生产者下标位置,即tail,表示下一次放数据的地方int _c_pos;//消费者下标位置,即head,表示下一次取数据的地方};
}

对于这个循环队列,需要有Enqueue和Pop的接口,分别给生产者和消费者用于放数据。

前几次都是先用最基本的C库接口封装,然后自己封装一个面向对象的类,再替换接口,这次直接先封装,再调接口。

就可以改变我们刚刚的构造函数

并且完成具体逻辑的函数,将各种sem_wait还有sem_post就都封装起来了。

在一开始数据为空的时候,不用担心消费者进来,因为此时_data_sem无法进行P操作,会阻塞在原地。

测试

这样,我们的循环队列保证在为空为满时都是进行“互斥与同步”,而在其余时刻,各自访问各自的信号量,提升了一丢丢的效率。


5. 多生产多消费的循环队列生产消费模型 

        再加入其他生产者消费者之后,关系又恢复成了需要控制:生产者与生产者、消费者与消费者,生产者与消费者。我们采取如下思路:给生产者们加一个锁,所有的生产者竞争出来一个线程去参加这次循环队列的sem的PV操作;给所有的消费者一个锁,所有的消费者竞争出来一个线程去参加这次循环队列的sem的PV操作。

依然是直接引进我们自己实现的RAII风格的锁。【LINUX操作系统】线程同步与互斥-CSDN博客

现在的问题是,锁在哪里加效率更高呢?

比如Pop,是在MutexGuard后使用data_sem.P()还是在MutexGuard之前呢? 

        

两者都能完成只竞争出一个进入循环队列的栈,区别在于:

在1,先锁住,只有一个去进行数据信号量的“取电影票”,然后正常执行。

在2,先让可以取电影票的线程(消费者们)都取一张电影票,然后再锁住,依次让人进入电影院获取资源。自然,方案2效率更高。一次性的并发_data_sem.P()可以减少之后串行调用该接口的时间。

        这样依然不担心生产者与消费者的互斥,因为能拿到票这一步已经天然的构成了一道屏障,只要拿到了票就一定不会冲突,可以各自使用各自的部分资源。

最终代码:

#define SIZE 10using namespace LockMoudle;namespace RingBufferModule
{template <typename T>class ringbuffer{public:ringbuffer(): _p_pos(0), _c_pos(0), _ring(SIZE), _size(SIZE), _data_sem(0), _space_sem(SIZE){}void Enqueue(const T &in) // 给生产者用{_space_sem.P();{MutexGuard lockguard(_p_mutex);_ring[_p_pos] = in;_p_pos++;_p_pos %= _size;}_data_sem.V();}void Pop(T *out) // 给消费者用{// MutexGuard lockguard(_c_mutex);//锁在这里1_data_sem.P();{MutexGuard lockguard(_c_mutex); // 锁在这里2*out = _ring[_c_pos];_c_pos++;_c_pos %= _size;}_space_sem.V();}~ringbuffer(){// 都自动销毁了}private:std::vector<T> _ring; // 存储数据的基本容器,也是重点临界资源size_t _size;         // 整个环的大小int _p_pos;           // 生产者下标位置,即tail,表示下一次放数据的地方int _c_pos;           // 消费者下标位置,即head,表示下一次取数据的地方Sem _data_sem;  // 数据资源信号量Sem _space_sem; // 空间资源信号量Mutex _p_mutex;Mutex _c_mutex;};
}

相关文章:

  • 【Spring】Spring的请求处理
  • SVGPlay:一次 CodeBuddy 主动构建的动画工具之旅
  • 融智学视域下的系统性认知增强框架——基于文理工三类AI助理赋能HI四阶跃迁路径
  • Linux调试生成核心存储文件
  • python线程相关讲解
  • 从0到1:Python项目部署与运维全攻略(10/10)
  • Flowbite 和 daisyUI 那个好用?
  • 数字化转型- 数字化转型路线和推进
  • 【四川省专升本计算机基础】第二章 计算机软硬件基础(2)
  • USRP 射频信号 采集 回放 系统
  • Python基础学习-Day27
  • 【Changer解码头详解及融入neck层数据的实验设计】
  • C#与KepOPC通讯
  • 手动实现 Transformer 模型
  • LeetCode --- 156双周赛
  • ubuntu 24.04安装ros1 noetic
  • 2022河南CCPC(前四题)
  • js中不同循环的使用以及结束循环方法
  • Spring Boot 中 MyBatis 与 Spring Data JPA 的对比介绍
  • NHANES指标推荐:FMI
  • 本周看啥|《歌手》今晚全开麦直播,谁能斩获第一名?
  • 戛纳打破“疑罪从无”惯例,一法国男演员被拒之门外
  • “多规合一”改革7年成效如何?自然资源部总规划师亮成绩单
  • 圆桌丨新能源车超充技术元年,专家呼吁重视电网承载能力可能面临的结构性挑战
  • 魔都眼|锦江乐园摩天轮“换代”开拆,新摩天轮暂定118米
  • 中办、国办关于持续推进城市更新行动的意见