Linux生产者消费者模型
Linux生产者消费者模型
- Linux生产者消费者模型详解
- 生产者消费者模型
- 生产者消费者模型的概念
- 生产者消费者模型的特点
- 生产者消费者模型优点
- 基于BlockingQueue的生产者消费者模型
- 基于阻塞队列的生产者消费者模型
- 模拟实现基于阻塞队列的生产消费模型
- 基础实现
- 生产者消费者步调调整
- 条件唤醒优化
- 基于计算任务的扩展
- 总结
Linux生产者消费者模型详解
生产者消费者模型
生产者消费者模型的概念
生产者消费者模型通过一个容器解决生产者与消费者的强耦合问题。
- 通信方式:生产者不直接与消费者交互,而是将数据放入容器;消费者从容器取数据。
- 容器作用:缓冲区,解耦生产者与消费者,平衡双方处理能力。
生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的经典场景,具有以下特点:
- 三种关系:
- 生产者与生产者:互斥(竞争容器访问)。
- 消费者与消费者:互斥(竞争容器访问)。
- 生产者与消费者:互斥(共享容器)+同步(生产消费顺序)。
- 两种角色:生产者与消费者(线程或进程)。
- 一个交易场所:内存缓冲区(如队列)。
互斥原因:容器是临界资源,需用互斥锁保护,多线程竞争访问。
同步原因:
- 容器满时,生产者需等待,避免生产失败。
- 容器空时,消费者需等待,避免消费失败。
- 同步确保有序访问,防止饥饿,提高效率。
注意:互斥保证数据正确性,同步实现线程协作。
生产者消费者模型优点
- 解耦:生产者与消费者独立运行,通过容器间接交互。
- 支持并发:生产者生产时,消费者可同时消费。
- 支持忙闲不均:容器缓冲数据,平衡处理速度差异。
对比函数调用(紧耦合),生产者消费者模型是松耦合设计,生产者无需等待消费者处理。
基于BlockingQueue的生产者消费者模型
基于阻塞队列的生产者消费者模型
在多线程编程中,**阻塞队列(Blocking Queue)**是实现生产者消费者模型的常用数据结构。
- 与普通队列的区别:
- 队列空时,取元素操作阻塞,直到有数据。
- 队列满时,放元素操作阻塞,直到有空间。
- 应用场景:类似管道通信。
模拟实现基于阻塞队列的生产消费模型
基础实现
以单生产者、单消费者为例,使用C++ queue
实现阻塞队列:
BlockQueue.hpp:
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#define NUM 5
template<class T>
class BlockQueue {
private:
bool IsFull() { return _q.size() == _cap; }
bool IsEmpty() { return _q.empty(); }
public:
BlockQueue(int cap = NUM) : _cap(cap) {
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
~BlockQueue() {
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
void Push(const T& data) {
pthread_mutex_lock(&_mutex);
while (IsFull()) {
pthread_cond_wait(&_full, &_mutex); // 队列满,等待
}
_q.push(data);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_empty); // 唤醒消费者
}
void Pop(T& data) {
pthread_mutex_lock(&_mutex);
while (IsEmpty()) {
pthread_cond_wait(&_empty, &_mutex); // 队列空,等待
}
data = _q.front();
_q.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_full); // 唤醒生产者
}
private:
std::queue<T> _q; // 阻塞队列
int _cap; // 容量上限
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _full; // 满条件变量
pthread_cond_t _empty; // 空条件变量
};
main.cpp:
#include "BlockQueue.hpp"
#include <unistd.h>
void* Producer(void* arg) {
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
while (true) {
sleep(1);
int data = rand() % 100 + 1;
bq->Push(data);
std::cout << "Producer: " << data << std::endl;
}
return nullptr;
}
void* Consumer(void* arg) {
BlockQueue<int>* bq = (BlockQueue<int>*)arg;
while (true) {
sleep(1);
int data;
bq->Pop(data);
std::cout << "Consumer: " << data << std::endl;
}
return nullptr;
}
int main() {
srand((unsigned int)time(nullptr));
pthread_t producer, consumer;
BlockQueue<int>* bq = new BlockQueue<int>;
pthread_create(&producer, nullptr, Producer, bq);
pthread_create(&consumer, nullptr, Consumer, bq);
pthread_join(producer, nullptr);
pthread_join(consumer, nullptr);
delete bq;
return 0;
}
说明:
- 单生产者单消费者:无需维护生产者间或消费者间的互斥。
- 互斥:
_mutex
保护队列。 - 同步:
_full
和_empty
条件变量控制生产消费顺序。 - 条件判断:用
while
防止伪唤醒。 - 运行结果:生产者与消费者步调一致,每秒交替生产消费。
生产者消费者步调调整
-
生产快,消费慢:
void* Producer(void* arg) { BlockQueue<int>* bq = (BlockQueue<int>*)arg; while (true) { int data = rand() % 100 + 1; bq->Push(data); std::cout << "Producer: " << data << std::endl; } } void* Consumer(void* arg) { BlockQueue<int>* bq = (BlockQueue<int>*)arg; while (true) { sleep(1); int data; bq->Pop(data); std::cout << "Consumer: " << data << std::endl; } }
- 生产者快速填满队列后等待,消费者消费一个后唤醒生产者,后续步调一致。
-
生产慢,消费快:
void* Producer(void* arg) { BlockQueue<int>* bq = (BlockQueue<int>*)arg; while (true) { sleep(1); int data = rand() % 100 + 1; bq->Push(data); std::cout << "Producer: " << data << std::endl; } } void* Consumer(void* arg) { BlockQueue<int>* bq = (BlockQueue<int>*)arg; while (true) { int data; bq->Pop(data); std::cout << "Consumer: " << data << std::endl; } }
- 消费者初始等待生产者生产,消费后继续等待,步调随生产者。
条件唤醒优化
调整唤醒条件,例如队列数据量超一半时唤醒消费者,小于一半时唤醒生产者:
void Push(const T& data) {
pthread_mutex_lock(&_mutex);
while (IsFull()) {
pthread_cond_wait(&_full, &_mutex);
}
_q.push(data);
if (_q.size() >= _cap / 2) {
pthread_cond_signal(&_empty); // 超一半唤醒消费者
}
pthread_mutex_unlock(&_mutex);
}
void Pop(T& data) {
pthread_mutex_lock(&_mutex);
while (IsEmpty()) {
pthread_cond_wait(&_empty, &_mutex);
}
data = _q.front();
_q.pop();
if (_q.size() <= _cap / 2) {
pthread_cond_signal(&_full); // 少于一半唤醒生产者
}
pthread_mutex_unlock(&_mutex);
}
- 效果:生产者快速填满队列后等待,消费者消费至一半以下才唤醒生产者。
基于计算任务的扩展
将队列存储类型改为任务类,扩展功能:
Task.hpp:
#pragma once
#include <iostream>
class Task {
public:
Task(int x = 0, int y = 0, char op = 0) : _x(x), _y(y), _op(op) {}
void Run() {
int result = 0;
switch (_op) {
case '+': result = _x + _y; break;
case '-': result = _x - _y; break;
case '*': result = _x * _y; break;
case '/':
if (_y == 0) { std::cout << "Warning: div zero!" << std::endl; result = -1; }
else { result = _x / _y; } break;
case '%':
if (_y == 0) { std::cout << "Warning: mod zero!" << std::endl; result = -1; }
else { result = _x % _y; } break;
default: std::cout << "error operation!" << std::endl; break;
}
std::cout << _x << " " << _op << " " << _y << "=" << result << std::endl;
}
private:
int _x, _y;
char _op;
};
main.cpp:
#include "BlockQueue.hpp"
#include "Task.hpp"
void* Producer(void* arg) {
BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
const char* ops = "+-*/%";
while (true) {
int x = rand() % 100;
int y = rand() % 100;
char op = ops[rand() % 5];
Task t(x, y, op);
bq->Push(t);
std::cout << "Producer task done" << std::endl;
}
return nullptr;
}
void* Consumer(void* arg) {
BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
while (true) {
sleep(1);
Task t;
bq->Pop(t);
t.Run();
}
return nullptr;
}
int main() {
srand((unsigned int)time(nullptr));
pthread_t producer, consumer;
BlockQueue<Task>* bq = new BlockQueue<Task>;
pthread_create(&producer, nullptr, Producer, bq);
pthread_create(&consumer, nullptr, Consumer, bq);
pthread_join(producer, nullptr);
pthread_join(consumer, nullptr);
delete bq;
return 0;
}
- 功能:生产者生成计算任务,消费者执行计算并输出结果。
- 扩展性:通过定义不同
Task
类实现多样化任务处理。
总结
- 模型核心:通过容器解耦生产者与消费者,支持并发与忙闲不均。
- 实现关键:阻塞队列结合互斥锁与条件变量,确保互斥与同步。
- 灵活性:可调整步调、唤醒条件,或扩展为复杂任务处理。