源码基于jdk1.8
这一片主要讲述ConcurrentHashMap如何实现低粒度锁的插入,如何实现统计元素个数,如何实现并发扩容迁移
一丶ConcurrentHashMap概述
-
支持高并发读写的哈希表,
ConcurrentHashMap
中每一个方法都是线程安全的,并且读数据通常不需要加锁,并发的性能远优于Hashtable
。//每一个方法都是线程安全,但是复合操作并不一定安全,如下的contains然后put ConcurrentHashMap<Object, Object> concurrentHashMap = new ConcurrentHashMap<>(); if (!concurrentHashMap.contains("1")){ concurrentHashMap.put("1","1"); }
-
虽然ConcurrentHashMap 具备很高的并发读写,但是对于并发的进行putAll 和get,只能反映
瞬时态
。下面这段代码如果线程A先拿到Hashtable的全局锁,那么线程B一定可以get出值,但是如果把Hashtable换做ConcurrentHashMap即使在时间上putAll先于get,也无法保证一定能get到key1对应值
Hashtable<Object, Object> hashtable = new Hashtable<>(); //线程A 执行 hashtable.putAll(hashMap); //线程B 执行 Object v1 = hashtable.get("key1");
-
ConcurrentHastMap的迭代器不会抛出
ConcurrentModificationException
但是可以遍历到元素只有创建迭代器时的那部分。
-
ConcurrentHashMap支持自动并发扩容,注意这个扩容是并发的,意味着扩容的期间读写并不会被阻塞
二丶ConcurrentHashMap的结构
ConcurrentHashMap的结构基本上和HashMap类似,都是数组+链表+红黑的方式:
-
数组
是散列表查找效率可以达到o(1)的关键
根据key的hash对数组大小取模得到下标index,然后根据下标快速定位到桶,利用了数组寻址快的优点
-
链表
作用是解决hash冲突,
发生hash冲突的节点就放在对应下标的链表 or 红黑树中
hash冲突: hash然后对数组大小取模的操作可以看作一个函数 这个函数的输入时无穷的,但是输出是介于0~数组大小的,那么必然存在多个key最后落在同一个下标的情况
-
红黑树
解决链表遍历时间复杂度o(n)的问题,使用红黑树利用二分搜索的性质进行优化。
红黑树本质是平衡二叉搜索树,平衡表示左子树右子树规模尽量接近,左子树小于父节点,父节点大于右子树的特征意味着可以使用log n的时间复杂度进行搜索,这个大小的比较优先使用Comparable类型key的compareTo方法,但是如果key不可比较那么将使用其hash值大小做比较。
在上图中锁的标志可以知道,ConcurrentHashMap锁的粒度,其锁只锁一个数组下标的元素,相比于Hashtable直接锁住整个哈希表,其锁的粒度更小,所以说ConcurrentHashMap具备更高的并发度。
三丶ConcurrentHashMap中的属性和常量
1.常量
这里只分析比较重要的常量,类似于为了兼容之前版本的常量不做列举
名称 | 值 | 含义 |
---|---|---|
MAXIMUM_CAPACITY | 1 << 30 | 数组最大大小 |
DEFAULT_CAPACITY | 16 | 默认容量大小 |
LOAD_FACTOR | 0.75 | 负载因子,1.8的负载因子为0.75,假如容量为8,那么当元素个数达到6的时候会发生扩容,扩容到原大小两倍 |
TREEIFY_THRESHOLD | 8 | 树化阈值,指定桶位 链表长度达到8的话,有可能发生树化操作 |
UNTREEIFY_THRESHOLD | 6 | 红黑树转化为链表的阈值 |
MIN_TREEIFY_CAPACITY | 64 | 接合TREEIFY_THRESHOLD控制桶位是否树化, 只有当table数组长度达到64且 某个桶位 中的链表长度达到8,才会真正树化 |
MIN_TRANSFER_STRIDE | 16 | 线程迁移数据最小步长,控制线程迁移任务最小区间一个值 |
NCPU | ? | 当前系统的cpu核数 |
RESIZE_STAMP_BITS | 16 | sizeCtl中用于生成扩容戳的位数(扩容时详细讲解) |
2.属性
属性 | 作用 |
---|---|
transient volatile Node<K,V>[] table | 散列表,长度一定是2次方数 |
private transient volatile Node<K,V>[] nextTable | 扩容过程中,会将扩容中的新table 赋值给nextTable 保持引用,扩容结束之后,这里会被设置为Null |
private transient volatile long baseCount | 实现ConcurrentHashMap计数的变量 |
private transient volatile int sizeCtl | ConcurrentHashMap中最复杂的变量,不同语境下具备不同作用,后面使用时介绍 |
private transient volatile int transferIndex | 扩容过程中,记录当前进度。所有线程都需要从transferIndex中分配区间任务,去执行自己的任务。 |
private transient volatile CounterCell[] counterCells | ConcurrentHashMap实现计数的变量 |
四丶源码分析
1.构造方法
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
//如果初始容量大于 最大容量(1 << 30)的一半,那么直接使用最大容量,
//如果使用 tableSizeFor((初始容量 + 初始容量/2)+1) = 大于(1.5初始容量+1)的2的倍数
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
/**
* sizeCtl > 0
* 当目前table未初始化时,sizeCtl表示初始化容量
*/
this.sizeCtl = cap;
}
这里有趣的是,构造方法并不导致数组的初始化,这点类似HashMap,只有往ConcurrentHashMap中塞元素的才会初始化数组
1.1.tableSizeFor
使用位操作 返回大于等于c的最小的2的次方数
假如c为1297 n就是1296
1296的二进制 0000 0101 0001 0000
右移1位 0000 0010 1000 1000
或操作 0000 0111 1001 1000
右移2位 0000 0001 1110 0110
或操作 0000 0111 1111 1111
右移4位 0000 0000 0111 1111
或操作 0000 0111 1111 1111
。。。右移 16位就让32位树最高位及以后的都为1
最后进行+1 求出不小于它的最接近的2的整数幂m
n<0 说明当前数太大了 32位全是1 已经超过了int 最大,数组不可以开辟这么大
2.主线剧情——put方法
2.1带着问题学源码
put方法是看懂ConcurrenHashMap的重点,此方法会被多个线程并发调用,那么douglea 是如何并发问题的昵?
- 数组的初始化,如何保证只有一个线程初始化,并且不会重复初始化
- 如果插入key对应下标为空,多个线程如何保证不会发生覆盖
- 如何实现ConcurrentHashMap元素数量计数
- 如何实现并发扩容迁移
2.2 源码解析
线程安全且低粒度锁的插入 -> 热点数据分离的统计ConcurrentHashMap元素个数 -> 并发扩容迁移
put方法直接调用了putVal,其中第三个参数表示只有在存在相同key的时候才插入,这里为false说明覆盖(之前存在key=1,value=2,这时候插入key=1,value=3.那么会覆盖为3)。下面我们大致看下putVal方法的流程
2.2.1 spread让散列更均匀
下面我们分析为什么需要使用高16位进行异或操作,为什么这种操作可以让key散列到数组中更均匀(这里和HASH_BITS进行且操作的目的是让hash值恒定为一个正数,因为再ConcurrentHashMap中负数hash值具备特殊意义,HashMap中则没有这个特殊处理)
-
key是散列到指定数组下标的
这里很巧妙的利用了数组长度是2的倍数的特点,2的倍数-1意味着低位连续x个1,对其进行且操作就是只保留原hash值的低x位等于取模
-
为什么需要使用高16位进行异或操作,为什么这种操作可以让key散列到数组中更均匀
假设插入key的hash满足一个规律 hash = 6000整数倍+3,且数组的长度为8
hash 对应二进制 且上(数组长度-1 = 7 =0111) 6003 0001011101110 011 011 = 3 60003 1110101001100 011 011 = 3 600003 10010010011111000 011 011 = 3 6000003 010110111000110110000 011 011 = 3 可以看到最终这些key都将落在数组下标3中,这是为什么昵?
因为上面的i * 60000对8取余肯定是0 剩下的结果只能是3对8取余 导致结果一直是3 因为hash 高位的值没有参与计算
如果使用高16位进行异或,得到数字的低位16位包含了原高16位的特征,从而让高位也参与计算,让散列更均匀
hash 对应二进制 hash ^ (hash>>>16) 且上(数组长度-1 = 7 =0111) 6003 0001011101110 011 0001011101110 011 011 = 3 60003 1110101001100 011 1110101001100 011 011 = 3 600003 10010010011111000 011 011000000000000000000010 010 = 2 6000003 010110111000110110000 011 010110111000110111011000 000 = 0 可以看到
600003,6000003
被散列到2和0,避免了这一类数据都散列到下标3导致hash冲突剧烈的问题。 -
上述高16位异或的做法有什么缺点和优点
-
缺点
根据上面表格的例子可以看到
6003,60003
由于这些数字二进制高16位全为0,所以最终还是都散列到3,并没有被很好的离散开。可以学习redis 使用Crc16对key进行散列,可以让散列更均匀
-
优点
时间复杂度低,如果使用md5,或者Crc16等算法进行散列,其时间成本将更大。
-
2.2.2 如果数组没有初始化,如何保证只有一个线程初始化成功,并不发生覆盖
这种情况发生在ConcurrentHashMap第一次塞元素的情况,会调用initTable对数组进行初始化,和HashMap不同点在于,这个initTable存在多个线程并发调用的可能,下面我们看看doug lea如何保证这一步的线程安全,和线程可见性(线程A初始化了数组,线程B必须马上可见)
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//自选,只要数组没有初始化
while ((tab = table) == null || tab.length == 0) {
//sizeCtl<0 说明其他线程正在初始化数组
if ((sc = sizeCtl) < 0)
//当前线程只需要放弃cpu 静静等待
Thread.yield(); // lost initialization race; just spin
//cas设置sizeCtl 为-1 cas成功表示当前线程成功上锁,由当前线程进行初始化table
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次检查,避免其他线程初始化了,当前线程还进行初始化导致前面线程put的元素丢失
//因为下面finally 会释放锁,释放的一瞬间可能其他线程成功cas为-1,从而让前面初始化线程put的内容丢失
if ((tab = table) == null || tab.length == 0) {
//sc 大于0 这是用户在构造方法中设置的大小,如果用户没有设置那么使用默认大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//new 一个数组 并且赋值给table
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//n为当前数组的大小
//n - (1/4*n) = 3/4 n 也就是负载因子0.75 * n
sc = n - (n >>> 2);
}
} finally {
//释放锁
sizeCtl = sc;
}
//结束自旋
break;
}
}
//返回初始化后的数组
return tab;
}
总体上是利用sizeCtl做了一把锁,如果没有抢占到锁,那么让出cpu,这里没有使用synchronized
的原因应该是:初始化一个数组是一个很快的过程,如果把其他线程挂起,初始化完后还需要进行唤醒,挂起和唤醒需要从用户态切换到内核态,为了一个短暂的初始化而进行挂起和唤醒是得不偿失的。
并且table属性是volatile修饰的属性,保证了可见性
代码中还有一点非常有趣
2.2.3 如果插入位置不存在元素,如何保证并发情况下不会发生覆盖
可以看到doug lea使用cas的方式保证了多个线程插入元素到先前不具备元素的桶中,只有一个线程会成功,成功的线程将结束自旋,失败的线程将继续自旋
这里还有一个细节,那就是如何保证获取数组元素的线程可见性,table数组虽然被volatile修饰,但是table中元素怎么保证可见性:
可以看到,获取table元素的方式是 Unsafe.getObjectVolatile(数组元素的地址)
的方式,这种方式会 在读指令前插入读屏障,可以让高速缓存中的数据失效,重新从主内存加载数据
2.2.4 如果发生hash冲突,如何保证线程安全的将当前插入数据追加到链表或者红黑树中
这里先暂时忽略helpTransfer 帮助扩容迁移的内容,后面讲到扩容我们再看看doug lea如何实现帮助扩容迁移的
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不可存null key 和null value
if (key == null || value == null) throw new NullPointerException();
// 对key的hash进行扰动,是高16位参与计算,让散列更均匀
int hash = spread(key.hashCode());
//如果插入到链表中的那么记录链表中的下标,
//如果插入到红黑树那么恒定为2,
//如果没有发生hash冲突(没有插入到链表也没插入到树)那么为0
int binCount = 0;
//自旋
for (Node<K,V>[] tab = table;;) {
//忽略无关代码。。。
else {
//进入这个位置的前提
//1.数组初始化了
//2.当前key hash对数组取模得到下标后,根据下标到数组中拿到对应的节点不是空(不是第一个插入在这个位置的元素)
//3 数组在当前插入位置没有进行扩容迁移
//旧值
V oldVal = null;
//加锁 锁的是头节点(可能是链表,也可能是树节点,等其他节点)
synchronized (f) {
// 重新获取以下,避免其他线程修改了头节点,从而锁错,如果其他线程修改了头,那么什么也不做,继续自选
if (tabAt(tab, i) == f) {
//当前key hash对数组取模得到下标后,根据下标到数组中拿到对应的节点的hash值 大于0,说明是链表节点,因为树节点的hash恒定-2
if (fh >= 0) {
//记录在链表中的位置
binCount = 1;
//遍历链表中每一个节点
for (Node<K,V> e = f;; ++binCount) {
// ek记录当前遍历节点的key
K ek;
//当前遍历节点kash = 企图插入key的hash
//且key是同一个对象 或者key equals为true 说明找到插入的位置
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
//记录旧值
oldVal = e.val;
//如果不是不存在才插入(put方法onlyIfAbsent=false,会进行值替换)
if (!onlyIfAbsent)
//替换值
e.val = value;
//插入成功那么结束自选
break;
}
//遍历节点的前一个节点
Node<K,V> pred = e;
//e是当前遍历的节点,如果当前节点的下一个节点为null
//此处还会让e = e.next 相当于让遍历节点向后移动
if ((e = e.next) == null) {
//到这儿说明 当前key和链表中每一个节点的key都不同(不能==,也不能equals)
//那么将当前插入的值包装成node 挂到链表末尾
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//当前key hash对数组取模得到下标后,根据下标到数组中拿到对应的节点 是一个树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
//恒定binCount为2
binCount = 2;
//红黑树的插入 p为插入过程中key ==或者equals的节点,
//说明存在重复的节点 p是被覆盖的节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
//记录旧值
oldVal = p.val;
//如果不是不存在才插入(put方法onlyIfAbsent=false,会进行值替换)
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//binCount 如果插入下标对应的第一个节点是空(当前是第一个插入到这个位置的)才为0
//插入链表中 binCount记录插入到链表中的下标
//如果插入到红黑中,binCount恒定为2
if (binCount != 0) {
//大于树化的阈值,那么进行树化(内部会要求数组长度大于64)
if (binCount >= TREEIFY_THRESHOLD)
//树化
treeifyBin(tab, i);
//如果之前相同key的元素存在,返回旧的key
if (oldVal != null)
return oldVal;
break;
}
}
}
//计数,会记录ConcurrentHashMap中的元素个数
//如果超过扩容阈值 还会进行扩容迁移
addCount(1L, binCount);
return null;
}
这部分代码没有很复杂,使用synchronized
锁住头节点保证线程安全,但是即使加了锁,doug lea居然还会使用if (tabAt(tab, i) == f)
再次进行校验,这是为什么昵?
2.2.5 如何统计ConcurrentHashMap中元素数量,和实现并发扩容迁移——addCount方法
统计数量和并发扩容都交给putVal最后的addCount方法,如果是覆盖了原有key value或者没有插入到ConcurrentHashMap那么将不进行addCount(1L, binCount)
,因为没有新增元素。
-
统计元素个数的难点
如果addCount被并发调用且只使用一个属性记录元素个数,我们可以使用下列方案统计数量
-
使用独占锁
这种方案的缺点,多个线程并发插入最终都将受限于统计数量,这一步只能串行,这和ConcurrentHashMap主打一个并发是相悖的
-
使用AtomicLong
这种方案的缺点在于,假如当前有100个线程,线程1cas修改数目成功,将导致线程2~100失败继续自旋,线程2成功将导致其余线程失败自旋,导致线程进行多次无意义的自旋,把珍贵的cpu资源浪费在自旋上。这是
热点数据更新
导致的问题 -
使用LongAdder
ConcurrentHashMap没有直接使用LongAdder,几乎是一样的代码再写了一次,可能doug lea觉得ConcurrentHashMap只需要加1这种操作,不需要所有LongAdder所有功能从而进行了独立实现
JUC源码学习笔记4——原子类,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法 - Cuzzz - 博客园 (cnblogs.com)这篇博客详细讲解了LongAdder的原理——热点数据分离
其实和分表思想类似,将热点数据分散到多出,从而减少热点数据更新的问题。
这种方案虽好但是有另外一个问题——统计热点数据之和不具备瞬时一致性,想象我们需要遍历完所有热点数据计算其和,可能遍历完A,再遍历B的时候,A被修改了。
鱼和熊掌不可兼得
-
-
并发扩容迁移的难点
如何让多个线程协调合作,如何让迁移的过程不阻塞读,如何保证多线程迁移的线程安全问题
addCount方法很长,我们分两部分讲解
2.2.5.1 ConcurrentHashMap统计总数
增加CocurrentHashMap元素数量,并统计总数
这部分需要LongAdder的基础,推荐学习《JUC源码学习笔记4——原子类,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法 - Cuzzz - 博客园 (cnblogs.com)》
//as 表示 LongAdder.cells
//b 表示LongAdder.base
//s 表示当前map.table中元素的数量
CounterCell[] as; long b, s;
//条件一:(as = counterCells) != null
//true->表示cells已经初始化了,当前线程应该去使用hash寻址找到合适的cell 去累加数据
// false->表示当前线程应该将数据累加到 base
//条件二:!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
//false->表示写base成功,数据累加到base中了,当前竞争不激烈,不需要创建cells
//true->表示写base失败,与其他线程在base上发生了竞争,当前线程应该去尝试创建cells。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
//有几种情况进入到if块中?
//1.true->表示cells已经初始化了,当前线程应该去使用hash寻址找到合适的cell 去累加数据
//2.true->表示写base失败,与其他线程在base上发生了竞争,当前线程应该去尝试创建cells。
//a 表示当前线程hash寻址命中的cell
CounterCell a;
//v 表示当前线程写cell时的期望值
long v;
//m 表示当前cells数组的长度
int m;
//true -> 未竞争 false->发生竞争
boolean uncontended = true;
//条件一:as == null || (m = as.length - 1) < 0
//true-> 表示当前线程是通过 写base竞争失败 然后进入的if块,就需要调用fullAddCount方法去扩容 或者 重试.. LongAdder.longAccumulate
//条件二:a = as[ThreadLocalRandom.getProbe() & m]) == null 前置条件:cells已经初始化了
//true->表示当前线程命中的cell表格是个空,需要当前线程进入fullAddCount方法去初始化 cell,放入当前位置.
//条件三:!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)
// false->取反得到false,表示当前线程使用cas方式更新当前命中的cell成功
// true->取反得到true,表示当前线程使用cas方式更新当前命中的cell失败,需要进入fullAddCount进行重试 或者 扩容 cells。
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
) {
//这里面涉及到Cell数组的扩容,自旋计数
//和LongAdder代码一模一样
fullAddCount(x, uncontended);
return;
}
//只有put插入的时候,key对应table位置的元素为空,且cas成功 这时候check为0
//或者删除的时候 check为-1
//也就是说是这个桶中第一次插入,没有发生hash冲突,doug lea 认为这种情况说明桶中元素并不多
//或者说是删除操作,删除自然不需要进行扩容
//所以直接返回 不会进行扩容
if (check <= 1)
return;
//记录当前ConcurrentHashMap中有多少个元素 这个过程会遍历Cell数组
//因此不具备瞬时一致性
s = sumCount();
}
....下面为并发扩容迁移
2.2.5.2 addCount调度线程发起扩容或者加入扩容
扩容过程中需要记录当前扩容批次,比如从数组长度16扩容到数组长度32会为扩容生成一个唯一标识
并且还会记录有多少个线程在进行扩容,这两部分信息用sizeCtl一并记录
高16位表示:扩容的标识戳(方法resizeStamp(当前数据大小)生成)
低16位表示:(1 + 参与扩容的线程数)
可以看到resizeStamp生成批次或上(扩容线程数+1)一定是负数==>sizeCtl如果是负数表示当前正在进行扩容
//s 是上面统计Cell数组得到的ConcurrentHashMap元素个数
//删除情况调用addCount的时候check是负数,这里标识是compute 或者put方法新增了元素
if (check >= 0) {
//tab 表示map.table
//nt 表示map.nextTable
//n 表示map.table数组的长度
//sc 表示sizeCtl的临时值
Node<K,V>[] tab, nt; int n, sc;
//自旋
//条件一:s >= (long)(sc = sizeCtl)
// true-> 1.当前sizeCtl为一个负数 表示正在扩容中..
// 2.当前sizeCtl是一个正数,表示扩容阈值
// false-> 表示当前table尚未达到扩容条件
//条件二:(tab = table) != null
// 恒成立 true
//条件三:(n = tab.length) < MAXIMUM_CAPACITY
// true->当前table长度小于最大值限制,则可以进行扩容。
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//扩容批次唯一标识戳,使用位操作生成,扩容唯一标识
//16 -> 32 扩容 标识为:1000 0000 0001 1011
int rs = resizeStamp(n);
//条件成立:表示当前table正在扩容
// 当前线程理论上应该协助table完成扩容
if (sc < 0) {
//条件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
// true->说明当前线程获取到的扩容唯一标识戳 非 本批次扩容
//可能当前线程长时间没有获得时间片,以至于其他线程都扩容结束,进行下一轮扩容了
//当前线程才姗姗来迟
// false->说明当前线程获取到的扩容唯一标识戳 是 本批次扩容
//条件二: JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs << 16 ) + 1
// true-> 表示扩容完毕,当前线程不需要再参与进来了
// false->扩容还在进行中,当前线程可以参与
//条件三:JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs<<16) + MAX_RESIZERS
// true-> 表示当前参与并发扩容的线程达到了最大值 65535 - 1
// false->表示当前线程可以参与进来
//条件四:(nt = nextTable) == null
//nextTable在完成扩容后会赋值给table,然后nextTable置为null
// true->表示本次扩容结束
// false->扩容正在进行中
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//前置条件:当前table正在执行扩容中.. 当前线程有机会参与进扩容。
//条件成立:说明当前线程成功参与到扩容任务中,并且将sc低16位值加1,表示多了一个线程参与工作
//条件失败:1.当前有很多线程都在此处尝试修改sizeCtl,有其它一个线程修改成功了,导致你的sc期望值与内存中的值不一致 修改失败
// 2.transfer 任务内部的线程也修改了sizeCtl。
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//协助扩容线程,持有nextTable参数
transfer(tab, nt);
}
//1000 0000 0001 1011 0000 0000 0000 0000 +2 => 1000 0000 0001 1011 0000 0000 0000 0010
//条件成立,说明当前线程是触发扩容的第一个线程,在transfer方法需要做一些扩容准备工作
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//触发扩容条件的线程 不持有nextTable
transfer(tab, null);
s = sumCount();
}
}
可以看到扩容操作在transfer函数中进行,addCount会负责维护sizeCtl,并且让当前线程发起扩容,或者参与扩容,我们关注下发起和参与是如何区分的
可以看到addCount在并发扩容这一步相当于是一个调度者,在自旋中让当前线程,要么发起(只有一个幸运儿可以发起)要么加入,要么扩容结束当前线程break
接下来我们看看transfer是如何让多个线程有条不紊进行扩容的
2.2.5.3 transfer方法实现并发扩容
假如你现在有100个手下,你需要安排他们一起做一件事情(扩容迁移),且这一百个手下能力一样(CPU分时调度,谁也别瞧不起谁)你要如何协调他们昵?
-
分配任务
- 任务尽量均匀
- 任务不要太小,100个线程迁移64长度的格子,难道每一个线程0.64个格子么,ConcurrentHashMap是基于内存的,读写很快,所有任务太小反而会因为线程上下文的切换带来更多的开销
-
迁移过程中如何告诉读线程,这个数组下标的元素已经被迁移走了,你去新表中读吧,尽量不要阻塞读
如何告诉写线程,当前在扩容迁移,让写线程参与进来帮忙一起迁移?
这里Doug lea使用
ForwardingNode
来实现新数组的路由,和让写线程根据特定节点hash值,还参与扩容 -
如何同步多个线程扩容的进度
这里使用全局属性transfer,使用volatile修饰,实现进度对多个线程可见
transfer这段代码很妙,很长,我们分段看
-
任务分配和多线程间的协调和一些前置操作
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { //n 表示扩容之前table数组的长度 //stride 表示分配给线程任务的步长 int n = tab.length, stride; //MIN_TRANSFER_STRIDE = 16 //如果我们具备多核CPU 且 数组长度/8/cpu核心数 > 16 // 那么 每一个线程分配 【数组长度/8/cpu核心数】个连续数组单元进行扩容迁移 //NCPU > 1) ? (n >>> 3) / NCPU : n if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //条件成立:表示当前线程为触发本次扩容的线程,需要做一些扩容准备工作 //条件不成立:表示当前线程是协助扩容的线程.. if (nextTab == null) { // initiating try { //创建了一个比扩容之前大一倍的table @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } //赋值给对象属性 nextTable ,方便协助扩容线程 拿到新表 nextTable = nextTab; //记录迁移数据整体位置的一个标记。 // 从1开始计算的,不是从0开始,也就是说transferIndex = n 表示从n-1开始迁移直到0 transferIndex = n; } //表示新数组的长度 int nextn = nextTab.length; //fwd 节点,当某个桶位数据处理完毕后,将此桶位设置为fwd节点, //fwd节点持有nextTable的引用,且其hash恒定位MOVED = -1 //如果线程企图写到这个单元格那么会加入扩容,如果尝试读那么将被转移到新数组中进行读 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); //推进标记 boolean advance = true; //完成标记 boolean finishing = false; // to ensure sweep before committing nextTab //i 表示分配给当前线程任务,执行到的桶位,从0开始计数, // 也就是说如果i=10 表示当前线程要迁移i-stride~i的数组单元格 //bound 表示分配给当前线程任务的下界限制 int i = 0, bound = 0; //自旋 for (;;) { //f 桶位的头结点 //fh 头结点的hash Node<K,V> f; int fh; //下面的代码要进行 //1.给当前线程分配任务区间 //2.维护当前线程任务进度(i 表示当前处理的桶位) //3.维护ConcurrentHashMap对象全局范围内的进度 while (advance) { //分配任务的开始下标 //分配任务的结束下标 int nextIndex, nextBound; //CASE1: //条件一:--i >= bound //成立:表示当前线程的任务尚未完成,还有相应的区间的桶位要处理,--i 就让当前线程处理下一个 桶位. //不成立:表示当前线程任务已完成 或 者未分配 if (--i >= bound || finishing) advance = false; //CASE2: //前置条件:当前线程任务已完成 或 者未分配 //条件成立:表示对象全局范围内的桶位都分配完毕了,没有区间可分配了, // 设置当前线程的i变量为-1 跳出循环后,执行退出迁移任务相关的程序 //条件不成立:表示对象全局范围内的桶位尚未分配完毕,还有区间可分配 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //CASE3: //前置条件:1、当前线程需要分配任务区间 2.全局范围内还有桶位尚未迁移 //条件成立:说明给当前线程分配任务成功 //条件失败:说明分配给当前线程失败,应该是和其它线程发生了竞争吧 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //CASE1: //条件一:i < 0 //成立:表示当前线程未分配到任务 if (i < 0 || i >= n || i + n >= nextn) { //保存sizeCtl 的变量 int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //条件成立:说明设置sizeCtl 低16位 -1 成功,当前线程可以正常退出 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //1000 0000 0001 1011 0000 0000 0000 0000 //条件成立:说明当前线程不是最后一个退出transfer任务的线程 //如果当前线程是最后一个那么 sizeCtl 前16位是扩容标识戳, //后16位是 0000 0000 0010 = 2 所以这里减2 就是拿到 高16位为扩容戳,低16位全0的数 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) //正常退出 return; finishing = advance = true; i = n; // recheck before commit } } //前置条件:【CASE2~CASE4】 当前线程任务尚未处理完,正在进行中 //CASE2: //条件成立:说明当前桶位未存放数据,只需要将此处设置为fwd节点即可。 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //CASE3: //条件成立:说明当前桶位已经迁移过了,当前线程不用再处理了,直接再次更新当前线程任务索引,再次处理下一个桶位 或者 其它操作 else if ((fh = f.hash) == MOVED) advance = true; // already processed //CASE4: //前置条件:当前桶位有数据,而且node节点 不是 fwd节点,说明这些数据需要迁移。 else { //下面是真正进行迁移
-
nextTable1的初始化
这可以看到扩容数组大小是原数组的两倍(n<<1)但是doug lea 不害怕这里nextTable重复初始化导致迁移工作白做的问题么?其实在调用transfer方法的地方都有如下这段代码做控制
可以看到如果nextTable为null,那么其他线程会视当前扩容还没有启动,从而break出去!doug lea之所以这么做主要是为了实现nextTable初始化和其他线程写操作的异步,如果第一个线程没有初始化nextTable 那么其他线程break掉自旋,让写操作不要等待这个初始化nextTable的操作,狠狠的压榨CPU!!! 卧槽 doug lea您是真的牛逼!
-
自旋+cas分配任务
-
-
真正进行迁移
这一段代码最妙的点,在对于最后一个线程的判断,校验sc-2低16位是否全部为0的同时,还是保证了 当前线程是这一批次的扩容线程,并且对于最后一个线程,它将负责修改table 为新数组,修改nextTable = null
其中如果迁移了null格子,或者迁移到已经被迁移的格子,advance会赋值位true,将继续分配格子进行迁移
接下来就是使用synchronized,实现元素的迁移
//synchronized 加锁当前桶位的头结点 synchronized (f) { //这里和put 一样会 if (tabAt(tab, i) == f) { //ln 表示低位链表引用 //hn 表示高位链表引用 Node<K,V> ln, hn; //条件成立:表示当前桶位是链表桶位 if (fh >= 0) { //lastRun机制 //可以获取出 当前链表 末尾连续高位不变的 node int runBit = fh & n; Node<K,V> lastRun = f; //从头开始, for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; //确保为连续的高位,或者为连续的低位 if (b != runBit) { runBit = b; lastRun = p; } } //条件成立:说明lastRun引用的链表为 低位链表,那么就让 ln 指向 低位链表 if (runBit == 0) { ln = lastRun; hn = null; } //否则,说明lastRun引用的链表为 高位链表,就让 hn 指向 高位链表 else { hn = lastRun; ln = null; } //注意这里是new了一个节点,并没有改变原本链表的结构 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //加锁了下面没有使用 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } //条件成立:表示当前桶位是 红黑树 代理结点TreeBin else if (f instanceof TreeBin) { //转换头结点为 treeBin引用 t TreeBin<K,V> t = (TreeBin<K,V>)f; //低位双向链表 lo 指向低位链表的头 loTail 指向低位链表的尾巴 TreeNode<K,V> lo = null, loTail = null; //高位双向链表 lo 指向高位链表的头 loTail 指向高位链表的尾巴 TreeNode<K,V> hi = null, hiTail = null; //lc 表示低位链表元素数量 //hc 表示高位链表元素数量 int lc = 0, hc = 0; //迭代TreeBin中的双向链表,从头结点 至 尾节点(TreeNode其实有next和pre指针将树节点串联起来) for (Node<K,V> e = t.first; e != null; e = e.next) { // h 表示循环处理当前元素的 hash int h = e.hash; //使用当前节点 构建出来的 新的 TreeNode TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); //条件成立:表示当前循环节点 属于低位链 节点 if ((h & n) == 0) { //条件成立:说明当前低位链表 还没有数据 if ((p.prev = loTail) == null) lo = p; //说明 低位链表已经有数据了,此时当前元素 追加到 低位链表的末尾就行了 else loTail.next = p; //将低位链表尾指针指向 p 节点 loTail = p; ++lc; } //当前节点 属于 高位链 节点 else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //如果元素个数小于 红黑树转化为链表的阈值 6 那么进行链表化 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; //同上 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } }
这里最妙最妙最妙的就是lastRun这个机制,为什么doug lea要从头找到尾部连续个高位或者低位的节点
-
何为高位,低位
注意红色箭头指的这一位,由于数组的长度时2的倍数,所以结尾是全0,存在一位为1,和其他key的hash去且操作 = 获取这个key在红色箭头这一位是0 还是 1,如果是1视作高位,反之视为低位。当前线程在迁移位于x位置的节点,那么高位节点们迁移后将位于新表的x+n位置,低位节点迁移后还是位于新表的x位置。下面解释这一点
如果一个数原本在红色箭头位置是1,那么和扩容后的数进行且操作的时候,蓝色箭头位置这一位必然也是1,蓝色箭头之后的位置也是不变的,对于高位的key且操作之后,蓝色箭头位置是1,对于低位的key且 操作后蓝色位置是0。为1可以看作实在原本橙色基础上加上了一个64,为0可以看作是在原本橙色基础上加了0
-
为什么要这么做
减少new Node对象的操作
这里原本就有一部分连续的高位节点,直接拿到绿色节点的作为高位的头即可。但是不能切断原本链表中任何的节点的连接指针,也就是说如果想把蓝色框框这部分节点进行迁移,需要new Node,为什么昵?
因为并发迁移的过程中,可能存在其他线程正在读这个链表,如果我们断开了之前节点的连接将导致其他线程找不到原本链表中的节点,doug lea后面也确实是使用new的方式,这样相当于给读线程一个一致性视图(HashMap则没有这个策略,直接把原节点拆开两部分)