当前位置: 首页 > wzjs >正文

网站建设与管理答案免费自制app软件

网站建设与管理答案,免费自制app软件,wordpress输出标签文章,石景山保安公司简介 CompletableFuture:异步任务编排工具。java 8中引入的一个类,位于juc包下,是Future的增强版。它可以让用户更好地构建和组合异步任务,避免回调地狱。 在CompletableFuture中,如果用户没有指定执行异步任务时的线…

简介

CompletableFuture:异步任务编排工具。java 8中引入的一个类,位于juc包下,是Future的增强版。它可以让用户更好地构建和组合异步任务,避免回调地狱。

在CompletableFuture中,如果用户没有指定执行异步任务时的线程池,默认使用ForkJoinPool中的公共线程池。

使用案例

简单使用

几个入门案例,学习如何使用CompletableFuture提交异步任务并行接收返回值

提交异步任务 supplyAsync

提交一个异步任务,然后阻塞地获取它的返回值

@Test
public void test1() throws ExecutionException, InterruptedException {// 提交异步任务CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " + "task1";});String s = future.get();  // 阻塞地获取异步任务的执行结果System.out.println("s = " + s);  // [ForkJoinPool.commonPool-worker-9] task1
}

阻塞地等待异步任务完成 join

join方法和get方法,都是阻塞地等待异步任务执行完成,然后获取返回值,只不过对于异常的处理不同,推荐使用get方法来获取返回值。

@Test
public void test2() throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " + "task2";});System.out.println(future.get());  // [ForkJoinPool.commonPool-worker-9] task2
}

消费异步任务的结果 thenAccept

thenAccept函数接收一个Consumer类型的实例作为参数,它会消费异步任务的执行结果,然后返回一个CompletableFuture<Void>,实际上没有必要处理它的返回值。

@Test
public void test8() throws InterruptedException {// 异步任务CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " + "task2";});// 使用thenAccept,提供一个回调函数,消费异步任务的结果future.join();future.thenAccept(System.out::println);  // [ForkJoinPool.commonPool-worker-9] task2
}

异步任务编排

到这里开始,开始涉及到异步任务编排,在一个异步任务之后启动另一个异步任务,或者在多个异步任务之后另一个异步任务。

具有依赖关系的异步任务,根据异步任务的依赖数量,可以把异步任务分为 零依赖、一元依赖、二元依赖和多元依赖。

一元依赖

thenApply

thenApply方法,接收一个Function作为参数,返回另一个future。一个异步任务依赖另一个异步任务

@Test
public void test3() {// 异步任务1CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " + "[" + (System.currentTimeMillis() / 1000) + "]" + "任务1";});// 异步任务2CompletableFuture<String> future1 = future.thenApply(s -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return s + "," + "[" + (System.currentTimeMillis() / 1000) + "]" +  "任务2";});future1.join();// 注意结果中异步任务1的时间和异步任务2的时间,说明异步任务2是在异步任务执行完之后触发的// [ForkJoinPool.commonPool-worker-1] [1732975152]任务1,[1732975155]任务2future1.thenAccept(System.out::println);
}

thenAccept方法接收一个Consumer,没有返回值,thenApply接收一个Function,有返回值

thenCompose

从语义上看,thenCompose方法接收一个异步任务作为参数,thenApply方法接收一个普通任务作为参数,选择 thenApply还是 thenCompose 取决于用户的需求,如果新任务是另一个异步任务,选择thenCompose,如果新任务只是消费上一个异步任务的结果,然后返回消费结果,选择thenApply。

@Test
public void test9() {// 异步任务1CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " +"[" + (System.currentTimeMillis() / 1000) + "]" + "任务1";});// 异步任务2CompletableFuture<String> future1 = future.thenCompose(s -> CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return s + "\n" +"[" + Thread.currentThread().getName() + "] " +"[" + (System.currentTimeMillis() / 1000) + "]" + "任务2";}));future1.join();//[ForkJoinPool.commonPool-worker-9] [1727663353]任务1//[ForkJoinPool.commonPool-worker-9] [1727663356]任务2future1.thenAccept(System.out::println);
}

二元依赖

thenCombine

融合当前异步任务(调用thenCombine的CF实例)和另一个异步任务(other)的结果,thenCombine() 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。

@Test
public void test9() {System.out.println("[" + Thread.currentThread().getName() + "] " +"[" + System.currentTimeMillis() / 1000 + "]" + "任务0");// 异步任务1CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " +"[" + System.currentTimeMillis() / 1000 + "]" + "任务1";});// 异步任务2CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " +"[" + System.currentTimeMillis() / 1000 + "]" + "任务2";});// 融合异步任务1、2的结果,生成任务3CompletableFuture<String> future3 = future.thenCombine(future2,(s1, s2) -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return s1 + "\n" + s2 + "\n" +"[" + Thread.currentThread().getName() + "] " +"[" + System.currentTimeMillis() / 1000 + "]" + "任务3";});future3.join();// 任务1暂停5秒、任务2暂停2秒、任务3暂停3秒,任务3依赖任务1和任务2,可以看到,// 任务1和任务2的执行是互不影响的,等到任务1、任务2都执行完成,任务3才执行// [main] [1734753154]任务0// [ForkJoinPool.commonPool-worker-1] [1734753159]任务1// [ForkJoinPool.commonPool-worker-2] [1734753156]任务2// [ForkJoinPool.commonPool-worker-1] [1734753162]任务3future3.thenAccept(System.out::println);
}

多元依赖

allOf

allOf方法:多个异步任务都完成,才执行下一个异步任务

// allOf 方法接受一个 CompletableFuture 的可变参数列表,并返回一个新的 CompletableFuture,
// 当所有给定的 CompletableFuture 都完成时,新的 CompletableFuture 才会完成。这个方法通常
// 用于等待多个异步任务都完成。
@Test
public void test5() {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " + "[" + System.currentTimeMillis() / 1000 + "]" + "任务1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " + "[" + System.currentTimeMillis() / 1000 + "]" + "任务2";});// allOf方法,创建一个新的future,它会等到前面两个异步任务完成后再执行CompletableFuture<Void> future3 = CompletableFuture.allOf(future, future2);CompletableFuture<String> future4 = future3.thenApply(v -> {// 在allOf方法返回的异步任务中,它需要调用join方法,来获取之前异步任务的结果String s1 = future.join();String s2 = future2.join();try {Thread.sleep(4000);} catch (InterruptedException e) {throw new RuntimeException(e);}return s1 + "\n" + s2 + "\n" + "[" + Thread.currentThread().getName() + "] " + "[" + System.currentTimeMillis() / 1000 + "]" + "任务3";});future4.join(); // 阻塞当前线程future4.thenAccept(System.out::println);
}
anyOf

多个异步任务中只要有一个完成,就执行下一个异步任务

// anyOf方法
@Test
public void test6() {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " + "[" + System.currentTimeMillis() / 1000 + "]" + "任务1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " + "[" + System.currentTimeMillis() / 1000 + "]" + "任务2";});CompletableFuture<Object> future3 = CompletableFuture.anyOf(future, future2);future3.join();// 任务2比任务1先结束,所以这里只获取到了任务2的结果// [ForkJoinPool.commonPool-worker-2] [1727662633]任务2future3.thenAccept(System.out::println);
}

异常处理

handle

handle 方法是一个双目方法,它接受一个 BiFunction 参数,该函数有两个参数:一个是异步任务的结果,另一个是异常对象。无论 CompletableFuture 正常完成还是异常完成,都会调用 handle 方法。

正常完成:

@Test
public void test7() {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "[" + Thread.currentThread().getName() + "] " +"[" + System.currentTimeMillis() / 1000 + "]" + "任务1";});CompletableFuture<String> future2 = future.handle((s, e) -> {if (e != null) {return "Error: " + e.getMessage();}return s + " handle";});future2.join();// [ForkJoinPool.commonPool-worker-9] [1727662978]任务1 handlefuture2.thenAccept(System.out::println);
}

exceptionally

exceptionally 方法是一个单目方法,它接受一个 Function 参数,该函数只处理异常情况。当 CompletableFuture异常完成时,exceptionally 方法会被调用。

@Test
public void test8() {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);int i = 10 / 0;} catch (InterruptedException e) {throw new RuntimeException(e);}return null;});CompletableFuture<String> future2 = future.exceptionally(e -> "Error: " + e.getMessage());future2.join();// Error: java.lang.ArithmeticException: / by zerofuture2.thenAccept(System.out::println);
}

API总结:

  • 零依赖:supplyAsync,直接提交

  • 一元依赖:thenApply、thenAccept、thenCompose

  • 二元依赖:thenCombine

  • 多元依赖:allOf、anyOf

使用时的注意事项

1、指定线程池:使用案例中的代码都没有指定线程池,但是实际使用过程中,最好指定线程池。

2、带有Async后缀的方法:CompletableFuture中提供的API,带有Async后缀的方法和普通方法,例如,thenApply和thenApplyAsync,thenApply使用和上一个任务一样的线程来执行异步任务,thenApplyAsync则使用新线程来执行异步任务,推荐使用带有Async后缀的方法。

源码分析

CompletableFuture使用一个栈来存储当前异步任务之后要触发的任务,栈使用的数据结构是单向链表。调用thenApply等方法编排异步任务时,实际上是向上一个任务的stack属性中注入当前任务,上一个任务结束后,会获取stack属性中的任务,然后继续执行,依次类推,直到stack属性为空

整体结构

CompletableFuture的继承体系

它实现了Future接口和CompletionStage接口:

  • Future代表异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法
  • CompletionStage代表异步计算的阶段,定义了在异步任务完成之后要触发的操作,这个操作可以是一个普通任务,也可以是一个异步任务

Future:

public interface Future<V> {// 取消异步任务boolean cancel(boolean mayInterruptIfRunning);// 判断异步任务是否取消boolean isCancelled();// 判断异步任务是否结束boolean isDone();// 获取异步任务的结果V get() throws InterruptedException, ExecutionException;// 获取异步任务的结果,并且指定超时时间V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

CompletionStage:代表异步计算的一个阶段,定义了在一个异步任务完成之后执行的操作,之前在入门案例中演示的thenApply方法、thenCompose方法,就是定义在这个接口中

public interface CompletionStage<T> {/* 一元依赖的异步任务 */// 创建一个新的异步任务,使用Function接口来消费异步任务的结果public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);// 创建一个新的异步任务,使用Consumer接口来消费异步任务的结果public CompletionStage<Void> thenAccept(Consumer<? super T> action);// 创建一个新的异步任务public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);// ... 
}

CompletionFuture中的属性

CompletionFuture中只有两个属性:result、stack,result存储当前异步任务的结果,stack存储下一个要被触发的异步任务,stack是一个单向链表

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {/* 下面两个成员变量都是通过cas算法来操作的,所有的方法都是在操作这两个变量,通过这两个变量实现了异步任务编排 */volatile Object result;  // 存储当前异步任务的结果// 存储当前异步任务结束后需要触发的后续操作,它是一个单向链表,// 相当于观察者模式中的观察者,当前的CompletableFuture实例就是被观察者(主题)volatile Completion stack; // 静态内部类,要执行的操作abstract static class Completion extends ForkJoinTask<Void>// AsynchronousCompletionTask,没有方法的接口,用于debug时标识一个异步任务implements Runnable, AsynchronousCompletionTask {volatile Completion next;      // 单向链表,指向下一个异步任务// 尝试触发异步任务abstract CompletableFuture<?> tryFire(int mode);}
}

stack属性的数据结构 Completion

Completion是CompletableFuture的静态内部类,可以把它理解为是异步任务要执行的操作,它存储了:

  • 操作,也就是用户要异步执行的任务

  • 操作对应的CompletableFuture实例

  • 操作依赖的CompletableFuture实例

  • 操作完成后需要触发的CompletableFuture实例

Completion比较复杂的地方在于它有多个子类,每个子类都有不同的功能,在前面使用案例中演示的每个API,它的内部都使用了不同的子类,代表不同的操作,所以这里介绍一下Completion和它的继承体系

Completion:顶层类,只定义了指向下一个异步任务的属性 next

// Completion,静态内部类,在继承体系中位于顶层
abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {volatile Completion next;      // 下一个异步任务
}// 一个没有方法的接口,用于标识某个被async方法创建的实例是异步任务,用于debug、监控
public static interface AsynchronousCompletionTask { }

有一个依赖的异步任务:UniCompletion、UniApply,UniCompletion是基类,存储了单个依赖的共有属性,UniApply是具体实现,是thenApply方法对应的内部类。

abstract static class UniCompletion<T,V> extends Completion {Executor executor;                 // 线程池// 当前异步任务对应的CompletableFuture实例CompletableFuture<V> dep;// 当前任务的依赖。具体而言,就是只有在src对应的异步任务执行完成后,才可以执行dep对应的异步任务,也就是当前任务,// 执行异步任务的逻辑中会判断前一个异步任务是否完成,并且获取它的结果。CompletableFuture<T> src;UniCompletion(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src) {this.executor = executor; this.dep = dep; this.src = src;}// 确保异步任务只执行一次,它会通过cas算法修改状态变量,final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { // 这个方法继承自ForkJoinTaskif (e == null)return true;executor = null; // disablee.execute(this);}return false;}// 判断当前异步任务是否还存活:dep不为null,就是还存活final boolean isLive() { return dep != null; }
}// UniApply,UniCompletion的子类,定义了异步任务的执行逻辑,thenApply方法对应的内部类,
static final class UniApply<T,V> extends UniCompletion<T,V> {Function<? super T,? extends V> fn;  // 要异步执行的任务
}

有两个依赖的异步任务:BiCompletion、BiApply,类似于上面提到的,BiCompletion是基类,存储了共有属性,BiApply是具体实现

// BiCompletion
abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {CompletableFuture<U> snd; // 异步任务的第二个依赖
}// BiApply
static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {BiFunction<? super T,? super U,? extends V> fn;  // 要异步执行的任务
}

AsyncSupply:不属于Completion的继承体系,存储没有依赖的单个异步任务

static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep;  // 异步任务对应的CompletableFuture实例Supplier<T> fn;            // 异步任务要执行的计算
}

总结

在介绍CompletableFuture的工作机制之前,先总结一下它的数据结构:

CompletableFuture:代表一个异步任务,它存储了这个异步任务的执行结果、执行完之后要触发的下一个异步任务,同时提供了面向用户的API

  • 它的属性 Object result、Completion stack

Completion:存储了一个异步任务执行过程中需要用到的全部信息,包括它的依赖、它要触发的下一个异步任务、异步任务要执行的计算、异步任务对应的CompletableFuture实例

  • 它的属性:

    • Function function:异步任务要进行的计算

    • CompletableFuture dep:异步任务对应的CompletableFuture实例

    • CompletableFuture src:异步任务的第一个依赖

    • CompletableFuture snd:异步任务的第二个依赖,// 一元依赖的异步任务中没有这个属性

    • CompletableFuture next:要被触发的下一个异步任务

可以看到,CompletableFuture和Completion互相持有对方的实例。

在了解了CompletableFuture的基本结构之后,接下来的问题是:

  • 两个有依赖关系的异步任务是如何被编排在一起的?
  • 上一个异步任务结束之后如何触发下一个异步任务?

哪个线程执行异步任务?

先了解一下异步任务在哪个线程执行。

CompletableFuture使用一个线程池来执行异步任务。用户提交异步任务后,如果没有指定线程池,那么使用默认线程池。如果当前环境下可用cpu核心数大于1,使用的线程池是ForkJoinPool,否则每次提交任务它都会创建一个新的线程(ThreadPerTaskExecutor)

确认使用哪个线程池的源码:

  • ForkJoinPool中公共线程池的并行度默认是CPU个数减1,用户也可以通过jvm参数指定
  • 如果公共线程池的并行度小于等于0,使用单线程线程池
// 判断使用哪个线程池
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();/* 一、ForkJoinPool中判断当前公共线程池的并行度: */
// 1. 首先从环境变量中获取一个指定的属性 parallelism
String pp = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
int parallelism = -1;
if (pp != null) {parallelism = Integer.parseInt(pp);
}
// 2. 如果属性为空,获取当前环境下的CPU数个数,如果小于等于1个,那么公共线程池的并行度就是1,否则就是CPU数减1
if (parallelism < 0 && // default 1 less than #cores(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) {parallelism = 1;
}// 二、单线程线程池,它的实现特别简单,为每个任务创建一个新的线程
static final class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) { new Thread(r).start(); }
}

工作机制

单个异步任务的执行

提交异步任务 supplyAsync方法

整体流程:

// 整体流程:这里省略了前面的方法调用
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();// 第一步:创建CompletableFuture实例,向线程池提交任务CompletableFuture<U> d = new CompletableFuture<U>();// 第二步:线程池执行异步任务,异步任务编排就实现在AsyncSupply中,这里专门针对supplyAsync的内部类e.execute(new AsyncSupply<U>(d, f));// 第三步:返回CompletableFuture实例return d;
}

整体流程中的第二步:执行异步任务

// 要点1:AsyncSupply,用户调用supplyAsync方法时执行的内部类,异步任务编排的核心流程
static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep;   // 当前CompletableFuture的实例Supplier<T> fn;             // 需要在线程池中执行的任务// 参数1,当前CompletableFuture的实例,用于存储异步任务的返回值并且触发下一个异步任务// 参数2,需要在线程池中执行的任务AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {this.dep = dep; this.fn = fn;}// 线程启动后会执行当前实例的run方法,因为它继承了Runnable接口public void run() {CompletableFuture<T> d; Supplier<T> f;if ((d = dep) != null && (f = fn) != null) {dep = null; fn = null;if (d.result == null) {try {// 这里做了两个事情:// 1. 执行异步任务,也就是Supplier接口中提供的函数// 2. 将异步任务的返回值设置到当前CompletableFuture实例中d.completeValue(f.get());} catch (Throwable ex) {// 如果出现异常,设置异常返回值d.completeThrowable(ex);}}// 触发下一个异步任务d.postComplete();}}
}

可以看到,AsyncSupply类继承了Runnable接口,持有当前CompletableFuture实例和要异步执行的任务,在run方法中,执行任务,然后调用CompletableFuture实例中的completeValue方法,设置返回值。

返回值如何被设置的?通过cas操作,将任务的结果设置到当前CompletableFuture实例的result属性中,如果任务正常结束,正常设置结果,如果任务抛出异常,把异常封装到一个Exception中

// 这是上一步中调用的// 分支一:设置返回值
final boolean completeValue(T t) {// 通过cas操作来更新当前CompletableFuture实例中的resultreturn UNSAFE.compareAndSwapObject(this, RESULT, null,(t == null) ? NIL : t);
}// 分支二:如果出现异常,设置异常返回值
final boolean completeThrowable(Throwable x) {// 也是通过cas操作,更新当前CompletableFuture实例中的result,只不过返回值是被包装到了AltResult中return UNSAFE.compareAndSwapObject(this, RESULT, null,encodeThrowable(x));
}
static AltResult encodeThrowable(Throwable x) {return new AltResult((x instanceof CompletionException) ? x :new CompletionException(x));
}
获取异步任务的返回值 get

如何获取异步任务的返回值?调用get方法或join方法,它们都会获取返回值。

整体流程:

// get方法
public T get() throws InterruptedException, ExecutionException {Object r;// 如果当前result实例等于null,调用waitingGet方法,阻塞地获取返回值,否则调用reportGet方法,// 上报返回值return reportGet((r = result) == null ? waitingGet(true) : r);
}// waitingGet方法:阻塞地获取返回值,将返回值写到result变量中
private Object waitingGet(boolean interruptible) {// 信号器,负责线程的阻塞和唤醒Signaller q = null;// 当前线程是否入队boolean queued = false;// 旋转次数int spins = -1;Object r;// 判断:result实例是否等于null,如果等于null,需要阻塞地获取返回值while ((r = result) == null) {// 先自旋:spins,它是自旋,自旋次数是CPU个数的8次方以上,因为自旋次数减减的// 之前,会生成一个随机数,生成的随机数大于等于0,自旋次数才会减1。如果自旋结束之后// 还没有得到结果,会进入阻塞队列。if (spins < 0)spins = SPINS;   // SPINS = (Runtime.getRuntime().availableProcessors() > 1 ? 1 << 8 : 0) 256次else if (spins > 0) {if (ThreadLocalRandom.nextSecondarySeed() >= 0)--spins;}else if (q == null)// 创建一个信号器,用于阻塞和唤醒线程q = new Signaller(interruptible, 0L, 0L);else if (!queued)// 如果信号器还没有入队,将信号器设置到当前CompletableFuture实例的stack属性中queued = tryPushStack(q);else if (interruptible && q.interruptControl < 0) {// 如果当前线程被打断,放弃阻塞,清理stack变量,返回nullq.thread = null;cleanStack();return null;}// 线程进入阻塞,底层调用LockSupport的park方法,等待异步任务执行完成之后唤醒当前线程else if (q.thread != null && result == null) {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interruptControl = -1;}}}// 如果信号器不等于nullif (q != null) {q.thread = null;if (q.interruptControl < 0) {if (interruptible)r = null; // report interruptionelseThread.currentThread().interrupt();}}// 触发后续任务postComplete();// 返回异步任务的结果,result的变量是由其它线程设置的,当前线程只需要阻塞地获取它。return r;
}

1、信号器入栈的方法:

// 将信号器设置到CompletableFuture的stack属性中
final boolean tryPushStack(Completion c) { // c是信号器实例Completion h = stack;  // stack,当前CompletableFuture实例的stack属性/* lazySetNext方法加下面的cas操作,相当于将信号器入栈,信号器链接到头结点并且成为新的头结点 */lazySetNext(c, h);return UNSAFE.compareAndSwapObject(this, STACK, h, c); 
}static void lazySetNext(Completion c, Completion next) {UNSAFE.putOrderedObject(c, NEXT, next);  // Completion的next属性指向下一个Completion,它们之间构成单向链表
}

2、阻塞当前线程的方法:

public static void managedBlock(ManagedBlocker blocker) throws InterruptedException {ForkJoinPool p;ForkJoinWorkerThread wt;Thread t = Thread.currentThread();if ((t instanceof ForkJoinWorkerThread) &&// 省略代码}else {// 阻塞当前线程。isReleasebale判断阻塞是否被释放,如果没有,继续进入阻塞状态do {} while (!blocker.isReleasable() &&!blocker.block());}
}// 阻塞当前线程,线程唤醒后,判断阻塞是否可以释放
public boolean block() {if (isReleasable())return true;else if (deadline == 0L)LockSupport.park(this);else if (nanos > 0L)LockSupport.parkNanos(this, nanos);return isReleasable();
}// 阻塞是否可以释放:信号器中的线程实例为null、或者线程被打断、或者超过指定时间,都算阻塞结束
public boolean isReleasable() {// 信号器中的线程实例为nullif (thread == null)return true;// 当前线程被打断if (Thread.interrupted()) {int i = interruptControl;interruptControl = -1;if (i > 0)return true;}// 过了超时时间if (deadline != 0L &&(nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {thread = null;return true;}return false;
}

3、上报异步任务的返回值:

// reportGet方法,处理异步任务的返回值
private static <T> T reportGet(Object r)throws InterruptedException, ExecutionException {// 如果返回值为空if (r == null) // by convention below, null means interruptedthrow new InterruptedException();// 如果返回一个异常实例if (r instanceof AltResult) {Throwable x, cause;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw (CancellationException)x;if ((x instanceof CompletionException) &&(cause = x.getCause()) != null)x = cause;throw new ExecutionException(x);}// 如果返回一个正常结果,将结果进行泛型转换,返回给调用者@SuppressWarnings("unchecked") T t = (T) r;return t;
}

获取返回值的时候,get方法中,会先判断有没有返回值,如果没有,自旋,自旋次数是CPU个数的8次方,如果还没有返回值,进入阻塞队列。

4、线程阻塞后在什么地方被唤醒?在postComplete方法中

// 参考异步任务的执行过程,执行完之后,会调用CompletableFuture的postComplete方法,
// 触发STACK属性中存储的下一个任务
final void postComplete() {CompletableFuture<?> f = this; Completion h;while ((h = f.stack) != null ||  // 判断stack属性中是否有下一个任务(f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;if (f.casStack(h, t = h.next)) {  // 下一个任务出栈,现在STACK属性中存储下下一个任务if (t != null) {  // 下下一个任务不为nullif (f != this) {pushStack(h);continue;}h.next = null;    // detach}f = (d = h.tryFire(NESTED)) == null ? this : d;  // 触发下一个任务的执行}}
}// 下一个任务出栈的方法
final boolean casStack(Completion cmp, Completion val) {return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
}// 下一个任务的执行:在这里下一个任务是信号器 Signaller
final CompletableFuture<?> tryFire(int ignore) {Thread w; // no need to atomically claimif ((w = thread) != null) {thread = null;LockSupport.unpark(w);  // 它会唤醒信号器中存储的线程}return null;
}

一元依赖

这里以thenApply为例。

整体流程:

private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {if (f == null) throw new NullPointerException();CompletableFuture<V> d =  new CompletableFuture<V>();// 第一步:执行异步任务,如果当前任务可以直接执行的话,就直接执行if (e != null || !d.uniApply(this, f, null)) {  // 参数this是上一个异步任务// 第二步:当前任务无法直接执行,创建UniApply实例UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);// 第三步:将当前实例推入栈中,表示上一个异步任务执行完成后触发push(c);  // this是上一个异步任务,所以push方法是由上一个异步任务的实例调用的// 尝试触发异步任务c.tryFire(SYNC);}return d;
}

1、异步任务的执行逻辑

// uniApply方法:执行异步任务,参数a是上一个异步任务,需要判断上一个异步任务有没有执行完成并且获取它的执行结果
final <S> boolean uniApply(CompletableFuture<S> a,Function<? super S,? extends T> f,UniApply<S,T> c) {  // 参数c是当前异步任务实例Object r; Throwable x;// 如果上一个异步任务没有结果,返回falseif (a == null || (r = a.result) == null || f == null)return false;// 1、判断当前异步任务是否可以执行:如果当前异步任务没有结果tryComplete: if (result == null) {// 如果上一个异步任务是异常结束的if (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {completeThrowable(x, r);break tryComplete;  // 退出循环,不执行当前异步任务}r = null;}try {// 2、确保异步任务只会被执行一次if (c != null && !c.claim())return false;// 3、执行异步任务,并且上报结果@SuppressWarnings("unchecked") S s = (S) r;completeValue(f.apply(s));} catch (Throwable ex) {completeThrowable(ex);}}return true;
}

2、如果异步任务不可以直接执行,需要把异步任务push到上一个异步任务的STACK属性中

// 将新的异步任务推入栈中,参数c是当前异步任务
final void push(UniCompletion<?,?> c) {if (c != null) {// 如果当前异步任务没有结束,将异步任务推入栈中,这里是推入栈顶while (result == null && !tryPushStack(c))// 失败时清除,将新异步任务的next值设为nulllazySetNext(c, null); // clear on failure}
}

3、再次尝试触发异步任务

// 尝试触发异步任务
final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d; CompletableFuture<T> a;if ((d = dep) == null ||// 执行异步任务!d.uniApply(a = src, fn, mode > 0 ? null : this))return null;// 执行成功后,属性置空dep = null; src = null; fn = null;// 如果当前异步任务执行成功,触发后续任务return d.postFire(a, mode);
}

4、触发下一个异步任务

// 触发下一个异步任务,这里会触发前一个异步任务的后序任务和当前异步任务的后序任务
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {// 会判断前一个异步任务和当前异步任务有没有下一个任务。// a是前一个任务if (a != null && a.stack != null) {if (a.result == null)a.cleanStack();else if (mode >= 0)a.postComplete();}// this是当前任务if (result != null && stack != null) {if (mode < 0)return this;elsepostComplete();}return null;
}// postComplete
final void postComplete() {CompletableFuture<?> f = this; Completion h;// 如果当前异步任务有下一个任务while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;// 使用下下一个异步任务,代替下一个任务,赋值给stack变量if (f.casStack(h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);continue;}h.next = null;    // detach}// 执行下一个异步任务,然后接收它的返回值f = (d = h.tryFire(NESTED)) == null ? this : d;}}
}
如何确保异步任务只执行一次

claim方法

final boolean claim() {Executor e = executor;// 这个方法来自ForkJoinTask,用cas算法来操作任务实例中的status字段,保证任务只会被执行一次if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {// 如果当前异步任务没有可用的线程池,返回true,由当前线程执行if (e == null)return true;// 把异步任务提交给线程池执行executor = null; // disablee.execute(this);}return false;
}

二元依赖

这里以thenCombine方法为例

整体流程:

private <U,V> CompletableFuture<V> biApplyStage(Executor e, CompletionStage<U> o,BiFunction<? super T,? super U,? extends V> f) {CompletableFuture<U> b;if (f == null || (b = o.toCompletableFuture()) == null)throw new NullPointerException();CompletableFuture<V> d = new CompletableFuture<V>();// biApply方法中的参数this、b,是当前异步任务依赖的前两个异步任务if (e != null || !d.biApply(this, b, f, null)) {BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);bipush(b, c);  // this和b是依赖,c是要被触发的异步任务c.tryFire(SYNC);}return d;
}

执行逻辑:

final <R,S> boolean biApply(CompletableFuture<R> a,CompletableFuture<S> b,BiFunction<? super R,? super S,? extends T> f,BiApply<R,S,T> c) {Object r, s; Throwable x;// 关键逻辑,判断当前异步任务所依赖的两个异步任务是否完成,如果没有,退出if (a == null || (r = a.result) == null ||b == null || (s = b.result) == null || f == null)return false;tryComplete: if (result == null) {if (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {completeThrowable(x, r);break tryComplete;}r = null;}if (s instanceof AltResult) {if ((x = ((AltResult)s).ex) != null) {completeThrowable(x, s);break tryComplete;}s = null;}try {if (c != null && !c.claim())return false;@SuppressWarnings("unchecked") R rr = (R) r;@SuppressWarnings("unchecked") S ss = (S) s;completeValue(f.apply(rr, ss));} catch (Throwable ex) {completeThrowable(ex);}}return true;
}

总结

CompletableFuture存储了计算逻辑,Completion存储了计算过程中需要用到的数据。

两个有依赖关系的异步任务是如何被编排在一起的?上一个异步任务结束之后如何触发下一个异步任务?通过CompletableFuture的stack属性,当前异步任务执行完之后,会获取stack属性中的值,这个值就是下一个需要计算的异步任务

参考:
https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html
https://www.cnblogs.com/Createsequence/p/16963895.html
https://javaguide.cn/java/concurrent/completablefuture-intro.html

http://www.dtcms.com/wzjs/209641.html

相关文章:

  • 网页公司制作网站优化培训学校
  • h5网站制作接单百度百科分类方法
  • 做网站是靠什么赚钱外链系统
  • 网站策划的内容包含了什么?百度极速版app下载安装
  • 京东的网站是哪家公司做的销售培训
  • 电商网站开发意义网络推广协议合同范本
  • 无锡网站设计制作西安seo计费管理
  • 学校网站建设和维护情况高端网站建设报价
  • 专业的网站建设排名深圳百度推广优化
  • 邯郸市网站建设搜索引擎营销优缺点
  • 郴州做网站怎么做一个网站页面
  • 游戏网站建设友情链接怎么互换
  • 番禺网站开发公司电话百度今日小说搜索风云榜
  • 网站是用什么技术做的建网站的详细步骤
  • 普通人怎么样做网站google收录提交入口
  • 想接外包做网站网站的优化和推广方案
  • 互联网营销与推广seo专业培训机构
  • 同城购物网站怎么做seo关键词排名软件流量词
  • github托管wordpress沈阳seo公司
  • 群辉做网站服务器配置搜索关键词优化排名
  • 书法网站模版网络推广营销方案免费
  • 在哪个网站可以做行测题徐州seo顾问
  • 哈尔滨网站制作公司有哪些脱发严重是什么原因引起的
  • 网站首页原型图怎么做湖南seo优化价格
  • c 网站做微信收款功能百度推广代理商与总公司的区别
  • 电商网站建设相关书籍推荐移动广告平台
  • 如何做ps4游戏视频网站黑科技引流推广神器免费
  • 嘉定网站公司站长工具之家
  • 奇趣网做网站苏州seo怎么做
  • 网站做充值和提现深圳网络推广培训机构