当前位置: 首页 > news >正文

深入解析FutureTask:原理与实战

我们来深入解析 FutureTask。下面将从它们的用法开始,逐步深入到底层实现、方法和接口设计,并探讨它们对于我们自己设计实现以及其他编程实践的学习意义。

主要就是放入等待队列(CAS操作一个链表头),完成任务的线程唤醒等待线程继续逻辑。

FutureTask 深入解析

FutureTask 是一个可取消的异步计算任务。它实现了 RunnableFuture 接口,该接口同时继承了 Runnable 和 Future 接口。这意味着 FutureTask 既可以作为一个 Runnable 被执行器(如 ExecutorService)执行,也可以作为一个 Future 来获取异步计算的结果、查询计算状态或取消计算。

FutureTask 通常用于包装一个 Callable 或 Runnable 对象。

基本用法示例:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;public class FutureTaskExample {public static void main(String[] args) throws Exception {// 1. 创建 Callable 任务Callable<String> callableTask = () -> {System.out.println(Thread.currentThread().getName() + " is executing callable task...");TimeUnit.SECONDS.sleep(2); // 模拟耗时操作return "Callable Result";};// 2. 使用 Callable 创建 FutureTaskFutureTask<String> futureTask1 = new FutureTask<>(callableTask);// 也可以使用 Runnable 创建 FutureTask (通常需要提供一个结果,若无则为 null)Runnable runnableTask = () -> {System.out.println(Thread.currentThread().getName() + " is executing runnable task...");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {Thread.currentThread().interrupt();}};FutureTask<String> futureTask2 = new FutureTask<>(runnableTask, "Runnable Result (if provided)");// 3. 提交 FutureTask 到 ExecutorService 执行// FutureTask 本身就是 Runnable,可以直接被线程执行或提交给 ExecutorServiceExecutorService executor = Executors.newFixedThreadPool(2);executor.submit(futureTask1);executor.submit(futureTask2);// 或者直接 new Thread(futureTask1).start();System.out.println("Tasks submitted.");// 4. 获取结果 (get() 方法会阻塞直到任务完成)try {System.out.println("Waiting for futureTask1 result...");String result1 = futureTask1.get(); // 阻塞等待System.out.println("futureTask1 result: " + result1);System.out.println("Waiting for futureTask2 result with timeout...");// get(long timeout, TimeUnit unit) 可以设置超时String result2 = futureTask2.get(2, TimeUnit.SECONDS);System.out.println("futureTask2 result: " + result2);} catch (InterruptedException e) {System.err.println("Task interrupted: " + e.getMessage());} catch (java.util.concurrent.ExecutionException e) {System.err.println("Task execution failed: " + e.getCause());} catch (java.util.concurrent.TimeoutException e) {System.err.println("Task timed out: " + e.getMessage());}// 5. 检查任务状态和取消if (!futureTask1.isDone()) {System.out.println("futureTask1 is not done yet.");}if (futureTask1.isCancelled()) {System.out.println("futureTask1 was cancelled.");}// 尝试取消一个未完成的任务FutureTask<Integer> cancellableTask = new FutureTask<>(() -> {TimeUnit.SECONDS.sleep(5);return 100;});new Thread(cancellableTask).start();Thread.sleep(100); // 给任务一点时间启动boolean cancelled = cancellableTask.cancel(true); // true 表示如果任务正在运行,则中断它System.out.println("CancellableTask cancelled: " + cancelled);System.out.println("CancellableTask isCancelled: " + cancellableTask.isCancelled());System.out.println("CancellableTask isDone: " + cancellableTask.isDone()); // cancel 后 isDone() 也为 trueexecutor.shutdown();}
}

FutureTask 是 Java 并发包中一个非常核心的类,它代表一个可取消的异步计算。它巧妙地结合了 Future 接口(用于获取异步计算的结果)和 Runnable 接口(使得它可以被 Executor 执行)。

1. 状态管理 (State Management)

FutureTask 内部维护一个 volatile int state 字段来表示任务的当前状态。状态包括:

  • NEW: 初始状态,任务尚未开始或正在运行。

  • COMPLETING: 任务已完成,正在设置结果(一个短暂的中间状态)。

  • NORMAL: 任务正常完成,结果已设置。

  • EXCEPTIONAL: 任务因抛出异常而完成,异常已设置。

  • CANCELLED: 任务被取消(在开始运行前)。

  • INTERRUPTING: 任务被取消,并且正在尝试中断运行任务的线程(一个短暂的中间状态)。

  • INTERRUPTED: 任务被取消,并且运行任务的线程已被中断。

状态之间的转换通过 CAS (Compare-And-Set) 操作(使用 VarHandle STATE)来保证原子性。

2. 任务执行 (run() 方法)

FutureTaskrun() 方法被调用时(通常由一个 Executor 的工作线程调用):

  1. 首先会通过 CAS 操作尝试将 runner 字段(volatile Thread runner)从 null 设置为当前线程。这确保了只有一个线程可以实际执行任务。

  2. 如果设置成功并且任务状态是 NEW,则会调用内部的 Callable 对象的 call() 方法。

  3. 如果 call() 方法正常返回,则调用 set(V result) 方法设置结果,并将状态转换为 NORMAL

  4. 如果 call() 方法抛出异常,则调用 setException(Throwable t) 方法设置异常,并将状态转换为 EXCEPTIONAL

  5. finally 块中,runner 字段会被重置为 null。还会检查任务是否在运行期间被取消并需要中断(状态为 INTERRUPTINGINTERRUPTED),如果是,则会调用 handlePossibleCancellationInterrupt() 处理。

3. 获取结果 (get()get(long, TimeUnit) 方法)

  • get() 方法:

    • 首先检查当前状态 s = state

    • 如果任务尚未完成 (s <= COMPLETING),则调用 awaitDone(boolean timed, long nanos) 方法阻塞等待。

    • 一旦任务完成(状态变为 NORMAL, EXCEPTIONAL, CANCELLED, 或 INTERRUPTED),awaitDone 返回,然后 get() 方法调用 report(int s) 来返回结果或抛出相应的异常。

    • NORMAL: 返回结果。

    • EXCEPTIONAL: 抛出 ExecutionException (包装了原始异常)。

    • CANCELLEDINTERRUPTED: 抛出 CancellationException

  • get(long, TimeUnit)** 方法**:类似 get(),但带有超时机制。如果在超时时间内任务未完成,则抛出 TimeoutException

4. 取消任务 (cancel(boolean mayInterruptIfRunning) 方法)

  1. 尝试通过 CAS 将状态从 NEW 转换为 CANCELLED (如果 mayInterruptIfRunningfalse) 或 INTERRUPTING (如果 mayInterruptIfRunningtrue)。

  2. 如果 CAS 成功:

    1. 如果 mayInterruptIfRunningtrue

      • 获取 runner 线程。

      • 如果 runner 不为 null,则调用 runner.interrupt() 来中断执行任务的线程。

      • finally 块中,将状态设置为 INTERRUPTED (使用 STATE.setRelease 保证内存可见性)。

    2. 最后,调用 finishCompletion() 来唤醒所有等待的线程。

  3. 如果 CAS 失败(例如任务已经完成或已被取消),则返回 false

5. 等待队列 (WaitNodewaiters 字段)

  • private volatile WaitNode waiters;:这是一个指向等待线程链表头部的指针。这个链表是一个简单的 Treiber 栈 (LIFO 栈)。

  • WaitNode 是一个静态内部类,代码如下:

// ... 
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } 
} 
// ... 

每个 WaitNode 封装了一个等待结果的线程 (thread = Thread.currentThread()) 和一个指向下一个节点的指针 (next)。

6. 阻塞和唤醒机制 (awaitDone()finishCompletion())

  • awaitDone(boolean timed, long nanos)

当一个线程调用 get() 并且任务未完成时,会进入此方法。它会创建一个新的 WaitNode,然后在一个循环中:

  1. 通过 CAS 将新的 WaitNode 添加到 waiters 链表的头部(实现入栈)。

  2. 再次检查任务状态,如果已完成,则移除刚添加的节点并返回状态。

  3. 如果任务仍未完成,则调用 LockSupport.park(this) (或 LockSupport.parkNanos(this, nanos)) 使当前线程阻塞。

  4. 当线程被唤醒时,如果是因为中断,则从等待队列中移除节点并抛出 InterruptedException。如果是因为超时,则从等待队列中移除节点并返回当前状态。

  • finishCompletion()

当任务完成(通过 set, setException, 或 cancel)时,此方法被调用。

  1. 它会遍历 waiters 链表,并对每个 WaitNode 中的线程调用 LockSupport.unpark(q.thread) 来唤醒它们。

  2. 遍历完成后,调用 done() 方法(这是一个空方法,供子类覆盖以执行完成回调)。

  3. 最后将 callable 设为 null 以帮助 GC。

哪个线程负责管理唤醒 get 等待的线程?

负责唤醒等待线程的是完成任务的那个线程。具体来说:

  • 如果是任务正常执行完成或抛出异常,那么是执行 run() 方法的线程(即 runner 线程)在调用 set()setException() 后,最终会调用 finishCompletion() 来唤醒所有等待者。

  • 如果是任务被取消,那么是调用 cancel() 方法的线程在成功取消任务后,会调用 finishCompletion() 来唤醒所有等待者。

等待 get() 方法结果的线程被封装在 WaitNode 对象中。每个 WaitNode 包含:

  • volatile Thread thread;: 对等待线程本身的引用。

  • volatile WaitNode next;: 指向链表中下一个 WaitNode 的引用。

这些 WaitNode 对象形成一个后进先出 (LIFO) 的栈式链表,其头节点由 FutureTaskvolatile WaitNode waiters; 字段指向。当一个线程需要等待时,它会创建一个新的 WaitNode 并将其 CAS 到 waiters 链表的头部。当任务完成时,finishCompletion() 方法会遍历这个链表并唤醒每个节点中的线程。

设计优势

这种设计避免了使用更重的锁(如 AbstractQueuedSynchronizer,早期版本的 FutureTask 使用过它),转而使用轻量级的 CAS 操作和 LockSupport 进行线程的阻塞和唤醒,这在很多情况下能提供更好的性能。

相关文章:

  • 【从0-1的HTML】第3篇:html引入css的3种方式
  • Golang——7、包与接口详解
  • Redisson - 实现延迟队列
  • DAY43 复习日
  • 压测软件-Jmeter
  • MySQL 9.0 相较于 MySQL 8.0 引入了多项重要改进和新特性
  • 机器学习基础(四) 决策树
  • 时序数据库IoTDB与EdgeX Foundry集成适配服务介绍
  • 记一次运行spark报错
  • 使用 Ansys Q3D 进行电容提取
  • 离散化思想
  • C++初阶 | 模板
  • 黑枸杞成分对抗结肠炎:微生物组学视角下的新发现与突破
  • matlab实现高斯烟羽模型算法
  • 【C语言】通用统计数据结构及其更新函数(最值、变化量、总和、平均数、方差等)
  • ABP-Book Store Application中文讲解 - Part 8: Authors: Application Layer
  • 苹果企业签名撤销
  • powershell 查当前用户和域名
  • Python 区块链开发实战:从零到一构建智能合约
  • 【手写系列】手写动态代理
  • 石家庄网站建设电商/广州百度seo优化排名
  • 高米店网站建设公司/b2b商务平台
  • 网站建设费计入哪个科目/驾校推广网络营销方案
  • 襄阳网站seo厂家/360推广登录平台
  • 微信网站域名/北京百度推广排名优化
  • 做鞋子批发网站有哪些/个人博客模板