【linux】多线程(六)生产者消费者模型,queue模拟阻塞队列的生产消费模型
小编个人主页详情<—请点击
小编个人gitee代码仓库<—请点击
linux系列专栏<—请点击
倘若命中无此运,孤身亦可登昆仑,送给屏幕面前的读者朋友们和小编自己!
目录
- 前言
- 一、生产者消费者模型
- 原理介绍
- 如何理解高效呢?
- 为什么要使用生产者消费者模型
- 二、queue模拟阻塞队列的生产消费模型
- Tash.hpp
- 基本框架
- operator()()
- GetTask
- GetResult
- BlockQueue.hpp
- 基本框架
- push
- pop
- 三、模拟单生产者单消费者模型
- main.c
- 四、多生产者多消费者模型
- main.c
- 总结
前言
【linux】多线程(五)线程安全,死锁,线程同步(条件变量pthread_cond_init/destroy/wait/signal/broadcast)——书接上文 详情请点击<——,本文会在上文的基础上进行讲解,所以对上文不了解的读者友友请点击前方的蓝字链接进行学习
本文由小编为大家介绍——【linux】多线程(六)生产者消费者模型
一、生产者消费者模型
原理介绍
- 生产者消费者模型(consumer producter,也叫做cp问题),现在小编引入一个买卖辣条的场景供大家理解生产者消费者模型
- 假设有这么一个超市,它里面专门卖辣条,日常我们想吃辣条的时候,一般都是去该超市中去购买辣条,我们作为顾客,那么扮演的就是消费者的角色,顾客一般有很多,所以消费者也有很多,当消费者从超市购买完成辣条之后,通常是拿到家里后,在家里吃辣条
- 可是超市中的种类繁多的辣条也不是平白无故的出现的,即超市需要从很多供应商进货,供应商负责将辣条放到超市里,但是供应商的辣条也不是凭空出现的,供应商需要有自己的辣条工厂来生产辣条,所以供应商从自己的辣条工厂获取辣条,所以供应商就扮演的是生产者的角色
- 超市就如同一个大号的缓存,超市负责从供应商那里进货,然后将辣条放到货架上,卖给消费者,如果超市中的货架上的辣条已经放满了,那么供货商将无法继续供给辣条,所以供应商就应该停下生产,等待消费者购买辣条。如果超市中货架上的辣条已经全部为空了,那么消费者将无法继续购买辣条,所以消费者应该停下购买,等待生产者生产辣条
- 假设临近过年了,按照往年经验供应商预计顾客会对辣条的需求量剧增,但是过年的时候,工厂的工人要放假无法生产辣条了,而过年的时候,顾客对辣条的需求量又很大,所以怎么办呢?所以多个供应商在过年前的前一个月加大工厂的产量,确保将超市的货架摆满,于是过年的时候,工厂的工人放假了,消费者就去辣条超市中购买辣条,此时生产者已经不生产辣条了,但是消费者仍能购买到很多辣条
- 所以超市作为一个大号的缓存,可以将生产和消费的行为进行一定程度的解耦,超市可以以两种角度看待,空间角度,商品数角度,其中生产者关心的是超市的空间,消费者关心的是商品数
- 所以此时我们回归一下,将上述场景与linux联系起来,那么供应商以及背后的工厂联合起来扮演的是生产者,顾客以及背后的家联合起来扮演的就是消费者,超市是一个巨大的缓存,那么就是扮演的特定结构的内存空间
- 在linux中线程是操作系统调度的基本单位,所以说无论是生产者还是消费者都只能由线程来承担,而线程之间传输的只能是数据,所以生产者线程(多个)传输数据到特定结构的内存空间,消费者线程(多个)再从特定结构的内存空间拿数据
- 而特定结构的内存空间又是共享资源,多线程并发访问共享资源会出现数据不一致等并发问题,所以要使用锁保护共享资源,即同一时间只允许一个执行流访问这个共享资源
- 所以由于生产者有多个,消费者有多个,那么其中也必然会有3中关系,如下的互斥都是为了保证数据安全,只有一个角色访问超市
(1)生产者和生产者之间:互斥(供应商和供应商之间是竞争关系,彼此都希望对方倒闭,然后自己独占市场,所以生产者和生产者是互斥关系)
(2)消费者和消费者之间:互斥(假设超市只剩下一包辣条,但是此时却进来了两个消费者,这两个消费者是竞争关系,都想争夺最后一包辣条,都希望双方不和自己竞争,然后自己得到辣条,所以是消费者和消费者是互斥关系)
(3)生产者和消费者之间:互斥,同步(超市中有很多货架,假设供货商正在向货架上放辣条,此时消费者不能来将辣条抢走,因为我供应商还没有统计我摆满了几个货架,还没有和超市算钱,所以生产者和消费者之间有互斥关系,如果超市中的货架上的辣条已经放满了,那么供货商将无法继续供给辣条,所以供应商就应该停下生产,等待消费者购买辣条。如果超市中货架上的辣条已经全部为空了,那么消费者将无法继续购买辣条,所以消费者应该停下购买,等待生产者生产辣条,所以生产者和消费者之间还应该按照一定是顺序访问超市,所以生产者和消费者之间还有同步关系) - 所以为了便于理解,我们给生产者消费者模型一个321原则
(1)3种关系——如上
(2)2种角色——生产者和消费者
(1)1个场所——特定结构的内存空间 - 并且生产者消费者模型还有两个优点
(1)支持忙闲不均(即可以平衡生产者和消费者的需求,生产者生产多,消费者需求少,不要紧,可以将生产者生产的数据放到特定结构的内存空间缓存起来。生产者生产少,消费者需求多,不要紧,特定结构的内存空间中还有生产者提前缓存的数据,那么消费者直接拿数据就行)
(2)生产和消费进行解耦
(3)高效(那么如何理解高效呢?如下)
如何理解高效呢?
-
小编同样是拿上面的场景来帮助大家理解高效
-
我们看生产者不能仅仅看到它向超市供应辣条,还应该看到他从工厂获取辣条,那么对应到linux中即我们看生产者线程不能单单的看它向特定结构的内存空间放数据,即将数据放到特定结构的内存空间,还应该看到它获取数据,如何获取数据呢?从用户(例如用户的一些登录注册请求等),网络中进行获取数据
-
同样的,我们看消费者不能仅仅看到它从超市购买辣条,还应该看到他在家中吃辣条,那么对应到linux中即我们看消费者线程不能单单的看他从特定结构的内存空间中拿数据,还应该看到他加工处理数据
-
我们将生产者将数据放到特定结构的内存空间称之为生产数据到特定结构的内存空间,将消费者从特定的内存空间中拿数据称之为消费数据
-
于是生产者在生产者消费者模型做了两点:
(1)获取数据
(2)生产数据到特定结构的内存空间 -
于是消费者在生产者消费者模型做了两点:
(1)消费数据
(2)加工处理数据
-
别忘了有多个生产者,有多个消费者,那么对应在linux中有多个生产者线程,有多个消费者线程,所以那么当生产者消费者模型运行起来的时候,也就意味着操作系统内有多个线程并发式的运行,但是特定结构的内存空间在访问的时候由于已经加了锁,所以同一时间只允许一个线程访问,那么为什么小编还要说生产者消费者模型高效呢?
-
生产者从用户,网络等获取数据需不需要花费时间?需要。消费者加工处理数据需不需要时间?需要,这就对了都需要时间
-
我们的多生产者线程,多消费者线程的高效是对比单生产者线程,单消费者线程的场景得出的
-
那么有多个生产者线程中的一个生产者线程在生产数据到特定结构的内存空间,其它的生产者线程在并发的获取数据,那么这是不是节约时间?是的
-
那么有多个消费者线程中的一个消费者线程在从特定结构的内存空间获取数据,即消费数据,其它的消费者线程在并发的加工处理数据,那么这是不是节约时间?是的
-
那么有一个生产者线程在生产数据到特定结构的内存空间,多个消费者线程在并发的加工处理数据,这是不是节约时间?是的
-
那么有一个消费者线程在从特定结构的内存空间获取数据,即消费数据,多个生产者线程在并发的从用户,网络等获取数据,这是不是节约时间?是的
-
那么多个生产者线程获取数据,多个消费者线程加工处理数据,一个生产者线程在身缠数据到特定结构的内存空间,这是不是节约时间?是的
-
那么多个生产者线程获取数据,多个消费者线程加工处理数据,一个消费者线程在从特定结构的内存空间获取数据,即消费数据,这是不是节约时间?是的
-
所以小编才说生产者消费者模型是高效的
为什么要使用生产者消费者模型
- 生产者消费者模型就是用一个容器(特定结构的内存空间)来解决生产者和消费者强耦合的问题。生产者和消费者之间不直接通讯,而是通过阻塞队列(容器)来进行通讯。
- 所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列即可。消费者不找生产者要数据,而是直接从阻塞队列里获取数据。
- 阻塞队列就相当于一个缓冲区,用来平衡生产者和消费者的处理能力,用来给生产者和消费者解耦合的
二、queue模拟阻塞队列的生产消费模型
- 那么生产者消费者模型中的特定结构的内存空间,即一个容器,这里我们采用阻塞队列来实现,那么下面我们就来使用queue模拟阻塞队列的生产消费模型,所以说阻塞队列阻塞队列那么就要用到队列,所以小编则采用c++中的queue来模拟阻塞队列进而实现生产消费模型
- 我们在上面讲的是生产者线程将数据生产到特定结构的内存空间(阻塞队列)中,消费者从阻塞队列中拿数据,可是仅仅传输诸如,int,string等类型的数据好像意义不是很大,在更多的应用场景中则是传输任务,即将任务封装在对象中放到阻塞队列中进行传输,下面我们就下来看一下模拟实现计算任务
Tash.hpp
基本框架
- 我们要实现的任务是计算加,减,乘,除,取模运算,那么就要有对应的运算符,所以我们定义一个全局的opers用于存储运算符
- 既然是运算,那么就有可能出现除零错误,取模零错误,以及运算符错误,所以我们使用枚举定义对应的错误码
- 接下来看类Task的私有成员变量,那么首先应该有左操作数data1_,右操作数data2_,操作符oper_,计算结果result_,错误码exitcode_
- 所以我们就在Task的构造函数中对这些成员变量进行初始化即可,这里的操作符需要传入,并且计算结果以及错误码默认为0
#pragma once
#include <iostream>
#include <string>std::string opers = "+-*/%";enum Err
{DivZero = 1,ModZero,Unknown
};class Task
{
public:Task(int data1, int data2, char op): data1_(data1), data2_(data2), oper_(op), result_(0), exitcode_(0){}~Task(){}private:int data1_;int data2_;char oper_;int result_;int exitcode_;
};
operator()()
- 既然是封装的任务,那么就应该有任务的计算方法,这里为了便于调用我们使用仿函数,所以首先利用switch case语句对操作符oper_进行判断,然后计算即可
- 但是这里可能会出现除零错误,所以我们判断一下当运算符是除号的时候,如果第二个操作数data_为0,那么就会发生除零错误,这里我们就不进行运算了,直接设置错误码即可,用户看到操作码之后,就会知道出现错误了,结果不可靠
- 同样的也有可能会出现取模零错误,同样的方法和除零错误一样,如果第二个操作数data_为0,那么就会发生取模零错误,这里我们就不进行运算了,直接设置错误码即可
- 但是有可能用户传入的运算符是其它值,所以这里如果运算符不是加,减,乘,除,取模,那么就会进入default,那么同样的我们设置错误码即可
void operator()()
{switch (oper_){case '+':result_ = data1_ + data2_;break;case '-':result_ = data1_ - data2_;break;case '*':result_ = data1_ * data2_;break;case '/':{if (data2_ == 0)exitcode_ = DivZero;elseresult_ = data1_ / data2_;}break;case '%':{if (data2_ == 0)exitcode_ = ModZero;elseresult_ = data1_ % data2_;}break;default:exitcode_ = Unknown;break;}
}
GetTask
- 那么生产者生产了什么样的任务到阻塞队列,还应该打印给用户看,所以Task类还应该提供一个获取任务,即将任务组成字符串进行返回即可
std::string GetTask()
{std::string r = (std::to_string(data1_));r += ' ';r += oper_;r += ' ';r += std::to_string(data2_);r += " = ?";return r;
}
GetResult
- 消费者经过计算完成数据之后,还要将计算结果回显给用户,所以Task类还应该提供一个成员函数将计算结果转换成字符串返回
std::string GetResult()
{std::string r = (std::to_string(data1_));r += ' ';r += oper_;r += ' ';r += std::to_string(data2_);r += " = ";r += std::to_string(result_);r += ", [";r += "exitcode: ";r += std::to_string(exitcode_);r += "]";return r;
}
BlockQueue.hpp
基本框架
- 所以我们就要使用c++中的queue模拟实现阻塞队列了,那么这个阻塞队列我们期望可以接收任意数据类型,因为在实际应用中可能会有不同的场景,接收不同的数据类型,所以这里我们采用模板类的方式实现阻塞队列BlockQueue
- 那么既然是queue模拟阻塞队列的生产消费模型,即模拟生产者消费者模型,那么就势必要满足生产者消费者模型的321原则,即3种关系(1)生产者与生产者:互斥(2)消费者与消费者:互斥(3)生产者与消费者:互斥,同步。2种角色:(1)生产者(2)消费者。1个场所:(1)特定结构的内存空间
- 所以如上特定的内存空间我们已经采用了阻塞队列来模拟,生产者和消费者会在主线程中创建新线程来扮演,所以3中关系我们如何满足呢?即首先我们要满足互斥,同步,要满足互斥,那么离不开互斥锁,要满足同步,那么离不开条件变量,所以阻塞队列的成员中还应该包含互斥锁和条件变量
- 注意这里的条件变量应该是两个条件变量,为什么呢?因为有两种线程,生产者线程,消费者线程这两种,假设只有一个条件变量,生产者线程和消费者线程都去这一个条件变量下等待了,那么假设此时队列中只有一个节点了,消费者线程消费了一个节点,那么此时队列中没有节点了,所以消费者线程应该定向的唤醒生产者线程生产节点到队列中,但是唤醒就成了问题,因为此时只有一个条件变量,生产者线程和消费者线程都去这一个条件变量下等待了,那么我能否保证下一个唤醒的一定是生产者线程呢?不能确定,因为唤醒的也有可能是消费者线程,此时一旦消费者线程被唤醒了,那么本来队列中就没有节点了,再继续pop数据,那么此时就崩溃了,所以万万不能采用一个条件变量的方法
- 而是应该采用两个条件变量的方法,一个生产者条件变量,让生产者去生产者条件变量下进行等待,一个消费者条件变量,让消费者去消费者条件变量下去进行等待。那么消费者线程此时如果想要特定的唤醒生产者线程就很容易唤醒了,直接去生产者条件变量下唤醒队头对应的生产者线程即可,无误
- 阻塞队列,那么也就是说这个队列可能会阻塞,即生产者或者消费者会阻塞,而消费者在队列中没有节点的时候无法消费,消费者需要阻塞等待生产者生产数据到队列,而生产者在队列空间满了的时候无法生产,需要阻塞等待消费者消费数据节点,那么为了让生产者也可以阻塞,所以这个队列的节点个数我们设置一个上线,那么我们就定义一个成员变量maxcap_作为空间的默认放置的节点个数,一旦队列节点的个数等于maxcap_那么就让生产者阻塞等待
- 所以综上,阻塞队列的私有成员变量应该有一个队列q,一个锁mutex_,一个生产者条件变量p_cond_,一个消费者条件变量c_cond_,一个最大容量maxcap_,同时还要定义一个静态成员变量static const int defaultnum = 20;作为i最大容量的初始值
- 那么我们在BlockQueue阻塞队列的构造函数中进行初始化即可,但是这个最大容量如果用户不显式传入,那么我们就采用默认值defaultnum即可,那么由于在类中定义的是局部锁以及局部的条件变量,所以要使用pthread_mutex_init初始化锁,使用pthread_cond_init初始化条件变量
- 那么在BlockQueue的析构函数中对使用pthread_mutex_destroy对局部锁释放销毁,使用pthread_cond_destroy对局部的条件变量释放销毁即可
#pragma once
#include <queue>
#include <pthread.h>template<class T>
class BlockQueue
{static const int defaultnum = 20;
public:BlockQueue(int maxcap = defaultnum): maxcap_(maxcap){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&c_cond_, nullptr);pthread_cond_init(&p_cond_, nullptr); }~BlockQueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond_);pthread_cond_destroy(&p_cond_);}private:std::queue<T> q;int maxcap_;pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;
};
push
- 那么BlockQueue的成员函数push就是将节点添加到阻塞队列中,即入队列,即这个push应该是生产者线程来进行使用的,那么由于要保证互斥,所以在访问临界资源queue之前要先申请锁,访问完成之后要释放锁
- 那么对于生产者线程来讲,当阻塞队列中的节点个数等于最大容量maxcap_的时候就不能继续生产了,应该去生产者条件变量下进行等待,所以我们就要使用while循环判断,注意这里存在伪唤醒的情况,所以必须采用while循环判断,绝对不能采用if判断
关于伪唤醒,小编在上一篇文章中的最后一个现象解释中已经进行了详细的讲解详情请点击<——
- 那么紧接着就是入队,当生产者线程push将节点入队之后,可以确保这个阻塞队列中一定至少有一个节点供消费者线程消费,所以那么生产者线程就去消费者条件变量下唤醒队头的消费者线程即可,最后生产者线程释放锁即可
void push(const T& in)
{pthread_mutex_lock(&mutex_);while(q.size() == maxcap_){pthread_cond_wait(&p_cond_, &mutex_);}q.push(in);pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);
}
pop
- 那么BlockQueue的成员函数pop就是将节点从阻塞队列弹出,即出队列,即这个pop应该是消费者线程来进行使用的,那么由于要保证互斥,所以在访问临界资源queue之前要先申请锁,访问完成之后要释放锁
- 消费者线程什么时候要阻塞等待呢?即当阻塞队列中没有节点的时候不能将节点从队列中出队,并且我们为了避免伪唤醒的情况,应该使用while循环判断,当阻塞队列的节点个数为0的时候,应该让消费者线程去消费者条件队列下去进行等待
- 那么走到接下来一定是while循环判断不成立,即节点的个数不为0,那么接下来可以大胆的出队,所以当出队后,消费者线程可以保证,阻塞队列中至少一定有一个节点空间可以被使用,即可以确保一个生产者线程生产一个节点到阻塞队列,所以消费者线程就去生产者条件变量下唤醒生产者条件变量,最后消费者线程释放锁,同时return返回出队节点即可
T pop()
{pthread_mutex_lock(&mutex_);while(q.size() == 0){pthread_cond_wait(&c_cond_, &mutex_);}T out = q.front();q.pop();pthread_cond_signal(&p_cond_);pthread_mutex_unlock(&mutex_);return out;
}
三、模拟单生产者单消费者模型
- 为了便于大家理解,小编先带领大家模拟实现单生产者单消费者模型
main.c
- 那么我们在main函数这个文件中,就要创建出生产者线程和消费者线程,然后这两个线程要执行对应的线程函数,生产者线程要执行生产者线程函数Producter,消费者线程要执行消费者线程函数Consumer,这两个线程函授都需要拿到阻塞队列才能进行操作,所以参数要传入阻塞队列,两个线程函数的开始都要线程void*类型转换为阻塞队列的指针才继续进行后续的操作
- 生产者线程函数Producter,就要先获取数据,在实际的场景中这个数据应该是从用户,网络等进行获取,那么这里我们采用随机数的方式获取数据,左操作数和右操作数的值我们控制在0到9之间,所以应该采用随机数取模10,即取模获取左操作数,usleep休眠
- 这个休眠是必须的,因为随机数是基于时间生成的,如果代码跑的太快,那么左操作数的值概率可能和右操作数相同,由于休眠了,大概率右操作数的值大概率可能和左操作数不同,由于数据范围太小,仅仅是0到9,所以随机数取模后有可能还是相同,不影响,我们期望看到正常结果,也期望看到异常结果,让右操作数为0,发生除零错误,发生取模零错误,我们期望看到异常对应的错误码
- 计算运算符字符串opers的长度,由于opers = "+-*/%"所以我们计算出这个字符串的长度为5,然后让随机数取模5,这样就可以在0到4之间随机,然后我们采用下标+[ ]的方式拿操作符op即可
- 接下来采用上面的构建好的数据传入Task类构造出对象t,我们再打印一下任务回显给用户即可,并且也将线程的tid进行打印,然后push到阻塞队列中即可,最后小编让生产者线程函数休眠1秒,即生产者生产慢,期望看到同步现象,即当阻塞队列中没有节点数据的时候,消费者线程阻塞等待生产者线程生产节点数据,上述过程要放到while死循环中,我们期望生产者线程源源不断的生产节点到阻塞队列,一旦阻塞队列满了,那么生产者线程要阻塞等待消费者线程消费阻塞队列的节点
- 那么下面小编讲解消费者线程Consumer,那么消费者线程就要取出阻塞队列的节点,调用仿函数进行运算,最后回显打印结果,同时打印线程的tid即可,同样的上述过程放到死循环中,即消费者线程源源不断的从阻塞队列中拿节点,即当阻塞队列中没有数据的时候,消费者线程阻塞等待生产者线程生产节点数据
- 接下来是main函数中的逻辑,main函数创建一个随机数种子,主线程new一个阻塞队列便于传参到线程函数中,然后创建一个消费者线程,创建一个生产者线程,然后主线程pthread_join等待两个线程退出即可,最后delete释放阻塞队列放置内存泄漏
- 所以当下面的进程运行,我们期望看到生产者线程先运行向阻塞队列生产节点数据,因为如果消费者线程先运行,那么由于阻塞队列中没有数据,那么消费者线程就会阻塞等待生产者线程向阻塞队列生产节点数据,并且由于生产者线程中最后释放锁之后执行了sleep,那么生产者线程休眠期间,消费者线程就会被唤醒,然后继续执行出队列,然后运行任务,打印任务信息,然后继续循环,但是由于此时阻塞队列已经为空,所以消费者线程又会阻塞等待生产者线程生产节点数据到阻塞队列中,以此间隔1秒循环往复,并且期望看到除零错误为1以及取模0错误为2的错误码(但是这个概率出现,因为小编是采用的是随机数的方式,右操作数是0,同时运算符还为除或者取模的概率较小,所以后面小编不一定可以演示出来)
#include "Task.hpp"
#include "BlockQueue.hpp"
#include <iostream>
#include <ctime>
#include <pthread.h>
#include <unistd.h>void* Consumer(void* args)
{BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);while(true){Task t = bq->pop();t();std::cout << "获取任务" << t.GetTask() << ", 执行任务: " << t.GetResult() << (void*)pthread_self() << std::endl;}
}void* Producter(void* args)
{int len = opers.size();BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);while(true){//模拟获取数据int data1 = rand() % 10; //[0, 9]usleep(10);int data2 = rand() % 10; //[0, 9]char op = opers[rand() % len];Task t(data1, data2, op);std::cout << "生产了一个任务" << t.GetTask() << (void*)pthread_self() << std::endl;bq->push(t);sleep(1);}
}int main()
{srand(time(nullptr));BlockQueue<Task>* bq = new BlockQueue<Task>();pthread_t c, p;pthread_create(&c, nullptr, Consumer, (void*)bq);pthread_create(&p, nullptr, Producter, (void*)bq);pthread_join(c, nullptr);pthread_join(p, nullptr);delete bq;return 0;
}
运行结果如下,无误,我们也成功的看到了错误码为1,为2的情况,当错误码不为0,那么就代表结果错误,当错误码为0代表结果无误
四、多生产者多消费者模型
main.c
- 那么在main函数中定义消费者线程的数组以及生产者线程的数组,那么使用for循环分别进行pthread_create创建多个对应的新线程,最后主线程pthead_join等待对应的多个新线程即可,这里创建3个消费者线程,创建3个生产者线程
- 生产者线程函数Producter小编去掉最后的休眠,所以可以观察到一瞬间多个线程就会将阻塞队列打满
- 消费者线程函数Consumer小编在最后可以添加sleep休眠1秒,这样让消费者线程跑慢点
- 由于最开始阻塞队列没有节点数据,所以生产者线程先运行,即使消费者线程先跑,那么消费者线程最开始也会由于阻塞队列中没有节点数据而去等待生产者生产节点数据到阻塞队列,所以我们期望看到程序一运行,由于生产者线程由于执行的线程函数中没有进行sleep休眠,一瞬间阻塞队列就被打满,但是由于过程中会唤醒消费者线程,所以打满的中途会有三个消费者线程执行任务最后执行sleep一秒,当阻塞队列满了之后,经过while判断,生产者线程就会都去阻塞等待,此时消费者线程sleep一秒完成就会运行,while判断阻塞队列不为空可以继续向后执行,于是就会有三个消费者线程,那么就会出现同时三个任务执行,间隔一秒,同时三个生产者线程被唤醒进行生产,紧接着三个生产者线程就会去唤醒三个消费者线程继续执行,间隔一秒这样循环
#include "Task.hpp"
#include "BlockQueue.hpp"
#include <iostream>
#include <ctime>
#include <pthread.h>
#include <unistd.h>void* Consumer(void* args)
{BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);while(true){Task t = bq->pop();t();std::cout << "获取任务" << t.GetTask() << ", 执行任务: " << t.GetResult() << (void*)pthread_self() << std::endl;sleep(1);}
}void* Producter(void* args)
{int len = opers.size();BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);while(true){//模拟获取数据int data1 = rand() % 10; //[0, 9]usleep(10);int data2 = rand() % 10; //[0, 9]char op = opers[rand() % len];Task t(data1, data2, op);std::cout << "生产了一个任务" << t.GetTask() << (void*)pthread_self() << std::endl;bq->push(t);}
}int main()
{srand(time(nullptr));BlockQueue<Task>* bq = new BlockQueue<Task>();pthread_t c[3], p[3];for(int i = 0; i < 3; i++){pthread_create(c + i, nullptr, Consumer, (void*)bq);}for(int i = 0; i < 3; i++){pthread_create(p + i, nullptr, Producter, (void*)bq);}for(int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for(int i = 0; i < 3; i++){pthread_join(p[i], nullptr);}delete bq;return 0;
}
运行结果如下,无误
总结
以上就是今天的博客内容啦,希望对读者朋友们有帮助
水滴石穿,坚持就是胜利,读者朋友们可以点个关注
点赞收藏加关注,找到小编不迷路!