当前位置: 首页 > news >正文

12-netty基础-手写rpc-编解码-04

 netty系列文章:

01-netty基础-socket
02-netty基础-java四种IO模型
03-netty基础-多路复用select、poll、epoll
04-netty基础-Reactor三种模型
05-netty基础-ByteBuf数据结构
06-netty基础-编码解码
07-netty基础-自定义编解码器
08-netty基础-自定义序列化和反序列化
09-netty基础-手写rpc-原理-01
10-netty基础-手写rpc-定义协议头-02
11-netty基础-手写rpc-支持多序列化协议-03
12-netty基础-手写rpc-编解码-04
13-netty基础-手写rpc-消费方生成代理-05
14-netty基础-手写rpc-提供方(服务端)-06

1 自定义编辑码

编解码都采用原生的ByteBuf,分别为MessageToByteEncoder、ByteToMessageDecoder;解决了拆包、粘包问题
编码:将需要发送的数据封装成RpcProtocol形式进行发送
解码:将接收到的数据解释成RpcProtocol形式然后处理相应的业务逻辑

2 代码

2.1 编码

package com.bonnie.protocol.code;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.serializer.ISerializer;
import com.bonnie.protocol.serializer.SerializerManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;/*** 编码*/
@Slf4j
public class BonnieEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, RpcProtocol<Object> msg, ByteBuf out) throws Exception {log.info("============begin BonnieEncoder=========");Header header = msg.getHeader();// 魔数out.writeShort(header.getMagic());// 序列化类型out.writeByte(header.getSerialType());// 消息类型out.writeByte(header.getReqType());// 请求idout.writeLong(header.getRequestId());// 消息体序列化ISerializer serializer = SerializerManager.getSerializer(header.getSerialType());byte[] contentByteArray = serializer.serialize(msg.getContent());System.out.println("body长度"+contentByteArray.length);// 消息体长度,4个字节out.writeInt(contentByteArray.length);System.out.println("发送数据:"+JSONObject.toJSONString(msg));// 写入消息体out.writeBytes(contentByteArray);}}

2.2 解码

package com.bonnie.protocol.code;import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.enums.RpcConstant;
import com.bonnie.protocol.serializer.ISerializer;
import com.bonnie.protocol.serializer.SerializerManager;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;import java.util.List;
import java.util.Objects;/*** 解码*/
@Slf4j
public class BonnieDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {log.info("========begin BonnieDecoder==========");// 首先判断可读的字节是否小于头的长度,如果小于,说明没有body数据,甚至数据有问题,不解码if (in.readableBytes()<= RpcConstant.HEAD_TOTOAL_LEN) {return;}// 标记读取开始索引in.markReaderIndex();// 魔数short magic = in.readShort();if (!Objects.equals(magic, RpcConstant.MAGIC)) {throw new IllegalArgumentException("Illegal request parameter 'magic',"+magic);}// 序列化类型byte serialType = in.readByte();// 消息类型byte reqType = in.readByte();// 请求idlong requestId = in.readLong();// 报文长度int dataLength = in.readInt();// 可读字节是否小于body的长度,如果小于,则不读取,并且重置到读指针的地方,等下一次读if(in.readableBytes()<dataLength) {in.resetReaderIndex();return;}// 消息体byte[] bodyByteArray = new byte[dataLength];// body内容读取到body中in.readBytes(bodyByteArray);// 封装头信息Header header = new Header();header.setMagic(magic);header.setSerialType(serialType);header.setReqType(reqType);header.setRequestId(requestId);header.setLength(dataLength);// 拿到对应的序列化ISerializer serializer = SerializerManager.getSerializer(serialType);/*** 根据请求类型,比如客户端发送数据,就是REQUWST,服务端给客户端回复数据就是RESPONSE,当然都是* 相对的,每一段都会发送REQUEST请求,每一段也会发送RESPONSE请求*/ReqTypeEnum reqTypeEnum = ReqTypeEnum.findByCode(reqType);switch (reqTypeEnum) {// 如果是请求报文  反序列化得到数据,封装数据,继续传递case REQUEST:RpcProtocol rpcProtocol = dealRequest(bodyByteArray, serializer, header);out.add(rpcProtocol);break;case RESPONSE:RpcProtocol rpcProtocolResponse = dealResponse(bodyByteArray, serializer, header);out.add(rpcProtocolResponse);break;case HEARTBEAT:// TODObreak;}}private RpcProtocol dealResponse(byte[] bodyByteArray, ISerializer serializer, Header header) {RpcResponse rpcResponse = serializer.deserialize(bodyByteArray, RpcResponse.class);RpcProtocol<RpcResponse> rpcProtocol = new RpcProtocol<>();rpcProtocol.setHeader(header);rpcProtocol.setContent(rpcResponse);return rpcProtocol;}private RpcProtocol dealRequest(byte[] bodyByteArray, ISerializer serializer, Header header) {RpcRequest rpcRequest = serializer.deserialize(bodyByteArray, RpcRequest.class);RpcProtocol<RpcRequest> rpcProtocol = new RpcProtocol<>();rpcProtocol.setHeader(header);rpcProtocol.setContent(rpcRequest);return rpcProtocol;}}

http://www.dtcms.com/a/320074.html

相关文章:

  • ubuntu 2024 安装拼音输入法
  • 【macOS操作系统部署开源DeepSeek大模型,搭建Agent平台,构建私有化RAG知识库完整流程】
  • Linux综合练习2
  • 电气设备与互感器全解析
  • 智能制造网络质量保障:德承 DX-1200多网口工控机在windows系统下的网络性能测试指南
  • 操作系统与并发底层原理多道技术
  • docker容器导出为镜像
  • 深度学习入门Day7:Transformer架构原理与实战全解析
  • 亚马逊广告运营:有什么好用的辅助工具
  • Redis配置、测试及分布式缓存实现
  • Android 之 Jetpack - Paging
  • 《C语言》函数练习题--2
  • ElasticSearch相关术语介绍
  • 使用 decimal 包解决 go float 浮点数运算失真
  • 小鸡模拟器安卓版:经典街机游戏的移动体验
  • 利用Axure与JavaScript打造动态图片上传原型:设计案例分享
  • spring-cglib代理-初探01
  • 深度学习-卷积神经网络CNN-1×1卷积层
  • Flink-1.19.0源码详解9-ExecutionGraph生成-后篇
  • UE5多人MOBA+GAS 39、制作角色上半身UI
  • 字符串匹配(重点解析KMP算法)
  • 6 大模块!重构物业运营方式
  • 跨境电商增长突围:多维变局下的战略重构与技术赋能
  • 数智先锋 | Bonree ONE 赋能通威股份有限公司提升全栈可观测性能力
  • 深入解析NVIDIA Nsight工具套件:原理、功能与实战指南
  • 房产证识别在房产行业的技术实现及应用原理
  • Python Socket 脚本深度解析与开发指南
  • 扣扣号码展示网站源码_号码售卖展示系统源码 全开源 带后台(源码下载)
  • 5、倒计时翻页效果
  • 工作任务管理