ConcurrentHashMap
基本介绍
在JDK1.8中,它的数据结构:Node数组+链表/红黑树
初始化
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。
if ((sc = sizeCtl) < 0)
// 让出 CPU 使用权
// 神来之笔
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
它主要使用了CAS来保证初始化的线程安全。
使用了一个可见的状态量sizeCtl,用它来标识了是否有线程正在执行初始化,如果有正在初始化的,那么使用Thread.yield()来让出时间切片,我觉得这块是最神来之笔的,一方面可以预防其他线程的初始化失败,另一方面又不占用CPU。
put()
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key 和 value 不能为空
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
// f = 目标位置元素
Node<K,V> f; int n, i, fh;// fh 后面存放目标位置的元素 hash 值
if (tab == null || (n = tab.length) == 0)
// 数组桶为空,初始化数组桶(自旋+CAS)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 桶内为空,CAS 放入,不加锁,成功了就直接 break 跳出
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 使用 synchronized 加锁加入节点
synchronized (f) {
if (tabAt(tab, i) == f) {
// 说明是链表
if (fh >= 0) {
binCount = 1;
// 循环加入新的或者覆盖节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
如果哈希桶为null,则用CAS去初始化哈希桶。
在存放时,通过减小锁的颗粒度的synchronized锁来保证线程安全,锁的只是单个哈希桶.
get()
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// key 所在的 hash 位置
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果指定位置元素存在,头结点hash值相同
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
// key hash 值相等,key值相同,直接返回元素 value
return e.val;
}
else if (eh < 0)
// 头结点hash值小于0,说明正在扩容或者是红黑树,find查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
// 是链表,遍历查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
ConcurrentSkipListMap
基本介绍
ConcurrentSkipListMap
由跳表实现,支持多线程场景下的增删改查操作,是一个线程安全的并发容器.
跳表
跳表是一种基于有序链表的数据结构,它有多个层级,称为多级索引。检锁时通过多级索引定位来提高元素的查找效率,实现较快的插入、删除和查找操作。
定位逻辑如下:
1、首先先从最上层索引出发,到最底层索引结束。
2、遍历索引
(1)若在遍历过程中找到目标值,则直接停止
(2)若在遍历过程中发现前驱节点大于目标值时,直接停止遍历,走下层索引
(3)若遍历发现前驱节点为null,则走下层索引
3、定位可能在某一层索引就结束了
例子:
首先从 3 级索引开始搜寻,执行步骤为:
-
3 级索引从
headIndex
开始遍历,得到 38 的索引,大于 23,说明 3 级索引后续索引都大于目标值,该层索引已没有搜索的必要,到下级索引查找。 -
2 级索引遍历到 3 的索引,该值小于 23,指针继续向右。
-
2 级索引遍历到 17 的索引,同样小于 23,指针继续向右推进。
-
2 级索引遍历到 38,大于 23,说明 2 级索引没有目标节点索引,继续向下层查找。
-
1 级索引得到 25,大于 23,继续向下层查找。
-
在链表中查到 23,搜寻结束,查找次数为 6。
相关方法
findPredecessor(重要)
它的方法逻辑由跳表可知。
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
//key为空抛出空指针异常
if (key == null)
throw new NullPointerException();
for (;;) {
//从高级别索引开始查找,q指向最高级别的headIndex,r指向后继节点
for (Index<K,V> q = head, r = q.right, d;;) {
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
//如果n为空,索引该节点被删除了,进行一次cas让q直接指向r的后继节点,和r指向的节点断开联系。如果unlink失败则说明其他线程也有相同操作,退出内部循环
if (n.value == null) {
if (!q.unlink(r))
break;
//unlink成功后,让r指向q的最新后继节点,即unlink进行cas操作之后的新后继节点
r = q.right;
continue;
}
//比较key和索引指向node的k,如果key比k大,则让q和r继续向后走
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
//如果d的下一级为null则说明到达node节点且该节点的key和入参的key相等,直接返回前驱节点q
if ((d = q.down) == null)
return q.node;
//到这里说明上述步骤没有找到对应节点,索引要继续向下,q指向d(即下层索引),r指向下级索引的后继节点
q = d;
r = d.right;
}
}
}
put
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
主要的实现方法在doPut上,它主要分为三个步骤
找到合适的位置插入新节点
主要是在一级索引上找合适的位置
若发现有值大于key,则该值的后面就是他的位置。
若发现没有值大于key,则他就插入到末尾
for (;;) {
//从一级索引找到小于key的最大值
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
Object v; int c;
//获取n的后继节点并赋值给f
Node<K,V> f = n.next;
//如果n不是b的后继节点说明当前链表被其他线程修改了,我们需要结束内部循环,重新进入
if (n != b.next)
break;
//如果n的值为空说明被删除了,调用helpDelete进行一下清理工作将n删除,并通过CAS方式将b指向f,简言之就算 b->n(value为空)->f 变成b->f
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
//b被删除了,直接结束内部循环
if (b.value == null || v == n) // b is deleted
break;
//比较一下key和n的key的大小,如果key比n的key大,则b、n两个指针向后推进,跳出本次循环
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
//如果比较结果相等,则通过CAS方式交换一下值并返回旧值即可
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
}
//代码走到这里说明当前的key链表中最大的节点,需要创建一个追加到链表末尾
z = new Node<K,V>(key, value, n);
//通过CAS的方式将z追加到Node链表末尾,如果失败则说明其他线程先于本线程做一些改动,我们需要退出内部循环重新进入循环重复插入操作
if (!b.casNext(n, z))
break;
//插入工作全部完成,退出外部大循环,进入下一步。
break outer;
}
}
利用随机数判断是否要构建索引
当前线程为生成一个随机数和0x80000001(一个高低位都为1的二进制)
进行与运算,判断其结果是否为0.如果是则创建索引。(判断随机数是否为正整数)
如果需要创建索引,再进一步通过((rnd >>>= 1) & 1) != 0
判断是否需要新增多级别索引,>>>=是无符号右移。(定索引等级)
//生成随机数 具体逻辑看得有点懵
int rnd = ThreadLocalRandom.nextSecondarySeed();
//如果是正偶数为当前节点创建索引
if ((rnd & 0x80000001) == 0) {
int level = 1, max;
//通过右移运算确定要创建几级索引
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
//h指向一级索引头节点
HeadIndex<K,V> h = head;
//如果level小于h.level说明无需新建高级别索引,遍历level次,为每级索引链表创建索引,node指向新建Node,down指向高1级的索引
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
//创建索引,node指向z(新建的Node),down指向低1个级别的索引,例如:level为2的idx的down就指向level为1对应的索引。
idx = new Index<K,V>(z, idx, null);
}
else { // level大于当前索引级别,需要创建新的索引链表
level = max + 1; //设置新一层索引级别为当前索引层级+1
//创建一个数组,长度为索引的级别数+1,记录每一层级指向新增节点的索引
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
//从level 1开始为新节点创建索引,各层索引用down指针串联
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
//h设置为指向当前最高级别的索引头节点
h = head;
//记录当前索引链表最高层级数
int oldLevel = h.level;
//如果level 比oldLevel小,说明此时有别的线程做修改了,结束循环
if (level <= oldLevel)
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
//循环为每层索引链表创建当前新增节点的索引,用down指针相关联
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
//使用cas的方式让head指向新建的headIndex
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
将新索引维护到各级索引上
for (int insertionLevel = level;;) {
//拿到最新的索引级别
int j = h.level;
//从高级别索引开始遍历,将新建索引用cas的方式串联到索引链表上
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
// 如果key比当前比较的键值对的key大,则将q和r都往后移动一位
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
//用cas的方式将将t插入到索引链表上,并让r作为其后继节点
if (j == insertionLevel) {
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
//insertionLevel 为0时,说明各层索引链都维护完成了退出循环
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
//上述步骤完成后,q指向下一层的索引,r指向下一层索引的后继索引,进入下一论循环维护新节点索引的位置。
q = q.down;
r = q.right;
}
}
get
public V get(Object key) {
return doGet(key);
}
步入doGet
具体执行流程为:
-
进入循环通过
findPredecessor
小于 key 的最大节点 b,设置一个变量 n 存储 b 的后继节点,设置一个变量 f 存储 n 的后继节点,节点关系为 b->n->f。 -
进行各种非空校验,决定是否结束循环。
-
如果 n 的 key 和 key 相等,直接返回 n 对应的 node 的 value。
-
如果 n 的 key 大于 key,则说明 map 中没有 key 的 value,直接返回 null。
-
反之让 b 指向 n,n 指向 f,即将 b 和 n 两个节点向前移动,继续比对查到是否存在 key 的 value。
-
循环执行 2-5,直到得到结果。
private V doGet(Object key) {
//key为空抛出空指针
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//通过findPredecessor小于key的最大节点,n为b的后继节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
//如果n为空则说明map内部找不到对应的key,直接退出循环返回null
if (n == null)
break outer;
//f设置为n的后继节点,逻辑结构为:b->n->f
Node<K,V> f = n.next;
//如果n不为b的后继节点则说明n被其他线程修改了顺序,直接退出内部循环,进行下一次循环重新查找
if (n != b.next)
break;
//v为空说明n被其他线程删除了,调用helpDelete通过CAS方式让b和f连接,构成b->f,并结束内部循环
if ((v = n.value) == null) {
n.helpDelete(b, f);
break;
}
//b的value为空说明b被删除,结束内部循环,重新一轮新的查找
if (b.value == null || v == n)
break;
//n和key比对结果相等,直接返回value
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
//key小于n的key,说明当前map找不到对应key的值,直接退出循环返回null
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}
remove
remove
方法核心逻辑会调用doRemove
,返回删除节点的值,如果没找到要删除的 node 则返回 null。
public V remove(Object key) {
return doRemove(key, null);
}
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//查找小于key的最大值对应节点b,n指向其后继节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
//n为空说明没找到直接退出并返回null
if (n == null)
break outer;
//f指向n的后继节点,构成b->n->f
Node<K,V> f = n.next;
//如果n不为b的后继节点,说明其他线程有做调整直接结束内部循环,进入下次循环
if (n != b.next)
break;
//n对应的值为空,说明n节点被删除,调用helpDelete维护一下b和f的关系,并结束内部循环
if ((v = n.value) == null) {
n.helpDelete(b, f);
break;
}
//b为空说明b被删除,退出内部循环,进行下一次循环
if (b.value == null || v == n)
break;
//key小于n的key说明map中没有要删除的key直接退出循环,返回null
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
//如果key大于n的key则让b和n向前推进查找等于key的节点
if (c > 0) {
b = n;
n = f;
continue;
}
//如果传入value不为空,且value不等与查到的v说明n被其他线程修改了,不可删除直接退出循环,因为我们remove传如null所以跳过这个判断
if (value != null && !value.equals(v))
break outer;
//到此说明找到要删除的节点,采用CAS方式将value设置为null
if (!n.casValue(v, null))
break;
//通过n.appendMarker(f)实现CAS将n的next指针指向一个新建的node,让n和f断开联系,再通过b.casNext(n, f)让b的next指向f
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key);
else {
findPredecessor(key, cmp); // clean index
if (head.right == null)
tryReduceLevel();
}
//返回删除的node的value
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
ConcurrentSkipListMap 锁分离技术
锁分离技术(Lock Striping)
是一种并发编程中常见的技术,通过减小锁的粒度,确保尽可能多的线程可以操作不同的数据,提高线程的并发度和性能。
在 Java 中ConcurrentSkipListMap
、ConcurrentHashMap
等并发容器都用到了锁分离技术,但是两者的实现却有如下区别:
-
ConcurrentHashMap
则是通过分段锁(Segment Locking)
,而ConcurrentSkipListMap
实现锁分离是通过**乐观锁**(Optimistic Locking)
,这也是为什么我们查看Node
源码时发现 value 和 next 都用 volatile 修饰(保证使用 CAS 操作数据时对其它线程可见)。 -
ConcurrentSkipListMap
锁的是对应 key 的 Node,而ConcurrentHashMap
锁的是对应 key 经过哈希运算的 Node,这个 Node 上可能存在一个到多个节点(因为多个 key 经过哈希运算得到会到一个 bucket 上),所以前者相较于ConcurrentHashMap
锁的粒度更小,并发度更高,而且前者是通过 CAS 操作而不是互斥锁修改,因此也避免了频繁的线程阻塞和上下文切换的开销。 -
ConcurrentSkipListMap
实现锁分离是通过 CAS,而 CAS 的原子操作是需要硬件支持实现的,所以修改时存在一定的开销。在极端情况下(多个线程操作同一个 key),ConcurrentSkipListMap
可能会出现大量线程修改失败不断循环重试的情况,相较于ConcurrentHashMap
分段锁+互斥锁升级的实现来说并发操作的开销更大,所以我们使用时需要考虑到这些情况。
缺点
获取size
时需要遍历链表,所以时间复杂度为O(n)
public int size() {
long count = 0;
//从存放链表的第一个元素,不断往后走,统计有效键值对个数
for (Node<K,V> n = findFirst(); n != null; n = n.next) {
if (n.getValidValue() != null)
++count;
}
return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}
CopyOnWriteArrayList
add()
public boolean add(E e) {
// 用ReentrantLock来作为锁 保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 创建一个新的数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
每次add都会去创建一个新的数组,会带来一定的内存开销,所以CopyOnWriteArrayList适用于多读少写的场景。
标签:Node,Java,索引,value,---,并发,key,null,节点 From: https://www.cnblogs.com/ayu0v0/p/18532209