目录
ConcurrentHashMap它是一个线程安全的Map容器类,那么它是如何来保证线程安全的?本文将通过源码,尝试分析它的底层原理,本文基于JDK8。
ConcurrentHashMap的底层原理和HashMap是比较相似的,比较大的区别就是在保证线程安全方面,建议在看本文之前,如果不了解HashMap可以先移步到我的另一篇博客java HashMap源码分析-CSDN博客。
ConcurrentHashMap的UML图如下。
一、一些重要属性
常量
MAXIMUM_CAPACITY = 1 << 30 ,这里1左移30位,表示 数组最大容量MAXIMUM_CAPACITY 为 2^30 即2的30次方,这个容量与 HashMap 的最大容量是一样的。
DEFAULT_CAPACITY = 16 表示的就是默认的数组初始容量为16,默认初始容量与HashMap的默认初始容量一样,都为16。
LOAD_FACTOR 加载因子为0.75,即当数组中加入的数据超过了当前容量的0.75倍时,要进行数组的扩容,这一点与 HashMap 是一样的,加载因子都是0.75。
table 就是我们 ConcurrentHashMap 底层真正存贮数据的那个数组,名为table。HashMap 底层的数组名字也叫 table。
MIN_TREEIFY_CAPACITY 变量代表了数组的阈值长度64。
TREEIFY_THRESHOLD这个变量代表了链表的长度阈值8,与上面的64紧密配合。当链表的长度大于8且数组的长度大于64,就会把链表树化,提高查找效率。这个转换成树的时机与HashMap 一样,都是数组长度大于等于64并且链表长度大于等于8时,链表转换成红黑树结构。
sizeCtl属性
想要读懂 ConcurrentHashMap 的源码,sizeCtl这个变量非常关键。这里大致总结了 sizeCtl 的几种情况:
- sizeCtl 为0,代表数组未初始化,且数组的初始容量为16。
- sizeCtl 为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么记录的是数则的扩容阈值。
- sizeCtl 为 -1,表示数组正在进行初始化。
- sizeCtl 小于0,并且不是 -1,表示数组正在扩容,-(1+n),表示此时有n个线程正在共同完成数组的扩容操作。
Node类
我们先来看看最基础的内部存储结构 Node,这就是一个一个的节点,如这段代码所示:
可以看出,每个 Node 里面是 key-value 的形式,并且把 value 用 volatile 修饰,以便保证可见性,同时内部还有一个指向下一个节点的 next 指针,方便产生链表结构。
TreeNode类
树节点类,另外一个核心的数据结构。
当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。
TreeBin类
这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。
这里仅贴出它的构造方法。可以看到在构造TreeBin节点时,仅仅指定了它的hash值为TREEBIN常量,这也就是个标识位。同时也看到我们熟悉的红黑树构造方法。
ForwardingNode类
ForwardingNode节点是数组扩容时用来连接新数组和旧数组的一个临时节点,在扩容进行中才会出现,只是在扩容阶段使用的节点。
可以看到ForwardingNode节点继承自Node,并添加了一个nextTable字段指向新表,并且hash值为MOEVD,即-1。当遍历到这个节点时,表明此节点处的链表正在复制转移,它的hash值固定为-1,且不存储实际数据。
如果旧table数组的一个hash桶中全部的结点都迁移到了新table中,也就是当前桶扩容完毕,则在这个桶中放置一个ForwardingNode。此时查询会跳转到查询扩容后的table。
读操作碰到ForwardingNode时,将操作转发到扩容后的新table数组上去执行;写操作碰见它时,则尝试帮助扩容,扩容是支持多线程一起扩容的。
这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找。
其中的该外层循环用于避免在处理转发节点时出现深层递归。如果节点是转发节点ForwardingNode,则需要更新表指针并继续查找。
总结一下,通过使用外层和内层循环的结构,避免了在转发节点时的深度递归,增强了代码的健壮性。多重条件判断可以确保在查找过程中有效地处理各种情况,包括节点为空、转发节点、链表等。由于ConcurrentHashMap的并发设计,find
方法能够安全地在多线程环境下查找节点。
此方法通过循环和条件判断实现了高效的键查找,支持多线程安全性,同时也能灵活处理扩容带来的节点迁移。这样的设计确保了 ConcurrentHashMap
在高并发情况下的性能和可用性。
二、Unsafe类方法
在ConcurrentHashMap中,随处可以看到U, 大量使用了U.compareAndSwapXXX的方法。这个方法是利用一个CAS算法实现无锁化的修改值的操作,可以大大降低锁代理的性能消耗。
这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁的思想是比较类似的。
unsafe静态块:
unsafe 代码块控制了一些属性的修改工作,比如最常用的SIZECTL 。 在这一版本的concurrentHashMap中,大量应用来的CAS方法进行变量、属性的修改工作。 利用CAS进行无锁操作,可以大大提高性能。
ConcurrentHashMap定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。
@SuppressWarnings("unchecked") //transient volatile Node<K,V>[] table; tab变量确实是volatile
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {//获取table中索引 i 处的元素。
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);//如果tab是volatile变量,则该方法保证其可见性。
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,//通过CAS设置table索引为 i 处的元素。
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//transient volatile Node<K,V>[] table; tab变量确实是volatile
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {//修改table 索引 i 处的元素。
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);//如果tab是volatile变量,则该方法保证其可见性。
}
我们不难看出,以上三个方法都是调用的 Unsafe(U)类中的方法,Unsafe类中定义了大量对内存的操作方法,是native的,不建议开发者直接使用。
tabAt和setTabAt最终调用的两个方法分别是 U.getObjectVolatile() 和 U.putObjectVolatile。顾名思义,其实是通过volatile保证的tab的可见性(Volatile只保证可见性不保证原子性)。前提是tab变量是Volatile修饰的变量。
我们通过调用栈,最终可以看到其实 tab 就是 ConcurrentHashMap 中的table。而这个变量是这么定义的,可见其确实是Volatile修饰的变量。
再看casTabAt方法,这个就是CAS方法了。CAS是Compare and Swap三个单词的缩写,比较交换。CAS在Java中又称之为乐观锁,即我们总认为是没有锁的。一般是通过上述用法达到自旋的目的。CAS一般通过自旋达到自旋锁的目的,即认为没有锁,失败重试,这种思路。
三、构造方法
ConcurrentHashMap一共有五个构造方法。
无参构造方法
1)无参构造方法
通过无参构造方法创建map,则不会做任何操作,直到向map中put元素的时候,才会去初始化ConcurrentHashMap内部的数组。
从无参构造源码也可以看出,在 ConcurrentHashMap 无参构造方法中,它没有做任何的动作。也就是说,采用无参构造创建 ConcurrentHashMap 时,底层并没有创建数组对象。(这里补充一点,创建数组对象的动作是在后续进行 put 操作添加元素时创建的)。
初始化源码上方有一句话,翻译过来就是"创建一个新数组,数组默认长度为16",也对应了上面我说到的,默认初始容量为16。
带参构造方法
2)可以指定初始容量的构造函数,通过这个构造函数可以指定初始容量。
在方法中,进行一些合法性的校验,如果初始容量小于0,则抛出异常,如果初始化容量大于等于最大的容量的二分之一,则让初始容量等于最大容量,否则,通过tableSizeFor()方法生成一个大于等于当前初始化容量,并且最接近于2的倍数的大小的数字,作为内部数组的数组长度。
进入tableSizeFor()方法看一下,
这个方法可能大家看不太懂,通俗来说这个方法的目的是返回一个 2 的整数次幂的数。如2^4 = 16,2^5 = 32,2^6 = 64。
结合上述扩容方法和 tableSizeFor 方法,我们可以知道,当我们传入一个初始值的时候,实际计算出的结果是 (传入的值+传入值的一半+1)的结果向上取整并且必须是2的整数次幂。
比如,如果我们传入的是32,那么计算出的初始容量就是 32 + 16 + 1 = 49,49不是2的整数次幂,向上取整最小为 64,所以初始容量为64而不是传入的32。如果我们传入的是16,那么计算出的结果就是 16 + 8 + 1 = 25,25不是2的整数次幂,向上取整在最小为32,所以计算出的初始容量为32而不是传入的16。
3)第三个构造方法
是将传入的map对象,将它的元素全部put到ConcurrentHashMap中,并且指定数组长度为默认长度16。
4)第四个构造方法
需要传入初始容量和负载因子,并且调用了最后一个构造方法。
5)第五个构造方法
内部依然是做一些参数合法性的判断,判断负载因子是否小于0,初始容量是否小于等于0,第三个参数为并发更新线程的估计数量,就是预计同时操作并发线程数有多少个。
如果初始容量小于预估的并发线程数,则让初始容量等于并发线程数,也就是让初始容量至少要等于同时操作它的线程数。
四、put()方法
大致分析
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//hash,对hashcode再散列
int binCount = 0;
for (Node<K,V>[] tab = table;;) {//迭代桶数组,自旋
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)//懒加载。如果为空,则进行初始化
tab = initTable();//初始化桶数组
//(n - 1) & hash)计算下标,取值,为空即无hash碰撞
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))//通过cas插入新值
break; // no lock when adding to empty bin
}
//判断是否正在扩容。如果正在扩容,当前线程帮助进行扩容。
//每个线程只能同时负责一个桶上的数据迁移,并且不影响其它桶的put和get操作。
//(很牛逼的思路,能这么做建立在更细粒度的锁基础上)
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {//put5,存在hash碰撞
V oldVal = null;
//此处,f在上面已经被赋值,f为当前下标桶的首元素。对链表来说是链表头对红黑树来说是红黑树的头元素。
synchronized (f) {
//再次检查当前节点是否有变化,有变化进入下一轮自旋
//为什么再次检查?因为不能保证,当前线程到这里,有没有其他线程对该节点进行修改
if (tabAt(tab, i) == f) {
if (fh >= 0) {//当前桶为链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {//迭代链表节点
K ek;
if (e.hash == hash &&//key相同,覆盖(onlyIfAbsent有什么用?)
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//找到链表尾部,插入新节点。(什么这里不用CAS?因为这在同步代码块里面)
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//当前桶为红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {//想红黑树插入新节点
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//树化。binCount > 8,进行树化,链表转红黑树
if (binCount >= TREEIFY_THRESHOLD)
//如果容量 < 64则直接进行扩容;不转红黑树。
//(你想想,假如容量为16,你就插入了9个元素,巧了,都在同一个桶里面,
//如果这时进行树化,时间复杂度会增加,性能下降,不如直接进行扩容,空间换时间)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//扩容。addCount内部会进行判断要不要扩容
return null;
}
总结以上过程,
- 懒加载,未初始化则初始化table
- hash,hashcode再散列,并计算下标。
- 无碰撞,通过CAS插入。
- 有碰撞,
- 如果正在扩容,协助其它线程去扩容。
- 如果是链表,插入链表。如果是红黑树,插入红黑树。
- 如果链表长度超过8,树化。
- 如果key已经存在,覆盖旧值。
- 需要扩容,则扩容。
相比HashMap过程多了一个协助扩容。
具体分析
在put()方法内部,又调用了putVal()方法。
大致分析如下:
putVal方法比较长,分段进行介绍。
1.第一阶段
刚开始,判断key或者value是否为null。如果key或者value为null则抛出空指针异常,这里也是和HashMap的一个区别,HashMap是允许key为null的,通过spread()方法传入key的hashCode,来计算key的最终hash值。然后声明binCount表示桶上的节点默认为0个。
spread()方法
我们进入spread()方法看一下,
在生成hash的方法spread内部,是将key的hashCode的高16位和低16位进行异或,可以增大随机性。HASH_BITS为0x7fffffff,二进制为11111111111111111111111111111111,进行&操作,如果两个都是1则为1,否则为0。这样是1和为0的概率都为百分之五十,这样做也是为了增大hash的随机性,减少hash碰撞的几率。
继续回到putVal方法,接下来是一个for的死循环,将数组table赋值给tab,首先判断tab是不是空或者tab的长度是否等于0,如果满足条件,则说明数组没有初始化,需要进行数组的初始化。
否则就通过hash计算元素在数组上的位置,计算的方式是(n-1)& hash,由于n总是2的n次方,减一,就是类00000...11111这样的数值,&操作之后一定会得到0到n-1的值,这个值就是当前元素在数组上的位置。
如果当前数组上的位置的元素为null,则创建一个Node使用CAS操作将元素放到桶中,并且终止循环。
initTable()方法
initTable()方法是数组初始化方法,这个方法比较简单,就是初始化一个合适大小的数组sizeCtl。如果没搞懂这个属性的意义,可能会被搞晕。这个标志是在 Node 数组初始化或者扩容的时候的一个控制位标识,负数代表正在进行初始化或者扩容操作。
- 0 标识 Node 数组还没有被初始化,正数代表初始化或者下一次扩容的大小。
- 负数代表正在进行初始化或者扩容操作。
- -1 代表正在初始化。
- -N 代表有 N-1 有二个线程正在进行扩容操作,这里不是简单的理解成 n 个线程,sizeCtl 就是-N,这块后续在讲扩容的时候会说明。
继续回到putVal方法,说说上面锁竞争的情况。以上过程我们不难发现对table的修改都是通过CAS操作实现的。比如下面这行代码,
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))//通过cas插入新值
break; // no lock when adding to empty bin
}
如果已经有线程正在操作 i 位置的元素,则意味着本轮自旋将会失败,继续自旋。当其他线程修改完成,本线程再次运行到tabAt以为是Volatile操作,其他线程的修改对本线程立即可见。本线程通过tabAt发现该处已经存在元素,即发生碰撞,继续往下运行。
上面使用的casTabAt()方法,可以看到底层使用了CAS操作,对应上面的语义就是将指定位置上的元素,从null替换成新的Node节点。
进入tabAt()方法看一下,tabAt()方法主要是获取对象中offset偏移地址对应的对象field的值。
实际上这段代码的含义等价于tab[i],但是为什么不直接使用 tab[i]来计算呢?因为对 volatile 写操作 happen-before 于 volatile 读操作,因此其他线程对 table 的修改均对 get 读取可见。虽然 table 数组本身是增加了 volatile 属性,但是 volatile 的数组只针对数组的引用具有volatile 的语义,而不是它的元素。 所以如果有其他线程对这个数组的元素进行写操作,那么当前线程来读的时候不一定能读到最新的值。出于性能考虑,是直接通过 Unsafe 类来对 table 进行操作的。
假如在上面这段代码中存在两个线程,在不加锁的情况下,线程 A 成功执行 casTabAt 操作后,随后的线程 B 可以通过 tabAt 方法立刻看到 table[i] 的改变。是因为线程 A 的casTabAt 操作具有 volatile 读写相同的内存语义,根据 volatile 的 happens-before 规则,可以得到线程 A 的 casTabAt 操作一定对线程 B 的 tabAt 操作可见。
2.第二阶段
继续回到putVal()方法,代码继续执行下一步,
如果对应的节点存在,判断这个节点的 hash 是不是等于 MOVED(-1)。如果等于MOVED则说明当前节点是ForwardingNode 节点,意味着有其他线程正在进行扩容,那么当前现在直接帮助它进行扩容,因此调用 helpTransfer()方法。
helpTransfer()方法
看一下这个复杂的表达式,这 4 个条件只要有一个条件为 true,说明当前线程不能帮助进行此次的扩容,直接跳出循环。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
- sc >>> RESIZE_STAMP_SHIFT!=rs 表示比较高 RESIZE_STAMP_BITS 位生成戳和 rs 是否相等。
- sc=rs+1 表示扩容结束。
- sc==rs+MAX_RESIZERS 表示帮助线程线程已经达到最大值了。
- nt=nextTable -> 表示扩容已经结束。
- transferIndex<=0 表示所有的 transfer 任务都被领取完了,没有剩余的 hash 桶来给这个线程来做 transfer操作了。
3.第三阶段
继续回到putVal()方法,最后这个else语句块比较长,分几块来分析,
走到这个else,说明数组已经初始化,并且桶节点也是有值的,那么接下来的操作就是要将Node添加到下面的链表或者红黑树。
首先声明一个oldVal为null。之后使用synchronized关键字,直接将桶上的节点锁住,也就是相当于拿到桶节点作为锁对象,其他的线程不能同时操作当前桶节点元素或者它下面链表或者红黑树上的元素,保证了线程安全。
之后进入一个 if 判断,如果当前桶节点的地址等于f,则继续执行。
如果当前桶节点f的hash值大于等于0,则说明这个桶下面的数据是以链表形式存在的,原因就是下面的定义。
桶上的节点有几种可能,第一种如果是链表,则hash值一定大于等于0。如果是红黑树,则桶节点hash值为-2。MOVED上面已经说过,RESERVED表示在computeIfAbsent和compute中使用的占位符节点。
如果大于等于0,说明下方是链表,则声明binCount为1,通过for循环对链表进行遍历,从f开始遍历,如果当前节点的hash值、key值相等,则将旧的value替换为新的value,这是在 onlyIfAbsent 为 false 的情况下,并终止循环。如果不相等,就利用元素的next指针进行遍历,直到遍历到最后一个节点,终止循环。
如果桶上的节点是树节点,则同样对红黑树进行遍历,找到key值相同的节点,如果节点存在,将旧值替换成新值,如果不存在则将新的节点添加到红黑树中。
之后退出同步代码块,代码继续向下执行,
如果当前桶下方节点不为0,并且大于TREEIFY_THRESHOLD,TREEIFY_THRESHOLD为8,当链表的节点大于等于8个,则将链表转为红黑树。
treeifyBin()方法
treeifyBin()方法用于将过长的链表转换为TreeBin对象。但是它并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回。如果满足条件才链表的结构抓换为TreeBin。
在treeifyBin内部,要注意一点,除了链表长度大于8之外,当前数组的长度必须大于MIN_TREEIFY_CAPACITY才会进行转树操作。
与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode。
4.第四阶段
回到putVal()方法,接下来,代码继续向下执行,
在 putVal()最后,会通过addCount来增加ConcurrentHashMap中的元素个数,并且还会可能触发扩容操作。这里会有两个非常经典的设计。
- 高并发下的扩容?
- 如何保证 addCount 的数据安全性以及性能?
调用 addCount() 方法的时候,传递了两个参数,分别是 1 和 binCount(链表长度),我们可以去看看 addCount 方法里面做了什么操作。
addCount()方法
addCount()方法的两个形参,x 参数表示这次需要在表中增加的元素个数,check 参数表示是否需要进行扩容检查,大于等于 0都需要进行检查。
至于其中的CounterCell ,ConcurrentHashMap 是采用 CounterCell 数组来记录元素个数的,像一般的集合记录集合大小,直接定义一个 size 的成员变量即可,当出现改变的时候只要更新这个变量就行。
为什么ConcurrentHashMap 要用这种形式来处理呢?问题还是处出并发上,ConcurrentHashMap 是并发集合,如果用一个成员变量来统计元素个数的话,为了保证并发情况下共享变量的的安全性,势必会需要通过加锁或者自旋来实现。如果竞争比较激烈的情况下,size 的设置上会出现比较大的冲突反而影响了性能,所以在ConcurrentHashMap 采用了分片的方法来记录大小。
具体什么意思,我们来分析下。
fullAddCount() 方法
其中的fullAddCount() 方法,主要是用来初始化 CounterCell 和记录元素个数,里面包含扩容,初始化等操作。
回到addCount(long x, int check),代码继续执行,
判断是否需要扩容,也就是当更新后的键值对总数 baseCount >= 阈值 sizeCtl 时,进行rehash,这里面会有两个逻辑。
- 如果当前正在处于扩容阶段,则当前线程会加入并且协助扩容
- 如果当前没有在扩容,则直接触发扩容操作
其中,这 4 个条件只要有一个条件为 true,说明当前线程不能帮助进行此次的扩容,直接跳出循环。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
- sc >>> RESIZE_STAMP_SHIFT!=rs 表示比较高 RESIZE_STAMP_BITS 位生成戳和 rs 是否相等。
- sc=rs+1 表示扩容结束。
- sc==rs+MAX_RESIZERS 表示帮助线程线程已经达到最大值了。
- nt=nextTable -> 表示扩容已经结束。
- transferIndex<=0 表示所有的 transfer 任务都被领取完了,没有剩余的 hash 桶来给这个线程来做 transfer操作了。
resizeStamp()方法
这块逻辑要理解起来,也有一点复杂。resizeStamp()方法用来生成一个和扩容有关的扩容戳,具体有什么作用呢?我们基于它的实现来做一个分析。
Integer.numberOfLeadingZeros() 这个方法是返回无符号整数 n 最高位非 0 位前面的 0 的个数,比如 10 的二进制是 0000 0000 0000 0000 0000 0000 0000 1010,那么这个方法返回的值就是 28。
根据 resizeStamp 的运算逻辑,我们来推演一下,假如 n=16,那么 resizeStamp(16)=32796。转化为二进制是[0000 0000 0000 0000 1000 0000 0001 1100]。
接着再来看,当第一个线程尝试进行扩容的时候,会执行下面这段代码U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)。rs 左移 16 位,相当于原本的二进制低位变成了高位 1000 0000 0001 1100 0000 0000 00000000。
然后再+2 =1000 0000 0001 1100 0000 0000 0000 0000+10=1000 0000 0001 1100 0000 00000000 0010。
高 RESIZE_STAMP_BITS 16 位代表扩容的标记、低RESIZE_STAMP_BITS 16 位代表并行扩容的线程数。这样来存储有什么好处呢?
- 首先在 CHM 中是支持并发扩容的,也就是说如果当前的数组需要进行扩容操作,可以由多个线程来共同负责。
- 可以保证每次扩容都生成唯一的生成戳,每次新的扩容,都有一个不同的 n,这个生成戳就是根据 n 来计算出来的一个数字,n 不同,这个数字也不同。
第一个线程尝试扩容的时候,为什么是+2 ?
因为 1 表示初始化,2 表示一个线程在执行扩容,而且对 sizeCtl 的操作都是基于位运算的,所以不会关心它本身的数值是多少,只关心它在二进制上的数值,而 sc + 1 会在低 16 位上加 1。
扩容是 ConcurrentHashMap 的精华之一,扩容操作的核心在于数据的转移,在单线程环境下数据的转移很简单,无非就是把旧数组中的数据迁移到新的数组。
但是这在多线程环境下,在扩容的时候其他线程也可能正在添加元素,这时又触发了扩容怎么办?可能大家想到的第一个解决方案是加互斥锁,把转移过程锁住。虽然是可行的解决方案,但是会带来较大的性能开销。因为互斥锁会导致所有访问临界区的线程陷入到阻塞状态,持有锁的线程耗时越长,其他竞争线程就会一直被阻塞,导致吞吐量较低。而且还可能导致死锁。
而 ConcurrentHashMap 并没有直接加锁,而是采用 CAS 实现无锁的并发同步策略。最精华的部分是它可以利用多线程来进行协同扩容。简单来说,它把 Node 数组当作多个线程之间共享的任务队列,然后通过维护一个指针来划分每个线程锁负责的区间,每个线程通过区间逆向遍历来实现扩容,一个已经迁移完的bucket会被替换为一个ForwardingNode节点,标记当前bucket已经被其他线程迁移完了。
transfer()方法
当ConcurrentHashMap容量不足的时候,需要对table进行扩容。这个transfer()方法的基本思想跟HashMap是很像的,但是由于它是支持并发扩容的,所以要复杂的多。原因是它支持多线程进行扩容操作,而并没有加锁。我想这样做的目的不仅仅是为了满足concurrent的要求,而是希望利用并发处理去减少扩容带来的时间影响。因为在扩容的时候,总是会涉及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作能够并发进行,那真真是极好的了。
transfer()方法主要负责把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置。
接下来分析一下它的源码实现,
- fwd类:是个标识类,用于指向新表用的。其他线程遇到这个类会主动跳过这个类,因为这个类要么就是扩容迁移正在进行,要么就是已经完成扩容迁移,也就是这个类要保证线程安全,再进行操作。
- advance变量:用于提示代码是否进行推进处理,也就是当前桶处理完,处理下一个桶的标识。
- finishing变量:用于提示扩容是否结束。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//将 (n>>>3 相当于 n/8) 然后除以 CPU 核心数。如果得到的结果小于 16,那么就使用 16
// 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,
// 默认一个 CPU(一个线程)处理 16 个桶,也就是长度为 16 的时候,扩容的时候只会有一个线程来扩容
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//nextTab 未初始化, nextTab 是用来扩容的 node 数组
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//新建一个 n<<1 原始 table 大小的 nextTab,也就是 32
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;//赋值给 nextTab
} catch (Throwable ex) { // try to cope with OOME
//扩容失败,sizeCtl 使用 int 的最大值
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;//更新成员变量
transferIndex = n;//更新转移下标,表示转移时的下标
}//新的 tab 的长度
int nextn = nextTab.length;
// 创建一个 fwd 节点,表示一个正在被迁移的 Node,并且它的 hash 值为-1(MOVED),也
// 就是前面我们在讲 putval 方法的时候,会有一个判断 MOVED 的逻辑。它的作用是用来占位,表示
// 原数组中位置 i 处的节点完成迁移以后,就会在 i 位置设置一个 fwd 来告诉其他线程这个位置已经处理过了,
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,
// 如果是false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
boolean advance = true;
//判断是否已经扩容完成,完成就 return,退出循环
boolean finishing = false; // to ensure sweep before committing nextTab
// 通过 for 自循环处理每个槽位中的链表元素,默认 advace 为真,通过 CAS 设置
// transferIndex 属性值,并初始化 i 和 bound 值, i 指当前处理的槽位序号, bound 指需要处理
// 的槽位边界,先处理槽位 15 的节点;
for (int i = 0, bound = 0;;) {
// 这个循环使用 CAS 不断尝试为当前线程分配任务
// 直到分配成功或任务队列已经被全部分配完毕
// 如果当前线程已经被分配过 bucket 区域
// 那么会通过--i 指向下一个待处理 bucket 然后退出该循环
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//--i 表示下一个待处理的 bucket,如果它>=bound,表示当前线程已经分配过bucket 区域
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {//表示所有 bucket 已经被分配完毕
i = -1;
advance = false;
}
//通过 cas 来修改 TRANSFERINDEX,为当前线程分配任务,处理的节点区间为(nextBound,nextIndex)->(0,15)
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;//0
i = nextIndex - 1;//15
advance = false;
}
}
//i<0 说明已经遍历完旧的数组,也就是当前线程已经处理完所有负责的 bucket
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//如果完成了扩容
nextTable = null;//删除成员变量
table = nextTab;//更新 table 数组
sizeCtl = (n << 1) - (n >>> 1);//更新阈值(32*0.75=24)
return;
}
// sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
// 然后,每增加一个线程参与迁移就会将 sizeCtl 加 1,
// 这里使用 CAS 操作对 sizeCtl 的低 16 位进行减 1,代表做完了属于自己的任务
// if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 第一个扩容的线程,执行 transfer 方法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
// 后续帮其扩容的线程,执行 transfer 方法之前,会设置 sizeCtl = sizeCtl+1
// 每一个退出 transfer 的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1 那么最后一个线程退出时:必然有
// sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
// 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 如果相等,扩容结束了,更新 finising 变量
finishing = advance = true;
// 再次循环检查一下整张表
i = n; // recheck before commit
}
}// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//表示该位置已经完成了迁移,也就是如果线程 A 已经处理过这个节点,那么线程 B 处理这个节点时,hash 值一定为 MOVED
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
......
}
具体分析
下图为并发扩容流程图,在分析源码前熟悉一下流程:
1)计算步幅
在这里首先会计算一个步长stride,表示一个线程处理的数组长度,用来控制对CPU的使用。
将 (n>>>3 相当于 n/8) 然后除以 CPU 核心数。如果得到的结果小于 16,那么就使用 16。这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶,也就是长度为 16 的时候,扩容的时候只会有一个线程来对其进行扩容的复制移动操作。
扩容的时候会一直遍历,直到复制完所有节点,没处理一个节点的时候会在链表的头部设置一个fwd节点,这样其他线程就会跳过他,复制后在新数组中的链表不是绝对的反序的。
2)初始化新的哈希表
然后,如果 nextTab 为空,意味着正在进行扩容。尝试创建一个大小为 n << 1 的新数组 nextTab,也就是初始化一个新的哈希表,并处理可能的内存溢出异常(OOME)。
3)转移节点
创建一个 ForwardingNode 实例,表示一个正在迁移的节点,并将它的 hash 值为-1(MOVED),以标识该位置正在被迁移。
就是前面我们在讲 putval 方法的时候,会有一个判断 MOVED 的逻辑。它的作用是用来占位,原数组中位置 i 处的节点完成迁移以后,就会在 i 位置设置一个 fwd 来告诉其他线程这个位置已经处理过了。
4)任务分配和处理
使用一个循环和 CAS(Compare-And-Swap)机制分配任务,确保多个线程能够安全地并行迁移节点。 每个线程会尝试从 transferIndex 中获取需要处理的桶,并更新 bound 和 i 的值来管理当前线程处理的范围。
具体是通过 for 自循环处理每个槽位中的链表元素,默认 advace 为真,通过 CAS 设置transferIndex 属性值,并初始化当前处理的槽位序号 i 和要处理槽位边界 bound 值,先处理槽位 15 的节点。这个循环使用 CAS 不断尝试为当前线程分配任务,直到分配成功或任务队列已经被全部分配完毕。如果当前线程已经被分配过 bucket 区域,那么会通过--i 指向下一个待处理 bucket 然后退出该循环。
5)并发处理节点迁移
如果当前线程已经处理完所有的桶,则可能需要判断是否完成了整个扩容过程。如果所有线程都完成了迁移,更新哈希表的大小控制阈值 sizeCtl。
使用 transferIndex 和 CAS机制进行节点的迁移,保证线程安全。迁移过程中,线程会尝试将原数组中的每个桶中的节点迁移到新数组中。具体操作步骤包括:
1】每个线程负责处理一部分桶。通过 transferIndex 来跟踪当前处理的桶索引。
2】对于每个桶,线程将桶内的每个节点进行迁移。根据每个节点的hash值决定它们在新数组中的位置。
3】迁移后,将原数组中对应位置的值设置为 ForwardingNode,以指示其他线程这个位置已被处理。也就是当 i 指向的桶没有节点时,插入 ForwardingNode 以占位。如果该桶的 hash 值为 MOVED,说明其他线程已经处理过该节点,继续处理下一个桶。
- 第一个扩容的线程在执行transfer()方法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)。
- 后续扩容的线程在执行transfer()方法之前,会设置 sizeCtl = sizeCtl+1。
- U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1) 这里是使用 CAS 操作对 sizeCtl 的低 16 位进行减 1,以表示当前线程已经完成了迁移任务。
- 每一个退出transfer()方法的线程在退出之前,会设置 sizeCtl = sizeCtl-1。
- 那么最后一个线程退出时,必然有 sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT。
- 如果 sc - 2 等于标识符左移 16 位了,说明没有线程在帮助他们扩容了,也就是说扩容结束了。
对于提到的U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1),可以在扩容操作 transfer 的第 2414 行看到,代码如下,
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
每存在一个线程执行完扩容操作,就通过 cas 执行 sc-1。接着判断(sc-2) !=resizeStamp(n) << RESIZE_STAMP_SHIFT。如果相等,则表示当前为整个扩容操作的最后一个线程,那么意味着整个扩容操作就结束了。如果不相等,则说明还得继续这么做的。
目的是一方面是防止不同扩容之间出现相同的 sizeCtl,另外一方面,还可以避免sizeCtl 的 ABA 问题导致的扩容重叠情况。
此实现是为了解决并发情况下的扩容问题,确保多个线程能够高效且安全地进行节点迁移,避免出现竞争条件和不一致性。
4】当最后一个线程完成后,将 finishing 状态设置为 true,并再次检查整个表的状态。
数据迁移阶段的实现分析
通过分配好迁移的区间之后,开始对数据进行迁移。我们来了解一下原理。ConcurrentHashMap 在做链表迁移时,会用高低位来实现,这里有两个问题要分析一下。
1)如何实现高低位链表的区分?
假如我们有下面的这样一个队列,第10个槽位插入新节点之后,链表元素个数目前是5,且数组长度为 16,肯定是优先通过扩容来缓解链表过长的问题,将数组长度由16扩容到32。扩容这块的图解稍后再分析,先分析高低位扩容的原理。
假如当前线程正在处理槽位为9的节点(它目前是一个链表结构)。在代码中,首先定义两个变量节点 ln 和 hn,实际就是 lowNode 和 HighNode,分别保存 hash 值的第x位为0和不为0的节点。
通过 fn&n 可以把这个链表中的元素分为两类:一类是hash值的第x位为0,另一类是hash值的第 x 位不为0,最终要达到的目的是一类的链表保持位置不动,另一类的链表为 9+16(扩容增加的长度)=25。并且通过 lastRun 记录最后要处理的节点。
我们把槽位9的链表单独拎出来,假如链表的分类是图上那样的,节点ACE是一类,其他的节点BC是一类。
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
通过上面这段代码遍历,会记录 runBit 以及 lastRun。再通过这段代码进行遍历,生成 ln 链以及 hn 链。
接着,通过 CAS 操作把 hn 链放在 i+n 也就是 14+16 的位置,ln 链保持原来的位置不动。并且设置当前节点为 fwd,表示已经被当前线程迁移完了。
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
迁移完成以后的数据分布就如图中右边所示。
为什么要做高低位的划分?
要想了解这么设计的目的,我们需要从 ConcurrentHashMap 的根据下标获取对象的算法来看,在 putVal 方法中 :(f = tabAt(tab, i = (n - 1) & hash)) == null。通过(n-1) & hash 来获得在 table 中的数组下标来获取节点数据。
假设 table 长度是 16, 二进制是【0001 0000】,减一以后的二进制是 【0000 1111】。假如某个 key 的 hash 值是 20,对应的二进制是【0001 0100】,仍然按照(n-1) & hash 算法,分别在 16 长度和 32 长度下的计算结果 0000 1111 & 0001 0100=0000 0100 和 0001 1111 & 0001 0100 =0001 0100 。
从结果来看,同样一个 hash 值,在扩容前和扩容之后,得到的下标位置是不一样的,这种情况当然是不允许出现的。所以在扩容的时候就需要考虑,而使用高低位的迁移方式,就是解决这个问题。
大家可以看到,16 位的结果到 32 位的结果,正好增加了 16。
- 比如 20 & 15=4 、20 & 31=20 ; 4-20 =16
- 比如 60 & 15=12 、60 & 31=28; 12-28=16
所以对于高位,直接增加扩容的长度。当下次 hash 获取数组位置的时候,就可以直接定位到对应的位置。
这个地方又是一个很巧妙的设计,直接通过高低位分类以后,就使得不需要在每次扩容的时候来重新计算 hash,极大提升了效率。
小结
put()方法添加元素的步骤如下:
- 判断key、value是否为空,是则抛异常。
- 判断数组是否初始化,没有则初始化。
- 判断该位置节点是否为空,是则通过cas方法直接添加。
- 判断数组是否在扩容,hash值为-1表明正在扩容,调用helpTransfer方法使当前线程帮助扩容。
- 最后一种情况,当前节点不为空,也不再扩容,通过syncronized来加锁,进行添加操作。
- 然后判断当前取出的节点位置存放的是链表还是树,
- 如果是链表的话,则遍历整个链表,直到取出来的节点的key来个要放的key进行比较,如果key相等且key的hash值也相等,则说明是同一个key,就覆盖掉value。否则添加到链表的末尾。
- 如果是树的话,则调用putTreeVal方法把这个元素添加到树中去。
- 添加完后,判断该节点处有多少个节点,如果达到了8个则转换成红黑树。
- 把ConcurrentHashMap的元素个数+1,并判断是否需要扩容。
五、get()方法
concurrentHashMap的get()方法操作的流程很简单,可以分为三个步骤来描述:
- 通过spread计算hash值,定位到该table索引位置,如果是首节点符合就返回。
- 如果hash值小于0,则说明该节点正在扩容或者是红黑树,通过ForwardingNode或TreeBin对应的find方法进行查找。也就是使用TreeNode的find方法,对红黑树的节点进行遍历,直到找到key相等的元素返回对应value。
- 如果桶节点下面不是红黑树,说明是一个链表,对链表进行遍历,直到遍历到key相等的元素返回对应value。
- 最后未找到,则返回null。
六、remove()方法
remove()方法的删除操作和添加方法逻辑类似。
七、总结
对比ConcurrentHashMap和HashTable,我们可以明显的看到,ConcurrentHashMap在写的时候,并没有锁住整个节点数组,在新节点上使用的是无锁竞争,在老节点上锁住的仅仅是一个节点。
因此,读的时候如果不是恰好读到写线程写入相同Hash值的位置,不受影响(可以认为我们的操作一般是读多写少,这种几率也比较低)。而HashTable是对整个节点数组进行锁定,读到时候不能写,写的时候不能读,这么一对比就可以明显感觉到性能差距是巨大的。
虽然ConcurrentHashMap的并发性能还算比较优异,但在亿级计算中,却依然会成为性能瓶颈。至于这里为什么会慢,我认为在这种超高并发下,节点数组的单节点的的写写竞争是互斥的,其次,由于红黑树具有读快写慢的特性,它要不断保持树的平衡而不断翻转,所以才会使得高并发写的性能急剧下降。
标签:扩容,ConcurrentHashMap,java,链表,源码,线程,数组,hash,节点 From: https://blog.csdn.net/qq_45956730/article/details/143255085