当前位置: 首页 > news >正文

分布式专题——35 Netty的使用和常用组件辨析

1 简述

  • Netty 是一个基于 Java 的高性能、异步事件驱动的网络应用框架,被广泛用于开发网络应用程序,如网络服务器和客户端;

  • 本次讲解基于 Netty 4.1.42.Final 版本:

    <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.42.Final</version><scope>compile</scope>
    </dependency>
    
  • Netty 的优势

    • API 使用简单,降低开发门槛
    • 功能强大,预置多种编解码功能,支持多种主流协议
    • 定制能力强,可通过 ChannelHandler 灵活扩展通信框架
    • 性能高,与业界主流 NIO 框架对比,综合性能最优
    • 成熟、稳定,修复了所有已发现的 JDK NIO BUG,让业务开发人员无需为 NIO 的 BUG 烦恼
    • 社区活跃,版本迭代周期短,BUG 能被及时修复,还会加入更多新功能
    • 经历了大规模商业应用考验,质量得到验证
  • 为什么不用 Netty5?:Netty5 已经停止开发;

  • Netty 使用 NIO 而非 AIO 的原因

    • Netty 不看重 Windows 上的使用,在 Linux 系统上,AIO 底层仍使用 EPOLL,未很好实现 AIO,性能无明显优势,且被 JDK 封装一层,不易深度优化;

    • AIO 接收数据需要预先分配缓存,而 NIO 是接收时才分配缓存,对于连接数量大但流量小的情况,AIO 内存浪费多;

    • Linux 上 AIO 不够成熟,处理回调结果的速度跟不上处理需求;

    • 作者表示在类 Unix 系统上 AIO 不比 NIO(epoll)快,还存在无数据报支持、线程模型不必要(抽象过多却无实际使用场景)等问题。原话:

      Not faster than NIO (epoll) on unix systems (which is true)
      There is no daragram suppport
      Unnecessary threading model (too much abstraction without usage)
      
  • 为什么不用 Mina?:Mina 几乎不再更新,且 Netty 本身就是因为 Mina 不够好才被开发出来的。

2 第一个 Netty 程序

2.1 核心知识点

  • Bootstrap:是 Netty 框架的启动类和主入口类,分为客户端类 Bootstrap 和服务器类 ServerBootstrap 两种,用于初始化 Netty 应用并启动服务或建立客户端连接;

  • Channel

    • 是 Java NIO 的一个基本构造,代表一个到实体(如硬件设备、文件、网络套接字或能执行 I/O 操作的程序组件)的开放连接,可进行读、写等操作;
    • 也可看作传入(入站)或传出(出站)数据的载体,能被打开、关闭,以及进行连接、断开连接等操作;
  • EventLoop 和 EventLoopGroup

    • EventLoop 暂时可看成一个线程,负责处理 Channel 上的 I/O 操作和事件;
    • EventLoopGroup 自然可看成线程组,管理多个 EventLoop,用于分发和处理 I/O 事件等任务;
  • 事件:Netty 使用不同事件通知状态改变或操作状态,基于发生的事件触发适当动作。事件按与入站或出站数据流的相关性分类;

    • 入站事件:可能由入站数据或相关状态更改触发,包括连接已被激活/连接失活、数据读取、用户事件、错误事件等;
    • 出站事件:是未来将会触发的某个动作的操作结果,动作包括打开/关闭到远程节点的连接、将数据写到/冲刷到套接字等;
  • ChannelHandler

    • 每个事件都可被分发给 ChannelHandler 类中用户实现的方法;
    • 由于事件分为入站和出站,处理事件的 ChannelHandler 也被分为处理入站事件的 Handler、处理出站事件的 Handler,部分 Handler 既可以处理入站也可以处理出站事件;
    • Netty 提供了大量预定义的开箱即用的 ChannelHandler 实现,涵盖各种协议(如 HTTP 和 SSL/TLS)相关的 ChannelHandler
    • 基于 Netty 的网络应用程序会根据业务需求,使用 Netty 已提供的 ChannelHandler 或自行开发 ChannelHandler
  • ChannelPipelineChannelHandler 都放在 ChannelPipeline 中统一管理,事件在 ChannelPipeline 中流动,并被其中一个或多个 ChannelHandler 处理,它是 ChannelHandler 的容器,负责组织和管理 ChannelHandler 的执行顺序和交互;

    在这里插入图片描述

  • ChannelFuture

    • Netty 中所有的 I/O 操作都是异步的,异步意味着不需要主动等待结果返回,而是通过状态通知、回调函数等手段获取结果;

    • JDK 预置了 java.util.concurrent.Future 接口,提供了在操作完成时通知应用程序的方式,可看作异步操作结果的占位符,会在未来某个时刻完成并提供结果访问,但它的实现只允许手动检查操作是否完成,或一直阻塞直到完成,使用繁琐;

    • 因此 Netty 提供了自己的实现 ChannelFuture,用于在执行异步操作时使用。一般来说,每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture,通过它可以更方便地对异步操作的结果进行监听和处理。

2.2 请求-响应(Echo)示例

2.2.1 Netty 客户端的启动和配置类

public class EchoClient {private final int port;private final String host;public EchoClient(int port, String host) {this.port = port;this.host = host;}public void start() throws InterruptedException {// 创建事件循环组,用于处理客户端的IO操作和任务EventLoopGroup group  = new NioEventLoopGroup();try {// 创建客户端启动引导类Bootstrap b = new Bootstrap();b.group(group)// 指定使用NIO的Socket通道类型.channel(NioSocketChannel.class)// 设置要连接的服务器的地址和端口.remoteAddress(new InetSocketAddress(host,port))// 配置通道的处理器链,添加自定义的业务处理器.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 将自定义的客户端处理器添加到管道中ch.pipeline().addLast(new EchoClientHandler());}});// 异步连接到服务器,sync()会等待连接完成ChannelFuture f = b.connect().sync();// 等待客户端通道关闭,保持程序运行f.channel().closeFuture().sync();} finally {// 优雅关闭事件循环组,释放资源group.shutdownGracefully().sync();}}public static void main(String[] args) throws InterruptedException {// 创建客户端实例并启动,连接本地9999端口new EchoClient(9999,"127.0.0.1").start();}
}

2.2.2 客户端业务逻辑处理器

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {// 当从服务器接收到数据时自动调用此方法// SimpleChannelInboundHandler会自动释放ByteBuf资源@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {// 将接收到的ByteBuf转换为字符串并打印到控制台System.out.println("客户端接收到服务器消息: " + msg.toString(CharsetUtil.UTF_8));// 如果需要处理完一次响应就关闭连接,可以取消注释下面这行// ctx.close();}// 当客户端与服务器的连接建立成功并激活时调用此方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 连接建立后立即向服务器发送一条消息// Unpooled.copiedBuffer 将字符串转换为Netty的ByteBufctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Netty", CharsetUtil.UTF_8));// 以下是其他写入方式:// ctx.pipeline().write()   - 从管道头部开始处理// ctx.channel().write()    - 从管道尾部开始处理// ctx.alloc().buffer()     - 从内存分配器获取一个新的ByteBuf// writeAndFlush = write() + flush(),确保数据立即发送}// 还可以重写其他方法:// channelInactive() - 连接断开时调用// exceptionCaught() - 发生异常时调用
}

2.2.3 Netty 服务器的启动和配置类

public class EchoServer  {// 创建日志记录器,用于输出服务器运行日志private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);private final int port;  // 服务器监听的端口号public EchoServer(int port) {this.port = port;}public static void main(String[] args) throws InterruptedException {int port = 9999;  // 设置服务器监听端口为9999EchoServer echoServer = new EchoServer(port);LOG.info("服务器即将启动");echoServer.start();  // 启动服务器LOG.info("服务器关闭");  // 这行会在服务器关闭后执行}public void start() throws InterruptedException {// 创建主从Reactor线程组,用于处理连接和IO事件EventLoopGroup group  = new NioEventLoopGroup();try {// 创建服务器启动引导类,用于配置服务器参数ServerBootstrap b = new ServerBootstrap();b.group(group)// 指定使用NIO的服务器Socket通道类型.channel(NioServerSocketChannel.class)// 设置服务器绑定的本地地址和端口.localAddress(new InetSocketAddress(port))// 为每个新连接的客户端通道配置处理器.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 为每个客户端连接添加自定义的业务处理器ch.pipeline().addLast(new EchoServerHandler());}});// 异步绑定端口启动服务器,sync()会等待绑定操作完成ChannelFuture f = b.bind().sync();LOG.info("服务器启动完成。");// 阻塞当前线程,等待服务器通道关闭(保持服务器运行)// 当调用close()方法时,这个future会完成,程序继续执行f.channel().closeFuture().sync();} finally {// 优雅关闭事件循环组,释放所有资源// 包括关闭所有通道、取消所有任务等group.shutdownGracefully().sync();}}
}

2.2.4 服务器端业务逻辑处理器

public class EchoServerHandler extends ChannelInboundHandlerAdapter {// 当从客户端接收到数据时调用此方法@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 将接收到的消息转换为ByteBuf类型ByteBuf in = (ByteBuf)msg;// 将ByteBuf内容转换为字符串并打印到控制台System.out.println("服务器接收到客户端消息: " + in.toString(CharsetUtil.UTF_8));// 将接收到的数据原样写回给客户端(回声功能)// writeAndFlush会确保数据被写入并刷新到网络ctx.writeAndFlush(in);// 如果需要处理完消息后关闭连接,可以取消注释下面这行// ctx.close();}// 当有新的客户端连接建立时调用此方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 打印连接建立日志System.out.println("新的客户端连接已建立");// 调用父类方法,确保其他可能的处理逻辑也能执行super.channelActive(ctx);}// 当处理过程中发生异常时调用此方法@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 打印异常堆栈信息,便于调试cause.printStackTrace();// 关闭发生异常的连接,防止资源泄漏ctx.close();}// 还可以重写其他方法:// channelInactive() - 连接断开时调用// channelReadComplete() - 数据读取完成时调用
}

2.2.5 流程

  • 服务器启动EchoServer 启动并监听 9999 端口
  • 客户端启动EchoClient 连接服务器
  • 客户端发送消息:连接建立后,客户端发送 “Hello,Netty”
  • 服务器回声:服务器收到消息后,原样返回给客户端
  • 客户端打印回声:客户端收到服务器返回的消息并打印

3 Netty 组件

3.1 EventLoopEventLoopGroup

3.1.1 讲解

  • 在 NIO 中,通过 while 循环 select 出事件,再依次处理,这一“事件循环”机制就是 EventLoop 的核心逻辑体现。io.netty.channel.EventLoop 定义了 Netty 的核心抽象,用于处理网络连接生命周期中发生的各类事件;

  • 一个 EventLoop 由一个永远不会改变的 Thread 驱动,同时任务(Runnable 或者 Callable)可以直接提交给 EventLoop 实现,以立即执行或者调度执行;

  • 相关包与接口扩展:

    • io.netty.util.concurrent 包构建在 JDK 的 java.util.concurrent 包基础之上;

    • io.netty.channel 包中的类,为了能与 Channel 的事件进行交互,对 java.util.concurrent 里的接口、类进行了扩展;

  • 从下面类图中可以看到相关类的继承与实现关系:

    在这里插入图片描述

    • 最上层是 Executor,它是 Java 并发编程中执行任务的最基础接口;

    • ExecutorService 继承自 Executor,扩展了更多管理任务执行的方法,比如关闭线程池、提交有返回值的任务等;

    • EventExecutorGroup 同时继承了 ExecutorServiceIterable,还关联 ScheduledExecutorService,它是对线程池相关功能在 Netty 场景下的进一步封装,用于管理一组 EventExecutor

    • EventExecutor 继承自 EventExecutorGroup,更侧重于单个执行器的功能,具备有序处理事件等特性;

    • OrderedEventExecutor 继承自 EventExecutor,强调事件处理的有序性;

    • EventLoop 继承自 OrderedEventExecutorEventLoopGroup,是 Netty 中处理 I/O 事件和任务的核心组件,每个 EventLoop 绑定一个线程,负责处理对应的 Channel 上的事件;

    • EventLoopGroup 继承自 EventExecutorGroup,它是 EventLoop 的组管理器,用于管理多个 EventLoop,在 Netty 服务启动时,负责分配 EventLoop 来处理客户端连接等任务。

3.1.2 线程的分配

  • 服务于 Channel 的 I/O 和事件的 EventLoop 包含在 EventLoopGroup 中;

  • 线程分配机制:

    • 异步传输下的共享:异步传输实现只使用少量的 EventLoop(以及和它们相关联的 Thread),在当前线程模型下,这些 EventLoop 可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来支撑大量的 Channel,而不是为每个 Channel 分配一个 Thread

    • 分配方式EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。当前实现中,使用顺序循环(round-robin)的方式进行分配,以获取均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel

    • 生命周期绑定:一旦一个 Channel 被分配给一个 EventLoop,它将在整个生命周期中都使用这个 EventLoop(以及相关联的 Thread);

    在这里插入图片描述

  • ThreadLocal 使用的影响:

    • 因为一个 EventLoop 通常会被用于支撑多个 Channel,所以对于所有相关联的 Channel 来说,ThreadLocal 都是一样的,这使得 ThreadLocal 对于实现状态追踪等功能来说是个糟糕的选择;
    • 不过,在一些无状态的上下文中,它仍然可以被用于在多个 Channel 之间共享一些重度的或者代价昂贵的对象,甚至是事件。

3.1.3 线程管理

  • 当向 EventLoop 提交任务时,会先检查当前调用线程是否是支撑该 EventLoop 的线程:

    • :所提交的代码块会被直接执行;

    • 不是EventLoop 会调度该任务以便稍后执行,并将它放入到内部队列中。当 EventLoop 下次处理它的事件时,会执行队列中的那些任务/事件;

  • 流程步骤:

    • 提交任务:将要在 EventLoop 执行的任务,通过 Channel.eventLoop().execute(Task) 方法传递给 EventLoop

    • 线程检查:在把任务传递给 execute 方法之后,执行检查以确定当前调用线程是否就是分配给 EventLoop 的那个线程;

    • 相同线程的处理:如果是相同的线程,则在 EventLoop 中,可以直接执行任务;

    • 不同线程的处理:如果线程不是 EventLoop 的那个线程,则将任务放入队列以便 EventLoop 下一次处理它的事件时执行;

    在这里插入图片描述

3.2 Netty 网络抽象代表

3.2.1 讲解

  • Netty 网络抽象代表

    • Channel:对应 Socket,是 Netty 网络操作的基础,用于表示网络连接等;
    • EventLoop:负责控制流、多线程处理和并发,管理 Channel 的 I/O 事件等;
    • ChannelFuture:用于异步通知,因为 Netty 中 I/O 操作是异步的,通过它可以获取异步操作的结果等;
  • Channel 和 EventLoop 关系Channel 需要注册到某个 EventLoop 上,在其整个生命周期内都由该 EventLoop 处理 I/O 事件,即一个 Channel 与一个 EventLoop 绑定,但一个 EventLoop 可同时被多个 Channel 绑定;

    在这里插入图片描述

3.2.2 Channel 接口

  • 基本 I/O 操作与优势
    • 基本 I/O 操作(bind()connect()read()write())依赖底层网络传输原语,在 Java 网络编程中基于 Socket,Netty 的 Channel 接口提供的 API 用于所有 I/O 操作,降低了直接使用 Socket 类的复杂性,且 Channel 是拥有许多预定义、专门化实现的广泛类层次结构的根;
    • Channel 独一无二,为保证顺序,被声明为 java.lang.Comparable 的子接口,若两个不同 Channel 实例返回相同散列码,AbstractChannel 中的 compareTo() 方法会抛出 Error
  • Channel 的生命周期状态
    • ChannelUnregisteredChannel 已创建,但未注册到 EventLoop
    • ChannelRegisteredChannel 已注册到 EventLoop
    • ChannelActiveChannel 处于活动状态(已连接到远程节点),可接收和发送数据
    • ChannelInactiveChannel 未连接到远程节点
    • 这些状态改变时会生成对应事件,转发给 ChannelPipeline 中的 ChannelHandler 以作响应,编程中更关注 ChannelActiveChannelInactive
  • 重要 Channel 的方法
    • eventLoop:返回分配给 ChannelEventLoop
    • pipeline:返回 ChannelChannelPipeline,每个 Channel 都有自己的 ChannelPipeline
    • isActive:若 Channel 活动则返回 true,活动意义依底层传输而定,如 Socket 传输连接到远程节点为活动,Datagram 传输打开为活动
    • localAddress:返回本地的 SocketAddress
    • remoteAddress:返回远程的 SocketAddress
    • write:将数据写到远程节点,仅写往 Netty 内部缓存,未真正写往 socket
    • flush:将之前已写的数据冲刷到底层 socket 进行传输
    • writeAndFlush:简便方法,等同于调用 write() 后接着调用 flush()

3.3 ChannelPipelineChannelHandlerContext

3.3.1 ChannelPipeline 接口

  • Channel 被创建时,会自动分配一个新的 ChannelPipeline,每个 Channel 都有自己的 ChannelPipeline,且这种关联是永久性的,在 Netty 组件生命周期中是固定操作,无需开发人员干预;

  • ChannelPipeline 提供了 ChannelHandler 链的容器,定义了用于在该链上传播**入站(从网络到业务处理)出站(从业务处理到网络)**各种事件流的 API,代码中的 ChannelHandler 都放在 ChannelPipeline 中;

  • ChannelHandler 的工作是让事件流经 ChannelPipeline,它们在应用程序初始化或引导阶段被安装。ChannelHandler 对象接收事件、执行自身实现的处理逻辑,并将数据传递给链中的下一个 ChannelHandler,同时也可拦截事件阻止其继续传递,其执行顺序由添加顺序决定;

    在这里插入图片描述

3.3.2 ChannelHandler 的生命周期

  • ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时,会调用以下方法,每个方法都接收一个 ChannelHandlerContext 参数:
    • handlerAdded:当把 ChannelHandler 添加到 ChannelPipeline 中时被调用

    • handlerRemoved:当从 ChannelPipeline 中移除 ChannelHandler 时被调用

    • exceptionCaught:当处理过程中在 ChannelPipeline 中有错误产生时被调用

3.3.3 ChannelPipeline 中的 ChannelHandler

  • 入站和出站 ChannelHandler 被安装到同一个 ChannelPipeline 中,ChannelPipeline 以双向链表的形式进行维护管理;

    • 比如要在网络上传递数据,有这些需求:

      • 加密
      • 压缩(因为加密后得到的密文较大)
      • 按照业务要求检查报文中携带的用户信息是否合法
    • 于是实现了 5 个Handler:解压(入)Handler、压缩(出)handler、解密(入) Handler、加密(出) Handler、授权(入)Handler

    在这里插入图片描述

  • 事件流动(入站):当一个消息或其他入站事件被读取时,会从 ChannelPipeline 的头部开始流动,且只被处理入站事件的 Handler 处理(即解压(入)Handler、解密(入) Handler、授权(入)Handler),最终数据到达 ChannelPipeline 的尾端,处理结束;

    在这里插入图片描述

  • 事件流动(出站):数据的出站运动(正在被写的数据),概念上与入站类似,数据从链的尾端开始流动,只被处理出站事件的 Handler 处理(即加密(出) Handler、压缩(出)Handler),直到到达链的头部,之后出站数据到达网络传输层(Socket);

    在这里插入图片描述

  • Netty 的区分能力:Netty 能区分入站事件的 Handler 和出站事件的 Handler,确保数据只会在具有相同定向类型的两个 ChannelHandler 之间传递;

    • AbstractChannelHandlerContext.java 中的 findContextOutbound 方法用于查找处理出站事件的上下文;
    • ChannelHandlerMask.java 中定义了 MASK_ALL_INBOUND(所有入站事件掩码)和 MASK_ALL_OUTBOUND(所有出站事件掩码);
    • 这些都为 Netty 区分和处理入站、出站事件提供了底层支持;

    在这里插入图片描述

    在这里插入图片描述

  • 所以在编写 Netty 应用程序时要注意ChannelHandler 的顺序问题:

    • 不同方向(入站与出站):在编写 Netty 应用程序时,分属出站和入站不同的 Handler,在业务没特殊要求的情况下顺序无所谓。例如“压缩(出)handler”可以放在“解压(入)handler”和“解密(入)Handler”中间,也可以放在“解密(入) Handler”和“授权(入) Handler”之间;

      在这里插入图片描述

    • 同方向:同属一个方向(入站或出站)的 Handler 是有顺序的,因为上一个 Handler 处理的结果往往是下一个 Handler 所需的输入。比如入站处理,收到的数据需先解压得到密文,再解密,最后拿到明文中的用户信息进行授权检查,所以解压→解密→授权这三个入站 Handler 的顺序不能乱;

      在这里插入图片描述

3.3.4 ChannelPipeline上的方法

  • ChannelPipeline 以双向链表形式维护管理 ChannelHandler,提供了一系列对 ChannelHandler 进行操作的方法:

    • 添加相关addFirstaddBeforeaddAfteraddLast,用于将 ChannelHandler 添加到 ChannelPipeline 中不同位置;

    • 移除相关remove,用于将 ChannelHandlerChannelPipeline 中移除;

    • 替换相关replace,用于将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler

    • 获取相关get,通过类型或者名称返回 ChannelHandlercontext,返回和 ChannelHandler 绑定的 ChannelHandlerContextnames,返回 ChannelPipeline 中所有 ChannelHandler 的名称;

  • 此外,ChannelPipeline 的 API 还公开了用于调用入站和出站操作的附加方法。

3.3.5 ChannelHandlerContext

  • ChannelHandlerContext 代表了 ChannelHandlerChannelPipeline 之间的关联

    • 每当 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext
    • 它的主要作用类似 LinkedList 内部的 Node 类,用于维护 ChannelHandler 在双向链表中的前后指针(prenext);
    • 同时,它还提供了很多方法,比如让事件从当前 ChannelHandler 传递给链中的下一个 ChannelHandler,获取底层的 Channel,以及用于写出站数据等;

    在这里插入图片描述

  • ChannelHandlerContext 有很多方法也存在于 ChannelChannelPipeline 上,但存在重要不同;

    • 调用 Channel 或者 ChannelPipeline 上的这些方法,事件将沿着整个 ChannelPipeline 进行传播;
    • 而调用 ChannelHandlerContext 上的相同方法,事件则从当前所关联的 ChannelHandler 开始,且只会传播给位于该 ChannelPipeline 中的下一个(入站下一个,出站上一个)能够处理该事件的 ChannelHandler

    在这里插入图片描述

    • 举例来说,服务器收到对端报文,解压后解密失败需应答;
      • 若因加密算法不一致,应答报文以明文压缩格式发送,可在解密 handler 中用 ctx.write,应答报文只经过压缩 Handler 发往对端;
      • 其他情况,应答报文以加密和压缩格式发送,可在解密 handler 中用 channel.write() 或者 channelpipeline.write(),应答报文会流经整个出站处理过程;

    在这里插入图片描述

3.3.6 ChannelHandlerContext 的API

  • 资源与连接相关
    • alloc:返回和实例相关联的 Channel 所配置的 ByteBufAllocator
    • bind:绑定到给定的 SocketAddress 并返回 ChannelFuture
    • channel:返回绑定到实例的 Channel
    • close:关闭 Channel 并返回 ChannelFuture
    • connect:连接给定的 SocketAddress 并返回 ChannelFuture
    • deregister:从之前分配的 EventExecutor 注销并返回 ChannelFuture
    • disconnect:从远程节点断开并返回 ChannelFuture
  • 执行器与处理器相关
    • executor:返回调度事件的 EventExecutor
    • handler:返回绑定到实例的 ChannelHandler
    • isRemoved:判断所关联的 ChannelHandler 是否已从 ChannelPipeline 中移除,若移除则返回 true
    • name:返回实例的唯一名称
    • pipeline:返回实例所关联的 ChannelPipeline
  • 事件触发相关
    • fireChannelActive:触发对下一个 ChannelInboundHandler 上的 channelActive() 方法(已连接)的调用
    • fireChannelInactive:触发对下一个 ChannelInboundHandler 上的 channelInactive() 方法(已关闭)的调用
    • fireChannelRead:触发对下一个 ChannelInboundHandler 上的 channelRead() 方法(已接收的消息)的调用
    • fireChannelReadComplete:触发对下一个 ChannelInboundHandler 上的 channelReadComplete() 方法的调用
    • fireChannelRegistered:触发对下一个 ChannelInboundHandler 上的 fireChannelRegistered() 方法的调用
    • fireChannelUnregistered:触发对下一个 ChannelInboundHandler 上的 fireChannelUnregistered() 方法的调用
    • fireChannelWritabilityChanged:触发对下一个 ChannelInboundHandler 上的 fireChannelWritabilityChanged() 方法的调用
    • fireExceptionCaught:触发对下一个 ChannelInboundHandler 上的 fireExceptionCaught(Throwable) 方法的调用
    • fireUserEventTriggered:触发对下一个 ChannelInboundHandler 上的 fireUserEventTriggered(Object evt) 方法的调用
  • 读写相关
    • read:将数据从 Channel 读取到第一个入站缓冲区,读取成功则触发 channelRead 事件,并在最后一个消息被读取完成后通知 ChannelInboundHandlerchannelReadComplete(ctx) 方法
    • write:通过实例写入消息并经过 ChannelPipeline
    • writeAndFlush 通过实例写入并冲刷消息并经过 ChannelPipeline
  • 使用要点
    • ChannelHandlerContextChannelHandler 之间的关联(绑定)永远不会改变,所以缓存对它的引用是安全的;
    • 对于其他类的同名方法,ChannelHandlerContext 的方法将产生更短的事件流,应尽可能利用这个特性来获得最大的性能。

3.4 ChannelHandler

3.4.1 ChannelHandler 接口

  • 从应用程序开发角度看,ChannelHandler 是 Netty 的主要组件,充当处理入站和出站数据的应用程序逻辑容器,其方法由网络事件触发,可用于多种动作,如数据格式转换(编解码)、处理转换异常等;
  • 重要子接口
    • ChannelInboundHandler
      • 常被实现的子接口,接收入站事件和数据,这些数据会被应用程序业务逻辑处理;
      • 也可从该接口直接冲刷数据输出到对端,应用程序业务逻辑通常在一个或多个 ChannelInboundHandler 中实现;
    • ChannelOutboundHandler:处理出站数据且允许拦截所有操作。

3.4.2 ChannelInboundHandler 接口

  • 下面列出了该接口的生命周期方法,这些方法在数据被接收或对应 Channel 状态变化时调用,与 Channel 生命周期密切相关:

    • channelRegistered:当 Channel 已注册到它的 EventLoop 且能够处理 I/O 时被调用

    • channelUnregistered:当 Channel 从它的 EventLoop 注销且无法处理任何 I/O 时被调用

    • channelActive:当 Channel 处于活动状态时被调用,此时 Channel 已连接/绑定且就绪

    • channelInactive:当 Channel 离开活动状态且不再连接它的远程节点时被调用

    • channelReadComplete:当 Channel 上的一个读操作完成时被调用

    • channelRead:当从 Channel 读取数据时被调用

    • ChannelWritabilityChanged:当 Channel 的可写状态发生改变时被调用,可通过 ChannelisWritable() 方法检测可写性,与可写性相关的阈值可通过 Channel.config().setWriteHighWaterMark()Channel.config().setWriteLowWaterMark() 方法设置

    • userEventTriggered:当 ChannelInboundHandler.fireUserEventTriggered() 方法被调用时被调用

  • 注意:channelReadCompletechannelRead 两者区别后续会解释。

3.4.3 ChannelOutboundHandler 接口

  • 出站操作和数据由 ChannelOutboundHandler 处理,其方法会被 ChannelChannelPipeline 以及 ChannelHandlerContext 调用,自身定义的方法如下:
    • bind(ChannelHandlerContext, SocketAddress, ChannelPromise):当请求将 Channel 绑定到本地地址时被调用

    • connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise):当请求将 Channel 连接到远程节点时被调用

    • disconnect(ChannelHandlerContext, ChannelPromise):当请求将 Channel 从远程节点断开时被调用

    • close(ChannelHandlerContext, ChannelPromise):当请求关闭 Channel 时被调用

    • deregister(ChannelHandlerContext, ChannelPromise):当请求将 Channel 从它的 EventLoop 注销时被调用

    • read(ChannelHandlerContext):当请求从 Channel 读取更多的数据时被调用

    • flush(ChannelHandlerContext):当请求通过 Channel 将入队数据冲刷到远程节点时被调用

    • write(ChannelHandlerContext, Object, ChannelPromise):当请求通过 Channel 将数据写到远程节点时被调用

3.4.4 ChannelHandler的适配器

  • 存在一些适配器类,可降低编写自定义 ChannelHandler 的工作量,因为它们提供了对应接口中所有方法的默认实现。由于有时会忽略不感兴趣的事件,Netty 提供了抽象基类 ChannelInboundHandlerAdapter(处理入站)和 ChannelOutboundHandlerAdapter(处理出站);

  • 可使用 ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 类作为自己 ChannelHandler 的起始点,这两个适配器分别提供了 ChannelInboundHandlerChannelOutboundHandler 的基本实现,通过扩展抽象类 ChannelHandlerAdapter,它们获得了共同超接口 ChannelHandler 的方法;

    在这里插入图片描述

3.4.5 OutboundHandler的read方法

  • ChannelOutboundHandler 虽处理出站事件,但其中的 read 方法并非表示读数据,而是业务发出读数据的要求,该要求会封装为出站事件传播,触发 ChannelOutboundHandler 中的 read 方法;
  • 若 Handler 既要处理入站又要处理出站事件:
    • 可使用 ChannelDuplexHandler 类;
    • 也可同时实现 ChannelOutboundHandlerChannelInboundHandler 接口,但相对麻烦。

3.4.6 Handler的共享和并发安全性

  • ChannelHandlerAdapter 提供 isSharable() 方法,若实现被标注为 Sharable,该方法返回 true,表示可添加到多个 ChannelPipeline
  • 每个 SocketChannel 有自己的 pipeline,且每个 SocketChannel 与线程绑定,所以新创建的 Handler 实例之间完全独立,只要不共享全局变量,实例是线程安全的;
  • 业务若需多个 SocketChannel 共享一个 Handler 实例(如统计服务器接收和发出的业务报文总数),可实现 MessageCountHandler,并使用 Netty 的 @Sharable 注解,安装时共用一个实例。由于实例共享,实现统计功能时需注意线程安全,可使用 Java 并发编程里的 Atomic 类保证。

3.4.7 资源管理和SimpleChannelInboundHandler

  • NIO 中接收和发送网络数据通过创建 Buffer,应用程序业务与 Channel 间通过 Buffer 交换数据;

  • Netty 处理网络数据也需 Buffer,读网络数据时由 Netty 创建 Buffer,写网络数据时 Buffer 常由业务方创建,Buffer 用完后必须释放,否则可能内存泄漏;

  • Netty 中 Buffer 的自动释放

    • 写网络数据时,若数据被成功写往网络,Netty 会自动释放 Buffer,因为 Netty 会在 pipeline 中安装 HeadContextTailContext 两个 Handler,HeadContext 同时处理出站和入站事件,负责出站 Buffer 的释放;

    • 但如果有 OutboundHandler 处理(重写/拦截)write() 操作并丢弃数据,未继续往下写,需手动调用 ReferenceCountUtil.release 方法释放 Buffer,否则可能内存泄漏;

    • 读网络数据时,若每个 InboundHandler 都把数据往后传递(调用相关 fireChannelRead 方法),Netty 会自动释放 Buffer(由 TailContext 负责);若 InboundHandler 处理数据后不继续传递且不调用 ReferenceCountUtil.release 方法,可能内存泄漏;

      在这里插入图片描述

      在这里插入图片描述

      在这里插入图片描述

  • SimpleChannelInboundHandler:因消费入站数据是常规任务,Netty 提供 SimpleChannelInboundHandlerChannelInboundHandler 的实现),它会在数据被 channelRead0() 方法消费之后自动释放数据。系统提供的各种预定义 Handler 实现都正确处理了数据,所以自行编写业务 Handler 时,要么继续传递数据,要么自行释放;

    在这里插入图片描述

3.5 内置通信传输模式

  • NIO。对应包为 io.netty.channel.socket.nio,使用 java.nio.channels 包作为基础,采用基于选择器的方式实现非阻塞 I/O 操作,是 Netty 中较为常用的传输模式之一;

  • Epoll

    • 对应包为 io.netty.channel.epoll,由 JNI(Java Native Interface,Java 本地接口)驱动的 epoll() 和非阻塞 I/O 实现;

    • 仅在 Linux 系统上可用,支持多种 Linux 特有的特性(如 SO_REUSEPORT),相比 NIO 传输速度更快,且是完全非阻塞的;

      • 使用时需要将 NioEventLoopGroup 替换为 EpollEventLoopGroup,将 NioServerSocketChannel.class 替换为 EpollServerSocketChannel.class
    • 由于使用了 JNI,还需要额外安装相关的 so 库(如 libnetty_transport_native_epoll_x86_64.so 等),并放在 Netty 的 jar 包下;

      在这里插入图片描述

  • OIO:对应包为 io.netty.channel.socket.oio,使用 java.net 包作为基础,采用阻塞流的方式进行 I/O 操作,适用于对性能要求不高或需要兼容旧代码的场景;

  • Local。对应包为 io.netty.channel.local,用于在 VM(虚拟机)内部通过管道进行通信的本地传输,适用于同一虚拟机内不同组件之间的通信;

  • Embedded。对应包为 io.netty.channel.embedded,是一种嵌入式传输,允许使用 ChannelHandler 但不需要真正基于网络的传输,在测试 ChannelHandler 实现时非常有用,方便进行单元测试等操作。

3.6 引导类BootstrapServerBootstrap

  • 在网络编程里,“服务器”和“客户端”代表不同网络行为,即监听传入连接还是建立到其他进程的连接。由此有两种引导类:

    • Bootstrap:用于客户端,作用是连接到远程主机和端口;

    • ServerBootstrap:用于服务器,作用是绑定到一个本地端口,因为服务器需要监听连接;

  • 两者的对比

    对比项BootstrapServerBootstrap
    网络编程中的作用连接到远程主机和端口绑定到一个本地端口
    EventLoopGroup 的数目12(也可使用同一个实例)
  • 服务器需要两组不同的 Channel

    • 第一组包含一个 ServerChannel,代表服务器自身已绑定到本地端口、正在监听的套接字;

    • 第二组包含所有已创建的、用于处理传入客户端连接的 Channel(每个已接受的连接对应一个);

    • ServerChannel 相关联的 EventLoopGroup 会分配一个 EventLoop,负责为传入连接请求创建 Channel;一旦连接被接受,第二个 EventLoopGroup 会给对应的 Channel 分配一个 EventLoop

    在这里插入图片描述

3.7 Channellnitializer

  • Netty 提供了一个特殊的 ChannelInboundHandlerAdapter 子类:

    public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
    
  • 它定义了下面这个核心方法:

    protected abstract void initChannel(C ch) throws Exception;
    
    • 这个方法提供了一种将多个 ChannelHandler 添加到一个 ChannelPipeline 中的简便方法;
    • 只需要向 BootstrapServerBootstrap 的实例提供 ChannelInitializer 实现,一旦 Channel 被注册到它的 EventLoop 之后,就会调用 initChannel() 方法;
    • 在该方法返回之后,ChannelInitializer 的实例将会从 ChannelPipeline 中移除它自己;
  • 应用场景:

    • 在自己的应用程序中,如果存在某个 handler 只使用一次的情况,也可以仿造 ChannelInitializer,用完以后将自己从 ChannelPipeline 中移除自己;
    • 例如授权 handler,某客户端第一次连接登录以后,进行授权检查,检查通过后就可以把这个授权 handler 移除了。如果客户端关闭连接下线,下次再连接的时候,就是一个新的连接,授权 handler 依然会被安装到 ChannelPipeline,依然会进行授权检查。

3.8 ChannelOption

  • ChannelOption 的各种属性在套接字选项中都有对应,用于对 Netty 中的网络连接进行精细配置。

3.8.1 ChannelOption.SO_BACKLOG

  • 对应 tcp/ip 协议 listen 函数中的 backlog 参数;

    • 服务端处理客户端连接请求是顺序进行的,同一时间只能处理一个客户端连接,多个客户端连接请求到来时,服务端会将无法处理的连接请求放入队列等待;
    • 操作系统通常有两个队列:ACCEPT 队列(保存已完成 TCP 三次握手的连接)和 SYN 队列(服务器正在等待 TCP 三次握手完成的队列);
  • 在 BSD 派生系统里,backlogSYN 队列的大小;在 Linux 中,不同内核版本对 backlog 的定义较模糊,有些版本指 ACCEPT 队列与 SYN 队列合起来的大小,有些指 SYN 队列的大小;

  • 从 Linux 2.2 开始,backlog 指定等待接受的完全建立的套接字的队列长度,而非不完整的连接请求数量。不完整套接字队列的最大长度可通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 设置,默认值为 128;

  • backlog 参数大于 /proc/sys/net/core/somaxconn 中的值,会被静默截断为该值(默认 128)。在 2.4.25 之前的内核中,此限制是硬编码值,后续内核版本可通过修改 /etc/sysctl.conf(包括 tcp_max_syn_backlog),再用 sysctl -p 命令生效。

3.8.2 ChannelOption.SO_REUSEADDR

  • 对应套接字选项中的 SO_REUSEADDR,表示允许重复使用本地地址和端口
    • 例如,可让多网卡(IP)绑定相同端口;也可解决某进程非正常退出后,其占用端口需一段时间才能被其他进程使用的问题,若不设置 SO_REUSEADDR,该端口无法正常被其他进程使用;
    • 注意:此参数无法让应用绑定完全相同的 IP + Port 来重复启动。

3.8.3 ChannelOption.SO_KEEPALIVE

  • 对应套接字选项中的 SO_KEEPALIVE,用于设置 TCP 连接;
  • 设置该选项后,连接会测试链路状态,适用于长时间无数据交流的连接。若两小时内无数据通信,TCP 会自动发送一个活动探测数据报文。

3.8.4 ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF

  • ChannelOption.SO_SNDBUF 对应套接字选项中的 SO_SNDBUFChannelOption.SO_RCVBUF 对应 SO_RCVBUF
  • 这两个参数用于操作接收缓冲区和发送缓冲区的大小。接收缓冲区用于保存网络协议栈收到的数据,直到应用程序读取成功;发送缓冲区用于保存发送数据,直到发送成功。

3.8.5 ChannelOption.SO_LINGER

  • 对应套接字选项中的 SO_LINGER
  • Linux 内核默认处理方式是,当用户调用 close() 方法时,函数返回,会尽量发送数据,但不一定保证剩余数据都能发送,导致数据不确定性;
  • 使用 SO_LINGER 可阻塞 close() 的调用时间,直到数据完全发送。

3.8.6 ChannelOption.TCP_NODELAY

  • 对应套接字选项中的 TCP_NODELAY,与 Nagle 算法有关;
  • Nagle 算法会将小数据包组装为更大的帧后发送,而非输入一次发送一次,虽能有效提高网络有效负载,但会造成延时;
  • TCP_NODELAY 的作用是禁止使用 Nagle 算法,适用于小数据即时传输;
  • TCP_NODELAY 相对应的是 TCP_CORK,该选项需等到发送数据量最大时一次性发送数据,适用于文件传输。

3.9 ByteBuf

3.9.1 简介

  • API 优点
    • 可被用户自定义的缓冲区类型扩展
    • 通过内置复合缓冲区类型实现透明零拷贝
    • 容量按需增长,类似 JDK 的 StringBuilder
    • 读写模式切换无需调用 ByteBufferflip() 方法
    • 读写使用不同索引
    • 支持方法链式调用
    • 支持引用计数
    • 支持池化
  • ByteBuf 了维护两个不同索引,以 readwrite 开头的方法会推进对应索引,以 setget 开头的操作则不会;
  • 若读取字节直到 readerIndex 等于 writerIndex,会到达可读取数据末尾,继续读取会触发 IndexOutOfBoundsException
  • 可指定 ByteBuf 最大容量,写索引(writerIndex)超过会触发异常,默认限制为 Integer.MAX_VALUE

3.9.2 使用模式

  • 堆缓冲区:最常用模式,数据存储在 JVM 堆空间,称为支撑数组(backing array),无池化时分配和释放快速,通过 hasArray() 判断是否由数组支撑,否则为直接缓冲区;
  • 直接缓冲区:另一种 ByteBuf 模式,缺点是分配和释放比堆缓冲区昂贵;
  • 复合缓冲区(CompositeByteBuf:为多个 ByteBuf 提供聚合视图。例如 HTTP 协议的消息头和消息体,由不同模块产生的 ByteBuf,可在发送时聚合为 CompositeByteBuf,用统一 ByteBuf API 操作。

3.9.3 分配

  • ByteBufAllocator 接口:Netty 通过 ByteBufAllocator 接口分配任意类型的 ByteBuf 实例,不同方法功能如下:
    • buffer():返回基于堆或直接内存存储的 ByteBuf
    • heapBuffer():返回基于堆内存存储的 ByteBuf
    • directBuffer():返回基于直接内存存储的 ByteBuf
    • compositeBuffer():返回可通过添加最多指定数目的基于堆或直接内存存储的缓冲区来扩展的 CompositeByteBuf
    • ioBuffer():返回用于套接字 I/O 操作的 ByteBuf,运行环境有 sun.misc.Unsafe 支持时返回基于直接内存的 ByteBuf,否则返回基于堆内存的 ByteBuf;指定使用 PreferHeapByteBufAllocator 时,只返回基于堆内存的 ByteBuf
  • 获取方式:可通过 Channel(每个 Channel 可有不同 ByteBufAllocator 实例)或绑定到 ChannelHandlerChannelHandlerContext 获取 ByteBufAllocator 引用,例如 ctx.alloc().buffer()
  • ByteBufAllocator 实现
    • PooledByteBufAllocator:池化 ByteBuf 实例,提升性能并减少内存碎片,Netty4.1 默认使用;
    • UnpooledByteBufAllocator:不池化 ByteBuf 实例,每次调用返回新实例;
  • Unpooled 工具类:提供静态辅助方法创建未池化 ByteBuf 实例,方法包括:
    • buffer():返回未池化的基于堆内存的 ByteBuf
    • directBuffer():返回未池化的基于直接内存的 ByteBuf
    • wrappedBuffer():返回包装给定数据的 ByteBuf
    • copiedBuffer():返回复制给定数据的 ByteBuf
    • 该类也可用于不需要 Netty 其他组件的非网络项目

3.9.4 随机访问索引/顺序访问索引/读写操作

  • 索引基础ByteBuf 索引从零开始,第一个字节索引为 0,最后一个为 capacity() - 1。使用需索引参数(随机访问,即数组下标)的方法访问数据,不会改变 readerIndexwriterIndex,也可手动调用 readerIndex(index)writerIndex(index) 移动索引;
  • 读/写操作类别
    • get()set() 操作:从给定索引开始,保持索引不变,get 可加数据字长(如 boolbyteintshortlongbytes);
    • read()write() 操作:从给定索引开始,会根据已访问字节数调整索引;
  • 更多操作
    • isReadable():至少有一个字节可读时返回 true
    • isWritable():至少有一个字节可写时返回 true
    • readableBytes():返回可读取的字节数
    • writableBytes():返回可写入的字节数
    • capacity():返回 ByteBuf 可容纳的字节数,之后会尝试扩展直到达到 maxCapacity()
    • maxCapacity():返回 ByteBuf 可容纳的最大字节数
    • hasArray()ByteBuf 由字节数组支撑时返回 true
    • array()ByteBuf 由字节数组支撑时返回该数组,否则抛出 UnsupportedOperationException 异常

3.9.5 字节分段与索引管理

  • 可丢弃字节

    • 包含已被读过的字节,调用 discardReadBytes() 可丢弃并回收空间,其初始大小为 0,存储在 readerIndex 中,随 read 操作增加(get* 操作不移动 readerIndex);
    • 调用该方法后,可丢弃字节分段空间变为可写,但频繁调用可能导致内存复制(需将可读字节移到缓冲区开始位置),建议仅在内存宝贵时使用;

    在这里插入图片描述

  • 可读字节ByteBuf 的可读字节分段存储实际数据,新分配、包装或复制的缓冲区默认 readerIndex 为 0;

  • 可写字节:是拥有未定义内容、写入就绪的内存区域,新分配缓冲区默认 writerIndex 为 0,以 write 开头的操作从当前 writerIndex 开始写数据,并增加已写字节数;

    在这里插入图片描述

  • 索引管理

    • 可通过 markReaderIndex()markWriterIndex()resetWriterIndex()resetReaderIndex() 标记和重置 readerIndexwriterIndex
    • 也可通过 readerIndex(int)writerIndex(int) 将索引移到指定位置,设置无效位置会触发 IndexOutOfBoundsException
    • 调用 clear() 可将 readerIndexwriterIndex 设为 0,但不清除内存内容。

3.9.6 查找、派生与工具类

  • 查找操作ByteBuf 中有多种确定指定值索引的方法,简单的用 indexOf(),复杂的可调用 forEachByte(),例如查找回车符(\r)可通过 buffer.forEachByte(ByteBufProcessor.FIND_CR) 实现;
  • 派生缓冲区:为 ByteBuf 提供内容视图,通过 duplicate()slice()slice(int, int)Unpooled.unmodifiableBuffer(…)order(ByteOrder)readSlice(int) 等方法创建,每个方法返回新 ByteBuf 实例,有自己的读、写和标记索引,内部存储与 JDK 的 ByteBuffer 一样是共享的。若需现有缓冲区真实副本,用 copy()copy(int, int) 方法,返回的 ByteBuf 拥有独立数据副本;
  • 引用计数:是优化内存使用和性能的技术,当对象资源不再被其他对象引用时释放资源,Netty 4 为 ByteBuf 引入引用计数技术,通过 interface ReferenceCounted 实现;
  • 工具类(ByteBufUtil
    • 提供操作 ByteBuf 的静态辅助方法,与池化无关,在分配类外部实现;
    • 其中 hexdump() 方法以十六进制形式打印 ByteBuf 内容,便于调试记录;equals(ByteBuf, ByteBuf) 方法用于判断两个 ByteBuf 实例的相等性。

3.9.7 资源释放

  • ChannelInboundHandler 相关
    • 当重写 channelRead() 方法时,需显式释放与池化 ByteBuf 实例相关的内存,Netty 提供 ReferenceCountUtil.release() 方法;
    • Netty 会用 WARN 级日志记录未释放资源,方便发现违规实例;
    • 更简单的方式是使用 SimpleChannelInboundHandler,它会自动释放资源;
  • 入站请求:Netty 的 EventLoop 在处理 Channel 读操作时分配的 ByteBuf,需自行释放,有三种方式:
    • 使用 SimpleChannelInboundHandler
    • 在重写 channelRead() 方法时使用 ReferenceCountUtil.release()
    • 在重写 channelRead() 方法时使用 ctx.fireChannelRead 继续向后传递;
  • 出站请求:不管 ByteBuf 是否由业务创建,调用 writewriteAndFlush 方法,且无额外 OutboundHandler 重写 writewriteAndFlush 方法时,Netty 会自动释放,无需业务代码自行释放。

4 TCP 的粘包/半包

4.1 什么是粘包/半包?

  • 在 TCP 通信中,客户端向服务端发送数据时,由于 TCP 是面向流的协议,没有消息边界的概念,服务端一次读取到的字节数不确定,就可能出现粘包半包的情况;

  • 假设客户端分别发送两个数据包 D1 和 D2 给服务端,可能存在以下几种情况:

    • 无粘包和拆包:服务端分两次读取到两个独立的数据包,分别是 D1 和 D2

    • TCP 粘包:服务端一次接收到了两个数据包,D1 和 D2 粘合在一起

    • TCP 拆包:服务端分两次读取到两个数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容

    • 混合情况:服务端分两次读取到两个数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余内容 D1_2 和 D2 包的整包

    • 多次拆包(可能情况):若服务端 TCP 接收滑窗非常小,而数据包 D1 和 D2 比较大,服务端可能分多次才能将 D1 和 D2 包接收完全,期间发生多次拆包

    在这里插入图片描述

4.2 粘包/半包的产生原因

  • TCP 粘包发生的原因

    • 由于 TCP 是面向连接的可靠协议(基于三次握手机制),客户端与服务器会维持一个连接(Channel),数据在连接不断开时,可持续发送多个数据包到服务器;

    • 若发送的网络数据包太小,会启用 Nagle 算法(可配置是否启用),对较小数据包进行合并(基于此,TCP 网络延迟比 UDP 高),之后再发送(超时或包大小足够时)。这会导致服务器接收到消息(数据流)时,无法区分哪些数据包是客户端分开发送的,从而产生粘包;

    • 服务器接收到数据后放到缓冲区,若消息未被及时从缓冲区取走,下次取数据时可能一次取出多个数据包,造成粘包现象;

    UDP 无粘包的原因:UDP 是无连接的不可靠传输协议(适合频繁发送较小数据包),不会对数据包进行合并发送(无 Nagle 算法),一端发送什么数据就直接发出,每个数据包都是完整的(数据 + UDP 头 + IP 头等每次发送都封装一次),所以没有粘包问题;

  • TCP 半包(分包,即一个数据包被分成多次接收)产生的原因

    • 应用程序写入数据的字节大小大于套接字发送缓冲区的大小;

    • 进行 MSS 大小的 TCP 分段。MSS 是最大报文段长度的缩写,是 TCP 报文段中数据字段的最大长度,数据字段加上 TCP 首部才等于整个 TCP 报文段,即MSS=TCP报文段长度−TCP首部长度\text{MSS} = \text{TCP报文段长度} - \text{TCP首部长度}MSS=TCP报文段长度TCP首部长度,并非 TCP 报文段的最大长度。

4.3 解决方案

  • 由于底层 TCP 无法理解上层业务数据,无法保证数据包不被拆分和重组,需通过上层应用协议栈设计解决,主流方案有以下三种:

  • 包尾增加分割符:在数据包尾部增加特定分割符(如回车换行符)来分割不同数据包,例如 FTP 协议;

  • 消息定长:每个报文设置为固定长度(如 200 字节),若数据不足,用空格等空位补齐;

  • 消息分消息头和消息体:将消息分为消息头和消息体,消息头中包含表示消息总长度(或消息体长度)的字段,通常设计为消息头的第一个字段用 int32 表示消息总长度,可使用 LengthFieldBasedFrameDecoder

4.4 channelRead VS channelReadComplete

  • channelRead 方法:Netty 是在读到完整的业务请求报文后才调用一次业务 ChannelHandlerchannelRead 方法,无论这条报文底层经过了几次 SocketChannelread 调用;

  • channelReadComplete 方法:不是在业务语义上的读取消息完成后被触发,而是在每次从 SocketChannel 成功读到消息后,由系统触发。也就是说,如果一个业务消息被 TCP 协议栈发送了 $ N $ 次,那么服务端的 channelReadComplete 方法就会被调用 $ N $ 次;

  • 代码示例与验证

    • 下面代码中客户端发送较小报文 5 次,通过循环构造 ByteBuf 并调用 ctx.writeAndFlush(msg) 发送;

      在这里插入图片描述

    • 服务端输出显示:

      • channelRead 执行次数和客户端发送报文数一致(共 5 次);
      • channelReadComplete 执行了 3 次,其执行次数和客户端发送报文数无直接关联,也无明显规律;
      • 这进一步验证了 channelRead 是一个报文执行一次,而 channelReadComplete 是每次从 SocketChannel 成功读到消息就触发;

      在这里插入图片描述

5 编解码器框架

5.1 什么是编解码器?

  • 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式;

  • 如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列(即它的数据),那么:

    • 编码器:将消息转换为适合于传输的格式(最有可能的就是字节流),操作出站数据;

    • 解码器:将网络字节流转换回应用程序的消息格式,处理入站数据;

  • 另外,前面所学的解决粘包半包的内容,其实也是编解码器框架的一部分。

5.2 解码器

  • 解码器分类
    • ByteToMessageDecoder:将字节解码为消息(或另一个字节序列);
      • 由于无法确定远程节点是否一次性发送完整消息,该类会对入站数据进行缓冲,直到准备好处理;
      • 其核心方法是 decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out),这是必须实现的抽象方法。调用该方法时传入包含传入数据的 ByteBuf 和用于添加解码消息的 List,会重复调用直到确定没有新元素添加到 ListByteBuf 中无更多可读字节,若 List 不为空,内容会传递给 ChannelPipeline 中的下一个 ChannelInboundHandler
    • MessageToMessageDecoder:将一种消息类型解码为另一种(如 StringInteger)。核心方法是 decode(ChannelHandlerContext ctx, I msg, List<Object> out),对每个需解码为另一种格式的入站消息调用,解码后消息传递给 ChannelPipeline 中的下一个 ChannelInboundHandler,其中 T 代表源数据类型。
  • Netty 是异步框架,解码器需在内存中缓冲字节直到可解码,为避免解码器缓冲大量数据耗尽内存,Netty 提供**TooLongFrameException**,当帧超出指定大小限制时由解码器抛出。可设置最大字节数阈值,超出则抛出该异常(由 ChannelHandler.exceptionCaught() 方法捕获),异常处理取决于解码器用户,某些协议(如 HTTP)可返回特殊响应,其他情况可能需关闭连接。

5.3 编码器

  • 编码器分类

    • MessageToByteEncoder:将消息编码为字节。核心方法是 encode(ChannelHandlerContext ctx, I msg, ByteBuf out),需实现该抽象方法,调用时传入要编码为 ByteBuf 的出站消息(类型为 I),ByteBuf 随后转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler
    • MessageToMessageEncoder:将消息编码为消息(如 Java 对象到 String 类型的 JSON 文本),T 代表源数据类型。核心方法是 encode(ChannelHandlerContext ctx, I msg, List<Object> out),每个通过 write() 方法写入的消息都会传递给该方法,编码为一个或多个出站消息,随后转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler
  • 业务场景示例

    • 两端通过 JSON 加密通信时,发送端流程为 Java 对象经 MessageToMessageEncoder 转为 String 类型 JSON 文本,再经 MessageToByteEncoder 转为网络加密报文;
    • 接收端流程为网络加密报文经 ByteToMessageDecoder 转为 String 类型 JSON 明文,再经 MessageToMessageDecoder 转为 Java 对象,可将 ByteToMessageDecoder 看作一次解码器,MessageToMessageDecoder 看作二次或多次解码器;
    • MessageToByteEncoder 看作网络报文编码器,MessageToMessageEncoder 看作业务编码器。
  • 编解码器类

    • 我们通常将解码器和编码器作为单独的实体讨论,但有时在同一个类中管理入站和出站数据及消息的转换很有用。Netty 的抽象编解码器类可用于此目的,每个抽象编解码器类会捆绑一个解码器/编码器对,且同时实现了 ChannelInboundHandlerChannelOutboundHandler 接口;

    • 不优先使用复合编解码器类的原因:因为通过尽可能将解码和编码功能分开,能最大化代码的可重用性和可扩展性,这是 Netty 设计的一个基本原则;

    • 相关的类:

      • 抽象类 ByteToMessageCodec

      • 抽象类 MessageToMessageCodec

5.4 实战:实现SSL/TLS和Web服务

  • Netty 为许多通用协议提供了编解码器和处理器,几乎可以开箱即用,这减少了我们所要花费的的时间与精力。

5.4.1 HTTP 系列

  • HTTP 协议特点:基于请求/响应模式,客户端发 HTTP 请求,服务器返回 HTTP 响应。Netty 提供多种编解码器简化该协议使用;
  • HTTP 消息组成:一个 HTTP 请求/响应可能由多个数据部分组成,FullHttpRequestFullHttpResponse 分别代表完整的请求和响应,所有 HTTP 消息(如 FullHttpRequestLastHttpContent 等)都实现 HttpObject 接口;
  • HTTP 编解码器
    • HttpRequestEncoder:将 HttpRequestHttpContentLastHttpContent 消息编码为字节
    • HttpResponseEncoder:将 HttpResponseHttpContentLastHttpContent 消息编码为字节
    • HttpRequestDecoder:将字节解码为 HttpRequestHttpContentLastHttpContent 消息
    • HttpResponseDecoder:将字节解码为 HttpResponseHttpContentLastHttpContent 消息
    • HttpClientCodecHttpServerCodec:对请求和响应编解码做了组合,HTTP 客户端使用 HttpClientCodec,服务器端使用 HttpResponseEncoderHttpRequestDecoder
  • 聚合 HTTP 消息:因 HTTP 请求和响应可能由多部分组成,需聚合形成完整消息,Netty 提供 HttpObjectAggregator,可将多个消息部分合并为 FullHttpRequestFullHttpResponse 消息,保证看到完整消息内容;
  • HTTP 压缩:使用 HTTP 时建议开启压缩减小传输数据大小,虽会带来 CPU 开销,但对文本数据很有必要。Netty 为压缩和解压缩提供 ChannelHandler 实现,支持 gzipdeflate 编码。

5.4.2 通过 SSL/TLS 保护 Netty 应用程序

  • SSL/TLS 作用:作为安全协议,层叠在其他协议之上实现数据安全,不仅用于安全网站,还可用于安全 SMTP(SMTPS)邮件服务器、关系型数据库系统等非 HTTP 应用程序;
  • Java 与 Netty 对 SSL/TLS 的支持:Java 提供 javax.net.ssl 包,其中 SSLContextSSLEngine 类使加解密简单直接。Netty 通过 SslHandler(内部使用 SSLEngine 完成实际工作)利用该 API,且多数情况下 SslHandlerChannelPipeline 中的第一个 ChannelHandler
  • 使用 HTTPS:启用 HTTPS 只需将 SslHandler 添加到 ChannelPipelineChannelHandler 组合中;
  • 根据客户端决定 HTTPS:Netty 提供了一个 OptionalSslHandler,可根据客户端访问决定是否启用 SSL,其关键是能根据业务需求在运行时替换和删除 Handler,类似 Dubbo 源码中 Handler 在运行时的移除。

6 序列化问题

6.1 序列化的目的

  • Java 序列化主要有两个目的:
    • 网络传输:在远程跨进程服务调用时,需将被传输的 Java 对象编码为字节数组或 ByteBuffer 对象,远程服务读取后再解码为原 Java 对象,这就是 Java 对象编解码技术;

    • 对象持久化:将对象的状态保存下来,以便后续恢复。

6.2 Java 序列化

  • Java 序列化只是 Java 编解码技术的一种,由于存在诸多缺陷,衍生出了多种其他编解码技术和框架;

  • Java 序列化从 JDK1.1 版本就已提供,只需实现 java.io.Serializable 并生成序列 ID,因此诞生之初广泛应用。但在远程服务调用(RPC)时,很少直接用它进行消息编解码和传输,原因在于其存在以下缺点:

    • 无法跨语言:跨进程服务调用中,服务提供者可能用 C++ 或其他语言开发,而 Java 序列化是 Java 语言内部私有协议,其他语言不支持。Java 序列化后的字节数组,其他语言无法反序列化,严重阻碍了其在异构语言进程交互场景的应用;

    • 序列化后的码流太大

    • 序列化性能太低:JDK 默认的序列化机制,无论是序列化后的码流大小还是序列化性能,表现都很差,所以通常不会选择 Java 序列化作为远程跨节点调用的编解码框架。

6.3 如何选择序列化框架

6.3.1 选择要点

  • 跨语言支持:若项目有跨语言的硬性要求,即使某序列化框架性能极好,但只支持特定语言,也无法选择;

  • 空间:即编码后占用的空间大小;

  • 时间:指编解码的速度。空间和时间是对序列化框架的性能要求,二者存在矛盾,通常需追求平衡,若要编码后占用空间小,往往要花费更多编码时间;

  • 可读性:有些项目要求序列化后的数据人类可读,此时可选的框架不多,一般是 JSON 或 XML 格式,部分序列化框架支持通过自带工具观察序列化后的数据,也可考虑。

6.3.2 序列化框架比较

  • 跨语言通用性

    • JDK Serializer:只适用于 Java
    • FST:只适用于 Java
    • Kryo:主要适用于 Java(可复杂支持跨语言)
    • Protocol bufferThriftHessianAvro:支持多种语言
    • 还有 msgpack 也支持跨语言
  • 性能方面

    • 空间性能avrokryoHessian2fstProtocol buffer 表现不错;
    • 时间性能kryofstProtocol buffer 表现很好;
    • Msgpack 也是优秀的序列化框架,性能与 Protocol buffer 不相上下;

    实际对比过程可以参考:几种Java常用序列化框架的选型与对比-阿里云开发者社区。

6.3.3 内置序列化支持

  • Netty 内置了对 JBoss MarshallingProtocol Buffers 的支持。

6.3.4 集成第三方 MessagePack 实战与 LengthFieldBasedFrameDecoder 详解

6.3.4.1 LengthFieldBasedFrameDecoder 参数详解
  • maxFrameLength:表示包的最大长度
  • lengthFieldOffset:长度域的偏移量,即跳过指定个数的字节后才是长度域
  • lengthFieldLength:记录帧数据长度的字段本身的长度,也就是长度域的长度
  • lengthAdjustment:长度的修正值,可正可负。Netty 读取到数据包的长度值 NNN 后,认为接下来的 NNN 个字节都需读取,但实际情况可能需增加或减少 NNN 的值,具体增减量写在此参数中
  • initialBytesToStrip:从数据帧中跳过的字节数,即得到完整数据包后,丢弃该数据包中指定字节数,剩下的才是后续业务实际需要的业务数据
  • failFast:若为 true,读取到长度域且其值超过 maxFrameLength 时,抛出 TooLongFrameException;若为 false,只有真正读取完长度域值表示的字节后,才会抛出该异常。默认值为 true,建议不修改,否则可能造成内存溢出
6.3.4.2 具体案例
  • 案例 1:数据包大小为 14B(长度域 2B + “HELLO, WORLD”),长度域值为 12B(0x000C),希望解码后保持一致。参数设置为:lengthFieldOffset = 0lengthFieldLength = 2lengthAdjustment 无需调整,initialBytesToStrip = 0(解码过程不丢弃任何数据);

    在这里插入图片描述

  • 案例 2:数据包大小为 14B(长度域 2B + “HELLO, WORLD”),长度域值为 12B(0x000C),希望解码后丢弃长度域 2B 字段。参数设置为:lengthFieldOffset = 0lengthFieldLength = 2lengthAdjustment 无需调整,initialBytesToStrip = 2(解码过程丢弃 2 个字节数据);

    在这里插入图片描述

  • 案例 3:数据包大小为 14B(长度域 2B + “HELLO, WORLD”),长度域值为 14(0x000E),包含长度域本身长度,希望解码后保持一致。参数设置为:lengthFieldOffset = 0lengthFieldLength = 2lengthAdjustment = -2(因长度域为 14,报文内容为 12,需告诉 Netty 实际读取报文长度比长度域少 2),initialBytesToStrip = 0(解码过程不丢弃任何数据);

    在这里插入图片描述

  • 案例 4:数据包大小为 17B(Header 1 2B + 长度域 3B + “HELLO, WORLD”),长度域值为 12B(0x0000C),编码解码后长度保持一致。参数设置为:lengthFieldOffset = 2lengthFieldLength = 3lengthAdjustment = 0(无需调整),initialBytesToStrip = 0(解码过程不丢弃任何数据);

    在这里插入图片描述

  • 案例 5:Header 与长度域位置置换,总数据包长度为 17B(长度域 3B + Header 2B + “HELLO, WORLD”),长度域值为 12B(0x0000C),编码解码后长度保持一致。参数设置为:lengthFieldOffset = 0lengthFieldLength = 3lengthAdjustment = 2(因长度域为 12,报文内容为 12,需把 Header 值一起读取,实际读取报文内容长度比长度域多 2),initialBytesToStrip = 0(解码过程不丢弃任何数据);

    在这里插入图片描述

  • 案例 6:带有两个 header,HDR1 丢弃,长度域丢弃,只剩下第二个 header 和有效包体,总数据包长度为 16B(HDR1 1B + 长度域 2B + HDR2 1B + “HELLO, WORLD”),长度域值为 12B(0x000C)。参数设置为:lengthFieldOffset = 1(HDR1 的长度),lengthFieldLength = 2lengthAdjustment = 1(因长度域为 12,报文内容为 12,需把 HDR2 值一起读取,实际读取报文内容长度比长度域多 1),initialBytesToStrip = 3(丢弃 HDR1 和长度字段);

    在这里插入图片描述

  • 案例 7:带有两个 header,HDR1 丢弃,长度域丢弃,只剩下第二个 header 和有效包体,总数据包长度为 16B(HDR1 1B + 长度域 2B + HDR2 1B + “HELLO, WORLD”),长度域值为 16B(0x0010),HDR1 长度 1B,HDR2 长度 1B,包体长度 12B(1+1+2+12=16)。参数设置为:lengthFieldOffset = 1lengthFieldLength = 2lengthAdjustment = -3(因长度域为 16,实际读取报文内容长度比长度域少 3),initialBytesToStrip = 3(丢弃 HDR1 和长度字段);

    在这里插入图片描述

7 单元测试

7.1 如何进行单元测试?

  • EmbeddedChannel 介绍:是 Netty 专门为改进 ChannelHandler 的单元测试提供的特殊 Channel 实现。可将入站或出站数据写入 EmbeddedChannel,然后检查是否有数据到达 ChannelPipeline 尾端,以此确定消息是否被编码/解码,以及是否触发 ChannelHandler 动作;

  • 核心方法

    • writeInbound(Object... msgs):将入站消息写入 EmbeddedChannel,若能通过 readInbound() 读取数据,返回 true
    • readInbound():从 EmbeddedChannel 读取一个入站消息,返回的内容已穿越整个 ChannelPipeline,无可读数据时返回 null
    • writeOutbound(Object... msgs):将出站消息写入 EmbeddedChannel,若能通过 readOutbound() 读取数据,返回 true
    • readOutbound():从 EmbeddedChannel 读取一个出站消息,返回的内容已穿越整个 ChannelPipeline,无可读数据时返回 null
    • finish():将 EmbeddedChannel 标记为完成,若有可读取的入站或出站数据,返回 true,还会调用 EmbeddedChannelclose() 方法
  • 数据处理分工:入站数据由 ChannelInboundHandler 处理,代表从远程节点读取的数据;出站数据由 ChannelOutboundHandler 处理,代表将要写到远程节点的数据;

  • 测试逻辑:用 writeOutbound() 将消息写入 Channel,沿出站方向传递,再用 readOutbound() 读取已处理消息,验证结果是否符合预期;入站数据则用 writeInbound()readInbound() 方法。每种情况下,消息都会传递过 ChannelPipeline,并被相关 ChannelInboundHandlerChannelOutboundHandler 处理;

    在这里插入图片描述

7.2 测试入站消息

  • 有一个简单的 ByteToMessageDecoder 实现,给定足够数据会产生固定大小的帧,数据不足时会等待下一个数据块并再次检查。该特定解码器会产生固定为 3 字节大小的帧,可能需要多个事件提供足够字节数来生成一个帧。

7.3 测试出站消息

  • 测试的处理器 AbsIntegerEncoder 是 Netty 的 MessageToMessageEncoder 的特殊化实现,用于将负值整数转换为绝对值。持有 AbsIntegerEncoderEmbeddedChannel 会以 4 字节负整数形式写出站数据,编码器从传入的 ByteBuf 中读取每个负整数,调用 Math.abs() 获取绝对值,并将每个负整数的绝对值写到 ChannelPipeline 中;

    在这里插入图片描述

7.4 测试异常处理

  • 应用程序常需执行比转换数据更复杂的任务,比如处理格式不正确的输入或过量数据;

  • 若读取字节数超出特定限制,会抛出 TooLongFrameException,这是防范资源耗尽的常用方法。设定最大帧大小为 3 字节,若帧大小超出限制,程序会丢弃字节并抛出 TooLongFrameExceptionChannelPipeline 中的其他 ChannelHandler 可在 exceptionCaught() 方法中选择处理或忽略该异常;

    在这里插入图片描述

http://www.dtcms.com/a/453838.html

相关文章:

  • Java Caffeine 高性能缓存库详解与使用案例
  • 如何用凡科做自己的网站网站建设中的色彩搭配
  • RK3588:MIPI底层驱动学习——入门第五篇(一文梳理media、video、v4l-subdev关系)
  • 每日一个C语言知识:C 变量
  • 神秘迷宫探险 - 详细题解教程
  • VOCO摘要
  • 轻量级个人建站
  • DevDay 2025 开发者大会看点
  • 什么网站可以免费做视频软件做网站导航按钮怎么做
  • 网站建设的基本要素有专门做任务的网站
  • 十六、Linux网络基础理论 - OSI模型、TCP/IP协议与IP地址详解
  • WDF驱动开发-PNP和电源管理
  • 网站的标题标签一般是写在网站 未备案 支付宝
  • 募投绘蓝图-昂瑞微的成长密码与未来布局
  • 公司网络营销的方案思路seo整站优化外包服务
  • 工程建设公司网站怎么查询网站的域名备案
  • TypeScript 对比 JavaScript
  • 焦作网站设计公司自己怎么做外贸网站
  • ros2 消息订阅与发布示例 c++
  • 廊坊网站建设廊坊企业文化建设网站
  • 纸做的花朵成品网站个人简介html代码模板
  • 【精品资料鉴赏】证券数据治理项目投标技术方案
  • AI大模型核心概念
  • 企业网站模板seo公益建设网站的作用
  • 阿里巴巴1688怎么做网站自建网站阿里云备案通过后怎么做
  • 成都规划网站佛山网上办事大厅官网
  • AI-RAN Sionna 5G NR 开发者套件
  • 百度商桥怎么添加到网站山东网站
  • iis 里没有网站柯桥网站建设
  • 外汇返佣网站建设有了域名之后怎么做自己的网站