【Linux】系统部分——线程同步与生产者消费者模型
29.线程同步与生产者消费者模型
文章目录
- 29.线程同步与生产者消费者模型
- 比喻说明问题
- 同步的概念
- 条件变量
- 条件变量的使用
- 条件变量的封装
- 生产者消费者模型
- 模型概念
- 模型的生活举例
- 生产者与消费者的三种关系分析
- 基于BlockingQueue的⽣产者消费者模型
- 阻塞队列的实现
- 关于模型的效率问题
- 基于环形队列的生产者消费者模型
- POSIX信号量
- 使用
- 模型原理
- 环形队列实现
比喻说明问题
互斥锁在单纯互斥场景(如抢票)中可以有效解决线程安全问题,但并不能解决所有问题。通过一个自习室的例子说明互斥锁的局限性:自习室每次只允许一人进入,钥匙挂在墙上供竞争。一个勤奋的学生凌晨四点拿到钥匙进入自习室,自习五小时后因饥饿短暂离开,但担心回来后无法再次进入,于是反复归还和重新获取钥匙。这种行为导致其他等待的学生长时间无法获取钥匙,形成“饥饿问题”。类似地,在代码中,如果线程在没有资源时仍频繁申请和释放锁,会导致锁竞争激烈,降低系统效率。尽管这种行为并未违反互斥锁的规则,但实际效率低下,浪费资源。
校领导通过引入排队规则优化自习室钥匙的分配:等待的学生必须排队,归还钥匙的学生不能立即重新申请,必须排到队尾。这一规则避免了同一人反复占用钥匙,提高了资源分配的公平性。类似地,在代码中可以通过条件变量或其他同步机制优化锁竞争,避免线程无效占用锁资源。规则调整后,尽管勤奋学生仍能优先获取钥匙,但必须遵循排队机制,确保其他学生也有机会使用自习室。这种优化既保留了互斥锁的安全性,又提高了资源利用效率。
在这个例子中,学生需要归还钥匙并重新排队,确保自习室内部互斥访问,超级自习室被视为临界资源,多个人可以访问该资源,但必须保证互斥性。而引入排队机制,同步的意义在于让线程执行具有一定的顺序性。
同步的概念
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,⽽导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
条件变量
条件变量可以实现线程之间的同步。条件变量的原理是通过让线程在特定条件下等待和唤醒,实现线程间的同步。条件变量需要与互斥锁配合使用,以确保线程安全。线程在条件不满足时进入等待状态,直到被其他线程唤醒。唤醒可以是单个线程或所有线程。
条件变量的使用
-
条件变量类型:
pthread_cond_t
-
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
- cond:要初始化的条件变量(指针)
- attr: 指向条件变量属性对象的指针。如果传递
NULL
,则使用默认的条件变量属性 restrict
是C语言中的一个关键字,用于向编译器指示指针所指向的内存区域不会通过其他指针被意外修改,从而帮助编译器进行优化
-
销毁
int pthread_cond_destroy(pthread_cond_t *cond);
-
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
- cond:要初始化的条件变量(指针)
- mutex:指向互斥量的指针,需要结合互斥量使用
-
唤醒
int pthread_cond_broadcast(pthread_cond_t *cond);//将在cond条件变量下等待的线程全部唤醒 int pthread_cond_signal(pthread_cond_t *cond);//唤醒一个在cond条件变量下等待的线程
-
使用举例
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *active(void *arg) {std::string name = static_cast<const char *>(arg);while (true){pthread_mutex_lock(&mutex);// 没有对于资源是否就绪的判定pthread_cond_wait(&cond, &mutex); // mutex??printf("%s is active!\n", name.c_str());pthread_mutex_unlock(&mutex);} }int main() {pthread_t tid1, tid2, tid3;pthread_create(&tid1, nullptr, active, (void *)"thread-1");pthread_create(&tid2, nullptr, active, (void *)"thread-2");pthread_create(&tid3, nullptr, active, (void *)"thread-3");sleep(1);printf("Main thread ctrl begin...\n");while (true){printf("main wakeup thread...\n");// pthread_cond_signal(&cond);pthread_cond_broadcast(&cond);//主线程定期唤醒其他线程sleep(1);}pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr); }
运行结果:
user@iZ7xvdsb1wn2io90klvtwlZ:~/lesson32_2/test$ ./testCond Main thread ctrl begin... main wakeup thread... thread-1 is active! thread-2 is active! thread-3 is active! main wakeup thread... thread-3 is active! thread-2 is active! thread-1 is active! ^C
条件变量的封装
#pragma once#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"namespace My_Cond
{class Cond{public:Cond(){pthread_cond_init(&_cond, nullptr);}void Wait(My_Mutex::Mutex &mutex){pthread_cond_wait(&_cond, mutex.MutexPtr());}void Weak(){pthread_cond_signal(&_cond);}void WeakAll(){pthread_cond_broadcast(&_cond);}~Cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};} // namespace My_Cond
生产者消费者模型
模型概念
⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。这个阻塞队列就是⽤来给⽣产者和消费者解耦的
有点:
- 解耦
- 支持并发
- 支持忙闲不均
模型的生活举例
现实生活中存在许多生产者消费者模型的例子,其中最典型的是超市场景。在超市中,普通消费者购买商品,而超市本身并不制造商品。超市的商品来源于背后的工厂或制造商。工厂为超市供货,超市批量采购后存放商品,消费者再从超市购买。这样就形成了工厂、超市和消费者三方的生产消费模型。从现实角度来看,超市的存在起到了连接生产者和消费者的桥梁作用。超市作为一个中间环节,解决了生产者和消费者之间直接交易的不便。工厂专注于生产,消费者专注于消费,超市则负责商品的集中展示和销售。这种分工模式提高了效率,是生产者消费者模型的典型体现。
从计算机科学角度看,超市本质上是一种缓存机制:对工厂而言是写缓存(批量供货),对消费者而言是读缓存(随时购买)。缓存的存在显著提高了系统效率。
-
解耦:
当超市更换方便面供应商时,只要产品质量和数量保持一致,消费者购买行为不受影响。同样,超市可以自由调整目标客户群体(如从学生转向职场人士),而不影响工厂的生产计划。
-
支持并发
可能有多个生产者线程(如4-5家工厂)和更多消费者线程同时访问共享缓冲区。当生产者正在向缓冲区写入数据时,消费者尝试读取可能导致数据不一致(如不确定商品是否已上架)。这种多生产者多消费者场景需要精确的同步机制来保护共享资源
-
支持忙闲不均
超市作为缓冲区的角色支持"忙闲不均"的运营模式:当需求旺盛时增加供货,需求疲软时减少生产并促销。
生产者与消费者的三种关系分析
在超市模型中,存在三种角色关系:生产者与生产者、消费者与消费者、生产者与消费者。
- 生产者与生产者之间是典型的互斥关系,当一方在向超市供货时,另一方不能同时供货,必须等待当前生产者完成供货后才能进行自己的供货操作。这种互斥关系源于超市作为一个整体资源的限制,比如有限的展位空间。
- 消费者与消费者之间在一般情况下看似没有直接关系,但当资源极度稀缺时(如只剩一包方便面),就会转变为竞争关系,即互斥关系。在计算机术语中,这种关系表现为一方获取资源后另一方就不能同时获取。
- 生产者和消费者之间需要维持互斥关系,以防止数据不一致的问题。当生产者正在向超市放置商品时,消费者不能同时进行购买操作,反之亦然。这种互斥关系保证了共享资源(超市)的安全性。此外,生产者和消费者之间还需要建立同步机制,通过通知方式避免无效的轮询检查,提高系统效率。生产者可以通知消费者商品已到货,消费者可以通知生产者货架有空位,这种同步关系与互斥关系共同构成了完整的生产者-消费者模型。
基于BlockingQueue的⽣产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
阻塞队列的实现
代码:
#pragma once#include <iostream>
#include <pthread.h>
#include <queue>static int qmsize = 10;namespace BlockQueueModule
{template <class T>class BlockQueue{private:bool IfFull() { return _q.size() == _msize; }bool IfEmpty() { return _q.empty(); }public:BlockQueue(int msize = qmsize) : _msize(msize){_cwait_num = _pwait_num = 0;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_producter, nullptr);pthread_cond_init(&_consumer, nullptr);}void Push(const T &data){pthread_mutex_lock(&_mutex);while (IfFull()) //使用while防止出现错误{std::cout << "producter等待中..." << std::endl;_pwait_num++;pthread_cond_wait(&_producter, &_mutex);_pwait_num--;std::cout << "producter被唤醒..." << std::endl;}_q.push(data);if(_cwait_num) //唤醒等待的consumer线程{pthread_cond_signal(&_consumer);}pthread_mutex_unlock(&_mutex);}void Pop(T &data){pthread_mutex_lock(&_mutex);while (IfEmpty()) //使用while防止出现错误{std::cout << "consumer等待中..." << std::endl;_cwait_num++;pthread_cond_wait(&_consumer, &_mutex);_cwait_num--;std::cout << "consumer被唤醒..." << std::endl;}data = _q.front();_q.pop();if(_pwait_num) //唤醒等待的producter线程{pthread_cond_signal(&_producter);}pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_producter);pthread_cond_destroy(&_consumer);}private:std::queue<T> _q;int _msize;pthread_mutex_t _mutex;pthread_cond_t _producter;pthread_cond_t _consumer;int _pwait_num;//目前在条件变量_producter中等待的线程个数int _cwait_num;//目前在条件变量_consumer中等待的线程个数};
}
说明:
-
使用queue作为底层容器保存数据,实现先进先出特性。阻塞队列与普通队列的区别在于为空为满时的阻塞行为,普通队列不考虑这些情况。
-
阻塞队列的创建通过new操作完成
-
临界区操作包括队列状态判断和数据存取,都需要在持有锁的情况下进行。判断队列是否为空或满属于临界区操作,因为涉及共享资源的访问。pthread_cond_wait必须在临界区内调用,因为它是基于判断结果决定是否等待的。等待时会自动释放锁,允许其他线程进入临界区。唤醒后会重新获取锁,保证临界区的原子性。生产者和消费者必须使用同一把锁来维持互斥关系。
- 这也是为什么
pthread_cond_wait
函数需要传入互斥量指针的原因,如果进程需要在条件变量处等待,这个线程需要释放锁给其他线程用。当这个线程等待完成之后,需要再次申请锁继续访问临界资源。 - 总结:线程同步机制中,条件变量用于线程等待特定条件满足。当线程调用条件变量等待时,会自动释放持有的互斥锁,并进入等待状态。线程被唤醒后,必须重新获取锁才能继续执行。这个机制确保线程在临界区内安全地等待和唤醒。
- 这也是为什么
-
创建生产者和消费者线程时无法确定哪个线程会先运行,这由系统调度决定。但代码设计确保了即使消费者先运行也会因队列为空而自动等待,实现线程同步。通过条件变量可以控制线程运行的先后顺序。生产者快速生产时队列会在短时间内填满,导致生产者必须等待并唤醒消费者。消费者消费一条数据后又会唤醒生产者,形成交替执行的同步过程。
-
线程在等待过程中可能因某些原因被无故唤醒,即使条件并未满足,这种现象被称为伪唤醒。伪唤醒会导致线程在不满足条件的情况下被唤醒,进而引发问题。例如,生产者生产一个数据后,不管是否有消费者在等待,都会发送唤醒信号,可能导致消费者线程被大量唤醒,但只有少量线程能成功消费数据。如果使用broadcast唤醒所有线程,只有一个线程能成功消费数据,其他线程会因条件不满足而出错。为了防止伪唤醒,建议在使用条件变量时用while循环代替if判断条件。while循环会在每次唤醒后重新检查条件,确保条件真正满足后才继续执行,从而有效避免伪唤醒问题。
关于模型的效率问题
生产消费模型整体基于互斥机制实现,通过锁保证线程安全。基于BlockingQueue的生产者和消费者访问阻塞队列时都是互斥的,同一时间只能有一个生产者或消费者操作队列。这种设计导致访问是串行的,引发对模型优势的疑问。虽然数据放入和取出过程是串行的,但生产数据的来源和处理可以并行。生产者准备数据的过程可以并发执行,只有放入队列时需要互斥。消费者处理任务时也可以并发,只有取任务时需要互斥。这种设计实现了生产消费的解耦,支持忙闲不均的场景,提高了系统整体效率。互斥只发生在关键资源访问时,其他环节可以并行。
基于环形队列的生产者消费者模型
POSIX信号量
之前在进程间通信了解了SystemV信号量的使用,复习一下:信号量是一个计数器,用于表明临界资源中资源数目的多少。申请信号量是对资源的预订机制,如买电影票预订座位。信号量的计数器在申请资源时减,释放资源时加。资源可以作为整体使用,如抢票代码和阻塞队列,也可以单独使用,如数组的不同子区域。信号量用于控制资源的单独使用,实现多执行流并发或并行访问资源的不同部分。二元信号量(计数器为0或1)实际上实现了互斥锁的功能,当信号量计数器为1时,申请成功的线程可以进入临界区,申请失败的线程则无法进入,这与互斥锁的行为一致。
这次我们使用的是POSIX信号量。POSIX信号量和SystemV信号量作⽤相同,都是⽤于同步操作,达到⽆冲突的访问共享资源⽬的。但POSIX可以⽤于线程间同步。
使用
-
初始化
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);
- pshared:0表⽰线程间共享,⾮零表⽰进程间共享;value:信号量初始值
-
销毁
int sem_destroy(sem_t *sem);
-
等待信号量(申请资源)
int sem_wait(sem_t *sem); //P()
- 功能:等待信号量,会将信号量的值减1
- 当申请信号量失败时,线程会被挂起并进入信号量的等待队列。当其他线程执行V操作时,会唤醒等待队列中的线程。
-
发布信号量(归还资源)
int sem_post(sem_t *sem);//V()
- 功能:发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加1。
模型原理
-
环形队列是一种数据结构,可以用数组或链表实现,但数组实现更为简单。环形队列物理上是一个线性数组,逻辑上呈现环状结构。它包含头指针(head)和尾指针(tail),当指针到达数组末尾时会绕回开头形成环形。环形队列的实现可以不使用链表,而是直接使用数组。在使用信号量时,不能将整个环形队列作为整体资源来访问,而需要将其视为单独的资源进行管理。
-
生产消费模型包含固定的三种关系(生产、消费、生产消费)和两种角色。模型的核心是交易场所的变化,本次引入环形队列作为交易场所。环形队列在生产消费模型中的应用需要结合信号量来实现资源管理,因为需要把环形队列中的每一个成员作为单独的资源,生产者和消费者关注的资源不同,生产者关注剩余空间,消费者关注剩余数据。信号量的使用可以确保生产者和消费者在访问队列时不会发生冲突,保证数据的一致性和正确性。
-
在设计生产者消费者模型时,环形队列作为共享资源,需要确保线程安全。生产者通过
tail
指针进行入队操作,代表生产行为;消费者通过head
指针进行出队操作,代表消费行为。生产者和消费者在访问临界资源前必须先申请信号量,信号量作为资源计数器,用于控制资源的访问。生产者关心的是队列中剩余的空间,而消费者关心的是队列中已有的数据。
环形队列实现
#pragma once#include <iostream>
#include <pthread.h>
#include <vector>
#include "Sem.hpp"
#include "Mutex.hpp"namespace My_Ringbuffer
{using namespace My_Sem; //之前自己封装的类型using namespace My_Mutex;template <class T>class RingBuffer{public:RingBuffer(int cap = 0) : _cap(cap), _p_sem(cap), _c_sem(0), _vt(cap){}void Push(const T &data){_p_sem.P();{ //这是一个临界区LockGuard lockguaed(_p_mutex);_vt[_p_step] = data;_p_step++;_p_step %= _cap;}_c_sem.V();}void Pop(T &data){_c_sem.P();{ //这是一个临界区LockGuard lockguaed(_c_mutex);data = _vt[_c_step];_c_step++;_c_step %= _cap;}_p_sem.V();}~RingBuffer(){}private:std::vector<T> _vt;int _cap;int _c_step;int _p_step;Sem _c_sem;Sem _p_sem;Mutex _c_mutex;Mutex _p_mutex;};
}
-
生产者在生产前需要执行P操作申请空间,确保有足够的空间进行生产。生产完成后,执行V操作释放数据资源,表示队列中的数据增加。消费者在消费前需要执行P操作申请数据,确保有数据可供消费。消费完成后,执行V操作释放空间资源,表示队列中的空间增加。信号量的使用简化了队列状态的判断,无需额外处理队列为空或满的情况。
-
在单生产者单消费者模型中,生产者通过tell指针进行入队操作,消费者通过hand指针进行出队操作。由于只有一个生产者和一个消费者,无需处理多线程竞争问题。如果要支持多生产者消费者的情况,需要考虑生产者与生产者,消费者与消费者之间的互斥关系,为了维护这种关系,可以使用互斥锁
-
与阻塞队列相比,环形队列可以实现生产者和消费者并发执行,不需要所有线程都串行访问交易平台,这是因为我们并不把环形队列当做一个资源使用,而是把它分成多分去使用