【Java并发编程】 AQS的实现类ReentrantLock的底层工作流程详细讲解
ReentrantLock
ReentrantLock
是 Java 并发包中提供的一个可重入的互斥锁,它提供了与使用 synchronized 方法和语句相同的基本行为和语义,但具有更强的功能。ReentrantLock
的实现基于 AQS(AbstractQueuedSynchronizer),通过维护一个同步状态 state
来控制加锁和解锁的过程。
工作原理
1. 获取锁
当我们调用 lock()
方法时,实际上是调用了 NonfairSync
或 FairSync
类中的 lock()
方法,这两个类都继承自 Sync
类,而 Sync
又是 ReentrantLock
的内部静态抽象类,并且直接继承了 AQS
。
- 非公平锁(NonfairSync):
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
这里首先尝试通过 CAS 操作设置同步状态 state
为 1。如果成功,则当前线程获得锁;如果失败(说明已经有其他线程持有了锁),则调用 acquire(1)
方法加入到等待队列中。
- 公平锁(FairSync):
final void lock() {
acquire(1);
}
对于公平锁而言,不会尝试“插队”,而是直接调用 acquire(1)
方法进入等待队列,然后按照 FIFO 原则等待获取锁。
acquire
ReentrantLock默认是非公平锁,无论哪种情况,当需要排队等待时,都会执行 AQS
中的 acquire
方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其中,tryAcquire(arg)
尝试以原子方式获取锁,如果失败,则将当前线程封装成节点并添加到同步队列尾部 (addWaiter
),然后阻塞当前线程直到获取锁 (acquireQueued
)。
addWaiter
addWaiter
方法用于将当前线程封装成一个节点(Node),并将其添加到同步队列的尾部。同步队列是一个双向链表结构,每个节点代表一个等待获取锁的线程。
private Node addWaiter(Node mode) {
// 创建一个新节点,mode 通常为 Node.EXCLUSIVE 表示独占模式
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速地将节点添加到队列末尾
Node pred = tail;
// 判断同步队列的尾节点是否为空
if (pred != null) {
node.prev = pred; // 不为空则把该Node节点的前指针设置为该尾节点
if (compareAndSetTail(pred, node)) { // 通过cas操作把同步队列的尾节点指向该节点
pred.next = node;
return node;
}
}
// 如果队列为空或者CAS操作失败,则进入enq方法进行重试
enq(node);
return node;
}
这里首先尝试通过 CAS 操作直接将新创建的节点插入到队列尾部。如果队列当前不为空且 CAS 成功,则新节点成功加入队列;否则,调用 enq
方法进行重试直到成功为止。enq
方法会自旋直至成功将节点添加到队列尾部,并初始化队列(如果队列尚未初始化)。
enq
enq和addWaiter方法类似,就不再讲解,jdk9以后合并成了一个方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued
一旦节点被成功添加到同步队列中,接下来就会调用 acquireQueued
方法让该节点对应的线程等待获取锁。这个方法的核心是循环检查前驱节点的状态,并尝试获取锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果前驱是头结点,则尝试获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断是否应该阻塞当前线程,并挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 首先检查当前节点的前驱是否是头节点(即表示当前节点是最接近获取锁的位置)。如果是,则再次尝试获取锁 (
tryAcquire(arg)
)。 - 如果获取失败,或者前驱不是头节点,则根据前驱节点的状态决定是否应该阻塞当前线程 (
shouldParkAfterFailedAcquire
)。如果可以安全地阻塞,则调用parkAndCheckInterrupt
方法挂起当前线程,直到被中断或被前驱节点唤醒。 - 在循环过程中,如果检测到线程被中断,则设置
interrupted
标志位。最终返回是否发生过中断的信息。
通过 addWaiter
和 acquireQueued
方法,ReentrantLock
能够有效地管理多个线程对锁的竞争,确保了即使在高并发环境下也能保证线程的安全性和程序的正确性。同时,这些机制也支持锁的公平性控制,使得线程能够按照正确的顺序获取锁。
2. 释放锁
释放锁是通过调用 unlock()
方法来完成的,这个方法会减少 state
的值,如果 state
减少到 0,则完全释放锁。
public void unlock() {
sync.release(1);
}
这里的 release
方法定义在 AQS
中:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
方法会减少 state
的值,并判断是否已经完全释放锁(即 state == 0
)。如果是,则唤醒下一个等待的线程 (unparkSuccessor
)。
总结
ReentrantLock 主要围绕着 state 变量和同步队列这两个核心元素来实现对锁的管理和线程间的同步协调。通过 state 来跟踪锁的状态变化,确保了锁的正确性和可重入性;而同步队列则保证了在高并发情况下多个线程能够有序地竞争锁资源,避免了死锁和资源争用问题。这种设计使得 ReentrantLock 不仅功能强大而且高效可靠,适用于各种需要精确控制并发访问的场景。
AQS是什么,以及AQS核心原理