【Netty系列】自定义协议
目录
代码实现
关键点说明:
代码实现
以下是为代码添加详细注释的版本,关键位置都增加了中文说明:
/*** 自定义协议对象(POJO)* 协议结构:消息长度(4字节) + 消息内容(UTF-8字符串)*/
public class CustomProtocol {private int length; // 消息内容长度private String content;// 消息内容// 构造方法、getter/setter省略(实际使用时需要补充)
}/*** 编码器:将CustomProtocol对象转换为字节流* 继承MessageToByteEncoder,泛型参数指定要编码的对象类型*/
public class CustomEncoder extends MessageToByteEncoder<CustomProtocol> {@Overrideprotected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) {// 1. 将内容转换为字节数组byte[] contentBytes = msg.getContent().getBytes(StandardCharsets.UTF_8);// 2. 先写入4字节的消息长度(不包含自身长度)out.writeInt(contentBytes.length); // 3. 写入消息内容字节数组out.writeBytes(contentBytes);}
}/*** 解码器:将字节流转换为CustomProtocol对象* 继承LengthFieldBasedFrameDecoder处理粘包/半包问题*/
public class CustomDecoder extends LengthFieldBasedFrameDecoder {// 参数说明:// maxFrameLength - 最大允许的帧长度(防DoS攻击)// lengthFieldOffset - 长度字段偏移量(这里从0开始)// lengthFieldLength - 长度字段自身占用的字节数(4字节int)// lengthAdjustment - 长度字段后的内容调整值(这里不需要调整)// initialBytesToStrip - 需要跳过的字节数(这里保留全部字节)public CustomDecoder() {super(1024 * 1024, 0, 4, 0, 4);}@Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {// 1. 调用父类方法获取完整数据帧(已处理粘包问题)ByteBuf frame = (ByteBuf) super.decode(ctx, in);if (frame == null) return null;try {// 2. 读取长度字段(4字节int)int length = frame.readInt();// 3. 根据长度读取内容字节数组byte[] contentBytes = new byte[length];frame.readBytes(contentBytes);// 4. 构建协议对象return new CustomProtocol(length, new String(contentBytes, StandardCharsets.UTF_8));} finally {frame.release(); // 释放ByteBuf资源}}
}/*** 服务端业务处理器* 使用@Sharable注解表明可以安全地在多个Channel间共享*/
@Sharable
public class ServerHandler extends SimpleChannelInboundHandler<CustomProtocol> {// 处理接收到的消息@Overrideprotected void channelRead0(ChannelHandlerContext ctx, CustomProtocol msg) {System.out.println("Server received: " + msg.getContent());// 构造响应消息(转为大写)String response = msg.getContent().toUpperCase();ctx.writeAndFlush(new CustomProtocol(response.length(), response));}// 异常处理@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close(); // 关闭发生异常的连接}
}/*** 服务端启动类*/
public class Server {private final int port;public Server(int port) {this.port = port;}public void start() throws Exception {// 创建事件循环组EventLoopGroup bossGroup = new NioEventLoopGroup(); // 处理连接请求EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理IO操作try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // 指定使用NIO传输.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 添加处理器链(注意顺序!)ch.pipeline().addLast(new CustomDecoder()) // 先解码.addLast(new CustomEncoder()) // 后编码.addLast(new ServerHandler()); // 最后业务处理}});// 绑定端口并启动服务ChannelFuture f = b.bind(port).sync();System.out.println("Server started on port " + port);// 等待服务端通道关闭f.channel().closeFuture().sync();} finally {// 优雅关闭线程池workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {new Server(8080).start();}
}/*** 客户端处理器*/
public class ClientHandler extends SimpleChannelInboundHandler<CustomProtocol> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, CustomProtocol msg) {System.out.println("Client received: " + msg.getContent());ctx.close(); // 收到响应后关闭连接}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}/*** 客户端启动类*/
public class Client {private final String host;private final int port;public Client(String host, int port) {this.host = host;this.port = port;}public void send(String message) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class) // 客户端使用SocketChannel.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new CustomDecoder()).addLast(new CustomEncoder()).addLast(new ClientHandler());}});// 连接服务器ChannelFuture f = b.connect(host, port).sync();// 发送消息f.channel().writeAndFlush(new CustomProtocol(message.length(), message));// 等待连接关闭f.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}public static void main(String[] args) throws Exception {new Client("localhost", 8080).send("Hello Netty!");}
}
关键点说明:
- 编解码顺序:
-
- 解码器需要放在pipeline的前面,因为要先将字节流转换为对象
- 编码器放在中间,用于将出站的对象转换为字节流
- 业务处理器放在最后处理实际业务逻辑
- 粘包处理:
-
- 使用
LengthFieldBasedFrameDecoder
解决TCP粘包问题 - 通过读取长度字段确定每个数据包的边界
- 使用
- 资源管理:
-
- 在finally块中关闭EventLoopGroup确保资源释放
- 解码器中调用frame.release()释放ByteBuf
- 线程模型:
-
- 服务端使用bossGroup接收连接,workerGroup处理IO
- 客户端使用单个EventLoopGroup处理所有操作
- 异常处理:
-
- 通过重写exceptionCaught方法处理未捕获异常
- 发生异常时主动关闭连接防止资源泄漏
这个注释版本应该能帮助更好地理解Netty的工作机制和自定义协议的处理流程。实际开发中还需要添加超时处理、心跳机制等增强健壮性。