当前位置: 首页 > news >正文

Linux生产者消费者模型

Linux生产者消费者模型

  • Linux生产者消费者模型详解
    • 生产者消费者模型
      • 生产者消费者模型的概念
      • 生产者消费者模型的特点
      • 生产者消费者模型优点
    • 基于BlockingQueue的生产者消费者模型
      • 基于阻塞队列的生产者消费者模型
      • 模拟实现基于阻塞队列的生产消费模型
        • 基础实现
        • 生产者消费者步调调整
        • 条件唤醒优化
        • 基于计算任务的扩展
    • 总结


Linux生产者消费者模型详解


生产者消费者模型

生产者消费者模型的概念

生产者消费者模型通过一个容器解决生产者与消费者的强耦合问题。

  • 通信方式:生产者不直接与消费者交互,而是将数据放入容器;消费者从容器取数据。
  • 容器作用:缓冲区,解耦生产者与消费者,平衡双方处理能力。

生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的经典场景,具有以下特点:

  1. 三种关系
    • 生产者与生产者:互斥(竞争容器访问)。
    • 消费者与消费者:互斥(竞争容器访问)。
    • 生产者与消费者:互斥(共享容器)+同步(生产消费顺序)。
  2. 两种角色:生产者与消费者(线程或进程)。
  3. 一个交易场所:内存缓冲区(如队列)。

互斥原因:容器是临界资源,需用互斥锁保护,多线程竞争访问。
同步原因

  • 容器满时,生产者需等待,避免生产失败。
  • 容器空时,消费者需等待,避免消费失败。
  • 同步确保有序访问,防止饥饿,提高效率。

注意:互斥保证数据正确性,同步实现线程协作。

生产者消费者模型优点

  1. 解耦:生产者与消费者独立运行,通过容器间接交互。
  2. 支持并发:生产者生产时,消费者可同时消费。
  3. 支持忙闲不均:容器缓冲数据,平衡处理速度差异。

对比函数调用(紧耦合),生产者消费者模型是松耦合设计,生产者无需等待消费者处理。


基于BlockingQueue的生产者消费者模型

基于阻塞队列的生产者消费者模型

在多线程编程中,**阻塞队列(Blocking Queue)**是实现生产者消费者模型的常用数据结构。

  • 与普通队列的区别
    • 队列空时,取元素操作阻塞,直到有数据。
    • 队列满时,放元素操作阻塞,直到有空间。
  • 应用场景:类似管道通信。

模拟实现基于阻塞队列的生产消费模型

基础实现

以单生产者、单消费者为例,使用C++ queue 实现阻塞队列:

BlockQueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>

#define NUM 5

template<class T>
class BlockQueue {
private:
    bool IsFull() { return _q.size() == _cap; }
    bool IsEmpty() { return _q.empty(); }
public:
    BlockQueue(int cap = NUM) : _cap(cap) {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_full, nullptr);
        pthread_cond_init(&_empty, nullptr);
    }
    ~BlockQueue() {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_full);
        pthread_cond_destroy(&_empty);
    }
    void Push(const T& data) {
        pthread_mutex_lock(&_mutex);
        while (IsFull()) {
            pthread_cond_wait(&_full, &_mutex); // 队列满,等待
        }
        _q.push(data);
        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_empty); // 唤醒消费者
    }
    void Pop(T& data) {
        pthread_mutex_lock(&_mutex);
        while (IsEmpty()) {
            pthread_cond_wait(&_empty, &_mutex); // 队列空,等待
        }
        data = _q.front();
        _q.pop();
        pthread_mutex_unlock(&_mutex);
        pthread_cond_signal(&_full); // 唤醒生产者
    }
private:
    std::queue<T> _q; // 阻塞队列
    int _cap; // 容量上限
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _full; // 满条件变量
    pthread_cond_t _empty; // 空条件变量
};

main.cpp

#include "BlockQueue.hpp"
#include <unistd.h>

void* Producer(void* arg) {
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
    while (true) {
        sleep(1);
        int data = rand() % 100 + 1;
        bq->Push(data);
        std::cout << "Producer: " << data << std::endl;
    }
    return nullptr;
}
void* Consumer(void* arg) {
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
    while (true) {
        sleep(1);
        int data;
        bq->Pop(data);
        std::cout << "Consumer: " << data << std::endl;
    }
    return nullptr;
}
int main() {
    srand((unsigned int)time(nullptr));
    pthread_t producer, consumer;
    BlockQueue<int>* bq = new BlockQueue<int>;
    pthread_create(&producer, nullptr, Producer, bq);
    pthread_create(&consumer, nullptr, Consumer, bq);
    pthread_join(producer, nullptr);
    pthread_join(consumer, nullptr);
    delete bq;
    return 0;
}

说明

  • 单生产者单消费者:无需维护生产者间或消费者间的互斥。
  • 互斥_mutex 保护队列。
  • 同步_full_empty 条件变量控制生产消费顺序。
  • 条件判断:用 while 防止伪唤醒。
  • 运行结果:生产者与消费者步调一致,每秒交替生产消费。
生产者消费者步调调整
  1. 生产快,消费慢

    void* Producer(void* arg) {
        BlockQueue<int>* bq = (BlockQueue<int>*)arg;
        while (true) {
            int data = rand() % 100 + 1;
            bq->Push(data);
            std::cout << "Producer: " << data << std::endl;
        }
    }
    void* Consumer(void* arg) {
        BlockQueue<int>* bq = (BlockQueue<int>*)arg;
        while (true) {
            sleep(1);
            int data;
            bq->Pop(data);
            std::cout << "Consumer: " << data << std::endl;
        }
    }
    
    • 生产者快速填满队列后等待,消费者消费一个后唤醒生产者,后续步调一致。
  2. 生产慢,消费快

    void* Producer(void* arg) {
        BlockQueue<int>* bq = (BlockQueue<int>*)arg;
        while (true) {
            sleep(1);
            int data = rand() % 100 + 1;
            bq->Push(data);
            std::cout << "Producer: " << data << std::endl;
        }
    }
    void* Consumer(void* arg) {
        BlockQueue<int>* bq = (BlockQueue<int>*)arg;
        while (true) {
            int data;
            bq->Pop(data);
            std::cout << "Consumer: " << data << std::endl;
        }
    }
    
    • 消费者初始等待生产者生产,消费后继续等待,步调随生产者。
条件唤醒优化

调整唤醒条件,例如队列数据量超一半时唤醒消费者,小于一半时唤醒生产者:

void Push(const T& data) {
    pthread_mutex_lock(&_mutex);
    while (IsFull()) {
        pthread_cond_wait(&_full, &_mutex);
    }
    _q.push(data);
    if (_q.size() >= _cap / 2) {
        pthread_cond_signal(&_empty); // 超一半唤醒消费者
    }
    pthread_mutex_unlock(&_mutex);
}
void Pop(T& data) {
    pthread_mutex_lock(&_mutex);
    while (IsEmpty()) {
        pthread_cond_wait(&_empty, &_mutex);
    }
    data = _q.front();
    _q.pop();
    if (_q.size() <= _cap / 2) {
        pthread_cond_signal(&_full); // 少于一半唤醒生产者
    }
    pthread_mutex_unlock(&_mutex);
}
  • 效果:生产者快速填满队列后等待,消费者消费至一半以下才唤醒生产者。
基于计算任务的扩展

将队列存储类型改为任务类,扩展功能:

Task.hpp

#pragma once
#include <iostream>

class Task {
public:
    Task(int x = 0, int y = 0, char op = 0) : _x(x), _y(y), _op(op) {}
    void Run() {
        int result = 0;
        switch (_op) {
            case '+': result = _x + _y; break;
            case '-': result = _x - _y; break;
            case '*': result = _x * _y; break;
            case '/': 
                if (_y == 0) { std::cout << "Warning: div zero!" << std::endl; result = -1; }
                else { result = _x / _y; } break;
            case '%': 
                if (_y == 0) { std::cout << "Warning: mod zero!" << std::endl; result = -1; }
                else { result = _x % _y; } break;
            default: std::cout << "error operation!" << std::endl; break;
        }
        std::cout << _x << " " << _op << " " << _y << "=" << result << std::endl;
    }
private:
    int _x, _y;
    char _op;
};

main.cpp

#include "BlockQueue.hpp"
#include "Task.hpp"

void* Producer(void* arg) {
    BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
    const char* ops = "+-*/%";
    while (true) {
        int x = rand() % 100;
        int y = rand() % 100;
        char op = ops[rand() % 5];
        Task t(x, y, op);
        bq->Push(t);
        std::cout << "Producer task done" << std::endl;
    }
    return nullptr;
}
void* Consumer(void* arg) {
    BlockQueue<Task>* bq = (BlockQueue<Task>*)arg;
    while (true) {
        sleep(1);
        Task t;
        bq->Pop(t);
        t.Run();
    }
    return nullptr;
}
int main() {
    srand((unsigned int)time(nullptr));
    pthread_t producer, consumer;
    BlockQueue<Task>* bq = new BlockQueue<Task>;
    pthread_create(&producer, nullptr, Producer, bq);
    pthread_create(&consumer, nullptr, Consumer, bq);
    pthread_join(producer, nullptr);
    pthread_join(consumer, nullptr);
    delete bq;
    return 0;
}
  • 功能:生产者生成计算任务,消费者执行计算并输出结果。
  • 扩展性:通过定义不同 Task 类实现多样化任务处理。

总结

  • 模型核心:通过容器解耦生产者与消费者,支持并发与忙闲不均。
  • 实现关键:阻塞队列结合互斥锁与条件变量,确保互斥与同步。
  • 灵活性:可调整步调、唤醒条件,或扩展为复杂任务处理。

相关文章:

  • 快速求出质数
  • 【算法训练】单向链表
  • pandas中新增的case_when()方法
  • c++ 命名空间 namespace
  • 【 <二> 丹方改良:Spring 时代的 JavaWeb】之 Spring Boot 中的数据验证:使用 Hibernate Validator
  • 数据建模流程: 概念模型>>逻辑模型>>物理模型
  • NSSCTF(MISC)——[NSSRound#4 SWPU]Type Message
  • 网络爬虫-2:基础与理论
  • 论文阅读笔记:Denoising Diffusion Probabilistic Models (3)
  • C语言中*a与a的区别和联系
  • 数据结构——B树、B+树、哈夫曼树
  • 安全测试理论
  • JavaScript 性能优化实战
  • 【云馨AI-大模型】自动化部署Dify 1.1.2,无需科学上网,Linux环境轻松实现,附Docker离线安装等
  • 【C++教程】setw()函数的使用方法
  • 深入理解Linux中的SCP命令:使用与原理
  • Hutool中的相关类型转换
  • 山东大学数据结构课程设计
  • linux--时区查看和修改
  • 动态规划-01背包
  • 旭辉控股集团:去年收入477.89亿元,长远计划逐步向轻资产业务模式转型
  • 女冰队长于柏巍,拒绝被年龄定义
  • 国务院安委办、应急管理部进一步调度部署“五一”假期安全防范工作
  • 浪尖计划再出发:万亿之城2030课题组赴九城调研万亿产业
  • 美国清洗政治:一幅残酷新世界的蓝图正在展开
  • 中国海油总裁:低油价短期影响利润,但也催生资产并购机会