当前位置: 首页 > news >正文

基于Apollo对配置类的热更新优化

背景

关于配置的热更新,apollo 通过`com.ctrip.framework.apollo.spring.annotation.SpringValueProcessor` 处理带@Value的方法或字段,通过监听变更事件,采用反射去更新对应的值

但这个功能仅仅用于单个属性,当我有一组有关联关系的配置时,需要对每一个属性都进行@Value注解标记,这样不利于统一管理配置,比如像下面定义xxljob任务的一些属性

 @Data
    public static class FlushConfig {
        //sleep-interval-mills
        @Value("${xxl-sharding-job.common.sleep-interval-mills:-1}")
        private Integer sleepIntervalMills;
        //batch-size
        @Value("${xxl-sharding-job.common.batch-size:1000}")
        private Integer batchSize;
        //max-retry
        @Value("${xxl-sharding-job.common.max-retry:1000}")
        private Integer maxRetry;
    }

如果有新加的字段,Apollo和本地代码都需要改动,而且需要检查新加的字段是否有漏加注解的情况

另外如果对某个任务需要自定义属性,比如对某个任务自定义定义batchSize,还要在构造配置类,配置名称不同的key,不利于复用和管理

配置绑定

针对这个问题,springboot提供了@ConfigurationProperties注解把一些配置与java对象的属性进行绑定,我们可以定义这样一个配置类

@ConfigurationProperties(prefix = "xxl-sharding-job")
@Component
public class XxlShardingConfig {

    private FlushConfig common;

    private Map<String, FlushConfig> custom;


    @Data
    public static class FlushConfig {
     
        private Integer sleepIntervalMills;
        
        private Integer batchSize;
       
        private Integer maxRetry;
    }

    public int getBatchSize(String xxlJobName) {
        FlushConfig flushConfig = getFlushConfig(xxlJobName);
        return Optional.ofNullable(flushConfig.batchSize).orElse(common.batchSize);
    }

    private FlushConfig getFlushConfig(String xxlJobName) {
        if(custom==null){
            return common;
        }
        return custom.getOrDefault(xxlJobName, common);
    }

    public int getMaxRetry(String xxlJobName) {
        FlushConfig flushConfig = getFlushConfig(xxlJobName);
        return Optional.ofNullable(flushConfig.maxRetry).orElse(common.maxRetry);
    }

    public int getSleepIntervalMills(String xxlJobName) {
        FlushConfig flushConfig = getFlushConfig(xxlJobName);
        return Optional.ofNullable(flushConfig.sleepIntervalMills).orElse(common.sleepIntervalMills);
    }
}

同时配置文件定义如下属性,即可实现配置绑定到属性

xxl-sharding-job.common.sleep-interval-mills=-1
xxl-sharding-job.common.batch-size=1000
xxl-sharding-job.common.max-retry=3
#特殊任务自定义
xxl-sharding-job.custom.yourTask.max-retry=5

基于这种配置类的方式,在特殊任务自定义属性的场景,我们无需修改配置类代码,配置类能提供相关的功能方法,更内聚,但缺陷是无法实现热更新

配置类热更新Demo

查阅apollo相关代码,官网给了个demo

有如下配置类

@ConditionalOnProperty("redis.cache.enabled")
@ConfigurationProperties(prefix = "redis.cache")
@Component("sampleRedisConfig")
@RefreshScope
public class SampleRedisConfig {

  private static final Logger logger = LoggerFactory.getLogger(SampleRedisConfig.class);

  private int expireSeconds;
  private String clusterNodes;
  private int commandTimeout;

  private Map<String, String> someMap = Maps.newLinkedHashMap();
  private List<String> someList = Lists.newLinkedList();

  @PostConstruct
  private void initialize() {
    logger.info(
        "SampleRedisConfig initialized - expireSeconds: {}, clusterNodes: {}, commandTimeout: {}, someMap: {}, someList: {}",
        expireSeconds, clusterNodes, commandTimeout, someMap, someList);
  }

  public void setExpireSeconds(int expireSeconds) {
    this.expireSeconds = expireSeconds;
  }

  public void setClusterNodes(String clusterNodes) {
    this.clusterNodes = clusterNodes;
  }

  public void setCommandTimeout(int commandTimeout) {
    this.commandTimeout = commandTimeout;
  }

  public Map<String, String> getSomeMap() {
    return someMap;
  }

  public List<String> getSomeList() {
    return someList;
  }

  @Override
  public String toString() {
    return String.format(
        "[SampleRedisConfig] expireSeconds: %d, clusterNodes: %s, commandTimeout: %d, someMap: %s, someList: %s",
            expireSeconds, clusterNodes, commandTimeout, someMap, someList);
  }
}

配置刷新类

@ConditionalOnProperty("redis.cache.enabled")
@Component
public class SpringBootApolloRefreshConfig {
  private static final Logger logger = LoggerFactory.getLogger(SpringBootApolloRefreshConfig.class);

  private final SampleRedisConfig sampleRedisConfig;
  private final RefreshScope refreshScope;

  public SpringBootApolloRefreshConfig(
      final SampleRedisConfig sampleRedisConfig,
      final RefreshScope refreshScope) {
    this.sampleRedisConfig = sampleRedisConfig;
    this.refreshScope = refreshScope;
  }

  @ApolloConfigChangeListener(value = {ConfigConsts.NAMESPACE_APPLICATION, "TEST1.apollo", "application.yaml"},
      interestedKeyPrefixes = {"redis.cache."})
  public void onChange(ConfigChangeEvent changeEvent) {
    logger.info("before refresh {}", sampleRedisConfig.toString());
    refreshScope.refresh("sampleRedisConfig");
    logger.info("after refresh {}", sampleRedisConfig.toString());
  }
}

可以看到通过RefreshScope实现配置的刷新

这个类允许bean进行动态刷新(使用refresh/refreshAll方法),再下一次访问时会新建bean实例

所有的生命周期方法都会应用到这个bean上,并支持序列化,本文主要对RefresScope的逻辑进行分析

RefreshScope浅析

Scope 作用域用于管理bean的生命周期和可见范围,比如默认的常见单例,原型,session级

GenericScope是springcloud包下的扩展作用域,为 Bean 提供一个通用的作用域管理机制,支持动态注册和销毁 Bean 实例,同时提供了对作用域生命周期的细粒度控制。

RefreshScope 定义,scope定义为refresh

@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Scope("refresh")
@Documented
public @interface RefreshScope {
	/**
	 * @see Scope#proxyMode()
	 */
	ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;

}

初始化流程

spring会扫描对应的路径,构造bean定义—org.springframework.context.annotation.ClassPathBeanDefinitionScanner#doScan

对于RefershScope标记的类,解析后ScopedProxyMode=TARGET_CLASS

org.springframework.context.annotation.AnnotationConfigUtils#applyScopedProxyMode

ScopedProxyMode scopedProxyMode = metadata.getScopedProxyMode();
		if (scopedProxyMode.equals(ScopedProxyMode.NO)) {
			return definition;
		}
		boolean proxyTargetClass = scopedProxyMode.equals(ScopedProxyMode.TARGET_CLASS);
		return ScopedProxyCreator.createScopedProxy(definition, registry, proxyTargetClass);

会生成两个bean定义

一个为factoryBean的bean定义,会在refersh流程中进行初始化,名称为beanName

一个为实际配置类的bean定义,名称为scopedTarget.beanName

容器级扩展接口处理

容器启动流程里会调用 org.springframework.context.support.AbstractApplicationContext#invokeBeanFactoryPostProcessors ,这是一个容器级的扩展接口

会调用org.springframework.cloud.context.scope.GenericScope 里的后置处理方法

当对应bean的scope为refresh时,会更改其targetClass为LockedScopedProxyFactoryBean,用于创建 AOP代理对象,并为代理对象添加切面——执行相关方法加读写锁,防止与scope的refresh逻辑发生冲突

CreateBean

在初始化bean时,由于 上面设置targetClass为LockedScopedProxyFactoryBean,父类逻辑(org.springframework.aop.scope.ScopedProxyFactoryBean#setBeanFactory)生成scopedTarget.beanName代理对象并进行初始化操作

生成代理对象的同时也会对 refreshScope的 cache进行缓存初始化

ContextRefreshed

RefreshScope会监听contextRefreshed事件,做一个兜底初始化的动作,保证某个scope下的 bean都已初始化,纳入管理

//org.springframework.beans.factory.support.AbstractBeanFactory#doGetBean
Object scopedInstance = scope.get(beanName, () -> {
							beforePrototypeCreation(beanName);
							try {
								return createBean(beanName, mbd, args);
							}
							finally {
								afterPrototypeCreation(beanName);
							}
						});
						bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);

scope get方法如下,其中org.springframework.cloud.context.scope.ScopeCache#put方法,相当于putIfAbsent方法,

BeanLifecycleWrapper value = this.cache.put(name,
				new BeanLifecycleWrapper(name, objectFactory));
		locks.putIfAbsent(name, new ReentrantReadWriteLock());
		try {
			return value.getBean();
		}
		catch (RuntimeException e) {
			this.errors.put(name, e);
			throw e;
		}

其中wrapper的getBean方法如下,做bean的初始化

if (this.bean == null) {
				synchronized (this.name) {
					if (this.bean == null) {
						this.bean = this.objectFactory.getObject();
					}
				}
			}
			return this.bean;

刷新处理

当调用org.springframework.cloud.context.scope.refresh.RefreshScope#refresh 方法时,会把该缓存清理掉

使用该对象时,由于持有的是代理对象,会走到 scope的处理逻辑,如果 scope中没有缓存该对象,同样会走一遍doGetBean的逻辑,重新加载 bean,从而拿到最新的bean

RefreshScope总结

RefreshScope的核心逻辑是通过自定义Scope去管理bean的生命周期,而factoryBean在初始化阶段生成该Scope对应的代理bean,当访问beanName对象时,走了代理逻辑,实际访问的是 beanName为scopedTarget.beanName的代理对象

在获取bean就走到scope逻辑,里面缓存了代理对象,当 refresh 时,会把代理对象删除,下次访问时会 createBean

优化使用方式

了解原理后,可以发现对每个配置类如果要实现更新,都需要写一个监听器并调用refresh方法,可以看到这个逻辑是公共的,并且@Configuration的prefix和apollo监听的前缀也是一致的,可以设计一个注解来减少这些重复工作

定义热更新注解

@ConfigurationProperties
@Component
@RefreshScope
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ApolloRefreshConfig {
    /**
     * 配置类前缀
     * @return
     */
    @AliasFor(annotation = ConfigurationProperties.class, value = "prefix")
    String prefix();

    /**
     * 默认监听的 namesapce
     * @return
     */
    String[] namespaces() default {"application"};
}

对于该注解标记的类,会注册一个内置的监听器

@Configuration(proxyBeanMethods = false)
public class ApolloConfigListenerRegister implements BeanDefinitionRegistryPostProcessor, EnvironmentAware {
    private static Environment environment;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        for (String beanDefinitionName : registry.getBeanDefinitionNames()) {
            BeanDefinition beanDefinition = registry.getBeanDefinition(beanDefinitionName);
            if (beanDefinition instanceof AnnotatedBeanDefinition) {
                AnnotationMetadata metadata = ((AnnotatedBeanDefinition) beanDefinition).getMetadata();
                MergedAnnotation<ApolloRefreshConfig> mergedAnnotation = metadata.getAnnotations().get(ApolloRefreshConfig.class);
                if (mergedAnnotation.isPresent()) {
                    AnnotationAttributes annotationAttributes = mergedAnnotation.asAnnotationAttributes();
                    String listenName = beanDefinitionName + "-apollo-listener";
                    RootBeanDefinition listenerBean = new RootBeanDefinition(DefaultConfigListener.class);
                    listenerBean.setPropertyValues(new MutablePropertyValues().add("annotationAttributes", annotationAttributes).add("beanName", beanDefinitionName));
                    registry.registerBeanDefinition(listenName, listenerBean);
                }
            }

        }
    }

    @Override
    public void postProcessBeanFactory(@NotNull ConfigurableListableBeanFactory beanFactory) throws BeansException {
        //do nothing
    }

    @Override
    public void setEnvironment(@NotNull Environment environment) {
        ApolloConfigListenerRegister.environment = environment;
    }

    @Setter
    @Slf4j
    static class DefaultConfigListener implements ConfigChangeListener {
        @Resource
        private RefreshScope refreshScope;

        private String beanName;

        private AnnotationAttributes annotationAttributes;

        public void onChange(ConfigChangeEvent changeEvent) {
            LOGGER.info("bean:{},namespace:{} change", beanName, changeEvent.getNamespace());
            for (String changedKey : changeEvent.changedKeys()) {
                ConfigChange changedConfig = changeEvent.getChange(changedKey);
                LOGGER.info("key:{}, oldValue:{}, newValue:{}", changedConfig.getPropertyName(), changedConfig.getOldValue(), changedConfig.getNewValue());
            }
            refreshScope.refresh(beanName);
        }

        @PostConstruct
        public void init() {
            String[] namespaces = annotationAttributes.getStringArray("namespaces");
            String[] annotatedInterestedKeyPrefixes = new String[]{annotationAttributes.getString("prefix")};
            Set<String> interestedKeyPrefixes = Sets.newHashSet(annotatedInterestedKeyPrefixes);
            for (String namespace : namespaces) {
                final String resolvedNamespace = environment.resolveRequiredPlaceholders(namespace);
                Config config = ConfigService.getConfig(resolvedNamespace);
                config.addChangeListener(this, null, interestedKeyPrefixes);
            }
        }
    }

}

当我们定义配置类时候,可以用这个注解实现配置类热更新

/**
 * #公共设置,本地文件预设
 * xxl-sharding-job.common.sleep-interval-mills=-1
 * xxl-sharding-job.common.batch-size=1000
 * xxl-sharding-job.common.max-retry=3
 * #特殊任务自定义
 * xxl-sharding-job.custom.yourTask.max-retry=3
 */
@Data
@Slf4j
@ApolloRefreshConfig(prefix = "xxl-sharding-job")
public class XxlShardingConfig {

    private FlushConfig common;

    private Map<String, FlushConfig> custom;


    @Data
    public static class FlushConfig {
        //sleep-interval-mills
        private Integer sleepIntervalMills;
        //batch-size
        private Integer batchSize;
        //max-retry
        private Integer maxRetry;
    }

    public int getBatchSize(String xxlJobName) {
        FlushConfig flushConfig = getFlushConfig(xxlJobName);
        return Optional.ofNullable(flushConfig.batchSize).orElse(common.batchSize);
    }

    private FlushConfig getFlushConfig(String xxlJobName) {
        if(custom==null){
            return common;
        }
        return custom.getOrDefault(xxlJobName, common);
    }

    public int getMaxRetry(String xxlJobName) {
        FlushConfig flushConfig = getFlushConfig(xxlJobName);
        return Optional.ofNullable(flushConfig.maxRetry).orElse(common.maxRetry);
    }

    public int getSleepIntervalMills(String xxlJobName) {
        FlushConfig flushConfig = getFlushConfig(xxlJobName);
        return Optional.ofNullable(flushConfig.sleepIntervalMills).orElse(common.sleepIntervalMills);
    }
}

实现效果

相关文章:

  • qt-C++笔记之Linux下Qt环境变量设置及与QtCreator的关系
  • C语言标准IO是什么?
  • ffmpeg-static 依赖详解
  • 给wordpress仪表盘添加自定义图标
  • B/B+树与mysql索引
  • 什么是“零日漏洞”(Zero-Day Vulnerability)?为何这类攻击被视为高风险威胁?
  • 009 rocketmq延时消息
  • 机器学习:监督学习、无监督学习和强化学习
  • C语言【指针篇】(四)
  • 开发环境需要同时安装2个nodejs版本
  • 25年前端如何走的更稳
  • 解决VirtualBox - Error In supR3HardenedWinReSpawn报错
  • Kotlin 数据类与密封类
  • #深入了解DNS3和VCTK语音数据集
  • JMeter 引入 JAR 包的几种方法
  • 【SqlServer】SQL Server Management Studio (SSMS) 下载、安装、配置使用及卸载——保姆级教程
  • 酒店管理系统(代码+数据库+LW)
  • python 元组tuple
  • Kafka 为什么会消息堆积?
  • Python基于Django的音乐推荐系统的设计与实现(附源码,文档说明)
  • 织梦软件开发网站模板下载/学校seo推广培训班
  • 自己做的网站怎么链接火车头采集/国内新闻
  • 龙岩seo外包公司/济源新站seo关键词排名推广
  • wordpress新手优化/南京seo招聘
  • 制作网站首先做的工作/如何找到网络公关公司
  • 做网站广告联盟/上海还能推seo吗