当前位置: 首页 > news >正文

Disruptor 消费者核心:BatchEventProcessor解析

BatchEventProcessor 

它是 Disruptor 框架中消费者端的核心实现,理解它对于掌握 Disruptor 的工作原理至关重要。

BatchEventProcessor 的主要职责是:作为一个独立的事件处理器,从 RingBuffer 中批量获取已经发布的事件,并将这些事件委托给用户定义的 EventHandler 进行处理。它在一个独立的线程中运行,并管理着自己的消费进度。

BatchEventProcessor 内部维护了几个关键的字段来实现其功能:

// ... existing code ...
public final class BatchEventProcessor<T>implements EventProcessor
{private static final int IDLE = 0;private static final int HALTED = IDLE + 1;private static final int RUNNING = HALTED + 1;private final AtomicInteger running = new AtomicInteger(IDLE);private ExceptionHandler<? super T> exceptionHandler;private final DataProvider<T> dataProvider;private final SequenceBarrier sequenceBarrier;private final EventHandlerBase<? super T> eventHandler;private final int batchLimitOffset;private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);private final RewindHandler rewindHandler;private int retriesAttempted = 0;BatchEventProcessor(
// ... existing code ...
  • running: 一个 AtomicInteger,用于原子地管理处理器的运行状态。它有三个值:IDLE (空闲), HALTED (已停止), RUNNING (运行中)。这确保了 run() 方法不会在多个线程中被同时执行。
  • exceptionHandler: 异常处理器。当 EventHandler 在处理事件或在生命周期回调(如 onStartonShutdown)中抛出异常时,会由这个 ExceptionHandler 来处理。
  • dataProvider: 事件的提供者,通常就是 RingBufferBatchEventProcessor 通过它来获取指定序号(sequence)的事件对象。
  • sequenceBarrier: 序列屏障。这是 BatchEventProcessor 与生产者以及它所依赖的其他消费者之间进行协调的关键。它负责等待,直到生产者发布了新的事件并且所有前置消费者都处理完毕。
  • eventHandler: 事件处理器。这是用户定义的业务逻辑所在,BatchEventProcessor 会把获取到的事件逐个交给它的 onEvent 方法处理。
  • batchLimitOffset: 批处理大小限制的偏移量。它由 maxBatchSize - 1 计算得来,用于在 processEvents 循环中确定一个批次的最大边界。
  • sequence: 处理器自身的序列号。它代表了这个 BatchEventProcessor 已经成功处理到的事件的序号。这个序列号对于整个 Disruptor 的进度跟踪至关重要,生产者需要检查所有消费者的 sequence 来确定哪些 RingBuffer 的槽位可以被覆盖。
  • rewindHandler: 用于处理批次重试(Batch Rewind)的逻辑。如果 EventHandler 是 RewindableEventHandler 类型,这里会是一个 TryRewindHandler,否则是一个 NoRewindHandler

构造与创建

通常,你不会直接调用 BatchEventProcessor 的构造函数,而是使用 BatchEventProcessorBuilder 来创建它,这样更方便。

// ... existing code ...public <T> BatchEventProcessor<T> build(final DataProvider<T> dataProvider,final SequenceBarrier sequenceBarrier,final EventHandler<? super T> eventHandler){final BatchEventProcessor<T> processor = new BatchEventProcessor<>(dataProvider, sequenceBarrier, eventHandler, maxBatchSize, null);eventHandler.setSequenceCallback(processor.getSequence());return processor;}
// ... existing code ...

构造函数接收所有核心依赖,并根据 eventHandler 是否实现了 RewindableEventHandler 接口来初始化 rewindHandler

// ... existing code ...this.batchLimitOffset = maxBatchSize - 1;this.rewindHandler = eventHandler instanceof RewindableEventHandler? new TryRewindHandler(batchRewindStrategy): new NoRewindHandler();}
// ... existing code ...

生命周期管理

BatchEventProcessor 实现了 Runnable 接口,通常被包裹在一个 Thread 中执行。

  • run(): 这是处理器的入口点。

    1. 它首先使用 compareAndExchange 原子操作将状态从 IDLE 切换到 RUNNING。这可以防止多个线程同时启动同一个处理器实例。
    2. 如果状态切换成功,它会清理 sequenceBarrier 的 alert 状态,调用 notifyStart() 通知 EventHandler 启动,然后进入核心的 processEvents() 方法。
    3. processEvents() 正常或异常退出后,finally 块会确保调用 notifyShutdown() 通知 EventHandler 关闭,并将状态重置为 IDLE
    // ... existing code ...
    public void run()
    {int witnessValue = running.compareAndExchange(IDLE, RUNNING);if (witnessValue == IDLE) // Successful CAS{sequenceBarrier.clearAlert();notifyStart();try{if (running.get() == RUNNING){processEvents();}}finally{notifyShutdown();running.set(IDLE);}}else{
    // ... existing code ...
    
  • halt(): 用于从外部请求处理器停止。它做两件事:

    1. 将 running 状态设置为 HALTED
    2. 调用 sequenceBarrier.alert()。这个调用会中断 sequenceBarrier.waitFor() 的等待,抛出 AlertException,从而使 processEvents() 的主循环能够检测到状态变化并干净地退出。
    // ... existing code ...
    @Override
    public void halt()
    {running.set(HALTED);sequenceBarrier.alert();
    }
    // ... existing code ...
    

核心处理逻辑 processEvents()

这是 BatchEventProcessor 最核心的部分,一个无限循环,不断地从 RingBuffer 获取和处理事件。

// ... existing code ...private void processEvents(){T event = null;long nextSequence = sequence.get() + 1L;while (true){final long startOfBatchSequence = nextSequence;try{try{// 1. 等待可用事件final long availableSequence = sequenceBarrier.waitFor(nextSequence);// 2. 计算批次结束位置final long endOfBatchSequence = min(nextSequence + batchLimitOffset, availableSequence);// 3. 通知批次开始if (nextSequence <= endOfBatchSequence){eventHandler.onBatchStart(endOfBatchSequence - nextSequence + 1, availableSequence - nextSequence + 1);}// 4. 循环处理批次内事件while (nextSequence <= endOfBatchSequence){event = dataProvider.get(nextSequence);eventHandler.onEvent(event, nextSequence, nextSequence == endOfBatchSequence);nextSequence++;}retriesAttempted = 0;// 5. 更新序列号sequence.set(endOfBatchSequence);}catch (final RewindableException e){// ... 异常处理 ...}}catch (final TimeoutException e){// ... 异常处理 ...}catch (final AlertException ex){// ... 异常处理 ...}catch (final Throwable ex){// ... 异常处理 ...}}}
// ... existing code ...

让我们分解这个循环:

  1. 等待可用事件sequenceBarrier.waitFor(nextSequence) 是一个阻塞调用。它会根据配置的 WaitStrategy 进行等待,直到生产者发布了序号大于等于 nextSequence 的事件,并且所有前置依赖的消费者也都处理到了这个位置。它返回当前可消费的最高序列号 availableSequence
    • 这里的“魔法”在于:

      一次昂贵的检查:sequenceBarrier.waitFor(nextSequence) 是整个循环中唯一可能发生阻塞和等待的地方。它会一直等到有新的事件可用,然后返回当前生产者发布过的最高事件序号 availableSequence。这个操作的成本相对较高。

      定义批次:一旦 waitFor 返回,处理器就知道从 nextSequence 到 availableSequence 之间的所有事件都已经是“囊中之物”,可以安全访问了。然后它根据 maxBatchSize 限制,计算出本次实际处理的批次终点 endOfBatchSequence。

      多次廉价的获取:接下来的内部 while 循环,就变成了一个非常快速的遍历。dataProvider.get(nextSequence) 本质上只是一个数组的索引访问,没有任何锁或并发检查,因为它知道这个范围内的所有数据都已就绪。

  2. 计算批次结束位置endOfBatchSequence 是通过 min(nextSequence + batchLimitOffset, availableSequence) 计算得出的。这意味着批次的结束点,要么是达到了 maxBatchSize 的上限,要么是处理完了所有当前可用的事件,取两者中的较小值。这正是“批处理”语义的体现。
  3. 通知批次开始: 如果批次中有事件(nextSequence <= endOfBatchSequence),则调用 eventHandler.onBatchStart(),告知 EventHandler 一个新的批次即将开始,并传递批次大小和队列深度等信息。
  4. 循环处理批次内事件: 内部的 while 循环遍历从 nextSequence 到 endOfBatchSequence 的所有事件。
    • dataProvider.get(nextSequence): 从 RingBuffer 获取事件对象。
    • eventHandler.onEvent(...): 调用用户的业务逻辑。endOfBatch 标志位告诉 EventHandler 这是否是当前批次的最后一个事件,这对于需要批量提交(如批量写入数据库、网络IO)的场景非常有用。
  5. 更新序列号sequence.set(endOfBatchSequence) 是至关重要的一步。在整个批次成功处理完毕后,处理器将自己的 sequence 直接更新到批次的最后一个序号。这向系统宣告:“我已经处理完到 endOfBatchSequence 为止的所有事件了”。生产者可以根据这个信息来判断哪些 RingBuffer 的槽位是安全的,可以被覆盖重用。

异常处理

processEvents 的 try-catch 块设计得非常精巧,能处理不同类型的中断和异常情况。

  • RewindableException: 如果 EventHandler 抛出此异常,表示它希望重试当前整个批次。catch 块会调用 rewindHandler.attemptRewindGetNextSequence(),如果重试策略允许,nextSequence 会被重置为批次的起始序号 startOfBatchSequence,从而在下一次循环中重新处理该批次。
  • TimeoutException: 当 WaitStrategy 等待超时时抛出。notifyTimeout() 会被调用,最终将异常委托给 ExceptionHandler
  • AlertException: 当 halt() 被调用时,sequenceBarrier.waitFor() 会抛出此异常。catch 块会检查 running 状态,如果不是 RUNNING,就 break 退出主循环,实现优雅停机。
  • Throwable: 捕获所有其他来自 EventHandler 的异常。handleEventException() 会被调用,然后将 sequence 设置为当前出错的 nextSequence 并递增 nextSequence。这是一个关键设计:它保证了即使某个事件处理失败,处理器也会跳过它继续处理下一个事件,避免了因为一个“毒丸”事件而导致整个消费者线程卡死。

总结

BatchEventProcessor 是 Disruptor 高性能设计的核心体现。它通过以下机制实现了高效的事件处理:

  • 批处理: 一次性获取一批可用的事件,减少了与 SequenceBarrier 的交互开销,并为 EventHandler 提供了批量操作的可能。
  • 无锁进度跟踪: 通过更新自己的 sequence 来声明进度,避免了使用锁带来的性能损耗。
  • 职责分离: 它只负责事件的获取、分发和生命周期管理,将业务逻辑完全解耦到 EventHandler 中。
  • 健壮的异常处理: 精心设计的异常处理机制确保了处理器的稳定运行和优雅停机。

可以说,BatchEventProcessor 是连接 RingBuffer 数据结构和用户业务逻辑的强大而高效的引擎。

TryRewindHandler

TryRewindHandler 是 BatchEventProcessor 内部的一个私有类,它实现了 RewindHandler 接口(一个内部接口)。它的核心职责是处理和决策是否要“回滚”(Rewind)当前正在处理的事件批次

具体来说,当事件处理器(EventHandler)在处理事件时抛出 RewindableException 时,TryRewindHandler 会被调用。它本身不包含复杂的逻辑,而是将决策权委托给了外部传入的 BatchRewindStrategy(批次回滚策略)。

我们来看一下它的实现:

// ... existing code ...private class TryRewindHandler implements RewindHandler{private final BatchRewindStrategy batchRewindStrategy;TryRewindHandler(final BatchRewindStrategy batchRewindStrategy){this.batchRewindStrategy = batchRewindStrategy;}@Overridepublic long attemptRewindGetNextSequence(final RewindableException e, final long startOfBatchSequence) throws RewindableException{// 调用外部策略来决定如何处理这个回滚异常// 同时会递增重试次数(retriesAttempted)if (batchRewindStrategy.handleRewindException(e, ++retriesAttempted) == REWIND){// 如果策略决定回滚,则返回这个批次的起始序号return startOfBatchSequence;}else{// 如果策略决定不回滚,则重置重试计数器,并重新抛出异常retriesAttempted = 0;throw e;}}}
// ... existing code ...

总结一下 TryRewindHandler 的功能:

  1. 持有策略:它持有一个 BatchRewindStrategy 实例,这是用户定义的、用于决定是否回滚的策略。
  2. 执行策略:当 attemptRewindGetNextSequence 方法被调用时,它会执行该策略。
  3. 返回结果
    • 如果策略决定回滚,它返回当前批次的起始序号 (startOfBatchSequence)。
    • 如果策略决定不回滚,它会重新抛出原始的 RewindableException

重试策略如下:

  • 总是重试
  • 一定次数重试
  • park一段时间后重试


TryRewindHandler 如何与 BatchEventProcessor 互动?

TryRewindHandler 和 BatchEventProcessor 的互动主要体现在两个方面:实例化异常处理

BatchEventProcessor 在其构造函数中,会根据传入的 eventHandler 类型来决定创建哪种 RewindHandler

// ... existing code ...BatchEventProcessor(final DataProvider<T> dataProvider,final SequenceBarrier sequenceBarrier,final EventHandlerBase<? super T> eventHandler,final int maxBatchSize,final BatchRewindStrategy batchRewindStrategy){
// ... existing code ...this.batchLimitOffset = maxBatchSize - 1;// 关键点:只有当 eventHandler 是 RewindableEventHandler 的实例时,// 才会创建 TryRewindHandler。// 否则,创建一个不支持回滚的 NoRewindHandler。this.rewindHandler = eventHandler instanceof RewindableEventHandler? new TryRewindHandler(batchRewindStrategy): new NoRewindHandler();}
// ... existing code ...

这个设计非常巧妙:只有当用户提供了支持回滚的 RewindableEventHandler 时,BatchEventProcessor 才会启用回滚处理逻辑。对于普通的 EventHandler,任何尝试回滚的操作都会直接失败。

BatchEventProcessor 的核心逻辑在 processEvents() 方法中。这里有一个 try-catch 块专门用于捕获 RewindableException

// ... existing code ...private void processEvents(){T event = null;long nextSequence = sequence.get() + 1L;while (true){final long startOfBatchSequence = nextSequence; // 记录批次开始的序号try{try{
// ... existing code ...while (nextSequence <= endOfBatchSequence){event = dataProvider.get(nextSequence);// 如果 eventHandler.onEvent 抛出 RewindableException...eventHandler.onEvent(event, nextSequence, nextSequence == endOfBatchSequence);nextSequence++;}
// ... existing code ...sequence.set(endOfBatchSequence);}catch (final RewindableException e){// ...异常会被这里捕获,并调用 rewindHandlernextSequence = rewindHandler.attemptRewindGetNextSequence(e, startOfBatchSequence);}}
// ... existing code ...catch (final Throwable ex){handleEventException(ex, nextSequence, event);sequence.set(nextSequence);nextSequence++;}}}
// ... existing code ...

互动流程如下:

  1. processEvents 循环开始,记录下当前批次的起始序号 startOfBatchSequence
  2. 在处理批次内事件时,eventHandler.onEvent(...) 抛出了一个 RewindableException
  3. 内层的 catch 块捕获到这个异常。
  4. catch 块调用 rewindHandler.attemptRewindGetNextSequence(...),并将异常和批次起始序号传给它。
  5. TryRewindHandler 调用 batchRewindStrategy 来判断是否应该重试。
  6. 如果策略决定回滚TryRewindHandler 返回 startOfBatchSequence。这个值被赋给 nextSequence。在下一次 while(true) 循环开始时,processEvents 就会从失败批次的第一个事件开始,重新处理整个批次。
  7. 如果策略决定不回滚TryRewindHandler 重新抛出 RewindableException。这个异常不会被内层 catch 捕获,而是被外层的 catch (final Throwable ex) 捕获,然后交由通用的 ExceptionHandler 来处理,并且处理器会继续处理下一个序号的事件,当前批次不会被重试。

总而言之,TryRewindHandler 扮演了一个策略执行者的角色。它连接了 BatchEventProcessor 的核心事件循环和用户自定义的 BatchRewindStrategy,使得 BatchEventProcessor 能够在捕获到可回滚异常时,根据外部策略灵活地决定是重试整个批次还是放弃并继续前进。

http://www.dtcms.com/a/320241.html

相关文章:

  • 告别复杂配置!cpolar让Prometheus监控突破网络限制
  • 【42】【OpenCV C++】 计算图像某一列像素方差 或 某一行像素的方差;
  • 嵌入式开发硬件——单片机
  • 【列出指定时间段内所有的下单产品】
  • 数据结构(循环顺序队列)
  • RAGAS:检索增强生成系统的无参考评估框架与技术解析
  • 2025年华数杯C题超详细解题思路
  • 哈希表原理与实现全解析
  • 天道20金句
  • Moses工具的配置和小语种平行语料训练SMT完整实现
  • 大模型 Transformer模型(上)
  • Java集合的遍历方式(全解析)
  • 力扣经典算法篇-46-阶乘后的零(正向步长遍历,逆向步长遍历)
  • BGP笔记整理
  • Maven高级:继承与聚合实战指南
  • RS485转Profibus网关在QDNA钠离子分析仪与300PLC通信中的应用解析
  • 【OCCT+ImGUI系列】013-碰撞检测-包围盒Bnd_Box
  • 【入门级-C++程序设计:9、函数与递归-函数定义与调用、形参与实参】
  • RESTful 服务概述:从理念到实践的全面解析
  • Coze开放平台综合文档指南
  • 达梦包含OR条件的SQL特定优化----INJECT-HINT优化方法
  • 最新完整内、外期货量化交易系统C#源码可售
  • 【C#补全计划:类和对象(九)】接口
  • redis--黑马点评--用户签到模块详解
  • dubbo源码之编解码逻辑
  • 一场 Dark Theme A/B 测试的复盘与提效实践
  • 聚集索引VS非聚集索引:核心差异详解
  • rebase 和pull的通俗区别是什么
  • 一个基于固定 IP地址查询天气的 C 语言程序,通过调用第三方天气 API:
  • React 多语言(i18n)方案全面指南