当前位置: 首页 > news >正文

springboot netty 服务端网络编程入门与实战

Spring Boot3 Netty 服务端项目地址

https://gitee.com/supervol/loong-springboot-study

(记得给个start,感谢)

Netty 概述

        Netty 是一款基于 Java NIO 的高性能、异步事件驱动的网络通信框架,广泛用于开发高并发、低延迟的网络服务(如 RPC 框架、WebSocket 服务、物联网协议服务等)。Spring Boot 3 作为主流的 Java 开发框架,提供了自动配置、依赖管理等便捷特性,二者结合可大幅简化高性能网络服务的开发流程。

Netty 核心

1. Netty 核心组件

        Netty 的核心设计围绕 “事件驱动” 和 “管道模式”,关键组件包括:

  • EventLoopGroup:线程组,负责管理 EventLoop(事件循环),分为两类:
    • BossGroup:仅处理客户端连接请求,将连接分配给 WorkerGroup。
    • WorkerGroup:处理已建立连接的IO 事件(如读、写、解码)。
  • Channel:网络通信的载体,代表一个 “连接”(如 NioServerSocketChannel 用于服务端监听,NioSocketChannel 用于客户端连接)。
  • ChannelPipeline:责任链模式的实现,存储所有 ChannelHandler,IO 事件会沿 Pipeline 传递并被 Handler 处理。
  • ChannelHandler:业务逻辑处理器,分为:
    • ChannelInboundHandler:处理入站事件(如接收客户端消息)。
    • ChannelOutboundHandler:处理出站事件(如向客户端发送消息)。
  • ByteBuf:Netty 自定义的字节缓冲区,替代 JDK 的 ByteBuffer,优化内存管理(如池化分配、零拷贝)。

2. Spring Boot 3 关键特性

  • JDK 最低要求:Spring Boot 3 需 JDK 17+,Netty 4.1.x 系列已兼容 JDK 17,无需额外适配。
  • 依赖管理:通过 spring-boot-starter-parent 可统一管理 Netty 版本,避免版本冲突。
  • 生命周期管理:需通过 Spring 生命周期钩子(如 CommandLineRunner@PreDestroy)控制 Netty 服务的启动与优雅关闭。

Netty 服务端示例

        请参考项目地址中 springboot-netty/springboot-netty-server 模块代码。

Netty 粘包与拆包

1. 粘包与拆包原因

        TCP 是流式协议,消息在传输过程中可能被拆分或合并,这两种情况都会导致接收方无法正确解析消息边界,必须通过协议设计解决。

  • 粘包:多个小消息被合并为一个数据包发送
  • 拆包:一个大消息被拆分为多个数据包发送

2. 粘包与拆包解决

(1)固定长度协议

        使用FixedLengthFrameDecoder,适用于消息长度固定的场景:

// 每个消息固定100字节
pipeline.addLast(new FixedLengthFrameDecoder(100));

(2)分隔符协议

        使用DelimiterBasedFrameDecoder,通过特殊字符标记消息结束:

// 定义分隔符(如换行符)
ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes());
// 最大长度1024,超过则抛出TooLongFrameException
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));

(3) 长度字段协议

        常用方案,LengthFieldBasedFrameDecoder通过消息中的长度字段确定消息边界,配置灵活,适用于大多数场景:

/*** 构造参数说明:* maxFrameLength: 最大帧长度,超过则丢弃* lengthFieldOffset: 长度字段偏移量* lengthFieldLength: 长度字段占用字节数* lengthAdjustment: 长度字段的偏移量矫正* initialBytesToStrip: 解码后跳过的字节数*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024,  // 最大帧长度1MB0,             // 长度字段从0字节开始4,             // 长度字段占4字节0,             // 长度调整为04              // 跳过长度字段本身
));

        对应的编码器LengthFieldPrepender用于在发送消息前添加长度字段:

// 为消息添加4字节的长度字段
pipeline.addLast(new LengthFieldPrepender(4));

Netty 编解码器

        Netty 的编解码器遵循 "入站解码、出站编码" 的设计原则,通常成对出现。

1. 解码器 (Decoder)

(1)ByteToMessageDecoder

        将字节流解码为 Java 对象,核心方法decode()需要手动处理拆包逻辑:

public class CustomDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 确保有足够的字节可读取(例如至少4字节的长度字段)if (in.readableBytes() < 4) {return; // 字节不足,等待更多数据}// 标记当前读指针位置in.markReaderIndex();// 读取长度字段int length = in.readInt();// 检查是否有足够的字节if (in.readableBytes() < length) {in.resetReaderIndex(); // 重置读指针return;}// 读取消息内容byte[] data = new byte[length];in.readBytes(data);// 解码为对象并添加到输出列表out.add(new String(data, CharsetUtil.UTF_8));}
}

(2)ReplayingDecoder

        可重放解码器,ByteToMessageDecoder的子类,通过虚拟缓冲区简化逻辑,无需手动检查字节数:

public class CustomReplayingDecoder extends ReplayingDecoder<Void> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 无需检查字节数,ReplayingDecoder会自动处理int length = in.readInt();byte[] data = new byte[length];in.readBytes(data);out.add(new String(data, CharsetUtil.UTF_8));}
}

2. 编码器 (Encoder)

(1)MessageToByteEncoder

public class CustomEncoder extends MessageToByteEncoder<String> {@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {// 写入长度字段byte[] data = msg.getBytes(CharsetUtil.UTF_8);out.writeInt(data.length);// 写入消息内容out.writeBytes(data);}
}

(2) 编解码器示例

        在ChannelInitializer中配置完整的编解码链路:

public class AdvancedChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 1. 粘包拆包处理pipeline.addLast(new LengthFieldBasedFrameDecoder(1024*1024, 0, 4, 0, 4));pipeline.addLast(new LengthFieldPrepender(4));// 2. 日志处理器(方便调试)pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));// 3. 编解码器pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));// 或者使用自定义编解码器// pipeline.addLast(new CustomDecoder());// pipeline.addLast(new CustomEncoder());// 4. 心跳检测(5秒读空闲,10秒写空闲,15秒读写空闲)pipeline.addLast(new IdleStateHandler(5, 10, 15, TimeUnit.SECONDS));// 5. 业务处理器pipeline.addLast(new AdvancedBusinessHandler());}
}

Netty 处理器

1. ChannelInitializer

        用于初始化新连接的ChannelPipeline,是 Netty 中连接处理的入口点:

  • 核心作用:为每个新连接配置处理器链
  • 注意事项
    • 完成初始化后会自动从 Pipeline 中移除
    • 必须保证线程安全,避免共享可变状态
    • 复杂场景可通过工厂模式创建处理器
public class CustomChannelInitializer extends ChannelInitializer<SocketChannel> {private final SomeService someService; // 可注入Spring服务// 通过构造函数注入Spring管理的Beanpublic CustomChannelInitializer(SomeService someService) {this.someService = someService;}@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 根据连接属性动态添加处理器if (isSpecialClient(ch.remoteAddress())) {pipeline.addLast(new SpecialProtocolHandler());} else {pipeline.addLast(new StandardProtocolHandler());}// 添加共享的业务处理器pipeline.addLast(new BusinessHandler(someService));}private boolean isSpecialClient(SocketAddress address) {// 自定义逻辑判断return false;}
}

2. IdleStateHandler

        用于检测连接空闲状态,实现心跳机制:

public class HeartbeatHandler extends ChannelInboundHandlerAdapter {// 心跳消息private static final String HEARTBEAT_MSG = "heartbeat";@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;// 读空闲:长时间未收到客户端消息if (event.state() == IdleState.READER_IDLE) {log.warn("客户端 {} 读空闲,关闭连接", ctx.channel().remoteAddress());ctx.close(); // 关闭连接}// 写空闲:长时间未向客户端发送消息else if (event.state() == IdleState.WRITER_IDLE) {log.info("向客户端 {} 发送心跳", ctx.channel().remoteAddress());ctx.writeAndFlush(HEARTBEAT_MSG); // 发送心跳}} else {super.userEventTriggered(ctx, evt);}}
}

3. SimpleChannelInboundHandler

  • 自动释放消息资源,适合处理特定类型的消息
  • 泛型指定消息类型,无需手动类型转换
  • 重写channelRead0()方法处理消息
public class StringMessageHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {log.info("收到消息:{}", msg);// 无需手动释放消息}
}

4. ChannelInboundHandlerAdapter

  • 更灵活,可处理多种类型消息
  • 需要手动释放消息资源(通过ReferenceCountUtil.release()
  • 重写channelRead()方法处理消息
public class GenericMessageHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {try {if (msg instanceof String) {handleStringMessage(ctx, (String) msg);} else if (msg instanceof ByteBuf) {handleByteBufMessage(ctx, (ByteBuf) msg);}} finally {// 手动释放资源ReferenceCountUtil.release(msg);}}private void handleStringMessage(ChannelHandlerContext ctx, String msg) {// 处理字符串消息}private void handleByteBufMessage(ChannelHandlerContext ctx, ByteBuf msg) {// 处理字节消息}
}

5. 其他常用处理器

  1. LoggingHandler:打印 IO 事件日志,方便调试

    pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
    
  2. SslHandler:实现 SSL/TLS 加密通信

    SslContext sslContext = SslContext.newServerContext(sslCert, sslKey);
    pipeline.addLast(sslContext.newHandler(ch.alloc()));
    
  3. ChannelDuplexHandler:同时处理入站和出站事件

    public class MetricsHandler extends ChannelDuplexHandler {private long startTime;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {startTime = System.currentTimeMillis();super.channelRead(ctx, msg);}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {long processingTime = System.currentTimeMillis() - startTime;log.info("消息处理耗时:{}ms", processingTime);super.write(ctx, msg, promise);}
    }
    

Netty 配置

        将 Netty 核心参数通过 Spring 配置文件管理:

netty:server:port: 8888boss-thread-count: 1worker-thread-count: 8backlog: 1024keep-alive: truetcp-nodelay: truesndbuf: 32768rcvbuf: 32768max-frame-length: 1048576idle-timeout:reader-idle-time: 30writer-idle-time: 60all-idle-time: 90

        对应的配置类:

@ConfigurationProperties(prefix = "netty.server")
public class NettyServerProperties {private int port = 8888;private int bossThreadCount = 1;private int workerThreadCount = Runtime.getRuntime().availableProcessors() * 2;private int backlog = 1024;private boolean keepAlive = true;private boolean tcpNodelay = true;private int sndbuf = 32768;private int rcvbuf = 32768;private int maxFrameLength = 1024 * 1024;private IdleTimeout idleTimeout = new IdleTimeout();// getters and setterspublic static class IdleTimeout {private int readerIdleTime = 30;private int writerIdleTime = 60;private int allIdleTime = 90;// getters and setters}
}

Netty 优化

  1. 处理器重用:无状态的处理器可以单例重用,减少对象创建开销

  2. 内存管理

    • 优先使用PooledByteBufAllocator
    • 避免频繁创建和销毁ByteBuf
    • 使用ReferenceCountUtil.release()确保资源释放
  3. 线程模型优化

    • 根据 CPU 核心数调整 EventLoopGroup 线程数
    • 避免在 IO 线程中执行耗时操作,可通过ctx.executor().execute()提交到其他线程池
  4. 流量控制

    • 使用ChannelConfig.setWriteBufferHighWaterMark()设置写缓冲区高水位线
    • 通过Channel.isWritable()检查是否可写,避免 OOM
  5. 监控与 metrics

    • 集成 Micrometer 监控连接数、消息吞吐量等指标
    • 监控 EventLoopGroup 的线程利用率和任务队列长度


文章转载自:

http://oj2lIzEq.brLgf.cn
http://ZzWIheGL.brLgf.cn
http://539JM6jJ.brLgf.cn
http://TOPXeZQJ.brLgf.cn
http://FTfLFzOc.brLgf.cn
http://5Ddq0y3g.brLgf.cn
http://mXjaYfcd.brLgf.cn
http://P7jJyEVQ.brLgf.cn
http://Rhgh3BcZ.brLgf.cn
http://lqPJgwUi.brLgf.cn
http://c7AKlrHN.brLgf.cn
http://LDocdLof.brLgf.cn
http://oaktln44.brLgf.cn
http://tJO2i2wR.brLgf.cn
http://qKK7r0Oh.brLgf.cn
http://xt12ykzw.brLgf.cn
http://0NqEVND2.brLgf.cn
http://OQycpTmF.brLgf.cn
http://CzlTkMVk.brLgf.cn
http://hX1acPXg.brLgf.cn
http://z0wJscbB.brLgf.cn
http://l80DQaZw.brLgf.cn
http://gISAOmlF.brLgf.cn
http://WZV5yYgY.brLgf.cn
http://iwLOMNF8.brLgf.cn
http://6nnDBuLH.brLgf.cn
http://S7pqe43J.brLgf.cn
http://p4qGydfy.brLgf.cn
http://YlG8fnSB.brLgf.cn
http://aUiBRQqz.brLgf.cn
http://www.dtcms.com/a/385305.html

相关文章:

  • 从零开始学AI——15
  • Linux C库函数的可重入与不可重入版本说明
  • ZooKeeper核心知识点总结:分布式系统的“协调者”
  • Unreal故障艺术之RGB颜色分离故障
  • 金融数据---东方财富人气榜-A股
  • 设计模式详解——创建型
  • Java 泛型与通配符全解析
  • Python变量与数据类型全解析:从命名规则到类型转换
  • 了解篇 | StarRocks 是个什么数据库?
  • 风险控制规则引擎:从敏捷开发工具到管理逻辑的承载者
  • 基于Matlab深度学习的植物叶片智能识别系统及其应用
  • AI编程从0-1开发一个小程序
  • Android原生的TextToSpeech,文字合成语音并播放
  • 【03】AI辅助编程完整的安卓二次商业实战-本地构建运行并且调试-二次开发改注册登陆按钮颜色以及整体资源结构熟悉-优雅草伊凡
  • 高德api使用
  • 工程造价指数指标分析:从数据采集到决策支撑的工程经济实践
  • 中控平台数据监控大屏
  • Vue 与 React 的区别?
  • 元图CAD:智能工程图纸解决方案的商业模型创新
  • MySQL 全量备份迁移步骤指南
  • 有关gitlab14.x版本在内网环境下无法添加webhooks的解决方法
  • O3.4 opencv摄像头跟踪
  • 数智管理学(五十二)
  • 121、【OS】【Nuttx】【周边】效果呈现方案解析:find 命令格式(上)
  • Python 3入门指南
  • I.MX6UL:EPIT
  • 企业数字化转型的 4A 架构指南:从概念解读到 TOGAF 阶段对应
  • Linux基础之部署mysql数据库
  • 【文献分享】空间互近邻关系在空间转录组学数据中的应用
  • 高精度、高带宽的磁角度传感器——MA600A