当前位置: 首页 > news >正文

FutureTask 源码解析

摘要

介绍异步编程task封装类FutureTask源码

1 类接口

提供了可取消的异步计算功能,可以任务结果获取,状态查询,任务取消功能。等待任务结束的线程会阻塞等待在该对象上,被封装为  WaitNode 对象,阻塞等待。
实现接口 java.util.concurrent.RunnableFuture ,该 接口为函数式接口,定义了一个方法 void run(); 并且该接口继承了 Runnable, Future<V> ;
Runnable 代表线程执行的任务,所以一个 FutureTask 可以提交给一个 Executor 来执行。
Future 提供了任务结果获取,状态查询,任务取消功能

2 成员变量及其对应的数

状态机
		* Possible state transitions:
		* NEW= 0; -> COMPLETING = 1; -> NORMAL = 2;
		* NEW= 0; -> COMPLETING = 1; -> EXCEPTIONAL = 3;
		* NEW= 0; -> CANCELLED= 4;
		* NEW= 0; -> INTERRUPTING = 5; -> INTERRUPTED  = 6;
初始化状态为 NEW ,只有  set, setException, and cancel 可以将状态转换为 terminal
COMPLETING 计算完成正在将结构set给outcome时的状态,包括任务执行正常结束或者异常结束
INTERRUPTING  (在方法 cancel(true) 时状态会转换为 INTERRUPTING,中断当前线程后状态修改为 INTERRUPTED  如果参数为false 状态转换为 CANCELLED)
从这些中间状态转变为最终状态时,使用了开销较低的有序 / 延迟写入操作,因为这些状态值是唯一的,并且不能再被进一步修改。
NORMAL 代表任务正常结束,没有异常
outcome 任务计算结果放在该对象中,包括异常也会放在该结果中

作用-> 接口暴露的方法源码

java.util.concurrent.FutureTask#run
	a. 状态校验如果为NEW并且CAS更新runner为当前线程成功,进入后续方法执行
	b. doubleCheck 如果状态不为NEW不执行任务,存在这样的情况,线程A 线程B通过 state != NEW 的判断,线程A完成CAS,并且执行完当前run方法 此时 runner被重新更新为null,这个时候线程B才开始执行CAS,并且成功,所以重复校验了下state的状态 如果不为null代表已其他线程执行了当前任务
	c. 直接执行 call() 方法,如果正常结束,
		CAS更新状态为 COMPLETING 将结果赋值为 outcome ,如果CAS更新状态失败,不执行后续逻辑,直接返回
		赋值完后使用 putOrderedInt 方式更新状态为 NORMAL  唤醒waiters上阻塞获取结果的线程,唤醒完成后调用钩子方法 done();
	d.  如果不是正常结束 
		CAS更新状态为 COMPLETING 将异常赋值为 outcome ,如果CAS更新状态失败,不执行后续逻辑,直接返回
		赋值完后使用 putOrderedInt 更新状态为 EXCEPTIONAL  唤醒waiters上阻塞获取结果的线程,唤醒完成后调用钩子方法 done();
	e.  finally 代码块
		重新获取状态,如果状态 >= INTERRUPTING ,循环判断 state== INTERRUPTING 调用yield 等待状态被更新为最终 INTERRUPTED 状态.因为  INTERRUPTED 为最终状态,使用  sun.misc.Unsafe#putOrderedInt 更新。这个方法不保证更新后的值对其他线程立即可见。所以需要在这里等待下 
		除此以外 NORMAL EXCEPTIONAL 状态更新也是相同的方法。因为是最终状态,所以不保证立即可见性


java.util.concurrent.FutureTask#get()
	如果状态  <= COMPLETING 阻塞等待,调用 java.util.concurrent.FutureTask#awaitDone 获取任务执行结果。获取过程根据执行状态进行对应处理
	a 如果状态 > COMPLETING 代表任务执行完成 返回状态
	b 如果状态 == COMPLETING 当前线程 yield 进入下一次循环
	c 否则当前任务正在执行,创建 WaitNode ,在下一次循环中通过CAS方式追加到 waiters  后面
	d 如果有超时时间,定时阻塞,否则一直阻塞
	e 上面步骤可能失败,所以放在for循环里面重试,重试过程中会校验当前线程的中断位,并且响应中断抛出异常
	根据 awaitDone 方法返回的当前任务状态,在 java.util.concurrent.FutureTask#report 中将来异常|执行结果 赋值给outcome 然后返回。
java.util.concurrent.FutureTask#cancel
	a 状态校验,只有状态为NEW才允许取消,根据传递参数CAS更新state为INTERRUPTING : CANCELLED ,如果状态不正确或者CAS失败返回false 
	b 如果参数 mayInterruptIfRunning == true 中断当前线程 CAS更新状态为 INTERRUPTED
	c  唤醒waiters上阻塞获取结果的线程,唤醒完成后调用钩子方法 done();

构造方法接受 Runnable 和 Callable 类型的任务,如果是Runnable 会使用 java.util.concurrent.Executors.callable(java.lang.Runnable, T) 转换为Callable类型。

接口 java.util.concurrent.ExecutorService 详解
该接口继承 java.util.concurrent.Executor 提供了termination方法和创建 Future 对象的方法用于跟踪执行结果
提供两个shutdown线程池的方法
1 shutdown 
	允许在 terminating 之前,之前提交的任务被执行完
2 shutdownnow
	线程不执行同步任务获取任务,也不接受新提交任务,并且中断正在执行任务的线程
提供 Termination 相关的方法
	boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;当前shutdwon相关方法执行后,等待,直到超时|任务执行完成|当前线程被中断
	当线程池 terminating 时,没有任务正在执行,也没有线程等待任务执行,并且不接受新任务提交

3 执行批量任务方法
invokeAny and invokeAll  批量执行任务使用,并等待一个或者所有任务执行完成。 Class ExecutorCompletionService can be used to write customized variants of these methods。这些方法返回Future
	<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;	
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
	
4 submit 方法将任务封装为 Future 对象,用于取消任务,或者等待线程执行完成,或者得到任务执行结果	,适配了 Callable 和 Runnable 类型任务
	<T> Future<T> submit(Callable<T> task); 
	<T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

抽象类 java.util.concurrent.AbstractExecutorService
提供了 接口  ExecutorService 默认实现
	
接口 java.util.concurrent.Executor 详解
单方法接口,提供
void execute(Runnable command);
将参数指定的任务提交给线程池,没返回值。

线程池的 Condition termination = mainLock.newCondition(); 
通过线程池的 awaitTermination 会在该条件对象上等待,在	tryTerminate 会调用signal唤醒等待在该对象上的线程

面试经典问题

问题:为啥 Object outcome   不是volatile的
	Java 的 Happens-Before 原则保证了在状态转换操作(如从 COMPLETING 到 NORMAL)之后,对 outcome 的写入操作对后续读取操作是可见的。因此,即使 outcome 不是 volatile 的,也能保证在任务完成后,其他线程可以正确读取到 outcome 的值。
outcome 存储的是任务的最终结果,一旦任务完成,结果就不会再改变。因此,不需要像 volatile 变量那样在每次访问时都保证最新值的可见性,只要在任务完成后能正确读取到最终结果即可。
FutureTask存在的问题:
	问题:FutureTask获取线程执行的结果前,主线程需要通过get方法一直阻塞等待子线程执行完call方法,才可以拿到返回结果。
	问题:如果不通过get去挂起线程,通过while循环,不停的判断任务的执行状态是否结束,结束后,再拿结果。如果任务长时间没执行完毕,CPU会一直调度查看任务状态的方法,会浪费CPU资源。
FutureTask是一个同步非阻塞处理任务的方式。
需要一个异步非阻塞处理任务的方式。CompletableFuture在一定程度上就提供了各种异步非阻塞的处理方案,并且提供响应式编程,代码编写上,效果更佳

相关文章:

  • 深入剖析 Axios 的 POST 请求:何时使用 qs 处理数据
  • 0基础 | 硬件滤波 C、RC、LC、π型
  • 基于Springboot+Mysql的闲一品(含LW+PPT+源码+系统演示视频+安装说明)
  • [16届蓝桥杯 2025 c++省 B] 水质检测
  • Axure疑难杂症:详解横向菜单左右拖动效果及阈值说明
  • 在Fortran程序中嵌入Lua解释器
  • windows下Git安装及其IDEA配置
  • Spring - 13 ( 11000 字 Spring 入门级教程 )
  • Linux : 多线程互斥
  • 智享 AI直播3.0时代:无人智能系统如何颠覆传统拓客模式?‌‌
  • 计算机组成原理-指令系统
  • 集合框架二三事
  • 供应链业务-供应链全局观(三)- 供应链三流的集成
  • Transformer模型中的两种掩码
  • RK3588上Linux系统编译C/C++ Demo时出现BUG:The C/CXX compiler identification is unknown
  • 双向链表专题(C语言)
  • RK3576 GPIO 配置与使用
  • 【Docker】离线安装Docker
  • 【土堆 PyTorch 教程总结】PyTorch入门
  • 【频域分析】功率谱
  • 媒体评欧阳娜娜遭民进党当局威胁:艺人表达国家认同是民族大义
  • 娃哈哈:自4月起已终止与今麦郎的委托代工关系,未来将坚持自有生产模式
  • 公元1058年:柳永词为什么时好时坏?
  • “老中青少”四代同堂,季春艳携锡剧《玲珑女》冲击梅花奖
  • 第1现场 | 美国称将取消制裁,对叙利亚意味着什么
  • 习近平举行仪式欢迎巴西总统卢拉访华