Spring Boot 异步处理框架核心源码解析及实现原理
Spring Boot 异步处理框架核心源码解析及实现原理
1. 引言
在现代高并发应用场景中,异步处理是提升系统性能和响应能力的关键技术。Spring Boot通过@EnableAsync
和@Async
注解提供了简洁而强大的异步处理能力。本文将深入分析Spring Boot异步处理框架的核心源码,揭示其实现原理。
2. @EnableAsync 启动机制
2.1 注解定义与功能
/*** 启用Spring异步方法执行功能的注解* 通过@Import导入AsyncConfigurationSelector来注册必要的组件*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {/*** 指定要在类或方法级别检测的'async'注解类型* <p>默认情况下,Spring的@{@link Async}注解和EJB 3.1的{@code @javax.ejb.Asynchronous}注解都会被检测到* <p>此属性的存在是为了让开发人员可以提供自己的自定义注解类型,* 用来指示某个方法(或给定类的所有方法)应该异步调用*/Class<? extends Annotation> annotation() default Annotation.class;/*** 指示是否创建基于子类的(CGLIB)代理,而不是标准的基于Java接口的代理* <p><strong>仅当{@link #mode}设置为{@link AdviceMode#PROXY}时适用</strong>* <p>默认值为{@code false}* <p>注意,将此属性设置为{@code true}会影响所有需要代理的Spring管理bean,* 而不仅仅是标记了{@code @Async}的那些bean。* 例如,其他标记了Spring的{@code @Transactional}注解的bean也将同时升级为子类代理。* 在实践中这种方法没有负面影响,除非明确期望一种类型的代理而不是另一种类型——例如在测试中*/boolean proxyTargetClass() default false;/*** 指示异步通知应该如何应用* <p><b>默认是{@link AdviceMode#PROXY}</b>* 请注意,代理模式只允许拦截通过代理的调用。* 同一类内的本地调用无法通过这种方式拦截;在本地调用中的此类方法上的{@link Async}注解将被忽略,* 因为Spring的拦截器甚至不会在这种运行时场景中启动。* 对于更高级的拦截模式,请考虑将其切换到{@link AdviceMode#ASPECTJ}*/AdviceMode mode() default AdviceMode.PROXY;/*** 指示{@link AsyncAnnotationBeanPostProcessor}应该应用的顺序* <p>默认是{@link Ordered#LOWEST_PRECEDENCE},以便在所有其他后处理器之后运行,* 这样它可以向现有代理添加advisor,而不是双重代理*/int order() default Ordered.LOWEST_PRECEDENCE;}
2.2 AsyncConfigurationSelector 源码解析
AsyncConfigurationSelector
是配置选择的核心组件,根据不同的AdviceMode
选择相应的配置类:
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {@Overridepublic String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {case PROXY:return new String[] {ProxyAsyncConfiguration.class.getName()};case ASPECTJ:return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:return null;}}
}
2.3 ProxyAsyncConfiguration 配置类
ProxyAsyncConfiguration
负责创建异步处理的基础Bean:
/*** 异步代理配置类,用于处理 @Async 注解的配置*/
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {/*** 创建异步注解的Bean后置处理器* * @return AsyncAnnotationBeanPostProcessor 异步注解处理器实例*/@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public AsyncAnnotationBeanPostProcessor asyncAdvisor() {// 断言 @EnableAsync 注解元数据已注入Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");// 创建异步注解Bean后置处理器实例AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();// 配置执行器和异常处理器bpp.configure(this.executor, this.exceptionHandler);// 获取自定义的异步注解类型Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");// 如果自定义了异步注解类型,则设置该注解类型if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);}// 设置是否使用CGLIB代理目标类bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));// 设置处理器的执行顺序bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return bpp;}}
2.4 AsyncConfigurer 自定义配置
通过实现AsyncConfigurer
接口可以自定义线程池和异常处理器:
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(50);executor.setQueueCapacity(100);executor.setThreadNamePrefix("Async-");executor.initialize();return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return new CustomAsyncExceptionHandler();}
}
3. @Async 注解执行机制
3.1 AsyncAnnotationBeanPostProcessor
这是异步处理的核心Bean后处理器,负责识别@Async
注解并创建代理:
/*** 异步注解的Bean后置处理器,用于处理@Async注解* 继承自AbstractBeanFactoryAwareAdvisingPostProcessor,提供AOP增强功能*/
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {/*** 默认的{@link TaskExecutor} bean名称:"taskExecutor"* <p>注意,初始查找是按类型进行的;这只是一个后备方案,* 以防在上下文中找到多个执行器bean时使用* @since 4.2* @see AnnotationAsyncExecutionInterceptor#DEFAULT_TASK_EXECUTOR_BEAN_NAME*/public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;protected final Log logger = LogFactory.getLog(getClass());// 执行器供应器,用于提供异步执行的线程池@Nullableprivate Supplier<Executor> executor;// 异常处理器供应器,用于处理异步执行中的未捕获异常@Nullableprivate Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;// 异步注解类型,默认为@Async注解@Nullableprivate Class<? extends Annotation> asyncAnnotationType;/*** 构造函数,设置在现有advisor之前应用*/public AsyncAnnotationBeanPostProcessor() {setBeforeExistingAdvisors(true);}/*** 使用给定的执行器和异常处理器供应器配置此后置处理器,* 如果供应器不可解析,则应用相应的默认值* @since 5.1*/public void configure(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {this.executor = executor;this.exceptionHandler = exceptionHandler;}/*** 设置异步调用方法时要使用的{@link Executor}* <p>如果未指定,则应用默认的执行器解析:在上下文中搜索唯一的{@link TaskExecutor} bean,* 或者搜索名为"taskExecutor"的{@link Executor} bean。* 如果两者都不可解析,则在拦截器内创建一个本地默认执行器* @see AnnotationAsyncExecutionInterceptor#getDefaultExecutor(BeanFactory)* @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME*/public void setExecutor(Executor executor) {this.executor = SingletonSupplier.of(executor);}/*** 设置用于处理异步方法执行时抛出的未捕获异常的{@link AsyncUncaughtExceptionHandler}* @since 4.1*/public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {this.exceptionHandler = SingletonSupplier.of(exceptionHandler);}/*** 设置要在类或方法级别检测的'async'注解类型。* 默认情况下,将检测{@link Async}注解和EJB 3.1的{@code javax.ejb.Asynchronous}注解* <p>此setter属性的存在是为了让开发人员可以提供自己的(非Spring特定的)注解类型,* 用来指示某个方法(或给定类的所有方法)应该异步调用* @param asyncAnnotationType 所需的注解类型*/public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");this.asyncAnnotationType = asyncAnnotationType;}/*** 设置Bean工厂,并创建异步注解advisor* @param beanFactory Bean工厂*/@Overridepublic void setBeanFactory(BeanFactory beanFactory) {super.setBeanFactory(beanFactory);// 创建异步注解advisor并进行配置AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);if (this.asyncAnnotationType != null) {advisor.setAsyncAnnotationType(this.asyncAnnotationType);}advisor.setBeanFactory(beanFactory);this.advisor = advisor;}}
3.2 AsyncAnnotationAdvisor 解析
AsyncAnnotationAdvisor
负责创建切点和拦截通知:
/*** 异步注解的Advisor,用于为@Async注解的方法提供AOP增强* 继承自AbstractPointcutAdvisor,实现BeanFactoryAware接口*/
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {private Advice advice; // 通知,包含实际的异步执行逻辑private Pointcut pointcut; // 切点,用于确定哪些方法需要被增强/*** 为bean风格的配置创建一个新的{@code AsyncAnnotationAdvisor}*/public AsyncAnnotationAdvisor() {this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);}/*** 为给定的任务执行器创建一个新的{@code AsyncAnnotationAdvisor}* @param executor 用于异步方法的任务执行器(可以为{@code null}以触发默认执行器解析)* @param exceptionHandler 用于处理异步方法执行时抛出的意外异常的{@link AsyncUncaughtExceptionHandler}* @see AnnotationAsyncExecutionInterceptor#getDefaultExecutor(BeanFactory)*/public AsyncAnnotationAdvisor(@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));}/*** 为给定的任务执行器创建一个新的{@code AsyncAnnotationAdvisor}* @param executor 用于异步方法的任务执行器(可以为{@code null}以触发默认执行器解析)* @param exceptionHandler 用于处理异步方法执行时抛出的意外异常的{@link AsyncUncaughtExceptionHandler}* @since 5.1* @see AnnotationAsyncExecutionInterceptor#getDefaultExecutor(BeanFactory)*/@SuppressWarnings("unchecked")public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {// 创建异步注解类型集合,默认包含@Async注解Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);asyncAnnotationTypes.add(Async.class);try {// 尝试添加EJB 3.1的@Asynchronous注解(如果存在)asyncAnnotationTypes.add((Class<? extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));}catch (ClassNotFoundException ex) {// 如果EJB 3.1 API不存在,则忽略}// 构建通知和切点this.advice = buildAdvice(executor, exceptionHandler);this.pointcut = buildPointcut(asyncAnnotationTypes);}/*** 设置'async'注解类型* <p>默认的异步注解类型是{@link Async}注解,以及EJB 3.1的{@code javax.ejb.Asynchronous}注解(如果存在)* <p>此setter属性的存在是为了让开发人员可以提供自己的(非Spring特定的)注解类型来指示方法应异步执行* @param asyncAnnotationType 所需的注解类型*/public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();asyncAnnotationTypes.add(asyncAnnotationType);// 重新构建切点this.pointcut = buildPointcut(asyncAnnotationTypes);}/*** 设置在按限定符查找执行器时要使用的{@code BeanFactory}*/@Overridepublic void setBeanFactory(BeanFactory beanFactory) {if (this.advice instanceof BeanFactoryAware) {((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);}}/*** 获取通知* @return Advice通知实例*/@Overridepublic Advice getAdvice() {return this.advice;}/*** 获取切点* @return Pointcut切点实例*/@Overridepublic Pointcut getPointcut() {return this.pointcut;}/*** 为给定的异步注解类型构建通知* @param executor 执行器供应器* @param exceptionHandler 异常处理器供应器* @return 构建的Advice实例*/protected Advice buildAdvice(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);interceptor.configure(executor, exceptionHandler);return interceptor;}/*** 为给定的异步注解类型计算切点(如果有的话)* @param asyncAnnotationTypes 要内省的异步注解类型* @return 适用的Pointcut对象,如果没有则返回{@code null}*/protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {ComposablePointcut result = null;for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {// 类级别的注解匹配切点Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);// 方法级别的注解匹配切点Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);if (result == null) {result = new ComposablePointcut(cpc);}else {result.union(cpc);}result = result.union(mpc);}// 如果没有匹配的注解类型,返回Pointcut.TRUE(匹配所有)return (result != null ? result : Pointcut.TRUE);}}
3.3 AnnotationAsyncExecutionInterceptor 执行拦截
这是异步方法执行的核心拦截器:
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {@Override@Nullableprotected String getExecutorQualifier(Method method) {// 确定方法使用的执行器限定符Annotation asyncAnnotation = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);if (asyncAnnotation == null) {asyncAnnotation = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);}return (asyncAnnotation != null ? ((Async) asyncAnnotation).value() : null);}@Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);// 查找真正要执行的方法final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);// 选择合适的异步执行器AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);// 创建异步任务Callable<Object> task = () -> {try {Object result = invocation.proceed();if (result instanceof Future) {return ((Future<?>) result).get();}} catch (ExecutionException ex) {handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());} catch (Throwable ex) {handleError(ex, userDeclaredMethod, invocation.getArguments());}return null;};// 提交任务到执行器return doSubmit(task, executor, invocation.getMethod().getReturnType());}
}
4. 线程池配置与优化
4.1 SimpleAsyncTaskExecutor 的缺陷
Spring Boot默认使用SimpleAsyncTaskExecutor
,它存在以下问题:
public class SimpleAsyncTaskExecutor implements AsyncTaskExecutor {@Overridepublic void execute(Runnable task, long startTimeout) {// 每次执行都创建新线程,没有线程复用Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task);thread.start();}
}
主要缺陷:
- 每次调用都创建新线程,线程创建和销毁开销大
- 缺乏线程池管理,容易导致系统资源耗尽
- 无法控制并发数量
4.2 ThreadPoolTaskExecutor 自定义配置
默认行为
- Spring 会搜索关联的线程池定义:
- 上下文中的唯一
TaskExecutor
bean - 或者名为 “taskExecutor” 的
Executor
bean
- 上下文中的唯一
- 如果两者都不可解析,将使用
SimpleAsyncTaskExecutor
来处理异步方法调用 - 带有
void
返回类型的注解方法无法将异常传回给调用者 - 默认情况下,这些未捕获的异常只会被记录到日志中
自定义配置
要自定义配置,需要实现 AsyncConfigurer
接口并提供:
- 通过
AsyncConfigurer.getAsyncExecutor()
方法提供自定义的Executor
- 通过
AsyncConfigurer.getAsyncUncaughtExceptionHandler()
方法提供自定义的AsyncUncaughtExceptionHandler
配置示例
@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(7);executor.setMaxPoolSize(42);executor.setQueueCapacity(11);executor.setThreadNamePrefix("MyExecutor-");executor.initialize();return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return new MyAsyncUncaughtExceptionHandler();}
}
重要注意事项
- NOTE:
AsyncConfigurer
配置类在应用程序上下文引导过程中会较早初始化 - 如果需要依赖其他 bean,请尽可能将它们声明为 ‘lazy’,以确保它们也能通过其他后处理器
简化配置
- 如果只需要自定义其中一个项目,可以对另一个返回
null
来保持默认设置 - 考虑在可能的情况下从
AsyncConfigurerSupport
扩展
Bean 管理
- 注意:在上面的示例中,
ThreadPoolTaskExecutor
不是完全受管理的 Spring bean - 如果希望获得完全受管理的 bean,请在
getAsyncExecutor()
方法上添加@Bean
注解 - 在这种情况下,不再需要手动调用
executor.initialize()
方法,因为这会在 bean 初始化时自动调用
5. 异步处理完整流程
5.1 启动阶段流程
-
@EnableAsync注解解析
- 通过
@Import
导入AsyncConfigurationSelector
- 根据
AdviceMode
选择配置类(通常是ProxyAsyncConfiguration
)
- 通过
-
Bean后处理器注册
ProxyAsyncConfiguration
创建AsyncAnnotationBeanPostProcessor
- 后处理器在Bean初始化前后进行拦截处理
-
拦截器构建
AsyncAnnotationAdvisor
创建切点和通知AnnotationAsyncExecutionInterceptor
作为方法拦截器
5.2 运行时执行流程
-
方法调用拦截
- 代理对象拦截带有
@Async
注解的方法调用 AsyncAnnotationAdvisor
匹配切点
- 代理对象拦截带有
-
执行器选择
- 根据
@Async
注解的value值选择执行器 - 默认使用
AsyncConfigurer
配置的执行器
- 根据
-
异步任务提交
- 将同步方法调用包装为
Callable
任务 - 通过
ThreadPoolTaskExecutor
提交到线程池
- 将同步方法调用包装为
-
结果处理
- 对于返回
Future
的方法,返回Future
对象 - 对于void方法,在后台异步执行
- 对于返回
6. 最佳实践与注意事项
6.1 配置建议
# application.yml
async:thread-pool:core-size: 20max-size: 100queue-capacity: 200keep-alive-seconds: 60
6.2 异常处理
@Component
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {log.error("异步方法执行异常: {}.{}", method.getDeclaringClass().getName(), method.getName(), ex);// 发送告警、记录日志等处理}
}
6.3 常见问题解决
- 自调用问题:同一个类中方法调用
@Async
方法不会生效 - 事务管理:异步方法中的事务需要特殊处理
- 上下文传递:需要手动传递安全上下文等线程绑定信息
6.4 使用 TransmittableThreadLocal (TTL)设置父子线程上下文传递
对于使用了线程池的场景,阿里开源的TransmittableThreadLocal (TTL) 是更好的选择,它解决了InheritableThreadLocal
在线程池中数据混乱的问题。
-
添加 TTL 依赖
首先在你的pom.xml
中添加TTL依赖:<dependency><groupId>com.alibaba</groupId><artifactId>transmittable-thread-local</artifactId><version>2.14.2</version> </dependency>
-
设置 TransmittableThreadLocal 装饰器
@Bean
public BeanPostProcessor threadPoolTaskExecutorBeanPostProcessor() {return new BeanPostProcessor() {@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {if (!(bean instanceof ThreadPoolTaskExecutor)) {return bean;}ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) bean;executor.setTaskDecorator(TtlRunnable::get); // 设置 TransmittableThreadLocal 装饰器return executor;}};
}
7. 总结
Spring Boot的异步处理框架通过AOP和线程池技术实现了优雅的异步编程模型。核心在于:
- @EnableAsync:通过导入配置启动异步支持
- AsyncAnnotationBeanPostProcessor:识别并代理异步方法
- AnnotationAsyncExecutionInterceptor:拦截并异步执行方法
- ThreadPoolTaskExecutor:提供高效的线程池管理
深入理解这些核心组件的源码和实现原理,有助于我们更好地使用和优化Spring Boot的异步处理能力,构建高性能的并发应用程序。