当前位置: 首页 > news >正文

13-netty基础-手写rpc-消费方生成代理-05

 netty系列文章:

01-netty基础-socket
02-netty基础-java四种IO模型
03-netty基础-多路复用select、poll、epoll
04-netty基础-Reactor三种模型
05-netty基础-ByteBuf数据结构
06-netty基础-编码解码
07-netty基础-自定义编解码器
08-netty基础-自定义序列化和反序列化
09-netty基础-手写rpc-原理-01
10-netty基础-手写rpc-定义协议头-02
11-netty基础-手写rpc-支持多序列化协议-03
12-netty基础-手写rpc-编解码-04
13-netty基础-手写rpc-消费方生成代理-05
14-netty基础-手写rpc-提供方(服务端)-06

1 功能逻辑

在客户端启动的时候要为添加了BonnieRemoteReference注解的属性生成一个代理类;代理类的主要功能:在spring容器加载完BeanDefinition之后,在Bean初始化之前,触发生成代理类。
逻辑:

  1. 获取到所有的BeanDefinition
  2. 拿到BeanDefinition对应的class
  3. 遍历class下的所有被BonnieRemoteReference修饰的属性(成员变量)
  4. 为被BonnieRemoteReference修饰的属性,使用BeanDefinitionBuilder构建BeanDefinition,设置interfaceClass、serviceAddress、servicePort属性,并放入到spring容器中,对象的类型为SpringRpcReferenceBean;
  5. SpringRpcReferenceBean实现FactoryBean接口,然后在getObject中返回代理对象。
  6. 编写NettyClient代码

补充:

Spring 的 FactoryBean 是一个工厂 bean 接口,用于自定义 bean 的创建逻辑。它的核心作用是:

  • 当容器获取该 bean 时(如 getBean("xxx")),实际返回的是 getObject() 方法创建的对象,而非 SpringRpcReferenceBean 自身实例。
  • 常用于创建复杂对象(如远程服务代理、数据库连接池等)

2 重点代码介绍

2.1 触发生成代理类入口代码

在spring容器加载BeanDefinition之后,在Bean初始化之前执行,实现接口BeanFactoryPostProcessor接口中postProcessBeanFactory方法即可
 

获取所有的beanDefinitionNames
String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();

获取beanClassName对应的类信息
Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);

获取clazz上的所有属性(成员变量)
ReflectionUtils.doWithFields(clazz, this::parseRpcReference);

当前这个field是否被BonnieRemoteReference注解修饰
BonnieRemoteReference remoteReference = AnnotationUtils.getAnnotation(field, BonnieRemoteReference.class);

生成SpringRpcReferenceBean的BeanDefinition
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class)
放入属性,远程调用中需要的内容,比如是那个类,以及地址端口信息
builder.addPropertyValue("interfaceClass", field.getType());
builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());
builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());
BeanDefinition beanDefinition = builder.getBeanDefinition();
rpcRefBeanDefinitionMap.put(field.getName(), beanDefinition);

放入到spring容器中
registry.registerBeanDefinition(entry.getKey(), entry.getValue());

package com.bonnie.protocol.spring.reference;import com.bonnie.protocol.annotation.BonnieRemoteReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {private ApplicationContext applicationContext;private ClassLoader classLoader;//保存发布的引用bean的信息private final Map<String, BeanDefinition> rpcRefBeanDefinitionMap = new ConcurrentHashMap<>();private RpcClientProperties rpcClientProperties;public SpringRpcReferencePostProcessor(RpcClientProperties rpcClientProperties) {this.rpcClientProperties = rpcClientProperties;}/*** 实现postProcessBeanFactory方法,spring容器加载了bean的定义文件之后, 在bean实例化之前执行* 1、将类型的存在的BonnieRemoteReference注解的属性,构造BeanDefinition放在容器中,beanName是类的全限定名, BeanDefinition(类的全限定名,客户端IP,客户端端口号)* @param beanFactory* @throws BeansException*/@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {// 获取到所有的beanDefinitionString[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();// 遍历for (String beanDefinitionName : beanDefinitionNames) {BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);String beanClassName = beanDefinition.getBeanClassName();if (Objects.nonNull(beanClassName)) {// 获取到这个类的所有fieldClass<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);// 该方法遍历class对象中的所有的field属性,并且作为参数传入到parseRpcReference方法中ReflectionUtils.doWithFields(clazz, this::parseRpcReference);}}// 将生成的BeanDefinition放入到容器中BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;Set<Map.Entry<String, BeanDefinition>> entries = this.rpcRefBeanDefinitionMap.entrySet();for (Map.Entry<String, BeanDefinition> entry : entries) {if (applicationContext.containsBean(entry.getKey())) {log.warn("SpringContext already register bean {}", entry.getKey());} else {registry.registerBeanDefinition(entry.getKey(), entry.getValue());log.info("registered RpcReferenceBean {} success", entry.getKey());}}}private void parseRpcReference(Field field) {// 当前这个field是否被BonnieRemoteReference注解修饰BonnieRemoteReference remoteReference = AnnotationUtils.getAnnotation(field, BonnieRemoteReference.class);// BonnieRemoteReference注解修饰if (Objects.nonNull(remoteReference)) {BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);builder.addPropertyValue("interfaceClass", field.getType());builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());BeanDefinition beanDefinition = builder.getBeanDefinition();rpcRefBeanDefinitionMap.put(field.getName(), beanDefinition);}}@Overridepublic void setBeanClassLoader(ClassLoader classLoader) {this.classLoader = classLoader;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}}

2.2 生成代理类代码

上面会被BonnieRemoteReference修饰的属性(Field)为生成SpringRpcReferenceBean对象,并添加相关的属性。

实现FactoryBean接口,当spring获取SpringRpcReferenceBean对象的时候,调用的就是里面的getObject对象,在getObject里面生成一个代理类,即可代理被BonnieRemoteReference修饰的类。

package com.bonnie.protocol.spring.reference;import lombok.Setter;
import org.springframework.beans.factory.FactoryBean;import java.lang.reflect.Proxy;/*** 创建SpringRpcReferenceBean的代理对象*/
@Setter
public class SpringRpcReferenceBean implements FactoryBean<Object> {private String serviceAddress;private Integer servicePort;private Class<?> interfaceClass;/*** 返回由工厂创建的目标Bean实例* @return* @throws Exception*/@Overridepublic Object getObject() throws Exception {System.out.println("代理类 serviceAddress "+serviceAddress);System.out.println("代理类 servicePort "+servicePort);System.out.println("代理类 interfaceClass "+interfaceClass);// 为BonnieRemoteReference生成一个代理类return Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[]{interfaceClass},new RpcInvokerProxy(serviceAddress, servicePort));}/*** 返回目标Bean的类型* @return*/@Overridepublic Class<?> getObjectType() {return this.interfaceClass;}}

2.3 代理类handler

这块主要是在发生rpc调用的时候,组装请求信息,并通过nettyClient向服务端发起连接并且发送请求。

package com.bonnie.protocol.spring.reference;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.enums.RpcConstant;
import com.bonnie.protocol.enums.SerialTypeEnum;
import com.bonnie.protocol.netty.NettyClient;
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.DefaultPromise;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;public class RpcInvokerProxy implements InvocationHandler {private String host;private Integer port;public RpcInvokerProxy(String host, Integer port) {this.host = host;this.port = port;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {/*** 构建发送的请求报文,首先去创建RequestHold类,在这个类定义一个原子自增的RequestId,* 在一个就是每次请求都会有结果,那么请求id和结果的关系要有一个映射关系*/RpcProtocol<RpcRequest> reqProtocol = new RpcProtocol<>();long requestId = RequestHolder.REQUEST_ID.incrementAndGet();System.out.println("生成的requestId:" + requestId);Header header = new Header();header.setMagic(RpcConstant.MAGIC);header.setSerialType(SerialTypeEnum.JAVA_SERIAL.getCode());header.setReqType(ReqTypeEnum.REQUEST.getCode());header.setRequestId(requestId);header.setLength(0);RpcRequest rpcRequest = new RpcRequest();rpcRequest.setClassName(method.getDeclaringClass().getName());rpcRequest.setMethodName(method.getName());rpcRequest.setParams(args);rpcRequest.setParameterTypes(method.getParameterTypes());reqProtocol.setHeader(header);reqProtocol.setContent(rpcRequest);// 发起远程调用NettyClient nettyClient = new NettyClient(host, port);System.out.println("代理发送到服务端请求内容:" + JSONObject.toJSONString(reqProtocol));// new DefaultEventLoop(),是用来去执行监听器的RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<RpcResponse>(new DefaultEventLoop()));// 在发起请求之前,添加映射关系到map中RequestHolder.REQUEST_MAP.put(header.getRequestId(), future);// 客户端发送数据nettyClient.sendRequest(reqProtocol);// 通过promise,异步等待服务端发送数据来,不然就会一直在此等待// get方法得到的是RpcResponse类,然后调用getData方法获取到数据return future.getPromise().get().getData();}
}

2.4 netty客户端代码

这块主要包含创建客户端、向服务端发起连接、发送请求,也会设置前文中自定义编解码、序列化的操作

package com.bonnie.protocol.netty;import com.bonnie.protocol.code.BonnieDecoder;
import com.bonnie.protocol.code.BonnieEncoder;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyClient {private final Bootstrap bootstrap;private final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();private String serviceAddress;private Integer servicePort;public NettyClient(String serviceAddress, Integer servicePort) {log.info("开始初始化NettyClient======");bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.info("开始初始化RpcClientInitializer======");ch.pipeline().addLast(new LoggingHandler()).addLast(new BonnieEncoder()).addLast(new BonnieDecoder()).addLast(new RpcClientHandler());}});this.serviceAddress = serviceAddress;this.servicePort = servicePort;}/*** 发送数据* @param protocol* @throws Exception*/public void sendRequest(RpcProtocol<RpcRequest> protocol) {try {System.out.println(this.serviceAddress+ "===="+this.servicePort);final ChannelFuture channelFuture  = bootstrap.connect(this.serviceAddress, this.servicePort).sync();// 注册一个监听器,如果出问题就关闭groupchannelFuture.addListener(listener -> {if (channelFuture.isSuccess()) {log.info("connect rpc server {} success.",this.serviceAddress);} else {log.error("connect rpc server {} failed. ",this.servicePort);channelFuture.cause().printStackTrace();eventLoopGroup.shutdownGracefully();}});log.info("begin transfer data");// 向服务端发送数据channelFuture.channel().writeAndFlush(protocol);} catch (InterruptedException e) {e.printStackTrace();}}}

2.5 netty客户端接收服务端响应数据

package com.bonnie.protocol.netty;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.spring.reference.RequestHolder;
import com.bonnie.protocol.spring.reference.RpcFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {/*** 接收服务端响应数据* @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {long requestId = msg.getHeader().getRequestId();log.info("接收服务端响应的结果====== requestId {} {}", requestId, JSONObject.toJSONString(msg));// 删除映射关系RpcFuture<RpcResponse> future = RequestHolder.REQUEST_MAP.remove(requestId);// 我们之前说异步等待服务端发送数据过来,那么只要服务端发送数据过来,就会调用管道RpcClentHandler的read方法// 那么当初future.getPromise().get()如果不再阻塞获取数据呢?就是通过给Promise中的Success设置值,同时会唤醒阻塞的线程// 一当唤醒线程, future.getPromise().get()就会不再阻塞,就获取到服务端返回的数据future.getPromise().setSuccess(msg.getContent());}}

http://www.dtcms.com/a/319619.html

相关文章:

  • Qt——入门
  • 数据赋能(386)——数据挖掘——迭代过程
  • Spring、Spring MVC、MyBatis 和 Spring Boot的关系
  • Ethereum:如何优雅部署 NPM 包中的第三方智能合约?
  • LoadBalancingSpi
  • Beelzebub靶机
  • MyCAT实战环节
  • 动手学深度学习13.10. 转置卷积 -笔记练习(PyTorch)
  • 在新建word中使用以前文件中的列表样式
  • Python调用Shell指令的方法与实践
  • 深海中的类型晨曦
  • Jmeter使用第一节-认识面板(Mac版)
  • 初识C++类的6个默认成员函数
  • 以复合赋值运算符(op=)优化单独运算符(op)的实现
  • BKP 与 RTC 时钟
  • 从Text2SQL到Text2Metrics:衡石指标管理技术跃迁
  • 【Bluedroid】蓝牙音频接收端活动设备切换机制深度解析(sink_set_active_device)
  • 密码学侧信道攻击(Side-channel Attack):从物理泄露中窃取密钥
  • 水库大坝安全监测系统主要概述
  • 护网行动之后:容器安全如何升级?微隔离打造内网“微堡垒”
  • SkyWalking-1--SkyWalking是什么?
  • 基于MATLAB实现支持向量机(SVM)分类
  • `/dev/vdb` 是一个新挂载的 4TB 硬盘,但目前尚未对其进行分区和格式化。
  • WebSocket 在多线程环境下处理 Session并发
  • 多数据中心运维:别让 “分布式” 变成 “混乱式”
  • 机器学习 [白板推导](七)[概率图模型]
  • QtC++ 中使用 qtwebsocket 开源库实现基于websocket的本地服务开发详解
  • 30-Hive SQL-DML-Load加载数据
  • 黄金将变盘【月相】择时交易系统黄金,为何即将变盘?
  • 【深度学习机器学习】构建情绪对话模型:从数据到部署的完整实践