Disruptor 消费者核心:BatchEventProcessor解析
BatchEventProcessor
它是 Disruptor 框架中消费者端的核心实现,理解它对于掌握 Disruptor 的工作原理至关重要。
BatchEventProcessor
的主要职责是:作为一个独立的事件处理器,从 RingBuffer
中批量获取已经发布的事件,并将这些事件委托给用户定义的 EventHandler
进行处理。它在一个独立的线程中运行,并管理着自己的消费进度。
BatchEventProcessor
内部维护了几个关键的字段来实现其功能:
// ... existing code ...
public final class BatchEventProcessor<T>implements EventProcessor
{private static final int IDLE = 0;private static final int HALTED = IDLE + 1;private static final int RUNNING = HALTED + 1;private final AtomicInteger running = new AtomicInteger(IDLE);private ExceptionHandler<? super T> exceptionHandler;private final DataProvider<T> dataProvider;private final SequenceBarrier sequenceBarrier;private final EventHandlerBase<? super T> eventHandler;private final int batchLimitOffset;private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);private final RewindHandler rewindHandler;private int retriesAttempted = 0;BatchEventProcessor(
// ... existing code ...
running
: 一个AtomicInteger
,用于原子地管理处理器的运行状态。它有三个值:IDLE
(空闲),HALTED
(已停止),RUNNING
(运行中)。这确保了run()
方法不会在多个线程中被同时执行。exceptionHandler
: 异常处理器。当EventHandler
在处理事件或在生命周期回调(如onStart
,onShutdown
)中抛出异常时,会由这个ExceptionHandler
来处理。dataProvider
: 事件的提供者,通常就是RingBuffer
。BatchEventProcessor
通过它来获取指定序号(sequence)的事件对象。sequenceBarrier
: 序列屏障。这是BatchEventProcessor
与生产者以及它所依赖的其他消费者之间进行协调的关键。它负责等待,直到生产者发布了新的事件并且所有前置消费者都处理完毕。eventHandler
: 事件处理器。这是用户定义的业务逻辑所在,BatchEventProcessor
会把获取到的事件逐个交给它的onEvent
方法处理。batchLimitOffset
: 批处理大小限制的偏移量。它由maxBatchSize - 1
计算得来,用于在processEvents
循环中确定一个批次的最大边界。sequence
: 处理器自身的序列号。它代表了这个BatchEventProcessor
已经成功处理到的事件的序号。这个序列号对于整个 Disruptor 的进度跟踪至关重要,生产者需要检查所有消费者的sequence
来确定哪些RingBuffer
的槽位可以被覆盖。rewindHandler
: 用于处理批次重试(Batch Rewind)的逻辑。如果EventHandler
是RewindableEventHandler
类型,这里会是一个TryRewindHandler
,否则是一个NoRewindHandler
。
构造与创建
通常,你不会直接调用 BatchEventProcessor
的构造函数,而是使用 BatchEventProcessorBuilder
来创建它,这样更方便。
// ... existing code ...public <T> BatchEventProcessor<T> build(final DataProvider<T> dataProvider,final SequenceBarrier sequenceBarrier,final EventHandler<? super T> eventHandler){final BatchEventProcessor<T> processor = new BatchEventProcessor<>(dataProvider, sequenceBarrier, eventHandler, maxBatchSize, null);eventHandler.setSequenceCallback(processor.getSequence());return processor;}
// ... existing code ...
构造函数接收所有核心依赖,并根据 eventHandler
是否实现了 RewindableEventHandler
接口来初始化 rewindHandler
。
// ... existing code ...this.batchLimitOffset = maxBatchSize - 1;this.rewindHandler = eventHandler instanceof RewindableEventHandler? new TryRewindHandler(batchRewindStrategy): new NoRewindHandler();}
// ... existing code ...
生命周期管理
BatchEventProcessor
实现了 Runnable
接口,通常被包裹在一个 Thread
中执行。
run()
: 这是处理器的入口点。- 它首先使用
compareAndExchange
原子操作将状态从IDLE
切换到RUNNING
。这可以防止多个线程同时启动同一个处理器实例。 - 如果状态切换成功,它会清理
sequenceBarrier
的alert
状态,调用notifyStart()
通知EventHandler
启动,然后进入核心的processEvents()
方法。 processEvents()
正常或异常退出后,finally
块会确保调用notifyShutdown()
通知EventHandler
关闭,并将状态重置为IDLE
。
// ... existing code ... public void run() {int witnessValue = running.compareAndExchange(IDLE, RUNNING);if (witnessValue == IDLE) // Successful CAS{sequenceBarrier.clearAlert();notifyStart();try{if (running.get() == RUNNING){processEvents();}}finally{notifyShutdown();running.set(IDLE);}}else{ // ... existing code ...
- 它首先使用
halt()
: 用于从外部请求处理器停止。它做两件事:- 将
running
状态设置为HALTED
。 - 调用
sequenceBarrier.alert()
。这个调用会中断sequenceBarrier.waitFor()
的等待,抛出AlertException
,从而使processEvents()
的主循环能够检测到状态变化并干净地退出。
// ... existing code ... @Override public void halt() {running.set(HALTED);sequenceBarrier.alert(); } // ... existing code ...
- 将
核心处理逻辑 processEvents()
这是 BatchEventProcessor
最核心的部分,一个无限循环,不断地从 RingBuffer
获取和处理事件。
// ... existing code ...private void processEvents(){T event = null;long nextSequence = sequence.get() + 1L;while (true){final long startOfBatchSequence = nextSequence;try{try{// 1. 等待可用事件final long availableSequence = sequenceBarrier.waitFor(nextSequence);// 2. 计算批次结束位置final long endOfBatchSequence = min(nextSequence + batchLimitOffset, availableSequence);// 3. 通知批次开始if (nextSequence <= endOfBatchSequence){eventHandler.onBatchStart(endOfBatchSequence - nextSequence + 1, availableSequence - nextSequence + 1);}// 4. 循环处理批次内事件while (nextSequence <= endOfBatchSequence){event = dataProvider.get(nextSequence);eventHandler.onEvent(event, nextSequence, nextSequence == endOfBatchSequence);nextSequence++;}retriesAttempted = 0;// 5. 更新序列号sequence.set(endOfBatchSequence);}catch (final RewindableException e){// ... 异常处理 ...}}catch (final TimeoutException e){// ... 异常处理 ...}catch (final AlertException ex){// ... 异常处理 ...}catch (final Throwable ex){// ... 异常处理 ...}}}
// ... existing code ...
让我们分解这个循环:
- 等待可用事件:
sequenceBarrier.waitFor(nextSequence)
是一个阻塞调用。它会根据配置的WaitStrategy
进行等待,直到生产者发布了序号大于等于nextSequence
的事件,并且所有前置依赖的消费者也都处理到了这个位置。它返回当前可消费的最高序列号availableSequence
。这里的“魔法”在于:
一次昂贵的检查:sequenceBarrier.waitFor(nextSequence) 是整个循环中唯一可能发生阻塞和等待的地方。它会一直等到有新的事件可用,然后返回当前生产者发布过的最高事件序号 availableSequence。这个操作的成本相对较高。
定义批次:一旦 waitFor 返回,处理器就知道从 nextSequence 到 availableSequence 之间的所有事件都已经是“囊中之物”,可以安全访问了。然后它根据 maxBatchSize 限制,计算出本次实际处理的批次终点 endOfBatchSequence。
多次廉价的获取:接下来的内部 while 循环,就变成了一个非常快速的遍历。dataProvider.get(nextSequence) 本质上只是一个数组的索引访问,没有任何锁或并发检查,因为它知道这个范围内的所有数据都已就绪。
- 计算批次结束位置:
endOfBatchSequence
是通过min(nextSequence + batchLimitOffset, availableSequence)
计算得出的。这意味着批次的结束点,要么是达到了maxBatchSize
的上限,要么是处理完了所有当前可用的事件,取两者中的较小值。这正是“批处理”语义的体现。 - 通知批次开始: 如果批次中有事件(
nextSequence <= endOfBatchSequence
),则调用eventHandler.onBatchStart()
,告知EventHandler
一个新的批次即将开始,并传递批次大小和队列深度等信息。 - 循环处理批次内事件: 内部的
while
循环遍历从nextSequence
到endOfBatchSequence
的所有事件。dataProvider.get(nextSequence)
: 从RingBuffer
获取事件对象。eventHandler.onEvent(...)
: 调用用户的业务逻辑。endOfBatch
标志位告诉EventHandler
这是否是当前批次的最后一个事件,这对于需要批量提交(如批量写入数据库、网络IO)的场景非常有用。
- 更新序列号:
sequence.set(endOfBatchSequence)
是至关重要的一步。在整个批次成功处理完毕后,处理器将自己的sequence
直接更新到批次的最后一个序号。这向系统宣告:“我已经处理完到endOfBatchSequence
为止的所有事件了”。生产者可以根据这个信息来判断哪些RingBuffer
的槽位是安全的,可以被覆盖重用。
异常处理
processEvents
的 try-catch
块设计得非常精巧,能处理不同类型的中断和异常情况。
RewindableException
: 如果EventHandler
抛出此异常,表示它希望重试当前整个批次。catch
块会调用rewindHandler.attemptRewindGetNextSequence()
,如果重试策略允许,nextSequence
会被重置为批次的起始序号startOfBatchSequence
,从而在下一次循环中重新处理该批次。TimeoutException
: 当WaitStrategy
等待超时时抛出。notifyTimeout()
会被调用,最终将异常委托给ExceptionHandler
。AlertException
: 当halt()
被调用时,sequenceBarrier.waitFor()
会抛出此异常。catch
块会检查running
状态,如果不是RUNNING
,就break
退出主循环,实现优雅停机。Throwable
: 捕获所有其他来自EventHandler
的异常。handleEventException()
会被调用,然后将sequence
设置为当前出错的nextSequence
并递增nextSequence
。这是一个关键设计:它保证了即使某个事件处理失败,处理器也会跳过它继续处理下一个事件,避免了因为一个“毒丸”事件而导致整个消费者线程卡死。
总结
BatchEventProcessor
是 Disruptor 高性能设计的核心体现。它通过以下机制实现了高效的事件处理:
- 批处理: 一次性获取一批可用的事件,减少了与
SequenceBarrier
的交互开销,并为EventHandler
提供了批量操作的可能。 - 无锁进度跟踪: 通过更新自己的
sequence
来声明进度,避免了使用锁带来的性能损耗。 - 职责分离: 它只负责事件的获取、分发和生命周期管理,将业务逻辑完全解耦到
EventHandler
中。 - 健壮的异常处理: 精心设计的异常处理机制确保了处理器的稳定运行和优雅停机。
可以说,BatchEventProcessor
是连接 RingBuffer
数据结构和用户业务逻辑的强大而高效的引擎。
TryRewindHandler
TryRewindHandler
是 BatchEventProcessor
内部的一个私有类,它实现了 RewindHandler
接口(一个内部接口)。它的核心职责是处理和决策是否要“回滚”(Rewind)当前正在处理的事件批次。
具体来说,当事件处理器(EventHandler
)在处理事件时抛出 RewindableException
时,TryRewindHandler
会被调用。它本身不包含复杂的逻辑,而是将决策权委托给了外部传入的 BatchRewindStrategy
(批次回滚策略)。
我们来看一下它的实现:
// ... existing code ...private class TryRewindHandler implements RewindHandler{private final BatchRewindStrategy batchRewindStrategy;TryRewindHandler(final BatchRewindStrategy batchRewindStrategy){this.batchRewindStrategy = batchRewindStrategy;}@Overridepublic long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence) throws RewindableException{// 调用外部策略来决定如何处理这个回滚异常// 同时会递增重试次数(retriesAttempted)if (batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND){// 如果策略决定回滚,则返回这个批次的起始序号return startOfBatchSequence;}else{// 如果策略决定不回滚,则重置重试计数器,并重新抛出异常retriesAttempted = 0;throw e;}}}
// ... existing code ...
总结一下 TryRewindHandler
的功能:
- 持有策略:它持有一个
BatchRewindStrategy
实例,这是用户定义的、用于决定是否回滚的策略。 - 执行策略:当
attemptRewindGetNextSequence
方法被调用时,它会执行该策略。 - 返回结果:
- 如果策略决定回滚,它返回当前批次的起始序号 (
startOfBatchSequence
)。 - 如果策略决定不回滚,它会重新抛出原始的
RewindableException
。
- 如果策略决定回滚,它返回当前批次的起始序号 (
重试策略如下:
- 总是重试
- 一定次数重试
- park一段时间后重试
TryRewindHandler
如何与 BatchEventProcessor
互动?
TryRewindHandler
和 BatchEventProcessor
的互动主要体现在两个方面:实例化和异常处理。
BatchEventProcessor
在其构造函数中,会根据传入的 eventHandler
类型来决定创建哪种 RewindHandler
。
// ... existing code ...BatchEventProcessor(final DataProvider<T> dataProvider,final SequenceBarrier sequenceBarrier,final EventHandlerBase<? super T> eventHandler,final int maxBatchSize,final BatchRewindStrategy batchRewindStrategy){
// ... existing code ...this.batchLimitOffset = maxBatchSize - 1;// 关键点:只有当 eventHandler 是 RewindableEventHandler 的实例时,// 才会创建 TryRewindHandler。// 否则,创建一个不支持回滚的 NoRewindHandler。this.rewindHandler = eventHandler instanceof RewindableEventHandler? new TryRewindHandler(batchRewindStrategy): new NoRewindHandler();}
// ... existing code ...
这个设计非常巧妙:只有当用户提供了支持回滚的 RewindableEventHandler
时,BatchEventProcessor
才会启用回滚处理逻辑。对于普通的 EventHandler
,任何尝试回滚的操作都会直接失败。
BatchEventProcessor
的核心逻辑在 processEvents()
方法中。这里有一个 try-catch
块专门用于捕获 RewindableException
。
// ... existing code ...private void processEvents(){T event = null;long nextSequence = sequence.get() + 1L;while (true){final long startOfBatchSequence = nextSequence; // 记录批次开始的序号try{try{
// ... existing code ...while (nextSequence <= endOfBatchSequence){event = dataProvider.get(nextSequence);// 如果 eventHandler.onEvent 抛出 RewindableException...eventHandler.onEvent(event, nextSequence, nextSequence == endOfBatchSequence);nextSequence++;}
// ... existing code ...sequence.set(endOfBatchSequence);}catch (final RewindableException e){// ...异常会被这里捕获,并调用 rewindHandlernextSequence = rewindHandler.attemptRewindGetNextSequence(e, startOfBatchSequence);}}
// ... existing code ...catch (final Throwable ex){handleEventException(ex, nextSequence, event);sequence.set(nextSequence);nextSequence++;}}}
// ... existing code ...
互动流程如下:
processEvents
循环开始,记录下当前批次的起始序号startOfBatchSequence
。- 在处理批次内事件时,
eventHandler.onEvent(...)
抛出了一个RewindableException
。 - 内层的
catch
块捕获到这个异常。 catch
块调用rewindHandler.attemptRewindGetNextSequence(...)
,并将异常和批次起始序号传给它。TryRewindHandler
调用batchRewindStrategy
来判断是否应该重试。- 如果策略决定回滚:
TryRewindHandler
返回startOfBatchSequence
。这个值被赋给nextSequence
。在下一次while(true)
循环开始时,processEvents
就会从失败批次的第一个事件开始,重新处理整个批次。 - 如果策略决定不回滚:
TryRewindHandler
重新抛出RewindableException
。这个异常不会被内层catch
捕获,而是被外层的catch (final Throwable ex)
捕获,然后交由通用的ExceptionHandler
来处理,并且处理器会继续处理下一个序号的事件,当前批次不会被重试。
总而言之,TryRewindHandler
扮演了一个策略执行者的角色。它连接了 BatchEventProcessor
的核心事件循环和用户自定义的 BatchRewindStrategy
,使得 BatchEventProcessor
能够在捕获到可回滚异常时,根据外部策略灵活地决定是重试整个批次还是放弃并继续前进。