当前位置: 首页 > news >正文

并发编程——09 CountDownLatch源码分析

1 概述

  • CountDownLatch 是 Java 并发包中用于线程同步的工具类,核心逻辑是:

    • 初始化时指定一个计数count

    • 线程执行完任务后调用 countDown(),使计数count减1;

    • 其他线程调用 await() 会被阻塞,直到count减到0时,阻塞的线程才会被唤醒并继续执行;

  • 流程步骤:

    在这里插入图片描述

    • 初始化:创建 CountDownLatch 时指定count=4(即图中的state=4),代表需要等待4个线程完成任务;

    • 主线程阻塞main线程调用 await(),进入阻塞状态(图中粉色“main 阻塞”块),直到count减为0;

    • 子线程执行并计数递减

      • Thread1 执行完任务,调用 countDown()count从4→3(图中state=3);
      • Thread2 调用 countDown()count从3→2(图中state=2);
      • Thread3 调用 countDown()count从2→1(图中state=1);
      • Thread4 调用 countDown()count从1→0(图中state=0);
  • 主线程唤醒并继续:当count=0时,main线程的await()不再阻塞,继续执行后续逻辑(图中绿色“main Done”块);

  • 源码方法总览:

    在这里插入图片描述

2 构造函数

public CountDownLatch(int count) {// 确保传入的计数 count 不能为负数,否则直接抛出异常,保证了 CountDownLatch 初始化的合理性if (count < 0) throw new IllegalArgumentException("count < 0");// 初始化sync属性this.sync = new Sync(count);
}
  • SyncCountDownLatch 内部基于 AQS(抽象队列同步器) 实现的同步组件,它将传入的 count 作为 AQS 的同步状态(state),为后续的 countDown() 计数递减和 await() 阻塞唤醒逻辑提供了底层支持。

3 Sync-队列同步器

// Sync 是 AQS 的子类,用于实现 CountDownLatch 的同步逻辑
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;// 构造函数通过 setState(count) 将 CountDownLatch 的初始化计数赋值给 AQS 的 state 属性(state 是 AQS 用于维护同步状态的核心变量)Sync(int count) {setState(count);}// 获取当前 AQS 中 state 的值,即 CountDownLatch 剩余的未完成线程数int getCount() {return getState();}// 判断是否可以“获取共享资源”(即 CountDownLatch 的计数是否已减到 0)// 若 state == 0,返回 1(表示可以获取,await() 方法不会阻塞)// 若 state != 0,返回 -1(表示无法获取,await() 方法会阻塞线程)protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}// 释放锁protected boolean tryReleaseShared(int releases) {// 自旋for (;;) {int c = getState(); // 获取 AQS 的 stateif (c == 0) // 计数已为0,无需再释放return false;int nextc = c-1; // 计数减1if (compareAndSetState(c, nextc)) // CAS原子更新statereturn nextc == 0; // 若减到0,返回true(触发唤醒所有阻塞线程)}}
}

4 await()-阻塞等待

  • CountDownLatch#await():入口方法

    public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
    }
    
    • 作用:使当前线程阻塞等待,直到 CountDownLatch 的计数减到 0;

    • 实现:委托给 sync(基于 AQS 的同步组件)的 acquireSharedInterruptibly(1) 方法执行,1 是 AQS 共享式获取的参数(此处无实际数值意义,仅为兼容方法签名);

  • AQS#acquireSharedInterruptibly(int arg):共享式可中断获取逻辑

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException(); // 检查线程中断,若已中断则抛异常if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); // 若获取失败,进入阻塞队列逻辑
    }
    
    • 中断检查:先判断线程是否被中断,若已中断则立即抛出 InterruptedException

    • 尝试获取资源:调用 tryAcquireShared(arg)(由 CountDownLatchSync 实现),若返回值 < 0(表示计数未到 0),则进入 doAcquireSharedInterruptibly(arg) 处理阻塞逻辑;

  • CountDownLatch#Sync#tryAcquireShared(int acquires):共享式获取的状态判断

    protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
    }
    
    • 作用:判断 CountDownLatch 的计数是否已减到 0
      • state == 0,返回 1(表示可以获取,await() 不阻塞);
      • state != 0,返回 -1(表示无法获取,await() 会阻塞)。
  • AQS#doAcquireSharedInterruptibly(int arg):共享式阻塞获取(可中断)

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 将当前线程包装为“共享模式”节点,加入等待队列boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); // 获取前驱节点if (p == head) { // 若前驱是头节点,说明当前节点有资格尝试获取资源int r = tryAcquireShared(arg);if (r >= 0) { // 计数已到0,获取成功setHeadAndPropagate(node, r); // 设置头节点并传播唤醒(共享模式特有)p.next = null; // 断开原头节点引用,帮助GCfailed = false;return;}}// 若获取失败,判断是否需要挂起线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) // 挂起线程并检查中断throw new InterruptedException();}} finally {if (failed)cancelAcquire(node); // 若获取过程中失败,取消节点的获取请求}
    }
    
    • 加入等待队列addWaiter(Node.SHARED) 将当前线程包装为“共享模式”的节点,加入 AQS 的等待队列尾部;

    • 自旋尝试获取:循环中先判断“前驱是否为头节点”(若为头节点,说明当前节点是队列中最有资格获取资源的线程),然后再次尝试 tryAcquireShared(arg)

    • 线程挂起与中断:若获取失败,通过 shouldParkAfterFailedAcquire 判断是否需要挂起线程;若线程在挂起期间被中断,parkAndCheckInterrupt() 会返回 true,进而抛出 InterruptedException

5 countDown()-释放锁资源

  • CountDownLatch#countDown():入口方法

    public void countDown() {sync.releaseShared(1);
    }
    
    • 作用:将 CountDownLatch 的计数减 1,若计数减到 0,则唤醒所有等待的线程;

    • 实现:委托给sync(基于 AQS 的同步组件)的releaseShared(1)方法执行,1表示要递减的计数(此处固定为 1,因为 countDown() 每次只减 1);

  • AQS#releaseShared(int arg):共享式释放逻辑

    public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // 尝试递减计数doReleaseShared(); // 唤醒等待队列中的线程return true;}return false;
    }
    
    • 尝试释放资源:调用 tryReleaseShared(arg)(由 CountDownLatchSync 实现),若递减成功则进入下一步;

    • 唤醒等待线程:调用 doReleaseShared() 唤醒 AQS 等待队列中阻塞的线程(即调用 await() 的线程);

  • CountDownLatch#Sync#tryReleaseShared(int releases):计数递减的核心逻辑

    protected boolean tryReleaseShared(int releases) {for (;;) { // 自旋保证原子性int c = getState(); // 获取当前计数(AQS的state)if (c == 0) return false; // 计数已为0,无需再递减int nextc = c - 1; // 计数减1if (compareAndSetState(c, nextc)) { // CAS原子更新计数return nextc == 0; // 若减到0,返回true(触发唤醒所有阻塞线程)}}
    }
    
    • 自旋 + CAS 保证原子性:通过循环尝试 CAS 操作,保证多线程同时调用 countDown() 时计数递减的原子性;

    • 唤醒触发条件:当 nextc == 0 时返回 true,AQS 会感知到这个状态变化,进而调用 doReleaseShared() 唤醒所有等待的线程;

  • AQS#doReleaseShared():唤醒等待队列的共享线程

    private void doReleaseShared() {for (;;) { // 自旋保证唤醒可靠性Node h = head; // 获取等待队列的头节点if (h != null && h != tail) { // 队列非空且有等待线程int ws = h.waitStatus; // 获取AQS等待队列中头节点的等待状态if (ws == Node.SIGNAL) { // 头节点状态为SIGNAL(表示后续节点需要唤醒)if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // CAS更新状态失败,继续自旋unparkSuccessor(h); // 唤醒头节点的后继线程} // 处理共享模式下的状态传播(保证多个线程依次被唤醒)else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head) // 头节点未变化,说明唤醒操作完成break;}
    }
    
    • 自旋循环:不断尝试唤醒操作,确保所有需要唤醒的线程都能被处理

    • 状态判断与唤醒:若头节点状态为 SIGNAL,则通过 unparkSuccessor(h) 唤醒其后继线程;同时设置状态为 PROPAGATE,保证共享模式下唤醒的“传播性”(多个等待线程依次被唤醒)。

6 总结

  • CountDownLatch 基于 AQS(抽象队列同步器)CAS(比较并交换) 实现:

    • AQS 提供了同步状态管理(state 属性)和等待队列机制,是 CountDownLatch 实现“线程阻塞/唤醒”的基础;

    • CAS 保证了“计数递减”操作的原子性(如 countDown() 中通过 CAS 原子更新 state);

  • CountDownLatch 的构造函数必须指定 count(需等待的线程数),并通过内部类 Sync(继承自 AQS)将 count 赋值给 AQS 的 state 属性。这一步为后续的“计数递减”和“阻塞等待”逻辑奠定了状态基础;

  • 调用 countDown() 时,本质是将 AQS 的 state 减 1(通过 CAS 保证原子性);

  • 当所有线程执行完毕,state 会被减到 0,此时 countDown() 会触发 AQS 唤醒等待队列中所有挂起的线程(即调用 await() 的线程);

  • 调用 await() 时,本质是判断 AQS 的 state 是否为 0:

    • state > 0,说明还有线程未执行完毕,await() 会阻塞当前线程,将其加入 AQS 等待队列;
    • state == 0(最后一个线程执行 countDown() 后),await() 会停止阻塞,当前线程继续执行。
http://www.dtcms.com/a/358830.html

相关文章:

  • 信息系统架构
  • Java面试-MyBatis篇
  • 【后端数据库】MySQL 索引生效/失效规则 + 核心原理
  • oha:一款轻量级HTTP负载测试工具
  • XHR 介绍及实践
  • 论文介绍:《Small Language Models are the Future of Agentic AI》
  • SSR降级CSR:高可用容灾方案详解
  • 使用axios封装post和get
  • istringviewstream 和 outstringstream
  • 嵌入式学习日记
  • 【3D算法技术入门】如何基于建筑图片重建三维数字资产?
  • 行内元素块元素
  • 【办公类-39-06】20250830通义万相水果图(万相2.1专业Q版线描风格+万相专业2.2默认简笔画效果)
  • “我店模式“当下观察:三方逻辑未变,三大升级重构竞争力
  • 如何提高微型导轨的生产效率?
  • 【Java EE进阶 --- SpringBoot】Spring Web MVC(Spring MVC)(二)
  • Qt中的QSS介绍
  • JavaScript 中的 this 关键字
  • 机器视觉学习-day11-图像噪点消除
  • VuePress添加自定义组件
  • android studio编译安卓项目报gradle下载失败
  • [光学原理与应用-337]:ZEMAX - 自带的用于学习的样例设计
  • 知识随记-----Qt 样式表深度解析:何时需要重写 paintEvent 让 QSS 生效
  • [算法] 双指针:本质是“分治思维“——从基础原理到实战的深度解析
  • 05.《ARP协议基础知识探秘》
  • 构建AI智能体:十八、解密LangChain中的RAG架构:让AI模型突破局限学会“翻书”答题
  • 银河麒麟V10(Phytium,D2000/8 E8C, aarch64)开发Qt
  • 魔方的使用
  • 进制转换问题
  • 【车载开发系列】CAN与CANFD上篇