JavaSE Multithreading (5.2): ReentrantLock (Source Code Walkthrough, Fair Mode)
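What is ReentrantLock
ReentrantLock is the reentrant lock utility class of the JUC package (java.util.concurrent; the class itself lives in java.util.concurrent.locks). Its behaviour is built on AbstractQueuedSynchronizer (AQS), which maintains a field private volatile int state; that defaults to 0. When a thread acquires the lock, state becomes 1 (the 0 to 1 transition is done with CAS), and each reentrant acquisition by the owning thread increments it further. Threads that fail to acquire the lock are added to a wait queue.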
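To make the meaning of state concrete, here is a minimal sketch (the class and method names are illustrative, not from the article) that observes the hold count through the public getHoldCount() method while the same thread acquires the lock twice:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class HoldCountDemo {

    // Records the current thread's hold count at four points in time.
    public static int[] observeHoldCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        counts[0] = lock.getHoldCount();      // lock not held yet
        lock.lock();
        try {
            counts[1] = lock.getHoldCount();  // first acquisition
            lock.lock();                      // reentrant acquisition by the same thread
            try {
                counts[2] = lock.getHoldCount();
            } finally {
                lock.unlock();
            }
            counts[3] = lock.getHoldCount();  // one hold remains
        } finally {
            lock.unlock();
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observeHoldCounts())); // prints [0, 1, 2, 1]
    }
}
```

The hold count reported here is the AQS state value as seen by the owning thread, which is why the lock is only really free once every lock() has been matched by an unlock().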
ReentrantLock usage example
The following example is used to analyze how ReentrantLock works:
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class FairLockAndNonFairLockTest {

    static class Task implements Runnable {
        private final ReentrantLock lock;
        // Only modified while the lock is held
        private volatile int count = 0;

        public Task(boolean fair) {
            this.lock = new ReentrantLock(fair);
        }

        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                doTask();
            }
        }

        private void doTask() {
            lock.lock();
            try {
                System.out.println("Task started, thread: " + Thread.currentThread().getName() + ", count=" + count);
                count++;
                sleep(500);
                System.out.println("Task finished, thread: " + Thread.currentThread().getName() + ", count=" + count);
                printQueuedThreads(lock);
            } finally {
                lock.unlock();
            }
        }
    }

    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("unchecked")
    public static void printQueuedThreads(ReentrantLock lock) {
        try {
            // getQueuedThreads is protected, so it is accessed via reflection.
            // On newer JDKs with strong encapsulation, setAccessible may fail unless the JVM is
            // started with --add-opens java.base/java.util.concurrent.locks=ALL-UNNAMED.
            Method getQueuedThreads = ReentrantLock.class.getDeclaredMethod("getQueuedThreads");
            getQueuedThreads.setAccessible(true);
            // Get the threads in the AQS queue
            List<Thread> list = (List<Thread>) getQueuedThreads.invoke(lock);
            // The getQueuedThreads source adds threads to the list starting from tail,
            // so reverse it here to start from head
            Collections.reverse(list);
            System.out.println(list);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Task task = new Task(true);
        for (int i = 0; i < 5; i++) {
            new Thread(task, "t" + (i + 1)).start();
            // Delay slightly so the threads are created and started in order
            sleep(10);
        }
    }
}
The code above starts five threads that share a single Task. Each thread calls doTask three times in a loop; doTask acquires the lock, increments count by 1 on each call, and prints the threads currently in the wait queue.
The output is shown below (does it match your expectation?):
Task started, thread: t1, count=0
Task finished, thread: t1, count=1
[Thread[t2,5,main], Thread[t3,5,main], Thread[t4,5,main], Thread[t5,5,main]]
Task started, thread: t2, count=1
Task finished, thread: t2, count=2
[Thread[t3,5,main], Thread[t4,5,main], Thread[t5,5,main], Thread[t1,5,main]]
Task started, thread: t3, count=2
Task finished, thread: t3, count=3
[Thread[t4,5,main], Thread[t5,5,main], Thread[t1,5,main], Thread[t2,5,main]]
Task started, thread: t4, count=3
Task finished, thread: t4, count=4
[Thread[t5,5,main], Thread[t1,5,main], Thread[t2,5,main], Thread[t3,5,main]]
Task started, thread: t5, count=4
Task finished, thread: t5, count=5
[Thread[t1,5,main], Thread[t2,5,main], Thread[t3,5,main], Thread[t4,5,main]]
Task started, thread: t1, count=5
Task finished, thread: t1, count=6
[Thread[t2,5,main], Thread[t3,5,main], Thread[t4,5,main], Thread[t5,5,main]]
Task started, thread: t2, count=6
Task finished, thread: t2, count=7
[Thread[t3,5,main], Thread[t4,5,main], Thread[t5,5,main], Thread[t1,5,main]]
Task started, thread: t3, count=7
Task finished, thread: t3, count=8
[Thread[t4,5,main], Thread[t5,5,main], Thread[t1,5,main], Thread[t2,5,main]]
Task started, thread: t4, count=8
Task finished, thread: t4, count=9
[Thread[t5,5,main], Thread[t1,5,main], Thread[t2,5,main], Thread[t3,5,main]]
Task started, thread: t5, count=9
Task finished, thread: t5, count=10
[Thread[t1,5,main], Thread[t2,5,main], Thread[t3,5,main], Thread[t4,5,main]]
Task started, thread: t1, count=10
Task finished, thread: t1, count=11
[Thread[t2,5,main], Thread[t3,5,main], Thread[t4,5,main], Thread[t5,5,main]]
Task started, thread: t2, count=11
Task finished, thread: t2, count=12
[Thread[t3,5,main], Thread[t4,5,main], Thread[t5,5,main]]
Task started, thread: t3, count=12
Task finished, thread: t3, count=13
[Thread[t4,5,main], Thread[t5,5,main]]
Task started, thread: t4, count=13
Task finished, thread: t4, count=14
[Thread[t5,5,main]]
Task started, thread: t5, count=14
Task finished, thread: t5, count=15
[]
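As shown above, because the main thread calls sleep(10); after starting each thread, t1 almost always runs first, and the other threads, unable to get the lock, join the wait queue in start order. When t1 finishes its first doTask call it releases the lock and loops again, but in fair mode it cannot overtake the threads that are already waiting, so it joins the end of the queue while t2 acquires the lock. The threads then keep taking turns in this order.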
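As an alternative to the reflection used in the example, the queue can also be inspected through a subclass, since getQueuedThreads() is protected. The sketch below is only an illustration: InspectableLock and waitingThreadNames are hypothetical names, and the head-first ordering relies on the implementation detail noted in the example (the Javadoc itself promises no particular order).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class InspectableLockDemo {

    // Hypothetical helper: a subclass may call the protected method directly.
    static class InspectableLock extends ReentrantLock {
        InspectableLock(boolean fair) {
            super(fair);
        }

        List<Thread> queuedThreadsHeadFirst() {
            List<Thread> threads = new ArrayList<>(getQueuedThreads());
            Collections.reverse(threads); // assumes tail-to-head traversal, as in the example
            return threads;
        }
    }

    // Holds the lock, lets t2..t4 queue up one by one, and returns their names.
    public static List<String> waitingThreadNames() {
        InspectableLock lock = new InspectableLock(true);
        List<Thread> workers = new ArrayList<>();
        List<String> names = new ArrayList<>();
        try {
            lock.lock();
            try {
                for (int i = 2; i <= 4; i++) {
                    Thread t = new Thread(() -> {
                        lock.lock();
                        lock.unlock();
                    }, "t" + i);
                    workers.add(t);
                    t.start();
                    // Wait until this thread is queued before starting the next one
                    while (!lock.hasQueuedThread(t)) {
                        Thread.sleep(1);
                    }
                }
                for (Thread t : lock.queuedThreadsHeadFirst()) {
                    names.add(t.getName());
                }
            } finally {
                lock.unlock();
            }
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(waitingThreadNames());
    }
}
```

This avoids setAccessible entirely, at the cost of having to construct the lock through the subclass.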
ReentrantLock source code walkthrough
The ReentrantLock source is as follows (the listing appears to be from JDK 8):
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Collection;/*** @since 1.5* @author Doug Lea*/
public class ReentrantLock implements Lock, java.io.Serializable {private static final long serialVersionUID = 7373984872572414699L;/** Synchronizer providing all implementation mechanics */private final Sync sync;/*** Base of synchronization control for this lock. Subclassed* into fair and nonfair versions below. Uses AQS state to* represent the number of holds on the lock.*/abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;/*** Performs {@link Lock#lock}. The main reason for subclassing* is to allow fast path for nonfair version.*/abstract void lock();/*** Performs non-fair tryLock. tryAcquire is implemented in* subclasses, but both need nonfair try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}protected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();}final ConditionObject newCondition() {return new ConditionObject();}// Methods relayed from outer classfinal Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}final int getHoldCount() {return isHeldExclusively() ? 
getState() : 0;}final boolean isLocked() {return getState() != 0;}/*** Reconstitutes the instance from a stream (that is, deserializes it).*/private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();setState(0); // reset to unlocked state}}/*** Sync object for non-fair locks*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}/*** Sync object for fair locks*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}/*** Creates an instance of {@code ReentrantLock}.* This is equivalent to using {@code ReentrantLock(false)}.*/public ReentrantLock() {sync = new NonfairSync();}/*** Creates an instance of {@code ReentrantLock} with the* given fairness policy.** @param fair {@code true} if this lock should use a fair ordering policy*/public ReentrantLock(boolean fair) {sync = fair ? 
new FairSync() : new NonfairSync();}/*** Acquires the lock.** <p>Acquires the lock if it is not held by another thread and returns* immediately, setting the lock hold count to one.** <p>If the current thread already holds the lock then the hold* count is incremented by one and the method returns immediately.** <p>If the lock is held by another thread then the* current thread becomes disabled for thread scheduling* purposes and lies dormant until the lock has been acquired,* at which time the lock hold count is set to one.*/public void lock() {sync.lock();}/*** Acquires the lock unless the current thread is* {@linkplain Thread#interrupt interrupted}.** <p>Acquires the lock if it is not held by another thread and returns* immediately, setting the lock hold count to one.** <p>If the current thread already holds this lock then the hold count* is incremented by one and the method returns immediately.** <p>If the lock is held by another thread then the* current thread becomes disabled for thread scheduling* purposes and lies dormant until one of two things happens:** <ul>** <li>The lock is acquired by the current thread; or** <li>Some other thread {@linkplain Thread#interrupt interrupts} the* current thread.** </ul>** <p>If the lock is acquired by the current thread then the lock hold* count is set to one.** <p>If the current thread:** <ul>** <li>has its interrupted status set on entry to this method; or** <li>is {@linkplain Thread#interrupt interrupted} while acquiring* the lock,** </ul>** then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** <p>In this implementation, as this method is an explicit* interruption point, preference is given to responding to the* interrupt over normal or reentrant acquisition of the lock.** @throws InterruptedException if the current thread is interrupted*/public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}/*** Acquires the lock only if it is not held 
by another thread at the time* of invocation.** <p>Acquires the lock if it is not held by another thread and* returns immediately with the value {@code true}, setting the* lock hold count to one. Even when this lock has been set to use a* fair ordering policy, a call to {@code tryLock()} <em>will</em>* immediately acquire the lock if it is available, whether or not* other threads are currently waiting for the lock.* This "barging" behavior can be useful in certain* circumstances, even though it breaks fairness. If you want to honor* the fairness setting for this lock, then use* {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) }* which is almost equivalent (it also detects interruption).** <p>If the current thread already holds this lock then the hold* count is incremented by one and the method returns {@code true}.** <p>If the lock is held by another thread then this method will return* immediately with the value {@code false}.** @return {@code true} if the lock was free and was acquired by the* current thread, or the lock was already held by the current* thread; and {@code false} otherwise*/public boolean tryLock() {return sync.nonfairTryAcquire(1);}/*** Acquires the lock if it is not held by another thread within the given* waiting time and the current thread has not been* {@linkplain Thread#interrupt interrupted}.** <p>Acquires the lock if it is not held by another thread and returns* immediately with the value {@code true}, setting the lock hold count* to one. If this lock has been set to use a fair ordering policy then* an available lock <em>will not</em> be acquired if any other threads* are waiting for the lock. This is in contrast to the {@link #tryLock()}* method. 
If you want a timed {@code tryLock} that does permit barging on* a fair lock then combine the timed and un-timed forms together:** <pre> {@code* if (lock.tryLock() ||* lock.tryLock(timeout, unit)) {* ...* }}</pre>** <p>If the current thread* already holds this lock then the hold count is incremented by one and* the method returns {@code true}.** <p>If the lock is held by another thread then the* current thread becomes disabled for thread scheduling* purposes and lies dormant until one of three things happens:** <ul>** <li>The lock is acquired by the current thread; or** <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread; or** <li>The specified waiting time elapses** </ul>** <p>If the lock is acquired then the value {@code true} is returned and* the lock hold count is set to one.** <p>If the current thread:** <ul>** <li>has its interrupted status set on entry to this method; or** <li>is {@linkplain Thread#interrupt interrupted} while* acquiring the lock,** </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** <p>If the specified waiting time elapses then the value {@code false}* is returned. 
If the time is less than or equal to zero, the method* will not wait at all.** <p>In this implementation, as this method is an explicit* interruption point, preference is given to responding to the* interrupt over normal or reentrant acquisition of the lock, and* over reporting the elapse of the waiting time.** @param timeout the time to wait for the lock* @param unit the time unit of the timeout argument* @return {@code true} if the lock was free and was acquired by the* current thread, or the lock was already held by the current* thread; and {@code false} if the waiting time elapsed before* the lock could be acquired* @throws InterruptedException if the current thread is interrupted* @throws NullPointerException if the time unit is null*/public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}/*** Attempts to release this lock.** <p>If the current thread is the holder of this lock then the hold* count is decremented. If the hold count is now zero then the lock* is released. 
If the current thread is not the holder of this* lock then {@link IllegalMonitorStateException} is thrown.** @throws IllegalMonitorStateException if the current thread does not* hold this lock*/public void unlock() {sync.release(1);}/*** Returns a {@link Condition} instance for use with this* {@link Lock} instance.** <p>The returned {@link Condition} instance supports the same* usages as do the {@link Object} monitor methods ({@link* Object#wait() wait}, {@link Object#notify notify}, and {@link* Object#notifyAll notifyAll}) when used with the built-in* monitor lock.** <ul>** <li>If this lock is not held when any of the {@link Condition}* {@linkplain Condition#await() waiting} or {@linkplain* Condition#signal signalling} methods are called, then an {@link* IllegalMonitorStateException} is thrown.** <li>When the condition {@linkplain Condition#await() waiting}* methods are called the lock is released and, before they* return, the lock is reacquired and the lock hold count restored* to what it was when the method was called.** <li>If a thread is {@linkplain Thread#interrupt interrupted}* while waiting then the wait will terminate, an {@link* InterruptedException} will be thrown, and the thread's* interrupted status will be cleared.** <li> Waiting threads are signalled in FIFO order.** <li>The ordering of lock reacquisition for threads returning* from waiting methods is the same as for threads initially* acquiring the lock, which is in the default case not specified,* but for <em>fair</em> locks favors those threads that have been* waiting the longest.** </ul>** @return the Condition object*/public Condition newCondition() {return sync.newCondition();}/*** Queries the number of holds on this lock by the current thread.** <p>A thread has a hold on a lock for each lock action that is not* matched by an unlock action.** <p>The hold count information is typically only used for testing and* debugging purposes. 
For example, if a certain section of code should* not be entered with the lock already held then we can assert that* fact:** <pre> {@code* class X {* ReentrantLock lock = new ReentrantLock();* // ...* public void m() {* assert lock.getHoldCount() == 0;* lock.lock();* try {* // ... method body* } finally {* lock.unlock();* }* }* }}</pre>** @return the number of holds on this lock by the current thread,* or zero if this lock is not held by the current thread*/public int getHoldCount() {return sync.getHoldCount();}/*** Queries if this lock is held by the current thread.** <p>Analogous to the {@link Thread#holdsLock(Object)} method for* built-in monitor locks, this method is typically used for* debugging and testing. For example, a method that should only be* called while a lock is held can assert that this is the case:** <pre> {@code* class X {* ReentrantLock lock = new ReentrantLock();* // ...** public void m() {* assert lock.isHeldByCurrentThread();* // ... method body* }* }}</pre>** <p>It can also be used to ensure that a reentrant lock is used* in a non-reentrant manner, for example:** <pre> {@code* class X {* ReentrantLock lock = new ReentrantLock();* // ...** public void m() {* assert !lock.isHeldByCurrentThread();* lock.lock();* try {* // ... method body* } finally {* lock.unlock();* }* }* }}</pre>** @return {@code true} if current thread holds this lock and* {@code false} otherwise*/public boolean isHeldByCurrentThread() {return sync.isHeldExclusively();}/*** Queries if this lock is held by any thread. 
This method is* designed for use in monitoring of the system state,* not for synchronization control.** @return {@code true} if any thread holds this lock and* {@code false} otherwise*/public boolean isLocked() {return sync.isLocked();}/*** Returns {@code true} if this lock has fairness set true.** @return {@code true} if this lock has fairness set true*/public final boolean isFair() {return sync instanceof FairSync;}/*** Returns the thread that currently owns this lock, or* {@code null} if not owned. When this method is called by a* thread that is not the owner, the return value reflects a* best-effort approximation of current lock status. For example,* the owner may be momentarily {@code null} even if there are* threads trying to acquire the lock but have not yet done so.* This method is designed to facilitate construction of* subclasses that provide more extensive lock monitoring* facilities.** @return the owner, or {@code null} if not owned*/protected Thread getOwner() {return sync.getOwner();}/*** Queries whether any threads are waiting to acquire this lock. Note that* because cancellations may occur at any time, a {@code true}* return does not guarantee that any other thread will ever* acquire this lock. This method is designed primarily for use in* monitoring of the system state.** @return {@code true} if there may be other threads waiting to* acquire the lock*/public final boolean hasQueuedThreads() {return sync.hasQueuedThreads();}/*** Queries whether the given thread is waiting to acquire this* lock. Note that because cancellations may occur at any time, a* {@code true} return does not guarantee that this thread* will ever acquire this lock. 
This method is designed primarily for use* in monitoring of the system state.** @param thread the thread* @return {@code true} if the given thread is queued waiting for this lock* @throws NullPointerException if the thread is null*/public final boolean hasQueuedThread(Thread thread) {return sync.isQueued(thread);}/*** Returns an estimate of the number of threads waiting to* acquire this lock. The value is only an estimate because the number of* threads may change dynamically while this method traverses* internal data structures. This method is designed for use in* monitoring of the system state, not for synchronization* control.** @return the estimated number of threads waiting for this lock*/public final int getQueueLength() {return sync.getQueueLength();}/*** Returns a collection containing threads that may be waiting to* acquire this lock. Because the actual set of threads may change* dynamically while constructing this result, the returned* collection is only a best-effort estimate. The elements of the* returned collection are in no particular order. This method is* designed to facilitate construction of subclasses that provide* more extensive monitoring facilities.** @return the collection of threads*/protected Collection<Thread> getQueuedThreads() {return sync.getQueuedThreads();}/*** Queries whether any threads are waiting on the given condition* associated with this lock. Note that because timeouts and* interrupts may occur at any time, a {@code true} return does* not guarantee that a future {@code signal} will awaken any* threads. 
This method is designed primarily for use in* monitoring of the system state.** @param condition the condition* @return {@code true} if there are any waiting threads* @throws IllegalMonitorStateException if this lock is not held* @throws IllegalArgumentException if the given condition is* not associated with this lock* @throws NullPointerException if the condition is null*/public boolean hasWaiters(Condition condition) {if (condition == null)throw new NullPointerException();if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))throw new IllegalArgumentException("not owner");return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);}/*** Returns an estimate of the number of threads waiting on the* given condition associated with this lock. Note that because* timeouts and interrupts may occur at any time, the estimate* serves only as an upper bound on the actual number of waiters.* This method is designed for use in monitoring of the system* state, not for synchronization control.** @param condition the condition* @return the estimated number of waiting threads* @throws IllegalMonitorStateException if this lock is not held* @throws IllegalArgumentException if the given condition is* not associated with this lock* @throws NullPointerException if the condition is null*/public int getWaitQueueLength(Condition condition) {if (condition == null)throw new NullPointerException();if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))throw new IllegalArgumentException("not owner");return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);}/*** Returns a collection containing those threads that may be* waiting on the given condition associated with this lock.* Because the actual set of threads may change dynamically while* constructing this result, the returned collection is only a* best-effort estimate. The elements of the returned collection* are in no particular order. 
This method is designed to* facilitate construction of subclasses that provide more* extensive condition monitoring facilities.** @param condition the condition* @return the collection of threads* @throws IllegalMonitorStateException if this lock is not held* @throws IllegalArgumentException if the given condition is* not associated with this lock* @throws NullPointerException if the condition is null*/protected Collection<Thread> getWaitingThreads(Condition condition) {if (condition == null)throw new NullPointerException();if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))throw new IllegalArgumentException("not owner");return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);}/*** Returns a string identifying this lock, as well as its lock state.* The state, in brackets, includes either the String {@code "Unlocked"}* or the String {@code "Locked by"} followed by the* {@linkplain Thread#getName name} of the owning thread.** @return a string identifying this lock, as well as its lock state*/public String toString() {Thread o = sync.getOwner();return super.toString() + ((o == null) ?"[Unlocked]" :"[Locked by thread " + o.getName() + "]");}
}
ReentrantLock contains an abstract static nested class, Sync:
abstract static class Sync extends AbstractQueuedSynchronizer
The fair synchronizer FairSync and the non-fair synchronizer NonfairSync both extend Sync. Taking the fair synchronizer as an example:
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire. Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
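① Creating a fair lock
The following code creates a fair lock:
private final ReentrantLock lock = new ReentrantLock(true);
This invokes the following constructor:
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
That is, a FairSync object is assigned to the field private final Sync sync;
FairSync class hierarchy: FairSync extends Sync, Sync extends AbstractQueuedSynchronizer, and AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer.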
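Which synchronizer a lock was built with can be checked from outside through the public isFair() method, which the source above implements as sync instanceof FairSync. A small sketch (class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class FairnessFlagDemo {

    // Reports isFair() for the three ways of constructing the lock.
    public static boolean[] fairnessFlags() {
        return new boolean[] {
            new ReentrantLock().isFair(),      // default constructor: NonfairSync
            new ReentrantLock(false).isFair(), // explicit non-fair
            new ReentrantLock(true).isFair()   // FairSync
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fairnessFlags())); // prints [false, false, true]
    }
}
```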
② Acquiring the lock: lock.lock();
What actually runs is the lock method of FairSync, which delegates to acquire in AbstractQueuedSynchronizer:
final void lock() {
    acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
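The selfInterrupt() call at the end of acquire means that lock() does not throw when the waiting thread is interrupted; instead the interrupt status is restored once the lock has been obtained. The following sketch tries to make that visible; the class and method names are illustrative only.

```java
import java.util.concurrent.locks.ReentrantLock;

public class SelfInterruptDemo {

    // Interrupts a thread while it waits in lock() and reports whether the
    // interrupt status is still set after it finally gets the lock.
    public static boolean interruptSurvivesLock() {
        ReentrantLock lock = new ReentrantLock(true);
        boolean[] flagAfterLock = {false};
        Thread worker = new Thread(() -> {
            lock.lock(); // blocks while the main thread holds the lock
            try {
                flagAfterLock[0] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        }, "worker");
        try {
            lock.lock();
            try {
                worker.start();
                // Wait until the worker is in the wait queue, then interrupt it
                while (!lock.hasQueuedThread(worker)) {
                    Thread.sleep(1);
                }
                worker.interrupt();
            } finally {
                lock.unlock();
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return flagAfterLock[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptSurvivesLock()); // prints true
    }
}
```

Code that needs to stop waiting when interrupted would use lockInterruptibly() instead, as described in the Javadoc of the source listing.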
As shown below, when the first thread t1 starts and tries to acquire the lock, state is 0, so execution enters the if (c == 0) block:
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // Read the current state; it can only be claimed when it is 0
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
When t1 runs for the first time, tail = null and head = null, so h != t is false and hasQueuedPredecessors returns false:
// Checks whether any thread is queued ahead of the current thread in AbstractQueuedSynchronizer
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
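Next, compareAndSetState(0, acquires) runs, using CAS to change state from 0 to 1. Because the main thread sleeps for 10 ms after starting each thread, the other threads start later, so t1 very likely acquires the lock, and at this point exclusiveOwnerThread = t1.
// Method of AbstractOwnableSynchronizer: stores the thread that now holds the lock in exclusiveOwnerThread
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}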
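The fair tryAcquire logic walked through above can be reproduced on a small scale by extending AbstractQueuedSynchronizer directly. The sketch below is a deliberately simplified, non-reentrant mutex, not the JDK implementation; FairMutexSketch and countWith are hypothetical names used only for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class FairMutexSketch {

    // Simplified fair synchronizer: no reentrancy, state is only ever 0 or 1.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // Same order of checks as FairSync: free, nobody queued ahead, CAS 0 -> 1
            if (getState() == 0
                    && !hasQueuedPredecessors()
                    && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    // Increments a shared counter from several threads under the mutex.
    public static int countWith(int threads, int perThread) {
        FairMutexSketch mutex = new FairMutexSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWith(4, 1000)); // prints 4000
    }
}
```

Because the else-if branch for the owning thread is left out, a thread that calls lock() twice on this sketch would block itself; that branch is exactly what makes ReentrantLock reentrant.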
② (continued) Other threads that try to acquire the lock join the wait queue
When a thread fails to acquire the lock, a Node records the waiting thread and is added to the queue. Node is a static nested class of the abstract class AbstractQueuedSynchronizer:
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;

    /**
     * The thread that enqueued this node. Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED. Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
After t1 has acquired the lock, it runs doTask inside the loop of its run method. The other threads also call lock.lock(); once they start. Because state = 1 at this point (the lock is held by t1), tryAcquire returns false, and thread t2 goes on to run acquireQueued(addWaiter(Node.EXCLUSIVE), arg), starting with addWaiter:
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
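When t2 is about to join the queue, the queue is still empty (tail is null), so addWaiter falls through to enq, which first creates an empty node that both head and tail refer to:
            +-------+
head ====>  | empty |  <==== tail
            +-------+
Note: ====> and <==== mean "refers to"; <---- is prev and ----> is next.
After this first loop iteration the queue contains the empty node, so the next iteration takes the else branch and appends t2. The queue then looks like this:
            +-------+  <----  +-----+
head ====>  | empty |         | t2  |  <==== tail
            +-------+  ---->  +-----+
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // On the first iteration t == null
        if (t == null) { // Must initialize
            // When the queue is empty, an empty node is created as head
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}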
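The two-step behaviour of enq (lazily create an empty head, then append with a CAS on tail) can be modelled outside the JDK with AtomicReference. This is a simplified sketch for illustration only; EnqSketch, its Node class, and the label field are hypothetical and carry none of the real waitStatus logic.

```java
import java.util.concurrent.atomic.AtomicReference;

public class EnqSketch {

    // Stripped-down node: a label instead of a thread, plus prev/next links.
    static final class Node {
        final String label;
        volatile Node prev;
        volatile Node next;

        Node(String label) {
            this.label = label;
        }
    }

    final AtomicReference<Node> head = new AtomicReference<>();
    final AtomicReference<Node> tail = new AtomicReference<>();

    // Mirrors the loop in enq: initialise with an empty node, then append.
    Node enq(Node node) {
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Queue not initialised yet: install the empty node as head and tail
                if (head.compareAndSet(null, new Node("empty"))) {
                    tail.set(head.get());
                }
            } else {
                node.prev = t;
                if (tail.compareAndSet(t, node)) {
                    t.next = node;
                    return t; // like the JDK method, returns the predecessor
                }
            }
        }
    }

    // Enqueues t2 and t3 and renders the queue from head to tail.
    public static String describe() {
        EnqSketch queue = new EnqSketch();
        queue.enq(new Node("t2"));
        queue.enq(new Node("t3"));
        StringBuilder sb = new StringBuilder();
        for (Node n = queue.head.get(); n != null; n = n.next) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(n.label);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints empty -> t2 -> t3
    }
}
```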
After t2 has joined the queue, the following method runs. In the first loop iteration, p (the predecessor of t2's node) is head, but tryAcquire fails because t1 still holds the lock. p has waitStatus = 0, so shouldParkAfterFailedAcquire changes p's waitStatus to Node.SIGNAL (-1) and returns false. On the next iteration, shouldParkAfterFailedAcquire returns true.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the predecessor of node
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
t2 then calls parkAndCheckInterrupt, and thread t2 enters the WAITING state.
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
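The state change caused by LockSupport.park can be observed in isolation. The sketch below (illustrative names, not part of the article) parks a thread, waits until it is reported as WAITING, and then wakes it with unpark. The loop around park() is there because park is allowed to return spuriously.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {

    // Returns the thread state seen while parked and after unpark + join.
    public static Thread.State[] parkThenUnpark() {
        AtomicBoolean released = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // Re-check the condition after every wake-up
            while (!released.get()) {
                LockSupport.park();
            }
        }, "waiter");
        try {
            waiter.start();
            Thread.State whileParked;
            while ((whileParked = waiter.getState()) != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            released.set(true);
            LockSupport.unpark(waiter);
            waiter.join();
            return new Thread.State[] {whileParked, waiter.getState()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parkThenUnpark())); // prints [WAITING, TERMINATED]
    }
}
```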
The other threads behave the same way: unable to get the lock, they also join the wait queue, as shown below:
            +-------+  <----  +-----+  <----  +-----+  <----  +-----+  <----  +-----+
head ====>  | empty |         | t2  |         | t3  |         | t4  |         | t5  |  <==== tail
            +-------+  ---->  +-----+  ---->  +-----+  ---->  +-----+  ---->  +-----+
③ Releasing the lock: lock.unlock();
public void unlock() {
    sync.release(1);
}
The release method of AbstractQueuedSynchronizer:
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
When t1 releases the lock, c = 1 - 1 = 0, so execution enters if (c == 0): the owner field exclusiveOwnerThread is set to null and state is set to 0.
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
tryRelease returns true, so unparkSuccessor runs:
Step 1: the waitStatus of t2's predecessor, the empty head node, is changed from SIGNAL (-1) to 0 by compareAndSetWaitStatus(node, ws, 0);
Step 2: LockSupport.unpark(s.thread); is called, which moves thread t2 from WAITING back to RUNNABLE.
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
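Two details of tryRelease shown earlier in this step can be checked from the public API: the lock only becomes free when the hold count drops to 0, and a thread that does not own the lock gets an IllegalMonitorStateException. A minimal sketch with illustrative names:

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class ReleaseDemo {

    // Acquires twice, then reports isLocked() after each unlock().
    public static boolean[] lockedAfterEachUnlock() {
        ReentrantLock lock = new ReentrantLock(true);
        lock.lock();
        lock.lock();                        // hold count is now 2
        lock.unlock();                      // hold count 1: tryRelease returns false
        boolean afterFirst = lock.isLocked();
        lock.unlock();                      // hold count 0: lock is free
        boolean afterSecond = lock.isLocked();
        return new boolean[] {afterFirst, afterSecond};
    }

    // Lets a second thread call unlock() on a lock it does not own.
    public static boolean nonOwnerUnlockThrows() {
        ReentrantLock lock = new ReentrantLock(true);
        boolean[] threw = {false};
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                try {
                    lock.unlock();
                } catch (IllegalMonitorStateException e) {
                    threw[0] = true;
                }
            }, "other");
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return threw[0];
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(lockedAfterEachUnlock())); // prints [true, false]
        System.out.println(nonOwnerUnlockThrows());                   // prints true
    }
}
```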
④ Other threads acquire the lock: lock.lock();
At this point both t1 and t2 are runnable. t1 starts its next loop iteration and calls lock.lock(); again, but inside tryAcquire the call to hasQueuedPredecessors returns true, mainly because s.thread != Thread.currentThread(): the node after head belongs to t2 while the current thread is t1. tryAcquire therefore fails and t1 joins the wait queue.
Thread t2 resumes inside the condition if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()), starts the loop again, and this time tryAcquire succeeds, so t2 acquires the lock.
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                // Make the node after the old head the new head
                setHead(node);
                // Unlink the old head
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // Execution resumes here after unpark
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
So the new queue looks like this (t2's node has become the new head):
            +-----+  <----  +-----+  <----  +-----+  <----  +-----+  <----  +-----+
head ====>  | t2  |         | t3  |         | t4  |         | t5  |         | t1  |  <==== tail
            +-----+  ---->  +-----+  ---->  +-----+  ---->  +-----+  ---->  +-----+
The same steps repeat for the remaining threads.
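The rotation described above can be summarised with a small single-threaded model: the thread at the front of a FIFO queue runs, and if it still has iterations left it goes to the back. This is only a model of the scheduling order under the article's assumptions (threads start in order and re-request the lock immediately); FairRotationModel and simulate are hypothetical names and no real locking is involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FairRotationModel {

    // Returns the order in which the modelled threads would hold the lock.
    public static List<String> simulate(int threads, int rounds) {
        Deque<String> queue = new ArrayDeque<>();
        Map<String, Integer> remaining = new HashMap<>();
        for (int i = 1; i <= threads; i++) {
            String name = "t" + i;
            remaining.put(name, rounds);
            queue.addLast(name);
        }
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            String owner = queue.pollFirst();   // front of the queue gets the lock
            order.add(owner);
            int left = remaining.merge(owner, -1, Integer::sum);
            if (left > 0) {
                queue.addLast(owner);           // re-queues behind the other waiters
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // prints [t1, t2, t3, t4, t5, t1, t2, t3, t4, t5, t1, t2, t3, t4, t5]
        System.out.println(simulate(5, 3));
    }
}
```

The modelled order matches the sequence of "Task started" lines in the sample run of the example program.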