【Linux学习笔记】基于阻塞队列和环形队列的生产者消费者模型
【Linux学习笔记】基于阻塞队列和环形队列的生产者消费者模型

🔥个人主页:大白的编程日记
🔥专栏:Linux学习笔记

文章目录
- 【Linux学习笔记】基于阻塞队列和环形队列的生产者消费者模型
- 前言
- 2-5基于BlockingQueue的生产者消费者模型
- 2-5-1 BlockingQueue
- 2-5-2 C++ queue模拟阻塞队列的生产消费模型
- 2-5-3 BlockQueue.hpp
- 2-6 为什么 pthread_cond_wait 需要互斥量?
- 2-7条件变量使用规范
- 2-8条件变量的封装
- 2-9 POSIX信号量
- 2-9-1 基于环形队列的生产消费模型
- 注意:
- 后言
前言
哈喽,各位小伙伴大家好!上期我们讲了生产者消费者模型 今天我们讲的是基于阻塞队列和环形队列的生产者消费者模型。话不多说,我们进入正题!向大厂冲锋!
2-5基于BlockingQueue的生产者消费者模型
2-5-1 BlockingQueue
在多线程编程中阻塞队列
(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
2-5-2 C++ queue模拟阻塞队列的生产消费模型
代码:
- 我们以单生产者,单消费者,来进行讲解。
• 刚开始写,我们采用原始接口。 - 我们先写单生产,单消费。然后改成多生产,多消费(这里代码其实不变)。
2-5-3 BlockQueue.hpp
#pragma once
#include <pthread.h>
#include <iostream>
#include <string>
#include <queue>
using namespace std;const int defaultcap = 5; // for test
template <class T>
class BlockQueue
{
public:bool IsFull(){return _cap <= _q.size();}bool IsEmpty(){return _q.size() == 0;}BlockQueue(int cap = defaultcap): _cap(cap), _csleep_num(0), _psleep_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}T Pop(){pthread_mutex_lock(&_mutex);// 避免休眠失败向后直接pop 进行二次检查!while (IsEmpty()){// 队列为空消费者去条件变量下休眠_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T ret = _q.front();_q.pop();// 走到这一定有生产空间 唤醒生产者生产if (_psleep_num){pthread_cond_signal(&_full_cond);cout << "唤醒一个生产者" << endl;}pthread_mutex_unlock(&_mutex);return ret;}void Equeue(const T& date){pthread_mutex_lock(&_mutex);// 避免休眠失败向后直接push 进行二次检查!while (IsFull()){cout<<"生产者进行休眠了!:"<<_psleep_num<<endl;// 队列为满生产者去条件变量下休眠_psleep_num++;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}_q.push(date);// 走到这一定有消费空间 唤醒消费者生产if (_csleep_num){pthread_cond_signal(&_empty_cond);cout << "唤醒一个消费者" << endl;}pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty_cond);}private:std::queue<T> _q; // 临界资源!!!int _cap; // 容量大小pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠的个数int _psleep_num; // 生产者休眠的个数
};
注意:这里采用模版,是想告诉我们,队列中不仅仅可以防止内置类型,比如int,对象也可以作为任务来参与生产消费的过程哦。
1 #pragma once
2
3 #include<iostream>
4 #include<string>
5 #include<functional>
6
7 //任务类型1
8 //class Task
9 //{
10 //public:
11 //Task(){}
12 //Task(int a,int b):_a(a),_b(b),_result(0)
13 // {
14 // }
15 // void Excute()
16 // {
17 // _result = _a + _b;
18 // }
19 // std::string ResultToString()
20 // {
21 // return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
22 // }
23 // std::string DebugToString()
24 // {
25 // return std::to_string(_a) + "+" + std::to_string(_b) + "=";
26 // }
27
28 // private:
29 // int _a;
30 // int _b;
31 // int _result;
32 // };
33
34 // 任务类型2
35 using Task = std::function<void>();
2-6 为什么 pthread_cond_wait 需要互斥量?
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。

互斥量
被锁挡住了没办法执行
造成死锁
- 按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了,如下代码:
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {pthread_mutex_unlock(&mutex);// 解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过pthread_cond_wait(&cond, &mutex);pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
- 由于解锁和等待不是原子操作。调用解锁之后,
pthread_cond_wait之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么pthread_cond_wait将错过这个信号,可能会导致线程永远阻塞在这个pthread_cond_wait。所以解锁和等待必须是一个原子操作。 intPTHread_cond_wait(pthread_cond_t \*cond,pthread_mutex_t\* mutex);进入该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_wait返回,把条件量改成1,把互斥量恢复成原样。

2-7条件变量使用规范
等待条件代码
1 pthread_mutex_lock(& mutex);
2 while(条件为假)//if??
3 pthread_cond_waitcond,mutex);
4 修改条件
5 pthread_mutex_unlock(&mutex);
- 给条件发送信号代码
1 pthread_mutex_lock(& mutex);
2 设置条件为真
3 pthread_cond_signalcond);
4 pthread_mutex_unlock(& mutex);
2-8条件变量的封装
- 基于上面的基本认识,我们已经知道条件变量如何使用,虽然细节需要后面再来进行解释,但这里可以做一下基本的封装,以备后用。
Cond.hpp
#include <pthread.h>
#include <iostream>
using namespace std;
#include"Mutex.hpp"
using namespace MutexModule;
namespace CondModule
{class Cond{public:Cond(){pthread_cond_init(&_cond,nullptr);}~Cond(){pthread_cond_destroy(&_cond);}void Wait(Mutex& mutex){pthread_cond_wait(&_cond,mutex.Get());}void Signal(){pthread_cond_signal(&_cond);}void Brodcast(){pthread_cond_broadcast(&_cond);}private:pthread_cond_t _cond;};
}
2-9 POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步。
初始化信号量
1 #include <semaphore.h>
2 int sem_init(sem_t *sem, int pshared, unsigned int value);
3 参数:
4 pshared:0表示线程间共享,非零表示进程间共享
5 value: 信号量初始值
销毁信号量
1 int semdestroy(sem_t *sem);
等待信号量
1 功能:等待信号量,会将信号量的值减1
2 int sem_wait(sem_t *sem); //P()
发布信号量
1 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
2 int sem_post(sem_t *sem); //V()
上一节生产者- 消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量):

2-9-1 基于环形队列的生产消费模型

- 环形队列采用数组模拟,用模运算来模拟环状特性

-
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
-
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。

#include <pthread.h>
#include <iostream>
#include<semaphore.h>
using namespace std;
namespace SemModule
{const int defaultvalue = 1;class Sem{public:Sem(int value=defaultvalue){sem_init(&_sem,0,value);}void P(){sem_wait(&_sem);}void V(){sem_post(&_sem);}~Sem(){sem_destroy(&_sem);}private:sem_t _sem;};
}

注意:
- 这里我们还是忍住,先进行原始接口的使用
- 先单生产,单消费,然后改成多生产,多消费。
- 关于任务,cond处已经介绍,这里就不再重复了。
#pragma once#include <iostream>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"static const int gcap = 5; // for debug
using namespace SemModule;
using namespace MutexModule;
template <typename T>
class RingQueue
{
public:RingQueue(int cap = gcap): _cap(cap), _rq(cap), _p_step(0), _c_step(0), _blank_sem(cap), _data_sem(0){}void Equeue(const T &in){_blank_sem.P();// 生产信号量--{LockGuard guard(_pmutex);_rq[_p_step++] = in;// 生产数据_p_step %= gcap;// 维持环状特性}_data_sem.V();// 消费信号量++}void Pop(T *out){_data_sem.P();{LockGuard guard(_cmutex);// 消费信号量--*out = _rq[_c_step++];// 消费数据_c_step %= gcap;// 维持环状特性}_blank_sem.V();// 生产信号量++}private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem; // 空位置int _p_step;// 消费者Sem _data_sem; // 数据int _c_step;Mutex _cmutex; // 消费者互斥锁Mutex _pmutex; // 生产者互斥锁
};

后言
这就是基于阻塞队列和环形队列的生产者消费者模型。大家自己好好消化!今天就分享到这! 感谢各位的耐心垂阅!咱们下期见!拜拜~


