Linux学习:生产者消费者模型
目录
- 1. 生产者消费者模型的相关概念
- 1.1 什么是生产者消费者模型
- 1.2 生产者消费者模型的优势作用
- 2. 多线程简单实现生产者消费者模型
- 2.1 设计方案
- 2.2 代码实现
- 2.2.1 线程类
- 2.2.2 BlockQueue类
- 2.2.3 任务类
- 2.2.4 主干代码
1. 生产者消费者模型的相关概念
1.1 什么是生产者消费者模型
生产者消费者模型是一种经典的并发编程的设计模式。其由三部分组成分别为生产者、消费者与共享资源缓冲区。
- 生产者:
生产任务、数据
的线程或进程 - 消费者:
处理任务、数据
的线程或进程 - 共享资源缓冲区: 任务与数据的暂存区,生产者向其中存储任务与数据,消费者从中获取任务与数据。
简单来说,共享资源缓冲区,是一段临时保存数据的内存空间,一般使用某种数据结构对象充当(阻塞队列)
生产者消费者模型中,其充当生产、消费角色的线程/进程,它们之间需要满足特定的关系,具体如下:
角色 | 关系 |
---|---|
生产者 vs 生产者 | 互斥 || 同步(互斥,可能同步) |
消费者 vs 消费者 | 互斥 || 同步(互斥,可能同步) |
生产者 vs 消费者 | 互斥 && 同步 |
1.2 生产者消费者模型的优势作用
生产者消费者模型是为了协调生产者与消费者之间协作。具体设计为,生产者生产的数据不再直接交给消费者,而是直接存入共享资源缓冲区。而消费者也不再从生产者手中获取数据,则是转为从共享资源缓冲区中获取存入的历史数据。通过这样的设计方式,让生产与消费的操作解耦合,提高更好的并发度,并且支持生产者、消费者之间的忙先不均。
生产者消费者模型高效与并发度好的原因为,支持生产任务与处理任务或数据的并发。当生产者竞争锁或是生产任务、数据时,消费者可执行自己的任务,或是直接从缓冲区中获取存储的历史任务、数据。消费者执行任务不影响生产者获取、生产任务。
2. 多线程简单实现生产者消费者模型
2.1 设计方案
1. 生产者消费者模型的实体选择:
- 生产者:创建一批线程向共享资源缓冲区中生产任务
- 消费者:创建一批线程从共享资源缓冲区获取任务并处理
- 共享资源缓冲区:此处使用自定义的阻塞队列(BlockQueue)实现,保证生产者、消费者访问其时互斥且同步
2.阻塞队列(BlockQueue)的实现:
成员变量 | 作用 |
---|---|
queue<T> | 用于存储任务、数据的队列 |
int _cap | 队列的容量大小 |
pthread_mutex_t _mutex | 访问阻塞队列时控制互斥的锁 |
pthread_cond_t _productor_cond | 控制生产者同步的条件变量 |
pthread_cond_t _consumer_cond | 控制消费者同步的条件变量 |
int _productor_wait_num | 在条件变量处阻塞等待的生产者线程数量 |
int _consumer_wait_num | 在条件变量处阻塞等待的消费者线程数量 |
成员函数 | 作用 |
---|---|
void Equeue(T& data) | 将生产者生产的数据入队列 |
void Pop(T* data) | 从队列中获取历史的数据,采用输出型参数的方式 |
bool IsFull() | 检测队列是否满了 |
bool IsEmpty() | 检测队列是否为空 |
- 互斥: 阻塞队列的入队与出队操作都必须是互斥的,即保证无论何时,无论是生产者还是消费者线程都只能有一个线程在访问阻塞队列。
- 同步: 除此之外,还要保证生产者与消费者之间的同步,即队列中数据存储已慢,则阻塞生产者,队列中没有数据,则阻塞消费者,确保生产与消费的整个过程可以正常进行。
- 条件变量的优化: 当没有生产者或消费者在条件变量下阻塞等待时,就可以选择不需要再去将对应的条件变量唤醒。
3. 程序的主干逻辑与函数
2.2 代码实现
2.2.1 线程类
#ifndef THREAD_MODULE
#define THREAD_MODULE
#include <pthread.h>
#include <iostream>
using namespace std;
#include <functional>namespace ThreadModule
{template<typename T>using func_t = function<void(T&)>;template<typename T>class Thread{public:Thread(func_t<T> func, T& data, string name = "none-thread"):_func(func), _data(data), _name(name), _stop(true){}~Thread(){}void Execute(){_func(_data);}static void* threadroutine(void* arg){Thread<T>* ptd = static_cast<Thread<T>*>(arg);ptd->Execute();return nullptr;}bool start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(n){return false; }_stop = false;return true;}void join(){if(!_stop){pthread_join(_tid, nullptr);}}void detach(){if(!_stop){pthread_detach(_tid);}}string name(){return _name;}void stop(){_stop = true;}private:pthread_t _tid;string _name;func_t<T> _func;T& _data;bool _stop;};}#endif
2.2.2 BlockQueue类
#ifndef BLOCK_QUEUE_HPP
#define BLOCK_QUEUE_HPP#include <queue>
#include <pthread.h>template<typename T>
class BlockQueue
{
public:BlockQueue(int cap){_cap = cap;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_consumer_cond, nullptr);pthread_cond_init(&_productor_cond, nullptr);_consumer_wait_num = 0;_productor_wait_num = 0;}bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();}void Enqueue(T& data){pthread_mutex_lock(&_mutex);while(IsFull()){_productor_wait_num++;pthread_cond_wait(&_productor_cond, &_mutex);_productor_wait_num--;}_q.push(data);if(_consumer_wait_num > 0)//当有正在等待的消费者时pthread_cond_signal(&_consumer_cond);//生产了继续消费pthread_mutex_unlock(&_mutex);}void Pop(T* data){pthread_mutex_lock(&_mutex);while(IsEmpty()){_consumer_wait_num++;pthread_cond_wait(&_consumer_cond, &_mutex);_consumer_wait_num--;}*data = _q.front();_q.pop();if(_productor_wait_num > 0)//当有正在阻塞等待的生产者时pthread_cond_signal(&_productor_cond);//消费了继续生产pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_consumer_cond);pthread_cond_destroy(&_productor_cond);}private:std::queue<T> _q; //队列存储数据int _cap; //阻塞队列的容量pthread_mutex_t _mutex;pthread_cond_t _consumer_cond;pthread_cond_t _productor_cond;int _consumer_wait_num;int _productor_wait_num;
};#endif
条件变量的阻塞判断条件应设为为while
,不能设置为if
,这是因为pthread_cond_wait
可能会出错返回导致继续执行后续代码。可此时唤醒条件并未满足,条件变量并没有被真的唤醒,此种情况被称为伪唤醒。if
条件判断语句并不能预防此种伪唤醒的错误情况,所以,一般条件变量的阻塞判断条件会被设置为while
检测,保证代码的健壮性。
2.2.3 任务类
#ifndef TASK_HPP
#define TASK_HPP
#include <functional>
#include <iostream>
using namespace std;//使用仿函数
class Task
{
public://无参构造,用于消费者创建接收数据Task(){}Task(int x, int y):_x(x), _y(y){}~Task(){}void toDebugQuestion(){cout << _x << " + " << _y << " =?" << endl;}void toDebugAnswer(){cout << _x << " + " << _y << " = " << _x + _y << endl;}private:int _x;int _y;int _sum;
};#endif
2.2.4 主干代码
#include "Thread.hpp"
#include "BlockQueue.hpp"
#include <vector>
using namespace ThreadModule;
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "Task.hpp"using blockqueue_t = BlockQueue<Task>;void ProductorRun(blockqueue_t& bq)
{while(true){sleep(1);//生产慢Task t(rand() % 10, rand() % 10);bq.Enqueue(t);t.toDebugQuestion();}
}void ConsumerRun(blockqueue_t& bq)
{Task t;while(true){//sleep(1);//消费慢bq.Pop(&t);t.toDebugAnswer();}
}void StartComm(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, func_t<blockqueue_t> func, int num, string who)
{for(int i = 0; i < num; i++){string name = who + '-' + to_string(i + 1);threads.emplace_back(func, bq, name);threads.back().start();cout << name << " create success..." << endl;}
}void StartProductor(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, int num)
{StartComm(threads, bq, ProductorRun, num, "Productor");
}void StartConsumer(vector<Thread<blockqueue_t> >& threads, blockqueue_t& bq, int num)
{StartComm(threads, bq, ConsumerRun, num, "Consumer");
}void WaitAllThreads(vector<Thread<blockqueue_t> >& threads)
{for(auto& thread : threads){thread.join();}
}int main()
{srand((size_t)time(nullptr));vector<Thread<blockqueue_t> > threads;blockqueue_t bq(5);StartProductor(threads, bq, 3);StartConsumer(threads, bq, 5);WaitAllThreads(threads);return 0;
}