FutureTask 源码解析
摘要
介绍异步编程task封装类FutureTask源码
1 类接口
提供了可取消的异步计算功能,可以任务结果获取,状态查询,任务取消功能。等待任务结束的线程会阻塞等待在该对象上,被封装为 WaitNode 对象,阻塞等待。
实现接口 java.util.concurrent.RunnableFuture ,该 接口为函数式接口,定义了一个方法 void run(); 并且该接口继承了 Runnable, Future<V> ;
Runnable 代表线程执行的任务,所以一个 FutureTask 可以提交给一个 Executor 来执行。
Future 提供了任务结果获取,状态查询,任务取消功能
2 成员变量及其对应的数
状态机
* Possible state transitions:
* NEW= 0; -> COMPLETING = 1; -> NORMAL = 2;
* NEW= 0; -> COMPLETING = 1; -> EXCEPTIONAL = 3;
* NEW= 0; -> CANCELLED= 4;
* NEW= 0; -> INTERRUPTING = 5; -> INTERRUPTED = 6;
初始化状态为 NEW ,只有 set, setException, and cancel 可以将状态转换为 terminal
COMPLETING 计算完成正在将结构set给outcome时的状态,包括任务执行正常结束或者异常结束
INTERRUPTING (在方法 cancel(true) 时状态会转换为 INTERRUPTING,中断当前线程后状态修改为 INTERRUPTED 如果参数为false 状态转换为 CANCELLED)
从这些中间状态转变为最终状态时,使用了开销较低的有序 / 延迟写入操作,因为这些状态值是唯一的,并且不能再被进一步修改。
NORMAL 代表任务正常结束,没有异常
outcome 任务计算结果放在该对象中,包括异常也会放在该结果中
作用-> 接口暴露的方法源码
java.util.concurrent.FutureTask#run
a. 状态校验如果为NEW并且CAS更新runner为当前线程成功,进入后续方法执行
b. doubleCheck 如果状态不为NEW不执行任务,存在这样的情况,线程A 线程B通过 state != NEW 的判断,线程A完成CAS,并且执行完当前run方法 此时 runner被重新更新为null,这个时候线程B才开始执行CAS,并且成功,所以重复校验了下state的状态 如果不为null代表已其他线程执行了当前任务
c. 直接执行 call() 方法,如果正常结束,
CAS更新状态为 COMPLETING 将结果赋值为 outcome ,如果CAS更新状态失败,不执行后续逻辑,直接返回
赋值完后使用 putOrderedInt 方式更新状态为 NORMAL 唤醒waiters上阻塞获取结果的线程,唤醒完成后调用钩子方法 done();
d. 如果不是正常结束
CAS更新状态为 COMPLETING 将异常赋值为 outcome ,如果CAS更新状态失败,不执行后续逻辑,直接返回
赋值完后使用 putOrderedInt 更新状态为 EXCEPTIONAL 唤醒waiters上阻塞获取结果的线程,唤醒完成后调用钩子方法 done();
e. finally 代码块
重新获取状态,如果状态 >= INTERRUPTING ,循环判断 state== INTERRUPTING 调用yield 等待状态被更新为最终 INTERRUPTED 状态.因为 INTERRUPTED 为最终状态,使用 sun.misc.Unsafe#putOrderedInt 更新。这个方法不保证更新后的值对其他线程立即可见。所以需要在这里等待下
除此以外 NORMAL EXCEPTIONAL 状态更新也是相同的方法。因为是最终状态,所以不保证立即可见性
java.util.concurrent.FutureTask#get()
如果状态 <= COMPLETING 阻塞等待,调用 java.util.concurrent.FutureTask#awaitDone 获取任务执行结果。获取过程根据执行状态进行对应处理
a 如果状态 > COMPLETING 代表任务执行完成 返回状态
b 如果状态 == COMPLETING 当前线程 yield 进入下一次循环
c 否则当前任务正在执行,创建 WaitNode ,在下一次循环中通过CAS方式追加到 waiters 后面
d 如果有超时时间,定时阻塞,否则一直阻塞
e 上面步骤可能失败,所以放在for循环里面重试,重试过程中会校验当前线程的中断位,并且响应中断抛出异常
根据 awaitDone 方法返回的当前任务状态,在 java.util.concurrent.FutureTask#report 中将来异常|执行结果 赋值给outcome 然后返回。
java.util.concurrent.FutureTask#cancel
a 状态校验,只有状态为NEW才允许取消,根据传递参数CAS更新state为INTERRUPTING : CANCELLED ,如果状态不正确或者CAS失败返回false
b 如果参数 mayInterruptIfRunning == true 中断当前线程 CAS更新状态为 INTERRUPTED
c 唤醒waiters上阻塞获取结果的线程,唤醒完成后调用钩子方法 done();
构造方法接受 Runnable 和 Callable 类型的任务,如果是Runnable 会使用 java.util.concurrent.Executors.callable(java.lang.Runnable, T) 转换为Callable类型。
接口 java.util.concurrent.ExecutorService 详解
该接口继承 java.util.concurrent.Executor 提供了termination方法和创建 Future 对象的方法用于跟踪执行结果
提供两个shutdown线程池的方法
1 shutdown
允许在 terminating 之前,之前提交的任务被执行完
2 shutdownnow
线程不执行同步任务获取任务,也不接受新提交任务,并且中断正在执行任务的线程
提供 Termination 相关的方法
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;当前shutdwon相关方法执行后,等待,直到超时|任务执行完成|当前线程被中断
当线程池 terminating 时,没有任务正在执行,也没有线程等待任务执行,并且不接受新任务提交
3 执行批量任务方法
invokeAny and invokeAll 批量执行任务使用,并等待一个或者所有任务执行完成。 Class ExecutorCompletionService can be used to write customized variants of these methods。这些方法返回Future
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
4 submit 方法将任务封装为 Future 对象,用于取消任务,或者等待线程执行完成,或者得到任务执行结果 ,适配了 Callable 和 Runnable 类型任务
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
抽象类 java.util.concurrent.AbstractExecutorService
提供了 接口 ExecutorService 默认实现
接口 java.util.concurrent.Executor 详解
单方法接口,提供
void execute(Runnable command);
将参数指定的任务提交给线程池,没返回值。
线程池的 Condition termination = mainLock.newCondition();
通过线程池的 awaitTermination 会在该条件对象上等待,在 tryTerminate 会调用signal唤醒等待在该对象上的线程
面试经典问题
问题:为啥 Object outcome 不是volatile的
Java 的 Happens-Before 原则保证了在状态转换操作(如从 COMPLETING 到 NORMAL)之后,对 outcome 的写入操作对后续读取操作是可见的。因此,即使 outcome 不是 volatile 的,也能保证在任务完成后,其他线程可以正确读取到 outcome 的值。
outcome 存储的是任务的最终结果,一旦任务完成,结果就不会再改变。因此,不需要像 volatile 变量那样在每次访问时都保证最新值的可见性,只要在任务完成后能正确读取到最终结果即可。
FutureTask存在的问题:
问题:FutureTask获取线程执行的结果前,主线程需要通过get方法一直阻塞等待子线程执行完call方法,才可以拿到返回结果。
问题:如果不通过get去挂起线程,通过while循环,不停的判断任务的执行状态是否结束,结束后,再拿结果。如果任务长时间没执行完毕,CPU会一直调度查看任务状态的方法,会浪费CPU资源。
FutureTask是一个同步非阻塞处理任务的方式。
需要一个异步非阻塞处理任务的方式。CompletableFuture在一定程度上就提供了各种异步非阻塞的处理方案,并且提供响应式编程,代码编写上,效果更佳