当前位置: 首页 > news >正文

Netty Channel 详解

Netty的Channel未使用nio的实现,采用自定义实现。

Channel工作原理

Channel是Netty抽象出来的网络I/O读写相关的接口,为什么不使用JDKNIO原生的Channel而要另起炉灶呢,主要原因如下。

  • JDK的SocketChannelServerSocketChannel没有统一的Channel接口供业务开发者使用,对于用户而言,没有统一的操作视图,使用起来并不方便。
  • JDK的SocketChannelServerSocketChannel的主要职责就是网络I/O操作,由于它们是SPI类接口,由具体的虚拟机厂家来提供,所以通过继承SPI功能类来扩展其功能的难度很大;直接实现ServerSocketChannelSocketChannel抽象类,其工作量和重新开发一个新的Channel功能类是差不多的。
  • Netty的Channel需要能够跟Netty的整体架构融合在一起,例如I/O模型、基于ChanneIPipeline的定制模型,以及基于元数据描述配置化的TCP参数等,这些JDK的SocketChannelServerSocketChannel都没有提供,需要重新封装。
  • 自定义的Channel,功能实现更加灵活。

基于上述4个原因,Netty重新设计了Channel接口,并且给予了很多不同的实现。它的设计原理比较简单,但是功能却比较繁杂,主要的设计理念如下。

  • 在Channel接口层,采用Facade模式进行统一封装,将网络I/O操作、网络I/O相关联的其他操作封装起来,统一对外提供。
  • Channel接口的定义尽量大而全,为SocketChannel和ServerSocketChannel提供统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度地实现功能和接口的重用。
  • 具体实现采用聚合而非包含的方式,将相关的功能类聚合在Channel中,由Channel统一负责分配和调度,功能实现更加灵活。

Channel的主要继承关系图

类图

image-20250609150156788

源码分析

公共接口

▶ AttributeMap

public interface AttributeMap {<T> Attribute<T> attr(AttributeKey<T> key);<T> boolean hasAttr(AttributeKey<T> key);
}

▶ ChannelOutboundInvoker

ChannelOutboundInvoker 是 Netty 中的一个关键接口,主要用于 定义出站(Outbound)操作的触发方法,即所有 从应用程序向网络发送数据 的操作(如写入数据、刷新缓冲区、关闭连接等)都通过该接口的方法发起。它是 Netty 异步事件驱动模型中 出站事件 的核心抽象。

ChannelOutboundInvoker 的主要用途:

(1) 统一出站操作的调用方式

  • 它为所有出站操作(如 writeflushconnectdisconnectclose 等)提供了统一的调用接口。
  • 无论是 ChannelChannelPipeline 还是 ChannelHandlerContext,只要涉及出站操作,都会通过 ChannelOutboundInvoker 的方法触发。

(2) 支持链式调用和异步回调

  • 所有方法返回ChannelFuture,允许开发者:
    • 链式调用(如 write().flush().addListener(...))。
    • 添加监听器ChannelFutureListener)以在操作完成时得到通知(如数据发送完成、连接关闭等)。

(3) 解耦业务逻辑与底层 I/O 操作

  • 通过该接口,Netty 将 业务代码(如消息编码、发送逻辑)与 底层 I/O 操作(如 Socket 写入)解耦,使开发者无需直接操作 Socket,而是通过 Netty 的抽象方法完成数据发送。

实现类

▶ AbstractChannel

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);//父类 Channelprivate final Channel parent;//private final ChannelId id;//Unsafe实例;private final Unsafe unsafe;//当前Channel对应的DefaultChannelPipeline;private final DefaultChannelPipeline pipeline;private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);private final CloseFuture closeFuture = new CloseFuture(this);private volatile SocketAddress localAddress;private volatile SocketAddress remoteAddress;//当前Channel注册的EventLoop;private volatile EventLoop eventLoop;private volatile boolean registered;private boolean closeInitiated;private Throwable initialCloseCause;/** Cache for the string representation of this channel */private boolean strValActive;private String strVal;
}

▶ AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {//NioSocketChannel和NioServerSocketChannel需要共用,所以定义 了一个java.nio.SocketChannel和 java.nio.ServerSocketChannel的公共父类SelectableChannel,用于设置SelectableChannel参数和进行I/O操作。private final SelectableChannel ch;//它代表了JDKSelectionKey的OP_READprotected final int readInterestOp;//Channel注册到 EventLoop后返回的选择键volatile SelectionKey selectionKey;boolean readPending;private final Runnable clearReadPendingRunnable = new Runnable() {@Overridepublic void run() {clearReadPending0();}};//连接结果private ChannelPromise connectPromise;//超时任务private ScheduledFuture<?> connectTimeoutFuture;private SocketAddress requestedRemoteAddress;
}

▶ AbstractNioByteChannel

扩展了继续写半包的能力。

public abstract class AbstractNioByteChannel extends AbstractNioChannel {private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);private static final String EXPECTED_TYPES =" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +StringUtil.simpleClassName(FileRegion.class) + ')';// 负责继续写半包消息  最主要的方法就是doWrite(ChannelOutboundBufferin)private final Runnable flushTask = new Runnable() {@Overridepublic void run() {((AbstractNioUnsafe) unsafe()).flush0();}};protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {Object msg = in.current();if (msg == null) {// Directly return here so incompleteWrite(...) is not called.return 0;}return doWriteInternal(in, in.current());}private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;// 检查 ByteBuf 中是否有可读字节if (!buf.isReadable()) {// 如果没有可读字节,从 ChannelOutboundBuffer 中移除该消息in.remove();return 0;}// 调用 doWriteBytes 方法将 ByteBuf 中的字节写入底层通道,并获取实际写入的字节数final int localFlushedAmount = doWriteBytes(buf);// 检查是否成功写入了字节if (localFlushedAmount > 0) {// 更新 ChannelOutboundBuffer 中消息的写入进度in.progress(localFlushedAmount);// 再次检查 ByteBuf 中是否还有可读字节if (!buf.isReadable()) {// 如果没有可读字节,说明该消息已全部写入,从 ChannelOutboundBuffer 中移除该消息in.remove();}// 返回 1 表示进行了一次写操作return 1;}} else if (msg instanceof FileRegion) {// 将消息强制转换为 FileRegion 类型FileRegion region = (FileRegion) msg;// 检查文件区域是否已经全部传输完成if (region.transferred() >= region.count()) {// 如果已经全部传输完成,从 ChannelOutboundBuffer 中移除该消息in.remove();// 返回 0 表示没有进行写操作return 0;}// 调用 doWriteFileRegion 方法将文件区域写入底层通道,并获取实际写入的字节数long localFlushedAmount = doWriteFileRegion(region);// 检查是否成功写入了字节if (localFlushedAmount > 0) {// 更新 ChannelOutboundBuffer 中消息的写入进度in.progress(localFlushedAmount);// 再次检查文件区域是否已经全部传输完成if (region.transferred() >= region.count()) {// 如果已经全部传输完成,从 ChannelOutboundBuffer 中移除该消息in.remove();}// 返回 1 表示进行了一次写操作return 1;}} else {// 正常情况下不会执行到这里,因为消息类型应该只可能是 ByteBuf 或 FileRegion// 如果出现其他类型,抛出错误throw new Error();}// 如果没有成功写入任何数据,返回 WRITE_STATUS_SNDBUF_FULL 表示尝试写入但未被操作系统接受return WRITE_STATUS_SNDBUF_FULL;}}

▶ ChannelOutboundBuffer

▶ AbstractNioMessageChannel

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {@Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {final SelectionKey key = selectionKey();final int interestOps = key.interestOps();for (;;) {// 从ChannelOutboundBuffer中弹出一条消息进行处理Object msg = in.current();if (msg == null) {// Wrote all messages.//所有消息都已经被发送完成。清除写半包标识,退出循环。if ((interestOps & SelectionKey.OP_WRITE) != 0) {key.interestOps(interestOps & ~SelectionKey.OP_WRITE);}break;}try {boolean done = false;//循环尝试写入消息,最多尝试 config().getWriteSpinCount() 次for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {if (doWriteMessage(msg, in)) {done = true;break;}}if (done) {// 若消息成功写入,从 ChannelOutboundBuffer 中移除该消息in.remove();} else {// Did not write all messages.// 若消息未成功写入,说明还有消息未完成写入。将写操作添加到 SelectionKey 的关注操作中if ((interestOps & SelectionKey.OP_WRITE) == 0) {key.interestOps(interestOps | SelectionKey.OP_WRITE);}break;}} catch (Exception e) {if (continueOnWriteError()) {in.remove(e);} else {throw e;}}}}}

AbstractNioMessageChannelAbstractNioByteChannel的消息发送实现比较相似,不同之处在于:一个发送的是ByteBuf或者FileRegion,它们可以直接被发送。另一个发送的则是POJO对象。

▶ NioServerSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannelimplements io.netty.channel.socket.ServerSocketChannel {@Overridepublic boolean isActive() {// As java.nio.ServerSocketChannel.isBound() will continue to return true even after the channel was closed// we will also need to check if it is open.return isOpen() && javaChannel().socket().isBound();}protected void doBind(SocketAddress localAddress) throws Exception {//在JDK 7+上使用更高效的直接绑定API。if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}   @Overrideprotected int doReadMessages(List<Object> buf) throws Exception {// 尝试从当前的 ServerSocketChannel 接受一个新的客户端连接// SocketUtils.accept 是 Netty 提供的工具方法,用于处理客户端连接的接受操作SocketChannel ch = SocketUtils.accept(javaChannel());try {// 检查是否成功接受了一个新的客户端连接if (ch != null) {// 若成功接受连接,将其封装为 NioSocketChannel 实例// 构造函数传入当前的 NioServerSocketChannel 作为父通道,以及新接受的 SocketChannel//。buf 最终会在 AbstractNioMessageChannel(NioMessageUnsafe) 的 read 方法里被使用buf.add(new NioSocketChannel(this, ch));// 成功添加一个新连接,返回 1return 1;}} catch (Throwable t) {// 若在创建 NioSocketChannel 过程中发生异常,记录警告日志logger.warn("Failed to create a new channel from an accepted socket.", t);try {// 尝试关闭已接受的 SocketChannel,避免资源泄漏ch.close();} catch (Throwable t2) {// 若关闭 SocketChannel 时发生异常,记录警告日志logger.warn("Failed to close a socket.", t2);}}// 没有新连接或处理过程中出现异常,返回 0return 0;}    
}
	

▶ NioSocketChannel


Unsafe

Unsafe接口实际上是Channel接口的辅助接口,它不应该被用户代码直接调用。实际的**I/O读写**操作都是由Unsafe接口负责完成的。

Unsafe 是定义在Channel中的 非静态内部类,生命周期随Channel

类继承图

image-20250615091744903

  • Channel.Unsafe
  • AbstractNioChannel.NioUnsafe:增加了几个接口
  • AbstractChannel.AbstractUnsafe
  • AbstractChannel.AbstractUnsafe
  • AbstractNioChannel.AbstractNioUnsafe
  • AbstractNioByteChannel.NioByteUnsafe
  • AbstractNioMessageChannel.NioMessageUnsafe
  • NioSocketChannel.NioSocketChannelUnsafe

源码分析

▶ Channel.Unsafe

public interface Channel {interface Unsafe {//获取当前 Channel 的接收缓冲区分配器句柄,用于高效管理接收数据的 ByteBuf 分配(如动态调整缓冲区大小)RecvByteBufAllocator.Handle recvBufAllocHandle();//获取 Channel 绑定的本地地址(IP + 端口)。SocketAddress localAddress();//获取 Channel 连接的远程地址(IP + 端口)。SocketAddress remoteAddress();//将 Channel 注册到指定的 EventLoop(事件循环线程),用于后续 I/O 操作。void register(EventLoop eventLoop, ChannelPromise promise);void bind(SocketAddress localAddress, ChannelPromise promise);void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);void disconnect(ChannelPromise promise);void close(ChannelPromise promise);void closeForcibly();void deregister(ChannelPromise promise);//开始读取数据(触发底层 Socket 的可读事件监听)。void beginRead();//向 Channel 写入数据(消息),promise 用于异步通知写入结果。void write(Object msg, ChannelPromise promise);//刷新输出缓冲区,确保数据真正发送到网络(如 TCP 发送缓冲区)。void flush();ChannelPromise voidPromise();ChannelOutboundBuffer outboundBuffer();}
}

▶ AbstractNioChannel.NioUnsafe

public abstract class AbstractNioChannel{public interface NioUnsafe extends Unsafe {/*** Return underlying {@link SelectableChannel}*/SelectableChannel ch();/*** Finish connect*/void finishConnect();/*** Read from underlying {@link SelectableChannel}*/void read();void forceFlush();}
}

▶ AbstractChannel.AbstractUnsafe

public abstract class AbstractChannel{protected abstract class AbstractUnsafe implements Unsafe {private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);private RecvByteBufAllocator.Handle recvHandle;private boolean inFlush0;/** true if the channel has never been registered, false otherwise */private boolean neverRegistered = true;}
}

▶ AbstractNioChannel.AbstractNioUnsafe

public abstract class AbstractNioChannel{protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {}
}

▶ AbstractNioByteChannel.NioByteUnsafe

public abstract class AbstractNioByteChannel{protected class NioByteUnsafe extends AbstractNioUnsafe {}
}

▶ AbstractNioMessageChannel.NioMessageUnsafe

public abstract class AbstractNioMessageChannel{//不是 静态内部类,随 channel的生命周期。private final class NioMessageUnsafe extends AbstractNioUnsafe {private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();// 此处开始处理 pipeline。fireChannelReadfor (int i = 0; i < size; i ++) {readPending = false;pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();//pipelinepipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);//pipelinepipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}
}

▶ NioSocketChannel.NioSocketChannelUnsafe

public class NioSocketChannel{private final class NioSocketChannelUnsafe extends NioByteUnsafe {}
}

核心类

ChannelOutboundBuffer

ChannelOutboundBuffer 是 Netty 用于管理 Channel 出站数据(待发送数据)的核心组件,主要功能包括:

  • 缓冲待发送的数据(如 write() 操作写入的消息)。
  • 跟踪数据的发送状态(已写入 Socket 但未确认、完全未发送等)。
  • 高效处理内存释放(避免内存泄漏)。
  • 支持批量刷新flush() 操作)。

核心数据结构

数据存储结构

ChannelOutboundBuffer 使用 单向链表 存储待发送的数据条目(Entry),每个 Entry 包含:

static final class Entry {//定义一个静态常量对象池 RECYCLER,用于管理 Entry 对象的复用。对象池可以减少对象创建和销毁带来的性能开销private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {@Overridepublic Entry newObject(Handle<Entry> handle) {return new Entry(handle);}});//用于管理 Entry 对象在对象池中的生命周期。Handle 提供了 recycle 方法,用于将对象返回到对象池以便复用。private final Handle<Entry> handle;//指向下一个 Entry 对象的引用,用于构建单向链表结构,方便管理出站缓冲区中的消息条目。Entry next;//存储待发送的消息对象,类型为 Object,说明可以存储任意类型的消息Object msg;//存储多个 ByteBuffer 对象的数组,用于批量处理 NIO 缓冲区。ByteBuffer[] bufs;//单个 ByteBuffer 对象,用于存储单个 NIO 缓冲区ByteBuffer buf;//用于表示消息发送操作的异步结果。当消息发送完成或失败时,可以通过该对象通知调用者。ChannelPromise promise;//记录消息发送的进度,即已经发送的字节数long progress;//记录消息的总字节数long total;//记录待发送消息的总大小,包括消息本身的大小和额外开销。int pendingSize;//记录 ByteBuffer 的数量,初始值为 -1,表示尚未计算int count = -1;//布尔标志,用于表示该消息条目是否已被取消。boolean cancelled;
}
关键字段
// 链表头节点(已写入 Socket 但未确认的数据)
private Entry flushedEntry;// 链表尾节点(最新写入但未刷新的数据)
private Entry unflushedEntry;// 总待发送字节数(用于流量控制)
private long totalPendingSize;// 是否已触发 flush 操作
private boolean flushed;
核心方法分析
添加数据(write 操作)

addMessage() / addFlush()

  • 功能:将待发送的消息(如 ByteBuf)添加到缓冲区。
  • 关键逻辑:
    • 创建新的 Entry 并插入链表尾部(unflushedEntry)。
    • 更新 totalPendingSize(用于流量控制)。
    • 如果未调用过 flush(),数据会暂存到 unflushedEntry 链表。
void addMessage(Object msg, int size, ChannelPromise promise) {Entry entry = Entry.newInstance(msg, size, totalPendingSize, promise);if (unflushedEntry == null) {unflushedEntry = entry;} else {flushedEntry.next = entry; // 插入到已刷新链表的尾部}totalPendingSize += size;
}
刷新数据(flush 操作)

flush()

  • 功能:将 unflushedEntry 中的数据移动到 flushedEntry 链表,准备写入 Socket。
  • 关键逻辑:
    • unflushedEntry 链表的所有节点转移到 flushedEntry 链表。
    • 触发底层的 ChanneldoWrite() 方法(实际写入 Socket)。
void flush() {if (unflushedEntry != null) {if (flushedEntry == null) {flushedEntry = unflushedEntry;} else {// 将 unflushedEntry 链表拼接到 flushedEntry 后面Entry tail = flushedEntry;while (tail.next != null) {tail = tail.next;}tail.next = unflushedEntry;}unflushedEntry = null;// 触发底层写入channel.flush0();}
}
数据写入 Socket(底层交互)

removeBytes()

  • 功能:在 Socket 成功写入数据后,从缓冲区移除已发送的数据(释放内存)。
  • 关键逻辑:
    • 根据实际写入的字节数,逐个释放 Entry 中的 ByteBuf。
    • 更新 totalPendingSize
void removeBytes(long writtenBytes) {while (writtenBytes > 0) {Entry e = flushedEntry;if (e == null) break;long size = e.pendingSize;if (size <= writtenBytes) {// 整个 Entry 已发送完成,释放 ByteBufReferenceCountUtil.safeRelease(e.buf);flushedEntry = e.next;totalPendingSize -= size;writtenBytes -= size;} else {// 部分发送,更新 pendingSizee.pendingSize -= writtenBytes;totalPendingSize -= writtenBytes;writtenBytes = 0;}}
}

与 EventLoop 和 Socket 的协作

  1. 写入流程
    • 用户调用 channel.write(msg) → 数据添加到 unflushedEntry
    • 用户调用 channel.flush() → 数据移动到 flushedEntry,触发 doWrite()
    • doWrite() 通过底层的 SocketChannel 写入数据,成功后调用 removeBytes()
  2. 流量控制
    • totalPendingSize:用于实现 写水位线(Write Buffer Water Mark):
      • 如果 totalPendingSize 超过高水位线,Channel 会暂停写入(触发 channelWritabilityChanged 事件)。
      • 当数据被确认发送后(removeBytes()),水位线下降,恢复写入。

相关文章:

  • 反无人机系统:技术利刃如何守护低空安全?
  • 无人机表演越来越火,C端市场大爆发
  • Unity Addressable使用之服务器远程加载
  • kolla安装openstack
  • Maven通过修改pom.xml配置文件下载指定依赖包,以及解决MVNRepository网站加载和验证问题的方法
  • Modbus TCP转Profibus DP网关接JF-600MT称重变送器到西门子S7-300plc系统
  • React 和 Vue 项目中集成基于 Svelte 的 `Bytemd` 库 || @bytemd/react` 底层实现原理
  • Web Worker技术详解与应用场景
  • 【JS-4.4-键盘常用事件】深入理解DOM键盘事件:提升用户交互体验的关键
  • 策略设计模式
  • 安卓对外发布工程源码:怎么做到仅UI层公布
  • React Next快速搭建前后端全栈项目并部署至Vercel
  • 【教程】脚本方式安装pip
  • 柯尼卡美能达Konica Minolta bizhub 750i打印机信息
  • 基于Docker本地化搭建部署Dify
  • 黑马python(十三)
  • Python中使用RK45方法求解微分方程的详细指南
  • 九联UNT403G/UNT413G-国科GK6323V100C-2+8G/4+16G-安卓9.0-优盘短接强刷固件包
  • 编程江湖-设计模式
  • Element表格表头合并技巧
  • 网站详情页/百度人工优化
  • 汽车网站代码/app有哪些推广方式
  • 建设局官方网站/百度seo代理
  • 工控人如何做自己的网站/搜索引擎营销题库和答案
  • 公司网站建设公司排名/营销渠道策划方案
  • 网站设计要如何做支付功能/网站整站优化公司