当前位置: 首页 > news >正文

AbstractExecutorService:Java并发核心模板解析

AbstractExecutorService

AbstractExecutorService 是 Java 并发框架中的一个核心基类,它为 ExecutorService 接口提供了默认的实现。它的设计精妙地运用了模板方法模式,极大地简化了自定义线程池或其他执行器服务的创建过程。

public abstract class AbstractExecutorService implements ExecutorService {
  • implements ExecutorService: 这说明该类遵循 ExecutorService 接口的契约。ExecutorService 接口继承自 Executor 接口,并增加了任务生命周期管理(如 shutdownisTerminated)和处理 Future 的方法(如 submitinvokeAllinvokeAny)。

AbstractExecutorService 的核心作用是:它实现了 ExecutorService 中那些逻辑复杂、通用的方法(如 submitinvokeAllinvokeAny),而将最基本、最核心的任务执行逻辑——execute(Runnable command) 方法——留给子类去实现。

任何想要成为 ExecutorService 的自定义类,只需要继承 AbstractExecutorService,并实现以下几个关键方法即可:

  • execute(Runnable command)必须实现。定义了如何执行一个 Runnable 任务。
  • shutdown()必须实现。定义如何平滑地关闭执行器。
  • shutdownNow()必须实现。定义如何立即关闭执行器。
  • isShutdown()必须实现
  • isTerminated()必须实现
  • awaitTermination(long timeout, TimeUnit unit)必须实现

核心功能 1:任务的提交 (submit 方法)

AbstractExecutorService 提供了 submit 方法的三种重载形式,它们是 ExecutorService 接口的标准方法。这些方法的作用是接收一个任务(Runnable 或 Callable),并返回一个 Future 对象,用于追踪任务的执行状态和获取结果。

它们的实现逻辑非常相似且清晰:

// ... existing code .../*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException       {@inheritDoc}*/@Overridepublic Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException       {@inheritDoc}*/@Overridepublic <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException       {@inheritDoc}*/@Overridepublic <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
// ... existing code ...

分析其步骤:

  1. 参数校验: 检查传入的 task 是否为 null,如果是则抛出 NullPointerException
  2. 任务封装: 调用 newTaskFor(...) 方法将 Runnable 或 Callable 封装成一个 RunnableFuture 对象。RunnableFuture 既是一个 Runnable(可以被执行),也是一个 Future(可以获取结果)。默认情况下,newTaskFor 会创建一个 FutureTask 实例。
  3. 执行任务: 调用 execute(ftask)。这里的 execute 方法是抽象的,它将实际的执行策略完全委托给了子类。例如,ThreadPoolExecutor 的 execute 方法会将任务放入工作队列,并可能创建新的工作线程来执行它。
  4. 返回 Future: 将封装好的 ftask 对象返回给调用者。

这个过程完美地体现了模板方法模式:submit 定义了“提交任务”这一行为的整体流程框架,而将最关键的“如何执行”这一步交由子类实现。

核心功能 2:批量执行任务 (invokeAll 方法)

invokeAll 方法用于执行一个任务集合,并等待所有任务都完成后返回一个 Future 列表。

// ... existing code ...@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());try {for (Callable<T> t : tasks) {RunnableFuture<T> f = newTaskFor(t);futures.add(f);execute(f);}for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try { f.get(); }catch (CancellationException | ExecutionException ignore) {}}}return futures;} catch (Throwable t) {cancelAll(futures);throw t;}}
// ... existing code ...

无超时版本的 invokeAll 逻辑如下:

  1. 遍历所有任务,将它们一一封装成 RunnableFuture,并调用 execute 提交执行。
  2. 再次遍历 Future 列表,对每个尚未完成的 Future 调用 get() 方法。这会阻塞当前线程,直到该任务完成。
  3. get() 可能抛出的 CancellationException 或 ExecutionException 在这里被忽略,因为 invokeAll 的契约是等待所有任务完成(无论是正常结束、异常结束还是被取消)。
  4. 如果中途发生任何其他异常(如 InterruptedException),finally 块中的 cancelAll 会确保取消所有已提交的任务。

带超时的 invokeAll 版本逻辑更复杂,它需要处理超时,并在超时后取消所有未完成的任务。

核心功能 3:竞速执行任务 (invokeAny 方法)

invokeAny 是最复杂的一个方法。它执行一个任务集合,并返回第一个成功完成的任务的结果。一旦有一个任务成功,其他所有任务都会被取消。

// ... existing code ...private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {
// ... existing code ...ArrayList<Future<T>> futures = new ArrayList<>(ntasks);ExecutorCompletionService<T> ecs =new ExecutorCompletionService<T>(this);try {
// ... existing code ...for (;;) {Future<T> f = ecs.poll(); // 尝试获取一个已完成的任务if (f == null) {if (ntasks > 0) {--ntasks;futures.add(ecs.submit(it.next())); // 提交新任务++active;}
// ... existing code ...}if (f != null) {--active;try {return f.get(); // 成功获取结果,直接返回} catch (ExecutionException eex) {ee = eex; // 任务执行失败,记录异常,继续等下一个} catch (RuntimeException rex) {ee = new ExecutionException(rex);}}}if (ee == null)ee = new ExecutionException();throw ee; // 所有任务都失败了} finally {cancelAll(futures); // 无论如何,最后取消所有任务}}
// ... existing code ...

其实现巧妙地利用了 ExecutorCompletionService

  1. ExecutorCompletionService: 这是一个辅助类,它将 Executor 和 BlockingQueue 结合起来。当通过它 submit 任务时,任务完成后其 Future 会被放入一个内部队列。可以通过 poll 或 take 方法获取下一个已完成任务的 Future,而无需自己遍历检查。
  2. 增量提交: 它不是一次性提交所有任务,而是采用一种“提交一个,检查一个”的策略。它先提交一个任务,然后进入循环。
  3. 循环检查: 在循环中,它使用 ecs.poll() 尝试从完成队列中获取一个 Future
  4. 处理结果:
    • 如果获取到了 Future (f != null),就调用 f.get()。如果 get() 成功返回结果,doInvokeAny 方法立即将此结果返回。如果 get() 抛出异常,说明这个任务失败了,记录下异常,然后继续循环,等待下一个任务完成。
    • 如果没有获取到 Future (f == null),并且还有未提交的任务,就再提交一个新任务。
  5. 最终处理:
    • 如果所有任务都执行完毕,但没有一个成功(即都抛出了异常),则将最后一个捕获到的 ExecutionException 抛出。
    • finally 块是关键:无论方法是正常返回、还是抛出异常,它都会调用 cancelAll 来取消所有已提交的任务,确保资源被释放,这符合 invokeAny 的语义。

可扩展点 (newTaskFor 方法)

// ... existing code ...protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}
// ... existing code ...

这两个 protected 方法是 AbstractExecutorService 留给子类的“钩子”(Hook)。默认情况下,它们使用标准的 FutureTask 来包装任务。但是,子类可以重写这两个方法,返回自定义的 RunnableFuture 实现。这提供了极大的灵活性,比如可以创建一个带有任务优先级、或能记录更多统计信息的自定义 Future 类型。

总结

AbstractExecutorService 是一个杰出的模板方法模式应用范例。它为 ExecutorService 的实现者们处理了所有复杂的、通用的流程控制逻辑,如任务提交、批量执行、竞速执行等。实现者只需要继承它,并专注于最核心的 execute 逻辑以及线程池的生命周期管理,就可以快速、可靠地构建出一个功能完备的执行器服务。这种设计大大降低了并发编程的门槛,并提高了代码的复用性。

ExecutorCompletionService 

ExecutorCompletionService 解决了并发编程中一个常见的需求:将任务的提交与任务完成结果的获取解耦。当你向线程池提交一批任务时,你往往希望能够获取那些先完成的任务的结果,而不是按照提交的顺序来等待。ExecutorCompletionService 正是为了这个场景而设计的。

public class ExecutorCompletionService<V> implements CompletionService<V> {private final Executor executor;private final AbstractExecutorService aes;private final BlockingQueue<Future<V>> completionQueue;// ...
}
  • implements CompletionService<V>: 它实现了 CompletionService 接口,这个接口定义了任务提交 (submit) 和获取已完成任务 (takepoll) 的规范。
  • 核心思想ExecutorCompletionService 本身并不执行任务。它像一个“中间人”,内部组合了一个 Executor(通常是线程池)来真正执行任务,同时维护一个阻塞队列 BlockingQueue(完成队列),用于存放已完成任务的 Future
  • 成员变量:
    • private final Executor executor: 这是实际执行任务的执行器。你在构造 ExecutorCompletionService 时必须提供一个。
    • private final AbstractExecutorService aes: 这是一个优化。如果传入的 executor 恰好是 AbstractExecutorService 的实例,它会保留一个引用,以便后续可以调用 aes.newTaskFor() 方法,这允许重用 Executor 自定义的 FutureTask 类型。
    • private final BlockingQueue<Future<V>> completionQueue: 这是整个机制的核心。当一个任务执行完毕后,它的 Future 对象会被放入这个队列中。

任务提交 (submit 方法)

submit 方法的实现是理解 ExecutorCompletionService 工作原理的关键。

// ... existing code ...public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture<V>(f, completionQueue));return f;}public Future<V> submit(Runnable task, V result) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task, result);executor.execute(new QueueingFuture<V>(f, completionQueue));return f;}
// ... existing code ...

让我们分解 submit(Callable<V> task) 的执行步骤:

  1. RunnableFuture<V> f = newTaskFor(task);: 首先,它将用户提交的 Callable 任务封装成一个 RunnableFuture(通常是 FutureTask)。这个 f 就是最终会返回给调用者的 Future 对象。
  2. executor.execute(new QueueingFuture<V>(f, completionQueue));: 这是最精妙的一步。它没有直接把 f 交给 executor 执行。而是创建了一个新的、私有的内部类 QueueingFuture 的实例,并将原始的 Future (f) 和完成队列 (completionQueue) 作为参数传给它。然后,它将这个 QueueingFuture 对象提交给 executor 执行。
  3. return f;: 它将第 1 步创建的原始 Future (f) 立即返回给调用者。

所以,调用者拿到的是任务本身的 Future,而线程池真正在执行的是一个包装了该 Future 的 QueueingFuture

魔法的幕后推手 (QueueingFuture 内部类)

这个私有内部类是实现“完成后放入队列”这一逻辑的关键。

// ... existing code .../*** FutureTask extension to enqueue upon completion.*/private static class QueueingFuture<V> extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task,BlockingQueue<Future<V>> completionQueue) {super(task, null);this.task = task;this.completionQueue = completionQueue;}private final Future<V> task;private final BlockingQueue<Future<V>> completionQueue;protected void done() { completionQueue.add(task); }}
// ... existing code ...
  • extends FutureTask<Void>QueueingFuture 继承自 FutureTask,所以它本身也是一个 Runnable,可以被 executor 执行。
  • 构造函数: 它接收原始任务的 Future (task) 和完成队列,并把它们保存起来。super(task, null) 的调用意味着,当这个 QueueingFuture 被执行时,它实际上会去执行它所包装的 task
  • protected void done(): 这是 FutureTask 提供的一个钩子方法(Hook)。当 FutureTask 的任务(在这里就是它包装的 task)执行完成时(无论是正常结束、异常退出还是被取消),done() 方法会被自动回调。
  • completionQueue.add(task);: 在 done() 方法中,它所做的唯一一件事就是将被包装的、现已完成的原始 Future (task) 添加到完成队列中。

整个流程串起来就是

  1. submit 方法创建一个 Future (f) 和一个包装它的 QueueingFuture (qf)。
  2. qf 被交给线程池执行。
  3. 线程池中的某个线程执行 qf,这会导致 f 被执行。
  4. 当 f 执行完毕后,qf 的 done() 方法被触发。
  5. done() 方法将 f 放入 completionQueue

获取结果 (take 和 poll 方法)

获取结果的方法就非常简单了,它们只是简单地委托给内部的完成队列。

// ... existing code ...public Future<V> take() throws InterruptedException {return completionQueue.take();}public Future<V> poll() {return completionQueue.poll();}public Future<V> poll(long timeout, TimeUnit unit)throws InterruptedException {return completionQueue.poll(timeout, unit);}
// ... existing code ...
  • take(): 从完成队列中取出一个 Future。如果队列为空,调用线程会阻塞,直到有任务完成并将其 Future 放入队列。
  • poll(): 从完成队列中取出一个 Future。如果队列为空,它会立即返回 null,不会阻塞。
  • poll(long timeout, TimeUnit unit): 带超时的版本,如果在指定时间内没有任务完成,则返回 null

因为任务的 Future 是在它完成后才被放入队列的,所以通过 take() 或 poll() 取出 Future 的顺序,就是这些任务实际完成的顺序。

总结

ExecutorCompletionService 是一个优雅的装饰器模式和回调机制的应用。它通过一个包装类 (QueueingFuture) 和一个阻塞队列,巧妙地将任务的执行与结果的消费分离开来,实现了“按完成顺序获取结果”的功能。

这在处理大量异构任务(执行时间不同)并希望尽快处理已完成任务结果的场景中非常有用。例如,同时向多个Web服务发起请求,并希望处理任何一个先返回的响应,而不是死板地按请求发出的顺序等待。我们之前分析的 AbstractExecutorService.invokeAny 方法,其内部就是利用了 ExecutorCompletionService 来实现其功能的。

http://www.dtcms.com/a/312073.html

相关文章:

  • 深入 Go 底层原理(一):Slice 的实现剖析
  • 二叉树链式结构的实现
  • lesson31:Python异常处理完全指南:从基础到高级实践
  • 乌鸫科技前端二面
  • Go语言中的闭包详解
  • OpenCV学习 day3
  • stm32是如何实现电源控制的?
  • 如何防止内存攻击(Buffer Overflow, ROP)
  • 髋臼方向的定义与测量-I
  • u-boot启动过程(NXP6ULL)
  • android studio 安装Flutter
  • WD5208S,12V500MA,应用于小家电电源工业控制领域
  • Kubernetes 构建高可用、高性能 Redis 集群实战指南
  • #C语言——学习攻略:探索字符函数和字符串函数(一)--字符分类函数,字符转换函数,strlen,strcpy,strcat函数的使用和模拟实现
  • 数据库理论
  • 【MATLAB】(五)向量
  • 变量筛选—随机森林特征重要性
  • windows@Path环境变量中同名可执行文件优先级竞争问题@Scoop安装软件命令行启动存在同名竞争问题的解决
  • 解决 InputStream 只能读取一次问题
  • Java语言核心特性全解析:从面向对象到跨平台原理
  • Docker--将非root用户添加docker用户组,解决频繁sudo执行输入密码的问题
  • 【动态规划 | 子序列问题】子序列问题的最优解:动态规划方法详解
  • RK628F HDMI-IN调试:应用接口使用
  • Vulnhub ELECTRICAL靶机复现(附提权)
  • QPainter::CompositionMode解析
  • junit总@mockbaen与@mock的区别与联系
  • flutter分享到支付宝
  • Linux进程控制核心:创建·等待·终止·替换
  • Qt 信号和槽正常连接返回true,但发送信号后槽函数无响应问题【已解决】
  • 深入解析Java Stream Sink接口