阻塞队列(BlockingQueue)原理、实现与应用:多线程编程中的核心数据结构
目录
一、BlockingQueue(阻塞队列)
1、基本定义与特性
2、与普通队列的对比
3、底层实现原理
4、模拟实现基于阻塞队列的生产消费模型
1. queue_has_items 条件变量
2. queue_has_space 条件变量
3. 使用 queue 的原因
1. 数据存储与管理
2. 与同步机制配合
3. 通用性和可扩展性
5、在生产者 - 消费者模型中的应用
6、优势与局限性
优势
局限性
7、阻塞队列(BlockQueue)的模板化设计与任务队列应用
1. 设计概述
2. 任务队列的两种实现方式(重点!!!)
(1) 自定义任务类(Task)
3. 关键设计分析
(1) 线程安全机制
(2) 唤醒策略
(3) 模板化的优势
4. 测试用例
二、回顾:生产者消费者模型
1、生产者消费者模型的概念
2、生产者消费者模型的特点
(一)三种关系
(二)两种角色
(三)一个交易场所
3、互斥关系和同步关系的成因分析
(一)互斥关系的成因
(二)同步关系的成因
4、生产者消费者模型的优点
(一)解耦
(二)支持并发
(三)支持忙闲不均
一、BlockingQueue(阻塞队列)
在多线程编程的复杂生态中,阻塞队列(BlockingQueue)是一种极为重要且常用的数据结构,它为解决生产者 - 消费者模型中的同步与协作问题提供了高效且便捷的方案。
1、基本定义与特性
阻塞队列本质上是一种特殊的队列,它遵循先进先出(FIFO)的原则来管理元素。然而,与普通队列相比,阻塞队列具有独特的阻塞行为。在多线程环境下,当队列处于特定状态时,对队列的操作将会引发线程的阻塞。具体表现为:
-
队列为空时:当消费者线程试图从队列中获取元素,而此时队列为空,该获取操作将会被阻塞。消费者线程会进入等待状态,暂停执行,直到有其他线程向队列中放入元素,将其唤醒。
-
队列已满时:当生产者线程尝试向已满的队列中存放元素,该存放操作会被阻塞。生产者线程同样会进入等待状态,直至队列中有元素被取出,腾出空间,生产者线程才会被唤醒并继续执行存放操作。
这种阻塞机制是基于不同线程之间的交互实现的。也就是说,一个线程对阻塞队列的操作可能会因为队列的状态而阻塞,而另一个线程的操作则可能会改变队列状态,从而唤醒被阻塞的线程。

2、与普通队列的对比
普通队列仅仅是一个简单的数据存储结构,它提供了基本的入队(enqueue)和出队(dequeue)操作。在单线程或简单的多线程场景下,普通队列可以满足一定的需求。然而,在复杂的并发环境中,普通队列存在明显的局限性:
-
缺乏同步机制:普通队列没有内置的同步控制,当多个线程同时对队列进行操作时,可能会导致数据不一致的问题。例如,一个线程正在从队列中取出元素,而另一个线程同时向队列中插入元素,这可能会破坏队列的完整性。
-
无法处理空或满的情况:在队列为空时,消费者线程直接进行出队操作会得到无效的数据;在队列已满时,生产者线程的入队操作会导致数据丢失或溢出。普通队列没有提供有效的机制来处理这些特殊情况。
相比之下,阻塞队列通过其阻塞行为和内置的同步机制,很好地解决了这些问题。它确保了在多线程环境下对队列的安全访问,并且能够根据队列的状态自动调整线程的执行流程。
知识关联:阻塞队列的概念让我们自然联想到管道,而其最典型的应用场景正是管道的实现!!!
3、底层实现原理
阻塞队列的底层实现通常依赖于同步机制,如互斥锁(Mutex)和条件变量(Condition Variable)。
-
互斥锁:用于保护队列的内部状态,确保同一时间只有一个线程能够对队列进行修改。例如,当一个线程正在向队列中插入元素时,其他线程无法同时进行插入或删除操作,从而避免了数据竞争。
-
条件变量:用于实现线程的阻塞和唤醒。当队列为空时,消费者线程通过条件变量进入等待状态;当有元素被放入队列时,生产者线程通过条件变量通知消费者线程。同样,当队列已满时,生产者线程等待,消费者线程取出元素后通知生产者线程。
4、模拟实现基于阻塞队列的生产消费模型
以一个简单的基于互斥锁和条件变量实现的阻塞队列为例,其核心代码结构可能如下:
#include <pthread.h>
#include <queue>template<typename T>
class BlockingQueue {
private:std::queue<T> queue;pthread_mutex_t mutex;pthread_cond_t queue_has_items; // 表示队列中有元素可供消费pthread_cond_t queue_has_space; // 表示队列中有空间可存放新元素size_t capacity;public:BlockingQueue(size_t cap) : capacity(cap) {pthread_mutex_init(&mutex, NULL);pthread_cond_init(&queue_has_items, NULL);pthread_cond_init(&queue_has_space, NULL);}~BlockingQueue() {pthread_mutex_destroy(&mutex);pthread_cond_destroy(&queue_has_items);pthread_cond_destroy(&queue_has_space);}void enqueue(T item) {pthread_mutex_lock(&mutex);while (queue.size() == capacity) {// 队列已满,等待队列有空位pthread_cond_wait(&queue_has_space, &mutex);}queue.push(item);// 通知等待的消费者,队列中有新元素pthread_cond_signal(&queue_has_items);pthread_mutex_unlock(&mutex);}T dequeue() {pthread_mutex_lock(&mutex);while (queue.empty()) {// 队列为空,等待队列有元素pthread_cond_wait(&queue_has_items, &mutex);}T item = queue.front();queue.pop();// 通知等待的生产者,队列有空位了pthread_cond_signal(&queue_has_space);pthread_mutex_unlock(&mutex);return item;}
};
在上述代码中,enqueue 方法用于向队列中添加元素。如果队列已满,生产者线程将通过 pthread_cond_wait 函数阻塞,直到队列有空位(由消费者线程取出元素后通过 pthread_cond_signal 通知)。dequeue 方法用于从队列中取出元素,如果队列为空,消费者线程将阻塞,直到队列中有元素。
1. queue_has_items 条件变量
-
当消费者线程尝试从队列中取出元素,但队列为空时,消费者线程会通过
pthread_cond_wait(&queue_has_items, &mutex)函数进入等待状态。 -
此时,queue_has_items 条件变量用于挂起消费者线程,直到队列中有新的元素被放入。
-
当生产者线程向队列中添加元素后,会调用
pthread_cond_signal(¬_empty)来通知等待在 queue_has_items 条件变量上的消费者线程,告知队列不再为空,可以尝试取出元素。
2. queue_has_space 条件变量
-
当生产者线程试图向队列中添加元素,但队列已满时,生产者线程会通过
pthread_cond_wait(&queue_has_space, &mutex)函数进入等待状态。 -
queue_has_space 条件变量用于挂起生产者线程,直到队列中有元素被取出,腾出空间。
-
当消费者线程从队列中取出元素后,会调用
pthread_cond_signal(&queue_has_space)来通知等待在 queue_has_space 条件变量上的生产者线程,告知队列不再已满,可以继续添加元素。
3. 使用 queue 的原因
1. 数据存储与管理
-
顺序存储:
std::queue是一个先进先出(FIFO)的数据结构,它非常适合用于阻塞队列的场景。在生产者 - 消费者模型中,生产者按照一定的顺序生产数据,消费者也希望按照生产者生产数据的顺序来消费数据。std::queue能够很好地满足这种顺序存储和访问的需求。 -
便捷操作:
std::queue提供了简单易用的接口,如push用于将元素添加到队列尾部,pop用于从队列头部移除元素,front用于获取队列头部的元素等。这些操作使得在阻塞队列中存储和检索数据变得非常方便。
2. 与同步机制配合
-
状态判断:在阻塞队列的实现中,需要频繁地判断队列是否为空或已满。
std::queue的empty和size方法可以方便地获取队列的当前状态。例如,在enqueue方法中,通过queue.size() == capacity判断队列是否已满;在dequeue方法中,通过queue.empty()判断队列是否为空。 -
数据一致性:结合互斥锁(
mutex),std::queue可以确保在多线程环境下对队列的操作是线程安全的。互斥锁保护了队列的内部状态,防止多个线程同时修改队列导致数据不一致的问题。
3. 通用性和可扩展性
-
泛型支持:代码中使用了模板类
BlockingQueue<typename T>,std::queue同样支持泛型,可以存储任意类型的数据。这使得BlockingQueue具有很强的通用性,可以应用于不同类型的数据传输和处理的场景。 -
易于扩展:基于
std::queue实现的阻塞队列可以方便地进行扩展和优化。例如,可以根据实际需求调整队列的容量、添加更多的同步机制或优化队列的性能等。
综上,使用 std::queue 作为阻塞队列的底层数据结构,能够充分利用其顺序存储、便捷操作、与同步机制的良好配合以及通用性和可扩展性等优点,从而有效地实现生产者 - 消费者模型中的数据缓冲和线程同步功能。
为便于理解,我们以单生产者单消费者模型为例(其实可以支持多生产者多消费者模型)进行实现。其中BlockQueue作为生产者与消费者的交易场所,可直接使用C++STL中的queue实现。
// 阻塞队列的实现
#pragma once#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>const int defaultcap = 5; // for testtemplate <typename T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = defaultcap): _cap(cap), _csleep_num(0), _psleep_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);}void Equeue(const T &in){pthread_mutex_lock(&_mutex);// 生产者调用while (IsFull()){// 应该让生产者线程进行等待// 重点1:pthread_cond_wait调用成功,挂起当前线程之前,要先自动释放锁!!// 重点2:当线程被唤醒的时候,默认就在临界区内唤醒!要从pthread_cond_wait// 成功返回,需要当前线程,重新申请_mutex锁!!!// 重点3:如果我被唤醒,但是申请锁失败了??我就会在锁上阻塞等待!!!_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" << _psleep_num << std::endl;// 问题1: pthread_cond_wait是函数吗?有没有可能失败?pthread_cond_wait立即返回了// 问题2:pthread_cond_wait可能会因为,条件其实不满足,pthread_cond_wait 伪唤醒pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}// 100%确定:队列有空间_q.push(in);// 临时方案// v2if(_csleep_num>0){pthread_cond_signal(&_empty_cond);std::cout << "唤醒消费者..." << std::endl;}// pthread_cond_signal(&_empty_cond); // 可以pthread_mutex_unlock(&_mutex); // TODO// pthread_cond_signal(&_empty_cond); // 可以}T Pop(){// 消费者调用pthread_mutex_lock(&_mutex);while (IsEmpty()){_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T data = _q.front();_q.pop();if(_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "唤醒消费者" << std::endl;}// pthread_cond_signal(&_full_cond);pthread_mutex_unlock(&_mutex);return data;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q; // 临界资源!!!int _cap; // 容量大小pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠的个数int _psleep_num; // 生产者休眠的个数
};
这段代码实现了一个模板类的阻塞队列(BlockQueue),用于生产者消费者模型中的线程间通信。下面我将详细讲解代码中的各个知识点。
1. 头文件和命名空间
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
-
<iostream>: 提供标准输入输出功能 -
<string>: 提供字符串处理功能 -
<queue>: 提供队列数据结构 -
<pthread.h>: 提供POSIX线程相关功能,包括互斥锁和条件变量
2. 默认容量定义
const int defaultcap = 5; // for test
定义了一个默认的队列容量5,主要用于测试目的。
BlockQueue类实现
1. 私有成员函数
bool IsFull() { return _q.size() >= _cap; }
bool IsEmpty() { return _q.empty(); }
-
IsFull(): 检查队列是否已满 -
IsEmpty(): 检查队列是否为空
2. 构造函数
BlockQueue(int cap = defaultcap): _cap(cap), _csleep_num(0), _psleep_num(0)
{pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);
}
-
初始化队列容量(
_cap)、消费者休眠数(_csleep_num)和生产者休眠数(_psleep_num) -
初始化互斥锁(
_mutex)和两个条件变量(_full_cond,_empty_cond)
3. 入队操作(Equeue)
void Equeue(const T &in)
{pthread_mutex_lock(&_mutex);// 生产者调用while (IsFull()){_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" << _psleep_num << std::endl;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}// 100%确定:队列有空间_q.push(in);if(_csleep_num>0){pthread_cond_signal(&_empty_cond);std::cout << "唤醒消费者..." << std::endl;}pthread_mutex_unlock(&_mutex);
}
关键知识点:
-
互斥锁保护:使用
pthread_mutex_lock获取锁,确保对共享资源(队列)的访问是线程安全的 -
队列满处理:
-
使用
while循环检查队列是否满(防止虚假唤醒) -
如果队列满,增加生产者休眠计数(
_psleep_num) -
调用
pthread_cond_wait使当前线程等待:-
重点1:调用成功时,在挂起线程前会自动释放锁
-
重点2:被唤醒时,默认在临界区内唤醒,需要重新申请锁
-
重点3:如果申请锁失败,会在锁上阻塞等待
-
-
-
数据入队:确认队列有空位后,将数据压入队列
-
唤醒消费者:
-
如果有消费者在等待(通过
_csleep_num判断) -
使用
pthread_cond_signal唤醒一个等待的消费者
-
-
释放锁:最后释放互斥锁
关于使用while而非if判断生产消费条件的必要性:
-
pthread_cond_wait存在调用失败的可能,失败后线程会继续执行后续代码,此时需要重新检查条件
-
在多消费者场景下,使用pthread_cond_broadcast可能同时唤醒多个消费者线程,但实际只有一个数据可供消费,导致伪唤醒问题
-
为确保线程被唤醒后能再次验证条件是否真正满足,必须使用while循环进行条件判断
4. 出队操作(Pop)
T Pop()
{pthread_mutex_lock(&_mutex);while (IsEmpty()){_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T data = _q.front();_q.pop();if(_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "唤醒消费者" << std::endl;}pthread_mutex_unlock(&_mutex);return data;
}
关键知识点:
-
结构与
Equeue对称,但处理的是队列空的情况 -
使用
_csleep_num记录等待的消费者数量 -
当取出数据后,如果有生产者在等待,唤醒一个生产者
5. 析构函数
~BlockQueue()
{pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);
}
-
销毁互斥锁和条件变量,释放系统资源
6. 私有成员变量
std::queue<T> _q; // 临界资源!!!
int _cap; // 容量大小pthread_mutex_t _mutex;
pthread_cond_t _full_cond;
pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠的个数
int _psleep_num; // 生产者休眠的个数
-
_q: 存储数据的队列(临界资源) -
_cap: 队列最大容量 -
_mutex: 互斥锁,保护对队列的访问 -
_full_cond: 条件变量,用于队列满时的生产者等待 -
_empty_cond: 条件变量,用于队列空时的消费者等待 -
_csleep_num: 记录等待的消费者数量 -
_psleep_num: 记录等待的生产者数量
代码中的关键问题和注意事项
1. 条件变量使用注意事项
// 问题1: pthread_cond_wait是函数吗?有没有可能失败?
// 问题2:pthread_cond_wait可能会因为条件其实不满足而伪唤醒
pthread_cond_wait(&_full_cond, &_mutex);
-
pthread_cond_wait确实是一个函数,可能会失败(如被信号中断) -
存在"伪唤醒"(spurious wakeup)问题,即使没有显式唤醒,线程也可能被唤醒
-
解决方案:使用
while循环而不是if来检查条件,可以处理伪唤醒
伪唤醒的完整示例:
初始状态
-
队列为空,消费者
C1在_empty_cond上等待。 -
生产者
P1准备添加数据。
伪唤醒流程:
-
C1调用Dequeue,发现队列为空,进入pthread_cond_wait(&_empty_cond, &_mutex)。 -
操作系统错误地唤醒
C1(伪唤醒):-
没有线程调用
pthread_cond_signal。 -
C1从pthread_cond_wait返回,检查if (_q.empty()):队列仍然为空,但if不会重新检查,直接执行pop()→ 崩溃或数据错误。
-
-
如果用
while:C1被伪唤醒后,while (_q.empty())会重新检查,发现队列仍为空,再次进入等待。
2. 锁的释放时机
pthread_mutex_unlock(&_mutex); // TODO
// pthread_cond_signal(&_empty_cond); // 可以
-
注释显示这里曾经考虑过在解锁前后唤醒线程的问题
-
最佳实践是在持有锁的时候发出信号(
pthread_cond_signal),这样可以确保被唤醒的线程在尝试获取锁时,锁的状态是一致的
3. 唤醒策略
if(_csleep_num>0)
{pthread_cond_signal(&_empty_cond);
}
-
使用
pthread_cond_signal而不是pthread_cond_broadcast,只唤醒一个等待线程 -
这样可以减少不必要的上下文切换,提高效率
-
通过计数器(
_csleep_num,_psleep_num)来优化,只在有线程等待时才发出信号
代码优化建议
-
错误处理:当前代码没有检查
pthread_*函数的返回值,实际应用中应该添加错误处理 -
性能优化:可以考虑使用
pthread_cond_broadcast在某些情况下唤醒所有等待线程 -
资源管理:可以使用RAII技术管理互斥锁和条件变量,确保异常安全
-
更精确的唤醒:当前使用计数器判断是否需要唤醒,可以考虑更精确的等待线程管理
总结:这段代码实现了一个线程安全的阻塞队列,关键点在于:
-
使用互斥锁保护共享资源(队列)
-
使用两个条件变量分别处理队列满和队列空的情况
-
通过计数器优化唤醒操作,避免不必要的信号发送
-
正确处理了条件变量的等待和唤醒流程
这种实现是生产者消费者模型的典型应用,能够有效协调生产者和消费者的速度差异,避免忙等待,提高系统效率。
相关说明:
本实现采用单生产者单消费者的生产者-消费者模型,因此无需处理生产者间或消费者间的关系,只需确保生产者与消费者之间的同步与互斥关系。
实现要点:
-
将BlockingQueue设计为模板类,提升代码复用性
-
设置阻塞队列容量上限为5,达到上限时生产者将自动阻塞
-
使用互斥锁保护阻塞队列,防止生产者和消费者同时访问造成冲突
线程控制机制:
-
生产者线程在队列未满时可执行Push操作,队列已满时自动阻塞等待
-
消费者线程在队列非空时可执行Pop操作,队列为空时自动阻塞等待
-
采用两个条件变量实现线程调度:
-
_empty_cond条件变量:标识队列为空状态
-
_full_cond条件变量:标识队列已满状态
-
互斥锁管理:
-
线程进入临界区前必须先获取锁
-
若条件不满足,线程通过pthread_cond_wait自动释放锁并挂起
-
被唤醒时线程会自动重新获取锁
线程唤醒机制:
-
生产者完成操作后唤醒等待_empty_cond条件的消费者线程
-
消费者完成操作后唤醒等待_full_cond条件的生产者线程
5、在生产者 - 消费者模型中的应用
-
在生产者 - 消费者模型中,阻塞队列作为中间缓冲区(在内存中开辟空间),起到了关键的解耦作用。
-
生产者线程将生产好的数据放入阻塞队列,而不需要关心消费者线程何时消费这些数据;
-
消费者线程从阻塞队列中取出数据进行处理,也不需要直接与生产者线程进行交互。
-
例如,在一个日志处理系统中,多个生产者线程负责收集不同来源的日志信息,并将这些信息放入阻塞队列。
-
而消费者线程则从阻塞队列中取出日志信息进行存储或分析。
-
通过使用阻塞队列,系统能够有效地平衡生产者和消费者之间的处理速度差异,避免数据丢失或消费者线程的空闲等待。
6、优势与局限性
优势
-
简化多线程编程:阻塞队列提供了一种简单而有效的方式来实现线程间的同步和通信,减少了开发人员手动处理同步逻辑的复杂性。
-
提高系统性能:通过缓冲数据和自动阻塞/唤醒机制,阻塞队列能够平衡生产者和消费者的处理速度,提高系统的整体吞吐量。
-
增强系统的稳定性:避免了因数据竞争和空/满队列处理不当而导致的系统崩溃或数据错误。
局限性
-
性能开销:由于依赖于互斥锁和条件变量等同步机制,阻塞队列在高性能要求的场景下可能会引入一定的性能开销。
-
容量限制:阻塞队列通常有固定的容量,如果生产者的生产速度持续超过消费者的消费速度,可能会导致队列满后生产者长时间阻塞,影响系统的响应时间。
总之,阻塞队列作为多线程编程中的重要数据结构,在生产者 - 消费者模型中发挥着不可替代的作用。理解其原理和应用,对于开发高效、稳定的多线程程序具有重要意义。
7、阻塞队列(BlockQueue)的模板化设计与任务队列应用
1. 设计概述
本实现是一个线程安全的阻塞队列,支持多生产者和多消费者模型。其核心特点包括:
-
模板化设计:支持任意数据类型(如
int、自定义类、std::function<void()>等)。 -
线程同步:通过
pthread_mutex_t和pthread_cond_t实现互斥和条件等待。 -
灵活的任务类型:
-
自定义任务类(如
Task,封装计算逻辑和结果)。 -
std::function<void()>(通用函数对象,适合lambda或函数指针)。
-
2. 任务队列的两种实现方式(重点!!!)
(1) 自定义任务类(Task)
class Task {
public:Task(int a, int b) : _a(a), _b(b), _result(0) {}void Execute() {_result = _a + _b; // 模拟计算任务}std::string ResultToString() {return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);}std::string DebugToString() {return std::to_string(_a) + "+" + std::to_string(_b) + "=?";}private:int _a, _b, _result;
};// 使用示例
BlockQueue<Task> taskQueue;
(2) 通用函数任务(std::function<void()>)
using Task = std::function<void()>;// 使用示例
BlockQueue<Task> funcQueue;
funcQueue.Equeue([]() { std::cout << "Lambda task executed!" << std::endl; });void Download()
{std::cout << "我是一个下载任务..." << std::endl;sleep(3); // 假设处理任务比较耗时
}
3. 关键设计分析
(1) 线程安全机制
-
互斥锁(
_mutex):保护队列的并发访问。 -
条件变量(
_full_cond和_empty_cond):-
生产者等待队列非满(
_full_cond)。 -
消费者等待队列非空(
_empty_cond)。
-
-
伪唤醒处理:通过
while循环重新检查条件,避免虚假唤醒。
(2) 唤醒策略
-
当前使用
pthread_cond_signal唤醒单个等待线程。 -
优化建议:若需唤醒所有等待线程(如批量生产数据),可改用
pthread_cond_broadcast。
(3) 模板化的优势
-
灵活性:支持内置类型(如
int)、自定义类(如Task)和函数对象(如std::function)。 -
性能:避免虚函数开销(相比基类-派生类设计)。
4. 测试用例
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>void *consumer(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);while (true){sleep(10);// 1. 消费任务task_t t = bq->Pop();// 2. 处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文中了,不属于队列了t();}
}void *productor(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);while (true){// 1. 获得任务//std::cout << "生产了一个任务" << x << "+" << y << "=?" << std::endl;std::cout << "生产了一个任务" << std::endl;// 2. 生产任务bq->Equeue(Download);}
}int main()
{// 扩展认识: 阻塞队列: 可以放任务吗?// 申请阻塞队列BlockQueue<task_t> *bq = new BlockQueue<task_t>();// 构建生产和消费者pthread_t c[2], p[3];pthread_create(c, nullptr, consumer, bq);pthread_create(c+1, nullptr, consumer, bq);pthread_create(p, nullptr, productor, bq);pthread_create(p+1, nullptr, productor, bq);pthread_create(p+2, nullptr, productor, bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);return 0;
}// #include "BlockQueue.hpp"
// #include "Task.hpp"
// #include <iostream>
// #include <pthread.h>
// #include <unistd.h>// void *consumer(void *args)
// {
// BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);// while (true)
// {
// sleep(1);
// Task t = bq->Pop();// t.Execute();// std::cout << "消费了一个任务" << t.X() << "+" << t.Y() << "=" << t.Result() << std::endl;
// }
// }// void *productor(void *args)
// {
// int x = 1;
// int y = 1;
// BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
// while (true)
// {
// // sleep(1);
// std::cout << "生产了一个任务" << x << "+" << y << "=?" << std::endl;
// Task t(x, y);
// bq->Equeue(t);// x++, y++;
// }
// }// int main()
// {
// // 扩展认识: 阻塞队列: 可以放任务吗?
// // 申请阻塞队列
// BlockQueue<Task> *bq = new BlockQueue<Task>();// // 构建生产和消费者
// pthread_t c, p;// pthread_create(&c, nullptr, consumer, bq);
// pthread_create(&p, nullptr, productor, bq);// pthread_join(c, nullptr);
// pthread_join(p, nullptr);// return 0;
// }
二、回顾:生产者消费者模型
1、生产者消费者模型的概念
-
生产者消费者模型是一种经典的软件设计模式,其核心思想是通过引入一个中间容器(缓冲区),实现生产者与消费者之间的解耦。
-
在该模型中,生产者和消费者不直接进行通信,而是通过共享的容器来间接交互。
-
生产者完成数据生产后,无需等待消费者处理,直接将数据存入容器;消费者则从容器中获取所需数据进行处理,无需主动向生产者请求数据。
-
这个中间容器起到了缓冲区的关键作用,它能够有效平衡生产者和消费者在处理能力上的差异。
-
当生产者速度较快、消费者速度较慢时,容器可以暂时存储多余的数据,避免生产者因等待消费者而阻塞;反之,当消费者速度较快、生产者速度较慢时,容器可以为消费者提供数据,防止消费者因无数据可处理而闲置。
-
通过这种方式,生产者消费者模型实现了生产者和消费者之间的松耦合,提高了系统的灵活性和可扩展性。

2、生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的典型应用场景,具有以下显著特点:
(一)三种关系
-
生产者和生产者(互斥关系):多个生产者线程可能同时向共享容器中写入数据,为了避免数据混乱和冲突,需要确保同一时间只有一个生产者能够访问容器进行写入操作,因此生产者之间存在互斥关系。
-
消费者和消费者(互斥关系):类似地,多个消费者线程可能同时从共享容器中读取数据,为了保证数据读取的正确性和一致性,同一时间只能有一个消费者从容器中获取数据,所以消费者之间也存在互斥关系。
-
生产者和消费者(互斥关系、同步关系):生产者和消费者共享同一个容器,因此它们在访问容器时也存在互斥关系,以防止数据的不一致。同时,生产者和消费者之间还需要同步关系,即生产者的生产速度和消费者的消费速度需要相互协调,避免出现容器满时生产者继续生产导致数据丢失,或者容器空时消费者继续消费导致无效操作的情况。
(二)两种角色
-
生产者和消费者是该模型中的两个核心角色,通常由进程或线程来承担。
-
生产者负责生成数据并将其放入共享容器,消费者则从共享容器中取出数据进行处理。
-
这两个角色的分工明确,各自专注于自身的任务,通过共享容器实现数据的传递和交互。
(三)一个交易场所
-
通常指的是内存中的一段缓冲区,它是生产者和消费者进行数据交换的场所。
-
这个缓冲区可以通过各种方式组织起来,例如使用队列、栈等数据结构。
-
在实现生产者消费者模型时,核心任务之一就是对这个缓冲区进行合理的管理和维护,确保生产者和消费者能够正确、高效地访问其中的数据。
在编写生产者消费者模型的代码时,本质上就是对上述三种关系进行精确的维护和实现,以保证模型能够正常运行并发挥其优势。
3、互斥关系和同步关系的成因分析
(一)互斥关系的成因
-
介于生产者和消费者之间的容器作为共享资源,可能会被多个执行流(生产者线程或消费者线程)同时访问。
-
例如,多个生产者可能同时尝试向容器中写入数据,多个消费者可能同时尝试从容器中读取数据。这种同时访问的情况如果不加以控制,就会导致数据的不一致和错误。
-
因此,我们需要将该临界资源(共享容器)用互斥锁保护起来,确保同一时间只有一个线程能够访问容器。
-
在这个过程中,所有的生产者和消费者都会竞争式地申请锁,这就导致了生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。
(二)同步关系的成因
-
如果让生产者一直不停地生产数据,而不考虑容器的容量限制,那么当容器被数据塞满后,生产者继续生产数据就会导致生产失败,甚至可能引发数据丢失等问题。
-
反之,如果让消费者一直不停地消费数据,而不考虑容器中是否还有数据,那么当容器中的数据被消费完后,消费者继续消费就会陷入无效操作,浪费系统资源。
-
虽然这种情况不会直接造成数据不一致的问题,但会引起另一方的饥饿问题,即生产者因容器满而无法生产,消费者因容器空而无法消费,导致系统整体效率低下。
为了避免这种情况的发生,我们应该让生产者和消费者访问容器时具有一定的顺序性。例如,可以采用信号量等同步机制,让生产者在容器未满时生产数据,消费者在容器不为空时消费数据,从而实现生产者和消费者之间的协同工作。
需要注意的是,互斥关系和同步关系在生产者消费者模型中扮演着不同的角色。
-
互斥关系主要保证数据的正确性和一致性,防止多个线程同时访问共享资源导致的数据冲突;
-
而同步关系则是为了让多线程之间能够协同工作,提高系统的整体效率和性能。
4、生产者消费者模型的优点
(一)解耦
-
在传统的函数调用方式中,如果在主函数中调用某一函数,必须等待该函数体执行完毕后才能继续执行主函数的后续代码,这种调用方式本质上是一种紧耦合。
-
而在生产者消费者模型中,函数传参可以看作是生产者生产的过程,执行函数体则是消费者消费的过程。
-
生产者只负责生成数据并将其放入共享容器,消费者只负责从共享容器中取出数据进行处理,在消费者消费期间,生产者可以同时进行生产,两者之间相互独立,互不干扰。
-
因此,生产者消费者模型实现了生产者和消费者之间的松耦合,提高了系统的灵活性和可维护性。
(二)支持并发
-
生产者消费者模型允许多个生产者和多个消费者同时运行,充分利用了多核处理器的优势,提高了系统的并发处理能力。
-
通过合理的同步和互斥机制,可以确保多个线程在访问共享资源时的正确性和一致性,从而实现高效的并发执行。
(三)支持忙闲不均
-
在实际应用中,生产者和消费者的处理速度往往是不均衡的。
-
生产者可能因为某些原因(如数据源的供应速度)而生产速度较快或较慢,消费者也可能因为处理复杂度不同而消费速度有快有慢。
-
生产者消费者模型通过共享容器的缓冲作用,能够很好地适应这种忙闲不均的情况。
-
当生产者速度较快时,多余的数据可以存储在容器中,等待消费者慢慢处理;当消费者速度较快时,可以从容器中快速获取数据进行处理,避免了生产者或消费者的闲置,提高了系统的资源利用率。
综上所述,生产者消费者模型凭借其解耦、支持并发和支持忙闲不均等优点,在多线程编程、分布式系统等领域得到了广泛的应用。通过合理设计和实现生产者消费者模型,可以有效提高系统的性能、可靠性和可扩展性。
