当前位置: 首页 > news >正文

Netty ByteToMessageDecoder解码机制全解析

ByteToMessageDecoder 

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {

ByteToMessageDecoder 是一个 抽象的 ChannelInboundHandlerAdapter。它的核心职责是将入站的字节流(ByteBuf)解码成应用层的消息(Message)对象

在基于流的传输协议(如 TCP)中,数据并非以你发送时划分的消息包(Message)形式到达,而是一个连续的字节流。这会导致两个常见问题:

  1. 粘包(Sticking): 多个小的消息包被合并在一个TCP报文中一次性接收。
  2. 半包(Half-packet): 一个大的消息包被拆分到多个TCP报文中,一次只接收到了一部分。

ByteToMessageDecoder 的设计目标就是为了优雅地解决这两个问题。它通过内部维护一个累积缓冲区,自动将零散的 ByteBuf 累积起来,然后让子类在数据足够时进行解码。


核心组件与字段分析

ByteToMessageDecoder 内部有几个关键的字段和组件来支撑其功能。

1. 累积器 cumulator 和累积缓冲区 cumulation
  • ByteBuf cumulation;:这是解码器的核心,一个 ByteBuf 类型的累积缓冲区。所有 channelRead 事件接收到的 ByteBuf 都会被累积到这个 cumulation 中。

  • private Cumulator cumulator = MERGE_CUMULATOR;Cumulator 是一个接口,定义了如何将新的 ByteBuf (in) 合并到旧的累积缓冲区 (cumulation) 中。ByteToMessageDecoder 提供了两种默认实现:

    • MERGE_CUMULATOR (默认):

      // ... existing code ...
      public static final Cumulator MERGE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {// ...try {final int required = in.readableBytes();if (required > cumulation.maxWritableBytes() ||// ... some conditions) {// 空间不足时,扩展缓冲区return expandCumulation(alloc, cumulation, in);}// 空间足够,直接写入cumulation.writeBytes(in, in.readerIndex(), required);in.readerIndex(in.writerIndex());return cumulation;} finally {// 必须释放传入的 in buffer,因为它的数据已经被消费(复制)in.release();}}
      };
      // ... existing code ...
      

      工作方式:它尝试将新数据拷贝到现有的 cumulation 缓冲区的可写空间中。如果空间不足,它会分配一个更大的新缓冲区,并将旧数据和新数据都拷贝到这个新缓冲区里。 优点:数据总是存储在单个连续的 ByteBuf 中,访问速度快。 缺点:在缓冲区需要扩展时,会涉及内存分配和数据拷贝,可能产生性能开销。

    • COMPOSITE_CUMULATOR:

      // ... existing code ...
      public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {// ...CompositeByteBuf composite = null;try {if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {composite = (CompositeByteBuf) cumulation;// ...} else {composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);}// 将新的 in buffer 作为一个组件添加进去composite.addFlattenedComponents(true, in);in = null;return composite;} finally {// ...}}
      };
      // ... existing code ...
      

      工作方式:它使用 CompositeByteBuf(复合缓冲区)来累积数据。它不进行内存拷贝,而是像一个列表一样,直接将新的 ByteBuf 作为组件添加进来。 优点:避免了内存拷贝,累积操作非常高效。 缺点CompositeByteBuf 的内部实现比普通 ByteBuf 复杂,随机访问数据的性能可能会稍差。

2. 状态与行为控制字段
  • private boolean singleDecode;:如果设置为 true,每次 channelRead 事件只会调用 decode 方法一次,成功解码出一个消息后就会停止,即使缓冲区里还有足够的数据可以解码出更多消息。这在某些需要进行协议升级的场景下很有用。
  • private byte decodeState;:一个内部状态机,用于处理复杂的并发和重入问题。例如,如果在 decode 方法执行期间,该 handler 被从 pipeline 中移除,这个状态机可以确保状态的正确流转和资源的安全释放。
  • private int discardAfterReads = 16; 和 private int numReads;:这是一个内存优化策略。当 decode 方法从 cumulation 缓冲区读取数据后,这部分已读空间并不会立即被回收。当读操作(channelRead)累计达到 discardAfterReads 次数后,ByteToMessageDecoder 会调用 cumulation.discardSomeReadBytes() 方法。这个方法会把可读字节移动到缓冲区的最前端,从而释放尾部的空间,防止缓冲区无限增长。

channelRead

这是解码器的入口。当上一个 handler 传来数据时,此方法被调用。

// ... existing code ...@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {selfFiredChannelRead = true;CodecOutputList out = CodecOutputList.newInstance();try {first = cumulation == null;// 1. 累积数据cumulation = cumulator.cumulate(ctx.alloc(),first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);// 2. 调用解码逻辑callDecode(ctx, cumulation, out);} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {// 3. 清理和资源管理if (cumulation != null && !cumulation.isReadable()) {// ... 释放空的 cumulation ...cumulation = null;} else if (++numReads >= discardAfterReads) {// ... 执行 discardSomeReadBytes ...}int size = out.size();firedChannelRead |= out.insertSinceRecycled();// 4. 将解码后的消息传递给下一个 HandlerfireChannelRead(ctx, out, size);} finally {out.recycle();}}} else {ctx.fireChannelRead(msg);}}
// ... existing code ...

流程分解:

  1. 累积数据: 将传入的 ByteBuf (msg) 通过 cumulator 合并到 cumulation 中。
  2. 调用解码: 调用 callDecode 方法,该方法内部会循环调用子类实现的 decode 方法。
  3. 清理和资源管理: 在 finally 块中,检查 cumulation 是否已经读完。如果读完,就释放它。同时,检查是否需要执行 discardSomeReadBytes 优化。
  4. 传递结果: 将解码产生的消息列表 out 中的所有对象,通过 fireChannelRead 传递给 ChannelPipeline 中的下一个 ChannelInboundHandler

callDecode 

// ... existing code ...protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {while (in.isReadable()) {final int outSize = out.size();if (outSize > 0) {fireChannelRead(ctx, out, outSize);out.clear();// Check if this handler was removed before continuing with decoding.// If it was removed, it is not safe to continue to operate on the buffer.//// See:// - https://github.com/netty/netty/issues/4635if (ctx.isRemoved()) {break;}}int oldInputLength = in.readableBytes();decodeRemovalReentryProtection(ctx, in, out);// Check if this handler was removed before continuing the loop.// If it was removed, it is not safe to continue to operate on the buffer.//// See https://github.com/netty/netty/issues/1664if (ctx.isRemoved()) {break;}if (out.isEmpty()) {if (oldInputLength == in.readableBytes()) {break;} else {continue;}}if (oldInputLength == in.readableBytes()) {throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");}if (isSingleDecode()) {break;}}} catch (DecoderException e) {throw e;} catch (Exception cause) {throw new DecoderException(cause);}}
// ... existing code ...

callDecode 的核心是一个 while (in.isReadable()) 循环,意图是在一次 channelRead 事件中,尽可能多地从累积缓冲区 in 中解码出消息。下面是循环内部的详细步骤:

  1. 处理上一轮解码结果 (if (outSize > 0))

    • 在一个解码周期中,用户的 decode 方法可能一次性向 out 列表添加多个消息。这个 if 块检查 out 列表是否已有解码出的消息。
    • 如果有,就调用 fireChannelRead(ctx, out, outSize) 将这些消息立即传递给 pipeline 中的下一个 handler
    • 传递后,调用 out.clear() 清空列表,为下一轮解码做准备。
    • 关键安全检查if (ctx.isRemoved())。在 fireChannelRead 过程中,下游的 handler 完全有可能执行一个操作,将当前的 ByteToMessageDecoder 从 pipeline 中移除。如果发生了这种情况,后续的所有操作都是不安全的,必须立刻 break 循环。
  2. 调用用户解码逻辑

    • int oldInputLength = in.readableBytes();:在调用用户的 decode 方法之前,记录下当前缓冲区的可读字节数。这个变量至关重要,用于后续判断解码是否有效。
    • decodeRemovalReentryProtection(ctx, in, out);:这是真正调用用户实现的 decode 方法的地方。它被一个“防重入和移除保护”的方法包裹着,我们稍后详细分析这个方法。
  3. 解码后的状态检查

    • 再次进行移除安全检查if (ctx.isRemoved())。用户的 decode 方法内部也可能触发 handler 的移除操作,所以调用后必须再次检查。
    • 情况一:没有解码出消息 (if (out.isEmpty()))
      • if (oldInputLength == in.readableBytes()):如果 out 列表为空,并且缓冲区的可读字节数没有变化,这说明剩余的数据不足以构成一个完整的消息。这是最常见的情况,解码器应该停止本次解码,等待更多数据到达。因此,break 循环。
      • else { continue; }:如果 out 列表为空,但可读字节数减少了,这说明用户的 decode 方法消费了一部分数据(例如,跳过了一些无效字节或协议头),但还没有解码出完整的消息。这是一个有效的中间状态,所以用 continue 进入下一轮循环,尝试用剩余的数据继续解码。
    • 情况二:成功解码出消息 (out 不为空)
      • if (oldInputLength == in.readableBytes()):如果 out 列表不为空(解码出了消息),但缓冲区的可读字节数没有变化,这是一个严重的逻辑错误。这意味着用户的 decode 实现凭空创建了一个消息,却没有从输入流中消费任何数据。如果不加以制止,这将导致 while 循环无限执行,造成死循环。因此,Netty 在这里会主动抛出 DecoderException
    • 检查 singleDecode 标志if (isSingleDecode())。如果用户设置了 singleDecode 为 true,那么在成功解码出一批消息后,无论缓冲区是否还有数据,都应立即 break 循环。

decodeRemovalReentryProtection

这是对用户 decode 方法的核心封装,目的是为了安全。

// ... existing code ...final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try {decode(ctx, in, out);} finally {boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;decodeState = STATE_INIT;if (removePending) {fireChannelRead(ctx, out, out.size());out.clear();handlerRemoved(ctx);}}}
// ... existing code ...

它的作用是:

  • 在调用 decode(ctx, in, out) 之前,将内部状态 decodeState 设置为 STATE_CALLING_CHILD_DECODE
  • 如果在 decode 方法执行期间,handlerRemoved 方法被从另一个线程或事件回调中调用,handlerRemoved 会检查到这个状态,它不会立即执行移除逻辑,而是将状态改为 STATE_HANDLER_REMOVED_PENDING 然后返回。
  • 当 decode 方法执行完毕,finally 块会检查状态是否为 STATE_HANDLER_REMOVED_PENDING。如果是,说明在解码期间有一个“待处理的移除请求”。此时,它会安全地将解码出的最后结果 fire 出去,然后再调用 handlerRemoved(ctx) 来完成真正的移除清理工作。
  • 这个机制确保了 decode 方法的原子性,防止了在解码过程中状态被破坏的风险。

fireChannelRead

这个静态方法负责将解码后的消息列表传递出去。

// ... existing code ...static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {if (msgs instanceof CodecOutputList) {fireChannelRead(ctx, (CodecOutputList) msgs, numElements);} else {for (int i = 0; i < numElements; i++) {ctx.fireChannelRead(msgs.get(i));}}}static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {for (int i = 0; i < numElements; i ++) {ctx.fireChannelRead(msgs.getUnsafe(i));}}
// ... existing code ...

逻辑很简单:遍历列表,对每个解码出的消息调用 ctx.fireChannelRead()。它对 Netty 内部使用的 CodecOutputList 做了优化,使用 getUnsafe 方法来提升性能。

总结

callDecode 的逻辑比表面看起来要复杂得多,它不仅仅是一个简单的循环调用。它是一个非常健壮的执行引擎,内置了多种检查和保护机制:

  • 状态驱动的循环:循环的终止条件是动态的,取决于“是否还有数据”、“是否解码出消息”、“是否消耗了字节”以及“是否设置了 singleDecode”。
  • 严格的契约执行:强制要求解码出消息的 decode 方法必须消耗输入字节,否则抛出异常,防止开发者写出死循环的解码器。
  • 并发和重入安全:通过 decodeRemovalReentryProtection 和状态机,优雅地处理了在解码过程中自身被移除的复杂情况。
  • 及时的事件传播:一旦解码出消息,会立即 fire 出去,而不是等整个 while 循环结束,这降低了消息传递的延迟。

decode

这是开发者必须实现的抽象方法,是解码逻辑的核心所在。

// ... existing code .../*** Decode the from one {@link ByteBuf} to an other. This method will be called till either the input* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input* {@link ByteBuf}.** @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to* @param in            the {@link ByteBuf} from which to read data* @param out           the {@link List} to which decoded messages should be added* @throws Exception    is thrown if an error occurs*/protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
// ... existing code ...

实现要点:

  • 检查可读字节: 在读取前,先检查 in.readableBytes() 是否足够构成一个完整的消息。
  • 数据不足则返回: 如果数据不足,不要做任何读取操作,直接 returnByteToMessageDecoder 会保留当前 cumulation 的内容,等待下一次 channelRead 带来更多数据。
  • 数据足够则解码: 如果数据足够,从 in 中读取相应字节(例如 in.readInt()in.readBytes(len)),解码成一个或多个消息对象,然后将这些对象 out.add(message) 到输出列表 out 中。
  • 循环解码ByteToMessageDecoder 的 callDecode 方法会在一个循环中调用你的 decode 方法,直到 in 中不再有足够的数据可以组成一个新消息为止。

channelReadComplete 和 AutoRead

// ... existing code ...@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {numReads = 0;discardSomeReadBytes();if (selfFiredChannelRead && !firedChannelRead && !ctx.channel().config().isAutoRead()) {ctx.read();}firedChannelRead = false;selfFiredChannelRead = false;ctx.fireChannelReadComplete();}
// ... existing code ...

这个方法有一个非常重要的逻辑:当 Channel 的 isAutoRead 选项为 false 时,Netty 不会自动从底层 Socket 读取数据。开发者需要手动调用 ctx.read() 来触发下一次读取。 ByteToMessageDecoder 在这里做了一个智能判断:如果在一次完整的读操作中(从 channelRead 到 channelReadComplete),它接收到了数据(selfFiredChannelRead 为 true),但没有成功解码出任何消息传递给下一个 handler!firedChannelRead),它会认为可能是因为数据不够,所以主动调用 ctx.read() 去请求更多数据,防止数据流中断。

handlerRemoved 和 channelInactive

这两个方法处理解码器生命周期结束时的逻辑。

  • 当 Channel 关闭 (channelInactive) 或 handler 被移除 (handlerRemoved) 时,ByteToMessageDecoder 会做最后的清理工作。
  • 它会调用 decodeLast 方法,让子类有机会处理 cumulation 中可能存在的、不完整的尾部数据。
  • 最后,它会确保 cumulation 缓冲区被释放,防止内存泄漏。

总结

ByteToMessageDecoder 是 Netty 中一个设计精良且功能强大的基础组件。它通过以下机制,极大地简化了网络编程中处理字节流解码的复杂性:

  1. 自动字节累积: 开发者无需手动管理零散的 ByteBuf,解码器会自动处理粘包/半包问题。
  2. 模板方法模式: 它定义了整个解码流程的骨架 (channelRead -> callDecode -> decode),开发者只需要填充核心的 decode 逻辑即可。
  3. 高效的内存管理: 提供了 MERGE_CUMULATOR 和 COMPOSITE_CUMULATOR 两种策略,并内置了 discardSomeReadBytes 这样的内存整理机制。
  4. 健壮的生命周期管理: 妥善处理了 handler 添加、移除、channel 关闭等事件,确保资源能被正确释放。
  5. 对 AutoRead 的智能处理: 在非自动读取模式下,能智能地触发下一次数据读取,保证数据处理的流畅性。

ByteToMessageDecoder (父类) 的职责:搭建通用框架

父类不知道应用层协议长什么样,但它解决了所有协议解码都会面临的共性问题:数据不是一次性完整到达的

它的核心贡献体现在以下几点:

  1. 创建并管理累积缓冲区 (cumulation) 这是解决半包问题的基石。当 channelRead 方法接收到一个 ByteBuf 时,它做的第一件事不是调用 decode,而是将这个 ByteBuf 的数据累积到内部的 cumulation 缓冲区中。

    // ... existing code ...
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {// ...try {// ...// 关键步骤:将新到达的 msg 累积到 cumulation 中cumulation = cumulator.cumulate(ctx.alloc(),first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);// 然后才用累积后的完整数据去调用解码逻辑callDecode(ctx, cumulation, out);
    // ... existing code ...
    

    作用:无论网络底层把一个完整的消息拆分成了多少个“半包”发送过来,ByteToMessageDecoder 都会帮你把它们重新拼接在一起,存放在 cumulation 里。

  2. 提供循环解码机制 (callDecode) 这是解决粘包问题的关键。callDecode 方法内部有一个 while 循环,只要缓冲区里还有数据,它就会持续尝试调用子类的 decode 方法。

    // ... existing code ...
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {// 关键循环:只要缓冲区可读,就一直尝试解码while (in.isReadable()) {// ...int oldInputLength = in.readableBytes();decodeRemovalReentryProtection(ctx, in, out); // 调用子类的 decode// ...if (out.isEmpty()) {if (oldInputLength == in.readableBytes()) {// 如果没读出东西,并且字节也没少,说明是半包,跳出循环break;} else {// 字节少了但没读出东西,说明子类可能在做一些跳过字节的操作,继续循环continue;}}// ...}
    // ... existing code ...
    

    作用:如果网络一次性发来一个包含多个消息的“粘包”,这个 while 循环会确保 decode 方法被连续调用,直到把所有完整的消息都从缓冲区中解码出来为止。

  3. 处理“数据不足”的场景 这是整个机制能正常工作的核心。当子类的 decode 方法发现 cumulation 里的数据不足以构成一个完整消息时,它会直接返回,并且不从缓冲区中读取任何字节。 此时,callDecode 方法里的 break 逻辑会被触发(因为没有新消息产出,并且可读字节数未变)。循环终止,但重要的是,cumulation 里的数据被完整保留了下来。它会静静地等待下一次 channelRead 事件带来更多的数据,然后在新数据累积进来后,再次尝试解码。

子类 decode 方法的职责:定义消息边界

子类的工作被大大简化了。它不需要关心数据是怎么来的、是不是完整的,它只需要做一个“裁判”:

  • 输入:父类提供的、已经累积好的 ByteBuf
  • 逻辑:根据具体的协议规则,判断这个 ByteBuf 里的数据当前是否足够解析出一个或多个完整的消息。
    • 如果足够:就从 ByteBuf 中读取相应的数据,解码成消息对象,添加到 out 列表中。
    • 如果不够:就什么也不做,直接 return

总结

我们可以用一个比喻来理解这个关系:

  • ByteToMessageDecoder (父类) 就像一个自动化的流水线工作台。它负责从卡车(网络)上卸货(ByteBuf),并把所有零件(半包)都堆在工作台上(cumulation)。它还设定了一个规则:只要工作台上有零件,就让工人一直干活(while循环)。
  • Decoder (子类) 就像一个熟练的工人。他只关心一件事:看着工作台上的零件,根据图纸(协议规范),判断是否能组装出一个完整的产品(消息)。能组装就拿走零件组装起来,不能就停手等待下一批零件。

所以,ByteToMessageDecoder 并没有直接解析你的协议,但它通过提供累积、循环和状态保持的通用框架,完美地解决了半包和粘包问题,让子类可以专注于核心的、与协议相关的解码逻辑。

FixedLengthFrameDecoder

首先,看它的定义和用途:

// ... existing code ...
/*** A decoder that splits the received {@link ByteBuf}s by the fixed number* of bytes. For example, if you received the following four fragmented packets:* <pre>* +---+----+------+----+* |A| BC | DEFG | HI |* +---+----+------+----+* </pre>* A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the* following three packets with the fixed length:* <pre>* +-----+-----+-----+* | ABC | DEF | GHI |* +-----+-----+-----+* </pre>*/
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
// ... existing code ...

它的作用非常明确:将接收到的字节流,按照一个固定的长度 (frameLength) 进行切分。无论底层传来的是半包还是粘包,它都能正确地切分出长度为 frameLength 的数据帧(Frame)。

构造函数与核心字段

// ... existing code ...
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {private final int frameLength;/*** Creates a new instance.** @param frameLength the length of the frame*/public FixedLengthFrameDecoder(int frameLength) {checkPositive(frameLength, "frameLength");this.frameLength = frameLength;}
// ... existing code ...

逻辑非常简单:

  • 它只有一个核心字段 frameLength,在构造时传入,代表每个数据帧的固定长度。
  • 构造函数会检查 frameLength 必须为正数。

核心解码逻辑 decode

FixedLengthFrameDecoder 巧妙地重写了两个 decode 方法,我们来分析一下:

// ... existing code ...@Overrideprotected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object decoded = decode(ctx, in);if (decoded != null) {out.add(decoded);}}/*** Create a frame out of the {@link ByteBuf} and return it.* ...*/protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {if (in.readableBytes() < frameLength) {return null;} else {return in.readRetainedSlice(frameLength);}}
}
  1. protected Object decode(ChannelHandlerContext ctx, ByteBuf in)

    • 这个方法是 FixedLengthFrameDecoder 自己新增的一个 decode 重载方法,它负责真正的解码逻辑。
    • if (in.readableBytes() < frameLength): 这是最关键的一步。它检查父类 ByteToMessageDecoder 传来的累积缓冲区 in 中的可读字节数,是否小于我们期望的固定长度 frameLength
      • 如果小于,说明这是一个“半包”,数据还不够。此时它直接 return null
      • 如果大于或等于,说明数据足够,可以解码出一个完整的数据帧。
    • return in.readRetainedSlice(frameLength);: 当数据足够时,它调用 in.readRetainedSlice(frameLength)。这个方法会从 in 中读取 frameLength 个字节,并返回一个新的、独立的 ByteBuf 切片。这个切片与原始 ByteBuf 共享底层内存,但有独立的读写指针。read 操作会移动 in 的 readerIndex,而 retained 则会增加返回的 ByteBuf 的引用计数,确保它在传递给下一个 handler 后不会被意外释放。
  2. protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)

    • 这个方法是重写 ByteToMessageDecoder 的抽象方法。
    • 它的逻辑非常简单:调用上面那个 decode 方法,拿到解码结果 decoded
    • 如果 decoded 不为 null(即成功解码出一个帧),就把它加入到输出列表 out 中。

父子类协作流程分析

现在,我们把父类 ByteToMessageDecoder 的行为和子类 FixedLengthFrameDecoder 的行为串联起来,看看一个完整的协作流程:

  1. [父类] channelRead 被触发,接收到一个 ByteBuf(可能是半包或粘包)。
  2. [父类] 将这个 ByteBuf 的数据累积到内部的 cumulation 缓冲区。
  3. [父类] 调用 callDecode 方法,callDecode 内部的 while 循环开始执行。
  4. [父类] callDecode 调用 decodeRemovalReentryProtection,后者最终调用子类实现的 decode(ctx, cumulation, out) 方法。
  5. [子类] FixedLengthFrameDecoder 的 decode(ctx, in, out) 方法被调用。它内部又调用了自己的 decode(ctx, in) 方法。
  6. [子类] decode(ctx, in) 检查 in.readableBytes()
    • 场景A (半包): 可读字节数小于 frameLength。它返回 null
    • 场景B (数据足够): 可读字节数大于等于 frameLength。它读取 frameLength 字节,返回一个新的 ByteBuf
  7. [子类] decode(ctx, in, out) 拿到上一步的结果:
    • 场景A: 结果是 null,它什么也不做,直接返回。
    • 场景B: 结果是一个 ByteBuf,它将这个 ByteBuf 添加到 out 列表中。
  8. [父类] callDecode 检查 decode 方法执行后的状态:
    • 场景A: out 列表为空,且 cumulation 的可读字节数未变。callDecode 知道这是半包情况,于是 break 循环,等待更多数据。
    • 场景B: out 列表不为空。callDecode 会将 out 中的消息 fireChannelRead 给下一个 handler,清空 out 列表,然后继续 while 循环,尝试从 cumulation 剩余的字节中解码下一个帧(这就是处理粘包的关键)。
  9. 循环往复,直到 cumulation 中的数据不足以构成一个 frameLength 的帧为止。

总结

通过 FixedLengthFrameDecoder 这个简单的例子,我们可以清晰地看到:

  • 父类 ByteToMessageDecoder 负责了所有通用的、与具体协议无关的脏活累活:字节累积、循环调用、状态管理、半包数据保留、粘包数据处理
  • 子类 FixedLengthFrameDecoder 只需聚焦于一个核心问题:“如何定义一个完整的消息帧?”。它的答案是:“当字节数达到 frameLength 时”。它用短短几行代码实现了这个判断和切片逻辑。

这种设计模式是典型的模板方法模式,极大地提高了代码的复用性和可维护性,让开发者可以快速开发出针对各种私有协议的解码器,而无需关心底层的半包/粘包处理细节。

http://www.dtcms.com/a/391509.html

相关文章:

  • scrapy项目-爬取某招聘网站信息
  • 解决ubuntu下搜狗输入法在浏览器不可用的问题
  • 设计模式- 命令模式详解
  • 谈一谈Java成员变量,局部变量和静态变量的创建和回收时机
  • OSCP - Proving Grounds - Leyla
  • 9 月 19 日 IT 界热点大赏:科技浪潮下的创新与变革
  • 自动化脚本的零失误之路
  • Redis(三)Redis集群的三种模式
  • 网络环路:成因、影响与防环机制深度解析
  • 力扣刷题笔记(1)--面试150数组部分
  • 分割模型Maskformer
  • C# TCP的方式 实现上传文件
  • 高压消解罐:难溶物质消解的首选工具
  • JavaScript 字符串截取最后一位的几种方法
  • MobileNetV3训练自定义数据集并通过C++进行推理模型部署
  • nvshmem源码学习(一)ibgda视角的整体流程
  • Redis群集的三种模式
  • 鸿蒙(南向/北向)
  • Spring IoCDI 快速入门
  • MySQL的C语言驱动核心——`mysql_real_connect()` 函数
  • C++线程池学习 Day06
  • React 样式CSS的定义 多种定义方式 前端基础
  • react+anddesign组件Tabs实现后台管理系统自定义页签头
  • Midscene 低代码实现Android自动化
  • ADB使用指南
  • FunCaptcha如何查找sitekey参数
  • 大模型如何让机器人实现“从冰箱里拿一瓶可乐”?
  • Python实现液体蒸发优化算法 (Evaporation Rate Water Cycle Algorithm, ER-WCA)(附完整代码)
  • MySQL 数据库的「超级钥匙」—`mysql_real_connect`
  • LeetCode 每日一题 3484. 设计电子表格