【CompletableFuture】CompletionStage、创建子任务、设置的子任务回调钩子(二)
这里写自定义目录标题
- 1. 为什么用CompletableFuture
- 2. CompletionStage
- 3. 核心四个静态方法
- 3.1 使用 runAsync 和 supplyAsync 创建子任务
- 4. 设置的子任务回调钩子
- 4.1 whenComplete
- 4.2 whenCompleteAsync
- 4.3 whenCompleteAsync
- 4.4 exceptionally(Function<Throwable, ? extends T> fn)
- 4.5 总结对比
1. 为什么用CompletableFuture
get()方法在Future计算完成之前会一直处在阻塞状态下,isDone()方法容易耗费CPU资源,对于真正的一步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样我们就不需要等待结果。
阻塞的方式和异步编程的设计理念相维度,而轮询的方式会耗费无谓的CPU资源,因此JDK8推出了CompletableFuture。
CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成之后通知监听的一方。
2. CompletionStage
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
CompletionStage
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会进入另外一个阶段。一个阶段可以理解为一个子任务,每一个子任务会包装一个 Java 函数式接口实例,表示该子任务所要执行的动作。
每个 CompletionStage 子任务所包装的可以是一个 Function、Consumer 或者 Runnable 函数式
接口实例。这三个常用的函数式接口的特点为:
(1)Function
Function 接口的唯一方法点是:有输入、有输出。包装了 Funtion 实例的 CompletionStage 子
任务需要一个输入参数,并会产生一个输出结果到下一步。
(2)Runnable
Runnable 接口的唯一方法点是:无输入、无输出。包装了 Runnable 实例的 CompletionStage
子任务既不需要任何输入参数,也不会产生任何输出。
(3)Consumer
Consumer 接口的唯一方法点是:有输入、无输出。包装了 Consumer 实例的 CompletionStage
子任务需要一个输入参数,但不会产生任何输出。
多个 CompletionStage 构成了一条任务流水线,一个环节执行完成了将结果可以移交给下一
个环节(子任务)。多个 CompletionStage 子任务之间可以使用链式调用,下面是一个简单的例子:
oneStage.thenApply(x -> square(x)) .thenAccept(y -> System.out.println(y)).thenRun(() -> System.out.println())
对以上例子中的 CompletionStage 子任务说明如下:
(1)oneStage 是一个 CompletionStage 子任务,这是一个前提。
(2)x -> square(x)
是一个 Function 类型的 Lamda 表达式,被 thenApply 方法包装成了一
个 CompletionStage 子任务,该子任务需要接收一个参数 x,然后会输出一个结果x 的平方值。
(3)y -> System.out.println(y)
是一个 Comsumer 类型的 Lamda 表达式,被 thenAccept 方
法包装成了一个 CompletionStage 子任务,该子任务需要消耗上一个 Stage(子任务)的输出值,
但是此 Stage 并没有输出。
(4)() -> System.out.println()
是一个 Runnable 类型的 Lamda 表达式,被 thenRun 方法
包装成了一个 CompletionStage 子任务,既不消耗上一个 Stage 的输出,也不产生结果。
CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另一个阶
段。虽然一个 Stage 可以触发其他 Stage,但是并不能保证后续 Stage 的执行顺序。
3. 核心四个静态方法
3.1 使用 runAsync 和 supplyAsync 创建子任务
CompletionStage 子任务的创建是通过 CompletableFuture 完成的。CompletableFuture 类提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletionStage 的方法。
CompletableFuture 定义了一组方法用于创建 CompletionStage 子任务(或者阶段性任务),基
础的方法如下:
// 子任务包装一个 Runnable 实例,并使用 ForkJoinPool.commonPool()线程池去执行
public static CompletableFuture<Void> runAsync(Runnable runnable)// 子任务包装一个 Runnable 实例,并使用指定的 executor 线程池去执行
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)// 子任务包装一个 Supplier 实例,并使用 ForkJoinPool.commonPool()线程池去执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)// 子任务包装一个 Supplier 实例,并使用指定的 executor 线程池去执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
4. 设置的子任务回调钩子
可以为CompletionStage子任务设置特定的回调钩子,当的计算结果完成,或者抛出异常的时候,可以执行这些特定的回调钩子。
// 设置的子任务完成时的回调钩子
public CompletableFuture<T> whenComplete(BiConsumer<? super T,?super Throwable> action)// 设置的子任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)// 设置的子任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor)// 设置的异常处理的回调钩子
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
4.1 whenComplete
当任务完成时(成功或失败都会回调),在当前线程或执行任务的线程上执行回调。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {return 10 / 2;
});future.whenComplete((result, throwable) -> {if (throwable != null) {System.out.println("任务发生异常:" + throwable.getMessage());} else {System.out.println("任务成功,结果是:" + result);}
});
4.2 whenCompleteAsync
同上,不过回调会在默认的 ForkJoinPool.commonPool() 线程池中异步执行。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {return 10 / 2;
});future.whenCompleteAsync((result, throwable) -> {if (throwable != null) {System.out.println("异步任务发生异常:" + throwable.getMessage());} else {System.out.println("异步任务成功,结果是:" + result);}
});
4.3 whenCompleteAsync
同样异步执行回调,但你可以指定一个线程池。
ExecutorService executor = Executors.newFixedThreadPool(2);CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {return 10 / 2;
});future.whenCompleteAsync((result, throwable) -> {if (throwable != null) {System.out.println("自定义线程池处理异常:" + throwable.getMessage());} else {System.out.println("自定义线程池处理结果:" + result);}
}, executor);executor.shutdown();
4.4 exceptionally(Function<Throwable, ? extends T> fn)
仅当任务发生异常时回调。你可以返回一个替代结果。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {return 10 / 0; // 会抛出 ArithmeticException
});future.exceptionally(ex -> {System.out.println("发生异常:" + ex.getMessage());return -1; // 返回默认值}).thenAccept(result -> {System.out.println("处理后的结果是:" + result);});
4.5 总结对比
如果你需要:
- 只处理异常:用 exceptionally
- 无论成功失败都要执行某些逻辑:用 whenComplete
- 且要保证不阻塞主线程:用 whenCompleteAsync 并指定线程池更灵活
如需结合使用多个钩子,也可以链式调用,例如:
CompletableFuture.supplyAsync(() -> 10 / 0).whenComplete((res, ex) -> System.out.println("完成!")).exceptionally(ex -> {System.out.println("捕获异常:" + ex.getMessage());return 0;});