12-netty基础-手写rpc-编解码-04
netty系列文章:
01-netty基础-socket |
02-netty基础-java四种IO模型 |
03-netty基础-多路复用select、poll、epoll |
04-netty基础-Reactor三种模型 |
05-netty基础-ByteBuf数据结构 |
06-netty基础-编码解码 |
07-netty基础-自定义编解码器 |
08-netty基础-自定义序列化和反序列化 |
09-netty基础-手写rpc-原理-01 |
10-netty基础-手写rpc-定义协议头-02 |
11-netty基础-手写rpc-支持多序列化协议-03 |
12-netty基础-手写rpc-编解码-04 |
13-netty基础-手写rpc-消费方生成代理-05 |
14-netty基础-手写rpc-提供方(服务端)-06 |
1 自定义编辑码
编解码都采用原生的ByteBuf,分别为MessageToByteEncoder、ByteToMessageDecoder;解决了拆包、粘包问题
编码:将需要发送的数据封装成RpcProtocol形式进行发送
解码:将接收到的数据解释成RpcProtocol形式然后处理相应的业务逻辑
2 代码
2.1 编码
package com.bonnie.protocol.code;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.serializer.ISerializer;
import com.bonnie.protocol.serializer.SerializerManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;/*** 编码*/
@Slf4j
public class BonnieEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, RpcProtocol<Object> msg, ByteBuf out) throws Exception {log.info("============begin BonnieEncoder=========");Header header = msg.getHeader();// 魔数out.writeShort(header.getMagic());// 序列化类型out.writeByte(header.getSerialType());// 消息类型out.writeByte(header.getReqType());// 请求idout.writeLong(header.getRequestId());// 消息体序列化ISerializer serializer = SerializerManager.getSerializer(header.getSerialType());byte[] contentByteArray = serializer.serialize(msg.getContent());System.out.println("body长度"+contentByteArray.length);// 消息体长度,4个字节out.writeInt(contentByteArray.length);System.out.println("发送数据:"+JSONObject.toJSONString(msg));// 写入消息体out.writeBytes(contentByteArray);}}
2.2 解码
package com.bonnie.protocol.code;import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.enums.RpcConstant;
import com.bonnie.protocol.serializer.ISerializer;
import com.bonnie.protocol.serializer.SerializerManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;import java.util.List;
import java.util.Objects;/*** 解码*/
@Slf4j
public class BonnieDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {log.info("========begin BonnieDecoder==========");// 首先判断可读的字节是否小于头的长度,如果小于,说明没有body数据,甚至数据有问题,不解码if (in.readableBytes()<= RpcConstant.HEAD_TOTOAL_LEN) {return;}// 标记读取开始索引in.markReaderIndex();// 魔数short magic = in.readShort();if (!Objects.equals(magic, RpcConstant.MAGIC)) {throw new IllegalArgumentException("Illegal request parameter 'magic',"+magic);}// 序列化类型byte serialType = in.readByte();// 消息类型byte reqType = in.readByte();// 请求idlong requestId = in.readLong();// 报文长度int dataLength = in.readInt();// 可读字节是否小于body的长度,如果小于,则不读取,并且重置到读指针的地方,等下一次读if(in.readableBytes()<dataLength) {in.resetReaderIndex();return;}// 消息体byte[] bodyByteArray = new byte[dataLength];// body内容读取到body中in.readBytes(bodyByteArray);// 封装头信息Header header = new Header();header.setMagic(magic);header.setSerialType(serialType);header.setReqType(reqType);header.setRequestId(requestId);header.setLength(dataLength);// 拿到对应的序列化ISerializer serializer = SerializerManager.getSerializer(serialType);/*** 根据请求类型,比如客户端发送数据,就是REQUWST,服务端给客户端回复数据就是RESPONSE,当然都是* 相对的,每一段都会发送REQUEST请求,每一段也会发送RESPONSE请求*/ReqTypeEnum reqTypeEnum = ReqTypeEnum.findByCode(reqType);switch (reqTypeEnum) {// 如果是请求报文 反序列化得到数据,封装数据,继续传递case REQUEST:RpcProtocol rpcProtocol = dealRequest(bodyByteArray, serializer, header);out.add(rpcProtocol);break;case RESPONSE:RpcProtocol rpcProtocolResponse = dealResponse(bodyByteArray, serializer, header);out.add(rpcProtocolResponse);break;case HEARTBEAT:// TODObreak;}}private RpcProtocol dealResponse(byte[] bodyByteArray, ISerializer serializer, Header header) {RpcResponse rpcResponse = serializer.deserialize(bodyByteArray, RpcResponse.class);RpcProtocol<RpcResponse> rpcProtocol = new RpcProtocol<>();rpcProtocol.setHeader(header);rpcProtocol.setContent(rpcResponse);return rpcProtocol;}private RpcProtocol dealRequest(byte[] bodyByteArray, ISerializer serializer, Header header) {RpcRequest rpcRequest = serializer.deserialize(bodyByteArray, RpcRequest.class);RpcProtocol<RpcRequest> rpcProtocol = new RpcProtocol<>();rpcProtocol.setHeader(header);rpcProtocol.setContent(rpcRequest);return rpcProtocol;}}