OpenFeign cross-service call flow: reading the source
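Scanning and parsing the annotations
The @EnableFeignClients annotation imports FeignClientsRegistrar, which scans for and registers Feign clients.
Two commonly used attributes of @EnableFeignClients are:
basePackages
the package paths to scan;
defaultConfiguration
the global (default) configuration.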
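FeignClientsRegistrar
FeignClientsRegistrar implements the ImportBeanDefinitionRegistrar interface, whose purpose is to register bean definitions dynamically while the container starts up. Its registerBeanDefinitions() method looks like this: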
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
    // 1. Register the default (global) Feign configuration
    registerDefaultConfiguration(metadata, registry);
    // 2. Scan for and register the interfaces annotated with @FeignClient
    registerFeignClients(metadata, registry);
}
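We skip the first step (registering the configuration) for now and look at the second step, which registers the interfaces annotated with @FeignClient: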
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
    LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
    Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
    final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients");
    if (clients == null || clients.length == 0) {
        // No clients are specified on @EnableFeignClients: scan the base packages for
        // @FeignClient and convert each match into a bean definition
        ClassPathScanningCandidateComponentProvider scanner = getScanner();
        scanner.setResourceLoader(this.resourceLoader);
        scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
        Set<String> basePackages = getBasePackages(metadata);
        for (String basePackage : basePackages) {
            candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
        }
    }
    else {
        for (Class<?> clazz : clients) {
            candidateComponents.add(new AnnotatedGenericBeanDefinition(clazz));
        }
    }
    for (BeanDefinition candidateComponent : candidateComponents) {
        if (candidateComponent instanceof AnnotatedBeanDefinition) {
            // Verify that @FeignClient is only placed on an interface
            AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
            AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
            Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface");
            Map<String, Object> attributes = annotationMetadata
                    .getAnnotationAttributes(FeignClient.class.getCanonicalName());
            String name = getClientName(attributes);
            registerClientConfiguration(registry, name, attributes.get("configuration"));
            // Register the FeignClient bean
            registerFeignClient(registry, annotationMetadata, attributes);
        }
    }
}
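The @FeignClient annotation declares a Feign client interface; it can only be placed on an interface. Its main attributes are:
name or value: the service name, used for service discovery;
url: a direct address, used when the registry is bypassed;
fallback: the fallback (degradation) class.
When a @FeignClient interface is picked up by the scan, its bean is ultimately created by FeignClientFactoryBean.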
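The scan, validate, and register steps above can be approximated with plain JDK reflection. The following is only a minimal sketch, not Spring's implementation: the @RemoteClient annotation, the OrderApi interface, and the register() helper are hypothetical stand-ins for @FeignClient, a client interface, and the bean definition registry.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;

class ClientRegistrationSketch {

    // Hypothetical stand-in for @FeignClient; only the name attribute is modeled.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface RemoteClient {
        String name();
    }

    // Hypothetical client interface used as a scan candidate.
    @RemoteClient(name = "order-service")
    interface OrderApi {
        String findOrder(long id);
    }

    // Validates each annotated candidate and records it under its service name.
    static Map<String, Class<?>> register(Class<?>... candidates) {
        Map<String, Class<?>> registry = new LinkedHashMap<>();
        for (Class<?> candidate : candidates) {
            RemoteClient annotation = candidate.getAnnotation(RemoteClient.class);
            if (annotation == null) {
                continue; // not annotated, so the include filter would not have matched it
            }
            if (!candidate.isInterface()) {
                throw new IllegalStateException("@RemoteClient can only be specified on an interface");
            }
            registry.put(annotation.name(), candidate);
        }
        return registry;
    }

    public static void main(String[] args) {
        // String.class carries no @RemoteClient, so only OrderApi is registered.
        System.out.println(register(OrderApi.class, String.class).keySet());
    }
}
```

The real registrar stores a bean definition rather than the class itself, which is what lets Spring defer instance creation to FeignClientFactoryBean.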
FeignClientsRegistrar.registerFeignClient()
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
        Map<String, Object> attributes) {
    String className = annotationMetadata.getClassName();
    Class clazz = ClassUtils.resolveClassName(className, null);
    ConfigurableBeanFactory beanFactory = registry instanceof ConfigurableBeanFactory
            ? (ConfigurableBeanFactory) registry : null;
    String contextId = getContextId(beanFactory, attributes);
    String name = getName(attributes);
    // Create the FeignClientFactoryBean
    FeignClientFactoryBean factoryBean = new FeignClientFactoryBean();
    factoryBean.setBeanFactory(beanFactory);
    factoryBean.setName(name);
    factoryBean.setContextId(contextId);
    factoryBean.setType(clazz);
    factoryBean.setRefreshableClient(isClientRefreshEnabled());
    BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> {
        factoryBean.setUrl(getUrl(beanFactory, attributes));
        factoryBean.setPath(getPath(beanFactory, attributes));
        factoryBean.setDecode404(Boolean.parseBoolean(String.valueOf(attributes.get("decode404"))));
        Object fallback = attributes.get("fallback");
        if (fallback != null) {
            factoryBean.setFallback(fallback instanceof Class ? (Class<?>) fallback
                    : ClassUtils.resolveClassName(fallback.toString(), null));
        }
        Object fallbackFactory = attributes.get("fallbackFactory");
        if (fallbackFactory != null) {
            factoryBean.setFallbackFactory(fallbackFactory instanceof Class ? (Class<?>) fallbackFactory
                    : ClassUtils.resolveClassName(fallbackFactory.toString(), null));
        }
        // Instance supplier: creates the bean instance for the interface
        return factoryBean.getObject();
    });
    definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
    definition.setLazyInit(true);
    validate(attributes);

    AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
    beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className);
    beanDefinition.setAttribute("feignClientsRegistrarFactoryBean", factoryBean);

    // has a default, won't be null
    boolean primary = (Boolean) attributes.get("primary");
    beanDefinition.setPrimary(primary);

    String[] qualifiers = getQualifiers(attributes);
    if (ObjectUtils.isEmpty(qualifiers)) {
        qualifiers = new String[] { contextId + "FeignClient" };
    }

    BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, qualifiers);
    // Register the bean definition with the container
    BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    registerOptionsBeanDefinition(registry, contextId);
}
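The bean for a @FeignClient interface is ultimately instantiated by FeignClientFactoryBean.getObject(). getObject() calls getTarget(), which calls Feign.Builder.target(), and the proxy object is finally created by ReflectiveFeign. Inside ReflectiveFeign.newInstance(), a JDK dynamic proxy provides the implementation of the interface: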
ReflectiveFeign#newInstance()
public <T> T newInstance(Target<T> target) {
    Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
    Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
    List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

    for (Method method : target.type().getMethods()) {
        if (method.getDeclaringClass() == Object.class) {
            continue;
        } else if (Util.isDefault(method)) {
            // A default interface method: each one gets its own DefaultMethodHandler,
            // which invokes it through a MethodHandle
            DefaultMethodHandler handler = new DefaultMethodHandler(method);
            defaultMethodHandlers.add(handler);
            methodToHandler.put(method, handler);
        } else {
            // Every other method is called through Feign; its handler is a SynchronousMethodHandler
            methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
        }
    }
    // Create the InvocationHandler
    InvocationHandler handler = factory.create(target, methodToHandler);
    // Create the proxy object
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
            new Class<?>[] {target.type()}, handler);

    for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
        defaultMethodHandler.bindTo(proxy);
    }
    return proxy;
}
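In this way, an interface annotated with @FeignClient gets a corresponding bean instance created through a dynamic proxy. Every call on it is handled by InvocationHandler.invoke(). The default InvocationHandler here is FeignInvocationHandler, a nested class of ReflectiveFeign; its invoke() method looks up the handler for the current method in the methodToHandler map above and calls that handler's invoke().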
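The dispatch mechanism described here can be reproduced with the JDK alone. The sketch below is an illustration under assumptions, not Feign's code: MethodHandler, GreetingApi, newInstance(), and buildGreetingApi() are hypothetical names, and the handler returns a string instead of performing an HTTP call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;

class ProxyDispatchSketch {

    // Hypothetical stand-in for Feign's per-method handler.
    interface MethodHandler {
        Object invoke(Object[] args) throws Throwable;
    }

    // Hypothetical client interface; a real one would carry @FeignClient and mapping annotations.
    interface GreetingApi {
        String greet(String name);
    }

    // Builds a JDK proxy whose InvocationHandler routes each call through the method-to-handler map.
    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type, Map<Method, MethodHandler> methodToHandler) {
        InvocationHandler dispatcher = (proxy, method, args) -> {
            MethodHandler handler = methodToHandler.get(method);
            if (handler == null) {
                throw new UnsupportedOperationException("No handler for " + method.getName());
            }
            return handler.invoke(args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, dispatcher);
    }

    // Wires one handler that pretends to build a request line instead of sending it.
    static GreetingApi buildGreetingApi() {
        try {
            Map<Method, MethodHandler> handlers = new LinkedHashMap<>();
            Method greet = GreetingApi.class.getMethod("greet", String.class);
            handlers.put(greet, args -> "GET /greet?name=" + args[0]);
            return newInstance(GreetingApi.class, handlers);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildGreetingApi().greet("feign")); // prints "GET /greet?name=feign"
    }
}
```

This sketch does not special-case equals(), hashCode(), or toString(), so calling them on the proxy throws UnsupportedOperationException.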
Service invocation
SynchronousMethodHandler
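SynchronousMethodHandler is the MethodHandler implementation that Feign's proxy dispatches remote-call methods to (the InvocationHandler itself is FeignInvocationHandler). When such an interface method is called, its invoke() method runs: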
public Object invoke(Object[] argv) throws Throwable {
    // Build the request template
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Options options = findOptions(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
        try {
            // Execute the remote call and decode the response
            return executeAndDecode(template, options);
        } catch (RetryableException e) {
            try {
                retryer.continueOrPropagate(e);
            } catch (RetryableException th) {
                Throwable cause = th.getCause();
                if (propagationPolicy == UNWRAP && cause != null) {
                    throw cause;
                } else {
                    throw th;
                }
            }
            if (logLevel != Logger.Level.NONE) {
                logger.logRetry(metadata.configKey(), logLevel);
            }
            continue;
        }
    }
}
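The retry loop in invoke() follows a common pattern: try the call, and on a retryable failure ask the retryer whether to loop again or rethrow. Below is a minimal sketch of that pattern, assuming a fixed attempt budget and no back-off sleep. The RetryableException, Retryer, and Call types here are simplified, hypothetical versions and not Feign's classes.

```java
class RetryLoopSketch {

    // Hypothetical marker for failures worth retrying.
    static class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    // Hypothetical, simplified retry budget: allows maxAttempts calls in total, then rethrows.
    static class Retryer {
        private final int maxAttempts;
        private int attempt = 1;

        Retryer(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        void continueOrPropagate(RetryableException e) {
            if (attempt++ >= maxAttempts) {
                throw e; // budget exhausted: propagate the last failure
            }
        }
    }

    // Hypothetical unit of work, standing in for executeAndDecode().
    interface Call<T> {
        T execute();
    }

    static <T> T invoke(Call<T> call, int maxAttempts) {
        Retryer retryer = new Retryer(maxAttempts); // fresh state per invocation
        while (true) {
            try {
                return call.execute();
            } catch (RetryableException e) {
                retryer.continueOrPropagate(e); // either returns (loop again) or throws
            }
        }
    }

    // Runs a call that fails a given number of times first, and reports how many calls were made.
    static int attemptsUntilSuccess(int failuresBeforeSuccess, int maxAttempts) {
        int[] calls = {0};
        invoke(() -> {
            calls[0]++;
            if (calls[0] <= failuresBeforeSuccess) {
                throw new RetryableException("attempt " + calls[0] + " failed");
            }
            return "ok";
        }, maxAttempts);
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println(attemptsUntilSuccess(2, 3)); // prints 3
    }
}
```

Creating a new Retryer per call plays the same role as retryer.clone() in the original: each invocation gets its own attempt counter.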
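Execution mainly comes down to calling client.execute(request, options). client is Feign's underlying HTTP client (for example LoadBalancerFeignClient in older Ribbon-based releases, FeignBlockingLoadBalancerClient in the code read here, OkHttpClient, or ApacheHttpClient). It is responsible for sending the actual HTTP request.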
The HTTP client is initialized through FeignLoadBalancerAutoConfiguration; in DefaultFeignLoadBalancerConfiguration the default type is FeignBlockingLoadBalancerClient.
@Bean
@ConditionalOnMissingBean
@Conditional(OnRetryNotEnabledCondition.class)
public Client feignClient(LoadBalancerClient loadBalancerClient,
        LoadBalancerClientFactory loadBalancerClientFactory) {
    return new FeignBlockingLoadBalancerClient(new Client.Default(null, null), loadBalancerClient,
            loadBalancerClientFactory);
}
Here FeignBlockingLoadBalancerClient wraps a delegate client of type Client.Default, whose execute() method uses HttpURLConnection to call the service endpoint.
FeignBlockingLoadBalancerClient.execute
public Response execute(Request request, Request.Options options) throws IOException {
    final URI originalUri = URI.create(request.url());
    String serviceId = originalUri.getHost();
    Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
    String hint = getHint(serviceId);
    DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
            new RequestDataContext(buildRequestData(request), hint));
    Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
            .getSupportedLifecycleProcessors(
                    loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                    RequestDataContext.class, ResponseData.class, ServiceInstance.class);
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
    // Use the load balancer to choose a service instance
    ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
    org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
            instance);
    if (instance == null) {
        String message = "Load balancer does not contain an instance for the service " + serviceId;
        if (LOG.isWarnEnabled()) {
            LOG.warn(message);
        }
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                        CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
        return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
                .body(message, StandardCharsets.UTF_8).build();
    }
    String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
    Request newRequest = buildRequest(request, reconstructedUrl);
    LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);
    // Use the delegate (Client.Default) to actually send the HTTP request
    return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
            supportedLifecycleProcessors, loadBalancerProperties.isUseRawStatusCodeInResponseData());
}
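The two central steps of execute(), choosing an instance and rewriting the URL, can be sketched with the JDK alone. This is an illustration under assumptions, not Spring Cloud LoadBalancer's implementation: the Instance class, the round-robin choose(), reconstruct(), and resolve() are hypothetical, and the addresses are made up.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class LoadBalancedUriSketch {

    // Hypothetical minimal service instance: just a host and a port.
    static final class Instance {
        final String host;
        final int port;

        Instance(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    private final List<Instance> instances;
    private final AtomicInteger position = new AtomicInteger();

    LoadBalancedUriSketch(List<Instance> instances) {
        this.instances = instances;
    }

    // Round-robin pick (an assumed strategy); returns null when no instance is known.
    Instance choose() {
        if (instances.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(position.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    // Replaces the logical service id in the URI with the chosen instance's host and port.
    static URI reconstruct(Instance instance, URI original) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), instance.host, instance.port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Chooses an instance and returns the concrete URL the delegate client would call.
    String resolve(String url) {
        URI original = URI.create(url);
        Instance instance = choose();
        if (instance == null) {
            throw new IllegalStateException("No instance available for " + original.getHost());
        }
        return reconstruct(instance, original).toString();
    }

    public static void main(String[] args) {
        LoadBalancedUriSketch lb = new LoadBalancedUriSketch(
                List.of(new Instance("10.0.0.1", 8080), new Instance("10.0.0.2", 8080)));
        System.out.println(lb.resolve("http://order-service/orders/1?verbose=true"));
        System.out.println(lb.resolve("http://order-service/orders/1?verbose=true"));
    }
}
```

Where the sketch throws when no instance is available, the code above instead returns a 503 response carrying the same kind of message.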