JUC原子操作类
原子操作类
-
基本类型原子类:AtomicInteger、AtomicBoolean、AtomicLong,常见API:
- get 获取当前值
- getAndSet 获取当前的值,并设置新的值
- getAndIncrement 获取当前的值,并自增
- getAndDecrement 获取当前的值,并自减
- getAndAdd 获取当前的值,并加上预期的值
- compareAndSet 如果输入的值等于预期值,则以原子方式将值更新
-
数组类型原子类:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray,用法和基本类型原子类类似
AtomicIntegerArray atomicIntegerArray =new AtomicIntegerArray(new int[]{1,2,3,4});
-
引用类型原子类
- AtomicReference :原子引用
- AtomicStampedReference :携带版本号的引用类型原子类,可以解决ABA问题,记录了引用被修改过多少次
- AtomicMarkableReference:带有标记位的引用类型原子类,通过一个布尔值的状态戳,用来判断对象是否被修改过,解决一次性修改问题
-
对象的属性修改原子类
-
原子操作增强类
CountDownLatch使用场景
-
线程计数器 用于线程执行任务,计数 等待线程结束
-
例如:
public static void test() throws Exception{ AtomicInteger ato = new AtomicInteger(0); Integer size = 100; //100个线程用于计算 CountDownLatch countDownLatch = new CountDownLatch(100); for (int a = 0; a < size; a++) { new Thread(()->{ try { Thread.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } for (int b =0;b <100;b++){ //自增 ato.getAndIncrement(); } //计算完了就减1 countDownLatch.countDown(); }).start(); } //只有 countDownLatch 中的数量为0 才能继续执行,避免 Thread.sleep()的操作,计算完成能够即时执行 countDownLatch.await(); System.out.println("最终计算结果:"+ato.get()); }
对象的属性修改原子类
对象的属性修改原子类,可以用线程安全的方式操作非线程安全对象内的某些字段,例如:某个对象有多个属性,但是只有少量字段需要频繁更新,加锁虽然可以保证线程安全,但是有可能会锁住整个对象,所以可以用原子更新代替加锁
-
AtomicIntegerFieldUpdater,基于反射,原子更新对象中,被volatile修饰的 int类型字段的值
-
AtomicLongFieldUpdater基于反射,原子更新对象中,被volatile修饰的 long类型字段的值
-
AtomicReferenceFieldUpdater,基于反射,原子更新引用类型,被volatile修饰的字段的值
-
使用原子更新的要:
- 更新的对象属性必须是 public volatile 修饰的
- 而且因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,通过更新器去更新属性
-
示例:如下出了金额以外的字段是不会经常更新的,针对金额字段加锁,肯定是可以保证线程安全的,但是很重量级,不加锁的话,结果会不确定,线程不安全
@Data @AllArgsConstructor @NoArgsConstructor public class AtomicEntity { //账号 private String account; //联系方式 private String phone; //姓名 private String name; //金额 private volatile Integer money; public synchronized void addMoney(Integer add) { money = money + add; } public synchronized void subMoney(Integer sub) { money = money - sub; } } private static void test3() throws Exception { AtomicEntity atomic = new AtomicEntity("123456789", "123456", "张三", 0); CountDownLatch countDownLatch = new CountDownLatch(200); for (int a = 0; a < 100; a++) { new Thread(() -> { for (int b = 0; b < 100; b++) { atomic.addMoney(1); } countDownLatch.countDown(); }, "线程" + a).start(); } for (int a = 0; a < 100; a++) { new Thread(() -> { for (int b = 0; b < 100; b++) { atomic.subMoney(1); } countDownLatch.countDown(); }, "线程" + a).start(); } countDownLatch.await(); //这里执行结果肯定是正确的,如果不加锁会导致线程错乱 System.out.println("最终结果" + atomic.getMoney()); }
如果不想加锁,使用对象的属性修改原子类,AtomicIntegerFieldUpdater示例
@Data @AllArgsConstructor @NoArgsConstructor public class AtomicEntity { //账号 private String account; //联系方式 private String phone; //姓名 private String name; //金额 ,修饰符必须是 public volatile public volatile int money; public AtomicEntity(String account, String phone, String name, Integer money) { this.account = account; this.phone = phone; this.name = name; this.money = money; } AtomicIntegerFieldUpdater<AtomicEntity> ato = AtomicIntegerFieldUpdater.newUpdater(AtomicEntity.class, "money"); public void atoAddMoney(AtomicEntity atomic, Integer add) { ato.getAndAdd(atomic, add); } public void atoSubMoney(AtomicEntity atomic, Integer sub) { ato.getAndAdd(atomic, -sub); } } private static void test4() throws Exception { AtomicEntity atomic = new AtomicEntity("123456789", "123456", "李四", 0); CountDownLatch countDownLatch = new CountDownLatch(200); for (int a = 0; a < 100; a++) { new Thread(() -> { for (int b = 0; b < 100; b++) { atomic.atoAddMoney(atomic,1); } countDownLatch.countDown(); }, "线程" + a).start(); } for (int a = 0; a < 100; a++) { new Thread(() -> { for (int b = 0; b < 100; b++) { atomic.atoSubMoney(atomic,1); } countDownLatch.countDown(); }, "线程" + a).start(); } countDownLatch.await(); //不加锁也可以保证线程安全 System.out.println("最终结果" + atomic.getMoney()); }
原子操作增强类
- 从 jdk 8 才引入了,原子操作增强类,如果是i++操作,推荐使用 LongAdder ,能比 AtomicLong 有更好的性能,因为LongAdder 可以减少乐观锁的重试次数
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
- 上面四个增强类继承于 Striped64 ,Striped64 继承了 Number,加上上面的16个类,统称为18罗汉
LongAdder
- 当多个线程更新用于收集统计信息,但不用于细粒度同步控制的目的公共和时,通常优于AtomicLong
- 在低并发下LongAdder和AtomicLong差距不大,高并发下明显优于AtomicLong,代价是空间消耗也大
- 常见api
- add (long x) 将当前值加x
- increment() 将当前值加1
- decrement() 将当前值减1
- sum() 返回当前值,如果是并发更新,sum返回的值不精确
- reset() 将value重置为0,这个方法只能在没有并发更新时使用
- sumThenReset() 获取当前值,并将value重置为0
- 只能计算加法,且必须从0开始计算
LongAccumulator
-
能够自定义计算规则
@FunctionalInterface public interface LongBinaryOperator { //left是初始值 long applyAsLong(long left, long); } private static void test1() { //实现函数式接口 LongBinaryOperator ,自定义计算规则,第二个参数是初始值 LongAccumulator longAccumulator = new LongAccumulator((x, y) -> { return x * y; }, 1); //这里的参数 y:5,和原本的值1,通过自定义的计算规则,得到两者的计算结果 longAccumulator.accumulate(5); longAccumulator.accumulate(5); //结果为25 System.out.println(longAccumulator.longValue()); }
计数器案例
原子操作和同步方法的性能比较
/**
* 计数器
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
public class LikeCounter {
public volatile long num;
public LikeCounter(long num) {
this.num = num;
}
/**
* 第一种同步方法
*/
public synchronized void add() {
num++;
}
/**
* atomicLong原子操作
*/
AtomicLong atomicLong = new AtomicLong(num);
public void addAto() {
atomicLong.getAndIncrement();
}
public long getAto() {
return atomicLong.get();
}
/**
* 对象的属性修改原子类 atomicLongFieldUpdater
*/
AtomicLongFieldUpdater<LikeCounter> atomicLongFieldUpdater = AtomicLongFieldUpdater.
newUpdater(LikeCounter.class, "num");
public void addAtoField(LikeCounter likeCounter) {
atomicLongFieldUpdater.getAndIncrement(likeCounter);
}
/**
* 原子扩展类 longAdder
*/
LongAdder longAdder = new LongAdder();
public void addLongAdder() {
longAdder.increment();
}
public long getLongAdder() {
return longAdder.longValue();
}
/**
* 原子扩展类 LongAccumulator
*/
LongAccumulator longAccumulator =new LongAccumulator((x,y)->{
return x+y;
},num);
public void addlongAccumulatorr() {
longAccumulator.accumulate(1);
}
public long getlongAccumulator() {
return longAccumulator.get();
}
}
public static void main(String[] args) {
try {
test1();
test2();
test3();
test4();
test5();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static void test1() throws Exception {
long l1 = System.currentTimeMillis();
LikeCounter like1 = new LikeCounter(0);
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int a = 0; a < 10000; a++) {
new Thread(() -> {
for (int b = 0; b < 10000; b++) {
like1.add();
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("调用synchronized方法,执行结果为:"+like1.getNum()+
",耗时:"+(System.currentTimeMillis()-l1));
}
private static void test2() throws Exception {
long l1 = System.currentTimeMillis();
LikeCounter like1 = new LikeCounter(0);
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int a = 0; a < 10000; a++) {
new Thread(() -> {
for (int b = 0; b < 10000; b++) {
like1.addAto();
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("调用atomicLong方法,执行结果为:"+like1.getAto()+
",耗时:"+(System.currentTimeMillis()-l1));
}
private static void test3() throws Exception {
long l1 = System.currentTimeMillis();
LikeCounter like1 = new LikeCounter(0);
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int a = 0; a < 10000; a++) {
new Thread(() -> {
for (int b = 0; b < 10000; b++) {
like1.addAtoField(like1);
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("调用atomicLongFieldUpdater方法,执行结果为:"+like1.getNum()+
",耗时:"+(System.currentTimeMillis()-l1));
}
private static void test4() throws Exception {
long l1 = System.currentTimeMillis();
LikeCounter like1 = new LikeCounter(0);
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int a = 0; a < 10000; a++) {
new Thread(() -> {
for (int b = 0; b < 10000; b++) {
like1.addLongAdder();
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("调用longAdder方法,执行结果为:"+like1.getLongAdder()+
",耗时:"+(System.currentTimeMillis()-l1));
}
private static void test5() throws Exception {
long l1 = System.currentTimeMillis();
LikeCounter like1 = new LikeCounter(0);
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int a = 0; a < 10000; a++) {
new Thread(() -> {
for (int b = 0; b < 10000; b++) {
like1.addlongAccumulatorr();
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("调用longAccumulator方法,执行结果为:"+like1.getlongAccumulator()+
",耗时:"+(System.currentTimeMillis()-l1));
}
调用结果如下:
- synchronized方法最慢,
- longAdder 和 longAccumulato 两者最快,性能基本相当
调用synchronized方法,执行结果为:100000000,耗时:4621
调用atomicLong方法,执行结果为:100000000,耗时:1522
调用atomicLongFieldUpdater方法,执行结果为:100000000,耗时:2611
调用longAdder方法,执行结果为:100000000,耗时:368
调用longAccumulator方法,执行结果为:100000000,耗时:333
longAdder
Striped64 内部结构
-
内部变量 base :并发低的时候,直接累加到base上
-
另外还有一个内部类cell ,和 cell[],并发高的时候,会把线程分散到自己的槽 cell[i] 中
longAdder 为什么快
- AtomicLong的底层是cas,即使并发很大,一次也只能有一个线程完成cas操作,剩下的线程只能自旋等待,就会导致大量的cpu空转
- longAdder的同样是cas
- 但是采用了分散热点的思想,将value值分散到一个cell 数组中,不同的线程会命中数组中不同的槽,每个线程对自己槽中的值进行cas操作,这样就分摊了压力,减少了线程冲突的概率,也就减少了线程自旋等待的时间,
- 如果要获取真正的value值时,sum方法会将所有的cell数组中的value值和base累加作为返回值
- 所以如果并发不大,longAdder和AtomicLong差别不大,都是对一个base进行操作
- 但是高并发下longAdder会采用空间换时间的做法,使用一个cell数组拆分value值,多个线程需要同时对value值进行操作的时候,先通过线程 id 的 hash 值映射到数组对应位置,再对该位置的值进行操作,当所有线程都执行完毕,base的值加上cell 数组中的值就是最终结果
longAdder 和 AtomicLong对比
- AtomicLong是多个线程对单个热点值进行原子更新,是线程安全的,会损失一些性能,再高精度要求时,可以使用
- longAdder 再高并发下,有较好的性能,对值精确度要求不高时,可以使用,每个线程都有自己的槽,各个线程一般只对自己槽中的值进行cas操作
add 方法
-
方法入参是要增加的值
-
刚开始 cell[] 等于null,尝试用cas操作更新base值,cas执行成功,把base值改为了期望值,本次add就结束了
-
随着并发的升高,cas操作失败,就需要执行 longAccumulate方法,去初始化cell数组分散压力
-
一旦 cell[] 数组初始化完成,就需要判断当前线程所在的 cell 是否为null
-
为 null,执行 Striped64 的 longAccumulate方法,来初始化对应位置的cell
-
不为null,就执行对应 cell 的 cas操作,
- 执行成功就没有冲突,结束本次add操作
- 执行失败,表示本次操作有冲突,需要执行 Striped64 的 longAccumulate方法来扩容 cell []
-
public void add(long x) {
//b获取的base值,v表示期望值 ,m为数组长度,a表示当前线程命中的数组单元格
Cell[] as; long b, v; int m; Cell a;
//刚开始cells等于null,执行casBase,结果取反
if ((as = cells) != null || !casBase(b = base, b + x)) {
//这个boolean值代表cell数组有没有冲突
boolean uncontended = true;
//判断数组有没有初始化完成
if (as == null || (m = as.length - 1) < 0 ||
//判断线程有没有冲突,等于null说明没有冲突,getProbe()计算hash值
(a = as[getProbe() & m]) == null ||
//如果线程映射的对应位置不为null,就执行对应 cell 的 cas操作,执行成功返回true,取反得到false表没有冲突,结束本次add操作
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
longAccumulate
-
方法属于 Striped64,方法入参:
- x 是需要增加的值,
- longAdder 中 fn 默认是null, LongAccumulator会传递计算规则进来
- wasUncontended 是竞争标志,只有cell[]初始化完成,且cas竞争失败从才会是false
-
getProbe()方法,获取线程的hash值,如果返回0,会重新计算一个hash值,重新计算后,认为线程本次操作没有竞争关系,把竞争标志改为 true ,也就是不存在冲突
-
advanceProbe(h),重置当前线程的hash值,让线程再次参与竞争
-
longAccumulate 方法分为三大部分
-
数组没有初始化
- 如果数组没有初始化,就需要加锁去初始cell数组
- 这里没有使用synchronized加锁,而是使用内部变量cellsBusy,0表示无锁状态,1表示被其他线程持有了锁
- 如果是无锁状态,就会使用cas操作把该值更新为1,更新成功代表加锁成功,去完成数组的初始化,cell数组,默认长度为2,同时初始化当前线程hash对应数组位置的cell对象,数组初始化完成后释放锁
- 如果数组没有初始化,就需要加锁去初始cell数组
-
数组正在初始化,也计是其他未拿到锁的线程,作为兜底方案,会再这个分支把值直接累加到base上
-
数组初始化完成
-
如果数组初始化完成,但是线程对应位置的cell对象为null,就需要初始化cell对象
- 初始化cell对象,同样依靠内部变量cellsBusy,0表示无锁状态,1表示被其他线程持有了锁,值为0就使用cas把值更新为1,代表加锁
- cell对象初始化完成后,释放锁
-
如果线程竞争标志为false 存在冲突,就把竞争标识改为 true,然后重置当前线程的hash值,重新计算线程的槽位,让线程重新循环参与竞争
-
如果通过cas操作重新竞争成功,就跳出循环
-
如果数组的长度 n大于当前cpu的核数,就不可扩容,然后重置当前线程的hash值,让线程重新循环参与竞争
-
如果cell[] 需要扩容,同样需要拿到cas锁,新数组的长度是原数组的两倍,把原本的cell拷贝到新数组,数组的引用指向新数组后,释放锁
-
-
//x 是需要增加的值,fn 默认是null, wasUncontended 是竞争标志,只有cell[]初始化完成,且cas竞争失败从才会是false
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//数组初始化完成
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//重制竞争标志
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//重新竞争cas操作
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//重置hash值
h = advanceProbe(h);
}
//数组没有初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
//如果cell为null,初始化cell数组,默认长度为2
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//数组正在初始化,作为一种兜底,把值直接累加到base上
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
sum方法
- 将所有cell数组中的value值和base累加作为返回值
- 但是再sum执行时,并没有限制对base和cell的更新,所以longAdder 不是强一致性的,而是保证最终一致性
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}