Java集合详解:LinkedBlockingQueue
1. 简介
java.util.concurrent.LinkedBlockingQueue
是一种线程安全的有界阻塞式FIFO链表队列,这种队列的容量是有限的,当队列已满时,生产者线程需要挂起等待,一旦消费者出队了元素,腾出了空间,就要唤醒生产者线程,当队列为空时,消费者线程需要挂起等待,一旦生产者入队了新元素,也需要唤醒消费者线程。LinkedBlockingQueue
采用了一种基于双锁队列的高效阻塞式同步算法,并且在双锁队列的基础上进行了改进,改进了双锁队列算法在 Java 平台下可能会出现的一些性能问题,下面我们先介绍一下双锁队列以及 LinkedBlockingQueue
的改进措施。
2. 双锁队列 & LinkedBlockingQueue 的改进
双锁队列是一种高效的线程安全的阻塞式有界队列,使用链表实现队列,链表使用双指针域,头指针域指向队列的首结点,队列的首节点是一个哨兵结点,没有具体意义,哨兵结点的后继结点才是队列的第一个元素结点,尾指针域指向队列的尾结点,因此可以在 O(1) 时间到达尾结点,入队操作就是让队尾结点的后继指针域指向新结点,同时让尾指针指向新结点,出队操作就是让头指针域向前推进,指向头结点的后继。同步算法主要使用双锁 + 条件机制 + 原子变量等同步机制来实现;采用这些机制的原因如下:
- 双锁:我们知道,FIFO队列的基本操作是入队和出队,这两种操作分别只在队列的队首和队尾进行,一般情况下是互不干扰的(一般情况就是队列不空或不满,当队列为空或者队列满时,是特殊情况),为了保证生产者和消费者线程可以同时访问队列,使用双锁,入队锁用于同步生产者线程,出队锁用于同步消费者线程,同一时间只允许有一个生产者线程入队和一个消费者线程出队。这种双锁机制可以避免入队者锁住出队者,从而避免了因为锁竞争而发生的消费者饥饿和队列数据堆积问题。
- 条件机制:为了控制生产者线程和消费者线程的等待和唤醒,需要引入条件机制,通过条件机制的等待、通知机制来控制生产者和消费者在特殊情况下的协作和同步,因为条件机制可能会导致唤醒丢失问题,因此双锁队列采用每次唤醒所有等待线程的方式来避免唤醒丢失。
- 原子变量:因为队列是有界的,每次入队操作都需要判断队列是否已满,所以需要准确的统计队列中的元素数量,因此使用原子变量来保存元素数量,原子变量可以保证统计值在多线程环境下的准确性。
唤醒丢失问题:当条件发生时,一个或多个线程并没有得到通知,导致这些线程一直等待下去。
双锁队列存在的问题主要有两个
- 频繁唤醒阻塞导致的性能损失问题:双锁队列使用了条件机制,采用总是通知所有等待条件的线程的方式来避免唤醒丢失问题,这种方法虽然简单,但其问题主要在于,如果某一个时刻一侧的线程非常不活跃,那么另一侧的所有线程就会被频繁的唤醒 + 阻塞,这样会浪费很多的性能。比如某一个时刻,队列为空,消费者线程都在等待,生产者很长时间才入队一个元素,入队后唤醒了所有消费者线程,这些消费者线程会同时竞争出队锁,结果只有一个线程能够消费到数据,其他线程在竞争到锁之后,发现队列为空,又会重新挂起等待,很明显,除了消费到数据的线程之外,唤醒其他线程是没有必要的。所以更好的方法就是按需唤醒,按需唤醒需要经过合理的设计,以避免唤醒丢失;
- 在 Java 平台下,导致的 GC 问题:由于链表队列的出队操作就是将头指针指向头结点的后继,使其向前推进,因此头指针经过的结点就是已出队的结点,这些已出队的结点会通过
next
指针域链接在一起,并且每一个结点都有被无限期使用的可能性,一旦发生某一个已出队节点被无限期使用的情况,这将会导致两个问题:A. 会导致该结点之后的所有出队结点都是 GC 可达的,从而无法被回收,导致内存泄漏,并且随着不断入队和出队操作,出队结点链会越来越长,最终发生内存溢出。B. 无法被回收的出队节点都会进入老年代,而新加入的节点会分配在新生代,这些节点通过 next 指针域相连接,会出现旧结点和新结点之间的跨代链接问题,目前的 GC 无法处理这种跨代链接问题,只会不断地重复执行 MajorGC。
LinkedBlockingQueue 的改进
对于双锁队列的问题,LinkedBlockingQueue
采取了一些方法进行改进:
- 对于问题1:
LinkedBlockingQueue
使用了按需唤醒的机制,即每一次条件发生变化时,只唤醒一个相关的等待线程,为了解决唤醒丢失问题,在每次入队或出队操作,都会进行一些一致性判断,判断是否还有未唤醒的线程,保证每一次只唤醒需要唤醒的线程,避免的无谓的唤醒和阻塞带来的性能损失; - 对于问题2:
LinkedBlockingQueue
在每次出队后,将已出队结点的next
指针域指向其自身来打断出队结点的引用链就,帮助已出队结点被 GC 回收。
3. LinkedBlockingQueue 的实现
下面我们通过 LinkedBlockingQueue
的源码,来分析一下它的实现原理,源代码 C1 就是 LinkedBlockingQueue
的实现,我们只列出了主要成员和入队、出队的方法,本文我们只分析 LinkedBlockingQueue
队列的基本操作入队和出队,其他方法有兴趣的同学可以去看 JDK 的源码。
C1:LinkedBlockingQueue 的主要成员和方法
1 -> public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {2 -> static class Node<E> {3 -> E item;4 -> Node<E> next;5 -> Node(E x) { item = x; }6 -> }7 -> private final int capacity;8 -> private final AtomicInteger count = new AtomicInteger();9 -> transient Node<E> head;
10 -> private transient Node<E> last;
11 -> private final ReentrantLock takeLock = new ReentrantLock();
12 -> private final Condition notEmpty = takeLock.newCondition();
13 -> private final ReentrantLock putLock = new ReentrantLock();
14 -> private final Condition notFull = putLock.newCondition();
15 ->
16 -> private void signalNotEmpty() {
17 -> final ReentrantLock takeLock = this.takeLock;
18 -> takeLock.lock();
19 -> try {
20 -> notEmpty.signal();
21 -> } finally {
22 -> takeLock.unlock();
23 -> }
24 -> }
25 ->
26 -> private void signalNotFull() {
27 -> final ReentrantLock putLock = this.putLock;
28 -> putLock.lock();
29 -> try {
30 -> notFull.signal();
31 -> } finally {
32 -> putLock.unlock();
33 -> }
34 -> }
35 ->
36 -> private void enqueue(Node<E> node) {
37 -> last = last.next = node;
38 -> }
39 ->
40 -> private E dequeue() {
41 -> Node<E> h = head;
42 -> Node<E> first = h.next;
43 -> h.next = h; // help GC
44 -> head = first;
45 -> E x = first.item;
46 -> first.item = null;
47 -> return x;
48 -> }
49 ->
50 -> public void put(E e) throws InterruptedException {
51 -> if (e == null) throw new NullPointerException();
52 -> int c = -1;
53 -> Node<E> node = new Node<E>(e);
54 -> final ReentrantLock putLock = this.putLock;
55 -> final AtomicInteger count = this.count;
56 -> putLock.lockInterruptibly();
57 -> try {
58 -> while (count.get() == capacity) {
59 -> notFull.await();
60 -> }
61 -> enqueue(node);
62 -> c = count.getAndIncrement();
63 -> if (c + 1 < capacity)
64 -> notFull.signal();
65 -> } finally {
66 -> putLock.unlock();
67 -> }
68 -> if (c == 0)
69 -> signalNotEmpty();
70 -> }
71 ->
72 -> public E take() throws InterruptedException {
73 -> E x;
74 -> int c = -1;
75 -> final AtomicInteger count = this.count;
76 -> final ReentrantLock takeLock = this.takeLock;
77 -> takeLock.lockInterruptibly();
78 -> try {
79 -> while (count.get() == 0) {
80 -> notEmpty.await();
81 -> }
82 -> x = dequeue();
83 -> c = count.getAndDecrement();
84 -> if (c > 1)
85 -> notEmpty.signal();
86 -> } finally {
87 -> takeLock.unlock();
88 -> }
89 -> if (c == capacity)
90 -> signalNotFull();
91 -> return x;
92 -> }
93 -> ...
94 -> }
LinkedBlockingQueue 链表结点结构
链表的结点结构由静态内部类 Node
定义(2 ~ 6行代码),Node
有2个域,其中 item
域用于保存队列的元素值,next
域指向链表中的下一个结点。
LinkedBlockingQueue 主要成员变量
LinkedBlockingQueue
主要成员变量都是双锁队列算法锁所必须的共享变量。
capacity
是队列的容量,一旦队列中的元素数量等于capacity
的值,表示队列已满;count
是一个全局的共享原子变量,用来实时统计队列中的元素数量,对count
的维护操作也是LinkedBlockingQueue
的顺序瓶颈的主要因素之一;head
链表头指针,指向队列的头结点,头结点是一个哨兵结点,item
域为null
;last
链表尾指针,指向队列尾结点,尾结点的next
域始终为null
;takeLock
出队锁,用于同步消费者线程;notEmpty
条件对象,用于维护等待队列非空的消费者线程,由takeLock
锁进行保护;putLock
入队锁,用于同步生产者线程;notFull
条件对象,用于维护等待队列释放空闲空间的生产者线程,由putLock
锁进行保护。
LinkedBlockingQueue 的不变式
LinkedBlockingQueue
具有以下一些不变式:
- 只有头结点
head
的item
域为null
,head
是哨兵结点,当队列为空时,head
和last
都会指向这个哨兵结点; - 只有尾结点
last
的next
域为null
。
put入队方法分析
LinkedBlockingQueue
主要通过 put
(第 50 行)方法将新的元素添加队尾,我们来分析一下 put
方法的实现:
- L51:对新加入的元素进行非空判断,因为
LinkedBlockingQueue
不允许空元素,空元素会破坏不变式1; - L56 和 L66:常规加锁解锁操作;
- L58 ~ 60:判断队列是否已满,如果队列已满,则挂起等待
notFull
条件,每次被唤醒后,需要重新判断; - L61:调用了常规入队操作
enqueue
(第 36 行),将尾结点的next
指针域指向新结点,同时让尾指针指向新加入的结点; - L62:更新
count
,结点数加1; - L63 ~ 64:新加入结点后,对当前的结点数进行判断,如果队列仍有空闲位置,尝试唤醒一个等待入队的生产者线程,这段代码是为了避免唤醒丢失问题的,因为
LinkedBlockingQueue
采取的按需唤醒的机制,当队列由满状态转化成为非满的状态时,只会唤醒一个生产者线程(第 89 ~ 90 行),而其他的生产者线程,会在此处根据队列的状态陆续被唤醒; - L68 ~ 69:判断新入队结点之后,队列是否由空转为非空状态,如果发生了状态变化,需要通知唤醒一个等待出队的消费者线程,
signalNotEmpty
方法(第 16 行),用于唤醒消费者线程,每次只会通过notFull.signal
唤醒一个消费者线程。
take入队方法分析
LinkedBlockingQueue
主要通过 take
(第 72 行)方法将队首元素出队的,我们来分析一下 take
方法的实现:
- L77 和 L87:常规加锁操作;
- L79 ~ 81:判断队列是否位空,如果为空,则挂起等待非空条件,每次被唤醒后,需要重新判断;
- L82:调用了常规出队操作
dequeue
(第 40 行),LinkedBlockingQueue
的头结点是哨兵结点,其item
为null
,而队列的第一个元素结点时哨兵结点的后继结点,所以每次出队,LinkedBlockingQueue
都是将第一个元素结点(就是需要出队的结点)转化成为新的哨兵结点,而将旧的哨兵结点移除;所以在dequeue
方法中,LinkedBlockingQueue
将head
指针域向前推进一个结点,使head
指针指向出队的结点,同时将该结点的item
域设置为null
,使之转化成为哨兵结点,旧的哨兵结点的next
指针域被指向其自身,这样做的目的是为了打断已移除结点的引用链,帮助 GC 回收这些结点; - L83:出队后更新原子变量
count
; - L84 ~ 85:判断队列中是否还有其他元素,如果有,尝试唤醒一个等待出队的消费者线程,这是为了避免唤醒丢失问题。
- L89 ~ 90:判断移除队首结点后,队列是否由满转化为非满状态,如果发生了状态变化,需要通知唤醒一个等待入队的生产者线程。
4. 总结
本文介绍了 LinkedBlockingQueue
的实现,LinkedBlockingQueue
是一种线程安全的阻塞式有界队列,采用了双锁 + 条件机制 + 按需唤醒 + 全局原子变量等同步机制,是一种高效的阻塞式并发队列,生产者线程和消费者线程分别竞争不同的锁,因此入队和出队操作可以并发执行,但是同一时刻,只允许两个线程(1个生产者和1个消费者)同时执行入队和出队,生产者线程和消费者线程各自之间仍然存在着锁竞争,并且所有线程都需要对全局的原子变量 count
进行维护,这些因素都使 LinkedBlockingQueue
存在着顺序瓶颈;LinkedBlockingQueue
的迭代器,只提供弱一致性的遍历,需要谨慎使用。