线程池ThreadPoolExecutor源码剖笔记
一、为什么要有线程池?
当要执行一些异步任务时,为何不直接new一个Thread,然后start方法。
减少性能开销:如果每次执行异步任务,都去构建一个全新的线程执行,但是任务执行完毕后,线程还要销毁。可以利用线程池,实现资源复用,线程处理完任务,不会立即销毁,而是等待新任务的到来。类似连接池。
可以充分发挥CPU性能:如果每次来任务都直接new,没有办法去把控线程的个数。如果太多了,上下文频繁的切换,性能有损耗。如果太少了,CPU资源没有发挥出来。线程池可以指定好工作线程的个数,充分发挥CPU资源。
监控的效果:如果直接new的话,无法查看线程的个数或者工作的状态。但是采用了线程池之后,本身帮咱们记录很多信息,工作线程个数,有多少个任务在排队,每一个工作线程处理了多少个任务,整个线程池处理了多少个任务。
定时任务,批处理消费任务: 在半夜的时候,人少的时候,服务器的资源都是空闲下来的,完全利用这个时间,采用多线程的方式,多线程并行处理任务,不但可以充分的发挥CPU资源,还可以提升定时任务或者批处理的速度。
其实Java程序中,肯定用到了多线程: 比如Tomcat部署,Tomcat内部就有线程池资源,请求打过来,就是由Tomcat线程池处理的。OpenFeign,RabbitMQ这种框架或者中间件,内部也会涉及到线程池资源的信息。类似Hystrix的线程池隔离策略等等,都会涉及到线程池。
做任务的异步处理: 类似发送邮件,发送短信之类的操作,咱们可以通过异步的形式来玩,异步本身就是开启了一个线程,比如SpringBoot中@Async,内部就会提供一个线程池,来实现异步的功能。
………………
二、线程池的核心属性?
Java中的JUC包下,提供了一个非常好用的线程池,ThreadPoolExecutor。
首先掌握一下核心的属性,ctl
先简单的解释一个AtomicInteger是个什么鬼。
1、可以简单的看成,AtomicInteger就是一个int数值。
2、因为线程池中存在并发修改ctl的情况。AtomicInteger内部是基于CAS的方式,对ctl属性做修改。
Ps:CAS其实就是比较和交换,Compare And Swap。基于CPU支持的原语,来保证的原子性。保证修改某一个属性的时候,是原子性的。
ctl在线程池中,维护这两个信息:
- 工作线程的个数。
- 线程池的状态。
一个int类型,怎么维护这两个信息?线程池内部,将这个本质是int类型的ctl,拆分为32个bit位。
前3个bit位,维护线程池的状态。
低29个bit位,维护工作线程的个数。
维护工作线程: 计算工作线程最大个数的操作
00000000 00000000 00000000 00000001 == 1
<< 29
00100000 00000000 00000000 00000000 == 1 << 29
-1
00011111 11111111 11111111 11111111 == (1 << 29) - 1
换算10进制
536870911
维护线程池状态:
// 线程池状态RUNNING是最小的,依次递增到SHUTDOWN,RUNNING,TIDYING,TERMINATED…………
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
三、线程池的核心参数?(不会就回家等通知~)
线程池提供的有参构造里的7个参数。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {/*corePoolSize:核心线程数。maximumPoolSize:工作线程总数 = 核心线程数 + 非核心线程数。keepAliveTime:默认情况下,针对非核心线程的最大空闲时间unit:空闲时间的单位workQueue:任务排队的地方,可以指定长度。threadFactory:创建工作线程的。handler:拒绝策略。在核心线程满了,非核心线程满了,工作队列满了,才走拒绝策略。*/
}
线程池默认提供了四种拒绝策略
AbortPolicy:甩一个异常在你的脸上,活干不了~~~
DiscardPolicy:这个代表任务直接丢失,什么事都没做
DiscardOldestPolicy:将工作队列排在最前的任务丢失,然后重新投递当前任务
CallerRunsPolicy:让投递任务的线程自己处理
四、线程池提交任务的完整流程?
网上随便找的图。
// 任务投递给线程池之后的处理
// command:投递过来的任务
public void execute(Runnable command) {// 如果投递的任务为null,直接甩异常。if (command == null)throw new NullPointerException();// 拿到核心属性ctl。int c = ctl.get();// workerCountOf:在获取工作线程个数// 现在的工作线程数 小于 核心线程数if (workerCountOf(c) < corePoolSize) {// 基于addWorker方法,创建工作线程处理任务command,传递的true,代表构建核心线程// addWorker,返回true,创建工作线程成功,反之失败。if (addWorker(command, true))// 代表创建线程成功,任务已经交给线程池了。return结束了。return;// 代表创建线程失败了,有情况,重新获取ctl。做后续判断c = ctl.get();}// 任务交给核心线程处理失败了,代码走到这。// 线程池状态是RUNNING么?如果是,就尝试将任务扔到工作队列// 如果任务成功的投递到了工作队列排队,返回true,反之返回false// 返回true,进到if,execute方法结束if (isRunning(c) && workQueue.offer(command)) {// 代码到这,说明任务已经添加到工作队列// 再次获取ctlint recheck = ctl.get();// 再次判断,线程池状态是不是RUNNING// 如果状态不是RUNNING,走remove方法,将任务从工作队列中移除。// 移除任务操作,成功返回true,反之falseif (! isRunning(recheck) && remove(command))// 说明任务移除成功,给任务甩一波拒绝策略。reject(command);// 状态是RUNNING,或者,没拒绝// 查看工作线程个数是不是0个。else if (workerCountOf(recheck) == 0)// 创建一个非核心线程,处理这个任务,毕竟任务饥饿。addWorker(null, false);}// 到这,说明任务没扔到工作队列。// 创建非核心线程,处理任务。else if (!addWorker(command, false))// 如果创建非核心线程失败了,执行拒绝策略reject(command);
}