当前位置: 首页 > news >正文

java通过线程池加CompletableFuture实现批量异步处理

1,线程池

package org.springblade.sample.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.*;/*** 肖扬: 线程池配置类**/@Configuration
public class MyThreadConfig {/*** 基础线程数为 cpu 2n-1* 核心线程为 基础线程数 2n-1*/@Beanpublic ThreadPoolExecutor threadPoolExecutor() {return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),   // 核心线程数Runtime.getRuntime().availableProcessors()*2,            // 最大线程数60,                                                        // 线程空闲时存活的时间TimeUnit.SECONDS,                                          // 存活时间的单位new ArrayBlockingQueue<>(1000),              // 队列类型和大小(无界队列)Executors.defaultThreadFactory(),                          // 线程工厂 (默认,无特殊需求都为默认)new ThreadPoolExecutor.CallerRunsPolicy()                  // 饱和策略(CallerRunsPolicy策略防止任务的丢失));}}

2,实现service

package org.springblade.sample.respiratorytract.controller.covid19;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;@Service
@Slf4j
public class MyService {@Autowiredprivate  ThreadPoolExecutor executor;// 整个批次最大执行时间(30 秒)private static final long BATCH_TIMEOUT_SECONDS = 30;/*** 批量异步处理(30 秒整体超时 + 快速失败)*/public CompletableFuture<List<String>> processBatchAsync(List<String> inputs) {// 存放所有任务List<CompletableFuture<String>> futures = inputs.stream().map(input -> CompletableFuture.supplyAsync(() -> doWork(input), executor).handle((res, ex) -> {if (ex != null) {log.error("任务执行失败, input {} msg {}", input, ex.getMessage());throw new RuntimeException(ex.getMessage());}return res;})).collect(Collectors.toList());// 缓存 futures 数组,避免重复 toArrayCompletableFuture<?>[] futuresArray = futures.toArray(new CompletableFuture[0]);// allOf 表示所有任务成功才完成CompletableFuture<Void> allFutures = CompletableFuture.allOf(futuresArray);// failFast: 只要有一个任务失败,立刻失败CompletableFuture<Object> failFast = CompletableFuture.anyOf(futuresArray);// allFutures 和 failFast 谁先完成用谁CompletableFuture<Void> combined = CompletableFuture.anyOf(allFutures, failFast).thenCompose(o -> allFutures);// 加整体 30 秒超时CompletableFuture<Void> batchWithTimeout = combined.orTimeout(BATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);// 收集结果return batchWithTimeout.thenApply(v ->futures.stream().map(CompletableFuture::join).collect(Collectors.toList())).exceptionally(ex -> {log.error("批量任务执行失败或超时: {}", ex.getMessage());// 取消所有未完成任务futures.forEach(f -> {if (!f.isDone()) {f.cancel(true);}});throw new RuntimeException("批量任务执行失败,请联系管理员!");});}/*** 具体业务逻辑*/private String doWork(String input) {//TODO 编写业务逻辑return "processed:" + input;}
}

注释:使用jdk9及以上


文章转载自:

http://SUcXRmQo.mhmsn.cn
http://bZIDFYSH.mhmsn.cn
http://scd25WGt.mhmsn.cn
http://WvaZNkS4.mhmsn.cn
http://RYwvEI1E.mhmsn.cn
http://2xA0CYoH.mhmsn.cn
http://TPzeOJJO.mhmsn.cn
http://Gumy3K62.mhmsn.cn
http://Qzb4VtEs.mhmsn.cn
http://y8dj9UQI.mhmsn.cn
http://lajvH0fX.mhmsn.cn
http://IApTxLr5.mhmsn.cn
http://0UF1Que8.mhmsn.cn
http://Vybg9mza.mhmsn.cn
http://rMQ3JupX.mhmsn.cn
http://XgEKCp9N.mhmsn.cn
http://srY72B4k.mhmsn.cn
http://2wasCNIC.mhmsn.cn
http://7b7kMxhW.mhmsn.cn
http://hAuvPAVf.mhmsn.cn
http://OKUVKh4E.mhmsn.cn
http://nB6kSDk7.mhmsn.cn
http://d3ueXEM9.mhmsn.cn
http://8PLleYyK.mhmsn.cn
http://U5FSZ2Ii.mhmsn.cn
http://Jn2xhIrF.mhmsn.cn
http://oA09rA8F.mhmsn.cn
http://U5i2MO8M.mhmsn.cn
http://dd7fjDmJ.mhmsn.cn
http://5vp36lza.mhmsn.cn
http://www.dtcms.com/a/385447.html

相关文章:

  • Coze源码分析-资源库-创建知识库-后端源码-详细流程梳理
  • 极简版 Nginx 反向代理实验步骤
  • python-86-基于Graphviz或Mermaid绘制流程图
  • 智能农机无人驾驶作业套圈路径规划
  • Rayon Rust中的数据并行库入门教程
  • NumPy数组与Python列表的赋值行为解析
  • 基于 AI 的大前端智能家居控制应用开发
  • RAGFlow集成SGLang部署的大模型:实现OpenAI API兼容的自定义LLM调用
  • sqlsever 内存配置错误无法连接,后面恢复连接
  • 51c大模型~合集182
  • 2025.9.15总结
  • 深入理解 Roo Code 的 Code Actions 功能
  • Java---线程池讲解
  • PEFT QLora Deepspeed Zero Stage 3 Offload Trainning
  • 线程概念,控制
  • 扫描仪常见样式:平板与馈纸的特性与适用场景
  • Python进程和线程——多线程
  • 2025年AIOCR审核革命!七大智能费控报销系统终结手工录入
  • 从循环到矩阵运算:矢量化加速机器学习的秘诀
  • R 语言入门实战|第七章 程序:从“老虎机”项目学透流程控制与代码优化
  • clickhouse 中SUM(CASE WHEN ...) 返回什么类型?
  • NR帧结构
  • 【联合查询】
  • 常见IC封装详解:从DIP到BGA的演进与应用
  • DockerComposeUI+cpolar:容器管理的远程可视化方案
  • tcp的三次握手与四次挥手简介
  • 2025算法八股——深度学习——MHA MQA GQA
  • 常见岩性分类与油气勘探意义笔记
  • 贪心算法应用:内存分配(First Fit)问题详解
  • RTK基站模块技术要点与作用解析