生产者消费者模型
简述生产者消费者模型
生产者消费者模型是操作系统中一种重要的模型,主要描述的是等待和通知的机制。其是通过一个容器来解决生产者与消费者的强耦合关系,生产者与消费者之间不直接进行通讯,而是利用容器来进行通讯。
- 在这个模型中,生产者负责生产数据或物品,并将其放入一个共享的缓冲区或仓库中;消费者则从该缓冲区或仓库中取出数据或物品进行消费。
- 这个模型的关键在于平衡生产者和消费者的处理能力,确保数据或物品的流畅生产和消费。
- 其中的交易场所是进行生产消费 的容器,起到了平衡生产和消费速率的作用。通常是一种特定的 缓冲区,常见的有 阻塞队列 和 环形队列。
生产者消费者模型的本质:忙闲不均
生产者消费者模型特点
生产者消费者模型最根本的特点是:321原则
三种关系
在生产者消费者模型中,无论是生产者还是消费者,他们都需要面向共享的缓冲区或仓库进行操作。当多个生产者或消费者试图同时访问和操作这个共享资源时,为了避免数据冲突或不一致,就需要使用互斥机制来确保在任意时刻只有一个生产者或消费者能够访问该资源。同时,生产者和消费者之间还需要保持同步,以确保当缓冲区满时生产者停止生产并等待,当缓冲区空时消费者停止消费并等待。
在模型中,生产者和生产者之间、消费者和消费者之间以及生产者和消费者之间都存在互斥关系或者同步关系。
- 生产者与生产者:互斥
对于生产者之间,他们可能会同时尝试将产品放入缓冲区。如果缓冲区容量有限,多个生产者之间的竞争就会导致互斥关系的产生。
- 消费者与消费者:互斥
同样,对于消费者之间,当缓冲区中的产品数量有限时,多个消费者可能会同时尝试从中取出产品,这也产生了互斥关系。
- 生产者与消费者:同步、互斥
- 互斥关系主要源于生产者和消费者都需要访问共享的资源,即缓冲区。
- 同步关系则是基于生产者和消费者之间的依赖关系。生产者生产产品并将其放入缓冲区,消费者从缓冲区取出产品进行消费。这意味着消费者必须等待生产者生产并放入产品后才能进行消费,即生产者必须先于消费者进行。这种先后顺序的依赖关系构成了同步关系。
注意: 生产者与消费者之间的互斥关系不是必备的,目的是为了让生产、消费之间存在顺序。
生产者消费者模型 是一个存在 生产者、消费者、交易场所 三个条件,以及不同角色间的 同步、互斥 关系的高效模型。
俩种角色
-生产者
-消费者
一个交易场所
通常是一个特定的缓冲区(阻塞队列、环形队列)
生产者消费者模型必备特点
生产者与消费者间的同步关系:
- 生产者不断生产,交易场所堆满商品后,需要通知消费者进行消费。
- 消费者不断消费,交易场所为空时,需要通知生产者进行生产。
通知线程需要用到条件变量,即维护 同步 关系。
备注:
管道本质上就是一个天然的生产者消费者模型,因为它允许多个进程同时访问,并且不会出现问题,意味着它维护好了 互斥、同步 关系;当写端写满管道时,无法再写,通知读端进行读取;当管道为空时,无法读取,通知写端写入数据。
生产者消费者模型的优点
-
解耦合:通过引入缓冲区作为中介,生产者消费者模型降低了生产者和消费者之间的耦合度。这使得生产者和消费者可以独立地进行设计和优化,提高了系统的灵活性和可扩展性。
-
并发性:生产者消费者模型允许生产者和消费者作为两个独立的并发主体运行,互不干扰。这种并发性可以充分利用系统资源,提高系统的吞吐量和效率。
-
平衡性:当生产者和消费者的处理速度不一致时,缓冲区可以起到平衡作用。如果生产者速度快于消费者,缓冲区可以暂存多余的产品;反之,如果消费者速度快于生产者,缓冲区可以提供产品以供消费。这种平衡性确保了系统的稳定性和可靠性。
-
可扩展性:生产者消费者模型可以方便地扩展生产者和消费者的数量。当需要增加生产或消费能力时,只需添加相应的生产者或消费者即可,无需对整个系统进行大规模修改。
-
简化编程:通过使用生产者消费者模型,程序员可以更加专注于生产者和消费者的核心逻辑,而无需过多关注它们之间的同步和互斥问题。这有助于简化编程工作,降低出错的可能性。
生产者消费者模型高效原因:
- 生产者、消费者 可以在同一个交易场所中进行操作。
- 生产者在生产时,无需关注消费者的状态,只需关注交易场所中是否有空闲位置。
- 消费者在消费时,无需关注生产者的状态,只需关注交易场所中是否有就绪数据。
- 可以根据不同的策略,调整生产者于与消费者间的协同关系。
- 生产者消费者模型 可以根据供需关系灵活调整策略,做到 忙闲不均。
生产者消费者模型通过解耦合、并发性、平衡性、可扩展性和简化编程等优点,提高了系统的性能、稳定性和可靠性,降低了开发和维护的复杂度。这使得它在多线程编程、并发控制、资源管理等领域得到了广泛应用。
基于阻塞队列实现生产者消费者模型
- 阻塞队列 Blocking Queue 是一种特殊的队列,作为队列家族的一员,它具备 先进先出 FIFO 的基本特性,与普通队列不同的是: 阻塞队列的大小是固定的,也就说它存在容量的概念。
- 阻塞队列可以为空,也可以为满
将其带入 「生产者消费者模型」 中,入队 就是 生产商品,而出队则是 。‘消费商品
- 阻塞队列为满时:无法入队 -> 无法生产(阻塞)
- 阻塞队列为空时:无法出队 -> 无法消费(阻塞)
设计思路
设计成员变量
设计成员变量1:阻塞队列_queue(交易场所)、阻塞队列容量_cap
- 如何判断阻塞队列是否为空:判断_queue是否为空
- 如何判断阻塞队列是否为满:判断 _
queue
的大小是否为_cap
设计成员变量2:选择使用几个互斥锁?
- 由于无论是生产者 还是 消费者 ,它们需要看到同一个阻塞队列,因此使用一把互斥锁进行保护就行了。
设计成员变量3:选择几个条件变量?
- 在 生产者消费者模型 中,有 满、空 两个条件,这两个条件是 绝对互斥 的,不可能同时满足, 生产者关心 是否为满,消费者 关心是否为空,两者关注的点不一样,也就是说不能只使用一个条件变量来控制两个条件,而是需要 一个生产者条件变量、一个消费者条件变量
设计生产函数push
线程安全的实现
-
使用
pthread_mutex
互斥锁来保护共享资源_queue
。互斥锁确保同一时间只有一个线程可以访问队列,避免了数据竞争和不一致的状态。 -
在
push
方法中,通过pthread_mutex_lock
和pthread_mutex_unlock
对共享资源进行加锁和解锁,确保线程安全。
容量限制
-
通过
_cap
定义队列的最大容量,避免队列无限增长。 -
在
push
方法中,通过Isfull
函数检查队列是否已满。如果队列已满,生产者线程会进入等待状态,直到消费者线程释放空间。
条件变量的使用
使用 pthread_cond_wait
条件变量来实现生产者和消费者之间的协作。
-
当队列满时,生产者线程会调用
pthread_cond_wait
,释放锁并进入等待状态。 -
当消费者线程从队列中取出数据后,会唤醒等待的生产者线程,允许其继续向队列中添加数据。
及时通知消费者
-
在生产者线程每次向队列中添加数据后,调用
pthread_cond_signal
唤醒消费者线程。 -
这种机制确保消费者线程能够及时得知队列中有新数据,避免了消费者线程的空等待。
设计消费函数pop
线程安全的实现
-
使用
pthread_mutex_lock
和pthread_mutex_unlock
对共享资源_queue
进行加锁和解锁,确保同一时间只有一个线程可以访问队列。 -
这种互斥锁机制防止了多线程环境下的数据竞争和不一致状态。
队列为空的处理
-
通过
Isempty
方法检查队列是否为空。如果队列为空,消费者线程会调用pthread_cond_wait
,释放锁并进入等待状态。 -
这种机制确保消费者线程不会在队列为空时尝试取出数据,避免了空操作和错误。
条件变量的协作
-
当队列为空时,消费者线程调用
pthread_cond_wait
,等待生产者线程向队列中添加数据。 -
当队列中有数据时,生产者线程会通过
pthread_cond_signal
唤醒等待的消费者线程,允许其继续从队列中取出数据。
及时通知生产者
-
在消费者线程每次从队列中取出数据后,调用
pthread_cond_signal
唤醒生产者线程。 -
这种机制确保生产者线程能够及时得知队列中有空间可以继续添加数据,避免了生产者线程的空等待。
#pragma once #include<queue> #include<mutex> #include<pthread.h> #include<iostream> #define BQ_SIZE 5 template<class T> class MyBlockQueue { public: MyBlockQueue(size_t cap=BQ_SIZE) :_cap(cap) { //初始化锁和条件变量 pthread_mutex_init(&_mtx,nullptr); pthread_cond_init(&_pro_cond,nullptr); pthread_cond_init(&_con_cond,nullptr); } ~MyBlockQueue() { //销毁锁和条件变量 pthread_mutex_destroy(&_mtx); pthread_cond_destroy(&_pro_cond); pthread_cond_destroy(&_con_cond); } //生产数据(入队) void push(const T& indata) { pthread_mutex_lock(&_mtx); while(isfull()) { pthread_cond_wait(&_pro_cond,&_mtx); } _queue.push(indata); // 可以加一些策略,比如生产了一半就唤醒消费者 //pthread_cond_broadcast(&_con_cond); pthread_cond_signal(&_con_cond); pthread_mutex_unlock(&_mtx); } //消费数据(出队) void pop(T* outdata) { // 加锁解锁 pthread_mutex_lock(&_mtx); // 容量为空就等待 while(Isempty()) { pthread_cond_wait(&_con_cond, &_mtx); } *outdata = _queue.front(); _queue.pop(); // 可以加一些策略,比如消费了一半就唤醒生产者 pthread_cond_signal(&_pro_cond); //pthread_cond_broadcast(&_pro_cond); pthread_mutex_unlock(&_mtx); } pthread_mutex_t* getMutex() { return &_mtx; } private: //判断队列是否为满 bool isfull() { return _queue.size()==_cap; } //判断队列是否为空 bool Isempty() { return _queue.empty(); } private: std::queue<T> _queue; size_t _cap; //无论是生产者 还是 消费者 ,它们需要看到同一个阻塞队列,因此使用一把互斥锁进行保护 pthread_mutex_t _mtx; //在 生产者消费者模型 中,有 满、空 两个条件, //这两个条件是 绝对互斥 的,不可能同时满足, 生产者关心 是否为满,消费者 关心是否为空, //两者关注的点不一样,也就是说不能只使用一个条件变量来控制两个条件, //而是需要 一个生产者条件变量、一个消费者条件变量 pthread_cond_t _pro_cond;//条件变量 pthread_cond_t _con_cond;//条件变量 };
备注:
如果在多线程环境中使用
if
而不是while
来检查条件,可能会导致以下问题:
- pthread_cond_wait 函数可能调用失败(误唤醒、伪唤醒),此时如果是 if 就会向后继续运行,导致在条件不满足的时候进行了 生产/消费。
- 在多线程场景中,可能会使用 pthread_cond_broadcast 唤醒所有等待线程,如果在只生产了一个数据的情况下,唤醒所有线程,会导致只有一个线程进行了合法操作,其他线程都是非法操作了。
生产者消费者模型的高效体现在解耦
- 生产、消费 的过程是加锁的、串行化执行,可能有的人无法 get 到 生产者消费者模型 的高效,这是因为没有对 生产者消费者模型 进行一个全面的理解。
- 单纯的向队列中放数据、从队列中取数据本身效率就很高,但 生产者从某种渠道获取数据、消费者获取数据后进行某种业务处理,这是效率比较低的操作,生产者消费者模型 做到了这两点。
消费者在进行业务处理时,生产者可以直接向队列中 push 数据
- 比如 消费者 在获取到数据后,需要进行某种高强度的运算,当然这个操作与 生产者 是没有任何关系的,得益于 阻塞队列 作为缓冲区,生产者 可以在 消费者 进行运算时 push 数据,避免生产者等待消费者处理完才能继续生产的情况。。
生产者在进行数据生产时,消费者可以直接向队列中 pop 数据
- 消费者 不需要关心 生产者 的状态,只要 阻塞队列 中还有数据,正常 pop 获取就行了,避免空闲等待;
总结:生产者不必关心消费者的消费情况,消费者也不需要关心生产者的生产情况,而这就是生产者消费者模型高效的体现,也是对模型的全面理解。
基于环形队列实现生产者消费者模型
前言知识->POOSIX信号量
互斥、同步 不只能通过 互斥锁、条件变量 实现,还能通过 信号量 sem、互斥锁 实现(出自 POSIX 标准)。
「信号量」 的本质就是一个 计数器
- 申请到资源,计数器 --(P 操作)
- 释放完资源,计数器 ++(V 操作)
信号量 的 PV 操作都是原子的,假设将 信号量 的值设为 1,用来表示 生产者消费者模型 中 阻塞队列 _queue 的使用情况
- 当 sem 值为 1 时,线程可以进行 「生产 / 消费」,sem --
- 当 sem 值为 0 时,线程无法进行 「生产 / 消费」,只能阻塞等待
此时的 信号量 只有两种状态:1、0,可以实现类似 互斥锁 的效果,即实现 线程互斥,像这种只有两种状态的信号量称为 二元信号量。
信号量 不止可以用于 互斥,它的主要目的是 描述临界资源中的资源数目,比如我们可以把 阻塞队列 切割成 N 份,初始化 信号量 的值为 N,当某一份资源就绪时,sem--,资源被释放后,sem++,如此一来可以像 条件变量 一样实现 同步:
- 当 sem == N 时,阻塞队列已经空了,消费者无法消费。
- 当 sem == 0 时,阻塞队列已经满了,生产者无法生产。
用来实现 互斥、同步 的信号量称为 多元信号量
综上所述,在使用 多元信号量 访问资源时,需要先申请 信号量,只有申请成功了才能进行资源访问,否则会进入阻塞等待,即当前资源不可用
在实现 互斥、同步 时,该如何选择?
结合业务场景进行分析,如果待操作的共享资源是一个整体,比较适合使用 互斥锁+条件变量 的方案,但如果共享资源是多份资源,使用 信号量 就比较方便。申请信号量实际是一种资源预订机制
- 如果将 信号量 实际带入我们之前写的 生产者消费者模型 代码中,是不需要进行资源条件判断的,因为 信号量 本身就已经是资源的计数器了。
信号量相关函数
函数 返回值 参数 备注 int sem_init(sem_t *sem, int pshared, unsigned int value);
成功 0
,失败-1
, 并设置错误码
sem
:指向信号量对象的指针。这个信号量对象将被初始化。
pshared
:指定信号量是否在多个进程间共享。
如果
pshared
为 0,则信号量只能在同一个进程内的线程之间共享。如果
pshared
不为 0,则信号量可以在多个进程之间共享。
value
:信号量的初始值。这个值通常表示可用资源的数量。int sem_destroy(sem_t *sem);
参数:待销毁的信号量 int sem_wait(sem_t *sem);
sem
:指向信号量对象的指针。
sem_wait
是一个阻塞函数,用于等待信号量的值大于零。当信号量的值大于零时,sem_wait
会将信号量的值减一,然后继续执行。int sem_trywait(sem_t *sem); sem
:指向信号量对象的指针。sem_trywait
是一个非阻塞函数,用于尝试等待信号量的值大于零。如果信号量的值大于零,sem_trywait
会将信号量的值减一并继续执行;如果信号量的值为零,函数会立即返回错误。
环形队列
生产者消费者模型 中的交易场所是可更换的,不仅可以使用 阻塞队列,还可以使用 环形队列,所谓的 环形队列 并非 队列,而是用数组模拟实现的 “队列”, 并且它的 判空、判满 比较特殊。
如何让 环形队列 “转” 起来?
- 可以通过取模的方式(可以重复获取一段区间值),确定下标
环形队列 如何判断当前为满、为空?
策略一:多开一个空间,head、tail 位于同一块空间中时,表示当前队列为空;在进行插入、获取数据时,都是对下一块空间中的数据进行操作,因为多开了一块空间,当待生产的数据落在 head 指向的空间时,就表示已经满了:
策略二:参考阻塞队列,搞一个计数器,当计数器的值为 0 时,表示当前为空,当计数器的值为容量时,表示队列为满:
这两种策略都可以确保 环形队列 正确判空和判满,以下是选择策略二,因为信号量本身就是一个天然的计数器,可以通过信号量去获取信息。
游戏场景:环形队列的具象化
想象一个大圆桌,上面摆满了空盘子,每个盘子可以放一个苹果。张三和李四站在圆桌旁边,准备开始一场追逐游戏。
角色设定
李四(被追逐方):生产者,负责往盘子里放苹果,然后顺时针移动到下一个空盘子。
张三(追逐方):消费者,负责从盘子里拿苹果,然后顺时针移动到下一个盘子。
规则:两人每次只能移动一格,不能跳过对方或空盘子。
游戏开始:初始状态
初始位置:张三和李四站在同一个盘子前,圆桌上没有苹果。
问题:两人处于同一位置,无法同时行动。
解决:李四(生产者)先行动,因为他是被追逐方,需要先放苹果。
结果:李四放了一个苹果,然后移动到下一个空盘子。
张三开始追逐:
张三看到盘子里有苹果,拿走苹果,然后移动到下一个盘子。
两人继续顺时针移动,李四不断放苹果,张三不断拿苹果。
正常运行:并发操作
状态:圆桌上有一些苹果,但没有满。
行为:
李四:每次移动到一个空盘子,放一个苹果。
张三:每次移动到一个有苹果的盘子,拿走苹果。
效率:两人可以同时操作,互不干扰,效率极高。
特殊情况:相遇时的处理
情况1:圆桌为空
状态:所有盘子都是空的。
行为:
张三(消费者)无法拿苹果,必须阻塞等待。
李四(生产者)继续放苹果,直到圆桌上有了苹果。
规则:空队列时,消费者阻塞,生产者优先行动。
情况2:圆桌为满
状态:所有盘子里都有苹果。
行为:
李四(生产者)无法放苹果,必须阻塞等待。
张三(消费者)继续拿苹果,直到圆桌上有了空盘子。
规则:满队列时,生产者阻塞,消费者优先行动。
游戏高潮:张三摔倒了
张三摔倒:张三的速度变慢,无法及时拿走苹果。
李四加速:李四继续放苹果,很快圆桌上摆满了苹果。
相遇:李四追上了张三,圆桌上全是苹果。
处理:
张三(消费者)继续拿苹果,李四(生产者)阻塞等待。
张三拿走苹果后,李四恢复生产。
游戏总结:环形队列的运作模式
空队列时:
消费者(张三)阻塞,生产者(李四)优先行动。
李四放苹果后,张三可以继续消费。
满队列时:
生产者(李四)阻塞,消费者(张三)优先行动。
张三拿走苹果后,李四可以继续生产。
正常状态(非空非满):
生产者和消费者可以并发运行,互不干扰,效率最高。
具象化总结
李四(生产者):负责放苹果,不能套圈张三。
张三(消费者):负责拿苹果,不能超过李四。
圆桌(环形队列):空盘子表示可以生产,苹果表示可以消费。
规则:只有在空或满时需要特殊处理,其他情况下可以并发运行。
环形队列运作模式
- 环形队列为空时:消费者阻塞,只能由生产者进行生产,生产完商品后,消费者可以消费商品
- 环形队列为满时:生产者阻塞,只能由消费者进行消费,消费完商品后,生产者可以生产商品
- 其他情况:生产者、消费者并发运行,各干各的事,互不影响
将 环形队列 的运行模式带入 生产者消费者模型
可以使用 信号量 标识资源的使用情况,但生产者和消费者关注的资源并不相同,所以需要使用两个 信号量 来进行操作:
- 生产者信号量:标识当前有多少可用空间
- 消费者信号量:标识当前有多少数据
生产者信号量初始值为环形队列的大小,消费者信号量初始值为 0,
无论是生产者还是消费者,只有申请到自己的 信号量 资源后,才进行 生产 / 消费
上图中的 pro_sem
就表示 生产者还可以进行 3
次生产,con_sem
表示 消费者还可以消费 5
次。
设计思路
设计成员变量
- 设计成员变量1:交易场所(数组模拟环形队列)_ring、队列容量_cap
- 设计成员变量2:生产者信号量_pro_sem(表示可用空间大小)、消费者信号量_con_sem(表示可消费商品数量)
- 设计成员变量3:当前生产者下标_pro_step(表示生产者当前应该插入数据的位置。生产者每次插入数据后,指针会顺时针移动到下一个位置(通过取模运算
_pro_step %= _cap
实现))、当前消费者下标_con_step(表示消费者当前应该取出数据的位置。消费者每次取出数据后,指针会顺时针移动到下一个位置(通过取模运算_con_step %= _cap
实现)) - 设计成员变量4:生产者互斥锁_pro_mtx(用于保护生产者的操作,确保生产者在插入数据时不会与其他线程冲突)、消费者互斥锁_con_mtx(用于保护消费者的操作,确保消费者在取出数据时不会与其他线程冲突)
设计生产函数push
-
信号量检查:生产者通过
P(&_pro_sem)
检查队列是否有空间。 -
锁定队列:通过
lock(&_pro_mtx)
确保插入操作的线程安全。 -
插入数据:将数据插入到
_ring
的当前_pro_step
位置,并更新_pro_step
。 -
循环指针:通过
_pro_step %= _cap
确保指针在队列范围内循环。 -
解锁队列:通过
unlock(&_pro_mtx)
释放对队列的独占访问权。 -
通知消费者:通过
V(&_con_sem)
通知消费者队列中有了新数据(新商品)。
设计消费函数pop
-
信号量检查:消费者通过
P(&_con_sem)
检查队列中是否有数据。 -
锁定队列:通过
lock(&_con_mtx)
确保取出操作的线程安全。 -
取出数据:从
_ring
的当前_con_step
位置取出数据,并更新_con_step
。 -
循环指针:通过
_con_step %= _cap
确保指针在队列范围内循环。 -
解锁队列:通过
unlock(&_con_mtx)
释放对队列的独占访问权。 -
通知生产者:通过
V(&_pro_sem)
通知生产者队列中有了空位。
#pragma once
#include<vector>
#include<semaphore.h>
#define RQ_CAP 5
template<class T>
class MyRingQueue
{
public:
MyRingQueue(size_t cap=RQ_CAP)
:_ring(cap),_cap(cap),_con_step(0),_pro_step(0)
{
sem_init(&_pro_sem,0,_cap);
sem_init(&_con_sem,0,0);
// 初始化互斥锁
pthread_mutex_init(&_pro_mtx, nullptr);
pthread_mutex_init(&_con_mtx, nullptr);
}
~MyRingQueue()
{
sem_destroy(&_pro_sem);
sem_destroy(&_con_sem);
// 销毁互斥锁
pthread_mutex_destroy(&_pro_mtx);
pthread_mutex_destroy(&_con_mtx);
}
void push(const T& indata)
{
P(&_pro_sem);
lock(&_pro_mtx);
_ring[_pro_step++]=indata;
_pro_step%=_cap;
unlock(&_pro_mtx);
V(&_con_sem);
}
void pop(T* outdata)
{
P(&_con_sem);
lock(&_con_mtx);
*outdata=_ring[_con_step++];
_con_step%=_cap;
unlock(&_con_mtx);
V(&_pro_sem);
}
pthread_mutex_t* getMutex()
{
return &_mtx;
}
private:
void P(sem_t* sem)
{
sem_wait(sem);
}
void V(sem_t* sem)
{
sem_post(sem);
}
void lock(pthread_mutex_t *lock)
{
pthread_mutex_lock(lock);
}
void unlock(pthread_mutex_t *lock)
{
pthread_mutex_unlock(lock);
}
private:
std::vector<T> _ring;
size_t _cap;
sem_t _pro_sem;
sem_t _con_sem;
size_t _pro_step;
size_t _con_step;
pthread_mutex_t _mtx;//对外部设置的锁
pthread_mutex_t _pro_mtx;// 生产者锁
pthread_mutex_t _con_mtx;// 消费者锁
};