JUC.atomic原子操作类原理分析
摘要
多线程场景下共享变量原子性操作除了可以使用Java自带的synchronized关键字以及AQS锁实现线程同步外,java.util.concurrent.atomic 包下提供了对基本类型封装类(AtomicBoolean|AtomicLong|AtomicReference|AtomicBoolean) 以及对应的数组封装。对于已有的包含基本数据的类,该包提供了 FieldUpdater 对已有变量进行封装,减少代码侵入。对于原子操作的ABA问题,引入了AtomicStampedReference(特别的,AtomicMarkableReference实现01状态变更标记)。最后还提供了LongAdder和 DoubleAdder (特别的 DoubleAccumulator 和 LongAccumulator 指定了计算方式),实现原子累加,该了底层通过 cells 将并发更新线程散列到不同槽,提高并发写。
atomic原子类
该包下的类提供了支持对单个变量进行无锁的线程安全编程 支持 Boolean Integer Long Reference 类型 原子更新 ,AtomicBoolean、AtomicInteger、AtomicLong 和 AtomicReference 类的实例分别提供对相应类型的单个变量的访问和更新操作。
除了表示单个值的类之外,该包还包含 Updater 类,可用于对任何选定类的任何选定 volatile 字段执行 compareAndSet 操作。AtomicReferenceFieldUpdater、AtomicIntegerFieldUpdater 和 AtomicLongFieldUpdater 用于原子方式访问相关的字段类型。
AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray 类将原子操作支持扩展到这些类型的数组。这些类的显著特点是为其数组元素提供了 volatile 访问语义,而普通数组不支持这种语义。
AtomicMarkableReference 类将一个布尔值与一个引用关联起来。 这个布尔位可以用来表示被引用的对象在逻辑上已被删除。
AtomicStampedReference 类将一个整数值与一个引用关联起来。 可以用于表示与一系列更新对应的版本号。
DoubleAdder 和 DoubleAccumulator 提供了对Double类型值原子更新操作,Accumulator 类可以指定计算更新值的逻辑(函数引用)
AtomicBoolean 原子更新Boolean类型
AtomicInteger
AtomicIntegerArray
允许对整数数组中的每个元素进行原子更新 原子地将数组中指定索引位置的元素设置为新值
读写都是基于unsAFE更新,根据指定元素在数组对象的offset进行更新,通过unsAFE类的方法保证原子性和可见性
AtomicIntegerFieldUpdater
基于反射的实用工具,它能够对指定类中指定的整型字段进行原子更新。此类专为原子数据结构而设计,在这种结构中,同一节点的多个字段可独立地进行原子更新。
使用时候需要注意: 为了保证对对象字段原子性操作,需要通过相同的更新器 (同一个AtomicIntegerFieldUpdater)操作
用于对对象中的 volatile int 类型字段进行原子更新
AtomicLong
AtomicLongArray
AtomicLongFieldUpdater
用于对对象中的 volatile long 类型字段进行原子更新
AtomicReference
AtomicReferenceArray
AtomicReferenceFieldUpdater
AtomicMarkableReference
引用类型的原子操作,并增加了boolean类型的标记。创建了内部类 Pair 将对象引用和时间戳封装为Pair 对象
AtomicStampedReference 解决ABA问题
引用类型的原子操作,并增加了int类型的时间戳。创建了内部类 Pair 将对象引用和时间戳封装为Pair 对象
内部通过CAS方式(UNSAFE.compareAndSwapObject)更新对Pair对象的引用,更新逻辑如下
expectedReference == current.reference &&
expectedMark == current.mark &&
((newReference == current.reference &&
newMark == current.mark) ||
casPair(current, Pair.of(newReference, newMark)));
实际使用时,boolean类型是由用户指定。
Adder原理
Adder类实现了在多线程环境下高效地进行数值的累加操作
DoubleAccumulator
维护一个可以动态更新的 double 类型的值,该值通过用户提供的累加函数进行更新。
DoubleAdder
DoubleAdder 是 DoubleAccumulator 的一个特殊情况,专门用于高效地进行 double 类型数值的求和操作
LongAccumulator
LongAccumulator 与 DoubleAccumulator 类似,只不过它用于维护一个可以动态更新的 long 类型的值,通过用户提供的累加函数进行更新。
LongAdder
LongAdder 是 LongAccumulator 的一个特殊情况,专门用于高效地进行 long 类型数值的求和操作
Striped64
是上面四个Adder的父类,提供了线程同步的逻辑,有Long和Double单独的实现。
Adder原理
维护了 volatile 类型的base 和 volatile Cell[] cells 数组,在累加时候,会先CAS更新base如果失败了会根据当前线程的 threadLocalRandomProbe
散列到 cells数组上,对数组元素进行CAS更新,类似于桶算法,将对一个变量的竞争分散到不同的桶中。
1 如果cells数组不为空长度不为0 ,根据当前线程的 threadLocalRandomProbe 在数组上散列,
1.1 如果该槽节点为null , CAS更新 cellsBusy 为1,然后创建 cell 对象塞入该位置
1.2 如果不为null代表该槽位置属于其他的线程,尝试CAS更新base 如果成功直接返回
1.3 如果cas 更新base失败,尝试扩容cells数组,长度为原来的2倍,也是通过 cellsBusy 保证和其他线程同步
cells数组长度为超过 CPU数量的2 的幂的个数
1.4 如果上述更新都失败,重新计算线程的 Probe
2 如果cells数组未初始化,CAS更新 cellsBusy 保证多线程场景下线程一致性 ,初始化数组长度为2 。
3 如果上述步骤都失败,CAS更新base ,更新失败,进入下一循环
// wasUncontended 为true 就是初始化,只有为false时才有意义,代表竞争了但是竞争失败
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// uncontended == true 代表cells数组为null 或者当前线程散列的cell元素为null
// flase 代表当前线程散列cell元素上和其他线程有过竞争,并且更新值失败
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true; // 初始化了线程prob,同时初始化该变量
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // 初始化完成后,所有的累加操作都在这里进行
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null && // 可能存在这样情况,在成功进入上面if之前,cells数组扩容了不是原来的数组
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail CAS+死循环更新值,第一次调用该方法,如果线程竞争失败了wasUncontended=false
// 这里给 wasUncontended 重新初始化true,不代表任何意义,然后跳转到后面的rehash,重新计算当前线程的prob 然后重新CAS方式更新值
// 就是代表当前线程因为CAS失败 最开始调用当前方法时候 wasUncontended 会为false ,代表已经经过一次CAS并且失败了
// 直接跳到后面的rehash ,然后for循环重新CAS
// 除了这个场景,不会走到这行代码
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;// 上面的 wasUncontended = true 后重新进入CAS 会走到这里
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)// 能走到这里代表竞争很激烈,但是cells数组还没有超过最大长度
// 这里直接赋值true ,尝试再次CAS竞争一次,如果还走到这里的else if 代表确实竞争很激烈了
// 然后走下面的扩容逻辑
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {// 能走到这里代表竞争十分激烈了,考虑数组扩容
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
// 这里直接continue可能是因为 advanceProbe 是反射方式更新线程的prob 性能消耗
// 相比之下直接CAS更划算
continue; // Retry with expanded table
}
h = advanceProbe(h);
}// 这里判断有先后顺序,如果上来就先执行 casCellsBusy() 性能消耗,毕竟是反射获取 cellsBusy 值
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {// 双重锁校验 可能在上面 cells==as 和 casCellsBusy() 之前 ,其他线程已经初始化了cells数组
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}// 能走到这里代表别的线程正在初始化 cells 数组 ,当前线程尝试竞争 base ,在base上累加
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}