当前位置: 首页 > news >正文

Netty从入门到进阶(四)

四、Netty常见参数的学习及优化

1. 扩展序列化算法

序列化和反序列化主要用于消息正文的转换上

  • 序列化时,需要将Java对象变成要传输的数据(可以是byte[],或者json等,最终都需要变成byte[])
  • 反序列化时,需要将传入的正文数据还原成Java对象,便于处理

目前的代码仅支持Java自带的反序列化,其机制的核心代码如下:

// 反序列化
byte[] body = new byte[bodyLength];
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

改进

①为了支持更多的序列化算法,抽象一个Serializer接口,提供两个实现,这里直接将实现加入了枚举类Serializer.Algorithm中

package cn.itcast.protocol;import com.google.gson.Gson;import java.io.*;
import java.nio.charset.StandardCharsets;/*** 用于扩展序列化、反序列化算法*/
public interface Serializer {// 反序列化算法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化算法<T> byte[] serialize(T object);enum  Algorithm implements Serializer {Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));return (T) ois.readObject();} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("SerializerAlgorithm.Java 反序列化失败", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(object);return bos.toByteArray();} catch (IOException e) {throw new RuntimeException("SerializerAlgorithm.Java 序列化失败", e);}}},// 引入了Gson依赖Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);}@Overridepublic <T> byte[] serialize(T object) {return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);}};// 需要从协议的字节中得到是哪种序列化算法public static Serializer.Algorithm getByInt(int type) {Serializer.Algorithm[] array = Serializer.Algorithm.values();if (type < 0 || type > array.length - 1) {throw new IllegalArgumentException("超过SerializerAlgorithm范围");}return array[type];}}
}

②增加配置类Config 和 配置文件application.properties

package cn.itcast.config;import cn.itcast.protocol.Serializer;import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;public abstract class Config {static Properties properties;static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);} catch (IOException e) {throw new ExceptionInInitializerError(e);}}// 获取运行端口public static int getServerPort() {String value = properties.getProperty("server.port");if (value == null) {return 8080;} else {return Integer.parseInt(value);}}// 获取序列化和反序列化算法public static Serializer.Algorithm getSerializerAlgorithm() {String value = properties.getProperty("serializer.algorithm");if (value == null) {return Serializer.Algorithm.Java;} else {return Serializer.Algorithm.valueOf(value);}}
}

application.properties

serializer.algorithm=Json
server.port=8090

③修改编解码器

package cn.itcast.protocol;import cn.itcast.config.Config;
import cn.itcast.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;@Slf4j
@ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(Config.getSerializerAlgorithm().ordinal());// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组/*ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();*/byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();  // 0 或 1byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);/*ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();*/// 找到反序列化算法Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerType];// 根据消息类型字节 获取 对应消息的classClass<?> messageClass = Message.getMessageClass(messageType);Object message = algorithm.deserialize(messageClass, bytes);log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}

④测试

package cn.itcast.netty.c4_1;import cn.itcast.config.Config;
import cn.itcast.message.LoginRequestMessage;
import cn.itcast.message.Message;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class TestSerializer {public static void main(String[] args) {MessageCodecSharable CODEC = new MessageCodecSharable();LoggingHandler LOGGING = new LoggingHandler();EmbeddedChannel channel = new EmbeddedChannel(LOGGING, CODEC, LOGGING);LoginRequestMessage message = new LoginRequestMessage("张三", "123");// 序列化channel.writeOutbound(message);log.debug("-------------------------分隔线------------------------------------");// 反序列化ByteBuf buf = messageToByteBuf(message);channel.writeInbound(buf);}// 反序列化public static ByteBuf messageToByteBuf(Message msg) {// 先得到序列化后的byte数组int algorithm = Config.getSerializerAlgorithm().ordinal();ByteBuf out = ByteBufAllocator.DEFAULT.buffer();out.writeBytes(new byte[]{1, 2, 3, 4});out.writeByte(1);out.writeByte(algorithm);out.writeByte(msg.getMessageType());out.writeInt(msg.getSequenceId());out.writeByte(0xff);byte[] bytes = Serializer.Algorithm.values()[algorithm].serialize(msg);// 得到ByteBufout.writeInt(bytes.length);out.writeBytes(bytes);return out;}
}

输出

// 。。。 省略 。。。
10:51:17 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=张三, password=123, nickname=null)
10:51:17 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 85B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 45 |...............E|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 e5 bc a0 |{"username":"...|
|00000020| e4 b8 89 22 2c 22 70 61 73 73 77 6f 72 64 22 3a |...","password":|
|00000030| 22 31 32 33 22 2c 22 73 65 71 75 65 6e 63 65 49 |"123","sequenceI|
|00000040| 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 79 70 |d":0,"messageTyp|
|00000050| 65 22 3a 30 7d                                  |e":0}           |
+--------+-------------------------------------------------+----------------+
// 。。。 省略 。。。
10:51:17 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 85B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 45 |...............E|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 e5 bc a0 |{"username":"...|
|00000020| e4 b8 89 22 2c 22 70 61 73 73 77 6f 72 64 22 3a |...","password":|
|00000030| 22 31 32 33 22 2c 22 73 65 71 75 65 6e 63 65 49 |"123","sequenceI|
|00000040| 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 79 70 |d":0,"messageTyp|
|00000050| 65 22 3a 30 7d                                  |e":0}           |
+--------+-------------------------------------------------+----------------+
// 。。。 省略 。。。
10:51:17 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=张三, password=123, nickname=null)
// 。。。 省略 。。。

2. 参数调优

2.1 CONNECT_TIMEOUT_MILLIS

  • 属于SocketChannel的参数
  • 用于在客户端建立连接时,如果在指定毫秒内无法连接,会抛出timeout异常
  • SO_TIMEOUT 主要用于在 阻塞IO,阻塞IO中 accept、read等都是无限等待的,如果不希望永远阻塞,使用它调整超时事件
package cn.itcast.netty.c4_1;import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class TestConnectionTimeout {public static void main(String[] args) {// 1. 客户端通过 .option()方法给SocketChannel配置参数// 2. 服务器端
//        new ServerBootstrap().option()  // 是给ServerSocketChannel配置参数
//        new ServerBootstrap().childOption()  // 是给SocketChannel配置参数NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000).channel(NioSocketChannel.class).handler(new LoggingHandler());ChannelFuture future = bootstrap.connect("localhost", 8080);future.sync().channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();log.debug("timeout");} finally {group.shutdownGracefully();}}
}

2.2 SO_BACKLOG

  • 属于 ServerSocketChannel 的参数

  • 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改为SYN_REVD,并将该请求放入 sync queue 队列
  • 第二次握手,serve r回复 SYN + ACK 到 client,client 收到,状态改为 ESTABLISHED,并发送 ACK 给 server
  • 第三次我是,server 收到 ACK,状态改为 ESTABLISHED,将该请求从 sync queue 放入accept queue

其中

  • 在linux 2.2以前,backlog 大小包括了两个队列的大小,在2.2之后,分别用下面两个参数来控制
  • sync queue - 半连接队列
    • 大小通过 /proc/syc/net/piv4/tcp_max_syn_backlog 指定,在syncookies 启动的情况下,逻辑上没有最大值限制,所以这个设置便被忽略
  • accept queue - 全连接队列
    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen函数时,内核会根据传入的 backlog参数与系统参数,取二者的较小值
    • 如果accpt queue队列满了,server 将发送一个拒绝连接的错误信息到client

Netty中,可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小

测试代码:服务端

package cn.itcast.netty.c4_1;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;public class TestBacklogServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).option(ChannelOption.SO_BACKLOG, 1024).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());}});}
}

客户端

package cn.itcast.netty.c4_1;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;public class TestBacklogClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080);channelFuture.sync().channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {worker.shutdownGracefully();}}
}

2.3 ulimit-n

  • 属于操作系统参数(Linux/Unix)
  • 用于设置或查看单个进程能打开的最大文件描述符数量(File Descriptor Limit),包括:
    • 网络连接(如Socket连接)
    • 打开的文件
    • 管道(Pipes)
    • 其它需要文件描述符的资源

为什么这个参数重要?

1. 网络服务器的高并发场景

  • 每个TCP连接会占用一个文件描述符
  • 如果ulimit -n设置过小,当并发连接数超过限制时,会报Too many open files错误,导致新连接被拒绝

2. 文件操作限制

  • 如果一个进程需要同时操作大量文件(如日志写入、数据库连接等),文件描述符不足会导致操作失败

2.4 TCP_NODELAY

  • 属于SocketChannel参数
  • 用于控制是否启用Nagle算法,
    • 取值true,代表禁用Nagle算法(数据立即发送,减少延迟)
      • 适用场景:低延迟优先(如实时游戏、在线交易、SSH/Telnet)、小包频繁发送(如心跳包、控制指令)、需要快速响应的交互式应用(如HTTP/2、gRPC)
    • 取值false,代表启用Nagle算法(默认值,合并小包减少网络开销)
      • 适用场景:带宽优化优先(如文件传输、大数据流)、高延迟、高吞吐场景(如视频流、FTP)

2.5 SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF(发送缓冲区大小)属于SocketChannel参数,影响单次发送的最大数据量;
  • SO_RCVBUF(接收缓冲区大小)既可用于SocketChannel参数,也可以用于ServerSocketChannel参数(建议设置到ServerSocketChannel上),影响单次接收的最大数据量

优化建议:

  • 如果禁用Nagle算法(TCP_NODELAY=true),可以适当减小SO_SNDBUF以避免缓冲区堆积;
  • 如果启用Nagle算法(TCP_NODELAY=false),可以增大SO_SNDBUF以提高吞吐量

2.6 ALLOCATOR

  • 属于SocketChannel参数
  • 用来分配ByteBuf,ctx.alloc()

服务端:

package cn.itcast.netty.c4_1;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class TestByteBuf {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = ctx.alloc().buffer();log.debug("alloc buf {}", buf);}});}}).bind(8080);}
}

客户端:

package cn.itcast.netty.c4_1;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;public class TestBacklogClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("hello".getBytes()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080);channelFuture.sync().channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {worker.shutdownGracefully();}}
}

服务端输出

// 。。。省略。。。

16:09:58 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x50636027, L:/127.0.0.1:8080 - R:/127.0.0.1:62815] READ: 5B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+
16:09:58 [DEBUG] [nioEventLoopGroup-2-2] c.i.n.c.TestByteBuf - alloc buf PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)

2.7 RCVBUF_ALLOCATOR

  • 属于 SocketChannel 参数
  • 控制 netty 接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct直接内存,具体池化还是非池化由 allocator 决定
@Slf4j
public class TestByteBuf {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("receive buf: {}", msg);}});}}).bind(8080);}
}

服务器输出

16:29:50 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x623458d8, L:/127.0.0.1:8080 - R:/127.0.0.1:63518] READ: 5B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+
16:29:50 [DEBUG] [nioEventLoopGroup-2-2] c.i.n.c.TestByteBuf - receive buf: PooledUnsafeDirectByteBuf(ridx: 0, widx: 5, cap: 1024)

3. RPC框架

3.1 准备工作

①为了简化起见,在原来聊天项目的基础上新增Rpc请求和响应消息

@Data
public abstract class Message implements Serializable {// 省略旧的代码public static final int RPC_MESSAGE_TYPE_REQUEST = 101;public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;static {// ...messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}

②请求消息

package cn.itcast.message;import lombok.Getter;
import lombok.ToString;@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message{/*** 调用的接口全限定名,服务端根据它找到实现*/private String interfaceName;/*** 调用接口中的方法名*/private String methodName;/*** 方法返回类型*/private Class<?> returnType;/*** 方法参数类型数组*/private Class[] parameterTypes;/*** 方法参数值数组*/private Object[] parameterValue;public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {super.setSequenceId(sequenceId);this.interfaceName = interfaceName;this.methodName = methodName;this.returnType = returnType;this.parameterTypes = parameterTypes;this.parameterValue = parameterValue;}@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_REQUEST;}
}

③响应消息

package cn.itcast.message;import lombok.Data;
import lombok.ToString;@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message{/*** 返回值*/private Object returnValue;/*** 异常值*/private Exception exceptionValue;@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_RESPONSE;}
}

④服务器架子

package cn.itcast.server;import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RpcServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc请求消息处理器,待实现RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

⑤客户端架子

package cn.itcast.client;import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc响应消息处理器,待实现RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}

⑥服务器端的service获取

package cn.itcast.server.service;import cn.itcast.config.Config;import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;public class ServicesFactory {static Properties properties;static Map<Class<?>, Object> map = new ConcurrentHashMap<>();static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);Set<String> names = properties.stringPropertyNames();for (String name : names) {if (name.endsWith("Service")) {Class<?> interfaceClass = Class.forName(name);Class<?> instanceClass = Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static <T> T getService(Class<T> interfaceClass) {return (T) map.get(interfaceClass);}
}

⑦相关配置 application.properties

serializer.algorithm=Json
server.port=8090
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

3.2 服务器 handler

①添加HelloService

package cn.itcast.server.service;public interface HelloService {String sayHello(String msg);
}

②添加实现类HelloServiceImpl 

package cn.itcast.server.service;public class HelloServiceImpl implements HelloService{@Overridepublic String sayHello(String msg) {return "你好" + msg;}
}

③RpcRequestMessageHandler 

package cn.itcast.handler;import cn.itcast.message.RpcRequestMessage;
import cn.itcast.message.RpcResponseMessage;
import cn.itcast.server.service.HelloService;
import cn.itcast.server.service.ServicesFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) throws Exception {RpcResponseMessage reponse = new RpcResponseMessage();reponse.setSequenceId(message.getSequenceId());try {HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke = method.invoke(service, message.getParameterValue());reponse.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();reponse.setExceptionValue(new Exception("远程调用出错, " + e.getCause().getMessage()));}ctx.writeAndFlush(reponse);}public static void main(String[] args) throws ClassNotFoundException, InvocationTargetException, IllegalAccessException, NoSuchMethodException {RpcRequestMessage message = new RpcRequestMessage(1,"cn.itcast.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"});HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke = method.invoke(service, message.getParameterValue());System.out.println(invoke);}
}

3.3 客户端代码 第一版

①RpcClient -> 只发消息版

package cn.itcast.client;import cn.itcast.handler.RpcResponseMessageHandler;
import cn.itcast.message.RpcRequestMessage;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc响应消息处理器,待实现RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.writeAndFlush(new RpcRequestMessage(1,"cn.itcast.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})).addListener(promise -> {if (!promise.isSuccess()) {Throwable cause = promise.cause();log.error("error", cause);}});channel.closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}

3.4 客户端 handler 第一版

①RpcResponseMessageHandler 

package cn.itcast.handler;import cn.itcast.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);}
}

②修改Serializer代码,解决“java.lang.UnsupportOperationException: Attempted to serialize java.lang.Class: java.lang.String. Forgot to register a type adapter? ”异常

package cn.itcast.protocol;import com.google.gson.*;import java.io.*;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;/*** 用于扩展序列化、反序列化算法*/
public interface Serializer {// 反序列化算法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化算法<T> byte[] serialize(T object);enum  Algorithm implements Serializer {Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));return (T) ois.readObject();} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("SerializerAlgorithm.Java 反序列化失败", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(object);return bos.toByteArray();} catch (IOException e) {throw new RuntimeException("SerializerAlgorithm.Java 序列化失败", e);}}},// 引入了Gson依赖Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();String json = new String(bytes, StandardCharsets.UTF_8);return gson.fromJson(json, clazz);}@Overridepublic <T> byte[] serialize(T object) {Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();String json = gson.toJson(object);return json.getBytes(StandardCharsets.UTF_8);}};// 需要从协议的字节中得到是哪种序列化算法public static Serializer.Algorithm getByInt(int type) {Serializer.Algorithm[] array = Serializer.Algorithm.values();if (type < 0 || type > array.length - 1) {throw new IllegalArgumentException("超过SerializerAlgorithm范围");}return array[type];}}static class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {@Overridepublic Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {try {String str = json.getAsString();return Class.forName(str);} catch (ClassNotFoundException e) {throw new JsonParseException(e);}}@Overridepublic JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {// class -> jsonreturn new JsonPrimitive(src.getName());}}
}

3.5 客户端代码 第二版

①创建SequenceIdGenerator,生成自增的sequenceId

package cn.itcast.protocol;import java.util.concurrent.atomic.AtomicInteger;public class SequenceIdGenerator {private static final AtomicInteger id = new AtomicInteger();public static int nextId() {return id.incrementAndGet();}
}

②copy一份之前的RpcClient,改为RpcClientManager,代码修改为如下

package cn.itcast.client;import cn.itcast.handler.RpcResponseMessageHandler;
import cn.itcast.message.RpcRequestMessage;
import cn.itcast.message.RpcResponseMessage;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import cn.itcast.protocol.SequenceIdGenerator;
import cn.itcast.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;import java.lang.reflect.Proxy;@Slf4j
public class RpcClientManager {public static void main(String[] args) {HelloService service = getProxyService(HelloService.class);System.out.println(service.sayHello("张三"));System.out.println(service.sayHello("李四"));}// 创建代理类public static <T> T getProxyService(Class<T> serviceClass) {ClassLoader loader = serviceClass.getClassLoader();Class<?>[] interfaces = new Class[]{serviceClass};Object res = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {// 1. 将方法调用转换为消息对象int sequenceId = SequenceIdGenerator.nextId();RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 准备一个空Promise对象,用来接收结果          指定promise对象异步接收结果的线程DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);// 4. 等待promise结果promise.await();if (promise.isSuccess()) {// 调用正常return promise.getNow();} else {// 调用失败throw new RuntimeException(promise.cause());}});return (T) res;}private static Channel channel = null;private static final Object LOCK = new Object();// 单例模式public static Channel getChannel() {if (channel != null) {return channel;}synchronized (LOCK) {if (channel != null) {return channel;}initChannel();return channel;}}/*** 初始化channel*/private static void initChannel() {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc响应消息处理器,待实现RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});try {channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().addListener(future -> {group.shutdownGracefully();});} catch (InterruptedException e) {log.error("client error", e);}}
}

3.6 客户端 handler 第二版

package cn.itcast.handler;import cn.itcast.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {//                          序号      用来接收结果的promise对象public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);// 返回并移除Promise<Object> promise = PROMISES.remove(msg.getSequenceId());if (promise != null) {Object returnValue = msg.getReturnValue();Exception exceptionValue = msg.getExceptionValue();if (exceptionValue != null) {promise.setFailure(exceptionValue);} else {promise.setSuccess(returnValue);}}}
}

相关文章:

  • strncpy_s与_TRUNCATE
  • Jinja2 模板在 Python 和 LLM 提示词编辑器中的应用
  • 如何搭建反向海淘代购系统?
  • Cursor 编辑器中的 Notepad 功能使用指南
  • 网络安全攻防领域证书
  • 黑群晖NAS部署DeepSeek模型与内网穿透实现本地AI服务
  • FastJSON 1.2.83版本升级指南:安全加固与性能优化实践
  • BERT vs BART vs T5:预训练语言模型核心技术详解
  • mysql 的卸载- Windows 版
  • Kotlin 中的继承/实现
  • 【Git】面对发布或重要节点,Git如何打Tag?
  • navicat 有免费版了,navicat 官方免费版下载
  • Conda 安装 nbextensions详细教程
  • 【Redisson】锁可重入原理
  • P4 QT项目----会学串口助手(解析笔记)
  • Oracle 条件索引 case when 报错解决方案(APP)
  • 铸铁平台的制造工艺复杂而精细
  • 探索铸铁试验平台在制造行业的卓越价值
  • keil5怎么关闭工程
  • vue2为什么不能检查数组的的变化,改怎样解决
  • 目前做哪个网站致富/北京关键词快速排名
  • 网站建设制作设计推广/如何查看百度搜索指数
  • 甘州区建设局网站/淘宝指数查询入口
  • 中国空间站真实图片/正规推广赚佣金的平台
  • UE4做购物网站/企业软文怎么写
  • wordpress 添加点赞/如何做网站推广优化