当前位置: 首页 > news >正文

Java多线程与高并发专题——如何利用 CompletableFuture 解决“聚合打车服务平台”的问题?

聚合打车服务平台问题

什么是聚合打车服务平台问题呢?

相信很多小伙伴们都用高德打过车,它就实现了一个聚合打车服务平台,其核心用户选定一条路线以后,可以同时获取多家打车公司的报价信息,比如你想要打车从公司回家,当你确定路线以后,需要看不同打车公司的价格是多少?

因为有很多家打车公司,所以应该把所有打车公司的报价等信息都获取到,然后再聚合。由于每个打车公司都有自己的服务器,所以分别去请求它们的服务器就可以了,比如请求小拉、滴滴、曹操等,如下图所示:

串行

最简单的一种比较原始的方案,就是用串行的方式来解决这个问题。

比如我们想获取价格,要先去访问小拉,然后再去访问滴滴,以此类推。当每一个请求发出去之后,等它响应回来以后,我们才能去请求下一个平台,这就是串行的方式。

但可以预见的,这样做的效率非常低下,因为打车公司比较多,假设每个打车公司都需要 1 秒钟的话,那么用户肯定等不及,所以这种方式是肯定不可取的。

并行

接下来我们就对刚才的思路进行改进,最主要的思路就是把串行改成并行,如下图所示: 

我们可以并行地去获取这些报价信息,然后再把报价信息给聚合起来,这样的话,效率会成倍的提高。

这种并行虽然提高了效率,但也有一个缺点,那就是会“一直等到所有请求都返回”。如果有一个平台特别慢,那么你不应该被那个平台拖累,比如说某个平台获取需要二十秒,那肯定是等不了这么长时间的,所以我们需要一个功能,那就是有超时的获取。

有超时的并行获取

下面我们就来看看下面这种有超时的并行获取的情况。

在这种情况下,就属于有超时的并行获取,同样也在并行的去请求各个平台信息。但是我们规定了一个时间的超时,比如 3 秒钟,那么到 3 秒钟的时候如果都已经返回了那当然最好,把它们收集起来即可;但是如果还有些平台没能及时返回,我们就把这些请求给忽略掉,这样一来用户体验就比较好了,它最多只需要等固定的 3 秒钟就能拿到信息,虽然拿到的可能不是最全的,但是总比一直等更好。

想要实现这个目标有几种实现方案,我们一个一个的来看看。

线程池的实现

第一个实现方案是用线程池,我们来看一下代码。 

/**
 * 线程池演示类
 * 该类展示了如何使用线程池并发获取多个平台的报价
 */
public class ThreadPoolDemo {
    // 创建一个固定大小为3的线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(3);

    /**
     * 主方法,用于演示线程池的使用
     * @param args 命令行参数(未使用)
     * @throws InterruptedException 如果线程被中断
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo();
        System.out.println(threadPoolDemo.getPrices());
    }

    /**
     * 获取多个平台的报价
     * @return 包含所有平台报价的Set集合
     * @throws InterruptedException 如果线程被中断
     */
    private Set<Integer> getPrices() throws InterruptedException {
        // 创建一个线程安全的HashSet来存储价格
        Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
        
        // 提交三个任务到线程池,每个任务处理一个平台的报价
        threadPool.submit(new Task(123, prices));
        threadPool.submit(new Task(456, prices));
        threadPool.submit(new Task(789, prices));
        
        // 等待3秒,让所有任务有足够的时间完成
        Thread.sleep(3000);
        
        return prices;
    }

    /**
     * 内部类Task,实现了Runnable接口
     * 用于模拟获取单个平台报价的任务
     */
    private class Task implements Runnable {
        Integer productId;
        Set<Integer> prices;

        /**
         * Task构造函数
         * @param productId 产品ID
         * @param prices 用于存储报价的Set集合
         */
        public Task(Integer productId, Set<Integer> prices) {
            this.productId = productId;
            this.prices = prices;
        }

        /**
         * 任务的运行方法
         * 模拟获取平台报价的过程
         */
        @Override
        public void run() {
            int price = 0;
            try {
                // 模拟网络延迟,随机等待0-4秒
                Thread.sleep((long) (Math.random() * 4000));
                // 生成一个随机价格(0-3999)
                price = (int) (Math.random() * 4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 将获取到的价格添加到共享的prices集合中
            prices.add(price);
        }
    }
}

在代码中,新建了一个线程安全的 Set,它是用来存储各个价格信息的,把它命名为 Prices,然后往线程池中去放任务。线程池是在类的最开始时创建的,是一个固定 3 线程的线程池。而这个任务在下方的 Task 类中进行了描述,在这个 Task 中我们看到有 run 方法,在该方法里面,我们用一个随机的时间去模拟各个打车平台的响应时间,然后再去返回一个随机的价格来表示报价,最后把这个票价放到 Set 中。这就是我们 run 方法所做的事情。

再回到 getPrices 函数中,我们新建了三个任务,productId 分别是 123、456、789,这里的productId 并不重要,因为我们返回的价格是随机的,为了实现超时等待的功能,在这里调用了Thread 的 sleep 方法来休眠 3 秒钟,这样做的话,它就会在这里等待 3 秒,之后直接返回prices。

此时,如果前面响应速度快的话,prices 里面最多会有三个值,但是如果每一个响应时间都很慢,那么可能 prices 里面一个值都没有。不论你有多少个,它都会在休眠结束之后,也就是执行完 Thread 的 sleep 之后直接把 prices 返回,并且最终在 main 函数中把这个结果给打印出来。

这就是用线程池去实现的最基础的方案。

CountDownLatch

在这里会有一个优化的空间,比如说网络特别好时,每个打车公司响应速度都特别快,你根本不需要等三秒,有的打车公司可能几百毫秒就返回了,那么我们也不应该让用户等 3 秒。所以需要进行一下这样的改进,看下面这段代码:

/**
 * CountDownLatch的演示类
 * 该类展示了如何使用CountDownLatch来协调多个线程的执行
 */
public class CountDownLatchDemo {
    // 创建一个固定大小为3的线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(3);

    /**
     * 主方法,用于演示CountDownLatchDemo的使用
     * @param args 命令行参数
     * @throws InterruptedException 如果线程被中断
     */
    public static void main(String[] args) throws InterruptedException {
        CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo();
        System.out.println(countDownLatchDemo.getPrices());
    }

    /**
     * 获取报价的方法
     * 该方法创建三个任务,每个任务模拟获取一个打车的报价
     * @return 包含所有获取到的报价的Set
     * @throws InterruptedException 如果线程被中断
     */
    private Set<Integer> getPrices() throws InterruptedException {
        // 创建一个线程安全的Set来存储报价
        Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
        // 创建一个CountDownLatch,计数器设置为3
        CountDownLatch countDownLatch = new CountDownLatch(3);
        // 提交三个任务到线程池
        threadPool.submit(new Task(123, prices, countDownLatch));
        threadPool.submit(new Task(456, prices, countDownLatch));
        threadPool.submit(new Task(789, prices, countDownLatch));
        // 等待所有任务完成,最多等待3秒
        countDownLatch.await(3, TimeUnit.SECONDS);
        return prices;
    }

    /**
     * 内部类Task,实现了Runnable接口
     * 用于模拟获取打车报价的任务
     */
    private class Task implements Runnable {
        Integer productId;
        Set<Integer> prices;
        CountDownLatch countDownLatch;

        /**
         * Task的构造方法
         * @param productId 打车ID
         * @param prices 用于存储报价的Set
         * @param countDownLatch CountDownLatch实例
         */
        public Task(Integer productId, Set<Integer> prices,
                    CountDownLatch countDownLatch) {
            this.productId = productId;
            this.prices = prices;
            this.countDownLatch = countDownLatch;
        }

        /**
         * 任务的运行方法
         * 模拟获取报价的过程,包括随机延迟和生成随机报价
         */
        @Override
        public void run() {
            int price = 0;
            try {
                // 模拟获取报价的延迟,最多4秒
                Thread.sleep((long) (Math.random() * 4000));
                // 生成一个随机报价
                price = (int) (Math.random() * 4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 将报价添加到共享的Set中
            prices.add(price);
            // 完成一个任务,计数器减1
            countDownLatch.countDown();
        }
    }
}

这段代码使用 CountDownLatch 实现了这个功能,整体思路和之前是一致的,不同点在于我们新增了一个 CountDownLatch,并且把它传入到了 Task 中。在 Task 中,获取完报价信息并且把它添加到 Set 之后,会调用 countDown 方法,相当于把计数减 1。

这样一来,在执行 countDownLatch.await(3,TimeUnit.SECONDS) 这个函数进行等待时,如果三个任务都非常快速地执行完毕了,那么三个线程都已经执行了 countDown 方法,那么这个 await 方法就会立刻返回,不需要傻等到 3 秒钟。

如果有一个请求特别慢,相当于有一个线程没有执行 countDown 方法,来不及在 3 秒钟之内执行完毕,那么这个带超时参数的 await 方法也会在 3 秒钟到了以后,及时地放弃这一次等待,于是就把 prices 给返回了。所以这样一来,我们就利用 CountDownLatch 实现了这个需求,也就是说我们最多等 3 秒钟,但如果在 3 秒之内全都返回了,我们也可以快速地去返回,不会傻等,提高了效率。

CompletableFuture

千呼万唤始出来,下面我们收回标题,来看一下用 CompletableFuture 来实现这个功能的用法,代码如下所示:

/**
 * CompletableFuture示例类
 * 演示了如何使用CompletableFuture执行并发任务并收集结果
 */
public class CompletableFutureDemo {

    /**
     * 主方法,用于演示CompletableFutureDemo的功能
     * @param args 命令行参数(未使用)
     * @throws Exception 可能抛出的异常
     */
    public static void main(String[] args) throws Exception {
        CompletableFutureDemo completableFutureDemo = new CompletableFutureDemo();
        System.out.println(completableFutureDemo.getPrices());
    }

    /**
     * 获取报价的方法
     * 创建并执行三个并发任务,每个任务模拟获取一个打车的报价
     * @return 包含所有获取到的报价的Set集合
     */
    private Set<Integer> getPrices() {
        Set<Integer> prices = Collections.synchronizedSet(new HashSet<>());
        CompletableFuture<Void> task1 = CompletableFuture.runAsync(new Task(123, prices));
        CompletableFuture<Void> task2 = CompletableFuture.runAsync(new Task(456, prices));
        CompletableFuture<Void> task3 = CompletableFuture.runAsync(new Task(789, prices));
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
        try {
            allTasks.get(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // 处理中断异常
        } catch (ExecutionException e) {
            // 处理执行异常
        } catch (TimeoutException e) {
            // 处理超时异常
        }
        return prices;
    }

    /**
     * 内部Task类,实现了Runnable接口
     * 用于模拟获取单个打车报价的任务
     */
    private class Task implements Runnable {
        Integer productId;
        Set<Integer> prices;

        /**
         * Task构造函数
         * @param productId 打车ID
         * @param prices 用于存储报价的Set集合
         */
        public Task(Integer productId, Set<Integer> prices) {
            this.productId = productId;
            this.prices = prices;
        }

        /**
         * 运行任务,模拟获取报价的过程
         * 包括随机延迟和生成随机报价
         */
        @Override
        public void run() {
            int price = 0;
            try {
                Thread.sleep((long) (Math.random() * 4000));
                price = (int) (Math.random() * 4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            prices.add(price);
        }
    }
}

这里我们不再使用线程池了,我们看到 getPrices 方法,在这个方法中,我们用了 CompletableFuture 的 runAsync 方法,这个方法会异步的去执行任务。

我们有三个任务,并且在执行这个代码之后会分别返回一个 CompletableFuture 对象,我们把它们命名为 task 1、task 2、task 3,然后执行 CompletableFuture 的 allOf 方法,并且把 task 1、task 2、task 3 传入。这个方法的作用是把多个 task 汇总,然后可以根据需要去获取到传入参数的这些 task 的返回结果,或者等待它们都执行完毕等。我们就把这个返回值叫作 allTasks,并且在下面调用它的带超时时间的 get 方法,同时传入 3 秒钟的超时参数。

这样一来它的效果就是,如果在 3 秒钟之内这 3 个任务都可以顺利返回,也就是这个任务包括的那三个任务,每一个都执行完毕的话,则这个 get 方法就可以及时正常返回,并且往下执行,相当于执行到 return prices。在下面的这个 Task 的 run 方法中,该方法如果执行完毕的话,对于CompletableFuture 而言就意味着这个任务结束,它是以这个作为标记来判断任务是不是执行完毕的。 但是如果有某一个任务没能来得及在 3 秒钟之内返回,那么这个带超时参数的 get 方法便会抛出 TimeoutException 异常,同样会被我们给 catch 住。这样一来它就实现了这样的效果:会尝试等待所有的任务完成,但是最多只会等 3 秒钟,在此之间,如及时完成则及时返回。那么所以我们利用 CompletableFuture,同样也可以解决了聚合打车服务平台问题。它的运行结果也和之前是一样的,有多种可能性。

相关文章:

  • 2.4 隐函数及由参数方程确定的函数求导
  • C#中值类型与引用类型是直观使用示例
  • __init__.py
  • openGauss关联列数据类型不一致引起谓词传递失败
  • fuse性能关键参数entry_timeout
  • python面试高频考点(深度学习大模型方向)
  • V8引擎源码编译踩坑实录
  • 如何在 Vue 项目中使用v - show和v - if指令,它们的原理、区别和适用场景是什么
  • jangow-01-1.0.1靶机攻略
  • QuecPython 外设接口之GPIO应用指南
  • java中的常量可以不用在声明的时候初始化,c中的必须在声明的时候初始化,可不可以这么理解?
  • HDMI(High-Definition Multimedia Interface)详解
  • 三分钟读懂微服务
  • UE4学习笔记 FPS游戏制作16 重构FppShooter和RoboteShooter 提出父类Shooter
  • HTML应用指南:利用POST请求获取城市肯德基门店位置信息
  • 【八股文】http怎么建立连接的
  • 破解云端依赖!如何通过Flowise搭建私有化的端到端AI开发环境
  • [250324] Kafka 4.0.0 版本发布:告别 ZooKeeper,拥抱 KRaft!| Wine 10.4 发布!
  • 上海瀛旻信息科技有限公司
  • 总结 Spring 中存储 Bean 的相关注解以及这些注解的用法.
  • 李铁案二审今日宣判
  • “光荣之城”2025上海红色文化季启动,红色主题市集亮相
  • 邮储银行一季度净赚超252亿降逾2%,营收微降
  • 国家卫健委:工作相关肌肉骨骼疾病、精神和行为障碍成职业健康新挑战
  • “下山虎”张名扬一回合摘下“狮心”:你们再嘘一个给我听听
  • 洗冤录·巴县档案|道咸年间一起家暴案