Java开发异步编程中常用的接口和类
Callable 与 Runnable接口
Runnable使用方式一:使用Thread运行
Runnable使用方式二:使用线程池执行
Callable使用方式一:包装到FutureTask里面,然后通过Thread运行
一般写法
匿名内部类写法
Lambda表达式写法
Callable使用方式二:使用线程池执行
Future接口
FutureTask类
基本特点
“不可重复执行”特性(含源码)
从原始FutureTask对象中取值(含源码)
第1步: 进入 ExecutorService.submit()
第2步: 任务执行
第3步: finishCompletion() 和关键的 null 操作
第4步: 结果返回
新返回Future对象的作用
Callable 与 Runnable接口
- Callable接口类似于Runnable接口,都是用于实现一个线程任务。
- Callable实现的线程可以返回执行结果,但是Runnable不可以返回执行结果。
- Callable的run方法会抛出编译时异常,而Runnable没有抛出异常。
- 可以使用工厂类Executors把Runnable包装成Callable。
Runnable使用方式一:使用Thread运行
public class ThreadCreationDemo {// 第一种方式需要一个实现了Runnable接口的类static class MyRunnable implements Runnable {@Overridepublic void run() {// 打印当前执行此代码的线程的名称System.out.println("方法一 (MyRunnable): 线程 " + Thread.currentThread().getName() + " 正在运行。");}}public static void main(String[] args) {// 第一种方式: 使用一个独立的类去实现Runnable接口// 这种方法结构清晰,适用于逻辑比较复杂的场景。Thread t1 = new Thread(new MyRunnable());t1.start();// 第二种方式: 使用Lambda表达式实现一个Runnable// 这是Java 8+中最推荐的简洁写法。Thread t2 = new Thread(() -> {System.out.println("方法二 (Lambda): 线程 " + Thread.currentThread().getName() + " 正在运行。");});t2.start();// 第三种方式: 使用匿名内部类实现一个Runnable// 这是在Java 8出现之前常用的方法。Thread t3 = new Thread(new Runnable() {@Overridepublic void run() {System.out.println("方法三 (匿名内部类): 线程 " + Thread.currentThread().getName() + " 正在运行。");}});t3.start();}
}
Runnable使用方式二:使用线程池执行
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class ExecutorServiceDemo {// 定义一个实现了Runnable接口的类,用于作为要执行的任务static class MyRunnable implements Runnable {@Overridepublic void run() {System.out.println("任务由线程: " + Thread.currentThread().getName() + " 执行。");try {// 模拟任务执行耗时Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}public static void main(String[] args) {// 第二种方式: 使用“线程池”去运行Runnable// 使用Executors工厂类创建一个单线程的线程池// 这个线程池保证任务是按提交顺序依次执行的ExecutorService executorService = Executors.newSingleThreadExecutor();System.out.println("使用 executorService.execute() 提交任务。");// execute(Runnable): 提交一个任务,没有返回值。executorService.execute(new MyRunnable());System.out.println("使用 executorService.submit() 提交任务。");// submit(Runnable): 同样提交一个任务,但会返回一个Future对象。// 这个Future对象可以用来判断任务是否执行完毕.Future<?> future = executorService.submit(new MyRunnable());// --- 把Runnable通过工具类转成Callable ---// Callable与Runnable类似,但它可以有返回值并能抛出异常。// Executors.callable()方法可以将一个Runnable转换为Callable,// 其执行结果为null。Callable<Object> callable = Executors.callable(new MyRunnable());System.out.println("提交转换后的Callable任务。");Future<Object> callableFuture = executorService.submit(callable);// --- 关闭线程池 ---executorService.shutdown();}
}
Callable使用方式一:包装到FutureTask里面,然后通过Thread运行
因为FutureTask实现了Runnable接口,所以可以作为参数传入Thread中
一般写法
代码示例:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;public class FutureTaskDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建一个有返回值的任务 (Callable)Callable<String> myTask = () -> {System.out.println("子线程: 正在执行任务...");Thread.sleep(2000); // 模拟耗时2秒return "任务执行完毕,这是结果";};// 2. 将任务包装成 FutureTask (如图所示)FutureTask<String> futureTask = new FutureTask<>(myTask);// 3. 把 FutureTask 交给一个新线程去执行 (如图所示)// 因为 FutureTask 实现了 Runnable 接口,所以 Thread 可以直接运行它Thread thread = new Thread(futureTask);thread.start();System.out.println("主线程: 已启动子线程,我先忙别的...");// 4. 在主线程中获取任务结果// get() 方法会阻塞当前线程,直到子线程的任务完成System.out.println("主线程: 现在需要结果了,开始等待...");String result = futureTask.get(); // 等待并获取结果System.out.println("主线程: 终于拿到了结果 - \"" + result + "\"");}
}
运行结果:
主线程: 已启动子线程,我先忙别的...
子线程: 正在执行任务...
主线程: 现在需要结果了,开始等待...
(程序在这里会暂停2秒)
主线程: 终于拿到了结果 - "任务执行完毕,这是结果"
也可以用匿名内部类和Lambda表达式来定义
Callable
任务
匿名内部类写法
匿名内部类是在Java 8之前的标准写法,语法上会显得比较冗长。
// 使用匿名内部类来定义 Callable 任务
FutureTask<String> futureTask_anonymous = new FutureTask<>(new Callable<String>() {@Overridepublic String call() throws Exception {System.out.println("通过匿名内部类执行...");Thread.sleep(1000);return "匿名内部类的结果";}
});// 然后同样交给 Thread 或线程池执行
// Thread thread = new Thread(futureTask_anonymous);
// thread.start();
Lambda表达式写法
Lambda表达式是Java 8及以后推荐的函数式编程写法,非常简洁。
由于Callable
接口是一个函数式接口(只有一个抽象方法call()
),所以可以被Lambda表达式替代。
// 使用Lambda表达式来定义 Callable 任务
FutureTask<String> futureTask_lambda = new FutureTask<>(() -> {System.out.println("通过Lambda表达式执行...");Thread.sleep(1000);return "Lambda的结果";
});// 然后同样交给 Thread 或线程池执行
// Thread thread = new Thread(futureTask_lambda);
// thread.start();
Callable使用方式二:使用线程池执行
execute方法只能接收Runnable对象,所以Callable对象不能使用线程池的execute方法执行,只能使用submit方法执行。
执行会返回Future类的对象,对这个对象调用get()方法即可以拿到返回值。
示例代码:
import java.util.concurrent.*;public class ExecutorServiceDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建一个线程池 (图中是单线程的,更常用的是固定大小的线程池)ExecutorService executorService = Executors.newFixedThreadPool(2); // 例如,一个有两个线程的池System.out.println("主线程: 准备将任务提交给线程池。");// 2. 将 Callable 任务提交给线程池 (直接使用Lambda表达式定义任务)// submit() 方法会立即返回一个 Future 对象Future<String> future = executorService.submit(() -> {System.out.println("子线程 (" + Thread.currentThread().getName() + "): 开始执行耗时任务...");Thread.sleep(2000); // 模拟耗时2秒return "任务完成,这是来自线程池的结果";});System.out.println("主线程: 任务已提交,我可以继续做其他事情...");// 在这里,主线程可以执行其他不受阻塞的代码// 3. 在需要结果时,调用 future.get() 阻塞等待System.out.println("主线程: 现在需要结果了,开始等待...");String result = future.get(); // 阻塞,直到子线程任务完成并返回结果System.out.println("主线程: 成功获取到结果 -> \"" + result + "\"");// 4. 关闭线程池 (这是必须的步骤,否则JVM不会退出)executorService.shutdown();}
}
Future接口
- 一个Future表示一个异步任务的结果;
- 它提供了一些方法用来检查异步是不是已经完成,没有完成就等待,待其完成后取回异步执行的结果;
- 异步执行的结果,只能通过get()方法获取,get()方法是阻塞的,如果异步执行没有执行结束 ,则阻塞直至拿到结果;
- 它提供了一个cancel()方法用于取消异步执行,执行完毕后不可取消;
- 如果不想要返回的结果,也可以把底层的任务声明为返回null。
Future接口有5个方法:
boolean isDone()
: 判断任务是否已经完成。V get()
: 阻塞等待,直到任务完成并返回结果。V get(long timeout, TimeUnit unit)
: 在指定时间内阻塞等待,超时则抛出TimeoutException
。boolean isCancelled()
: 判断任务是否在完成前被取消。boolean cancel(boolean mayInterruptIfRunning)
: 尝试取消任务的执行。
import java.util.concurrent.*;public class FutureMethodsDemo {public static void main(String[] args) {// 创建一个固定大小为2的线程池ExecutorService executor = Executors.newFixedThreadPool(2);System.out.println("--- 演示 get(), isDone() ---");// 提交一个 Callable 任务,该任务会模拟2秒的计算,然后返回一个字符串结果Future<String> future1 = executor.submit(() -> {System.out.println("任务1: 开始执行,预计耗时2秒...");Thread.sleep(2000);return "任务1执行完毕!";});// 1. isDone()// 在任务刚提交时,检查它是否完成System.out.println("提交任务1后,立即检查 isDone(): " + future1.isDone());try {// 2. get()// 这是一个阻塞方法,主线程会在这里等待,直到任务1执行完成并返回结果System.out.println("调用 future1.get(),主线程等待中...");String result1 = future1.get();System.out.println("future1.get() 获得结果: \"" + result1 + "\"");// 任务完成后,再次检查isDone()System.out.println("任务1完成后,再次检查 isDone(): " + future1.isDone());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}System.out.println("\n--- 演示 get(timeout, unit) ---");// 提交一个需要3秒才能完成的任务Future<String> future2 = executor.submit(() -> {System.out.println("任务2: 开始执行,预计耗时3秒...");Thread.sleep(3000);return "任务2执行完毕!";});try {// 3. get(long timeout, TimeUnit unit)// 我们只等待1秒,由于任务需要3秒,所以这必定会超时System.out.println("调用 future2.get(1, TimeUnit.SECONDS),主线程最多等待1秒...");String result2 = future2.get(1, TimeUnit.SECONDS);System.out.println("在1秒内获取到结果: " + result2);} catch (TimeoutException e) {// 这里会捕获到超时异常System.out.println("等待超时! " + e.getClass().getName());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}System.out.println("\n--- 演示 cancel(), isCancelled() ---");// 提交一个长时间运行的任务Future<String> future3 = executor.submit(() -> {System.out.println("任务3: 开始执行,这是一个可能被中断的长任务...");try {Thread.sleep(5000); // 模拟一个非常耗时的操作} catch (InterruptedException e) {// 如果任务被中断,会进入这里System.out.println("任务3: 执行被中断!");return "已被中断";}return "任务3正常完成";});// 为了确保 cancel() 在任务开始后执行,我们稍微等待一下try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}// 4. cancel(boolean mayInterruptIfRunning)// 尝试取消任务的执行。参数true表示如果任务已经在运行,就中断执行它的线程。System.out.println("尝试取消任务3...");boolean cancelResult = future3.cancel(true);System.out.println("cancel(true) 方法返回: " + cancelResult);// 5. isCancelled()// 检查任务是否已经被成功取消System.out.println("调用cancel后,检查 isCancelled(): " + future3.isCancelled());// 再次检查isDone()。一个被取消的任务也被认为是“完成”的。System.out.println("调用cancel后,检查 isDone(): " + future3.isDone());try {// 试图获取一个已取消任务的结果,会抛出 CancellationExceptionSystem.out.println("尝试 get() 已取消的任务3...");String result3 = future3.get();System.out.println("获取到已取消任务的结果: " + result3);} catch (CancellationException e) {System.out.println("获取结果失败,因为任务已被取消! " + e.getClass().getName());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 关闭线程池,这是一个好习惯executor.shutdownNow();}
}
FutureTask类
基本特点
- FutureTask类实现了Runnable接口、Future接口(RunnableFuture接口)。
- FutureTask类除了实现接口的5+1个方法,还有两个构造方法。
- FutureTask代表了一个可以取消的异步执行。
- FutureTask可以用来包装Runnable或者Callable。
- FutureTask可以提交到Executor中去执行。
import java.util.concurrent.*;public class FutureTaskFeaturesDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// =================================================================// 特点 5: FutureTask 可以用来包装 Runnable 或者 Callable// =================================================================// 创建一个 Callable 任务,它会模拟计算并返回一个结果Callable<String> callableTask = () -> {System.out.println("子线程 (来自Callable): 正在进行复杂的计算...");Thread.sleep(2000); // 模拟耗时操作return "计算完成,结果是ABC";};// 创建一个 Runnable 任务,它没有返回值Runnable runnableTask = () -> {System.out.println("子线程 (来自Runnable): 正在执行一个任务...");try {Thread.sleep(1500);} catch (InterruptedException e) {Thread.currentThread().interrupt();}};// 使用 Callable 构造 FutureTaskFutureTask<String> futureTaskFromCallable = new FutureTask<>(callableTask);// 使用 Runnable 构造 FutureTask,需要提供一个默认的返回结果String defaultResult = "Runnable执行完毕";FutureTask<String> futureTaskFromRunnable = new FutureTask<>(runnableTask, defaultResult);// =================================================================// 特点 2 & 6: FutureTask 是一个实现了 Runnable 和 Future 的类,可以提交到 Executor 执行// =================================================================// 因为 FutureTask 实现了 Runnable,所以它可以被线程池执行ExecutorService executor = Executors.newFixedThreadPool(2);System.out.println("主线程: 将 futureTaskFromCallable 提交到线程池执行。");executor.submit(futureTaskFromCallable); // 特点6的演示// =================================================================// 特点 1 & 3: 代表可取消的异步计算,结果只能在执行完成后才能获取 (get() 会阻塞)// =================================================================System.out.println("主线程: 尝试获取 futureTaskFromCallable 的结果...");// 在任务完成前,isDone() 返回 false (特点2的演示)System.out.println("主线程: 任务完成了吗? " + futureTaskFromCallable.isDone());// 调用 get() 方法,主线程会在这里阻塞,直到子线程中的任务执行完毕 (特点3的演示)String result = futureTaskFromCallable.get(); // 阻塞点System.out.println("主线程: 终于等到了结果 - " + result); // 特点1的演示// 任务完成后,isDone() 返回 trueSystem.out.println("主线程: 任务现在完成了吗? " + futureTaskFromCallable.isDone());System.out.println("------------------------------------------");// =================================================================// 特点 4: 一旦执行完成,则不能重新开始执行,也不能取消// =================================================================System.out.println("主线程: 演示任务完成后的状态。");// 尝试取消一个已经完成的任务boolean cancelResult = futureTaskFromCallable.cancel(true);System.out.println("主线程: 尝试取消已完成的任务,结果: " + cancelResult);System.out.println("主线程: 已完成的任务被取消了吗? " + futureTaskFromCallable.isCancelled());// 再次提交已经完成的任务到线程池// 注意:这不会让任务的 run() 方法被再次执行。// FutureTask 内部有状态控制,一旦任务完成(正常、异常或取消),它的状态就不会再改变。System.out.println("主线程: 再次提交同一个已完成的FutureTask...");executor.submit(futureTaskFromCallable); // 提交是有效的,但任务的run方法不会再执行// 你可以再次 get(), 它会立刻返回之前已经计算好的结果,而不会重新计算。System.out.println("主线程: 再次get(),立即返回结果: " + futureTaskFromCallable.get());System.out.println("------------------------------------------");// 演示取消一个尚未完成的任务 (再次体现特点1)FutureTask<String> longRunningTask = new FutureTask<>(() -> {System.out.println("子线程 (长任务): 我要睡10秒,除非被中断...");Thread.sleep(10000);return "我睡醒了";});executor.submit(longRunningTask);Thread.sleep(100); // 确保任务已经开始运行System.out.println("主线程: 任务太慢了,决定取消它。");longRunningTask.cancel(true); // 尝试取消System.out.println("主线程: 长任务被取消了吗? " + longRunningTask.isCancelled());System.out.println("主线程: 长任务算'完成'了吗? " + longRunningTask.isDone());try {longRunningTask.get(); // 对已取消的任务调用get会抛出CancellationException} catch (CancellationException e) {System.out.println("主线程: 果然,获取结果时抛出了 " + e.getClass().getSimpleName());}// 关闭线程池executor.shutdown();}
}
“不可重复执行”特性(含源码)
还有一点需要注意的就是,一旦执行完成,就不能重新开始执行,也不能取消。
FutureTask
之所以再次执行没有效果,是因为它内部设计了一个“一次性”的状态机(State Machine)。一旦任务进入“完成”状态(无论是正常结束、异常终止还是被取消),它的状态就无法再回到“未开始”状态。
示例如下:
错误的做法:
Callable<String> taskLogic = () -> "Hello";
FutureTask<String> myTask = new FutureTask<>(taskLogic);executor.submit(myTask); // 第一次执行,有效
executor.submit(myTask); // 第二次提交同一个对象,无效
正确的做法:
Callable<String> taskLogic = () -> "Hello"; // 业务逻辑可以复用// 第一次执行
FutureTask<String> task1 = new FutureTask<>(taskLogic);
executor.submit(task1);
System.out.println(task1.get());// 第二次执行,必须创建新对象
FutureTask<String> task2 = new FutureTask<>(taskLogic);
executor.submit(task2);
System.out.println(task2.get());
为什么会有这种现象?下面对原理进行分析:
FutureTask
被设计的首要目的不仅仅是“去执行一个任务”,更是“持有(或代表)一个异步计算的结果”。
可以将其想象成一张一次性的彩票:
- 创建
FutureTask
: 买了一张彩票,彩票处于“未开奖”(NEW
)状态。 - 执行
run()
方法: 开奖过程开始了。这是彩票唯一一次被“使用”的机会。 - 执行完成: 开奖结束,彩票状态变为“已开奖”(
NORMAL
/EXCEPTIONAL
),结果(中奖或未中奖)也已经确定并记录在彩票上。 - 调用
get()
: 随时可以查看这张彩票的开奖结果。 - 再次执行: 能用同一张已经开过奖的彩票去参加下一轮的抽奖吗?显然不能。它的使命已经在第一次开奖时完成了。
FutureTask
也是如此。它的run()
方法被设计为最多只执行一次。一旦执行完毕,它就从一个“任务执行器”转变为一个“结果容器”。任何后续对get()
的调用都会立刻返回已经缓存的结果,而任何再次执行它的尝试都会被直接忽略。
FutureTask
内部通过一个state
字段来管理其生命周期。这个状态流转是单向的,不可逆转。
其主要状态有:
NEW
(0): 任务已创建,但尚未开始执行。这是唯一可以开始执行任务的状态。COMPLETING
(1): 任务正在执行中,即将完成但结果还未设置。这是一个临时的中间状态。NORMAL
(2): 任务已正常执行完毕,结果已经成功设置。EXCEPTIONAL
(3): 任务执行过程中抛出了异常,异常信息已被保存。CANCELLED
(4): 任务在执行完成前被取消。INTERRUPTING
(5): 任务正在被中断的过程中。INTERRUPTED
(6): 任务已经被成功中断。
关键点:一旦状态从 NEW
变为任何其他状态(如NORMAL
, EXCEPTIONAL
, CANCELLED
),就再也无法回到 NEW
状态。
FutureTask
的run()
方法完美地体现了这种状态检查机制(以下为简化后的逻辑):
public void run() {// 1. 关键检查:如果当前状态不是 NEW,或者设置执行线程失败(说明其他线程抢先执行了),// 则直接返回,不执行任何操作。if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))return;try {Callable<V> c = callable; // 获取包装的任务if (c != null && state == NEW) {V result;boolean ran;try {// 2. 真正执行任务result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;// 3. 如果有异常,设置异常结果,并将状态变为 EXCEPTIONALsetException(ex);}if (ran)// 4. 如果正常完成,设置正常结果,并将状态变为 NORMALset(result);}} finally {// 清理工作runner = null;// ...}
}
从源码可以看出,run()
方法的第一行就是一个防御性检查。如果你拿着一个已经执行过(state
不再是NEW
)的FutureTask
实例去提交给线程池,线程池调用它的run()
方法时,这个检查会直接失败,方法立即return
,任务的逻辑自然就不会被再次执行了。
set()
方法是状态变更的开始。
// FutureTask.javaprotected void set(V v) {// 使用 CAS(Compare-And-Swap)原子地将状态从 NEW 变为 COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// outcome 字段用于存储最终结果this.outcome = v;// 将最终状态设置为 NORMAL (正常完成)UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state// *** 关键调用!进入收尾阶段 ***finishCompletion();}
}
- 这里有两次状态变更:首先原子地从
NEW
变成一个临时的COMPLETING
状态,这是为了防止并发的cancel()
操作。 - 然后设置
outcome
字段保存结果。 - 最后,将状态设置为最终的
NORMAL
,并调用finishCompletion()
。
finishCompletion()
—— 唤醒所有等待者,这个方法负责唤醒所有因调用get()
而被阻塞的线程。
// FutureTask.javaprivate void finishCompletion() {// 'waiters' 是一个单向链表,存储了所有正在等待结果的线程 (WaitNode)。for (WaitNode q; (q = waiters) != null;) {// 使用 CAS 将 'waiters' 链表头置为 null,确保这个唤醒过程只被执行一次。if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {// 遍历整个等待者链表for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;// *** 唤醒! ***// 使用 LockSupport.unpark() 唤醒那个正在等待的线程。// 那个线程会从 get() -> awaitDone() 的阻塞中醒来。LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // help GCq = next;}break;}}// 做一些完成后的清理工作done();callable = null; // a micro-optimization
}
- 核心逻辑: 该方法会原子地“摘下”整个等待者链表 (
waiters
)。 - 然后,它会遍历这个刚刚摘下的链表,并对其中的每一个节点所代表的线程,执行
LockSupport.unpark(t)
。 - 被
unpark
的线程之前正阻塞在awaitDone()
方法里,现在它被唤醒,就可以从awaitDone()
返回,接着从get()
方法拿到outcome
中存储的结果,然后继续执行。
整个流程串起来就是:
- A线程调用
futureTask.get()
。get()
发现任务未完成(state
为NEW
),于是将A线程包装成一个WaitNode
节点,加入到waiters
等待链表中,然后LockSupport.park()
让自身(A线程)挂起等待。 - B线程(线程池中的工作线程)开始执行
futureTask.run()
。 run()
方法检查状态为NEW
,通过检查,开始执行业务代码。- 业务代码执行完毕,
run()
调用set(result)
。 set(result)
将state
从NEW
最终改为NORMAL
,把结果存入outcome
,然后调用finishCompletion()
。finishCompletion()
遍历waiters
链表,找到代表A线程的节点,并调用LockSupport.unpark(A线程)
。- A线程被唤醒,从
get()
方法中返回,并读取outcome
字段中已经准备好的结果。
因为state
的状态被永久地改变为了NORMAL
,并且finishCompletion
保证了所有等待者都被唤醒,这个FutureTask
的使命就此终结。任何后续对run()
的调用都会在第一步的if (state != NEW)
检查中失败,从而实现了“一次性”的语义。
从原始FutureTask对象中取值(含源码)
将一个已经创建好的FutureTask
对象提交给ExecutorService
时,submit
方法会返回一个新的、包装了你原始任务的Future对象。而计算的真正结果,仍然存储在原始的那个FutureTask
对象里。调用这个新的包装对象的get()
方法,往往得到的是null
,而不是想要的结果。
代码示例:
import java.util.concurrent.*;public class FutureTaskGetDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建一个 Callable,这是我们真正的业务逻辑Callable<String> myCallable = () -> {System.out.println("子线程: 正在执行核心业务逻辑...");Thread.sleep(1000);return "这是真正的计算结果";};// 2. 创建我们自己的、原始的 FutureTask 对象// 我们需要用这个对象来获取最终结果FutureTask<String> originalFutureTask = new FutureTask<>(myCallable);ExecutorService executor = Executors.newSingleThreadExecutor();// 3. 将我们原始的 FutureTask 提交给线程池// submit(Runnable) 会返回一个新的 Future 对象,我们称之为 wrapperFutureFuture<?> wrapperFuture = executor.submit(originalFutureTask);// 4. 分别从两个 Future 对象中获取结果// 错误的做法:尝试从 submit 方法返回的 Future 中获取结果// 这个 get() 等待的是 wrapperFuture 的完成,其结果是 nullObject resultFromWrapper = wrapperFuture.get();System.out.println("从 submit() 返回的 wrapperFuture.get() 拿到的结果是: " + resultFromWrapper);System.out.println("wrapperFuture is done: " + wrapperFuture.isDone());System.out.println("-------------------------------------------------");// 正确的做法:从我们自己创建的原始 FutureTask 对象中获取结果// 这个 get() 获取的是存储在 originalFutureTask 内部的真实结果String resultFromOriginal = originalFutureTask.get();System.out.println("从原始的 originalFutureTask.get() 拿到的结果是: " + resultFromOriginal);System.out.println("originalFutureTask is done: " + originalFutureTask.isDone());executor.shutdown();}
}
运行结果:
子线程: 正在执行核心业务逻辑...
从 submit() 返回的 wrapperFuture.get() 拿到的结果是: null
wrapperFuture is done: true
-------------------------------------------------
从原始的 originalFutureTask.get() 拿到的结果是: 这是真正的计算结果
originalFutureTask is done: true
从源码的角度看这种原因:
第1步: 进入 ExecutorService.submit()
FutureTask
本身实现了Runnable
接口,所以这里实际调用的是submit(Runnable task)
。我们看AbstractExecutorService
中的实现:
// AbstractExecutorService.javapublic Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();// 关键!它调用了 newTaskFor 来创建一个新的 RunnableFuture 对象。RunnableFuture<Void> ftask = newTaskFor(task, null); // 然后执行这个新创建的任务execute(ftask);// 最后返回这个新创建的任务return ftask;
}
初始的FutureTask被作为对象传入,newTaskFor(originalFutureTask, null)
被调用了。这个方法默认会返回一个新的FutureTask
实例
所以,代码等价于:RunnableFuture<Void> ftask = new FutureTask<Void>(originalFutureTask, null);
现在你就有了两个FutureTask
:
originalFutureTask
: 你自己创建的,它包装了myCallable
,期望的结果是String
类型。wrapperFuture
(即ftask
):submit
方法内部创建的,它包装了originalFutureTask
,期望的结果是null
。
第2步: 任务执行
线程池实际执行的是外层的wrapperFuture
。
wrapperFuture.run()
被调用。wrapperFuture
的Callable
是什么?是originalFutureTask
自己(因为它实现了Runnable
)。所以,wrapperFuture
的run
方法内部会调用originalFutureTask.run()
。originalFutureTask.run()
被调用。它会执行最初的myCallable
,计算出结果“真正的结果”。originalFutureTask
执行成功后,会调用set("真正的结果")
,将结果保存在自己的outcome
字段中。
第3步: finishCompletion()
和关键的 null
操作
originalFutureTask
在调用set()
并最终进入finishCompletion()
方法后,会执行一项重要的清理工作。
// FutureTask.javaprivate void finishCompletion() {// ... (唤醒等待者线程的代码) ...done(); // 这是一个空方法,留给子类扩展// *** 最关键的一行 ***// 为了帮助垃圾回收,将内部的 callable 引用置为 null。// 因为任务已经执行完了,理论上不再需要它了。callable = null;
}
在originalFutureTask
完成它的使命后,它扔掉了对myCallable
的引用。它的outcome
字段已经安全地保存了结果“真正的结果”。
第4步: 结果返回
- 当
originalFutureTask.run()
执行完毕后,wrapperFuture
的run()
方法也随之结束。 wrapperFuture
也成功完成了,它会调用set(null)
(因为创建它的时候newTaskFor(task, null)
的第二个参数是null
)。所以wrapperFuture
的outcome
字段保存的是null
。
结论:
- 如果你调用
wrapperFuture.get()
,你得到的是wrapperFuture
的outcome
,也就是null
。 - 如果你调用
originalFutureTask.get()
,你得到的是originalFutureTask
的outcome
,也就是 "真正的结果"。
新返回Future对象的作用
wrapperFuture
的核心价值在于提供统一的控制和状态管理接口,而不是为了传递结果。它是ExecutorService
框架为了保持API一致性和健壮性而设计的关键一环。
任务取消是wrapperFuture
最重要的用途之一。当你把任务提交给ExecutorService
后,你手中唯一能直接操作的句柄就是submit
方法返回的wrapperFuture
。你需要通过它来尝试取消任务的执行。
当你在wrapperFuture
上调用cancel()
时,它会将这个取消请求传播给内部包装的originalFutureTask
。
FutureTask.cancel()
方法会检查任务状态,如果任务还没开始跑,就将其状态设置为CANCELLED
。如果任务正在跑,它会根据你传入的mayInterruptIfRunning
参数来决定是否要中断执行该任务的线程。
import java.util.concurrent.*;public class WrapperFutureCancelDemo {public static void main(String[] args) throws InterruptedException {FutureTask<String> originalFutureTask = new FutureTask<>(() -> {System.out.println("子线程: 任务开始,准备睡5秒...");try {Thread.sleep(5000);} catch (InterruptedException e) {// cancel(true) 会让线程在这里抛出中断异常System.out.println("子线程: 我被中断了,任务提前结束!");return "未完成的结果";}System.out.println("子线程: 任务正常完成。");return "已完成的结果";});ExecutorService executor = Executors.newSingleThreadExecutor();// 提交后,我们只能通过 wrapperFuture 来控制这个任务Future<?> wrapperFuture = executor.submit(originalFutureTask);// 让主线程睡1秒,确保子任务已经开始运行Thread.sleep(1000);System.out.println("主线程: 任务已经跑了1秒,现在决定取消它。");// 调用 wrapperFuture 的 cancel(true) 来中断任务boolean cancelled = wrapperFuture.cancel(true); // true表示如果任务正在运行,就中断它System.out.println("主线程: 任务取消成功了吗? " + cancelled);// isCancelled() 也会被传播System.out.println("主线程: wrapperFuture 的状态是否是已取消? " + wrapperFuture.isCancelled());System.out.println("主线程: originalFutureTask 的状态是否是已取消? " + originalFutureTask.isCancelled());executor.shutdown();}
}
运行结果:
子线程: 任务开始,准备睡5秒...
主线程: 任务已经跑了1秒,现在决定取消它。
子线程: 我被中断了,任务提前结束!
主线程: 任务取消成功了吗? true
主线程: wrapperFuture 的状态是否是已取消? true
主线程: originalFutureTask 的状态是否是已取消? true
在这个例子中,完全没有用到get()
,但wrapperFuture.cancel()
成功地管理了任务的生命周期。
虽然wrapperFuture.get()
返回的是null
,但它依然是一个阻塞方法。它的作用是让当前线程等待,直到任务执行完成。这是一个非常重要的同步机制。
更重要的是,如果你的原始任务在执行时抛出了异常,调用wrapperFuture.get()
会重新抛出这个异常(包装在ExecutionException
中)。这使得主线程能够捕获并处理后台任务的错误。
import java.util.concurrent.*;public class WrapperFutureExceptionDemo {public static void main(String[] args) {FutureTask<String> originalFutureTask = new FutureTask<>(() -> {System.out.println("子线程: 任务开始,即将抛出异常!");throw new RuntimeException("计算出错!");});ExecutorService executor = Executors.newSingleThreadExecutor();Future<?> wrapperFuture = executor.submit(originalFutureTask);try {System.out.println("主线程: 调用 wrapperFuture.get() 等待任务完成...");// 这里会阻塞,直到任务结束。因为任务抛了异常,get()也会抛出异常。wrapperFuture.get();} catch (InterruptedException e) {System.err.println("主线程: 等待时被中断了。");} catch (ExecutionException e) {System.err.println("主线程: 成功捕获到后台任务的异常!");System.err.println(" - 根本原因: " + e.getCause());}executor.shutdown();}
}
运行结果:
子线程: 任务开始,即将抛出异常!
主线程: 调用 wrapperFuture.get() 等待任务完成...
主线程: 成功捕获到后台任务的异常!- 根本原因: java.lang.RuntimeException: 计算出错!
即便不关心返回值,wrapperFuture.get()
在错误处理和流程同步上也是不可或缺的。
ExecutorService
的设计哲学是提供一套统一、可预测的接口。
submit(Callable<T> task)
返回Future<T>
submit(Runnable task, T result)
返回Future<T>
submit(Runnable task)
返回Future<?>
无论你提交什么类型的任务,submit
方法总是返回一个Future
对象。这让使用者可以依赖一个统一的模型来管理所有异步任务,而不必写if-else
来判断提交的是Callable
还是Runnable
。wrapperFuture
(Future<?>
)正是这个统一模型中,用于代表“无返回值任务”的那个标准占位符。
总结来说,新返回的Future对象能够安全、统一地管理后台任务,即便并不关心那个任务的返回值。