线程池阻塞队列:告别资源耗尽,拥抱优雅并发!
目录
- 一、什么是阻塞队列?(简单回顾)
- 二、阻塞队列的特点
- 三、Java 提供的常用阻塞队列
- 四、如何选择合适的阻塞队列?
- 五、总结
🌟我的其他文章也讲解的比较有趣😁,如果喜欢博主的讲解方式,可以多多支持一下,感谢🤗!
🌟了解 synchronized 关键字请看: synchronized 关键字:线程同步的“VIP 包间”
其他优质专栏: 【🎇SpringBoot】【🎉多线程】【🎨Redis】【✨设计模式专栏(已完结)】…等
如果喜欢作者的讲解方式,可以点赞收藏加关注,你的支持就是我的动力
✨更多文章请看个人主页: 码熔burning
一、什么是阻塞队列?(简单回顾)
还记得我们餐厅的例子吗? 线程池就像一个预先雇佣好一批服务员的餐厅。 客人(任务)来了,直接分配一个空闲的服务员去服务。 如果所有服务员都在忙,新来的客人就要排队。 这个排队的地方,就是阻塞队列 ⏳。
深入了解线程池的拒绝策略请看:线程池拒绝策略:让你的程序不再“爆仓”!
想深入了解线程池请看: 线程池:从入门到精通,只需一杯咖啡的时间
二、阻塞队列的特点
阻塞队列是一种特殊的队列,它在队列为空时,会阻塞尝试从队列中获取元素的线程;在队列已满时,会阻塞尝试向队列中添加元素的线程。 就像餐厅门口的等位区,如果餐厅满了,新来的客人就只能等着,直到有空位才能进去 🚶♀️。 如果等位区空了,服务员会等着,直到有客人来才能安排座位 🧍。
三、Java 提供的常用阻塞队列
Java 提供了多种阻塞队列的实现,每种实现都有其特点和适用场景。 我们来介绍几种常用的:
ArrayBlockingQueue
: 基于数组实现的有界阻塞队列
- 解释: 就像一个固定大小的等位区,只能容纳一定数量的客人。 先来的客人先排队,满了就不能再加人了 🙅♀️。
- 特点:
- 有界: 必须在创建时指定队列的大小。
- 公平性: 可以选择公平模式或非公平模式。 公平模式下,等待时间最长的线程会优先获得队列的访问权。
- 基于数组: 内部使用数组存储元素,因此随机访问效率高。
- 适用场景: 适用于需要限制队列大小,并且对公平性有要求的场景。
- 代码示例:
import java.util.concurrent.*;
public class ArrayBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个容量为 3 的 ArrayBlockingQueue
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// 向队列中添加元素
queue.put("Task 1");
queue.put("Task 2");
queue.put("Task 3");
// 尝试添加第四个元素,会阻塞直到队列有空位
// queue.put("Task 4"); // 这行代码会阻塞
System.out.println("队列已满,等待空位...");
// 从队列中取出元素
System.out.println("取出: " + queue.take()); // 取出 Task 1
System.out.println("取出: " + queue.take()); // 取出 Task 2
System.out.println("取出: " + queue.take()); // 取出 Task 3
// 尝试取出元素,会阻塞直到队列有元素
// System.out.println("取出: " + queue.take()); // 这行代码会阻塞
System.out.println("队列已空,等待元素...");
}
}
- 输出结果:
- 代码解释:
put()
方法:如果队列已满,put()
方法会阻塞,直到队列有空位。take()
方法:如果队列为空,take()
方法会阻塞,直到队列有元素。
LinkedBlockingQueue
: 基于链表实现的无界/有界阻塞队列
- 解释: 就像一个可以无限延伸的等位区(如果无界),或者一个可以灵活调整大小的等位区(如果有界)。 客人可以一直排队,直到餐厅实在容纳不下(如果有界) 🚶♂️。
- 特点:
- 无界/有界: 可以指定队列的大小,也可以不指定(默认为无界)。
- 基于链表: 内部使用链表存储元素,因此插入和删除效率高。
- 吞吐量通常高于
ArrayBlockingQueue
: 因为链表结构更适合并发操作。
- 适用场景: 适用于需要高吞吐量,并且对队列大小没有严格限制的场景(无界),或者需要动态调整队列大小的场景(有界)。
- 代码示例:
import java.util.concurrent.*;
public class LinkedBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个无界的 LinkedBlockingQueue
// LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 创建一个容量为 3 的 LinkedBlockingQueue
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
// 向队列中添加元素
queue.put("Task 1");
queue.put("Task 2");
queue.put("Task 3");
// 尝试添加第四个元素,如果是有界队列,会阻塞
// queue.put("Task 4"); // 如果是有界队列,这行代码会阻塞
System.out.println("队列已满,等待空位...");
// 从队列中取出元素
System.out.println("取出: " + queue.take()); // 取出 Task 1
System.out.println("取出: " + queue.take()); // 取出 Task 2
System.out.println("取出: " + queue.take()); // 取出 Task 3
// 尝试取出元素,会阻塞直到队列有元素
// System.out.println("取出: " + queue.take()); // 这行代码会阻塞
System.out.println("队列已空,等待元素...");
}
}
- 代码解释:
- 无界队列:如果没有指定容量,
LinkedBlockingQueue
会创建一个无界队列。 理论上,它可以容纳无限多的元素,但实际上会受到系统内存的限制。 使用无界队列需要谨慎,防止 OOM (OutOfMemoryError) 错误! - 有界队列:如果指定了容量,
LinkedBlockingQueue
会创建一个有界队列。 行为与ArrayBlockingQueue
类似。
- 无界队列:如果没有指定容量,
PriorityBlockingQueue
: 支持优先级排序的无界阻塞队列
- 解释: 就像一个可以根据客人身份(优先级)安排座位的等位区。 VIP 客人可以优先入座 👑。
- 特点:
- 无界: 理论上可以容纳无限多的元素。
- 优先级排序: 队列中的元素会根据优先级进行排序。 优先级高的元素会优先被取出。
- 元素必须实现
Comparable
接口: 或者在创建队列时提供一个Comparator
对象,用于比较元素的优先级。
- 适用场景: 适用于需要根据优先级处理任务的场景。
- 代码示例:
import java.util.concurrent.*;
import java.util.Comparator;
class Task implements Comparable<Task> {
private int priority;
private String name;
public Task(int priority, String name) {
this.priority = priority;
this.name = name;
}
public int getPriority() {
return priority;
}
@Override
public int compareTo(Task other) {
// 优先级高的任务排在前面
return Integer.compare(other.priority, this.priority);
}
@Override
public String toString() {
return "Task{" +
"priority=" + priority +
", name='" + name + '\'' +
'}';
}
}
public class PriorityBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个 PriorityBlockingQueue
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
// 向队列中添加任务
queue.put(new Task(3, "Task C"));
queue.put(new Task(1, "Task A"));
queue.put(new Task(2, "Task B"));
// 从队列中取出任务,会按照优先级排序
System.out.println("取出: " + queue.take()); // 取出 Task A (优先级 1)
System.out.println("取出: " + queue.take()); // 取出 Task B (优先级 2)
System.out.println("取出: " + queue.take()); // 取出 Task C (优先级 3)
}
}
- 输出结果:
- 代码解释:
Task
类实现了Comparable
接口,用于比较任务的优先级。PriorityBlockingQueue
会根据compareTo()
方法的返回值对任务进行排序。- 优先级最高的任务会优先被取出。
DelayQueue
: 支持延时获取元素的无界阻塞队列
- 解释: 就像一个可以预约座位的等位区。 客人可以提前预约,但只有到了预约时间才能入座 ⏰。
- 特点:
- 无界: 理论上可以容纳无限多的元素。
- 延时获取: 队列中的元素只有在延时时间到达后才能被取出。
- 元素必须实现
Delayed
接口:Delayed
接口继承自Comparable
接口,用于比较元素的剩余延时时间。
- 适用场景: 适用于需要延时执行任务的场景,例如:定时任务、缓存过期等。
- 代码示例:
import java.util.concurrent.*;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
class DelayedTask implements Delayed {
private String name;
private Instant startTime;
public DelayedTask(String name, long delayInMilliseconds) {
this.name = name;
this.startTime = Instant.now().plusMillis(delayInMilliseconds);
}
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime.toEpochMilli() - Instant.now().toEpochMilli();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayedTask{" +
"name='" + name + '\'' +
", startTime=" + startTime +
'}';
}
}
public class DelayQueueExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个 DelayQueue
DelayQueue<DelayedTask> queue = new DelayQueue<>();
// 向队列中添加任务
queue.put(new DelayedTask("Task A", 2000)); // 2 秒后执行
queue.put(new DelayedTask("Task B", 1000)); // 1 秒后执行
queue.put(new DelayedTask("Task C", 3000)); // 3 秒后执行
// 从队列中取出任务,会按照延时时间排序
System.out.println("取出: " + queue.take()); // 1 秒后取出 Task B
System.out.println("取出: " + queue.take()); // 2 秒后取出 Task A
System.out.println("取出: " + queue.take()); // 3 秒后取出 Task C
}
}
- 输出结果:
- 代码解释:
DelayedTask
类实现了Delayed
接口,用于表示一个延时任务。getDelay()
方法返回任务的剩余延时时间。compareTo()
方法用于比较任务的剩余延时时间。DelayQueue
会根据getDelay()
方法的返回值对任务进行排序。- 只有延时时间到达的任务才能被取出。
SynchronousQueue
: 不存储元素的阻塞队列
- 解释: 就像一个只能同时容纳一个客人的小餐馆。 客人来了,必须立刻有服务员接待,否则客人就走了 🏃。
- 特点:
- 不存储元素:
SynchronousQueue
不存储任何元素。 每个插入操作必须等待一个相应的移除操作,反之亦然。 - 公平性: 可以选择公平模式或非公平模式。
- 吞吐量高: 因为没有实际的存储操作,所以吞吐量很高。
- 不存储元素:
- 适用场景: 适用于生产者和消费者需要直接交换数据的场景,例如:线程池的
Executors.newCachedThreadPool()
。 - 代码示例:
import java.util.concurrent.*;
public class SynchronousQueueExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个 SynchronousQueue
SynchronousQueue<String> queue = new SynchronousQueue<>();
// 生产者线程
Thread producer = new Thread(() -> {
try {
System.out.println("生产者:准备放入数据");
queue.put("Data"); // 放入数据,会阻塞直到有消费者取出
System.out.println("生产者:数据已放入");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
Thread.sleep(1000); // 模拟消费者需要一些时间准备
System.out.println("消费者:准备取出数据");
String data = queue.take(); // 取出数据,会阻塞直到有生产者放入
System.out.println("消费者:取出数据:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
- 输出结果:
- 代码解释:
put()
方法:生产者线程调用put()
方法放入数据,会阻塞直到有消费者线程调用take()
方法取出数据。take()
方法:消费者线程调用take()
方法取出数据,会阻塞直到有生产者线程调用put()
方法放入数据。- 生产者和消费者必须同步进行,才能完成数据的交换。
四、如何选择合适的阻塞队列?
选择哪种阻塞队列取决于你的应用场景和对队列特性的需求。
ArrayBlockingQueue
: 适用于需要限制队列大小,并且对公平性有要求的场景。LinkedBlockingQueue
: 适用于需要高吞吐量,并且对队列大小没有严格限制的场景(无界),或者需要动态调整队列大小的场景(有界)。PriorityBlockingQueue
: 适用于需要根据优先级处理任务的场景。DelayQueue
: 适用于需要延时执行任务的场景。SynchronousQueue
: 适用于生产者和消费者需要直接交换数据的场景。
五、总结
- 阻塞队列是一种特殊的队列,它在队列为空时会阻塞尝试获取元素的线程,在队列已满时会阻塞尝试添加元素的线程。
- Java 提供了多种阻塞队列的实现,每种实现都有其特点和适用场景。
- 选择合适的阻塞队列取决于你的应用场景和对队列特性的需求。
希望这篇文章能够帮助你理解线程池的阻塞队列! 记住,理解这些概念的关键在于结合实际场景进行思考 🤔。 多写代码,多实践 💻,你就能掌握它们! 🎉