Linux: 生产者消费者模型
目录
一 前言
二 理解生产者-消费者模型
三 以阻塞队列模拟生产者消费者模型
四 生产者消费者模型的并发
五 POSIX信号量
1. 什么是信号量
2. 信号量接口
2.1 sem_init() 信号量初始化接口
2.2 sem_destroy(),信号量销毁接口
2.3 sem_wait(),信号量等待接口,即申请信号量
2.4 sem_post(),信号量释放接口
六 环形队列模拟生产者消费者模型
1. 什么是环形队列
2. 模拟生产者消费者模型
一 前言
生产者-消费者模型(Producer-Consumer Pattern)是一种经典的多线程同步问题,它描述了一组生产者线程和一组消费者线程共享一个有限缓冲区的情况。生产者负责创建数据并将其放入缓冲区中,而消费者从缓冲区中取出数据并处理它们。这个模型广泛应用于各种并发编程场景中,以确保线程之间高效、安全地交换数据。
二 理解生产者-消费者模型
生产者-消费者模型遵循着 3 2 1 原则:
- 3种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥和同步)
- 2种角色:生产者线程,消费者线程
- 一个交易场所:一段特定结构的缓冲区
接下来我们举个例子来更好的理解生产者-消费者模型。以生活中的超市为例,超市给学生们提供商品,我们以学生来代表消费者,厂家给超市提货物,我们以厂家来代表生产者,这样学生购买商品和厂家提供商品中间的媒介都是该超市。
那这里为什么需要超市作为中间的媒介呢?为什么消费者不直接去生产者那里消费呢?
如同生活中的那样,厂家生产商品是要成本的,通常一次就会生产很多商品,而单个消费者很少一次能消费那么多,而如果只是因为消费者的少量需求而厂家生产大量的商品,这会导致成本的浪费,所以一般情况下厂家是不直接和消费者进行买卖的。所以我们就需要超市这个媒介,一方面只要有商品可以卖出去,厂家就可以一直生产商品,而超市一直有商品,学生就可以随时来直接购买,这也就是说厂家和学生之间的强耦合的关系没有了,变成了弱耦合。即消费者没有去购买商品,厂家也可以一直去生产。厂家不生产,只要超市有货,消费者也可以去消费
上述的模型拿到我们编程环境中来,消费者就是消费线程,生产者就是生产线程,那么超市就是临界资源了。
继续拿现实生活中的例子来说,现实中不止有一个生产者也不止有一个消费者。那他们之间的关系是什么呢?
- 消费者与消费者之间是一种 竞争关系 ,在商品充足的情况下大家是互不干扰的,但是如果面临着商品不足的问题,消费者之间是需要竞争的。其实也就是一种 互斥 关系。
- 生产者与生产者之间也是一种竞争关系,他们竞争的是超市内部的空间资源,也是一种互斥关系。
- 消费者和生产者是一种什么样子的关系呢?首先需要考虑到的是对于临界资源,生产者和消费者是有可能在同一时间访问的,如果此时生产线程还没有向临界资源内写完数据,消费线程是不可以直接从临界资源中拿走数据的,这可能会导致资源的不完整。所以,它们首先要保持一个互斥的关系。同时它们还需要保持同步,因为生产线程不能在临界资源已满的情况下继续向临界资源中写入数据,消费线程也不能在临界资源为空的状态下去进行读取。
综上所述,以操作系统层面来看待生产者消费者模型可以这么概括:
生产者消费者模型存在的关系:
- 生产者之间:互斥关系
- 消费者之间:互斥关系
- 生产者和消费者之间:互斥和同步关系
生产者和消费者是操作系统中线程承担的两种角色。
临界资源区是为生产者和消费者提供 “ 交易 ” 的场所,可以理解为缓冲区。
在我们实际中,我们就需要按照这样的思路来解决问题,其中最重要的是如何让生产者或者消费者进行等待呢?又如何将两者唤醒呢?如何判断所需的条件被满足呢?这就需要我们在上章节学到的条件变量了。
🌿 生产者-消费者模型优点:
- 解耦
- 并发
- 支持忙闲不均
三 以阻塞队列模拟生产者消费者模型
为了满足生产者和消费者之间的同步关系,我们设计一个阻塞队列来实现该过程。当队列为空时,消费线程从队列中获取元素的操作会被阻塞,直到队列中被放入了元素;当队列满时,生产线程往队列中存放元素的操作也会被阻塞,直到有元素被从队列中取出。
********makefile*****************//
MainCp:MainCp.cppg++ -o MainCp MainCp.cpp -std=c++11 -lpthread
.PHONY:clean
clean:rm -f MainCp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>/BlockQueue.hpp///
//阻塞队列
const int gmaxcap=5;
template<class T>
class BlockQueue
{public://1.队列的构造函数BlockQueue(const int &maxcap=gmaxcap ):_maxcap(maxcap){//对锁以及条件变量初始化pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_pcond,nullptr);pthread_cond_init(&_ccond,nullptr);}//生产行为void push(const T& in){pthread_mutex_lock(&_mutex);//1.判断if(is_full()){//因为生产条件不满足,无法等待,所以生产者进行等待pthread_cond_wait(&_pcond,&_mutex);}//2. 走到这一步,说明队列没有满,进行生产_q.push(in);//3. 队列有元素,唤醒消费者进行消费pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_mutex);}//消费行为void pop(T* out){pthread_mutex_lock(&_mutex);//1.判断if(is_empty()){//因为生产条件不满足,无法等待,所以生产者进行等待pthread_cond_wait(&_ccond,&_mutex);//可以理解为用的同一把锁}//2. 走到这一步,说明队列不为空,进行消费*out=_q.front();_q.pop();//3. 走到这里,能保证队列中有一个空位置,唤醒生产者进行生产pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}
private:bool is_empty(){return _q.empty();}bool is_full(){return _q.size()==_maxcap;}
private:std::queue<T> _q;int _maxcap;//队列中元素上限pthread_mutex_t _mutex;//定义一把锁,为了让阻塞队列满足互斥性pthread_cond_t _pcond;//生产者对应的条件变量pthread_cond_t _ccond;//消费者对应的条件变量
};
#include "BlockQueue.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>/MainCp.cpp/////消费者行为
void* consumer(void* bq_)
{BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(bq_);while(true){//消费活动int data;bq->pop(&data);std::cout<<"消费数据:"<<data<<std::endl;}return nullptr;
}//生产者行为
void* productor(void* bq_)
{BlockQueue<int>* bq=static_cast<BlockQueue<int>*>(bq_);while(true){//生产活动int data=rand()%10 +1;//生产随机数bq->push(data);std::cout<<"生产数据: "<<data<<std::endl;sleep(1);}return nullptr;}//生产者消费者模型
int main()
{srand((unsigned long)time(nullptr)^getpid());//获取随机种子//一般情况下,new出来的是类对象,加不加()调用的都是默认构造函数,因此没有区别。BlockQueue<int>* bq =new BlockQueue<int>();//阻塞队列pthread_t c, p;//消费者,生产者线程pthread_create(&c,nullptr,consumer,bq);//这里的bq充当两个线程共享资源pthread_create(&p,nullptr,productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete bq;return 0;
}
测试如下:
为了 更明显的看到生产者消费者同步现象,我们让生产者休眠sleep(1),最后我们一定能看到生产者生产一个,消费者消费一个。
🍀:pthread_cond_wait(&_pcond,&_mutex)为什么第二个参数必须是我们的 互斥锁?
例如
pthread_mutex_lock(&_mutex);
if(is_full())
{
pthread_cond_wait(&_pcond,&_mutex);
}
_q.push(in);
pthread_cond_signal(&_ccond);
pthread_mutex_unlock(&_mutex);
当pthread_cond_wait()进行等待的时候,说明队列满了,消费者要进行消费,这个时候wait()调用的时候,必须将锁释放,然后将自己挂起,如果不将锁释放,则消费者申请不到锁,此外该函数在被唤醒返回的时候,必须要重新获取传入的锁,以此来保证数据安全性。
🍂此时还有需要注意的点?
//生产行为void push(const T& in){pthread_mutex_lock(&_mutex);//1.判断
//****************************************//while(is_full())//注意这里是while
//***************************************//{pthread_cond_wait(&_pcond,&_mutex);}_q.push(in);pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_mutex);}
为什么这里是while(),而不是if呢?首先我们这里用的是一个生产者一个消费者,如果多个生产者多个消费者,此外唤醒的时候,用的是 pthread_cond_broadcas()线程全部唤醒,很有可能当其中一个线程已经push一个又满了。而生产者没有回过头判断而多push一次。用while增强了代码的健壮性。
四 生产者消费者模型的并发
上面的例子中我们只设计了一个生产者和一个消费者,所以对于并发这一特点我们没法看到,接下来我们使用另外一个类,当作阻塞队列的数据类型。
#pragma once
#include <cstdio>
#include <iostream>
#include <functional>
Task.hpp//
/BlockQueue.hpp代码不变///
class Task
{using func_t =std::function<int(int,int,char)>;
public:Task(){}Task(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func){} //()重载,仿函数std::string operator()(){int result=_callback(_x,_y,_op);char buffer[1024];snprintf(buffer,sizeof(buffer),"%d %c %d=%d",_x,_op,_y,result);return buffer;}std::string toTaskString(){char buffer[1024];snprintf(buffer,sizeof(buffer),"%d %c %d=?",_x,_op,_y);return buffer;}
private:int _x;int _y;char _op;func_t _callback;//任务中定义一个方法
};
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
/MainCp.cpp///
const std::string oper ="+-*/%";
//回调函数
int mymath(int x,int y,char op)
{int result=0;switch(op){case '+':result =x+y;break;case '-':result =x-y;break; case '*':result =x*y;break; case '/':if(y==0){std::cerr<<"div zero error!"<<std::endl;result -1;}else result =x/y;break; case '%':if(y==0){std::cerr<<"mod zero error!"<<std::endl;result -1;}else result =x%y;break; default:break;}return result;
}
//消费者行为
void* consumer(void* bq_)
{BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(bq_);while(true){//消费活动Task t;bq->pop(&t);std::cout<<"消费任务:"<<t()<<std::endl;}return nullptr;
}
//生产者行为
void* productor(void* bq_)
{BlockQueue<Task>* bq=static_cast<BlockQueue<Task>*>(bq_);while(true){//生产活动int x=rand()%10 +1;//生产随机数int y=rand()%5;int operCode=rand()%oper.size();Task t(x,y,oper[operCode],mymath);bq->push(t);std::cout<<"生产任务: "<<t.toTaskString()<<std::endl;//sleep(1);}return nullptr;
}
//生产者消费者模型
int main()
{srand((unsigned long)time(nullptr)^getpid());//获取随机种子//一般情况下,new出来的是类对象,加不加()调用的都是默认构造函数,因此没有区别。BlockQueue<Task>* bq =new BlockQueue<Task>();//阻塞队列pthread_t c[2], p[3];//2个消费者,3生产者线程pthread_create(c,nullptr,consumer,bq);//这里的bq充当多个线程共享资源pthread_create(c,nullptr,consumer,bq);//这里的bq充当多个线程共享资源pthread_create(p,nullptr,productor,bq);pthread_create(p+1,nullptr,productor,bq);pthread_create(p+2,nullptr,productor,bq);pthread_join(c[0],nullptr);pthread_join(c[1],nullptr);pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);pthread_join(p[2],nullptr);delete bq;return 0;
}
测试结果:
上面就是我们使用阻塞队列模拟生产者消费者模型,生产和消费加减乘和除余操作的任务。
🍃:但是上述的代码中我们依旧是没有看到并发的现象,实际上生产者消费者模型中支持并发并不是说 消费者和生产者可以 ‘并发’ 地向超市(阻塞队列)消费或者生产数据。因为超市是临界资源加了锁的所以每次只能有一个线程执行,我们所说的消费者生产者模型高并发高效而是指生产者生产商品之前、消费者消费商品之后的过程是可以并发进行的。
五 POSIX信号量
POSIX信号量 也是同步的一种机制,最终的效果和条件变量一样,都是为了实现线程同步
1. 什么是信号量
我们以生活中的看电影的例子来解释一下信号量,在我们生活中去电影院看电影时,我们首先要去买票,选择自己要看的电影然后进行选座,但是电影院中的座位都是有限的,我们每买一张票,电影院的空位置就少一个,买到了电影票,实际上就是让该场次电影的座位数减1,即预定了放映厅的一个座位。
那在我们编程的角度如何理解呢?
我们将放映厅看作一个临时资源,每一个座位都代表着临界资源的一小部分,这时所有的座位都可以看作是信号量,当有人买票选中了座位时,座位数量减1,可以看作是 信号量-- ;如果有人退票,则可以看作是 信号量++;当放映厅里面没有座位的时候,此时就表示信号量为0,其他人想要再买票的时候,就需要等人退票。
也就是说,当 信号量-- 时也就代表临界资源中的一部分被选中了,也就表示后面只能选择除了该部分临界资源的其他部分。
🍇上面我们用电影院选座位的方式来类比信号量,将整个临界资源比作放映厅的所有座位,那么临界资源的內容也可以分为一小部分一小部分吗?
是可以的,临界资源分为一小部分一小部分的是通过信号量操作来让线程选中的。
🍈申请信号量,实际上就是对一部分临界资源的申请,如果申请到了信号量,那么就一定代表获得了一部分的临界资源吗?
是的,只要申请到了信号量,就一定获得了一部分的临界资源,因为只要申请到了,在不释放的情况下,别人是无法申请的,从原则上来说,已经获得了该部分的资源。
信号量可以被所有的线程申请和释放,即通过 ++和-- 操作,即信号量也是一个临界资源,信号量的申请和释放必须是原子性的,事实上也是的确如此。
🍉信号量本质上就是一个计数器
2. 信号量接口
信号量的类型为 sem_t ,与互斥锁和条件变量一样,它的基本接口也涉及初始化、等待(即申请信号量)、释放信号量,销毁信号量。
2.1 sem_init() 信号量初始化接口
- 参数sem_t *sem :要传入的信号量对象
- int pshared :传入0,表示多线程之间不共享,申请的信号量为线程独有的
- int value : value的大小表示信号量的大小,不如value=10,代表电影票十张。
2.2 sem_destroy(),信号量销毁接口
2.3 sem_wait(),信号量等待接口,即申请信号量
它会减少信号量的计数值即信号量- -。如果信号量的当前值大于0,则 sem_wait
会成功地将信号量的值减1并立即返回。如果信号量的值为0,sem_wait
将阻塞调用线程,直到其他线程通过 sem_post
函数增加信号量的值为止。
2.4 sem_post(),信号量释放接口
sem_post
是 POSIX 信号量的一个函数,用于增加(“发布”)信号量的值。它通常用来通知其他正在等待该信号量的线程,表明某个资源现在可用或某个操作可以继续进行。当sem_post()调用成功,则信号量++。
信号量的使用一般流程
sem_t sem 创建------->sem_init() 初始化信号量-------->sem_wait 申请信号量
----------->sem_post 信号量释放--------------->sem_destroy 信号量的销毁
六 环形队列模拟生产者消费者模型
在之前我们使用条件变量以阻塞队列模拟了生产者消费者模型。下面我们通过信号量以环形队列模拟生产者消费者模型。
1. 什么是环形队列
环形队列只是一个在逻辑上环形的,我们使用普通的数组模拟出该环形队列。如我们现在使用[0,7]一共8个空间的数组来模拟一个环形队列,我们知道队列的特点是先进先出,我们要在数组中实现先进先出,就要考虑 可变的队头。
如果用数组实现普通的队列时,一般是固定队头,按照[0,7]的数组,我们就先要固定队头恒为0。
队列的先进先出总是从队尾进,从0的位置出,然后将后边的元素向前移动一位。
而环形的队列则不同,逻辑上我们可以想象将数组卷了起来。
队列使用两个指针来表示队头和队尾,不同于普通的队列,环形队列的队头是可以变化的。
举个例子来说,如果在上图所示的环形队列中,如果0、1、2、3位置存储了数据,那么队头的指针就指向了0,队尾指针指向3或者4.在出队列时,我们并不用考虑pop数据出去,只需要将队头指针++,即将队头指针从0挪到1,新的队头就是1的位置。入队列就是队尾指针++,再存放新的数据,当指针移动到最后一个元素时,再执行 ++ 操作就会回到0位置(该操作一般由取模控制)。
环形队列的好处就是我们在进行出入队列操作时,不需要移动数据只需要挪动队头队尾指针即可,出队列的数据会被后来的入队列的数据覆盖掉。但是环形队列还有一个缺点就是不容易直接判断队列是否为满或者空。
我们上面的环形队列,当队列为空时,队头和队尾指向同一位置,当队列满时,队头和队尾也指向同一位置,所以为了判断环形队列是否为空或者为满,一般使用的是入队列出队列计数器。
2. 模拟生产者消费者模型
接下来考虑使用环形队列模拟生产者消费者模型。
- 将线程分为生产者消费者两种,生产者生产数据,将数据入队列,消费者消费数据,将数据出队列。
- 当队列为空时,生产者可以生产,但是消费者不能消费,当队列为满时,消费者可以消费,但是生产者不能生产。此时生产线程和消费线程是互斥和同步的关系。
- 当队列为其他情况时,生产者和消费者可以并发地进行生产和消费,因为此时生产线程和消费线程访问的不是同一个位置。
上述地实现都是由信号量保证的,那么如何保证呢?
在队列中,生产者需要空间资源,因为需要向队列中入数据,消费者需要数据资源,因为需要出数据。所以我们就可以针对不同的资源,创建两个信号量,一个是表示空间资源的 _spaceSem,另一个是表示数据资源的 _dataSem 。
生产者要生产即申请空间资源信号量,申请成功,则空间资源信号量 -- ,并且申请成功就表示获得了一块空间资源,别人就无法获取,然后等到生产数据入队列之后,数据资源信号量还需要 ++ ,因为有数据入队列了。
对于消费者来说,申请数据资源信号量,申请成功则数据资源信号量 --,等到数据出队列之后,空间资源信号量也是需要++的。
初始情况下,我们将 _spaceSem设置为5,_dataSem设置为0.
只要控制好,_spaceSem++时,对应的_dataSem就需要 --;_dataSem++时,对应的_spaceSem--,就可以保证生产者消费者不会互相超越。
队列中没有数据时,roomSem为5,dataSem为0,需要生产者先生产;队列中满数据时,roomSem为0,dataSem为5,需要消费者先消费,这两种情况,生产者和消费者是同步的。其他时候,两种线程生产和消费就是可以并发执行的。
#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include <cassert>//RingQueue.hpp
static const int gcap=5;
template<class T>
class RingQueue
{
private:void P(sem_t &sem){int n=sem_wait(&sem);//申请信号量,会将信号量的值减1 assert(n==0);(void)n;}void V(sem_t &sem){int n=sem_post(&sem);//消费信号量,发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。assert(n==0);(void)n;}
public:RingQueue(const int &cap=gcap):_queue(cap),_cap(cap){int n=sem_init(&_spaceSem,0,_cap);//信号集初始化,0,代表线程不共享信号集,_cap=5assert(n==0);(void)n;n=sem_init(&_dataSem,0,0);//对数据资源进行初始化assert(n==0);(void)n;_productorStep=_consumerStep=0;}//生产者void Push(const T&in){P(_spaceSem);//申请空间信号量_queue[_productorStep++]=in;//所以用vector_productorStep %= _cap;V(_dataSem);}void Pop(T* out){P(_dataSem);//申请信号量对应的就是--操作,从数据资源_dataSem进行申请*out=_queue[_consumerStep++];_consumerStep%= _cap;V(_spaceSem);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);}
private:std::vector<T> _queue;//环形队列物理上就是一个数组int _cap;sem_t _spaceSem;//生产者的空间资源sem_t _dataSem;//消费者的数据资源int _productorStep;int _consumerStep;
};
#include "RingQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
/main.cpp//void* ProductorRoutine(void* rq)
{RingQueue<int>* ringqueue=static_cast<RingQueue<int>*>(rq);while(true){//todoint data=rand()%10+1;ringqueue->Push(data);std::cout<< "生成完成,生产的数据是"<<data<<std::endl;}
}void* ConsumerRoutine(void* rq)
{RingQueue<int>* ringqueue=static_cast<RingQueue<int>*>(rq);while(true){int data;ringqueue->Pop(&data);std::cout<<"消费完成, 消费的数据是"<<data<<std::endl;}
}
int main()
{//生成随机数种子srand((unsigned int)time(nullptr)^getpid()^pthread_self()^0x134322);RingQueue<int>* rq=new RingQueue<int>();pthread_t c, p;//两个线程,消费者,生产者pthread_create(&p,nullptr,ProductorRoutine,rq);pthread_create(&p,nullptr,ConsumerRoutine,rq);pthread_join(p,nullptr);//线程等待pthread_join(c,nullptr);//线程等待delete rq;return 0;
}
🍊此时我们实现的代码是单生产线程和单消费线程的. 如果是多生产线程和多消费线程, 我们当前模拟的生产者消费者模型的代码会出现错误吗?
毫无疑问, 会出现错误.因为, 如果是多线程生产和消费, 那么我们对索引下标的++保护就不合格.
在多线程生产时, 只要队列中存在足够的空间, 多线程就会并发的去访问 索引下标, 而一个对象内只有一个索引下标, 如果不对索引下标添加保护, 就一定会造成错误. 所以, 我们需要用锁来对索引下标进行保护.
那么, RingQueue.hpp封装就需要改动一下,下面是多线程并发的生产者消费者模型。
#include "RingQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
/main.cpp//void* ProductorRoutine(void* rq)
{RingQueue<int>* ringqueue=static_cast<RingQueue<int>*>(rq);while(true){//todoint data=rand()%10+1;ringqueue->Push(data);std::cout<< "生成完成,生产的数据是"<<data<<std::endl;sleep(1);}
}
void* ConsumerRoutine(void* rq)
{RingQueue<int>* ringqueue=static_cast<RingQueue<int>*>(rq);while(true){int data;ringqueue->Pop(&data);std::cout<<"消费完成, 消费的数据是"<<data<<std::endl;}
}
int main()
{//生成随机数种子srand((unsigned int)time(nullptr)^getpid()^pthread_self()^0x134322);RingQueue<int>* rq=new RingQueue<int>();// pthread_t c, p;//两个线程,消费者,生产者// pthread_create(&p,nullptr,ProductorRoutine,rq);// pthread_create(&p,nullptr,ConsumerRoutine,rq);// pthread_join(p,nullptr);//线程等待// pthread_join(c,nullptr);//线程等待//多生产多消费pthread_t p[4] ,c[4];for(int i=0;i<4;i++) pthread_create(p+i,nullptr,ProductorRoutine,rq);for(int i=0;i<4;i++) pthread_create(c+i,nullptr,ConsumerRoutine,rq);for(int i=0;i<4;i++) pthread_join(p[i],nullptr);for(int i=0;i<4;i++) pthread_join(c[i],nullptr);delete rq;return 0;
}
#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include <cassert>//RingQueue.hpp
static const int gcap=5;
template<class T>
class RingQueue
{
private:void P(sem_t &sem){int n=sem_wait(&sem);//申请信号量,会将信号量的值减1 assert(n==0);(void)n;}void V(sem_t &sem){int n=sem_post(&sem);//消费信号量,发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。assert(n==0);(void)n;}
public:RingQueue(const int &cap=gcap):_queue(cap),_cap(cap){int n=sem_init(&_spaceSem,0,_cap);//信号集初始化,0,代表线程不共享信号集,_cap=5assert(n==0);(void)n;n=sem_init(&_dataSem,0,0);//对数据资源进行初始化assert(n==0);(void)n;_productorStep=_consumerStep=0;pthread_mutex_init(&_pmutex,nullptr);pthread_mutex_init(&_cmutex,nullptr);}//生产者void Push(const T&in){//************这里是个改动*******///P(_spaceSem);//申请空间信号量pthread_mutex_lock(&_pmutex);//为什么在申请信号量之后加锁,后续_productorStep++要保证原子性//所以加锁,我们可以在加锁之前,并行申请信号量,增加效率_queue[_productorStep++]=in;//所以用vector_productorStep %= _cap;pthread_mutex_unlock(&_pmutex);V(_dataSem);//************这里是个改动*******///}void Pop(T* out){//************这里是个改动*******///P(_dataSem);//申请信号量对应的就是--操作,从数据资源_dataSem进行申请pthread_mutex_lock(&_cmutex);//上锁*out=_queue[_consumerStep++];_consumerStep%= _cap;pthread_mutex_unlock(&_cmutex);V(_spaceSem);//************这里是个改动*******/// }~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_pmutex);pthread_mutex_destroy(&_cmutex);}
private:std::vector<T> _queue;//环形队列物理上就是一个数组int _cap;sem_t _spaceSem;//生产者的空间资源sem_t _dataSem;//消费者的数据资源int _productorStep;int _consumerStep;//************这里是个改动*******///pthread_mutex_t _pmutex;pthread_mutex_t _cmutex;
};
测试结果:
七 总结
- 信号量和条件变量都是实现线程同步的手段,不同的是信号量可以提前知道资源的使用情况,而条件变量则需要通过不断的判断资源的使用情况进而选择是否等待wait
- 条件变量的使用,一般需要多个线程之间拥有共同繁荣一把锁,而信号量则可以线程之间独立有自己的锁。
- 条件变量配合锁使用是对整个资源的使用,而信号量是对整体资源进行划分,对不同资源进行使用。