当前位置: 首页 > news >正文

Netty 重放解码器ReplayingDecoder揭秘:重写轻量异常机制 和 ConstantPool

ReplayingDecoder 的核心目标是让开发者能够像编写阻塞I/O(同步)代码一样,来编写非阻塞I/O(异步)的解码逻辑

ByteToMessageDecoder分析见:Netty ByteToMessageDecoder解码机制解析

在标准的 ByteToMessageDecoder 中,由于数据可能不是一次性完整到达(半包问题),我们在读取数据前必须不断地检查缓冲区中是否有足够的字节:

// ByteToMessageDecoder 的典型写法
if (buf.readableBytes() < 4) {return; // 字节不够,等待下次
}
int length = buf.readInt();if (buf.readableBytes() < length) {// 读出了长度,但内容不够buf.resetReaderIndex(); // 重置读指针return; // 等待下次
}
// 读取内容...

这种写法充满了防御性的 if 判断,显得很繁琐。

而 ReplayingDecoder 允许你这样写:

// ReplayingDecoder 的写法
public class IntegerHeaderFrameDecoderextends ReplayingDecoder<Void> {protected void decode(ChannelHandlerContext ctx,ByteBuf buf, List<Object> out) throws Exception {// 直接读,不用检查!out.add(buf.readBytes(buf.readInt()));}
}

代码变得极其简洁,你只需要假设所有需要的字节都已经在缓冲区里了。这正是 ReplayingDecoder 的魔力所在。

工作原理:魔法背后的 Signal

ReplayingDecoder 的神奇效果并非没有代价,其背后是一套精巧的“重放(Replay)”机制。

// ... existing code ...* <h3>How does this work?</h3>* <p>* {@link ReplayingDecoder} passes a specialized {@link ByteBuf}* implementation which throws an {@link Error} of certain type when there's not* enough data in the buffer.  In the {@code IntegerHeaderFrameDecoder} above,* you just assumed that there will be 4 or more bytes in the buffer when* you call {@code buf.readInt()}.  If there's really 4 bytes in the buffer,* it will return the integer header as you expected.  Otherwise, the* {@link Error} will be raised and the control will be returned to* {@link ReplayingDecoder}.  If {@link ReplayingDecoder} catches the* {@link Error}, then it will rewind the {@code readerIndex} of the buffer* back to the 'initial' position (i.e. the beginning of the buffer) and call* the {@code decode(..)} method again when more data is received into the* buffer.
// ... existing code ...

其工作流程如下:

  1. 特殊的 ByteBufReplayingDecoder 并不会将原始的 ByteBuf 直接传给你的 decode 方法。相反,它会用一个内部的、名为 ReplayingDecoderByteBuf 的特殊 ByteBuf 将原始 ByteBuf 包装起来。

    // ... existing code ...
    public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {static final Signal REPLAY = Signal.valueOf(ReplayingDecoder.class, "REPLAY");private final ReplayingDecoderByteBuf replayable = new ReplayingDecoderByteBuf();
    // ... existing code ...
    
  2. 抛出 Signal: 当你在 decode 方法中调用 replayable 这个 ByteBuf 的读操作时(如 readInt()),ReplayingDecoderByteBuf 会先检查底层的真实 ByteBuf 中是否有足够的字节。

    • 如果字节足够:正常执行读操作。
    • 如果字节不够:它不会返回或阻塞,而是会抛出一个特殊的、内部预先缓存好的 Error 子类——Signal。这个 Signal 非常轻量,因为它不需要填充堆栈信息。
  3. 捕获与重放ReplayingDecoder 的 callDecode 方法内部有一个 try-catch 块,专门用来捕获这个 Signal

    // ... existing code ...
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {replayable.setCumulation(in);try {while (in.isReadable()) {// ...try {decodeRemovalReentryProtection(ctx, replayable, out);// ...} catch (Signal replay) {replay.expect(REPLAY);// ...// Return to the checkpoint (or oldPosition) and retry.int checkpoint = this.checkpoint;if (checkpoint >= 0) {in.readerIndex(checkpoint);} // ...break; // 跳出循环,等待更多数据}// ...}} // ...
    }
    

    一旦捕获到 REPLAY 信号,它会: a. 将底层真实 ByteBuf 的 readerIndex 重置到解码开始前的位置(或上一个“检查点”)。 b. 放弃本次解码,等待更多的数据到达。

  4. 再次调用: 当网络上来了新数据,Netty 会再次调用 callDecode 方法,你的 decode 逻辑将从头开始重新执行。由于上次的状态被回滚了,这次就好像是第一次执行一样,直到所有需要的字节都满足,解码才会成功。

Signal 类设计分析

Signal 类在 Netty 中采用了一种特殊的设计方式,其继承自 Error 类并实现了 Constant<Signal> 接口。该设计具有以下几个关键特征:

继承自 Error

  • Signal 继承自 Error 而非 Exception,表明其被设计用于表示系统级的重要问题或特殊状态信号

  • 在 Java 中,Error 通常用于表示 JVM 或系统级别的严重问题,应用程序通常不应捕获这类错误

  • Netty 采用继承 Error 的设计主要是为了利用其特定特性,而非表示真正的系统错误

实现 Constant<Signal> 接口

  • Signal 实现 Constant<Signal> 接口,使其成为一个具有唯一标识和名称的常量

  • 通过 ConstantPool 管理这些常量,确保每个名称对应的 Signal 实例保持单例模式

空堆栈跟踪与无原因机制

  • Signal 重写了 initCause 和 fillInStackTrace 方法,使其不执行任何操作并返回自身实例

  • 这种设计避免了创建异常时的性能开销,因为 Signal 本质上是一种状态信号而非真正的异常

  • 通过这种方式,Signal 有效避免了填充堆栈跟踪带来的性能损耗

单例模式

  • 通过 ConstantPool 机制,Signal 实例以单例形式存在,相同名称总是返回同一实例

  • 这种设计有助于减少内存占用并提升系统性能

public final class Signal extends Error implements Constant<Signal> {private static final long serialVersionUID = -221145131122459977L;private static final ConstantPool<Signal> pool = new ConstantPool<Signal>() {@Overrideprotected Signal newConstant(int id, String name) {return new Signal(id, name);}};/*** Shortcut of {@link #valueOf(String) valueOf(firstNameComponent.getName() + "#" + secondNameComponent)}.*/public static Signal valueOf(Class<?> firstNameComponent, String secondNameComponent) {return pool.valueOf(firstNameComponent, secondNameComponent);}private final SignalConstant constant;/*** Creates a new {@link Signal} with the specified {@code name}.*/private Signal(int id, String name) {constant = new SignalConstant(id, name);}/*** Check if the given {@link Signal} is the same as this instance. If not an {@link IllegalStateException} will* be thrown.*/public void expect(Signal signal) {if (this != signal) {throw new IllegalStateException("unexpected signal: " + signal);}}// Suppress a warning since the method doesn't need synchronization@Overridepublic Throwable initCause(Throwable cause) {return this;}// Suppress a warning since the method doesn't need synchronization@Overridepublic Throwable fillInStackTrace() {return this;}@Overridepublic int id() {return constant.id();}@Overridepublic String name() {return constant.name();}

异常捕获开销分析

作为特殊的 Error 子类,Signal 被设计为在某些情况下作为状态信号抛出,而非表示实际错误,因此其异常捕获开销相对较低:

无堆栈跟踪机制

  • Signal 不会填充堆栈跟踪信息,显著降低了创建和抛出 Signal 实例的开销

  • 在常规异常对象创建过程中,填充堆栈跟踪是最耗时的环节之一

无原因设置

  • Signal 不支持设置原因链,进一步减少了对象创建时的性能开销

单例模式优势

  • 采用单例模式避免了重复创建相同信号实例的开销

综上所述,Signal 类的设计旨在作为轻量级状态信号机制,在需要时进行抛出。其通过避免传统异常对象创建过程中的主要性能开销来源,实现了较低的异常捕获开销。

以下是优化格式后的内容,在保持原内容完整性的基础上,对排版、代码展示等方面进行了优化,使其更易读:

Netty中ConstantPool的设计分析

ConstantPool相关类的设计采用了典型的模板方法模式,包含以下几个核心组件:

  • ​Constant<T>接口​​:定义了常量的基本行为,包括id()name()方法,并继承了Comparable<T>接口。

  • ​AbstractConstant<T>抽象类​​:实现了Constant<T>接口,提供了常量的基本实现。

  • ​ConstantPool<T>抽象类​​:管理常量的创建和获取,确保每个名称只对应一个常量实例。

  • ​具体的常量类​​:如AttributeKey<T>ChannelOption<T>Signal,它们继承自AbstractConstant<T>或直接实现Constant<T>接口,并通过内部的ConstantPool实例来管理自己类型的常量。

 

ConstantPool使用了ConcurrentHashMap来存储已创建的常量,确保线程安全:

// /netty/common/src/main/java/io/netty/util/ConstantPool.java
private final ConcurrentMap<String, T> constants = new ConcurrentHashMap<>();
private final AtomicInteger nextId = new AtomicInteger(1);

ConstantPool提供了valueOf方法来获取或创建常量:

// /netty/common/src/main/java/io/netty/util/ConstantPool.java
public T valueOf(String name) {return getOrCreate(checkNonEmpty(name, "name"));
}private T getOrCreate(String name) {T constant = constants.get(name);if (constant == null) {final T tempConstant = newConstant(nextId(), name);constant = constants.putIfAbsent(name, tempConstant);if (constant == null) {return tempConstant;}}return constant;
}

这个实现确保了:

  • 对于同一个名称,只会创建一个常量实例(单例模式)。

  • 多线程环境下线程安全。

  • 使用putIfAbsent避免了重复创建。

抽象方法

ConstantPool定义了一个抽象方法newConstant,由子类实现具体的常量创建逻辑:

// /netty/common/src/main/java/io/netty/util/ConstantPool.java
protected abstract T newConstant(int id, String name);

AttributeKey

AttributeKey是ConstantPool的一个典型应用:

// /netty/common/src/main/java/io/netty/util/AttributeKey.java
public final class AttributeKey<T> extends AbstractConstant<AttributeKey<T>> {private static final ConstantPool<AttributeKey<Object>> pool = new ConstantPool<AttributeKey<Object>>() {@Overrideprotected AttributeKey<Object> newConstant(int id, String name) {return new AttributeKey<Object>(id, name);}};@SuppressWarnings("unchecked")public static <T> AttributeKey<T> valueOf(String name) {return (AttributeKey<T>) pool.valueOf(name);}private AttributeKey(int id, String name) {super(id, name);}
}

ChannelOption

ChannelOption也使用了类似的模式:

// /netty/transport/src/main/java/io/netty/channel/ChannelOption.java
public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {private static final ConstantPool<ChannelOption<Object>> pool = new ConstantPool<ChannelOption<Object>>() {@Overrideprotected ChannelOption<Object> newConstant(int id, String name) {return new ChannelOption<Object>(id, name);}};@SuppressWarnings("unchecked")public static <T> ChannelOption<T> valueOf(String name) {return (ChannelOption<T>) pool.valueOf(name);}// 预定义的常量public static final ChannelOption<ByteBufAllocator> ALLOCATOR = valueOf("ALLOCATOR");public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");// ...private ChannelOption(int id, String name) {super(id, name);}
}

Signal

Signal类采用了稍微不同的实现方式,它直接继承自Error并实现了Constant<Signal>接口:

// /netty/common/src/main/java/io/netty/util/Signal.java
public final class Signal extends Error implements Constant<Signal> {private static final ConstantPool<Signal> pool = new ConstantPool<Signal>() {@Overrideprotected Signal newConstant(int id, String name) {return new Signal(id, name);}};private final SignalConstant constant;private Signal(int id, String name) {constant = new SignalConstant(id, name);}// 委托给内部的SignalConstant实现Constant接口的方法@Overridepublic int id() {return constant.id();}@Overridepublic String name() {return constant.name();}private static final class SignalConstant extends AbstractConstant<SignalConstant> {SignalConstant(int id, String name) {super(id, name);}}
}

设计优势

  • ​单例模式​​:确保每个名称只对应一个常量实例,节省内存并支持使用==进行比较。

  • ​线程安全​​:使用ConcurrentHashMap和原子操作确保多线程环境下的安全访问。

  • ​高性能​​:通过避免重复创建和使用高效的并发数据结构,提供快速的常量查找和创建。

  • ​类型安全​​:通过泛型确保类型安全,避免类型转换错误。

  • ​延迟初始化​​:常量只在第一次访问时创建,避免不必要的资源消耗。

状态管理与性能优化:checkpoint

反复从头解码的开销可能很大。例如,如果一个消息包含一个长度头和很长的消息体,每次因为消息体不完整而重放时,都会重新读取一遍长度头。为了解决这个问题,ReplayingDecoder 引入了状态管理和  检查点(checkpoint) 机制。

ReplayingDecoder<S> 中的泛型 S 就是用来定义状态的,通常使用一个枚举(Enum)。

// ... existing code ...* public enum MyDecoderState {*   READ_LENGTH,*   READ_CONTENT;* }** public class IntegerHeaderFrameDecoder*      extends {@link ReplayingDecoder}&lt;<strong>MyDecoderState</strong>&gt; {**   private int length;**   public IntegerHeaderFrameDecoder() {*     // Set the initial state.*     <strong>super(MyDecoderState.READ_LENGTH);</strong>*   }**   {@code @Override}*   protected void decode({@link ChannelHandlerContext} ctx,*                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {*     switch (state()) {*     case READ_LENGTH:*       length = buf.readInt();*       <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong>*     case READ_CONTENT:*       ByteBuf frame = buf.readBytes(length);*       <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong>*       out.add(frame);*       break;*     default:*       throw new Error("Shouldn't reach here.");*     }*   }* }
// ... existing code ...
  • state(): 获取当前状态。
  • checkpoint(S newState): 这是核心方法。它做两件事:
    1. 设置检查点: 将当前 ByteBuf 的 readerIndex 保存下来。
    2. 更新状态: 将解码器的状态切换到 newState

在注释的例子中:

  1. 初始状态是 READ_LENGTH
  2. decode 方法进入 READ_LENGTH 分支,成功读取4字节的 length
  3. 调用 checkpoint(MyDecoderState.READ_CONTENT)。此时,检查点被设置在 length 之后,并且状态切换为 READ_CONTENT
  4. 代码继续执行到 READ_CONTENT 分支。假设此时 buf.readBytes(length) 因为字节不够而抛出 REPLAY 信号。
  5. callDecode 捕获信号后,会将 readerIndex 重置到上一个检查点,也就是 length 字段之后的位置。
  6. 当新数据到来再次解码时,decode 方法会从 state() 即 READ_CONTENT 状态开始,直接尝试读取消息体,而不需要再重新读取 length

通过 checkpoint,我们将一个大的解码任务分解成了多个小的、原子性的步骤,大大提高了复杂协议的解码效率。

局限性

简洁性的代价是一些限制:

  • 性能开销: 重放机制本身有一定开销。对于复杂的协议,如果不使用 checkpoint,性能会比 ByteToMessageDecoder 差。
  • 禁止部分操作: 并非所有 ByteBuf 的操作都支持。例如 indexOf()forEachByte()nioBuffer() 等操作,因为它们需要知道缓冲区的确切边界,而 ReplayingDecoderByteBuf 对上层隐藏了这个信息。调用这些方法会直接抛出 UnsupportedOperationException
  • 状态变量需要重置: 由于 decode 方法可能会被多次调用来解码同一个逻辑消息,必须小心处理成员变量。任何在 decode 过程中被修改的成员变量,都可能需要在下一次重放时被重置,否则会产生逻辑错误。最稳妥的方式就是使用 checkpoint 和状态机,将状态保存在 ReplayingDecoder 内部。

总结

ReplayingDecoder 是一个设计上非常优雅的工具,它以一种独特的方式解决了网络编程中常见的半包问题。

  • 优点:

    • 极大地简化了解码器的代码,使其更易读、更易维护。
    • 通过 checkpoint 和状态机,可以清晰地描述复杂协议的解码流程。
  • 缺点:

    • 存在一定的性能损耗,不适用于对性能要求极致的场景。
    • 对 ByteBuf 的操作有限制。
  • 适用场景:

    • 协议的结构相对固定、简单,例如 "长度+内容" 这种模式。
    • 协议比较复杂,使用状态机配合 checkpoint 可以让解码逻辑比手写大量 if 判断更加清晰。
    • 对开发效率的看重超过对极致性能的追求。
http://www.dtcms.com/a/393609.html

相关文章:

  • getgeo 生物信息 R语言 表型信息表”“样本信息表”或“临床信息表 phenodata phenotype data
  • OceanBase备租户创建(二):通过BACKUP DATABASE PLUS ARCHIVELOG
  • Linux文件打包压缩与软件安装管理完全指南
  • KingbaseES数据备份操作详解(图文教程)
  • 中断屏蔽实现方法-ARM内核
  • Kotlin 协程之 SharedFlow 与 StateFlow 深度解析
  • python爬虫(请求+解析+案例)
  • 111-Christopher-Dall_Arm-Timers-and-Fire:Arm架构计时器与半虚拟化时间
  • switch缺少break出现bug
  • 【自然语言处理】(3) --RNN循环神经网络
  • C# 中的 ReferenceEquals 方法
  • BERT:用于语言理解的深度双向Transformer预训练【简单分析】
  • 力扣hot100:两数相加(模拟竖式加法详解)(2)
  • Zotero + Word 插件管理参考文献的引用
  • 用Python一键整理文件:自动分类DOCX与PDF,告别文件夹杂乱
  • Ubuntu部署Elasticsearch教程
  • 61.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--新增功能--提取金额
  • 一款基于 .NET 开源、免费、命令行式的哔哩哔哩视频内容下载工具
  • Win Semi宣布推出线性优化的GaN工艺
  • 考研408计算机网络2025年第38题真题解析
  • C++编写的经典贪吃蛇游戏
  • 风险预测模型原理
  • PS练习5:利用翻转制作图像倒影
  • 平替Jenkins,推荐一款国产开源免费的CICD工具 - Arbess
  • aws 实战小bug
  • NumPy 系列(一):numpy 数组基础
  • VSCode 的 launch.json 配置
  • OpenLayers地图交互 -- 章节六:范围交互详解
  • 分布式专题——15 ZooKeeper特性与节点数据类型详解
  • 分布式专题——16 ZooKeeper经典应用场景实战(上)