[Linux——Lesson25.线程:生产与消费者模型]



目录
前言
一、😗认识生产消费者模型
1-1🍕为什么使用生产消费者模型
二、🤩基于阻塞队列的生产消费模型
2-1 🍿单生产单消费示例
2-1-1 🦣Thread.hpp线程类封装
2-1-2 🐕BlockQueue.hpp阻塞队列类封装
2-1-3🌟main函数逻辑
2-1-4 🐈⬛运行结果及拓展
2-2 🥐生产消费者模型的理解
三、😋循环队列生产消费模型
3-1 🌭POSIX 信号量
3-2 🥨二元信号量
四、🤔基于环形队列的生产消费者模型
🗝️总结与提炼
1️⃣核心本质与价值
2️⃣两种实现方案对比
3️⃣关键技术点梳理
4️⃣实践启示与注意事项
5️⃣核心收获
结束语
前言
在多线程并发编程的领域中,线程间的高效协作与资源协调是构建稳定、高效系统的核心难题。当多个线程共享资源并协同完成任务时,如何避免访问冲突、解决线程执行速度差异引发的冲突,以及实现安全的信息交互,成为开发者必须攻克的关键问题。
生产消费者模型是我们解决多线程同步互斥问题的经典模型,为上述挑战提供了合适的解决方案。该模型通过引入 “生产者”“消费者” 两种角色与共享缓冲区,将线程任务拆解为数据生成、存储与消费的闭环流程:生产者线程负责生成数据并放入缓冲区,消费者线程从缓冲区取出数据进行处理,而同步互斥机制则确保了缓冲区操作的原子性与顺序性,有效避免了数据不一致,临界资源访问冲突等问题。
深入剖析生产消费者模型的设计思想对我们进行多线程编程是百利而无一害的。
本文将从3方面解析生产消费者模型:
计算机中的生产消费者模型;
基于阻塞队列的生产消费者模型;
基于环形队列的生产消费者模型。
一、😗认识生产消费者模型
生产消费者模型的本质是讨论数据如何 并发的传递的问题,生活当中其实有很多生产消模型,经典模型比如超市。超市本身是不生产产品的,商品源来自于厂商,而客户源就是我们普通老百姓。而厂商可能不止一家,用户也不止一个,那为什么说超市是经典的模型呢?
以线程的角度来说,超市其实就是 共享资源,而厂商和用户其实就是多个线程,那么这个超市就要考虑多线程的同步和互斥问题,如果临界资源是超市,那么超市的商品毫无疑问就是数据,既然商品是数据,那么存储着数据的超市,不就是临时保存数据的 “内存空间” 吗?
实际上确实如此,而这里的 “超市” 并不是指硬件上的物理内存,其 是一种数据结构类型,可以是队列,可以是栈结构等等。总的来说,“超市” 是 “数据(商品)” 的交易场所。
既然超市一边有供货商提供货源,一边有用户来消费购物,OS层面上来说,这个临街资源两边都有多个线程,而我们 把 “供货商” 线程称为 生产者,“客户” 称为 消费者。
那么毫无疑问生产消费者模型是一个多线程访问共享资源的场景,在之前我们曾学习过,如果多线程不加保护的同时访问临界资源会带来严重的后果,一个简单的例子就是带来数据不一致的问题。要了解问题,必究其原因,所以我们需要先把它们的关系理清:
①生产者与生产者之间: 互斥。
②生产者与消费者之间:互斥。
③消费者与消费者之间:互斥、同步。三者皆是互斥关系,其实很好理解,假如超市只剩下了一件商品,而需要这件商品的客户却不止一个,那么这些客户之间就是竞争关系,在OS层面来说叫做互斥关系,同理,超市的货架是有限的,那么生产商会想办法把自己的商品更多的放在货架上让消费者看到,所以他们也是互斥。而当供应商需要到货100件之后才能对超市的账,但是在100件之前如果被客户直接拿走了,很可能会导致后面对账对不上等问题,所以生产者与消费者之间其实也是互斥关系的。如果有一批客户非常需要一件商品,但是这件商品迟迟不上架,所以超市就让他们两个保持距离,当供应商供货的时候让客户来购买。保持的这种关系我们也称之为同步。
注:生产者与生产者,消费者于消费者之间并不绝对的保持互斥,如果你想人为的保持同步也是可以的。综上所述,生产消费者之间存在3种关系(上面三个关系),2个角色(生产者角色和消费者角色)以及一个交易场所(超市)。你可以称之为 “321”原则(仅用来方便记忆)。
1-1🍕为什么使用生产消费者模型
生产消费者模型主要解决了:生产者与消费者的强耦合关系,生产者在生产数据的时候不需要等消费者使用,而是直接将数据放到缓冲区中;同样消费者要使用数据也不会直接找生产者要,而是直接从缓冲区中进行读取。通过中间的缓冲区将生产者和消费者进行解耦。
通过中间的缓冲区,生产消费者模型有了诸多优势:
- 解耦:生产者和消费者无需知道对方的存在,只需通过缓冲区交互,降低模块间耦合度
- 削峰填谷:当生产速度超过消费速度时,缓冲区可以暂存数据;当消费速度快时,也能从缓冲区获取数据
- 并发协作:支持并发,此处的并发指的是:允许多个生产者一起生产数据,多个消费者一起消费数据;但是不论是向缓冲区写,还是向缓冲区中读,都要保证是线程安全的。
- 负载均衡:多个生产者可以向同一缓冲区提交任务,多个消费者可以从缓冲区获取任务处理
二、🤩基于阻塞队列的生产消费模型
上面我们说了,生产消费者模型中的临界资源是一种数据结构,而比较常见的一种是基于阻塞队列的生产消费者模型:
将队列作为中间的缓冲区,生产者写入的数据放到队列中,消费者从队列中拿数据。
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
2-1 🍿单生产单消费示例
2-1-1 🦣Thread.hpp线程类封装
我们要实现生产消费者模型,自己对线程函数进行封装,达到方面的目的:
#ifndef __THREAD_HPP__ #define __THREAD_HPP__#include <iostream> #include <string> #include <unistd.h> #include <functional> #include <pthread.h>namespace ThreadModule {template<typename T>using func_t = std::function<void(T&)>;// 重命名,把将来线程需要执行的回调函数进行包装// typedef std::function<void(const T&)> func_t;template<typename T>class Thread{public:void Excute(){_func(_data);}public:Thread(func_t<T> func, T &data, const std::string &name="none-name"): _func(func), _data(data), _threadname(name), _stop(true){}static void* threadroutine(void *args) // 类成员函数,形参是有this指针的!!而静态成员函数没有this指针{Thread<T> *self = static_cast<Thread<T> *>(args);// 将args强转为Thread<T>*self->Excute();// 执行回调return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(!n){_stop = false;return true;}else{return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;T &_data; // 为了让所有的线程访问同一个全局变量func_t<T> _func;bool _stop;}; } // namespace ThreadModule#endif以上的Thread.hpp的类,包含了线程常用用途,包括:线程创建,线程等待,线程分离。以及线程所需要的共享资源,threadroutine()为创建线程时的入口函数(回调),通过该回调函数,可以调用使用function包装的自定义函数,这样也就实现了让不同线程执行不同的自定义任务功能。
2-1-2 🐕BlockQueue.hpp阻塞队列类封装
有了线程类,我们还需要一个阻塞队列用来作为生产消费模型的场所,实现如下阻塞队列:
#ifndef __BLOCK_QUEUE_HPP__ #define __BLOCK_QUEUE_HPP__#include<iostream> #include <string> #include <pthread.h> #include <queue>template<typename T> class BlockQueue { private:bool IsFull()// 判满{return _block_queue.size() == _cap;}bool IsEmpty()// 判空{return _block_queue.empty();} public:BlockQueue(int cap): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_product_cond, nullptr);pthread_cond_init(&_consum_cond, nullptr);}void Enqueue(T& in)// 生产者{pthread_mutex_lock(&_mutex);// if(IsFull())// 判满// {// pthread_cond_wait(&_product_cond, &_mutex);// }while(IsFull())// 判满{// 之前 : 安全_product_wait_num++;pthread_cond_wait(&_product_cond, &_mutex);_product_wait_num--;// 之后 : 安全}// 生产_block_queue.push(in);// 通知消费者消费if(_consumer_wait_num > 0)pthread_cond_signal(&_consum_cond);// pthread_cond_signal(): 全部唤醒pthread_mutex_unlock(&_mutex);}void Pop(T *out)// 消费者{pthread_mutex_lock(&_mutex);// if(IsEmpty())// {// pthread_cond_wait(&_consum_cond, &_mutex);// }while(IsEmpty()){_consumer_wait_num++;pthread_cond_wait(&_consum_cond, &_mutex);_consumer_wait_num--;}// 进行消费*out = _block_queue.front();_block_queue.pop();// 通知生产者生产if(_product_wait_num > 0)pthread_cond_signal(&_product_cond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consum_cond);} private:std::queue<T> _block_queue;pthread_mutex_t _mutex;// 保护blockqueue的锁pthread_cond_t _product_cond;// 专门给生产者提供的条件变量pthread_cond_t _consum_cond;// 专门给消费者提供的条件变量int _cap;// 容量int _consumer_wait_num;// 消费者等待数量int _product_wait_num;// 生产者等待数量 };#endif一个基于阻塞队列的生产消费者模型,使用线程模拟,而阻塞队列作为临街资源,所以线程免不了要进行加锁解锁等待等操作。所以在private成员内,定义了生产消费者的条件变量与互斥锁。
构造函数来初始化锁和互斥量,以及初始化队列容量。队列的操作我们不陌生,无非就是出队列和入队列。这里出队列相当于消费者正在消费,入队列相当于生产者在生产。所以pop和enqueue操作就是临界区。所以在访问接口之前需要加锁。
如果队列满了,生产者需要判满,将该线程进行条件变量等待。只有当消费者消费过后,队列不再满,才能从消费者处进行唤醒。同理,如果队列为空,消费者不可进行消费,进行判空,当生产者生产出数据时,再唤醒消费者进行消费。
这里还有一个细节,我们知道当线程进行条件变量唤入等待队列之后,如果该线程被唤醒,会重新竞争锁,竞争成功后,从上次阻塞的地方继续运行下去。但是这里有一个坑,如果此时生产消费模型是 单生产多消费模型,线程唤醒方式为broadcast(全唤醒),假设此时生产者只来得及生产一个数据,但是此时所有的消费者都被唤醒。只有一个线程竞争到了锁,那么这个线程就可消费这唯一的资源,当消费完成后就会归还锁。
这没有问题,但是我还有其他线程在竞争锁啊!!此时第一个线程归还了锁之后,其他线程又抢到了锁,但是这个时候阻塞队列里确为空,空队列里pop()就会报错。这种行为我们称之为 伪唤醒。为了防止伪唤醒,我们在判空和判满的时候不能单纯的使用if判断,直接使用while循环判断就不会导致伪唤醒行为了。
2-1-3🌟main函数逻辑
以上的准备工作做好,我们就可以执手创建一个生产消费模型了:
#include "BlockQueue.hpp" #include "thread.hpp" #include <iostream> #include <string> #include <vector> #include <unistd.h>using namespace ThreadModule; int a = 10;void Consumer(BlockQueue<int> &bq)// 消费者执行自定义函数 {while(true){sleep(5);int data;bq.Pop(&data);std::cout << "Consumer consum data is: " << data << std::endl;} }void Productor(BlockQueue<int> &bq)// 生产者自定义函数 {int cnt = 10;while(true){bq.Enqueue(cnt);std::cout << "Productor product data is: " << cnt++ << std::endl;} }void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func) {for(int i = 0; i < num; ++i){std::string name = "Thread-" + std::to_string(i + 1);// 传递线程名称threads->emplace_back(func, bq, name);// 将初始化的线程信息插入进数组threads->back().Start();// 创建线程} }// 对Consumer函数进行回调 void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq) {StartComm(threads, num, bq, Consumer); }// 对Productor函数进行回调 void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq) { StartComm(threads, num, bq, Productor); }// 线程等待 void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads) {for(auto &thread : threads){thread.Join();} }int main() {BlockQueue<int> *bq = new BlockQueue<int>(5);// 阻塞队列大小为5std::vector<Thread<BlockQueue<int>>> threads;// 线程集合// 启动线程StartConsumer(&threads, 1, *bq);// 创建消费者线程,1为线程数目StartProductor(&threads, 1, *bq);// 创建生产者线程,1为线程数WaitAllThread(threads);return 0; }main函数的逻辑还是非常简单的,使用数组集合所有线程,通过StartComm函数创建线程,而StartConsumer()和StartProductor()函数用来决定创建的是生产者还是消费者。如果是生产者则回调生产者函数,如果是消费者则回调消费者函数。
2-1-4 🐈⬛运行结果及拓展
我们已将简单构建了一个单生产单消费模型,编译运行后的结果如下:
首先生产者将阻塞队列生产满,随后消费者每消费一个数据,生产者就接着生产一个数据。
当然,上述代码实在是限定死了,谁说我阻塞队列里面的值一定是int,我想要自定义类型的值或者对象呢?当然可以,假设生产者是在发配任务到队列,而消费者是取任务,再解决,那么我们就可以如下定义:
#include "Allfile.hpp"using namespace ThreadModule; int a = 10;using blockqueue_t = BlockQueue<Task>;// 替换为需要执行的任务类型,把所有函数参数类型全部替换为blockqueue_t void Consumer(blockqueue_t &bq) {}void Productor(blockqueue_t &bq) {}void StartComm(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq, func_t<blockqueue_t> func) {}void StartConsumer(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq) {}void StartProductor(std::vector<Thread<blockqueue_t>> *threads, int num, blockqueue_t &bq) {}void WaitAllThread(std::vector<Thread<blockqueue_t>> &threads) {}int main() {blockqueue_t *bq = new blockqueue_t(5); std::vector<Thread<blockqueue_t>> threads;// 启动线程StartConsumer(&threads, 1, *bq);StartProductor(&threads, 1, *bq);WaitAllThread(threads);return 0; }将来替换任务类型的时候,直接把BlockQueue<>内容替换即可, 当然这里的任务不局限于类,也可以是函数,回调,function包装等。
2-2 🥐生产消费者模型的理解
我们知道,人人都说生产消费者模型很好的应用了线程并发,但是我们似乎并没有看到并发的场景啊?在每个线程在访问临界资源的时候都是加锁了的。也就是说,所有的线程在访问临界资源的时候是串行进行的啊。
这里的并发并不体现在存放和取数据,而 在于生产者可以并发的生产数据,在于消费者拿到数据之后可以并发的执行自己的任务。
生产消费者模型的核心要素:
- 生产者:生成数据并放入缓冲区
- 消费者:从缓冲区获取数据并处理
- 缓冲区:存储数据的中间区域
- 同步机制:保证生产者和消费者之间的协作(如互斥锁、条件变量)
该模型的本质是 "削峰填谷"和"解耦异步",通过缓冲区平衡生产和消费速度差异,同时让生产者和消费者可以独立演化。
三、😋循环队列生产消费模型
3-1 🌭POSIX 信号量
我们曾经在进程间通信中接触过信号量,当时解除的名为System V信号量,这是有关进程间通信的信号量,而今天我们要接触的是POSIX信号量,可用于线程间同步工作。
回顾一下之前有关信号量的结论:
.信号量本质是一个计数器。
2.信号量是一种对资源的预定机制。
3.对信号量加减操作称为PV操作,PV操作是 原子的。初始化信号量:
int sem_init(sem_t *sem, int pshared, unsigned int value);参数:
- pshared: 0表示线程间共享,非零表示进程间共享。
- value:信号量初始值。
销毁信号量:
int sem_destroy(sem_t *sem);等待信号量:
int sem_wait(sem_t *sem); //P操作
- 功能:等待信号量,会将信号量的值减1。
发布信号量:
int sem_post(sem_t *sem);//V()功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1。
3-2 🥨二元信号量
二元信号量,也被称为 互斥信号量 或 二值信号量,并且 只能取值为 0 或 1。POSIX信号量对于临界资源有PV操作,而二元信号量实际上是把临界资源看作整体,如此一来该资源的状态就只有 存在 或 不存在 状态了。
所以 二元信号量相当于互斥锁,同一时间确保只有一个进程或线程可以访问共享资源。
二元信号量(值只能为 0 或 1)可以用作互斥锁,控制对临界资源的访问。在循环队列中,通常需要两个计数信号量(empty 和 full)和一个互斥信号量(mutex):
- empty:记录空槽数量(初始值为队列容量)
- full:记录数据数量(初始值为 0)
- mutex:保证对队列的互斥访问
四、🤔基于环形队列的生产消费者模型
除了基于阻塞队列的生产消费者模型以外,存在另一种生产消费者模型,基于环形队列的生产消费者模型。
- 环形队列采用数组模拟,用模运算来模拟环状特性。
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
这里有一些细节需要注意,我们使用数组来模拟环形队列,为了实现环形效果,我们的每一次操作都需要对数组大小取模:
index %= N,我们知道,环形队列为空的时候头指针和尾指针指向同一个位置,而当环形队列为满的时候,环形队列的头指针和尾指针也指向同一个位置。在数据结构中,我们可以选择使用一个计数器来区别当前队列判空和判满,这样就可以区分队列是空还是满了。
而我们使用信号量来进行操作正好适用这种特性,当消费场所没有信息的时候,信号量为0,不可消费,我们就不需要使用计数器了。
当队列满时:生产者停止生产,让消费者跑。
当队列空时:消费者停止消费,让生产者跑。
消费者不能超过生产者
我们在开始编码之前先来有一个大致的概念,我们知道环形队列空间是有限的,所以空间资源应是一种信号量,而数据存储在空间内,数据资源也是一种信号量。
生产者在生产之前需要有空间进行生产,所以需要对空间资源进行P操作进行申请资源,申请成功之后,生产完毕需要释放数据资源,数据资源也就增多。同理消费者需要先申请数据资源,申请成功之后,进行消费,完毕之后,释放占用的空间资源。
我们依旧使用上面自己封装的线程类,只不过有稍稍变化:
using func_t = std::function<void(T&, std::string name)>; // typedef std::function<void(const T&)> func_t;template<typename T> class Thread { public:void Excute(){_func(_data, _threadname);}函数模版多了一个参数,函数模版参数多了一项name, 用来传递线程名称。
因为是基于环形队列的生产消费者模型,所以我们需要设计环形队列类,前面说了,环形队列是使用POSIX信号量来维护的,这里有两个信号量,空间信号量与资源信号量,我们需要时刻知道环形队列生产者和消费者的下标,以及队列的大小。
此类生产消费者模型将来可以实现多生产多消费的场景,而在之前我们提到过,生产消费者之间,以及生产者与生产者,消费者与消费者之间都存在互斥关系,为了维护它们之间的互斥关系,所以我们需要对临界区加锁。
那么向队列内插入数据就是生产者在生产,Pop就是消费者在消费。这样我们在构造时初始化锁与信号量,析构时释放锁与信号量。
#pragma once #include <iostream> #include <semaphore.h> #include <string> #include <vector> #include <pthread.h>template<typename T> class RingQueue { private:void P(sem_t &sem)// P 操作,申请资源{sem_wait(&sem);}void V(sem_t &sem)// V 操作,释放资源{sem_post(&sem);}void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void UnLock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:RingQueue(int cap):_ring_queue(cap), _cap(cap), _productor_step(0), _consumer_step(0){sem_init(&_room_sem, 0, _cap);// 空间资源拉满sem_init(&_data_sem, 0, 0);// 开始没有资源// 初始化锁pthread_mutex_init(&_productor_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr);}void Enqueue(const T& in){// 生产行为P(_room_sem);Lock(_productor_mutex);// 信号量是原子的,所以加锁可以放在信号量后面// 申请成功一定有空间_ring_queue[_productor_step++] = in;// 生产_productor_step %= _cap;UnLock(_productor_mutex);V(_data_sem);}void Pop(T* out){// 消费行为P(_data_sem);Lock(_consumer_mutex);*out = _ring_queue[_consumer_step++];// 消费_consumer_step %= _cap;UnLock(_consumer_mutex);V(_room_sem);}~RingQueue(){sem_destroy(&_room_sem);sem_destroy(&_data_sem);// 销毁锁pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);} private:// 环形队列属性std::vector<T> _ring_queue;int _cap;// 生产消费者下标int _productor_step;int _consumer_step;// 定义信号量sem_t _room_sem;// 空间信号量,生产者关心sem_t _data_sem;// 资源信号量,消费者关心// 定义锁,维护多生产多消费之间的互斥pthread_mutex_t _productor_mutex;pthread_mutex_t _consumer_mutex; };在进行入队列出队列操作时,有一些细节值得注意,前面我们说了,生产者在生产之前需要先申请空间资源信号量,申请成功才能开始生产,当生产结束时数据新增,进行V操作,释放数据资源。同理,消费者相返。
因为将来要支持多生产多消费,所以我们需要在生产行为和消费行为代码间加锁。如果你仔细观察了上述代码,不难发现,我们加锁是在获取信号量之后才加锁的,解锁是在释放资源前解锁的。这是因为 PV操作是原子的,所以我们可以将加锁放置其后。
至于放置后面的原因,如果先加锁在获取信号量,就如同你到了车站才着急忙慌的去买车票,那为什么我们不能提前买车票呢?等到时间到了,到车站就直接走了。就是这个道理,所以我们可以让线程先获取信号量,等竞争锁到了自己的时候直接去执行自己的代码即可。
main函数里,我们大致逻辑就不需要怎么变化了,因为我们改变的是底层逻辑:
#include "RingQueue.hpp" #include "thread.hpp" #include "Task.hpp" #include <iostream> #include <string> #include <vector> #include <ctime> #include <unistd.h>using namespace ThreadModule;using ringqueue_t = RingQueue<Task_t>;void Consumer(ringqueue_t &rq, const std::string name) {while(true){sleep(2);// 消费任务Task_t t;rq.Pop(&t);std::cout << "Consumer handler task: " << "[" << name << "]" << std::endl;t();} }void Productor(ringqueue_t &rq, const std::string name) {while(true){// 获取任务// 生产任务rq.Enqueue(Download);std::cout << "Productor : " << "[" << name << "]" << std::endl;} }void InitComm(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq, func_t<ringqueue_t> func, const std::string who) {for(int i = 0; i < num; ++i){std::string name = "Thread-" + std::to_string(i + 1) + "-" + who;threads->emplace_back(func, rq, name);} }void InitConsumer(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq) {InitComm(threads, num, rq, Consumer, "Consumer"); }void InitProductor(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq) { InitComm(threads, num, rq, Productor, "Productor"); }void WaitAllThread(std::vector<Thread<ringqueue_t>> &threads) {for(auto &thread : threads){thread.Join();} }void StartAll(std::vector<Thread<ringqueue_t>> &threads) {for(auto & thread : threads){std::cout << "start: " << thread.name() << std::endl;thread.Start();} }int main() {ringqueue_t *rq = new ringqueue_t(5); std::vector<Thread<ringqueue_t>> threads;// 初始化线程InitConsumer(&threads, 2, *rq);InitProductor(&threads, 3, *rq);// 启动线程StartAll(threads);WaitAllThread(threads);return 0; }main函数里有了些许变化,我们在执行生产和消费任务的时候把线程名称带上了。并且我将线程启动与初始化分离开了。生产者与消费者执行函数内,我们是在执行以下任务:
#pragma once#include <iostream> #include <functional>using Task_t = std::function<void()>;void Download() {std::cout << "This is a download task" << std::endl; }
整体的逻辑还是比较简单的,与基于阻塞队列的生产消费者模型相比较,环形队列的生产消费模型采用了信号量的方式来实现多线程并发执行。阻塞队列的生产消费者模型直接使用条件变量与加锁,逻辑上就会多出很多判断。反观使用信号量,可以避免许多复杂多变的判断场景,因为 信号量本身就代表资源的多少,所以在代码实现上要简单许多。
环形队列 🆚 阻塞队列:
- 环形队列基于数组实现,空间利用率更高,适合固定大小的缓冲区
- 阻塞队列基于链表或动态数组,大小可以动态调整
- 环形队列通过取模操作实现 "环形" 特性,无需数据搬移
🗝️总结与提炼
1️⃣核心本质与价值
生产消费者模型的本质是"通过缓冲区实现供需解耦与异步协作",其核心价值在于平衡生产与消费速度、提升并发效率、降低模块耦合,是解决多线程协作问题的"通用设计模式",广泛应用于任务调度系统、消息中间件、日志收集系统等场景。
2️⃣两种实现方案对比
对比维度
基于阻塞队列的实现
基于环形队列的实现
底层存储
std::queue(链表/动态数组)
std::vector(固定大小数组)
同步机制
互斥锁 + 条件变量
POSIX信号量(计数+二元)
空间效率
动态扩容,存在搬移开销
固定容量,无搬移,空间利用率高
适用场景
容量动态变化、对空间灵活性要求高
固定容量、高吞吐量、低延迟
实现复杂度
中等(需处理条件变量的等待与通知)
中等(需理解信号量的P/V操作)
3️⃣关键技术点梳理
线程封装:核心是通过静态入口函数适配pthread_create要求,利用函数对象实现执行逻辑的灵活注入,同时做好线程生命周期管理(避免内存泄漏)。
同步机制选型:①互斥锁/二元信号量:解决临界资源的并发访问冲突,确保同一时间只有一个线程操作核心数据;②条件变量/计数信号量:解决线程间的状态协同,实现"满则阻塞生产、空则阻塞消费"的逻辑。
缓冲区设计:阻塞队列注重灵活性,环形队列注重效率,需根据业务场景(容量是否固定、吞吐量要求)选择;核心是保证缓冲区的线程安全与状态可控。
4️⃣实践启示与注意事项
线程安全是底线:任何涉及多线程访问的资源(如队列、指针、计数器)都必须通过互斥锁或信号量保护,避免并发读写导致的数据错乱或崩溃。
避免永久阻塞:实际开发中可给条件变量或信号量增加超时机制(如pthread_cond_timedwait),防止因生产者/消费者异常导致的线程永久阻塞。
容量规划要合理:缓冲区容量过小易导致频繁阻塞,过大则浪费内存;需根据生产消费速度差、业务峰值流量合理设定(如通过压测确定最优容量)。
多生产多消费优化:当存在多个生产者或消费者时,可通过细粒度锁(如分段锁)或无锁设计(如CAS原子操作)进一步提升并发性能,但需注意避免锁竞争或ABA问题。
5️⃣核心收获
掌握生产消费者模型,不仅是掌握一种编程技巧,更是理解"异步协作"与"资源隔离"的设计思想。无论是阻塞队列还是环形队列,其核心都是通过合理的同步机制与缓冲区设计,平衡供需关系、解耦模块依赖。在实际开发中,需灵活选择实现方案,结合业务场景优化同步策略,才能设计出高并发、高可靠、易扩展的系统,这也是从中级程序员向高级程序员进阶的核心能力之一。
结束语
以上是我对于【Linux文件系统】线程:生产与消费者模型的理解
感谢您的三连支持!!!










