重写BeanFactory初始化方法并行加载Bean
文章目录
- 背景
- 实现说明
- 使用方法
- 注意事项
背景
spring项目启动过程中,如何通过重写BeanFactory初始化方法,对初始化耗时长的bean进行并行处理
在Spring中,Bean的初始化默认是单线程的,这主要是为了保证Bean之间的依赖关系正确处理。但对于一些初始化耗时长且依赖关系简单的Bean,我们可以通过扩展Spring的BeanFactory来实现并行初始化,通过自定义BeanFactory和扩展ApplicationContext来实现对特定Bean的并行处理,从而提高启动速度。
实现说明
这个方案的核心思路是通过自定义BeanFactory和ApplicationContext,对特定Bean进行并行初始化,主要包含以下几个部分:
- 标记机制:通过
ParallelBeanMarker
接口标记需要并行初始化的Bean,实现该接口的Bean会被自动识别- 并行Bean检测:
ParallelBeanDetector
实现了BeanFactoryPostProcessor
,在BeanFactory初始化过程中检测所有标记为并行处理的Bean- 自定义BeanFactory:
ParallelProcessingBeanFactory
重写了createBean
方法,对标记的Bean使用线程池进行并行初始化- 扩展ApplicationContext:
ParallelProcessingApplicationContext
创建并使用自定义的BeanFactory,实现并行处理能力
/*** 标记接口,实现此接口的Bean将被并行初始化*/
public interface ParallelBeanMarker {
}
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;/*** 检测需要并行初始化的Bean*/
@Component
public class ParallelBeanDetector implements BeanFactoryPostProcessor {private Set<String> parallelBeanNames = new HashSet<>();@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {// 查找所有实现了ParallelBeanMarker接口的BeanString[] beanNames = beanFactory.getBeanDefinitionNames();for (String beanName : beanNames) {try {Class<?> beanType = beanFactory.getType(beanName);if (beanType != null && ParallelBeanMarker.class.isAssignableFrom(beanType)) {parallelBeanNames.add(beanName);System.out.println("Bean " + beanName + " 标记为并行初始化");}} catch (Throwable e) {// 忽略类型获取失败的情况}}}public Set<String> getParallelBeanNames() {return parallelBeanNames;}
}
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanFactory;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.stream.Collectors;/*** 支持并行初始化的BeanFactory*/
public class ParallelProcessingBeanFactory extends DefaultListableBeanFactory {// 线程池用于并行初始化Beanprivate final ExecutorService executorService;// 需要并行处理的Bean名称集合private final Set<String> parallelBeanNames;public ParallelProcessingBeanFactory(ExecutorService executorService, Set<String> parallelBeanNames) {this.executorService = executorService;this.parallelBeanNames = parallelBeanNames;}@Overrideprotected Object createBean(String beanName, RootBeanDefinition mbd, Object[] args) throws BeanCreationException {// 检查当前Bean是否需要并行初始化if (parallelBeanNames.contains(beanName) && !isBeanCurrentlyInCreation(beanName)) {try {// 提交到线程池并行处理Future<Object> future = executorService.submit(() -> super.createBean(beanName, mbd, args));return future.get();} catch (InterruptedException | ExecutionException e) {throw new BeanCreationException(beanName, "Error creating bean in parallel", e);}}// 非并行处理的Bean使用默认方式return super.createBean(beanName, mbd, args);}
}
import org.springframework.context.support.AbstractRefreshableApplicationContext;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternResolver;import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 扩展ApplicationContext以支持Bean的并行初始化*/
public class ParallelProcessingApplicationContext extends AbstractRefreshableApplicationContext {// 需要并行处理的Bean名称private final Set<String> parallelBeanNames;// 线程池大小private final int threadPoolSize;public ParallelProcessingApplicationContext(Set<String> parallelBeanNames, int threadPoolSize) {this.parallelBeanNames = parallelBeanNames;this.threadPoolSize = threadPoolSize;}@Overrideprotected void loadBeanDefinitions(DefaultListableBeanFactory beanFactory) {// 这里可以根据需要加载Bean定义// 实际应用中通常会使用XmlBeanDefinitionReader或ComponentScanBeanDefinitionParser}@Overrideprotected DefaultListableBeanFactory createBeanFactory() {// 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);// 返回自定义的支持并行处理的BeanFactoryreturn new ParallelProcessingBeanFactory(executorService, parallelBeanNames);}
}
import org.springframework.context.ApplicationContext;
import java.util.Set;/*** 应用初始化器*/
public class ApplicationInitializer {public ApplicationContext initialize() {// 获取需要并行处理的Bean名称ParallelBeanDetector detector = new ParallelBeanDetector();// 实际应用中,这里应该是Spring正常的初始化流程// 简化示例:创建自定义ApplicationContext并指定并行处理的BeanSet<String> parallelBeanNames = detector.getParallelBeanNames();// 创建应用上下文,设置线程池大小为CPU核心数int threadPoolSize = Runtime.getRuntime().availableProcessors();return new ParallelProcessingApplicationContext(parallelBeanNames, threadPoolSize);}
}
使用方法
- 让耗时长的Bean实现
ParallelBeanMarker
接口
@Component
public class TimeConsumingBean implements ParallelBeanMarker {// 耗时的初始化逻辑
}
- 在应用启动时使用自定义的
ParallelProcessingApplicationContext
- 根据系统情况调整线程池大小(通常设置为CPU核心数)
注意事项
- 依赖关系:并行初始化的Bean之间应避免相互依赖,否则可能导致初始化顺序问题
- 线程安全:确保并行初始化的Bean是线程安全的,其初始化逻辑不会产生竞态条件
- 事务管理:对于需要事务管理的Bean,并行初始化可能会导致事务边界问题
- 测试:并行处理可能引入复杂的并发问题,需要充分测试
- Spring版本兼容性:不同版本的Spring内部实现可能不同,需要根据实际使用的版本调整代码
这种方式可以显著提高包含多个耗时初始化Bean的Spring应用的启动速度,特别是当这些Bean之间没有复杂依赖关系时效果更佳。