Netty Channel 详解
Netty的Channel未使用nio的实现,采用自定义实现。
Channel工作原理
Channel是Netty抽象出来的网络I/O读写相关的接口,为什么不使用JDKNIO原生的Channel而要另起炉灶呢,主要原因如下。
- JDK的
SocketChannel
和ServerSocketChannel
没有统一的Channel接口供业务开发者使用,对于用户而言,没有统一的操作视图,使用起来并不方便。 - JDK的
SocketChannel
和ServerSocketChannel
的主要职责就是网络I/O操作,由于它们是SPI
类接口,由具体的虚拟机厂家来提供,所以通过继承SPI
功能类来扩展其功能的难度很大;直接实现ServerSocketChannel
和SocketChannel
抽象类,其工作量和重新开发一个新的Channel功能类是差不多的。 - Netty的
Channel
需要能够跟Netty的整体架构融合在一起,例如I/O模型、基于ChanneIPipeline
的定制模型,以及基于元数据描述配置化的TCP参数等,这些JDK的SocketChannel
和ServerSocketChannel
都没有提供,需要重新封装。 - 自定义的Channel,功能实现更加灵活。
基于上述4个原因,Netty重新设计了Channel接口,并且给予了很多不同的实现。它的设计原理比较简单,但是功能却比较繁杂,主要的设计理念如下。
- 在Channel接口层,采用
Facade
模式进行统一封装,将网络I/O操作、网络I/O相关联的其他操作封装起来,统一对外提供。 - Channel接口的定义尽量大而全,为SocketChannel和ServerSocketChannel提供统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度地实现功能和接口的重用。
- 具体实现采用聚合而非包含的方式,将相关的功能类聚合在Channel中,由Channel统一负责分配和调度,功能实现更加灵活。
Channel的主要继承关系图
类图
源码分析
公共接口
▶ AttributeMap
public interface AttributeMap {<T> Attribute<T> attr(AttributeKey<T> key);<T> boolean hasAttr(AttributeKey<T> key);
}
▶ ChannelOutboundInvoker
ChannelOutboundInvoker
是 Netty 中的一个关键接口,主要用于 定义出站(Outbound)操作的触发方法,即所有 从应用程序向网络发送数据 的操作(如写入数据、刷新缓冲区、关闭连接等)都通过该接口的方法发起。它是 Netty 异步事件驱动模型中 出站事件 的核心抽象。
ChannelOutboundInvoker
的主要用途:
(1) 统一出站操作的调用方式
- 它为所有出站操作(如
write
、flush
、connect
、disconnect
、close
等)提供了统一的调用接口。 - 无论是
Channel
、ChannelPipeline
还是ChannelHandlerContext
,只要涉及出站操作,都会通过ChannelOutboundInvoker
的方法触发。
(2) 支持链式调用和异步回调
- 所有方法返回
ChannelFuture
,允许开发者:- 链式调用(如
write().flush().addListener(...)
)。 - 添加监听器(
ChannelFutureListener
)以在操作完成时得到通知(如数据发送完成、连接关闭等)。
- 链式调用(如
(3) 解耦业务逻辑与底层 I/O 操作
- 通过该接口,Netty 将 业务代码(如消息编码、发送逻辑)与 底层 I/O 操作(如 Socket 写入)解耦,使开发者无需直接操作 Socket,而是通过 Netty 的抽象方法完成数据发送。
实现类
▶ AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);//父类 Channelprivate final Channel parent;//private final ChannelId id;//Unsafe实例;private final Unsafe unsafe;//当前Channel对应的DefaultChannelPipeline;private final DefaultChannelPipeline pipeline;private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);private final CloseFuture closeFuture = new CloseFuture(this);private volatile SocketAddress localAddress;private volatile SocketAddress remoteAddress;//当前Channel注册的EventLoop;private volatile EventLoop eventLoop;private volatile boolean registered;private boolean closeInitiated;private Throwable initialCloseCause;/** Cache for the string representation of this channel */private boolean strValActive;private String strVal;
}
▶ AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {//NioSocketChannel和NioServerSocketChannel需要共用,所以定义 了一个java.nio.SocketChannel和 java.nio.ServerSocketChannel的公共父类SelectableChannel,用于设置SelectableChannel参数和进行I/O操作。private final SelectableChannel ch;//它代表了JDKSelectionKey的OP_READprotected final int readInterestOp;//Channel注册到 EventLoop后返回的选择键volatile SelectionKey selectionKey;boolean readPending;private final Runnable clearReadPendingRunnable = new Runnable() {@Overridepublic void run() {clearReadPending0();}};//连接结果private ChannelPromise connectPromise;//超时任务private ScheduledFuture<?> connectTimeoutFuture;private SocketAddress requestedRemoteAddress;
}
▶ AbstractNioByteChannel
扩展了继续写半包的能力。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);private static final String EXPECTED_TYPES =" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +StringUtil.simpleClassName(FileRegion.class) + ')';// 负责继续写半包消息 最主要的方法就是doWrite(ChannelOutboundBufferin)private final Runnable flushTask = new Runnable() {@Overridepublic void run() {((AbstractNioUnsafe) unsafe()).flush0();}};protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {Object msg = in.current();if (msg == null) {// Directly return here so incompleteWrite(...) is not called.return 0;}return doWriteInternal(in, in.current());}private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;// 检查 ByteBuf 中是否有可读字节if (!buf.isReadable()) {// 如果没有可读字节,从 ChannelOutboundBuffer 中移除该消息in.remove();return 0;}// 调用 doWriteBytes 方法将 ByteBuf 中的字节写入底层通道,并获取实际写入的字节数final int localFlushedAmount = doWriteBytes(buf);// 检查是否成功写入了字节if (localFlushedAmount > 0) {// 更新 ChannelOutboundBuffer 中消息的写入进度in.progress(localFlushedAmount);// 再次检查 ByteBuf 中是否还有可读字节if (!buf.isReadable()) {// 如果没有可读字节,说明该消息已全部写入,从 ChannelOutboundBuffer 中移除该消息in.remove();}// 返回 1 表示进行了一次写操作return 1;}} else if (msg instanceof FileRegion) {// 将消息强制转换为 FileRegion 类型FileRegion region = (FileRegion) msg;// 检查文件区域是否已经全部传输完成if (region.transferred() >= region.count()) {// 如果已经全部传输完成,从 ChannelOutboundBuffer 中移除该消息in.remove();// 返回 0 表示没有进行写操作return 0;}// 调用 doWriteFileRegion 方法将文件区域写入底层通道,并获取实际写入的字节数long localFlushedAmount = doWriteFileRegion(region);// 检查是否成功写入了字节if (localFlushedAmount > 0) {// 更新 ChannelOutboundBuffer 中消息的写入进度in.progress(localFlushedAmount);// 再次检查文件区域是否已经全部传输完成if (region.transferred() >= region.count()) {// 如果已经全部传输完成,从 ChannelOutboundBuffer 中移除该消息in.remove();}// 返回 1 表示进行了一次写操作return 1;}} else {// 正常情况下不会执行到这里,因为消息类型应该只可能是 ByteBuf 或 FileRegion// 如果出现其他类型,抛出错误throw new Error();}// 如果没有成功写入任何数据,返回 WRITE_STATUS_SNDBUF_FULL 表示尝试写入但未被操作系统接受return WRITE_STATUS_SNDBUF_FULL;}}
▶ ChannelOutboundBuffer
▶ AbstractNioMessageChannel
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {@Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {final SelectionKey key = selectionKey();final int interestOps = key.interestOps();for (;;) {// 从ChannelOutboundBuffer中弹出一条消息进行处理Object msg = in.current();if (msg == null) {// Wrote all messages.//所有消息都已经被发送完成。清除写半包标识,退出循环。if ((interestOps & SelectionKey.OP_WRITE) != 0) {key.interestOps(interestOps & ~SelectionKey.OP_WRITE);}break;}try {boolean done = false;//循环尝试写入消息,最多尝试 config().getWriteSpinCount() 次for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {if (doWriteMessage(msg, in)) {done = true;break;}}if (done) {// 若消息成功写入,从 ChannelOutboundBuffer 中移除该消息in.remove();} else {// Did not write all messages.// 若消息未成功写入,说明还有消息未完成写入。将写操作添加到 SelectionKey 的关注操作中if ((interestOps & SelectionKey.OP_WRITE) == 0) {key.interestOps(interestOps | SelectionKey.OP_WRITE);}break;}} catch (Exception e) {if (continueOnWriteError()) {in.remove(e);} else {throw e;}}}}}
AbstractNioMessageChannel
和AbstractNioByteChannel
的消息发送实现比较相似,不同之处在于:一个发送的是ByteBuf
或者FileRegion
,它们可以直接被发送。另一个发送的则是POJO对象。
▶ NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannelimplements io.netty.channel.socket.ServerSocketChannel {@Overridepublic boolean isActive() {// As java.nio.ServerSocketChannel.isBound() will continue to return true even after the channel was closed// we will also need to check if it is open.return isOpen() && javaChannel().socket().isBound();}protected void doBind(SocketAddress localAddress) throws Exception {//在JDK 7+上使用更高效的直接绑定API。if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}} @Overrideprotected int doReadMessages(List<Object> buf) throws Exception {// 尝试从当前的 ServerSocketChannel 接受一个新的客户端连接// SocketUtils.accept 是 Netty 提供的工具方法,用于处理客户端连接的接受操作SocketChannel ch = SocketUtils.accept(javaChannel());try {// 检查是否成功接受了一个新的客户端连接if (ch != null) {// 若成功接受连接,将其封装为 NioSocketChannel 实例// 构造函数传入当前的 NioServerSocketChannel 作为父通道,以及新接受的 SocketChannel//。buf 最终会在 AbstractNioMessageChannel(NioMessageUnsafe) 的 read 方法里被使用buf.add(new NioSocketChannel(this, ch));// 成功添加一个新连接,返回 1return 1;}} catch (Throwable t) {// 若在创建 NioSocketChannel 过程中发生异常,记录警告日志logger.warn("Failed to create a new channel from an accepted socket.", t);try {// 尝试关闭已接受的 SocketChannel,避免资源泄漏ch.close();} catch (Throwable t2) {// 若关闭 SocketChannel 时发生异常,记录警告日志logger.warn("Failed to close a socket.", t2);}}// 没有新连接或处理过程中出现异常,返回 0return 0;}
}
▶ NioSocketChannel
Unsafe
Unsafe
接口实际上是Channel
接口的辅助接口,它不应该被用户代码直接调用。实际的**I/O
读写**操作都是由Unsafe
接口负责完成的。
Unsafe
是定义在Channel
中的 非静态内部类,生命周期随Channel
。
类继承图
Channel.Unsafe
AbstractNioChannel.NioUnsafe
:增加了几个接口- AbstractChannel.AbstractUnsafe
AbstractChannel.AbstractUnsafe
AbstractNioChannel.AbstractNioUnsafe
AbstractNioByteChannel.NioByteUnsafe
AbstractNioMessageChannel.NioMessageUnsafe
NioSocketChannel.NioSocketChannelUnsafe
源码分析
▶ Channel.Unsafe
public interface Channel {interface Unsafe {//获取当前 Channel 的接收缓冲区分配器句柄,用于高效管理接收数据的 ByteBuf 分配(如动态调整缓冲区大小)RecvByteBufAllocator.Handle recvBufAllocHandle();//获取 Channel 绑定的本地地址(IP + 端口)。SocketAddress localAddress();//获取 Channel 连接的远程地址(IP + 端口)。SocketAddress remoteAddress();//将 Channel 注册到指定的 EventLoop(事件循环线程),用于后续 I/O 操作。void register(EventLoop eventLoop, ChannelPromise promise);void bind(SocketAddress localAddress, ChannelPromise promise);void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);void disconnect(ChannelPromise promise);void close(ChannelPromise promise);void closeForcibly();void deregister(ChannelPromise promise);//开始读取数据(触发底层 Socket 的可读事件监听)。void beginRead();//向 Channel 写入数据(消息),promise 用于异步通知写入结果。void write(Object msg, ChannelPromise promise);//刷新输出缓冲区,确保数据真正发送到网络(如 TCP 发送缓冲区)。void flush();ChannelPromise voidPromise();ChannelOutboundBuffer outboundBuffer();}
}
▶ AbstractNioChannel.NioUnsafe
public abstract class AbstractNioChannel{public interface NioUnsafe extends Unsafe {/*** Return underlying {@link SelectableChannel}*/SelectableChannel ch();/*** Finish connect*/void finishConnect();/*** Read from underlying {@link SelectableChannel}*/void read();void forceFlush();}
}
▶ AbstractChannel.AbstractUnsafe
public abstract class AbstractChannel{protected abstract class AbstractUnsafe implements Unsafe {private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);private RecvByteBufAllocator.Handle recvHandle;private boolean inFlush0;/** true if the channel has never been registered, false otherwise */private boolean neverRegistered = true;}
}
▶ AbstractNioChannel.AbstractNioUnsafe
public abstract class AbstractNioChannel{protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {}
}
▶ AbstractNioByteChannel.NioByteUnsafe
public abstract class AbstractNioByteChannel{protected class NioByteUnsafe extends AbstractNioUnsafe {}
}
▶ AbstractNioMessageChannel.NioMessageUnsafe
public abstract class AbstractNioMessageChannel{//不是 静态内部类,随 channel的生命周期。private final class NioMessageUnsafe extends AbstractNioUnsafe {private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();// 此处开始处理 pipeline。fireChannelReadfor (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();//pipelinepipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);//pipelinepipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}
}
▶ NioSocketChannel.NioSocketChannelUnsafe
public class NioSocketChannel{private final class NioSocketChannelUnsafe extends NioByteUnsafe {}
}
核心类
ChannelOutboundBuffer
ChannelOutboundBuffer
是 Netty 用于管理 Channel 出站数据(待发送数据)的核心组件,主要功能包括:
- 缓冲待发送的数据(如
write()
操作写入的消息)。 - 跟踪数据的发送状态(已写入 Socket 但未确认、完全未发送等)。
- 高效处理内存释放(避免内存泄漏)。
- 支持批量刷新(
flush()
操作)。
核心数据结构
数据存储结构
ChannelOutboundBuffer
使用 单向链表 存储待发送的数据条目(Entry
),每个 Entry
包含:
static final class Entry {//定义一个静态常量对象池 RECYCLER,用于管理 Entry 对象的复用。对象池可以减少对象创建和销毁带来的性能开销private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {@Overridepublic Entry newObject(Handle<Entry> handle) {return new Entry(handle);}});//用于管理 Entry 对象在对象池中的生命周期。Handle 提供了 recycle 方法,用于将对象返回到对象池以便复用。private final Handle<Entry> handle;//指向下一个 Entry 对象的引用,用于构建单向链表结构,方便管理出站缓冲区中的消息条目。Entry next;//存储待发送的消息对象,类型为 Object,说明可以存储任意类型的消息Object msg;//存储多个 ByteBuffer 对象的数组,用于批量处理 NIO 缓冲区。ByteBuffer[] bufs;//单个 ByteBuffer 对象,用于存储单个 NIO 缓冲区ByteBuffer buf;//用于表示消息发送操作的异步结果。当消息发送完成或失败时,可以通过该对象通知调用者。ChannelPromise promise;//记录消息发送的进度,即已经发送的字节数long progress;//记录消息的总字节数long total;//记录待发送消息的总大小,包括消息本身的大小和额外开销。int pendingSize;//记录 ByteBuffer 的数量,初始值为 -1,表示尚未计算int count = -1;//布尔标志,用于表示该消息条目是否已被取消。boolean cancelled;
}
关键字段
// 链表头节点(已写入 Socket 但未确认的数据)
private Entry flushedEntry;// 链表尾节点(最新写入但未刷新的数据)
private Entry unflushedEntry;// 总待发送字节数(用于流量控制)
private long totalPendingSize;// 是否已触发 flush 操作
private boolean flushed;
核心方法分析
添加数据(write 操作)
addMessage()
/ addFlush()
- 功能:将待发送的消息(如 ByteBuf)添加到缓冲区。
- 关键逻辑:
- 创建新的
Entry
并插入链表尾部(unflushedEntry
)。 - 更新
totalPendingSize
(用于流量控制)。 - 如果未调用过
flush()
,数据会暂存到unflushedEntry
链表。
- 创建新的
void addMessage(Object msg, int size, ChannelPromise promise) {Entry entry = Entry.newInstance(msg, size, totalPendingSize, promise);if (unflushedEntry == null) {unflushedEntry = entry;} else {flushedEntry.next = entry; // 插入到已刷新链表的尾部}totalPendingSize += size;
}
刷新数据(flush 操作)
flush()
- 功能:将
unflushedEntry
中的数据移动到flushedEntry
链表,准备写入 Socket。 - 关键逻辑:
- 将
unflushedEntry
链表的所有节点转移到flushedEntry
链表。 - 触发底层的
Channel
的doWrite()
方法(实际写入 Socket)。
- 将
void flush() {if (unflushedEntry != null) {if (flushedEntry == null) {flushedEntry = unflushedEntry;} else {// 将 unflushedEntry 链表拼接到 flushedEntry 后面Entry tail = flushedEntry;while (tail.next != null) {tail = tail.next;}tail.next = unflushedEntry;}unflushedEntry = null;// 触发底层写入channel.flush0();}
}
数据写入 Socket(底层交互)
removeBytes()
- 功能:在 Socket 成功写入数据后,从缓冲区移除已发送的数据(释放内存)。
- 关键逻辑:
- 根据实际写入的字节数,逐个释放
Entry
中的 ByteBuf。 - 更新
totalPendingSize
。
- 根据实际写入的字节数,逐个释放
void removeBytes(long writtenBytes) {while (writtenBytes > 0) {Entry e = flushedEntry;if (e == null) break;long size = e.pendingSize;if (size <= writtenBytes) {// 整个 Entry 已发送完成,释放 ByteBufReferenceCountUtil.safeRelease(e.buf);flushedEntry = e.next;totalPendingSize -= size;writtenBytes -= size;} else {// 部分发送,更新 pendingSizee.pendingSize -= writtenBytes;totalPendingSize -= writtenBytes;writtenBytes = 0;}}
}
与 EventLoop 和 Socket 的协作
- 写入流程:
- 用户调用
channel.write(msg)
→ 数据添加到unflushedEntry
。 - 用户调用
channel.flush()
→ 数据移动到flushedEntry
,触发doWrite()
。 doWrite()
通过底层的SocketChannel
写入数据,成功后调用removeBytes()
。
- 用户调用
- 流量控制:
totalPendingSize
:用于实现 写水位线(Write Buffer Water Mark):- 如果
totalPendingSize
超过高水位线,Channel 会暂停写入(触发channelWritabilityChanged
事件)。 - 当数据被确认发送后(
removeBytes()
),水位线下降,恢复写入。
- 如果