ReentrantLock源码和AQS
AQS
抽象队列同步器
方法成员变量
-
CANCELLED 表示线程的等待状态被取消(通常是因为超时或者被中断),并且不会再参与锁竞争。被标记为 CANCELLED 的节点不会被唤醒,它的前驱和后继节点需要跳过它。
-
SIGNAL: 表示当前节点的后继节点需要被唤醒。当一个节点(假设是 A)被阻塞时,A 会检查它的前驱节点 B 是否是
SIGNAL
状态:特征:SIGNAL
状态的节点依赖于前驱节点的唤醒逻辑。-
如果是,则意味着当 B 释放锁时,A 需要被唤醒。
-
如果不是,A 可能需要自己主动挂起或者进行状态调整。
-
-
CONDITION:表示节点当前在
Condition
队列中等待。ConditionObject
机制允许线程调用await()
进入等待状态,并等待signal()
唤醒。只有当Condition.signal()
被调用时,CONDITION
状态的节点才会被移动到 同步队列 进行锁竞争。 -
PROPAGATE :表示需要无条件地传播唤醒操作,通常用于 共享模式(如
ReentrantReadWriteLock
的读锁)。这个状态会使得释放锁的操作不仅唤醒下一个节点,而且会继续向后传播,确保所有等待线程都能有机会获得锁。多线程同步工具(如CountDownLatch
)可能会使用PROPAGATE
进行信号传播。 -
nextWaiter:指向同一个
Condition
队列中的下一个等待线程。用于ConditionObject
:当线程调用Condition.await()
进入等待状态时,它会被添加到 条件队列(condition queue),这些等待线程通过nextWaiter
进行链接。当Condition.signal()
被调用时,一个线程会从condition queue
移动到 同步队列(sync queue) 继续争夺锁。
public abstract class AbstractQueueSynchronizer{
static final class Node{ //包装的线程节点
static final Node SHARED = new Node();
static final Node EXCELUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITTION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus; // 上面的值的其中一个。 0初始状态,表示线程没有特殊状态。
volatile Node pre; // 当前节点的前驱 同步阻塞队列
volatile Node next; // 当前节点的后继 同步阻塞队列
volatile Thread thread; // 被包装的线程
Node nextWaiter; // 还有一个waitSet队列,单项队列
}
// 同步双向队列
private transient volatile Node head;
private transient volatile Node tail;
// 相当于锁,线程标识状态
private volatile int state;
// CAS形式状态修改上锁
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
/**
AQS 是一个通用的同步框架,但它本身不定义具体的同步逻辑,例如如何加锁、如何释放锁。
AQS 只提供了一套线程排队和等待的机制,而具体的加锁/释放逻辑由子类实现。
如果不重写,直接调用这些方法就会抛出异常,提醒开发者必须自己实现逻辑。
**/
// 尝试以独占模式(Exclusive Mode)获取同步状态
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放独占模式的同步状态。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 尝试以共享模式(Shared Mode)获取同步状态。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放共享模式的同步状态。
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 判断当前锁是否被当前线程独占。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
}
ReentrantLock
public class ReentrantLock implements Lock{
Sync synx; //
abstract static class Sync extends AbstractQueuedSynchronizer{
// 重写了5个方法
}
}
lock()
// ReentrantLock中
public void lock() {
sync.acquire(1); // AQS本身已经实现了acquire
}
// tryAcquire实现了公平和非公平
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
/**
hasQueuedPredecessors遍历同步队列
该线程没有前驱节点并且CAS修改为1acquires,获得锁,就会设置当前线程
exclusiveOwnerThread=current 返回锁,获取成功
**/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 可重入锁
int nextc = c + acquires; // 重入次数加1
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); // 已经获取锁就用CAS
return true;
}
return false;
}
// AQS中 tryAcquire在ReentrantLock实现逻辑
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
public final void acquire(int arg) {
/**
获取锁成功退出
获取锁失败将当前先线程封装为node放入到同步队列尾巴上(自旋),返回这个结点node
acquireQueued(node, arg))
调用 tryAcquire 尝试获取锁,如果成功则直接返回。
如果 tryAcquire 失败,则调用 addWaiter 将当前线程加入等待队列,并调用 acquireQueued 进行排队等待。如果在等待过程中被中断,则调用 selfInterrupt 恢复中断状态。
**/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// =================================================================================================
private Node addWaiter(Node mode) {
Node node = new Node(mode); // mode当前线程是什么时候被封装的呢?
for (;;) {
Node oldTail = tail; // 保存为此时状态的尾节点,多线程Tail可能后面就不一样了,循环去更新oldTail
if (oldTail != null) {
node.setPrevRelaxed(oldTail); // node前驱指向尾巴节点
if (compareAndSetTail(oldTail, node)) { // cas,Node放到尾节点,成功
oldTail.next = node; // 之前的尾巴节点指向node,双向
return node;
}
} else {
initializeSyncQueue(); // 为节点为空,就初始化
}
}
}
// mode当前线程是什么时候被封装的呢?
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());// 这里被封装
}
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))// new一个空结点。head设置为它,初始为null,如果其他的已经设置了就略过
tail = h;
}
// ========================================================================================================================
final boolean acquireQueued(final Node node, int arg) {
/**
初始化一个标志位 interrupted,用于记录当前线程是否被中断。
进入无限循环,检查当前节点的前驱节点是否为头节点,并尝试获取锁。
如果成功获取锁,则将当前节点设置为新的头节点,并返回是否被中断的状态。
如果获取锁失败,则判断是否需要阻塞当前线程,如果需要则阻塞并更新中断状态。
捕获异常并处理,取消获取锁的操作。
目的:阻塞当前线程,但需要把前面的设置为SINGAL,才能放心的去阻塞。
**/
/**
flowchart TD
A[开始] --> B{前驱是头节点?}
B -->|Yes| C{尝试获取锁}
C -->|成功| D[设置当前节点为头节点]
D --> E[返回是否被中断]
B -->|No| F{是否需要阻塞?}
F -->|Yes| G[阻塞并检查中断]
G --> H[更新中断状态]
H --> B
F -->|No| B
A --> I{捕获异常}
I --> J[取消获取锁]
J --> K{如果被中断,中断自身}
K --> L[抛出异常]
**/
boolean interrupted = false; // 目前不可倍中断
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
private void setHead(Node node) { //
head = node;
node.thread = null;
node.prev = null;
}
// 在p == head && tryAcquire(arg)后是否应该park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/**
获取前驱节点的状态。
如果前驱节点状态为 SIGNAL,返回 true,表示当前节点可以安全地阻塞。
如果前驱节点状态为取消状态(大于0),则跳过所有已取消的前驱节点,并重新连接链表。
否则,将前驱节点状态设置为 SIGNAL,但不立即阻塞,返回 false
**/
// pred可能就是头节点或者有线程包装的结点
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 会存在内存泄漏吗a<=>b<=>c<=>d
// a<===>d
// a<-b<=>c->d
// ????????
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL); // 设置为SIGNAL,表示唤醒下一个结点
}
return false;
}
private final boolean parkAndCheckInterrupt() {
/**
1. LockSupport.park(this);
LockSupport.park(Object blocker) 让当前线程挂起(阻塞),直到被其他线程显式唤醒。
this 参数(即当前对象)用于调试信息,可以帮助追踪哪个对象导致线程被挂起。
线程在 park() 后会进入等待状态,直到满足以下任一条件:
其他线程调用 LockSupport.unpark(targetThread) 释放它。
线程被中断(即 Thread.interrupt())。
可能由于虚假唤醒(Spurious Wakeup) 自动返回。
2. return Thread.interrupted();
Thread.interrupted() 用于检查并清除当前线程的中断标志:
如果线程在 park() 期间被中断,该方法返回 true,表示线程曾被中断过。
同时,这个方法会清除当前线程的中断标志(interrupt status)。
如果没有中断,返回 false。
**/
LockSupport.park(this);
return Thread.interrupted();
}
unlock()
// ReentrantLock中
public void unlock() {
sync.release(1);
}
// ===========================================================================
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
/** 解锁一定是先lock在unlock, lock成功后会setExclusiveOwnerThread
判断这两个是否相等
**/
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// AQS中
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 头节点为空,就没有,检查头节点是否Signal,不是就没必要唤醒后面的
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// ReenLock中不会出现下述情况
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
手动实现一个
package zy.sats.java.juc;
import java.lang.ref.Reference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/**
* @Description: TODO
* @Author: sats@jz
* @Date: 2025/2/17 14:54
**/
public class AQSLinkedLock implements MyLock{
private final boolean fair;
private final AtomicBoolean lock = new AtomicBoolean(false);
private final AtomicReference<Node> head = new AtomicReference<>(new Node());
private final AtomicReference<Node> tail = new AtomicReference<>(head.get());
private Thread owner = null;
class Node{
Node pre;
Node next;
Thread thread;
public Node(Thread thread) {
this.thread = thread;
}
public Node() {
}
}
public AQSLinkedLock(Boolean fair) {
this.fair = fair;
}
@Override
public void lock() {
// 非公平 先尝试直接拿锁, 拿不到就将线程包装为节点加入到链表尾部
if(!fair && lock.compareAndSet(false,true)){
System.out.println(Thread.currentThread().getName()+" get lock");
owner = Thread.currentThread();
return;
}
// 获取当前线程,包装为Node节点
Node current = new Node(Thread.currentThread());
// 尝试将当前的节点放到链表尾部, 链表尾部进行CAS替换直到成功。
// 将原来的链表尾部节点的next指向当前节点, 当前节点的前驱指向头信息。
while(true){
// 每次重新获取链表尾部的引用,因为多线程情况下可能会变化
Node currentTail = tail.get();
if(tail.compareAndSet(currentTail, current)){
current.pre = currentTail;
currentTail.next = current;
System.out.println(Thread.currentThread().getName()+"加入链表尾部");
break;
}
}
// 加入之后, 就LockSupport.park(), 但是需要注意,唤醒是个什么逻辑
// 唤醒之后, 判断当前的线程的前驱是否是head节点,只有这样才是下一个释放,逻辑正确。
// 并且要获取锁成功 才能释放。
while(true){
// condition
// head -> A -> B -> C -> D
if(current.pre == head.get() && lock.compareAndSet(false,true)){
// 获取锁成功后,设置头节点的指向为当前线程
owner = Thread.currentThread();
Node old = head.get();
head.set(current);
old.next = null;
current.pre = null;
System.out.println(Thread.currentThread().getName()+"获得锁");
return;
}
LockSupport.park(); // 先判断一次逻辑在阻塞, 保证一次自己唤醒自己的操作
}
}
@Override
public void unlock() {
if(Thread.currentThread() != owner){
throw new RuntimeException("not owner");
}
// Node headNode = head.get();
owner = null;
lock.set(false);
Node next = head.get().next;
// -----
if(next != null){
System.out.println(Thread.currentThread().getName()+"唤醒下一个线程"+next.thread.getName());
LockSupport.unpark(next.thread);
}
}
}