SpringBoot系列之CompletableFuture控制同步任务的先后执行
SpringBoot系列之CompletableFuture控制同步任务的先后执行
在 SpringBoot 项目中,“先同步分组数据、再同步分组人员绑定” 是典型的有先后执行顺序要求的业务场景。传统异步方式需手动处理任务依赖,易出现顺序混乱问题,而CompletableFuture能精准控制任务先后执行逻辑,本文聚焦其如何实现这一核心需求,结合实战代码与图表详解。
一、核心需求:明确任务先后执行顺序
业务场景存在强依赖关系,必须满足 “先完成、后执行”:
-
前置任务:同步外部系统分组数据(输出
List<Group>,若未完成,后续任务无法启动); -
后置任务:基于前置任务返回的分组列表,同步人员绑定关系(输入为前置任务结果,依赖前置任务执行完成)。
若跳过前置任务直接执行后置任务,会因缺少分组数据导致业务异常,因此严格控制先后顺序是核心目标。
二、CompletableFuture 如何控制先后执行?
CompletableFuture通过链式调用方法,强制后置任务等待前置任务完成后再执行,核心依赖方法如下:
| 核心方法 | 作用 | 适用场景 |
|---|---|---|
supplyAsync() | 执行有返回结果的前置任务 | 同步分组(需输出分组列表) |
thenAcceptAsync() | 前置任务完成后,执行无返回结果的后置任务,且接收前置任务结果 | 同步人员绑定(需分组列表参数) |
2.1 关键代码:强制先后执行逻辑
通过supplyAsync()定义前置任务,thenAcceptAsync()绑定后置任务,实现 “前置不完成,后置不启动”:
/*** 人员组先同步,然后同步人员绑定** @param msg msg* @param channel channel* @throws Exception Exception*/
@RabbitListener(queues = "sync.group_and_person_bind.queue", containerFactory = "rabbitListenerContainerFactory")
public void handleGroupAndPersonBindSync(Message msg, Channel channel) throws Exception {long deliveryTag = msg.getMessageProperties().getDeliveryTag();// 同步所有分组CompletableFuture<Void> groupFuture = CompletableFuture.runAsync(() -> {log.info("开始同步人员组(deliveryTag: {})...", deliveryTag);try {handleSync(msg, channel, Group.class);log.info("人员组同步完成(deliveryTag: {})", deliveryTag);} catch (Exception e) {log.error("人员组同步失败(deliveryTag: {})", deliveryTag, e);throw new RuntimeException("人员组同步失败", e);}}, syncExecutor);// 同步分组人员绑定CompletableFuture<Void> userBindFuture = groupFuture.thenRunAsync(() -> {log.info("开始同步人员绑定(deliveryTag: {})...", deliveryTag);try {handleSync(msg, channel, PersonGroupBind.class);log.info("人员绑定同步完成(deliveryTag: {})", deliveryTag);} catch (Exception exception) {log.error("人员绑定同步失败(deliveryTag: {})", deliveryTag, exception);throw new RuntimeException("人员绑定同步失败", exception);}}, syncExecutor);// 等待所有任务完成userBindFuture.get();
}
关键逻辑:thenRunAsync()的调用依赖groupFuture(前置任务结果),只有当groupFuture执行完成(无论成功或失败),thenRunAsync()对应的后置任务才会启动,从代码层面强制控制先后顺序。
三、SpringBoot 线程池配置:保障先后执行稳定性
为避免CompletableFuture默认线程池(ForkJoinPool)资源耗尽问题,需配置自定义线程池,确保前置、后置任务有稳定线程资源执行,不影响先后顺序控制:
3.1 线程池配置类(核心参数保障执行)
@Configurationpublic class ThreadPoolConfig {@Bean("syncExecutor")public Executor syncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(100);executor.setThreadNamePrefix("sync-thread-");executor.setKeepAliveSeconds(60);executor.initialize();return executor;}
}
四、关键补充:join () 对先后执行的影响
join()方法虽不直接控制任务间的先后顺序,但会影响 “调用线程是否等待整个任务链完成”,需结合业务判断是否使用:
4.1 何时需要 join ()?
当调用线程(如接口请求线程)需等待 “前置 + 后置” 任务全部完成后再继续时,需调用join():
@GetMapping("/sync")
public String startSync() {CompletableFuture<Void> future = taskService.startSyncFlow();future.join(); // 等待前置+后置任务全部完成,再返回结果(保障业务顺序)return "分组-人员绑定同步完成";
}
4.2 何时不需要 join ()?
若为后台异步任务(无需等待结果),无需join(),避免调用线程阻塞:
// 后台任务:提交后无需等待,任务链按先后顺序自行执行
taskService.startSyncFlow().thenRun(() -> {log.info("分组-人员绑定同步完成(后台任务)");
});
五、异常处理:不破坏先后执行逻辑
若前置任务执行失败,需及时捕获异常,避免后置任务因参数异常报错,同时保障 “失败后不继续执行无效后置任务”:
CompletableFuture<Void> userBindFuture = groupFuture// 前置任务异常时,直接处理(不执行后置任务).exceptionally(ex -> {log.error("前置任务(分组同步)失败:", ex);throw new CompletionException("分组同步异常,终止后续人员绑定", ex);})// 前置任务成功后,再执行后置任务.thenAcceptAsync(groups -> businessService.syncGroupUserBind(groups), syncExecutor);
六、总结:CompletableFuture 控制先后执行的核心价值
-
代码层面强制依赖:通过
thenRunAsync()等链式方法,让后置任务必须等待前置任务完成,从根源避免顺序混乱; -
无需手动管理状态:无需用锁、计数器等工具判断前置任务状态,简化顺序控制逻辑;
-
结合线程池更稳定:自定义线程池保障任务执行资源,不因线程问题打乱先后顺序。
通过CompletableFuture,SpringBoot 项目中 “分组 - 人员绑定同步” 这类有先后顺序要求的业务,能实现高效、稳定的异步执行,同时避免传统方式的顺序管控难题。
