Linux的生产者消费者模型
基于前文Linux的多线程-CSDN博客。
目录
1、生产者消费者模型
2、基于阻塞队列的生产者消费者模型
2.1 思路
2.2 代码
2.2.1 Main.cc
2.2.2 CondBlockingQueue.hpp
2.2.3 Cond.hpp
2.2.4 Mutex.hpp
3、基于环形阻塞队列的生产者消费者模型
3.1 思路
3.2 代码
3.2.1 Main.cc
3.2.2 RingQueue.hpp
3.2.3 Sem.hpp
3.2.4 Mutex.hpp
4、总结一下
5、一道有意思的面试题
5.1 思路
5.2 代码
5.2.1 信号量
5.2.2 锁+条件变量
1、生产者消费者模型
- 1个交易场所(以特定结构构成的一种“内存”空间)。
- 2种角色(生产者,消费者,由线程承担)。
- 3种关系(生产者之间,互斥,消费者之间,互斥,生产者与消费者之间,互斥+同步)。
- 生产者与消费者模型的优点:
- 生产过程与消费过程解耦。
- 支持忙闲不均。
- 提高效率。不是体现在交易场所的出/入,而是体现在生产任务和处理任务是并发的(交易场所非空非满时)。
2、基于阻塞队列的生产者消费者模型
- 阻塞队列,是一种常用于实现生产者和消费者模型的数据结构,当队列为空时,向队列里获取元素的操作会阻塞,当队列为满时,向队列里存放元素的操作会阻塞。
画外音:管道的本质也是阻塞队列,为空时,读端阻塞,为满时,写端阻塞。
2.1 思路
- 生产者与消费者之间采用锁+条件变量,实现互斥+同步。
- 生产者之间和消费者之间,采用锁,实现互斥。
- 这一个锁,互斥,用于生产者与消费者竞争(互斥),生产者之间竞争(互斥),消费者之间竞争(互斥)。
- 两个条件变量,同步,分别控制生产者的阻塞和唤醒,消费者的阻塞和唤醒。
- 两个sleep_num,分别用于记录因空阻塞的消费者和因满阻塞的生产者。
- 要点:
- 阻塞队列的什么时候唤醒,是满或空,还是生成一个唤醒或消费一个唤醒?是生成一个唤醒或消费一个唤醒,为了更好的并发(生产者在生产,同时消费者在消费)。
- 唤醒一个还是全部?通常是唤醒一个(signal()),以避免惊群效应,提高效率。
- 为什么while阻塞?因为当线程被唤醒时,线程切换,其他线程可能已经将状态改变,或者被误唤醒,需重新判断条件。
2.2 代码
2.2.1 Main.cc
#include <iostream>
#include <unistd.h>
#include <functional>
#include "BlockingQueue.hpp"void DownLoad()
{std::cout << "下载一个任务" << std::endl;sleep(3); // 假设任务的耗时。
}using task_t = std::function<void()>;void* Producer(void* arg)
{BlockingQueue<task_t>* bq = static_cast<BlockingQueue<task_t>*>(arg);while(true){bq->Push(DownLoad);std::cout << "生产了一个任务" << std::endl;sleep(3);}
}void* Consumer(void* arg)
{BlockingQueue<task_t>* bq = static_cast<BlockingQueue<task_t>*>(arg);while(true){task_t task = bq->Pop();task();sleep(3);}
}int main()
{BlockingQueue<task_t> bq;pthread_t p[2],c[2];pthread_create(p,nullptr,Producer,&bq);pthread_create(p+1,nullptr,Producer,&bq);pthread_create(c,nullptr,Consumer,&bq);pthread_create(c+1,nullptr,Consumer,&bq);pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);pthread_join(c[0],nullptr);pthread_join(c[2],nullptr);return 0;
}
2.2.2 CondBlockingQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include "Cond.hpp"
#include "Mutex.hpp"using namespace CondModule;
using namespace MutexModule;const int default_cap = 5;template <typename T>
class BlockingQueue
{bool isEmpty(){return bq.empty();}bool isFull(){return bq.size() >= _cap;}
public:BlockingQueue(int cap = default_cap): _cap(cap),_p_sleep_num(0),_c_sleep_num(0){}void Push(const T& in){// 生产者LockGuard lockguard(_mutex);while(isFull()){++_p_sleep_num;std::cout << "生产者因为满而阻塞: " << _p_sleep_num << std::endl;_full_cond.Wait(_mutex); // 生产者因为满而阻塞--_p_sleep_num;}bq.push(in);if(_c_sleep_num > 0){_empty_cond.Signal();// 唤醒一个因为空而阻塞的消费者std::cout << "唤醒一个因为空而阻塞的消费者" << std::endl;}}T Pop(){// 消费者LockGuard lockguard(_mutex);while(isEmpty()){++_c_sleep_num;std::cout << "消费者者因为空而阻塞: " << _c_sleep_num << std::endl;_empty_cond.Wait(_mutex); // 消费者者因为空而阻塞--_c_sleep_num;}T data = bq.front();bq.pop();if(_p_sleep_num > 0){_full_cond.Signal(); // 唤醒一个因为满而阻塞的生产者std::cout << "唤醒一个因为满而阻塞的生产者" << std::endl;}return data;}private:std::queue<T> bq;int _cap;Mutex _mutex;Cond _empty_cond;Cond _full_cond;int _p_sleep_num;int _c_sleep_num;
};
2.2.3 Cond.hpp
#pragma once#include <pthread.h>
#include "Mutex.hpp"namespace CondModule
{class Cond{public:// RAII,资源的初始化与释放与对象的生命周期绑定Cond(){pthread_cond_init(&_cond,nullptr);}void Wait(MutexModule::Mutex& mutex){pthread_cond_wait(&_cond,mutex.Get());}void Signal(){pthread_cond_signal(&_cond);}void Broadcast(){pthread_cond_broadcast(&_cond);}~Cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};
}
2.2.4 Mutex.hpp
#pragma once#include <pthread.h>namespace MutexModule
{class Mutex{// RAII,资源的初始化与释放与对象的生命周期绑定public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){pthread_mutex_lock(&_mutex);}void Unlock(){pthread_mutex_unlock(&_mutex);}pthread_mutex_t *Get(){return &_mutex;}~Mutex(){pthread_mutex_destroy(&_mutex);}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex): _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}
3、基于环形阻塞队列的生产者消费者模型
- 环形队列,使用取模进行循环。和阻塞队列一样,空了,消费者阻塞,满了,生产者阻塞。
3.1 思路
- 生产者与消费者之间采用信号量,实现互斥+同步。
- 生产者之间采用锁,实现互斥;消费者之间采用锁,实现互斥。
- 两个信号量,互斥,信号量的原子操作保证了计数的一致性;生产者和消费者不能操作同一个位置(满了或空了,处于同一个位置,不能同时操作)。同步,记录可用空位数量,控制生产者的阻塞和生产;记录已有数据数量,控制消费者的阻塞和消费。
- 两个锁,互斥,分别用于生产者之间竞争(互斥),消费者之间竞争(互斥)。
- 两个step,用于下标索引,分别用于生产者push,消费者pop。
- 要点:
- 前面阻塞队列的queue是push,会自动扩容,但是环形队列使用下标索引,所以在初始化的时候,要先开辟空间。
3.2 代码
3.2.1 Main.cc
#include <iostream>
#include <unistd.h>
#include <functional>
#include "RingQueue.hpp"void DownLoad()
{std::cout << "下载一个任务" << std::endl;sleep(3); // 假设任务的耗时。
}using task_t = std::function<void()>;void* Producer(void* arg)
{RingQueue<task_t>* rq = static_cast<RingQueue<task_t>*>(arg);while(true){rq->Push(DownLoad);std::cout << "生产了一个任务" << std::endl;sleep(3);}
}void* Consumer(void* arg)
{RingQueue<task_t>* rq = static_cast<RingQueue<task_t>*>(arg);while(true){task_t task = rq->Pop();task();sleep(3);}
}int main()
{RingQueue<task_t> rq;pthread_t p[2],c[2];pthread_create(p,nullptr,Producer,&rq);pthread_create(p+1,nullptr,Producer,&rq);pthread_create(c,nullptr,Consumer,&rq);pthread_create(c+1,nullptr,Consumer,&rq);pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);pthread_join(c[0],nullptr);pthread_join(c[1],nullptr);return 0;
}
3.2.2 RingQueue.hpp
#pragma once#include <vector>
#include "Mutex.hpp"
#include "Sem.hpp"using namespace MutexModule;
using namespace SemModule;const int default_cap = 5;template<typename T>
class RingQueue
{
public:RingQueue(int cap = default_cap):_rq(cap),_cap(cap),_blank_sem(cap),_p_step(0),_data_sem(0),_c_step(0){}void Push(const T& in){_blank_sem.P();LockGuard lockguard(_p_mutex);_rq[_p_step] = in;++_p_step;_p_step %= _cap;_data_sem.V();}T& Pop(){_data_sem.P();LockGuard lockguard(_c_mutex);T& data = _rq[_c_step];++_c_step;_c_step %= _cap;_blank_sem.V();return data;}
private:std::vector<T> _rq;int _cap;// 生产者Sem _blank_sem; // 空位数int _p_step;Mutex _p_mutex; // 生产者之间的互斥// 消费者Sem _data_sem; // 资源数int _c_step;Mutex _c_mutex; // 消费者之间的互斥
};
3.2.3 Sem.hpp
#pragma once#include <semaphore.h>namespace SemModule
{const unsigned int default_value = 1;class Sem{public:// RAII,资源的初始化与释放与对象的生命周期绑定Sem(unsigned int sem_value = default_value): _sem_value(sem_value){sem_init(&_sem, 0, _sem_value);}void P(){sem_wait(&_sem);}void V(){sem_post(&_sem);}~Sem(){sem_destroy(&_sem);}private:unsigned int _sem_value;sem_t _sem;};
}
3.2.4 Mutex.hpp
#pragma once#include <pthread.h>namespace MutexModule
{class Mutex{public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){pthread_mutex_lock(&_mutex);}void Unlock(){pthread_mutex_unlock(&_mutex);}pthread_mutex_t *Get(){return &_mutex;}~Mutex(){pthread_mutex_destroy(&_mutex);}private:pthread_mutex_t _mutex;};// RAII,资源的初始化与释放与对象的生命周期绑定class LockGuard{public:LockGuard(Mutex &mutex): _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}
4、总结一下
特性 | 普通阻塞队列 | 环形阻塞队列 |
---|---|---|
内存分配 | 可能频繁分配 / 释放内存(链表实现) | 一次性分配固定内存,无内存碎片 |
扩容机制 | 支持动态扩容 | 容量固定,无法动态扩容 |
访问效率 | 队列头尾操作 O (1),但可能有缓存不友好问题 | 数组连续存储,缓存局部性更好,访问效率更高 |
空间利用率 | 动态大小,空间利用率灵活 | 固定空间,利用率稳定但可能有浪费 |
实现复杂度 | 相对简单 | 稍复杂(需处理环形边界条件) |
- 一种整块临界资源 -> 锁+条件变量。
- 一种多块临界资源 -> 锁+信号量。显然,在生产者消费者模型中,临界资源是一种多块临界资源,锁+信号量的方式更好实现,如:逻辑上更清晰,信号量直接表达资源数量,不需要在唤醒时重新检查复杂条件等。
5、一道有意思的面试题
题目:char arr1[5] = "1234"; char arr2[5] = "abcd";,请打印出"1a2b3c4d";
5.1 思路
- 基于前面的练习,如果是锁+条件变量,竞争锁,顺序是不确定的,而两个信号量刚好能控制顺序,一个为arr1_sem = 1,arr1_sem .P(),arr2_sem.V();一个arr2_sem = 0,arr2_sem .P(),arr1_sem.V()。
- 如果非要用锁(同一时间只能打印一个),那需要一个条件变量,控制两个线程的打印阻塞与唤醒(同时只有一个会阻塞),还要一个标志,true为arr1打印,false为arr2打印。
5.2 代码
5.2.1 信号量
#include <iostream>
#include <pthread.h>
#include "Sem.hpp"using namespace SemModule;Sem sem1(1),sem2(0);char arr1[5] = "1234"; // 包含'/0'
char arr2[5] = "abcd";void *Func1(void *arg)
{for(const auto& e : arr1){sem1.P();std::cout << e;sem2.V();}return nullptr;
}void *Func2(void *arg)
{for(const auto& e : arr2){sem2.P();std::cout << e;sem1.V();}return nullptr;
}int main()
{pthread_t tid1, tid2;pthread_create(&tid1, nullptr, Func1, nullptr);pthread_create(&tid2, nullptr, Func2, nullptr);pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);std::cout << std::endl;return 0;
}
5.2.2 锁+条件变量
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"using namespace MutexModule;
using namespace CondModule;char arr1[5] = "1234"; // 包含'/0'
char arr2[5] = "abcd";Mutex mutex;
Cond cond;
bool flag = true; void *Func1(void *arg)
{for(const auto& e : arr1){LockGuard lockguard(mutex);while(!flag)cond.Wait(mutex);std::cout << e;flag = !flag;cond.Signal();}return nullptr;
}void *Func2(void *arg)
{for(const auto& e : arr2){LockGuard lockguard(mutex);while(flag)cond.Wait(mutex);std::cout << e;flag = !flag;cond.Signal();}return nullptr;
}int main()
{pthread_t tid1, tid2;pthread_create(&tid1, nullptr, Func1, nullptr);pthread_create(&tid2, nullptr, Func2, nullptr);pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);std::cout << std::endl;return 0;
}