当前位置: 首页 > news >正文

14-netty基础-手写rpc-提供方(服务端)-06

 netty系列文章:

01-netty基础-socket
02-netty基础-java四种IO模型
03-netty基础-多路复用select、poll、epoll
04-netty基础-Reactor三种模型
05-netty基础-ByteBuf数据结构
06-netty基础-编码解码
07-netty基础-自定义编解码器
08-netty基础-自定义序列化和反序列化
09-netty基础-手写rpc-原理-01
10-netty基础-手写rpc-定义协议头-02
11-netty基础-手写rpc-支持多序列化协议-03
12-netty基础-手写rpc-编解码-04
13-netty基础-手写rpc-消费方生成代理-05
14-netty基础-手写rpc-提供方(服务端)-06

1 功能逻辑

在服务端启动的时候,在spring容器中bean已经被初始化好之后,拿到当前bean的信息,判断是否被BonnieRemoteService修饰,如果被修饰则获取到当前类下的所有Method,然后将这个Method信息缓存到容器中。以供后续rpc反射调用。
缓存容器使用Map, key:类的全路径+方法名称  value:Method即可

2、核心代码

spring容器中bean已经被初始化好,可以实现BeanPostProcessor接口中的postProcessAfterInitialization方法实现初始化后扩展功能

实现InitializingBean接口中afterPropertiesSet方法,将nettyServer的服务放在这块启动

2.1 收集BonnieRemoteService修饰的类

package com.bonnie.protocol.spring.service;import com.bonnie.protocol.annotation.BonnieRemoteService;
import com.bonnie.protocol.netty.NettyServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;import java.lang.reflect.Method;@Component
@Slf4j
public class SpringRpcProviderBean implements BeanPostProcessor, InitializingBean {private String serverAddress = "127.0.0.1";private Integer serverPort = 48081;//    public SpringRpcProviderBean(String serverAddress, Integer serverPort) {
//        this.serverAddress = serverAddress;
//        this.serverPort = serverPort;
//    }/*** bean初始化完成后,执行该逻辑* @param bean* @param beanName* @return* @throws BeansException*/@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {System.out.println("SpringRpcProviderBean===="+beanName);// 只要bean声明了BonnieRemoteService注解,则需要把该服务发布到网络上// 判断当前类上是否有注解BonnieRemoteServiceboolean flag = bean.getClass().isAnnotationPresent(BonnieRemoteService.class);if (flag) {Method[] declaredMethods = bean.getClass().getDeclaredMethods();for (Method method : declaredMethods) {String serviceName = bean.getClass().getInterfaces()[0].getName();String key = serviceName + "." + method.getName();BeanMethod beanMethod = new BeanMethod();beanMethod.setBean(bean);beanMethod.setMethod(method);// 缓存到map容器中Mediator.beanMethodMap.put(key, beanMethod);}}return bean;}@Overridepublic void afterPropertiesSet() throws Exception {log.info("启动Netty服务端======48081");new Thread(()->{new NettyServer(serverAddress, serverPort).startNettyServer();}).start();}
}

2.2 NettyServer实现

启动服务,设置编解码的方式

package com.bonnie.protocol.netty;import com.bonnie.protocol.code.BonnieDecoder;
import com.bonnie.protocol.code.BonnieEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyServer {private String serverAddress;private Integer serverPort;public NettyServer(String serverAddress, Integer serverPort) {this.serverAddress = serverAddress;this.serverPort = serverPort;}public void startNettyServer() {log.info("begin start Netty server");NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup work = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()// 长度域解码器.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 12, 4, 0, 0)).addLast(new BonnieEncoder()).addLast(new BonnieDecoder()).addLast(new RpcServerHandler());}});try {ChannelFuture channelFuture = serverBootstrap.bind(this.serverAddress, this.serverPort).sync();log.info("Server started Success on serverAddress {} Port,{}",this.serverAddress, this.serverPort);channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();work.shutdownGracefully();}}}

2.3 NettyServer接收客户端请求数据

package com.bonnie.protocol.netty;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.spring.service.Mediator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {private static final Logger log = LoggerFactory.getLogger(RpcServerHandler.class);/*** 服务端接收客户端消息* @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {long requestId = msg.getHeader().getRequestId();log.info("接收到客户端的消息: requestId {} {}", requestId, JSONObject.toJSONString(msg));// 构建返回消息ReqResponseRpcProtocol<RpcResponse> responseRpcProtocol = new RpcProtocol<>();Header header = msg.getHeader();// 设置返回的消息类型header.setReqType(ReqTypeEnum.RESPONSE.getCode());// 通过调用获取到方法的返回数据Object result = Mediator.getInstance().processor(msg.getContent());RpcResponse rpcResponse = new RpcResponse();rpcResponse.setMsg("success");rpcResponse.setData(result);responseRpcProtocol.setHeader(header);responseRpcProtocol.setContent(rpcResponse);// 数据写入到客户端ctx.writeAndFlush(responseRpcProtocol);}}

http://www.dtcms.com/a/319803.html

相关文章:

  • Java NIO 核心原理与秋招高频面试题解析
  • day28-NFS
  • iOS混淆工具使用,后续维护与版本升级中实用的混淆策略
  • 代码随想录day58图论8
  • windows操作系统定时关机、重启指令记录
  • 一周学会Matplotlib3 Python 数据可视化-坐标轴 (Axis)
  • 进程间数据的关联与隔离
  • 管家婆软件如何设置默认税率?
  • AI创新中心从“空间集聚”到“生态共生”
  • 代码库详细笔记
  • P1690 贪婪的 Copy
  • [airplay2] airplay2简略介绍
  • 前端全局注册知识【持续更新】
  • 二分查找算法,并分析其时间、空间复杂度
  • IIS7.5下的https无法绑定主机头,显示灰色如何处理?
  • [ java SE ] 多人聊天窗口1.0
  • 强光干扰下裂缝漏检率↓82%!陌讯轻量化模型在道路巡检的落地实践
  • 2深度学习Pytorch-自动微分--梯度计算、梯度上下文控制(累计梯度、梯度清零)
  • Ethereum: 像Uniswap V3贡献者一样开发,克隆、编译与测试v3-core
  • 通过减少回表和增加冗余字段,优化SQL查询效率
  • LSTM 单变量时序预测—pytorch
  • vscode+latex本地英文期刊环境配置
  • VScode使用jupyter notebook,配置内核报错没有torch解决
  • 如何委托第三方检测机构做软件测试?
  • 鸿蒙 - 分享功能
  • 直播预告|鸿蒙生态下的 Flutter 开发实战
  • 非化学冷却塔水处理解决方案:绿色工业时代的革新引擎
  • Elasticsearch 文档分词器
  • 神经网络入门指南:从零理解 PyTorch 的核心思想
  • 2025 五大商旅平台管控力解析:合规要求下的商旅管理新范式