【Linux线程】阻塞队列环形队列(消费者生产者模型的实现)
目录
前言
1. 阻塞队列
2. 环形队列
总结
前言
了解了线程控制、同步与互斥、以及消费者生产者模型,本篇文章为实践篇,对以上内容的实践,使用阻塞队列和环形队列来实现生产者消费者模型;
1. 阻塞队列
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出;
模型整体结构:
一个生产线程,一个消费线程,生产线程负责在队列中生产,一个线程负责去取进行消费;
- 队列满的时候,生产线程发生阻塞,不再生产
- 队列为空时,消费线程发生阻塞,不再拿任务;
在这个体系中要理清楚:
由谁来通知生产线程和消费线程? 谁来唤醒线程?
消费者生产者体系中只有生产者知道什么时候需要消费,只有消费者知道什么时候生产;
所以唤醒线程只能互相唤醒
整体结构较为简单,借助数据结构Queue来实现:
在此之前,可以依据RAII的思想对mutex进行封装,不需要手动的添加解锁(也可以使用C++库在的锁):
#pragma once
#include <pthread.h>
class mutex
{
public:
mutex(pthread_mutex_t* lock)
:_lock(lock)
{}
void Lock()
{
pthread_mutex_lock(_lock);
}
void UnLock()
{
pthread_mutex_unlock(_lock);
}
~mutex()
{}
private:
pthread_mutex_t* _lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t* lock)
:_mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.UnLock();
}
private:
mutex _mutex;
};
阻塞队列成员设计:
const int defaultcap = 5;
template<class T>
class BlockQueue
{
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond;
pthread_cond_t _c_cond;
};
容量设置阻塞队列的大小,锁控制线程安全、两个条件变量用于控制生产线程与消费线程的同步;
核心接口也就只有两个:Push(生产)、Pop(消费);
void Push(const T& in) //生产者
{
LockGuard lockguard(&_mutex);
while (IsFull())
{
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in);
// 有数据了可以唤醒消费者线程来进行消费
pthread_cond_signal(&_c_cond);
}
void Pop(T* out) //消费者
{
LockGuard lockguard(&_mutex);
while (IsEmpty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _q.front();
_q.pop();
// 唤醒生产者
pthread_cond_signal(&_p_cond); //唤醒放在释放锁的前边后边都可以
}
整体逻辑:
#include <pthread.h>
#include <ctime>
#include <unistd.h>
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"
const int defaultcap = 5;
template<class T>
class BlockQueue
{
public:
BlockQueue(int cap = defaultcap)
:_capacity(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.size() == 0;
}
void Push(const T& in) //生产者
{
LockGuard lockguard(&_mutex);
//pthread_mutex_lock(&_mutex);
while (IsFull())
{
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in);
// if(_q.size() > _productor_water_line) pthread_cond_signal(&_c_cond); //达到生产水平线就唤醒线程
pthread_cond_signal(&_c_cond);//唤醒线程时如果线程本来就醒着,不会有什么影响
//pthread_mutex_unlock(&_mutex);
}
void Pop(T* out) //消费者
{
LockGuard lockguard(&_mutex);
//pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _q.front();
_q.pop();
//消费者生产者体系中,只有生产者知道什么时候需要消费
//只有消费者知道什么时候生产
// if(_q.size() < _consumer_water_line) pthread_cond_signal(&_p_cond);
pthread_cond_signal(&_p_cond);//唤醒放在释放锁的前边后边都可以
// 在锁内唤醒,线程不会在条件变量上等了,转而会到阻塞到申请锁的队列
//pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond;
pthread_cond_t _c_cond;
// int _consumer_water_line; // _consumer_water_line = _capacity / 3 * 2
// int _productor_water_line; // _productor_water_line = _capacity / 3
};
void *consumer(void *arg)
{
BlockQueue<int> *bqp = (BlockQueue<int> *)arg;
int data;
for (;;)
{
bqp->Pop(&data);
std::cout << "Consume data done : " << data << std::endl;
}
}
// more faster
void *producter(void *arg)
{
BlockQueue<int> *bqp = (BlockQueue<int> *)arg;
srand((unsigned long)time(NULL));
for (;;)
{
int data = rand() % 1024;
bqp->Push(data);
std::cout << "Prodoct data done: " << data << std::endl;
sleep(1);
}
}
int main()
{
BlockQueue<int> bq;
pthread_t c, p;
pthread_create(&c, NULL, consumer, (void *)&bq);
pthread_create(&p, NULL, producter, (void *)&bq);
pthread_join(c, NULL);
pthread_join(p, NULL);
return 0;
}
2. 环形队列
阻塞队列的方法也有部分欠缺,比如:把资源放到队列中,使用互斥锁一次就只能一个线程访问这个队列;这也会降低效率;这时就可以使用信号量来解决;
使用互斥锁时,把队列看成一个整体来访问,使用信号量可以完美解决 多个线程同时访问队列的不同区域;
基于信号量实现环形队列:
- 信号量本质就是一个计数器
- 申请信号量本质是预定资源
- PV操作是原子性的
场景分析:
- 生产者打满这个队列后就不能再生产了(会覆盖原有资源)
- 消费者不能超过生产者
生产者和消费者指向同一位置的两种情况:
- 队列为空(只能让生产者跑)
- 队列为满(只能让消费者跑)
也就是说我们需要局部维持互斥和同步
对资源的描述与认识:空间-->p,数据-->d
所以我们需要两个信号量:
- p: _space_sem N
- c: _data_sem 0
生产者生产:
P(_space_sem)// _space_sem 减减
// 生产数据/任务
V(_data_sem)// _data_sem 加加
消费者消费:
P(_data_sem)// _data_sem 减减
// 生产数据/任务
V(_space_sem )// _space_sem 加加
完整代码:
#include <pthread.h>
#include <ctime>
#include <unistd.h>
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"
#include <vector>
#include <stdlib.h>
#include <semaphore.h>
#define NUM 16
class RingQueue
{
private:
std::vector<int> q;
int cap; // 队列容量
sem_t data_sem; // 数据的数量
sem_t space_sem; // 空余空间数量
int consume_step; // 消费偏移量
int product_step; // 生产偏移量
public:
RingQueue(int _cap = NUM) : q(_cap), cap(_cap)
{
sem_init(&data_sem, 0, 0);
sem_init(&space_sem, 0, cap);
consume_step = 0;
product_step = 0;
}
// 生产
void PutData(const int &data)
{
sem_wait(&space_sem); // P
q[consume_step] = data;
consume_step++;
consume_step %= cap;
sem_post(&data_sem); // V
}
// 消费
void GetData(int &data)
{
sem_wait(&data_sem);
data = q[product_step];
product_step++;
product_step %= cap;
sem_post(&space_sem);
}
~RingQueue()
{
sem_destroy(&data_sem);
sem_destroy(&space_sem);
}
};
void *consumer(void *arg)
{
RingQueue *rqp = (RingQueue *)arg;
int data;
for (;;)
{
rqp->GetData(data);
std::cout << "Consume data done : " << data << std::endl;
sleep(1);
}
}
void *producter(void *arg)
{
RingQueue *rqp = (RingQueue *)arg;
srand((unsigned long)time(NULL));
for (;;)
{
int data = rand() % 1024;
rqp->PutData(data);
std::cout << "Prodoct data done: " << data << std::endl;
// sleep(1);
}
}
int main()
{
RingQueue rq;
pthread_t c, p;
pthread_create(&c, NULL, consumer, (void *)&rq);
pthread_create(&p, NULL, producter, (void *)&rq);
pthread_join(c, NULL);
pthread_join(p, NULL);
}
这是一个简易版本的环形队列,如对信号量或线程控制不太熟悉的伙伴,可以阅读我的这篇文章:
【Linux线程】线程互斥与同步
【Linux线程】线程控制
总结
以上便是本文的全部内容,希望对你有所帮助,感谢阅读!