当前位置: 首页 > news >正文

Java集合详解:LinkedBlockingQueue

1. 简介

  java.util.concurrent.LinkedBlockingQueue 是一种线程安全的有界阻塞式FIFO链表队列,这种队列的容量是有限的,当队列已满时,生产者线程需要挂起等待,一旦消费者出队了元素,腾出了空间,就要唤醒生产者线程,当队列为空时,消费者线程需要挂起等待,一旦生产者入队了新元素,也需要唤醒消费者线程。LinkedBlockingQueue 采用了一种基于双锁队列的高效阻塞式同步算法,并且在双锁队列的基础上进行了改进,改进了双锁队列算法在 Java 平台下可能会出现的一些性能问题,下面我们先介绍一下双锁队列以及 LinkedBlockingQueue 的改进措施。

2. 双锁队列 & LinkedBlockingQueue 的改进

  双锁队列是一种高效的线程安全的阻塞式有界队列,使用链表实现队列,链表使用双指针域,头指针域指向队列的首结点,队列的首节点是一个哨兵结点,没有具体意义,哨兵结点的后继结点才是队列的第一个元素结点,尾指针域指向队列的尾结点,因此可以在 O(1) 时间到达尾结点,入队操作就是让队尾结点的后继指针域指向新结点,同时让尾指针指向新结点,出队操作就是让头指针域向前推进,指向头结点的后继。同步算法主要使用双锁 + 条件机制 + 原子变量等同步机制来实现;采用这些机制的原因如下:

  • 双锁:我们知道,FIFO队列的基本操作是入队和出队,这两种操作分别只在队列的队首和队尾进行,一般情况下是互不干扰的(一般情况就是队列不空或不满,当队列为空或者队列满时,是特殊情况),为了保证生产者和消费者线程可以同时访问队列,使用双锁,入队锁用于同步生产者线程,出队锁用于同步消费者线程,同一时间只允许有一个生产者线程入队和一个消费者线程出队。这种双锁机制可以避免入队者锁住出队者,从而避免了因为锁竞争而发生的消费者饥饿和队列数据堆积问题。
  • 条件机制:为了控制生产者线程和消费者线程的等待和唤醒,需要引入条件机制,通过条件机制的等待、通知机制来控制生产者和消费者在特殊情况下的协作和同步,因为条件机制可能会导致唤醒丢失问题,因此双锁队列采用每次唤醒所有等待线程的方式来避免唤醒丢失。
  • 原子变量:因为队列是有界的,每次入队操作都需要判断队列是否已满,所以需要准确的统计队列中的元素数量,因此使用原子变量来保存元素数量,原子变量可以保证统计值在多线程环境下的准确性。

唤醒丢失问题:当条件发生时,一个或多个线程并没有得到通知,导致这些线程一直等待下去。

双锁队列存在的问题主要有两个
  1. 频繁唤醒阻塞导致的性能损失问题:双锁队列使用了条件机制,采用总是通知所有等待条件的线程的方式来避免唤醒丢失问题,这种方法虽然简单,但其问题主要在于,如果某一个时刻一侧的线程非常不活跃,那么另一侧的所有线程就会被频繁的唤醒 + 阻塞,这样会浪费很多的性能。比如某一个时刻,队列为空,消费者线程都在等待,生产者很长时间才入队一个元素,入队后唤醒了所有消费者线程,这些消费者线程会同时竞争出队锁,结果只有一个线程能够消费到数据,其他线程在竞争到锁之后,发现队列为空,又会重新挂起等待,很明显,除了消费到数据的线程之外,唤醒其他线程是没有必要的。所以更好的方法就是按需唤醒,按需唤醒需要经过合理的设计,以避免唤醒丢失;
  2. 在 Java 平台下,导致的 GC 问题:由于链表队列的出队操作就是将头指针指向头结点的后继,使其向前推进,因此头指针经过的结点就是已出队的结点,这些已出队的结点会通过 next 指针域链接在一起,并且每一个结点都有被无限期使用的可能性,一旦发生某一个已出队节点被无限期使用的情况,这将会导致两个问题:A. 会导致该结点之后的所有出队结点都是 GC 可达的,从而无法被回收,导致内存泄漏,并且随着不断入队和出队操作,出队结点链会越来越长,最终发生内存溢出。B. 无法被回收的出队节点都会进入老年代,而新加入的节点会分配在新生代,这些节点通过 next 指针域相连接,会出现旧结点和新结点之间的跨代链接问题,目前的 GC 无法处理这种跨代链接问题,只会不断地重复执行 MajorGC。
LinkedBlockingQueue 的改进

  对于双锁队列的问题,LinkedBlockingQueue 采取了一些方法进行改进:

  1. 对于问题1:LinkedBlockingQueue 使用了按需唤醒的机制,即每一次条件发生变化时,只唤醒一个相关的等待线程,为了解决唤醒丢失问题,在每次入队或出队操作,都会进行一些一致性判断,判断是否还有未唤醒的线程,保证每一次只唤醒需要唤醒的线程,避免的无谓的唤醒和阻塞带来的性能损失;
  2. 对于问题2:LinkedBlockingQueue 在每次出队后,将已出队结点的 next 指针域指向其自身来打断出队结点的引用链就,帮助已出队结点被 GC 回收。

3. LinkedBlockingQueue 的实现

  下面我们通过 LinkedBlockingQueue 的源码,来分析一下它的实现原理,源代码 C1 就是 LinkedBlockingQueue 的实现,我们只列出了主要成员和入队、出队的方法,本文我们只分析 LinkedBlockingQueue 队列的基本操作入队和出队,其他方法有兴趣的同学可以去看 JDK 的源码。

C1:LinkedBlockingQueue 的主要成员和方法
 1 ->	public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {2 ->		static class Node<E> {3 ->	        E item;4 ->	        Node<E> next;5 ->	        Node(E x) { item = x; }6 ->	    }7 ->	    private final int capacity;8 ->	    private final AtomicInteger count = new AtomicInteger();9 ->	    transient Node<E> head;
10 ->	    private transient Node<E> last;
11 ->	    private final ReentrantLock takeLock = new ReentrantLock();
12 ->	    private final Condition notEmpty = takeLock.newCondition();
13 ->	    private final ReentrantLock putLock = new ReentrantLock();
14 ->	    private final Condition notFull = putLock.newCondition();
15 ->	
16 ->	    private void signalNotEmpty() {
17 ->	        final ReentrantLock takeLock = this.takeLock;
18 ->	        takeLock.lock();
19 ->	        try {
20 ->	            notEmpty.signal();
21 ->	        } finally {
22 ->	            takeLock.unlock();
23 ->	        }
24 ->	    }
25 ->	
26 ->	    private void signalNotFull() {
27 ->	        final ReentrantLock putLock = this.putLock;
28 ->	        putLock.lock();
29 ->	        try {
30 ->	            notFull.signal();
31 ->	        } finally {
32 ->	            putLock.unlock();
33 ->	        }
34 ->	    }
35 ->	
36 ->	    private void enqueue(Node<E> node) {
37 ->	        last = last.next = node;
38 ->	    }
39 ->	
40 ->	    private E dequeue() {
41 ->	        Node<E> h = head;
42 ->	        Node<E> first = h.next;
43 ->	        h.next = h; // help GC
44 ->	        head = first;
45 ->	        E x = first.item;
46 ->	        first.item = null;
47 ->	        return x;
48 ->	    }
49 ->	
50 ->		public void put(E e) throws InterruptedException {
51 ->	        if (e == null) throw new NullPointerException();
52 ->	        int c = -1;
53 ->	        Node<E> node = new Node<E>(e);
54 ->	        final ReentrantLock putLock = this.putLock;
55 ->	        final AtomicInteger count = this.count;
56 ->	        putLock.lockInterruptibly();
57 ->	        try {
58 ->	            while (count.get() == capacity) {
59 ->	                notFull.await();
60 ->	            }
61 ->	            enqueue(node);
62 ->	            c = count.getAndIncrement();
63 ->	            if (c + 1 < capacity)
64 ->	                notFull.signal();
65 ->	        } finally {
66 ->	            putLock.unlock();
67 ->	        }
68 ->	        if (c == 0)
69 ->	            signalNotEmpty();
70 ->	    }
71 ->		
72 ->		public E take() throws InterruptedException {
73 ->	        E x;
74 ->	        int c = -1;
75 ->	        final AtomicInteger count = this.count;
76 ->	        final ReentrantLock takeLock = this.takeLock;
77 ->	        takeLock.lockInterruptibly();
78 ->	        try {
79 ->	            while (count.get() == 0) {
80 ->	                notEmpty.await();
81 ->	            }
82 ->	            x = dequeue();
83 ->	            c = count.getAndDecrement();
84 ->	            if (c > 1)
85 ->	                notEmpty.signal();
86 ->	        } finally {
87 ->	            takeLock.unlock();
88 ->	        }
89 ->	        if (c == capacity)
90 ->	            signalNotFull();
91 ->	        return x;
92 ->	    }
93 ->		...
94 ->	}
LinkedBlockingQueue 链表结点结构

  链表的结点结构由静态内部类 Node 定义(2 ~ 6行代码),Node 有2个域,其中 item 域用于保存队列的元素值,next 域指向链表中的下一个结点。

LinkedBlockingQueue 主要成员变量

LinkedBlockingQueue 主要成员变量都是双锁队列算法锁所必须的共享变量。

  • capacity 是队列的容量,一旦队列中的元素数量等于 capacity 的值,表示队列已满;
  • count 是一个全局的共享原子变量,用来实时统计队列中的元素数量,对 count 的维护操作也是 LinkedBlockingQueue 的顺序瓶颈的主要因素之一;
  • head 链表头指针,指向队列的头结点,头结点是一个哨兵结点,item 域为 null
  • last 链表尾指针,指向队列尾结点,尾结点的 next 域始终为 null
  • takeLock 出队锁,用于同步消费者线程;
  • notEmpty 条件对象,用于维护等待队列非空的消费者线程,由 takeLock 锁进行保护;
  • putLock 入队锁,用于同步生产者线程;
  • notFull 条件对象,用于维护等待队列释放空闲空间的生产者线程,由 putLock 锁进行保护。
LinkedBlockingQueue 的不变式

  LinkedBlockingQueue 具有以下一些不变式:

  1. 只有头结点 headitem 域为 nullhead 是哨兵结点,当队列为空时,headlast 都会指向这个哨兵结点;
  2. 只有尾结点 lastnext 域为 null
put入队方法分析

  LinkedBlockingQueue 主要通过 put(第 50 行)方法将新的元素添加队尾,我们来分析一下 put 方法的实现:

  • L51:对新加入的元素进行非空判断,因为 LinkedBlockingQueue 不允许空元素,空元素会破坏不变式1;
  • L56L66:常规加锁解锁操作;
  • L58 ~ 60:判断队列是否已满,如果队列已满,则挂起等待 notFull 条件,每次被唤醒后,需要重新判断;
  • L61:调用了常规入队操作 enqueue(第 36 行),将尾结点的 next 指针域指向新结点,同时让尾指针指向新加入的结点;
  • L62:更新 count,结点数加1;
  • L63 ~ 64:新加入结点后,对当前的结点数进行判断,如果队列仍有空闲位置,尝试唤醒一个等待入队的生产者线程,这段代码是为了避免唤醒丢失问题的,因为 LinkedBlockingQueue 采取的按需唤醒的机制,当队列由满状态转化成为非满的状态时,只会唤醒一个生产者线程(第 89 ~ 90 行),而其他的生产者线程,会在此处根据队列的状态陆续被唤醒;
  • L68 ~ 69:判断新入队结点之后,队列是否由空转为非空状态,如果发生了状态变化,需要通知唤醒一个等待出队的消费者线程,signalNotEmpty方法(第 16 行),用于唤醒消费者线程,每次只会通过 notFull.signal 唤醒一个消费者线程。
take入队方法分析

  LinkedBlockingQueue 主要通过 take(第 72 行)方法将队首元素出队的,我们来分析一下 take 方法的实现:

  • L77L87:常规加锁操作;
  • L79 ~ 81:判断队列是否位空,如果为空,则挂起等待非空条件,每次被唤醒后,需要重新判断;
  • L82:调用了常规出队操作 dequeue(第 40 行),LinkedBlockingQueue 的头结点是哨兵结点,其 itemnull,而队列的第一个元素结点时哨兵结点的后继结点,所以每次出队,LinkedBlockingQueue 都是将第一个元素结点(就是需要出队的结点)转化成为新的哨兵结点,而将旧的哨兵结点移除;所以在 dequeue 方法中,LinkedBlockingQueuehead 指针域向前推进一个结点,使 head 指针指向出队的结点,同时将该结点的 item 域设置为 null,使之转化成为哨兵结点,旧的哨兵结点的 next 指针域被指向其自身,这样做的目的是为了打断已移除结点的引用链,帮助 GC 回收这些结点;
  • L83:出队后更新原子变量 count
  • L84 ~ 85:判断队列中是否还有其他元素,如果有,尝试唤醒一个等待出队的消费者线程,这是为了避免唤醒丢失问题。
  • L89 ~ 90:判断移除队首结点后,队列是否由满转化为非满状态,如果发生了状态变化,需要通知唤醒一个等待入队的生产者线程。

4. 总结

  本文介绍了 LinkedBlockingQueue 的实现,LinkedBlockingQueue 是一种线程安全的阻塞式有界队列,采用了双锁 + 条件机制 + 按需唤醒 + 全局原子变量等同步机制,是一种高效的阻塞式并发队列,生产者线程和消费者线程分别竞争不同的锁,因此入队和出队操作可以并发执行,但是同一时刻,只允许两个线程(1个生产者和1个消费者)同时执行入队和出队,生产者线程和消费者线程各自之间仍然存在着锁竞争,并且所有线程都需要对全局的原子变量 count 进行维护,这些因素都使 LinkedBlockingQueue 存在着顺序瓶颈;LinkedBlockingQueue 的迭代器,只提供弱一致性的遍历,需要谨慎使用。

http://www.dtcms.com/a/192545.html

相关文章:

  • 26考研 | 王道 | 计算机组成原理 | 一、计算机系统概述
  • Window下Jmeter多机压测方法
  • 128.在 Vue 3 中使用 OpenLayers 实现绘制矩形截图并保存地图区域
  • OpenShift AI - 用 ModelCar 构建容器化模型,提升模型弹性扩展速度
  • IP地址、端口、TCP介绍、socket介绍、程序中socket管理
  • Golang 设计哲学
  • 用Python代码绘制动态3D爱心效果
  • AI日报 · 2025年5月15日|GPT-4.1 登陆 ChatGPT
  • 实验-时序电路设计2-存储器阵列(数字逻辑)
  • 光谱相机的图像预处理技术
  • MYSQL基本命令
  • 70、微服务保姆教程(十三)Docker容器详细讲义
  • 人体肢体渲染-一步几个脚印从头设计数字生命——仙盟创梦IDE
  • 工业操作系统核心技术揭秘
  • Web GIS可视化地图框架Leaflet、OpenLayers、Mapbox、Cesium、ArcGis for JavaScript
  • 从基础到实习项目:C++后端开发学习指南
  • 数据结构 -- 顺序查找和折半查找
  • python的宫崎骏动漫电影网站管理系统
  • 【论信息系统项目的合同管理】
  • OpenResty Manager 介绍与部署(Docker部署)
  • 20250515让飞凌的OK3588-C的核心板在Linux R4下适配以太网RTL8211F-CG为4线百兆时的接线图
  • 微服务如何实现服务的高并发
  • JAVA单元测试、反射
  • 数据结构 -- 树形查找(一)二叉排序树
  • 乡村地区无人机医药配送路径规划与优化仿真
  • 当服务器出现宕机情况该怎么办?
  • 【Vue】CSS3实现关键帧动画
  • [C++面试] lambda面试点
  • IOS CSS3 right transformX 动画卡顿 回弹
  • Flink 运维监控与指标采集实战(Prometheus + Grafana 全流程)