1. 概述
AtomicLong通过循环CAS实现原子操作,缺点是当高并发下竞争比较激烈的时候,会出现大量的CAS失败,导致循环CAS次数大大增加,这种自旋是要消耗时间cpu时间片的,会占用大量cpu的时间,降低效率。那这个问题如何解决呢?JUC给我们提供了一个类,LongAdder, 它的作用和AtomicLong是一样的,都是一个实现了原子操作的累加器,LongAdder通过维护一个基准值base和 Cell 数组,多线程的时候多个线程去竞争Cell数组的不同的元素,进行cas累加操作,并且每个线程竞争的Cell的下标不是固定的,如果CAS失败,会重新获取新的下标去更新,从而极大地减少了CAS失败的概率,最后在将base 和 Cell数组的元素累加,获取到我们需要的结果。
2. 涉及到的类、变量、方法
变量:
Striped64(LongAdder的父类)中维护者三个变量:base、cellsBusy、Cell数组 ,都用 volatile修饰
base 为基本数据,在线程竞争比较少的时候,cas更新base,sum()返回的结果也为base;
cellsBusy 是一把锁,值为0或者1,1表示Cell数组正在进行初始化或者拓容操作;
Cell[] 数组 线程竞争的数据就是这个数组的元素,size为2的整数次幂(初始化size为2,后面每次拓容都是是原来的size 左移一位)
类:
Cell 类,Striped64的内部类,这个类和AtomicLong很像,是线程竞争的具体对象的类,提供了cas更新其value的方法,并且使用@sun.misc.Contended修饰,用来防止伪共享。
@sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
方法:
add(long x); LongAdder的add累加方法
sum(); LongAdder的获取累加结果的方法,由于在sum中没有任何的加锁的操作,在遍历获取cells中的值的时候,可能有线程在拓容累加等操作,所以sum返回的值并不精确,不是一个原子快照
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
casBase(long cmp, long val) ; 更新base的CAS方法
getProbe(); 获取线程的探针值,用于计算线程操作的cells数组的下标
advanceProbe(int probe) ; 修改线程的探针值,用于cas失败的时候不会再去循环等待某个元素,而重新获取新的元素去竞争
longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended);下面会详细说明
3. 代码解释
调用add 方法:
(1)逻辑为: cells 为null 并且cas更新base成功,则直接返回结果不进行后续判断,
(3)(4)(5) 逻辑为: cells !=null && as[getProbe() & m] != null && cas更新as[getProbe() & m] 成功,则直接结束,否则就会执行longAccumulate(x, null, uncontended)方法
(2)(5) uncontended 用来记录cas更新cell元素失败
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { //(1) boolean uncontended = true; //(2) if (as == null || (m = as.length - 1) < 0 || //(3) (a = as[getProbe() & m]) == null || //(4) !(uncontended = a.cas(v = a.value, v + x))) //(5) longAccumulate(x, null, uncontended); } }
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 用于通过 h & cells.length -1 计算线程操作的位置 int h; // 判断线程的probe是否初始化 if ((h = getProbe()) == 0) { // 初始化线程probe ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } // 用于记录cell数组当前是否存在空的桶,false表示大概率存在空桶,及时满足拓容条件也暂时先不拓容 boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; // cells 是否为空 if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { // cells 不为空,并且线程操作桶为null,则new Cell(x)并更新 if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } // 在LongAdder.add()方法中已经cas过一次,如果probe没有更新,则直接进行a.cas操作大概率失败,则加了此次判断 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // cell数组长度大于cpu数,则后续不再拓容 // cells != as 正在拓容,则下次循环有空桶的概率较大 将collide = false,下次执行到此处则会advanceProbe(h) 一次而非直接拓容 else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; // 拓容,每次为 数组长度 << 1 else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } // 每次失败后都会修改probe值,重新进入循环,而非probe不变 h = advanceProbe(h); } // cells 为null,则初始化cells,初始size为2 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } // cells 为null ,并且cellsBusy 锁竞争失败,则其他线程正在初始化,尝试casBase else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
我整理了一个详细的逻辑流程图:
4. 优缺点
高并发下不会循环cas,是的在高并发情况下效率比AtomicLong高,但是sum()方法返回值并不完全精确,所以可以用在并发要求比较高,但是结果精度要求不是特别搞得情况,如QPS的统计等场景,在工作中可能应用场景不会很多,但是我们学习的目的是更多的事为了学习其实现的原理和思路,Doug Lea 在写JUC包的时候的一些解决问题的思想和方式,帮助我们理解其他的并发类,也为我们自己的代码编写提供参考。
5. 问题思考
cells 是如何解决伪共享问题的 : 通过@sun.misc.Contended 注解强制Cells数组的单个元素独占cpu缓存行,进而避免伪共享
a = as[getProbe() & (cells.length -1)] 为何使用这样的计算方式: cells 的length 为 2的整数次幂,当 m 为 2的整数幂的时候, a % m 和 a & (m-1) 的结果是一样的,而且 & 是位运算比 %运算高很多,所以以这种方式计算线程操作cells的下标速度会很快,cells的长度之所以设计为2的整数次幂也是这个原因
wasUncontended 的作用是什么:减少二次竞争,wasUncontended =false 则说明最近一次cas失败了,继续执行也大概率事变,所以已经知道cas为 fail了就直接先执行h = advanceProbe(h),进行下一次循环,这样也是体现了JUC作者对性能的极致追求了,不放过任何一点的可能优化的空间;
collide 的作用: 不加这个标识,执行到拓容的代码部分时,就直接拓容了,但是,在上个循环中如果我们刚拓容的或者正在拓容,或者刚判断出有空的cell,这次循环中cells是大概率有空桶的并不需要立即拓容,所以这个标识的作用是再给一次机会,wasUncontended = true; h = advanceProbe(h),如果下次还是执行到了拓容部分,则再拓容,从而尽量减少不必要的拓容操作。