springboot netty 服务端网络编程入门与实战
Spring Boot3 Netty 服务端项目地址
https://gitee.com/supervol/loong-springboot-study
(记得给个start,感谢)
Netty 概述
Netty 是一款基于 Java NIO 的高性能、异步事件驱动的网络通信框架,广泛用于开发高并发、低延迟的网络服务(如 RPC 框架、WebSocket 服务、物联网协议服务等)。Spring Boot 3 作为主流的 Java 开发框架,提供了自动配置、依赖管理等便捷特性,二者结合可大幅简化高性能网络服务的开发流程。
Netty 核心
1. Netty 核心组件
Netty 的核心设计围绕 “事件驱动” 和 “管道模式”,关键组件包括:
- EventLoopGroup:线程组,负责管理 EventLoop(事件循环),分为两类:
BossGroup
:仅处理客户端连接请求,将连接分配给 WorkerGroup。WorkerGroup
:处理已建立连接的IO 事件(如读、写、解码)。
- Channel:网络通信的载体,代表一个 “连接”(如
NioServerSocketChannel
用于服务端监听,NioSocketChannel
用于客户端连接)。 - ChannelPipeline:责任链模式的实现,存储所有
ChannelHandler
,IO 事件会沿 Pipeline 传递并被 Handler 处理。 - ChannelHandler:业务逻辑处理器,分为:
ChannelInboundHandler
:处理入站事件(如接收客户端消息)。ChannelOutboundHandler
:处理出站事件(如向客户端发送消息)。
- ByteBuf:Netty 自定义的字节缓冲区,替代 JDK 的
ByteBuffer
,优化内存管理(如池化分配、零拷贝)。
2. Spring Boot 3 关键特性
- JDK 最低要求:Spring Boot 3 需 JDK 17+,Netty 4.1.x 系列已兼容 JDK 17,无需额外适配。
- 依赖管理:通过
spring-boot-starter-parent
可统一管理 Netty 版本,避免版本冲突。 - 生命周期管理:需通过 Spring 生命周期钩子(如
CommandLineRunner
、@PreDestroy
)控制 Netty 服务的启动与优雅关闭。
Netty 服务端示例
请参考项目地址中 springboot-netty/springboot-netty-server 模块代码。
Netty 粘包与拆包
1. 粘包与拆包原因
TCP 是流式协议,消息在传输过程中可能被拆分或合并,这两种情况都会导致接收方无法正确解析消息边界,必须通过协议设计解决。
- 粘包:多个小消息被合并为一个数据包发送
- 拆包:一个大消息被拆分为多个数据包发送
2. 粘包与拆包解决
(1)固定长度协议
使用FixedLengthFrameDecoder
,适用于消息长度固定的场景:
// 每个消息固定100字节
pipeline.addLast(new FixedLengthFrameDecoder(100));
(2)分隔符协议
使用DelimiterBasedFrameDecoder
,通过特殊字符标记消息结束:
// 定义分隔符(如换行符)
ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes());
// 最大长度1024,超过则抛出TooLongFrameException
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
(3) 长度字段协议
常用方案,LengthFieldBasedFrameDecoder
通过消息中的长度字段确定消息边界,配置灵活,适用于大多数场景:
/*** 构造参数说明:* maxFrameLength: 最大帧长度,超过则丢弃* lengthFieldOffset: 长度字段偏移量* lengthFieldLength: 长度字段占用字节数* lengthAdjustment: 长度字段的偏移量矫正* initialBytesToStrip: 解码后跳过的字节数*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, // 最大帧长度1MB0, // 长度字段从0字节开始4, // 长度字段占4字节0, // 长度调整为04 // 跳过长度字段本身
));
对应的编码器LengthFieldPrepender
用于在发送消息前添加长度字段:
// 为消息添加4字节的长度字段
pipeline.addLast(new LengthFieldPrepender(4));
Netty 编解码器
Netty 的编解码器遵循 "入站解码、出站编码" 的设计原则,通常成对出现。
1. 解码器 (Decoder)
(1)ByteToMessageDecoder
将字节流解码为 Java 对象,核心方法decode()
需要手动处理拆包逻辑:
public class CustomDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 确保有足够的字节可读取(例如至少4字节的长度字段)if (in.readableBytes() < 4) {return; // 字节不足,等待更多数据}// 标记当前读指针位置in.markReaderIndex();// 读取长度字段int length = in.readInt();// 检查是否有足够的字节if (in.readableBytes() < length) {in.resetReaderIndex(); // 重置读指针return;}// 读取消息内容byte[] data = new byte[length];in.readBytes(data);// 解码为对象并添加到输出列表out.add(new String(data, CharsetUtil.UTF_8));}
}
(2)ReplayingDecoder
可重放解码器,ByteToMessageDecoder
的子类,通过虚拟缓冲区简化逻辑,无需手动检查字节数:
public class CustomReplayingDecoder extends ReplayingDecoder<Void> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 无需检查字节数,ReplayingDecoder会自动处理int length = in.readInt();byte[] data = new byte[length];in.readBytes(data);out.add(new String(data, CharsetUtil.UTF_8));}
}
2. 编码器 (Encoder)
(1)MessageToByteEncoder
public class CustomEncoder extends MessageToByteEncoder<String> {@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {// 写入长度字段byte[] data = msg.getBytes(CharsetUtil.UTF_8);out.writeInt(data.length);// 写入消息内容out.writeBytes(data);}
}
(2) 编解码器示例
在ChannelInitializer
中配置完整的编解码链路:
public class AdvancedChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 1. 粘包拆包处理pipeline.addLast(new LengthFieldBasedFrameDecoder(1024*1024, 0, 4, 0, 4));pipeline.addLast(new LengthFieldPrepender(4));// 2. 日志处理器(方便调试)pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));// 3. 编解码器pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));// 或者使用自定义编解码器// pipeline.addLast(new CustomDecoder());// pipeline.addLast(new CustomEncoder());// 4. 心跳检测(5秒读空闲,10秒写空闲,15秒读写空闲)pipeline.addLast(new IdleStateHandler(5, 10, 15, TimeUnit.SECONDS));// 5. 业务处理器pipeline.addLast(new AdvancedBusinessHandler());}
}
Netty 处理器
1. ChannelInitializer
用于初始化新连接的ChannelPipeline
,是 Netty 中连接处理的入口点:
- 核心作用:为每个新连接配置处理器链
- 注意事项:
- 完成初始化后会自动从 Pipeline 中移除
- 必须保证线程安全,避免共享可变状态
- 复杂场景可通过工厂模式创建处理器
public class CustomChannelInitializer extends ChannelInitializer<SocketChannel> {private final SomeService someService; // 可注入Spring服务// 通过构造函数注入Spring管理的Beanpublic CustomChannelInitializer(SomeService someService) {this.someService = someService;}@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 根据连接属性动态添加处理器if (isSpecialClient(ch.remoteAddress())) {pipeline.addLast(new SpecialProtocolHandler());} else {pipeline.addLast(new StandardProtocolHandler());}// 添加共享的业务处理器pipeline.addLast(new BusinessHandler(someService));}private boolean isSpecialClient(SocketAddress address) {// 自定义逻辑判断return false;}
}
2. IdleStateHandler
用于检测连接空闲状态,实现心跳机制:
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {// 心跳消息private static final String HEARTBEAT_MSG = "heartbeat";@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;// 读空闲:长时间未收到客户端消息if (event.state() == IdleState.READER_IDLE) {log.warn("客户端 {} 读空闲,关闭连接", ctx.channel().remoteAddress());ctx.close(); // 关闭连接}// 写空闲:长时间未向客户端发送消息else if (event.state() == IdleState.WRITER_IDLE) {log.info("向客户端 {} 发送心跳", ctx.channel().remoteAddress());ctx.writeAndFlush(HEARTBEAT_MSG); // 发送心跳}} else {super.userEventTriggered(ctx, evt);}}
}
3. SimpleChannelInboundHandler
- 自动释放消息资源,适合处理特定类型的消息
- 泛型指定消息类型,无需手动类型转换
- 重写
channelRead0()
方法处理消息
public class StringMessageHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {log.info("收到消息:{}", msg);// 无需手动释放消息}
}
4. ChannelInboundHandlerAdapter
- 更灵活,可处理多种类型消息
- 需要手动释放消息资源(通过
ReferenceCountUtil.release()
) - 重写
channelRead()
方法处理消息
public class GenericMessageHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {try {if (msg instanceof String) {handleStringMessage(ctx, (String) msg);} else if (msg instanceof ByteBuf) {handleByteBufMessage(ctx, (ByteBuf) msg);}} finally {// 手动释放资源ReferenceCountUtil.release(msg);}}private void handleStringMessage(ChannelHandlerContext ctx, String msg) {// 处理字符串消息}private void handleByteBufMessage(ChannelHandlerContext ctx, ByteBuf msg) {// 处理字节消息}
}
5. 其他常用处理器
LoggingHandler:打印 IO 事件日志,方便调试
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
SslHandler:实现 SSL/TLS 加密通信
SslContext sslContext = SslContext.newServerContext(sslCert, sslKey); pipeline.addLast(sslContext.newHandler(ch.alloc()));
ChannelDuplexHandler:同时处理入站和出站事件
public class MetricsHandler extends ChannelDuplexHandler {private long startTime;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {startTime = System.currentTimeMillis();super.channelRead(ctx, msg);}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {long processingTime = System.currentTimeMillis() - startTime;log.info("消息处理耗时:{}ms", processingTime);super.write(ctx, msg, promise);} }
Netty 配置
将 Netty 核心参数通过 Spring 配置文件管理:
netty:server:port: 8888boss-thread-count: 1worker-thread-count: 8backlog: 1024keep-alive: truetcp-nodelay: truesndbuf: 32768rcvbuf: 32768max-frame-length: 1048576idle-timeout:reader-idle-time: 30writer-idle-time: 60all-idle-time: 90
对应的配置类:
@ConfigurationProperties(prefix = "netty.server")
public class NettyServerProperties {private int port = 8888;private int bossThreadCount = 1;private int workerThreadCount = Runtime.getRuntime().availableProcessors() * 2;private int backlog = 1024;private boolean keepAlive = true;private boolean tcpNodelay = true;private int sndbuf = 32768;private int rcvbuf = 32768;private int maxFrameLength = 1024 * 1024;private IdleTimeout idleTimeout = new IdleTimeout();// getters and setterspublic static class IdleTimeout {private int readerIdleTime = 30;private int writerIdleTime = 60;private int allIdleTime = 90;// getters and setters}
}
Netty 优化
处理器重用:无状态的处理器可以单例重用,减少对象创建开销
内存管理:
- 优先使用
PooledByteBufAllocator
- 避免频繁创建和销毁
ByteBuf
- 使用
ReferenceCountUtil.release()
确保资源释放
- 优先使用
线程模型优化:
- 根据 CPU 核心数调整 EventLoopGroup 线程数
- 避免在 IO 线程中执行耗时操作,可通过
ctx.executor().execute()
提交到其他线程池
流量控制:
- 使用
ChannelConfig.setWriteBufferHighWaterMark()
设置写缓冲区高水位线 - 通过
Channel.isWritable()
检查是否可写,避免 OOM
- 使用
监控与 metrics:
- 集成 Micrometer 监控连接数、消息吞吐量等指标
- 监控 EventLoopGroup 的线程利用率和任务队列长度