当前位置: 首页 > news >正文

Linux多线程(十二)之【生产者消费者模型】

文章目录

    • 生产者消费者模型
      • 为何要使用生产者消费者模型
        • 生产者消费者模型优点
      • 基于BlockingQueue的生产者消费者模型
        • BlockingQueue
        • C++ queue模拟阻塞队列的生产消费模型
          • 单线程生产消费模型
          • 多线程生产消费模型

生产者消费者模型

consumer/productor

321原则

321原则(便于记忆)

为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,

所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,

消费者不找生产者要数据,而是直接从阻塞队列里取,

阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点
  1. 解耦
  2. 支持并发
  3. 支持忙闲不均

image-20250425234123376

生产消费模型的高效问题

cp问题高效

基于BlockingQueue的生产者消费者模型

BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

其与普通的队列区别在于,

当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;

当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出

(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

image-20250425234454539

C++ queue模拟阻塞队列的生产消费模型

代码:

单线程生产消费模型

blockqueue.hpp

#pragma once#include <iostream>
#include <pthread.h>
#include <queue>using namespace std;template <class T>
class blockqueue
{static const int defaultnum = 20;public:blockqueue(int maxcap = defaultnum): maxcap_(maxcap){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&c_cond_, nullptr);pthread_cond_init(&p_cond_, nullptr);low_water=maxcap_/3;high_water=maxcap_*2/3;}~blockqueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond_);pthread_cond_destroy(&p_cond_);}T pop(){pthread_mutex_lock(&mutex_);if(q_.size()==0){pthread_cond_wait(&c_cond_,&mutex_);//1.调用的时候,自动释放锁}T t=q_.front();             // 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足q_.pop();if(q_.size()<low_water)pthread_cond_signal(&p_cond_);pthread_mutex_unlock(&mutex_);return t;}void push(const T &in){pthread_mutex_lock(&mutex_);if(q_.size()==maxcap_){pthread_cond_wait(&p_cond_,&mutex_);//1.调用的时候,自动释放锁}q_.push(in);                // 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足if(q_.size()>high_water)pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);}private:queue<T> q_;int maxcap_;// int mincap_;pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;int low_water;int high_water;
};

main.cc

#include<iostream>
#include"blockqueue.hpp"
#include<unistd.h>using namespace std;void*Consumer(void*args)
{blockqueue<int> *bq=static_cast<blockqueue<int> *>(args);while(1){int data=bq->pop();cout<<"消费了一个数据: "<<data<<endl;// sleep(1);}
}void*Productor(void*args)
{blockqueue<int> *bq=static_cast<blockqueue<int> *>(args);int data=0;while(1){data++;bq->push(data);cout<<"生产了一个数据: "<<data<<endl;sleep(1);}
}int main()
{blockqueue<int> *bq=new blockqueue<int>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete bq;return 0;
}

image-20250508225919230

消费者sleep

image-20250508230034577

两者都不sleep

image-20250508230244057

多线程生产消费模型

补充:

  1. 为什么判断条件要放到加锁之后?

因为判断临界资源条件是否满足,也是在访问临界资源!

判断临界资源是否就绪,是通过在临界区内部判断的!

  1. 如果临界资源未就绪,那么线程就要进行等待。

等待的时候,线程是持有锁的!所以调用wait时,自动释放锁。

如果不释放锁,直接等待,那么等待的线程就没有线程可以唤醒了,

因为其他线程都在锁外,进不去临界区。

该线程因为唤醒而返回的时候,重新持有锁了。

  1. 如果线程在wait时,被误唤醒了呢?

伪唤醒的概念

伪唤醒

解决方法:判断条件时用while,不用if

blockqueue.hpp

#pragma once#include <iostream>
#include <pthread.h>
#include <queue>using namespace std;template <class T>
class blockqueue
{static const int defaultnum = 20;public:blockqueue(int maxcap = defaultnum): maxcap_(maxcap){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&c_cond_, nullptr);pthread_cond_init(&p_cond_, nullptr);// low_water=maxcap_/3;// high_water=maxcap_*2/3;}~blockqueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond_);pthread_cond_destroy(&p_cond_);}T pop(){pthread_mutex_lock(&mutex_);while(q_.size()==0)//做到防止代码被伪唤醒!{pthread_cond_wait(&c_cond_,&mutex_);//1.调用的时候,自动释放锁}T t=q_.front();             // 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足q_.pop();// if(q_.size()<low_water)pthread_cond_signal(&p_cond_);pthread_cond_signal(&p_cond_);//pthread_cond_broadcastpthread_mutex_unlock(&mutex_);return t;}void push(const T &in){pthread_mutex_lock(&mutex_);while(q_.size()==maxcap_)//做到防止代码被伪唤醒!{pthread_cond_wait(&p_cond_,&mutex_);//1.调用的时候,自动释放锁}q_.push(in);                // 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足// if(q_.size()>high_water)pthread_cond_signal(&c_cond_);pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);}private:queue<T> q_;int maxcap_;// int mincap_;pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;// int low_water;// int high_water;
};

Task.hpp

#pragma once
#include <iostream>
#include<string>
using namespace std;enum
{Div_zero = 1,Mod_zero,Unknown
};class Task
{
public:Task(int x, int y, char op): a(x), b(y), op_(op), ret(0), exitcode(0){}void Run(){switch (op_){case '+':ret = a + b;break;case '-':ret = a - b;break;case '*':ret = a * b;break;case '/':{if (b == 0)exitcode = Div_zero;elseret = a / b;}break;case '%':{if (b == 0)exitcode = Mod_zero;elseret = a % b;}break;default:exitcode=Unknown;break;}}string GetTask(){string r=to_string(a);r+=op_;r+=to_string(b);r+="=???";return r;}string Getret(){string r=to_string(a);r+=op_;r+=to_string(b);r+="=";r+=to_string(ret);r+=" [ exitcode: ";r+=to_string(exitcode);r+=" ]";return r;}void operator()(){Run();}~Task() {}private:int a;int b;char op_;int ret;int exitcode;
};

main.cc

#include <iostream>
#include "blockqueue.hpp"
#include <unistd.h>
#include "Task.hpp"
#include <ctime>using namespace std;string oper = "+-*/%";void *Consumer(void *args)
{blockqueue<Task> *bq = static_cast<blockqueue<Task> *>(args);while (1){Task data = bq->pop();// data.Run();// 计算// data.Run();data();cout << "处理了一个任务 , 运算结果为: " << data.Getret() << " ,thread id: " << pthread_self() << endl;// sleep(1);}
}void *Productor(void *args)
{int len = oper.size();blockqueue<Task> *bq = static_cast<blockqueue<Task> *>(args);// int x=0,y=0;// Task data(x,y);while (1){// 模拟生产者生产数据int x = rand() % 10 + 1; //[1,10]int y = rand() % 10;     //[0,9];char op = oper[rand() % len];Task data(x, y, op);usleep(10);// 计算bq->push(data);cout << "生产了一个任务: " << data.GetTask() << " ,thread id: " << pthread_self() << endl;sleep(1);}
}int main()
{srand(time(nullptr));blockqueue<Task> *bq = new blockqueue<Task>();pthread_t c[3], p[5];for (int i = 0; i < 3; i++){pthread_create(c + i, nullptr, Consumer, bq);}for (int i = 0; i < 5; i++){pthread_create(p + i, nullptr, Productor, bq);}for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}delete bq;return 0;
}

image-20250511010553986

http://www.dtcms.com/a/265897.html

相关文章:

  • “Payload document size is larger than maximum of 16793600.“问题解决(MongoDB)
  • Kettle数据抽取(十一)作业-邮件
  • 什么是码率?剪映中如何选择适合的视频码率
  • C++(std::sort)
  • js-cookie详细介绍
  • Node.js与Webpack
  • 2025年6月:技术探索与生活平衡的协奏曲
  • 目标检测:从基础原理到前沿技术全面解析
  • 架构师的“降维打击”:用桥接模式,把 N*M 的问题变成 N+M
  • Matplotlib 安装使用教程
  • 【Git】同时在本地使用多个github账号进行github仓库管理
  • C++ 网络编程(14) asio多线程模型IOThreadPool
  • 【数据结构】树的基本操作
  • 阿里云服务网格ASM实践
  • 抗辐照芯片在核电厂火灾探测器中的应用优势与性能解析
  • springMvc的简单使用:要求在浏览器发起请求,由springMVC接受请求并响应,将个人简历信息展示到浏览器
  • Java 原生 HTTP Client
  • https如何利用工具ssl证书;使用自己生成的证书
  • http、SSL、TLS、https、证书
  • 【交互设计】UI 与 UX 简介:从核心概念到行业实践
  • 微算法科技(NASDAQ MLGO)基于量子图像处理的边缘检测算法:开拓图像分析新视野
  • [2025CVPR]SEEN-DA:基于语义熵引导的领域感知注意力机制
  • 通过观看数百个外科手术视频讲座来学习多模态表征|文献速递-最新论文分享
  • 【数据结构】哈希——闭散列/开散列模拟实现(C++)
  • [论文阅读] 人工智能 | 在非CUDA硬件上运行几何学习:基于Intel Gaudi-v2 HPU的PyTorch框架移植实践
  • Stable Diffusion 项目实战落地:AI照片修复 第一篇 从黑白到彩色:用AI给照片上色的魔法之旅
  • stm32f103c8t6---ymodem协议串口IAP升级(只教怎么操作,略讲原理,100%成功!)
  • laravel基础:隐式模型绑定的用法和介绍
  • 【AI】大语言模型(LLM) NLP
  • STM32-第二节-GPIO输入(按键,传感器)