当前位置: 首页 > news >正文

多线程-JUC源码

简介

JUC的核心是AQS,大部分锁都是基于AQS扩展出来的,这里先结合可重入锁和AQS,做一个讲解,其它的锁的实现方式也几乎类似

ReentrantLock和AQS

AQS的基本结构

AQS,AbstractQueuedSynchronizer,抽象队列同步器,JUC中的基础组件,基于AQS,JUC实现了多种锁和同步工具。

AQS在设计模式是采用了模板方法设计模式,要想基于AQS实现一个同步工具,需要继承AQS,同时实现所有protected权限的方法,这些方法定义了如何获取锁(独占锁、共享锁),AQS负责整体流程的编排,同时维护阻塞队列、线程的阻塞和唤醒。

AQS用于实现依赖单个原子值去表示状态的同步器

1、AQS的继承体系:

// AQS:AQS继承了AbstractOwnableSynchronizer,并且AQS本身是一个抽象类
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {  }

// AQS的父类,AbstractOwnableSynchronizer,只有一个成员变量,用于存放持有独占锁的线程。
// 一个线程可以独占的同步器。本类为创建可能包含所有权概念的锁和相关同步器提供了基础
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    protected AbstractOwnableSynchronizer() { }

    // 持有排他锁的线程
    private transient Thread exclusiveOwnerThread;
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

2、AQS的体系结构

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    // protected权限的构造方法,只允许子类继承和调用
    protected AbstractQueuedSynchronizer() { }
    
    // 定义了队列中的节点,节点中封装了线程对象和指向前后节点的指针。AQS中没有队列实例,
    // 而是通过包装线程为Node类,用前、后节点指针来实现一个虚拟的双向队列。
    static final class Node {
        /* 两个常量,定义了锁的模式 */
        // 锁是共享模式
        static final Node SHARED = new Node();
        // 锁是独占模式
        static final Node EXCLUSIVE = null;
        Node nextWaiter;  // 锁的模式,为null,独占模式,SHARED,共享模式;同时,Condition使用它构建条件队列

        // 节点间组成一个双向链表
        volatile Node prev;
        volatile Node next;
        // 节点中封装的线程
        volatile Thread thread;
    }

    // 双向链表的头结点
    private transient volatile Node head;
    // 双向链表的尾结点
    private transient volatile Node tail;

    // 锁的状态,这个状态由子类来操作
    private volatile int state;
    
    /* 模板方法:交给子类实现的方法 */
    // 尝试获取独占锁
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    // 尝试释放独占锁
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    // 尝试获取共享锁
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 尝试释放共享锁
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 判断当前锁是否是独占模式
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /* 父类中负责流程编排 */
    // 获取独占锁,先调用交给子类实现的tryAcquire方法,如果获取锁失败,进入队列,然后等待
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    // 释放独占锁,先调用交给子类实现的tryRelease方法,如果释放成功,当前节点移出队列,然后唤醒队列中的第一个节点
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // 获取共享锁
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    // 释放共享锁
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
}

从AQS的基本结构中可以看到:

  • AQS的队列,实际是一个双向链表。
  • AQS在设计上使用了模板方法模式,父类中定义好流程,然后再定义好交给子类实现的模板方法。在当前案例中,交给子类实现的是获取锁、释放锁的方式,父类来维护阻塞队列、线程的阻塞和唤醒。通过这种方式,子类指定锁的获取和释放的方式,子类可以实现多种类型的锁,比如公平锁、非公平锁、读写锁、并发度控制(信号量)等。

AQS中提供了两套模式:独占模式和共享模式

  • 独占模式:exclusive, 同一时间只有一个线程能拿到锁执行
  • 共享模式:shared, 同一时间有多个线程可以拿到锁协同工作

子类可以选择独占模式、也可以选择共享模式,例如,读写锁中,读锁就是共享模式,写锁就是独占模式,可重入锁也是独占模式。

ReentrantLock的基本结构

1、ReentrantLock的继承体系:ReentrantLock实现了Lock接口,并且支持序列化

public class ReentrantLock implements Lock, java.io.Serializable {

Lock接口:定义了一个锁应该具备的功能

public interface Lock {
    /* 获取锁 */
  
    // 获取锁
    void lock();
    // 可打断地获取锁
    void lockInterruptibly() throws InterruptedException;
    // 不阻塞地获取锁,如果获取不到,立刻返回
    boolean tryLock();
    // 获取锁,指定超时时长
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    // 释放锁
    void unlock();

    // 获取条件变量,条件变量是指调用wait方法、notify方法的锁对象,ReentrantLock可以实现在多个条件变量上等待和唤醒
    Condition newCondition();
}

2、ReentrantLock的类结构

public class ReentrantLock implements Lock, java.io.Serializable {
    // 同步器:抽象静态内部类,定义了获取锁、释放锁的功能。同步器继承了AQS,AQS是juc的核心。
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        // 获取锁的功能,交给子类扩展
        abstract void lock();
        // 释放锁
        protected final boolean tryRelease(int releases) {  }  // 这里暂时不关心它的实现
    }
  
    // 同步器的实例
    private final Sync sync;
  
    // 非公平锁
    static final class NonfairSync extends Sync {    }
    // 公平锁
    static final class FairSync extends Sync { }

    // 构造方法
    public ReentrantLock() {
        sync = new NonfairSync();  // 默认使用非公平锁
    }

    // 获取锁
    public void lock() {
        sync.lock();
    }
    // 释放锁
    public void unlock() {
        sync.release(1);
    }
}

总结:

  • ReentrantLock内部定义了一个同步器,并且它的所有功能实际上都是委托给同步器来完成。
  • 同步器把获取锁的方法定义为抽象方法,同时实现了释放锁的方法,在同步器的基础上,扩展出了两个子类,公平锁、非公平锁,公平锁和非公平锁只是获取锁的方式不同,其它都相同。ReentrantLock默认使用非公平锁
  • 同步器继承了AQS

ReentrantLock是如何获取锁的?

ReentrantLock内部实现了公平锁和非公平锁两种获取锁的模式,默认使用非公平锁,这里分别讲解它们的工作机制。

非公平锁是如何获取锁的?

源码:

1、整体流程

// 这是同步器的子类NonfairSync中获取锁的方法
final void lock() {
    // 获取锁:尝试使用cas算法改变状态变量,把它的值从0改为1,改变成功,表示获取到锁
    if (compareAndSetState(0, 1))
        // 获取到锁后,把当前线程设置为独占线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 没有获取到锁:调用AQS中的方法,进入阻塞队列
        acquire(1);
}

这个方法中调用的方法都是来自AQS。从方法的实现步骤来看,非公平锁在获取锁时,先尝试直接获取锁,获取不到,再进入阻塞队列,符合之前提到的非公平锁的原理。

2、AQS中acquire方法:它在AQS中定义了获取独占锁的流程

// 整体流程
public final void acquire(int arg) {
    // 尝试获取锁:首先调用交给子类实现的的tryAcquire方法,尝试获取锁,如果获取成功,直接进入同步代码块,
    if (!tryAcquire(arg) &&  
        // 如果没有获取到锁,进入阻塞队列:如果获取锁失败,线程进入队列,阻塞,入队之前还会尝试再次获取锁,
        // 成功则不入队
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      
        // 为线程打上中断状态,因为acquireQueued方法中会清除线程的中断标识,所以这里需要重新为线程打上中断标识
        selfInterrupt();
}

2.1、交给子类实现的tryAcquire方法:尝试获取锁

// 定义在NonfairSync中
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

// 定义在Sync中
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { // 表示无锁
        // 尝试获取锁,获取锁就是通过cas算法改变state变量的状态,改变成功,就是获取到锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {  // 如果获取锁的线程是当前线程,重入
        int nextc = c + acquires; // 状态加1
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);  // 更新状态
        return true;
    }
    return false;
}

2.2、获取锁失败后进入阻塞队列的逻辑:

// 1、addWaiter方法:创建节点并且把节点加入到队列中
private Node addWaiter(Node mode) {  // 这里的参数mode表示节点的模式,这里是独占模式
    // 将当前线程封装成一个node节点
    Node node = new Node(Thread.currentThread(), mode);

    /* 下面是操作双向链表的代码,将新节点加入到队列的尾部,队列中的头结点是一个虚拟节点 */
    Node pred = tail;
    if (pred != null) { // 如果当前队列中有节点
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);  // 当前队列中没有节点,那么需要创建虚拟头结点,然后新节点入队
    return node;
}

// 节点入队的逻辑
private Node enq(final Node node) {
    for (;;) {// 自旋
        // 获取当前队列中的尾节点
        Node t = tail;
        if (t == null) { // Must initialize 
            // 初始化一个空节点 new Node(),作为队列中的哨兵节点
            if (compareAndSetHead(new Node())) // cas操作,设置头节点为空节点
                tail = head;// 初始化空节点后,因为此时队列中只有一个节点,所以head和tail都指向这一个节点
        } else {
            // 第二次循环,将节点挂载到空节点上
            node.prev = t;
            // 将用户传入的node节点设置为尾节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t; // 返回前一个节点
            }
        }
    }
}

2.3、线程进入阻塞的逻辑:

// 2、acquireQueued,线程进入阻塞
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { // 自旋
            // 获取当前节点的上一个节点
            final Node p = node.predecessor();
            // 判断前一个节点是不是头节点(队列中使用一个虚拟节点作为头结点,所以当前节点的前一个节点是头结点,
            // 那么当前节点就是事实上的头结点),如果是,尝试抢占锁资源
            if (p == head && tryAcquire(arg)) {
                // 抢到锁资源后,这里可以理解为当前节点出队,将当前节点设置为队列的虚拟头结点,
                // 所以线程获取到锁之后,就会被移出阻塞队列
                setHead(node);
                p.next = null; // help GC,将原先的头结点清除
                failed = false;
                return interrupted;
            }
            // 抢锁失败
            if (shouldParkAfterFailedAcquire(p, node) &&  // 判断是否应该阻塞当前线程
                parkAndCheckInterrupt()) // 调用LockSupport中的park方法,阻塞当前线程
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);  // 发生异常,当前节点出队
    }
}

// 判断是否应该阻塞当前线程:shouldParkAfterFailedAcquire
// 第一个参数是当前节点的上一个节点,第二个参数是当前节点,
// 总结:如果上一个节点的状态是SIGNAL,那么当前节点应该阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 这个方法中需要判断节点的状态。表示状态的常量:
    // static final int CANCELLED =  1;:表示当前节点的线程已经被取消
    // static final int SIGNAL    = -1;:表示当前节点的后继节点的线程正在等待唤醒
    // static final int CONDITION = -2;:节点在条件队列中,节点线程等待在Condition上,
                   // 不过当其他的线程对Condition调用了signal()方法后,该节点就会从等待队列
                   // 转移到同步队列中,然后开始尝试对同步状态的获取
    // static final int PROPAGATE = -3;:表示下一个共享状态应该被无条件传播

    int ws = pred.waitStatus;
    // 判断上一个节点的状态
    if (ws == Node.SIGNAL)
        return true;  // 如果前驱节点状态为SIGNAL,当前节点需要被阻塞
    if (ws > 0) { // 如果前驱节点的状态是CANCELLED,需要把CANCELLED状态的节点移除出队列
        do {
            node.prev = pred = pred.prev; // 前一个节点前移,同时当前节点指向前前一个节点
        } while (pred.waitStatus > 0);
        pred.next = node;  // 将当前节点连接到新的前驱节点
    } else {
        // 将前驱节点状态设置为SIGNAL,当下一次执行这个方法时,
        // 因为ws == SIGNAL状态成立,所以下一次会执行返回true
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  
    }
    return false;
}

// parkAndCheckInterrupt:阻塞当前线程
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();  // 这个方法会清除中断标记
}

2.4、发生异常,节点出队的方法:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)  // 如果前一个节点的状态是CANCELLED,需要移除前一个节点,重新构建队列
        node.prev = pred = pred.prev;  // 前一个节点前移,同时当前节点指向前前一个节点
    Node predNext = pred.next;  // 循环过后前面的节点的下一个节点

    node.waitStatus = Node.CANCELLED;

    // 如果当前节点是尾结点,将尾结点更新为前一个节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // 同时前一个节点的next指针指向null
        compareAndSetNext(pred, predNext, null);
    } else {
        // 如果后序节点需要被唤醒
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
          
            // 进入if分支,证明:前驱节点不是头节点、前驱节点的状态是SIGNAL或可以被设置为SIGNAL、
            // 前驱节点仍然有线程关联,移除当前节点即可
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);  // 从队列中移除当前节点
        } else {
            // 进入else分支,证明当前节点的前驱节点无法正常处理后续节点的唤醒逻辑,因此在这里唤醒后续节点
            unparkSuccessor(node);  // 唤醒后续节点
        }

        node.next = node; // help GC
    }
}

// 唤醒后续节点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 遍历,从尾结点开始,找到当前节点之后第一个状态不为CANCELLED的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);  // 唤醒该节点
}

总结:用一张流程图来总结代码中的流程

1、lock方法:尝试获取锁的流程

在这里插入图片描述

2、acquire方法的流程

在这里插入图片描述

公平锁是如何获取锁的?

源码:

// FairSync:这是同步器的子类FairSync中获取锁的方法
final void lock() {
    // 直接进入阻塞队列,也符合公平锁的原理,acquire中会调用tryAcquire方法,
    // 公平锁和非公平锁都分别实现了这个方法,定制自己获取锁的逻辑
    acquire(1);  
}

// FairSync 公平锁中获取锁的流程
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {  // 如果无锁
        if (!hasQueuedPredecessors() && // 等待队列中没有节点
            compareAndSetState(0, acquires)) {  // 获取锁成功
            setExclusiveOwnerThread(current); // 设置当前线程为拥有独占锁的线程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {  // 锁的可重入逻辑
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平锁不像非公平锁那样,先尝试获取锁,而是按照流程,判断阻塞队列中有没有节点,如果有,进入阻塞队列

ReentrantLock是如何释放锁的?

公平锁和非公平锁释放锁的原理是一样的,这里统一讲解。

调用AQS中的release方法,来完成释放锁的逻辑

public final boolean release(int arg) {
    // tryRelease,交给子类实现的模板方法,执行释放锁的逻辑,如果返回true,表示锁释放成功
    if (tryRelease(arg)) {
        // 处理等待队列,唤醒后继节点
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒后继节点,这里是唤醒队列中的第一个节点
            unparkSuccessor(h);  // 这个方法在前面有提到
        return true;
    }
    return false;
}

// ReentrantLock中释放锁的逻辑,同样的,改变状态,将独占锁标识改为null
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;  // 改变状态,这里包含可重入的逻辑
    if (Thread.currentThread() != getExclusiveOwnerThread())  // 判断,如果当前线程没有持有锁
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 确认是释放锁,将独占锁的标识置为null,此前它是当前线程
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);  // 设置状态
    return free;
}

Condition 源码

1、ReentrantLock中获取Condition的方法:

// ReentrantLock中的方法:
public Condition newCondition() {
    return sync.newCondition();
}
// 同步器中的方法:
final ConditionObject newCondition() {
    return new ConditionObject();
}

可以看到,最终是获取了一个ConditionObject的实例

2、ConditionObject:它被定义在AQS中,实现了Condition接口。

Condition接口:条件对象,条件对象从Object类中提取出了wait、notify、notifyAll方法的功能,使得一个锁对象上可以支持多个等待队列。

public interface Condition {
    // 使当前线程进入等待状态,直到被唤醒或调用interrupt方法
    void await() throws InterruptedException;
    // 等待,直到被唤醒
    void awaitUninterruptibly();
    // 等待,并且指定时长
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;

    // 唤醒单个线程
    void signal();
    // 唤醒全部线程
    void signalAll();
}

2、ConditionObject的整体结构:它是AQS的成员内部类

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /* 维护一个等待队列 */
    // 头节点
    private transient Node firstWaiter;
    // 尾节点
    private transient Node lastWaiter;
}

3、阻塞逻辑的实现:

// await方法:线程入队,然后调用park方法,进入等待状态
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
  
    // 向条件队列添加一个节点
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);  // 完全释放当前线程持有的锁
    int interruptMode = 0;
    // 判断当前节点是否在同步队列
    while (!isOnSyncQueue(node)) {
        // 如果不在,进入阻塞状态
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
  
    // 唤醒后,没有进入上面的while,证明节点在同步队列中,尝试重新获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)  // 判断节点是否仍在条件队列中
        unlinkCancelledWaiters();  // 清理队列中所有被取消的节点,确保队列的正确性
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);   // 处理中断逻辑,被外部中断后,是抛异常还是继续进行
}

// 清理队列中失效的节点:这里实际上是处理单向链表的逻辑,等待队列中,使用nextWaiter指向下一个节点
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {  // 从头结点开始遍历
        Node next = t.nextWaiter;  // 下一个节点
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;  // 用作下一次循环时记录前一个节点
        t = next;  // 头结点后移
    }
}

3.1、添加线程到条件队列,条件队列是一个单向链表

private Node addConditionWaiter() {
    Node t = lastWaiter;
    
    // 这里先判断尾结点的状态,如果它不是CONDITION状态
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();  // 清理队列中失效的节点
        t = lastWaiter;
    }
    // 节点入队
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

3.2、释放锁的逻辑

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) { // 释放锁
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED; // 如果释放失败,将节点状态改为CANCELLED
    }
}

4、唤醒逻辑的实现:

// signal方法:唤醒线程
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);  // 唤醒队列中的第一个节点,方法中会调用LockSupport.unpark方法
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)  // 头结点后移
            lastWaiter = null;  // 如果等待队列中只有一个节点,尾结点置为空
        first.nextWaiter = null;  // 原先的头结点出队
    } while (!transferForSignal(first) &&  // 唤醒头结点所在的线程,如果唤醒成功,退出循环
             (first = firstWaiter) != null);
}

// 唤醒头结点所在的线程
final boolean transferForSignal(Node node) {
    // 状态转换,由CONDITION变为初始状态
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    
    Node p = enq(node);  // 节点加入同步队列,这里返回的是当前节点的前一个节点
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))  // 如果前一个节点的状态无法被转换为SIGNAL
        LockSupport.unpark(node.thread);  // 手动唤醒当前节点
    return true;
}

总结:Condition的工作机制

  • Condition的实现类ConditionObject,是AQS的内部类,所以它会持有AQS的引用,在当前案例中,AQS是通过子类ReentrantLock实例化的,所以Condition实际上持有ReentrantLock的实例。成员内部类的实例依赖于外部类的实例,通过ReentrantLock创建ConditionObject的实例。

在这里插入图片描述

  • Condition内部会维护一个自己的等待队列,它又持有ReentrantLock的实例,所以它可以操作ReentrantLock的同步队列
  • 用户通过Condition实例调用await方法,condition会把用户线程加入到自己的条件队列中,然后阻塞
  • 用户通过Condition实例调用signal方法,condition会把用户线程从自己的条件队列中移除,然后加入到ReentrantLock的同步队列中,然后唤醒用户线程
  • 用户线程被唤醒后,判断自己是不是在同步队列中,如果在,抢锁,如果不在,继续阻塞。抢到锁之后,会额外判断,如果当前线程还在条件队列中,会清理条件队列中失效的节点

总结

这里介绍了可重入锁和AQS是如何配合在一起工作的,它们的设计模式,哪些功能被定义在可重入锁中,哪些功能被定义在AQS中,后面介绍的几个工具也是这么实现的,模式基本相同。

ReentrantReadWriteLock 源码

读写锁,读锁是共享锁,写锁是独占锁,和之前的ReentrantLock类似的一点,ReentrantLock本身可以被理解为是一个写锁

ReentrantReadWriteLock的基本结构

1、ReentrantReadWriteLock的继承体系:ReentrantReadWriteLock实现了读写锁(ReadWriteLock)接口,并且支持序列化

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {

ReadWriteLock接口:定义了读锁和写锁

public interface ReadWriteLock {
    // 读锁
    Lock readLock();
    // 写锁
    Lock writeLock();
}

2、ReentrantReadWriteLock的类结构

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {

    // 同步器
    abstract static class Sync extends AbstractQueuedSynchronizer {
        /* 同步器中定义的抽象方法 */
        // 判断读是否应该阻塞
        abstract boolean readerShouldBlock();
        // 判断写是否应该阻塞
        abstract boolean writerShouldBlock();
    }

    // 非公平锁
    static final class NonfairSync extends Sync {
        final boolean writerShouldBlock() {
            return false; // 非公平锁,写锁可以竞争
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    // 公平锁
    static final class FairSync extends Sync {
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
  
    // 读锁:内部操作的是共享锁
    public static class ReadLock implements Lock, java.io.Serializable {
        // 同步器,锁的内部,所有的功能都是委托同步器实现的
        private final Sync sync;

        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

        public void lock() {  // 获取锁
            sync.acquireShared(1);
        }
        public void unlock() { // 释放锁
            sync.releaseShared(1);
        }
    }

    // 写锁,内部操作的是独占锁
    public static class WriteLock implements Lock, java.io.Serializable {
        // 同步器
        private final Sync sync;
      
        public void lock() {
            sync.acquire(1);
        }
        public void unlock() {
            sync.release(1);
        }
    }
  
    // 构造方法,读锁和写锁默认都是非公平锁
    public ReentrantReadWriteLock() {
        this(false);
    }
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
}

ReentrantReadWriteLock和ReentrantLock类似,只是它的内部扩展出了读锁和写锁,读锁和写锁依赖的是同一个同步器,读锁是共享锁,写锁是排它锁。

读写锁内置的同步器

读写锁需要在读锁和写锁之间同步,这些功能都依赖同步器

同步器的结构:

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;

		/* 锁的状态:用state的高16位作为读锁的数量,低16位表示写锁的数量 */
    
    static final int SHARED_SHIFT   = 16;  // 计算常量
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // 读锁的次数,状态字段无符号右位移16位
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    // 写锁的次数,状态字段取后16位
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

		// 静态内部类:封装了线程id和该线程持有读锁的次数
    static final class HoldCounter {
        int count = 0;
        // Use id, not reference, to avoid garbage retention
        final long tid = getThreadId(Thread.currentThread());  // 线程id,在创建实例时初始化
    }
    private transient HoldCounter cachedHoldCounter;

    // 静态内部类:继承了ThreadLocal,用于记录每个线程持有读锁的次数
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    private transient ThreadLocalHoldCounter readHolds;
    
    // 第一个持有读锁的线程
    private transient Thread firstReader = null;
    // 第一个持有读锁的线程持有几次读锁,这里是可重入锁的逻辑
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // ensures visibility of readHolds
    }
}

获取读锁的逻辑

读锁直接调用了AQS中获取共享锁的逻辑,并且在某些步骤做了自己的定制,这就是模板方法设计模式。

源码:

1、基本步骤

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)  // 尝试获取共享锁
        doAcquireShared(arg);    // 获取失败,进入阻塞队列
}

2、尝试获取共享锁

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    // 获取 state 的值
    int c = getState();
    // 判断是否有线程获取了写锁并且不是当前线程,exclusiveCount(c),计算写锁的数量
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;  // 如果有线程获取了写锁,直接返回

    // 计算读锁的数量
    int r = sharedCount(c);
    
    if (!readerShouldBlock() &&  // 读是否应阻塞,这个实现在子类中
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {  // cas算法修改状态,修改成功,表示获取到读锁
        /* 下面这一大段逻辑都是用来记录当前线程获取了几次共享锁 */
        if (r == 0) {
            // 表示是第一个获取读锁的线程
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 如果是当前获取读锁的线程重入,再次获取锁
            firstReaderHoldCount++;
        } else {
            // 如果不是第一个获取读锁的线程来获取读锁,使用cachedHoldCounter和readHolds。
            // cachedHoldCounter记录线程id和该线程对于读锁的持有次数,readHolds将该数据
            // 存储到ThreadLocal中
            HoldCounter rh = cachedHoldCounter;
            // 这个if else是处理readHolds和cachedHoldCounter之间的关系
            if (rh == null || rh.tid != getThreadId(current)){
                // 这里的get方法,会调用初始化数据的方法initialValue() // 参考ThreadLocal
                cachedHoldCounter = rh = readHolds.get();
            } else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;  // 线程id在创建实例时初始化,这里记录该线程对于读锁的持有次数
        }
        return 1;  // 表示获取锁成功
    }
    // 需要被阻塞获取读锁失败,那么需要进入下面完整版的获取锁的过程
    return fullTryAcquireShared(current);
}

// 读是否需要被阻塞:这个逻辑定义在同步器中。具体实现是,如果队列中的第一个节点是一个独占模式,读线程需要被阻塞,否则不需要。
// 因为即使当前没有写线程持有锁,队列中也可能会有写请求,并且队列中的写请求也应该被优先处理,在非公平锁中,读线
// 程和写线程的请求顺序可能不严格按照先进先出处理,但写线程的请求仍然需要被优先处理。
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&  // 队列中的头结点是独占模式
        s.thread != null;
}

获取锁失败后,需要调用完整版的获取锁的流程:

// 完整版的获取锁的过程:用于在快速路径失败后,提供一种更全面的尝试获取共享锁的机制。
// 它处理了快速路径未能处理的复杂情况,例如锁降级、读锁数量达到上限、线程是否需要阻塞等。
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {  // 自旋
        int c = getState();
        // 判断是否有线程获取了写锁并且不是当前线程
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;  // 如果有,直接返回
           
        } else if (readerShouldBlock()) {  // 判断读线程是否应该被阻塞
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();  // 移除当前线程持有锁的记录
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
      
        // 如果当前线程已经持有写锁,继续尝试获取读锁(锁降级),
        if (sharedCount(c) == MAX_COUNT)  // 检查读锁数量是否达到上限
            throw new Error("Maximum lock count exceeded");
        // 尝试获取共享锁
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

3、获取锁失败后,进入阻塞队列

private void doAcquireShared(int arg) {
    // 向队列尾部添加一个节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 如果当前节点是头结点,再次尝试获取锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 获取锁成功,头结点出队并且唤醒后续节点
                    setHeadAndPropagate(node, r); 
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);  // 处理异常情况
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);  // 将当前节点设置为一个虚拟头结点

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // 如果下一个节点是共享模式或者null,唤醒后续节点
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

释放读锁的逻辑

整体流程:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 尝试释放共享锁
        doReleaseShared();  // 释放共享锁
        return true;
    }
    return false;
}

1、tryReleaseShared:负责更新状态

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 更新当前线程持有共享锁的次数
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    // 更新state变量
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

2、doReleaseShared:维护阻塞队列

private void doReleaseShared() {
    for (;;) {  // 无限循环,直到释放操作完成
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))  // 尝试将状态从SIGNAL改为0
                    continue; // 如果CAS失败,重新循环检查
                unparkSuccessor(h);  // 唤醒头节点的后继节点
            }
            // 如果头节点的状态为0,说明当前没有线程需要被唤醒,但需要确保释放操作能够继续传播。
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            // 在每次循环结束时,检查头节点是否发生变化。如果头节点没有变化,说明当前的释放操作已经完成,可以退出循环。
            break;  
    }
}

获取和释放写锁的逻辑

和ReentrantLock类似,只是操作state字段的方式不同。

获取写锁的逻辑:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) { // 如果锁已经被占用
        // (Note: if c != 0 and w == 0 then shared count != 0),如果c != 0 && w == 0,
        // 证明共享锁不为0,有线程持有共享锁
        if (w == 0 || current != getExclusiveOwnerThread())  // 不是当前线程获取的写锁
            return false;  // 退出
      
        // 如果当前线程已经持有写锁,检查是否超过最大次数
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 重入
        setState(c + acquires);
        return true;
    }
    // 如果锁未被占用(c == 0)
    if (writerShouldBlock() ||   // 如果写线程应该阻塞
        !compareAndSetState(c, c + acquires))  // 或者 CAS 更新状态失败
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

释放写锁的逻辑:

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

总结

读写锁使用同一个状态字段、同一个阻塞队列,内部基于同一个同步器,彼此之间互相影响,只是获取锁的方式不同。

Semaphore 源码

Semaphore底层是基于共享锁,也可以理解为读锁,加锁和释放锁,操作的都是读锁。它允许多个线程同时执行同步代码块,但是它又会限制线程数量,从而达到控制并发量的目的

Semaphore的基本结构:

public class Semaphore implements java.io.Serializable {
    // 同步器的实例
    private final Sync sync;

    // 同步器的定义
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        // 这里重点关注构造方法,它调用AQS中的setState方法,state字段表示锁的状态,如果是共享锁,
        // state的值0是到n,如果是排它锁,state的值是0到1,0表示无锁,1表示获取到锁。值为n,表示
        // 允许n个线程同时获取共享锁
        Sync(int permits) {
            setState(permits);
        } 
    }

    // 构造方法,参数premits代表许可证数量,也就是同时允许多少个线程进行并发操作
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);  // 默认使用非公平锁
    }

    // 非公平锁
    static final class NonfairSync extends Sync {}
    // 公平锁
    static final class FairSync extends Sync { }
  
    // Semaphore方法将获取锁和释放锁的功能都委托给同步器,同步器又调用了AQS中的方法,所以核心实现在AQS
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public void release() {
        sync.releaseShared(1);
    }

}

和ReentrantLock类似,Semaphore内部定义了同步器Sync,同步器继承了AQS,基于同步器扩展出了公平锁、非公平锁,Semaphore默认使用非公平锁。

原理:Semaphore将状态设置为n,acquire方法,n - 1,代表获取到锁、release方法, n + 1,代表释放锁

CountDownLatch 源码

CountDownLatch的内部是一个共享锁,同样的,它的内部定义了同步器,但是没有根据同步器扩展出非公平锁、公平锁

public class CountDownLatch {
    // 同步器,继承了AQS
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        // 构造方法
        Sync(int count) {
            setState(count); // 调用了AQS中的setState方法
        }
    }
    
    // 同步器实例
    private final Sync sync;

    // 构造方法,参数count,指定了调用countDown方法的次数,调用够指定次数的countDown方法后,
    // await方法才会结束阻塞
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    // 等待,直到倒计时锁内部值为0
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 倒计时锁减1,如果减1后值为0,释放等待的线程
    public void countDown() {
        sync.releaseShared(1);
    }
}

和Semaphore类似,将state设置为n,调用countDown方法, n - 1,调用await方法,判断 n 是否等于 0 ,如果是,取消阻塞

Q&A

读锁、写锁,公平锁、非公平锁,是怎么配合在一起工作的?

读锁、写锁,公平锁、非公平锁,是从不同的角度描写了一个锁的特性,读锁可以是公平锁、也可以是非公平锁,它们是一个实体的两个属性

线程什么时候从阻塞队列中移出?

获取到锁之后,所以线程释放锁资源时只需要唤醒队列中的第一个节点即可

锁超时怎么实现?tryLock方法

进入阻塞状态时加上超时时长。

源码:

tryAcquireNanos:尝试获取锁,在用户指定的时长过后,如果没有获取锁,结束阻塞,是ReentrantLock中tryLock等方法的基础

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 判断打断状态
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||  // 尝试获取锁
        doAcquireNanos(arg, nanosTimeout);  // 获取锁,或者阻塞
}

// doAcquireNanos:真正执行获取锁或阻塞的逻辑
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 结束阻塞的时间,当前时间加上用户传入的时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) { // 自旋
            final Node p = node.predecessor();
            // 如果上一个节点是头节点,尝试获取锁,成功后直接返回,不进入阻塞
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 计算出应该阻塞的时长
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // 判断是否应该阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 这是一项优化,如果时长大于自旋阈值才进行阻塞,否则进行自旋,
                // 因为如果阻塞时间特别短,相较于自旋,阻塞比较耗费性能
                nanosTimeout > spinForTimeoutThreshold)
              
                // 阻塞
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可打断怎么实现?tryInterruptibly方法

AQS中会判断当前线程是否调用了interrupt方法,可打断的情况下,如果调用了interrupt方法,抛异常,同时用户需要处理这个异常,不可打断的情况下,也就是通过lock()方法获取锁时,即使检测到interrupt方法被调用,也会继续向下执行。

相关文章:

  • 利用opencv_python(pdf2image、poppler)将pdf每页转为图片
  • 2025年夸克网盘自动签到程序,验证通过!
  • android App主题颜色动态更换
  • IO进程线程3
  • 【AD】5-3 PCB板框的内缩与外扩
  • OpenBMC:BmcWeb app获取socket
  • 嵌入式 ARM Linux 系统构成(1):Bootloader层
  • oracle通过dmp导入数据
  • PHP之运算符
  • python-串口助手(OV7670图传)
  • 文献分享: ConstBERT固定数目向量编码文档
  • java 查找连个 集合的交集部分数据
  • 生命周期总结(uni-app、vue2、vue3生命周期讲解)
  • Linux总结
  • 进程间通信
  • 【单片机】嵌入式系统设计流程
  • 【仿muduo库one thread one loop式并发服务器实现】
  • 美股回测:历史高频分钟数据的分享下载与策略解析20250305
  • 配置hosts
  • 【软考-架构】9.2、摘要-签名-PKI-访问控制-DOS-欺骗技术
  • 共建医学人工智能高地,上海卫健委与徐汇区将在这些方面合作
  • GDP逼近五千亿,向海图强,对接京津,沧州剑指沿海经济强市
  • 多家中小银行存款利率迈入“1时代”
  • 中美大幅下调超100%关税,印巴四日“战争”复盘|907编辑部
  • 上海能源科技发展有限公司原董事长李海瑜一审获刑13年
  • 外交部就习近平主席将出席中拉论坛第四届部长级会议开幕式介绍情况