Netty从入门到进阶(四)
四、Netty常见参数的学习及优化
1. 扩展序列化算法
序列化和反序列化主要用于消息正文的转换上
- 序列化时,需要将Java对象变成要传输的数据(可以是byte[],或者json等,最终都需要变成byte[])
- 反序列化时,需要将传入的正文数据还原成Java对象,便于处理
目前的代码仅支持Java自带的反序列化,其机制的核心代码如下:
// 反序列化
byte[] body = new byte[bodyLength];
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();
改进:
①为了支持更多的序列化算法,抽象一个Serializer接口,提供两个实现,这里直接将实现加入了枚举类Serializer.Algorithm中
package cn.itcast.protocol;import com.google.gson.Gson;import java.io.*;
import java.nio.charset.StandardCharsets;/*** 用于扩展序列化、反序列化算法*/
public interface Serializer {// 反序列化算法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化算法<T> byte[] serialize(T object);enum Algorithm implements Serializer {Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));return (T) ois.readObject();} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("SerializerAlgorithm.Java 反序列化失败", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(object);return bos.toByteArray();} catch (IOException e) {throw new RuntimeException("SerializerAlgorithm.Java 序列化失败", e);}}},// 引入了Gson依赖Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);}@Overridepublic <T> byte[] serialize(T object) {return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);}};// 需要从协议的字节中得到是哪种序列化算法public static Serializer.Algorithm getByInt(int type) {Serializer.Algorithm[] array = Serializer.Algorithm.values();if (type < 0 || type > array.length - 1) {throw new IllegalArgumentException("超过SerializerAlgorithm范围");}return array[type];}}
}
②增加配置类Config 和 配置文件application.properties
package cn.itcast.config;import cn.itcast.protocol.Serializer;import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;public abstract class Config {static Properties properties;static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);} catch (IOException e) {throw new ExceptionInInitializerError(e);}}// 获取运行端口public static int getServerPort() {String value = properties.getProperty("server.port");if (value == null) {return 8080;} else {return Integer.parseInt(value);}}// 获取序列化和反序列化算法public static Serializer.Algorithm getSerializerAlgorithm() {String value = properties.getProperty("serializer.algorithm");if (value == null) {return Serializer.Algorithm.Java;} else {return Serializer.Algorithm.valueOf(value);}}
}
application.properties
serializer.algorithm=Json
server.port=8090
③修改编解码器
package cn.itcast.protocol;import cn.itcast.config.Config;
import cn.itcast.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;@Slf4j
@ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(Config.getSerializerAlgorithm().ordinal());// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组/*ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();*/byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte(); // 0 或 1byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);/*ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();*/// 找到反序列化算法Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerType];// 根据消息类型字节 获取 对应消息的classClass<?> messageClass = Message.getMessageClass(messageType);Object message = algorithm.deserialize(messageClass, bytes);log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}
④测试
package cn.itcast.netty.c4_1;import cn.itcast.config.Config;
import cn.itcast.message.LoginRequestMessage;
import cn.itcast.message.Message;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class TestSerializer {public static void main(String[] args) {MessageCodecSharable CODEC = new MessageCodecSharable();LoggingHandler LOGGING = new LoggingHandler();EmbeddedChannel channel = new EmbeddedChannel(LOGGING, CODEC, LOGGING);LoginRequestMessage message = new LoginRequestMessage("张三", "123");// 序列化channel.writeOutbound(message);log.debug("-------------------------分隔线------------------------------------");// 反序列化ByteBuf buf = messageToByteBuf(message);channel.writeInbound(buf);}// 反序列化public static ByteBuf messageToByteBuf(Message msg) {// 先得到序列化后的byte数组int algorithm = Config.getSerializerAlgorithm().ordinal();ByteBuf out = ByteBufAllocator.DEFAULT.buffer();out.writeBytes(new byte[]{1, 2, 3, 4});out.writeByte(1);out.writeByte(algorithm);out.writeByte(msg.getMessageType());out.writeInt(msg.getSequenceId());out.writeByte(0xff);byte[] bytes = Serializer.Algorithm.values()[algorithm].serialize(msg);// 得到ByteBufout.writeInt(bytes.length);out.writeBytes(bytes);return out;}
}
输出
// 。。。 省略 。。。
10:51:17 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=张三, password=123, nickname=null)
10:51:17 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 85B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 45 |...............E|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 e5 bc a0 |{"username":"...|
|00000020| e4 b8 89 22 2c 22 70 61 73 73 77 6f 72 64 22 3a |...","password":|
|00000030| 22 31 32 33 22 2c 22 73 65 71 75 65 6e 63 65 49 |"123","sequenceI|
|00000040| 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 79 70 |d":0,"messageTyp|
|00000050| 65 22 3a 30 7d |e":0} |
+--------+-------------------------------------------------+----------------+
// 。。。 省略 。。。
10:51:17 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 85B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 01 00 00 00 00 00 ff 00 00 00 45 |...............E|
|00000010| 7b 22 75 73 65 72 6e 61 6d 65 22 3a 22 e5 bc a0 |{"username":"...|
|00000020| e4 b8 89 22 2c 22 70 61 73 73 77 6f 72 64 22 3a |...","password":|
|00000030| 22 31 32 33 22 2c 22 73 65 71 75 65 6e 63 65 49 |"123","sequenceI|
|00000040| 64 22 3a 30 2c 22 6d 65 73 73 61 67 65 54 79 70 |d":0,"messageTyp|
|00000050| 65 22 3a 30 7d |e":0} |
+--------+-------------------------------------------------+----------------+
// 。。。 省略 。。。
10:51:17 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=张三, password=123, nickname=null)
// 。。。 省略 。。。
2. 参数调优
2.1 CONNECT_TIMEOUT_MILLIS
- 属于SocketChannel的参数
- 用于在客户端建立连接时,如果在指定毫秒内无法连接,会抛出timeout异常
- SO_TIMEOUT 主要用于在 阻塞IO,阻塞IO中 accept、read等都是无限等待的,如果不希望永远阻塞,使用它调整超时事件
package cn.itcast.netty.c4_1;import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class TestConnectionTimeout {public static void main(String[] args) {// 1. 客户端通过 .option()方法给SocketChannel配置参数// 2. 服务器端
// new ServerBootstrap().option() // 是给ServerSocketChannel配置参数
// new ServerBootstrap().childOption() // 是给SocketChannel配置参数NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000).channel(NioSocketChannel.class).handler(new LoggingHandler());ChannelFuture future = bootstrap.connect("localhost", 8080);future.sync().channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();log.debug("timeout");} finally {group.shutdownGracefully();}}
}
2.2 SO_BACKLOG
- 属于 ServerSocketChannel 的参数
- 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改为SYN_REVD,并将该请求放入 sync queue 队列
- 第二次握手,serve r回复 SYN + ACK 到 client,client 收到,状态改为 ESTABLISHED,并发送 ACK 给 server
- 第三次我是,server 收到 ACK,状态改为 ESTABLISHED,将该请求从 sync queue 放入accept queue
其中
- 在linux 2.2以前,backlog 大小包括了两个队列的大小,在2.2之后,分别用下面两个参数来控制
- sync queue - 半连接队列
- 大小通过 /proc/syc/net/piv4/tcp_max_syn_backlog 指定,在syncookies 启动的情况下,逻辑上没有最大值限制,所以这个设置便被忽略
- accept queue - 全连接队列
- 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen函数时,内核会根据传入的 backlog参数与系统参数,取二者的较小值
- 如果accpt queue队列满了,server 将发送一个拒绝连接的错误信息到client
Netty中,可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小
测试代码:服务端
package cn.itcast.netty.c4_1;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;public class TestBacklogServer {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).option(ChannelOption.SO_BACKLOG, 1024).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());}});}
}
客户端
package cn.itcast.netty.c4_1;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;public class TestBacklogClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080);channelFuture.sync().channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {worker.shutdownGracefully();}}
}
2.3 ulimit-n
- 属于操作系统参数(Linux/Unix)
- 用于设置或查看单个进程能打开的最大文件描述符数量(File Descriptor Limit),包括:
- 网络连接(如Socket连接)
- 打开的文件
- 管道(Pipes)
- 其它需要文件描述符的资源
为什么这个参数重要?
1. 网络服务器的高并发场景
- 每个TCP连接会占用一个文件描述符
- 如果ulimit -n设置过小,当并发连接数超过限制时,会报Too many open files错误,导致新连接被拒绝
2. 文件操作限制
- 如果一个进程需要同时操作大量文件(如日志写入、数据库连接等),文件描述符不足会导致操作失败
2.4 TCP_NODELAY
- 属于SocketChannel参数
- 用于控制是否启用Nagle算法,
- 取值true,代表禁用Nagle算法(数据立即发送,减少延迟)
- 适用场景:低延迟优先(如实时游戏、在线交易、SSH/Telnet)、小包频繁发送(如心跳包、控制指令)、需要快速响应的交互式应用(如HTTP/2、gRPC)
- 取值false,代表启用Nagle算法(默认值,合并小包减少网络开销)
- 适用场景:带宽优化优先(如文件传输、大数据流)、高延迟、高吞吐场景(如视频流、FTP)
- 取值true,代表禁用Nagle算法(数据立即发送,减少延迟)
2.5 SO_SNDBUF & SO_RCVBUF
- SO_SNDBUF(发送缓冲区大小)属于SocketChannel参数,影响单次发送的最大数据量;
- SO_RCVBUF(接收缓冲区大小)既可用于SocketChannel参数,也可以用于ServerSocketChannel参数(建议设置到ServerSocketChannel上),影响单次接收的最大数据量
优化建议:
- 如果禁用Nagle算法(TCP_NODELAY=true),可以适当减小SO_SNDBUF以避免缓冲区堆积;
- 如果启用Nagle算法(TCP_NODELAY=false),可以增大SO_SNDBUF以提高吞吐量
2.6 ALLOCATOR
- 属于SocketChannel参数
- 用来分配ByteBuf,ctx.alloc()
服务端:
package cn.itcast.netty.c4_1;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class TestByteBuf {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = ctx.alloc().buffer();log.debug("alloc buf {}", buf);}});}}).bind(8080);}
}
客户端:
package cn.itcast.netty.c4_1;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;public class TestBacklogClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup worker = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("hello".getBytes()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 8080);channelFuture.sync().channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {worker.shutdownGracefully();}}
}
服务端输出
// 。。。省略。。。
16:09:58 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x50636027, L:/127.0.0.1:8080 - R:/127.0.0.1:62815] READ: 5B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
16:09:58 [DEBUG] [nioEventLoopGroup-2-2] c.i.n.c.TestByteBuf - alloc buf PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
2.7 RCVBUF_ALLOCATOR
- 属于 SocketChannel 参数
- 控制 netty 接收缓冲区大小
- 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct直接内存,具体池化还是非池化由 allocator 决定
@Slf4j
public class TestByteBuf {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("receive buf: {}", msg);}});}}).bind(8080);}
}
服务器输出
16:29:50 [DEBUG] [nioEventLoopGroup-2-2] i.n.h.l.LoggingHandler - [id: 0x623458d8, L:/127.0.0.1:8080 - R:/127.0.0.1:63518] READ: 5B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
16:29:50 [DEBUG] [nioEventLoopGroup-2-2] c.i.n.c.TestByteBuf - receive buf: PooledUnsafeDirectByteBuf(ridx: 0, widx: 5, cap: 1024)
3. RPC框架
3.1 准备工作
①为了简化起见,在原来聊天项目的基础上新增Rpc请求和响应消息
@Data
public abstract class Message implements Serializable {// 省略旧的代码public static final int RPC_MESSAGE_TYPE_REQUEST = 101;public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;static {// ...messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}
②请求消息
package cn.itcast.message;import lombok.Getter;
import lombok.ToString;@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message{/*** 调用的接口全限定名,服务端根据它找到实现*/private String interfaceName;/*** 调用接口中的方法名*/private String methodName;/*** 方法返回类型*/private Class<?> returnType;/*** 方法参数类型数组*/private Class[] parameterTypes;/*** 方法参数值数组*/private Object[] parameterValue;public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {super.setSequenceId(sequenceId);this.interfaceName = interfaceName;this.methodName = methodName;this.returnType = returnType;this.parameterTypes = parameterTypes;this.parameterValue = parameterValue;}@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_REQUEST;}
}
③响应消息
package cn.itcast.message;import lombok.Data;
import lombok.ToString;@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message{/*** 返回值*/private Object returnValue;/*** 异常值*/private Exception exceptionValue;@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_RESPONSE;}
}
④服务器架子
package cn.itcast.server;import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RpcServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc请求消息处理器,待实现RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
⑤客户端架子
package cn.itcast.client;import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc响应消息处理器,待实现RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
⑥服务器端的service获取
package cn.itcast.server.service;import cn.itcast.config.Config;import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;public class ServicesFactory {static Properties properties;static Map<Class<?>, Object> map = new ConcurrentHashMap<>();static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);Set<String> names = properties.stringPropertyNames();for (String name : names) {if (name.endsWith("Service")) {Class<?> interfaceClass = Class.forName(name);Class<?> instanceClass = Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static <T> T getService(Class<T> interfaceClass) {return (T) map.get(interfaceClass);}
}
⑦相关配置 application.properties
serializer.algorithm=Json
server.port=8090
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
3.2 服务器 handler
①添加HelloService
package cn.itcast.server.service;public interface HelloService {String sayHello(String msg);
}
②添加实现类HelloServiceImpl
package cn.itcast.server.service;public class HelloServiceImpl implements HelloService{@Overridepublic String sayHello(String msg) {return "你好" + msg;}
}
③RpcRequestMessageHandler
package cn.itcast.handler;import cn.itcast.message.RpcRequestMessage;
import cn.itcast.message.RpcResponseMessage;
import cn.itcast.server.service.HelloService;
import cn.itcast.server.service.ServicesFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) throws Exception {RpcResponseMessage reponse = new RpcResponseMessage();reponse.setSequenceId(message.getSequenceId());try {HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke = method.invoke(service, message.getParameterValue());reponse.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();reponse.setExceptionValue(new Exception("远程调用出错, " + e.getCause().getMessage()));}ctx.writeAndFlush(reponse);}public static void main(String[] args) throws ClassNotFoundException, InvocationTargetException, IllegalAccessException, NoSuchMethodException {RpcRequestMessage message = new RpcRequestMessage(1,"cn.itcast.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"});HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName()));Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());Object invoke = method.invoke(service, message.getParameterValue());System.out.println(invoke);}
}
3.3 客户端代码 第一版
①RpcClient -> 只发消息版
package cn.itcast.client;import cn.itcast.handler.RpcResponseMessageHandler;
import cn.itcast.message.RpcRequestMessage;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc响应消息处理器,待实现RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.writeAndFlush(new RpcRequestMessage(1,"cn.itcast.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})).addListener(promise -> {if (!promise.isSuccess()) {Throwable cause = promise.cause();log.error("error", cause);}});channel.closeFuture().sync();} catch (InterruptedException e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
3.4 客户端 handler 第一版
①RpcResponseMessageHandler
package cn.itcast.handler;import cn.itcast.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);}
}
②修改Serializer代码,解决“java.lang.UnsupportOperationException: Attempted to serialize java.lang.Class: java.lang.String. Forgot to register a type adapter? ”异常
package cn.itcast.protocol;import com.google.gson.*;import java.io.*;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;/*** 用于扩展序列化、反序列化算法*/
public interface Serializer {// 反序列化算法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化算法<T> byte[] serialize(T object);enum Algorithm implements Serializer {Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));return (T) ois.readObject();} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("SerializerAlgorithm.Java 反序列化失败", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(object);return bos.toByteArray();} catch (IOException e) {throw new RuntimeException("SerializerAlgorithm.Java 序列化失败", e);}}},// 引入了Gson依赖Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();String json = new String(bytes, StandardCharsets.UTF_8);return gson.fromJson(json, clazz);}@Overridepublic <T> byte[] serialize(T object) {Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();String json = gson.toJson(object);return json.getBytes(StandardCharsets.UTF_8);}};// 需要从协议的字节中得到是哪种序列化算法public static Serializer.Algorithm getByInt(int type) {Serializer.Algorithm[] array = Serializer.Algorithm.values();if (type < 0 || type > array.length - 1) {throw new IllegalArgumentException("超过SerializerAlgorithm范围");}return array[type];}}static class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {@Overridepublic Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {try {String str = json.getAsString();return Class.forName(str);} catch (ClassNotFoundException e) {throw new JsonParseException(e);}}@Overridepublic JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {// class -> jsonreturn new JsonPrimitive(src.getName());}}
}
3.5 客户端代码 第二版
①创建SequenceIdGenerator,生成自增的sequenceId
package cn.itcast.protocol;import java.util.concurrent.atomic.AtomicInteger;public class SequenceIdGenerator {private static final AtomicInteger id = new AtomicInteger();public static int nextId() {return id.incrementAndGet();}
}
②copy一份之前的RpcClient,改为RpcClientManager,代码修改为如下
package cn.itcast.client;import cn.itcast.handler.RpcResponseMessageHandler;
import cn.itcast.message.RpcRequestMessage;
import cn.itcast.message.RpcResponseMessage;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import cn.itcast.protocol.SequenceIdGenerator;
import cn.itcast.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;import java.lang.reflect.Proxy;@Slf4j
public class RpcClientManager {public static void main(String[] args) {HelloService service = getProxyService(HelloService.class);System.out.println(service.sayHello("张三"));System.out.println(service.sayHello("李四"));}// 创建代理类public static <T> T getProxyService(Class<T> serviceClass) {ClassLoader loader = serviceClass.getClassLoader();Class<?>[] interfaces = new Class[]{serviceClass};Object res = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {// 1. 将方法调用转换为消息对象int sequenceId = SequenceIdGenerator.nextId();RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 准备一个空Promise对象,用来接收结果 指定promise对象异步接收结果的线程DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);// 4. 等待promise结果promise.await();if (promise.isSuccess()) {// 调用正常return promise.getNow();} else {// 调用失败throw new RuntimeException(promise.cause());}});return (T) res;}private static Channel channel = null;private static final Object LOCK = new Object();// 单例模式public static Channel getChannel() {if (channel != null) {return channel;}synchronized (LOCK) {if (channel != null) {return channel;}initChannel();return channel;}}/*** 初始化channel*/private static void initChannel() {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc响应消息处理器,待实现RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});try {channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().addListener(future -> {group.shutdownGracefully();});} catch (InterruptedException e) {log.error("client error", e);}}
}
3.6 客户端 handler 第二版
package cn.itcast.handler;import cn.itcast.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {// 序号 用来接收结果的promise对象public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);// 返回并移除Promise<Object> promise = PROMISES.remove(msg.getSequenceId());if (promise != null) {Object returnValue = msg.getReturnValue();Exception exceptionValue = msg.getExceptionValue();if (exceptionValue != null) {promise.setFailure(exceptionValue);} else {promise.setSuccess(returnValue);}}}
}