当前位置: 首页 > news >正文

【Netty系列】自定义协议

目录

代码实现

关键点说明:


代码实现

以下是为代码添加详细注释的版本,关键位置都增加了中文说明:

/*** 自定义协议对象(POJO)* 协议结构:消息长度(4字节) + 消息内容(UTF-8字符串)*/
public class CustomProtocol {private int length;    // 消息内容长度private String content;// 消息内容// 构造方法、getter/setter省略(实际使用时需要补充)
}/*** 编码器:将CustomProtocol对象转换为字节流* 继承MessageToByteEncoder,泛型参数指定要编码的对象类型*/
public class CustomEncoder extends MessageToByteEncoder<CustomProtocol> {@Overrideprotected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) {// 1. 将内容转换为字节数组byte[] contentBytes = msg.getContent().getBytes(StandardCharsets.UTF_8);// 2. 先写入4字节的消息长度(不包含自身长度)out.writeInt(contentBytes.length); // 3. 写入消息内容字节数组out.writeBytes(contentBytes);}
}/*** 解码器:将字节流转换为CustomProtocol对象* 继承LengthFieldBasedFrameDecoder处理粘包/半包问题*/
public class CustomDecoder extends LengthFieldBasedFrameDecoder {// 参数说明:// maxFrameLength - 最大允许的帧长度(防DoS攻击)// lengthFieldOffset - 长度字段偏移量(这里从0开始)// lengthFieldLength - 长度字段自身占用的字节数(4字节int)// lengthAdjustment - 长度字段后的内容调整值(这里不需要调整)// initialBytesToStrip - 需要跳过的字节数(这里保留全部字节)public CustomDecoder() {super(1024 * 1024, 0, 4, 0, 4);}@Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {// 1. 调用父类方法获取完整数据帧(已处理粘包问题)ByteBuf frame = (ByteBuf) super.decode(ctx, in);if (frame == null) return null;try {// 2. 读取长度字段(4字节int)int length = frame.readInt();// 3. 根据长度读取内容字节数组byte[] contentBytes = new byte[length];frame.readBytes(contentBytes);// 4. 构建协议对象return new CustomProtocol(length, new String(contentBytes, StandardCharsets.UTF_8));} finally {frame.release(); // 释放ByteBuf资源}}
}/*** 服务端业务处理器* 使用@Sharable注解表明可以安全地在多个Channel间共享*/
@Sharable
public class ServerHandler extends SimpleChannelInboundHandler<CustomProtocol> {// 处理接收到的消息@Overrideprotected void channelRead0(ChannelHandlerContext ctx, CustomProtocol msg) {System.out.println("Server received: " + msg.getContent());// 构造响应消息(转为大写)String response = msg.getContent().toUpperCase();ctx.writeAndFlush(new CustomProtocol(response.length(), response));}// 异常处理@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close(); // 关闭发生异常的连接}
}/*** 服务端启动类*/
public class Server {private final int port;public Server(int port) {this.port = port;}public void start() throws Exception {// 创建事件循环组EventLoopGroup bossGroup = new NioEventLoopGroup(); // 处理连接请求EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理IO操作try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // 指定使用NIO传输.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 添加处理器链(注意顺序!)ch.pipeline().addLast(new CustomDecoder())  // 先解码.addLast(new CustomEncoder())  // 后编码.addLast(new ServerHandler()); // 最后业务处理}});// 绑定端口并启动服务ChannelFuture f = b.bind(port).sync();System.out.println("Server started on port " + port);// 等待服务端通道关闭f.channel().closeFuture().sync();} finally {// 优雅关闭线程池workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {new Server(8080).start();}
}/*** 客户端处理器*/
public class ClientHandler extends SimpleChannelInboundHandler<CustomProtocol> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, CustomProtocol msg) {System.out.println("Client received: " + msg.getContent());ctx.close(); // 收到响应后关闭连接}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}/*** 客户端启动类*/
public class Client {private final String host;private final int port;public Client(String host, int port) {this.host = host;this.port = port;}public void send(String message) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class) // 客户端使用SocketChannel.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new CustomDecoder()).addLast(new CustomEncoder()).addLast(new ClientHandler());}});// 连接服务器ChannelFuture f = b.connect(host, port).sync();// 发送消息f.channel().writeAndFlush(new CustomProtocol(message.length(), message));// 等待连接关闭f.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}public static void main(String[] args) throws Exception {new Client("localhost", 8080).send("Hello Netty!");}
}

关键点说明:

  1. 编解码顺序
    • 解码器需要放在pipeline的前面,因为要先将字节流转换为对象
    • 编码器放在中间,用于将出站的对象转换为字节流
    • 业务处理器放在最后处理实际业务逻辑
  1. 粘包处理
    • 使用LengthFieldBasedFrameDecoder解决TCP粘包问题
    • 通过读取长度字段确定每个数据包的边界
  1. 资源管理
    • 在finally块中关闭EventLoopGroup确保资源释放
    • 解码器中调用frame.release()释放ByteBuf
  1. 线程模型
    • 服务端使用bossGroup接收连接,workerGroup处理IO
    • 客户端使用单个EventLoopGroup处理所有操作
  1. 异常处理
    • 通过重写exceptionCaught方法处理未捕获异常
    • 发生异常时主动关闭连接防止资源泄漏

这个注释版本应该能帮助更好地理解Netty的工作机制和自定义协议的处理流程。实际开发中还需要添加超时处理、心跳机制等增强健壮性。

相关文章:

  • CM3内核寄存器
  • latex figure Missing number, treated as zero. <to be read again>
  • Android的uid~package~pid的关系
  • NodeJS全栈开发面试题讲解——P9性能优化(Node.js 高级)
  • 经典面试题:一文了解常见的缓存问题
  • Spark on Hive表结构变更
  • 性能优化 - 案例篇:缓存
  • NodeJS全栈开发面试题讲解——P10微服务架构(Node.js + 多服务协作)
  • 聊一聊接口测试中缓存处理策略
  • 多模态大语言模型arxiv论文略读(102)
  • 量子语言模型——where to go
  • 快速掌握 GO 之 RabbitMQ 结合 gin+gorm 案例
  • SQL进阶之旅 Day 10:执行计划解读与优化
  • Python应用for循环临时变量作用域
  • 基于Android的跳蚤市场_springboot+vue
  • Qt OpenGL编程常用类
  • 电子电路:时钟脉冲与上升沿的详细解析
  • (面试)OkHttp实现原理
  • pc端小卡片功能-原生JavaScript金融信息与节日日历
  • SpringAI+DeepSeek大模型应用开发实战
  • 动态ip做网站影响seo吗/网站关键词优化办法
  • 人气最高的网络游戏排行榜/吉林seo外包
  • 南昌做网站建设公司/免费域名邮箱
  • 360的网站排名怎么做/seo文章范文
  • 建设银行住房公积金预约网站/推推蛙seo顾问
  • 可以用足球做的游戏视频网站/站长查询域名