当前位置: 首页 > news >正文

线程池阻塞队列:告别资源耗尽,拥抱优雅并发!

在这里插入图片描述

目录

    • 一、什么是阻塞队列?(简单回顾)
    • 二、阻塞队列的特点
    • 三、Java 提供的常用阻塞队列
    • 四、如何选择合适的阻塞队列?
    • 五、总结

🌟我的其他文章也讲解的比较有趣😁,如果喜欢博主的讲解方式,可以多多支持一下,感谢🤗!

🌟了解 synchronized 关键字请看: synchronized 关键字:线程同步的“VIP 包间”

其他优质专栏: 【🎇SpringBoot】【🎉多线程】【🎨Redis】【✨设计模式专栏(已完结)】…等

如果喜欢作者的讲解方式,可以点赞收藏加关注,你的支持就是我的动力
✨更多文章请看个人主页: 码熔burning

一、什么是阻塞队列?(简单回顾)

还记得我们餐厅的例子吗? 线程池就像一个预先雇佣好一批服务员的餐厅。 客人(任务)来了,直接分配一个空闲的服务员去服务。 如果所有服务员都在忙,新来的客人就要排队。 这个排队的地方,就是阻塞队列 ⏳。

深入了解线程池的拒绝策略请看:线程池拒绝策略:让你的程序不再“爆仓”!

想深入了解线程池请看: 线程池:从入门到精通,只需一杯咖啡的时间

二、阻塞队列的特点

阻塞队列是一种特殊的队列,它在队列为空时,会阻塞尝试从队列中获取元素的线程;在队列已满时,会阻塞尝试向队列中添加元素的线程。 就像餐厅门口的等位区,如果餐厅满了,新来的客人就只能等着,直到有空位才能进去 🚶‍♀️。 如果等位区空了,服务员会等着,直到有客人来才能安排座位 🧍。

三、Java 提供的常用阻塞队列

Java 提供了多种阻塞队列的实现,每种实现都有其特点和适用场景。 我们来介绍几种常用的:

  1. ArrayBlockingQueue : 基于数组实现的有界阻塞队列
  • 解释: 就像一个固定大小的等位区,只能容纳一定数量的客人。 先来的客人先排队,满了就不能再加人了 🙅‍♀️。
  • 特点:
    • 有界: 必须在创建时指定队列的大小。
    • 公平性: 可以选择公平模式或非公平模式。 公平模式下,等待时间最长的线程会优先获得队列的访问权。
    • 基于数组: 内部使用数组存储元素,因此随机访问效率高。
  • 适用场景: 适用于需要限制队列大小,并且对公平性有要求的场景。
  • 代码示例:
import java.util.concurrent.*;

public class ArrayBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个容量为 3 的 ArrayBlockingQueue
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        // 向队列中添加元素
        queue.put("Task 1");
        queue.put("Task 2");
        queue.put("Task 3");

        // 尝试添加第四个元素,会阻塞直到队列有空位
        // queue.put("Task 4"); // 这行代码会阻塞

        System.out.println("队列已满,等待空位...");

        // 从队列中取出元素
        System.out.println("取出: " + queue.take()); // 取出 Task 1
        System.out.println("取出: " + queue.take()); // 取出 Task 2
        System.out.println("取出: " + queue.take()); // 取出 Task 3

        // 尝试取出元素,会阻塞直到队列有元素
        // System.out.println("取出: " + queue.take()); // 这行代码会阻塞

        System.out.println("队列已空,等待元素...");
    }
}
  • 输出结果:

在这里插入图片描述

  • 代码解释:
    • put() 方法:如果队列已满,put() 方法会阻塞,直到队列有空位。
    • take() 方法:如果队列为空,take() 方法会阻塞,直到队列有元素。
  1. LinkedBlockingQueue : 基于链表实现的无界/有界阻塞队列
  • 解释: 就像一个可以无限延伸的等位区(如果无界),或者一个可以灵活调整大小的等位区(如果有界)。 客人可以一直排队,直到餐厅实在容纳不下(如果有界) 🚶‍♂️。
  • 特点:
    • 无界/有界: 可以指定队列的大小,也可以不指定(默认为无界)。
    • 基于链表: 内部使用链表存储元素,因此插入和删除效率高。
    • 吞吐量通常高于 ArrayBlockingQueue: 因为链表结构更适合并发操作。
  • 适用场景: 适用于需要高吞吐量,并且对队列大小没有严格限制的场景(无界),或者需要动态调整队列大小的场景(有界)。
  • 代码示例:
import java.util.concurrent.*;

public class LinkedBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个无界的 LinkedBlockingQueue
        // LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // 创建一个容量为 3 的 LinkedBlockingQueue
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);

        // 向队列中添加元素
        queue.put("Task 1");
        queue.put("Task 2");
        queue.put("Task 3");

        // 尝试添加第四个元素,如果是有界队列,会阻塞
        // queue.put("Task 4"); // 如果是有界队列,这行代码会阻塞

        System.out.println("队列已满,等待空位...");

        // 从队列中取出元素
        System.out.println("取出: " + queue.take()); // 取出 Task 1
        System.out.println("取出: " + queue.take()); // 取出 Task 2
        System.out.println("取出: " + queue.take()); // 取出 Task 3

        // 尝试取出元素,会阻塞直到队列有元素
        // System.out.println("取出: " + queue.take()); // 这行代码会阻塞

        System.out.println("队列已空,等待元素...");
    }
}
  • 代码解释:
    • 无界队列:如果没有指定容量,LinkedBlockingQueue 会创建一个无界队列。 理论上,它可以容纳无限多的元素,但实际上会受到系统内存的限制。 使用无界队列需要谨慎,防止 OOM (OutOfMemoryError) 错误!
    • 有界队列:如果指定了容量,LinkedBlockingQueue 会创建一个有界队列。 行为与 ArrayBlockingQueue 类似。
  1. PriorityBlockingQueue : 支持优先级排序的无界阻塞队列
  • 解释: 就像一个可以根据客人身份(优先级)安排座位的等位区。 VIP 客人可以优先入座 👑。
  • 特点:
    • 无界: 理论上可以容纳无限多的元素。
    • 优先级排序: 队列中的元素会根据优先级进行排序。 优先级高的元素会优先被取出。
    • 元素必须实现 Comparable 接口: 或者在创建队列时提供一个 Comparator 对象,用于比较元素的优先级。
  • 适用场景: 适用于需要根据优先级处理任务的场景。
  • 代码示例:
import java.util.concurrent.*;
import java.util.Comparator;

class Task implements Comparable<Task> {
    private int priority;
    private String name;

    public Task(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    public int getPriority() {
        return priority;
    }

    @Override
    public int compareTo(Task other) {
        // 优先级高的任务排在前面
        return Integer.compare(other.priority, this.priority);
    }

    @Override
    public String toString() {
        return "Task{" +
                "priority=" + priority +
                ", name='" + name + '\'' +
                '}';
    }
}

public class PriorityBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个 PriorityBlockingQueue
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();

        // 向队列中添加任务
        queue.put(new Task(3, "Task C"));
        queue.put(new Task(1, "Task A"));
        queue.put(new Task(2, "Task B"));

        // 从队列中取出任务,会按照优先级排序
        System.out.println("取出: " + queue.take()); // 取出 Task A (优先级 1)
        System.out.println("取出: " + queue.take()); // 取出 Task B (优先级 2)
        System.out.println("取出: " + queue.take()); // 取出 Task C (优先级 3)
    }
}
  • 输出结果:

在这里插入图片描述

  • 代码解释:
    • Task 类实现了 Comparable 接口,用于比较任务的优先级。
    • PriorityBlockingQueue 会根据 compareTo() 方法的返回值对任务进行排序。
    • 优先级最高的任务会优先被取出。
  1. DelayQueue : 支持延时获取元素的无界阻塞队列
  • 解释: 就像一个可以预约座位的等位区。 客人可以提前预约,但只有到了预约时间才能入座 ⏰。
  • 特点:
    • 无界: 理论上可以容纳无限多的元素。
    • 延时获取: 队列中的元素只有在延时时间到达后才能被取出。
    • 元素必须实现 Delayed 接口: Delayed 接口继承自 Comparable 接口,用于比较元素的剩余延时时间。
  • 适用场景: 适用于需要延时执行任务的场景,例如:定时任务、缓存过期等。
  • 代码示例:
import java.util.concurrent.*;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

class DelayedTask implements Delayed {
    private String name;
    private Instant startTime;

    public DelayedTask(String name, long delayInMilliseconds) {
        this.name = name;
        this.startTime = Instant.now().plusMillis(delayInMilliseconds);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime.toEpochMilli() - Instant.now().toEpochMilli();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "DelayedTask{" +
                "name='" + name + '\'' +
                ", startTime=" + startTime +
                '}';
    }
}

public class DelayQueueExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个 DelayQueue
        DelayQueue<DelayedTask> queue = new DelayQueue<>();

        // 向队列中添加任务
        queue.put(new DelayedTask("Task A", 2000)); // 2 秒后执行
        queue.put(new DelayedTask("Task B", 1000)); // 1 秒后执行
        queue.put(new DelayedTask("Task C", 3000)); // 3 秒后执行

        // 从队列中取出任务,会按照延时时间排序
        System.out.println("取出: " + queue.take()); // 1 秒后取出 Task B
        System.out.println("取出: " + queue.take()); // 2 秒后取出 Task A
        System.out.println("取出: " + queue.take()); // 3 秒后取出 Task C
    }
}
  • 输出结果:

在这里插入图片描述

  • 代码解释:
    • DelayedTask 类实现了 Delayed 接口,用于表示一个延时任务。
    • getDelay() 方法返回任务的剩余延时时间。
    • compareTo() 方法用于比较任务的剩余延时时间。
    • DelayQueue 会根据 getDelay() 方法的返回值对任务进行排序。
    • 只有延时时间到达的任务才能被取出。
  1. SynchronousQueue : 不存储元素的阻塞队列
  • 解释: 就像一个只能同时容纳一个客人的小餐馆。 客人来了,必须立刻有服务员接待,否则客人就走了 🏃。
  • 特点:
    • 不存储元素: SynchronousQueue 不存储任何元素。 每个插入操作必须等待一个相应的移除操作,反之亦然。
    • 公平性: 可以选择公平模式或非公平模式。
    • 吞吐量高: 因为没有实际的存储操作,所以吞吐量很高。
  • 适用场景: 适用于生产者和消费者需要直接交换数据的场景,例如:线程池的 Executors.newCachedThreadPool()
  • 代码示例:
 import java.util.concurrent.*;

 public class SynchronousQueueExample {
     public static void main(String[] args) throws InterruptedException {
         // 创建一个 SynchronousQueue
         SynchronousQueue<String> queue = new SynchronousQueue<>();

         // 生产者线程
         Thread producer = new Thread(() -> {
             try {
                 System.out.println("生产者:准备放入数据");
                 queue.put("Data"); // 放入数据,会阻塞直到有消费者取出
                 System.out.println("生产者:数据已放入");
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         });

         // 消费者线程
         Thread consumer = new Thread(() -> {
             try {
                 Thread.sleep(1000); // 模拟消费者需要一些时间准备
                 System.out.println("消费者:准备取出数据");
                 String data = queue.take(); // 取出数据,会阻塞直到有生产者放入
                 System.out.println("消费者:取出数据:" + data);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         });

         producer.start();
         consumer.start();
     }
 }
  • 输出结果:

在这里插入图片描述

  • 代码解释:
    • put() 方法:生产者线程调用 put() 方法放入数据,会阻塞直到有消费者线程调用 take() 方法取出数据。
    • take() 方法:消费者线程调用 take() 方法取出数据,会阻塞直到有生产者线程调用 put() 方法放入数据。
    • 生产者和消费者必须同步进行,才能完成数据的交换。

四、如何选择合适的阻塞队列?

选择哪种阻塞队列取决于你的应用场景和对队列特性的需求。

  • ArrayBlockingQueue: 适用于需要限制队列大小,并且对公平性有要求的场景。
  • LinkedBlockingQueue: 适用于需要高吞吐量,并且对队列大小没有严格限制的场景(无界),或者需要动态调整队列大小的场景(有界)。
  • PriorityBlockingQueue: 适用于需要根据优先级处理任务的场景。
  • DelayQueue: 适用于需要延时执行任务的场景。
  • SynchronousQueue: 适用于生产者和消费者需要直接交换数据的场景。

五、总结

  • 阻塞队列是一种特殊的队列,它在队列为空时会阻塞尝试获取元素的线程,在队列已满时会阻塞尝试添加元素的线程。
  • Java 提供了多种阻塞队列的实现,每种实现都有其特点和适用场景。
  • 选择合适的阻塞队列取决于你的应用场景和对队列特性的需求。

希望这篇文章能够帮助你理解线程池的阻塞队列! 记住,理解这些概念的关键在于结合实际场景进行思考 🤔。 多写代码,多实践 💻,你就能掌握它们! 🎉

相关文章:

  • 从零开始实现 C++ TinyWebServer 阻塞队列 BlockQueue类详解
  • Java Timer定时任务源码分析
  • 洛谷P1434 [SHOI2002] 滑雪
  • 一篇文章入门Python Flask框架前后端数据库开发实践(pycharm在anaconda环境下)
  • Python Seaborn面试题及参考答案
  • 算法模型从入门到起飞系列——广度优先遍历(BFS)
  • 常见JavaScript页面部分内容显示/隐藏设置总结
  • TextView、AppCompatTextView和MaterialTextView该用哪一个?Android UI 组件发展史与演进对照表
  • Prime: 1靶场渗透测试
  • 【问题解决】Postman 测试报错 406
  • 大模型在肺源性心脏病预测及治疗方案制定中的应用研究报告
  • Linux进程与进程控制学习总结
  • Node.js系列(6)--安全实践指南
  • JVM常用概念之压缩引用
  • 《深度学习》——YOLOv2详解
  • 3651翻转后1的数量
  • 矩阵指数的定义和基本性质
  • c#-单例模式
  • 集群环境下Redis 商品库存系统设计
  • Qt6.8.2中JavaScript调用WebAssembly的js文件<3>
  • 越南前国家主席陈德良因病去世
  • 济南维尔康:公司上届管理层个别人员拒不离岗,致多项业务难以推进
  • 马上评|纠偏“左行右立”,并排站不必有道德压力
  • 全红婵旧伤复发,缺席全国跳水冠军赛
  • 广东“龙舟水”开启:降水少于去年,但仍比常年多两成
  • 北斗专访|星纪魅族郭鹏:AR眼镜正迈入行业发展“破局之年”