当前位置: 首页 > news >正文

并发阻塞队列原理分析

摘要

介绍常用阻塞队列区别及其实现原理,包括: ArrayBlockingQueue ,LinkedBlockingQueue,PriorityBlockingQueue,ConcurrentLinkedQueue,DelayQueue

ArrayBlockingQueue

	FIFO 有界缓冲区 ,一旦创建,其容量就无法更改。尝试向已满的队列中放入元素将导致操作阻塞;同样,尝试从空队列中取出元素也会阻塞。底层使用数组维护元素。
	支持公平和非公平策略 通过 ReentrantLock 实现,增删改查 都需要获取锁
	线程同步通过定义成员变量 ReentrantLock 实现,队列空和满线程阻塞通过分别定义 Condition notEmpty; Condition notFull; 实现
	实现接口 BlockingQueue 提供了中断响应|超时等待 插入|获取数据的接口,首先获取锁,如果插入时候队列满,notFull 上等待,获取数据同理在notEmpty上等待。
	维护 takeIndex  和  putIndex 分别记录读写位置,循环index 

LinkedBlockingQueue

	双锁队列 为提高读写效率使用插入和删除两把锁 。以让入队和出队操作在不同的锁控制下并行执行,提高了队列在并发环境下的操作效率。 底层使用链表维护数据,根据构造方法是否传输长度值决定,如果传入就是有界的,否则默认长度为 Integer.MAX_VALUE 可以认为是无界的
	相比 ArrayBlockingQueue 读写都使用同一把锁,该队列吞吐量会更高
	与这两把锁相关联,还会有相应的条件变量。   当队列已满时,入队操作会在putLock相关的条件变量上等待;当队列已空时,出队操作会在takeLock相关的条件变量上等待,直到队列状态满足操作继续的条件。
	共享计数器 记录队列中的元素数量 使用 AtomicInteger 实现
	插入数据
		从队尾插入,先获取 putLock,通过共享计数器判断是否已经满,如果是在notFull条件对象上等待,否则尾部插入元素,更新共享计数器,同时如果没有超过 capacity 唤醒等待在 notFull 上的线程(就算没有线程等待,调用signal方法也不影响,因为会判断条件队列头节点是否为null)
		释放锁
		如果插入前队列数量为0 ,获取 takeLock 锁,同时唤醒等待在 notEmpty 上的一个读线程(在读取数据的时候会判断当前队列数量是否大于1 ,然后唤醒 notEmpty 上等待的线程,所以每次读取的时候都会判断,然后唤醒),在塞入方法里面唤醒获取线程的操作相当于是fire 以下,后续唤醒其他等待读线程还是得由读线程唤醒
	删除数据
		和上面的流程相反
		从队列头取数据,获取锁 takeLock  ,通过共享计数器判断是否为0,如果是在 notEmpty 条件对象上等待,否则头部获取元素,更新共享计数器,同时如果取数之前元素个数大于1 唤醒等待在 notEmpty 上的线程
		释放锁
		如果删除元素前队列数量等于capacity,获取 putLock 锁,同时唤醒等待在 notFull 上的一个写线程
		

PriorityBlockingQueue

	一个无界阻塞队列,它使用与 PriorityQueue 类相同的排序规则,并提供阻塞式检索操作 对这个类的操作并不保证具有相同优先级的元素的顺序。如果你需要强制规定顺序,可以定义自定义类或比较器,通过使用辅助键来解决主优先级值相同的情况。
	底层是基于数组实现的二叉堆,可以认为是无界队列,因为数组会扩容。增删改查都需要获取 ReentrantLock 锁。获取元素时,如果为null,在条件对象 notEmpty 上无限期等待。插入元素后会signal条件对象上阻塞的线程
	
	扩容逻辑:插入元素时判断队列已有数量是否已经达到数组长度,如果是进行扩容 java.util.concurrent.PriorityBlockingQueue#tryGrow
	1 释放锁,防止阻塞读操作,维护了 volatile allocationSpinLock 用于标记有线程正在进行扩展,并且进行到了计算新数组长度的步骤,CAS更新变量
	2 成功CAS更新 allocationSpinLock 后,计算新数组长度(如果长度小于64 增加2 否则扩容为原来2倍),并为新数组分配内存(这个分配内存的步骤可以往后推迟,可能存在这样的情况,多个线程依次CAS成功,每个线程都会创建新数组申请内存)
	3 allocationSpinLock 赋值=0 
	4 获取锁,阻塞其他读写线程,double check 队列是否是之前的队列
	5 将原数组元素copy到新数组,因为扩容是在插入方法offer中调用,所以这里获取锁后不释放,调用该方法的地方是while循环。如果未扩容成功会再次进入扩容方法
	优化: 增加线程引用,用于记录可以扩容的线程。
	为了避免分配内存导致的内存溢出,释放锁后尝试一次CAS竞争 allocationSpinLock ,成功后更新线程引用为当前线程,继续获取锁,然后判断线程引用是否为当前线程,如果是,分配内存,将原来数组copy到新数组,完成后,释放线程引用并且释放 allocationSpinLock
	其他线程需要判断线程引用是否为当前线程,并且CAS更新 allocationSpinLock 成功后,也需要判断引用的数组是不是原来的数组。

ConcurrentLinkedQueue

敬请期待

DelayQueue

	抢占式获取任务,先获取任务的为leader其他线程阻塞等待,直到被leader唤醒 或者 新塞入超时等待时间更小的任务,被塞入线程唤醒。每次获取任务的时候都会重新计算剩余时间(参考	java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask#getDelay 截至时间减去当前时间)
	底层使用PriorityQueue维护元素,元素 实现 Delayed 接口。该接口继承自 Comparable<Delayed>,需要实现两个方法: 
		long getDelay(TimeUnit unit):返回元素剩余的延迟时间,单位由 unit 指定。当返回值小于等于 0 时,表示元素的延迟时间已到期。
		int compareTo(Delayed other):用于比较元素的延迟时间,确定元素在队列中的顺序。
	使用了类似领导者-追随者的设计模式,当有多个线程尝试从队列中获取元素时,为了避免所有线程都进行不必要的定时等待(即不断检查元素是否到期)
	在 DelayQueue 里,可以把正在等待队列头部元素到期的线程看作是 “领导者线程”。这个线程只需要等待下一个元素的延迟时间结束,而不是无差别地去轮询所有元素。
	其他线程则可以看作是 “追随者线程”,它们在没有成为等待队列头部元素的线程时,会进行不同程度的等待,直到有合适的时机去竞争成为等待头部元素的线程。 
	在 take()  中,如果领导者为空,当前线程升级为领导者,调用 available.awaitNanos(delay) 等待队列头部元素到期。其他线程调用  available.await() 进行等待。
	当领导者线程等待的元素到期并处理完后,它会将 leader 置为 null。此时,如果队列中还有元素,会调用 available.signal() 唤醒其他等待的线程,让它们竞争成为新的领导者线程。
	维护全局锁  ReentrantLock 对队列增删改查都需要先获取锁
	leader 和 follower 关系 可以参考 java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue 也是相同的模式了,阻塞队列 
	
	插入步骤:都是调用 offer方法
	1 获取锁
	2 将待插入元素塞入 PriorityQueue 
	3 获取 PriorityQueue 第一个元素,如果是插入的元素,代表插入元素超时剩余时间最小,清理 leader ,唤醒等待在 available 获取元素的线程,让线程认领当前第一个任务
	4 如果第一个元素不是插入的元素,直接返回。如果在这之前有其他线程尝试获取未超时的元素,一定会将自己作为leader ,并定时等待
	
	
	
	take步骤:死循环尝试从队列上获取元素,直到获取到元素
	1 获取锁,中断响应方式获取锁
	2 如果队列为空,在 available 上无限期等待,直到被唤醒
	3 如果第一个元素超时剩余时间小于等于0 直接返回该元素。
	4 如果第一个元素超时剩余时间不小于等于0 ,如果 leader 不为空,在 available 上无限期等待,直到被唤醒。如果leader 为空将当前线程升级为leader ,然后在 available 上定时等待 ,等待超时后,从队列从新获取元素
	5 期间出现异常或者正常获取了元素后,在finally代码会判断leader是否为空,并且队列不为空,如果是则唤醒 available 上等待的线程,所以每次读取的时候都会判断,然后唤available上等待的线程
	综上 : leader领导者线程本身执行从队列获取元素的任务,同时还负责唤醒阻塞在 available 上的线程,leader的赋值是在获取lock锁后,不存在线程竞争的问题

- 问题描述: leader 作用

  > 该成员变量是 leader/follwer 编发模式的简化版本需要使用,考虑场景: 队列不为空,多个读线程获取队列头元素,为了保证元素只能分配给一个读线程
  > 引入的leader,记录抢占头节点成功的线程,后续进入的线程无限期阻塞,直到leader超时唤醒,调用signal唤醒。
  > 当前生产者线程往队列塞入权值更新的元素,成为队列新的头节点,相当于是新一轮的线程抢占,所以插入时候清空了leader
  >
  >     public boolean offer(E e) {
  >         final ReentrantLock lock = this.lock;
  >         lock.lock();
  >         try {
  >             q.offer(e);
  >             if (q.peek() == e) {
  >                 leader = null;
  >     			// 存在这样的场景: 原始队列为空塞入元素e1 在队列头部,有线程抢占leader成功,后续的其他线程无限期阻塞,等待被唤醒抢占新的队列头节点
  >     			// 后续生产者往里面塞入新元素,权值最小,位于队列头部,这个时候通过signal唤醒阻塞的线程,因为 leader/follower模式保证
  >     			// 队列头部元素每次只能分配给一个线程。so 这里需要唤醒等待的线程(包括了上一次抢占成功的leader线程以及无限期等待的线程)
  >                 available.signal();
  >             }
  >             return true;
  >         } finally {
  >             lock.unlock();
  >         }
  >     }
  > // leader的作用其实就是多个线程争抢队列头节点时候,只能有一个线程抢占元素成功,其他的都无限期等待被leader线程唤醒
  > // 保证了同时多个线程通过当前Queue对象获取第一个元素时候,不会出现同一个元素分配给多个线程的场景。
  > public E take() throws InterruptedException {
  >         final ReentrantLock lock = this.lock;
  >         lock.lockInterruptibly();
  >         try {
  >             for (;;) {
  >                 E first = q.peek();
  >                 if (first == null)
  >                     available.await();
  >                 else {
  >                     long delay = first.getDelay(NANOSECONDS);
  >                     if (delay <= 0)
  >                         return q.poll();
  >                     first = null; // don't retain ref while waiting
  >                     if (leader != null)
  >                         available.await();
  >                     else {
  >                         Thread thisThread = Thread.currentThread();
  >                         leader = thisThread;
  >                         try {
  >                             available.awaitNanos(delay);
  >                         } finally {
  > 							// 这个是有权重的队列,新塞入的元素可能在队列的头部,这个时候 leader 会被set 为null
  > 							// remove 直接将头部的元素删除,在此之前,已有线程调用了该方法并阻塞在该元素上面,
  > 							// 后续生产者线程往里面塞入了一个元素,并且 set leader = null,同时另外的消费者来到这里,更新leader
  > 							// 所以原来的leader超时唤醒后需要判断leader是否为自己 可能为null 可能为其他线程
  >                             if (leader == thisThread)
  > 								// 这里赋值为null,是因为当前唤醒后,可能头节点还未超时,需要继续等待
  > 								// 如果不为set null,上面的 if (leader != null) 判断,会让线程无限期等待,超过任务的实际等待时长
  > 								// 另外,等待过程中抛异常,需要清空 leader ,这样允许其他线程继续争抢头节点
  > 								// 被唤醒有两种可能,一种是新塞入了更新的任务(重新获取任务),一种是原有任务等待超时(leader 赋值为null ,在当前线程释放锁后,让其他线程竞争头节点)
  >                                 leader = null;
  >                         }
  >                     }
  >                 }
  >             }
  >         } finally {
  >             if (leader == null && q.peek() != null)
  >                 available.signal();
  >             lock.unlock();
  >         }
  >     }	
  > leader/follower模式 相关文档
  > 参考文档: https://github.com/robbie-cao/note/blob/master/concurrency-pattern.md#leader--follower 
  > http://kircher-schwanninger.de/michael/publications/lf.pdf 

相关文章:

  • 用户自定义函数(UDF)开发与应用(二)
  • 快速幂运算
  • 阅读论文 smart pretrain,搭配MAE一起食用
  • Elasticsearch 性能优化:从原理到实践的全面指南
  • Elasticsearch入门指南(三) 之 高级篇
  • 2025蓝桥杯JavaB组真题解析
  • JavaScript性能优化(下)
  • Spring Boot集成Nacos
  • 【Web功能测试】Web商城搜索模块测试用例设计深度解析
  • 2025第十六届蓝桥杯PythonB组部分题解
  • [特殊字符] 第十七讲 | 随机森林:变量重要性识别与建模实战
  • 4月份到9月份看6本书第一天
  • 基于Flask-Login简单登录和权限控制实践
  • 句句翻译。
  • 平凡日子里的挣扎
  • 第7课:智能体安全与可靠性保障
  • 路由器开启QOS和UPNP的作用
  • AOSP14 Launcher3——手势模式下底部上滑的两种场景
  • Zabbix 简介+部署+对接Grafana(详细部署!!)
  • Redis-集群
  • 晋级四强!WTA1000罗马站:郑钦文2比0萨巴伦卡
  • 陕西一村民被冒名贷款40余万续:名下已无贷款,将继续追责
  • 国家林业和草原局原党组成员、副局长李春良接受审查调查
  • 央行等印发《关于金融支持广州南沙深化面向世界的粤港澳全面合作的意见》
  • 人民时评:莫让“假俗乱”讲解侵蚀“文博热”
  • “犍陀罗艺术与亚洲文明”在浙大对外展出