当前位置: 首页 > news >正文

阻塞队列(BlockingQueue)原理、实现与应用:多线程编程中的核心数据结构

目录

一、BlockingQueue(阻塞队列)

1、基本定义与特性

2、与普通队列的对比

3、底层实现原理

4、模拟实现基于阻塞队列的生产消费模型

1. queue_has_items 条件变量

2. queue_has_space 条件变量

3. 使用 queue 的原因

1. 数据存储与管理

2. 与同步机制配合

3. 通用性和可扩展性

5、在生产者 - 消费者模型中的应用

6、优势与局限性

优势

局限性

7、阻塞队列(BlockQueue)的模板化设计与任务队列应用

1. 设计概述

2. 任务队列的两种实现方式(重点!!!)

(1) 自定义任务类(Task)

(2) 通用函数任务(std::function)

3. 关键设计分析

(1) 线程安全机制

(2) 唤醒策略

(3) 模板化的优势

4. 测试用例

二、回顾:生产者消费者模型

1、生产者消费者模型的概念

2、生产者消费者模型的特点

(一)三种关系

(二)两种角色

(三)一个交易场所

3、互斥关系和同步关系的成因分析

(一)互斥关系的成因

(二)同步关系的成因

4、生产者消费者模型的优点

(一)解耦

(二)支持并发

(三)支持忙闲不均


一、BlockingQueue(阻塞队列)

        在多线程编程的复杂生态中,阻塞队列(BlockingQueue)是一种极为重要且常用的数据结构,它为解决生产者 - 消费者模型中的同步与协作问题提供了高效且便捷的方案。

1、基本定义与特性

        阻塞队列本质上是一种特殊的队列,它遵循先进先出(FIFO)的原则来管理元素。然而,与普通队列相比,阻塞队列具有独特的阻塞行为。在多线程环境下,当队列处于特定状态时,对队列的操作将会引发线程的阻塞。具体表现为:

  • 队列为空时:当消费者线程试图从队列中获取元素,而此时队列为空,该获取操作将会被阻塞。消费者线程会进入等待状态,暂停执行,直到有其他线程向队列中放入元素,将其唤醒。

  • 队列已满时:当生产者线程尝试向已满的队列中存放元素,该存放操作会被阻塞。生产者线程同样会进入等待状态,直至队列中有元素被取出,腾出空间,生产者线程才会被唤醒并继续执行存放操作。

        这种阻塞机制是基于不同线程之间的交互实现的。也就是说,一个线程对阻塞队列的操作可能会因为队列的状态而阻塞,而另一个线程的操作则可能会改变队列状态,从而唤醒被阻塞的线程。        

2、与普通队列的对比

        普通队列仅仅是一个简单的数据存储结构,它提供了基本的入队(enqueue)和出队(dequeue)操作。在单线程或简单的多线程场景下,普通队列可以满足一定的需求。然而,在复杂的并发环境中,普通队列存在明显的局限性:

  • 缺乏同步机制:普通队列没有内置的同步控制,当多个线程同时对队列进行操作时,可能会导致数据不一致的问题。例如,一个线程正在从队列中取出元素,而另一个线程同时向队列中插入元素,这可能会破坏队列的完整性。

  • 无法处理空或满的情况:在队列为空时,消费者线程直接进行出队操作会得到无效的数据;在队列已满时,生产者线程的入队操作会导致数据丢失或溢出。普通队列没有提供有效的机制来处理这些特殊情况。

        相比之下,阻塞队列通过其阻塞行为和内置的同步机制,很好地解决了这些问题。它确保了在多线程环境下对队列的安全访问,并且能够根据队列的状态自动调整线程的执行流程。

知识关联:阻塞队列的概念让我们自然联想到管道,而其最典型的应用场景正是管道的实现!!!

3、底层实现原理

阻塞队列的底层实现通常依赖于同步机制,如互斥锁(Mutex)和条件变量(Condition Variable)。

  • 互斥锁:用于保护队列的内部状态,确保同一时间只有一个线程能够对队列进行修改。例如,当一个线程正在向队列中插入元素时,其他线程无法同时进行插入或删除操作,从而避免了数据竞争。

  • 条件变量:用于实现线程的阻塞和唤醒。当队列为空时,消费者线程通过条件变量进入等待状态;当有元素被放入队列时,生产者线程通过条件变量通知消费者线程。同样,当队列已满时,生产者线程等待,消费者线程取出元素后通知生产者线程。

4、模拟实现基于阻塞队列的生产消费模型

以一个简单的基于互斥锁和条件变量实现的阻塞队列为例,其核心代码结构可能如下:

#include <pthread.h>
#include <queue>template<typename T>
class BlockingQueue {
private:std::queue<T> queue;pthread_mutex_t mutex;pthread_cond_t queue_has_items;  // 表示队列中有元素可供消费pthread_cond_t queue_has_space;  // 表示队列中有空间可存放新元素size_t capacity;public:BlockingQueue(size_t cap) : capacity(cap) {pthread_mutex_init(&mutex, NULL);pthread_cond_init(&queue_has_items, NULL);pthread_cond_init(&queue_has_space, NULL);}~BlockingQueue() {pthread_mutex_destroy(&mutex);pthread_cond_destroy(&queue_has_items);pthread_cond_destroy(&queue_has_space);}void enqueue(T item) {pthread_mutex_lock(&mutex);while (queue.size() == capacity) {// 队列已满,等待队列有空位pthread_cond_wait(&queue_has_space, &mutex);}queue.push(item);// 通知等待的消费者,队列中有新元素pthread_cond_signal(&queue_has_items);pthread_mutex_unlock(&mutex);}T dequeue() {pthread_mutex_lock(&mutex);while (queue.empty()) {// 队列为空,等待队列有元素pthread_cond_wait(&queue_has_items, &mutex);}T item = queue.front();queue.pop();// 通知等待的生产者,队列有空位了pthread_cond_signal(&queue_has_space);pthread_mutex_unlock(&mutex);return item;}
};

        在上述代码中,enqueue 方法用于向队列中添加元素。如果队列已满,生产者线程将通过 pthread_cond_wait 函数阻塞,直到队列有空位(由消费者线程取出元素后通过 pthread_cond_signal 通知)。dequeue 方法用于从队列中取出元素,如果队列为空,消费者线程将阻塞,直到队列中有元素。

1. queue_has_items 条件变量

  • 当消费者线程尝试从队列中取出元素,但队列为空时,消费者线程会通过 pthread_cond_wait(&queue_has_items , &mutex) 函数进入等待状态。

  • 此时,queue_has_items 条件变量用于挂起消费者线程,直到队列中有新的元素被放入。

  • 当生产者线程向队列中添加元素后,会调用 pthread_cond_signal(&not_empty) 来通知等待在 queue_has_items 条件变量上的消费者线程,告知队列不再为空,可以尝试取出元素。

2. queue_has_space 条件变量

  • 当生产者线程试图向队列中添加元素,但队列已满时,生产者线程会通过 pthread_cond_wait(&queue_has_space , &mutex) 函数进入等待状态。

  • queue_has_space 条件变量用于挂起生产者线程,直到队列中有元素被取出,腾出空间。

  • 当消费者线程从队列中取出元素后,会调用 pthread_cond_signal(&queue_has_space) 来通知等待在 queue_has_space 条件变量上的生产者线程,告知队列不再已满,可以继续添加元素。

3. 使用 queue 的原因

1. 数据存储与管理
  • 顺序存储std::queue 是一个先进先出(FIFO)的数据结构,它非常适合用于阻塞队列的场景。在生产者 - 消费者模型中,生产者按照一定的顺序生产数据,消费者也希望按照生产者生产数据的顺序来消费数据。std::queue 能够很好地满足这种顺序存储和访问的需求。

  • 便捷操作std::queue 提供了简单易用的接口,如 push 用于将元素添加到队列尾部,pop 用于从队列头部移除元素,front 用于获取队列头部的元素等。这些操作使得在阻塞队列中存储和检索数据变得非常方便。

2. 与同步机制配合
  • 状态判断:在阻塞队列的实现中,需要频繁地判断队列是否为空或已满。std::queue 的 empty 和 size 方法可以方便地获取队列的当前状态。例如,在 enqueue 方法中,通过 queue.size() == capacity 判断队列是否已满;在 dequeue 方法中,通过 queue.empty() 判断队列是否为空。

  • 数据一致性:结合互斥锁(mutex),std::queue 可以确保在多线程环境下对队列的操作是线程安全的。互斥锁保护了队列的内部状态,防止多个线程同时修改队列导致数据不一致的问题。

3. 通用性和可扩展性
  • 泛型支持:代码中使用了模板类 BlockingQueue<typename T>std::queue 同样支持泛型,可以存储任意类型的数据。这使得 BlockingQueue 具有很强的通用性,可以应用于不同类型的数据传输和处理的场景。

  • 易于扩展:基于 std::queue 实现的阻塞队列可以方便地进行扩展和优化。例如,可以根据实际需求调整队列的容量、添加更多的同步机制或优化队列的性能等。

        综上,使用 std::queue 作为阻塞队列的底层数据结构,能够充分利用其顺序存储、便捷操作、与同步机制的良好配合以及通用性和可扩展性等优点,从而有效地实现生产者 - 消费者模型中的数据缓冲和线程同步功能。

        为便于理解,我们以单生产者单消费者模型为例(其实可以支持多生产者多消费者模型)进行实现。其中BlockQueue作为生产者与消费者的交易场所,可直接使用C++STL中的queue实现。

// 阻塞队列的实现
#pragma once#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>const int defaultcap = 5; // for testtemplate <typename T>
class BlockQueue
{
private:bool IsFull() { return _q.size() >= _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = defaultcap): _cap(cap), _csleep_num(0), _psleep_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);}void Equeue(const T &in){pthread_mutex_lock(&_mutex);// 生产者调用while (IsFull()){// 应该让生产者线程进行等待// 重点1:pthread_cond_wait调用成功,挂起当前线程之前,要先自动释放锁!!// 重点2:当线程被唤醒的时候,默认就在临界区内唤醒!要从pthread_cond_wait// 成功返回,需要当前线程,重新申请_mutex锁!!!// 重点3:如果我被唤醒,但是申请锁失败了??我就会在锁上阻塞等待!!!_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" <<  _psleep_num << std::endl;// 问题1: pthread_cond_wait是函数吗?有没有可能失败?pthread_cond_wait立即返回了// 问题2:pthread_cond_wait可能会因为,条件其实不满足,pthread_cond_wait 伪唤醒pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}// 100%确定:队列有空间_q.push(in);// 临时方案// v2if(_csleep_num>0){pthread_cond_signal(&_empty_cond);std::cout << "唤醒消费者..." << std::endl;}// pthread_cond_signal(&_empty_cond); // 可以pthread_mutex_unlock(&_mutex); // TODO// pthread_cond_signal(&_empty_cond); // 可以}T Pop(){// 消费者调用pthread_mutex_lock(&_mutex);while (IsEmpty()){_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T data = _q.front();_q.pop();if(_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "唤醒消费者" << std::endl;}// pthread_cond_signal(&_full_cond);pthread_mutex_unlock(&_mutex);return data;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q; // 临界资源!!!int _cap;         // 容量大小pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠的个数int _psleep_num; // 生产者休眠的个数
};

这段代码实现了一个模板类的阻塞队列(BlockQueue),用于生产者消费者模型中的线程间通信。下面我将详细讲解代码中的各个知识点。

1. 头文件和命名空间

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
  • <iostream>: 提供标准输入输出功能

  • <string>: 提供字符串处理功能

  • <queue>: 提供队列数据结构

  • <pthread.h>: 提供POSIX线程相关功能,包括互斥锁和条件变量

2. 默认容量定义

const int defaultcap = 5; // for test

定义了一个默认的队列容量5,主要用于测试目的。

BlockQueue类实现

1. 私有成员函数

bool IsFull() { return _q.size() >= _cap; }
bool IsEmpty() { return _q.empty(); }
  • IsFull(): 检查队列是否已满

  • IsEmpty(): 检查队列是否为空

2. 构造函数

BlockQueue(int cap = defaultcap): _cap(cap), _csleep_num(0), _psleep_num(0)
{pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full_cond, nullptr);pthread_cond_init(&_empty_cond, nullptr);
}
  • 初始化队列容量(_cap)、消费者休眠数(_csleep_num)和生产者休眠数(_psleep_num)

  • 初始化互斥锁(_mutex)和两个条件变量(_full_cond_empty_cond)

3. 入队操作(Equeue)

void Equeue(const T &in)
{pthread_mutex_lock(&_mutex);// 生产者调用while (IsFull()){_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" <<  _psleep_num << std::endl;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}// 100%确定:队列有空间_q.push(in);if(_csleep_num>0){pthread_cond_signal(&_empty_cond);std::cout << "唤醒消费者..." << std::endl;}pthread_mutex_unlock(&_mutex);
}

关键知识点

  1. 互斥锁保护:使用pthread_mutex_lock获取锁,确保对共享资源(队列)的访问是线程安全的

  2. 队列满处理

    • 使用while循环检查队列是否满(防止虚假唤醒)

    • 如果队列满,增加生产者休眠计数(_psleep_num)

    • 调用pthread_cond_wait使当前线程等待:

      • 重点1:调用成功时,在挂起线程前会自动释放锁

      • 重点2:被唤醒时,默认在临界区内唤醒,需要重新申请锁

      • 重点3:如果申请锁失败,会在锁上阻塞等待

  3. 数据入队:确认队列有空位后,将数据压入队列

  4. 唤醒消费者

    • 如果有消费者在等待(通过_csleep_num判断)

    • 使用pthread_cond_signal唤醒一个等待的消费者

  5. 释放锁:最后释放互斥锁

关于使用while而非if判断生产消费条件的必要性:

  • pthread_cond_wait存在调用失败的可能,失败后线程会继续执行后续代码,此时需要重新检查条件

  • 在多消费者场景下,使用pthread_cond_broadcast可能同时唤醒多个消费者线程,但实际只有一个数据可供消费,导致伪唤醒问题

  • 为确保线程被唤醒后能再次验证条件是否真正满足,必须使用while循环进行条件判断

4. 出队操作(Pop)

T Pop()
{pthread_mutex_lock(&_mutex);while (IsEmpty()){_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T data = _q.front();_q.pop();if(_psleep_num > 0){pthread_cond_signal(&_full_cond);std::cout << "唤醒消费者" << std::endl;}pthread_mutex_unlock(&_mutex);return data;
}

关键知识点

  1. 结构与Equeue对称,但处理的是队列空的情况

  2. 使用_csleep_num记录等待的消费者数量

  3. 当取出数据后,如果有生产者在等待,唤醒一个生产者

5. 析构函数

~BlockQueue()
{pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);
}
  • 销毁互斥锁和条件变量,释放系统资源

6. 私有成员变量

std::queue<T> _q; // 临界资源!!!
int _cap;         // 容量大小pthread_mutex_t _mutex;
pthread_cond_t _full_cond;
pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠的个数
int _psleep_num; // 生产者休眠的个数
  • _q: 存储数据的队列(临界资源)

  • _cap: 队列最大容量

  • _mutex: 互斥锁,保护对队列的访问

  • _full_cond: 条件变量,用于队列满时的生产者等待

  • _empty_cond: 条件变量,用于队列空时的消费者等待

  • _csleep_num: 记录等待的消费者数量

  • _psleep_num: 记录等待的生产者数量

代码中的关键问题和注意事项

1. 条件变量使用注意事项

// 问题1: pthread_cond_wait是函数吗?有没有可能失败?
// 问题2:pthread_cond_wait可能会因为条件其实不满足而伪唤醒
pthread_cond_wait(&_full_cond, &_mutex);
  • pthread_cond_wait确实是一个函数,可能会失败(如被信号中断)

  • 存在"伪唤醒"(spurious wakeup)问题,即使没有显式唤醒,线程也可能被唤醒

  • 解决方案:使用while循环而不是if来检查条件,可以处理伪唤醒

伪唤醒的完整示例:

初始状态

  • 队列为空,消费者 C1 在 _empty_cond 上等待。

  • 生产者 P1 准备添加数据。

伪唤醒流程:

  1. C1 调用 Dequeue,发现队列为空,进入 pthread_cond_wait(&_empty_cond, &_mutex)

  2. 操作系统错误地唤醒 C1(伪唤醒)

    • 没有线程调用 pthread_cond_signal

    • C1 从 pthread_cond_wait 返回,检查 if (_q.empty()):队列仍然为空,但 if 不会重新检查,直接执行 pop() → 崩溃或数据错误

  3. 如果用 whileC1 被伪唤醒后,while (_q.empty()) 会重新检查,发现队列仍为空,再次进入等待。

2. 锁的释放时机

pthread_mutex_unlock(&_mutex); // TODO
// pthread_cond_signal(&_empty_cond); // 可以
  • 注释显示这里曾经考虑过在解锁前后唤醒线程的问题

  • 最佳实践是在持有锁的时候发出信号(pthread_cond_signal),这样可以确保被唤醒的线程在尝试获取锁时,锁的状态是一致的

3. 唤醒策略

if(_csleep_num>0)
{pthread_cond_signal(&_empty_cond);
}
  • 使用pthread_cond_signal而不是pthread_cond_broadcast,只唤醒一个等待线程

  • 这样可以减少不必要的上下文切换,提高效率

  • 通过计数器(_csleep_num_psleep_num)来优化,只在有线程等待时才发出信号

代码优化建议

  • 错误处理:当前代码没有检查pthread_*函数的返回值,实际应用中应该添加错误处理

  • 性能优化:可以考虑使用pthread_cond_broadcast在某些情况下唤醒所有等待线程

  • 资源管理:可以使用RAII技术管理互斥锁和条件变量,确保异常安全

  • 更精确的唤醒:当前使用计数器判断是否需要唤醒,可以考虑更精确的等待线程管理

总结:这段代码实现了一个线程安全的阻塞队列,关键点在于:

  • 使用互斥锁保护共享资源(队列)

  • 使用两个条件变量分别处理队列满和队列空的情况

  • 通过计数器优化唤醒操作,避免不必要的信号发送

  • 正确处理了条件变量的等待和唤醒流程

这种实现是生产者消费者模型的典型应用,能够有效协调生产者和消费者的速度差异,避免忙等待,提高系统效率。

相关说明:

        本实现采用单生产者单消费者的生产者-消费者模型,因此无需处理生产者间或消费者间的关系,只需确保生产者与消费者之间的同步与互斥关系。

实现要点:

  1. 将BlockingQueue设计为模板类,提升代码复用性

  2. 设置阻塞队列容量上限为5,达到上限时生产者将自动阻塞

  3. 使用互斥锁保护阻塞队列,防止生产者和消费者同时访问造成冲突

线程控制机制:

  • 生产者线程在队列未满时可执行Push操作,队列已满时自动阻塞等待

  • 消费者线程在队列非空时可执行Pop操作,队列为空时自动阻塞等待

  • 采用两个条件变量实现线程调度:

    • _empty_cond条件变量:标识队列为空状态

    • _full_cond条件变量:标识队列已满状态

互斥锁管理:

  1. 线程进入临界区前必须先获取锁

  2. 若条件不满足,线程通过pthread_cond_wait自动释放锁并挂起

  3. 被唤醒时线程会自动重新获取锁

线程唤醒机制:

  • 生产者完成操作后唤醒等待_empty_cond条件的消费者线程

  • 消费者完成操作后唤醒等待_full_cond条件的生产者线程

5、在生产者 - 消费者模型中的应用

  • 在生产者 - 消费者模型中,阻塞队列作为中间缓冲区(在内存中开辟空间),起到了关键的解耦作用。

  • 生产者线程将生产好的数据放入阻塞队列,而不需要关心消费者线程何时消费这些数据;

  • 消费者线程从阻塞队列中取出数据进行处理,也不需要直接与生产者线程进行交互。

  • 例如,在一个日志处理系统中,多个生产者线程负责收集不同来源的日志信息,并将这些信息放入阻塞队列。

  • 而消费者线程则从阻塞队列中取出日志信息进行存储或分析。

  • 通过使用阻塞队列,系统能够有效地平衡生产者和消费者之间的处理速度差异,避免数据丢失或消费者线程的空闲等待。

6、优势与局限性

优势

  • 简化多线程编程:阻塞队列提供了一种简单而有效的方式来实现线程间的同步和通信,减少了开发人员手动处理同步逻辑的复杂性。

  • 提高系统性能:通过缓冲数据和自动阻塞/唤醒机制,阻塞队列能够平衡生产者和消费者的处理速度,提高系统的整体吞吐量。

  • 增强系统的稳定性:避免了因数据竞争和空/满队列处理不当而导致的系统崩溃或数据错误。

局限性

  • 性能开销:由于依赖于互斥锁和条件变量等同步机制,阻塞队列在高性能要求的场景下可能会引入一定的性能开销。

  • 容量限制:阻塞队列通常有固定的容量,如果生产者的生产速度持续超过消费者的消费速度,可能会导致队列满后生产者长时间阻塞,影响系统的响应时间。

        总之,阻塞队列作为多线程编程中的重要数据结构,在生产者 - 消费者模型中发挥着不可替代的作用。理解其原理和应用,对于开发高效、稳定的多线程程序具有重要意义。

7、阻塞队列(BlockQueue)的模板化设计与任务队列应用

1. 设计概述

本实现是一个线程安全的阻塞队列,支持多生产者和多消费者模型。其核心特点包括:

  • 模板化设计:支持任意数据类型(如 int、自定义类、std::function<void()> 等)。

  • 线程同步:通过 pthread_mutex_t 和 pthread_cond_t 实现互斥和条件等待。

  • 灵活的任务类型

    • 自定义任务类(如 Task,封装计算逻辑和结果)。

    • std::function<void()>(通用函数对象,适合lambda或函数指针)。

2. 任务队列的两种实现方式(重点!!!)

(1) 自定义任务类(Task
class Task {
public:Task(int a, int b) : _a(a), _b(b), _result(0) {}void Execute() {_result = _a + _b; // 模拟计算任务}std::string ResultToString() {return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);}std::string DebugToString() {return std::to_string(_a) + "+" + std::to_string(_b) + "=?";}private:int _a, _b, _result;
};// 使用示例
BlockQueue<Task> taskQueue;
(2) 通用函数任务(std::function<void()>
using Task = std::function<void()>;// 使用示例
BlockQueue<Task> funcQueue;
funcQueue.Equeue([]() { std::cout << "Lambda task executed!" << std::endl; });void Download()
{std::cout << "我是一个下载任务..." << std::endl;sleep(3); // 假设处理任务比较耗时
}

3. 关键设计分析

(1) 线程安全机制
  • 互斥锁(_mutex:保护队列的并发访问。

  • 条件变量(_full_cond 和 _empty_cond

    • 生产者等待队列非满(_full_cond)。

    • 消费者等待队列非空(_empty_cond)。

  • 伪唤醒处理:通过 while 循环重新检查条件,避免虚假唤醒。

(2) 唤醒策略
  • 当前使用 pthread_cond_signal 唤醒单个等待线程。

  • 优化建议:若需唤醒所有等待线程(如批量生产数据),可改用 pthread_cond_broadcast

(3) 模板化的优势
  • 灵活性:支持内置类型(如 int)、自定义类(如 Task)和函数对象(如 std::function)。

  • 性能:避免虚函数开销(相比基类-派生类设计)。

4. 测试用例

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>void *consumer(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);while (true){sleep(10);// 1. 消费任务task_t t = bq->Pop();// 2. 处理任务 -- 处理任务的时候,这个任务,已经被拿到线程的上下文中了,不属于队列了t();}
}void *productor(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);while (true){// 1. 获得任务//std::cout << "生产了一个任务" << x << "+" << y << "=?" << std::endl;std::cout << "生产了一个任务" << std::endl;// 2. 生产任务bq->Equeue(Download);}
}int main()
{// 扩展认识: 阻塞队列: 可以放任务吗?// 申请阻塞队列BlockQueue<task_t> *bq = new BlockQueue<task_t>();// 构建生产和消费者pthread_t c[2], p[3];pthread_create(c, nullptr, consumer, bq);pthread_create(c+1, nullptr, consumer, bq);pthread_create(p, nullptr, productor, bq);pthread_create(p+1, nullptr, productor, bq);pthread_create(p+2, nullptr, productor, bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);return 0;
}// #include "BlockQueue.hpp"
// #include "Task.hpp"
// #include <iostream>
// #include <pthread.h>
// #include <unistd.h>// void *consumer(void *args)
// {
//     BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);//     while (true)
//     {
//         sleep(1);
//         Task t = bq->Pop();//         t.Execute();//         std::cout << "消费了一个任务" << t.X() << "+" << t.Y() << "=" << t.Result()  << std::endl;
//     }
// }// void *productor(void *args)
// {
//     int x = 1;
//     int y = 1;
//     BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
//     while (true)
//     {
//         // sleep(1);
//         std::cout << "生产了一个任务" << x << "+" << y << "=?" << std::endl;
//         Task t(x, y);
//         bq->Equeue(t);//         x++, y++;
//     }
// }// int main()
// {
//     // 扩展认识: 阻塞队列: 可以放任务吗?
//     // 申请阻塞队列
//     BlockQueue<Task> *bq = new BlockQueue<Task>();//     // 构建生产和消费者
//     pthread_t c, p;//     pthread_create(&c, nullptr, consumer, bq);
//     pthread_create(&p, nullptr, productor, bq);//     pthread_join(c, nullptr);
//     pthread_join(p, nullptr);//     return 0;
// }

二、回顾:生产者消费者模型

1、生产者消费者模型的概念

  • 生产者消费者模型是一种经典的软件设计模式,其核心思想是通过引入一个中间容器(缓冲区),实现生产者与消费者之间的解耦。

  • 在该模型中,生产者和消费者不直接进行通信,而是通过共享的容器来间接交互。

  • 生产者完成数据生产后,无需等待消费者处理,直接将数据存入容器;消费者则从容器中获取所需数据进行处理,无需主动向生产者请求数据。

  • 这个中间容器起到了缓冲区的关键作用,它能够有效平衡生产者和消费者在处理能力上的差异。

  • 当生产者速度较快、消费者速度较慢时,容器可以暂时存储多余的数据,避免生产者因等待消费者而阻塞;反之,当消费者速度较快、生产者速度较慢时,容器可以为消费者提供数据,防止消费者因无数据可处理而闲置。

  • 通过这种方式,生产者消费者模型实现了生产者和消费者之间的松耦合,提高了系统的灵活性和可扩展性。

2、生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的典型应用场景,具有以下显著特点:

(一)三种关系

  • 生产者和生产者(互斥关系):多个生产者线程可能同时向共享容器中写入数据,为了避免数据混乱和冲突,需要确保同一时间只有一个生产者能够访问容器进行写入操作,因此生产者之间存在互斥关系。

  • 消费者和消费者(互斥关系):类似地,多个消费者线程可能同时从共享容器中读取数据,为了保证数据读取的正确性和一致性,同一时间只能有一个消费者从容器中获取数据,所以消费者之间也存在互斥关系。

  • 生产者和消费者(互斥关系、同步关系):生产者和消费者共享同一个容器,因此它们在访问容器时也存在互斥关系,以防止数据的不一致。同时,生产者和消费者之间还需要同步关系,即生产者的生产速度和消费者的消费速度需要相互协调,避免出现容器满时生产者继续生产导致数据丢失,或者容器空时消费者继续消费导致无效操作的情况。

(二)两种角色

  • 生产者和消费者是该模型中的两个核心角色,通常由进程或线程来承担。

  • 生产者负责生成数据并将其放入共享容器,消费者则从共享容器中取出数据进行处理。

  • 这两个角色的分工明确,各自专注于自身的任务,通过共享容器实现数据的传递和交互。

(三)一个交易场所

  • 通常指的是内存中的一段缓冲区,它是生产者和消费者进行数据交换的场所。

  • 这个缓冲区可以通过各种方式组织起来,例如使用队列、栈等数据结构。

  • 在实现生产者消费者模型时,核心任务之一就是对这个缓冲区进行合理的管理和维护,确保生产者和消费者能够正确、高效地访问其中的数据。

在编写生产者消费者模型的代码时,本质上就是对上述三种关系进行精确的维护和实现,以保证模型能够正常运行并发挥其优势。

3、互斥关系和同步关系的成因分析

(一)互斥关系的成因

  • 介于生产者和消费者之间的容器作为共享资源,可能会被多个执行流(生产者线程或消费者线程)同时访问。

  • 例如,多个生产者可能同时尝试向容器中写入数据,多个消费者可能同时尝试从容器中读取数据。这种同时访问的情况如果不加以控制,就会导致数据的不一致和错误。

  • 因此,我们需要将该临界资源(共享容器)用互斥锁保护起来,确保同一时间只有一个线程能够访问容器。

  • 在这个过程中,所有的生产者和消费者都会竞争式地申请锁,这就导致了生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。

(二)同步关系的成因

  • 如果让生产者一直不停地生产数据,而不考虑容器的容量限制,那么当容器被数据塞满后,生产者继续生产数据就会导致生产失败,甚至可能引发数据丢失等问题。

  • 反之,如果让消费者一直不停地消费数据,而不考虑容器中是否还有数据,那么当容器中的数据被消费完后,消费者继续消费就会陷入无效操作,浪费系统资源。

  • 虽然这种情况不会直接造成数据不一致的问题,但会引起另一方的饥饿问题,即生产者因容器满而无法生产,消费者因容器空而无法消费,导致系统整体效率低下。

        为了避免这种情况的发生,我们应该让生产者和消费者访问容器时具有一定的顺序性。例如,可以采用信号量等同步机制,让生产者在容器未满时生产数据,消费者在容器不为空时消费数据,从而实现生产者和消费者之间的协同工作。

需要注意的是,互斥关系和同步关系在生产者消费者模型中扮演着不同的角色。

  • 互斥关系主要保证数据的正确性和一致性,防止多个线程同时访问共享资源导致的数据冲突;

  • 而同步关系则是为了让多线程之间能够协同工作,提高系统的整体效率和性能。

4、生产者消费者模型的优点

(一)解耦

  • 在传统的函数调用方式中,如果在主函数中调用某一函数,必须等待该函数体执行完毕后才能继续执行主函数的后续代码,这种调用方式本质上是一种紧耦合。

  • 而在生产者消费者模型中,函数传参可以看作是生产者生产的过程,执行函数体则是消费者消费的过程。

  • 生产者只负责生成数据并将其放入共享容器,消费者只负责从共享容器中取出数据进行处理,在消费者消费期间,生产者可以同时进行生产,两者之间相互独立,互不干扰。

  • 因此,生产者消费者模型实现了生产者和消费者之间的松耦合,提高了系统的灵活性和可维护性。

(二)支持并发

  • 生产者消费者模型允许多个生产者和多个消费者同时运行,充分利用了多核处理器的优势,提高了系统的并发处理能力。

  • 通过合理的同步和互斥机制,可以确保多个线程在访问共享资源时的正确性和一致性,从而实现高效的并发执行。

(三)支持忙闲不均

  • 在实际应用中,生产者和消费者的处理速度往往是不均衡的。

  • 生产者可能因为某些原因(如数据源的供应速度)而生产速度较快或较慢,消费者也可能因为处理复杂度不同而消费速度有快有慢。

  • 生产者消费者模型通过共享容器的缓冲作用,能够很好地适应这种忙闲不均的情况。

  • 当生产者速度较快时,多余的数据可以存储在容器中,等待消费者慢慢处理;当消费者速度较快时,可以从容器中快速获取数据进行处理,避免了生产者或消费者的闲置,提高了系统的资源利用率。

        综上所述,生产者消费者模型凭借其解耦、支持并发和支持忙闲不均等优点,在多线程编程、分布式系统等领域得到了广泛的应用。通过合理设计和实现生产者消费者模型,可以有效提高系统的性能、可靠性和可扩展性。

http://www.dtcms.com/a/557281.html

相关文章:

  • mstscax!CCC::CCFSMProc调试记录设置为1打开调试开关
  • 树莓派连接海康威视工业相机
  • 建设家具网站手机端怎么看世界杯
  • Go语言设计模式:工厂模式详解
  • Docker 部署 openEuler 教程及常见问题解决
  • 厦门专业做网站 厦门做网站的公司 厦门做服饰网站网站开发程序员需要会的技能
  • W55MH32三模自由控:小程序按键网页随选
  • 物联网入侵检测技术综合综述报告
  • 大模型-Qwen-Agent框架:系列Agent功能介绍 (2)
  • 网站 设计理念淄博网站建设优化运营熊掌号
  • R 包的制作
  • 【矩阵分析与应用】【第5章 梯度分析与最优化】【5.2.2 矩阵迹的微分计算示例d(tr(U))=tr(dU)证明】
  • 岳阳网站设计公司网站开发意义
  • MySQL的CONCAT函数介绍
  • 潜山云建站网站建设wordpress获取当前用户id
  • makefile 函数全解
  • day01_牛客_数字统计_C++
  • Redis RDB 持久化机制深入理解:Copy-On-Write 与数据一致性保障
  • 做哪方面的网站阳泉哪里做网站
  • 电商网站改版方案有哪些免费的ppt模板下载网站
  • LeetCode 3217.从链表中移除在数组中存在的节点:哈希表(一次遍历)
  • LeetCode - 寻找两个正序数组的中位数
  • 上海网站设计公司 静安沙井建网站
  • VMware17完成克隆ubuntu20.04时IP地址相同的问题
  • 【问题排查】hadoop-shaded-guava依赖问题
  • 百度地图网页版在线使用搜索引擎优化搜索优化
  • 网站优化排名兰州网站建设尚美
  • leetcode 3217 从链表中移除在数组中存在的节点
  • C++音视频就业路线
  • 46-基于STM32的智能宠物屋设计与实现