RPC调用三 使用代理进行服务自动注册
1. Spring框架进行Bean管理
为了方便进行对象管理,本文使用Spring框架进行Bean的管理。
ClassPathBeanDefinitionScanner
: 注册下面的类
package github.javaguide.spring;import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import org.springframework.core.type.filter.AnnotationTypeFilter;import java.lang.annotation.Annotation;/*** custom package scanner** @author shuang.kou* @createTime 2020年08月10日 21:42:00*/
public class CustomScanner extends ClassPathBeanDefinitionScanner {public CustomScanner(BeanDefinitionRegistry registry, Class<? extends Annotation> annoType) {super(registry);super.addIncludeFilter(new AnnotationTypeFilter(annoType));}@Overridepublic int scan(String... basePackages) {return super.scan(basePackages);}
}
CustomScannerRegistrar
: 定义类的扫描路径
package github.javaguide.spring;import github.javaguide.annotation.RpcScan;
import github.javaguide.annotation.RpcService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.StandardAnnotationMetadata;
import org.springframework.stereotype.Component;/*** scan and filter specified annotations** @author shuang.kou* @createTime 2020年08月10日 22:12:00*/
@Slf4j
public class CustomScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {private static final String SPRING_BEAN_BASE_PACKAGE = "github.javaguide";private static final String BASE_PACKAGE_ATTRIBUTE_NAME = "basePackage";private ResourceLoader resourceLoader;@Overridepublic void setResourceLoader(ResourceLoader resourceLoader) {this.resourceLoader = resourceLoader;}@Overridepublic void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {//get the attributes and values of RpcScan annotationAnnotationAttributes rpcScanAnnotationAttributes = AnnotationAttributes.fromMap(annotationMetadata.getAnnotationAttributes(RpcScan.class.getName()));String[] rpcScanBasePackages = new String[0];if (rpcScanAnnotationAttributes != null) {// get the value of the basePackage propertyrpcScanBasePackages = rpcScanAnnotationAttributes.getStringArray(BASE_PACKAGE_ATTRIBUTE_NAME);}if (rpcScanBasePackages.length == 0) {rpcScanBasePackages = new String[]{((StandardAnnotationMetadata) annotationMetadata).getIntrospectedClass().getPackage().getName()};}// Scan the RpcService annotationCustomScanner rpcServiceScanner = new CustomScanner(beanDefinitionRegistry, RpcService.class);// Scan the Component annotationCustomScanner springBeanScanner = new CustomScanner(beanDefinitionRegistry, Component.class);if (resourceLoader != null) {rpcServiceScanner.setResourceLoader(resourceLoader);springBeanScanner.setResourceLoader(resourceLoader);}int springBeanAmount = springBeanScanner.scan(SPRING_BEAN_BASE_PACKAGE);log.info("springBeanScanner扫描的数量 [{}]", springBeanAmount);int rpcServiceCount = rpcServiceScanner.scan(rpcScanBasePackages);log.info("rpcServiceScanner扫描的数量 [{}]", rpcServiceCount);}}
2. 注解
本次RPC的代理,基于注解和拦截进行实现,这样方便定义服务接口的调用。
RpcService
:用来定义Rpc服务
package github.javaguide.annotation;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** RPC service annotation, marked on the service implementation class** @author shuang.kou* @createTime 2020年07月21日 13:11:00*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface RpcService {/*** Service version, default value is empty string*/String version() default "";/*** Service group, default value is empty string*/String group() default "";}
RpcScan
:用来定义Rpc的扫描包地址
package github.javaguide.annotation;import github.javaguide.spring.CustomScannerRegistrar;
import org.springframework.context.annotation.Import;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** scan custom annotations** @author shuang.kou* @createTime 2020年08月10日 21:42:00*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Import(CustomScannerRegistrar.class)
@Documented
public @interface RpcScan {String[] basePackage();}
- PpcReference:用来对Rpc服务进行引用
package github.javaguide.annotation;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** RPC reference annotation, autowire the service implementation class** @author smile2coder* @createTime 2020年09月16日 21:42:00*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface RpcReference {/*** Service version, default value is empty string*/String version() default "";/*** Service group, default value is empty string*/String group() default "";}
3. 拦截
使用Spring的PostProcessor,解析RpcScan路径上,定义了RpcService的类,将拦截到的对象进行服务注册
package github.javaguide.spring;import github.javaguide.annotation.RpcReference;
import github.javaguide.annotation.RpcService;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcRequestTransportEnum;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.impl.ZkServiceProviderImpl;
import github.javaguide.proxy.RpcClientProxy;
import github.javaguide.remoting.transport.RpcRequestTransport;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;import java.lang.reflect.Field;/*** call this method before creating the bean to see if the class is annotated** @author shuang.kou* @createTime 2020年07月14日 16:42:00*/
@Slf4j
@Component
public class SpringBeanPostProcessor implements BeanPostProcessor {private final ServiceProvider serviceProvider;private final RpcRequestTransport rpcClient;public SpringBeanPostProcessor() {this.serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);this.rpcClient = ExtensionLoader.getExtensionLoader(RpcRequestTransport.class).getExtension(RpcRequestTransportEnum.NETTY.getName());}/*** 初始化之前,将有@RpcService注解的类发布到注册中心,并将对象缓存到ServiceProvider中* */@SneakyThrows@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {if (bean.getClass().isAnnotationPresent(RpcService.class)) {log.info("[{}] is annotated with [{}]", bean.getClass().getName(), RpcService.class.getCanonicalName());// get RpcService annotationRpcService rpcService = bean.getClass().getAnnotation(RpcService.class);// build RpcServicePropertiesRpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder().group(rpcService.group()).version(rpcService.version()).service(bean).build();serviceProvider.publishService(rpcServiceConfig);}return bean;}/*** 初始化之后,将带有@RpcReference注解的属性注入代理对象* */@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class<?> targetClass = bean.getClass();Field[] declaredFields = targetClass.getDeclaredFields();for (Field declaredField : declaredFields) {RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);if (rpcReference != null) {RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder().group(rpcReference.group()).version(rpcReference.version()).build();RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());declaredField.setAccessible(true);try {declaredField.set(bean, clientProxy);} catch (IllegalAccessException e) {e.printStackTrace();}}}return bean;}
}
4. 服务注册
- 服务提供者接口,主要有两个核心方法:
- 发布服务,负责将服务发布到注册中心,并且缓存服务
- 获取服务,从缓存中拿到服务对象
package github.javaguide.provider;import github.javaguide.config.RpcServiceConfig;/*** store and provide service object.** @author shuang.kou* @createTime 2020年05月31日 16:52:00*/
public interface ServiceProvider {/*** @param rpcServiceConfig rpc service related attributes*/void addService(RpcServiceConfig rpcServiceConfig);/*** @param rpcServiceName rpc service name* @return service object*/Object getService(String rpcServiceName);/*** @param rpcServiceConfig rpc service related attributes*/void publishService(RpcServiceConfig rpcServiceConfig);}
- 实现类
package github.javaguide.provider.impl;import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.enums.ServiceRegistryEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.remoting.transport.netty.server.NettyRpcServer;
import lombok.extern.slf4j.Slf4j;import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;/*** @author shuang.kou* @createTime 2020年05月13日 11:23:00*/
@Slf4j
public class ZkServiceProviderImpl implements ServiceProvider {/*** key: rpc service name(interface name + version + group)* value: service object*/private final Map<String, Object> serviceMap;private final Set<String> registeredService;private final ServiceRegistry serviceRegistry;public ZkServiceProviderImpl() {serviceMap = new ConcurrentHashMap<>();registeredService = ConcurrentHashMap.newKeySet();serviceRegistry = ExtensionLoader.getExtensionLoader(ServiceRegistry.class).getExtension(ServiceRegistryEnum.ZK.getName());}@Overridepublic void addService(RpcServiceConfig rpcServiceConfig) {String rpcServiceName = rpcServiceConfig.getRpcServiceName();if (registeredService.contains(rpcServiceName)) {return;}registeredService.add(rpcServiceName);serviceMap.put(rpcServiceName, rpcServiceConfig.getService());log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces());}@Overridepublic Object getService(String rpcServiceName) {Object service = serviceMap.get(rpcServiceName);if (null == service) {throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND);}return service;}@Overridepublic void publishService(RpcServiceConfig rpcServiceConfig) {try {String host = InetAddress.getLocalHost().getHostAddress();this.addService(rpcServiceConfig);serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(), new InetSocketAddress(host, NettyRpcServer.PORT));} catch (UnknownHostException e) {log.error("occur exception when getHostAddress", e);}}}
- 服务注册:将服务注册到注册中心
package github.javaguide.registry;import github.javaguide.extension.SPI;import java.net.InetSocketAddress;/*** service registration** @author shuang.kou* @createTime 2020年05月13日 08:39:00*/
@SPI
public interface ServiceRegistry {/*** register service** @param rpcServiceName rpc service name* @param inetSocketAddress service address*/void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress);}
- ZK注册中心:使用CuratorFramework进行Java端的操作
package github.javaguide.registry.zk;import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.zk.util.CuratorUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;import java.net.InetSocketAddress;/*** service registration based on zookeeper** @author shuang.kou* @createTime 2020年05月31日 10:56:00*/
@Slf4j
public class ZkServiceRegistryImpl implements ServiceRegistry {@Overridepublic void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString();CuratorFramework zkClient = CuratorUtils.getZkClient();CuratorUtils.createPersistentNode(zkClient, servicePath);}
}
5. 服务属性对象注入
解析RpcReference注解,对服务进行代理,将本地调用代理为远端的调用。
- 这个是上面使用Spring进行bean属性注入的方法,包括三步
- 检测到带有RpcReference注解的属性
- 创建一个服务的代理对象
- 将创建的对象,注入到属性中
/*** 初始化之后,将带有@RpcReference注解的属性注入代理对象* */@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class<?> targetClass = bean.getClass();Field[] declaredFields = targetClass.getDeclaredFields();for (Field declaredField : declaredFields) {RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);if (rpcReference != null) {// 1. 服务的配置RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder().group(rpcReference.group()).version(rpcReference.version()).build();// 2. 创建服务的代理对象RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());// 3. 属性注入declaredField.setAccessible(true);try {declaredField.set(bean, clientProxy);} catch (IllegalAccessException e) {e.printStackTrace();}}}return bean;}
- 其中,服务的代理对象,可以通过下面的方法进行创建
- getProxy创建一个代理对象,对象的invoker就是本类
- 本类的invoke,使用Netty进行通信,调用远端的服务,得到返回结果
package github.javaguide.proxy;import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.enums.RpcResponseCodeEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.RpcRequestTransport;
import github.javaguide.remoting.transport.netty.client.NettyRpcClient;
import github.javaguide.remoting.transport.socket.SocketRpcClient;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;/*** Dynamic proxy class.* When a dynamic proxy object calls a method, it actually calls the following invoke method.* It is precisely because of the dynamic proxy that the remote method called by the client is like calling the local method (the intermediate process is shielded)** @author shuang.kou* @createTime 2020年05月10日 19:01:00*/
@Slf4j
public class RpcClientProxy implements InvocationHandler {private static final String INTERFACE_NAME = "interfaceName";/*** Rpc通信*/private final RpcRequestTransport rpcRequestTransport;/**进行配置*/private final RpcServiceConfig rpcServiceConfig;public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) {this.rpcRequestTransport = rpcRequestTransport;this.rpcServiceConfig = rpcServiceConfig;}public RpcClientProxy(RpcRequestTransport rpcRequestTransport) {this.rpcRequestTransport = rpcRequestTransport;this.rpcServiceConfig = new RpcServiceConfig();}/*** 使用jdk自带的Proxy进行代理*/@SuppressWarnings("unchecked")public <T> T getProxy(Class<T> clazz) {return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);}/*** This method is actually called when you use a proxy object to call a method.* The proxy object is the object you get through the getProxy method.*/@SneakyThrows@SuppressWarnings("unchecked")@Overridepublic Object invoke(Object proxy, Method method, Object[] args) {log.info("invoked method: [{}]", method.getName());// 1. 创建Rpc请求RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName()).parameters(args).interfaceName(method.getDeclaringClass().getName()).paramTypes(method.getParameterTypes()).requestId(UUID.randomUUID().toString()).group(rpcServiceConfig.getGroup()).version(rpcServiceConfig.getVersion()).build();// 2. 网络通信,将请求发送到远端,得到响应结果RpcResponse<Object> rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);// 3. 检查并返回Rpcthis.check(rpcResponse, rpcRequest);return rpcResponse.getData();}private void check(RpcResponse<Object> rpcResponse, RpcRequest rpcRequest) {if (rpcResponse == null) {throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());}if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());}if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) {throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());}}
}