【Linux系统】生产者消费者模型:基于阻塞队列 BlockingQueue
线程安全的阻塞队列(Blocking Queue)允许生产者和消费者在多线程环境下安全地共享数据。下面我将逐步剖析阻塞队列代码的编写逻辑,并讲解如何编写一个阻塞队列。
1. 基本概念与需求
- 阻塞队列是一种特殊的队列,当队列满时,生产者线程会被阻塞直到有空间可用;当队列空时,消费者线程会被阻塞直到有数据可用。
- 线程安全是关键:多个线程同时访问队列时,必须确保数据一致性。
- 使用条件变量(
Cond
)和互斥锁(Mutex
)来实现线程间的同步和通信。(注意本文使用的 条件变量(Cond
)和互斥锁(Mutex
)都是我自己封装过的,完整代码放到文末)
2. 代码结构分析
头文件保护
#ifndef _BLOCKING_QUEUE_HPP_
#define _BLOCKING_QUEUE_HPP_
- 防止头文件重复包含,避免编译错误。
引入必要的库
#include <iostream>
#include <queue>
#include <pthread.h>
#include "Cond.hpp"
#include "Mutex.hpp"
<iostream>
:用于调试输出。<queue>
:标准库中的队列容器,用于存储数据。<pthread.h>
:POSIX线程相关函数(假设Cond
和Mutex
是基于此实现的)。"Cond.hpp"
和"Mutex.hpp"
:封装了条件变量和互斥锁的功能模块。
命名空间
namespace BlockingQueueModule {
template<typename T>
class BlockQueue { ... };
}
- 将阻塞队列封装在命名空间
BlockingQueueModule
中,避免命名冲突。
3. 类定义与成员变量
类模板定义
template<typename T>
class BlockQueue {
public:
BlockQueue(int cap = 5) : _size(0), _cap(cap) {}
~BlockQueue() {}
...
private:
std::queue<T> _bq; // 存储数据的队列
int _size; // 队列中数据的个数
int _cap; // 队列容量
Cond _producer_cond; // 生产者条件变量
Cond _consumer_cond; // 消费者条件变量
Mutex _mutex; // 互斥锁
int producer_count; // 等待的生产者数量
int consumer_count; // 等待的消费者数量
};
- 模板参数
T
:支持任意类型的元素。 - 构造函数:
- 初始化
_size
为 0,表示队列初始为空。 - 设置队列容量
_cap
,默认值为 5。
- 初始化
- 成员变量:
_bq
:底层存储数据的标准队列。_size
和_cap
:分别表示当前队列大小和最大容量。_producer_cond
和_consumer_cond
:用于生产者和消费者的等待/唤醒机制。_mutex
:保护对共享资源的访问。producer_count
和consumer_count
:记录当前等待的生产者和消费者数量。
4. 核心功能实现
生产者方法:Equeue(T& data)
void Equeue(T& data)
{
_mutex.lock(); // 加锁,确保线程安全
while (IsFull()) { // 使用 while 而不是 if,防止伪唤醒问题
producer_count++;
std::cout << "queue is full, 生产者进入阻塞" << '\n';
_producer_cond.Wait(_mutex); // 如果队列满,生产者线程阻塞
std::cout << "生产者被唤醒" << '\n';
producer_count--;
}
_bq.push(data); // 将数据加入队列
_size++;
if (consumer_count) { // 如果有消费者在等待
std::cout << "唤醒消费者" << '\n';
_consumer_cond.NotifyOne(); // 唤醒一个消费者线程
}
_mutex.unlock(); // 解锁
}
- 加锁:使用
_mutex.lock()
确保只有一个线程可以修改队列。 - 检查队列是否已满:
- 如果队列已满(
IsFull()
返回true
),调用_producer_cond.Wait(_mutex)
,让当前生产者线程进入等待状态。 - 当队列中有空闲空间时,条件变量会唤醒该线程。
- 如果队列已满(
- 插入数据:将数据压入队列,并更新队列大小。
- 唤醒消费者:如果当前有消费者在等待(
consumer_count > 0
),调用_consumer_cond.NotifyOne()
唤醒其中一个消费者线程。 - 解锁:释放锁,允许其他线程访问队列。
消费者方法:Pop()
T Pop()
{
_mutex.lock(); // 加锁
while (IsEmpty()) { // 使用 while 而不是 if,防止伪唤醒问题
consumer_count++;
std::cout << "queue is empty, 消费者进入阻塞" << '\n';
_consumer_cond.Wait(_mutex); // 如果队列空,消费者线程阻塞
std::cout << "消费者被唤醒" << '\n';
consumer_count--;
}
T data = _bq.back(); // 获取队列尾部的数据
_bq.pop(); // 移除队列尾部的数据
_size--;
if (producer_count) { // 如果有生产者在等待
std::cout << "唤醒生产者" << '\n';
_producer_cond.NotifyOne(); // 唤醒一个生产者线程
}
_mutex.unlock(); // 解锁
return data;
}
- 加锁:确保线程安全。
- 检查队列是否为空:
- 如果队列为空(
IsEmpty()
返回true
),调用_consumer_cond.Wait(_mutex)
,让当前消费者线程进入等待状态。 - 当队列中有数据时,条件变量会唤醒该线程。
- 如果队列为空(
- 获取数据:从队列尾部取出数据并移除。
- 唤醒生产者:如果当前有生产者在等待(
producer_count > 0
),调用_producer_cond.NotifyOne()
唤醒其中一个生产者线程。 - 解锁:释放锁,允许其他线程访问队列。
辅助方法
bool IsEmpty() { return _bq.empty(); }
bool IsFull() { return _size == _cap; }
IsEmpty()
:判断队列是否为空。IsFull()
:判断队列是否已满。
5. 设计细节与注意事项
为什么需要 if (consumer_count)
或 if (producer_count)
?
- 条件变量的
NotifyOne()
方法只会唤醒一个等待的线程。 - 如果直接调用
NotifyOne()
而不检查是否有等待线程,可能会导致不必要的唤醒,浪费系统资源。 - 因此,只有在确实有等待线程时才调用
NotifyOne()
。
为什么使用 while
而不是 if
?
-
在多线程环境中,可能存在虚假唤醒(spurious wakeup)问题,也就是伪唤醒。
-
具体来说:一个线程被唤醒,能够继续往下消费或生产一个物品,需要满足两个条件:1、获取到互斥锁;2、当前队列不为极端情况(不是满的或不是空的)
如果一个消费者线程被唤醒,它获取到了互斥锁,但此时队列里为空,它还继续往下执行代码消费物品,岂不是出错?
这种在条件不满足(为满或为空,线程不能生产或消费)的情况下被唤醒,叫做伪唤醒!!!
因此需要将 if 该成 while:竞争失败了就应该继续进入 wait 状态
条件变量休眠 pthread_cond_wait(&cond, &lock)
- 需要传入当前线程持有的锁,因为线程在进入条件变量睡眠时,是持有锁的!
- 线程一般是在线程内部的临界区进行条件变量休眠的。
- 如果让一个线程带着锁休眠,就会导致死锁或资源竞争问题。因此需要传入锁,让
pthread_cond_wait
自动释放锁。
问题:线程的 pthread_cond_wait
醒来时,若此时互斥锁获取失败怎么办?
- 如果线程从
pthread_cond_wait
中醒来但无法获取锁,则该线程会被放入锁的等待队列中。 - 等待队列中的线程会一直等到锁可用为止,然后继续执行后续逻辑。
线程唤醒逻辑放在释放锁之前还是之后?
- 放在释放锁之前:
- 唤醒线程后,虽然当前线程仍然持有锁,但被唤醒的线程会自动尝试获取锁。
- 如果被唤醒的线程暂时无法获取锁,它会被放入锁的等待队列中,不会影响系统的正常运行。
- 放在释放锁之后:
- 如果在释放锁之后才唤醒其他线程,可能导致多个线程同时争抢锁。
- 对于消费者线程来说,可能会导致所有消费者线程迅速消耗完队列中的资源,最终使队列变空。
- 对于生产者线程来说,可能会导致队列快速填满,从而引发阻塞。
- 将唤醒逻辑放在释放锁之前通常更好,因为它避免了不必要的锁争抢,同时保证了线程间的公平性。
如何实现多生产者多消费者?
- 直接创建多个生产者和消费者线程即可。
- 在生产者和生产者之间,在消费者和消费者之间的关系是互斥的,而我们线程之间对于同一个临界区用的是同一把锁,因此天然地就具有一种互斥关系!
为什么定义多生产者时会出现生产相同数据的情况?
- 因为每个生产者线程都有自己的栈空间,因此它们可以独立生成数据。
- 如果多个生产者线程生成的数据来源相同(例如,基于相同的初始值或算法),则可能会产生相同的数据。
- 这种情况是正常的,因为每个生产者线程的行为是独立的,除非明确要求生产唯一的数据。
生产者消费者模型的并行和串行:理解生产者消费者模型效率高的原因
生产者消费者模型实际上包含两部分,第一个部分是我们前面认识到的,生产者将任务放入任务队列,消费者从任务队列获取任务,这个访问任务队列的过程是互斥的过程,是串行的过程,无论是生产者放入还是消费者拿出,都必须一个一个的串行执行
第二个部分是并行,生产者会消耗一定时间从别处获取任务,即生产任务,等待系统给生产者发布一个任务需求,让生产者作为一个发布者发布给消费者,如阻塞等待响应任务、刷新缓冲区任务、页面响应任务、数据库数据迁移任务….消费者也会消耗一定时间处理获取到的任务。在这个过程中,生产者等待任务和消费者处理任务,生产者和消费者是并行执行的!!
实际上,在整个生产者消费者模型中,并行操作占据了绝大部分时间。相比之下,生产者和消费者访问任务队列的串行操作所占用的时间非常少。这种设计使得任务从分发到执行的过程变得极其高效——任务无需花费过多时间在分发环节上,从而显著提升了整体系统的效率。
总结来说,生产者消费者模型之所以高效,正是因为它充分利用了生产者与消费者之间的并行性,同时将串行操作的影响降到最低,从而实现了任务的快速流转与处理。
6. 总结
这段代码实现了一个功能完善的阻塞队列,其核心思想如下:
- 使用互斥锁保护共享资源。
- 使用条件变量实现线程间的等待和唤醒。
- 通过
while
循环处理伪唤醒问题。 - 通过计数器(
producer_count
和consumer_count
)优化唤醒操作。
7. 完整代码
BlockQueue.hpp
#ifndef _BLOCKING_QUEUE_HPP_
#define _BLOCKING_QUEUE_HPP_
#include <iostream>
#include <queue>
#include <pthread.h>
#include "Cond.hpp"
#include "Mutex.hpp"
using namespace CondModule;
using namespace MutexModule;
// 使用封装的条件变量和互斥锁
namespace BlockingQueueModule
{
// 数据
int num = 10;
template<typename T>
class BlockQueue
{
public:
BlockQueue(int cap = 5)
: _size(0), _cap(cap)
{}
~BlockQueue()
{}
// 对于生产者: 数据入队列
void Equeue(T& data)
{
// 加锁
_mutex.lock();
while (IsFull())
{
// 队列已满,生产者阻塞
producer_count++;
std::cout << "queue is full, 生产者进入阻塞" << '\n';
_producer_cond.Wait(_mutex);
std::cout << "生产者被唤醒" << '\n';
producer_count--;
}
// 生产数据
_bq.push(data);
_size++;
// 生产完数据,唤醒消费者
if(consumer_count){ // 为什么要加上这个判断:必须保证现在有消费者在等待
std::cout << "唤醒消费者" << '\n';
_consumer_cond.NotifyOne();
}
// 解锁
_mutex.unlock();
}
// 对于消费者: 数据出队列
T Pop()
{
// 加锁
_mutex.lock();
while (IsEmpty())
{
// 队列为空,消费者阻塞
consumer_count++;
std::cout << "queue is empty, 消费者进入阻塞" << '\n';
_consumer_cond.Wait(_mutex);
std::cout << "消费者被唤醒" << '\n';
consumer_count--;
}
// 消费数据
T data = _bq.back();
_bq.pop();
_size--;
// 消费完数据,唤醒生产者
if(producer_count){ // 为什么要加上这个判断:必须保证现在有生产者在等待
std::cout << "唤醒生产者" << '\n';
_producer_cond.NotifyOne();
}
// 解锁
_mutex.unlock();
return data;
}
// 队列是否为空
bool IsEmpty()
{
return _bq.empty();
}
// 队列是否已满
bool IsFull()
{
return _size == _cap;
}
private:
std::queue<T> _bq; // 存储数据的队列
int _size; // 队列中数据的个数
int _cap; // 队列容量
Cond _producer_cond; // 生产者条件变量;
Cond _consumer_cond; // 消费者条件变量;
Mutex _mutex;
// 计数器
int producer_count;
int consumer_count;
};
}
#endif
Cond.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
namespace CondModule
{
using namespace MutexModule;
class Cond
{
public:
Cond()
{
pthread_cond_init(&_cond, nullptr);
}
~Cond()
{
pthread_cond_destroy(&_cond);
}
// 阻塞等待
void Wait(Mutex& mtx)
{
pthread_cond_wait(&_cond, mtx.getLockPtr());
}
// 随机唤醒一个
void NotifyOne()
{
pthread_cond_signal(&_cond);
}
// 广播唤醒所有
void NotifyAll()
{
pthread_cond_broadcast(&_cond);
}
private:
pthread_cond_t _cond;
};
}
Mutex.hpp
#ifndef _MUTEX_HPP
#define _MUTEX_HPP
#include <iostream>
#include <pthread.h>
namespace MutexModule
{
class Mutex
{
public:
// 禁止拷贝
Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
Mutex()
{
pthread_mutex_init(&_lock, nullptr);
}
~Mutex()
{
pthread_mutex_destroy(&_lock);
}
// 加锁
void lock()
{
pthread_mutex_lock(&_lock);
}
// 解锁
void unlock()
{
pthread_mutex_unlock(&_lock);
}
// 获取锁
pthread_mutex_t *getLockPtr()
{
return &_lock;
}
private:
pthread_mutex_t _lock;
};
// 锁保护类
class LockGuard
{
public:
LockGuard(Mutex &mtx) : _mtx(mtx)
{
_mtx.lock();
}
~LockGuard()
{
_mtx.unlock();
}
private:
Mutex& _mtx; // 因为是保护某个已存在的锁,所以这里不是创建锁,而是引用
};
}
#endif
8. BlockQueue
的使用:构建任务队列
Main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <stdlib.h>
#include <functional>
using namespace BlockingQueueModule;
using task_t = std::function<void()>;
// 打印任务
void Task_Print()
{
std::cout << "这是一个打印任务" << std::endl;
}
// 拷贝任务
void Task_Copy()
{
std::cout << "这是一个拷贝任务" << std::endl;
}
void *Producer(void *args)
{
BlockQueue<task_t>* bq = static_cast<BlockQueue<task_t>*>(args);
while (true)
{
// 任务队列(funtion)
task_t task = Task_Print;
bq->Equeue(task);
std::cout << "--------------生产者发布任务...---------------" << '\n';
}
}
void *Consumer(void *args)
{
BlockQueue<task_t>* bq = static_cast<BlockQueue<task_t>*>(args);
while (true)
{
sleep(2);
// 任务队列
task_t task = bq->Pop();
std::cout << "--------------消费者获取并执行任务...---------------" << '\n';
task();
}
}
int main()
{
BlockQueue<task_t> bq;
// 创建生产者线程: 1 个
pthread_t _tid1;
pthread_create(&_tid1, nullptr, Producer, &bq);
// 创建消费者线程: 2 个
pthread_t _tid10, _tid11;
pthread_create(&_tid10, nullptr, Consumer, &bq);
pthread_create(&_tid11, nullptr, Consumer, &bq);
// 回收线程
pthread_join(_tid1, nullptr);
pthread_join(_tid10, nullptr);
pthread_join(_tid11, nullptr);
return 0;
}
运行结果如下: