异步复习(线程)
文章目录
- 初始化线程的四种方法
- 疑惑点
- 线程池
- 线程池创建方式
- 为什么不推荐使用Executors提供方法创建(面试)
- 线程池七大参数
- 1. corePoolSize(核心线程数)
- 2. maximumPoolSize(最大线程数)
- 3. keepAliveTime(空闲线程存活时间)
- 4. unit(时间单位)
- 5.workQueue(任务队列)
- 6. threadFactory(线程工厂)
- 7. handler(拒绝策略)
- 线程池工作原理
- CompletableFuture 异步编排
- 简介
- 启动异步任务
- 创建异步对象 runAsync|supplyAsync
- 异步编排
- 线程结果感知和处理 whenCompleteAsync与exceptionally
- handle 线程结果感知和处理(推荐), handle 方法
- 线程串行化方法
- 1.thenRunAsync
- 2.thenAcceptAsync
- 3.thenApplyAsync
- 两任务组合 - 都要完成才处理新的任务
- 1.runAfterBothAsync,不获取结果并处理新任务
- 2.thenAcceptBothAsync,获取结果并处理新任务
- 3.thenCombineAsync,获取结果并获得新任务结果
- 两个任务 - 一个完成
- 1.runAfterEitherAsync,不获取结果, 新任务无返回值。
- 2.acceptEitherAsync,获取结果, 新任务无返回值。
- 3.applyToEitherAsync,获取结果, 新任务有返回值。
- 多任务组合
- allOf,等待所有任务完成
- anyOf,只要有一个任务完成
初始化线程的四种方法
1)、继承 Thread
2)、实现 Runnable 接口
3)、实现 Callable 接口 + FutureTask (可以拿到返回结果,可以处理异常)
4)、线程池
- 1)和 2)主进程无法获取线程的运算结果。不适合当前场景,且大量并发进来后,new Thread多的话会导致资源耗尽
- 3)主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。也可能导致服务器资源耗尽。
- 4)开发中最常用的线程使用方式,execute不获取运算结果,submit可以获取到线程的运算结果,且可以控制资源
1.继承Thread类
创建对象后可以直接start()让线程运行
package com.hansp.thread_;public class CPUNumber {public static void main(String[] args) {cat cat = new cat();cat.start();//启动线程,会自动调用run//具体看的曾分析吧}
}
//1.当一个类继承了Thread类,该类就可以当做线程使用
//2.重写run方法,协商自己的业务代码
//3.run Thread类实现了Runnable接口的run方法
/*Thread里面的run的源码
@Overridepublic void run() {if (target != null) {target.run();}}*/class cat extends Thread{@Overridepublic void run() {int times=0;while (true) {//该线程每隔一秒。在控制台输出“喵喵,我是小猫咪”System.out.println("喵喵,我是小猫咪"+(++times));//让该线程休眠一秒try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}if(times==80){//当输出80次时退出break;}}}
}
2.实现Runnable接口
需要new Thread(实现类对象) 然后调用thread.start()方法
package com.hansp.thread_;public class Demo01 {public static void main(String[] args) {A a = new A();//a.start();这里不能调用start,因为没有该方法Thread thread = new Thread(a);thread.start();//创建一个thread然后调用他的start要把我们实现接口的对象当做形参}
}
class A implements Runnable{//通过实现Runnable接口,开发线程int count=0;@Overridepublic void run() {while (true){System.out.println("小狗汪汪叫。。hi"+(++count)+"线程名称"+Thread.currentThread().getName());//休眠一秒try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}if(count==10)break;}}}
3.实现callable接口
新建一个FutureTask并将实现callable的对象传入,这个FutureTask实现了runnable接口,所以下面就和runnable的一样,创建一个线程传入FutureTask然后start()
实现callable的接口,主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。极大可能导致服务器资源耗尽。
public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());new Thread(futureTask).start();System.out.println(futureTask.get());}public static class Callable01 implements Callable<Integer> {@Overridepublic Integer call() throws Exception {System.out.println("当前线程:" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}}
4)
public class ThreadTest {public static ExecutorService executor = Executors.newFixedThreadPool(10);public static void main(String[] args) throws ExecutionException, InterruptedException {service.execute(new Runable01());//execute相当于是只是执行异步任务,获取不到返回结果Future<Integer> submit = service.submit(new Callable01());//submit可以获取到任务的返回结果submit.get();//估计这个也会阻塞吧}
}
疑惑点
首先就是只有所有线程都运行完,进程才会结束,只要你开辟了线程且start了就会运行完run方法
我现在的疑问是main线程
会不会等待新开辟出来的线程执行完后再一起结束
(就是开辟线程的线程会不会等待被开辟的线程)
结论:默认开辟出来的线程没有依赖关系,不会等待
这里的话是1和2是不会等待的,3中main会等待新线程执行完获取返回结果再结束,因为futureTask.get()
相当于阻塞了main线程
| ⏳ 等待机制 | FutureTask
内部使用LockSupport.park()
实现线程阻塞/唤醒 |
核心对比总结
特性 | Runnable 示例 | FutureTask 示例 |
---|---|---|
线程启动方式 | new Thread(runnable).start() | new Thread(futureTask).start() |
结果获取 | 无(run() 返回void ) | 通过futureTask.get() 获取返回值 |
main线程阻塞 | ❌ 不阻塞 | ✅ 显式阻塞等待 |
同步机制 | 无 | 内置锁+阻塞队列 |
适用场景 | 异步执行无需结果的任务 | 需要获取异步任务结果的场景 |
💡 关键结论:
- 默认情况下线程间没有依赖关系,
main
线程不会主动等待子线程FutureTask.get()
是显式的阻塞调用,会强制当前线程等待任务完成- 若需协调线程执行顺序,必须使用
join()
/get()
/锁等同步机制
线程池
线程池创建方式
- 1.使用Executors提供的方法创建(但工作中是不会用这种方式创建的)
下面是几种默认的方法
-
- 2.new ThreadPoolExecutor(七个参数) 一般都是通过new ThreadPoolExecutor 创建的
下一小节我们将讲解线程池中的七个参数
为什么不推荐使用Executors提供方法创建(面试)
这其实是个面试题哈,有比下图讲解更细的,可以去搜一下啊
线程池七大参数
1. corePoolSize(核心线程数)
定义:线程池中长期保持存活的线程数量
特点:
- 即使线程空闲也不会被回收(除非设置
allowCoreThreadTimeOut=true
) - 当任务数量 ≤ 核心线程数时,线程池会创建新线程处理任务
设置建议:
- CPU密集型任务:
CPU核数 + 1
- IO密集型任务:
CPU核数 × 2
2. maximumPoolSize(最大线程数)
定义:线程池允许创建的最大线程数量
特点:
- 当任务队列已满且当前线程数 < 最大线程数时,会创建新线程
- 当线程数达到最大值且队列已满时,触发拒绝策略
设置建议:根据系统资源设置上限,避免资源耗尽
3. keepAliveTime(空闲线程存活时间)
定义:非核心线程空闲时的存活时间
特点:
- 当线程空闲时间超过该值时,线程会被回收
- 只对超出核心线程数的线程有效
- 单位:配合
unit
参数使用(秒/毫秒等)
设置建议:30-60秒(平衡资源释放和线程创建开销)
4. unit(时间单位)
定义:keepAliveTime
的时间单位
可选值:
TimeUnit.NANOSECONDS // 纳秒
TimeUnit.MICROSECONDS // 微秒
TimeUnit.MILLISECONDS // 毫秒(最常用)
TimeUnit.SECONDS // 秒
TimeUnit.MINUTES // 分钟
TimeUnit.HOURS // 小时
TimeUnit.DAYS // 天
5.workQueue(任务队列)
定义:存储等待执行任务的阻塞队列
常用实现:
队列类型 | 特点 | 适用场景 |
---|---|---|
ArrayBlockingQueue | 有界队列(固定容量) | 流量可控场景 |
LinkedBlockingQueue | 无界队列(默认Integer.MAX_VALUE) | 不推荐(可能OOM) |
SynchronousQueue | 不存储元素的队列 | 要求快速响应的场景 |
PriorityBlockingQueue | 优先级队列 | 任务有优先级区别 |
DelayedWorkQueue | 延迟队列 | 定时任务场景 |
6. threadFactory(线程工厂)
- 定义:用于创建新线程的工厂
- 作用:
- 统一设置线程属性(名称、优先级、守护状态等)
- 便于监控和问题排查
- 自定义示例:
public class NamedThreadFactory implements ThreadFactory {private final String namePrefix;private final AtomicInteger counter = new AtomicInteger(1);public NamedThreadFactory(String namePrefix) {this.namePrefix = namePrefix + "-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, namePrefix + counter.getAndIncrement());t.setDaemon(false);t.setPriority(Thread.NORM_PRIORITY);return t;}
}
7. handler(拒绝策略)
触发条件: 当线程池已关闭,或线程数已达最大值且队列已满
内置策略:
策略 | 行为 | 适用场景 |
---|---|---|
AbortPolicy(默认) | 抛出 RejectedExecutionException | 需要感知任务被拒绝 |
CallerRunsPolicy | 由调用者线程执行任务 | 保证任务不被丢弃 |
DiscardPolicy | 静默丢弃被拒绝的任务 | 可容忍任务丢失 |
DiscardOldestPolicy | 丢弃队列最老的任务并重试 | 可容忍旧任务丢失 |
线程池工作原理
文字描述:
-
在创建了线程池后,开始等待请求。
-
当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
- 如果正在运行的线程数量小于corePoolSize,那么马上创建核心线程运行这个任务;
- 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
- 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
-
当一个线程完成任务时,它会从队列中取下一个任务来执行。
-
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
- 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
- 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
CompletableFuture 异步编排
简介
CompletableFuture提供了非常强大的Future接口的扩展功能, 可以简化异步编程。
提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 并且提供了转换和组合 CompletableFuture 的方法。CompletableFuture 类实现了 Future 接口, 所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果, 但是这种方式不推荐使用。
CompletableFuture 和 FutureTask ( 构造参数为Callable实现类)同属于 Future 接口的实现类, 都可以获取线程的执行结果。
业务逻辑本身其实他们之间可能是有关系的
,而不是有一个任务就开辟一个线程去做,比如说下面列表的第四五六步,是依赖于第一步查出的信息查询查spu的销售属性,这三个方法都是要传入sku信息的,所以说第四五六步依赖于第一步的返回结果。所以需要进行异步编排
示例,使用异步可以缩短响应时间:
开辟多个线程去查询可以更快,比如说下面这个业务逻辑
不使用异步需要等sum也就是6.5s,如果全是异步的话等待max即可也就是1.5s(实际上有自己的编排就是0.5+1.5 第五步依靠第一步返回结果)
但是
// 1.获取sku的基本信息0.5s
// 2.获取sku的图片信息0.5s// 3.获取sku的促销信息1s
// 4.获取spu的所有销售属性 1s
// 5.获取规格参数组及组下的规格参数1.5s
// 6.spu详情1s
注意,下面的内容关于基本是就是融会贯通的
很多都是通用的 比如带Async的方法就将新任务交给线程池(指定或者默认)不带的话就是上一个线程继续执行
也分获取上一步任务的返回值和不获取上一步任务的返回值
也分自己反回值是否反回
启动异步任务
创建异步对象 runAsync|supplyAsync
CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable)
//无线程返回值,指定线程池(传入runnable无返回值)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//有线程返回值,指定线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
1、 runAsync 都是没有线程返回结果的, supplyAsync 都是可以获取线程返回结果的
2、 可以传入自定义的线程池, 否则就用默认的线程池;
3、Async代表异步方法
runAsync,不带线程返回值
public class ThreadTest {// ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>( 100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {System.out.println("当前线程:"+Thread.currentThread().getName());int i = 10 / 2;System.out.println("运行结果...."+i);}, executor);}
}
supplyAsync 带线程返回值
通过supplyAsync 方法的返回值.get()可以获取到返回值
public class ThreadTest {// ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>( 100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("运行结果...." + i);return i; //有返回值}, executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}
异步编排
线程结果感知和处理 whenCompleteAsync与exceptionally
线程完成返回结果后当前线程执行action
public CompletableFuture whencomplete(BiConsumer<? super T,? super Throwable> action);
线程完成返回结果后将action交给默认线程池执行呢
public CompletableFuture whenCompleteAsync(BiConsumer <? super T,? super Throwable> action);
线程完成返回结果后将action交给 指定 线程池执行呢
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor);
线程出现异常时候执行
public CompletableFuture exceptionally(Function<Throwable,? extends T> fn);
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
- whenComplete 和 whenCompleteAsync 的区别:
- whenComplete: 是执行当前任务的线程执行继续执行 whenComplete 的任务。
- whenCompleteAsync: 是执行把 whenCompleteAsync 这个任务继续提交给线程池
来进行执行。
即方法不以 Async 结尾, 意味着 Action 使用相同的线程执行, 而 Async 可能会使用其他线程执行(如果是使用相同的线程池, 也可能会被同一个线程选中执行)
实例
public class ThreadTest {// ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>( 100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 0;System.out.println("运行结果...." + i);return i;//虽然能得到异常信息,但无法修改返回结果}, executor).whenCompleteAsync((res, exception) -> {System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);//可以感知异常,并返回自定义默认值(!!!)},executor).exceptionally(throwable -> {return 0;});Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}
这段代码有无就是关于异常处理的情况exceptionally,能否返回默认值
如果没有使用exceptionally
如果使用exceptionally
handle 线程结果感知和处理(推荐), handle 方法
当前线程继续处理
public <U> completionstage<U> handle(BiFunction<? super T, Throwable,? extends u> fn);
交给默认线程池
public <U> completionstage<U> handleAsync(BiFunction<? super T,Throwable,?extends U>fn);
交给指定线程池
public <U> Completionstage<U> handleAsync(BiFunction<? super T, Throwable,?extends U>
fn,Executor executor);
和 complete 一样
不同的是这个可对结果做最后的处理(可处理异常),可改变返回值(上一种方法只能在有异常的时候返回默认值)
,更加灵活,所以推荐用这种方式
这样是不可以的
总结:使用R apply(T t, U u); 可以感知异常,和修改返回值的功能。
举例
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 6;System.out.println("运行结果...." + i);return i;}, executor).handleAsync((res, throwable) -> {if (res!=null){return res*2;}if (throwable!=null){System.out.println("出现异常"+throwable.getMessage());return -1;}return 0;},executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
总结:
一般用handle,因为whencomplete如果异常不能给定默认返回结果,需要再调用exceptionally,而handle可以
该方法作用:获得前一任务的返回值【自己也可以是异步执行的】,也可以处理上一任务的异常,调用exceptionally修改前一任务的返回值【例如异常情况时给一个默认返回值】而handle方法可以简化操作
线程串行化方法
- thenRun:继续执行,不接受上一个任务的返回结果,自己执行完没有返回结果
- thenAccept:继续执行,接受上一个任务的返回结果,自己执行完没有返回结果
- thenApply:继续执行,接受上一任务的返回结果,并且自己的返回结果也被下一个任务所感知
- 以上都要前置任务成功完成。
Function<? super T,? extends U>
T: 上一个任务返回结果的类型
U: 当前任务的返回值类型
1.thenRunAsync
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> runAsync= CompletableFuture.runAsync(() -> {System.out.println("当前线程:"+Thread.currentThread().getName());int i = 10 / 2;System.out.println("运行结果...."+i);}, executor).thenRunAsync(() -> {System.out.println("任务二启动了...");},executor);System.out.println("返回数据:");}
2.thenAcceptAsync
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> supplyAsync= CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:"+Thread.currentThread().getName());int i = 10 / 2;System.out.println("运行结果...."+i);return i;}, executor).thenAcceptAsync(res -> {System.out.println("任务二启动了..."+"拿到了上一步的结果:"+res);},executor);System.out.println("返回数据:");}
3.thenApplyAsync
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 10 / 2;System.out.println("运行结果...." + i);return i;}, executor).thenApplyAsync(res -> {System.out.println("任务二启动了..." + "拿到了上一步的结果:" + res);return res*2;}, executor);Integer integer = future.get();System.out.println("返回数据:"+integer);}
两任务组合 - 都要完成才处理新的任务
runAfterBoth:组合两个future,不需要获取之前任务future的结果,只需两个future处理完任务后,处理该任务。
thenAcceptBoth:组合两个future,获取前两个future任务的返回结果,然后处理任务,没有返回值。
thenCombine:组合两个future,获取前两个future的返回结果,并返回当前任务的返回值
public <U,V> CompletableFuture
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture thenCombineAsync
(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);public CompletableFuture thenAcceptBoth
(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);public CompletableFuture thenAcceptBothAsync
(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);public CompletableFuture thenAcceptBothAsync
(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);public CompletableFuture runAfterBoth(CompletionStage<?> other, Runnable action);public CompletableFuture runAfterBothAsync(CompletionStage<?> other, Runnable action);public CompletableFuture runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);
1.runAfterBothAsync,不获取结果并处理新任务
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());System.out.println("任务二运行结束....");return "hello";}, executor);future01.runAfterBothAsync(future02,() -> {System.out.println("任务三开始...");});System.out.println("返回数据:");}
2.thenAcceptBothAsync,获取结果并处理新任务
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());System.out.println("任务二运行结束....");return "hello";}, executor);future01.thenAcceptBothAsync(future02,(res1, res2) -> {System.out.println("任务一返回值:"+res1+"任务二返回值:"+res2);});System.out.println("返回数据:");}
3.thenCombineAsync,获取结果并获得新任务结果
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());System.out.println("任务二运行结束....");return "hello";}, executor);CompletableFuture<String> future = future01.thenCombineAsync(future02, (res1, res2) -> {System.out.println("任务一返回值:" + res1 + "任务二返回值:" + res2);return res1 + (String) res2;}, executor);System.out.println("返回数据:"+future.get());}
两个任务 - 一个完成
runAfterEither: 两个任务有一个执行完成, 不需要获取 future 的结果, 处理任务, 也没有返回值。
acceptEither: 两个任务有一个执行完成, 获取它的返回值, 处理任务, 没有新的返回值。
applyToEither: 两个任务有一个执行完成, 获取它的返回值, 处理任务并有新的返回值。
1.runAfterEitherAsync,不获取结果, 新任务无返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务二运行结束....");return "hello";}, executor);future01.runAfterEitherAsync(future02,() -> {System.out.println("任务三线程开始:" + Thread.currentThread().getName());},executor);System.out.println("返回数据:");}
2.acceptEitherAsync,获取结果, 新任务无返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务二运行结束....");return "hello";}, executor);future02.acceptEitherAsync(future01,res ->{System.out.println("任务三线程开始:" + Thread.currentThread().getName()+"拿到上次任务的结果:"+res);},executor);System.out.println("返回数据:");}
3.applyToEitherAsync,获取结果, 新任务有返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务二运行结束....");return "hello";}, executor);CompletableFuture<String> future = future02.applyToEitherAsync(future01, res -> {System.out.println("任务三线程开始:" + Thread.currentThread().getName() + "拿到上次任务的结果:" + res);return res + "t3";}, executor);System.out.println("返回数据:"+future.get());}
多任务组合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);
}//anyOf: 只要有一个任务完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {return orTree(cfs, 0, cfs.length - 1);
}
allOf,等待所有任务完成
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务二运行结束....");return "hello";}, executor);CompletableFuture<Object> future03 = CompletableFuture.supplyAsync(() -> {System.out.println("任务三线程开始:" + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务三运行结束....");return "hello2";}, executor);CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);allOf.get();//等待所有任务完成System.out.println("返回数据:");}
anyOf,只要有一个任务完成
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务二运行结束....");return "hello";}, executor);CompletableFuture<Object> future03 = CompletableFuture.supplyAsync(() -> {System.out.println("任务三线程开始:" + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务三运行结束....");return "hello2";}, executor);CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03);anyOf.get();//等待其中之一任务完成System.out.println("返回数据:");}