当前位置: 首页 > news >正文

CompletableFuture:整合、超时、完成事件与批量处理

引言

在异步编程实践中,我们不仅需要处理单个任务的执行流程,更需要应对多个异步任务之间的复杂交互。本文将通过实际案例解析以下核心功能:

  • 双任务整合:合并两个独立任务的结果
  • 高效超时控制:防止异步操作无限等待
  • 完成事件处理:实时响应任务完成状态
  • 批量任务管理:使用 allOf 处理并行任务组

一、双任务结果整合

1.1 thenCombine 方法

合并两个独立任务的执行结果,适用于需要同时获取两个异步结果的场景

CompletableFuture<String> userInfo = getUserInfoAsync(userId);
CompletableFuture<Double> balance = getAccountBalanceAsync(accountId);

userInfo.thenCombine(balance, (info, money) -> {
    return String.format("用户 %s 余额 %.2f", info, money);
}).thenAccept(System.out::println);

执行流程

  1. 并行获取用户信息和账户余额
  2. 当两个任务都完成时执行合并操作
  3. 输出格式化字符串

1.2 thenCompose 方法

串联两个依赖型任务,前一个任务的结果作为下一个任务的输入

CompletableFuture<Order> orderFuture = fetchOrderAsync("O123");

orderFuture.thenCompose(order -> 
    calculateTaxAsync(order.getAmount())
).thenAccept(tax -> 
    System.out.println("应缴税款:" + tax)
);

二、高效超时控制

2.1 Java 9+ 原生超时支持

CompletableFuture<String> dataFuture = fetchDataFromNetwork()
    .orTimeout(3, TimeUnit.SECONDS)
    .exceptionally(ex -> {
        if (ex instanceof TimeoutException) {
            return "默认数据";
        }
        throw new CompletionException(ex);
    });

2.2 Java 8 兼容方案

ExecutorService timeoutExecutor = Executors.newScheduledThreadPool(2);

<T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, 
                                    long timeout, TimeUnit unit) {
    CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
    timeoutExecutor.schedule(() -> {
        if (!future.isDone()) {
            timeoutFuture.completeExceptionally(new TimeoutException());
        }
    }, timeout, unit);
    return future.applyToEither(timeoutFuture, Function.identity());
}

// 使用示例
withTimeout(fetchData(), 2, TimeUnit.SECONDS)
    .exceptionally(ex -> handleTimeout(ex));

三、完成事件处理

3.1 whenComplete 方法

无论成功或异常都会触发的完成回调

processImageAsync(imageFile)
    .whenComplete((result, ex) -> {
        if (ex != null) {
            log.error("图片处理失败", ex);
            sendAlert(ex.getMessage());
        } else {
            updateUI(result);
        }
    });

3.2 thenAccept 方法

仅处理成功结果的消费者

generateReportAsync()
    .thenAccept(report -> {
        saveToDatabase(report);
        sendEmailNotification(report);
    })
    .exceptionally(ex -> {
        log.error("报告生成失败", ex);
        return null;
    });

四、批量任务管理

4.1 allOf 方法

等待所有任务完成(不保留结果顺序)

List<String> userIds = Arrays.asList("U1001", "U1002", "U1003");

List<CompletableFuture<UserProfile>> futures = userIds.stream()
    .map(id -> fetchUserProfileAsync(id))
    .collect(Collectors.toList());

CompletableFuture<Void> allFutures = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0])
);

allFutures.thenRun(() -> {
    List<UserProfile> profiles = futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
    analyzeProfiles(profiles);
});

4.2 anyOf 方法

任一任务完成即触发

CompletableFuture<Object> anyResult = CompletableFuture.anyOf(
    queryFromCache(),
    queryFromDB(),
    queryFromRemoteService()
);

anyResult.thenAccept(result -> 
    System.out.println("最快返回结果: " + result)
);

五、综合实战案例

电商订单支付流程

public class PaymentProcessor {

    // 核心支付方法
    public CompletableFuture<PaymentResult> processPayment(PaymentRequest request) {
        return validateRequest(request)
            .thenCompose(valid -> 
                reserveInventory(request.getItems())
            )
            .thenCombine(
                calculateTax(request.getAmount()), 
                (inventoryId, tax) -> buildPaymentPayload(request, inventoryId, tax)
            )
            .thenCompose(payload -> 
                callPaymentGateway(payload)
                    .orTimeout(10, TimeUnit.SECONDS)
            )
            .handle((result, ex) -> {
                if (ex != null) {
                    rollbackOperations();
                    return new PaymentResult(FAILURE, ex.getMessage());
                }
                return result;
            });
    }

    // 辅助方法省略...
}

流程说明

  1. 请求验证(同步)
  2. 库存预留(异步)
  3. 并行计算税费
  4. 组合支付请求
  5. 调用支付接口(带超时)
  6. 统一处理结果/异常

六、性能优化要点

  1. 线程池分层

    // CPU密集型任务
    ExecutorService cpuBoundPool = Executors.newWorkStealingPool();
    
    // IO密集型任务
    ExecutorService ioBoundPool = Executors.newCachedThreadPool();
    
    // 定时任务
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    
  2. 结果缓存策略

    private final Map<String, CompletableFuture<ProductInfo>> cache = 
        new ConcurrentHashMap<>();
    
    public CompletableFuture<ProductInfo> getProduct(String id) {
        return cache.computeIfAbsent(id, 
            k -> fetchFromRemote(id).thenApply(this::parseResponse));
    }
    

七、常见问题排查

7.1 回调链未触发

  • 检查是否遗漏异常处理
  • 确认线程池未饱和

7.2 内存泄漏

  • 及时清理已完成的Future引用
  • 使用WeakReference持有上下文

7.3 死锁问题

  • 避免在回调中同步等待其他Future
  • 使用非阻塞的组合方法

结语

通过合理运用 CompletableFuture 的组合功能,我们可以构建出:

  • 支持超时熔断的健壮系统
  • 高效并行的批量处理流程
  • 实时响应的事件驱动架构

相关文章:

  • 处理甘特图启动依赖报错。
  • vite.config.js常用配置
  • STM32_USB
  • 提升Spring Boot开发效率的Idea插件:Spring Boot Helper
  • 【LLM】DeepResearch系列(Search-R1、Search-o1、R1-Searcher)
  • 大模型最新面试题系列:模型部署(二)
  • io_uring 异步 socket 编程
  • 自动化框架及其设计搭建浅谈(二)--分层自动化测试
  • 浮点数精度问题
  • vue项目中,添加主题皮肤切换功能
  • 负指数二项式展开
  • 【Unity】animator检测某state动画播放完毕方法
  • C高级,终端操作
  • 如何保证RabbitMQ消息的可靠传输?
  • 【Scratch编程系列】程序积木-外观类
  • 1101复位之后故障仍保持原因分析
  • 交换机和集线器的区别
  • Compose笔记(十四)--LazyColumn
  • 计算机系统---性能指标(1)CPU与GPU
  • 横扫SQL面试——PV、UV问题
  • 国内网站 专做国外视频/网络营销的基本流程
  • 做公司网站怎么推广/微信销售平台
  • 福田网站建设龙岗网站建设罗湖网站建设/最近新闻摘抄
  • 公司官方网站一般什么公司做/b2b网站推广优化
  • 三站合一网站建设/如何查询域名注册人信息
  • 网站建设 微盘下载/网络舆情监控