java ThreadPoolExecurtor源码解读 --- Worker
作为线程池中的实际工作者,worker的代码量非常的少,但是它的一些思想比如高内聚或者委派还是很有意思的,整体设计上还是很精妙的这篇文章带你从源码角度看看它
作为线程池的实际工作人员worker 采用的是内部类的设计方式,具有高内聚和封装性的特点,一方面它的唯一目的就是为了ThreadPoolExecutor服务他们之间逻辑紧密关联,另一方面也没有必要向外部暴露实现细节,同时作为内部类它可以直接、无缝地访问线程池地所有状态和方法。
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{private static final long serialVersionUID = 6138294804551838833L;final Thread thread;Runnable firstTask;volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this);}------------------------------------------------------------------protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
Worker类
先来看Worker 这个类 它继承了AQS 同时实现了Runnable
继承AQS 主要是为了
protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }
1.使用它的 tryAcquire(int) 在Worker里将它用tryLock 进行了封装目的是获取当前线程的状态(是否持有锁)有锁表示当前正在执行Task,无锁则表示当前处于空闲状态,根据情况可以安全地进行中断
2.使用它地tryRelease(int)在Worker里用unlock进行了封装 通过 state==1 告诉外界“我正在跑任务”,state==0 表示“我空闲”
3.使用它的isHeldExclusively()在Worker里面用isLocked进行封装目的是确保获取锁当前线程并未持有锁,确保他是一个不可重入的独占锁
问题:为什么Worker要设置成不可重入的?
对于worker 而言锁存在的唯一价值就是一个二元状态的探针而不是锁保护临界区这种功能不可重入使得state==1完全等价于当前正在跑任务这句话永远成立,从而不会造成 shutdown() 或者 shutdownnow()此类方法出现 误杀和漏杀的现象(允许重入会导致state允许大于等于2的情况导致,获取worker状态混乱)
worker的核心方法 run
public void run() { runWorker(this);}
“把主运行循环委托给外部的 runWorker 方法。”这是它的官方注释
——换句话说 Worker
本身只是一个 Runnable 外壳,它的 run()
不做任何业务逻辑,直接把 this
扔给线程池的 private 方法 runWorker(Worker w)
去跑。 真正的“取任务-执行任务-循环”主循环全部写在 runWorker
里,Worker
只负责:
-
被线程工厂
newThread(this)
包装成 java.lang.Thread; -
线程启动时调
run()
→ 立即跳进 线程池主体逻辑runWorker
这么做的目的是为了运行策略集中化:所有池化管理代码(锁、统计、中断、异常、钩子)都集中在 ThreadPoolExecutor 一个类里,Worker 只当“线程+锁”载体,不越权。
worker的构造方法
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
}
setState(-1) 依然是调用的AQS 里的方法,我们上面说过了各种操作只是为了确保worker的二元状态 (正在运行用户的任务/空闲可以被安全中断)因此在构造的时候不属于这两个状态的其一便通过 该方法将state设置为-1
构造出来的worker通过factory方法将自身作为参数new出来的thread赋值给自己的thread 也就是他和创建出的线程相互绑定,从而实现一种更灵活易懂的调用链
addWorker ->
newWorker (w) ->
w.thread.start() ->
因为w.thread绑定的是自己所以这里实际调用的也是自己的 run方法 ->
runWorker(this)
自此该woker在自己这个线程上开始正常工作 接受任务并处理
总结:对于worker 来讲最重要的是我们要抽象的看待它,只是将他视为状态和线程的载体,真正的调用和监测全部由外部的线程池来操作。就像是一个公司里的员工,你只负责工作就够了,工作内容的调度以及你这个员工的休假或者辞退等等都是由公司负责