当前位置: 首页 > news >正文

CompletableFuture 深度解析

本文将探讨 Java 8 引入的 CompletableFuture,一个在异步编程中实现非阻塞、可组合操作的强大工具。我们将从 CompletableFuture 的基本概念、与传统 Future 的区别、核心 API 用法,到复杂的链式调用、组合操作以及异常处理进行全面解析,并通过丰富的代码示例,帮助 Java 后端开发者更好地理解和应用 CompletableFuture,提升系统性能和响应能力。

1. 为什么需要 CompletableFuture

在现代后端开发中,高并发和低延迟是衡量系统性能的重要指标。传统的同步编程模型在处理耗时操作(如网络请求、数据库查询)时,会阻塞当前线程,导致系统吞吐量下降,用户体验变差。为了解决这一问题,异步编程应运而生。Java 5 引入的 Future 接口是异步编程的初步尝试,它代表了一个异步计算的结果,但其局限性也日益凸显。

1.1 传统 Future 的局限性

Future 接口虽然提供了异步执行任务的能力,但其设计存在以下几个主要局限性:

  • 阻塞式获取结果Future.get() 方法会阻塞当前线程,直到异步任务完成并返回结果。这意味着,如果任务执行时间过长,调用线程将被长时间阻塞,无法执行其他操作,从而降低了系统的响应能力和资源利用率。
  • 无法方便地进行链式操作和组合Future 接口没有提供直接的方法来将多个异步操作串联起来,或者将多个异步操作的结果进行组合。当需要执行一系列相互依赖的异步任务时,开发者往往需要手动管理线程和回调,代码变得复杂且难以维护,容易出现“回调地狱”(Callback Hell)。
  • 异常处理不便Future 接口的异常处理机制相对简单。当异步任务抛出异常时,只有在调用 get() 方法时才能捕获到 ExecutionException,这使得异常的传播和处理变得不灵活,难以在异步流程中进行细粒度的错误控制。

1.2 CompletableFuture 的优势

为了克服 Future 的这些局限性,Java 8 引入了 CompletableFuture 类。CompletableFuture 不仅实现了 Future 接口,还实现了 CompletionStage 接口,这使得它在异步编程方面拥有了前所未有的灵活性和强大功能。CompletableFuture 的主要优势体现在:

  • 非阻塞CompletableFuture 通过回调机制实现了非阻塞操作。它允许你注册一个回调函数,当异步任务完成时,该回调函数会被自动执行,而不会阻塞当前线程。这极大地提高了系统的并发能力和响应速度。
  • 可组合CompletableFuture 提供了丰富的 API,支持将多个异步操作进行链式调用和组合。你可以轻松地将一个异步任务的结果作为另一个异步任务的输入,或者等待多个异步任务都完成后再执行某个操作。这种可组合性使得复杂的异步流程能够以声明式的方式清晰地表达,代码结构更加简洁。
  • 更灵活的异常处理CompletableFuture 提供了 exceptionally()handle() 等方法,允许开发者在异步任务的任何阶段捕获和处理异常。这使得异常处理变得更加灵活和可控,避免了传统 Future 中异常处理的痛点。
  • 更强大的并发控制CompletableFuture 内部使用了 ForkJoinPool 作为默认的异步执行线程池,也可以自定义线程池。它能够更好地利用多核处理器的优势,实现高效的并发控制。

总之,CompletableFuture 是 Java 异步编程领域的一个里程碑式的改进,它为开发者提供了构建高性能、高响应、易于维护的并发应用程序的强大工具。

2. CompletableFuture 核心概念与基本用法

CompletableFuture 是 Java 8 中引入的一个强大的并发工具,它位于 java.util.concurrent 包中。

2.1 CompletableFuture 是什么

CompletableFuture<T> 是一个类,它实现了 Future<T>CompletionStage<T> 两个接口。这意味着它既可以作为传统 Future 的替代品,用于获取异步计算的结果,又具备了 CompletionStage 接口提供的强大功能,支持链式操作和组合多个异步计算步骤。

  • Future<T> 接口:代表一个异步计算的结果。通过 get() 方法可以阻塞地获取结果,或者通过 cancel() 方法取消任务。
  • CompletionStage<T> 接口:定义了一系列用于描述异步计算阶段的方法。这些方法允许你在一个异步操作完成后执行另一个操作,而无需阻塞当前线程。这是 CompletableFuture 强大之处的核心。

简单来说,CompletableFuture 代表了一个可能在未来某个时间点完成的异步计算的结果。这个结果可以是成功的值,也可以是计算过程中抛出的异常。

2.2 创建 CompletableFuture

CompletableFuture 提供了多种静态方法来创建不同类型的异步任务:

2.2.1 CompletableFuture.runAsync(Runnable runnable)

用于执行一个没有返回值的异步任务。它接受一个 Runnable 类型的参数,并在 ForkJoinPool.commonPool() 中异步执行该任务。如果需要指定线程池,可以使用 CompletableFuture.runAsync(Runnable runnable, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws InterruptedException {System.out.println("主线程开始");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("异步任务执行完成,无返回值");} catch (InterruptedException e) {e.printStackTrace();}});// 主线程可以继续执行其他任务,无需等待异步任务完成System.out.println("主线程继续执行其他任务");// 等待异步任务完成(非阻塞方式,通过回调)future.thenRun(() -> System.out.println("异步任务真正完成后的回调"));// 为了演示效果,让主线程等待一段时间,确保异步任务有时间执行TimeUnit.SECONDS.sleep(3);System.out.println("主线程结束");}
}

2.2.2 CompletableFuture.supplyAsync(Supplier supplier)

用于执行一个有返回值的异步任务。它接受一个 Supplier<T> 类型的参数,并在 ForkJoinPool.commonPool() 中异步执行该任务,返回一个 CompletableFuture<T>。同样,也可以指定线程池:CompletableFuture.supplyAsync(Supplier<T> supplier, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主线程开始");CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("异步任务执行完成,有返回值");return "Hello, CompletableFuture!";} catch (InterruptedException e) {throw new IllegalStateException(e);}});System.out.println("主线程继续执行其他任务");// 阻塞式获取结果(仅为演示,实际应用中应避免长时间阻塞)String result = future.get(); System.out.println("异步任务返回结果: " + result);System.out.println("主线程结束");}
}

2.2.3 new CompletableFuture()

你可以手动创建一个 CompletableFuture 实例,并在后续通过 complete()completeExceptionally() 方法来手动完成它。这在某些场景下非常有用,例如当你需要将一个非 CompletableFuture 风格的异步操作包装成 CompletableFuture 时。

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主线程开始");CompletableFuture<String> future = new CompletableFuture<>();// 在另一个线程中执行耗时操作,并手动完成 CompletableFuturenew Thread(() -> {try {TimeUnit.SECONDS.sleep(2);future.complete("手动完成的 CompletableFuture");System.out.println("手动 CompletableFuture 已完成");} catch (InterruptedException e) {future.completeExceptionally(e);}}).start();System.out.println("主线程继续执行其他任务");String result = future.get(); // 阻塞等待结果System.out.println("获取到手动 CompletableFuture 的结果: " + result);System.out.println("主线程结束");}
}

2.2.4 CompletableFuture.completedFuture(T value)

如果你已经知道一个异步操作的结果,可以直接使用 completedFuture() 方法创建一个已经完成的 CompletableFuture。这对于测试或者某些特定场景非常方便,可以避免不必要的异步执行。

示例代码:

import java.util.concurrent.CompletableFuture;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主线程开始");CompletableFuture<String> future = CompletableFuture.completedFuture("这是一个已完成的 CompletableFuture");System.out.println("主线程继续执行其他任务");String result = future.get(); // 不会阻塞,立即返回结果System.out.println("获取到已完成 CompletableFuture 的结果: " + result);System.out.println("主线程结束");}
}

2.3 获取结果

CompletableFuture 完成后,你可以通过以下方法获取其结果:

2.3.1 get()

get() 方法是 Future 接口中定义的方法,它会阻塞当前线程,直到 CompletableFuture 完成并返回结果。如果任务在完成时抛出异常,get() 方法会抛出 ExecutionException,其 getCause() 方法可以获取到原始异常。

注意:在实际应用中,应尽量避免长时间阻塞主线程,get() 方法通常用于测试或在确定异步任务很快完成的场景。

2.3.2 join()

join() 方法与 get() 方法类似,也会阻塞当前线程并等待 CompletableFuture 完成。但不同的是,join() 方法不会抛出受检异常 ExecutionException,而是将原始异常包装成非受检异常 CompletionException 抛出。这使得在链式调用中处理异常更加方便,无需在每个 get() 调用处都进行 try-catch

2.3.3 complete(T value)

complete() 方法用于手动完成 CompletableFuture,并为其设置一个结果值。如果 CompletableFuture 已经完成(无论是正常完成还是异常完成),再次调用 complete() 将无效。

2.3.4 completeExceptionally(Throwable ex)

completeExceptionally() 方法用于手动使 CompletableFuture 异常完成,并为其设置一个异常。这在异步任务执行过程中发生错误时非常有用,可以将异常信息传递给 CompletableFuture 的消费者。

示例代码(get() vs join()):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureGetJoin {public static void main(String[] args) {// 正常完成的 CompletableFutureCompletableFuture<String> successFuture = CompletableFuture.supplyAsync(() -> "Success");try {String resultGet = successFuture.get();System.out.println("get() 正常结果: " + resultGet);String resultJoin = successFuture.join();System.out.println("join() 正常结果: " + resultJoin);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 异常完成的 CompletableFutureCompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("Something went wrong!");});try {exceptionFuture.get(); // 抛出 ExecutionException} catch (InterruptedException | ExecutionException e) {System.out.println("get() 捕获到异常: " + e.getCause().getMessage());}try {exceptionFuture.join(); // 抛出 CompletionException} catch (Exception e) {System.out.println("join() 捕获到异常: " + e.getCause().getMessage());}}
}

3. 链式操作:构建异步任务流

CompletableFuture 最强大的特性之一是其支持链式操作,允许我们将多个异步任务串联起来,形成一个有向无环图(DAG),从而构建复杂的异步任务流。这极大地简化了异步编程的复杂性,避免了传统回调模式带来的“回调地狱”。

3.1 结果转换:thenApply()

thenApply() 方法用于对上一个 CompletableFuture 的结果进行转换,并返回一个新的 CompletableFuture。它接受一个 Function 函数式接口作为参数,该函数接收上一个 CompletableFuture 的结果作为输入,并返回一个转换后的新结果。thenApply() 是同步执行的,即转换操作会在完成上一个 CompletableFuture 的线程中执行。如果需要异步执行转换操作,可以使用 thenApplyAsync()

方法签名:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> initialFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> transformedFuture = initialFuture.thenApply(s -> {System.out.println("thenApply 在 " + Thread.currentThread().getName() + " 线程中执行");return s + " World";});System.out.println("结果: " + transformedFuture.get()); // 输出: 结果: Hello World}
}

3.2 消费结果:thenAccept()

thenAccept() 方法用于消费上一个 CompletableFuture 的结果,但不会返回新的结果(即返回 CompletableFuture<Void>)。它接受一个 Consumer 函数式接口作为参数,该函数接收上一个 CompletableFuture 的结果作为输入,但没有返回值。thenAccept() 同样有异步版本 thenAcceptAsync()

方法签名:

CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> voidFuture = future.thenAccept(s -> {System.out.println("thenAccept 在 " + Thread.currentThread().getName() + " 线程中执行");System.out.println("消费结果: " + s + ", 无返回值");});voidFuture.get(); // 等待消费完成}
}

3.3 任务完成:thenRun()

thenRun() 方法用于在上一个 CompletableFuture 完成后执行一个不关心结果且没有返回值的任务。它接受一个 Runnable 函数式接口作为参数。thenRun() 同样有异步版本 thenRunAsync()

方法签名:

CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> voidFuture = future.thenRun(() -> {System.out.println("thenRun 在 " + Thread.currentThread().getName() + " 线程中执行");System.out.println("上一个任务已完成,执行不关心结果的任务");});voidFuture.get(); // 等待任务完成}
}

3.4 异步转换:thenCompose()

thenCompose() 方法是 CompletableFuture 中非常重要的一个方法,它用于将上一个 CompletableFuture 的结果作为参数,创建并返回一个新的 CompletableFuture。这与 thenApply() 的区别在于,thenApply() 返回的是一个包含转换后结果的 CompletableFuture,而 thenCompose() 返回的是一个扁平化的 CompletableFuture。当你的转换函数本身也返回一个 CompletableFuture 时,thenCompose() 可以避免出现 CompletableFuture<CompletableFuture<T>> 这种嵌套结构,从而保持链的扁平化。

方法签名:

<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "User ID: 123";} catch (InterruptedException e) {throw new IllegalStateException(e);}});// thenApply 示例 (会产生嵌套)CompletableFuture<CompletableFuture<String>> nestedFuture = future1.thenApply(userId -> {System.out.println("thenApply 内部获取到: " + userId);return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return userId + " - User Name: Alice";} catch (InterruptedException e) {throw new IllegalStateException(e);}});});System.out.println("thenApply 结果 (嵌套): " + nestedFuture.get().get()); // 需要两次 get()// thenCompose 示例 (扁平化)CompletableFuture<String> flatFuture = future1.thenCompose(userId -> {System.out.println("thenCompose 内部获取到: " + userId);return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return userId + " - User Name: Bob";} catch (InterruptedException e) {throw new IllegalStateException(e);}});});System.out.println("thenCompose 结果 (扁平化): " + flatFuture.get()); // 只需一次 get()}
}

4. 组合操作:处理多个异步任务

在实际应用中,我们经常需要处理多个独立的异步任务,并在它们全部完成或其中任意一个完成时执行后续操作。CompletableFuture 提供了强大的组合方法,使得这些场景的处理变得非常优雅和高效。

4.1 组合两个结果:thenCombine()

thenCombine() 方法用于当两个 CompletableFuture 都完成后,将它们的结果组合起来,并返回一个新的 CompletableFuture。它接受另一个 CompletionStage 和一个 BiFunction 作为参数,BiFunction 接收两个 CompletableFuture 的结果作为输入,并返回一个组合后的新结果。

方法签名:

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "World";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> {System.out.println("thenCombine 在 " + Thread.currentThread().getName() + " 线程中执行");return s1 + " " + s2;});System.out.println("组合结果: " + combinedFuture.get()); // 输出: 组合结果: Hello World}
}

4.2 所有任务完成:allOf()

allOf() 方法用于等待所有给定的 CompletableFuture 都完成。它接受一个 CompletableFuture 数组作为参数,并返回一个 CompletableFuture<Void>。当所有输入的 CompletableFuture 都成功完成时,返回的 CompletableFuture 才会完成。如果其中任何一个 CompletableFuture 异常完成,那么 allOf() 返回的 CompletableFuture 也会异常完成。

方法签名:

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "Result from Future 1";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Result from Future 2";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);return "Result from Future 3";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);// 等待所有任务完成allFutures.get(); System.out.println("所有任务都已完成!");// 可以通过各自的 future.get() 获取结果System.out.println(future1.get());System.out.println(future2.get());System.out.println(future3.get());}
}

4.3 任意任务完成:anyOf()

anyOf() 方法用于当任何一个给定的 CompletableFuture 完成时,就完成当前的 CompletableFuture。它接受一个 CompletableFuture 数组作为参数,并返回一个 CompletableFuture<Object>。返回的 CompletableFuture 的结果将是第一个完成的 CompletableFuture 的结果。

方法签名:

static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);return "Result from Future 1";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Result from Future 2";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "Result from Future 3";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);System.out.println("第一个完成的任务结果: " + anyOfFuture.get()); // 输出: 第一个完成的任务结果: Result from Future 2}
}

5. 异常处理

在异步编程中,异常处理是一个非常重要的环节。CompletableFuture 提供了多种机制来优雅地处理异步任务中可能出现的异常,避免了传统 Future 中异常处理的繁琐和不便。

5.1 异常处理:exceptionally()

exceptionally() 方法用于当 CompletableFuture 出现异常时,提供一个替代结果。它接受一个 Function<Throwable, ? extends T> 作为参数,当上一个 CompletableFuture 异常完成时,该函数会被调用,并接收异常作为输入,然后返回一个替代值作为当前 CompletableFuture 的结果。如果上一个 CompletableFuture 正常完成,exceptionally() 不会执行。

方法签名:

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

示例代码:

import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("模拟异常");}return "正常结果";}).exceptionally(ex -> {System.out.println("捕获到异常: " + ex.getMessage());return "从异常中恢复的默认结果";});System.out.println("最终结果: " + future.get());}
}

5.2 统一处理:handle()

handle() 方法是一个更通用的处理方法,无论 CompletableFuture 是正常完成还是异常完成,它都会执行。它接受一个 BiFunction<? super T, Throwable, ? extends U> 作为参数,该函数接收上一个 CompletableFuture 的结果和可能发生的异常作为输入。如果正常完成,异常参数为 null;如果异常完成,结果参数为 nullhandle() 的返回值将作为当前 CompletableFuture 的结果。

方法签名:

<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {// 正常情况CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常数据").handle((result, ex) -> {if (ex != null) {System.out.println("handle 捕获到异常: " + ex.getMessage());return "处理异常后的结果";} else {System.out.println("handle 正常处理结果: " + result);return result + " - 处理完成";}});System.out.println("正常情况最终结果: " + normalFuture.get());// 异常情况CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("模拟异常");}).handle((result, ex) -> {if (ex != null) {System.out.println("handle 捕获到异常: " + ex.getMessage());return "处理异常后的结果";} else {System.out.println("handle 正常处理结果: " + result);return result + " - 处理完成";}});System.out.println("异常情况最终结果: " + exceptionFuture.get());}
}

5.3 完成时回调:whenComplete()

whenComplete() 方法用于当 CompletableFuture 完成时执行一个回调,无论它是正常完成还是异常完成。它接受一个 BiConsumer<? super T, ? super Throwable> 作为参数,该函数接收结果和异常作为输入。与 handle() 不同的是,whenComplete() 不会修改 CompletableFuture 的结果,主要用于日志记录、资源清理或触发后续不依赖结果的操作。如果 whenComplete() 内部抛出异常,该异常会覆盖原始的异常(如果存在)。

方法签名:

CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {// 正常情况CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常数据").whenComplete((result, ex) -> {if (ex != null) {System.out.println("whenComplete 捕获到异常: " + ex.getMessage());} else {System.out.println("whenComplete 正常完成,结果: " + result);}});System.out.println("正常情况最终结果: " + normalFuture.get());// 异常情况CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("模拟异常");}).whenComplete((result, ex) -> {if (ex != null) {System.out.println("whenComplete 捕获到异常: " + ex.getMessage());} else {System.out.println("whenComplete 正常完成,结果: " + result);}});try {exceptionFuture.get(); // 原始异常会被重新抛出} catch (Exception e) {System.out.println("get() 捕获到原始异常: " + e.getCause().getMessage());}}
}

6. 实际应用场景与最佳实践

CompletableFuture 在 Java 后端开发中有着广泛的应用,尤其是在需要处理大量并发请求、优化系统响应时间以及构建高吞吐量服务的场景下。合理地运用 CompletableFuture 可以显著提升系统性能和用户体验。

6.1 实际应用场景

6.1.1 并行调用多个微服务

在微服务架构中,一个业务请求可能需要调用多个下游微服务来获取数据。如果这些调用是串行的,那么总的响应时间将是所有微服务响应时间之和。通过 CompletableFuture,我们可以并行地发起对多个微服务的调用,然后使用 allOf()thenCombine() 等方法等待所有结果或组合结果,从而大大缩短响应时间。

示例场景: 用户下单时,需要同时查询商品库存、用户积分和优惠券信息。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class MicroserviceParallelCall {// 模拟查询商品库存的微服务public static CompletableFuture<Integer> getProductStock(String productId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模拟网络延迟System.out.println("查询商品 " + productId + " 库存完成");return 100; // 假设库存100} catch (InterruptedException e) {throw new IllegalStateException(e);}});}// 模拟查询用户积分的微服务public static CompletableFuture<Integer> getUserPoints(String userId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1.5); // 模拟网络延迟System.out.println("查询用户 " + userId + " 积分完成");return 2000; // 假设积分2000} catch (InterruptedException e) {throw new IllegalStateException(e);}});}// 模拟查询优惠券信息的微服务public static CompletableFuture<String> getCouponInfo(String userId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(0.8); // 模拟网络延迟System.out.println("查询用户 " + userId + " 优惠券完成");return "满100减10"; // 假设优惠券信息} catch (InterruptedException e) {throw new IllegalStateException(e);}});}public static void main(String[] args) throws Exception {long start = System.currentTimeMillis();CompletableFuture<Integer> stockFuture = getProductStock("P001");CompletableFuture<Integer> pointsFuture = getUserPoints("U001");CompletableFuture<String> couponFuture = getCouponInfo("U001");// 等待所有异步任务完成CompletableFuture.allOf(stockFuture, pointsFuture, couponFuture).join();// 获取结果并处理Integer stock = stockFuture.get();Integer points = pointsFuture.get();String coupon = couponFuture.get();System.out.println("\n所有信息查询完成:");System.out.println("商品库存: " + stock);System.out.println("用户积分: " + points);System.out.println("优惠券信息: " + coupon);long end = System.currentTimeMillis();System.out.println("总耗时: " + (end - start) + " ms");}
}

6.1.2 异步发送通知(邮件、短信)

在用户注册、订单支付成功等场景下,系统通常需要发送邮件、短信或站内信等通知。这些通知操作通常不影响主业务流程,但如果同步执行,可能会增加主业务的响应时间。使用 CompletableFuture 可以将这些通知操作异步化,提升主业务的响应速度。

示例场景: 用户注册成功后,异步发送欢迎邮件和短信。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class AsyncNotification {public static CompletableFuture<Void> sendEmail(String email, String content) {return CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模拟邮件发送耗时System.out.println("邮件发送成功到: " + email + ", 内容: " + content);} catch (InterruptedException e) {e.printStackTrace();}});}public static CompletableFuture<Void> sendSms(String phoneNumber, String content) {return CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(0.5); // 模拟短信发送耗时System.out.println("短信发送成功到: " + phoneNumber + ", 内容: " + content);} catch (InterruptedException e) {e.printStackTrace();}});}public static void main(String[] args) {System.out.println("用户注册成功,开始处理通知...");CompletableFuture<Void> emailFuture = sendEmail("test@example.com", "欢迎注册!");CompletableFuture<Void> smsFuture = sendSms("13800138000", "欢迎注册!");// 主业务流程可以继续,无需等待通知发送完成System.out.println("主业务流程继续执行...");// 可以选择等待所有通知发送完成,或者不等待CompletableFuture.allOf(emailFuture, smsFuture).join();System.out.println("所有通知任务已完成。");}
}

6.1.3 批量数据处理

当需要处理大量数据,并且每个数据的处理是独立的时,可以使用 CompletableFuture 将数据分成小批次并行处理,从而提高整体处理效率。

示例场景: 批量处理用户数据,对每个用户进行数据清洗和存储。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class BatchDataProcessing {public static CompletableFuture<String> processUserData(String user) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(200); // 模拟数据处理耗时System.out.println("处理用户数据: " + user + " 完成");return user.toUpperCase(); // 模拟数据清洗} catch (InterruptedException e) {throw new IllegalStateException(e);}});}public static void main(String[] args) {List<String> users = Arrays.asList("user1", "user2", "user3", "user4", "user5");long start = System.currentTimeMillis();List<CompletableFuture<String>> futures = users.stream().map(BatchDataProcessing::processUserData).collect(Collectors.toList());// 等待所有用户数据处理完成CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();// 获取所有处理结果List<String> processedUsers = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());System.out.println("\n所有用户数据处理完成,结果: " + processedUsers);long end = System.currentTimeMillis();System.out.println("总耗时: " + (end - start) + " ms");}
}

6.2 最佳实践

  • 合理选择线程池CompletableFuture 默认使用 ForkJoinPool.commonPool()。对于 CPU 密集型任务,默认线程池通常是合适的。但对于 I/O 密集型任务,建议自定义线程池,并根据实际情况调整线程数量,避免线程饥饿或资源浪费。例如,可以使用 Executors.newFixedThreadPool()ThreadPoolExecutor

    // 自定义线程池示例
    ExecutorService customExecutor = Executors.newFixedThreadPool(10);
    CompletableFuture.supplyAsync(() -> {// 耗时操作return "Result";
    }, customExecutor);
    
  • 避免过度使用 get()join():虽然 get()join() 可以获取 CompletableFuture 的结果,但它们是阻塞的。过度使用会导致异步优势丧失,甚至引入死锁。应尽量使用 thenApply()thenAccept()thenCompose() 等非阻塞的回调方法来构建异步链。

  • 善用异常处理机制exceptionally()handle()whenComplete() 提供了灵活的异常处理方式。根据业务需求选择合适的异常处理策略,确保异步任务的健壮性。exceptionally() 适用于从异常中恢复并提供替代结果的场景,handle() 适用于无论成功失败都需要统一处理的场景,而 whenComplete() 适用于执行一些副作用操作(如日志记录、资源清理)而不改变结果的场景。

  • 链式调用与组合的合理运用:充分利用 CompletableFuture 提供的链式调用和组合方法,将复杂的异步逻辑拆解成更小、更易管理的部分。这不仅使代码更具可读性,也更容易进行测试和维护。特别注意 thenApply()thenCompose() 的区别,避免不必要的嵌套。

  • 超时处理:对于可能长时间运行的异步任务,考虑添加超时机制,避免资源无限期占用。虽然 CompletableFuture 本身没有直接的超时方法,但可以通过 CompletableFuture.orTimeout() (Java 9+) 或结合 CompletableFuture.runAfter() 等方法实现。

    // Java 9+ 超时处理示例
    CompletableFuture<String> futureWithTimeout = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(5); // 模拟长时间任务return "Task Completed";} catch (InterruptedException e) {throw new IllegalStateException(e);}
    }).orTimeout(2, TimeUnit.SECONDS); // 2秒后超时try {System.out.println(futureWithTimeout.get());
    } catch (Exception e) {System.out.println("任务超时或异常: " + e.getMessage());
    }
    
  • 日志记录:在异步任务的关键节点添加日志,便于追踪任务执行状态和排查问题。可以使用 whenComplete()handle() 来记录任务的成功或失败。

7. 总结

CompletableFuture 是 Java 8 引入的异步编程利器,它通过提供非阻塞、可组合、灵活的异常处理机制,极大地提升了 Java 在并发编程领域的表现力。本文从 CompletableFuture 的基本概念、创建方式、链式操作、组合操作以及异常处理等方面进行了深入解析,并通过丰富的代码示例展示了其在实际应用中的强大功能。

掌握 CompletableFuture 的核心 API 和最佳实践,能够帮助 Java 后端开发者更好地应对高并发、低延迟的挑战,构建出高性能、高响应、易于维护的现代后端服务。

相关文章:

  • 阿里 Qwen3 模型更新,吉卜力风格get
  • 开疆智能CCLinkIE转ModbusTCP网关连接傲博机器人配置案例
  • 领域驱动设计(DDD)【23】之泛化:从概念到实践
  • 永磁同步电机无速度算法--基于增强型正交PLL的滑模观测器
  • MySQL之MVCC实现原理深度解析
  • 印度和澳洲的地理因素
  • 用鸿蒙打造真正的跨设备数据库:从零实现分布式存储
  • linux安装vscode
  • 求区间最大值
  • 从OCR瓶颈到结构化理解来有效提升RAG的效果
  • 趣味数据结构之——数组
  • spring07-JdbcTemplate操作数据库
  • JSON简介及其应用
  • Geollama 辅助笔记:raw_to_prompt_strings_geo.py
  • 编程江湖-左右互博术(多线程,多进程)
  • [Linux]信号入门
  • 【企业管理】利益分配
  • 《高等数学》(同济大学·第7版)第十章 重积分第三节三重积分
  • 科大讯飞2025AI开发者大赛-用户新增赛道时间规则解析
  • ARFoundation系列讲解 - 100 VisionPro 环境搭建