JAVA并发编程高级--读写锁 ReentrantReadWriteLock 的原理
解决线程安全问题使用 ReentrantLock 就可以,但是 ReentrantLock 是独占锁,某时只有一个线程可以获取该锁,而实际中会有写少读多的场景,显然 ReentrantLock 满足不了这个需求,所以 ReentrantReadWriteLock 应运而生。ReentrantReadWriteLock 采用读写分离的策略,允许多个线程可以同时获取读锁。
类图结构
为了了解 ReentrantReadWriteLock 的内部构造,我们先看下它的类图结构,如下图所示。
读写锁的内部维护了一个 ReadLock 和一个 WriteLock,它们依赖 Sync 实现具体功能。而 Sync 继承自 AQS,并且也提供了公平和非公平的实现。下面只介绍非公平的读锁实现。我们知道 AQS 中只维护了一个 state 状态,而 ReentrantReadWriteLock 则需要维护读状态和写状态,一个 state 怎么表示写和读两种状态呢?ReentrantReadWriteLock 巧妙地使用 state 的高 16 位表示读状态,也就是获取到读锁的次数;使用低 16 位表示获取到写锁的线程的可重入次数。
static final int SHARED_SHIFT = 16;
// 共享锁(读锁)状态单位值65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 共享锁线程最大个数65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 排它锁(写锁)掩码,二进制,15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回读锁线程数 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回写锁可重入个数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
其中 firstReader 用来记录第一个获取到读锁的线程,firstReaderHoldCount 则记录第一个获取到读锁的线程获取读锁的可重入次数。cachedHoldCounter 用来记录最后一个获取读锁的线程获取读锁的可重入次数。
static final class HoldCounter {
int count = 0;
// 线程id
final long tid = getThreadId(Thread.currentThread());
}
readHolds 是 ThreadLocal 变量,用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数。ThreadLocalHoldCounter 继承了 ThreadLocal,因而 initialValue 方法返回一个 HoldCounter 对象。
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
写锁的获取与释放
在 ReentrantReadWriteLock 中写锁使用 WriteLock 来实现。
void lock()
写锁是个独占锁,某时只有一个线程可以获取该锁。如果当前没有线程获取到读锁和写锁,则当前线程可以获取到写锁然后返回。如果当前已经有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞挂起。另外,写锁是可重入锁,如果当前线程已经获取了该锁,再次获取只是简单地把可重入次数加 1 后直接返回。
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
// sync重写的tryAcquire方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如以上代码所示,在 lock() 内部调用了 AQS 的 acquire 方法,其中 tryAcquire 是 ReentrantReadWriteLock 内部的 sync 类重写的,代码如下
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// (1) c!=0说明该锁或者写锁已经被某线程获取
if (c != 0) {
// (2) w=0说明已经有线程获取了读锁,w!=0并且当前线程不是写锁拥有者,则返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
}
// (3) 说明当前线程获取了写锁,判断可重入次数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// (4) 设置可重入次数(1)
setState(c + acquires);
return true;
}
在代码(1)中,如果当前 AQS 状态值不为 0 则说明当前已经有线程获取到了读锁或者写锁。在代码(2)中,如果 w=0 说明状态值的低 16 位为 0,而 AQS 状态值不为 0,则说明高 16 位不为 0,这暗示已经有线程获取了读锁,所以直接返回 false。
而如果 w!=0 则说明当前已经有线程获取了该写锁,再看当前线程是不是该锁的持有者,如果不是则返回 false。
执行到代码(3)说明当前线程之前已经获取到了该锁,所以判断该线程的可重入次数是不是超过了最大值,是则抛出异常,否则执行代码(4)增加当前线程的可重入次数,然后返回 true。
如果 AQS 的状态值等于 0 则说明目前没有线程获取到读锁和写锁,所以执行代码(5)。其中,对于 writerShouldBlock 方法,非公平锁的实现为
final boolean writerShouldBlock() {
return false; // writers can always barge
}
如果代码对于非公平锁来说总是返回 false,则说明代码(5)抢占式执行 CAS 尝试获取写锁,获取成功则设置当前锁的持有者为当前线程并返回 true,否则返回 false。
公平锁的实现为
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
这里还是使用 hasQueuedPredecessors 来判断当前线程节点是否有前驱节点,如果有则当前线程放弃获取写锁的权限,直接返回 false。
void lockInterruptibly()
类似于 lock() 方法,它的不同之处在于,它会对中断进行响应,也就是当其他线程调用了该线程的 interrupt() 方法中断了当前线程时,当前线程会抛出异常 InterruptedException 异常。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
boolean tryLock()
尝试获取写锁,如果当前没有其他线程持有写锁或者读锁,则当前线程获取写锁会成功,然后返回 true。如果当前已经有其他线程持有写锁或者读锁则该方法直接返回 false,且当前线程不会被阻塞。如果当前线程已经持有了该写锁则简单增加 AQS 的状态值后直接返回 true。其他代码类似 tryLock 的代码,这里不再讲述。
public boolean tryLock() {
return sync.tryWriteLock();
}
boolean tryLock(long timeout, TimeUnit unit)
与 tryAcquire () 的不同之处在于,多了超时时间参数,如果尝试获取写锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到写锁则返回 false。另外,该方法对中断响应,也就是当其他线程调用了该线程的 interrupt() 方法中断了当前线程时,当前线程会抛出 InterruptedException 异常。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
void unlock()
尝试释放锁,如果当前线程持有该锁,调用该方法会让该线程对该线程持有的 AQS 状态值减 1,如果减去 1 后当前状态值为 0 则当前线程会释放该锁,否则仅仅减 1 而已。如果当前线程没有持有该锁而调用了该方法则会抛出 IllegalMonitorStateException 异常,代码如下。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 调用ReentrantReadWriteLock中sync实现的tryRelease方法
if (tryRelease(arg)) {
// 激活阻塞队列里面的一个线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// (6) 看是否是写锁拥有者调用的unlock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// (7) 获取可重入值,这里没有考虑高16位,因为获取写锁时读锁状态值肯定为0
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// (8) 如果写锁可重入值为0则释放锁,否则只是简单地更新状态值
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
在如上代码中,tryRelease 首先通过 isHeldExclusively 判断是否当前线程是该写锁的持有者,如果不是则抛出异常,否则执行代码(7),这说明当前线程持有写锁,持有写锁说明状态值的高 16 位为 0,所以这里 nextc 值就是当前线程写锁的剩余可重入次数。代码(8)判断当前可重入次数是否为 0,如果 free 为 true 则说明可重入次数为 0,所以当前线程会释放写锁,将当前锁的持有者设置为 null。如果 free 为 false 则简单地更新可重入次数。
读锁的获取与释放
ReentrantReadWriteLock 中的读锁是使用 ReadLock 来实现的。
void lock()
获取读锁,如果当前没有其他线程持有写锁,则当前线程可以获取读锁,AQS 的状态 state 的高 16 位的值会增加 1,然后方法返回。否则如果其他一个线程持有写锁,则当前线程会被阻塞。
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
// 调用ReentrantReadWriteLock中的sync的tryAcquireShared方法
if (tryAcquireShared(arg) < 0)
// 调用AQS的doAcquireShared方法把当前线程放入AQS阻塞队列。
doAcquireShared(arg);
}
在如上代码中,读锁的 lock 方法调用了 AQS 的 acquireShared 方法,在其内部调用了 ReentrantReadWriteLock 中的 sync 重写的 tryAcquireShared 方法,代码如下。
protected final int tryAcquireShared(int unused) {
// (1)获取当前状态值
Thread current = Thread.currentThread();
int c = getState();
// (2)判断是否写锁被占用
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// (3) 获取读锁计数
int r = sharedCount(c);
// (4) 尝试获取锁,多个读线程只有一个会成功,不成功的进入fullTryAcquireShared进行重试
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// (5) 第一个线程获取读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// (6) 如果当前线程是第一个获取读锁的线程
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// (7) 记录最后一个获取读锁的线程或记录其他线程读锁的可重入人数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// (8) 类似tryAcquireShared,但是是自旋获取
return fullyTryAcquireShared(current);
}
如上代码首先获取了当前 AQS 的状态值,然后代码(2)查看是否有其他线程获取到了写锁,如果是则直接返回 -1,而后调用 AQS 的 doAcquireShared 方法把当前线程放入 AQS 阻塞队列。
如果当前要获取读锁的线程已经持有了写锁,则也可以获取读锁。但是需要注意,当一个线程先获取了写锁,然后获取了读锁处理事情完毕后,要记得把读锁和写锁都释放掉,不能只释放写锁。
否则执行代码(3),得到获取到的读锁的个数,到这里说明目前没有线程获取到写锁,但是可能有线程持有读锁,然后执行代码(4)。其中非公平锁的readerShouldBlock实现代码如下。
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
如上代码的作用是,如果队列里面存在一个元素,则判断第一个元素是不是正在尝试获取写锁,如果不是,则当前线程判断当前获取读锁的线程是否达到了最大值。最后执行CAS 操作将 AOS状态值的高16位值增加1。
代码(5)(6)记录第一个获取读锁的线程并统计该线程获取读锁的可重入数。代码(7)使用cachedHoldCounter 记录最后一个获取到读锁的线程和该线程获取读锁的可重入数,readHolds记录了当前线程获取读锁的可重入数。
如果readerShouldBlock返回true则说明有线程正在获取写锁,所以执行代码(8)。fullTryAcquireShared 的代码与tryAcquireShared 类似,它们的不同之处在于,前者通过循环自旋获取。
final int fullyTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
if (rh.count == 0)
return -1;
}
return 1;
}
}
}
void lockInterruptibly()
类似于 lock() 方法,不同之处在于,该方法会对中断进行响应,也就是当其他线程调用了该线程的 interrupt() 方法中断了当前线程时,当前线程会抛出 InterruptedException 异常。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
boolean tryLock()
尝试获取读锁,如果当前没有其他线程持有写锁,则当前线程获取读锁会成功,然后返回 true。如果当前已经有其他线程持有写锁则该方法直接返回 false,但当前线程并不会被阻塞。如果当前线程已经持有了该读锁则简单增加 AQS 的状态值高 16 位后直接返回 true。其他代码类似 tryLock 的代码,这里不再讲述。
public boolean tryLock() {
return sync.tryReadLock();
}
boolean tryLock(long timeout, TimeUnit unit)
与 tryLock () 的不同之处在于,多了超时时间参数,如果尝试获取读锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到读锁则返回 false。另外,该方法对中断响应,也就是当其他线程调用了该线程的 interrupt() 方法中断了当前线程时,当前线程会抛出 InterruptedException 异常。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
void unlock()
public void unlock() {
sync.releaseShared(1);
}
如上代码具体释放锁的操作是委托给 Sync 类来做的,sync.releaseShared 方法的代码如下。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
其中 tryReleaseShared 的代码如下,
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 循环直到自己的读计数-1,CAS更新成功
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
如以上代码所示,在无限循环里面,首先获取当前AQS状态值并将其保存到变量c,然后变量c被减去一个读计数单位后使用CAS操作更新AQS状态值,如果更新成功则查看当前AOS状态值是否为0,为0则说明当前已经没有读线程占用读锁,则tryReleaseShared 返回 true。然后会调用 doReleaseShared 方法释放一个由于获取写锁而被阻塞的线程,如果当前AOS状态值不为0,则说明当前还有其他线程持有了读锁,所以tryReleaseShared 返回 false。如果 tryRelcaseShared 中的 CAS 更新 AQS 状态值失败,则自旋重试直到成功。
案例介绍
上节介绍了如何使用 ReentrantLock 实现线程安全的 list,但是由于 ReentrantLock 是独占锁,所以在读多写少的情况下性能很差。下面使用 ReentrantReadWriteLock 来改造它,代码如下。
public static class ReentrantLockList {
// 线程不安全的list
private ArrayList<String> array = new ArrayList<String>();
// 独占锁
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
// 添加元素
public void add(String e) {
writeLock.lock();
try {
array.add(e);
} finally {
writeLock.unlock();
}
}
// 删除元素
public void remove(String e) {
writeLock.lock();
try {
array.remove(e);
} finally {
writeLock.unlock();
}
}
// 获取数据
public String get(int index) {
readLock.lock();
try {
return array.get(index);
} finally {
readLock.unlock();
}
}
}
以上代码调用 get 方法时使用的是读锁,这样运行多个读线程来同时访问 list 的元素,这在读多写少的情况下性能会更好。
小结
本节介绍了读写锁 ReentrantReadWriteLock 的原理,它的底层是使用 AQS 实现的。ReentrantReadWriteLock 巧妙地使用 AQS 的状态的高 16 位表示获取到读锁的个数,低 16 位表示获取写锁的线程的可重入次数,并通过 CAS 对其进行操作实现了读写分离,这在读多写少的场景下比较适用。