当前位置: 首页 > news >正文

【Linux系统】生产者消费者模型:基于阻塞队列 BlockingQueue




在这里插入图片描述



线程安全的阻塞队列(Blocking Queue)允许生产者和消费者在多线程环境下安全地共享数据。下面我将逐步剖析阻塞队列代码的编写逻辑,并讲解如何编写一个阻塞队列。



1. 基本概念与需求

  • 阻塞队列是一种特殊的队列,当队列满时,生产者线程会被阻塞直到有空间可用;当队列空时,消费者线程会被阻塞直到有数据可用。
  • 线程安全是关键:多个线程同时访问队列时,必须确保数据一致性。
  • 使用条件变量(Cond)和互斥锁(Mutex)来实现线程间的同步和通信。(注意本文使用的 条件变量(Cond)和互斥锁(Mutex)都是我自己封装过的,完整代码放到文末)



2. 代码结构分析


头文件保护

#ifndef _BLOCKING_QUEUE_HPP_
#define _BLOCKING_QUEUE_HPP_
  • 防止头文件重复包含,避免编译错误。

引入必要的库

#include <iostream>
#include <queue>
#include <pthread.h>
#include "Cond.hpp"
#include "Mutex.hpp"
  • <iostream>:用于调试输出。
  • <queue>:标准库中的队列容器,用于存储数据。
  • <pthread.h>:POSIX线程相关函数(假设 CondMutex 是基于此实现的)。
  • "Cond.hpp""Mutex.hpp":封装了条件变量和互斥锁的功能模块。


命名空间

namespace BlockingQueueModule {
    template<typename T>
    class BlockQueue { ... };
}
  • 将阻塞队列封装在命名空间 BlockingQueueModule 中,避免命名冲突。



3. 类定义与成员变量

类模板定义

template<typename T>
class BlockQueue {
public:
    BlockQueue(int cap = 5) : _size(0), _cap(cap) {}
    ~BlockQueue() {}
    ...
private:
    std::queue<T> _bq; // 存储数据的队列
    int _size;         // 队列中数据的个数
    int _cap;          // 队列容量

    Cond _producer_cond; // 生产者条件变量
    Cond _consumer_cond; // 消费者条件变量
    Mutex _mutex;        // 互斥锁

    int producer_count;  // 等待的生产者数量
    int consumer_count;  // 等待的消费者数量
};
  • 模板参数 T:支持任意类型的元素。
  • 构造函数
    • 初始化 _size 为 0,表示队列初始为空。
    • 设置队列容量 _cap,默认值为 5。
  • 成员变量
    • _bq:底层存储数据的标准队列。
    • _size_cap:分别表示当前队列大小和最大容量。
    • _producer_cond_consumer_cond:用于生产者和消费者的等待/唤醒机制。
    • _mutex:保护对共享资源的访问。
    • producer_countconsumer_count:记录当前等待的生产者和消费者数量。



4. 核心功能实现

生产者方法:Equeue(T& data)

void Equeue(T& data)
{
    _mutex.lock(); // 加锁,确保线程安全

    while (IsFull()) { // 使用 while 而不是 if,防止伪唤醒问题
        producer_count++;
        std::cout << "queue is full, 生产者进入阻塞" << '\n';
        _producer_cond.Wait(_mutex); // 如果队列满,生产者线程阻塞
        std::cout << "生产者被唤醒" << '\n';
        producer_count--;
    }

    _bq.push(data); // 将数据加入队列
    _size++;

    if (consumer_count) { // 如果有消费者在等待
        std::cout << "唤醒消费者" << '\n';
        _consumer_cond.NotifyOne(); // 唤醒一个消费者线程
    }

    _mutex.unlock(); // 解锁
}
  • 加锁:使用 _mutex.lock() 确保只有一个线程可以修改队列。
  • 检查队列是否已满
    • 如果队列已满(IsFull() 返回 true),调用 _producer_cond.Wait(_mutex),让当前生产者线程进入等待状态。
    • 当队列中有空闲空间时,条件变量会唤醒该线程。
  • 插入数据:将数据压入队列,并更新队列大小。
  • 唤醒消费者:如果当前有消费者在等待(consumer_count > 0),调用 _consumer_cond.NotifyOne() 唤醒其中一个消费者线程。
  • 解锁:释放锁,允许其他线程访问队列。


消费者方法:Pop()

T Pop()
{
    _mutex.lock(); // 加锁

    while (IsEmpty()) { // 使用 while 而不是 if,防止伪唤醒问题
        consumer_count++;
        std::cout << "queue is empty, 消费者进入阻塞" << '\n';
        _consumer_cond.Wait(_mutex); // 如果队列空,消费者线程阻塞
        std::cout << "消费者被唤醒" << '\n';
        consumer_count--;
    }

    T data = _bq.back(); // 获取队列尾部的数据
    _bq.pop();           // 移除队列尾部的数据
    _size--;

    if (producer_count) { // 如果有生产者在等待
        std::cout << "唤醒生产者" << '\n';
        _producer_cond.NotifyOne(); // 唤醒一个生产者线程
    }

    _mutex.unlock(); // 解锁
    return data;
}
  • 加锁:确保线程安全。
  • 检查队列是否为空
    • 如果队列为空(IsEmpty() 返回 true),调用 _consumer_cond.Wait(_mutex),让当前消费者线程进入等待状态。
    • 当队列中有数据时,条件变量会唤醒该线程。
  • 获取数据:从队列尾部取出数据并移除。
  • 唤醒生产者:如果当前有生产者在等待(producer_count > 0),调用 _producer_cond.NotifyOne() 唤醒其中一个生产者线程。
  • 解锁:释放锁,允许其他线程访问队列。


辅助方法

bool IsEmpty() { return _bq.empty(); }
bool IsFull() { return _size == _cap; }
  • IsEmpty():判断队列是否为空。
  • IsFull():判断队列是否已满。



5. 设计细节与注意事项

为什么需要 if (consumer_count)if (producer_count)

  • 条件变量的 NotifyOne() 方法只会唤醒一个等待的线程。
  • 如果直接调用 NotifyOne() 而不检查是否有等待线程,可能会导致不必要的唤醒,浪费系统资源。
  • 因此,只有在确实有等待线程时才调用 NotifyOne()


为什么使用 while 而不是 if

  • 在多线程环境中,可能存在虚假唤醒(spurious wakeup)问题,也就是伪唤醒。

  • 具体来说:一个线程被唤醒,能够继续往下消费或生产一个物品,需要满足两个条件:1、获取到互斥锁;2、当前队列不为极端情况(不是满的或不是空的)

    如果一个消费者线程被唤醒,它获取到了互斥锁,但此时队列里为空,它还继续往下执行代码消费物品,岂不是出错?

    这种在条件不满足(为满或为空,线程不能生产或消费)的情况下被唤醒,叫做伪唤醒!!!

    因此需要将 if 该成 while:竞争失败了就应该继续进入 wait 状态


条件变量休眠 pthread_cond_wait(&cond, &lock)

  • 需要传入当前线程持有的锁,因为线程在进入条件变量睡眠时,是持有锁的!
  • 线程一般是在线程内部的临界区进行条件变量休眠的。
  • 如果让一个线程带着锁休眠,就会导致死锁或资源竞争问题。因此需要传入锁,让 pthread_cond_wait 自动释放锁。


问题:线程的 pthread_cond_wait 醒来时,若此时互斥锁获取失败怎么办?

  • 如果线程从 pthread_cond_wait 中醒来但无法获取锁,则该线程会被放入锁的等待队列中。
  • 等待队列中的线程会一直等到锁可用为止,然后继续执行后续逻辑。


线程唤醒逻辑放在释放锁之前还是之后?

  • 放在释放锁之前
    • 唤醒线程后,虽然当前线程仍然持有锁,但被唤醒的线程会自动尝试获取锁。
    • 如果被唤醒的线程暂时无法获取锁,它会被放入锁的等待队列中,不会影响系统的正常运行。
  • 放在释放锁之后
    • 如果在释放锁之后才唤醒其他线程,可能导致多个线程同时争抢锁。
    • 对于消费者线程来说,可能会导致所有消费者线程迅速消耗完队列中的资源,最终使队列变空。
    • 对于生产者线程来说,可能会导致队列快速填满,从而引发阻塞。
  • 将唤醒逻辑放在释放锁之前通常更好,因为它避免了不必要的锁争抢,同时保证了线程间的公平性。


如何实现多生产者多消费者?

  • 直接创建多个生产者和消费者线程即可。
  • 在生产者和生产者之间,在消费者和消费者之间的关系是互斥的,而我们线程之间对于同一个临界区用的是同一把锁,因此天然地就具有一种互斥关系!


为什么定义多生产者时会出现生产相同数据的情况?

  • 因为每个生产者线程都有自己的栈空间,因此它们可以独立生成数据。
  • 如果多个生产者线程生成的数据来源相同(例如,基于相同的初始值或算法),则可能会产生相同的数据。
  • 这种情况是正常的,因为每个生产者线程的行为是独立的,除非明确要求生产唯一的数据。


生产者消费者模型的并行和串行:理解生产者消费者模型效率高的原因

生产者消费者模型实际上包含两部分,第一个部分是我们前面认识到的,生产者将任务放入任务队列,消费者从任务队列获取任务,这个访问任务队列的过程是互斥的过程,是串行的过程,无论是生产者放入还是消费者拿出,都必须一个一个的串行执行

第二个部分是并行,生产者会消耗一定时间从别处获取任务,即生产任务,等待系统给生产者发布一个任务需求,让生产者作为一个发布者发布给消费者,如阻塞等待响应任务、刷新缓冲区任务、页面响应任务、数据库数据迁移任务….消费者也会消耗一定时间处理获取到的任务。在这个过程中,生产者等待任务和消费者处理任务,生产者和消费者是并行执行的!!

实际上,在整个生产者消费者模型中,并行操作占据了绝大部分时间。相比之下,生产者和消费者访问任务队列的串行操作所占用的时间非常少。这种设计使得任务从分发到执行的过程变得极其高效——任务无需花费过多时间在分发环节上,从而显著提升了整体系统的效率。

总结来说,生产者消费者模型之所以高效,正是因为它充分利用了生产者与消费者之间的并行性,同时将串行操作的影响降到最低,从而实现了任务的快速流转与处理。




6. 总结

这段代码实现了一个功能完善的阻塞队列,其核心思想如下:

  1. 使用互斥锁保护共享资源。
  2. 使用条件变量实现线程间的等待和唤醒。
  3. 通过 while 循环处理伪唤醒问题。
  4. 通过计数器(producer_countconsumer_count)优化唤醒操作。


7. 完整代码

BlockQueue.hpp

#ifndef _BLOCKING_QUEUE_HPP_
#define _BLOCKING_QUEUE_HPP_

#include <iostream>
#include <queue>
#include <pthread.h>
#include "Cond.hpp"
#include "Mutex.hpp"

using namespace CondModule;
using namespace MutexModule;

// 使用封装的条件变量和互斥锁
namespace BlockingQueueModule
{

    // 数据
    int num = 10;

    template<typename T>
    class BlockQueue
    {
    public:
        BlockQueue(int cap = 5) 
            : _size(0), _cap(cap)
        {}
        ~BlockQueue()
        {}

        // 对于生产者: 数据入队列
        void Equeue(T& data)
        {
            // 加锁
            _mutex.lock();
            while (IsFull())
            {
                // 队列已满,生产者阻塞
                producer_count++;
                std::cout << "queue is full, 生产者进入阻塞" << '\n';
                _producer_cond.Wait(_mutex);
                std::cout << "生产者被唤醒" << '\n';
                producer_count--;
            }
            // 生产数据
            _bq.push(data);
            _size++;

            // 生产完数据,唤醒消费者
            if(consumer_count){ // 为什么要加上这个判断:必须保证现在有消费者在等待
                std::cout << "唤醒消费者" << '\n';
                _consumer_cond.NotifyOne();
            }

            // 解锁
            _mutex.unlock();
        }

        // 对于消费者: 数据出队列
        T Pop()
        {
            // 加锁
            _mutex.lock();
            while (IsEmpty())
            {
                // 队列为空,消费者阻塞
                consumer_count++;
                std::cout << "queue is empty, 消费者进入阻塞" << '\n';
                _consumer_cond.Wait(_mutex);
                std::cout << "消费者被唤醒" << '\n';
                consumer_count--;
            }
            // 消费数据
            T data = _bq.back();
            _bq.pop();
            _size--;

            // 消费完数据,唤醒生产者
            if(producer_count){  // 为什么要加上这个判断:必须保证现在有生产者在等待
                std::cout << "唤醒生产者" << '\n';
                _producer_cond.NotifyOne();
            }
            
            // 解锁
            _mutex.unlock();
            return data;
        }

        // 队列是否为空
        bool IsEmpty()
        {
            return _bq.empty();
        }

        // 队列是否已满
        bool IsFull()
        {
            return _size == _cap;
        }


    private:
        std::queue<T> _bq; // 存储数据的队列
        int _size;            // 队列中数据的个数
        int _cap;             // 队列容量

        Cond _producer_cond; // 生产者条件变量;
        Cond _consumer_cond; // 消费者条件变量;
        Mutex _mutex;

        // 计数器
        int producer_count;
        int consumer_count;
    };
}

#endif


Cond.hpp

#pragma once

#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"

namespace CondModule
{
    using namespace MutexModule;
    class Cond
    {
    public:
        Cond()
        {
            pthread_cond_init(&_cond, nullptr);
        }
        ~Cond()
        {
            pthread_cond_destroy(&_cond);
        }

        // 阻塞等待
        void Wait(Mutex& mtx)
        {
            pthread_cond_wait(&_cond, mtx.getLockPtr());
        }

        // 随机唤醒一个
        void NotifyOne()
        {
            pthread_cond_signal(&_cond);
        }

        // 广播唤醒所有
        void NotifyAll()
        {
            pthread_cond_broadcast(&_cond);
        }

    private:
        pthread_cond_t _cond;
    };
}


Mutex.hpp

#ifndef _MUTEX_HPP
#define _MUTEX_HPP

#include <iostream>
#include <pthread.h>


namespace MutexModule
{
    class Mutex
    {
    public:
        // 禁止拷贝
        Mutex(const Mutex&) = delete;
        Mutex& operator=(const Mutex&) = delete;

        Mutex()
        {
            pthread_mutex_init(&_lock, nullptr);
        }
        ~Mutex()
        {
            pthread_mutex_destroy(&_lock);
        }

        // 加锁
        void lock()
        {
            pthread_mutex_lock(&_lock);
        }
        // 解锁
        void unlock()
        {
            pthread_mutex_unlock(&_lock);
        }
        // 获取锁
        pthread_mutex_t *getLockPtr()
        {
            return &_lock;
        }

    private:
        pthread_mutex_t _lock;
    };


    // 锁保护类
    class LockGuard
    {
    public:
        LockGuard(Mutex &mtx) : _mtx(mtx)
        {
            _mtx.lock();
        }
        ~LockGuard()
        {
            _mtx.unlock();
        }
    private:
        Mutex& _mtx;  // 因为是保护某个已存在的锁,所以这里不是创建锁,而是引用
    };

}

#endif 


8. BlockQueue 的使用:构建任务队列

Main.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <stdlib.h>
#include <functional>

using namespace BlockingQueueModule;
using task_t = std::function<void()>;


// 打印任务
void Task_Print()
{
    std::cout << "这是一个打印任务" << std::endl;
}

// 拷贝任务
void Task_Copy()
{
    std::cout << "这是一个拷贝任务" << std::endl;
}


void *Producer(void *args)
{
    
    BlockQueue<task_t>* bq = static_cast<BlockQueue<task_t>*>(args);
    while (true)
    {
        // 任务队列(funtion)
        task_t task = Task_Print;
        bq->Equeue(task);
        std::cout << "--------------生产者发布任务...---------------" << '\n';
    }
}


void *Consumer(void *args)
{
    BlockQueue<task_t>* bq = static_cast<BlockQueue<task_t>*>(args);
    while (true)
    {
        sleep(2);
        // 任务队列
        task_t task = bq->Pop();
        std::cout << "--------------消费者获取并执行任务...---------------" << '\n';
        task();
    }
}


int main()
{
    BlockQueue<task_t> bq;

    // 创建生产者线程: 1 个
    pthread_t _tid1;
    pthread_create(&_tid1, nullptr, Producer, &bq);

    

    // 创建消费者线程: 2 个
    pthread_t _tid10, _tid11;
    pthread_create(&_tid10, nullptr, Consumer, &bq);
    pthread_create(&_tid11, nullptr, Consumer, &bq);

    
    // 回收线程
    pthread_join(_tid1, nullptr);
    pthread_join(_tid10, nullptr);
    pthread_join(_tid11, nullptr);


    return 0;
}




运行结果如下:

在这里插入图片描述

相关文章:

  • 【笔记】LLM|Ubuntu22服务器极简本地部署DeepSeek+API使用方式
  • 使用apt-rdepends制作软件离线deb安装包
  • 网站搭建基本流程
  • RK3568平台开发系列讲解(PWM篇)SG90 舵机驱动实验
  • 蓝桥杯题目(36进制)
  • Recall(召回率)和 Precision(精确率) 的区别和F1分数
  • UML顺序图的建模方法及应用示例
  • 一、《重学设计模式》-设计模式简介
  • 3.9 用户反馈智能分析实战:从情感识别到产品优化的闭环设计指南
  • 低代码(Low Code)全解析:从概念到应用,从选择到价值
  • Spring框架-AOP
  • 【C语言】C语言 食堂自动化管理系统(源码+数据文件)【独一无二】
  • 【git】已上传虚拟环境的项目更改成不再上传虚拟环境
  • cmake:定位Qt的ui文件
  • 练习题:41
  • VideoPipe-使用VLC构建RTSP串流显示
  • 核函数简述
  • RagFlow+Ollama 构建RAG私有化知识库
  • python进阶篇-面向对象
  • 梁文锋亲自挂名DeepSeek发布新论文
  • 特朗普:对所有在国外制作进入美国的电影征收100%关税
  • 建邦高科赴港上市,大股东陈子淳系山东建邦集团董事长陈箭之子
  • 演员扎堆音乐节,是丰富了舞台还是流量自嗨?
  • 人民日报:创新成势、澎湃向前,中国科技创新突围的密码与担当
  • 全球最大汽车板供应商宝钢股份:汽车工业加速转型中材料商如何共舞?
  • 范宇任上海宝山区副区长