当前位置: 首页 > news >正文

JCTools 无锁并发计数器:ConcurrentAutoTable

ConcurrentAutoTable

ConcurrentAutoTable 是一个非常巧妙的高性能并发组件,由并发编程大师 Cliff Click 设计。它的核心目标是解决高并发场景下对单个值(如计数器)进行原子更新时产生的严重缓存争用问题

传统的 AtomicLong 在只有少量线程更新时表现良好,但当大量 CPU 核心同时对其进行 CAS (Compare-And-Swap) 操作时,这个 long 值所在的缓存行会在不同核心的缓存之间来回失效和同步,造成巨大的性能瓶颈。

CAT 通过数据分片(Striping)动态扩容的思想,将争用分散到多个 long 值上,从而实现了在高并发下近乎线性的性能扩展。它被广泛用于实现高性能的并发计数器、读写锁等。

核心思想:分片与分散争用

想象一下高速公路上的收费站。如果只有一个收费口,所有车辆都会堵在那里。但如果开设几十个收费口,车辆就可以分散到各个口,整体吞吐量大大提升。

ConcurrentAutoTable 就是这个原理。它内部维护一个 long 类型的数组(long[] _t),而不是单个 long 值。

  • 写操作(add / increment:当一个线程需要更新计数值时,它会根据自己的线程 ID(或其他哈希值)计算一个数组下标,然后只对该下标对应的 long 元素进行 CAS 更新。
  • 读操作(get:当需要获取总计数时,它会遍历整个数组,将所有元素的值相加得到最终结果。

关键优势

  • 分散争用:来自不同线程的更新操作会以极高的概率命中数组中的不同元素。由于这些元素在内存中是分开的(甚至可能位于不同的缓存行),线程之间几乎不会发生缓存行争用,每个线程都像是在操作自己的本地变量一样快。
  • 无锁:所有操作都基于 CAS,是无锁的。

ConcurrentAutoTable 与内部类 CAT

ConcurrentAutoTable 本身是一个非常轻量级的包装类,真正的核心逻辑和数据都封装在它的私有静态内部类 CAT 中。

// ... existing code ...
public class ConcurrentAutoTable implements Serializable {// ... public methods ...// The underlying array of concurrently updated long countersprivate volatile CAT _cat = new CAT(null,16/*Start Small, Think Big!*/,0L);private static AtomicReferenceFieldUpdater<ConcurrentAutoTable,CAT> _catUpdater =AtomicReferenceFieldUpdater.newUpdater(ConcurrentAutoTable.class,CAT.class, "_cat");private boolean CAS_cat( CAT oldcat, CAT newcat ) { return _catUpdater.compareAndSet(this,oldcat,newcat); }// ... hash() method ...// --- CAT -----------------------------------------------------------------private static class CAT implements Serializable {private final CAT _next;// ... other fields ...private final long[] _t;     // Power-of-2 array of longsCAT( CAT next, int sz, long init ) {_next = next;_t = new long[sz];_t[0] = init;}// ... methods ...}
}
  • ConcurrentAutoTable:

    • private volatile CAT _cat: 这是指向当前“活动”的 CAT 实例的 volatile 引用。volatile 保证了当 _cat 引用发生变化时(即扩容时),所有线程都能看到最新的 CAT 实例。
    • _catUpdater: 用于原子性地更新 _cat 引用。
  • CAT (Concurrent Auto Table 的缩写):

    • private final long[] _t: 这就是分片计数器的核心——一个 long 数组。
    • private final CAT _next: 这是一个指向旧的 CAT 实例的引用。这个字段是实现平滑扩容的关键。当数组大小需要翻倍时,会创建一个新的、更大的 CAT 实例,它的 _next 字段会指向旧的 CAT 实例。这形成了一个单向链表结构。

核心写操作:add_if 与自动扩容

addincrement 等方法最终都调用了 private long add_if(long x)

// ... existing code ...private long add_if( long x ) { return _cat.add_if(x,hash(),this); }// Hash spreaderprivate static int hash() {//int h = (int)Thread.currentThread().getId();int h = System.identityHashCode(Thread.currentThread());return h<<3;                // Pad out cache lines.  The goal is to avoid cache-line contention}
// ... existing code ...
  • hash(): 计算一个哈希值,用于选择数组中的槽位(slot)。这里巧妙地使用了 System.identityHashCode(Thread.currentThread()),它对于同一个线程返回值是固定的。h<<3 是一个优化,目的是让相邻线程的哈希值尽可能地跨越缓存行边界,进一步减少伪共享的可能。

CAT.add_if 的逻辑是精髓所在:

// ... existing code ...public long add_if( long x, int hash, ConcurrentAutoTable master ) {final long[] t = _t;final int idx = hash & (t.length-1); // 1. 计算槽位// Peel loop; try once fastlong old = t[idx];final boolean ok = CAS( t, idx, old, old+x ); // 2. 尝试一次CASif( ok ) return old;      // Got it// Try harderint cnt=0;while( true ) { // 3. 自旋重试old = t[idx];if( CAS( t, idx, old, old+x ) ) break; // Got it!cnt++;}if( cnt < MAX_SPIN ) return old; // 4. 如果争用不激烈,直接返回if( t.length >= 1024*1024 ) return old; // 数组太大,不再扩容// 5. 争用激烈,触发扩容// ... (一些复杂的、被注释掉的节流逻辑) ...CAT newcat = new CAT(this,t.length*2,0); // 6. 创建新CAT// Take 1 stab at updating the CAT with the new larger size.while( master._cat == this && !master.CAS_cat(this,newcat) ) {/*empty*/} // 7. CAS更新主引用return old;}
// ... existing code ...
  1. 计算槽位:通过 hash & (t.length-1) 快速计算出要操作的数组下标(利用了数组长度是2的幂次的特性)。
  2. 快速尝试:乐观地认为没有争用,直接尝试一次 CAS。在低并发下,这几乎总是成功的。
  3. 自旋重试:如果第一次失败,说明有其他线程同时在更新同一个槽位,进入一个简短的自旋循环,直到 CAS 成功。
  4. 争用判断MAX_SPIN 被设为1。这意味着只要自旋超过1次,就认为当前槽位的争用是“激烈”的。
  5. 触发扩容:一旦判定争用激烈,就触发扩容逻辑。
  6. 创建新 CAT:创建一个大小是原来两倍的新 CAT 实例。关键在于 new CAT(this, ...)this (旧的 CAT) 被作为 _next 传入,形成了链表。
  7. 更新主引用:尝试通过 CAS 将 ConcurrentAutoTable 的 _cat 字段指向这个新的 CAT 实例。这里有一个 while 循环,确保在多个线程同时尝试扩容时,最终只有一个能成功,并且其他线程能看到扩容的结果。

核心读操作:sum 与 estimate_sum

获取总和需要遍历所有分片。

// ... existing code ...public long sum( ) {// 1. 递归地累加旧CAT的值long sum = _next == null ? 0 : _next.sum(); final long[] t = _t;// 2. 累加当前CAT数组的值for( long cnt : t ) sum += cnt;return sum;}
// ... existing code ...
  • 递归求和sum() 方法首先会检查 _next 是否为 null。如果不为 null,它会递归调用 _next.sum(),将所有历史(旧的、已被替换的)CAT 实例中的计数值全部加起来。
  • 遍历当前:然后,它会遍历当前 CAT 实例的 _t 数组,累加所有值。
  • 非原子性:需要注意的是,get() (sum()) 操作不是原子的。在遍历求和的过程中,其他线程可能仍在更新数组中的值。因此,get() 返回的是一个“近似值”,但它包含了调用线程之前所有的更新。对于计数器这类场景,这种近似值通常是可以接受的。

estimate_sum() 是一个更轻量级的 get

// ... existing code ...public long estimate_sum( ) {// For short tables, just do the workif( _t.length <= 64 ) return sum();// For bigger tables, periodically freshen a cached valuelong millis = System.currentTimeMillis();if( _fuzzy_time != millis ) { // Time marches on?_fuzzy_sum_cache = sum(); // Get sum the hard way_fuzzy_time = millis;   // Indicate freshness of cached value}return _fuzzy_sum_cache;  // Return cached sum}
// ... existing code ...

它引入了一个基于时间的缓存。只有当时间戳(精确到毫秒)变化时,才会重新计算一次完整的 sum(),否则直接返回缓存值。这使得在读多写少的场景下,获取计数值的成本几乎等同于一次普通的字段读取。


重置操作:set

set 操作相对昂贵,因为它需要确保原子性。

// ... existing code ...public void set( long x ) {CAT newcat = new CAT(null,4,x);// Spin until CAS workswhile( !CAS_cat(_cat,newcat) ) {/*empty*/}}
// ... existing code ...

它的实现方式是:

  1. 创建一个全新的 CAT 实例,其 _next 为 null(切断了历史),数组大小为一个较小值(如4),并将要设置的值 x 放在数组的第一个元素中。
  2. 通过自旋 CAS,用这个全新的 CAT 替换掉 ConcurrentAutoTable 中当前的 _cat 引用。 一旦替换成功,旧的 CAT 链表就会被垃圾回收,所有后续的 add 和 get 操作都会在新 CAT 上进行,从而实现了原子性的 set

ConcurrentAutoTable 和 JDK 中 LongAdder 

ConcurrentAutoTable 是 JDK 中 LongAdder 的“精神前身”。

java.util.concurrent.atomic.LongAdder (自 Java 8 起) 和 org.jctools.maps.ConcurrentAutoTable (由并发大师 Cliff Click 创建,早于 LongAdder) 都是为了解决 AtomicLong 在极高并发下因缓存行争用(Cache-Line Contention)导致的性能瓶颈问题。

两者最核心的相似之处在于都采用了 分片(Striping)或者叫分段(Segmentation) 的思想。

  • 问题:当几十上百个线程同时尝试用 CAS 更新同一个 AtomicLong 时,这个 long 值所在的缓存行会在各个 CPU 核心之间疯狂传递,导致总线流量激增,性能急剧下降。
  • 解决方案:它们都不再维护单个 long 值,而是维护一个 long 类型的数组。当一个线程需要更新计数值时,它会通过哈希等方式找到数组中的一个位置(一个分片),并只更新那个位置的值。

这样,来自不同线程的更新压力就被分散到了整个数组中,多个线程可以并行地更新不同的数组元素,极大地减少了缓存行争用,从而实现了极高的更新吞吐量。

尽管核心思想一致,但它们在具体的实现上存在几个关键区别:

扩容机制 (最重要的区别)

当并发非常激烈,导致数组内的某个元素依然存在争用时,两者都需要对内部数组进行扩容以进一步分散压力。它们的扩容方式完全不同。

  • ConcurrentAutoTable (链表法):

    • 当一个线程检测到争用激烈时,它会创建一个新的、尺寸翻倍的 CAT 实例
    • 最巧妙的是,这个新的 CAT 实例内部有一个 _next 指针,会指向旧的 CAT 实例
    • 然后,主类会通过一次 CAS 操作,将自己的 _cat 引用指向这个新的 CAT 实例。
    • 这就在内存中形成了一个 CAT 对象的单向链表。扩容过程是完全无锁的,且不需要拷贝任何旧数据。
    // ... existing code ...private static class CAT implements Serializable {// ... existing code ...private final CAT _next; // 指向旧的CAT实例// ... existing code ...private final long[] _t;     // Power-of-2 array of longsCAT( CAT next, int sz, long init ) {_next = next; // 在构造时形成链表_t = new long[sz];_t[0] = init;}public long add_if( long x, int hash, ConcurrentAutoTable master ) {// ... 争用检测逻辑 ...if( cnt < MAX_SPIN ) return old; // 如果争用不激烈,直接返回// ...// 创建新的CAT,并将 this (旧CAT) 作为 next 传入CAT newcat = new CAT(this,t.length*2,0);// CAS更新主引用,指向新CATwhile( master._cat == this && !master.CAS_cat(this,newcat) ) {/*empty*/}return old;}
    // ... existing code ...
    
  • LongAdder (加锁替换法):

    • LongAdder 内部有一个 volatile Cell[] cells 数组和一个 volatile long base 字段。
    • 当需要扩容时(在 longAccumulate 方法中),它会使用一个自旋锁 (cellsBusy) 来锁住 cells 数组。
    • 获得锁的线程会创建一个新的、更大的 Cell 数组,并将旧数组的内容拷贝到新数组中。
    • 最后释放锁。这个过程虽然短暂,但存在一个锁竞争点。

求和逻辑

因为扩容机制不同,它们的求和逻辑也截然不同。

  • ConcurrentAutoTable (递归求和):

    • 为了得到总和,它必须遍历整个 CAT 链表。sum() 方法是一个递归调用,它会先计算 _next 指向的旧 CAT 的总和,然后再加上当前 CAT 数组里所有值的总和。

    // ... existing code ...public long sum( ) {// 递归地获取历史CAT的总和long sum = _next == null ? 0 : _next.sum(); final long[] t = _t;// 加上当前CAT数组的总和for( long cnt : t ) sum += cnt;return sum;}
    // ... existing code ...
    
  • LongAdder (迭代求和):

    • 它的求和逻辑相对简单。只需要将 base 的值和当前 cells 数组中所有 Cell 的值相加即可。它只需要遍历一个数组。

基础值与低争用优化

  • ConcurrentAutoTable: 没有“基础值”的概念,所有更新都直接分散到 _t 数组中。
  • LongAdder: 有一个 base 字段。在低并发、无争用的情况下,线程会优先尝试 CAS 更新 base 字段。这非常快,只有当 base 字段更新失败(出现争用)时,才会去操作 cells 数组。这是一个针对低争用场景的优化。

总结

特性ConcurrentAutoTable (JCTools)LongAdder (JDK 8+)
核心思想分片/条带化,分散争用分片/条带化,分散争用
扩容机制无锁链表法:创建新表,通过 _next 指针链接旧表自旋锁替换法:加锁、创建新表、拷贝数据、替换
求和逻辑递归遍历 CAT 链表迭代遍历 base 和 cells 数组
低争用优化无,直接操作数组有,优先更新 base 字段
来源JCTools (第三方库), Cliff ClickJDK 标准库, Doug Lea

结论

ConcurrentAutoTable 和 LongAdder 都是解决高并发计数器问题的顶尖实现。LongAdder 作为后来者和 JDK 标准,综合了性能和通用性,并对低争用场景做了优化,是目前 Java 开发中的首选。而 ConcurrentAutoTable 则展示了一种非常精妙的、完全无锁的动态扩容思路,体现了并发编程大师的深厚功力,非常值得学习和研究。

所以,它们不完全一样LongAdder 可以看作是吸收了这类思想并结合了更多工程实践考量后的 JDK 标准化成果。

总结

ConcurrentAutoTable 是一个为解决高并发更新争用而生的杰作。它通过以下关键技术实现了卓越的性能和扩展性:

  • 分片(Striping):将单个计数器扩展为一个计数器数组,将线程的更新操作分散到不同元素上,从根本上消除了缓存争用。
  • 自动扩容:当检测到某个槽位争用激烈时,能自动将数组大小加倍,进一步降低冲突概率。
  • 链式结构平滑扩容:通过 _next 指针形成 CAT 实例链表,使得扩容时无需拷贝旧数据,sum() 操作可以自然地将历史数据包含进来。
  • 近似求和与缓存get() 提供一个高性能但非严格原子的求和,estimate_get() 则提供了成本更低的缓存读取。

它完美诠释了在并发编程中,如何通过改变数据结构来适应并发模式,从而获得极致的性能。

http://www.dtcms.com/a/333576.html

相关文章:

  • obsidian ai/copilot 插件配置
  • epoll边缘模式收数据学习
  • 【100页PPT】数字化转型某著名企业集团信息化顶层规划方案(附下载方式)
  • 基于之前的Python附魔插件做出的一些改进
  • 3s岗位合集
  • 并行Builder-输出型流程编排的新思路
  • AI提高投放效率的核心策略
  • 【生产实践】内网YUM源中rpm包的替换与仓库升级实战
  • 应用侧华为云LoTDA设备接入平台
  • 2025二建成绩公布!各地合格标准汇总!
  • 通俗易懂:Vue3的ref()运行机理
  • Windows Server存储智能数据校验
  • AMQP协议介绍
  • 【进阶】Java技术栈八股文学习资料整理
  • 优化网络ROI:专线复用,上云出网一“线”牵!
  • 力扣top100(day04-04)--栈
  • 从“写代码”到“定义需求”:AI编程工具如何重构软件开发的核心流程?
  • 深度学习-卷积神经网络-ResNet 残差网络
  • 永磁同步电机控制 第二篇、电机的分类
  • 支持向量机的原理和案例解析
  • Sklearn 机器学习 手写数字识别 使用K近邻算法做分类
  • Android Studio
  • IO流-转换流
  • MySQL的分析查询语句(EXPLAIN):
  • stream流debug
  • 华硕主板怎样调整风扇转速
  • Redis高级优化实战:从键值设计到集群调优
  • [HDCTF 2023]Normal_Rsa(revenge)
  • 晶振电路的负载电容、电阻参数设计
  • 重新定义城市探索!如何用“城市向导”解锁旅行新体验?