当前位置: 首页 > news >正文

ReentrantLock源码和AQS

AQS

抽象队列同步器

方法成员变量
  • CANCELLED 表示线程的等待状态被取消(通常是因为超时或者被中断),并且不会再参与锁竞争。被标记为 CANCELLED 的节点不会被唤醒,它的前驱和后继节点需要跳过它。

  • SIGNAL: 表示当前节点的后继节点需要被唤醒。当一个节点(假设是 A)被阻塞时,A 会检查它的前驱节点 B 是否是 SIGNAL 状态:特征SIGNAL 状态的节点依赖于前驱节点的唤醒逻辑。

    • 如果是,则意味着当 B 释放锁时,A 需要被唤醒。

    • 如果不是,A 可能需要自己主动挂起或者进行状态调整。

  • CONDITION:表示节点当前在 Condition 队列中等待ConditionObject 机制允许线程调用 await() 进入等待状态,并等待 signal() 唤醒。只有当 Condition.signal() 被调用时,CONDITION 状态的节点才会被移动到 同步队列 进行锁竞争。

  • PROPAGATE :表示需要无条件地传播唤醒操作,通常用于 共享模式(如 ReentrantReadWriteLock 的读锁)。这个状态会使得释放锁的操作不仅唤醒下一个节点,而且会继续向后传播,确保所有等待线程都能有机会获得锁。多线程同步工具(如 CountDownLatch)可能会使用 PROPAGATE 进行信号传播。

  • nextWaiter:指向同一个 Condition 队列中的下一个等待线程。用于 ConditionObject:当线程调用 Condition.await() 进入等待状态时,它会被添加到 条件队列(condition queue),这些等待线程通过 nextWaiter 进行链接。当 Condition.signal() 被调用时,一个线程会从 condition queue 移动到 同步队列(sync queue) 继续争夺锁。

public abstract class AbstractQueueSynchronizer{
    static final class Node{ //包装的线程节点

        static final Node SHARED = new Node();
        static final Node EXCELUSIVE = null;
        
        static final int CANCELLED = 1; 
        static final int SIGNAL = -1; 
        static final int CONDITTION = -2;
        static final int PROPAGATE = -3;
		
        volatile int waitStatus; // 上面的值的其中一个。 0初始状态,表示线程没有特殊状态。
        
        volatile Node pre; // 当前节点的前驱 同步阻塞队列
        volatile Node next; // 当前节点的后继 同步阻塞队列
        volatile Thread thread; // 被包装的线程
        Node nextWaiter; // 还有一个waitSet队列,单项队列
    }
    // 同步双向队列
    private transient volatile Node head;
    private transient volatile Node tail;
    // 相当于锁,线程标识状态
    private volatile int state;
    
    // CAS形式状态修改上锁
    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }
    
    /**
    AQS 是一个通用的同步框架,但它本身不定义具体的同步逻辑,例如如何加锁、如何释放锁。
	AQS 只提供了一套线程排队和等待的机制,而具体的加锁/释放逻辑由子类实现。
	如果不重写,直接调用这些方法就会抛出异常,提醒开发者必须自己实现逻辑。
    
    **/
    // 尝试以独占模式(Exclusive Mode)获取同步状态
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    // 尝试释放独占模式的同步状态。
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    // 尝试以共享模式(Shared Mode)获取同步状态。
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 尝试释放共享模式的同步状态。
     protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 判断当前锁是否被当前线程独占。
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
}
ReentrantLock
public class ReentrantLock implements Lock{
    Sync synx; // 
    abstract static class Sync extends AbstractQueuedSynchronizer{
        // 重写了5个方法
    }
}
lock()
// ReentrantLock中
public void lock() {
    sync.acquire(1); // AQS本身已经实现了acquire
}
// tryAcquire实现了公平和非公平

		protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                /**
                hasQueuedPredecessors遍历同步队列  
                该线程没有前驱节点并且CAS修改为1acquires,获得锁,就会设置当前线程
                exclusiveOwnerThread=current 返回锁,获取成功
                **/
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) { // 可重入锁
                int nextc = c + acquires; // 重入次数加1
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc); // 已经获取锁就用CAS
                return true;
            }
            return false;
        }

// AQS中 tryAcquire在ReentrantLock实现逻辑
	static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
public final void acquire(int arg) {
    /**
    获取锁成功退出
    获取锁失败将当前先线程封装为node放入到同步队列尾巴上(自旋),返回这个结点node
    acquireQueued(node, arg))
    
    调用 tryAcquire 尝试获取锁,如果成功则直接返回。
	如果 tryAcquire 失败,则调用 addWaiter 将当前线程加入等待队列,并调用 acquireQueued 进行排队等待。如果在等待过程中被中断,则调用 selfInterrupt 恢复中断状态。
    **/
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// =================================================================================================

	private Node addWaiter(Node mode) {
        Node node = new Node(mode); // mode当前线程是什么时候被封装的呢?
        for (;;) {
            Node oldTail = tail; // 保存为此时状态的尾节点,多线程Tail可能后面就不一样了,循环去更新oldTail
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail); // node前驱指向尾巴节点
                if (compareAndSetTail(oldTail, node)) { // cas,Node放到尾节点,成功
                    oldTail.next = node; // 之前的尾巴节点指向node,双向
                    return node;
                }
            } else {
                initializeSyncQueue(); // 为节点为空,就初始化
            }
        }
    }
// mode当前线程是什么时候被封装的呢?
	Node(Node nextWaiter) {
            this.nextWaiter = nextWaiter;
            THREAD.set(this, Thread.currentThread());// 这里被封装
        }
	private final void initializeSyncQueue() {
        Node h;
        if (HEAD.compareAndSet(this, null, (h = new Node())))// new一个空结点。head设置为它,初始为null,如果其他的已经设置了就略过
            tail = h;
    }
// ========================================================================================================================

	final boolean acquireQueued(final Node node, int arg) {  
    /**
    
    初始化一个标志位 interrupted,用于记录当前线程是否被中断。
    进入无限循环,检查当前节点的前驱节点是否为头节点,并尝试获取锁。
    如果成功获取锁,则将当前节点设置为新的头节点,并返回是否被中断的状态。
    如果获取锁失败,则判断是否需要阻塞当前线程,如果需要则阻塞并更新中断状态。
    捕获异常并处理,取消获取锁的操作。
    
   	目的:阻塞当前线程,但需要把前面的设置为SINGAL,才能放心的去阻塞。
    **/
    
    /**
    flowchart TD
    A[开始] --> B{前驱是头节点?}
    B -->|Yes| C{尝试获取锁}
    C -->|成功| D[设置当前节点为头节点]
    D --> E[返回是否被中断]
    B -->|No| F{是否需要阻塞?}
    F -->|Yes| G[阻塞并检查中断]
    G --> H[更新中断状态]
    H --> B
    F -->|No| B
    A --> I{捕获异常}
    I --> J[取消获取锁]
    J --> K{如果被中断,中断自身}
    K --> L[抛出异常] 
    **/  
        boolean interrupted = false; // 目前不可倍中断
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }

	private void setHead(Node node) { // 
        head = node;
        node.thread = null;
        node.prev = null;
    }

    // 在p == head && tryAcquire(arg)后是否应该park
	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        /**
        获取前驱节点的状态。
        如果前驱节点状态为 SIGNAL,返回 true,表示当前节点可以安全地阻塞。
        如果前驱节点状态为取消状态(大于0),则跳过所有已取消的前驱节点,并重新连接链表。
        否则,将前驱节点状态设置为 SIGNAL,但不立即阻塞,返回 false
        **/
        
        // pred可能就是头节点或者有线程包装的结点
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                // 会存在内存泄漏吗a<=>b<=>c<=>d
                // a<===>d
                // a<-b<=>c->d
                // ????????
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL); // 设置为SIGNAL,表示唤醒下一个结点
        }
        return false;
    }
   
	private final boolean parkAndCheckInterrupt() {
        /**
        1. LockSupport.park(this);
        LockSupport.park(Object blocker) 让当前线程挂起(阻塞),直到被其他线程显式唤醒。
        this 参数(即当前对象)用于调试信息,可以帮助追踪哪个对象导致线程被挂起。
        线程在 park() 后会进入等待状态,直到满足以下任一条件:
        	其他线程调用 LockSupport.unpark(targetThread) 释放它。
        	线程被中断(即 Thread.interrupt())。
        	可能由于虚假唤醒(Spurious Wakeup) 自动返回。
        2. return Thread.interrupted();
        Thread.interrupted() 用于检查并清除当前线程的中断标志:
        	如果线程在 park() 期间被中断,该方法返回 true,表示线程曾被中断过。
        	同时,这个方法会清除当前线程的中断标志(interrupt status)。
        	如果没有中断,返回 false。
        **/
        LockSupport.park(this);
        return Thread.interrupted();
    }
unlock()
// ReentrantLock中
public void unlock() {
    sync.release(1);
}
// ===========================================================================
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
    		/** 解锁一定是先lock在unlock, lock成功后会setExclusiveOwnerThread
    		    判断这两个是否相等
    		**/
            if (Thread.currentThread() != getExclusiveOwnerThread()) 
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }		

// AQS中

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 头节点为空,就没有,检查头节点是否Signal,不是就没必要唤醒后面的
            unparkSuccessor(h);
        return true;
    }
    return false;
}



private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
    // ReenLock中不会出现下述情况
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
    
    
        if (s != null)
            LockSupport.unpark(s.thread);
    }
手动实现一个
package zy.sats.java.juc;

import java.lang.ref.Reference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
 * @Description: TODO
 * @Author: sats@jz
 * @Date: 2025/2/17 14:54
 **/
public class AQSLinkedLock  implements MyLock{
    private final boolean fair;
    private final AtomicBoolean lock = new AtomicBoolean(false);
    private final AtomicReference<Node> head = new AtomicReference<>(new Node());
    private final AtomicReference<Node> tail = new AtomicReference<>(head.get());
    private Thread owner = null;

    class Node{
        Node pre;
        Node next;
        Thread thread;
        public Node(Thread thread) {
            this.thread = thread;
        }
        public Node() {
        }
    }
    public AQSLinkedLock(Boolean fair) {
        this.fair = fair;
    }
    @Override
    public void lock() {
        //  非公平 先尝试直接拿锁, 拿不到就将线程包装为节点加入到链表尾部
        if(!fair && lock.compareAndSet(false,true)){
            System.out.println(Thread.currentThread().getName()+" get lock");
            owner = Thread.currentThread();
            return;
        }
        // 获取当前线程,包装为Node节点
        Node current = new Node(Thread.currentThread());
        //  尝试将当前的节点放到链表尾部, 链表尾部进行CAS替换直到成功。
        //  将原来的链表尾部节点的next指向当前节点, 当前节点的前驱指向头信息。
        while(true){
            //  每次重新获取链表尾部的引用,因为多线程情况下可能会变化
            Node currentTail = tail.get();
            if(tail.compareAndSet(currentTail, current)){
                current.pre = currentTail;
                currentTail.next = current;
                System.out.println(Thread.currentThread().getName()+"加入链表尾部");
                break;
            }
        }
//        加入之后, 就LockSupport.park(), 但是需要注意,唤醒是个什么逻辑
        //  唤醒之后, 判断当前的线程的前驱是否是head节点,只有这样才是下一个释放,逻辑正确。
        //  并且要获取锁成功  才能释放。
        while(true){

            // condition
            // head -> A -> B -> C -> D
            if(current.pre == head.get() && lock.compareAndSet(false,true)){
                //  获取锁成功后,设置头节点的指向为当前线程
                owner = Thread.currentThread();
                Node old = head.get();
                head.set(current);
                old.next = null;
                current.pre = null;
                System.out.println(Thread.currentThread().getName()+"获得锁");
                return;
            }
            LockSupport.park(); // 先判断一次逻辑在阻塞, 保证一次自己唤醒自己的操作
        }
    }

    @Override
    public void unlock() {
        if(Thread.currentThread() != owner){
            throw new RuntimeException("not owner");
        }


//        Node headNode = head.get();
        owner = null;
        lock.set(false);
        Node next = head.get().next;

        // -----
        if(next != null){
            System.out.println(Thread.currentThread().getName()+"唤醒下一个线程"+next.thread.getName());
            LockSupport.unpark(next.thread);
        }


    }
}

相关文章:

  • 【Vela学习】存储系统
  • 用android studio模拟器,模拟安卓手机访问网页,使用Chrome 开发者工具查看控制台信息
  • 全面解析Tomcat:简介、安装与配置指南
  • CentOS7离线部署安装Dify
  • Linux 提权
  • 工业节能新利器:第二类吸收式热泵与MVR热泵深度剖析
  • 【蓝桥】帮派弟位-DFS遍历树
  • 常用工具: kafka,redis
  • 管中窥豹数字预失真(DPD)
  • 尚硅谷TS快速入门笔记(个人笔记用)
  • 【大模型项目NexLM】如何封装多个 LLM(大模型) API 调用
  • maven无法解析插件 org.apache.maven.plugins:maven-jar-plugin:3.4.1
  • 科技快讯 | 中国团队发布通用型AI Agent产品Manus;谷歌安卓原生 Linux Terminal 终端应用上线
  • 平安养老险陕西分公司启动315金融消费者权益保护教育宣传活动
  • OpenHarmony子系统开发 - 编译构建Kconfig可视化配置指导
  • 探索在生成扩散模型中基于RAG增强生成的实现与未来
  • NET400系列协议网关技术方案
  • vue3中接收props的两种写法
  • Liunx系统 : 进程间通信【IPC-Shm共享内存】
  • 基于PyQt5的全能图片处理工具开发实践
  • 做侵权电影网站什么后果/如何进行关键词优化工作
  • 建网站用什么工作站/怎么提高seo关键词排名
  • 香河做网站/数字营销策划
  • 2016企业网站建设合同/百度竞价代运营外包
  • 做电工的有接单的网站吗/电子商务营销方法
  • 品牌网站制作流程/优化关键词的方法包括