JCTools 无锁并发计数器:ConcurrentAutoTable
ConcurrentAutoTable
ConcurrentAutoTable
是一个非常巧妙的高性能并发组件,由并发编程大师 Cliff Click 设计。它的核心目标是解决高并发场景下对单个值(如计数器)进行原子更新时产生的严重缓存争用问题。
传统的 AtomicLong
在只有少量线程更新时表现良好,但当大量 CPU 核心同时对其进行 CAS (Compare-And-Swap) 操作时,这个 long
值所在的缓存行会在不同核心的缓存之间来回失效和同步,造成巨大的性能瓶颈。
CAT 通过数据分片(Striping)和动态扩容的思想,将争用分散到多个 long
值上,从而实现了在高并发下近乎线性的性能扩展。它被广泛用于实现高性能的并发计数器、读写锁等。
核心思想:分片与分散争用
想象一下高速公路上的收费站。如果只有一个收费口,所有车辆都会堵在那里。但如果开设几十个收费口,车辆就可以分散到各个口,整体吞吐量大大提升。
ConcurrentAutoTable
就是这个原理。它内部维护一个 long
类型的数组(long[] _t
),而不是单个 long
值。
- 写操作(
add
/increment
):当一个线程需要更新计数值时,它会根据自己的线程 ID(或其他哈希值)计算一个数组下标,然后只对该下标对应的long
元素进行 CAS 更新。 - 读操作(
get
):当需要获取总计数时,它会遍历整个数组,将所有元素的值相加得到最终结果。
关键优势:
- 分散争用:来自不同线程的更新操作会以极高的概率命中数组中的不同元素。由于这些元素在内存中是分开的(甚至可能位于不同的缓存行),线程之间几乎不会发生缓存行争用,每个线程都像是在操作自己的本地变量一样快。
- 无锁:所有操作都基于 CAS,是无锁的。
ConcurrentAutoTable
与内部类 CAT
ConcurrentAutoTable
本身是一个非常轻量级的包装类,真正的核心逻辑和数据都封装在它的私有静态内部类 CAT
中。
// ... existing code ...
public class ConcurrentAutoTable implements Serializable {// ... public methods ...// The underlying array of concurrently updated long countersprivate volatile CAT _cat = new CAT(null,16/*Start Small, Think Big!*/,0L);private static AtomicReferenceFieldUpdater<ConcurrentAutoTable,CAT> _catUpdater =AtomicReferenceFieldUpdater.newUpdater(ConcurrentAutoTable.class,CAT.class, "_cat");private boolean CAS_cat( CAT oldcat, CAT newcat ) { return _catUpdater.compareAndSet(this,oldcat,newcat); }// ... hash() method ...// --- CAT -----------------------------------------------------------------private static class CAT implements Serializable {private final CAT _next;// ... other fields ...private final long[] _t; // Power-of-2 array of longsCAT( CAT next, int sz, long init ) {_next = next;_t = new long[sz];_t[0] = init;}// ... methods ...}
}
ConcurrentAutoTable
:private volatile CAT _cat
: 这是指向当前“活动”的CAT
实例的volatile
引用。volatile
保证了当_cat
引用发生变化时(即扩容时),所有线程都能看到最新的CAT
实例。_catUpdater
: 用于原子性地更新_cat
引用。
CAT
(Concurrent Auto Table 的缩写):private final long[] _t
: 这就是分片计数器的核心——一个long
数组。private final CAT _next
: 这是一个指向旧的CAT
实例的引用。这个字段是实现平滑扩容的关键。当数组大小需要翻倍时,会创建一个新的、更大的CAT
实例,它的_next
字段会指向旧的CAT
实例。这形成了一个单向链表结构。
核心写操作:add_if
与自动扩容
add
、increment
等方法最终都调用了 private long add_if(long x)
。
// ... existing code ...private long add_if( long x ) { return _cat.add_if(x,hash(),this); }// Hash spreaderprivate static int hash() {//int h = (int)Thread.currentThread().getId();int h = System.identityHashCode(Thread.currentThread());return h<<3; // Pad out cache lines. The goal is to avoid cache-line contention}
// ... existing code ...
hash()
: 计算一个哈希值,用于选择数组中的槽位(slot)。这里巧妙地使用了System.identityHashCode(Thread.currentThread())
,它对于同一个线程返回值是固定的。h<<3
是一个优化,目的是让相邻线程的哈希值尽可能地跨越缓存行边界,进一步减少伪共享的可能。
CAT.add_if
的逻辑是精髓所在:
// ... existing code ...public long add_if( long x, int hash, ConcurrentAutoTable master ) {final long[] t = _t;final int idx = hash & (t.length-1); // 1. 计算槽位// Peel loop; try once fastlong old = t[idx];final boolean ok = CAS( t, idx, old, old+x ); // 2. 尝试一次CASif( ok ) return old; // Got it// Try harderint cnt=0;while( true ) { // 3. 自旋重试old = t[idx];if( CAS( t, idx, old, old+x ) ) break; // Got it!cnt++;}if( cnt < MAX_SPIN ) return old; // 4. 如果争用不激烈,直接返回if( t.length >= 1024*1024 ) return old; // 数组太大,不再扩容// 5. 争用激烈,触发扩容// ... (一些复杂的、被注释掉的节流逻辑) ...CAT newcat = new CAT(this,t.length*2,0); // 6. 创建新CAT// Take 1 stab at updating the CAT with the new larger size.while( master._cat == this && !master.CAS_cat(this,newcat) ) {/*empty*/} // 7. CAS更新主引用return old;}
// ... existing code ...
- 计算槽位:通过
hash & (t.length-1)
快速计算出要操作的数组下标(利用了数组长度是2的幂次的特性)。 - 快速尝试:乐观地认为没有争用,直接尝试一次 CAS。在低并发下,这几乎总是成功的。
- 自旋重试:如果第一次失败,说明有其他线程同时在更新同一个槽位,进入一个简短的自旋循环,直到 CAS 成功。
- 争用判断:
MAX_SPIN
被设为1。这意味着只要自旋超过1次,就认为当前槽位的争用是“激烈”的。 - 触发扩容:一旦判定争用激烈,就触发扩容逻辑。
- 创建新
CAT
:创建一个大小是原来两倍的新CAT
实例。关键在于new CAT(this, ...)
,this
(旧的CAT
) 被作为_next
传入,形成了链表。 - 更新主引用:尝试通过 CAS 将
ConcurrentAutoTable
的_cat
字段指向这个新的CAT
实例。这里有一个while
循环,确保在多个线程同时尝试扩容时,最终只有一个能成功,并且其他线程能看到扩容的结果。
核心读操作:sum
与 estimate_sum
获取总和需要遍历所有分片。
// ... existing code ...public long sum( ) {// 1. 递归地累加旧CAT的值long sum = _next == null ? 0 : _next.sum(); final long[] t = _t;// 2. 累加当前CAT数组的值for( long cnt : t ) sum += cnt;return sum;}
// ... existing code ...
- 递归求和:
sum()
方法首先会检查_next
是否为null
。如果不为null
,它会递归调用_next.sum()
,将所有历史(旧的、已被替换的)CAT
实例中的计数值全部加起来。 - 遍历当前:然后,它会遍历当前
CAT
实例的_t
数组,累加所有值。 - 非原子性:需要注意的是,
get()
(sum()
) 操作不是原子的。在遍历求和的过程中,其他线程可能仍在更新数组中的值。因此,get()
返回的是一个“近似值”,但它包含了调用线程之前所有的更新。对于计数器这类场景,这种近似值通常是可以接受的。
estimate_sum()
是一个更轻量级的 get
:
// ... existing code ...public long estimate_sum( ) {// For short tables, just do the workif( _t.length <= 64 ) return sum();// For bigger tables, periodically freshen a cached valuelong millis = System.currentTimeMillis();if( _fuzzy_time != millis ) { // Time marches on?_fuzzy_sum_cache = sum(); // Get sum the hard way_fuzzy_time = millis; // Indicate freshness of cached value}return _fuzzy_sum_cache; // Return cached sum}
// ... existing code ...
它引入了一个基于时间的缓存。只有当时间戳(精确到毫秒)变化时,才会重新计算一次完整的 sum()
,否则直接返回缓存值。这使得在读多写少的场景下,获取计数值的成本几乎等同于一次普通的字段读取。
重置操作:set
set
操作相对昂贵,因为它需要确保原子性。
// ... existing code ...public void set( long x ) {CAT newcat = new CAT(null,4,x);// Spin until CAS workswhile( !CAS_cat(_cat,newcat) ) {/*empty*/}}
// ... existing code ...
它的实现方式是:
- 创建一个全新的
CAT
实例,其_next
为null
(切断了历史),数组大小为一个较小值(如4),并将要设置的值x
放在数组的第一个元素中。 - 通过自旋 CAS,用这个全新的
CAT
替换掉ConcurrentAutoTable
中当前的_cat
引用。 一旦替换成功,旧的CAT
链表就会被垃圾回收,所有后续的add
和get
操作都会在新CAT
上进行,从而实现了原子性的set
。
ConcurrentAutoTable
和 JDK 中 LongAdder
ConcurrentAutoTable
是 JDK 中 LongAdder
的“精神前身”。
java.util.concurrent.atomic.LongAdder
(自 Java 8 起) 和 org.jctools.maps.ConcurrentAutoTable
(由并发大师 Cliff Click 创建,早于 LongAdder
) 都是为了解决 AtomicLong
在极高并发下因缓存行争用(Cache-Line Contention)导致的性能瓶颈问题。
两者最核心的相似之处在于都采用了 分片(Striping)或者叫分段(Segmentation) 的思想。
- 问题:当几十上百个线程同时尝试用 CAS 更新同一个
AtomicLong
时,这个long
值所在的缓存行会在各个 CPU 核心之间疯狂传递,导致总线流量激增,性能急剧下降。 - 解决方案:它们都不再维护单个
long
值,而是维护一个long
类型的数组。当一个线程需要更新计数值时,它会通过哈希等方式找到数组中的一个位置(一个分片),并只更新那个位置的值。
这样,来自不同线程的更新压力就被分散到了整个数组中,多个线程可以并行地更新不同的数组元素,极大地减少了缓存行争用,从而实现了极高的更新吞吐量。
尽管核心思想一致,但它们在具体的实现上存在几个关键区别:
扩容机制 (最重要的区别)
当并发非常激烈,导致数组内的某个元素依然存在争用时,两者都需要对内部数组进行扩容以进一步分散压力。它们的扩容方式完全不同。
ConcurrentAutoTable
(链表法):- 当一个线程检测到争用激烈时,它会创建一个新的、尺寸翻倍的
CAT
实例。 - 最巧妙的是,这个新的
CAT
实例内部有一个_next
指针,会指向旧的CAT
实例。 - 然后,主类会通过一次 CAS 操作,将自己的
_cat
引用指向这个新的CAT
实例。 - 这就在内存中形成了一个
CAT
对象的单向链表。扩容过程是完全无锁的,且不需要拷贝任何旧数据。
// ... existing code ...private static class CAT implements Serializable {// ... existing code ...private final CAT _next; // 指向旧的CAT实例// ... existing code ...private final long[] _t; // Power-of-2 array of longsCAT( CAT next, int sz, long init ) {_next = next; // 在构造时形成链表_t = new long[sz];_t[0] = init;}public long add_if( long x, int hash, ConcurrentAutoTable master ) {// ... 争用检测逻辑 ...if( cnt < MAX_SPIN ) return old; // 如果争用不激烈,直接返回// ...// 创建新的CAT,并将 this (旧CAT) 作为 next 传入CAT newcat = new CAT(this,t.length*2,0);// CAS更新主引用,指向新CATwhile( master._cat == this && !master.CAS_cat(this,newcat) ) {/*empty*/}return old;} // ... existing code ...
- 当一个线程检测到争用激烈时,它会创建一个新的、尺寸翻倍的
LongAdder
(加锁替换法):LongAdder
内部有一个volatile Cell[] cells
数组和一个volatile long base
字段。- 当需要扩容时(在
longAccumulate
方法中),它会使用一个自旋锁 (cellsBusy
) 来锁住cells
数组。 - 获得锁的线程会创建一个新的、更大的
Cell
数组,并将旧数组的内容拷贝到新数组中。 - 最后释放锁。这个过程虽然短暂,但存在一个锁竞争点。
求和逻辑
因为扩容机制不同,它们的求和逻辑也截然不同。
ConcurrentAutoTable
(递归求和):- 为了得到总和,它必须遍历整个
CAT
链表。sum()
方法是一个递归调用,它会先计算_next
指向的旧CAT
的总和,然后再加上当前CAT
数组里所有值的总和。
// ... existing code ...public long sum( ) {// 递归地获取历史CAT的总和long sum = _next == null ? 0 : _next.sum(); final long[] t = _t;// 加上当前CAT数组的总和for( long cnt : t ) sum += cnt;return sum;} // ... existing code ...
- 为了得到总和,它必须遍历整个
LongAdder
(迭代求和):- 它的求和逻辑相对简单。只需要将
base
的值和当前cells
数组中所有Cell
的值相加即可。它只需要遍历一个数组。
- 它的求和逻辑相对简单。只需要将
基础值与低争用优化
ConcurrentAutoTable
: 没有“基础值”的概念,所有更新都直接分散到_t
数组中。LongAdder
: 有一个base
字段。在低并发、无争用的情况下,线程会优先尝试 CAS 更新base
字段。这非常快,只有当base
字段更新失败(出现争用)时,才会去操作cells
数组。这是一个针对低争用场景的优化。
总结
特性 | ConcurrentAutoTable (JCTools) | LongAdder (JDK 8+) |
---|---|---|
核心思想 | 分片/条带化,分散争用 | 分片/条带化,分散争用 |
扩容机制 | 无锁链表法:创建新表,通过 _next 指针链接旧表 | 自旋锁替换法:加锁、创建新表、拷贝数据、替换 |
求和逻辑 | 递归遍历 CAT 链表 | 迭代遍历 base 和 cells 数组 |
低争用优化 | 无,直接操作数组 | 有,优先更新 base 字段 |
来源 | JCTools (第三方库), Cliff Click | JDK 标准库, Doug Lea |
结论:
ConcurrentAutoTable
和 LongAdder
都是解决高并发计数器问题的顶尖实现。LongAdder
作为后来者和 JDK 标准,综合了性能和通用性,并对低争用场景做了优化,是目前 Java 开发中的首选。而 ConcurrentAutoTable
则展示了一种非常精妙的、完全无锁的动态扩容思路,体现了并发编程大师的深厚功力,非常值得学习和研究。
所以,它们不完全一样,LongAdder
可以看作是吸收了这类思想并结合了更多工程实践考量后的 JDK 标准化成果。
总结
ConcurrentAutoTable
是一个为解决高并发更新争用而生的杰作。它通过以下关键技术实现了卓越的性能和扩展性:
- 分片(Striping):将单个计数器扩展为一个计数器数组,将线程的更新操作分散到不同元素上,从根本上消除了缓存争用。
- 自动扩容:当检测到某个槽位争用激烈时,能自动将数组大小加倍,进一步降低冲突概率。
- 链式结构平滑扩容:通过
_next
指针形成CAT
实例链表,使得扩容时无需拷贝旧数据,sum()
操作可以自然地将历史数据包含进来。 - 近似求和与缓存:
get()
提供一个高性能但非严格原子的求和,estimate_get()
则提供了成本更低的缓存读取。
它完美诠释了在并发编程中,如何通过改变数据结构来适应并发模式,从而获得极致的性能。