当前位置: 首页 > news >正文

Java并发7--FutrureTask 及CompletetableFuture

文章目录

  • 一、Runnable 与 Callable 接口
  • 二、FutureTask
    • 1、Future 接口和 FutureTask 结构
      • 1.1 继承关系
      • 1.2 执行状态及其转换
      • 1.3 整体成员结构
    • 2、FutureTask 执行流程及等待原理
      • 2.1 Runnable 接口
      • 2.2 Future 接口
      • 2.2.1 get():获取执行结果的方法
      • 2.2.2 cancel() 取消任务
      • 2.2.3 isxxx():辅助方法
  • 三、CompletableFuture
    • 1、CompletableFuture概述
    • 2、创建方法
      • 2.1 创建对象方式创建任务
      • 2.2 静态方法方式创建任务
      • 2.3 成员方法方式创建任务
    • 3、结果获取和回调
    • 4、任务时序关系
      • 4.1 串行
      • 4.2 并行
      • 4.3 汇聚

一、Runnable 与 Callable 接口

在 Java 中,一般开启多线程的方式都是使用创建 Thread 对象的方法

new Thread(@Overridepublic void run() {// 这里写要执行的任务}
).start;

但是这种方法显而易见的存在一致的缺陷:创建对象时就将线程执行者与要执行的任何绑定在一块儿,使用起来不怎么灵活。

所以可以通过实现 Runnable 的实现类创建出多线程任务对象,也可以直接通过创建 Runnable 匿名内部类对象达到同样的效果

package com.example.bootrocketmq.mythread;public class RunableTest implements Runnable {/*** desc ->  实现同步方法,可以在这里实现要同步的任务*/@Overridepublic void run() {System.out.println("RunableTest, 在这里实现同步逻辑");try {// 模拟同步逻辑Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}/*** desc ->  定义主方法,创建两个线程,分别执行RunableTest的run方法*/public static void main(String[] args) {Runnable runnable = new RunableTest();Thread thread1 = new Thread(runnable);thread1.start();Thread thread2 = new Thread(runnable);thread2.start();}
}

总的来说,上面的两种方式都将执行者 线程实体 Thread对象 和任务 Runnable 对象分开

在实际过程中,可以选择多条线程同时执行一个 task 任务,这种方式会使得多线程编程更加灵活

但是还有一个很大的问题 ,就是 在处理多线程任务的时候,很多情况下都要返回值,但是Runnable 的 run() 方法总是 void 的

所以在需要返回值的时候,可以使用 Callable 接口的 call() 方法

// Runnable接口 -> run()
@FunctionalInterface
public interface Runnable {public abstract void run();
}// Callable接口 -> call()
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}

但是需要注意的是,虽然 Thread 提供了很多的构造器方法,但是没有一个方法是可以接收 Callable 对象的

public Thread()
public Thread(Runnable target)
public Thread(Runnable target, AccessControlContext acc)
public Thread(ThreadGroup group, Runnable target)
public Thread(String name)
public Thread(ThreadGroup group, String name)
public Thread(Runnable target, String name)
public Thread(ThreadGroup group, Runnable target, String name)
public Thread(ThreadGroup group, Runnable target, String name,long stackSize)

那么在使用 Callable 时到底是怎么交给线程执行的呢?

肯定需要依赖于别的东西才能交由线程执行,这时候就要用到 FutureTask 了:
在这里插入图片描述

package com.example.bootrocketmq.mythread;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class CallableTask implements Callable<String> {/*** desc ->  实现Callable接口的call方法,Callable弥补了Runnable接口无法返回结果的问题,可以在这里指定泛型T作为返回结果*/@Overridepublic String call() throws Exception {return "hello call";}public static void main(String[] args) throws ExecutionException, InterruptedException {CallableTask task = new CallableTask();// FutureTask构造器中可以接收一个Callable类型的对象// 而FutureTask实现了Runnable,Future两个接口// 所以当我们创建了一个Callable类型的任务时,可以先封装成一个FutureTask对象,再将封装好的FutureTask传递给线程执行即可。FutureTask<String> futureTask = new FutureTask<>(task);Thread thread1 = new Thread(futureTask, "thread1");thread1.start();System.out.println("ans is: " + futureTask.get());}
}

二、FutureTask

1、Future 接口和 FutureTask 结构

1.1 继承关系

Callable 任务依赖 FutureTask 类执行,而 FutureTask 类实现了 RunnableFuture 接口

RunnableFuture 接口实则没有提供新的方法,只是简单的继承整合了 Runnable,Future 两个接口,所以大体类图关系如下:
在这里插入图片描述

1.2 执行状态及其转换

FutureTask 中,与 AQS 一样,存在一个用 volatile 关键字修饰的 int 变量 stateFutureTask 通过它对任务的执行状态进行管理

  • NEW:初始化状态,任务刚被创建出来处于的状态
  • COMPLETING:终止中间状态,任务从NEW变为NORMAL/EXCEPTIONAL时会经历的短暂状态
  • NORMAL :正常终止状态,任务正常执行完成后处于的状态
  • EXCEPTIONAL:异常终止状态,任务执行过程中抛出了异常后处于的状态
  • CANCELLED:取消状态,任务被成功取消后处于的状态
  • INTERRUPTING:中断中间状态,任务还未执行或在执行中时调用cancel(true)处于的中间状态
  • INTERRUPTED:中断最终状态,执行任务被取消且执行线程被中断后处于的状态
    在这里插入图片描述

当我们创建一个FutureTask时,FutureTask对象的state一定是处于NEW新建状态的

因为在FutureTask的构造方法中会执行this.state = NEW;操作。
在这里插入图片描述
FutureTask的状态转换是不可逆的,并且同时只要状态不处于NEW初始化状态,那么就可以认为该任务已经结束

例如FutureTask判断任务是否执行结束的isDone()方法,这个方法认为你只要不是New状态就返回true,就认为任务结束了已经

public boolean isDone() {return state != NEW;
}

1.3 整体成员结构

FutureTask 中存在两类线程:

  • 执行者:执行异步任务的线程,只存在一条线程
  • 等待者:等待获取执行结果的线程,可能存在多条线程
// 任务的执行状态
private volatile int state;
// 异步任务:Callable对象
private Callable<V> callable;
// 任务执行结果(因为是Object类型,所以异常也是可以保存的)
private Object outcome;
// 执行者线程
private volatile Thread runner;
// 等待者线程:由WaitNode内部类构成的链表
private volatile WaitNode waiters;// 静态内部类:WaitNode
static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }
}

Future接口的get()方法作用是获取异步执行之后的结果,若执行还未完成,获取执行结果的等待者则会阻塞。

那么FutureTask作为Future接口的实现者,自然也对该方法进行了实现。

因为FutureTask内部在逻辑上存在一个由WaitNode节点组成的单向链表,当一条线程尝试获取执行结果但是还未执行结束时,FutureTask则会每个等待者封装成一个WaitNode节点,并将其加入该链表中,直至执行者的任务执行完成后,再唤醒链表的每个节点中的线程。

而因为FutureTask内部的链表因为仅仅只是逻辑链表的原因,所以FutureTask本身只会将链表的head节点存储在成员变量waiters

其余的节点通过WaitNode中的next后继指针连接。
在这里插入图片描述

2、FutureTask 执行流程及等待原理

因为FutureTask是实现了RunnableFuture接口的,而RunnableFuture接口继承了Runnable,Future两个接口,所以要从这两个接口开始分析

2.1 Runnable 接口

核心流程如下:
在这里插入图片描述
Runnable接口中仅提供了一个run()方法,所以要看下FutureTask中的run()实现:

// FutureTask类 → run()方法
public void run() {// ①判断state是否为NEW,如果不是代表任务已经执行过或被取消// ②判断执行者位置上是否有线程,有则代表着当前任务正在执行// 如果state不为NEW或者执行者不为空都会直接终止当前线程执行if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))return;// 如果state==NEW且runner==currentThread,接着继续往下执行try {// 获取需要执行的异步任务Callable<V> c = callable;// 检查任务是否为空并再次检查state是否处于初始化状态if (c != null && state == NEW) {// 接收结果V result; // 接收终止状态:true为正常结束,false则为异常结束boolean ran; try {// 调用call()执行任务并获取执行结果result = c.call();// 终止状态改为正常结束ran = true;} catch (Throwable ex) {// 返回结果置为空result = null;// 终止状态改为异常结束ran = false;// CAS-将捕获的异常设置给outcome全局变量setException(ex);}// 如果执行状态为正常结束if (ran)// CAS-将执行结果设置给outcome全局变量set(result);}} finally {// 将执行者线程的引用置为null, 到时候GC用runner = null;// 检查state是否为INTERRUPTING、INTERRUPTED中断状态int s = state;if (s >= INTERRUPTING)// 如果是则响应线程中断handlePossibleCancellationInterrupt(s);}
}// FutureTask类 → handlePossibleCancellationInterrupt()方法
private void handlePossibleCancellationInterrupt(int s) {// 1.如果state==INTERRUPTING中断中间状态if (s == INTERRUPTING)// 3.如果线程再次从就绪状态获取到cpu资源回到执行状态//   循环调用yield()方法让当前线程持续处于就绪状态//   直至线程被中断且state==INTERRUPTED为止while (state == INTERRUPTING)// 2.调用yield()让当前线程让出cpu资源退出执行状态//   回到就绪状态方便响应线程中断Thread.yield(); 
}

简单来说,在FutureTask.run()方法中主要做了如下四步:

  1. 判断任务执行状态,如果正在执行或被执行过则直接return,反之则继续执行任务
  2. 如果任务执行过程中出现异常,则调用setException()写出捕获的异常信息
  3. 如果任务执行成功后,获取执行返回值并调用set()写出任务执行完成后的返回值
  4. 任务执行结束时,判断任务状态是否需要中断,需要则调用handlePossibleCancellationInterrupt()进行中断处理

setException()的处理

// FutureTask类 → setException()方法
protected void setException(Throwable t) {// 利用CAS机制将state改为COMPLETING中间状态if (UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)) {// 将捕获的异常写出给outcome成员outcome = t;// 再次利用CAS修改state为EXCEPTIONAL异常终止状态UNSAFE.putOrderedInt(this,stateOffset,EXCEPTIONAL); // 最终态// 唤醒等待队列中的等待者线程finishCompletion();}
}
  1. 先使用CAS操作将state修改为COMPLETING中间状态
  2. 将捕获的异常写出给outcome成员
  3. 写出捕获的异常后再次使用CAS将state改为EXCEPTIONAL异常终止状态
  4. 调用finishCompletion()方法唤醒等待列表中的等待者线程

set()处理

// FutureTask类 → set()方法
protected void set(V v) {// 利用CAS机制修改state为COMPLETING中间状态if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 将执行结果写出给outcome成员outcome = v;// 再次利用CAS修改state为NORMAL正常终止状态UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终态// 唤醒等待队列中的等待者线程finishCompletion();}
}// FutureTask类 → finishCompletion()方法
// 通过成员变量waiters中保存的head头节点与next后继节点指针,遍历唤醒整个逻辑链表所有节点中的等待者线程
private void finishCompletion() {// 该方法调用前state一定要为最终态// 获取waiters中保存的head节点,根据head遍历整个逻辑链表for (WaitNode q; (q = waiters) != null;) {// 利用cas操作将原本的head节点置nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {// 获取q节点的线程Thread t = q.thread;if (t != null) {q.thread = null; // 置空线程引用信息LockSupport.unpark(t);// 唤醒节点中的线程}// 获取链表中的下一个节点WaitNode next = q.next;// 如果下一个节点为空则代表着已经是链表尾部了if (next == null)// 那么则终止循环break;// 置空当前节点的后继节点引用信息方便GCq.next = null; // 将获取到的后继节点赋值给qq = next;}// 当遍历完成整个链表后退出循环break;}}// done()方法没有具体实现,留给使用者自身用于拓展的// 可以根据需求让执行者线程在执行完毕前多做一点善后工作done();callable = null;        // to reduce footprint
}
  1. 先使用CAS操作将state修改为COMPLETING中间状态
  2. 将任务正常执行结束的返回值写出给outcome成员
  3. 写出后再次使用CAS将state改为NORMAL正常终止状态
  4. 调用finishCompletion()方法唤醒等待列表中的等待者线程

2.2 Future 接口

在Future接口中有两类【获取执行结果和取消任务】,共五个方法声明:

// Future接口
public interface Future<V> {// ============== 1:取消任务的方法 ===================// 尝试取消Callable任务,取消成功返回true,反之falseboolean cancel(boolean mayInterruptIfRunning);// ------------- 两个辅助方法 ------------------// 判断Callable任务是否被取消boolean isCancelled();// 判断call()是否执行结束,结束返回true,反之false// 返回true的情况一共有三种:正常执行完成返回结果;执行过程中抛出异常中断了执行;Callable任务被取消导致执行结束boolean isDone();// =============== 2:获取执行结果的get方法// 获取call()执行结果,执行完成则返回,未完成则阻塞至完成为止V get() throws InterruptedException, ExecutionException;// get()方法的升级版,如果未完成会阻塞直至执行完毕或超时V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

2.2.1 get():获取执行结果的方法

在这里插入图片描述

// FutureTask类 → get()方法
public V get() throws InterruptedException, ExecutionException {// 获取任务状态int s = state;// 如果任务状态不大于终止中间状态则阻塞线程if (s <= COMPLETING)s = awaitDone(false, 0L);// 如果state大于终止中间状态则代表是最终态了,则返回执行结果return report(s);
}// FutureTask类 → 超时版get()方法
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {if (unit == null) {// 时间戳设置为null时,将触发空指针throw new NullPointerException();}int s = state;// 如果任务还没执行完成,那么调用awaitDone传递给定的时间阻塞线程// 如果时间到了状态还是不大于COMPLETING则代表任务还未执行结束// 那么抛出TimeoutException强制中断当前方法执行,退出等待状态if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();// 如果任务执行完成或在给定时间内执行完成则返回执行结果return report();
}

awaitDown() & report()

// FutureTask类 → awaitDone()方法
private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 计算等待的截止时间final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {// 如果出现线程中断信息则移除等待链表节点信息if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}// 再次获取任务状态,如果大于COMPLETING// 代表任务已经执行结束,直接返回最新的state值int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}// 如果state==中间状态代表任务已经快执行完成了// 那么则让当前线程让出cpu资源进入就绪状态稍微等待else if (s == COMPLETING) // cannot time out yetThread.yield();// 如果任务还在执行或还未执行则构建waitnode节点else if (q == null)q = new WaitNode();// 利用cas机制将构建好的waitnode节点加入逻辑链表// 注意:该链表是栈的结构,所以并不是将新的节点// 变为之前节点的next,而是新节点变为head节点// 旧节点变为next后继节点(这样方便维护逻辑链表结构)else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 如果是超时等待的模式,则先判断时间有没有超时// 如果已经超时则删除对应节点并返回最新的state值else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}// 如果还没超时则将线程挂起阻塞指定时间LockSupport.parkNanos(this, nanos);}else// 如果不是超时版本则直接挂起阻塞线程LockSupport.park(this);}
}// FutureTask类 → report()方法
private V report(int s) throws ExecutionException {// 获取成员变量outcome的值Object x = outcome;// 如果state为正常终止状态则返回执行结果if (s == NORMAL)return (V)x;// 如果任务被取消或线程被中断则抛出CancellationException异常if (s >= CANCELLED)throw new CancellationException();// 如果state为异常终止状态则抛出捕获的异常信息throw new ExecutionException((Throwable)x);
}

2.2.2 cancel() 取消任务

// FutureTask类 → cancel()方法
// 入参为true代表中断执行者线程执行
// 入参为false代表取消任务执行
public boolean cancel(boolean mayInterruptIfRunning) {// 如果state!=NEW代表任务已经执行结束,直接返回false// 如果入参为false则CAS修改state=CANCELLED,返回true即可if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;// 因为call()方法执行过程中可能会抛出异常// 所以需要try finally语句块保证等待者线程唤醒try {// 如果入参为true,则中断执行者线程if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)// interrupt()不能保证一定中断执行线程// 因为强制中断线程不安全,所以java弃用了stop()方法// 而是换成了协调式中断,线程调用interrupt()后// 只会发出中断信号,由被中断线程决定响不响应中断操作t.interrupt();} finally { // final state// 中断后利用CAS修改state=INTERRUPTED最终态UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 不管是取消任务还是中断执行,完成后都会唤醒等待链表的所有线程finishCompletion();}return true;
}

当某条调用 FutureTask.cancel() 方法后,会根据传入的 boolean 值决定是进行中断执行者线程操作还是取消任务执行,具体执行逻辑如下:

  • mayInterruptIfRunning传入true的情况下:
    • 判断任务是否已经执行结束,是则直接返回false,反之则利用cas机制将state=INTERRUPTING中断中间状态
    • 获取执行者线程runner成员,调用执行者的interrupt()方法向执行者线程发出一个中断信号
    • 利用CAS机制将state=INTERRUPTED中断最终态
    • 调用finishCompletion()方法唤醒链表中的所有等待者线程
  • mayInterruptIfRunning传入false的情况下:
    • 判断任务是否已经执行结束,是则直接返回false
    • 如果任务还未执行或还在执行则利用cas机制将state=CANCELLED取消状态

2.2.3 isxxx():辅助方法

// FutureTask类 → isCancelled()方法
public boolean isCancelled() {// 如果state>=CANCELLED都代表状态不是执行终止状态// 那么则代表着肯定是中断或者取消状态,所以:// state>=CANCELLED代表任务被取消,反之则是没有return state >= CANCELLED;
}// FutureTask类 → isDone()方法
public boolean isDone() {// 如果state!=NEW初始化状态则代表任务执行结束// 因为就算是中间状态也会很快变为最终态return state != NEW;
}

三、CompletableFuture

1、CompletableFuture概述

在FutureTask中,如果想要获取到多线程执行的结果,有两种办法:

  • 一种是轮询FutureTask.isDone()方法,当结果为true的时候获取执行结果
  • 第二种则是调用FutureTask.get()方法。

但是无论那种方式都无法实现真正意义上的异步回调,因为任务执行需要时间,所以都会使得主线程被迫阻塞

等待执行结果返回后才能接着往下执行,最多只能在一定程度上减少等待方面开销的时间

public class Test10 {public static void main(String[] args) throws Exception {// 创建一个futureTask任务FutureTask<String> futureTask = new FutureTask<>(() ->Thread.currentThread().getName() + "-hello future task......");// 在线程t1中执行new Thread(futureTask, "T1").start();// todo: 可以在这里完成别的工作,因为任务执行需要时间System.out.println("main线程获取异步执行结果: " + futureTask.get());}
}

这种方式可以在一定程度上利用异步任务执行的时间来完成别的工作,但是总归来说与“异步获取执行结果”这个设计的初衷还是存在出入。

而CompletableFuture的出现则可以实现真正意义上的实现异步,不会在使用时因为任务还没执行完成导致获取执行结果的线程也被迫阻塞

CompletableFuture将处理执行结果的过程也放到异步线程里去完成,采用回调函数的概念解决问题。

CompletableFuture是JDK8支持函数式编程后引入的一个类,实现了Future与CompletionStage接口
在这里插入图片描述
用CompletionStage接口中提供的方法去支持任务完成时触发的函数和操作:then,when等操作来防止FutureTask的get阻塞和轮询isDone的现象出现。

2、创建方法

CompletableFuture中创建一个异步任务的方式总归有三种:

  • 与之前的FutureTask一样的使用方式,通过new对象完成创建
  • 通过CompletableFuture提供的静态方法完成创建
  • 通过CompletableFuture提供的成员方法完成创建

2.1 创建对象方式创建任务

public class Test10 {public static void main(String[] args) throws Exception {CompletableFuture<String> completableFuture = new CompletableFuture<>();new Thread(() -> {System.out.println("异步任务");// 执行完成后可以往CompletableFuture对象里面写出返回值completableFuture.complete(Thread.currentThread().getName());}).start();System.out.println("main线程执行获取结果: " + completableFuture.get());for (int i = 0; i < 3; i++) {System.out.println("main线程输出" + i);}}
}

创建一条线程执行异步操作,执行完成后往completableFuture对象中写入需要返回的值

而主线程则调用completableFuture.get()方法获取异步线程写回的值

这种与之前的FutureTask没任何区别,在获取到执行结果之前,因为任务还在执行,所以主线程会被迫阻塞,等待任务执行结束后才能继续往下执行

2.2 静态方法方式创建任务

CompletableFuture类提供了五个静态方法可以完成创建异步任务的操作,如下:

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);
public static <U> CompletableFuture<U> completedFuture(U value);
  • run开头的代表创建一个没有返回值的异步任务
  • supply开头的方法代表创建一个具备返回值的异步任务

同时这两类方法都支持指定执行线程池,如果不指定执行线程池,会默认使用ForkJoinPool.commonPool()线程池内的线程执行创建出的异步任务。

package com.cui.commonboot.myjuc;import java.util.concurrent.CompletableFuture;/*** <p>* 功能描述:* </p>** @author cui haida* @date 2023/12/30/20:42*/
public class SupplyAsyncTest {// 求和100内的偶数private static String getEvenSum() {try {Thread.sleep(100);} catch (Exception e) {e.printStackTrace();}int sum = 0;for (int i = 0; i <= 100; i++) {if ((i & 1) == 0) {sum += i;}}return Thread.currentThread().getName() + "线程 - 100内偶数之和:" + sum;}public static void main(String[] args) throws InterruptedException {// 创建有返回值的异步任务 ::为方法引用的语法CompletableFuture<String> supplyCF =CompletableFuture.supplyAsync(SupplyAsyncTest::getEvenSum);// 执行成功的回调supplyCF.thenAccept(System.out::println); // 等着执行完成自然会打印,实现了真正的异步// 执行过程中出现异常的回调supplyCF.exceptionally((e)->{e.printStackTrace();return "异步任务执行过程中出现异常....";});// 主线程执行打印1234...操作// 因为如果不为CompletableFuture指定线程池执行任务的情况下,// CompletableFuture默认是使用ForkJoinPool.commonPool()的线程// 同时是作为main线程的守护线程进行的,如果main挂了,执行异步任// 务的线程也会随之终止结束,并不会继续执行异步任务for (int i = 1; i <= 10; i++){System.out.println("main线程 - 输出:"+i);Thread.sleep(50);}}
}

当main线程创建好异步任务以及相关后续处理后,其实并没有阻塞等待任务的完成,而是继续执行接下来的逻辑

当任务执行结束时则会触发提前定义好的回调函数,返回任务执行结果(执行出现异常则会将捕获的异常信息返回给exceptionally回调函数)。

显而易见,CompletableFuture任务对比之前的FutureTask任务,在执行上以及执行结果返回上实现了真正意义上的“异步”。

四种方法统一实例

package com.cui.commonboot.myjuc;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** <p>* 功能描述:CompletableFuture* </p>** @author cui haida* @date 2023/12/30/20:53*/
public class AllStaticMethodTest {public static String getEvenSum() {int sum = 0;for (int i = 0; i <= 100; i++) {if ((i & 1) == 0) {sum += i;}}return "thread is: " + Thread.currentThread() + " ans is: " + sum;}public static String getOddSum() {int sum = 0;for (int i = 0; i <= 100; i++) {if ((i & 1) == 1) {sum += i;}}return "thread is: " + Thread.currentThread() + " ans is: " + sum;}public static void main(String[] args) {// ================= 最常见的用法 ===============CompletableFuture<String> supplyCF =CompletableFuture.supplyAsync(AllStaticMethodTest::getEvenSum); // 指定异步任务// 执行成功的回调supplyCF.thenAccept(r -> {System.out.println("这是执行成功的回调");System.out.println(r);});// 执行失败的回调supplyCF.exceptionally(e -> {e.printStackTrace();return "异步任务执行过程中出现异常";});for (int i = 0; i <= 10; i++) {System.out.println("main线程输出:" + i);try {Thread.sleep(50);} catch (InterruptedException e) {System.out.println("sleep error");}}// ============== 创建一个异步任务,已经给定返回值了 =========CompletableFuture<String> c = CompletableFuture.completedFuture("hello");c.thenApply(r -> {System.out.println("上个任务结果:" + r);return r + "...world";});c.thenAccept(System.out::println);// ============= 创建一个没有返回值的异步任务 =============CompletableFuture<Void> runCF = CompletableFuture.runAsync(() -> {System.out.println(Thread.currentThread().getName() + "没有返回值的异步任务");});// ============= 创建单例的线程池 ================ExecutorService executor = Executors.newSingleThreadExecutor();// 创建一个有返回值的异步任务并指定执行的线程池CompletableFuture<String> supplyCFThreadPool =CompletableFuture.supplyAsync(AllStaticMethodTest::getOddSum, executor);// // 执行过程中出现异常的回调supplyCFThreadPool.thenAccept(System.out::println);// 执行过程中出现异常的回调supplyCF.exceptionally((e) -> {e.printStackTrace();return "异步任务执行过程中出现异常....";});// 关闭线程池executor.shutdown();}
}

2.3 成员方法方式创建任务

在使用这种方式创建任务的前提是需要建立在已经创建出一个CompletableFuture对象上。

总归来说这类成员方法创建异步任务的方式属于串行化的形式创建的,下一个任务依赖于上一个任务的执行结果时,就可以采用这种方式。

// 可以基于CompletableFuture对象接着创建一个有返回任务
// 此类方法可以基于上一个任务再创建一个新的有返回型任务。
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)// 可以在上一个任务执行失败的情况下接着执行
// 在上个任务出现异常的情况下也可以接着执行
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);// 可以基于CompletableFuture对象接着创建一个无返回任务
//
CompletionStage<Void> thenRun(Runnable action); 
CompletionStage<Void> thenRunAsync(Runnable action); 
CompletionStage<Void> thenRunAsync(Runnable action,Executor executor); // 与thenApply方法类似,但是thenApply方法操作的是同一个CompletableFuture
// 而该类方法则是生产新的CompletableFuture<返回值>对象进行操作
public <U> CompletableFuture<U> thenCompose(Function<? super T,  ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)

如果调用方法名不带Async的方法创建出的任务都是由上一个任务的执行线程来执行的,在上一个任务没有执行完成的情况下,当前创建出来的任务会等待上

一个任务执行完成后再执行。

而如果是通过Async这类方法创建出来的任务则不受到这个限制,通过调用方法名带Async的方法创建出的任务,具体的执行线程会根据实际情况来决定,主

要会分为如下三种情况:

  • 上一个任务已经执行结束了,那么当前创建出的任务会交给上个任务的执行线程来执行
  • 上一个任务还没有执行结束,那么则会另启一条线程来执行
  • 如果创建任务时指定了执行线程池,则会使用指定线程池的线程来执行
public class CompletableFutureDemo {public static void main(String[] args) throws Exception {CompletableFuture cf =CompletableFuture.supplyAsync(CompletableFutureDemo::evenNumbersSum)// 链式编程:基于上个任务的返回继续执行新的任务.thenApply(r -> {System.out.println("获取上个任务的执行结果:" + r);// 通过上个任务的执行结果完成计算:求和100所有数return r + oddNumbersSum();}).thenApplyAsync(r -> {System.out.println("获取上个任务的执行结果:" + r);Integer i = r / 0; // 拋出异常return r;}).handle((param, throwable) -> { // handler -> 创建一个可以在上个任务抛出异常时依旧执行的任务if (throwable == null) {return param * 2;}// 获取捕获的异常System.out.println(throwable.getMessage());System.out.println("我可以在上个任务" +"抛出异常时依旧执行....");return -1;}).thenCompose(x -> CompletableFuture.supplyAsync(() -> x + 1)).thenRun(() -> {System.out.println("我是串行无返回任务....");});// 主线程执行休眠一段时间// 因为如果不为CompletableFuture指定线程池执行任务的情况下,// CompletableFuture默认是使用ForkJoinPool.commonPool()的线程// 同时是作为main线程的守护线程进行的,如果main挂了,执行异步任// 务的线程也会随之终止结束,并不会继续执行异步任务Thread.sleep(2000);}// 求和100内的偶数private static int evenNumbersSum() {int sum = 0;try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}for (int i = 1; i <= 100; i++) {if (i % 2 == 0) sum += i;}return sum;}// 求和100内的奇数private static int oddNumbersSum() {int sum = 0;try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}for (int i = 1; i <= 100; i++) {if (i % 2 != 0) sum += i;}return sum;}
}

在上面,共创建了六个异步任务:

  • 求和100内的所有偶数
  • 基于第一个任务的结果再加上100内奇数总值计算100内所有数字的总和
  • 基于第二个任务的结果除0抛出一个异常
  • 使用handle创建一个可以在上个任务抛出异常时依旧执行的任务
  • 使用thenCompose创建一个基于上个任务返回值+1的任务
  • 使用thenRun创建了一个没有返回值的任务

3、结果获取和回调

在之前的FutureTask获取执行结果是通过FutureTask.get()获得

而在CompletableFuture中则提供了多样化的方式:既可以与之前的FutureTask阻塞式获取,也可以通过回调函数的方式异步通知

// ============ 第一类:主线程调用直接获取执行结果 ================
// 阻塞主线程获取执行结果(与FutureTask.get()一致)
public T get();
// 上个方法的超时版本
public T get(long timeout,TimeUnit unit);
// 尝试获取执行结果,执行完成返回执行结果,未完成返回任务参数
public T getNow(T valueIfAbsent);
// 阻塞主线程等待任务执行结束
public T join();// ============== 第二类:可以为无返回值的异步任务写出执行结果的 ============
// 无返回的异步任务正常执行完成后可以通过此方法写出返回值
public boolean complete(T value);
// 无返回的异步任务执行异常后可以通过此方法写出捕获的异常信息
public boolean completeExceptionally(Throwable ex);// ============== 第三类:任务正常执行成功的回调 =====================
// 任务正常执行完成后的回调函数:默认由执行任务的线程执行回调逻辑
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
// 任务正常执行完成后的回调函数:另启一条线程执行回调逻辑
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
// 任务正常执行完成后的回调函数:指定线程池执行回调逻辑
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);// ============== 第四类:任务执行抛出异常的回调 ========================
// 执行过程中出现异常时执行的回调方法
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)// ============== 第五类:任务执行结束的回调 ======================
// 执行结束后会执行的回调逻辑
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
// 上个方法的异步版本
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
// 上个方法的指定线程池版本
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

4、任务时序关系

4.1 串行

CompletableFuture类中的实现中,总归有五大类方法用来描述任务之间串行关系的方法

它们分别为:thenApplythenAcceptthenRunhandle以及thenCompose

在CompletableFuture类中方法名以这五个开头的函数都是用来描述任务之间的串行关系。

比如目前创建三个任务①②③,这三个任务都是异步的方式执行的

但是任务②依赖于①的执行结果,任务③依赖于②的执行结果,执行顺序总为①→②→③,这种关系便被称为串行关系。

4.2 并行

在CompletionStage中并没有关于描述任务之间并行并行关系的方法,因为也没有必要,毕竟并行只是串行的多执行。

比如main主线程创建三个任务①②③,全部交由T1线程执行,那么执行关系便是T1执行①→②→③,这种情况被称为串行异步。

而并行则是指main主线程创建三个任务①②③,并将这三个任务分别交由:T1、T2、T3三条不同的线程执行

三条线程同时执行三个不同的任务,执行则是T1→①,T2→②,T3→③,这种情况便被称为并行异步。

4.3 汇聚

CompletionStage接口中描述任务之间汇聚关系的方法总供有两类

  • 一类是and类型的,代表着任务都要执行完成后才开始处理的方法。
  • 一类则是or类型的,代表着任务只要有任意一个执行完成就开始处理的方法。

AND类型:

  • thenCombine系列:可以接收前面任务的结果进行汇聚计算,并且计算后可以返回值
  • thenAcceptBoth系列:可以接收前面任务的结果进行汇聚计算,但计算后没有返回值
  • runAfterBoth系列:不可以接收前面任务的结果且无返回,但可以在任务结束后进行汇聚计算

CompletableFuture类的allOf系列:不可接收之前任务的结果,但可以汇聚多个任务,但是要配合回调处理方法一起使用

OR 类型

  • applyToEither系列:接收最先完成的任务结果进行处理,处理完成后可以返回值
  • acceptEither系列:接收最先完成的任务结果进行处理,但是处理完成后不能返回
  • runAfterEither系列:不能接收前面任务的返回值且无返回,单可以为最先完成的任务进行后继处理
  • CompletableFuture类的anyOf系列:可以同时汇聚任意个任务,并接收最先执行完成的结果进行处理,完成后没有返回值,需要配合回调方法一起使用
package com.cui.commonboot.myjuc;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <p>* 功能描述:汇聚操作测试* </p>** @author cui haida* @date 2023/12/31/19:27*/
public class AggTest {// 偶数之和public static int getEvenSum() {int sum = 0;for (int i = 0; i <= 100; i++) {if ((i & 1) == 0) {sum += i;}}return sum;}// 奇数之和public static int getOddSum() {int sum = 0;for (int i = 0; i <= 100; i++) {if ((i & 1) == 1) {sum += i;}}return sum;}public static void main(String[] args) throws ExecutionException, InterruptedException {// *--------------------创建两个异步任务CF1/CF2------------------*/CompletableFuture<Integer> cf1 =CompletableFuture.supplyAsync(AggTest::getEvenSum);CompletableFuture<Integer> cf2 =CompletableFuture.supplyAsync(AggTest::getOddSum);// ========================== and 测试 ============================// thenAcceptBoth、runAfterBoth系列与thenCombine差不多相同// 区别就在于入参BiFunction、BiConsumer、Runnable三个函数式接口的不同CompletableFuture<Integer> aggOfAndFuture = cf1.thenCombine(cf2, (r1, r2) -> {System.out.println("cf1 task ans is: " + cf1);System.out.println("cf2 task ans is: " + cf2);return r1 + r2;});System.out.println("agg ans of and is: " + aggOfAndFuture.get());// allOf测试(allOf还可以汇聚多个)CompletableFuture<Void> cfAllOf = CompletableFuture.allOf(cf1, cf2);cfAllOf.thenAccept(o -> {System.out.println("所有任务处理完之后,进行断后处理");});Thread.sleep(2000);System.out.println("================================================");// ========================== or 测试 ============================// acceptEither、runAfterEither系列与applyToEither系列也差不多相同// 区别就也是在于入参BiFunction、BiConsumer、Runnable三个函数式接口的不同CompletableFuture<Integer> cfApplyToEither = cf1.applyToEither(cf2, r -> {System.out.println("最先执行完成的任务结果:" + r);return r * 10; // 先到的将进行 * 10操作});System.out.println("cf1, cf2任务applyToEither汇聚处理结果:" + cfApplyToEither.get());// 使用anyOf汇聚两个任务,谁先执行完成就处理谁的执行结果CompletableFuture<Object> cfAnyOf = CompletableFuture.anyOf(cf1, cf2);// 配合thenAccept成功回调函数使用cfAnyOf.thenAccept(r -> {System.out.println("最先执行完成的任务结果:" + r);System.out.println("对先完成的任务结果进行后续处理....");});}
}

为什么CompletableFuture可以实现链式编程,完成任务的串行创建且执行?

因为CompletableFuture实现了CompletionStage接口,在每个任务执行完成后又回返回一个CompletableFuture对象

使用时我们可以接着基于该对象继续创建新的任务,同时每个CompletableFuture对象中存在一个链表,一个新创建的任务到来,如果线程还未执行完当前任务,则会将新到来的任务加入链表等待,线程处理完当前任务则会接着执行链表中的任务。

http://www.dtcms.com/a/288465.html

相关文章:

  • 高速SAR架构ADC选型设计
  • 为什么选择PGCE中级认证?
  • startnet.cmd命令里面的factory -minint
  • 零基础学习性能测试第二章-监控体系
  • 多线程 示例
  • QML 动画效果详解
  • Public Key Retrieval is not allowed
  • CS231n-2017 Lecture3线性分类器、最优化笔记
  • 测试计划(抽奖系统)
  • DC-DC降压转换5.5V/3A高效率低静态同步降压转换具有自适应关断功能
  • CCF编程能力等级认证GESP—C++7级—20250628
  • Navicat 查看单张表建表ddl
  • Python观察者模式详解:从理论到实战
  • 142. 环形链表 II
  • Spring IOC容器在Web环境中是如何启动的(源码级剖析)?
  • MCP 协议详细分析 二 Sampling
  • Jmeter的元件使用介绍:(一)测试计划详解
  • string的增删改查模拟实现(简单版)【C++】
  • 数据分析综合应用 30分钟精通计划
  • 使用UV管理FastAPI项目
  • 数独算法Python示例
  • 【HarmonyOS】Ability Kit - Stage模型
  • Redis数据库基础与持久化部署
  • Vue3的definePros和defineEmits
  • Nacos:微服务架构的核心引擎
  • xss-dom漏洞
  • Python 数据分析模板在工程实践中的问题诊断与系统性解决方案
  • 2025在线教育系统源码、平台开发新趋势:开源架构+AI赋能
  • FPGA自学——整体设计思路
  • MySQL练习3