并发阻塞队列原理分析
摘要
介绍常用阻塞队列区别及其实现原理,包括: ArrayBlockingQueue ,LinkedBlockingQueue,PriorityBlockingQueue,ConcurrentLinkedQueue,DelayQueue
ArrayBlockingQueue
FIFO 有界缓冲区 ,一旦创建,其容量就无法更改。尝试向已满的队列中放入元素将导致操作阻塞;同样,尝试从空队列中取出元素也会阻塞。底层使用数组维护元素。
支持公平和非公平策略 通过 ReentrantLock 实现,增删改查 都需要获取锁
线程同步通过定义成员变量 ReentrantLock 实现,队列空和满线程阻塞通过分别定义 Condition notEmpty; Condition notFull; 实现
实现接口 BlockingQueue 提供了中断响应|超时等待 插入|获取数据的接口,首先获取锁,如果插入时候队列满,notFull 上等待,获取数据同理在notEmpty上等待。
维护 takeIndex 和 putIndex 分别记录读写位置,循环index
LinkedBlockingQueue
双锁队列 为提高读写效率使用插入和删除两把锁 。以让入队和出队操作在不同的锁控制下并行执行,提高了队列在并发环境下的操作效率。 底层使用链表维护数据,根据构造方法是否传输长度值决定,如果传入就是有界的,否则默认长度为 Integer.MAX_VALUE 可以认为是无界的
相比 ArrayBlockingQueue 读写都使用同一把锁,该队列吞吐量会更高
与这两把锁相关联,还会有相应的条件变量。 当队列已满时,入队操作会在putLock相关的条件变量上等待;当队列已空时,出队操作会在takeLock相关的条件变量上等待,直到队列状态满足操作继续的条件。
共享计数器 记录队列中的元素数量 使用 AtomicInteger 实现
插入数据
从队尾插入,先获取 putLock,通过共享计数器判断是否已经满,如果是在notFull条件对象上等待,否则尾部插入元素,更新共享计数器,同时如果没有超过 capacity 唤醒等待在 notFull 上的线程(就算没有线程等待,调用signal方法也不影响,因为会判断条件队列头节点是否为null)
释放锁
如果插入前队列数量为0 ,获取 takeLock 锁,同时唤醒等待在 notEmpty 上的一个读线程(在读取数据的时候会判断当前队列数量是否大于1 ,然后唤醒 notEmpty 上等待的线程,所以每次读取的时候都会判断,然后唤醒),在塞入方法里面唤醒获取线程的操作相当于是fire 以下,后续唤醒其他等待读线程还是得由读线程唤醒
删除数据
和上面的流程相反
从队列头取数据,获取锁 takeLock ,通过共享计数器判断是否为0,如果是在 notEmpty 条件对象上等待,否则头部获取元素,更新共享计数器,同时如果取数之前元素个数大于1 唤醒等待在 notEmpty 上的线程
释放锁
如果删除元素前队列数量等于capacity,获取 putLock 锁,同时唤醒等待在 notFull 上的一个写线程
PriorityBlockingQueue
一个无界阻塞队列,它使用与 PriorityQueue 类相同的排序规则,并提供阻塞式检索操作 对这个类的操作并不保证具有相同优先级的元素的顺序。如果你需要强制规定顺序,可以定义自定义类或比较器,通过使用辅助键来解决主优先级值相同的情况。
底层是基于数组实现的二叉堆,可以认为是无界队列,因为数组会扩容。增删改查都需要获取 ReentrantLock 锁。获取元素时,如果为null,在条件对象 notEmpty 上无限期等待。插入元素后会signal条件对象上阻塞的线程
扩容逻辑:插入元素时判断队列已有数量是否已经达到数组长度,如果是进行扩容 java.util.concurrent.PriorityBlockingQueue#tryGrow
1 释放锁,防止阻塞读操作,维护了 volatile allocationSpinLock 用于标记有线程正在进行扩展,并且进行到了计算新数组长度的步骤,CAS更新变量
2 成功CAS更新 allocationSpinLock 后,计算新数组长度(如果长度小于64 增加2 否则扩容为原来2倍),并为新数组分配内存(这个分配内存的步骤可以往后推迟,可能存在这样的情况,多个线程依次CAS成功,每个线程都会创建新数组申请内存)
3 allocationSpinLock 赋值=0
4 获取锁,阻塞其他读写线程,double check 队列是否是之前的队列
5 将原数组元素copy到新数组,因为扩容是在插入方法offer中调用,所以这里获取锁后不释放,调用该方法的地方是while循环。如果未扩容成功会再次进入扩容方法
优化: 增加线程引用,用于记录可以扩容的线程。
为了避免分配内存导致的内存溢出,释放锁后尝试一次CAS竞争 allocationSpinLock ,成功后更新线程引用为当前线程,继续获取锁,然后判断线程引用是否为当前线程,如果是,分配内存,将原来数组copy到新数组,完成后,释放线程引用并且释放 allocationSpinLock
其他线程需要判断线程引用是否为当前线程,并且CAS更新 allocationSpinLock 成功后,也需要判断引用的数组是不是原来的数组。
ConcurrentLinkedQueue
敬请期待
DelayQueue
抢占式获取任务,先获取任务的为leader其他线程阻塞等待,直到被leader唤醒 或者 新塞入超时等待时间更小的任务,被塞入线程唤醒。每次获取任务的时候都会重新计算剩余时间(参考 java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask#getDelay 截至时间减去当前时间)
底层使用PriorityQueue维护元素,元素 实现 Delayed 接口。该接口继承自 Comparable<Delayed>,需要实现两个方法:
long getDelay(TimeUnit unit):返回元素剩余的延迟时间,单位由 unit 指定。当返回值小于等于 0 时,表示元素的延迟时间已到期。
int compareTo(Delayed other):用于比较元素的延迟时间,确定元素在队列中的顺序。
使用了类似领导者-追随者的设计模式,当有多个线程尝试从队列中获取元素时,为了避免所有线程都进行不必要的定时等待(即不断检查元素是否到期)
在 DelayQueue 里,可以把正在等待队列头部元素到期的线程看作是 “领导者线程”。这个线程只需要等待下一个元素的延迟时间结束,而不是无差别地去轮询所有元素。
其他线程则可以看作是 “追随者线程”,它们在没有成为等待队列头部元素的线程时,会进行不同程度的等待,直到有合适的时机去竞争成为等待头部元素的线程。
在 take() 中,如果领导者为空,当前线程升级为领导者,调用 available.awaitNanos(delay) 等待队列头部元素到期。其他线程调用 available.await() 进行等待。
当领导者线程等待的元素到期并处理完后,它会将 leader 置为 null。此时,如果队列中还有元素,会调用 available.signal() 唤醒其他等待的线程,让它们竞争成为新的领导者线程。
维护全局锁 ReentrantLock 对队列增删改查都需要先获取锁
leader 和 follower 关系 可以参考 java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue 也是相同的模式了,阻塞队列
插入步骤:都是调用 offer方法
1 获取锁
2 将待插入元素塞入 PriorityQueue
3 获取 PriorityQueue 第一个元素,如果是插入的元素,代表插入元素超时剩余时间最小,清理 leader ,唤醒等待在 available 获取元素的线程,让线程认领当前第一个任务
4 如果第一个元素不是插入的元素,直接返回。如果在这之前有其他线程尝试获取未超时的元素,一定会将自己作为leader ,并定时等待
take步骤:死循环尝试从队列上获取元素,直到获取到元素
1 获取锁,中断响应方式获取锁
2 如果队列为空,在 available 上无限期等待,直到被唤醒
3 如果第一个元素超时剩余时间小于等于0 直接返回该元素。
4 如果第一个元素超时剩余时间不小于等于0 ,如果 leader 不为空,在 available 上无限期等待,直到被唤醒。如果leader 为空将当前线程升级为leader ,然后在 available 上定时等待 ,等待超时后,从队列从新获取元素
5 期间出现异常或者正常获取了元素后,在finally代码会判断leader是否为空,并且队列不为空,如果是则唤醒 available 上等待的线程,所以每次读取的时候都会判断,然后唤available上等待的线程
综上 : leader领导者线程本身执行从队列获取元素的任务,同时还负责唤醒阻塞在 available 上的线程,leader的赋值是在获取lock锁后,不存在线程竞争的问题
- 问题描述: leader 作用
> 该成员变量是 leader/follwer 编发模式的简化版本需要使用,考虑场景: 队列不为空,多个读线程获取队列头元素,为了保证元素只能分配给一个读线程
> 引入的leader,记录抢占头节点成功的线程,后续进入的线程无限期阻塞,直到leader超时唤醒,调用signal唤醒。
> 当前生产者线程往队列塞入权值更新的元素,成为队列新的头节点,相当于是新一轮的线程抢占,所以插入时候清空了leader
>
> public boolean offer(E e) {
> final ReentrantLock lock = this.lock;
> lock.lock();
> try {
> q.offer(e);
> if (q.peek() == e) {
> leader = null;
> // 存在这样的场景: 原始队列为空塞入元素e1 在队列头部,有线程抢占leader成功,后续的其他线程无限期阻塞,等待被唤醒抢占新的队列头节点
> // 后续生产者往里面塞入新元素,权值最小,位于队列头部,这个时候通过signal唤醒阻塞的线程,因为 leader/follower模式保证
> // 队列头部元素每次只能分配给一个线程。so 这里需要唤醒等待的线程(包括了上一次抢占成功的leader线程以及无限期等待的线程)
> available.signal();
> }
> return true;
> } finally {
> lock.unlock();
> }
> }
> // leader的作用其实就是多个线程争抢队列头节点时候,只能有一个线程抢占元素成功,其他的都无限期等待被leader线程唤醒
> // 保证了同时多个线程通过当前Queue对象获取第一个元素时候,不会出现同一个元素分配给多个线程的场景。
> public E take() throws InterruptedException {
> final ReentrantLock lock = this.lock;
> lock.lockInterruptibly();
> try {
> for (;;) {
> E first = q.peek();
> if (first == null)
> available.await();
> else {
> long delay = first.getDelay(NANOSECONDS);
> if (delay <= 0)
> return q.poll();
> first = null; // don't retain ref while waiting
> if (leader != null)
> available.await();
> else {
> Thread thisThread = Thread.currentThread();
> leader = thisThread;
> try {
> available.awaitNanos(delay);
> } finally {
> // 这个是有权重的队列,新塞入的元素可能在队列的头部,这个时候 leader 会被set 为null
> // remove 直接将头部的元素删除,在此之前,已有线程调用了该方法并阻塞在该元素上面,
> // 后续生产者线程往里面塞入了一个元素,并且 set leader = null,同时另外的消费者来到这里,更新leader
> // 所以原来的leader超时唤醒后需要判断leader是否为自己 可能为null 可能为其他线程
> if (leader == thisThread)
> // 这里赋值为null,是因为当前唤醒后,可能头节点还未超时,需要继续等待
> // 如果不为set null,上面的 if (leader != null) 判断,会让线程无限期等待,超过任务的实际等待时长
> // 另外,等待过程中抛异常,需要清空 leader ,这样允许其他线程继续争抢头节点
> // 被唤醒有两种可能,一种是新塞入了更新的任务(重新获取任务),一种是原有任务等待超时(leader 赋值为null ,在当前线程释放锁后,让其他线程竞争头节点)
> leader = null;
> }
> }
> }
> }
> } finally {
> if (leader == null && q.peek() != null)
> available.signal();
> lock.unlock();
> }
> }
> leader/follower模式 相关文档
> 参考文档: https://github.com/robbie-cao/note/blob/master/concurrency-pattern.md#leader--follower
> http://kircher-schwanninger.de/michael/publications/lf.pdf