分布式专题——35 Netty的使用和常用组件辨析
1 简述
-
Netty 是一个基于 Java 的高性能、异步事件驱动的网络应用框架,被广泛用于开发网络应用程序,如网络服务器和客户端;
-
本次讲解基于 Netty 4.1.42.Final 版本:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.42.Final</version><scope>compile</scope> </dependency>
-
Netty 的优势
- API 使用简单,降低开发门槛
- 功能强大,预置多种编解码功能,支持多种主流协议
- 定制能力强,可通过
ChannelHandler
灵活扩展通信框架 - 性能高,与业界主流 NIO 框架对比,综合性能最优
- 成熟、稳定,修复了所有已发现的 JDK NIO BUG,让业务开发人员无需为 NIO 的 BUG 烦恼
- 社区活跃,版本迭代周期短,BUG 能被及时修复,还会加入更多新功能
- 经历了大规模商业应用考验,质量得到验证
-
为什么不用 Netty5?:Netty5 已经停止开发;
-
Netty 使用 NIO 而非 AIO 的原因:
-
Netty 不看重 Windows 上的使用,在 Linux 系统上,AIO 底层仍使用
EPOLL
,未很好实现 AIO,性能无明显优势,且被 JDK 封装一层,不易深度优化; -
AIO 接收数据需要预先分配缓存,而 NIO 是接收时才分配缓存,对于连接数量大但流量小的情况,AIO 内存浪费多;
-
Linux 上 AIO 不够成熟,处理回调结果的速度跟不上处理需求;
-
作者表示在类 Unix 系统上 AIO 不比 NIO(
epoll
)快,还存在无数据报支持、线程模型不必要(抽象过多却无实际使用场景)等问题。原话:Not faster than NIO (epoll) on unix systems (which is true) There is no daragram suppport Unnecessary threading model (too much abstraction without usage)
-
-
为什么不用 Mina?:Mina 几乎不再更新,且 Netty 本身就是因为 Mina 不够好才被开发出来的。
2 第一个 Netty 程序
2.1 核心知识点
-
Bootstrap:是 Netty 框架的启动类和主入口类,分为客户端类
Bootstrap
和服务器类ServerBootstrap
两种,用于初始化 Netty 应用并启动服务或建立客户端连接; -
Channel:
- 是 Java NIO 的一个基本构造,代表一个到实体(如硬件设备、文件、网络套接字或能执行 I/O 操作的程序组件)的开放连接,可进行读、写等操作;
- 也可看作传入(入站)或传出(出站)数据的载体,能被打开、关闭,以及进行连接、断开连接等操作;
-
EventLoop 和 EventLoopGroup:
EventLoop
暂时可看成一个线程,负责处理 Channel 上的 I/O 操作和事件;EventLoopGroup
自然可看成线程组,管理多个EventLoop
,用于分发和处理 I/O 事件等任务;
-
事件:Netty 使用不同事件通知状态改变或操作状态,基于发生的事件触发适当动作。事件按与入站或出站数据流的相关性分类;
- 入站事件:可能由入站数据或相关状态更改触发,包括连接已被激活/连接失活、数据读取、用户事件、错误事件等;
- 出站事件:是未来将会触发的某个动作的操作结果,动作包括打开/关闭到远程节点的连接、将数据写到/冲刷到套接字等;
-
ChannelHandler:
- 每个事件都可被分发给
ChannelHandler
类中用户实现的方法; - 由于事件分为入站和出站,处理事件的
ChannelHandler
也被分为处理入站事件的Handler
、处理出站事件的Handler
,部分Handler
既可以处理入站也可以处理出站事件; - Netty 提供了大量预定义的开箱即用的
ChannelHandler
实现,涵盖各种协议(如 HTTP 和 SSL/TLS)相关的ChannelHandler
; - 基于 Netty 的网络应用程序会根据业务需求,使用 Netty 已提供的
ChannelHandler
或自行开发ChannelHandler
;
- 每个事件都可被分发给
-
ChannelPipeline:
ChannelHandler
都放在ChannelPipeline
中统一管理,事件在ChannelPipeline
中流动,并被其中一个或多个ChannelHandler
处理,它是ChannelHandler
的容器,负责组织和管理ChannelHandler
的执行顺序和交互; -
ChannelFuture
-
Netty 中所有的 I/O 操作都是异步的,异步意味着不需要主动等待结果返回,而是通过状态通知、回调函数等手段获取结果;
-
JDK 预置了
java.util.concurrent.Future
接口,提供了在操作完成时通知应用程序的方式,可看作异步操作结果的占位符,会在未来某个时刻完成并提供结果访问,但它的实现只允许手动检查操作是否完成,或一直阻塞直到完成,使用繁琐; -
因此 Netty 提供了自己的实现
ChannelFuture
,用于在执行异步操作时使用。一般来说,每个 Netty 的出站 I/O 操作都将返回一个ChannelFuture
,通过它可以更方便地对异步操作的结果进行监听和处理。
-
2.2 请求-响应(Echo)示例
2.2.1 Netty 客户端的启动和配置类
public class EchoClient {private final int port;private final String host;public EchoClient(int port, String host) {this.port = port;this.host = host;}public void start() throws InterruptedException {// 创建事件循环组,用于处理客户端的IO操作和任务EventLoopGroup group = new NioEventLoopGroup();try {// 创建客户端启动引导类Bootstrap b = new Bootstrap();b.group(group)// 指定使用NIO的Socket通道类型.channel(NioSocketChannel.class)// 设置要连接的服务器的地址和端口.remoteAddress(new InetSocketAddress(host,port))// 配置通道的处理器链,添加自定义的业务处理器.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 将自定义的客户端处理器添加到管道中ch.pipeline().addLast(new EchoClientHandler());}});// 异步连接到服务器,sync()会等待连接完成ChannelFuture f = b.connect().sync();// 等待客户端通道关闭,保持程序运行f.channel().closeFuture().sync();} finally {// 优雅关闭事件循环组,释放资源group.shutdownGracefully().sync();}}public static void main(String[] args) throws InterruptedException {// 创建客户端实例并启动,连接本地9999端口new EchoClient(9999,"127.0.0.1").start();}
}
2.2.2 客户端业务逻辑处理器
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {// 当从服务器接收到数据时自动调用此方法// SimpleChannelInboundHandler会自动释放ByteBuf资源@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {// 将接收到的ByteBuf转换为字符串并打印到控制台System.out.println("客户端接收到服务器消息: " + msg.toString(CharsetUtil.UTF_8));// 如果需要处理完一次响应就关闭连接,可以取消注释下面这行// ctx.close();}// 当客户端与服务器的连接建立成功并激活时调用此方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 连接建立后立即向服务器发送一条消息// Unpooled.copiedBuffer 将字符串转换为Netty的ByteBufctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Netty", CharsetUtil.UTF_8));// 以下是其他写入方式:// ctx.pipeline().write() - 从管道头部开始处理// ctx.channel().write() - 从管道尾部开始处理// ctx.alloc().buffer() - 从内存分配器获取一个新的ByteBuf// writeAndFlush = write() + flush(),确保数据立即发送}// 还可以重写其他方法:// channelInactive() - 连接断开时调用// exceptionCaught() - 发生异常时调用
}
2.2.3 Netty 服务器的启动和配置类
public class EchoServer {// 创建日志记录器,用于输出服务器运行日志private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);private final int port; // 服务器监听的端口号public EchoServer(int port) {this.port = port;}public static void main(String[] args) throws InterruptedException {int port = 9999; // 设置服务器监听端口为9999EchoServer echoServer = new EchoServer(port);LOG.info("服务器即将启动");echoServer.start(); // 启动服务器LOG.info("服务器关闭"); // 这行会在服务器关闭后执行}public void start() throws InterruptedException {// 创建主从Reactor线程组,用于处理连接和IO事件EventLoopGroup group = new NioEventLoopGroup();try {// 创建服务器启动引导类,用于配置服务器参数ServerBootstrap b = new ServerBootstrap();b.group(group)// 指定使用NIO的服务器Socket通道类型.channel(NioServerSocketChannel.class)// 设置服务器绑定的本地地址和端口.localAddress(new InetSocketAddress(port))// 为每个新连接的客户端通道配置处理器.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 为每个客户端连接添加自定义的业务处理器ch.pipeline().addLast(new EchoServerHandler());}});// 异步绑定端口启动服务器,sync()会等待绑定操作完成ChannelFuture f = b.bind().sync();LOG.info("服务器启动完成。");// 阻塞当前线程,等待服务器通道关闭(保持服务器运行)// 当调用close()方法时,这个future会完成,程序继续执行f.channel().closeFuture().sync();} finally {// 优雅关闭事件循环组,释放所有资源// 包括关闭所有通道、取消所有任务等group.shutdownGracefully().sync();}}
}
2.2.4 服务器端业务逻辑处理器
public class EchoServerHandler extends ChannelInboundHandlerAdapter {// 当从客户端接收到数据时调用此方法@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 将接收到的消息转换为ByteBuf类型ByteBuf in = (ByteBuf)msg;// 将ByteBuf内容转换为字符串并打印到控制台System.out.println("服务器接收到客户端消息: " + in.toString(CharsetUtil.UTF_8));// 将接收到的数据原样写回给客户端(回声功能)// writeAndFlush会确保数据被写入并刷新到网络ctx.writeAndFlush(in);// 如果需要处理完消息后关闭连接,可以取消注释下面这行// ctx.close();}// 当有新的客户端连接建立时调用此方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 打印连接建立日志System.out.println("新的客户端连接已建立");// 调用父类方法,确保其他可能的处理逻辑也能执行super.channelActive(ctx);}// 当处理过程中发生异常时调用此方法@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 打印异常堆栈信息,便于调试cause.printStackTrace();// 关闭发生异常的连接,防止资源泄漏ctx.close();}// 还可以重写其他方法:// channelInactive() - 连接断开时调用// channelReadComplete() - 数据读取完成时调用
}
2.2.5 流程
- 服务器启动:
EchoServer
启动并监听 9999 端口 - 客户端启动:
EchoClient
连接服务器 - 客户端发送消息:连接建立后,客户端发送 “Hello,Netty”
- 服务器回声:服务器收到消息后,原样返回给客户端
- 客户端打印回声:客户端收到服务器返回的消息并打印
3 Netty 组件
3.1 EventLoop
与EventLoopGroup
3.1.1 讲解
-
在 NIO 中,通过
while
循环select
出事件,再依次处理,这一“事件循环”机制就是EventLoop
的核心逻辑体现。io.netty.channel.EventLoop
定义了 Netty 的核心抽象,用于处理网络连接生命周期中发生的各类事件; -
一个
EventLoop
由一个永远不会改变的Thread
驱动,同时任务(Runnable
或者Callable
)可以直接提交给EventLoop
实现,以立即执行或者调度执行; -
相关包与接口扩展:
-
io.netty.util.concurrent
包构建在 JDK 的java.util.concurrent
包基础之上; -
io.netty.channel
包中的类,为了能与Channel
的事件进行交互,对java.util.concurrent
里的接口、类进行了扩展;
-
-
从下面类图中可以看到相关类的继承与实现关系:
-
最上层是
Executor
,它是 Java 并发编程中执行任务的最基础接口; -
ExecutorService
继承自Executor
,扩展了更多管理任务执行的方法,比如关闭线程池、提交有返回值的任务等; -
EventExecutorGroup
同时继承了ExecutorService
和Iterable
,还关联ScheduledExecutorService
,它是对线程池相关功能在 Netty 场景下的进一步封装,用于管理一组EventExecutor
; -
EventExecutor
继承自EventExecutorGroup
,更侧重于单个执行器的功能,具备有序处理事件等特性; -
OrderedEventExecutor
继承自EventExecutor
,强调事件处理的有序性; -
EventLoop
继承自OrderedEventExecutor
和EventLoopGroup
,是 Netty 中处理 I/O 事件和任务的核心组件,每个EventLoop
绑定一个线程,负责处理对应的Channel
上的事件; -
EventLoopGroup
继承自EventExecutorGroup
,它是EventLoop
的组管理器,用于管理多个EventLoop
,在 Netty 服务启动时,负责分配EventLoop
来处理客户端连接等任务。
-
3.1.2 线程的分配
-
服务于
Channel
的 I/O 和事件的EventLoop
包含在EventLoopGroup
中; -
线程分配机制:
-
异步传输下的共享:异步传输实现只使用少量的
EventLoop
(以及和它们相关联的Thread
),在当前线程模型下,这些EventLoop
可能会被多个Channel
所共享。这使得可以通过尽可能少量的Thread
来支撑大量的Channel
,而不是为每个Channel
分配一个Thread
; -
分配方式:
EventLoopGroup
负责为每个新创建的Channel
分配一个EventLoop
。当前实现中,使用顺序循环(round-robin)的方式进行分配,以获取均衡的分布,并且相同的EventLoop
可能会被分配给多个Channel
; -
生命周期绑定:一旦一个
Channel
被分配给一个EventLoop
,它将在整个生命周期中都使用这个EventLoop
(以及相关联的Thread
);
-
-
对
ThreadLocal
使用的影响:- 因为一个
EventLoop
通常会被用于支撑多个Channel
,所以对于所有相关联的Channel
来说,ThreadLocal
都是一样的,这使得ThreadLocal
对于实现状态追踪等功能来说是个糟糕的选择; - 不过,在一些无状态的上下文中,它仍然可以被用于在多个
Channel
之间共享一些重度的或者代价昂贵的对象,甚至是事件。
- 因为一个
3.1.3 线程管理
-
当向
EventLoop
提交任务时,会先检查当前调用线程是否是支撑该EventLoop
的线程:-
是:所提交的代码块会被直接执行;
-
不是:
EventLoop
会调度该任务以便稍后执行,并将它放入到内部队列中。当EventLoop
下次处理它的事件时,会执行队列中的那些任务/事件;
-
-
流程步骤:
-
提交任务:将要在
EventLoop
执行的任务,通过Channel.eventLoop().execute(Task)
方法传递给EventLoop
; -
线程检查:在把任务传递给
execute
方法之后,执行检查以确定当前调用线程是否就是分配给EventLoop
的那个线程; -
相同线程的处理:如果是相同的线程,则在
EventLoop
中,可以直接执行任务; -
不同线程的处理:如果线程不是
EventLoop
的那个线程,则将任务放入队列以便EventLoop
下一次处理它的事件时执行;
-
3.2 Netty 网络抽象代表
3.2.1 讲解
-
Netty 网络抽象代表
Channel
:对应 Socket,是 Netty 网络操作的基础,用于表示网络连接等;EventLoop
:负责控制流、多线程处理和并发,管理Channel
的 I/O 事件等;ChannelFuture
:用于异步通知,因为 Netty 中 I/O 操作是异步的,通过它可以获取异步操作的结果等;
-
Channel 和 EventLoop 关系:
Channel
需要注册到某个EventLoop
上,在其整个生命周期内都由该EventLoop
处理 I/O 事件,即一个Channel
与一个EventLoop
绑定,但一个EventLoop
可同时被多个Channel
绑定;
3.2.2 Channel 接口
- 基本 I/O 操作与优势
- 基本 I/O 操作(
bind()
、connect()
、read()
、write()
)依赖底层网络传输原语,在 Java 网络编程中基于Socket
,Netty 的Channel
接口提供的 API 用于所有 I/O 操作,降低了直接使用Socket
类的复杂性,且Channel
是拥有许多预定义、专门化实现的广泛类层次结构的根; - 因
Channel
独一无二,为保证顺序,被声明为java.lang.Comparable
的子接口,若两个不同Channel
实例返回相同散列码,AbstractChannel
中的compareTo()
方法会抛出Error
;
- 基本 I/O 操作(
- Channel 的生命周期状态
ChannelUnregistered
:Channel
已创建,但未注册到EventLoop
ChannelRegistered
:Channel
已注册到EventLoop
ChannelActive
:Channel
处于活动状态(已连接到远程节点),可接收和发送数据ChannelInactive
:Channel
未连接到远程节点- 这些状态改变时会生成对应事件,转发给
ChannelPipeline
中的ChannelHandler
以作响应,编程中更关注ChannelActive
和ChannelInactive
- 重要 Channel 的方法
eventLoop
:返回分配给Channel
的EventLoop
pipeline
:返回Channel
的ChannelPipeline
,每个Channel
都有自己的ChannelPipeline
isActive
:若Channel
活动则返回true
,活动意义依底层传输而定,如Socket
传输连接到远程节点为活动,Datagram
传输打开为活动localAddress
:返回本地的SocketAddress
remoteAddress
:返回远程的SocketAddress
write
:将数据写到远程节点,仅写往 Netty 内部缓存,未真正写往 socketflush
:将之前已写的数据冲刷到底层 socket 进行传输writeAndFlush
:简便方法,等同于调用write()
后接着调用flush()
3.3 ChannelPipeline
与ChannelHandlerContext
3.3.1 ChannelPipeline 接口
-
当
Channel
被创建时,会自动分配一个新的ChannelPipeline
,每个Channel
都有自己的ChannelPipeline
,且这种关联是永久性的,在 Netty 组件生命周期中是固定操作,无需开发人员干预; -
ChannelPipeline
提供了ChannelHandler
链的容器,定义了用于在该链上传播**入站(从网络到业务处理)和出站(从业务处理到网络)**各种事件流的 API,代码中的ChannelHandler
都放在ChannelPipeline
中; -
ChannelHandler
的工作是让事件流经ChannelPipeline
,它们在应用程序初始化或引导阶段被安装。ChannelHandler
对象接收事件、执行自身实现的处理逻辑,并将数据传递给链中的下一个ChannelHandler
,同时也可拦截事件阻止其继续传递,其执行顺序由添加顺序决定;
3.3.2 ChannelHandler 的生命周期
- 在
ChannelHandler
被添加到ChannelPipeline
中或者被从ChannelPipeline
中移除时,会调用以下方法,每个方法都接收一个ChannelHandlerContext
参数:-
handlerAdded
:当把ChannelHandler
添加到ChannelPipeline
中时被调用 -
handlerRemoved
:当从ChannelPipeline
中移除ChannelHandler
时被调用 -
exceptionCaught
:当处理过程中在ChannelPipeline
中有错误产生时被调用
-
3.3.3 ChannelPipeline
中的 ChannelHandler
-
入站和出站
ChannelHandler
被安装到同一个ChannelPipeline
中,ChannelPipeline
以双向链表的形式进行维护管理;-
比如要在网络上传递数据,有这些需求:
- 加密
- 压缩(因为加密后得到的密文较大)
- 按照业务要求检查报文中携带的用户信息是否合法
-
于是实现了 5 个Handler:解压(入)Handler、压缩(出)handler、解密(入) Handler、加密(出) Handler、授权(入)Handler
-
-
事件流动(入站):当一个消息或其他入站事件被读取时,会从
ChannelPipeline
的头部开始流动,且只被处理入站事件的Handler
处理(即解压(入)Handler、解密(入) Handler、授权(入)Handler),最终数据到达ChannelPipeline
的尾端,处理结束; -
事件流动(出站):数据的出站运动(正在被写的数据),概念上与入站类似,数据从链的尾端开始流动,只被处理出站事件的
Handler
处理(即加密(出) Handler、压缩(出)Handler),直到到达链的头部,之后出站数据到达网络传输层(Socket); -
Netty 的区分能力:Netty 能区分入站事件的
Handler
和出站事件的Handler
,确保数据只会在具有相同定向类型的两个ChannelHandler
之间传递;AbstractChannelHandlerContext.java
中的findContextOutbound
方法用于查找处理出站事件的上下文;ChannelHandlerMask.java
中定义了MASK_ALL_INBOUND
(所有入站事件掩码)和MASK_ALL_OUTBOUND
(所有出站事件掩码);- 这些都为 Netty 区分和处理入站、出站事件提供了底层支持;
-
所以在编写 Netty 应用程序时要注意
ChannelHandler
的顺序问题:-
不同方向(入站与出站):在编写 Netty 应用程序时,分属出站和入站不同的
Handler
,在业务没特殊要求的情况下顺序无所谓。例如“压缩(出)handler”可以放在“解压(入)handler”和“解密(入)Handler”中间,也可以放在“解密(入) Handler”和“授权(入) Handler”之间; -
同方向:同属一个方向(入站或出站)的
Handler
是有顺序的,因为上一个Handler
处理的结果往往是下一个Handler
所需的输入。比如入站处理,收到的数据需先解压得到密文,再解密,最后拿到明文中的用户信息进行授权检查,所以解压→解密→授权这三个入站Handler
的顺序不能乱;
-
3.3.4 ChannelPipeline上的方法
-
ChannelPipeline
以双向链表形式维护管理ChannelHandler
,提供了一系列对ChannelHandler
进行操作的方法:-
添加相关:
addFirst
、addBefore
、addAfter
、addLast
,用于将ChannelHandler
添加到ChannelPipeline
中不同位置; -
移除相关:
remove
,用于将ChannelHandler
从ChannelPipeline
中移除; -
替换相关:
replace
,用于将ChannelPipeline
中的一个ChannelHandler
替换为另一个ChannelHandler
; -
获取相关:
get
,通过类型或者名称返回ChannelHandler
;context
,返回和ChannelHandler
绑定的ChannelHandlerContext
;names
,返回ChannelPipeline
中所有ChannelHandler
的名称;
-
-
此外,
ChannelPipeline
的 API 还公开了用于调用入站和出站操作的附加方法。
3.3.5 ChannelHandlerContext
-
ChannelHandlerContext
代表了ChannelHandler
和ChannelPipeline
之间的关联- 每当
ChannelHandler
添加到ChannelPipeline
中时,都会创建ChannelHandlerContext
; - 它的主要作用类似
LinkedList
内部的Node
类,用于维护ChannelHandler
在双向链表中的前后指针(pre
和next
); - 同时,它还提供了很多方法,比如让事件从当前
ChannelHandler
传递给链中的下一个ChannelHandler
,获取底层的Channel
,以及用于写出站数据等;
- 每当
-
ChannelHandlerContext
有很多方法也存在于Channel
和ChannelPipeline
上,但存在重要不同;- 调用
Channel
或者ChannelPipeline
上的这些方法,事件将沿着整个ChannelPipeline
进行传播; - 而调用
ChannelHandlerContext
上的相同方法,事件则从当前所关联的ChannelHandler
开始,且只会传播给位于该ChannelPipeline
中的下一个(入站下一个,出站上一个)能够处理该事件的ChannelHandler
;
- 举例来说,服务器收到对端报文,解压后解密失败需应答;
- 若因加密算法不一致,应答报文以明文压缩格式发送,可在解密
handler
中用ctx.write
,应答报文只经过压缩Handler
发往对端; - 其他情况,应答报文以加密和压缩格式发送,可在解密
handler
中用channel.write()
或者channelpipeline.write()
,应答报文会流经整个出站处理过程;
- 若因加密算法不一致,应答报文以明文压缩格式发送,可在解密
- 调用
3.3.6 ChannelHandlerContext 的API
- 资源与连接相关:
alloc
:返回和实例相关联的Channel
所配置的ByteBufAllocator
bind
:绑定到给定的SocketAddress
并返回ChannelFuture
channel
:返回绑定到实例的Channel
close
:关闭Channel
并返回ChannelFuture
connect
:连接给定的SocketAddress
并返回ChannelFuture
deregister
:从之前分配的EventExecutor
注销并返回ChannelFuture
disconnect
:从远程节点断开并返回ChannelFuture
- 执行器与处理器相关:
executor
:返回调度事件的EventExecutor
handler
:返回绑定到实例的ChannelHandler
isRemoved
:判断所关联的ChannelHandler
是否已从ChannelPipeline
中移除,若移除则返回true
name
:返回实例的唯一名称pipeline
:返回实例所关联的ChannelPipeline
- 事件触发相关:
fireChannelActive
:触发对下一个ChannelInboundHandler
上的channelActive()
方法(已连接)的调用fireChannelInactive
:触发对下一个ChannelInboundHandler
上的channelInactive()
方法(已关闭)的调用fireChannelRead
:触发对下一个ChannelInboundHandler
上的channelRead()
方法(已接收的消息)的调用fireChannelReadComplete
:触发对下一个ChannelInboundHandler
上的channelReadComplete()
方法的调用fireChannelRegistered
:触发对下一个ChannelInboundHandler
上的fireChannelRegistered()
方法的调用fireChannelUnregistered
:触发对下一个ChannelInboundHandler
上的fireChannelUnregistered()
方法的调用fireChannelWritabilityChanged
:触发对下一个ChannelInboundHandler
上的fireChannelWritabilityChanged()
方法的调用fireExceptionCaught
:触发对下一个ChannelInboundHandler
上的fireExceptionCaught(Throwable)
方法的调用fireUserEventTriggered
:触发对下一个ChannelInboundHandler
上的fireUserEventTriggered(Object evt)
方法的调用
- 读写相关:
read
:将数据从Channel
读取到第一个入站缓冲区,读取成功则触发channelRead
事件,并在最后一个消息被读取完成后通知ChannelInboundHandler
的channelReadComplete(ctx)
方法write
:通过实例写入消息并经过ChannelPipeline
writeAndFlush
通过实例写入并冲刷消息并经过ChannelPipeline
- 使用要点:
ChannelHandlerContext
和ChannelHandler
之间的关联(绑定)永远不会改变,所以缓存对它的引用是安全的;- 对于其他类的同名方法,
ChannelHandlerContext
的方法将产生更短的事件流,应尽可能利用这个特性来获得最大的性能。
3.4 ChannelHandler
3.4.1 ChannelHandler 接口
- 从应用程序开发角度看,
ChannelHandler
是 Netty 的主要组件,充当处理入站和出站数据的应用程序逻辑容器,其方法由网络事件触发,可用于多种动作,如数据格式转换(编解码)、处理转换异常等; - 重要子接口
ChannelInboundHandler
:- 常被实现的子接口,接收入站事件和数据,这些数据会被应用程序业务逻辑处理;
- 也可从该接口直接冲刷数据输出到对端,应用程序业务逻辑通常在一个或多个
ChannelInboundHandler
中实现;
ChannelOutboundHandler
:处理出站数据且允许拦截所有操作。
3.4.2 ChannelInboundHandler 接口
-
下面列出了该接口的生命周期方法,这些方法在数据被接收或对应
Channel
状态变化时调用,与Channel
生命周期密切相关:-
channelRegistered
:当Channel
已注册到它的EventLoop
且能够处理 I/O 时被调用 -
channelUnregistered
:当Channel
从它的EventLoop
注销且无法处理任何 I/O 时被调用 -
channelActive
:当Channel
处于活动状态时被调用,此时Channel
已连接/绑定且就绪 -
channelInactive
:当Channel
离开活动状态且不再连接它的远程节点时被调用 -
channelReadComplete
:当Channel
上的一个读操作完成时被调用 -
channelRead
:当从Channel
读取数据时被调用 -
ChannelWritabilityChanged
:当Channel
的可写状态发生改变时被调用,可通过Channel
的isWritable()
方法检测可写性,与可写性相关的阈值可通过Channel.config().setWriteHighWaterMark()
和Channel.config().setWriteLowWaterMark()
方法设置 -
userEventTriggered
:当ChannelInboundHandler.fireUserEventTriggered()
方法被调用时被调用
-
-
注意:
channelReadComplete
和channelRead
两者区别后续会解释。
3.4.3 ChannelOutboundHandler 接口
- 出站操作和数据由
ChannelOutboundHandler
处理,其方法会被Channel
、ChannelPipeline
以及ChannelHandlerContext
调用,自身定义的方法如下:-
bind(ChannelHandlerContext, SocketAddress, ChannelPromise)
:当请求将Channel
绑定到本地地址时被调用 -
connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
:当请求将Channel
连接到远程节点时被调用 -
disconnect(ChannelHandlerContext, ChannelPromise)
:当请求将Channel
从远程节点断开时被调用 -
close(ChannelHandlerContext, ChannelPromise)
:当请求关闭Channel
时被调用 -
deregister(ChannelHandlerContext, ChannelPromise)
:当请求将Channel
从它的EventLoop
注销时被调用 -
read(ChannelHandlerContext)
:当请求从Channel
读取更多的数据时被调用 -
flush(ChannelHandlerContext)
:当请求通过Channel
将入队数据冲刷到远程节点时被调用 -
write(ChannelHandlerContext, Object, ChannelPromise)
:当请求通过Channel
将数据写到远程节点时被调用
-
3.4.4 ChannelHandler的适配器
-
存在一些适配器类,可降低编写自定义
ChannelHandler
的工作量,因为它们提供了对应接口中所有方法的默认实现。由于有时会忽略不感兴趣的事件,Netty 提供了抽象基类ChannelInboundHandlerAdapter
(处理入站)和ChannelOutboundHandlerAdapter
(处理出站); -
可使用
ChannelInboundHandlerAdapter
和ChannelOutboundHandlerAdapter
类作为自己ChannelHandler
的起始点,这两个适配器分别提供了ChannelInboundHandler
和ChannelOutboundHandler
的基本实现,通过扩展抽象类ChannelHandlerAdapter
,它们获得了共同超接口ChannelHandler
的方法;
3.4.5 OutboundHandler的read方法
ChannelOutboundHandler
虽处理出站事件,但其中的read
方法并非表示读数据,而是业务发出读数据的要求,该要求会封装为出站事件传播,触发ChannelOutboundHandler
中的read
方法;- 若 Handler 既要处理入站又要处理出站事件:
- 可使用
ChannelDuplexHandler
类; - 也可同时实现
ChannelOutboundHandler
和ChannelInboundHandler
接口,但相对麻烦。
- 可使用
3.4.6 Handler的共享和并发安全性
ChannelHandlerAdapter
提供isSharable()
方法,若实现被标注为Sharable
,该方法返回true
,表示可添加到多个ChannelPipeline
;- 每个
SocketChannel
有自己的pipeline
,且每个SocketChannel
与线程绑定,所以新创建的 Handler 实例之间完全独立,只要不共享全局变量,实例是线程安全的; - 业务若需多个
SocketChannel
共享一个 Handler 实例(如统计服务器接收和发出的业务报文总数),可实现MessageCountHandler
,并使用 Netty 的@Sharable
注解,安装时共用一个实例。由于实例共享,实现统计功能时需注意线程安全,可使用 Java 并发编程里的Atomic
类保证。
3.4.7 资源管理和SimpleChannelInboundHandler
-
NIO 中接收和发送网络数据通过创建
Buffer
,应用程序业务与Channel
间通过Buffer
交换数据; -
Netty 处理网络数据也需
Buffer
,读网络数据时由 Netty 创建Buffer
,写网络数据时Buffer
常由业务方创建,Buffer
用完后必须释放,否则可能内存泄漏; -
Netty 中 Buffer 的自动释放
-
写网络数据时,若数据被成功写往网络,Netty 会自动释放
Buffer
,因为 Netty 会在pipeline
中安装HeadContext
和TailContext
两个 Handler,HeadContext
同时处理出站和入站事件,负责出站Buffer
的释放; -
但如果有
OutboundHandler
处理(重写/拦截)write()
操作并丢弃数据,未继续往下写,需手动调用ReferenceCountUtil.release
方法释放Buffer
,否则可能内存泄漏; -
读网络数据时,若每个
InboundHandler
都把数据往后传递(调用相关fireChannelRead
方法),Netty 会自动释放Buffer
(由TailContext
负责);若InboundHandler
处理数据后不继续传递且不调用ReferenceCountUtil.release
方法,可能内存泄漏;
-
-
SimpleChannelInboundHandler
:因消费入站数据是常规任务,Netty 提供SimpleChannelInboundHandler
(ChannelInboundHandler
的实现),它会在数据被channelRead0()
方法消费之后自动释放数据。系统提供的各种预定义 Handler 实现都正确处理了数据,所以自行编写业务 Handler 时,要么继续传递数据,要么自行释放;
3.5 内置通信传输模式
-
NIO。对应包为
io.netty.channel.socket.nio
,使用java.nio.channels
包作为基础,采用基于选择器的方式实现非阻塞 I/O 操作,是 Netty 中较为常用的传输模式之一; -
Epoll
-
对应包为
io.netty.channel.epoll
,由 JNI(Java Native Interface,Java 本地接口)驱动的epoll()
和非阻塞 I/O 实现; -
仅在 Linux 系统上可用,支持多种 Linux 特有的特性(如
SO_REUSEPORT
),相比 NIO 传输速度更快,且是完全非阻塞的;- 使用时需要将
NioEventLoopGroup
替换为EpollEventLoopGroup
,将NioServerSocketChannel.class
替换为EpollServerSocketChannel.class
;
- 使用时需要将
-
由于使用了 JNI,还需要额外安装相关的 so 库(如
libnetty_transport_native_epoll_x86_64.so
等),并放在 Netty 的 jar 包下;
-
-
OIO:对应包为
io.netty.channel.socket.oio
,使用java.net
包作为基础,采用阻塞流的方式进行 I/O 操作,适用于对性能要求不高或需要兼容旧代码的场景; -
Local。对应包为
io.netty.channel.local
,用于在 VM(虚拟机)内部通过管道进行通信的本地传输,适用于同一虚拟机内不同组件之间的通信; -
Embedded。对应包为
io.netty.channel.embedded
,是一种嵌入式传输,允许使用ChannelHandler
但不需要真正基于网络的传输,在测试ChannelHandler
实现时非常有用,方便进行单元测试等操作。
3.6 引导类Bootstrap
与ServerBootstrap
-
在网络编程里,“服务器”和“客户端”代表不同网络行为,即监听传入连接还是建立到其他进程的连接。由此有两种引导类:
-
Bootstrap
:用于客户端,作用是连接到远程主机和端口; -
ServerBootstrap
:用于服务器,作用是绑定到一个本地端口,因为服务器需要监听连接;
-
-
两者的对比
对比项 Bootstrap
ServerBootstrap
网络编程中的作用 连接到远程主机和端口 绑定到一个本地端口 EventLoopGroup
的数目1 2(也可使用同一个实例) -
服务器需要两组不同的
Channel
:-
第一组包含一个
ServerChannel
,代表服务器自身已绑定到本地端口、正在监听的套接字; -
第二组包含所有已创建的、用于处理传入客户端连接的
Channel
(每个已接受的连接对应一个); -
与
ServerChannel
相关联的EventLoopGroup
会分配一个EventLoop
,负责为传入连接请求创建Channel
;一旦连接被接受,第二个EventLoopGroup
会给对应的Channel
分配一个EventLoop
;
-
3.7 Channellnitializer
-
Netty 提供了一个特殊的
ChannelInboundHandlerAdapter
子类:public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
-
它定义了下面这个核心方法:
protected abstract void initChannel(C ch) throws Exception;
- 这个方法提供了一种将多个
ChannelHandler
添加到一个ChannelPipeline
中的简便方法; - 只需要向
Bootstrap
或ServerBootstrap
的实例提供ChannelInitializer
实现,一旦Channel
被注册到它的EventLoop
之后,就会调用initChannel()
方法; - 在该方法返回之后,
ChannelInitializer
的实例将会从ChannelPipeline
中移除它自己;
- 这个方法提供了一种将多个
-
应用场景:
- 在自己的应用程序中,如果存在某个
handler
只使用一次的情况,也可以仿造ChannelInitializer
,用完以后将自己从ChannelPipeline
中移除自己; - 例如授权
handler
,某客户端第一次连接登录以后,进行授权检查,检查通过后就可以把这个授权handler
移除了。如果客户端关闭连接下线,下次再连接的时候,就是一个新的连接,授权handler
依然会被安装到ChannelPipeline
,依然会进行授权检查。
- 在自己的应用程序中,如果存在某个
3.8 ChannelOption
ChannelOption
的各种属性在套接字选项中都有对应,用于对 Netty 中的网络连接进行精细配置。
3.8.1 ChannelOption.SO_BACKLOG
-
对应
tcp/ip
协议listen
函数中的backlog
参数;- 服务端处理客户端连接请求是顺序进行的,同一时间只能处理一个客户端连接,多个客户端连接请求到来时,服务端会将无法处理的连接请求放入队列等待;
- 操作系统通常有两个队列:
ACCEPT
队列(保存已完成 TCP 三次握手的连接)和SYN
队列(服务器正在等待 TCP 三次握手完成的队列);
-
在 BSD 派生系统里,
backlog
指SYN
队列的大小;在 Linux 中,不同内核版本对backlog
的定义较模糊,有些版本指ACCEPT
队列与SYN
队列合起来的大小,有些指SYN
队列的大小; -
从 Linux 2.2 开始,
backlog
指定等待接受的完全建立的套接字的队列长度,而非不完整的连接请求数量。不完整套接字队列的最大长度可通过/proc/sys/net/ipv4/tcp_max_syn_backlog
设置,默认值为 128; -
若
backlog
参数大于/proc/sys/net/core/somaxconn
中的值,会被静默截断为该值(默认 128)。在 2.4.25 之前的内核中,此限制是硬编码值,后续内核版本可通过修改/etc/sysctl.conf
(包括tcp_max_syn_backlog
),再用sysctl -p
命令生效。
3.8.2 ChannelOption.SO_REUSEADDR
- 对应套接字选项中的
SO_REUSEADDR
,表示允许重复使用本地地址和端口- 例如,可让多网卡(IP)绑定相同端口;也可解决某进程非正常退出后,其占用端口需一段时间才能被其他进程使用的问题,若不设置
SO_REUSEADDR
,该端口无法正常被其他进程使用; - 注意:此参数无法让应用绑定完全相同的 IP + Port 来重复启动。
- 例如,可让多网卡(IP)绑定相同端口;也可解决某进程非正常退出后,其占用端口需一段时间才能被其他进程使用的问题,若不设置
3.8.3 ChannelOption.SO_KEEPALIVE
- 对应套接字选项中的
SO_KEEPALIVE
,用于设置 TCP 连接; - 设置该选项后,连接会测试链路状态,适用于长时间无数据交流的连接。若两小时内无数据通信,TCP 会自动发送一个活动探测数据报文。
3.8.4 ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF
对应套接字选项中的SO_SNDBUF
,ChannelOption.SO_RCVBUF
对应SO_RCVBUF
;- 这两个参数用于操作接收缓冲区和发送缓冲区的大小。接收缓冲区用于保存网络协议栈收到的数据,直到应用程序读取成功;发送缓冲区用于保存发送数据,直到发送成功。
3.8.5 ChannelOption.SO_LINGER
- 对应套接字选项中的
SO_LINGER
; - Linux 内核默认处理方式是,当用户调用
close()
方法时,函数返回,会尽量发送数据,但不一定保证剩余数据都能发送,导致数据不确定性; - 使用
SO_LINGER
可阻塞close()
的调用时间,直到数据完全发送。
3.8.6 ChannelOption.TCP_NODELAY
- 对应套接字选项中的
TCP_NODELAY
,与 Nagle 算法有关; - Nagle 算法会将小数据包组装为更大的帧后发送,而非输入一次发送一次,虽能有效提高网络有效负载,但会造成延时;
TCP_NODELAY
的作用是禁止使用 Nagle 算法,适用于小数据即时传输;- 与
TCP_NODELAY
相对应的是TCP_CORK
,该选项需等到发送数据量最大时一次性发送数据,适用于文件传输。
3.9 ByteBuf
3.9.1 简介
- API 优点
- 可被用户自定义的缓冲区类型扩展
- 通过内置复合缓冲区类型实现透明零拷贝
- 容量按需增长,类似 JDK 的
StringBuilder
- 读写模式切换无需调用
ByteBuffer
的flip()
方法 - 读写使用不同索引
- 支持方法链式调用
- 支持引用计数
- 支持池化
ByteBuf
了维护两个不同索引,以read
或write
开头的方法会推进对应索引,以set
或get
开头的操作则不会;- 若读取字节直到
readerIndex
等于writerIndex
,会到达可读取数据末尾,继续读取会触发IndexOutOfBoundsException
; - 可指定
ByteBuf
最大容量,写索引(writerIndex
)超过会触发异常,默认限制为Integer.MAX_VALUE
。
3.9.2 使用模式
- 堆缓冲区:最常用模式,数据存储在 JVM 堆空间,称为支撑数组(backing array),无池化时分配和释放快速,通过
hasArray()
判断是否由数组支撑,否则为直接缓冲区; - 直接缓冲区:另一种 ByteBuf 模式,缺点是分配和释放比堆缓冲区昂贵;
- 复合缓冲区(
CompositeByteBuf
):为多个ByteBuf
提供聚合视图。例如 HTTP 协议的消息头和消息体,由不同模块产生的ByteBuf
,可在发送时聚合为CompositeByteBuf
,用统一ByteBuf
API 操作。
3.9.3 分配
ByteBufAllocator
接口:Netty 通过ByteBufAllocator
接口分配任意类型的ByteBuf
实例,不同方法功能如下:buffer()
:返回基于堆或直接内存存储的ByteBuf
heapBuffer()
:返回基于堆内存存储的ByteBuf
directBuffer()
:返回基于直接内存存储的ByteBuf
compositeBuffer()
:返回可通过添加最多指定数目的基于堆或直接内存存储的缓冲区来扩展的CompositeByteBuf
ioBuffer()
:返回用于套接字 I/O 操作的ByteBuf
,运行环境有sun.misc.Unsafe
支持时返回基于直接内存的ByteBuf
,否则返回基于堆内存的ByteBuf
;指定使用PreferHeapByteBufAllocator
时,只返回基于堆内存的ByteBuf
- 获取方式:可通过
Channel
(每个Channel
可有不同ByteBufAllocator
实例)或绑定到ChannelHandler
的ChannelHandlerContext
获取ByteBufAllocator
引用,例如ctx.alloc().buffer()
; ByteBufAllocator
实现PooledByteBufAllocator
:池化ByteBuf
实例,提升性能并减少内存碎片,Netty4.1 默认使用;UnpooledByteBufAllocator
:不池化ByteBuf
实例,每次调用返回新实例;
Unpooled
工具类:提供静态辅助方法创建未池化ByteBuf
实例,方法包括:buffer()
:返回未池化的基于堆内存的ByteBuf
directBuffer()
:返回未池化的基于直接内存的ByteBuf
wrappedBuffer()
:返回包装给定数据的ByteBuf
copiedBuffer()
:返回复制给定数据的ByteBuf
- 该类也可用于不需要 Netty 其他组件的非网络项目
3.9.4 随机访问索引/顺序访问索引/读写操作
- 索引基础:
ByteBuf
索引从零开始,第一个字节索引为 0,最后一个为capacity() - 1
。使用需索引参数(随机访问,即数组下标)的方法访问数据,不会改变readerIndex
和writerIndex
,也可手动调用readerIndex(index)
或writerIndex(index)
移动索引; - 读/写操作类别
get()
和set()
操作:从给定索引开始,保持索引不变,get
可加数据字长(如bool
、byte
、int
、short
、long
、bytes
);read()
和write()
操作:从给定索引开始,会根据已访问字节数调整索引;
- 更多操作
isReadable()
:至少有一个字节可读时返回true
isWritable()
:至少有一个字节可写时返回true
readableBytes()
:返回可读取的字节数writableBytes()
:返回可写入的字节数capacity()
:返回ByteBuf
可容纳的字节数,之后会尝试扩展直到达到maxCapacity()
maxCapacity()
:返回ByteBuf
可容纳的最大字节数hasArray()
:ByteBuf
由字节数组支撑时返回true
array()
:ByteBuf
由字节数组支撑时返回该数组,否则抛出UnsupportedOperationException
异常
3.9.5 字节分段与索引管理
-
可丢弃字节:
- 包含已被读过的字节,调用
discardReadBytes()
可丢弃并回收空间,其初始大小为 0,存储在readerIndex
中,随read
操作增加(get*
操作不移动readerIndex
); - 调用该方法后,可丢弃字节分段空间变为可写,但频繁调用可能导致内存复制(需将可读字节移到缓冲区开始位置),建议仅在内存宝贵时使用;
- 包含已被读过的字节,调用
-
可读字节:
ByteBuf
的可读字节分段存储实际数据,新分配、包装或复制的缓冲区默认readerIndex
为 0; -
可写字节:是拥有未定义内容、写入就绪的内存区域,新分配缓冲区默认
writerIndex
为 0,以write
开头的操作从当前writerIndex
开始写数据,并增加已写字节数; -
索引管理:
- 可通过
markReaderIndex()
、markWriterIndex()
、resetWriterIndex()
、resetReaderIndex()
标记和重置readerIndex
与writerIndex
; - 也可通过
readerIndex(int)
或writerIndex(int)
将索引移到指定位置,设置无效位置会触发IndexOutOfBoundsException
; - 调用
clear()
可将readerIndex
和writerIndex
设为 0,但不清除内存内容。
- 可通过
3.9.6 查找、派生与工具类
- 查找操作:
ByteBuf
中有多种确定指定值索引的方法,简单的用indexOf()
,复杂的可调用forEachByte()
,例如查找回车符(\r
)可通过buffer.forEachByte(ByteBufProcessor.FIND_CR)
实现; - 派生缓冲区:为
ByteBuf
提供内容视图,通过duplicate()
、slice()
、slice(int, int)
、Unpooled.unmodifiableBuffer(…)
、order(ByteOrder)
、readSlice(int)
等方法创建,每个方法返回新ByteBuf
实例,有自己的读、写和标记索引,内部存储与 JDK 的ByteBuffer
一样是共享的。若需现有缓冲区真实副本,用copy()
或copy(int, int)
方法,返回的ByteBuf
拥有独立数据副本; - 引用计数:是优化内存使用和性能的技术,当对象资源不再被其他对象引用时释放资源,Netty 4 为
ByteBuf
引入引用计数技术,通过interface ReferenceCounted
实现; - 工具类(
ByteBufUtil
):- 提供操作
ByteBuf
的静态辅助方法,与池化无关,在分配类外部实现; - 其中
hexdump()
方法以十六进制形式打印ByteBuf
内容,便于调试记录;equals(ByteBuf, ByteBuf)
方法用于判断两个ByteBuf
实例的相等性。
- 提供操作
3.9.7 资源释放
ChannelInboundHandler
相关:- 当重写
channelRead()
方法时,需显式释放与池化ByteBuf
实例相关的内存,Netty 提供ReferenceCountUtil.release()
方法; - Netty 会用
WARN
级日志记录未释放资源,方便发现违规实例; - 更简单的方式是使用
SimpleChannelInboundHandler
,它会自动释放资源;
- 当重写
- 入站请求:Netty 的
EventLoop
在处理Channel
读操作时分配的ByteBuf
,需自行释放,有三种方式:- 使用
SimpleChannelInboundHandler
; - 在重写
channelRead()
方法时使用ReferenceCountUtil.release()
; - 在重写
channelRead()
方法时使用ctx.fireChannelRead
继续向后传递;
- 使用
- 出站请求:不管
ByteBuf
是否由业务创建,调用write
或writeAndFlush
方法,且无额外OutboundHandler
重写write
和writeAndFlush
方法时,Netty 会自动释放,无需业务代码自行释放。
4 TCP 的粘包/半包
4.1 什么是粘包/半包?
-
在 TCP 通信中,客户端向服务端发送数据时,由于 TCP 是面向流的协议,没有消息边界的概念,服务端一次读取到的字节数不确定,就可能出现粘包或半包的情况;
-
假设客户端分别发送两个数据包 D1 和 D2 给服务端,可能存在以下几种情况:
-
无粘包和拆包:服务端分两次读取到两个独立的数据包,分别是 D1 和 D2
-
TCP 粘包:服务端一次接收到了两个数据包,D1 和 D2 粘合在一起
-
TCP 拆包:服务端分两次读取到两个数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容
-
混合情况:服务端分两次读取到两个数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余内容 D1_2 和 D2 包的整包
-
多次拆包(可能情况):若服务端 TCP 接收滑窗非常小,而数据包 D1 和 D2 比较大,服务端可能分多次才能将 D1 和 D2 包接收完全,期间发生多次拆包
-
4.2 粘包/半包的产生原因
-
TCP 粘包发生的原因:
-
由于 TCP 是面向连接的可靠协议(基于三次握手机制),客户端与服务器会维持一个连接(
Channel
),数据在连接不断开时,可持续发送多个数据包到服务器; -
若发送的网络数据包太小,会启用 Nagle 算法(可配置是否启用),对较小数据包进行合并(基于此,TCP 网络延迟比 UDP 高),之后再发送(超时或包大小足够时)。这会导致服务器接收到消息(数据流)时,无法区分哪些数据包是客户端分开发送的,从而产生粘包;
-
服务器接收到数据后放到缓冲区,若消息未被及时从缓冲区取走,下次取数据时可能一次取出多个数据包,造成粘包现象;
UDP 无粘包的原因:UDP 是无连接的不可靠传输协议(适合频繁发送较小数据包),不会对数据包进行合并发送(无 Nagle 算法),一端发送什么数据就直接发出,每个数据包都是完整的(数据 + UDP 头 + IP 头等每次发送都封装一次),所以没有粘包问题;
-
-
TCP 半包(分包,即一个数据包被分成多次接收)产生的原因:
-
应用程序写入数据的字节大小大于套接字发送缓冲区的大小;
-
进行 MSS 大小的 TCP 分段。MSS 是最大报文段长度的缩写,是 TCP 报文段中数据字段的最大长度,数据字段加上 TCP 首部才等于整个 TCP 报文段,即MSS=TCP报文段长度−TCP首部长度\text{MSS} = \text{TCP报文段长度} - \text{TCP首部长度}MSS=TCP报文段长度−TCP首部长度,并非 TCP 报文段的最大长度。
-
4.3 解决方案
-
由于底层 TCP 无法理解上层业务数据,无法保证数据包不被拆分和重组,需通过上层应用协议栈设计解决,主流方案有以下三种:
-
包尾增加分割符:在数据包尾部增加特定分割符(如回车换行符)来分割不同数据包,例如 FTP 协议;
-
消息定长:每个报文设置为固定长度(如 200 字节),若数据不足,用空格等空位补齐;
-
消息分消息头和消息体:将消息分为消息头和消息体,消息头中包含表示消息总长度(或消息体长度)的字段,通常设计为消息头的第一个字段用
int32
表示消息总长度,可使用LengthFieldBasedFrameDecoder
。
4.4 channelRead
VS channelReadComplete
-
channelRead
方法:Netty 是在读到完整的业务请求报文后才调用一次业务ChannelHandler
的channelRead
方法,无论这条报文底层经过了几次SocketChannel
的read
调用; -
channelReadComplete
方法:不是在业务语义上的读取消息完成后被触发,而是在每次从SocketChannel
成功读到消息后,由系统触发。也就是说,如果一个业务消息被 TCP 协议栈发送了 $ N $ 次,那么服务端的channelReadComplete
方法就会被调用 $ N $ 次; -
代码示例与验证
-
下面代码中客户端发送较小报文 5 次,通过循环构造
ByteBuf
并调用ctx.writeAndFlush(msg)
发送; -
服务端输出显示:
channelRead
执行次数和客户端发送报文数一致(共 5 次);channelReadComplete
执行了 3 次,其执行次数和客户端发送报文数无直接关联,也无明显规律;- 这进一步验证了
channelRead
是一个报文执行一次,而channelReadComplete
是每次从SocketChannel
成功读到消息就触发;
-
5 编解码器框架
5.1 什么是编解码器?
-
每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式;
-
如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列(即它的数据),那么:
-
编码器:将消息转换为适合于传输的格式(最有可能的就是字节流),操作出站数据;
-
解码器:将网络字节流转换回应用程序的消息格式,处理入站数据;
-
-
另外,前面所学的解决粘包半包的内容,其实也是编解码器框架的一部分。
5.2 解码器
- 解码器分类
ByteToMessageDecoder
:将字节解码为消息(或另一个字节序列);- 由于无法确定远程节点是否一次性发送完整消息,该类会对入站数据进行缓冲,直到准备好处理;
- 其核心方法是
decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
,这是必须实现的抽象方法。调用该方法时传入包含传入数据的ByteBuf
和用于添加解码消息的List
,会重复调用直到确定没有新元素添加到List
或ByteBuf
中无更多可读字节,若List
不为空,内容会传递给ChannelPipeline
中的下一个ChannelInboundHandler
;
MessageToMessageDecoder
:将一种消息类型解码为另一种(如String
到Integer
)。核心方法是decode(ChannelHandlerContext ctx, I msg, List<Object> out)
,对每个需解码为另一种格式的入站消息调用,解码后消息传递给ChannelPipeline
中的下一个ChannelInboundHandler
,其中T
代表源数据类型。
- Netty 是异步框架,解码器需在内存中缓冲字节直到可解码,为避免解码器缓冲大量数据耗尽内存,Netty 提供**
TooLongFrameException
**,当帧超出指定大小限制时由解码器抛出。可设置最大字节数阈值,超出则抛出该异常(由ChannelHandler.exceptionCaught()
方法捕获),异常处理取决于解码器用户,某些协议(如 HTTP)可返回特殊响应,其他情况可能需关闭连接。
5.3 编码器
-
编码器分类
MessageToByteEncoder
:将消息编码为字节。核心方法是encode(ChannelHandlerContext ctx, I msg, ByteBuf out)
,需实现该抽象方法,调用时传入要编码为ByteBuf
的出站消息(类型为I
),ByteBuf
随后转发给ChannelPipeline
中的下一个ChannelOutboundHandler
;MessageToMessageEncoder
:将消息编码为消息(如 Java 对象到String
类型的 JSON 文本),T
代表源数据类型。核心方法是encode(ChannelHandlerContext ctx, I msg, List<Object> out)
,每个通过write()
方法写入的消息都会传递给该方法,编码为一个或多个出站消息,随后转发给ChannelPipeline
中的下一个ChannelOutboundHandler
;
-
业务场景示例:
- 两端通过 JSON 加密通信时,发送端流程为 Java 对象经
MessageToMessageEncoder
转为String
类型 JSON 文本,再经MessageToByteEncoder
转为网络加密报文; - 接收端流程为网络加密报文经
ByteToMessageDecoder
转为String
类型 JSON 明文,再经MessageToMessageDecoder
转为 Java 对象,可将ByteToMessageDecoder
看作一次解码器,MessageToMessageDecoder
看作二次或多次解码器; MessageToByteEncoder
看作网络报文编码器,MessageToMessageEncoder
看作业务编码器。
- 两端通过 JSON 加密通信时,发送端流程为 Java 对象经
-
编解码器类
-
我们通常将解码器和编码器作为单独的实体讨论,但有时在同一个类中管理入站和出站数据及消息的转换很有用。Netty 的抽象编解码器类可用于此目的,每个抽象编解码器类会捆绑一个解码器/编码器对,且同时实现了
ChannelInboundHandler
和ChannelOutboundHandler
接口; -
不优先使用复合编解码器类的原因:因为通过尽可能将解码和编码功能分开,能最大化代码的可重用性和可扩展性,这是 Netty 设计的一个基本原则;
-
相关的类:
-
抽象类
ByteToMessageCodec
-
抽象类
MessageToMessageCodec
-
-
5.4 实战:实现SSL/TLS和Web服务
- Netty 为许多通用协议提供了编解码器和处理器,几乎可以开箱即用,这减少了我们所要花费的的时间与精力。
5.4.1 HTTP 系列
- HTTP 协议特点:基于请求/响应模式,客户端发 HTTP 请求,服务器返回 HTTP 响应。Netty 提供多种编解码器简化该协议使用;
- HTTP 消息组成:一个 HTTP 请求/响应可能由多个数据部分组成,
FullHttpRequest
和FullHttpResponse
分别代表完整的请求和响应,所有 HTTP 消息(如FullHttpRequest
、LastHttpContent
等)都实现HttpObject
接口; - HTTP 编解码器
HttpRequestEncoder
:将HttpRequest
、HttpContent
和LastHttpContent
消息编码为字节HttpResponseEncoder
:将HttpResponse
、HttpContent
和LastHttpContent
消息编码为字节HttpRequestDecoder
:将字节解码为HttpRequest
、HttpContent
和LastHttpContent
消息HttpResponseDecoder
:将字节解码为HttpResponse
、HttpContent
和LastHttpContent
消息HttpClientCodec
和HttpServerCodec
:对请求和响应编解码做了组合,HTTP 客户端使用HttpClientCodec
,服务器端使用HttpResponseEncoder
和HttpRequestDecoder
- 聚合 HTTP 消息:因 HTTP 请求和响应可能由多部分组成,需聚合形成完整消息,Netty 提供
HttpObjectAggregator
,可将多个消息部分合并为FullHttpRequest
或FullHttpResponse
消息,保证看到完整消息内容; - HTTP 压缩:使用 HTTP 时建议开启压缩减小传输数据大小,虽会带来 CPU 开销,但对文本数据很有必要。Netty 为压缩和解压缩提供
ChannelHandler
实现,支持gzip
和deflate
编码。
5.4.2 通过 SSL/TLS 保护 Netty 应用程序
- SSL/TLS 作用:作为安全协议,层叠在其他协议之上实现数据安全,不仅用于安全网站,还可用于安全 SMTP(SMTPS)邮件服务器、关系型数据库系统等非 HTTP 应用程序;
- Java 与 Netty 对 SSL/TLS 的支持:Java 提供
javax.net.ssl
包,其中SSLContext
和SSLEngine
类使加解密简单直接。Netty 通过SslHandler
(内部使用SSLEngine
完成实际工作)利用该 API,且多数情况下SslHandler
是ChannelPipeline
中的第一个ChannelHandler
; - 使用 HTTPS:启用 HTTPS 只需将
SslHandler
添加到ChannelPipeline
的ChannelHandler
组合中; - 根据客户端决定 HTTPS:Netty 提供了一个
OptionalSslHandler
,可根据客户端访问决定是否启用 SSL,其关键是能根据业务需求在运行时替换和删除Handler
,类似 Dubbo 源码中Handler
在运行时的移除。
6 序列化问题
6.1 序列化的目的
- Java 序列化主要有两个目的:
-
网络传输:在远程跨进程服务调用时,需将被传输的 Java 对象编码为字节数组或
ByteBuffer
对象,远程服务读取后再解码为原 Java 对象,这就是 Java 对象编解码技术; -
对象持久化:将对象的状态保存下来,以便后续恢复。
-
6.2 Java 序列化
-
Java 序列化只是 Java 编解码技术的一种,由于存在诸多缺陷,衍生出了多种其他编解码技术和框架;
-
Java 序列化从 JDK1.1 版本就已提供,只需实现
java.io.Serializable
并生成序列 ID,因此诞生之初广泛应用。但在远程服务调用(RPC)时,很少直接用它进行消息编解码和传输,原因在于其存在以下缺点:-
无法跨语言:跨进程服务调用中,服务提供者可能用 C++ 或其他语言开发,而 Java 序列化是 Java 语言内部私有协议,其他语言不支持。Java 序列化后的字节数组,其他语言无法反序列化,严重阻碍了其在异构语言进程交互场景的应用;
-
序列化后的码流太大;
-
序列化性能太低:JDK 默认的序列化机制,无论是序列化后的码流大小还是序列化性能,表现都很差,所以通常不会选择 Java 序列化作为远程跨节点调用的编解码框架。
-
6.3 如何选择序列化框架
6.3.1 选择要点
-
跨语言支持:若项目有跨语言的硬性要求,即使某序列化框架性能极好,但只支持特定语言,也无法选择;
-
空间:即编码后占用的空间大小;
-
时间:指编解码的速度。空间和时间是对序列化框架的性能要求,二者存在矛盾,通常需追求平衡,若要编码后占用空间小,往往要花费更多编码时间;
-
可读性:有些项目要求序列化后的数据人类可读,此时可选的框架不多,一般是 JSON 或 XML 格式,部分序列化框架支持通过自带工具观察序列化后的数据,也可考虑。
6.3.2 序列化框架比较
-
跨语言通用性
JDK Serializer
:只适用于 JavaFST
:只适用于 JavaKryo
:主要适用于 Java(可复杂支持跨语言)Protocol buffer
、Thrift
、Hessian
、Avro
:支持多种语言- 还有
msgpack
也支持跨语言
-
性能方面
- 空间性能:
avro
、kryo
、Hessian2
、fst
、Protocol buffer
表现不错; - 时间性能:
kryo
、fst
、Protocol buffer
表现很好; Msgpack
也是优秀的序列化框架,性能与Protocol buffer
不相上下;
实际对比过程可以参考:几种Java常用序列化框架的选型与对比-阿里云开发者社区。
- 空间性能:
6.3.3 内置序列化支持
- Netty 内置了对
JBoss Marshalling
和Protocol Buffers
的支持。
6.3.4 集成第三方 MessagePack 实战与 LengthFieldBasedFrameDecoder
详解
6.3.4.1 LengthFieldBasedFrameDecoder
参数详解
maxFrameLength
:表示包的最大长度lengthFieldOffset
:长度域的偏移量,即跳过指定个数的字节后才是长度域lengthFieldLength
:记录帧数据长度的字段本身的长度,也就是长度域的长度lengthAdjustment
:长度的修正值,可正可负。Netty 读取到数据包的长度值 NNN 后,认为接下来的 NNN 个字节都需读取,但实际情况可能需增加或减少 NNN 的值,具体增减量写在此参数中initialBytesToStrip
:从数据帧中跳过的字节数,即得到完整数据包后,丢弃该数据包中指定字节数,剩下的才是后续业务实际需要的业务数据failFast
:若为true
,读取到长度域且其值超过maxFrameLength
时,抛出TooLongFrameException
;若为false
,只有真正读取完长度域值表示的字节后,才会抛出该异常。默认值为true
,建议不修改,否则可能造成内存溢出
6.3.4.2 具体案例
-
案例 1:数据包大小为 14B(长度域 2B + “HELLO, WORLD”),长度域值为 12B(0x000C),希望解码后保持一致。参数设置为:
lengthFieldOffset = 0
,lengthFieldLength = 2
,lengthAdjustment
无需调整,initialBytesToStrip = 0
(解码过程不丢弃任何数据); -
案例 2:数据包大小为 14B(长度域 2B + “HELLO, WORLD”),长度域值为 12B(0x000C),希望解码后丢弃长度域 2B 字段。参数设置为:
lengthFieldOffset = 0
,lengthFieldLength = 2
,lengthAdjustment
无需调整,initialBytesToStrip = 2
(解码过程丢弃 2 个字节数据); -
案例 3:数据包大小为 14B(长度域 2B + “HELLO, WORLD”),长度域值为 14(0x000E),包含长度域本身长度,希望解码后保持一致。参数设置为:
lengthFieldOffset = 0
,lengthFieldLength = 2
,lengthAdjustment = -2
(因长度域为 14,报文内容为 12,需告诉 Netty 实际读取报文长度比长度域少 2),initialBytesToStrip = 0
(解码过程不丢弃任何数据); -
案例 4:数据包大小为 17B(Header 1 2B + 长度域 3B + “HELLO, WORLD”),长度域值为 12B(0x0000C),编码解码后长度保持一致。参数设置为:
lengthFieldOffset = 2
,lengthFieldLength = 3
,lengthAdjustment = 0
(无需调整),initialBytesToStrip = 0
(解码过程不丢弃任何数据); -
案例 5:Header 与长度域位置置换,总数据包长度为 17B(长度域 3B + Header 2B + “HELLO, WORLD”),长度域值为 12B(0x0000C),编码解码后长度保持一致。参数设置为:
lengthFieldOffset = 0
,lengthFieldLength = 3
,lengthAdjustment = 2
(因长度域为 12,报文内容为 12,需把 Header 值一起读取,实际读取报文内容长度比长度域多 2),initialBytesToStrip = 0
(解码过程不丢弃任何数据); -
案例 6:带有两个 header,HDR1 丢弃,长度域丢弃,只剩下第二个 header 和有效包体,总数据包长度为 16B(HDR1 1B + 长度域 2B + HDR2 1B + “HELLO, WORLD”),长度域值为 12B(0x000C)。参数设置为:
lengthFieldOffset = 1
(HDR1 的长度),lengthFieldLength = 2
,lengthAdjustment = 1
(因长度域为 12,报文内容为 12,需把 HDR2 值一起读取,实际读取报文内容长度比长度域多 1),initialBytesToStrip = 3
(丢弃 HDR1 和长度字段); -
案例 7:带有两个 header,HDR1 丢弃,长度域丢弃,只剩下第二个 header 和有效包体,总数据包长度为 16B(HDR1 1B + 长度域 2B + HDR2 1B + “HELLO, WORLD”),长度域值为 16B(0x0010),HDR1 长度 1B,HDR2 长度 1B,包体长度 12B(1+1+2+12=16)。参数设置为:
lengthFieldOffset = 1
,lengthFieldLength = 2
,lengthAdjustment = -3
(因长度域为 16,实际读取报文内容长度比长度域少 3),initialBytesToStrip = 3
(丢弃 HDR1 和长度字段);
7 单元测试
7.1 如何进行单元测试?
-
EmbeddedChannel
介绍:是 Netty 专门为改进ChannelHandler
的单元测试提供的特殊Channel
实现。可将入站或出站数据写入EmbeddedChannel
,然后检查是否有数据到达ChannelPipeline
尾端,以此确定消息是否被编码/解码,以及是否触发ChannelHandler
动作; -
核心方法
writeInbound(Object... msgs)
:将入站消息写入EmbeddedChannel
,若能通过readInbound()
读取数据,返回true
readInbound()
:从EmbeddedChannel
读取一个入站消息,返回的内容已穿越整个ChannelPipeline
,无可读数据时返回null
writeOutbound(Object... msgs)
:将出站消息写入EmbeddedChannel
,若能通过readOutbound()
读取数据,返回true
readOutbound()
:从EmbeddedChannel
读取一个出站消息,返回的内容已穿越整个ChannelPipeline
,无可读数据时返回null
finish()
:将EmbeddedChannel
标记为完成,若有可读取的入站或出站数据,返回true
,还会调用EmbeddedChannel
的close()
方法
-
数据处理分工:入站数据由
ChannelInboundHandler
处理,代表从远程节点读取的数据;出站数据由ChannelOutboundHandler
处理,代表将要写到远程节点的数据; -
测试逻辑:用
writeOutbound()
将消息写入Channel
,沿出站方向传递,再用readOutbound()
读取已处理消息,验证结果是否符合预期;入站数据则用writeInbound()
和readInbound()
方法。每种情况下,消息都会传递过ChannelPipeline
,并被相关ChannelInboundHandler
或ChannelOutboundHandler
处理;
7.2 测试入站消息
- 有一个简单的
ByteToMessageDecoder
实现,给定足够数据会产生固定大小的帧,数据不足时会等待下一个数据块并再次检查。该特定解码器会产生固定为 3 字节大小的帧,可能需要多个事件提供足够字节数来生成一个帧。
7.3 测试出站消息
-
测试的处理器
AbsIntegerEncoder
是 Netty 的MessageToMessageEncoder
的特殊化实现,用于将负值整数转换为绝对值。持有AbsIntegerEncoder
的EmbeddedChannel
会以 4 字节负整数形式写出站数据,编码器从传入的ByteBuf
中读取每个负整数,调用Math.abs()
获取绝对值,并将每个负整数的绝对值写到ChannelPipeline
中;
7.4 测试异常处理
-
应用程序常需执行比转换数据更复杂的任务,比如处理格式不正确的输入或过量数据;
-
若读取字节数超出特定限制,会抛出
TooLongFrameException
,这是防范资源耗尽的常用方法。设定最大帧大小为 3 字节,若帧大小超出限制,程序会丢弃字节并抛出TooLongFrameException
,ChannelPipeline
中的其他ChannelHandler
可在exceptionCaught()
方法中选择处理或忽略该异常;