java通过线程池加CompletableFuture实现批量异步处理
1,线程池
package org.springblade.sample.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.*;/*** 肖扬: 线程池配置类**/@Configuration
public class MyThreadConfig {/*** 基础线程数为 cpu 2n-1* 核心线程为 基础线程数 2n-1*/@Beanpublic ThreadPoolExecutor threadPoolExecutor() {return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), // 核心线程数Runtime.getRuntime().availableProcessors()*2, // 最大线程数60, // 线程空闲时存活的时间TimeUnit.SECONDS, // 存活时间的单位new ArrayBlockingQueue<>(1000), // 队列类型和大小(无界队列)Executors.defaultThreadFactory(), // 线程工厂 (默认,无特殊需求都为默认)new ThreadPoolExecutor.CallerRunsPolicy() // 饱和策略(CallerRunsPolicy策略防止任务的丢失));}}
2,实现service
package org.springblade.sample.respiratorytract.controller.covid19;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;@Service
@Slf4j
public class MyService {@Autowiredprivate ThreadPoolExecutor executor;// 整个批次最大执行时间(30 秒)private static final long BATCH_TIMEOUT_SECONDS = 30;/*** 批量异步处理(30 秒整体超时 + 快速失败)*/public CompletableFuture<List<String>> processBatchAsync(List<String> inputs) {// 存放所有任务List<CompletableFuture<String>> futures = inputs.stream().map(input -> CompletableFuture.supplyAsync(() -> doWork(input), executor).handle((res, ex) -> {if (ex != null) {log.error("任务执行失败, input {} msg {}", input, ex.getMessage());throw new RuntimeException(ex.getMessage());}return res;})).collect(Collectors.toList());// 缓存 futures 数组,避免重复 toArrayCompletableFuture<?>[] futuresArray = futures.toArray(new CompletableFuture[0]);// allOf 表示所有任务成功才完成CompletableFuture<Void> allFutures = CompletableFuture.allOf(futuresArray);// failFast: 只要有一个任务失败,立刻失败CompletableFuture<Object> failFast = CompletableFuture.anyOf(futuresArray);// allFutures 和 failFast 谁先完成用谁CompletableFuture<Void> combined = CompletableFuture.anyOf(allFutures, failFast).thenCompose(o -> allFutures);// 加整体 30 秒超时CompletableFuture<Void> batchWithTimeout = combined.orTimeout(BATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);// 收集结果return batchWithTimeout.thenApply(v ->futures.stream().map(CompletableFuture::join).collect(Collectors.toList())).exceptionally(ex -> {log.error("批量任务执行失败或超时: {}", ex.getMessage());// 取消所有未完成任务futures.forEach(f -> {if (!f.isDone()) {f.cancel(true);}});throw new RuntimeException("批量任务执行失败,请联系管理员!");});}/*** 具体业务逻辑*/private String doWork(String input) {//TODO 编写业务逻辑return "processed:" + input;}
}
注释:使用jdk9及以上