JUC并发工具
#摘要
介绍基于AQS 实现的并发工具类 CountDownLatch Semaphore 源码分析
CountDownLatch 源码
CountDownLatch 是 Java 并发包 java.util.concurrent 中的一个同步工具类,它允许一个或多个线程等待其他线程完成操作。其核心原理是使用一个计数器,初始值为需要等待的操作数量,当每个操作完成时,计数器的值减 1,当计数器的值变为 0 时,等待的线程将被唤醒继续执行。
1 类接口
允许一个或多个线程等待,直到其他线程中正在执行的一组操作完成。
会使用给定的计数进行初始化。await 方法会阻塞调用线程,直到由于调用 countDown 方法使得当前计数达到零。此后,所有等待的线程都会被释放,并且任何后续对 await 的调用都会立即返回。
计数无法重置
使用场景:
1 启动信号 初始计数为 1 的 CountDownLatch 可作为一个简单的开关门闩或闸门:所有调用 await 的线程会在闸门处等待,直到有一个线程调用 countDown 打开闸门。
2 完成信号 初始计数为 N 的 CountDownLatch 可用于让一个线程等待,直到 N 个线程完成了某个操作,或者某个操作被执行了 N 次。
调用 countDown 扣减计数之后不会阻塞,
2 成员变量及其对应的数
内部类 Sync 继承AQS 实现线程同步,state为当前计数,
实现方法 tryAcquireShared(实际判断state是否为0 是,返回1 否则返回-1 ) 和 tryReleaseShared 用于控制state的更新
java.util.concurrent.CountDownLatch#await()
阻塞直到被中断或者其他线程更新计数为0后唤醒当前线程
实际上调用的是 AQS的 java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
AQS的方法会调用子类 Sync 的 tryAcquireShared 方法,在方法里判断当前state是否为0,如果不是0阻塞当前线程
java.util.concurrent.CountDownLatch#countDown
实际上调用的是 java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
会调用子类的 tryReleaseShared ,在该方法里面 CAS+死循环更新state,如果更新后state== 0 返回ture代表释放成功,AQS会唤醒同步队列上阻塞的线程、
阻塞和唤醒原理其实是使用了AQS 共享锁进行线程同步,执行countDown方法的线程是释放锁,执行await方法的线程是尝试获取锁,区别在于获取锁成功条件是当前state是否为0
之所以为共享锁,是因为共享锁获取锁后会执行 setHeadAndPropergate方法唤醒后续阻塞节点,使用排他锁,释放锁后唤醒第一个阻塞节点后,阻塞节点成功获取锁,后直接返回没有唤醒后继节点的操作
Semaphore 源码
1 类接口
内部类 Sync extends AbstractQueuedSynchronizer 复写 tryReleaseShared ,有子类分别实现公平和非公平逻辑、
NonfairSync
tryAcquireShared
nonfairTryAcquireShared for死循环+CAS更新state 如果小于0 代表获取信号量失败,如果CAS成功返回剩余信号量。如果CAS失败,继续循环尝试获取直到成功
FairSync
tryAcquireShared
相比非公平锁,每次循环多了 hasQueuedPredecessors 判断同步队列head后的节点是否是当前线程如果不是 tryAcquireShared 方法返回-1代表获取信号量失败,当前 tryAcquireShared 方法返回-1 获取锁失败。否则CAS更新stat
该类的所有方法都代理给内部类 sync实现
state被定义为信号量 ,该类支持公平和非公平逻辑
作用-> 接口暴露的方法源码
获取信号量方法(两个版本 -- 单独信号量操作和批量信号量操作)
acquire 中断响应 获取锁
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
acquireUninterruptibly 不响应中断
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
tryAcquire() 子类的tryAcquire方法实现,忽略公平策略,直接尝试获取信号量
忽略公平策略,直接尝试获取信号量,java.util.concurrent.Semaphore.Sync#nonfairTryAcquireShared
tryAcquire(long timeout, TimeUnit unit) 支持中断,超时响应
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireSharedNanos
获取信号量的线程阻塞在同步队列上,类型都是SHARED的,方便在 setHeadAndPropergate 中被唤醒
释放信号量(两个版本 -- 单独信号量操作和批量信号量操作)
release
java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
java.util.concurrent.Semaphore.Sync#tryReleaseShared 增加state返回true ,父类AQS会调用 doReleaseShared 唤醒同步队列的阻塞线程,阻塞线程唤醒后会获取锁,同时调用 setHeadAndPropergate 继续唤醒后续阻塞SHARED类型的阻塞节点