Java基于数组的阻塞队列实现详解
在多线程编程中,阻塞队列是一种非常有用的工具,它可以在生产者和消费者之间提供一个缓冲区,使得生产者可以往队列中添加数据,而消费者可以从队列中取出数据。当队列满时,生产者会被阻塞直到有空间可用;当队列空时,消费者会被阻塞直到有数据可用。本文将详细分析一个基于数组实现的阻塞队列代码。
代码结构概述
我们先来浏览一下整个代码的结构:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class ArrBlockQueue<E> {// 成员变量private Object[] arr;private int size = 0;private int head = 0; // 存数据位置private int last = 0; // 取数位置private ReentrantLock lock = new ReentrantLock();private Condition notEmpty = lock.newCondition();private Condition notFull = lock.newCondition();// 构造函数public ArrBlockQueue(int len) {arr = new Object[len];}// 添加数据方法public void put(E e) throws InterruptedException {lock.lock();try {while (size == arr.length) {notFull.await();}System.out.println("put 添加数据 " + e);arr[head] = e;head = (head + 1) % arr.length;size++;notEmpty.signal();} finally {lock.unlock();}}// 取出数据方法(未完成)public E take() throws InterruptedException {}// 获取队列大小方法public int size() {lock.lock();try {return size;} finally {lock.unlock();}}// 主方法(测试用)public static void main(String[] args) {// 测试代码}
}
关键组件解析
1. 同步控制部分
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
这里使用了ReentrantLock
和Condition
来实现同步控制:
-
ReentrantLock
:这是一个可重入的互斥锁,它可以替代synchronized
关键字提供更灵活的锁定机制。在这里,它用于确保对共享资源的独占访问。 -
Condition
:这是ReentrantLock
的一个条件变量接口,用于实现复杂的等待/通知机制。在这里,我们创建了两个条件变量:notEmpty
:用于通知消费者队列已非空,可以取数据了。notFull
:用于通知生产者队列已非满,可以添加数据了。
2. 数据存储部分
private Object[] arr;
private int size = 0;
private int head = 0; // 存数据位置
private int last = 0; // 取数位置
这是阻塞队列的核心数据结构:
arr
:一个对象数组,用于存储队列中的元素。size
:当前队列中的元素数量。head
:指向队列头部(下一个要添加元素的位置)。last
:指向队列尾部(下一个要取出元素的位置)。
3. 添加数据方法
public void put(E e) throws InterruptedException {lock.lock();try {while (size == arr.length) {notFull.await();}System.out.println("put 添加数据 " + e);arr[head] = e;head = (head + 1) % arr.length;size++;notEmpty.signal();} finally {lock.unlock();}
}
这是生产者线程调用的方法:
-
lock.lock()
:获取锁,确保线程安全。 -
while (size == arr.length)
:检查队列是否已满。如果队列满了,调用notFull.await()
使当前线程进入等待状态,直到队列有空间可用。 -
添加数据到
arr
数组的head
位置,更新head
指针,并增加size
计数。 -
notEmpty.signal()
:通知可能正在等待队列非空的消费者线程,队列已经可以取出数据了。 -
lock.unlock()
:释放锁。
代码运行示例
在main
方法中,创建了一个长度为5的阻塞队列实例:
ArrBlockQueue<Integer> block = new ArrBlockQueue<>(5);
启动两个线程:
- 一个生产者线程,循环添加10个元素到队列中:
new Thread(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ": put 线程启动!");for (int i = 0; i < 10; i++) {try {block.put(i);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}).start();
- 一个消费者线程,循环从队列中取出10个元素:
new Thread(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ": take 线程启动!");for (int i = 0; i < 10; i++) {try {Thread.sleep(500);System.out.println(block.take());} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}).start();
总结
这个基于数组的阻塞队列实现展示了如何使用ReentrantLock
和Condition
来实现复杂的等待/通知机制。它为生产者和消费者模式提供了一个线程安全的解决方案,确保了数据的一致性和线程间的高效协作。理解这种实现方式有助于我们更好地掌握Java并发编程的核心概念。