Linux:基于阻塞队列的生产者消费模型
文章目录
- 一、生产者消费者模型的基本原则
- 💕💕生产者-消费者模型的 321 原则💕💕
- 二、为何要使用生产者消费者模型
- 1. 解耦
- 2. 支持并发 (提高效率)
- 3. 忙闲不均的支持
- 三、基于 BlockingQueue 的生产者消费者模型
- 1. 阻塞队列的特点
- 2. C++ 模拟实现
- 3. 封装更精细的版本
- 五、总结
一、生产者消费者模型的基本原则
在多线程编程中,生产者消费者模型是一种常见的并发设计模式。它通过一个**缓冲区(阻塞队列)**来解耦生产者和消费者,从而解决两者之间的强耦合问题。生产者只需要负责将数据放入队列,而消费者只需要负责从队列中取数据。
这样设计的好处在于,生产者在完成数据生成后无需等待消费者处理,可以立即返回继续生成;而消费者也无需关心数据来自哪里,只需要从队列里取出任务并处理即可。这种方式既保证了系统的高效性,也增强了并发性。
可以把 321 原则用层次化结构来表达,比表格更直观:
💕💕生产者-消费者模型的 321 原则💕💕
3 个关系
- 生产者之间互斥:多个生产者不能同时写入队列,否则会破坏数据一致性。
- 消费者之间互斥:多个消费者不能同时读取同一数据,否则会出现重复消费。
- 生产者与消费者之间互斥与同步:队列为空时消费者等待,队列满时生产者等待,二者既互斥又必须协同。
2 个角色
- 生产者:负责不断产生数据并放入队列。
- 消费者:负责从队列中取出数据并进行处理。
1 个交易场所
阻塞队列 / 环形队列:作为共享缓冲区,承载生产与消费的衔接。
二、为何要使用生产者消费者模型
1. 解耦
生产者和消费者之间没有直接依赖关系,它们通过阻塞队列完成数据交互,降低了耦合度。
2. 支持并发 (提高效率)
生产者和消费者可以并发执行,充分利用 CPU 资源。这里提高效率并不是说生产者消费模型可以更快的派发任务,而是通过一个串行的交易场所,可以将任务派发给不同的线程,也可以由不同的线程同时生产,在生产者和消费者自己之间可以做到并发执行,因此提高了效率,毕竟将来派发任务只占有一点点的份额,执行任务才是大头。
3. 忙闲不均的支持
如果生产者生成速度快于消费者处理速度,多余的数据会存放在阻塞队列中;反之,消费者可以在数据不足时自动等待。这样有效平衡了生产和消费之间的速度差异。
三、基于 BlockingQueue 的生产者消费者模型
1. 阻塞队列的特点
在普通队列中,如果取数据时队列为空会直接返回错误,而阻塞队列则会让取数据的线程阻塞等待直到有数据为止;同样地,如果存放数据时队列已满,线程也会被阻塞,直到有空余位置。这种机制天生适合生产者消费者模型。
2. C++ 模拟实现
BlockQueue.hpp
// 阻塞队列的实现
#pragma once#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>const int defaultcap = 5; // for testtemplate <typename T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = defaultcap) : _cap(cap), _csleep_num(0), _psleep_num(0){pthread_mutex_init(&_mutex, NULL);pthread_cond_init(&_full_cond, NULL);pthread_cond_init(&_empty_cond, NULL);}void Equeue(const T &in){pthread_mutex_lock(&_mutex);while (IsFull()){_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" << _psleep_num << std::endl;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}// 走到这里一定有空间了_q.push(in);if (_csleep_num > 0){pthread_cond_signal(&_empty_cond);std::cout << "唤醒消费者..." << std::endl;}pthread_mutex_unlock(&_mutex);}T Pop(){// 消费者调用pthread_mutex_lock(&_mutex);while (IsEmpty()){_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T data = _q.front();_q.pop();if (_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "唤醒消费者" << std::endl;}// pthread_cond_signal(&_full_cond);pthread_mutex_unlock(&_mutex);return data;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q;int _cap;pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠的个数int _psleep_num; // 生产者休眠的个数
};
3. 封装更精细的版本
Cond.hpp
// 阻塞队列的实现
#pragma once#include <iostream>
#include <string>
#include <queue>
#include "Mutex.hpp"
#include "Cond.hpp"const int defaultcap = 10; // for testusing namespace MutexModule;
using namespace CondModule;template <typename T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = defaultcap): _cap(cap), _csleep_num(0), _psleep_num(0){}void Equeue(const T &in){{LockGuard lockguard(_mutex);// 生产者调用while (IsFull()){// 应该让生产者线程进行等待// 重点1:pthread_cond_wait调用成功,挂起当前线程之前,要先自动释放锁!!// 重点2:当线程被唤醒的时候,默认就在临界区内唤醒!要从pthread_cond_wait// 成功返回,需要当前线程,重新申请_mutex锁!!!// 重点3:如果我被唤醒,但是申请锁失败了??我就会在锁上阻塞等待!!!_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" << _psleep_num << std::endl;// 问题1: pthread_cond_wait是函数吗?有没有可能失败?pthread_cond_wait立即返回了// 问题2:pthread_cond_wait可能会因为,条件其实不满足,pthread_cond_wait 伪唤醒_full_cond.Wait(_mutex);_psleep_num--;}// 100%确定:队列有空间_q.push(in);// 临时方案// v2if (_csleep_num > 0){_empty_cond.Signal();std::cout << "唤醒消费者..." << std::endl;}}}T Pop(){T data;{// 消费者调用LockGuard lockguard(_mutex);while (IsEmpty()){_csleep_num++;_empty_cond.Wait(_mutex);_csleep_num--;}data = _q.front();_q.pop();if (_psleep_num > 0){_full_cond.Signal();std::cout << "唤醒生产者" << std::endl;}}return data;}~BlockQueue(){}private:std::queue<T> _q; // 临界资源!!!int _cap; // 容量大小Mutex _mutex;Cond _full_cond;Cond _empty_cond;int _csleep_num; // 消费者休眠的个数int _psleep_num; // 生产者休眠的个数
};
五、总结
生产者消费者模型是一种经典的多线程设计模式。它利用阻塞队列实现生产与消费的解耦,不仅提高了系统的并发能力,还能平衡两者之间的处理速度差异。在 C++ 中,我们可以通过 queue + pthread + 条件变量
实现一个简易的阻塞队列,再结合任务封装,实现灵活的生产者消费者模型。
未来在实际工程中,如果使用 C++11 及以上版本,可以考虑用 std::mutex
和 std::condition_variable
替代 pthread
,写法会更简洁。