AbstractExecutorService:Java并发核心模板解析
AbstractExecutorService
AbstractExecutorService
是 Java 并发框架中的一个核心基类,它为 ExecutorService
接口提供了默认的实现。它的设计精妙地运用了模板方法模式,极大地简化了自定义线程池或其他执行器服务的创建过程。
public abstract class AbstractExecutorService implements ExecutorService {
implements ExecutorService
: 这说明该类遵循ExecutorService
接口的契约。ExecutorService
接口继承自Executor
接口,并增加了任务生命周期管理(如shutdown
,isTerminated
)和处理Future
的方法(如submit
,invokeAll
,invokeAny
)。
AbstractExecutorService
的核心作用是:它实现了 ExecutorService
中那些逻辑复杂、通用的方法(如 submit
, invokeAll
, invokeAny
),而将最基本、最核心的任务执行逻辑——execute(Runnable command)
方法——留给子类去实现。
任何想要成为 ExecutorService
的自定义类,只需要继承 AbstractExecutorService
,并实现以下几个关键方法即可:
execute(Runnable command)
: 必须实现。定义了如何执行一个Runnable
任务。shutdown()
: 必须实现。定义如何平滑地关闭执行器。shutdownNow()
: 必须实现。定义如何立即关闭执行器。isShutdown()
: 必须实现。isTerminated()
: 必须实现。awaitTermination(long timeout, TimeUnit unit)
: 必须实现。
核心功能 1:任务的提交 (submit
方法)
AbstractExecutorService
提供了 submit
方法的三种重载形式,它们是 ExecutorService
接口的标准方法。这些方法的作用是接收一个任务(Runnable
或 Callable
),并返回一个 Future
对象,用于追踪任务的执行状态和获取结果。
它们的实现逻辑非常相似且清晰:
// ... existing code .../*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/@Overridepublic Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/@Overridepublic <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/@Overridepublic <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
// ... existing code ...
分析其步骤:
- 参数校验: 检查传入的
task
是否为null
,如果是则抛出NullPointerException
。 - 任务封装: 调用
newTaskFor(...)
方法将Runnable
或Callable
封装成一个RunnableFuture
对象。RunnableFuture
既是一个Runnable
(可以被执行),也是一个Future
(可以获取结果)。默认情况下,newTaskFor
会创建一个FutureTask
实例。 - 执行任务: 调用
execute(ftask)
。这里的execute
方法是抽象的,它将实际的执行策略完全委托给了子类。例如,ThreadPoolExecutor
的execute
方法会将任务放入工作队列,并可能创建新的工作线程来执行它。 - 返回 Future: 将封装好的
ftask
对象返回给调用者。
这个过程完美地体现了模板方法模式:submit
定义了“提交任务”这一行为的整体流程框架,而将最关键的“如何执行”这一步交由子类实现。
核心功能 2:批量执行任务 (invokeAll
方法)
invokeAll
方法用于执行一个任务集合,并等待所有任务都完成后返回一个 Future
列表。
// ... existing code ...@Overridepublic <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException();ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());try {for (Callable<T> t : tasks) {RunnableFuture<T> f = newTaskFor(t);futures.add(f);execute(f);}for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);if (!f.isDone()) {try { f.get(); }catch (CancellationException | ExecutionException ignore) {}}}return futures;} catch (Throwable t) {cancelAll(futures);throw t;}}
// ... existing code ...
无超时版本的 invokeAll
逻辑如下:
- 遍历所有任务,将它们一一封装成
RunnableFuture
,并调用execute
提交执行。 - 再次遍历
Future
列表,对每个尚未完成的Future
调用get()
方法。这会阻塞当前线程,直到该任务完成。 get()
可能抛出的CancellationException
或ExecutionException
在这里被忽略,因为invokeAll
的契约是等待所有任务完成(无论是正常结束、异常结束还是被取消)。- 如果中途发生任何其他异常(如
InterruptedException
),finally
块中的cancelAll
会确保取消所有已提交的任务。
带超时的 invokeAll
版本逻辑更复杂,它需要处理超时,并在超时后取消所有未完成的任务。
核心功能 3:竞速执行任务 (invokeAny
方法)
invokeAny
是最复杂的一个方法。它执行一个任务集合,并返回第一个成功完成的任务的结果。一旦有一个任务成功,其他所有任务都会被取消。
// ... existing code ...private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {
// ... existing code ...ArrayList<Future<T>> futures = new ArrayList<>(ntasks);ExecutorCompletionService<T> ecs =new ExecutorCompletionService<T>(this);try {
// ... existing code ...for (;;) {Future<T> f = ecs.poll(); // 尝试获取一个已完成的任务if (f == null) {if (ntasks > 0) {--ntasks;futures.add(ecs.submit(it.next())); // 提交新任务++active;}
// ... existing code ...}if (f != null) {--active;try {return f.get(); // 成功获取结果,直接返回} catch (ExecutionException eex) {ee = eex; // 任务执行失败,记录异常,继续等下一个} catch (RuntimeException rex) {ee = new ExecutionException(rex);}}}if (ee == null)ee = new ExecutionException();throw ee; // 所有任务都失败了} finally {cancelAll(futures); // 无论如何,最后取消所有任务}}
// ... existing code ...
其实现巧妙地利用了 ExecutorCompletionService
:
ExecutorCompletionService
: 这是一个辅助类,它将Executor
和BlockingQueue
结合起来。当通过它submit
任务时,任务完成后其Future
会被放入一个内部队列。可以通过poll
或take
方法获取下一个已完成任务的Future
,而无需自己遍历检查。- 增量提交: 它不是一次性提交所有任务,而是采用一种“提交一个,检查一个”的策略。它先提交一个任务,然后进入循环。
- 循环检查: 在循环中,它使用
ecs.poll()
尝试从完成队列中获取一个Future
。 - 处理结果:
- 如果获取到了
Future
(f != null
),就调用f.get()
。如果get()
成功返回结果,doInvokeAny
方法立即将此结果返回。如果get()
抛出异常,说明这个任务失败了,记录下异常,然后继续循环,等待下一个任务完成。 - 如果没有获取到
Future
(f == null
),并且还有未提交的任务,就再提交一个新任务。
- 如果获取到了
- 最终处理:
- 如果所有任务都执行完毕,但没有一个成功(即都抛出了异常),则将最后一个捕获到的
ExecutionException
抛出。 finally
块是关键:无论方法是正常返回、还是抛出异常,它都会调用cancelAll
来取消所有已提交的任务,确保资源被释放,这符合invokeAny
的语义。
- 如果所有任务都执行完毕,但没有一个成功(即都抛出了异常),则将最后一个捕获到的
可扩展点 (newTaskFor
方法)
// ... existing code ...protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}
// ... existing code ...
这两个 protected
方法是 AbstractExecutorService
留给子类的“钩子”(Hook)。默认情况下,它们使用标准的 FutureTask
来包装任务。但是,子类可以重写这两个方法,返回自定义的 RunnableFuture
实现。这提供了极大的灵活性,比如可以创建一个带有任务优先级、或能记录更多统计信息的自定义 Future
类型。
总结
AbstractExecutorService
是一个杰出的模板方法模式应用范例。它为 ExecutorService
的实现者们处理了所有复杂的、通用的流程控制逻辑,如任务提交、批量执行、竞速执行等。实现者只需要继承它,并专注于最核心的 execute
逻辑以及线程池的生命周期管理,就可以快速、可靠地构建出一个功能完备的执行器服务。这种设计大大降低了并发编程的门槛,并提高了代码的复用性。
ExecutorCompletionService
ExecutorCompletionService
解决了并发编程中一个常见的需求:将任务的提交与任务完成结果的获取解耦。当你向线程池提交一批任务时,你往往希望能够获取那些先完成的任务的结果,而不是按照提交的顺序来等待。ExecutorCompletionService
正是为了这个场景而设计的。
public class ExecutorCompletionService<V> implements CompletionService<V> {private final Executor executor;private final AbstractExecutorService aes;private final BlockingQueue<Future<V>> completionQueue;// ...
}
implements CompletionService<V>
: 它实现了CompletionService
接口,这个接口定义了任务提交 (submit
) 和获取已完成任务 (take
,poll
) 的规范。- 核心思想:
ExecutorCompletionService
本身并不执行任务。它像一个“中间人”,内部组合了一个Executor
(通常是线程池)来真正执行任务,同时维护一个阻塞队列BlockingQueue
(完成队列),用于存放已完成任务的Future
。 - 成员变量:
private final Executor executor
: 这是实际执行任务的执行器。你在构造ExecutorCompletionService
时必须提供一个。private final AbstractExecutorService aes
: 这是一个优化。如果传入的executor
恰好是AbstractExecutorService
的实例,它会保留一个引用,以便后续可以调用aes.newTaskFor()
方法,这允许重用Executor
自定义的FutureTask
类型。private final BlockingQueue<Future<V>> completionQueue
: 这是整个机制的核心。当一个任务执行完毕后,它的Future
对象会被放入这个队列中。
任务提交 (submit
方法)
submit
方法的实现是理解 ExecutorCompletionService
工作原理的关键。
// ... existing code ...public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture<V>(f, completionQueue));return f;}public Future<V> submit(Runnable task, V result) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task, result);executor.execute(new QueueingFuture<V>(f, completionQueue));return f;}
// ... existing code ...
让我们分解 submit(Callable<V> task)
的执行步骤:
RunnableFuture<V> f = newTaskFor(task);
: 首先,它将用户提交的Callable
任务封装成一个RunnableFuture
(通常是FutureTask
)。这个f
就是最终会返回给调用者的Future
对象。executor.execute(new QueueingFuture<V>(f, completionQueue));
: 这是最精妙的一步。它没有直接把f
交给executor
执行。而是创建了一个新的、私有的内部类QueueingFuture
的实例,并将原始的Future
(f
) 和完成队列 (completionQueue
) 作为参数传给它。然后,它将这个QueueingFuture
对象提交给executor
执行。return f;
: 它将第 1 步创建的原始Future
(f
) 立即返回给调用者。
所以,调用者拿到的是任务本身的 Future
,而线程池真正在执行的是一个包装了该 Future
的 QueueingFuture
。
魔法的幕后推手 (QueueingFuture
内部类)
这个私有内部类是实现“完成后放入队列”这一逻辑的关键。
// ... existing code .../*** FutureTask extension to enqueue upon completion.*/private static class QueueingFuture<V> extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task,BlockingQueue<Future<V>> completionQueue) {super(task, null);this.task = task;this.completionQueue = completionQueue;}private final Future<V> task;private final BlockingQueue<Future<V>> completionQueue;protected void done() { completionQueue.add(task); }}
// ... existing code ...
extends FutureTask<Void>
:QueueingFuture
继承自FutureTask
,所以它本身也是一个Runnable
,可以被executor
执行。- 构造函数: 它接收原始任务的
Future
(task
) 和完成队列,并把它们保存起来。super(task, null)
的调用意味着,当这个QueueingFuture
被执行时,它实际上会去执行它所包装的task
。 protected void done()
: 这是FutureTask
提供的一个钩子方法(Hook)。当FutureTask
的任务(在这里就是它包装的task
)执行完成时(无论是正常结束、异常退出还是被取消),done()
方法会被自动回调。completionQueue.add(task);
: 在done()
方法中,它所做的唯一一件事就是将被包装的、现已完成的原始Future
(task
) 添加到完成队列中。
整个流程串起来就是:
submit
方法创建一个Future
(f
) 和一个包装它的QueueingFuture
(qf
)。qf
被交给线程池执行。- 线程池中的某个线程执行
qf
,这会导致f
被执行。 - 当
f
执行完毕后,qf
的done()
方法被触发。 done()
方法将f
放入completionQueue
。
获取结果 (take
和 poll
方法)
获取结果的方法就非常简单了,它们只是简单地委托给内部的完成队列。
// ... existing code ...public Future<V> take() throws InterruptedException {return completionQueue.take();}public Future<V> poll() {return completionQueue.poll();}public Future<V> poll(long timeout, TimeUnit unit)throws InterruptedException {return completionQueue.poll(timeout, unit);}
// ... existing code ...
take()
: 从完成队列中取出一个Future
。如果队列为空,调用线程会阻塞,直到有任务完成并将其Future
放入队列。poll()
: 从完成队列中取出一个Future
。如果队列为空,它会立即返回null
,不会阻塞。poll(long timeout, TimeUnit unit)
: 带超时的版本,如果在指定时间内没有任务完成,则返回null
。
因为任务的 Future
是在它完成后才被放入队列的,所以通过 take()
或 poll()
取出 Future
的顺序,就是这些任务实际完成的顺序。
总结
ExecutorCompletionService
是一个优雅的装饰器模式和回调机制的应用。它通过一个包装类 (QueueingFuture
) 和一个阻塞队列,巧妙地将任务的执行与结果的消费分离开来,实现了“按完成顺序获取结果”的功能。
这在处理大量异构任务(执行时间不同)并希望尽快处理已完成任务结果的场景中非常有用。例如,同时向多个Web服务发起请求,并希望处理任何一个先返回的响应,而不是死板地按请求发出的顺序等待。我们之前分析的 AbstractExecutorService.invokeAny
方法,其内部就是利用了 ExecutorCompletionService
来实现其功能的。