【Linux】POSIX信号量、环形队列、基于环形队列实现生产者消费者模型
1.信号量
1.1 相关概念
信号量(sem)也叫信号灯,类型是sem_t,就是一个计数器,用来表明临界资源的资源数量,比如说看电影的时候,电影票就是资源,我们只要买了票这个资源就是自己的,所以信号量的本质就是对特定资源的预定机制。
资源为1的信号量叫二元信号量,二元信号量的本质就是互斥。
多线程使用资源有两种场景:
- 将目标资源整体使用(要用到锁mutex/二元信号量),我们之前实现的阻塞队列,就是把阻塞队列整体使用,就要加锁
- 将目标资源按不同的“块”分批使用(要用到信号量),今天要介绍的环形队列
所有线程申请信号量,就都要看到信号量,所以信号量也是临界资源。
信号量就是一个计数器,申请信号量就是对这个计数器--(P操作),释放资源就是对计数器++(V操作),PV操作都是原子的。
- 初始化:pshared: 0表⽰线程间共享,⾮零表⽰进程间共享;value:信号量初始值
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);
- 销毁信号量:
int sem_destroy(sem_t *sem);
-
等待信号量:P操作,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
-
发布信号量:V操作,表⽰资源使⽤完毕,可以归还资源了,将信号量值加1。
int sem_post(sem_t *sem);//V()
1.2 封装信号量
//Sem.hpp文件
#pragma once
#include <iostream>
#include <semaphore.h>namespace MySem
{const unsigned int default_sem_value = 1;class Sem{public:Sem(unsigned int sem_value = default_sem_value){sem_init(&_sem, 0, sem_value);}void P() // 原子的{int n = sem_wait(&_sem);(void)n;}void V() // 原子的{int n = sem_post(&_sem);(void)n;}~Sem(){sem_destroy(&_sem);}private:sem_t _sem;};
}
2.环形队列
现在我们的环形队列每一个位置都能放数据,为空或为满的情况通过 信号量判断。
现在要求一个生产者往这个队列里放数据,一个消费者从这个队列里拿数据。生产者不能把消费者套一个圈,消费者不能超过生产者。
- 队列为空或为满时:生产者和消费者指向同一个位置。
- 队列为空时:消费者没东西可拿,所以要求只能(互斥实现)让生产者先(同步实现)运行,
- 队列为满时:生产者没位置再生产,所以要求只能(互斥实现)让消费者先(同步实现)运行。
- 不为空也不为满时:消费者和生产者就能同时运行。
3.基于环形队列的生产者消费者模型
3.1 单生产单消费
所以对生产者来说,有用的资源是空位置,对消费者来说,有用的资源是数据,初始状态下,队列有多大,生产者的信号量就有多少,消费者此时没有资源,信号量就是0。
#pragma once#include "Sem.hpp" //用我们前面封装好的的信号量
#include <iostream>
#include <string>
#include <vector>using namespace MySem;const int defualt_cap = 1;template <typename T>
class CirQueue
{
public:CirQueue(int cap = defualt_cap): _cq(cap),_cap(cap),_producer_sem(cap),_p_index(0),_consumer_sem(0),_c_index(0){}~CirQueue(){}private:std::vector<T> _cq;int _cap; Sem _producer_sem; // 生产者信号量Sem _consumer_sem; // 消费者信号量int _p_index; // 生产者下标int _c_index; // 消费者下标
};
对于生产者,生产之前要申请信号量(P操作,就是-1),生产完后,要释放消费者信号量(V操作,就是+1)。
void Equeue(const T &data) // 生产者{_producer_sem.P(); // 申请信号量_cq[_p_index++] = data; // 生产,生产完了要往后走_p_index %= _cap; // 维持环状结构_consumer_sem.V(); // 让消费者的信号量+1}
对于消费者,消费之前要申请信号量,生产完后,要释放生产者的信号量。
T Pop() // 消费者{_consumer_sem.P(); // 申请信号量T data = _cq[_c_index++]; // 获取数据,获取之后往后走_c_index %= _cap; // 维持环状结构_producer_sem.V(); // 让生产者信号量+1return data;}
验证一下。
//Main.cc文件
#include "CircularQueue.hpp"
#include <unistd.h>void *PThreadFunc(void *arg) // 生产者
{int data = 1;CirQueue<int> *cq = static_cast<CirQueue<int> *>(arg);while (true){std::cout << "生产了一个数据: " << data<< std::endl;cq->Equeue(data);data++;sleep(1);}return nullptr;
}void *CThreadFunc(void *arg) // 消费者
{CirQueue<int> *cq = static_cast<CirQueue<int> *>(arg);while (true){int data = cq->Pop();std::cout << "消费了一个数据: " << data << std::endl;}return nullptr;
}int main()
{CirQueue<int> cq(5); // 环形队列pthread_t p1, c1;pthread_create(&p1, nullptr, PThreadFunc, &cq);pthread_create(&c1, nullptr, CThreadFunc, &cq);pthread_join(p1, nullptr);pthread_join(c1, nullptr);return 0;
}
这是消费者比生产者快的情况,生产者生产一个,消费者就消费一个。
还可以让消费者比生产者慢,就会出现生产者一次性把队列干满,然后消费者消费一个生产者就生产一个。
3.2 多生产多消费
多生产多消费就要加锁来维护生产者与生产者之间的关系,以及消费者与消费者之间的关系,所以我们要两把锁。锁用之前我们封装过的,在【Linux】线程的互斥 里有详细介绍。
void Equeue(const T &data) // 生产者{//_producer_mutex.Lock(); //之前加锁?_producer_sem.P(); // 申请信号量//_producer_mutex.Lock(); //之后加锁?_cq[_p_index++] = data; // 生产,生产完了要往后走_p_index %= _cap; // 维持环状结构//_producer_mutex.UnLock(); //之前解锁?_consumer_sem.V(); // 让消费者的信号量+1//_producer_mutex.UnLock(); //之后解锁?}
加锁的操作是在申请信号量之前还是之后呢?
肯定有很多同学认为,信号量也是临界资源,肯定要放在锁里面,也就是先加锁再申请信号量。事实并非如此。
可以理解为加锁就是我们在电影院门口排队,申请信号量就是在手机上买票,我们是先排着队再票效率高,还是先把票买了再去排队效率高?肯定是先买票。
对信号量的申请本质就是对资源的预定机制,而且申请信号量本身就是原子的,所有的线程先把信号量资源瓜分了,然后在串行式的申请锁,我申请到锁进入临界区后,别的线程可以去申请信号量,这样效率更高;反之,申请到锁的线程再去申请信号量,别的线程还要等这个线程把锁释放了才能申请信号量。
所以我们应该先申请信号量,再加锁。
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"using namespace MySem;
using namespace MyMutex;const int defualt_cap = 1;
template <typename T>
class CirQueue
{
public:CirQueue(int cap = defualt_cap): _cq(cap),_cap(cap),_producer_sem(cap), // 空位的数量_p_index(0),_consumer_sem(0), // 数据的数量_c_index(0){}void Equeue(const T &data) // 生产者{_producer_sem.P(); // 申请信号量{LockGuard lg(&_producer_mutex); // 加锁_cq[_p_index++] = data; // 生产,生产完了要往后走_p_index %= _cap; // 维持环状结构}_consumer_sem.V(); // 让消费者的信号量+1}T Pop() // 消费者{T data;_consumer_sem.P(); // 申请信号量{LockGuard lg(&_consumer_mutex); // 加锁data = _cq[_c_index++]; // 获取数据,获取之后往后走_c_index %= _cap; // 维持环状结构}_producer_sem.V(); // 让生产者信号量+1return data;}~CirQueue(){}private:std::vector<T> _cq;int _cap;Sem _producer_sem; // 生产者信号量Sem _consumer_sem; // 消费者信号量int _p_index; // 生产者下标int _c_index; // 消费者下标Mutex _producer_mutex; // 生产者的互斥锁Mutex _consumer_mutex; // 消费者的互斥锁
};
如果现在环形队列的大小为1,就意味着环形队列被当作一个整体使用了,也就变成了互斥锁,也就是之前说过的阻塞队列。
本片分享就到这里,我们下篇见~