Java并发编程之CompletableFuture原理与实践
一、前言
在Java并发编程中,CompletableFuture是一个强大而灵活的工具。今天,我们将深入探讨它的实现原理、最佳实践,以及面试中的重要考点。
二、CompletableFuture的本质
CompletableFuture是Java 8引入的异步编程工具,它实现了Future和CompletionStage两个接口。这个设计让它既保持了Future的基本特性,又通过CompletionStage接口提供了强大的任务编排能力。
让我们先看一个基本示例:
public class CompletableFutureBasics {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 模拟耗时操作sleep(1000);return "Hello";}).thenApply(result -> {// 转换结果return result + " World";});// 非阻塞方式处理结果future.thenAccept(System.out::println);}
}
三、核心方法
3.1 实例方法
public class Completable01 {public static void main(String[] args) throws Exception {// 线程池ExecutorService executor = Executors.newFixedThreadPool(3);// 1、创建未完成的CompletableFuture,通过complete()方法完成CompletableFuture<Integer> cft01 = new CompletableFuture<>() ;cft01.complete(99) ;// 2、创建已经完成CompletableFuture,并且给定结果CompletableFuture<String> cft02 = CompletableFuture.completedFuture("given...value");// 3、有返回值,默认ForkJoinPool线程池CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> {return "OK-3";});// 4、有返回值,采用Executor自定义线程池CompletableFuture<String> cft04 = CompletableFuture.supplyAsync(() -> {return "OK-4";},executor);// 5、无返回值,默认ForkJoinPool线程池CompletableFuture<Void> cft05 = CompletableFuture.runAsync(() -> {});// 6、无返回值,采用Executor自定义线程池CompletableFuture<Void> cft06 = CompletableFuture.runAsync(()-> {}, executor);}
}
3.2 计算方法
public class Completable02 {public static void main(String[] args) throws Exception {// 线程池ExecutorService executor = Executors.newFixedThreadPool(3);CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "OK";},executor);// 1、计算完成后,执行后续处理// cft01.whenComplete((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex));// 2、触发计算,如果没有完成,则get设定的值,如果已完成,则get任务返回值// boolean completeFlag = cft01.complete("given...value");// if (completeFlag){// System.out.println(cft01.get());// } else {// System.out.println(cft01.get());// }// 3、开启新CompletionStage,重新获取线程执行任务cft01.whenCompleteAsync((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex),executor);}
}
3.3 获取结果方法
public class Completable03 {public static void main(String[] args) throws Exception {// 线程池ExecutorService executor = Executors.newFixedThreadPool(3);CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "Res...OK";},executor);// 1、阻塞直到获取结果// System.out.println(cft01.get());// 2、设定超时的阻塞获取结果// System.out.println(cft01.get(4, TimeUnit.SECONDS));// 3、非阻塞获取结果,如果任务已经完成,则返回结果,如果任务未完成,返回给定的值// System.out.println(cft01.getNow("given...value"));// 4、get获取抛检查异常,join获取非检查异常System.out.println(cft01.join());}
}
3.4 任务编排方法
public class Completable04 {public static void main(String[] args) throws Exception {// 线程池ExecutorService executor = Executors.newFixedThreadPool(3);CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("OK-1");return "OK";},executor);// 1、cft01任务执行完成后,执行之后的任务,此处不关注cft01的结果// cft01.thenRun(() -> System.out.println("task...run")) ;// 2、cft01任务执行完成后,执行之后的任务,可以获取cft01的结果// cft01.thenAccept((res) -> {// System.out.println("cft01:"+res);// System.out.print