Netty ByteToMessageDecoder解码机制全解析
ByteToMessageDecoder
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
ByteToMessageDecoder
是一个 抽象的 ChannelInboundHandlerAdapter
。它的核心职责是将入站的字节流(ByteBuf
)解码成应用层的消息(Message)对象。
在基于流的传输协议(如 TCP)中,数据并非以你发送时划分的消息包(Message)形式到达,而是一个连续的字节流。这会导致两个常见问题:
- 粘包(Sticking): 多个小的消息包被合并在一个TCP报文中一次性接收。
- 半包(Half-packet): 一个大的消息包被拆分到多个TCP报文中,一次只接收到了一部分。
ByteToMessageDecoder
的设计目标就是为了优雅地解决这两个问题。它通过内部维护一个累积缓冲区,自动将零散的 ByteBuf
累积起来,然后让子类在数据足够时进行解码。
核心组件与字段分析
ByteToMessageDecoder
内部有几个关键的字段和组件来支撑其功能。
1. 累积器 cumulator
和累积缓冲区 cumulation
ByteBuf cumulation;
:这是解码器的核心,一个ByteBuf
类型的累积缓冲区。所有channelRead
事件接收到的ByteBuf
都会被累积到这个cumulation
中。private Cumulator cumulator = MERGE_CUMULATOR;
:Cumulator
是一个接口,定义了如何将新的ByteBuf
(in
) 合并到旧的累积缓冲区 (cumulation
) 中。ByteToMessageDecoder
提供了两种默认实现:MERGE_CUMULATOR
(默认):// ... existing code ... public static final Cumulator MERGE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {// ...try {final int required = in.readableBytes();if (required > cumulation.maxWritableBytes() ||// ... some conditions) {// 空间不足时,扩展缓冲区return expandCumulation(alloc, cumulation, in);}// 空间足够,直接写入cumulation.writeBytes(in, in.readerIndex(), required);in.readerIndex(in.writerIndex());return cumulation;} finally {// 必须释放传入的 in buffer,因为它的数据已经被消费(复制)in.release();}} }; // ... existing code ...
工作方式:它尝试将新数据拷贝到现有的
cumulation
缓冲区的可写空间中。如果空间不足,它会分配一个更大的新缓冲区,并将旧数据和新数据都拷贝到这个新缓冲区里。 优点:数据总是存储在单个连续的ByteBuf
中,访问速度快。 缺点:在缓冲区需要扩展时,会涉及内存分配和数据拷贝,可能产生性能开销。COMPOSITE_CUMULATOR
:// ... existing code ... public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {// ...CompositeByteBuf composite = null;try {if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {composite = (CompositeByteBuf) cumulation;// ...} else {composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);}// 将新的 in buffer 作为一个组件添加进去composite.addFlattenedComponents(true, in);in = null;return composite;} finally {// ...}} }; // ... existing code ...
工作方式:它使用
CompositeByteBuf
(复合缓冲区)来累积数据。它不进行内存拷贝,而是像一个列表一样,直接将新的ByteBuf
作为组件添加进来。 优点:避免了内存拷贝,累积操作非常高效。 缺点:CompositeByteBuf
的内部实现比普通ByteBuf
复杂,随机访问数据的性能可能会稍差。
2. 状态与行为控制字段
private boolean singleDecode;
:如果设置为true
,每次channelRead
事件只会调用decode
方法一次,成功解码出一个消息后就会停止,即使缓冲区里还有足够的数据可以解码出更多消息。这在某些需要进行协议升级的场景下很有用。private byte decodeState;
:一个内部状态机,用于处理复杂的并发和重入问题。例如,如果在decode
方法执行期间,该handler
被从pipeline
中移除,这个状态机可以确保状态的正确流转和资源的安全释放。private int discardAfterReads = 16;
和private int numReads;
:这是一个内存优化策略。当decode
方法从cumulation
缓冲区读取数据后,这部分已读空间并不会立即被回收。当读操作(channelRead
)累计达到discardAfterReads
次数后,ByteToMessageDecoder
会调用cumulation.discardSomeReadBytes()
方法。这个方法会把可读字节移动到缓冲区的最前端,从而释放尾部的空间,防止缓冲区无限增长。
channelRead
这是解码器的入口。当上一个 handler
传来数据时,此方法被调用。
// ... existing code ...@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {selfFiredChannelRead = true;CodecOutputList out = CodecOutputList.newInstance();try {first = cumulation == null;// 1. 累积数据cumulation = cumulator.cumulate(ctx.alloc(),first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);// 2. 调用解码逻辑callDecode(ctx, cumulation, out);} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {// 3. 清理和资源管理if (cumulation != null && !cumulation.isReadable()) {// ... 释放空的 cumulation ...cumulation = null;} else if (++numReads >= discardAfterReads) {// ... 执行 discardSomeReadBytes ...}int size = out.size();firedChannelRead |= out.insertSinceRecycled();// 4. 将解码后的消息传递给下一个 HandlerfireChannelRead(ctx, out, size);} finally {out.recycle();}}} else {ctx.fireChannelRead(msg);}}
// ... existing code ...
流程分解:
- 累积数据: 将传入的
ByteBuf
(msg
) 通过cumulator
合并到cumulation
中。 - 调用解码: 调用
callDecode
方法,该方法内部会循环调用子类实现的decode
方法。 - 清理和资源管理: 在
finally
块中,检查cumulation
是否已经读完。如果读完,就释放它。同时,检查是否需要执行discardSomeReadBytes
优化。 - 传递结果: 将解码产生的消息列表
out
中的所有对象,通过fireChannelRead
传递给ChannelPipeline
中的下一个ChannelInboundHandler
。
callDecode
// ... existing code ...protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {while (in.isReadable()) {final int outSize = out.size();if (outSize > 0) {fireChannelRead(ctx, out, outSize);out.clear();// Check if this handler was removed before continuing with decoding.// If it was removed, it is not safe to continue to operate on the buffer.//// See:// - https://github.com/netty/netty/issues/4635if (ctx.isRemoved()) {break;}}int oldInputLength = in.readableBytes();decodeRemovalReentryProtection(ctx, in, out);// Check if this handler was removed before continuing the loop.// If it was removed, it is not safe to continue to operate on the buffer.//// See https://github.com/netty/netty/issues/1664if (ctx.isRemoved()) {break;}if (out.isEmpty()) {if (oldInputLength == in.readableBytes()) {break;} else {continue;}}if (oldInputLength == in.readableBytes()) {throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");}if (isSingleDecode()) {break;}}} catch (DecoderException e) {throw e;} catch (Exception cause) {throw new DecoderException(cause);}}
// ... existing code ...
callDecode
的核心是一个 while (in.isReadable())
循环,意图是在一次 channelRead
事件中,尽可能多地从累积缓冲区 in
中解码出消息。下面是循环内部的详细步骤:
处理上一轮解码结果 (
if (outSize > 0)
)- 在一个解码周期中,用户的
decode
方法可能一次性向out
列表添加多个消息。这个if
块检查out
列表是否已有解码出的消息。 - 如果有,就调用
fireChannelRead(ctx, out, outSize)
将这些消息立即传递给pipeline
中的下一个handler
。 - 传递后,调用
out.clear()
清空列表,为下一轮解码做准备。 - 关键安全检查:
if (ctx.isRemoved())
。在fireChannelRead
过程中,下游的handler
完全有可能执行一个操作,将当前的ByteToMessageDecoder
从pipeline
中移除。如果发生了这种情况,后续的所有操作都是不安全的,必须立刻break
循环。
- 在一个解码周期中,用户的
调用用户解码逻辑
int oldInputLength = in.readableBytes();
:在调用用户的decode
方法之前,记录下当前缓冲区的可读字节数。这个变量至关重要,用于后续判断解码是否有效。decodeRemovalReentryProtection(ctx, in, out);
:这是真正调用用户实现的decode
方法的地方。它被一个“防重入和移除保护”的方法包裹着,我们稍后详细分析这个方法。
解码后的状态检查
- 再次进行移除安全检查:
if (ctx.isRemoved())
。用户的decode
方法内部也可能触发handler
的移除操作,所以调用后必须再次检查。 - 情况一:没有解码出消息 (
if (out.isEmpty())
)if (oldInputLength == in.readableBytes())
:如果out
列表为空,并且缓冲区的可读字节数没有变化,这说明剩余的数据不足以构成一个完整的消息。这是最常见的情况,解码器应该停止本次解码,等待更多数据到达。因此,break
循环。else { continue; }
:如果out
列表为空,但可读字节数减少了,这说明用户的decode
方法消费了一部分数据(例如,跳过了一些无效字节或协议头),但还没有解码出完整的消息。这是一个有效的中间状态,所以用continue
进入下一轮循环,尝试用剩余的数据继续解码。
- 情况二:成功解码出消息 (
out
不为空)if (oldInputLength == in.readableBytes())
:如果out
列表不为空(解码出了消息),但缓冲区的可读字节数没有变化,这是一个严重的逻辑错误。这意味着用户的decode
实现凭空创建了一个消息,却没有从输入流中消费任何数据。如果不加以制止,这将导致while
循环无限执行,造成死循环。因此,Netty 在这里会主动抛出DecoderException
。
- 检查
singleDecode
标志:if (isSingleDecode())
。如果用户设置了singleDecode
为true
,那么在成功解码出一批消息后,无论缓冲区是否还有数据,都应立即break
循环。
- 再次进行移除安全检查:
decodeRemovalReentryProtection
这是对用户 decode
方法的核心封装,目的是为了安全。
// ... existing code ...final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try {decode(ctx, in, out);} finally {boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;decodeState = STATE_INIT;if (removePending) {fireChannelRead(ctx, out, out.size());out.clear();handlerRemoved(ctx);}}}
// ... existing code ...
它的作用是:
- 在调用
decode(ctx, in, out)
之前,将内部状态decodeState
设置为STATE_CALLING_CHILD_DECODE
。 - 如果在
decode
方法执行期间,handlerRemoved
方法被从另一个线程或事件回调中调用,handlerRemoved
会检查到这个状态,它不会立即执行移除逻辑,而是将状态改为STATE_HANDLER_REMOVED_PENDING
然后返回。 - 当
decode
方法执行完毕,finally
块会检查状态是否为STATE_HANDLER_REMOVED_PENDING
。如果是,说明在解码期间有一个“待处理的移除请求”。此时,它会安全地将解码出的最后结果fire
出去,然后再调用handlerRemoved(ctx)
来完成真正的移除清理工作。 - 这个机制确保了
decode
方法的原子性,防止了在解码过程中状态被破坏的风险。
fireChannelRead
这个静态方法负责将解码后的消息列表传递出去。
// ... existing code ...static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {if (msgs instanceof CodecOutputList) {fireChannelRead(ctx, (CodecOutputList) msgs, numElements);} else {for (int i = 0; i < numElements; i++) {ctx.fireChannelRead(msgs.get(i));}}}static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {for (int i = 0; i < numElements; i ++) {ctx.fireChannelRead(msgs.getUnsafe(i));}}
// ... existing code ...
逻辑很简单:遍历列表,对每个解码出的消息调用 ctx.fireChannelRead()
。它对 Netty 内部使用的 CodecOutputList
做了优化,使用 getUnsafe
方法来提升性能。
总结
callDecode
的逻辑比表面看起来要复杂得多,它不仅仅是一个简单的循环调用。它是一个非常健壮的执行引擎,内置了多种检查和保护机制:
- 状态驱动的循环:循环的终止条件是动态的,取决于“是否还有数据”、“是否解码出消息”、“是否消耗了字节”以及“是否设置了
singleDecode
”。 - 严格的契约执行:强制要求解码出消息的
decode
方法必须消耗输入字节,否则抛出异常,防止开发者写出死循环的解码器。 - 并发和重入安全:通过
decodeRemovalReentryProtection
和状态机,优雅地处理了在解码过程中自身被移除的复杂情况。 - 及时的事件传播:一旦解码出消息,会立即
fire
出去,而不是等整个while
循环结束,这降低了消息传递的延迟。
decode
这是开发者必须实现的抽象方法,是解码逻辑的核心所在。
// ... existing code .../*** Decode the from one {@link ByteBuf} to an other. This method will be called till either the input* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input* {@link ByteBuf}.** @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to* @param in the {@link ByteBuf} from which to read data* @param out the {@link List} to which decoded messages should be added* @throws Exception is thrown if an error occurs*/protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
// ... existing code ...
实现要点:
- 检查可读字节: 在读取前,先检查
in.readableBytes()
是否足够构成一个完整的消息。 - 数据不足则返回: 如果数据不足,不要做任何读取操作,直接
return
。ByteToMessageDecoder
会保留当前cumulation
的内容,等待下一次channelRead
带来更多数据。 - 数据足够则解码: 如果数据足够,从
in
中读取相应字节(例如in.readInt()
、in.readBytes(len)
),解码成一个或多个消息对象,然后将这些对象out.add(message)
到输出列表out
中。 - 循环解码:
ByteToMessageDecoder
的callDecode
方法会在一个循环中调用你的decode
方法,直到in
中不再有足够的数据可以组成一个新消息为止。
channelReadComplete
和 AutoRead
// ... existing code ...@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {numReads = 0;discardSomeReadBytes();if (selfFiredChannelRead && !firedChannelRead && !ctx.channel().config().isAutoRead()) {ctx.read();}firedChannelRead = false;selfFiredChannelRead = false;ctx.fireChannelReadComplete();}
// ... existing code ...
这个方法有一个非常重要的逻辑:当 Channel
的 isAutoRead
选项为 false
时,Netty 不会自动从底层 Socket 读取数据。开发者需要手动调用 ctx.read()
来触发下一次读取。 ByteToMessageDecoder
在这里做了一个智能判断:如果在一次完整的读操作中(从 channelRead
到 channelReadComplete
),它接收到了数据(selfFiredChannelRead
为 true
),但没有成功解码出任何消息传递给下一个 handler
(!firedChannelRead
),它会认为可能是因为数据不够,所以主动调用 ctx.read()
去请求更多数据,防止数据流中断。
handlerRemoved
和 channelInactive
这两个方法处理解码器生命周期结束时的逻辑。
- 当
Channel
关闭 (channelInactive
) 或handler
被移除 (handlerRemoved
) 时,ByteToMessageDecoder
会做最后的清理工作。 - 它会调用
decodeLast
方法,让子类有机会处理cumulation
中可能存在的、不完整的尾部数据。 - 最后,它会确保
cumulation
缓冲区被释放,防止内存泄漏。
总结
ByteToMessageDecoder
是 Netty 中一个设计精良且功能强大的基础组件。它通过以下机制,极大地简化了网络编程中处理字节流解码的复杂性:
- 自动字节累积: 开发者无需手动管理零散的
ByteBuf
,解码器会自动处理粘包/半包问题。 - 模板方法模式: 它定义了整个解码流程的骨架 (
channelRead
->callDecode
->decode
),开发者只需要填充核心的decode
逻辑即可。 - 高效的内存管理: 提供了
MERGE_CUMULATOR
和COMPOSITE_CUMULATOR
两种策略,并内置了discardSomeReadBytes
这样的内存整理机制。 - 健壮的生命周期管理: 妥善处理了
handler
添加、移除、channel
关闭等事件,确保资源能被正确释放。 - 对
AutoRead
的智能处理: 在非自动读取模式下,能智能地触发下一次数据读取,保证数据处理的流畅性。
ByteToMessageDecoder
(父类) 的职责:搭建通用框架
父类不知道应用层协议长什么样,但它解决了所有协议解码都会面临的共性问题:数据不是一次性完整到达的。
它的核心贡献体现在以下几点:
创建并管理累积缓冲区 (
cumulation
) 这是解决半包问题的基石。当channelRead
方法接收到一个ByteBuf
时,它做的第一件事不是调用decode
,而是将这个ByteBuf
的数据累积到内部的cumulation
缓冲区中。// ... existing code ... @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {// ...try {// ...// 关键步骤:将新到达的 msg 累积到 cumulation 中cumulation = cumulator.cumulate(ctx.alloc(),first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);// 然后才用累积后的完整数据去调用解码逻辑callDecode(ctx, cumulation, out); // ... existing code ...
作用:无论网络底层把一个完整的消息拆分成了多少个“半包”发送过来,
ByteToMessageDecoder
都会帮你把它们重新拼接在一起,存放在cumulation
里。提供循环解码机制 (
callDecode
) 这是解决粘包问题的关键。callDecode
方法内部有一个while
循环,只要缓冲区里还有数据,它就会持续尝试调用子类的decode
方法。// ... existing code ... protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {// 关键循环:只要缓冲区可读,就一直尝试解码while (in.isReadable()) {// ...int oldInputLength = in.readableBytes();decodeRemovalReentryProtection(ctx, in, out); // 调用子类的 decode// ...if (out.isEmpty()) {if (oldInputLength == in.readableBytes()) {// 如果没读出东西,并且字节也没少,说明是半包,跳出循环break;} else {// 字节少了但没读出东西,说明子类可能在做一些跳过字节的操作,继续循环continue;}}// ...} // ... existing code ...
作用:如果网络一次性发来一个包含多个消息的“粘包”,这个
while
循环会确保decode
方法被连续调用,直到把所有完整的消息都从缓冲区中解码出来为止。处理“数据不足”的场景 这是整个机制能正常工作的核心。当子类的
decode
方法发现cumulation
里的数据不足以构成一个完整消息时,它会直接返回,并且不从缓冲区中读取任何字节。 此时,callDecode
方法里的break
逻辑会被触发(因为没有新消息产出,并且可读字节数未变)。循环终止,但重要的是,cumulation
里的数据被完整保留了下来。它会静静地等待下一次channelRead
事件带来更多的数据,然后在新数据累积进来后,再次尝试解码。
子类 decode
方法的职责:定义消息边界
子类的工作被大大简化了。它不需要关心数据是怎么来的、是不是完整的,它只需要做一个“裁判”:
- 输入:父类提供的、已经累积好的
ByteBuf
。 - 逻辑:根据具体的协议规则,判断这个
ByteBuf
里的数据当前是否足够解析出一个或多个完整的消息。- 如果足够:就从
ByteBuf
中读取相应的数据,解码成消息对象,添加到out
列表中。 - 如果不够:就什么也不做,直接
return
。
- 如果足够:就从
总结
我们可以用一个比喻来理解这个关系:
ByteToMessageDecoder
(父类) 就像一个自动化的流水线工作台。它负责从卡车(网络)上卸货(ByteBuf
),并把所有零件(半包)都堆在工作台上(cumulation
)。它还设定了一个规则:只要工作台上有零件,就让工人一直干活(while
循环)。Decoder
(子类) 就像一个熟练的工人。他只关心一件事:看着工作台上的零件,根据图纸(协议规范),判断是否能组装出一个完整的产品(消息)。能组装就拿走零件组装起来,不能就停手等待下一批零件。
所以,ByteToMessageDecoder
并没有直接解析你的协议,但它通过提供累积、循环和状态保持的通用框架,完美地解决了半包和粘包问题,让子类可以专注于核心的、与协议相关的解码逻辑。
FixedLengthFrameDecoder
首先,看它的定义和用途:
// ... existing code ...
/*** A decoder that splits the received {@link ByteBuf}s by the fixed number* of bytes. For example, if you received the following four fragmented packets:* <pre>* +---+----+------+----+* |A| BC | DEFG | HI |* +---+----+------+----+* </pre>* A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the* following three packets with the fixed length:* <pre>* +-----+-----+-----+* | ABC | DEF | GHI |* +-----+-----+-----+* </pre>*/
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
// ... existing code ...
它的作用非常明确:将接收到的字节流,按照一个固定的长度 (frameLength
) 进行切分。无论底层传来的是半包还是粘包,它都能正确地切分出长度为 frameLength
的数据帧(Frame)。
构造函数与核心字段
// ... existing code ...
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {private final int frameLength;/*** Creates a new instance.** @param frameLength the length of the frame*/public FixedLengthFrameDecoder(int frameLength) {checkPositive(frameLength, "frameLength");this.frameLength = frameLength;}
// ... existing code ...
逻辑非常简单:
- 它只有一个核心字段
frameLength
,在构造时传入,代表每个数据帧的固定长度。 - 构造函数会检查
frameLength
必须为正数。
核心解码逻辑 decode
FixedLengthFrameDecoder
巧妙地重写了两个 decode
方法,我们来分析一下:
// ... existing code ...@Overrideprotected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object decoded = decode(ctx, in);if (decoded != null) {out.add(decoded);}}/*** Create a frame out of the {@link ByteBuf} and return it.* ...*/protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {if (in.readableBytes() < frameLength) {return null;} else {return in.readRetainedSlice(frameLength);}}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
- 这个方法是
FixedLengthFrameDecoder
自己新增的一个decode
重载方法,它负责真正的解码逻辑。 if (in.readableBytes() < frameLength)
: 这是最关键的一步。它检查父类ByteToMessageDecoder
传来的累积缓冲区in
中的可读字节数,是否小于我们期望的固定长度frameLength
。- 如果小于,说明这是一个“半包”,数据还不够。此时它直接
return null
。 - 如果大于或等于,说明数据足够,可以解码出一个完整的数据帧。
- 如果小于,说明这是一个“半包”,数据还不够。此时它直接
return in.readRetainedSlice(frameLength);
: 当数据足够时,它调用in.readRetainedSlice(frameLength)
。这个方法会从in
中读取frameLength
个字节,并返回一个新的、独立的ByteBuf
切片。这个切片与原始ByteBuf
共享底层内存,但有独立的读写指针。read
操作会移动in
的readerIndex
,而retained
则会增加返回的ByteBuf
的引用计数,确保它在传递给下一个handler
后不会被意外释放。
- 这个方法是
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
- 这个方法是重写
ByteToMessageDecoder
的抽象方法。 - 它的逻辑非常简单:调用上面那个
decode
方法,拿到解码结果decoded
。 - 如果
decoded
不为null
(即成功解码出一个帧),就把它加入到输出列表out
中。
- 这个方法是重写
父子类协作流程分析
现在,我们把父类 ByteToMessageDecoder
的行为和子类 FixedLengthFrameDecoder
的行为串联起来,看看一个完整的协作流程:
- [父类]
channelRead
被触发,接收到一个ByteBuf
(可能是半包或粘包)。 - [父类] 将这个
ByteBuf
的数据累积到内部的cumulation
缓冲区。 - [父类] 调用
callDecode
方法,callDecode
内部的while
循环开始执行。 - [父类]
callDecode
调用decodeRemovalReentryProtection
,后者最终调用子类实现的decode(ctx, cumulation, out)
方法。 - [子类]
FixedLengthFrameDecoder
的decode(ctx, in, out)
方法被调用。它内部又调用了自己的decode(ctx, in)
方法。 - [子类]
decode(ctx, in)
检查in.readableBytes()
:- 场景A (半包): 可读字节数小于
frameLength
。它返回null
。 - 场景B (数据足够): 可读字节数大于等于
frameLength
。它读取frameLength
字节,返回一个新的ByteBuf
。
- 场景A (半包): 可读字节数小于
- [子类]
decode(ctx, in, out)
拿到上一步的结果:- 场景A: 结果是
null
,它什么也不做,直接返回。 - 场景B: 结果是一个
ByteBuf
,它将这个ByteBuf
添加到out
列表中。
- 场景A: 结果是
- [父类]
callDecode
检查decode
方法执行后的状态:- 场景A:
out
列表为空,且cumulation
的可读字节数未变。callDecode
知道这是半包情况,于是break
循环,等待更多数据。 - 场景B:
out
列表不为空。callDecode
会将out
中的消息fireChannelRead
给下一个handler
,清空out
列表,然后继续while
循环,尝试从cumulation
剩余的字节中解码下一个帧(这就是处理粘包的关键)。
- 场景A:
- 循环往复,直到
cumulation
中的数据不足以构成一个frameLength
的帧为止。
总结
通过 FixedLengthFrameDecoder
这个简单的例子,我们可以清晰地看到:
- 父类
ByteToMessageDecoder
负责了所有通用的、与具体协议无关的脏活累活:字节累积、循环调用、状态管理、半包数据保留、粘包数据处理。 - 子类
FixedLengthFrameDecoder
只需聚焦于一个核心问题:“如何定义一个完整的消息帧?”。它的答案是:“当字节数达到frameLength
时”。它用短短几行代码实现了这个判断和切片逻辑。
这种设计模式是典型的模板方法模式,极大地提高了代码的复用性和可维护性,让开发者可以快速开发出针对各种私有协议的解码器,而无需关心底层的半包/粘包处理细节。