当前位置: 首页 > news >正文

Java并发 - 阻塞队列详解

1. 阻塞队列概述

1.1 什么是阻塞队列

阻塞队列(BlockingQueue)是Java并发包中的一个重要组件,它是一个支持两个附加操作的队列:

  • 阻塞插入:当队列满时,插入元素的线程会被阻塞,直到队列不满
  • 阻塞移除:当队列空时,获取元素的线程会被阻塞,直到队列不空

1.2 阻塞队列的核心价值

1.2.1 解决的问题
  • 生产者-消费者问题:自动协调生产者和消费者的速度差异
  • 线程安全:内置同步机制,无需额外的同步代码
  • 流量控制:通过队列容量限制,防止内存溢出
  • 解耦合:生产者和消费者通过队列解耦,提高系统灵活性
1.2.2 应用场景
  • 线程池任务队列:存储待执行的任务
  • 消息中间件:异步消息传递
  • 数据缓冲:平衡不同速度的数据处理组件
  • 事件驱动系统:事件的缓存和分发

1.3 BlockingQueue接口

public interface BlockingQueue<E> extends Queue<E> {// 阻塞插入void put(E e) throws InterruptedException;// 阻塞获取E take() throws InterruptedException;// 超时插入boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;// 超时获取E poll(long timeout, TimeUnit unit) throws InterruptedException;// 剩余容量int remainingCapacity();// 移除指定元素boolean remove(Object o);// 批量操作int drainTo(Collection<? super E> c);int drainTo(Collection<? super E> c, int maxElements);
}

1.4 操作方法对比

操作类型抛出异常返回特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
检查element()peek()不适用不适用

方法说明

  • 抛出异常:操作失败时抛出异常
  • 返回特殊值:操作失败时返回false或null
  • 阻塞:操作失败时阻塞线程直到成功
  • 超时:操作失败时阻塞指定时间,超时后返回失败

2. 阻塞队列实现类详解

2.1 ArrayBlockingQueue

2.1.1 基本特性

ArrayBlockingQueue 是基于数组实现的有界阻塞队列,具有以下特点:

  • 数据结构:底层使用数组存储元素
  • 容量限制:创建时必须指定容量,且容量不可变
  • FIFO顺序:先进先出的元素顺序
  • 公平性:支持公平和非公平的访问策略
  • 线程安全:使用ReentrantLock保证线程安全
2.1.2 构造方法
// 指定容量,默认非公平策略
ArrayBlockingQueue<E> queue = new ArrayBlockingQueue<>(capacity);// 指定容量和公平策略
ArrayBlockingQueue<E> queue = new ArrayBlockingQueue<>(capacity, fair);// 指定容量、公平策略和初始元素
ArrayBlockingQueue<E> queue = new ArrayBlockingQueue<>(capacity, fair, collection);
2.1.3 核心实现原理
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {// 存储元素的数组final Object[] items;// 取元素的索引int takeIndex;// 放元素的索引  int putIndex;// 元素个数int count;// 主锁final ReentrantLock lock;// 等待取元素的条件private final Condition notEmpty;// 等待放元素的条件private final Condition notFull;
}
2.1.4 适用场景
  • 固定容量需求:需要严格控制内存使用的场景
  • 生产消费平衡:生产者和消费者速度相对平衡
  • 线程池任务队列:固定大小的线程池
  • 缓冲区应用:网络IO缓冲、日志缓冲等
2.1.5 完整示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class ArrayBlockingQueueExample {public static void main(String[] args) throws InterruptedException {// 创建容量为5的公平队列BlockingQueue<Task> queue = new ArrayBlockingQueue<>(5, true);// 启动生产者线程Thread producer = new Thread(new Producer(queue), "Producer");// 启动消费者线程Thread consumer1 = new Thread(new Consumer(queue), "Consumer-1");Thread consumer2 = new Thread(new Consumer(queue), "Consumer-2");producer.start();consumer1.start();consumer2.start();// 等待生产者完成producer.join();// 等待队列清空Thread.sleep(2000);// 中断消费者线程consumer1.interrupt();consumer2.interrupt();}static class Task {private final int id;private final String data;public Task(int id, String data) {this.id = id;this.data = data;}@Overridepublic String toString() {return "Task{id=" + id + ", data='" + data + "'}";}}static class Producer implements Runnable {private final BlockingQueue<Task> queue;public Producer(BlockingQueue<Task> queue) {this.queue = queue;}@Overridepublic void run() {try {for (int i = 1; i <= 10; i++) {Task task = new Task(i, "Data-" + i);// 使用put方法,队列满时会阻塞queue.put(task);System.out.println(Thread.currentThread().getName() + " produced: " + task);// 模拟生产耗时Thread.sleep(100);}System.out.println("Producer finished");} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("Producer interrupted");}}}static class Consumer implements Runnable {private final BlockingQueue<Task> queue;public Consumer(BlockingQueue<Task> queue) {this.queue = queue;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {// 使用poll方法,超时返回nullTask task = queue.poll(1, TimeUnit.SECONDS);if (task != null) {System.out.println(Thread.currentThread().getName() + " consumed: " + task);// 模拟处理耗时Thread.sleep(200);} else {System.out.println(Thread.currentThread().getName() + " timeout, no task available");}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(Thread.currentThread().getName() + " interrupted");}}}
}
2.1.6 性能特点

优势

  • 预分配数组,避免动态扩容开销
  • 使用索引操作,访问效率高
  • 支持公平策略,避免线程饥饿

劣势

  • 容量固定,无法动态调整
  • 数组预分配可能浪费内存
  • 单锁设计,高并发时可能成为瓶颈

2.2 LinkedBlockingQueue

2.2.1 基本特性

LinkedBlockingQueue 是基于链表实现的可选有界阻塞队列,具有以下特点:

  • 数据结构:底层使用单向链表存储元素
  • 容量限制:可选择有界或无界(默认Integer.MAX_VALUE)
  • FIFO顺序:先进先出的元素顺序
  • 双锁设计:读写操作使用不同的锁,提高并发性能
  • 动态扩容:根据需要动态创建节点
2.2.2 构造方法
// 无界队列,容量为Integer.MAX_VALUE
LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>();// 有界队列,指定容量
LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>(capacity);// 使用集合初始化
LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<>(collection);
2.2.3 核心实现原理
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 链表节点static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}// 容量限制private final int capacity;// 当前元素数量private final AtomicInteger count = new AtomicInteger();// 头节点transient Node<E> head;// 尾节点private transient Node<E> last;// 取元素锁private final ReentrantLock takeLock = new ReentrantLock();// 非空条件private final Condition notEmpty = takeLock.newCondition();// 放元素锁private final ReentrantLock putLock = new ReentrantLock();// 非满条件private final Condition notFull = putLock.newCondition();
}
2.2.4 双锁机制优势

读写分离

  • putLock:控制入队操作,保护尾节点
  • takeLock:控制出队操作,保护头节点
  • 并发优势:读写可以同时进行,提高吞吐量
2.2.5 适用场景
  • 生产消费不平衡:生产者和消费者速度差异较大
  • 高并发场景:需要高吞吐量的并发环境
  • 线程池默认选择:ThreadPoolExecutor的默认队列
  • 消息缓冲:异步消息处理系统
2.2.6 完整示例
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;public class LinkedBlockingQueueExample {private static final AtomicInteger taskIdGenerator = new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {// 创建有界队列,容量为100BlockingQueue<Message> queue = new LinkedBlockingQueue<>(100);// 创建多个生产者Thread producer1 = new Thread(new MessageProducer(queue, "Producer-1"), "Producer-1");Thread producer2 = new Thread(new MessageProducer(queue, "Producer-2"), "Producer-2");// 创建多个消费者Thread consumer1 = new Thread(new MessageConsumer(queue), "Consumer-1");Thread consumer2 = new Thread(new MessageConsumer(queue), "Consumer-2");Thread consumer3 = new Thread(new MessageConsumer(queue), "Consumer-3");// 启动所有线程producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();// 等待生产者完成producer1.join();producer2.join();// 等待队列处理完成Thread.sleep(3000);// 停止消费者consumer1.interrupt();consumer2.interrupt();consumer3.interrupt();System.out.println("Final queue size: " + queue.size());}static class Message {private final int id;private final String content;private final String producer;private final long timestamp;public Message(String content, String producer) {this.id = taskIdGenerator.incrementAndGet();this.content = content;this.producer = producer;this.timestamp = System.currentTimeMillis();}@Overridepublic String toString() {return String.format("Message{id=%d, content='%s', producer='%s', timestamp=%d}",id, content, producer, timestamp);}public int getId() { return id; }public String getContent() { return content; }public String getProducer() { return producer; }public long getTimestamp() { return timestamp; }}static class MessageProducer implements Runnable {private final BlockingQueue<Message> queue;private final String producerName;public MessageProducer(BlockingQueue<Message> queue, String producerName) {this.queue = queue;this.producerName = producerName;}@Overridepublic void run() {try {for (int i = 1; i <= 20; i++) {Message message = new Message("Message-" + i, producerName);// 使用offer方法,避免无限阻塞boolean success = queue.offer(message, 2, TimeUnit.SECONDS);if (success) {System.out.println(producerName + " produced: " + message.getId());} else {System.out.println(producerName + " failed to produce: " + message.getId());}// 模拟生产间隔Thread.sleep(50 + (int)(Math.random() * 100));}System.out.println(producerName + " finished producing");} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(producerName + " interrupted");}}}static class MessageConsumer implements Runnable {private final BlockingQueue<Message> queue;public MessageConsumer(BlockingQueue<Message> queue) {this.queue = queue;}@Overridepublic void run() {String consumerName = Thread.currentThread().getName();try {while (!Thread.currentThread().isInterrupted()) {// 使用poll方法,超时机制Message message = queue.poll(1, TimeUnit.SECONDS);if (message != null) {// 模拟消息处理processMessage(message, consumerName);} else {System.out.println(consumerName + " waiting for messages...");}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(consumerName + " interrupted");}}private void processMessage(Message message, String consumerName) throws InterruptedException {System.out.println(String.format("%s processing message %d from %s",consumerName, message.getId(), message.getProducer()));// 模拟处理时间Thread.sleep(100 + (int)(Math.random() * 200));System.out.println(String.format("%s completed message %d",consumerName, message.getId()));}}
}
2.2.7 性能特点

优势

  • 双锁设计,读写并发性能好
  • 动态扩容,内存使用灵活
  • 支持有界和无界模式
  • 适合高并发场景

劣势

  • 链表结构,内存开销相对较大
  • 无界模式可能导致内存溢出
  • 节点创建和GC开销
2.2.8 与ArrayBlockingQueue对比
特性ArrayBlockingQueueLinkedBlockingQueue
数据结构数组链表
容量固定有界可选有界/无界
锁机制单锁双锁
内存使用预分配动态分配
并发性能中等较高
适用场景固定容量高并发

2.3 PriorityBlockingQueue

2.3.1 基本特性

PriorityBlockingQueue 是支持优先级排序的无界阻塞队列:

  • 数据结构:基于数组实现的二叉堆
  • 容量限制:无界队列,容量可动态扩展
  • 排序规则:支持自然排序或自定义Comparator
  • 线程安全:使用ReentrantLock保证线程安全
  • 阻塞特性:只有take操作会阻塞,put操作不会阻塞
2.3.2 适用场景
  • 任务调度系统:根据优先级执行任务
  • 事件处理:按重要性处理事件
  • 资源分配:优先分配给高优先级请求
  • 消息队列:VIP消息优先处理
2.3.3 完整示例
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class PriorityBlockingQueueExample {public static void main(String[] args) throws InterruptedException {// 创建优先级队列BlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();// 启动生产者Thread producer = new Thread(new TaskProducer(queue), "Producer");// 启动消费者Thread consumer = new Thread(new TaskConsumer(queue), "Consumer");producer.start();consumer.start();producer.join();Thread.sleep(2000);consumer.interrupt();}static class PriorityTask implements Comparable<PriorityTask> {private final int priority;private final String taskName;private final long createTime;public PriorityTask(int priority, String taskName) {this.priority = priority;this.taskName = taskName;this.createTime = System.currentTimeMillis();}@Overridepublic int compareTo(PriorityTask other) {// 优先级高的排在前面(数字越小优先级越高)int result = Integer.compare(this.priority, other.priority);if (result == 0) {// 优先级相同时,按创建时间排序result = Long.compare(this.createTime, other.createTime);}return result;}@Overridepublic String toString() {return String.format("Task{priority=%d, name='%s', createTime=%d}",priority, taskName, createTime);}public int getPriority() { return priority; }public String getTaskName() { return taskName; }}static class TaskProducer implements Runnable {private final BlockingQueue<PriorityTask> queue;public TaskProducer(BlockingQueue<PriorityTask> queue) {this.queue = queue;}@Overridepublic void run() {try {// 添加不同优先级的任务queue.put(new PriorityTask(3, "Low Priority Task 1"));queue.put(new PriorityTask(1, "High Priority Task 1"));queue.put(new PriorityTask(2, "Medium Priority Task 1"));queue.put(new PriorityTask(1, "High Priority Task 2"));queue.put(new PriorityTask(3, "Low Priority Task 2"));queue.put(new PriorityTask(2, "Medium Priority Task 2"));System.out.println("All tasks produced");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}static class TaskConsumer implements Runnable {private final BlockingQueue<PriorityTask> queue;public TaskConsumer(BlockingQueue<PriorityTask> queue) {this.queue = queue;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {PriorityTask task = queue.poll(1, TimeUnit.SECONDS);if (task != null) {System.out.println("Processing: " + task);Thread.sleep(500); // 模拟处理时间}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("Consumer interrupted");}}}
}

2.4 DelayQueue

2.4.1 基本特性

DelayQueue 是支持延迟获取元素的无界阻塞队列:

  • 延迟特性:元素只有在延迟期满后才能被取出
  • 无界队列:容量无限制
  • 排序规则:按照延迟时间排序
  • 元素要求:元素必须实现Delayed接口
2.4.2 适用场景
  • 定时任务调度:延迟执行任务
  • 缓存过期:缓存元素的过期处理
  • 订单超时:订单超时自动取消
  • 重试机制:延迟重试失败的操作
2.4.3 完整示例
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;public class DelayQueueExample {public static void main(String[] args) throws InterruptedException {DelayQueue<DelayedTask> queue = new DelayQueue<>();// 启动生产者Thread producer = new Thread(new DelayedTaskProducer(queue), "Producer");// 启动消费者Thread consumer = new Thread(new DelayedTaskConsumer(queue), "Consumer");producer.start();consumer.start();producer.join();Thread.sleep(15000); // 等待所有延迟任务执行完成consumer.interrupt();}static class DelayedTask implements Delayed {private final String taskName;private final long executeTime;public DelayedTask(String taskName, long delayInSeconds) {this.taskName = taskName;this.executeTime = System.currentTimeMillis() + delayInSeconds * 1000;}@Overridepublic long getDelay(TimeUnit unit) {long remaining = executeTime - System.currentTimeMillis();return unit.convert(remaining, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed other) {return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);}@Overridepublic String toString() {return String.format("DelayedTask{name='%s', executeTime=%d, remaining=%d ms}",taskName, executeTime, getDelay(TimeUnit.MILLISECONDS));}public String getTaskName() { return taskName; }}static class DelayedTaskProducer implements Runnable {private final DelayQueue<DelayedTask> queue;public DelayedTaskProducer(DelayQueue<DelayedTask> queue) {this.queue = queue;}@Overridepublic void run() {try {// 添加不同延迟时间的任务queue.put(new DelayedTask("Task-5s", 5));queue.put(new DelayedTask("Task-2s", 2));queue.put(new DelayedTask("Task-8s", 8));queue.put(new DelayedTask("Task-1s", 1));queue.put(new DelayedTask("Task-3s", 3));System.out.println("All delayed tasks added to queue");} catch (Exception e) {e.printStackTrace();}}}static class DelayedTaskConsumer implements Runnable {private final DelayQueue<DelayedTask> queue;public DelayedTaskConsumer(DelayQueue<DelayedTask> queue) {this.queue = queue;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {DelayedTask task = queue.take(); // 阻塞直到有可用元素System.out.println("Executing: " + task.getTaskName() + " at " + System.currentTimeMillis());// 模拟任务执行Thread.sleep(100);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("Consumer interrupted");}}}
}

2.5 SynchronousQueue

2.5.1 基本特性

SynchronousQueue 是一个特殊的阻塞队列:

  • 零容量:不存储任何元素
  • 直接传递:每个put操作必须等待对应的take操作
  • 同步机制:生产者和消费者直接交换数据
  • 公平性:支持公平和非公平模式
2.5.2 适用场景
  • CachedThreadPool:Executors.newCachedThreadPool()的默认队列
  • 直接传递:需要立即处理的任务
  • 线程间通信:线程间直接数据交换
  • 背压控制:自然的流量控制机制
2.5.3 完整示例
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class SynchronousQueueExample {public static void main(String[] args) throws InterruptedException {// 创建公平的SynchronousQueueBlockingQueue<String> queue = new SynchronousQueue<>(true);// 启动多个生产者for (int i = 1; i <= 3; i++) {Thread producer = new Thread(new DataProducer(queue, "Producer-" + i));producer.start();}// 启动多个消费者for (int i = 1; i <= 2; i++) {Thread consumer = new Thread(new DataConsumer(queue, "Consumer-" + i));consumer.start();}// 主线程等待Thread.sleep(10000);System.out.println("Main thread finished");}static class DataProducer implements Runnable {private final BlockingQueue<String> queue;private final String producerName;public DataProducer(BlockingQueue<String> queue, String producerName) {this.queue = queue;this.producerName = producerName;}@Overridepublic void run() {try {for (int i = 1; i <= 5; i++) {String data = producerName + "-Data-" + i;System.out.println(producerName + " trying to put: " + data);// 使用offer方法,避免无限阻塞boolean success = queue.offer(data, 2, TimeUnit.SECONDS);if (success) {System.out.println(producerName + " successfully put: " + data);} else {System.out.println(producerName + " failed to put: " + data + " (timeout)");}Thread.sleep(1000);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(producerName + " interrupted");}}}static class DataConsumer implements Runnable {private final BlockingQueue<String> queue;private final String consumerName;public DataConsumer(BlockingQueue<String> queue, String consumerName) {this.queue = queue;this.consumerName = consumerName;}@Overridepublic void run() {try {while (true) {System.out.println(consumerName + " trying to take data...");String data = queue.poll(3, TimeUnit.SECONDS);if (data != null) {System.out.println(consumerName + " received: " + data);// 模拟处理时间Thread.sleep(500);} else {System.out.println(consumerName + " timeout, no data available");}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(consumerName + " interrupted");}}}
}

2.6 LinkedTransferQueue

2.6.1 基本特性

LinkedTransferQueue 是基于链表的无界阻塞队列,实现了TransferQueue接口:

  • 数据结构:基于链表的无锁算法
  • 容量限制:无界队列
  • 传递模式:支持直接传递和异步传递
  • 性能特点:高并发性能,无锁实现
  • 特殊方法:transfer()方法可直接传递给等待的消费者
2.6.2 核心方法
方法描述阻塞行为
transfer(E e)直接传递给消费者,如果没有消费者则阻塞阻塞
tryTransfer(E e)尝试直接传递,失败则返回false非阻塞
tryTransfer(E e, long timeout, TimeUnit unit)在指定时间内尝试传递超时阻塞
hasWaitingConsumer()检查是否有等待的消费者非阻塞
getWaitingConsumerCount()获取等待消费者数量非阻塞
2.6.3 适用场景
  • 实时数据传递:需要立即处理的数据
  • 请求-响应模式:Web服务器处理请求
  • 事件驱动系统:事件的实时分发
  • 高性能消息传递:替代SynchronousQueue的高性能方案
2.6.4 完整示例
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.TimeUnit;public class LinkedTransferQueueExample {public static void main(String[] args) throws InterruptedException {TransferQueue<Message> queue = new LinkedTransferQueue<>();// 启动消费者(先启动,模拟等待状态)Thread consumer1 = new Thread(new MessageConsumer(queue, "Consumer-1"));Thread consumer2 = new Thread(new MessageConsumer(queue, "Consumer-2"));consumer1.start();consumer2.start();// 等待消费者启动Thread.sleep(1000);// 启动生产者Thread producer = new Thread(new MessageProducer(queue, "Producer"));producer.start();producer.join();Thread.sleep(3000);consumer1.interrupt();consumer2.interrupt();}static class Message {private final String content;private final long timestamp;private final int priority;public Message(String content, int priority) {this.content = content;this.priority = priority;this.timestamp = System.currentTimeMillis();}@Overridepublic String toString() {return String.format("Message{content='%s', priority=%d, timestamp=%d}",content, priority, timestamp);}public String getContent() { return content; }public int getPriority() { return priority; }}static class MessageProducer implements Runnable {private final TransferQueue<Message> queue;private final String producerName;public MessageProducer(TransferQueue<Message> queue, String producerName) {this.queue = queue;this.producerName = producerName;}@Overridepublic void run() {try {for (int i = 1; i <= 8; i++) {Message message = new Message("Message-" + i, i % 3 + 1);System.out.println(producerName + " checking waiting consumers: " + queue.getWaitingConsumerCount());if (queue.hasWaitingConsumer()) {// 直接传递给等待的消费者System.out.println(producerName + " transferring: " + message.getContent());queue.transfer(message);System.out.println(producerName + " transferred: " + message.getContent());} else {// 尝试传递,如果没有消费者则放入队列boolean transferred = queue.tryTransfer(message, 500, TimeUnit.MILLISECONDS);if (transferred) {System.out.println(producerName + " try-transferred: " + message.getContent());} else {queue.put(message);System.out.println(producerName + " queued: " + message.getContent());}}Thread.sleep(800);}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(producerName + " interrupted");}}}static class MessageConsumer implements Runnable {private final TransferQueue<Message> queue;private final String consumerName;public MessageConsumer(TransferQueue<Message> queue, String consumerName) {this.queue = queue;this.consumerName = consumerName;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {System.out.println(consumerName + " waiting for message...");Message message = queue.poll(2, TimeUnit.SECONDS);if (message != null) {System.out.println(consumerName + " received: " + message);// 模拟处理时间(根据优先级)Thread.sleep(message.getPriority() * 300);System.out.println(consumerName + " processed: " + message.getContent());} else {System.out.println(consumerName + " timeout, no message");}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(consumerName + " interrupted");}}}
}

2.7 LinkedBlockingDeque

2.7.1 基本特性

LinkedBlockingDeque 是基于链表的双端阻塞队列:

  • 数据结构:双向链表
  • 容量限制:可选择有界或无界
  • 双端操作:支持从头部和尾部插入/移除
  • 线程安全:使用ReentrantLock保证线程安全
  • 阻塞特性:支持双端的阻塞操作
2.7.2 核心方法
操作类型头部操作尾部操作说明
插入addFirst(), offerFirst(), putFirst()addLast(), offerLast(), putLast()从两端插入
移除removeFirst(), pollFirst(), takeFirst()removeLast(), pollLast(), takeLast()从两端移除
检查getFirst(), peekFirst()getLast(), peekLast()检查两端元素
2.7.3 适用场景
  • 工作窃取算法:ForkJoinPool的任务队列
  • 双端缓存:LRU缓存实现
  • 撤销操作:支持撤销的操作队列
  • 双向数据流:需要双向处理的数据
2.7.4 完整示例
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;public class LinkedBlockingDequeExample {public static void main(String[] args) throws InterruptedException {// 创建容量为10的双端队列BlockingDeque<Task> deque = new LinkedBlockingDeque<>(10);// 启动工作线程(从头部获取高优先级任务)Thread worker1 = new Thread(new Worker(deque, "Worker-1", true));Thread worker2 = new Thread(new Worker(deque, "Worker-2", false));// 启动任务生产者Thread producer = new Thread(new TaskProducer(deque));worker1.start();worker2.start();producer.start();producer.join();Thread.sleep(5000);worker1.interrupt();worker2.interrupt();}static class Task {private final String taskId;private final int priority;private final String description;public Task(String taskId, int priority, String description) {this.taskId = taskId;this.priority = priority;this.description = description;}@Overridepublic String toString() {return String.format("Task{id='%s', priority=%d, desc='%s'}",taskId, priority, description);}public int getPriority() { return priority; }public String getTaskId() { return taskId; }}static class TaskProducer implements Runnable {private final BlockingDeque<Task> deque;public TaskProducer(BlockingDeque<Task> deque) {this.deque = deque;}@Overridepublic void run() {try {// 添加不同优先级的任务for (int i = 1; i <= 12; i++) {int priority = (i % 3) + 1;Task task = new Task("T" + i, priority, "Task " + i + " description");if (priority == 1) {// 高优先级任务放在头部deque.putFirst(task);System.out.println("Added high priority task to front: " + task.getTaskId());} else {// 普通任务放在尾部deque.putLast(task);System.out.println("Added normal task to rear: " + task.getTaskId());}Thread.sleep(300);}System.out.println("All tasks produced. Queue size: " + deque.size());} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("Producer interrupted");}}}static class Worker implements Runnable {private final BlockingDeque<Task> deque;private final String workerName;private final boolean preferHighPriority;public Worker(BlockingDeque<Task> deque, String workerName, boolean preferHighPriority) {this.deque = deque;this.workerName = workerName;this.preferHighPriority = preferHighPriority;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {Task task;if (preferHighPriority) {// 优先从头部获取(高优先级任务)task = deque.pollFirst(1, TimeUnit.SECONDS);if (task != null) {System.out.println(workerName + " took from FRONT: " + task);}} else {// 从尾部获取(LIFO方式,类似栈)task = deque.pollLast(1, TimeUnit.SECONDS);if (task != null) {System.out.println(workerName + " took from REAR: " + task);}}if (task != null) {// 模拟任务处理时间Thread.sleep(task.getPriority() * 200);System.out.println(workerName + " completed: " + task.getTaskId());} else {System.out.println(workerName + " timeout, no task available");}}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println(workerName + " interrupted");}}}
}

3. 阻塞队列性能对比

3.1 性能特性对比

队列类型数据结构容量锁机制并发性能内存使用适用场景
ArrayBlockingQueue数组有界单锁中等预分配固定容量
LinkedBlockingQueue链表可选双锁动态高并发
PriorityBlockingQueue无界单锁中等动态优先级
DelayQueue无界单锁中等动态延迟处理
SynchronousQueue0CAS最小直接传递
LinkedTransferQueue链表无界无锁最高动态高性能
LinkedBlockingDeque双向链表可选单锁中高动态双端操作

3.2 吞吐量测试

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;public class BlockingQueuePerformanceTest {private static final int PRODUCER_COUNT = 4;private static final int CONSUMER_COUNT = 4;private static final int MESSAGES_PER_PRODUCER = 100000;private static final int QUEUE_CAPACITY = 1000;public static void main(String[] args) throws InterruptedException {System.out.println("阻塞队列性能测试\n");testQueue("ArrayBlockingQueue", new ArrayBlockingQueue<>(QUEUE_CAPACITY));testQueue("LinkedBlockingQueue", new LinkedBlockingQueue<>(QUEUE_CAPACITY));testQueue("SynchronousQueue", new SynchronousQueue<>());testQueue("LinkedTransferQueue", new LinkedTransferQueue<>());}private static void testQueue(String queueName, BlockingQueue<Integer> queue) throws InterruptedException {System.out.println("测试队列: " + queueName);AtomicLong producedCount = new AtomicLong(0);AtomicLong consumedCount = new AtomicLong(0);long startTime = System.currentTimeMillis();// 启动生产者Thread[] producers = new Thread[PRODUCER_COUNT];for (int i = 0; i < PRODUCER_COUNT; i++) {producers[i] = new Thread(new Producer(queue, producedCount));producers[i].start();}// 启动消费者Thread[] consumers = new Thread[CONSUMER_COUNT];for (int i = 0; i < CONSUMER_COUNT; i++) {consumers[i] = new Thread(new Consumer(queue, consumedCount));consumers[i].start();}// 等待生产者完成for (Thread producer : producers) {producer.join();}// 等待消费完成while (consumedCount.get() < PRODUCER_COUNT * MESSAGES_PER_PRODUCER) {Thread.sleep(10);}// 停止消费者for (Thread consumer : consumers) {consumer.interrupt();}long endTime = System.currentTimeMillis();long duration = endTime - startTime;long totalMessages = PRODUCER_COUNT * MESSAGES_PER_PRODUCER;double throughput = (double) totalMessages / duration * 1000;System.out.printf("  总消息数: %d\n", totalMessages);System.out.printf("  执行时间: %d ms\n", duration);System.out.printf("  吞吐量: %.2f messages/sec\n\n", throughput);}static class Producer implements Runnable {private final BlockingQueue<Integer> queue;private final AtomicLong counter;public Producer(BlockingQueue<Integer> queue, AtomicLong counter) {this.queue = queue;this.counter = counter;}@Overridepublic void run() {try {for (int i = 0; i < MESSAGES_PER_PRODUCER; i++) {queue.put(i);counter.incrementAndGet();}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}static class Consumer implements Runnable {private final BlockingQueue<Integer> queue;private final AtomicLong counter;public Consumer(BlockingQueue<Integer> queue, AtomicLong counter) {this.queue = queue;this.counter = counter;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {Integer message = queue.take();counter.incrementAndGet();// 模拟少量处理时间if (message % 10000 == 0) {Thread.sleep(1);}}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}

4. 最佳实践

4.1 选择合适的阻塞队列

4.1.1 决策树
需要阻塞队列?
├─ 是否需要优先级?
│  ├─ 是 → PriorityBlockingQueue
│  └─ 否 → 继续
├─ 是否需要延迟处理?
│  ├─ 是 → DelayQueue
│  └─ 否 → 继续
├─ 是否需要双端操作?
│  ├─ 是 → LinkedBlockingDeque
│  └─ 否 → 继续
├─ 是否需要直接传递?
│  ├─ 是 → SynchronousQueue 或 LinkedTransferQueue
│  └─ 否 → 继续
├─ 容量是否固定?
│  ├─ 是 → ArrayBlockingQueue
│  └─ 否 → LinkedBlockingQueue
4.1.2 场景推荐

高并发Web服务器

// 使用LinkedTransferQueue处理请求
TransferQueue<HttpRequest> requestQueue = new LinkedTransferQueue<>();// 或者使用LinkedBlockingQueue
BlockingQueue<HttpRequest> requestQueue = new LinkedBlockingQueue<>(10000);

任务调度系统

// 使用PriorityBlockingQueue按优先级处理
BlockingQueue<ScheduledTask> taskQueue = new PriorityBlockingQueue<>();// 使用DelayQueue处理延迟任务
DelayQueue<DelayedTask> delayedTasks = new DelayQueue<>();

线程池配置

// 固定大小线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,new ArrayBlockingQueue<>(queueCapacity)
);// 缓存线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<>()
);

4.2 性能优化技巧

4.2.1 批量操作
public class BatchProcessor {private final BlockingQueue<Task> queue;private final int batchSize;public BatchProcessor(BlockingQueue<Task> queue, int batchSize) {this.queue = queue;this.batchSize = batchSize;}public void processBatch() throws InterruptedException {List<Task> batch = new ArrayList<>(batchSize);// 使用drainTo批量获取int count = queue.drainTo(batch, batchSize);if (count > 0) {// 批量处理processTasks(batch);}}private void processTasks(List<Task> tasks) {// 批量处理逻辑for (Task task : tasks) {// 处理单个任务}}
}
4.2.2 避免无限阻塞
public class SafeProducer implements Runnable {private final BlockingQueue<Message> queue;private volatile boolean running = true;@Overridepublic void run() {while (running) {try {Message message = createMessage();// 使用超时方法,避免无限阻塞boolean success = queue.offer(message, 5, TimeUnit.SECONDS);if (!success) {// 处理队列满的情况handleQueueFull(message);}} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}public void stop() {running = false;}
}
4.2.3 监控队列状态
public class QueueMonitor {private final BlockingQueue<?> queue;private final ScheduledExecutorService scheduler;public QueueMonitor(BlockingQueue<?> queue) {this.queue = queue;this.scheduler = Executors.newScheduledThreadPool(1);}public void startMonitoring() {scheduler.scheduleAtFixedRate(() -> {int size = queue.size();int remaining = queue.remainingCapacity();System.out.printf("Queue Status - Size: %d, Remaining: %d, Usage: %.2f%%\n",size, remaining, (double) size / (size + remaining) * 100);// 队列使用率过高时告警if (size > (size + remaining) * 0.8) {System.out.println("WARNING: Queue usage is high!");}}, 0, 10, TimeUnit.SECONDS);}
}

4.3 常见陷阱和解决方案

4.3.1 内存泄漏

问题:无界队列可能导致内存溢出

解决方案

// 使用有界队列
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(10000);// 或者实现背压机制
public class BackpressureProducer {private final BlockingQueue<Task> queue;private final AtomicInteger pendingTasks = new AtomicInteger(0);private final int maxPendingTasks;public boolean submitTask(Task task) {if (pendingTasks.get() >= maxPendingTasks) {return false; // 拒绝新任务}boolean success = queue.offer(task);if (success) {pendingTasks.incrementAndGet();}return success;}
}
4.3.2 线程中断处理

问题:不正确的中断处理

解决方案

public class ProperInterruptHandling implements Runnable {private final BlockingQueue<Task> queue;@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {Task task = queue.take();processTask(task);}} catch (InterruptedException e) {// 恢复中断状态Thread.currentThread().interrupt();// 清理资源cleanup();}}private void processTask(Task task) {// 处理任务}private void cleanup() {// 清理资源}
}
4.3.3 死锁预防

问题:多个队列操作可能导致死锁

解决方案

public class DeadlockFreeProcessor {private final BlockingQueue<Task> inputQueue;private final BlockingQueue<Result> outputQueue;public void processWithTimeout() {try {// 使用超时方法避免死锁Task task = inputQueue.poll(1, TimeUnit.SECONDS);if (task != null) {Result result = process(task);boolean success = outputQueue.offer(result, 1, TimeUnit.SECONDS);if (!success) {// 处理输出队列满的情况handleOutputQueueFull(result);}}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

4.4 总结

阻塞队列是Java并发编程中的重要工具,正确选择和使用阻塞队列可以:

  1. 简化并发编程:自动处理线程同步
  2. 提高系统性能:减少线程间的竞争
  3. 增强系统稳定性:提供流量控制机制
  4. 改善代码可维护性:清晰的生产者-消费者模式

选择建议

  • 高并发场景:LinkedTransferQueue > LinkedBlockingQueue > ArrayBlockingQueue
  • 内存敏感场景:ArrayBlockingQueue > LinkedBlockingQueue
  • 特殊需求场景:根据具体需求选择PriorityBlockingQueue、DelayQueue等
  • 直接传递场景:SynchronousQueue 或 LinkedTransferQueue

通过合理选择和正确使用阻塞队列,可以构建高效、稳定的并发应用程序。

http://www.dtcms.com/a/272462.html

相关文章:

  • XSS(ctfshow)
  • 文心大模型4.5开源测评:保姆级部署教程+多维度测试验证
  • 图书管理系统(完结版)
  • PyCharm 中 Python 解释器的添加选项及作用
  • 创始人IP如何进阶?三次关键突破实现高效转化
  • QT解析文本框数据——详解
  • pycharm中自动补全方法返回变量
  • 自动化脚本配置网络IP、主机名、网段
  • React封装过哪些组件-下拉选择器和弹窗表单
  • 常用的.gitconfig 配置
  • 【显示模块】嵌入式显示与触摸屏技术理论
  • HarmonyOS AI辅助编程工具(CodeGenie)UI生成
  • 时序数据库的存储之道:从数据特性看技术要点
  • 使用深度学习框架yolov8训练监控视角下非机动车电动车头盔佩戴检测数据集VOC+YOLO格式11999张4类别步骤和流程
  • UEditor 对接 秀米 手机编辑器流程与问题
  • ClickHouse 查看正在执行的SQL查询
  • Django--01基本请求与响应流程
  • go go go 出发咯 - go web开发入门系列(四) 数据库ORM框架集成与解读
  • selenium跳转到新页面时如何进行定位
  • 前缀和|差分
  • S7-1200 与 S7-300 PNS7-400 PN UDP 通信 TIA 相同项目
  • 缓存一致性问题(Cache Coherence Problem)是什么?
  • 使用Word/Excel管理需求的10个痛点及解决方案Perforce ALM
  • Word中字号与公式字体磅值(pt)的对应关系
  • 【AI智能体】智能音视频-通过关键词打断语音对话
  • RuoYi-Cloud ruoyi-gateway 网关模块
  • 海外盲盒系统:技术如何重构“信任经济”?
  • LLM 微调:从数据到部署的全流程实践与经验分享
  • 前端开发资源压缩与请求优化
  • FFmpeg滤镜相关的重要结构体