CompletableFuture 深度解析
本文将探讨 Java 8 引入的
CompletableFuture
,一个在异步编程中实现非阻塞、可组合操作的强大工具。我们将从CompletableFuture
的基本概念、与传统Future
的区别、核心 API 用法,到复杂的链式调用、组合操作以及异常处理进行全面解析,并通过丰富的代码示例,帮助 Java 后端开发者更好地理解和应用CompletableFuture
,提升系统性能和响应能力。
1. 为什么需要 CompletableFuture
在现代后端开发中,高并发和低延迟是衡量系统性能的重要指标。传统的同步编程模型在处理耗时操作(如网络请求、数据库查询)时,会阻塞当前线程,导致系统吞吐量下降,用户体验变差。为了解决这一问题,异步编程应运而生。Java 5 引入的 Future
接口是异步编程的初步尝试,它代表了一个异步计算的结果,但其局限性也日益凸显。
1.1 传统 Future 的局限性
Future
接口虽然提供了异步执行任务的能力,但其设计存在以下几个主要局限性:
- 阻塞式获取结果:
Future.get()
方法会阻塞当前线程,直到异步任务完成并返回结果。这意味着,如果任务执行时间过长,调用线程将被长时间阻塞,无法执行其他操作,从而降低了系统的响应能力和资源利用率。 - 无法方便地进行链式操作和组合:
Future
接口没有提供直接的方法来将多个异步操作串联起来,或者将多个异步操作的结果进行组合。当需要执行一系列相互依赖的异步任务时,开发者往往需要手动管理线程和回调,代码变得复杂且难以维护,容易出现“回调地狱”(Callback Hell)。 - 异常处理不便:
Future
接口的异常处理机制相对简单。当异步任务抛出异常时,只有在调用get()
方法时才能捕获到ExecutionException
,这使得异常的传播和处理变得不灵活,难以在异步流程中进行细粒度的错误控制。
1.2 CompletableFuture 的优势
为了克服 Future
的这些局限性,Java 8 引入了 CompletableFuture
类。CompletableFuture
不仅实现了 Future
接口,还实现了 CompletionStage
接口,这使得它在异步编程方面拥有了前所未有的灵活性和强大功能。CompletableFuture
的主要优势体现在:
- 非阻塞:
CompletableFuture
通过回调机制实现了非阻塞操作。它允许你注册一个回调函数,当异步任务完成时,该回调函数会被自动执行,而不会阻塞当前线程。这极大地提高了系统的并发能力和响应速度。 - 可组合:
CompletableFuture
提供了丰富的 API,支持将多个异步操作进行链式调用和组合。你可以轻松地将一个异步任务的结果作为另一个异步任务的输入,或者等待多个异步任务都完成后再执行某个操作。这种可组合性使得复杂的异步流程能够以声明式的方式清晰地表达,代码结构更加简洁。 - 更灵活的异常处理:
CompletableFuture
提供了exceptionally()
、handle()
等方法,允许开发者在异步任务的任何阶段捕获和处理异常。这使得异常处理变得更加灵活和可控,避免了传统Future
中异常处理的痛点。 - 更强大的并发控制:
CompletableFuture
内部使用了 ForkJoinPool 作为默认的异步执行线程池,也可以自定义线程池。它能够更好地利用多核处理器的优势,实现高效的并发控制。
总之,CompletableFuture
是 Java 异步编程领域的一个里程碑式的改进,它为开发者提供了构建高性能、高响应、易于维护的并发应用程序的强大工具。
2. CompletableFuture 核心概念与基本用法
CompletableFuture
是 Java 8 中引入的一个强大的并发工具,它位于 java.util.concurrent
包中。
2.1 CompletableFuture 是什么
CompletableFuture<T>
是一个类,它实现了 Future<T>
和 CompletionStage<T>
两个接口。这意味着它既可以作为传统 Future
的替代品,用于获取异步计算的结果,又具备了 CompletionStage
接口提供的强大功能,支持链式操作和组合多个异步计算步骤。
Future<T>
接口:代表一个异步计算的结果。通过get()
方法可以阻塞地获取结果,或者通过cancel()
方法取消任务。CompletionStage<T>
接口:定义了一系列用于描述异步计算阶段的方法。这些方法允许你在一个异步操作完成后执行另一个操作,而无需阻塞当前线程。这是CompletableFuture
强大之处的核心。
简单来说,CompletableFuture
代表了一个可能在未来某个时间点完成的异步计算的结果。这个结果可以是成功的值,也可以是计算过程中抛出的异常。
2.2 创建 CompletableFuture
CompletableFuture
提供了多种静态方法来创建不同类型的异步任务:
2.2.1 CompletableFuture.runAsync(Runnable runnable)
用于执行一个没有返回值的异步任务。它接受一个 Runnable
类型的参数,并在 ForkJoinPool.commonPool() 中异步执行该任务。如果需要指定线程池,可以使用 CompletableFuture.runAsync(Runnable runnable, Executor executor)
。
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws InterruptedException {System.out.println("主线程开始");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("异步任务执行完成,无返回值");} catch (InterruptedException e) {e.printStackTrace();}});// 主线程可以继续执行其他任务,无需等待异步任务完成System.out.println("主线程继续执行其他任务");// 等待异步任务完成(非阻塞方式,通过回调)future.thenRun(() -> System.out.println("异步任务真正完成后的回调"));// 为了演示效果,让主线程等待一段时间,确保异步任务有时间执行TimeUnit.SECONDS.sleep(3);System.out.println("主线程结束");}
}
2.2.2 CompletableFuture.supplyAsync(Supplier supplier)
用于执行一个有返回值的异步任务。它接受一个 Supplier<T>
类型的参数,并在 ForkJoinPool.commonPool() 中异步执行该任务,返回一个 CompletableFuture<T>
。同样,也可以指定线程池:CompletableFuture.supplyAsync(Supplier<T> supplier, Executor executor)
。
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主线程开始");CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("异步任务执行完成,有返回值");return "Hello, CompletableFuture!";} catch (InterruptedException e) {throw new IllegalStateException(e);}});System.out.println("主线程继续执行其他任务");// 阻塞式获取结果(仅为演示,实际应用中应避免长时间阻塞)String result = future.get(); System.out.println("异步任务返回结果: " + result);System.out.println("主线程结束");}
}
2.2.3 new CompletableFuture()
你可以手动创建一个 CompletableFuture
实例,并在后续通过 complete()
或 completeExceptionally()
方法来手动完成它。这在某些场景下非常有用,例如当你需要将一个非 CompletableFuture
风格的异步操作包装成 CompletableFuture
时。
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主线程开始");CompletableFuture<String> future = new CompletableFuture<>();// 在另一个线程中执行耗时操作,并手动完成 CompletableFuturenew Thread(() -> {try {TimeUnit.SECONDS.sleep(2);future.complete("手动完成的 CompletableFuture");System.out.println("手动 CompletableFuture 已完成");} catch (InterruptedException e) {future.completeExceptionally(e);}}).start();System.out.println("主线程继续执行其他任务");String result = future.get(); // 阻塞等待结果System.out.println("获取到手动 CompletableFuture 的结果: " + result);System.out.println("主线程结束");}
}
2.2.4 CompletableFuture.completedFuture(T value)
如果你已经知道一个异步操作的结果,可以直接使用 completedFuture()
方法创建一个已经完成的 CompletableFuture
。这对于测试或者某些特定场景非常方便,可以避免不必要的异步执行。
示例代码:
import java.util.concurrent.CompletableFuture;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主线程开始");CompletableFuture<String> future = CompletableFuture.completedFuture("这是一个已完成的 CompletableFuture");System.out.println("主线程继续执行其他任务");String result = future.get(); // 不会阻塞,立即返回结果System.out.println("获取到已完成 CompletableFuture 的结果: " + result);System.out.println("主线程结束");}
}
2.3 获取结果
当 CompletableFuture
完成后,你可以通过以下方法获取其结果:
2.3.1 get()
get()
方法是 Future
接口中定义的方法,它会阻塞当前线程,直到 CompletableFuture
完成并返回结果。如果任务在完成时抛出异常,get()
方法会抛出 ExecutionException
,其 getCause()
方法可以获取到原始异常。
注意:在实际应用中,应尽量避免长时间阻塞主线程,get()
方法通常用于测试或在确定异步任务很快完成的场景。
2.3.2 join()
join()
方法与 get()
方法类似,也会阻塞当前线程并等待 CompletableFuture
完成。但不同的是,join()
方法不会抛出受检异常 ExecutionException
,而是将原始异常包装成非受检异常 CompletionException
抛出。这使得在链式调用中处理异常更加方便,无需在每个 get()
调用处都进行 try-catch
。
2.3.3 complete(T value)
complete()
方法用于手动完成 CompletableFuture
,并为其设置一个结果值。如果 CompletableFuture
已经完成(无论是正常完成还是异常完成),再次调用 complete()
将无效。
2.3.4 completeExceptionally(Throwable ex)
completeExceptionally()
方法用于手动使 CompletableFuture
异常完成,并为其设置一个异常。这在异步任务执行过程中发生错误时非常有用,可以将异常信息传递给 CompletableFuture
的消费者。
示例代码(get()
vs join()
):
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureGetJoin {public static void main(String[] args) {// 正常完成的 CompletableFutureCompletableFuture<String> successFuture = CompletableFuture.supplyAsync(() -> "Success");try {String resultGet = successFuture.get();System.out.println("get() 正常结果: " + resultGet);String resultJoin = successFuture.join();System.out.println("join() 正常结果: " + resultJoin);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 异常完成的 CompletableFutureCompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("Something went wrong!");});try {exceptionFuture.get(); // 抛出 ExecutionException} catch (InterruptedException | ExecutionException e) {System.out.println("get() 捕获到异常: " + e.getCause().getMessage());}try {exceptionFuture.join(); // 抛出 CompletionException} catch (Exception e) {System.out.println("join() 捕获到异常: " + e.getCause().getMessage());}}
}
3. 链式操作:构建异步任务流
CompletableFuture
最强大的特性之一是其支持链式操作,允许我们将多个异步任务串联起来,形成一个有向无环图(DAG),从而构建复杂的异步任务流。这极大地简化了异步编程的复杂性,避免了传统回调模式带来的“回调地狱”。
3.1 结果转换:thenApply()
thenApply()
方法用于对上一个 CompletableFuture
的结果进行转换,并返回一个新的 CompletableFuture
。它接受一个 Function
函数式接口作为参数,该函数接收上一个 CompletableFuture
的结果作为输入,并返回一个转换后的新结果。thenApply()
是同步执行的,即转换操作会在完成上一个 CompletableFuture
的线程中执行。如果需要异步执行转换操作,可以使用 thenApplyAsync()
。
方法签名:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> initialFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> transformedFuture = initialFuture.thenApply(s -> {System.out.println("thenApply 在 " + Thread.currentThread().getName() + " 线程中执行");return s + " World";});System.out.println("结果: " + transformedFuture.get()); // 输出: 结果: Hello World}
}
3.2 消费结果:thenAccept()
thenAccept()
方法用于消费上一个 CompletableFuture
的结果,但不会返回新的结果(即返回 CompletableFuture<Void>
)。它接受一个 Consumer
函数式接口作为参数,该函数接收上一个 CompletableFuture
的结果作为输入,但没有返回值。thenAccept()
同样有异步版本 thenAcceptAsync()
。
方法签名:
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> voidFuture = future.thenAccept(s -> {System.out.println("thenAccept 在 " + Thread.currentThread().getName() + " 线程中执行");System.out.println("消费结果: " + s + ", 无返回值");});voidFuture.get(); // 等待消费完成}
}
3.3 任务完成:thenRun()
thenRun()
方法用于在上一个 CompletableFuture
完成后执行一个不关心结果且没有返回值的任务。它接受一个 Runnable
函数式接口作为参数。thenRun()
同样有异步版本 thenRunAsync()
。
方法签名:
CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> voidFuture = future.thenRun(() -> {System.out.println("thenRun 在 " + Thread.currentThread().getName() + " 线程中执行");System.out.println("上一个任务已完成,执行不关心结果的任务");});voidFuture.get(); // 等待任务完成}
}
3.4 异步转换:thenCompose()
thenCompose()
方法是 CompletableFuture
中非常重要的一个方法,它用于将上一个 CompletableFuture
的结果作为参数,创建并返回一个新的 CompletableFuture
。这与 thenApply()
的区别在于,thenApply()
返回的是一个包含转换后结果的 CompletableFuture
,而 thenCompose()
返回的是一个扁平化的 CompletableFuture
。当你的转换函数本身也返回一个 CompletableFuture
时,thenCompose()
可以避免出现 CompletableFuture<CompletableFuture<T>>
这种嵌套结构,从而保持链的扁平化。
方法签名:
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "User ID: 123";} catch (InterruptedException e) {throw new IllegalStateException(e);}});// thenApply 示例 (会产生嵌套)CompletableFuture<CompletableFuture<String>> nestedFuture = future1.thenApply(userId -> {System.out.println("thenApply 内部获取到: " + userId);return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return userId + " - User Name: Alice";} catch (InterruptedException e) {throw new IllegalStateException(e);}});});System.out.println("thenApply 结果 (嵌套): " + nestedFuture.get().get()); // 需要两次 get()// thenCompose 示例 (扁平化)CompletableFuture<String> flatFuture = future1.thenCompose(userId -> {System.out.println("thenCompose 内部获取到: " + userId);return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return userId + " - User Name: Bob";} catch (InterruptedException e) {throw new IllegalStateException(e);}});});System.out.println("thenCompose 结果 (扁平化): " + flatFuture.get()); // 只需一次 get()}
}
4. 组合操作:处理多个异步任务
在实际应用中,我们经常需要处理多个独立的异步任务,并在它们全部完成或其中任意一个完成时执行后续操作。CompletableFuture
提供了强大的组合方法,使得这些场景的处理变得非常优雅和高效。
4.1 组合两个结果:thenCombine()
thenCombine()
方法用于当两个 CompletableFuture
都完成后,将它们的结果组合起来,并返回一个新的 CompletableFuture
。它接受另一个 CompletionStage
和一个 BiFunction
作为参数,BiFunction
接收两个 CompletableFuture
的结果作为输入,并返回一个组合后的新结果。
方法签名:
<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "World";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> {System.out.println("thenCombine 在 " + Thread.currentThread().getName() + " 线程中执行");return s1 + " " + s2;});System.out.println("组合结果: " + combinedFuture.get()); // 输出: 组合结果: Hello World}
}
4.2 所有任务完成:allOf()
allOf()
方法用于等待所有给定的 CompletableFuture
都完成。它接受一个 CompletableFuture
数组作为参数,并返回一个 CompletableFuture<Void>
。当所有输入的 CompletableFuture
都成功完成时,返回的 CompletableFuture
才会完成。如果其中任何一个 CompletableFuture
异常完成,那么 allOf()
返回的 CompletableFuture
也会异常完成。
方法签名:
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "Result from Future 1";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Result from Future 2";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);return "Result from Future 3";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);// 等待所有任务完成allFutures.get(); System.out.println("所有任务都已完成!");// 可以通过各自的 future.get() 获取结果System.out.println(future1.get());System.out.println(future2.get());System.out.println(future3.get());}
}
4.3 任意任务完成:anyOf()
anyOf()
方法用于当任何一个给定的 CompletableFuture
完成时,就完成当前的 CompletableFuture
。它接受一个 CompletableFuture
数组作为参数,并返回一个 CompletableFuture<Object>
。返回的 CompletableFuture
的结果将是第一个完成的 CompletableFuture
的结果。
方法签名:
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);return "Result from Future 1";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Result from Future 2";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "Result from Future 3";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);System.out.println("第一个完成的任务结果: " + anyOfFuture.get()); // 输出: 第一个完成的任务结果: Result from Future 2}
}
5. 异常处理
在异步编程中,异常处理是一个非常重要的环节。CompletableFuture
提供了多种机制来优雅地处理异步任务中可能出现的异常,避免了传统 Future
中异常处理的繁琐和不便。
5.1 异常处理:exceptionally()
exceptionally()
方法用于当 CompletableFuture
出现异常时,提供一个替代结果。它接受一个 Function<Throwable, ? extends T>
作为参数,当上一个 CompletableFuture
异常完成时,该函数会被调用,并接收异常作为输入,然后返回一个替代值作为当前 CompletableFuture
的结果。如果上一个 CompletableFuture
正常完成,exceptionally()
不会执行。
方法签名:
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
示例代码:
import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("模拟异常");}return "正常结果";}).exceptionally(ex -> {System.out.println("捕获到异常: " + ex.getMessage());return "从异常中恢复的默认结果";});System.out.println("最终结果: " + future.get());}
}
5.2 统一处理:handle()
handle()
方法是一个更通用的处理方法,无论 CompletableFuture
是正常完成还是异常完成,它都会执行。它接受一个 BiFunction<? super T, Throwable, ? extends U>
作为参数,该函数接收上一个 CompletableFuture
的结果和可能发生的异常作为输入。如果正常完成,异常参数为 null
;如果异常完成,结果参数为 null
。handle()
的返回值将作为当前 CompletableFuture
的结果。
方法签名:
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
示例代码:
import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {// 正常情况CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常数据").handle((result, ex) -> {if (ex != null) {System.out.println("handle 捕获到异常: " + ex.getMessage());return "处理异常后的结果";} else {System.out.println("handle 正常处理结果: " + result);return result + " - 处理完成";}});System.out.println("正常情况最终结果: " + normalFuture.get());// 异常情况CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("模拟异常");}).handle((result, ex) -> {if (ex != null) {System.out.println("handle 捕获到异常: " + ex.getMessage());return "处理异常后的结果";} else {System.out.println("handle 正常处理结果: " + result);return result + " - 处理完成";}});System.out.println("异常情况最终结果: " + exceptionFuture.get());}
}
5.3 完成时回调:whenComplete()
whenComplete()
方法用于当 CompletableFuture
完成时执行一个回调,无论它是正常完成还是异常完成。它接受一个 BiConsumer<? super T, ? super Throwable>
作为参数,该函数接收结果和异常作为输入。与 handle()
不同的是,whenComplete()
不会修改 CompletableFuture
的结果,主要用于日志记录、资源清理或触发后续不依赖结果的操作。如果 whenComplete()
内部抛出异常,该异常会覆盖原始的异常(如果存在)。
方法签名:
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
示例代码:
import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {// 正常情况CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常数据").whenComplete((result, ex) -> {if (ex != null) {System.out.println("whenComplete 捕获到异常: " + ex.getMessage());} else {System.out.println("whenComplete 正常完成,结果: " + result);}});System.out.println("正常情况最终结果: " + normalFuture.get());// 异常情况CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("模拟异常");}).whenComplete((result, ex) -> {if (ex != null) {System.out.println("whenComplete 捕获到异常: " + ex.getMessage());} else {System.out.println("whenComplete 正常完成,结果: " + result);}});try {exceptionFuture.get(); // 原始异常会被重新抛出} catch (Exception e) {System.out.println("get() 捕获到原始异常: " + e.getCause().getMessage());}}
}
6. 实际应用场景与最佳实践
CompletableFuture
在 Java 后端开发中有着广泛的应用,尤其是在需要处理大量并发请求、优化系统响应时间以及构建高吞吐量服务的场景下。合理地运用 CompletableFuture
可以显著提升系统性能和用户体验。
6.1 实际应用场景
6.1.1 并行调用多个微服务
在微服务架构中,一个业务请求可能需要调用多个下游微服务来获取数据。如果这些调用是串行的,那么总的响应时间将是所有微服务响应时间之和。通过 CompletableFuture
,我们可以并行地发起对多个微服务的调用,然后使用 allOf()
或 thenCombine()
等方法等待所有结果或组合结果,从而大大缩短响应时间。
示例场景: 用户下单时,需要同时查询商品库存、用户积分和优惠券信息。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class MicroserviceParallelCall {// 模拟查询商品库存的微服务public static CompletableFuture<Integer> getProductStock(String productId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模拟网络延迟System.out.println("查询商品 " + productId + " 库存完成");return 100; // 假设库存100} catch (InterruptedException e) {throw new IllegalStateException(e);}});}// 模拟查询用户积分的微服务public static CompletableFuture<Integer> getUserPoints(String userId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1.5); // 模拟网络延迟System.out.println("查询用户 " + userId + " 积分完成");return 2000; // 假设积分2000} catch (InterruptedException e) {throw new IllegalStateException(e);}});}// 模拟查询优惠券信息的微服务public static CompletableFuture<String> getCouponInfo(String userId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(0.8); // 模拟网络延迟System.out.println("查询用户 " + userId + " 优惠券完成");return "满100减10"; // 假设优惠券信息} catch (InterruptedException e) {throw new IllegalStateException(e);}});}public static void main(String[] args) throws Exception {long start = System.currentTimeMillis();CompletableFuture<Integer> stockFuture = getProductStock("P001");CompletableFuture<Integer> pointsFuture = getUserPoints("U001");CompletableFuture<String> couponFuture = getCouponInfo("U001");// 等待所有异步任务完成CompletableFuture.allOf(stockFuture, pointsFuture, couponFuture).join();// 获取结果并处理Integer stock = stockFuture.get();Integer points = pointsFuture.get();String coupon = couponFuture.get();System.out.println("\n所有信息查询完成:");System.out.println("商品库存: " + stock);System.out.println("用户积分: " + points);System.out.println("优惠券信息: " + coupon);long end = System.currentTimeMillis();System.out.println("总耗时: " + (end - start) + " ms");}
}
6.1.2 异步发送通知(邮件、短信)
在用户注册、订单支付成功等场景下,系统通常需要发送邮件、短信或站内信等通知。这些通知操作通常不影响主业务流程,但如果同步执行,可能会增加主业务的响应时间。使用 CompletableFuture
可以将这些通知操作异步化,提升主业务的响应速度。
示例场景: 用户注册成功后,异步发送欢迎邮件和短信。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class AsyncNotification {public static CompletableFuture<Void> sendEmail(String email, String content) {return CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模拟邮件发送耗时System.out.println("邮件发送成功到: " + email + ", 内容: " + content);} catch (InterruptedException e) {e.printStackTrace();}});}public static CompletableFuture<Void> sendSms(String phoneNumber, String content) {return CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(0.5); // 模拟短信发送耗时System.out.println("短信发送成功到: " + phoneNumber + ", 内容: " + content);} catch (InterruptedException e) {e.printStackTrace();}});}public static void main(String[] args) {System.out.println("用户注册成功,开始处理通知...");CompletableFuture<Void> emailFuture = sendEmail("test@example.com", "欢迎注册!");CompletableFuture<Void> smsFuture = sendSms("13800138000", "欢迎注册!");// 主业务流程可以继续,无需等待通知发送完成System.out.println("主业务流程继续执行...");// 可以选择等待所有通知发送完成,或者不等待CompletableFuture.allOf(emailFuture, smsFuture).join();System.out.println("所有通知任务已完成。");}
}
6.1.3 批量数据处理
当需要处理大量数据,并且每个数据的处理是独立的时,可以使用 CompletableFuture
将数据分成小批次并行处理,从而提高整体处理效率。
示例场景: 批量处理用户数据,对每个用户进行数据清洗和存储。
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class BatchDataProcessing {public static CompletableFuture<String> processUserData(String user) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(200); // 模拟数据处理耗时System.out.println("处理用户数据: " + user + " 完成");return user.toUpperCase(); // 模拟数据清洗} catch (InterruptedException e) {throw new IllegalStateException(e);}});}public static void main(String[] args) {List<String> users = Arrays.asList("user1", "user2", "user3", "user4", "user5");long start = System.currentTimeMillis();List<CompletableFuture<String>> futures = users.stream().map(BatchDataProcessing::processUserData).collect(Collectors.toList());// 等待所有用户数据处理完成CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();// 获取所有处理结果List<String> processedUsers = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());System.out.println("\n所有用户数据处理完成,结果: " + processedUsers);long end = System.currentTimeMillis();System.out.println("总耗时: " + (end - start) + " ms");}
}
6.2 最佳实践
-
合理选择线程池:
CompletableFuture
默认使用ForkJoinPool.commonPool()
。对于 CPU 密集型任务,默认线程池通常是合适的。但对于 I/O 密集型任务,建议自定义线程池,并根据实际情况调整线程数量,避免线程饥饿或资源浪费。例如,可以使用Executors.newFixedThreadPool()
或ThreadPoolExecutor
。// 自定义线程池示例 ExecutorService customExecutor = Executors.newFixedThreadPool(10); CompletableFuture.supplyAsync(() -> {// 耗时操作return "Result"; }, customExecutor);
-
避免过度使用
get()
和join()
:虽然get()
和join()
可以获取CompletableFuture
的结果,但它们是阻塞的。过度使用会导致异步优势丧失,甚至引入死锁。应尽量使用thenApply()
、thenAccept()
、thenCompose()
等非阻塞的回调方法来构建异步链。 -
善用异常处理机制:
exceptionally()
、handle()
和whenComplete()
提供了灵活的异常处理方式。根据业务需求选择合适的异常处理策略,确保异步任务的健壮性。exceptionally()
适用于从异常中恢复并提供替代结果的场景,handle()
适用于无论成功失败都需要统一处理的场景,而whenComplete()
适用于执行一些副作用操作(如日志记录、资源清理)而不改变结果的场景。 -
链式调用与组合的合理运用:充分利用
CompletableFuture
提供的链式调用和组合方法,将复杂的异步逻辑拆解成更小、更易管理的部分。这不仅使代码更具可读性,也更容易进行测试和维护。特别注意thenApply()
和thenCompose()
的区别,避免不必要的嵌套。 -
超时处理:对于可能长时间运行的异步任务,考虑添加超时机制,避免资源无限期占用。虽然
CompletableFuture
本身没有直接的超时方法,但可以通过CompletableFuture.orTimeout()
(Java 9+) 或结合CompletableFuture.runAfter()
等方法实现。// Java 9+ 超时处理示例 CompletableFuture<String> futureWithTimeout = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(5); // 模拟长时间任务return "Task Completed";} catch (InterruptedException e) {throw new IllegalStateException(e);} }).orTimeout(2, TimeUnit.SECONDS); // 2秒后超时try {System.out.println(futureWithTimeout.get()); } catch (Exception e) {System.out.println("任务超时或异常: " + e.getMessage()); }
-
日志记录:在异步任务的关键节点添加日志,便于追踪任务执行状态和排查问题。可以使用
whenComplete()
或handle()
来记录任务的成功或失败。
7. 总结
CompletableFuture
是 Java 8 引入的异步编程利器,它通过提供非阻塞、可组合、灵活的异常处理机制,极大地提升了 Java 在并发编程领域的表现力。本文从 CompletableFuture
的基本概念、创建方式、链式操作、组合操作以及异常处理等方面进行了深入解析,并通过丰富的代码示例展示了其在实际应用中的强大功能。
掌握 CompletableFuture
的核心 API 和最佳实践,能够帮助 Java 后端开发者更好地应对高并发、低延迟的挑战,构建出高性能、高响应、易于维护的现代后端服务。