Future和CompletableFuture详解
Future和CompletableFuture详解
Future是Java异步编程的核心,CompletableFuture是Java 8引入的强大异步编程工具。
Future接口详解
什么是Future
Future:表示一个异步计算的结果,提供了检查计算是否完成、等待计算完成、获取计算结果的方法。
核心思想:
同步调用:
main线程 ──────→ 调用方法 ──────→ 等待返回 ──────→ 继续执行|_____耗时操作_____|异步调用(Future):
main线程 ──────→ 提交任务 ──────→ 继续执行其他逻辑 ──────→ get()获取结果↓工作线程执行任务(异步)
Future接口定义
public interface Future<V> {// 取消任务boolean cancel(boolean mayInterruptIfRunning);// 任务是否被取消boolean isCancelled();// 任务是否完成boolean isDone();// 获取结果(阻塞)V get() throws InterruptedException, ExecutionException;// 获取结果(超时)V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
Future的使用
public class FutureDemo {public static void main(String[] args) throws Exception {ExecutorService pool = Executors.newFixedThreadPool(2);// 提交Callable任务,返回FutureFuture<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {System.out.println("执行任务中...");Thread.sleep(2000);return "任务结果";}});System.out.println("继续执行main线程的其他逻辑...");// 检查任务状态System.out.println("任务是否完成: " + future.isDone()); // falseSystem.out.println("任务是否取消: " + future.isCancelled()); // false// 阻塞获取结果System.out.println("获取结果: " + future.get()); // 阻塞2秒System.out.println("任务是否完成: " + future.isDone()); // truepool.shutdown();}
}
输出:
继续执行main线程的其他逻辑...
任务是否完成: false
任务是否取消: false
执行任务中...
获取结果: 任务结果
任务是否完成: true
FutureTask深度解析
FutureTask类图
FutureTask继承关系:┌────────────┐│ Runnable │ ← 可以作为Thread的target└────────────┘↑│┌──────┴──────┐│RunnableFuture│└──────┬──────┘↑│ 同时实现┌──────┴──────┐│ Future │ ← 提供Future的所有方法└─────────────┘↑│┌──────┴──────┐│ FutureTask │ ← 包装Callable/Runnable└─────────────┘
FutureTask核心成员
public class FutureTask<V> implements RunnableFuture<V> {// ===== 任务状态(7种) =====private volatile int state;private static final int NEW = 0; // 新建private static final int COMPLETING = 1; // 完成中private static final int NORMAL = 2; // 正常完成private static final int EXCEPTIONAL = 3; // 异常private static final int CANCELLED = 4; // 取消private static final int INTERRUPTING = 5; // 中断中private static final int INTERRUPTED = 6; // 已中断// ===== 核心成员变量 =====private Callable<V> callable; // 任务对象private Object outcome; // 返回结果或异常private volatile Thread runner; // 执行任务的线程private volatile WaitNode waiters; // get()阻塞线程队列(栈结构)// ===== 等待节点 =====static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}
}
状态转换图
┌─────────────────────────────────────────────────────────────┐
│ FutureTask状态转换详细图 │
└─────────────────────────────────────────────────────────────┘┌─────────┐│ NEW │ ← 初始状态│ (state=0)│└────┬────┘│┌────────────┼────────────┐│ │ │▼ ▼ ▼┌──────────┐ ┌──────────┐ ┌──────────┐│ 正常执行 │ │ 异常执行 │ │ 取消 │└──────────┘ └──────────┘ └──────────┘│ │ │▼ ▼ ├─────────┐┌────────────┐ ┌────────────┐ │ ││ COMPLETING │ │ COMPLETING │ │ ││ (state=1) │ │ (state=1) │ ▼ ▼└─────┬──────┘ └─────┬──────┘ ┌─────┐ ┌───────────┐│ │ │CANCE│ │INTERRUPTING│▼ ▼ │LLED │ │(state=3) │┌──────────┐ ┌──────────┐ │(2) │ └──────┬────┘│ NORMAL │ │EXCEPTION-│ └─────┘ ││(state=2) │ │AL(state=3)│ ▼└──────────┘ └──────────┘ ┌────────────┐✅ ❌ │INTERRUPTED │正常完成 异常完成 │ (state=4) │└────────────┘❌取消并中断【状态码】:
0 = NEW 初始状态
1 = COMPLETING 执行中(过渡态)
2 = NORMAL 正常完成
3 = EXCEPTIONAL 异常完成
4 = CANCELLED 被取消(无中断)
5 = INTERRUPTING 中断中(过渡态)
6 = INTERRUPTED 已中断
构造方法
// 1. 传入Callable
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // 初始状态为NEW
}// 2. 传入Runnable + 返回值
public FutureTask(Runnable runnable, V result) {// 使用适配器模式将Runnable转换为Callablethis.callable = Executors.callable(runnable, result);this.state = NEW;
}// Runnable适配器
static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T call() {task.run(); // 执行Runnable的run方法return result; // 返回指定的结果}
}
run()方法源码分析
public void run() {// 条件1:state != NEW说明任务已被执行或取消// 条件2:CAS设置runner为当前线程,失败说明被其他线程抢占if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {// 【核心】执行call()方法,获取结果result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;// 设置异常setException(ex);}if (ran)// 设置结果set(result);}} finally {runner = null; // help GCint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}
run()方法详细流程图:
┌──────────────────────────────────────────────┐
│ FutureTask#run() │
│ 任务执行流程 │
└────────────────────┬─────────────────────────┘│▼┌─────────────────────────┐│ state != NEW? │ ← 任务已执行/取消?└──────┬─────────┬────────┘YES NO│ │▼ ▼┌────────┐ ┌──────────────────────┐│ return │ │ CAS设置runner ││ 任务 │ │ runner: null → 当前线程││ 已结束│ └──────┬──────┬────────┘└────────┘ 失败 成功│ │▼ ▼┌────────┐ ┌──────────────┐│ return │ │ double-check ││ 被抢占│ │ c != null && │└────────┘ │ state == NEW │└──────┬───────┘│▼┌──────────────┐│ result = ││ c.call() │ ← 核心执行└───┬──────┬───┘成功 异常│ │▼ ▼┌──────────┐ ┌──────────────┐│ran = true│ │ran = false ││ │ │result = null │└────┬─────┘ └──────┬───────┘│ │▼ ▼┌──────────────┐ ┌──────────────────┐│ set(result) │ │ setException(ex) │├──────────────┤ ├──────────────────┤│ 1. CAS: │ │ 1. CAS: ││ NEW→ │ │ NEW→ ││ COMPLETING │ │ COMPLETING ││ 2. outcome=v │ │ 2. outcome=ex ││ 3. state= │ │ 3. state= ││ NORMAL │ │ EXCEPTIONAL ││ 4. finish- │ │ 4. finish- ││ Completion()│ │ Completion() │└──────┬───────┘ └──────┬───────────┘│ │└────────┬────────┘▼┌───────────────────────┐│ finishCompletion() ││ 唤醒所有get()阻塞线程 │├───────────────────────┤│ 1. 遍历waiters链表 ││ 2. LockSupport.unpark││ 每个等待线程 │└───────────────────────┘【关键点】:
✅ CAS保证只有一个线程执行
✅ COMPLETING是过渡状态
✅ outcome保存结果或异常
✅ finishCompletion唤醒所有等待线程
set()方法:设置正常结果
protected void set(V v) {// CAS设置状态:NEW → COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v; // 【保存结果】// 设置最终状态:COMPLETING → NORMALUNSAFE.putOrderedInt(this, stateOffset, NORMAL);// 唤醒所有等待的线程finishCompletion();}
}
setException()方法:设置异常结果
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t; // 保存异常// 设置最终状态:COMPLETING → EXCEPTIONALUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);finishCompletion();}
}
finishCompletion()方法:唤醒所有get()阻塞的线程
private void finishCompletion() {// 遍历waiters链表(栈)for (WaitNode q; (q = waiters) != null;) {// CAS设置waiters为nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;// 唤醒等待线程LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // help GCq = next;}break;}}done(); // 钩子方法callable = null; // help GC
}
get()方法源码分析
public V get() throws InterruptedException, ExecutionException {int s = state;// 如果任务未完成,阻塞等待if (s <= COMPLETING)s = awaitDone(false, 0L);// 返回结果return report(s);
}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);
}
awaitDone()方法:阻塞等待(核心方法)
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;// 【三次自旋】for (;;) {// 检查中断if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;// 任务已完成,返回状态if (s > COMPLETING) {if (q != null)q.thread = null;return s;}// 任务即将完成,让出CPUelse if (s == COMPLETING)Thread.yield();// 【第1次自旋】创建WaitNodeelse if (q == null)q = new WaitNode();// 【第2次自旋】入队(头插法)else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 【第3次自旋】阻塞else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}else// 【阻塞当前线程】LockSupport.park(this);}
}
WaitNode队列图示:
waiters(栈结构,头插头取):初始状态:
waiters → null线程1调用get():
waiters → [thread1] → null线程2调用get():
waiters → [thread2] → [thread1] → null线程3调用get():
waiters → [thread3] → [thread2] → [thread1] → null任务完成后,finishCompletion()依次唤醒:
thread3 → thread2 → thread1
report()方法:返回结果或抛出异常
private V report(int s) throws ExecutionException {Object x = outcome; // 获取结果if (s == NORMAL)return (V)x; // 正常返回if (s >= CANCELLED)throw new CancellationException(); // 任务被取消throw new ExecutionException((Throwable)x); // 抛出异常
}
cancel()方法源码分析
public boolean cancel(boolean mayInterruptIfRunning) {// CAS修改状态:NEW → INTERRUPTING/CANCELLEDif (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)// 中断执行任务的线程t.interrupt();} finally {// 设置最终状态:INTERRUPTING → INTERRUPTEDUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 唤醒所有get()阻塞的线程finishCompletion();}return true;
}
FutureTask完整示例
public class FutureTaskDemo {public static void main(String[] args) throws Exception {// 创建FutureTaskFutureTask<Integer> task = new FutureTask<>(() -> {System.out.println("任务开始执行...");Thread.sleep(2000);return 100;});// 启动线程执行任务new Thread(task, "t1").start();// 多个线程可以同时getnew Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 获取结果: " + task.get());} catch (Exception e) {e.printStackTrace();}}, "t2").start();new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 获取结果: " + task.get());} catch (Exception e) {e.printStackTrace();}}, "t3").start();// main线程获取结果System.out.println("main 获取结果: " + task.get());}
}
输出:
任务开始执行...
main 获取结果: 100
t2 获取结果: 100
t3 获取结果: 100
submit vs execute
方法对比
public interface ExecutorService {// 1. execute:只能提交Runnable,无返回值void execute(Runnable command);// 2. submit:可以提交Runnable/Callable,返回FutureFuture<?> submit(Runnable task);<T> Future<T> submit(Runnable task, T result);<T> Future<T> submit(Callable<T> task);
}
submit源码分析
// AbstractExecutorService
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();// 包装成FutureTask,返回值为nullRunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask); // 调用execute执行return ftask;
}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();// 包装成FutureTask,返回值为指定的resultRunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;
}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();// 包装成FutureTaskRunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
}
异常处理对比
public class ExecuteVsSubmitDemo {public static void main(String[] args) throws Exception {ExecutorService pool = Executors.newFixedThreadPool(2);// ===== execute:直接抛出异常 =====pool.execute(() -> {System.out.println("execute 执行");int i = 1 / 0; // 异常会被打印到控制台});Thread.sleep(100);// ===== submit:吞掉异常 =====pool.submit(() -> {System.out.println("submit 执行");int i = 1 / 0; // 异常被吞掉,不会打印});Thread.sleep(100);// ===== submit:通过get()获取异常 =====Future<?> future = pool.submit(() -> {System.out.println("submit 执行(get)");int i = 1 / 0;});try {future.get(); // 调用get()时才抛出异常} catch (ExecutionException e) {System.out.println("捕获异常: " + e.getCause());}pool.shutdown();}
}
输出:
execute 执行
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
submit 执行
submit 执行(get)
捕获异常: java.lang.ArithmeticException: / by zero
对比总结
| 特性 | execute | submit |
|---|---|---|
| 参数 | 只能Runnable | Runnable/Callable |
| 返回值 | void | Future |
| 异常处理 | 直接抛出 | 吞掉异常,需要get()获取 |
| 底层 | 直接执行 | 包装成FutureTask再execute |
CompletableFuture入门
为什么需要CompletableFuture
Future的局限性:
Future的缺点:
1. get()阻塞:无法实现真正的异步回调
2. 不能组合:无法链式调用,无法组合多个Future
3. 不能手动完成:无法主动设置结果
4. 异常处理困难:需要try-catch
5. 无法取消:cancel()不够灵活
CompletableFuture的优势:
CompletableFuture(JDK 8):
1. ✅ 支持异步回调(thenApply、thenAccept等)
2. ✅ 支持链式调用
3. ✅ 支持组合多个异步任务(thenCompose、thenCombine等)
4. ✅ 支持手动完成(complete)
5. ✅ 更好的异常处理(exceptionally、handle)
CompletableFuture类图
CompletableFuture继承关系:┌──────────┐│ Future │ ← 提供get()、cancel()等└──────────┘↑│┌────┴────────────┐│CompletionStage │ ← 提供异步编排能力└─────────────────┘↑│ 同时实现┌────┴──────────────┐│CompletableFuture │ ← 强大的异步编程工具└───────────────────┘
创建CompletableFuture
public class CompletableFutureDemo {public static void main(String[] args) throws Exception {// ===== 1. 手动创建(需要手动complete) =====CompletableFuture<String> future1 = new CompletableFuture<>();new Thread(() -> {try {Thread.sleep(1000);future1.complete("手动完成"); // 手动设置结果} catch (Exception e) {future1.completeExceptionally(e); // 手动设置异常}}).start();System.out.println(future1.get()); // 手动完成// ===== 2. runAsync:无返回值 =====CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {System.out.println("runAsync执行");});future2.get(); // null// ===== 3. supplyAsync:有返回值 =====CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {System.out.println("supplyAsync执行");return "返回结果";});System.out.println(future3.get()); // 返回结果// ===== 4. 指定线程池 =====ExecutorService pool = Executors.newFixedThreadPool(2);CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {System.out.println("使用自定义线程池: " + Thread.currentThread().getName());return "自定义线程池";}, pool);System.out.println(future4.get());pool.shutdown();}
}
CompletableFuture核心API
回调方法
thenApply(转换结果)
public class ThenApplyDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("执行任务1");return 100;}).thenApply(result -> {System.out.println("执行任务2,上一步结果: " + result);return result * 2;}).thenApply(result -> {System.out.println("执行任务3,上一步结果: " + result);return "最终结果: " + result;});System.out.println(future.get());}
}
输出:
执行任务1
执行任务2,上一步结果: 100
执行任务3,上一步结果: 200
最终结果: 200
thenAccept(消费结果)
CompletableFuture.supplyAsync(() -> "Hello").thenAccept(result -> {System.out.println("消费结果: " + result); // 无返回值});
thenRun(不关心结果)
CompletableFuture.supplyAsync(() -> "Hello").thenRun(() -> {System.out.println("任务完成,不关心结果"); // 无参数,无返回值});
组合方法
thenCompose(串行组合)
public class ThenComposeDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("任务1");return "Hello";}).thenCompose(result -> CompletableFuture.supplyAsync(() -> {System.out.println("任务2,上一步结果: " + result);return result + " World";}));System.out.println(future.get()); // Hello World}
}
thenCombine(并行组合)
public class ThenCombineDemo {public static void main(String[] args) throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("任务1");try { Thread.sleep(1000); } catch (Exception e) {}return 100;});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("任务2");try { Thread.sleep(2000); } catch (Exception e) {}return 200;});CompletableFuture<Integer> result = future1.thenCombine(future2, (r1, r2) -> {System.out.println("合并结果: " + r1 + " + " + r2);return r1 + r2;});System.out.println("最终结果: " + result.get()); // 300}
}
allOf(等待所有完成)
public class AllOfDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try { Thread.sleep(1000); } catch (Exception e) {}return "任务1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try { Thread.sleep(2000); } catch (Exception e) {}return "任务2";});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try { Thread.sleep(3000); } catch (Exception e) {}return "任务3";});// 等待所有任务完成CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2, future3);allFuture.get(); // 阻塞3秒System.out.println(future1.get());System.out.println(future2.get());System.out.println(future3.get());}
}
anyOf(等待任意完成)
public class AnyOfDemo {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try { Thread.sleep(3000); } catch (Exception e) {}return "任务1";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try { Thread.sleep(1000); } catch (Exception e) {}return "任务2";});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try { Thread.sleep(2000); } catch (Exception e) {}return "任务3";});// 等待任意一个完成CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2, future3);System.out.println("最快完成的任务: " + anyFuture.get()); // 任务2}
}
异常处理
exceptionally
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (true) throw new RuntimeException("出错了");return "正常结果";}).exceptionally(ex -> {System.out.println("异常处理: " + ex.getMessage());return "默认值";});System.out.println(future.get()); // 默认值
handle(同时处理结果和异常)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (true) throw new RuntimeException("出错了");return "正常结果";}).handle((result, ex) -> {if (ex != null) {System.out.println("异常: " + ex.getMessage());return "异常处理后的值";}return result;});System.out.println(future.get()); // 异常处理后的值
方法后缀
方法命名规则:1. 无后缀(thenApply):└─ 由上一步的线程执行2. Async后缀(thenApplyAsync):└─ 由ForkJoinPool.commonPool()执行3. Async+Executor后缀(thenApplyAsync(fn, executor)):└─ 由自定义线程池执行
实战应用
应用1:商品详情页聚合
public class ProductDetailDemo {public static void main(String[] args) throws Exception {ExecutorService pool = Executors.newFixedThreadPool(10);long start = System.currentTimeMillis();// 并行查询多个接口CompletableFuture<String> baseInfoFuture = CompletableFuture.supplyAsync(() -> {sleep(1000);return "基本信息";}, pool);CompletableFuture<String> priceFuture = CompletableFuture.supplyAsync(() -> {sleep(2000);return "价格信息";}, pool);CompletableFuture<String> inventoryFuture = CompletableFuture.supplyAsync(() -> {sleep(1500);return "库存信息";}, pool);CompletableFuture<String> reviewFuture = CompletableFuture.supplyAsync(() -> {sleep(1800);return "评论信息";}, pool);// 等待所有接口返回CompletableFuture<Void> allFuture = CompletableFuture.allOf(baseInfoFuture, priceFuture, inventoryFuture, reviewFuture);allFuture.get();// 聚合结果String result = String.format("商品详情:%s, %s, %s, %s",baseInfoFuture.get(),priceFuture.get(),inventoryFuture.get(),reviewFuture.get());System.out.println(result);System.out.println("总耗时: " + (System.currentTimeMillis() - start) + "ms");pool.shutdown();}static void sleep(long millis) {try { Thread.sleep(millis); } catch (Exception e) {}}
}
输出:
商品详情:基本信息, 价格信息, 库存信息, 评论信息
总耗时: 2005ms ← 并行执行,只需等待最长的2秒
应用2:异步回调链
public class AsyncCallbackDemo {public static void main(String[] args) throws Exception {CompletableFuture.supplyAsync(() -> {System.out.println("1. 查询用户ID");return "user123";}).thenApplyAsync(userId -> {System.out.println("2. 根据用户ID查询用户信息: " + userId);sleep(1000);return "User(id=" + userId + ", name=张三)";}).thenApplyAsync(userInfo -> {System.out.println("3. 根据用户信息查询订单: " + userInfo);sleep(1000);return "订单列表[10条]";}).thenAccept(orders -> {System.out.println("4. 显示订单: " + orders);}).exceptionally(ex -> {System.out.println("异常处理: " + ex.getMessage());return null;}).get(); // 阻塞等待完成}static void sleep(long millis) {try { Thread.sleep(millis); } catch (Exception e) {}}
}
应用3:超时控制
public class TimeoutDemo {public static void main(String[] args) {try {String result = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000); // 模拟耗时操作} catch (InterruptedException e) {e.printStackTrace();}return "正常结果";}).completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS) // JDK 9+.get();System.out.println(result); // 超时默认值} catch (Exception e) {e.printStackTrace();}}
}
🎯 知识点总结
Future vs CompletableFuture
| 特性 | Future | CompletableFuture |
|---|---|---|
| 阻塞获取 | get()阻塞 | get()阻塞,但支持异步回调 |
| 回调 | ❌ | ✅ thenApply、thenAccept等 |
| 组合 | ❌ | ✅ thenCompose、thenCombine等 |
| 异常处理 | try-catch | exceptionally、handle |
| 手动完成 | ❌ | ✅ complete |
| 并行 | ❌ | ✅ allOf、anyOf |
CompletableFuture核心方法
创建:
├─ supplyAsync(有返回值)
└─ runAsync(无返回值)转换:
├─ thenApply(转换结果)
├─ thenAccept(消费结果)
└─ thenRun(不关心结果)组合:
├─ thenCompose(串行)
├─ thenCombine(并行)
├─ allOf(等待所有)
└─ anyOf(等待任意)异常:
├─ exceptionally
└─ handle
💡 常见面试题
Q1:FutureTask的run()方法返回值是void,为什么get()能获取到结果?
答:因为FutureTask有一个成员变量outcome,run()方法执行完后会把结果保存到outcome中,get()方法直接返回outcome的值。
Q2:多个线程同时调用get()会怎样?
答:多个线程调用get()会被封装成WaitNode加入到waiters链表(栈结构)中阻塞等待。任务完成后,finishCompletion()会遍历waiters链表,依次唤醒所有等待的线程。
Q3:submit()和execute()的区别?
答:
- execute只能提交Runnable,submit可以提交Runnable/Callable
- execute无返回值,submit返回Future
- execute直接抛出异常,submit吞掉异常(需要通过get()获取)
- submit底层是将任务包装成FutureTask,然后调用execute执行
Q4:CompletableFuture比Future强在哪里?
答:
- 支持异步回调(thenApply、thenAccept等),不需要阻塞等待
- 支持任务组合(thenCompose、thenCombine)
- 支持并行等待(allOf、anyOf)
- 更好的异常处理(exceptionally、handle)
- 支持手动完成(complete)
Q5:FutureTask的状态有哪些?
答:7种状态:
- NEW(新建)
- COMPLETING(完成中)
- NORMAL(正常完成)
- EXCEPTIONAL(异常)
- CANCELLED(取消)
- INTERRUPTING(中断中)
- INTERRUPTED(已中断)
