原子操作类之18罗汉增强
原子操作类
java.util.concurrent.atomic
包下的所有类
上图中分为了两部分,为什么呢? 看下阿里巴巴手册
基本类型原子类
// 可以原子方式更新的值 boolean
AtomicBoolean
// 可以原子方式更新的 int值
AtomicInteger
// 可以原子方式更新的 long值
AtomicLong
常用API
public final int get();
public final int getAndSet(int new Value);
public final int getAndIncrement();
public final int getAndDecrement();
public final int getAndAdd(int delta);
public comapreAndSet(int expect,int update);
案列
class MyNumber {
AtomicInteger atomicInteger = new AtomicInteger();
public void addPlusPlus() {
atomicInteger.getAndIncrement();
}
}
/**
* @author zjh
*/
public class AtomicIntegerDemo {
public static final int SIZE = 50;
public static void main(String[] args) throws InterruptedException {
MyNumber myNumber = new MyNumber();
for (int i = 0; i < SIZE; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
myNumber.addPlusPlus();
}
}, String.valueOf(i)).start();
}
System.out.println(Thread.currentThread().getName() + "\t result:" + myNumber.atomicInteger.get());
}
}
// 执行了3次的结果都不一样,是因为在线程还没计算完,主线程就输出结果了
main result:26540
main result:22272
main result:30550
使用TimeUnit.SECONDS.sleep(2)
(不推荐,只用于测试)
class MyNumber {
AtomicInteger atomicInteger = new AtomicInteger();
public void addPlusPlus() {
atomicInteger.getAndIncrement();
}
}
/**
* @author zjh
*/
public class AtomicIntegerDemo {
public static final int SIZE = 50;
public static void main(String[] args) throws InterruptedException {
MyNumber myNumber = new MyNumber();
for (int i = 0; i < SIZE; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
myNumber.addPlusPlus();
}
}, String.valueOf(i)).start();
}
// 暂停两秒main线程
try { TimeUnit.SECONDS.sleep( 2 ); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName() + "\t result:" + myNumber.atomicInteger.get());
}
}
// 结果
main result:50000
使用CountDownLatch(推荐)
class MyNumber {
AtomicInteger atomicInteger = new AtomicInteger();
public void addPlusPlus() {
atomicInteger.getAndIncrement();
}
}
/**
* @author zjh
*/
public class AtomicIntegerDemo {
public static final int SIZE = 50;
public static void main(String[] args) throws InterruptedException {
MyNumber myNumber = new MyNumber();
CountDownLatch countDownLatch = new CountDownLatch(SIZE);
for (int i = 0; i < SIZE; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000; j++) {
myNumber.addPlusPlus();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t result:" + myNumber.atomicInteger.get());
}
}
// 结果
main result:50000
数组类型原子类
// 一个 int数组,其中元素可以原子方式更新
AtomicIntegerArray
// 一个 long数组,其中元素可以原子方式更新
AtomicLongArray
// | 一组对象引用,其中元素可以原子方式更新
AtomicRreferenceArray
案列
public class AtomicIntegerArrayDemo {
public static void main(String[] args) {
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
// AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
// AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});
for (int i = 0; i <atomicIntegerArray.length(); i++) {
System.out.println(atomicIntegerArray.get(i));
}
int tmpInt = 0;
tmpInt = atomicIntegerArray.getAndSet(0,1122);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0));
tmpInt = atomicIntegerArray.getAndIncrement(0);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0));
}
}
引用类型原子类
// 可以原子方式更新的对象引用
AtomicReference
// AtomicStampedReference维护一个对象引用以及一个整数“标记”,可以原子方式更新
AtomicStampedReference
// AtomicMarkableReference维护一个对象引用以及一个标记位,可以原子方式更新
AtomicMarkableReference
AtomicReference(SpinLockDemo
案列)
public class SpinLockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void lock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t ------come in");
while (!atomicReference.compareAndSet(null, thread)) {
// 线程自旋中
System.out.println(Thread.currentThread().getName() + "\t ------自旋中...");
try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}
}
}
public void unlock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t ------task over, unlock...");
}
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(() -> {
spinLockDemo.lock();
// 暂停几秒线程
try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}
spinLockDemo.unlock();
}, "A").start();
// 暂停500毫秒, 线程A先于线程B
try {TimeUnit.MICROSECONDS.sleep(5);} catch (InterruptedException e) {throw new RuntimeException(e);}
new Thread(() -> {
spinLockDemo.lock();
spinLockDemo.unlock();
}, "B").start();
}
}
AtomicStampedReference(ABA案列)
携带版本号的引用类型原子类,可以解决ABA问题。解决修改过几次的问题
public class AbaDemo {
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100,1);
public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 首次版本号:" + stamp);//1-----------初始获得一样的版本号
// 暂停500毫秒,保证t4线程初始化拿到的版本号和我一样,
try { TimeUnit.MILLISECONDS.sleep( 500 ); } catch (InterruptedException e) { e.printStackTrace(); }
atomicStampedReference.compareAndSet(100,101, atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println(Thread.currentThread().getName() + "\t 2次版本号:" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println(Thread.currentThread().getName() + "\t 3次版本号:" + atomicStampedReference.getStamp());
},"t3").start();
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();//记录一开始的版本号,并且写死
System.out.println(Thread.currentThread().getName() + "\t 首次版本号:" + stamp);//1------------初始获得一样的版本号
// 暂停1秒钟线程,等待上面的t3线程,发生了ABA问题
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace();}
// 这个还是初始的版本号,但是实际上版本号被T3修改了,所以会失败
boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t"+result + "\t" + atomicStampedReference.getReference());
},"t4").start();
}
}
AtomicMarkableReference
解决是否修改过,它的定义就是将状态戳
简化为true|false
,类似一次性筷子
public class AtomicMarkableReferenceDemo {
static AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(100,false);
public static void main(String[] args) {
new Thread(()->{
boolean marked = markableReference.isMarked();
System.out.println(Thread.currentThread().getName() + "\t" + "默认标识" + marked);
// 暂停1秒钟线程,等待后面的T2线程和我拿到一样的模式flag标识,都是false
try {
TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}
markableReference.compareAndSet(100, 1000, marked, !marked);
},"t1").start();
new Thread(()->{
boolean marked = markableReference.isMarked();
System.out.println(Thread.currentThread().getName() + "\t" + "默认标识" + marked);
// 暂停2秒,让t1先修改, 然后t2试着修改
try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
boolean b = markableReference.compareAndSet(100, 1000, marked, !marked);
System.out.println(Thread.currentThread().getName() + "\t" + "t2线程结果:" + b);
System.out.println(Thread.currentThread().getName() + "\t" + markableReference.isMarked());
System.out.println(Thread.currentThread().getName() + "\t" + markableReference.getReference());
},"t2").start();
}
}
// 结果
t1 默认标识false
t2 默认标识false
t2 t2线程结果:false
t2 true
t2 1000
对象的属性修改原子类
// 原子更新对象中int类型字段的值
AtomicIntegerFieldUpdater
// 原子更新对象中Long类型字段的值
AtomicLongFieldUpdater
// 原子更新引用类型字段的值
AtomicReferenceFieldUpdater
使用目的
以一种线程安全带 方式操作非线程安全对象内的某些字段
使用要求
-
更新的对象属性必须使用public volatile修饰符
-
因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性
synchronized
这种方法虽然安全了,但是锁的粒度太大了我只是修改money却把我真个bankAccount对象都给锁了
class BankAccount {
String bankName = "CCB";
public int money = 0;
public synchronized void add() {
money ++;
}
}
/**
* @author zjh
*/
public class AtomicReferenceFieldUpdaterDemo {
public static void main(String[] args) throws InterruptedException {
BankAccount bankAccount = new BankAccount();
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000; j++) {
bankAccount.add();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t" + "result:" + bankAccount.money);
}
}
AtomicReferenceFieldUpdater
这块只针对 money 字段进行了原子操作
class BankAccount {
String bankName = "CCB";
// 更新的对象属性必须使用public volatile修饰符
public volatile int money = 0;
// 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性
AtomicIntegerFieldUpdater<BankAccount> atomicIntegerFieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money");
public void add(BankAccount bankAccount) {
atomicIntegerFieldUpdater.getAndIncrement(bankAccount);
}
}
/**
* @author zjh
*/
public class AtomicReferenceFieldUpdaterDemo {
public static void main(String[] args) throws InterruptedException {
BankAccount bankAccount = new BankAccount();
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000; j++) {
bankAccount.add(bankAccount);
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t" + "result:" + bankAccount.money);
}
}
AtomicReferenceFieldUpdater
class MyVar {
private volatile Boolean isInit = Boolean.FALSE;
AtomicReferenceFieldUpdater<MyVar, Boolean> atomicReferenceFieldUpdater =
AtomicReferenceFieldUpdater.newUpdater(MyVar.class, Boolean.class, "isInit");
public void init(MyVar myVar) {
if (atomicReferenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)) {
System.out.println(Thread.currentThread().getName() + "\t" + "----- start init,needs 2 seconds");
try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}
System.out.println(Thread.currentThread().getName() + "\t" + "----- over init");
} else {
System.out.println(Thread.currentThread().getName() + "\t" + "----- 已经有线程在进行初始化工作...");
}
}
}
/**
* @author zjh
*/
public class AtomicReferenceFieldUpdaterDemo {
public static void main(String[] args) {
MyVar myVar = new MyVar();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
myVar.init(myVar);
}, String.valueOf(i)).start();
}
}
}
原子操作增强类原理深度解析
// 前面所讲的几种类型java5就有了,下面这些是java8才有的
DoubleAccumulator
DoubleAdder
LongAccumulator
LongAdder
题目
- 热点商品点赞计算器,点赞数加加统计,不要求实时精确
- 一个很大的List,里面都是int类型,如何实现加加,说说思路
模拟下点赞计数器,看看性能
LongAdder
常用API
入门案列
-
LongAdder
只能用来计算加法 。且从零开始计算 -
LongAccumulator
提供了自定义的函数操作 (利用lambda表达式,可以加减乘除)
public class LongAdderApiDemo {
public static void main(String[] args) {
LongAdder longAdder = new LongAdder();
longAdder.increment();
longAdder.increment();
longAdder.increment();
System.out.println(longAdder.longValue());
LongAccumulator longAccumulator = new LongAccumulator(Long::sum,0);
longAccumulator.accumulate(1);
longAccumulator.accumulate(3);
System.out.println(longAccumulator.get());
}
}
synchronized
、atomicLong
、longAdder
、longAccumulator
性能对比
class ClickNumber {
int number = 0;
public synchronized void ClickBySynchronized(){
number++;
}
AtomicLong atomicLong = new AtomicLong(0);
public void ClickByAtomicLong(){
atomicLong.incrementAndGet();
}
LongAdder longAdder =new LongAdder();
public void ClickByLongAdder(){
longAdder.increment();
}
LongAccumulator longAccumulator = new LongAccumulator(Long::sum,0);
public void ClickByLongAccumulator(){
longAccumulator.accumulate(1);
}
}
/**
* @author zjh
*
* 需求:50个线程,每个线程1w次,计算总点赞数
*/
public class AccumulatorCompareDemo {
/**
* 每个线程执行次数
*/
public static final int THREAD_FREQUENCY = 1000000;
/**
* 线程数
*/
public static final int THREAD_NUMBER = 50;
public static void main(String[] args) throws InterruptedException {
ClickNumber clickNumber = new ClickNumber();
long startTime;
long endTime;
CountDownLatch countDownLatch1 = new CountDownLatch(THREAD_NUMBER);
CountDownLatch countDownLatch2 = new CountDownLatch(THREAD_NUMBER);
CountDownLatch countDownLatch3 = new CountDownLatch(THREAD_NUMBER);
CountDownLatch countDownLatch4 = new CountDownLatch(THREAD_NUMBER);
// synchronized
startTime = System.currentTimeMillis();
for (int i = 0; i < THREAD_NUMBER; i++) {
new Thread(() -> {
try {
for (int j = 0; j < THREAD_FREQUENCY; j++) {
clickNumber.ClickBySynchronized();
}
} finally {
countDownLatch1.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch1.await();
endTime = System.currentTimeMillis();
System.out.println("------synchronized:" + (endTime - startTime) + ",\tnumber:" + clickNumber.number);
// longAtomicLong
startTime = System.currentTimeMillis();
for (int i = 0; i < THREAD_NUMBER; i++) {
new Thread(() -> {
try {
for (int j = 0; j < THREAD_FREQUENCY; j++) {
clickNumber.ClickByAtomicLong();
}
} finally {
countDownLatch2.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch2.await();
endTime = System.currentTimeMillis();
System.out.println("------AtomicLong:" + (endTime - startTime) + ",\tnumber:" + clickNumber.number);
// longAdder
startTime = System.currentTimeMillis();
for (int i = 0; i < THREAD_NUMBER; i++) {
new Thread(() -> {
try {
for (int j = 0; j < THREAD_FREQUENCY; j++) {
clickNumber.ClickByLongAdder();
}
} finally {
countDownLatch3.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch3.await();
endTime = System.currentTimeMillis();
System.out.println("------LongAdder:" + (endTime - startTime) + ",\tnumber:" + clickNumber.number);
// longAccumulator
startTime = System.currentTimeMillis();
for (int i = 0; i < THREAD_NUMBER; i++) {
new Thread(() -> {
try {
for (int j = 0; j < THREAD_FREQUENCY; j++) {
clickNumber.ClickByLongAccumulator();
}
} finally {
countDownLatch4.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch4.await();
endTime = System.currentTimeMillis();
System.out.println("------LongAccumulator:" + (endTime - startTime) + ",\tnumber:" + clickNumber.number);
}
}
// 结果
------synchronized:3343, number:50000000
------AtomicLong:876, number:50000000
------LongAdder:116, number:50000000
------LongAccumulator:85, number:50000000
源码分析
架构图
LongAdder
是Striped64
的子类
LongAdder为什么这么快
- 官网说明
- 阿里说明
Striped64重要的成员函数
// Number of CPUS, to place bound on table size
// CPU数量,即cells数组的最大长度
static final int NCPU = Runtime.getRuntime().availableProcessors();
// Table of cells. When non-null, size is a power of 2.
// 单元格数组|cells数组,为2的幂,2,4,8,16.....,方便以后位运算
transient volatile Cell[] cells;
// 基础value值,当并发较低时,只累加该值主要用于没有竞争的情况,通过CAS更新。
// Base value, used mainly when there is no contention, but also as
// a fallback during table initialization races. Updated via CAS.
transient volatile long base;
// 创建或者扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁。
// Spinlock (locked via CAS) used when resizing and/or creating Cells.
transient volatile int cellsBusy;
Cell
是java.util.concurrent.atomic
下Striped64
的一个静态内部类
LongAdder和AtomicLong比较
-
LongAdder
的基本思路就是分散热点 ,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。 -
sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。
-
数学表达,内部有一个base变量,一个Cell[]数组。
-
base
变量:低并发,直接累加到该变量上 -
Cell[]
数组:高并发,累加各个线程自己的槽Cell[i]中
-
源码小总结
LongAdder
在无竞争的情况,跟AtomicLong
一样,对同一个base
进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间
,用一个数组cells
,将一个value
拆分进这个数组cells
。多个线程需要同时对value
进行操作时候,可以对线程id
进行hash
得到hash
值,再根据hash值映射到这个数组cells
的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells
的所有值和无竞争值base都加起来作为最终结果。
LongAdder.increment()
add(1L)
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
- 真正干活的是
longAccumulate
as
表示cells引用b
表示获取的base值v
表示期望值m
表示cells数组的长度a
表示当前线程命中的cell单元格
caseBase
方法可以看到使用了到了cas
/**
* CASes the base field.
*/
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
第一次if条件满足一个条件即可,(as = cells) != null
如果是初始且无竞争情况下这个条件肯定不成立,!casBase(b = base, b + x)
无竞争情况下cas成功了直接结束if条件,如果失败则进入说明目前是高并发情况
第二次if条件是在竞争激烈(高并发)的情况下执行,可以看到进行了四种条件判断满足一个就会进入执行longAccumulate(x, null, uncontended);
该方法就会进行扩容
- 总结
- 最初无竞争时只更新base;
- 如果更新base失败后,首次新建一个Cell[]数组
- 当多个线程竞争同一个Cell比价激烈时,可能就要利用longAccumulate对Cell[]扩容。
public void add(long x) {
// as是striped64中的ceLLs数组属性
// b是striped64中的base属性
// v是当前线程hash到的ceLL中存储的值
// m是cells的长度减1,hash时作为掩码使用
// a是当前线和hash到的的cell
Cell[] as; long b, v; int m; Cell a;
// 首次首线程((as = cells) l= null)一定是false,此时走casBase方法,以CAS的方式更新base,且只有当cas失败时,才会走if中
// 条件1: cells不为空,说明出现过竞争,ceLL[]已创建
// 条件2: cas操作base 失败,说明其它线程先一步修改了base正在出现竞争
if ((as = cells) != null || !casBase(b = base, b + x)) {
// true无竞务 false天示意争激烈,多个线积hash到同一个cell,可能要扩容
boolean uncontended = true;
// 条件1: cells 为空,说明正在出现竞争,上面是从条件2过来的
// 条件2: 应该不会出现
// 条件3: 当前线程所在cell 为空,说明当前线程还没有更新过cell,应初始化一个cell
// 条件4: 更新当前线程所在的cell 失败,说明现在竞争很激烈,多个线程hash到了同一个cell,应扩容
if (as == null || (m = as.length - 1) < 0 ||
// getProbe()方法返回的是线程中前threadLocaLRandomProbe字段
// 它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)
(a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
longAccumulate
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
-
步骤
(a = as[getProbe() & m])
里的probe,这里其实拿了hash值,通过hash值知道我们去到哪个cell槽
static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); }
-
总纲
- 上述代码首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:
- CASE1:Cell[]数组已经初始化
- CASE2:Cell[]数组未初始化(首次新建)
- CASE3:Cell[]数组正在初始化中
- 上述代码首先给当前线程分配一个hash值,然后进入一个for(;;)自旋,这个自旋分为三个分支:
-
计算
-
刚刚要初始化Cell[]数组(首次新建)
如果上面条件都执行成功就会执行数组的初始化及赋值操作, Cell[] rs = new Cell[2]表示数组的长度为2,rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素,value是x值,默认为1。
h & 1类似于我们之前HashMap常用到的计算散列桶index的算法,通常都是hash & (table.len - 1)。同hashmap一个意思。 -
兜底
多个线程尝试CAS修改失败的线程会走到这个分支else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。
-
Cell数组不再为空且可能存在Cell数组扩容
多个线程同时命中一个cell的竞争,这个是最复杂的部分
(1)
上面代码判断当前线程hash后指向的数据位置元素是否为空,如果为空则将Cell数据放入数组中,跳出循环。如果不空则继续循环。(2)
(3)
(4)
(5)
(6)
总结
-
sum
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
sum()
会将所有Cell
数组中的value
和base
累加作为返回值。
核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点 。
为啥在并发情况下sum的值不精确?
sum执行时,并没有限制对base和cells的更新。所以LongAdder不是强一致性的,它是最终一致性的。首先,最终返回的sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了 ,而此时局部变量sum不会更新,造成不一致。
其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果。
使用总结
AtomicLong
-
线程安全,可允许一些性能损耗,要求高精度时可使用
-
保证精度,性能代价
-
AtomicLong是多个线程针对单个热点值value进行原子操作
LongAdder
-
当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
-
保证性能,精度代价
-
LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作
总结
AtomicLong
-
原理
CAS+自旋 -
场景
低并发下的全局计算,AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题 -
缺陷
高并发后性能急剧下降,AtomicLong的自旋会称为瓶颈(N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。)
LongAdder
-
原理
CAS + Base + Cell数组分散,空间换时间并分散了热点数据 -
场景
高并发的全局计算 -
缺陷
sum求和后还有计算线程修改结果的话,最后结果不够准确