当前位置: 首页 > news >正文

【CompletableFuture】异步编程

CompletableFuture异步编程

  • CompletableFuture介绍
  • 与传统 Future 的对比
  • 使用方法
    • 1. 使用 supplyAsync(有返回值)
    • 使用 runAsync(无返回值)
    • 指定自定义线程池
  • 处理异步结果
    • 1. thenApply:转换结果
    • 2.thenAccept:消费结果
    • 3.thenRun:完成后执行操作
  • 组合任务
    • 1. thenCompose:串联两个任务
    • 2. thenCombine:合并两个任务结果
    • 3. allOf:等待所有任务完成
    • 4. anyOf:任意一个任务完成
  • 异常处理
    • 1. exceptionally:捕获异常并返回默认值
    • 2. handle:无论成功/失败都处理
    • 3. whenComplete:记录日志但不修改结果
  • 完整示例:链式调用 + 异常处理
  • 关键点总结

CompletableFuture介绍

1.基础概念

CompletableFuture 是 Java 8 引入的一个类,用于表示异步计算的结果。它实现了 Future 接口,但比传统的 Future 更强大,支持:

  • 非阻塞操作:通过回调函数处理结果,无需手动调用 get() 阻塞线程。

  • 链式编程:将多个异步任务串联或并联,形成复杂的执行流水线。

  • 异常处理:提供统一的异常捕获和恢复机制。

2. 核心思想

  • 异步编程:将耗时的操作(如I/O、网络请求)交给其他线程执行,主线程继续处理其他任务。

  • 函数式风格:通过 thenApply、thenAccept 等方法,以声明式的方式组合任务。

3. 关键特点

  • 回调驱动:任务完成后自动触发后续操作。

  • 线程池集成:支持自定义线程池,避免资源竞争。

  • 结果依赖管理:轻松处理多个任务之间的依赖关系(如A任务的结果是B任务的输入)。

与传统 Future 的对比

特性FutureCompletableFuture
结果获取阻塞调用 get()非阻塞回调(thenAccept)
任务组合需要手动轮询链式调用(thenApply、thenCompose)
异常处理需在调用代码中处理内置 exceptionally、handle
线程控制依赖 ExecutorService支持自定义线程池
适用场景简单的异步任务复杂的异步流水线

使用方法

1. 使用 supplyAsync(有返回值)

public class MyThreadTest {


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(1000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            return "00";

        });
        // 获取结果(阻塞)
        String result = future.get();
        System.out.println("result:"+result);
    }
}

使用 runAsync(无返回值)

public class MyThreadTest {


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("sleep 1m");
        });
        //等待任务完成
        completableFuture.get();
    }
}

指定自定义线程池

public class MyThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Custom Thread Pool";
        }, executor);
        String s = future.get();
        System.out.println(s);
    }
}

处理异步结果

1. thenApply:转换结果

thenApply 方法用于在 CompletableFuture 完成时应用一个函数,并返回计算的结果。它返回一个新的 CompletableFuture,该 CompletableFuture 的类型由函数返回值的类型决定。
语法:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);

示例:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s -> s + " World");
        String s = future.get();
        System.out.println(s);
    }

输出
在这里插入图片描述

2.thenAccept:消费结果

thenAccept 方法用于在 CompletableFuture 完成时执行一个消费者(Consumer)操作,但不返回任何值(即它的返回类型是 void)。这通常用于执行一些副作用,比如打印日志、更新UI等,而不关心计算的结果。
语法:

public CompletableFuture<Void> thenAccept(Consumer<? super T> action);

示例:

public class MyThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenAccept(s -> System.out.println("Result: " + s));
        future.get();
    }
}

输出
在这里插入图片描述

3.thenRun:完成后执行操作

public class MyThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenRun(() -> System.out.println("Task finished"));
        future.get();
    }
}

输出
在这里插入图片描述

组合任务

1. thenCompose:串联两个任务

public class MyThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->{
            String a = "Hello";
                    System.out.println(Thread.currentThread().getName() + "-a:" + a);
                    return a;
                })
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> {
                    String r = " World";
                    System.out.println(Thread.currentThread().getName() + "-r:" + r);
                    return s + r;
                }));
        String s = future.get();// "Hello World"
        System.out.println(Thread.currentThread().getName() + "s:"+s);
    }
}

输出:
在这里插入图片描述

2. thenCombine:合并两个任务结果

public class MyThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            String h = "Hello";
            System.out.println(Thread.currentThread().getName() + " h:" + h);
            return h;
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() ->{
            String w = " World";
            System.out.println(Thread.currentThread().getName() + " w:" + w);
            return w;
        });
        CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
        String s = combined.get();
        System.out.println(Thread.currentThread().getName() + " s:" + s);
    }
}

输出:
在这里插入图片描述

3. allOf:等待所有任务完成

public class MyThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Task1");
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Task2");
        CompletableFuture<Void> all = CompletableFuture.allOf(task1, task2);
        all.thenRun(() -> System.out.println("All tasks completed"));
    }
}

输出
在这里插入图片描述

4. anyOf:任意一个任务完成

public class MyThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            return "Task1";
        });
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Task2");
        CompletableFuture<Object> any = CompletableFuture.anyOf(task1, task2);
        any.thenAccept(result -> System.out.println("First result: " + result)); // 输出 "Task2"
    }
}

输出
在这里插入图片描述

异常处理

1. exceptionally:捕获异常并返回默认值

public class MyThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (true) throw new RuntimeException("Error!");
            return "Success";
        }).exceptionally(ex -> "Fallback Value");
        String s = future.get();// 返回 "Fallback Value"
        System.out.println(s);
    }
}

在这里插入图片描述

2. handle:无论成功/失败都处理

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (new Random().nextBoolean()) throw new RuntimeException("Error!");
            return "Success";
        }).handle((result, ex) -> {
            if (ex != null) return "Fallback";
            return result;
        });
    }

3. whenComplete:记录日志但不修改结果

public class MyThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
                .whenComplete((result, ex) -> {
                    if (ex != null) ex.printStackTrace();
                    else System.out.println("Result: " + result);
                });
        String s = future.get();
        System.out.println(s);
    }
}

完整示例:链式调用 + 异常处理

public class MyThreadTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            // 步骤1:获取用户ID
            return 123;
        }).thenApply(userId -> {
            // 步骤2:根据用户ID查询名称
            if (userId == 123) return "Alice";
            else throw new IllegalArgumentException("Invalid User ID");
        }).thenApply(userName -> {
            // 步骤3:转换为大写
            return userName.toUpperCase();
        }).exceptionally(ex -> {
            // 统一异常处理
            System.out.println("Error: " + ex.getMessage());
            return "DEFAULT_USER";
        }).thenAccept(finalResult -> {
            System.out.println("Final Result: " + finalResult); // 输出 "ALICE" 或 "DEFAULT_USER"
        });
    }
}

输出
在这里插入图片描述

关键点总结

异步执行:使用 supplyAsync/runAsync 启动异步任务。

链式调用:通过 thenApply/thenAccept/thenRun 串联操作。

组合任务:thenCompose(依赖)和 thenCombine(并行)合并结果。

异常处理:优先使用 exceptionally 或 handle 提供容错。

线程池控制:避免使用默认线程池处理阻塞任务(如I/O)

相关文章:

  • 每天五分钟玩转深度学习PyTorch:搭建LSTM算法模型完成词性标注
  • 使用libcurl编写爬虫程序指南
  • UE4模型导入笔记
  • 若依 前后端部署
  • 聊透多线程编程-线程基础-4.C# Thread 子线程执行完成后通知主线程执行特定动作
  • KWDB创作者计划—KWDB技术重构:重新定义数据与知识的神经符号革命
  • 网络机顶盒常见问题全解析:从安装到故障排除
  • 使用 VBA 宏创建一个选择全部word图片快捷指令,进行图片格式编辑
  • vba讲excel转换为word
  • 从One-Hot到TF-IDF:NLP词向量演进解析与业务实战指南
  • 初版纳米AI_git pull分支关联关系
  • 如何降低论文的AIGC检测率,减少“AI味”
  • CD26.【C++ Dev】类和对象(17) static成员(下)
  • c++进阶之----异常
  • java实体类常用参数验证
  • DepthAI ROS 安装与使用教程
  • 从传统 CLI 到自动化:网管协议( SNMP / NETCONF / RESTCONF)与 YANG
  • Java SE(2)——运算符
  • 艾尔登法环Steam不同账号存档互通方法与替换工具分享
  • nginx入门,部署静态资源,反向代理,负载均衡使用