多线程-JUC源码
简介
JUC的核心是AQS,大部分锁都是基于AQS扩展出来的,这里先结合可重入锁和AQS,做一个讲解,其它的锁的实现方式也几乎类似
ReentrantLock和AQS
AQS的基本结构
AQS,AbstractQueuedSynchronizer,抽象队列同步器,JUC中的基础组件,基于AQS,JUC实现了多种锁和同步工具。
AQS在设计模式是采用了模板方法设计模式,要想基于AQS实现一个同步工具,需要继承AQS,同时实现所有protected权限的方法,这些方法定义了如何获取锁(独占锁、共享锁),AQS负责整体流程的编排,同时维护阻塞队列、线程的阻塞和唤醒。
AQS用于实现依赖单个原子值去表示状态的同步器
1、AQS的继承体系:
// AQS:AQS继承了AbstractOwnableSynchronizer,并且AQS本身是一个抽象类
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable { }
// AQS的父类,AbstractOwnableSynchronizer,只有一个成员变量,用于存放持有独占锁的线程。
// 一个线程可以独占的同步器。本类为创建可能包含所有权概念的锁和相关同步器提供了基础
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }
// 持有排他锁的线程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
2、AQS的体系结构
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// protected权限的构造方法,只允许子类继承和调用
protected AbstractQueuedSynchronizer() { }
// 定义了队列中的节点,节点中封装了线程对象和指向前后节点的指针。AQS中没有队列实例,
// 而是通过包装线程为Node类,用前、后节点指针来实现一个虚拟的双向队列。
static final class Node {
/* 两个常量,定义了锁的模式 */
// 锁是共享模式
static final Node SHARED = new Node();
// 锁是独占模式
static final Node EXCLUSIVE = null;
Node nextWaiter; // 锁的模式,为null,独占模式,SHARED,共享模式;同时,Condition使用它构建条件队列
// 节点间组成一个双向链表
volatile Node prev;
volatile Node next;
// 节点中封装的线程
volatile Thread thread;
}
// 双向链表的头结点
private transient volatile Node head;
// 双向链表的尾结点
private transient volatile Node tail;
// 锁的状态,这个状态由子类来操作
private volatile int state;
/* 模板方法:交给子类实现的方法 */
// 尝试获取独占锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放独占锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 尝试获取共享锁
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放共享锁
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 判断当前锁是否是独占模式
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/* 父类中负责流程编排 */
// 获取独占锁,先调用交给子类实现的tryAcquire方法,如果获取锁失败,进入队列,然后等待
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 释放独占锁,先调用交给子类实现的tryRelease方法,如果释放成功,当前节点移出队列,然后唤醒队列中的第一个节点
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 获取共享锁
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 释放共享锁
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
}
从AQS的基本结构中可以看到:
- AQS的队列,实际是一个双向链表。
- AQS在设计上使用了模板方法模式,父类中定义好流程,然后再定义好交给子类实现的模板方法。在当前案例中,交给子类实现的是获取锁、释放锁的方式,父类来维护阻塞队列、线程的阻塞和唤醒。通过这种方式,子类指定锁的获取和释放的方式,子类可以实现多种类型的锁,比如公平锁、非公平锁、读写锁、并发度控制(信号量)等。
AQS中提供了两套模式:独占模式和共享模式
- 独占模式:exclusive, 同一时间只有一个线程能拿到锁执行
- 共享模式:shared, 同一时间有多个线程可以拿到锁协同工作
子类可以选择独占模式、也可以选择共享模式,例如,读写锁中,读锁就是共享模式,写锁就是独占模式,可重入锁也是独占模式。
ReentrantLock的基本结构
1、ReentrantLock的继承体系:ReentrantLock实现了Lock接口,并且支持序列化
public class ReentrantLock implements Lock, java.io.Serializable {
Lock接口:定义了一个锁应该具备的功能
public interface Lock {
/* 获取锁 */
// 获取锁
void lock();
// 可打断地获取锁
void lockInterruptibly() throws InterruptedException;
// 不阻塞地获取锁,如果获取不到,立刻返回
boolean tryLock();
// 获取锁,指定超时时长
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 获取条件变量,条件变量是指调用wait方法、notify方法的锁对象,ReentrantLock可以实现在多个条件变量上等待和唤醒
Condition newCondition();
}
2、ReentrantLock的类结构
public class ReentrantLock implements Lock, java.io.Serializable {
// 同步器:抽象静态内部类,定义了获取锁、释放锁的功能。同步器继承了AQS,AQS是juc的核心。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 获取锁的功能,交给子类扩展
abstract void lock();
// 释放锁
protected final boolean tryRelease(int releases) { } // 这里暂时不关心它的实现
}
// 同步器的实例
private final Sync sync;
// 非公平锁
static final class NonfairSync extends Sync { }
// 公平锁
static final class FairSync extends Sync { }
// 构造方法
public ReentrantLock() {
sync = new NonfairSync(); // 默认使用非公平锁
}
// 获取锁
public void lock() {
sync.lock();
}
// 释放锁
public void unlock() {
sync.release(1);
}
}
总结:
- ReentrantLock内部定义了一个同步器,并且它的所有功能实际上都是委托给同步器来完成。
- 同步器把获取锁的方法定义为抽象方法,同时实现了释放锁的方法,在同步器的基础上,扩展出了两个子类,公平锁、非公平锁,公平锁和非公平锁只是获取锁的方式不同,其它都相同。ReentrantLock默认使用非公平锁
- 同步器继承了AQS
ReentrantLock是如何获取锁的?
ReentrantLock内部实现了公平锁和非公平锁两种获取锁的模式,默认使用非公平锁,这里分别讲解它们的工作机制。
非公平锁是如何获取锁的?
源码:
1、整体流程
// 这是同步器的子类NonfairSync中获取锁的方法
final void lock() {
// 获取锁:尝试使用cas算法改变状态变量,把它的值从0改为1,改变成功,表示获取到锁
if (compareAndSetState(0, 1))
// 获取到锁后,把当前线程设置为独占线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 没有获取到锁:调用AQS中的方法,进入阻塞队列
acquire(1);
}
这个方法中调用的方法都是来自AQS。从方法的实现步骤来看,非公平锁在获取锁时,先尝试直接获取锁,获取不到,再进入阻塞队列,符合之前提到的非公平锁的原理。
2、AQS中acquire方法:它在AQS中定义了获取独占锁的流程
// 整体流程
public final void acquire(int arg) {
// 尝试获取锁:首先调用交给子类实现的的tryAcquire方法,尝试获取锁,如果获取成功,直接进入同步代码块,
if (!tryAcquire(arg) &&
// 如果没有获取到锁,进入阻塞队列:如果获取锁失败,线程进入队列,阻塞,入队之前还会尝试再次获取锁,
// 成功则不入队
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 为线程打上中断状态,因为acquireQueued方法中会清除线程的中断标识,所以这里需要重新为线程打上中断标识
selfInterrupt();
}
2.1、交给子类实现的tryAcquire方法:尝试获取锁
// 定义在NonfairSync中
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 定义在Sync中
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 表示无锁
// 尝试获取锁,获取锁就是通过cas算法改变state变量的状态,改变成功,就是获取到锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果获取锁的线程是当前线程,重入
int nextc = c + acquires; // 状态加1
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); // 更新状态
return true;
}
return false;
}
2.2、获取锁失败后进入阻塞队列的逻辑:
// 1、addWaiter方法:创建节点并且把节点加入到队列中
private Node addWaiter(Node mode) { // 这里的参数mode表示节点的模式,这里是独占模式
// 将当前线程封装成一个node节点
Node node = new Node(Thread.currentThread(), mode);
/* 下面是操作双向链表的代码,将新节点加入到队列的尾部,队列中的头结点是一个虚拟节点 */
Node pred = tail;
if (pred != null) { // 如果当前队列中有节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 当前队列中没有节点,那么需要创建虚拟头结点,然后新节点入队
return node;
}
// 节点入队的逻辑
private Node enq(final Node node) {
for (;;) {// 自旋
// 获取当前队列中的尾节点
Node t = tail;
if (t == null) { // Must initialize
// 初始化一个空节点 new Node(),作为队列中的哨兵节点
if (compareAndSetHead(new Node())) // cas操作,设置头节点为空节点
tail = head;// 初始化空节点后,因为此时队列中只有一个节点,所以head和tail都指向这一个节点
} else {
// 第二次循环,将节点挂载到空节点上
node.prev = t;
// 将用户传入的node节点设置为尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t; // 返回前一个节点
}
}
}
}
2.3、线程进入阻塞的逻辑:
// 2、acquireQueued,线程进入阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋
// 获取当前节点的上一个节点
final Node p = node.predecessor();
// 判断前一个节点是不是头节点(队列中使用一个虚拟节点作为头结点,所以当前节点的前一个节点是头结点,
// 那么当前节点就是事实上的头结点),如果是,尝试抢占锁资源
if (p == head && tryAcquire(arg)) {
// 抢到锁资源后,这里可以理解为当前节点出队,将当前节点设置为队列的虚拟头结点,
// 所以线程获取到锁之后,就会被移出阻塞队列
setHead(node);
p.next = null; // help GC,将原先的头结点清除
failed = false;
return interrupted;
}
// 抢锁失败
if (shouldParkAfterFailedAcquire(p, node) && // 判断是否应该阻塞当前线程
parkAndCheckInterrupt()) // 调用LockSupport中的park方法,阻塞当前线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // 发生异常,当前节点出队
}
}
// 判断是否应该阻塞当前线程:shouldParkAfterFailedAcquire
// 第一个参数是当前节点的上一个节点,第二个参数是当前节点,
// 总结:如果上一个节点的状态是SIGNAL,那么当前节点应该阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 这个方法中需要判断节点的状态。表示状态的常量:
// static final int CANCELLED = 1;:表示当前节点的线程已经被取消
// static final int SIGNAL = -1;:表示当前节点的后继节点的线程正在等待唤醒
// static final int CONDITION = -2;:节点在条件队列中,节点线程等待在Condition上,
// 不过当其他的线程对Condition调用了signal()方法后,该节点就会从等待队列
// 转移到同步队列中,然后开始尝试对同步状态的获取
// static final int PROPAGATE = -3;:表示下一个共享状态应该被无条件传播
int ws = pred.waitStatus;
// 判断上一个节点的状态
if (ws == Node.SIGNAL)
return true; // 如果前驱节点状态为SIGNAL,当前节点需要被阻塞
if (ws > 0) { // 如果前驱节点的状态是CANCELLED,需要把CANCELLED状态的节点移除出队列
do {
node.prev = pred = pred.prev; // 前一个节点前移,同时当前节点指向前前一个节点
} while (pred.waitStatus > 0);
pred.next = node; // 将当前节点连接到新的前驱节点
} else {
// 将前驱节点状态设置为SIGNAL,当下一次执行这个方法时,
// 因为ws == SIGNAL状态成立,所以下一次会执行返回true
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// parkAndCheckInterrupt:阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted(); // 这个方法会清除中断标记
}
2.4、发生异常,节点出队的方法:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0) // 如果前一个节点的状态是CANCELLED,需要移除前一个节点,重新构建队列
node.prev = pred = pred.prev; // 前一个节点前移,同时当前节点指向前前一个节点
Node predNext = pred.next; // 循环过后前面的节点的下一个节点
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾结点,将尾结点更新为前一个节点
if (node == tail && compareAndSetTail(node, pred)) {
// 同时前一个节点的next指针指向null
compareAndSetNext(pred, predNext, null);
} else {
// 如果后序节点需要被唤醒
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 进入if分支,证明:前驱节点不是头节点、前驱节点的状态是SIGNAL或可以被设置为SIGNAL、
// 前驱节点仍然有线程关联,移除当前节点即可
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); // 从队列中移除当前节点
} else {
// 进入else分支,证明当前节点的前驱节点无法正常处理后续节点的唤醒逻辑,因此在这里唤醒后续节点
unparkSuccessor(node); // 唤醒后续节点
}
node.next = node; // help GC
}
}
// 唤醒后续节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 遍历,从尾结点开始,找到当前节点之后第一个状态不为CANCELLED的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒该节点
}
总结:用一张流程图来总结代码中的流程
1、lock方法:尝试获取锁的流程
2、acquire方法的流程
公平锁是如何获取锁的?
源码:
// FairSync:这是同步器的子类FairSync中获取锁的方法
final void lock() {
// 直接进入阻塞队列,也符合公平锁的原理,acquire中会调用tryAcquire方法,
// 公平锁和非公平锁都分别实现了这个方法,定制自己获取锁的逻辑
acquire(1);
}
// FairSync 公平锁中获取锁的流程
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 如果无锁
if (!hasQueuedPredecessors() && // 等待队列中没有节点
compareAndSetState(0, acquires)) { // 获取锁成功
setExclusiveOwnerThread(current); // 设置当前线程为拥有独占锁的线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 锁的可重入逻辑
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁不像非公平锁那样,先尝试获取锁,而是按照流程,判断阻塞队列中有没有节点,如果有,进入阻塞队列
ReentrantLock是如何释放锁的?
公平锁和非公平锁释放锁的原理是一样的,这里统一讲解。
调用AQS中的release方法,来完成释放锁的逻辑
public final boolean release(int arg) {
// tryRelease,交给子类实现的模板方法,执行释放锁的逻辑,如果返回true,表示锁释放成功
if (tryRelease(arg)) {
// 处理等待队列,唤醒后继节点
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点,这里是唤醒队列中的第一个节点
unparkSuccessor(h); // 这个方法在前面有提到
return true;
}
return false;
}
// ReentrantLock中释放锁的逻辑,同样的,改变状态,将独占锁标识改为null
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 改变状态,这里包含可重入的逻辑
if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断,如果当前线程没有持有锁
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 确认是释放锁,将独占锁的标识置为null,此前它是当前线程
free = true;
setExclusiveOwnerThread(null);
}
setState(c); // 设置状态
return free;
}
Condition 源码
1、ReentrantLock中获取Condition的方法:
// ReentrantLock中的方法:
public Condition newCondition() {
return sync.newCondition();
}
// 同步器中的方法:
final ConditionObject newCondition() {
return new ConditionObject();
}
可以看到,最终是获取了一个ConditionObject的实例
2、ConditionObject:它被定义在AQS中,实现了Condition接口。
Condition接口:条件对象,条件对象从Object类中提取出了wait、notify、notifyAll方法的功能,使得一个锁对象上可以支持多个等待队列。
public interface Condition {
// 使当前线程进入等待状态,直到被唤醒或调用interrupt方法
void await() throws InterruptedException;
// 等待,直到被唤醒
void awaitUninterruptibly();
// 等待,并且指定时长
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒单个线程
void signal();
// 唤醒全部线程
void signalAll();
}
2、ConditionObject的整体结构:它是AQS的成员内部类
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/* 维护一个等待队列 */
// 头节点
private transient Node firstWaiter;
// 尾节点
private transient Node lastWaiter;
}
3、阻塞逻辑的实现:
// await方法:线程入队,然后调用park方法,进入等待状态
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 向条件队列添加一个节点
Node node = addConditionWaiter();
int savedState = fullyRelease(node); // 完全释放当前线程持有的锁
int interruptMode = 0;
// 判断当前节点是否在同步队列
while (!isOnSyncQueue(node)) {
// 如果不在,进入阻塞状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 唤醒后,没有进入上面的while,证明节点在同步队列中,尝试重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 判断节点是否仍在条件队列中
unlinkCancelledWaiters(); // 清理队列中所有被取消的节点,确保队列的正确性
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode); // 处理中断逻辑,被外部中断后,是抛异常还是继续进行
}
// 清理队列中失效的节点:这里实际上是处理单向链表的逻辑,等待队列中,使用nextWaiter指向下一个节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) { // 从头结点开始遍历
Node next = t.nextWaiter; // 下一个节点
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t; // 用作下一次循环时记录前一个节点
t = next; // 头结点后移
}
}
3.1、添加线程到条件队列,条件队列是一个单向链表
private Node addConditionWaiter() {
Node t = lastWaiter;
// 这里先判断尾结点的状态,如果它不是CONDITION状态
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 清理队列中失效的节点
t = lastWaiter;
}
// 节点入队
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
3.2、释放锁的逻辑
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) { // 释放锁
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED; // 如果释放失败,将节点状态改为CANCELLED
}
}
4、唤醒逻辑的实现:
// signal方法:唤醒线程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 唤醒队列中的第一个节点,方法中会调用LockSupport.unpark方法
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) // 头结点后移
lastWaiter = null; // 如果等待队列中只有一个节点,尾结点置为空
first.nextWaiter = null; // 原先的头结点出队
} while (!transferForSignal(first) && // 唤醒头结点所在的线程,如果唤醒成功,退出循环
(first = firstWaiter) != null);
}
// 唤醒头结点所在的线程
final boolean transferForSignal(Node node) {
// 状态转换,由CONDITION变为初始状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); // 节点加入同步队列,这里返回的是当前节点的前一个节点
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果前一个节点的状态无法被转换为SIGNAL
LockSupport.unpark(node.thread); // 手动唤醒当前节点
return true;
}
总结:Condition的工作机制
- Condition的实现类ConditionObject,是AQS的内部类,所以它会持有AQS的引用,在当前案例中,AQS是通过子类ReentrantLock实例化的,所以Condition实际上持有ReentrantLock的实例。成员内部类的实例依赖于外部类的实例,通过ReentrantLock创建ConditionObject的实例。
- Condition内部会维护一个自己的等待队列,它又持有ReentrantLock的实例,所以它可以操作ReentrantLock的同步队列
- 用户通过Condition实例调用await方法,condition会把用户线程加入到自己的条件队列中,然后阻塞
- 用户通过Condition实例调用signal方法,condition会把用户线程从自己的条件队列中移除,然后加入到ReentrantLock的同步队列中,然后唤醒用户线程
- 用户线程被唤醒后,判断自己是不是在同步队列中,如果在,抢锁,如果不在,继续阻塞。抢到锁之后,会额外判断,如果当前线程还在条件队列中,会清理条件队列中失效的节点
总结
这里介绍了可重入锁和AQS是如何配合在一起工作的,它们的设计模式,哪些功能被定义在可重入锁中,哪些功能被定义在AQS中,后面介绍的几个工具也是这么实现的,模式基本相同。
ReentrantReadWriteLock 源码
读写锁,读锁是共享锁,写锁是独占锁,和之前的ReentrantLock类似的一点,ReentrantLock本身可以被理解为是一个写锁
ReentrantReadWriteLock的基本结构
1、ReentrantReadWriteLock的继承体系:ReentrantReadWriteLock实现了读写锁(ReadWriteLock)接口,并且支持序列化
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
ReadWriteLock接口:定义了读锁和写锁
public interface ReadWriteLock {
// 读锁
Lock readLock();
// 写锁
Lock writeLock();
}
2、ReentrantReadWriteLock的类结构
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
// 同步器
abstract static class Sync extends AbstractQueuedSynchronizer {
/* 同步器中定义的抽象方法 */
// 判断读是否应该阻塞
abstract boolean readerShouldBlock();
// 判断写是否应该阻塞
abstract boolean writerShouldBlock();
}
// 非公平锁
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // 非公平锁,写锁可以竞争
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
// 公平锁
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
// 读锁:内部操作的是共享锁
public static class ReadLock implements Lock, java.io.Serializable {
// 同步器,锁的内部,所有的功能都是委托同步器实现的
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() { // 获取锁
sync.acquireShared(1);
}
public void unlock() { // 释放锁
sync.releaseShared(1);
}
}
// 写锁,内部操作的是独占锁
public static class WriteLock implements Lock, java.io.Serializable {
// 同步器
private final Sync sync;
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
// 构造方法,读锁和写锁默认都是非公平锁
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
}
ReentrantReadWriteLock和ReentrantLock类似,只是它的内部扩展出了读锁和写锁,读锁和写锁依赖的是同一个同步器,读锁是共享锁,写锁是排它锁。
读写锁内置的同步器
读写锁需要在读锁和写锁之间同步,这些功能都依赖同步器
同步器的结构:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/* 锁的状态:用state的高16位作为读锁的数量,低16位表示写锁的数量 */
static final int SHARED_SHIFT = 16; // 计算常量
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 读锁的次数,状态字段无符号右位移16位
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 写锁的次数,状态字段取后16位
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 静态内部类:封装了线程id和该线程持有读锁的次数
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread()); // 线程id,在创建实例时初始化
}
private transient HoldCounter cachedHoldCounter;
// 静态内部类:继承了ThreadLocal,用于记录每个线程持有读锁的次数
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
private transient ThreadLocalHoldCounter readHolds;
// 第一个持有读锁的线程
private transient Thread firstReader = null;
// 第一个持有读锁的线程持有几次读锁,这里是可重入锁的逻辑
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
}
获取读锁的逻辑
读锁直接调用了AQS中获取共享锁的逻辑,并且在某些步骤做了自己的定制,这就是模板方法设计模式。
源码:
1、基本步骤
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) // 尝试获取共享锁
doAcquireShared(arg); // 获取失败,进入阻塞队列
}
2、尝试获取共享锁
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 获取 state 的值
int c = getState();
// 判断是否有线程获取了写锁并且不是当前线程,exclusiveCount(c),计算写锁的数量
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1; // 如果有线程获取了写锁,直接返回
// 计算读锁的数量
int r = sharedCount(c);
if (!readerShouldBlock() && // 读是否应阻塞,这个实现在子类中
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // cas算法修改状态,修改成功,表示获取到读锁
/* 下面这一大段逻辑都是用来记录当前线程获取了几次共享锁 */
if (r == 0) {
// 表示是第一个获取读锁的线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果是当前获取读锁的线程重入,再次获取锁
firstReaderHoldCount++;
} else {
// 如果不是第一个获取读锁的线程来获取读锁,使用cachedHoldCounter和readHolds。
// cachedHoldCounter记录线程id和该线程对于读锁的持有次数,readHolds将该数据
// 存储到ThreadLocal中
HoldCounter rh = cachedHoldCounter;
// 这个if else是处理readHolds和cachedHoldCounter之间的关系
if (rh == null || rh.tid != getThreadId(current)){
// 这里的get方法,会调用初始化数据的方法initialValue() // 参考ThreadLocal
cachedHoldCounter = rh = readHolds.get();
} else if (rh.count == 0)
readHolds.set(rh);
rh.count++; // 线程id在创建实例时初始化,这里记录该线程对于读锁的持有次数
}
return 1; // 表示获取锁成功
}
// 需要被阻塞获取读锁失败,那么需要进入下面完整版的获取锁的过程
return fullTryAcquireShared(current);
}
// 读是否需要被阻塞:这个逻辑定义在同步器中。具体实现是,如果队列中的第一个节点是一个独占模式,读线程需要被阻塞,否则不需要。
// 因为即使当前没有写线程持有锁,队列中也可能会有写请求,并且队列中的写请求也应该被优先处理,在非公平锁中,读线
// 程和写线程的请求顺序可能不严格按照先进先出处理,但写线程的请求仍然需要被优先处理。
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() && // 队列中的头结点是独占模式
s.thread != null;
}
获取锁失败后,需要调用完整版的获取锁的流程:
// 完整版的获取锁的过程:用于在快速路径失败后,提供一种更全面的尝试获取共享锁的机制。
// 它处理了快速路径未能处理的复杂情况,例如锁降级、读锁数量达到上限、线程是否需要阻塞等。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) { // 自旋
int c = getState();
// 判断是否有线程获取了写锁并且不是当前线程
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1; // 如果有,直接返回
} else if (readerShouldBlock()) { // 判断读线程是否应该被阻塞
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove(); // 移除当前线程持有锁的记录
}
}
if (rh.count == 0)
return -1;
}
}
// 如果当前线程已经持有写锁,继续尝试获取读锁(锁降级),
if (sharedCount(c) == MAX_COUNT) // 检查读锁数量是否达到上限
throw new Error("Maximum lock count exceeded");
// 尝试获取共享锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
3、获取锁失败后,进入阻塞队列
private void doAcquireShared(int arg) {
// 向队列尾部添加一个节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 如果当前节点是头结点,再次尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取锁成功,头结点出队并且唤醒后续节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // 处理异常情况
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); // 将当前节点设置为一个虚拟头结点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 如果下一个节点是共享模式或者null,唤醒后续节点
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
释放读锁的逻辑
整体流程:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 尝试释放共享锁
doReleaseShared(); // 释放共享锁
return true;
}
return false;
}
1、tryReleaseShared:负责更新状态
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 更新当前线程持有共享锁的次数
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 更新state变量
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
2、doReleaseShared:维护阻塞队列
private void doReleaseShared() {
for (;;) { // 无限循环,直到释放操作完成
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 尝试将状态从SIGNAL改为0
continue; // 如果CAS失败,重新循环检查
unparkSuccessor(h); // 唤醒头节点的后继节点
}
// 如果头节点的状态为0,说明当前没有线程需要被唤醒,但需要确保释放操作能够继续传播。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
// 在每次循环结束时,检查头节点是否发生变化。如果头节点没有变化,说明当前的释放操作已经完成,可以退出循环。
break;
}
}
获取和释放写锁的逻辑
和ReentrantLock类似,只是操作state字段的方式不同。
获取写锁的逻辑:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { // 如果锁已经被占用
// (Note: if c != 0 and w == 0 then shared count != 0),如果c != 0 && w == 0,
// 证明共享锁不为0,有线程持有共享锁
if (w == 0 || current != getExclusiveOwnerThread()) // 不是当前线程获取的写锁
return false; // 退出
// 如果当前线程已经持有写锁,检查是否超过最大次数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 重入
setState(c + acquires);
return true;
}
// 如果锁未被占用(c == 0)
if (writerShouldBlock() || // 如果写线程应该阻塞
!compareAndSetState(c, c + acquires)) // 或者 CAS 更新状态失败
return false;
setExclusiveOwnerThread(current);
return true;
}
释放写锁的逻辑:
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
总结
读写锁使用同一个状态字段、同一个阻塞队列,内部基于同一个同步器,彼此之间互相影响,只是获取锁的方式不同。
Semaphore 源码
Semaphore底层是基于共享锁,也可以理解为读锁,加锁和释放锁,操作的都是读锁。它允许多个线程同时执行同步代码块,但是它又会限制线程数量,从而达到控制并发量的目的
Semaphore的基本结构:
public class Semaphore implements java.io.Serializable {
// 同步器的实例
private final Sync sync;
// 同步器的定义
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 这里重点关注构造方法,它调用AQS中的setState方法,state字段表示锁的状态,如果是共享锁,
// state的值0是到n,如果是排它锁,state的值是0到1,0表示无锁,1表示获取到锁。值为n,表示
// 允许n个线程同时获取共享锁
Sync(int permits) {
setState(permits);
}
}
// 构造方法,参数premits代表许可证数量,也就是同时允许多少个线程进行并发操作
public Semaphore(int permits) {
sync = new NonfairSync(permits); // 默认使用非公平锁
}
// 非公平锁
static final class NonfairSync extends Sync {}
// 公平锁
static final class FairSync extends Sync { }
// Semaphore方法将获取锁和释放锁的功能都委托给同步器,同步器又调用了AQS中的方法,所以核心实现在AQS
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
}
和ReentrantLock类似,Semaphore内部定义了同步器Sync,同步器继承了AQS,基于同步器扩展出了公平锁、非公平锁,Semaphore默认使用非公平锁。
原理:Semaphore将状态设置为n,acquire方法,n - 1,代表获取到锁、release方法, n + 1,代表释放锁
CountDownLatch 源码
CountDownLatch的内部是一个共享锁,同样的,它的内部定义了同步器,但是没有根据同步器扩展出非公平锁、公平锁
public class CountDownLatch {
// 同步器,继承了AQS
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 构造方法
Sync(int count) {
setState(count); // 调用了AQS中的setState方法
}
}
// 同步器实例
private final Sync sync;
// 构造方法,参数count,指定了调用countDown方法的次数,调用够指定次数的countDown方法后,
// await方法才会结束阻塞
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 等待,直到倒计时锁内部值为0
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 倒计时锁减1,如果减1后值为0,释放等待的线程
public void countDown() {
sync.releaseShared(1);
}
}
和Semaphore类似,将state设置为n,调用countDown方法, n - 1,调用await方法,判断 n 是否等于 0 ,如果是,取消阻塞
Q&A
读锁、写锁,公平锁、非公平锁,是怎么配合在一起工作的?
读锁、写锁,公平锁、非公平锁,是从不同的角度描写了一个锁的特性,读锁可以是公平锁、也可以是非公平锁,它们是一个实体的两个属性
线程什么时候从阻塞队列中移出?
获取到锁之后,所以线程释放锁资源时只需要唤醒队列中的第一个节点即可
锁超时怎么实现?tryLock方法
进入阻塞状态时加上超时时长。
源码:
tryAcquireNanos:尝试获取锁,在用户指定的时长过后,如果没有获取锁,结束阻塞,是ReentrantLock中tryLock等方法的基础
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 判断打断状态
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || // 尝试获取锁
doAcquireNanos(arg, nanosTimeout); // 获取锁,或者阻塞
}
// doAcquireNanos:真正执行获取锁或阻塞的逻辑
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 结束阻塞的时间,当前时间加上用户传入的时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) { // 自旋
final Node p = node.predecessor();
// 如果上一个节点是头节点,尝试获取锁,成功后直接返回,不进入阻塞
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算出应该阻塞的时长
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
// 判断是否应该阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 这是一项优化,如果时长大于自旋阈值才进行阻塞,否则进行自旋,
// 因为如果阻塞时间特别短,相较于自旋,阻塞比较耗费性能
nanosTimeout > spinForTimeoutThreshold)
// 阻塞
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可打断怎么实现?tryInterruptibly方法
AQS中会判断当前线程是否调用了interrupt方法,可打断的情况下,如果调用了interrupt方法,抛异常,同时用户需要处理这个异常,不可打断的情况下,也就是通过lock()方法获取锁时,即使检测到interrupt方法被调用,也会继续向下执行。