多线程(4)
1阻塞队列:
1.1概念:
阻塞队列就是一种特殊的队列,两大特性就是1-->线程安全 2--->具备阻塞功能,是一种线程安全的数据结构,当队列满是,入队操作就会阻塞等待,队列为空的时候出队操作也会阻塞等待,只有在队列不满时才能正常入队,队列不为空时才能正常出队列,常用于生产者消费者模型
参见的阻塞队列:
BlockingQueue是一个接口,具体常用的实现类有ArrayBlockingQueue+LinkedListBlockingQueue+PriorityBlockingQueue,入队列的核心操作是put,出队列是take,当然queue支持的offer poll 操作也是支持的,但是我们不用,因为这些操作不具备阻塞功能
,如果强用当队列满了,offer的话,直接offer会失败.
ArrayBlockingQueue是需要指定边界的,就是指定容量,LinkedListBlockingQueue可以指定,也可以不指定,但是如果不指定的话默认值是非常大的,这样会导致队列非常大,导致内存耗尽,产生内存超出范围这样的异常,最好还是要根据具体的需求指定大小的
1.2阻塞队列的实现
思路:我们使用循环数组来模拟实现队列,这个队列需要具备阻塞等待功能
使用synchronized搭配wait notify使用,在不同线程中,我们都是用过这个阻塞队列的实例化对象来调用put take 方法,所以即使在不同线程中,使用的阻塞队列bq 是一样的,所以我们可以对this加速(因为我们将wait 和notify的锁对象必须是一样的)
细节:对于是否wait的条件判断不能只用if来作判断,因为wait可能被异常唤醒,导致错误的执行往下的代码
1:wait可被interrupt这样的中断方法提前唤醒,这个时候队列压根就还是满的(或者是空的),你这个时候去执行下面的增加操作就是一个错误操作
2:假设有多个put在阻塞等待,其中一个put抢到锁后增加元素操作后,然后notify,这notify本意是去唤醒那些一开始因为队列是空而无法take的线程,然而假如你这个时候阴差阳错的去唤醒了put线程,这个时候队列因为最开始的put操作可能已经满了,你这个时候在对一个满的队列再往里面插是错误操作了.
所以推荐wait搭配while使用,由此我们知道wait不仅要搭配synchronized使用也要搭配while使用
因为文档里面本来就推荐我们wait搭配while使用,这样可以多一层校验
static class MyBlockingQueue {//使用循环数组模拟实现队列int[] arr;private int size = 0;//统计元素个数private int head = 0;private int tail = 0;public MyBlockingQueue(int capacity) {this.arr = new int[capacity];}public void put(int elem) throws InterruptedException {synchronized (this) {while (size >= arr.length) {//队列满了需要阻塞等待//此处建议使用while---作二次判断this.wait();}//等待结束可以正常插入arr[tail] = elem;tail++;if (tail >= 100) tail = 0;// tail = (tail + 1) % data.length;size++;this.notifyAll();}}public int take() throws InterruptedException {synchronized (this) {while (size == 0) {//说明队列是空的需要阻塞等待队列不为空this.wait();}//队列不为空了int ret = arr[head];head++;if (head >= 100) head = 0;size--;this.notifyAll();return ret;}}}static int i=0;public static void main(String[] args) {MyBlockingQueue bq=new MyBlockingQueue(100);Thread producer=new Thread(()->{//生产者线程while(true){try {bq.put(i);System.out.println("生产一个元素"+i);i++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread consumer=new Thread(()->{while (true){try {int p=bq.take();System.out.println("消费一个元素"+p);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();}
2生产者消费者模型:
2.1生产者消费者概念
生产者消费者模型其实就是通过一个容器作为中间缓存,来解除生产者和消费者之间的强耦合关系,比如说某个汽车厂商生产某款汽车,厂商可能会一股脑生成一大堆汽车,那么这堆汽车也不一定有大量的消费者立即来购买,然后就将车放到仓库里,这个就是库存车,这个仓库就是我们的阻塞队列,这个容器就是阻塞队列,当你这个仓库都满了,你这个厂商肯定就不会再去生产车了,当有消费者了就从仓库里取走车辆,生产者和消费者之间并没有直接相关,并不是说有消费者向生产者发送需求了再去生产,生产者生产的车辆立马就到消费者手里,中间存在一个阻塞队列,当然这个库存里可能就没有车,这个车卖的特别火爆,那么这时消费者就需要阻塞等待,比如小米汽车,消费者一般都需要等上100天左右才能提车,因为供不应求,起到解耦合的作用,
2.2生产者消费者模型的优势
1解耦合:
如果没有中间的阻塞队列,那么生产者消费者之间直接相关,生产者中有有关消费者的代码,消费者中有有关生产者的代码,修改A需要考虑到B,修改B需要考虑到A这样的话修改起来特别麻烦,现在就是A和队列交互,A和队列耦合,B和队列交互,B和队列耦合,AB不耦合,然后队列一当new出来就一般不修改,修改A B的时候就无需要考虑太多直接修改,降低了修改代码的成本.
2削峰填谷
当生产者产生大量请求时,如果一股脑的全部交给消费者,消费者可能处理不过来,当服务器提供的资源满足不了需求的资源时,服务器就会崩溃,这时如果中间加一个缓存区,将生产的请求都放入队列,然后消费者有条不紊的在自己能力范围内取出适当数量请求来处理,确保消费者的压力保持一定水平不突然上升和下降,高峰期请求特别多,低峰期请求少,就会出现高峰做不完,低峰没事做这种情况,所以我们将高峰的一部分任务分配到低峰来,将任务平均到低峰,达到削峰填谷效果,高峰期过了,高峰期余留的请求可以继续在低峰期去完成,让低峰期也别空着,帮着去处理高峰期余留的任务.
生产者消费者模型代码:
static int i=0;public static void main(String[] args) {BlockingQueue<Integer> bq=new LinkedBlockingDeque<>(100);Thread producer=new Thread(()->{//生产者线程while(true){try {bq.put(i);System.out.println("生产一个元素"+i);i++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread consumer=new Thread(()->{while (true){try {int p=bq.take();System.out.println("消费一个元素"+p);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();}
生产者和消费者的速度其实是旗鼓相当的,一般不会出现阻塞,生产一部分消费一部分,只有当出现上述代码的极端情况,就是一边快,一边慢,才可能出现上述的阻塞等待情况
3线程池:
3.1线程池概念:线程池是一种管理和复用线程的机制,我们在应用程序中创建一组可用线程,即线程池,并在需要执行任务时,将这些任务分配给这些线程来执行,线程池的主要目的是优化线程的创建和销毁和管理,提高多线程应用程序性能和效率
原因就在于我们可以对线程实现复用,创建线程和销毁线程都需要占用资源,频繁的创建和销毁比起只创建一次然后一直复用,也不销毁需要的资源多的多
(比如创建消耗50,删除消耗50,创建销毁1次就是100资源,5次就是500资源,然而复用的话,只需要消耗一开始创建线程消耗的50资源,然后后续也没有销毁操作,整个过程只需要50资源)
3.2为什么我们认为从线程池取线程比创建线程高效?
去取线程这个"取"操作需要开销,创建线程的这个"创建"也需要开销,凭什么你就说取更好嘞?
首先操作系统是由内核+配套应用程序组成的,内核里包含操作系统的各种核心功能----->比如管理(鼠标键盘这样)硬件设备-----给软件提供稳定的运行环境,当然创建线程肯定也就是内核控制,也就是创不创建线程内核说的算,内核要做的事情这么多,这会儿想着去管理管理硬件,一会儿想着去干干那个,对于创建线程这件事情来说就是不可控的了,什么时候轮到创建线程,是不可控的说不定的,就好比你让你朋友给你登录某个游戏做做任务,如果你条件允许你自己立马就上游戏开刷了,但是让朋友来干的话可能就不可控了,你也不知道他什么时候帮你刷,可能下午,可能晚上?
那么''创建线程''这个操作是依赖操作系统内核完成的是不可控的,由内核说的算,你程序员无法通过代码控制-----而''取''这个操作是可以通过应用程序代码控制的,是可控的,我们通常认为可控比不可控这样的操作更高效,所以你从线程池里取线程来执行任务比创建线程更高效.
3.3java标准库提供的可以直接使用的线程池:
主要看最后一个,最后一个会了,其他的也就会了
第一个参数:int corePoolSize 核心线程数:至少有多少个线程,线程池一旦创建,附和的这些核心线程也随之创建,直到线程池销毁,这些核心线程才销毁,类似于Steam上买的游戏,只买游戏本体他也会送一些DLC,这些DLC也就是核心DLC,和我们这个核心线程是一个意思,那么顾名思义你肯定还可以还其他DLC,也就是非核心DLC,也就是说一定还存在非核心线程.
第二个参数
:
int maximumPoolSize 表示最大线程数,最大线程数=核心线程数+非核心线程数,非核心线程随具体情况创建销毁,当任务量大,核心线程处理不过来时创建一批临时的非核心线程来,当任务量降下来之后,可以将这些没必要的非核心线程销毁,也就像实习生一样,一个公司业务繁忙时可能招一批实习生过来处理任务,当没啥事的时候就可能将这批实习生裁掉,但是对于核心线程可不敢乱销毁,也就是你一个公司的核心人员你可不敢乱裁对吧!核心线程是随线程池而生随线程池而毁,非临时的,而非核心线程是临时的,可随时销毁
第三个参数:long keepAliveTime 非核心线程允许空闲的最大时间,我们说当创建的非核心线程是可以销毁的,什么时候销毁呢?当任务量不繁忙,非核心线程空闲下来后,开始计时,超过keepAliveTime就立马将该线程销毁
第四个参数:TimeUnit unit 第三个参数给的时时间的数值,比如100, 1000,10,但是没给单位,那么TimdeUnit就是标准库内置的一个枚举类,里面包含了常见的时间单位,
第五个参数:BlockingQueue<Runnable> workQueue 工作队列,其实线程池也是一种生产者消费者模型,调用submit就相当于生产了一个任务,然后将该任务往阻塞队列里面传,然后取出任务给线程池当中的线程让他们来执行,这个阻塞队列<Runnable>中是Runnable这样的任务,我们可以指定阻塞队列的底层组成--->数组/链表--->容积--->优先级等属性
第六个参数:ThreadFactory threadFactory 传一个工厂类,该类提供了帮助new 不同线程的方法,通过该工厂中的方法new 出不同的线程出来构成线程池
工厂模式:工厂模式是和单例模式并列的一种设计模式,工厂模式是为了弥补构造方法的缺陷,为什么这样说呢?我们知道构造方法如果想要写多个的话必须通过重载来实现,这个实现不一定能成功,比如描述一个点,你就得new 然后调用对应的构造方法去new 出一个点对象出来
class Point{int x;int y;public Point(int x,int y){//通过这个构造方法来构造一个通过 x y 描述的点this.x=x;this.y=y;}public Point(int r,int a){//通过这个方法来构造一个通过 r(极坐标半径) a(角度)描述的点//y=r*sin(a);//x=r*cos(a);//只是打比方}}
// 当你得到r 和a 你去调用x y的构造方法肯定是无法正确的描述一个点的 //你必须调用这个下面的构造方法,将r 和a 通过三角函数进行转换计算得出值然后再赋值给x y 才对,你直接无脑调用第一个构造方法,拿r a 直接赋值给x y肯定不对,所以需要调用不同的构造方法,我们就需要重载不同的构造方法,但是很明显你在同一个类当中无法实现这个重载 //因为这两个参数类型 个数 都是一样的--会直接编译不通过
像这样的一个情况就是和明显的构造方法缺陷问题,我们如何解决这个问题?----->工厂模式
顾名思义一个工厂就是专门加工生产某个物品的区域,生产出来的产品一般具备专一性,你不可能特斯拉汽车生产工厂生产出个小米汽车,小米汽车也不可能生产处特斯拉,我们有一个工厂类,这个类中提供专门的静态方法,这个就是工厂方法,这些方法针对不同的需求将new 的过程和初始化的过程进行了封装-------这个比较抽象,我们举个例子-------线程池里面有很多线程,线程具备很多属性,你创建的多个线程,有些线程你想要给他初始化某些属性,另外的一些线程又不想初始化这部分属性,想去初始化另外一部分属性,这个时候就需要在new 的时候调用不同的构造方法,刚才说了这个构造方法不是说你想实现就能再类当中实现的,这时我们就需要工厂模式了,提供不同工厂方法,方法体内直接无脑new 出一个线程对象,至于这个对象的属性我们再根据传来的参数一一进行初始化即可
//工厂类
class ThreadFactory{public static MyThread makeThread_A(int A){//先无脑new 一个线程对象MyThread thread=new MyThread();//这个方法是创建一个线程,只初始化A属性---我们再手动初始化即可thread.A=A;//然后返回这个按照指定要求创建好的线程即可return thread;}public static MyThread makeThread_B(int B){//先无脑new 一个线程对象MyThread thread=new MyThread();//这个方法是创建一个线程,只初始化B属性---我们再手动初始化即可thread.B=B;//然后返回这个按照指定要求创建好的线程即可return thread;}public static MyThread makeThread_B_A(int B,int A){//先无脑new 一个线程对象MyThread thread=new MyThread();//这个方法是创建一个线程,只初始化AB属性---我们再手动初始化即可thread.B=B;thread.A=A;//然后返回这个按照指定要求创建好的线程即可return thread;}}class MyThread{//如下是abcd 4个不同的属性int A;int B;int C;String D;
}
上面这个例子就是很好的将new 和初始化的过程封装到我们的工厂方法当中了,我们要构造一个线程对象的时候我们只需要直接调用对应的工厂方法,把方法传过去,让工厂方法帮你初始化,帮你new 对象即可,不一会儿工厂方法就会将按照你要求的指定线程创建好然后返回给你了......这样就很好的解决了因为构造方法无法实现重载然后无法new 出指定的对象的这样的问题了.
第七个参数(最重要)RejectedExecutionHandler handler ---拒绝策略---当我们将任务往队列里扔,队列说我满了,我拒绝你进来,应对队列这个拒绝我们采取的策略
其实我们在一个线程中调用submit方法后,如果阻塞队列满了,我们不是简单的就直接进行阻塞等待了,(因为我们知道其实等待这件事就是一件非常影响效率和令人烦恼的事情,比如我们假期出去玩遇到堵车,本来假期时间就非常宝贵,现在好了还把大部分时间浪费在毫无意义的等待上面确实是令人烦恼的),程序也一样,能不等待就尽量不去等待,使用其他方案来解决,当队列满了时并不是立即就会去执行put方法将任务put到队列当中,如果直接执行put肯定就阻塞等待了,真正做法而是在前面加一层判断,如果满了我们就去执行其他的方案.-----那么这参数就是这个方案
具体操作的伪代码:
具体的拒绝策略:
1AbortPolicy 线程池直接抛出异常---导致正在处理的任务也中断
2 CallerRunsPolicy:让调用submit方法的这个线程自个去执行这个任务
3 DiscardOldestPolicy:丢弃到队列中最老的任务,放这个新任务入队列
4 DiscardPolicy:丢弃到当中队列中最新的任务,让目前的这个任务入队列
3.4创建线程池
首先我们可以直接new一个ThreadPoolExecutor类还实例化出一个线程池,但是这个类的构造方法是在是有点多有点复杂,我们需要合理的维护每个形参才能顺利的创建
有无更加简单的创建方法呢?肯定是有的---Executo类
Executor这个类就相当于一个工厂类,里面有很多工厂方法,每种工厂方法生产出来的线程池不一样,
我们知道工厂模式就是将`new 和初始化的过程进行了封装,那么在工厂方法里已经就new线程池然后返回了,你想要创建一个怎么样的线程池,现在就只需要简单的去调用对应的工厂方法即可,最多需要考虑一两个参数,之前的那些567个参数都不用去管了
具体的两个工厂方法:
1---->newFixedThreadPool() 可以指定核心线程数,但是核心线程数和最大线程数一样,也就是说没有所谓的非核心线程
2--->newCachedThreadPool() 这个不需要线程数量,核心线程数为0,最大线程数为正无穷
3.4手动实现一个线程池
class MyThreadPool{//创建一个工作队列---消息队列BlockingQueue<Runnable> workqueue;//往队列里塞任务的方法public void submit(Runnable task) throws InterruptedException {workqueue.put(task);}private int n;//线程池的线程数量public MyThreadPool(int n) {this.n = n;this.workqueue=new ArrayBlockingQueue<>(1000);//通过循环的方式创建出多条线程for(int i=0;i<n;i++){Thread t=new Thread(()->{try {Runnable task=workqueue.take();//取任务task.run();//执行任务} catch (InterruptedException e) {throw new RuntimeException(e);}});t.start();//开启线程}}
}
public class demo30 {public static void main(String[] args) throws InterruptedException {MyThreadPool myThreadPool=new MyThreadPool(10);for (int i=0;i<100;i++){int id=i;//任务编号//通过main线程向线程池提交任务myThreadPool.submit(()->{System.out.println(Thread.currentThread().getName()+" "+"id="+id);});}}}