企业展厅布展设计seo网站制作优化
摘要
多线程场景下共享变量原子性操作除了可以使用Java自带的synchronized关键字以及AQS锁实现线程同步外,java.util.concurrent.atomic 包下提供了对基本类型封装类(AtomicBoolean|AtomicLong|AtomicReference|AtomicBoolean) 以及对应的数组封装。对于已有的包含基本数据的类,该包提供了 FieldUpdater 对已有变量进行封装,减少代码侵入。对于原子操作的ABA问题,引入了AtomicStampedReference(特别的,AtomicMarkableReference实现01状态变更标记)。最后还提供了LongAdder和 DoubleAdder (特别的 DoubleAccumulator 和 LongAccumulator 指定了计算方式),实现原子累加,该了底层通过 cells 将并发更新线程散列到不同槽,提高并发写。
atomic原子类
该包下的类提供了支持对单个变量进行无锁的线程安全编程 支持 Boolean Integer Long Reference 类型 原子更新 ,AtomicBoolean、AtomicInteger、AtomicLong 和 AtomicReference 类的实例分别提供对相应类型的单个变量的访问和更新操作。
除了表示单个值的类之外,该包还包含 Updater 类,可用于对任何选定类的任何选定 volatile 字段执行 compareAndSet 操作。AtomicReferenceFieldUpdater、AtomicIntegerFieldUpdater 和 AtomicLongFieldUpdater 用于原子方式访问相关的字段类型。
AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray 类将原子操作支持扩展到这些类型的数组。这些类的显著特点是为其数组元素提供了 volatile 访问语义,而普通数组不支持这种语义。AtomicMarkableReference 类将一个布尔值与一个引用关联起来。 这个布尔位可以用来表示被引用的对象在逻辑上已被删除。
AtomicStampedReference 类将一个整数值与一个引用关联起来。 可以用于表示与一系列更新对应的版本号。DoubleAdder 和 DoubleAccumulator 提供了对Double类型值原子更新操作,Accumulator 类可以指定计算更新值的逻辑(函数引用)AtomicBoolean 原子更新Boolean类型AtomicInteger
AtomicIntegerArray允许对整数数组中的每个元素进行原子更新 原子地将数组中指定索引位置的元素设置为新值 读写都是基于unsAFE更新,根据指定元素在数组对象的offset进行更新,通过unsAFE类的方法保证原子性和可见性
AtomicIntegerFieldUpdater基于反射的实用工具,它能够对指定类中指定的整型字段进行原子更新。此类专为原子数据结构而设计,在这种结构中,同一节点的多个字段可独立地进行原子更新。使用时候需要注意: 为了保证对对象字段原子性操作,需要通过相同的更新器 (同一个AtomicIntegerFieldUpdater)操作用于对对象中的 volatile int 类型字段进行原子更新
AtomicLong
AtomicLongArray
AtomicLongFieldUpdater用于对对象中的 volatile long 类型字段进行原子更新AtomicReference
AtomicReferenceArray
AtomicReferenceFieldUpdaterAtomicMarkableReference引用类型的原子操作,并增加了boolean类型的标记。创建了内部类 Pair 将对象引用和时间戳封装为Pair 对象
AtomicStampedReference 解决ABA问题引用类型的原子操作,并增加了int类型的时间戳。创建了内部类 Pair 将对象引用和时间戳封装为Pair 对象内部通过CAS方式(UNSAFE.compareAndSwapObject)更新对Pair对象的引用,更新逻辑如下expectedReference == current.reference &&expectedMark == current.mark &&((newReference == current.reference &&newMark == current.mark) ||casPair(current, Pair.of(newReference, newMark)));实际使用时,boolean类型是由用户指定。
Adder原理
Adder类实现了在多线程环境下高效地进行数值的累加操作
DoubleAccumulator维护一个可以动态更新的 double 类型的值,该值通过用户提供的累加函数进行更新。
DoubleAdderDoubleAdder 是 DoubleAccumulator 的一个特殊情况,专门用于高效地进行 double 类型数值的求和操作
LongAccumulatorLongAccumulator 与 DoubleAccumulator 类似,只不过它用于维护一个可以动态更新的 long 类型的值,通过用户提供的累加函数进行更新。
LongAdderLongAdder 是 LongAccumulator 的一个特殊情况,专门用于高效地进行 long 类型数值的求和操作
Striped64是上面四个Adder的父类,提供了线程同步的逻辑,有Long和Double单独的实现。
Adder原理
维护了 volatile 类型的base 和 volatile Cell[] cells 数组,在累加时候,会先CAS更新base如果失败了会根据当前线程的 threadLocalRandomProbe
散列到 cells数组上,对数组元素进行CAS更新,类似于桶算法,将对一个变量的竞争分散到不同的桶中。
1 如果cells数组不为空长度不为0 ,根据当前线程的 threadLocalRandomProbe 在数组上散列,1.1 如果该槽节点为null , CAS更新 cellsBusy 为1,然后创建 cell 对象塞入该位置1.2 如果不为null代表该槽位置属于其他的线程,尝试CAS更新base 如果成功直接返回1.3 如果cas 更新base失败,尝试扩容cells数组,长度为原来的2倍,也是通过 cellsBusy 保证和其他线程同步cells数组长度为超过 CPU数量的2 的幂的个数1.4 如果上述更新都失败,重新计算线程的 Probe
2 如果cells数组未初始化,CAS更新 cellsBusy 保证多线程场景下线程一致性 ,初始化数组长度为2 。
3 如果上述步骤都失败,CAS更新base ,更新失败,进入下一循环
// wasUncontended 为true 就是初始化,只有为false时才有意义,代表竞争了但是竞争失败final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {// uncontended == true 代表cells数组为null 或者当前线程散列的cell元素为null// flase 代表当前线程散列cell元素上和其他线程有过竞争,并且更新值失败int h;if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initializationh = getProbe();wasUncontended = true; // 初始化了线程prob,同时初始化该变量}boolean collide = false; // True if last slot nonemptyfor (;;) {Cell[] as; Cell a; int n; long v;if ((as = cells) != null && (n = as.length) > 0) { // 初始化完成后,所有的累加操作都在这里进行if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new CellCell r = new Cell(x); // Optimistically createif (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null && // 可能存在这样情况,在成功进入上面if之前,cells数组扩容了不是原来的数组(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to fail CAS+死循环更新值,第一次调用该方法,如果线程竞争失败了wasUncontended=false// 这里给 wasUncontended 重新初始化true,不代表任何意义,然后跳转到后面的rehash,重新计算当前线程的prob 然后重新CAS方式更新值// 就是代表当前线程因为CAS失败 最开始调用当前方法时候 wasUncontended 会为false ,代表已经经过一次CAS并且失败了// 直接跳到后面的rehash ,然后for循环重新CAS// 除了这个场景,不会走到这行代码wasUncontended = true; // Continue after rehashelse if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;// 上面的 wasUncontended = true 后重新进入CAS 会走到这里else if (n >= NCPU || cells != as)collide = false; // At max size or staleelse if (!collide)// 能走到这里代表竞争很激烈,但是cells数组还没有超过最大长度// 这里直接赋值true ,尝试再次CAS竞争一次,如果还走到这里的else if 代表确实竞争很激烈了// 然后走下面的扩容逻辑collide = true;else if (cellsBusy == 0 && casCellsBusy()) {// 能走到这里代表竞争十分激烈了,考虑数组扩容try {if (cells == as) { // Expand table unless staleCell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;// 这里直接continue可能是因为 advanceProbe 是反射方式更新线程的prob 性能消耗// 相比之下直接CAS更划算continue; // Retry with expanded table}h = advanceProbe(h);}// 这里判断有先后顺序,如果上来就先执行 casCellsBusy() 性能消耗,毕竟是反射获取 cellsBusy 值else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try { // Initialize tableif (cells == as) {// 双重锁校验 可能在上面 cells==as 和 casCellsBusy() 之前 ,其他线程已经初始化了cells数组Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}// 能走到这里代表别的线程正在初始化 cells 数组 ,当前线程尝试竞争 base ,在base上累加else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break; // Fall back on using base}}