前导知识
// node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30 ;
// 默认初始值,必须是2的幕数
private static final int DEFAULT_CAPACITY = 16 ;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8 ;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16 ;
// 负载因子
private static final float LOAD_FACTOR = 0 .75f;
// 链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8 ;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6 ;
static final int MIN_TREEIFY_CAPACITY = 64 ;
private static final int MIN_TRANSFER_STRIDE = 16 ;
private static int RESIZE_STAMP_BITS = 16 ;
// 2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = ( 1 << ( 32 - RESIZE_STAMP_BITS)) - 1 ;
// 32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static final int MOVED = - 1 ;
// 树根节点的hash值
static final int TREEBIN = - 2 ;
// ReservationNode的hash值
static final int RESERVED = - 3 ;
// 可用处理器数量
static final int NCPU
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符
-1:代表hash表所有元素转移完成
-n:代表hash表此时正在进行扩容
*/
private transient volatile int sizeCtl;
一:ConcurrentHashMap数据结构
比较ConcurrentHashMap中用了cas的地方,而相同的逻辑hashmap没有用的地方,就是ConcurrentHashMap的优点所在。
底层实现:数组+红黑树+链表
控制并发的核心:利用cas不停的自旋,进入不同的分支达到对应的目的,直至退出(以下的讲述我将会对应不同的场景来说明各个分支的作用)
一般的我们在使用线程池的时候确定核心线程的数量有2种办法
1:io密集型(自己预估比较耗io的线程有多少),2:cpu密集型( 也就是接下来的源码中的NCPU= Runtime.getRuntime().availableProcessors(); )
put()
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//ConcurrentHashMap不支持插入null
if (key == null || value == null) throw new NullPointerException();
//根据key算出一个hash值
int hash = spread(key.hashCode());
//对hash表中的元素进行计数,hash表中的所有元素个数=bincount+所有cellcount中的value之和
int binCount = 0;
//table(一开始hash表为null)
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//把hash表的长度赋值给n
if (tab == null || (n = tab.length) == 0)
//如果hash表长度为0,或者是空的,进行初始化
tab = initTable();
//如果hash表已经初始化过了且hash表(n - 1) & hash这个位置的元素为null
//进行cas控制,保证只有一个线程能插入成功
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果hash表已经初始化过了且hash表(n - 1) & hash这个位置的元素不为null,但是这个位置的元素的hash值为MOVEN(默认值为-1),说明别的线程在对tab进行扩容,帮助其扩容(转移元素到新的hash表)。此时fh = hash值
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//如果hash表已经初始化过了且hash表(n - 1) & hash这个位置的元素不为null,且这个位置的元素的hash值不为MOVEN(默认值为-1),分情况插入链表或者红黑树
else {
V oldVal = null;
//锁住链表的头节点或者是红黑树的根节点,保证同个位置同一时间只有一个线程能进行操作
synchronized (f) {
//再次判断这个位置的节点是否和之前得到的节点f一样,锁住了节点但是没有锁住位置!,防止此时tab正好扩容完成,但是这个位置的节点已经不是f这个情况
if (tabAt(tab, i) == f) {
//如果这个节点有hash值,说明这个节点的下面是个链表,进行链表操作(treebin中没有hash这个属性)
if (fh >= 0) {
binCount = 1;
//死循环
for (Node<K,V> e = f;; ++binCount) {
K ek;
//判断链表中的节点是否string或者int类型的key与现在要put的string或者int类型的key是否相同,如果相同返回老的value,新的value替换老的value,退出死循环
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果循环到链表尾了,尾节点前面的节点都没有相同的key与之相同,那么就把这个key,value加到链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果这个位置的节点是个treebin,加入红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//
if (binCount != 0) {
//如果链表长度超过8,这个位置的链表,转红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
//返回被替换的节点的value
return oldVal;
break;
}
}
}
//put操作成功,hash表中元素个数加1
addCount(1L, binCount);
return null;
}
initTable()
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl<0说明其他线程正在初始化tab
if ((sc = sizeCtl) < 0)
//让出其他线程初始化tab,执行的时间,接着自旋
Thread.yield(); // lost initialization race; just spin
//cas确保只有一个线程能初始化tab
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//只有tab为null或者长度为0才初始化
if ((tab = table) == null || tab.length == 0) {
//sc = -1,n为默认的DEFAULT_CAPACITY
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
//进行tab初始化赋值
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//修改sc标记,让其他线程不再Thread.yield();
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
//退出死循环
break;
}
}
return tab;
}
helpTransfer()
/**
* Helps transfer if a resize is in progress.
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//tab != null,被转移tab不为null
//f instanceof ForwardingNode f是个ForwardingNode(f是传进来的tabAt(tab, i = (n - 1) & hash)))
//(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null这个nextTab就是新的hash表,不为null以上几点综合,说明元素还在转移
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
//如果是帮助进行扩容,那么sizeCTL,在扩容的时候会被赋值一个超级小的负数
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//帮助进行扩容,nextTab是正在扩容的新tab
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
transfer()
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//stride区间(划定每个线程从旧的tab的转移哪一段的元素)
int n = tab.length, stride;
//如果是多核cpu,n>>>3<MIN_TRANSFER_STRIDE,默认区间为MIN_TRANSFER_STRIDE=16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//如果nextTab为null,创建新的tab容量为2倍旧的tab,开始扩容
//如果nextTab不为null,说明其他线程正在扩容,为帮助扩容,
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//旧hash表最右下标的值
transferIndex = n;
}
//新hash表的长度
int nextn = nextTab.length;
//注意这个ForwardingNode节点中包括新的hash表!!!!!!!!!!!
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//每一个区间中的元素,是否需要接着转移的标识符号,true标识还要接着转移
boolean advance = true;
//扩容是否完成的标识符,扩容完成为true
boolean finishing = false; // to ensure sweep before committing nextTab
//死循环
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//自旋
while (advance) {
int nextIndex, nextBound;
//判断区间[bound,i]是否减到头了(区间元素是否还需要接着转移)
if (--i >= bound || finishing)
advance = false;
//一开始nextIndex = transferIndex = n = tab.length > 0 不会走这
//如果旧的tab剩下的节点 <= stride = 16,那么其他的线程不会继续划分区间,transferIndex = TRANSFERINDEX = nextBound = 0,且最后一个区间的元素全部转移完成,把i改成-1
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//这个时候一个线程进来了,经过cas确保这个区间是唯一的且只属于这个线程
//假设旧的hash表的长度为100,经过这个判断transferIndex = TRANSFERINDEX = nextBound = 84
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//bound = nextBound = 84
// i = nextIndex - 1 = 100 - 1 =99
//[bound,i]那么这个区间的下标就是[84,99],这个线程就负责转移这个区间的元素,然后其他的线程过来同理划分区间,因为transferIndex 加了volital关键字,确保了可见性所以第二个线程这个时候的transferIndex = 84,且上面第一个else if中nextIndex = transferIndex=84然后接着这个逻辑划分区间,一直划分到[0,15]
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//包含了2种情况:当i=-1时说明最后一段区间的所有元素转移完成
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//如果完成扩容,此时才替换老tab
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//在帮助扩容时sc会加1,这里相当于计数,当前线程完成了工作,sc-1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判断此时的sc与传进来的sc是否相同,如果在sc-1的情况下都不同,说明在扩容的过程中有别的线程帮助了扩容导致sc加1,直到循环sc-1,与传进来的sc相同为止,确保所有线程对扩容或者帮扩容完成
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//接下来的俩个else if目的排除hash表上的null节点和fwd节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//根据头节点,遍历转移下面的链表或者是红黑树
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
untreeify()替换树节点为普通节点
/**
* Returns a list on non-TreeNodes replacing those in given list.
*/
static <K,V> Node<K,V> untreeify(Node<K,V> b) {
Node<K,V> hd = null, tl = null;
for (Node<K,V> q = b; q != null; q = q.next) {
Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null);
if (tl == null)
hd = p;
else
tl.next = p;
tl = p;
}
return hd;
}