Java并发 - 阻塞队列详解
1. 阻塞队列概述
1.1 什么是阻塞队列
阻塞队列(BlockingQueue)是Java并发包中的一个重要组件,它是一个支持两个附加操作的队列:
- 阻塞插入:当队列满时,插入元素的线程会被阻塞,直到队列不满
- 阻塞移除:当队列空时,获取元素的线程会被阻塞,直到队列不空
1.2 阻塞队列的核心价值
1.2.1 解决的问题
- 生产者-消费者问题:自动协调生产者和消费者的速度差异
- 线程安全:内置同步机制,无需额外的同步代码
- 流量控制:通过队列容量限制,防止内存溢出
- 解耦合:生产者和消费者通过队列解耦,提高系统灵活性
1.2.2 应用场景
- 线程池任务队列:存储待执行的任务
- 消息中间件:异步消息传递
- 数据缓冲:平衡不同速度的数据处理组件
- 事件驱动系统:事件的缓存和分发
1.3 BlockingQueue接口
public interface BlockingQueue<E> extends Queue<E> {// 阻塞插入void put(E e) throws InterruptedException;// 阻塞获取E take() throws InterruptedException;// 超时插入boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;// 超时获取E poll(long timeout, TimeUnit unit) throws InterruptedException;// 剩余容量int remainingCapacity();// 移除指定元素boolean remove(Object o);// 批量操作int drainTo(Collection<? super E> c);int drainTo(Collection<? super E> c, int maxElements);
}
1.4 操作方法对比
操作类型 | 抛出异常 | 返回特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不适用 | 不适用 |
方法说明:
- 抛出异常:操作失败时抛出异常
- 返回特殊值:操作失败时返回false或null
- 阻塞:操作失败时阻塞线程直到成功
- 超时:操作失败时阻塞指定时间,超时后返回失败
2. 阻塞队列实现类详解
2.1 ArrayBlockingQueue
2.1.1 基本特性
ArrayBlockingQueue 是基于数组实现的有界阻塞队列,具有以下特点:
- 数据结构:底层使用数组存储元素
- 容量限制:创建时必须指定容量,且容量不可变
- FIFO顺序:先进先出的元素顺序
- 公平性:支持公平和非公平的访问策略
- 线程安全:使用ReentrantLock保证线程安全
2.1.2 构造方法
// 指定容量,默认非公平策略
ArrayBlockingQueue<E> queue = new ArrayBlockingQueue<>(capacity);// 指定容量和公平策略
ArrayBlockingQueue<E> queue = new ArrayBlockingQueue<>(capacity, fair);// 指定容量、公平策略和初始元素
ArrayBlockingQueue<E> queue = new ArrayBlockingQueue<>(capacity, fair, collection);
2.1.3 核心实现原理
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {// 存储元素的数组final Object[] items;// 取元素的索引int takeIndex;// 放元素的索引 int putIndex;// 元素个数int count;// 主锁final ReentrantLock lock;// 等待取元素的条件private final Condition notEmpty;// 等待放元素的条件private final Condition notFull;
}
2.1.4 适用场景
- 固定容量需求:需要严格控制内存使用的场景
- 生产消费平衡:生产者和消费者速度相对平衡
- 线程池任务队列:固定大小的线程池
- 缓冲区应用:网络IO缓冲、日志缓冲等
2.1.5 完整示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class ArrayBlockingQueueExample {public static void main(String[] args) throws InterruptedException {// 创建容量为5的公平队列BlockingQueue<Task> queue = new ArrayBlockingQueue<>(5, true);// 启动生产者线程Thread producer = new Thread(new Producer(queue), "Producer");// 启动消费者线程Thread consumer1 = new Thread(new Consumer(queue), "Consumer-1");Thread consumer2 = new Thread(new Consumer(queue), "Consumer-2");producer.start();consumer1.start();consumer2.start();// 等待生产者完成producer.join();// 等待队列清空Thread.sleep(2000);// 中断消费者线程consumer1.interrupt();consumer2.interrupt();}static class Task {private final int id;private final String data;public Task(int id, String data) {this.id = id;this.data = data;}@Overridepublic String toString() {return "Task{id=" + id + ", data='" + data + "'}";}}static class Producer implements Runnable {private final BlockingQueue<Task> queue;public Producer(BlockingQueue<Task> queue) {this.queue = queue;}@Overridepublic void run() {try {for (int i = 1; i <= 10; i++) {Task task = new Task(i, "Data-" + i);// 使用put方法,队列满时会阻塞queue.put(task);System.out.println(Thread.currentThread().getName() + " produced: " + task);// 模拟生产耗时Thread.sleep(100);}System.out.println("Producer finished");} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("Producer interrupted");}}}static class Consumer implements Runnable {private final BlockingQueue<Task> queue;public Consumer(BlockingQueue<Task> queue) {this.queue = queue;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {// 使用poll方法,超时返回nullTask task = queue.poll(1, TimeUnit.SECONDS);if (task != null) {System.out.println(Thread.currentThread().getName() + " consumed: " + task);// 模拟处理耗时Thread.sleep(200);} else {System.out.println(Thread.currentThread().getName() + " timeout, no task available");}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(Thread.currentThread().getName() + " interrupted");}}}
}
2.1.6 性能特点
优势:
- 预分配数组,避免动态扩容开销
- 使用索引操作,访问效率高
- 支持公平策略,避免线程饥饿
劣势:
- 容量固定,无法动态调整
- 数组预分配可能浪费内存
- 单锁设计,高并发时可能成为瓶颈
2.2 LinkedBlockingQueue
2.2.1 基本特性
LinkedBlockingQueue 是基于链表实现的可选有界阻塞队列,具有以下特点:
- 数据结构:底层使用单向链表存储元素
- 容量限制:可选择有界或无界(默认Integer.MAX_VALUE)
- FIFO顺序:先进先出的元素顺序
- 双锁设计:读写操作使用不同的锁,提高并发性能
- 动态扩容:根据需要动态创建节点
2.2.2 构造方法
// 无界队列,容量为Integer.MAX_VALUE
LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>();// 有界队列,指定容量
LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>(capacity);// 使用集合初始化
LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>(collection);
2.2.3 核心实现原理
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 链表节点static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}// 容量限制private final int capacity;// 当前元素数量private final AtomicInteger count = new AtomicInteger();// 头节点transient Node<E> head;// 尾节点private transient Node<E> last;// 取元素锁private final ReentrantLock takeLock = new ReentrantLock();// 非空条件private final Condition notEmpty = takeLock.newCondition();// 放元素锁private final ReentrantLock putLock = new ReentrantLock();// 非满条件private final Condition notFull = putLock.newCondition();
}
2.2.4 双锁机制优势
读写分离:
- putLock:控制入队操作,保护尾节点
- takeLock:控制出队操作,保护头节点
- 并发优势:读写可以同时进行,提高吞吐量
2.2.5 适用场景
- 生产消费不平衡:生产者和消费者速度差异较大
- 高并发场景:需要高吞吐量的并发环境
- 线程池默认选择:ThreadPoolExecutor的默认队列
- 消息缓冲:异步消息处理系统
2.2.6 完整示例
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;public class LinkedBlockingQueueExample {private static final AtomicInteger taskIdGenerator = new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {// 创建有界队列,容量为100BlockingQueue<Message> queue = new LinkedBlockingQueue<>(100);// 创建多个生产者Thread producer1 = new Thread(new MessageProducer(queue, "Producer-1"), "Producer-1");Thread producer2 = new Thread(new MessageProducer(queue, "Producer-2"), "Producer-2");// 创建多个消费者Thread consumer1 = new Thread(new MessageConsumer(queue), "Consumer-1");Thread consumer2 = new Thread(new MessageConsumer(queue), "Consumer-2");Thread consumer3 = new Thread(new MessageConsumer(queue), "Consumer-3");// 启动所有线程producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();// 等待生产者完成producer1.join();producer2.join();// 等待队列处理完成Thread.sleep(3000);// 停止消费者consumer1.interrupt();consumer2.interrupt();consumer3.interrupt();System.out.println("Final queue size: " + queue.size());}static class Message {private final int id;private final String content;private final String producer;private final long timestamp;public Message(String content, String producer) {this.id = taskIdGenerator.incrementAndGet();this.content = content;this.producer = producer;this.timestamp = System.currentTimeMillis();}@Overridepublic String toString() {return String.format("Message{id=%d, content='%s', producer='%s', timestamp=%d}",id, content, producer, timestamp);}public int getId() { return id; }public String getContent() { return content; }public String getProducer() { return producer; }public long getTimestamp() { return timestamp; }}static class MessageProducer implements Runnable {private final BlockingQueue<Message> queue;private final String producerName;public MessageProducer(BlockingQueue<Message> queue, String producerName) {this.queue = queue;this.producerName = producerName;}@Overridepublic void run() {try {for (int i = 1; i <= 20; i++) {Message message = new Message("Message-" + i, producerName);// 使用offer方法,避免无限阻塞boolean success = queue.offer(message, 2, TimeUnit.SECONDS);if (success) {System.out.println(producerName + " produced: " + message.getId());} else {System.out.println(producerName + " failed to produce: " + message.getId());}// 模拟生产间隔Thread.sleep(50 + (int)(Math.random() * 100));}System.out.println(producerName + " finished producing");} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(producerName + " interrupted");}}}static class MessageConsumer implements Runnable {private final BlockingQueue<Message> queue;public MessageConsumer(BlockingQueue<Message> queue) {this.queue = queue;}@Overridepublic void run() {String consumerName = Thread.currentThread().getName();try {while (!Thread.currentThread().isInterrupted()) {// 使用poll方法,超时机制Message message = queue.poll(1, TimeUnit.SECONDS);if (message != null) {// 模拟消息处理processMessage(message, consumerName);} else {System.out.println(consumerName + " waiting for messages...");}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(consumerName + " interrupted");}}private void processMessage(Message message, String consumerName) throws InterruptedException {System.out.println(String.format("%s processing message %d from %s",consumerName, message.getId(), message.getProducer()));// 模拟处理时间Thread.sleep(100 + (int)(Math.random() * 200));System.out.println(String.format("%s completed message %d",consumerName, message.getId()));}}
}
2.2.7 性能特点
优势:
- 双锁设计,读写并发性能好
- 动态扩容,内存使用灵活
- 支持有界和无界模式
- 适合高并发场景
劣势:
- 链表结构,内存开销相对较大
- 无界模式可能导致内存溢出
- 节点创建和GC开销
2.2.8 与ArrayBlockingQueue对比
特性 | ArrayBlockingQueue | LinkedBlockingQueue |
---|---|---|
数据结构 | 数组 | 链表 |
容量 | 固定有界 | 可选有界/无界 |
锁机制 | 单锁 | 双锁 |
内存使用 | 预分配 | 动态分配 |
并发性能 | 中等 | 较高 |
适用场景 | 固定容量 | 高并发 |
2.3 PriorityBlockingQueue
2.3.1 基本特性
PriorityBlockingQueue 是支持优先级排序的无界阻塞队列:
- 数据结构:基于数组实现的二叉堆
- 容量限制:无界队列,容量可动态扩展
- 排序规则:支持自然排序或自定义Comparator
- 线程安全:使用ReentrantLock保证线程安全
- 阻塞特性:只有take操作会阻塞,put操作不会阻塞
2.3.2 适用场景
- 任务调度系统:根据优先级执行任务
- 事件处理:按重要性处理事件
- 资源分配:优先分配给高优先级请求
- 消息队列:VIP消息优先处理
2.3.3 完整示例
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class PriorityBlockingQueueExample {public static void main(String[] args) throws InterruptedException {// 创建优先级队列BlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();// 启动生产者Thread producer = new Thread(new TaskProducer(queue), "Producer");// 启动消费者Thread consumer = new Thread(new TaskConsumer(queue), "Consumer");producer.start();consumer.start();producer.join();Thread.sleep(2000);consumer.interrupt();}static class PriorityTask implements Comparable<PriorityTask> {private final int priority;private final String taskName;private final long createTime;public PriorityTask(int priority, String taskName) {this.priority = priority;this.taskName = taskName;this.createTime = System.currentTimeMillis();}@Overridepublic int compareTo(PriorityTask other) {// 优先级高的排在前面(数字越小优先级越高)int result = Integer.compare(this.priority, other.priority);if (result == 0) {// 优先级相同时,按创建时间排序result = Long.compare(this.createTime, other.createTime);}return result;}@Overridepublic String toString() {return String.format("Task{priority=%d, name='%s', createTime=%d}",priority, taskName, createTime);}public int getPriority() { return priority; }public String getTaskName() { return taskName; }}static class TaskProducer implements Runnable {private final BlockingQueue<PriorityTask> queue;public TaskProducer(BlockingQueue<PriorityTask> queue) {this.queue = queue;}@Overridepublic void run() {try {// 添加不同优先级的任务queue.put(new PriorityTask(3, "Low Priority Task 1"));queue.put(new PriorityTask(1, "High Priority Task 1"));queue.put(new PriorityTask(2, "Medium Priority Task 1"));queue.put(new PriorityTask(1, "High Priority Task 2"));queue.put(new PriorityTask(3, "Low Priority Task 2"));queue.put(new PriorityTask(2, "Medium Priority Task 2"));System.out.println("All tasks produced");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}static class TaskConsumer implements Runnable {private final BlockingQueue<PriorityTask> queue;public TaskConsumer(BlockingQueue<PriorityTask> queue) {this.queue = queue;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {PriorityTask task = queue.poll(1, TimeUnit.SECONDS);if (task != null) {System.out.println("Processing: " + task);Thread.sleep(500); // 模拟处理时间}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("Consumer interrupted");}}}
}
2.4 DelayQueue
2.4.1 基本特性
DelayQueue 是支持延迟获取元素的无界阻塞队列:
- 延迟特性:元素只有在延迟期满后才能被取出
- 无界队列:容量无限制
- 排序规则:按照延迟时间排序
- 元素要求:元素必须实现Delayed接口
2.4.2 适用场景
- 定时任务调度:延迟执行任务
- 缓存过期:缓存元素的过期处理
- 订单超时:订单超时自动取消
- 重试机制:延迟重试失败的操作
2.4.3 完整示例
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;public class DelayQueueExample {public static void main(String[] args) throws InterruptedException {DelayQueue<DelayedTask> queue = new DelayQueue<>();// 启动生产者Thread producer = new Thread(new DelayedTaskProducer(queue), "Producer");// 启动消费者Thread consumer = new Thread(new DelayedTaskConsumer(queue), "Consumer");producer.start();consumer.start();producer.join();Thread.sleep(15000); // 等待所有延迟任务执行完成consumer.interrupt();}static class DelayedTask implements Delayed {private final String taskName;private final long executeTime;public DelayedTask(String taskName, long delayInSeconds) {this.taskName = taskName;this.executeTime = System.currentTimeMillis() + delayInSeconds * 1000;}@Overridepublic long getDelay(TimeUnit unit) {long remaining = executeTime - System.currentTimeMillis();return unit.convert(remaining, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed other) {return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);}@Overridepublic String toString() {return String.format("DelayedTask{name='%s', executeTime=%d, remaining=%d ms}",taskName, executeTime, getDelay(TimeUnit.MILLISECONDS));}public String getTaskName() { return taskName; }}static class DelayedTaskProducer implements Runnable {private final DelayQueue<DelayedTask> queue;public DelayedTaskProducer(DelayQueue<DelayedTask> queue) {this.queue = queue;}@Overridepublic void run() {try {// 添加不同延迟时间的任务queue.put(new DelayedTask("Task-5s", 5));queue.put(new DelayedTask("Task-2s", 2));queue.put(new DelayedTask("Task-8s", 8));queue.put(new DelayedTask("Task-1s", 1));queue.put(new DelayedTask("Task-3s", 3));System.out.println("All delayed tasks added to queue");} catch (Exception e) {e.printStackTrace();}}}static class DelayedTaskConsumer implements Runnable {private final DelayQueue<DelayedTask> queue;public DelayedTaskConsumer(DelayQueue<DelayedTask> queue) {this.queue = queue;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {DelayedTask task = queue.take(); // 阻塞直到有可用元素System.out.println("Executing: " + task.getTaskName() + " at " + System.currentTimeMillis());// 模拟任务执行Thread.sleep(100);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("Consumer interrupted");}}}
}
2.5 SynchronousQueue
2.5.1 基本特性
SynchronousQueue 是一个特殊的阻塞队列:
- 零容量:不存储任何元素
- 直接传递:每个put操作必须等待对应的take操作
- 同步机制:生产者和消费者直接交换数据
- 公平性:支持公平和非公平模式
2.5.2 适用场景
- CachedThreadPool:Executors.newCachedThreadPool()的默认队列
- 直接传递:需要立即处理的任务
- 线程间通信:线程间直接数据交换
- 背压控制:自然的流量控制机制
2.5.3 完整示例
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class SynchronousQueueExample {public static void main(String[] args) throws InterruptedException {// 创建公平的SynchronousQueueBlockingQueue<String> queue = new SynchronousQueue<>(true);// 启动多个生产者for (int i = 1; i <= 3; i++) {Thread producer = new Thread(new DataProducer(queue, "Producer-" + i));producer.start();}// 启动多个消费者for (int i = 1; i <= 2; i++) {Thread consumer = new Thread(new DataConsumer(queue, "Consumer-" + i));consumer.start();}// 主线程等待Thread.sleep(10000);System.out.println("Main thread finished");}static class DataProducer implements Runnable {private final BlockingQueue<String> queue;private final String producerName;public DataProducer(BlockingQueue<String> queue, String producerName) {this.queue = queue;this.producerName = producerName;}@Overridepublic void run() {try {for (int i = 1; i <= 5; i++) {String data = producerName + "-Data-" + i;System.out.println(producerName + " trying to put: " + data);// 使用offer方法,避免无限阻塞boolean success = queue.offer(data, 2, TimeUnit.SECONDS);if (success) {System.out.println(producerName + " successfully put: " + data);} else {System.out.println(producerName + " failed to put: " + data + " (timeout)");}Thread.sleep(1000);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(producerName + " interrupted");}}}static class DataConsumer implements Runnable {private final BlockingQueue<String> queue;private final String consumerName;public DataConsumer(BlockingQueue<String> queue, String consumerName) {this.queue = queue;this.consumerName = consumerName;}@Overridepublic void run() {try {while (true) {System.out.println(consumerName + " trying to take data...");String data = queue.poll(3, TimeUnit.SECONDS);if (data != null) {System.out.println(consumerName + " received: " + data);// 模拟处理时间Thread.sleep(500);} else {System.out.println(consumerName + " timeout, no data available");}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(consumerName + " interrupted");}}}
}
2.6 LinkedTransferQueue
2.6.1 基本特性
LinkedTransferQueue 是基于链表的无界阻塞队列,实现了TransferQueue接口:
- 数据结构:基于链表的无锁算法
- 容量限制:无界队列
- 传递模式:支持直接传递和异步传递
- 性能特点:高并发性能,无锁实现
- 特殊方法:transfer()方法可直接传递给等待的消费者
2.6.2 核心方法
方法 | 描述 | 阻塞行为 |
---|---|---|
transfer(E e) | 直接传递给消费者,如果没有消费者则阻塞 | 阻塞 |
tryTransfer(E e) | 尝试直接传递,失败则返回false | 非阻塞 |
tryTransfer(E e, long timeout, TimeUnit unit) | 在指定时间内尝试传递 | 超时阻塞 |
hasWaitingConsumer() | 检查是否有等待的消费者 | 非阻塞 |
getWaitingConsumerCount() | 获取等待消费者数量 | 非阻塞 |
2.6.3 适用场景
- 实时数据传递:需要立即处理的数据
- 请求-响应模式:Web服务器处理请求
- 事件驱动系统:事件的实时分发
- 高性能消息传递:替代SynchronousQueue的高性能方案
2.6.4 完整示例
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.TimeUnit;public class LinkedTransferQueueExample {public static void main(String[] args) throws InterruptedException {TransferQueue<Message> queue = new LinkedTransferQueue<>();// 启动消费者(先启动,模拟等待状态)Thread consumer1 = new Thread(new MessageConsumer(queue, "Consumer-1"));Thread consumer2 = new Thread(new MessageConsumer(queue, "Consumer-2"));consumer1.start();consumer2.start();// 等待消费者启动Thread.sleep(1000);// 启动生产者Thread producer = new Thread(new MessageProducer(queue, "Producer"));producer.start();producer.join();Thread.sleep(3000);consumer1.interrupt();consumer2.interrupt();}static class Message {private final String content;private final long timestamp;private final int priority;public Message(String content, int priority) {this.content = content;this.priority = priority;this.timestamp = System.currentTimeMillis();}@Overridepublic String toString() {return String.format("Message{content='%s', priority=%d, timestamp=%d}",content, priority, timestamp);}public String getContent() { return content; }public int getPriority() { return priority; }}static class MessageProducer implements Runnable {private final TransferQueue<Message> queue;private final String producerName;public MessageProducer(TransferQueue<Message> queue, String producerName) {this.queue = queue;this.producerName = producerName;}@Overridepublic void run() {try {for (int i = 1; i <= 8; i++) {Message message = new Message("Message-" + i, i % 3 + 1);System.out.println(producerName + " checking waiting consumers: " + queue.getWaitingConsumerCount());if (queue.hasWaitingConsumer()) {// 直接传递给等待的消费者System.out.println(producerName + " transferring: " + message.getContent());queue.transfer(message);System.out.println(producerName + " transferred: " + message.getContent());} else {// 尝试传递,如果没有消费者则放入队列boolean transferred = queue.tryTransfer(message, 500, TimeUnit.MILLISECONDS);if (transferred) {System.out.println(producerName + " try-transferred: " + message.getContent());} else {queue.put(message);System.out.println(producerName + " queued: " + message.getContent());}}Thread.sleep(800);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(producerName + " interrupted");}}}static class MessageConsumer implements Runnable {private final TransferQueue<Message> queue;private final String consumerName;public MessageConsumer(TransferQueue<Message> queue, String consumerName) {this.queue = queue;this.consumerName = consumerName;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {System.out.println(consumerName + " waiting for message...");Message message = queue.poll(2, TimeUnit.SECONDS);if (message != null) {System.out.println(consumerName + " received: " + message);// 模拟处理时间(根据优先级)Thread.sleep(message.getPriority() * 300);System.out.println(consumerName + " processed: " + message.getContent());} else {System.out.println(consumerName + " timeout, no message");}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(consumerName + " interrupted");}}}
}
2.7 LinkedBlockingDeque
2.7.1 基本特性
LinkedBlockingDeque 是基于链表的双端阻塞队列:
- 数据结构:双向链表
- 容量限制:可选择有界或无界
- 双端操作:支持从头部和尾部插入/移除
- 线程安全:使用ReentrantLock保证线程安全
- 阻塞特性:支持双端的阻塞操作
2.7.2 核心方法
操作类型 | 头部操作 | 尾部操作 | 说明 |
---|---|---|---|
插入 | addFirst() , offerFirst() , putFirst() | addLast() , offerLast() , putLast() | 从两端插入 |
移除 | removeFirst() , pollFirst() , takeFirst() | removeLast() , pollLast() , takeLast() | 从两端移除 |
检查 | getFirst() , peekFirst() | getLast() , peekLast() | 检查两端元素 |
2.7.3 适用场景
- 工作窃取算法:ForkJoinPool的任务队列
- 双端缓存:LRU缓存实现
- 撤销操作:支持撤销的操作队列
- 双向数据流:需要双向处理的数据
2.7.4 完整示例
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;public class LinkedBlockingDequeExample {public static void main(String[] args) throws InterruptedException {// 创建容量为10的双端队列BlockingDeque<Task> deque = new LinkedBlockingDeque<>(10);// 启动工作线程(从头部获取高优先级任务)Thread worker1 = new Thread(new Worker(deque, "Worker-1", true));Thread worker2 = new Thread(new Worker(deque, "Worker-2", false));// 启动任务生产者Thread producer = new Thread(new TaskProducer(deque));worker1.start();worker2.start();producer.start();producer.join();Thread.sleep(5000);worker1.interrupt();worker2.interrupt();}static class Task {private final String taskId;private final int priority;private final String description;public Task(String taskId, int priority, String description) {this.taskId = taskId;this.priority = priority;this.description = description;}@Overridepublic String toString() {return String.format("Task{id='%s', priority=%d, desc='%s'}",taskId, priority, description);}public int getPriority() { return priority; }public String getTaskId() { return taskId; }}static class TaskProducer implements Runnable {private final BlockingDeque<Task> deque;public TaskProducer(BlockingDeque<Task> deque) {this.deque = deque;}@Overridepublic void run() {try {// 添加不同优先级的任务for (int i = 1; i <= 12; i++) {int priority = (i % 3) + 1;Task task = new Task("T" + i, priority, "Task " + i + " description");if (priority == 1) {// 高优先级任务放在头部deque.putFirst(task);System.out.println("Added high priority task to front: " + task.getTaskId());} else {// 普通任务放在尾部deque.putLast(task);System.out.println("Added normal task to rear: " + task.getTaskId());}Thread.sleep(300);}System.out.println("All tasks produced. Queue size: " + deque.size());} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("Producer interrupted");}}}static class Worker implements Runnable {private final BlockingDeque<Task> deque;private final String workerName;private final boolean preferHighPriority;public Worker(BlockingDeque<Task> deque, String workerName, boolean preferHighPriority) {this.deque = deque;this.workerName = workerName;this.preferHighPriority = preferHighPriority;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {Task task;if (preferHighPriority) {// 优先从头部获取(高优先级任务)task = deque.pollFirst(1, TimeUnit.SECONDS);if (task != null) {System.out.println(workerName + " took from FRONT: " + task);}} else {// 从尾部获取(LIFO方式,类似栈)task = deque.pollLast(1, TimeUnit.SECONDS);if (task != null) {System.out.println(workerName + " took from REAR: " + task);}}if (task != null) {// 模拟任务处理时间Thread.sleep(task.getPriority() * 200);System.out.println(workerName + " completed: " + task.getTaskId());} else {System.out.println(workerName + " timeout, no task available");}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(workerName + " interrupted");}}}
}
3. 阻塞队列性能对比
3.1 性能特性对比
队列类型 | 数据结构 | 容量 | 锁机制 | 并发性能 | 内存使用 | 适用场景 |
---|---|---|---|---|---|---|
ArrayBlockingQueue | 数组 | 有界 | 单锁 | 中等 | 预分配 | 固定容量 |
LinkedBlockingQueue | 链表 | 可选 | 双锁 | 高 | 动态 | 高并发 |
PriorityBlockingQueue | 堆 | 无界 | 单锁 | 中等 | 动态 | 优先级 |
DelayQueue | 堆 | 无界 | 单锁 | 中等 | 动态 | 延迟处理 |
SynchronousQueue | 无 | 0 | CAS | 高 | 最小 | 直接传递 |
LinkedTransferQueue | 链表 | 无界 | 无锁 | 最高 | 动态 | 高性能 |
LinkedBlockingDeque | 双向链表 | 可选 | 单锁 | 中高 | 动态 | 双端操作 |
3.2 吞吐量测试
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;public class BlockingQueuePerformanceTest {private static final int PRODUCER_COUNT = 4;private static final int CONSUMER_COUNT = 4;private static final int MESSAGES_PER_PRODUCER = 100000;private static final int QUEUE_CAPACITY = 1000;public static void main(String[] args) throws InterruptedException {System.out.println("阻塞队列性能测试\n");testQueue("ArrayBlockingQueue", new ArrayBlockingQueue<>(QUEUE_CAPACITY));testQueue("LinkedBlockingQueue", new LinkedBlockingQueue<>(QUEUE_CAPACITY));testQueue("SynchronousQueue", new SynchronousQueue<>());testQueue("LinkedTransferQueue", new LinkedTransferQueue<>());}private static void testQueue(String queueName, BlockingQueue<Integer> queue) throws InterruptedException {System.out.println("测试队列: " + queueName);AtomicLong producedCount = new AtomicLong(0);AtomicLong consumedCount = new AtomicLong(0);long startTime = System.currentTimeMillis();// 启动生产者Thread[] producers = new Thread[PRODUCER_COUNT];for (int i = 0; i < PRODUCER_COUNT; i++) {producers[i] = new Thread(new Producer(queue, producedCount));producers[i].start();}// 启动消费者Thread[] consumers = new Thread[CONSUMER_COUNT];for (int i = 0; i < CONSUMER_COUNT; i++) {consumers[i] = new Thread(new Consumer(queue, consumedCount));consumers[i].start();}// 等待生产者完成for (Thread producer : producers) {producer.join();}// 等待消费完成while (consumedCount.get() < PRODUCER_COUNT * MESSAGES_PER_PRODUCER) {Thread.sleep(10);}// 停止消费者for (Thread consumer : consumers) {consumer.interrupt();}long endTime = System.currentTimeMillis();long duration = endTime - startTime;long totalMessages = PRODUCER_COUNT * MESSAGES_PER_PRODUCER;double throughput = (double) totalMessages / duration * 1000;System.out.printf(" 总消息数: %d\n", totalMessages);System.out.printf(" 执行时间: %d ms\n", duration);System.out.printf(" 吞吐量: %.2f messages/sec\n\n", throughput);}static class Producer implements Runnable {private final BlockingQueue<Integer> queue;private final AtomicLong counter;public Producer(BlockingQueue<Integer> queue, AtomicLong counter) {this.queue = queue;this.counter = counter;}@Overridepublic void run() {try {for (int i = 0; i < MESSAGES_PER_PRODUCER; i++) {queue.put(i);counter.incrementAndGet();}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}static class Consumer implements Runnable {private final BlockingQueue<Integer> queue;private final AtomicLong counter;public Consumer(BlockingQueue<Integer> queue, AtomicLong counter) {this.queue = queue;this.counter = counter;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {Integer message = queue.take();counter.incrementAndGet();// 模拟少量处理时间if (message % 10000 == 0) {Thread.sleep(1);}}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}
4. 最佳实践
4.1 选择合适的阻塞队列
4.1.1 决策树
需要阻塞队列?
├─ 是否需要优先级?
│ ├─ 是 → PriorityBlockingQueue
│ └─ 否 → 继续
├─ 是否需要延迟处理?
│ ├─ 是 → DelayQueue
│ └─ 否 → 继续
├─ 是否需要双端操作?
│ ├─ 是 → LinkedBlockingDeque
│ └─ 否 → 继续
├─ 是否需要直接传递?
│ ├─ 是 → SynchronousQueue 或 LinkedTransferQueue
│ └─ 否 → 继续
├─ 容量是否固定?
│ ├─ 是 → ArrayBlockingQueue
│ └─ 否 → LinkedBlockingQueue
4.1.2 场景推荐
高并发Web服务器:
// 使用LinkedTransferQueue处理请求
TransferQueue<HttpRequest> requestQueue = new LinkedTransferQueue<>();// 或者使用LinkedBlockingQueue
BlockingQueue<HttpRequest> requestQueue = new LinkedBlockingQueue<>(10000);
任务调度系统:
// 使用PriorityBlockingQueue按优先级处理
BlockingQueue<ScheduledTask> taskQueue = new PriorityBlockingQueue<>();// 使用DelayQueue处理延迟任务
DelayQueue<DelayedTask> delayedTasks = new DelayQueue<>();
线程池配置:
// 固定大小线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue<>(queueCapacity)
);// 缓存线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<>()
);
4.2 性能优化技巧
4.2.1 批量操作
public class BatchProcessor {private final BlockingQueue<Task> queue;private final int batchSize;public BatchProcessor(BlockingQueue<Task> queue, int batchSize) {this.queue = queue;this.batchSize = batchSize;}public void processBatch() throws InterruptedException {List<Task> batch = new ArrayList<>(batchSize);// 使用drainTo批量获取int count = queue.drainTo(batch, batchSize);if (count > 0) {// 批量处理processTasks(batch);}}private void processTasks(List<Task> tasks) {// 批量处理逻辑for (Task task : tasks) {// 处理单个任务}}
}
4.2.2 避免无限阻塞
public class SafeProducer implements Runnable {private final BlockingQueue<Message> queue;private volatile boolean running = true;@Overridepublic void run() {while (running) {try {Message message = createMessage();// 使用超时方法,避免无限阻塞boolean success = queue.offer(message, 5, TimeUnit.SECONDS);if (!success) {// 处理队列满的情况handleQueueFull(message);}} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}public void stop() {running = false;}
}
4.2.3 监控队列状态
public class QueueMonitor {private final BlockingQueue<?> queue;private final ScheduledExecutorService scheduler;public QueueMonitor(BlockingQueue<?> queue) {this.queue = queue;this.scheduler = Executors.newScheduledThreadPool(1);}public void startMonitoring() {scheduler.scheduleAtFixedRate(() -> {int size = queue.size();int remaining = queue.remainingCapacity();System.out.printf("Queue Status - Size: %d, Remaining: %d, Usage: %.2f%%\n",size, remaining, (double) size / (size + remaining) * 100);// 队列使用率过高时告警if (size > (size + remaining) * 0.8) {System.out.println("WARNING: Queue usage is high!");}}, 0, 10, TimeUnit.SECONDS);}
}
4.3 常见陷阱和解决方案
4.3.1 内存泄漏
问题:无界队列可能导致内存溢出
解决方案:
// 使用有界队列
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(10000);// 或者实现背压机制
public class BackpressureProducer {private final BlockingQueue<Task> queue;private final AtomicInteger pendingTasks = new AtomicInteger(0);private final int maxPendingTasks;public boolean submitTask(Task task) {if (pendingTasks.get() >= maxPendingTasks) {return false; // 拒绝新任务}boolean success = queue.offer(task);if (success) {pendingTasks.incrementAndGet();}return success;}
}
4.3.2 线程中断处理
问题:不正确的中断处理
解决方案:
public class ProperInterruptHandling implements Runnable {private final BlockingQueue<Task> queue;@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {Task task = queue.take();processTask(task);}} catch (InterruptedException e) {// 恢复中断状态Thread.currentThread().interrupt();// 清理资源cleanup();}}private void processTask(Task task) {// 处理任务}private void cleanup() {// 清理资源}
}
4.3.3 死锁预防
问题:多个队列操作可能导致死锁
解决方案:
public class DeadlockFreeProcessor {private final BlockingQueue<Task> inputQueue;private final BlockingQueue<Result> outputQueue;public void processWithTimeout() {try {// 使用超时方法避免死锁Task task = inputQueue.poll(1, TimeUnit.SECONDS);if (task != null) {Result result = process(task);boolean success = outputQueue.offer(result, 1, TimeUnit.SECONDS);if (!success) {// 处理输出队列满的情况handleOutputQueueFull(result);}}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
4.4 总结
阻塞队列是Java并发编程中的重要工具,正确选择和使用阻塞队列可以:
- 简化并发编程:自动处理线程同步
- 提高系统性能:减少线程间的竞争
- 增强系统稳定性:提供流量控制机制
- 改善代码可维护性:清晰的生产者-消费者模式
选择建议:
- 高并发场景:LinkedTransferQueue > LinkedBlockingQueue > ArrayBlockingQueue
- 内存敏感场景:ArrayBlockingQueue > LinkedBlockingQueue
- 特殊需求场景:根据具体需求选择PriorityBlockingQueue、DelayQueue等
- 直接传递场景:SynchronousQueue 或 LinkedTransferQueue
通过合理选择和正确使用阻塞队列,可以构建高效、稳定的并发应用程序。