
RPC Calls, Part 3: Automatic Service Registration Using Proxies

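1. Bean Management with Spring

To simplify object management, this article uses the Spring framework to manage beans.

  • CustomScanner, a subclass of ClassPathBeanDefinitionScanner: registers the classes that carry a given annotation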
package github.javaguide.spring;

import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import org.springframework.core.type.filter.AnnotationTypeFilter;

import java.lang.annotation.Annotation;

/**
 * custom package scanner
 *
 * @author shuang.kou
 * @createTime 2020-08-10 21:42:00
 */
public class CustomScanner extends ClassPathBeanDefinitionScanner {

    public CustomScanner(BeanDefinitionRegistry registry, Class<? extends Annotation> annoType) {
        super(registry);
        super.addIncludeFilter(new AnnotationTypeFilter(annoType));
    }

    @Override
    public int scan(String... basePackages) {
        return super.scan(basePackages);
    }
}
  • CustomScannerRegistrar: determines which packages are scanned
package github.javaguide.spring;

import github.javaguide.annotation.RpcScan;
import github.javaguide.annotation.RpcService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.StandardAnnotationMetadata;
import org.springframework.stereotype.Component;

/**
 * scan and filter specified annotations
 *
 * @author shuang.kou
 * @createTime 2020-08-10 22:12:00
 */
@Slf4j
public class CustomScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {
    private static final String SPRING_BEAN_BASE_PACKAGE = "github.javaguide";
    private static final String BASE_PACKAGE_ATTRIBUTE_NAME = "basePackage";
    private ResourceLoader resourceLoader;

    @Override
    public void setResourceLoader(ResourceLoader resourceLoader) {
        this.resourceLoader = resourceLoader;
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
        // get the attributes and values of the RpcScan annotation
        AnnotationAttributes rpcScanAnnotationAttributes = AnnotationAttributes.fromMap(annotationMetadata.getAnnotationAttributes(RpcScan.class.getName()));
        String[] rpcScanBasePackages = new String[0];
        if (rpcScanAnnotationAttributes != null) {
            // get the value of the basePackage property
            rpcScanBasePackages = rpcScanAnnotationAttributes.getStringArray(BASE_PACKAGE_ATTRIBUTE_NAME);
        }
        if (rpcScanBasePackages.length == 0) {
            // no package declared: fall back to the package of the annotated class
            rpcScanBasePackages = new String[]{((StandardAnnotationMetadata) annotationMetadata).getIntrospectedClass().getPackage().getName()};
        }
        // Scan the RpcService annotation
        CustomScanner rpcServiceScanner = new CustomScanner(beanDefinitionRegistry, RpcService.class);
        // Scan the Component annotation
        CustomScanner springBeanScanner = new CustomScanner(beanDefinitionRegistry, Component.class);
        if (resourceLoader != null) {
            rpcServiceScanner.setResourceLoader(resourceLoader);
            springBeanScanner.setResourceLoader(resourceLoader);
        }
        int springBeanAmount = springBeanScanner.scan(SPRING_BEAN_BASE_PACKAGE);
        log.info("springBeanScanner scanned [{}] beans", springBeanAmount);
        int rpcServiceCount = rpcServiceScanner.scan(rpcScanBasePackages);
        log.info("rpcServiceScanner scanned [{}] beans", rpcServiceCount);
    }
}
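
The base-package fallback in registerBeanDefinitions can be illustrated without Spring. This is only a minimal sketch under stated assumptions: BasePackageFallbackSketch and resolveBasePackages are hypothetical names, and String.class merely stands in for the class that carries @RpcScan.

```java
import java.util.Arrays;

/** Sketch of the base-package fallback; every name in this class is hypothetical. */
final class BasePackageFallbackSketch {

    /**
     * Returns the declared packages, or the package of the annotated class
     * when no package was declared.
     */
    static String[] resolveBasePackages(String[] declared, Class<?> annotatedClass) {
        if (declared != null && declared.length > 0) {
            return declared;
        }
        return new String[]{annotatedClass.getPackage().getName()};
    }

    public static void main(String[] args) {
        // Explicitly declared packages are used as they are.
        System.out.println(Arrays.toString(
                resolveBasePackages(new String[]{"github.javaguide"}, String.class)));
        // Nothing declared: the package of the annotated class is scanned instead.
        System.out.println(Arrays.toString(
                resolveBasePackages(new String[0], String.class)));
    }
}
```

With this rule, placing the scan annotation on a class in the application's root package is enough to have that package and its subpackages scanned.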

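2. Annotations

The RPC proxy in this article is built on annotations and interception, which makes it easy to declare and call service interfaces.

  • RpcService: marks a class as an RPC service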
package github.javaguide.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * RPC service annotation, marked on the service implementation class
 *
 * @author shuang.kou
 * @createTime 2020-07-21 13:11:00
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface RpcService {

    /**
     * Service version, default value is empty string
     */
    String version() default "";

    /**
     * Service group, default value is empty string
     */
    String group() default "";

}
  • RpcScan: specifies the packages to scan for RPC services
package github.javaguide.annotation;

import github.javaguide.spring.CustomScannerRegistrar;
import org.springframework.context.annotation.Import;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * scan custom annotations
 *
 * @author shuang.kou
 * @createTime 2020-08-10 21:42:00
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Import(CustomScannerRegistrar.class)
@Documented
public @interface RpcScan {

    String[] basePackage();

}
  • RpcReference: marks a field that references an RPC service
package github.javaguide.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * RPC reference annotation, autowire the service implementation class
 *
 * @author smile2coder
 * @createTime 2020-09-16 21:42:00
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface RpcReference {

    /**
     * Service version, default value is empty string
     */
    String version() default "";

    /**
     * Service group, default value is empty string
     */
    String group() default "";

}
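
How annotations of this shape are read at runtime can be shown with plain reflection. The sketch below redeclares a stand-in annotation so that it compiles on its own: DemoRpcService is a hypothetical name with the same attributes as RpcService, and the service classes and the group and version values are illustrative assumptions.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

final class AnnotationLookupSketch {

    /** Stand-in with the same attributes as RpcService (hypothetical name). */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Inherited
    @interface DemoRpcService {
        String version() default "";
        String group() default "";
    }

    interface HelloService {
        String hello(String name);
    }

    @DemoRpcService(group = "test1", version = "version1")
    static class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String name) {
            return "hello " + name;
        }
    }

    /** Has no annotation of its own; it inherits the one on HelloServiceImpl. */
    static class LoudHelloServiceImpl extends HelloServiceImpl {
    }

    /** Returns "group/version" for an annotated class, or null when the annotation is absent. */
    static String describe(Class<?> type) {
        DemoRpcService meta = type.getAnnotation(DemoRpcService.class);
        return meta == null ? null : meta.group() + "/" + meta.version();
    }

    public static void main(String[] args) {
        System.out.println(describe(HelloServiceImpl.class));     // annotated directly
        System.out.println(describe(LoudHelloServiceImpl.class)); // annotated through @Inherited
        System.out.println(describe(HelloService.class));         // not annotated
    }
}
```

Because the annotation is @Inherited, a subclass of an annotated class is also reported as annotated. @Inherited only affects annotations placed on classes, so it has no effect on a field-level annotation such as RpcReference.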

3. Interception

A Spring BeanPostProcessor inspects the beans found on the RpcScan path and registers every bean whose class is annotated with RpcService as a service.

package github.javaguide.spring;

import github.javaguide.annotation.RpcReference;
import github.javaguide.annotation.RpcService;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcRequestTransportEnum;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.impl.ZkServiceProviderImpl;
import github.javaguide.proxy.RpcClientProxy;
import github.javaguide.remoting.transport.RpcRequestTransport;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;

/**
 * call this method before creating the bean to see if the class is annotated
 *
 * @author shuang.kou
 * @createTime 2020-07-14 16:42:00
 */
@Slf4j
@Component
public class SpringBeanPostProcessor implements BeanPostProcessor {

    private final ServiceProvider serviceProvider;
    private final RpcRequestTransport rpcClient;

    public SpringBeanPostProcessor() {
        this.serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
        this.rpcClient = ExtensionLoader.getExtensionLoader(RpcRequestTransport.class).getExtension(RpcRequestTransportEnum.NETTY.getName());
    }

    /**
     * Before initialization: publish classes annotated with @RpcService to the registry
     * and cache the service object in the ServiceProvider
     */
    @SneakyThrows
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (bean.getClass().isAnnotationPresent(RpcService.class)) {
            log.info("[{}] is annotated with  [{}]", bean.getClass().getName(), RpcService.class.getCanonicalName());
            // get RpcService annotation
            RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
            // build RpcServiceProperties
            RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
                    .group(rpcService.group())
                    .version(rpcService.version())
                    .service(bean).build();
            serviceProvider.publishService(rpcServiceConfig);
        }
        return bean;
    }

    /**
     * After initialization: inject a proxy object into every field annotated with @RpcReference
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> targetClass = bean.getClass();
        Field[] declaredFields = targetClass.getDeclaredFields();
        for (Field declaredField : declaredFields) {
            RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
            if (rpcReference != null) {
                RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
                        .group(rpcReference.group())
                        .version(rpcReference.version()).build();
                RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);
                Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
                declaredField.setAccessible(true);
                try {
                    declaredField.set(bean, clientProxy);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }
        return bean;
    }
}
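
Spring calls postProcessBeforeInitialization before a bean's initialization callbacks and postProcessAfterInitialization after them. The following sketch imitates that order in plain Java so the two hooks can be seen in sequence. Hook, Initializable and initialize are hypothetical stand-ins, not Spring types, and the bean name is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

final class PostProcessorOrderSketch {

    /** Hypothetical two-callback hook with the same shape as a bean post-processor. */
    interface Hook {
        Object beforeInit(Object bean, String name);

        Object afterInit(Object bean, String name);
    }

    /** A bean that has an initialization callback. */
    interface Initializable {
        void init();
    }

    /** Runs the hook around the bean's init callback. */
    static Object initialize(Object bean, String name, Hook hook) {
        Object current = hook.beforeInit(bean, name);
        if (current instanceof Initializable) {
            ((Initializable) current).init();
        }
        return hook.afterInit(current, name);
    }

    /** Records the order in which the callbacks fire for one bean. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Initializable bean = () -> events.add("init");
        Hook hook = new Hook() {
            @Override
            public Object beforeInit(Object b, String n) {
                events.add("before:" + n); // the point where an annotated service would be published
                return b;
            }

            @Override
            public Object afterInit(Object b, String n) {
                events.add("after:" + n); // the point where annotated fields would receive proxies
                return b;
            }
        };
        initialize(bean, "helloService", hook);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```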

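4. Service Registration

  • The service provider interface has two core methods:
    • Publish a service: publishes the service to the registry and caches it
    • Get a service: returns the service object from the cache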
package github.javaguide.provider;

import github.javaguide.config.RpcServiceConfig;

/**
 * store and provide service object.
 *
 * @author shuang.kou
 * @createTime 2020-05-31 16:52:00
 */
public interface ServiceProvider {

    /**
     * @param rpcServiceConfig rpc service related attributes
     */
    void addService(RpcServiceConfig rpcServiceConfig);

    /**
     * @param rpcServiceName rpc service name
     * @return service object
     */
    Object getService(String rpcServiceName);

    /**
     * @param rpcServiceConfig rpc service related attributes
     */
    void publishService(RpcServiceConfig rpcServiceConfig);

}
  • Implementation class
package github.javaguide.provider.impl;

import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.enums.ServiceRegistryEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.remoting.transport.netty.server.NettyRpcServer;
import lombok.extern.slf4j.Slf4j;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author shuang.kou
 * @createTime 2020-05-13 11:23:00
 */
@Slf4j
public class ZkServiceProviderImpl implements ServiceProvider {

    /**
     * key: rpc service name(interface name + version + group)
     * value: service object
     */
    private final Map<String, Object> serviceMap;
    private final Set<String> registeredService;
    private final ServiceRegistry serviceRegistry;

    public ZkServiceProviderImpl() {
        serviceMap = new ConcurrentHashMap<>();
        registeredService = ConcurrentHashMap.newKeySet();
        serviceRegistry = ExtensionLoader.getExtensionLoader(ServiceRegistry.class).getExtension(ServiceRegistryEnum.ZK.getName());
    }

    @Override
    public void addService(RpcServiceConfig rpcServiceConfig) {
        String rpcServiceName = rpcServiceConfig.getRpcServiceName();
        if (registeredService.contains(rpcServiceName)) {
            return;
        }
        registeredService.add(rpcServiceName);
        serviceMap.put(rpcServiceName, rpcServiceConfig.getService());
        log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces());
    }

    @Override
    public Object getService(String rpcServiceName) {
        Object service = serviceMap.get(rpcServiceName);
        if (null == service) {
            throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND);
        }
        return service;
    }

    @Override
    public void publishService(RpcServiceConfig rpcServiceConfig) {
        try {
            String host = InetAddress.getLocalHost().getHostAddress();
            this.addService(rpcServiceConfig);
            serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(), new InetSocketAddress(host, NettyRpcServer.PORT));
        } catch (UnknownHostException e) {
            log.error("occur exception when getHostAddress", e);
        }
    }

}
  • Service registration: registers a service with the registry
package github.javaguide.registry;

import github.javaguide.extension.SPI;

import java.net.InetSocketAddress;

/**
 * service registration
 *
 * @author shuang.kou
 * @createTime 2020-05-13 08:39:00
 */
@SPI
public interface ServiceRegistry {

    /**
     * register service
     *
     * @param rpcServiceName    rpc service name
     * @param inetSocketAddress service address
     */
    void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress);

}
  • ZooKeeper registry: uses CuratorFramework as the Java client
package github.javaguide.registry.zk;

import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.zk.util.CuratorUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;

import java.net.InetSocketAddress;

/**
 * service registration based on zookeeper
 *
 * @author shuang.kou
 * @createTime 2020-05-31 10:56:00
 */
@Slf4j
public class ZkServiceRegistryImpl implements ServiceRegistry {

    @Override
    public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
        String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();
        CuratorFramework zkClient = CuratorUtils.getZkClient();
        CuratorUtils.createPersistentNode(zkClient, servicePath);
    }
}
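
The two halves of publishing, caching the service locally and deriving the registry path, can be tried without ZooKeeper. The sketch below rests on several assumptions. The root path "/my-rpc" is a placeholder, because the value of CuratorUtils.ZK_REGISTER_ROOT_PATH is not shown in this article. The key format follows the comment in ZkServiceProviderImpl (interface name + version + group), but RpcServiceConfig.getRpcServiceName is not shown either, so the exact concatenation is a guess. All class and method names are hypothetical. It also swaps the separate Set check for a single putIfAbsent call.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class PublishServiceSketch {

    /** Assumed root path; the real constant is not shown in the article. */
    static final String ROOT_PATH = "/my-rpc";

    /** Local cache: service name -> service object. */
    private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();

    /** Assumed key format: name of the first implemented interface + version + group. */
    static String serviceName(Object service, String version, String group) {
        return service.getClass().getInterfaces()[0].getCanonicalName() + version + group;
    }

    /** Same concatenation as registerService: root + "/" + service name + address.toString(). */
    static String servicePath(String rpcServiceName, InetSocketAddress address) {
        return ROOT_PATH + "/" + rpcServiceName + address.toString();
    }

    /** Caches the service if its name is new and returns the path a registry node would get. */
    String publish(Object service, String version, String group, InetSocketAddress address) {
        String name = serviceName(service, version, group);
        serviceMap.putIfAbsent(name, service); // atomic "add once" instead of a separate Set check
        return servicePath(name, address);
    }

    Object getService(String rpcServiceName) {
        Object service = serviceMap.get(rpcServiceName);
        if (service == null) {
            throw new IllegalStateException("service not found: " + rpcServiceName);
        }
        return service;
    }

    /** Builds 127.0.0.1:port from raw bytes, so the address carries no host name. */
    static InetSocketAddress loopback(int port) {
        try {
            return new InetSocketAddress(InetAddress.getByAddress(new byte[]{127, 0, 0, 1}), port);
        } catch (UnknownHostException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Trivial service used only for the demo. */
    static final class Job implements Runnable {
        @Override
        public void run() {
        }
    }

    public static void main(String[] args) {
        PublishServiceSketch provider = new PublishServiceSketch();
        Job job = new Job();
        System.out.println(provider.publish(job, "version1", "test1", loopback(9998)));
        System.out.println(provider.getService("java.lang.Runnableversion1test1") == job);
    }
}
```

Note that the path relies on the text produced by InetSocketAddress.toString(). That text has the form "hostname/ip:port", with an empty host name part when the address was built from a literal IP, so an address that does carry a host name would place the host name directly after the service name with no separator.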

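5. Injecting Service Proxies into Fields

The RpcReference annotation is resolved and the referenced service is proxied, so that a local call is forwarded as a remote call.

  • This is the Spring bean field-injection method shown above. It has three steps:
    • Detect the fields annotated with RpcReference
    • Create a proxy object for the service
    • Inject the created object into the field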
/**
 * After initialization: inject a proxy object into every field annotated with @RpcReference
 */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    Class<?> targetClass = bean.getClass();
    Field[] declaredFields = targetClass.getDeclaredFields();
    for (Field declaredField : declaredFields) {
        RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
        if (rpcReference != null) {
            // 1. Service configuration
            RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
                    .group(rpcReference.group())
                    .version(rpcReference.version()).build();
            // 2. Create the proxy object for the service
            RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);
            Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
            // 3. Inject it into the field
            declaredField.setAccessible(true);
            try {
                declaredField.set(bean, clientProxy);
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }
    }
    return bean;
}
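  • The service proxy object is created by the class below:
    • getProxy creates a proxy object whose invocation handler is the RpcClientProxy instance itself
    • invoke uses Netty to send the call to the remote service and returns the result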
package github.javaguide.proxy;

import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.enums.RpcResponseCodeEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.RpcRequestTransport;
import github.javaguide.remoting.transport.netty.client.NettyRpcClient;
import github.javaguide.remoting.transport.socket.SocketRpcClient;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
 * Dynamic proxy class.
 * When a dynamic proxy object calls a method, it actually calls the following invoke method.
 * It is precisely because of the dynamic proxy that the remote method called by the client is like calling the local method (the intermediate process is shielded)
 *
 * @author shuang.kou
 * @createTime 2020-05-10 19:01:00
 */
@Slf4j
public class RpcClientProxy implements InvocationHandler {

    private static final String INTERFACE_NAME = "interfaceName";

    /**
     * RPC transport
     */
    private final RpcRequestTransport rpcRequestTransport;

    /**
     * Service configuration
     */
    private final RpcServiceConfig rpcServiceConfig;

    public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) {
        this.rpcRequestTransport = rpcRequestTransport;
        this.rpcServiceConfig = rpcServiceConfig;
    }

    public RpcClientProxy(RpcRequestTransport rpcRequestTransport) {
        this.rpcRequestTransport = rpcRequestTransport;
        this.rpcServiceConfig = new RpcServiceConfig();
    }

    /**
     * Create the proxy with the JDK's built-in Proxy
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }

    /**
     * This method is actually called when you use a proxy object to call a method.
     * The proxy object is the object you get through the getProxy method.
     */
    @SneakyThrows
    @SuppressWarnings("unchecked")
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        log.info("invoked method: [{}]", method.getName());
        // 1. Build the RPC request
        RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
                .parameters(args)
                .interfaceName(method.getDeclaringClass().getName())
                .paramTypes(method.getParameterTypes())
                .requestId(UUID.randomUUID().toString())
                .group(rpcServiceConfig.getGroup())
                .version(rpcServiceConfig.getVersion())
                .build();
        // 2. Send the request to the remote side over the network and get the response
        RpcResponse<Object> rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
        // 3. Check the response and return the result
        this.check(rpcResponse, rpcRequest);
        return rpcResponse.getData();
    }

    private void check(RpcResponse<Object> rpcResponse, RpcRequest rpcRequest) {
        if (rpcResponse == null) {
            throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
        }

        if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {
            throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
        }

        if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) {
            throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
        }
    }
}
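
The whole chain of this section (annotated field, JDK dynamic proxy, request handed to a transport) can be exercised with the JDK alone. The sketch below is an illustration under assumptions, not the framework's code: DemoReference stands in for RpcReference, Request is a trimmed-down hypothetical type, and the transport is a plain Function that answers in memory instead of going through Netty. The response checks done by RpcClientProxy are left out to keep the sketch short.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.function.Function;

final class ReferenceInjectionSketch {

    /** Field-level stand-in for RpcReference (hypothetical name). */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface DemoReference { }

    /** Trimmed-down request: just enough to identify the call. */
    static final class Request {
        final String requestId;
        final String methodName;
        final Object[] parameters;

        Request(String requestId, String methodName, Object[] parameters) {
            this.requestId = requestId;
            this.methodName = methodName;
            this.parameters = parameters;
        }
    }

    interface HelloService { String hello(String name); }

    static final class HelloController {
        @DemoReference
        private HelloService helloService; // receives the proxy

        String greet(String name) {
            return helloService.hello(name);
        }
    }

    /** Turns every method call on the proxy into a request handed to the transport. */
    static final class ClientProxy implements InvocationHandler {
        private final Function<Request, Object> transport;

        ClientProxy(Function<Request, Object> transport) {
            this.transport = transport;
        }

        @SuppressWarnings("unchecked")
        <T> T getProxy(Class<T> type) {
            return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, this);
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            Request request = new Request(UUID.randomUUID().toString(), method.getName(), args);
            return transport.apply(request);
        }
    }

    /** Finds annotated fields, creates a proxy for each field type and sets the field. */
    static int inject(Object bean, ClientProxy clientProxy) {
        int injected = 0;
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(DemoReference.class)) {
                field.setAccessible(true);
                try {
                    field.set(bean, clientProxy.getProxy(field.getType()));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("cannot inject " + field.getName(), e);
                }
                injected++;
            }
        }
        return injected;
    }

    static String demo() {
        // In-memory stand-in for the network: answers locally from the request contents.
        Function<Request, Object> loopback = req -> req.methodName + " " + req.parameters[0];
        HelloController controller = new HelloController();
        inject(controller, new ClientProxy(loopback));
        return controller.greet("rpc");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

getDeclaredFields only returns the fields declared by the bean's own class, so an annotated field declared in a superclass would not be visited by a loop of this kind.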
