并发容器类
这些类专为支持并发环境中的高效数据访问和操作而设计。与传统的容器类相比,并发容器类具有更好的线程安全性和性能。在使用多线程环境时,通常推荐使用这些并发容器以避免手动加锁和同步操作。
ConcurrentHashMap
特点:一个线程安全的哈希表,支持高效的并发访问。通过分段锁定(Segment Locking)来提高性能。
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key1", 1);
Integer value = map.get("key1");
CopyOnWriteArrayList
特点:每当有修改(如添加、删除)操作时,都会复制整个底层数组。适用于读多写少的场景。
进行并发的读,只有写的时候会使用可重入锁(ReetrantLock),因此是线程安全的
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("one");
list.add("two");
CopyOnWriteArraySet
特点:基于 CopyOnWriteArrayList
实现的线程安全的 Set,适用于读多写少的场景。
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("element1");
线程不安全类
1、ArrayList
读写都没有加锁控制。如果并发修改会出现错误。
解决方案
使用Vector来代替ArrayList
vector使用读写都添加了同步锁,这样会保证线程安全,但是会降低效率
使用Collections.synchronizedList()
Collections 提供了方法 synchronizedList 保证 list 是同步线程安全的。
Collections 仅包含对集合进行操作或返回集合的静态方法,所以我们通常也称Collections 为集合的工具类。
对List本地的读写方法,包装了一层同步代码块。
读写操作对应的方法,使用到了同步代码块。这样锁定范围(粒度)更小,效率更高
使用CopyOnWriteArrayList
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("one");
list.add("two");
2、HashSet
读写都没有加锁控制。如果并发修改会出现错误。
解决办法
使用Collections.synchronizedSet()
Collections 提供了方法 synchronizedSet保证 set是同步线程安全的。
Collections 仅包含对集合进行操作或返回集合的静态方法,所以我们通常也称Collections 为集合的工具类。
对HashSet本地的读写方法,包装了一层同步代码块。
使用CopyOnWriteArraySet
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
set.add("element1");
CopyOnWrite写时复制容器
CopyOnWrite容器(简称COW容器)即写时复制的容器。
通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后向新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
更新操作开销大(add()、set()、remove()等等),因为要复制整个数组。
这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
当然,这个时候会抛出来一个新的问题,也就是读线程数据不一致的问题。
使用 CopyOnWrite 机制 实现 的 并发容器CopyOnWriteArrayList 和 CopyOnWriteArraySet。
3、HashMap
读写都没有加锁控制。如果并发修改会出现错误。
解决办法
使用 Hashtable
HashTable中的读写方法都是同步实例方法,都使用了synchronized。
使用Collections.synchronizedMap()
使用了同步代码块
上述两种方式的不足
即时多个线程向不同的链表中去put,也进行了加锁控制,虽然不同的链表的put操作,不存在线程安全问题。但是锁的范围太大了。导致读写效率很低。
使用ConcurrentHashMap
public class HashMapNonThreadSafeExample {
private static Map<Integer, Integer> map = new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
map.put(j, j);
}
}).start();
}
Thread.sleep(2000);
System.out.println("Map size: " + map.size());
}
}
HashMap的结构
哈希表=数组+链表(当链表过长>=8,并且数组长度>=64,会转成红黑树)
对key进行哈希计算得到数组的索引,如果多个key进过hash计算后得:数组索引是一样的,这种现象"hash碰撞”
如何解决hash冲突(碰撞)?
1、链表法
2、开放地法(线性探测法)
如何降低碰撞的概况?
1、数组长度更长
2、hash函数如的设计
ConcurrentHashMap源码分析
由于ConcurrentHashMap在jdk1.7和jdk1.8的时候实现原理不太相同,因此需要分别来讲解一下两个不同版本的实现原理。
jdk1.7版本
java8之前ConcurrentHashMap的结构
简单来讲,就是ConcurrentHashMap 比 HashMap 多了一次hash计算过程:第1次hash定位到Segment,第2次hash定位到HashEntry,然后链表搜索找到指定节点。
在进行写操作时,只需锁住写元素所在的Segment即可(这种锁被称为分段锁),其他Segment无需加锁,从而产生锁竞争的概率大大减小,提高了并发读写的效率。该种实现方式的缺点是hash过程比普通的HashMap要长(因为需要进行两次hash操作)。
ConcurrentHashMap的put方法源码分析
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
public V put(K key, V value) {
// 定义一个Segment对象
Segment<K,V> s;
// 如果value的值为空,那么抛出异常
if (value == null) throw new NullPointerException();
// hash函数获取key的hashCode,然后做了一些处理
int hash = hash(key);
// 通过key的hashCode定位segment
int j = (hash >>> segmentShift) & segmentMask;
// 对定位的Segment进行判断,如果Segment为空,调用ensureSegment进行初始化操作(第一次hash定位)
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
// 调用Segment对象的put方法添加元素
return s.put(key, hash, value, false);
}
// Segment是一种可ReentrantLock,在ConcurrentHashMap里扮演锁的角色,将一个大的table分割成多个小的table进行加锁。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// 添加元素
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 尝试对该段进行加锁,如果加锁失败,则调用scanAndLockForPut方法;在该方法中就要进行再次尝试或者进行自旋等待
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 获取HashEntry数组对象
HashEntry<K,V>[] tab = table;
// 根据key的hashCode值计算索引(第二次hash定位)
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;)
// 若不为null
if (e != null) {
K k;
// 判读当前节点的key是否和链表头节点的key相同(依赖于hashCode方法和equals方法)
// 如果相同,值进行更新
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
} else { // 若头结点为null
// 将新节点添加到链表中
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first); //头插法
int c = count + 1;
// 如果超过阈值,则进行rehash操作
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
}
}
总结:
(1)进行第一次hash计算,得到一个Segment对象
(2)调用Segment对象的put方法
(3)put方法中先使用ReentrantLock进行加锁(Segment继承了ReentrantLock)
先tryLock直接尝试加锁,如果加锁失败,则自旋加锁(while(!tryLock)), 并且会利用自旋的时间,将新HashEntry(节点对象)进行初始化【lock()加锁失败会当前线程堵塞,tryLocok+while自旋加锁失败不会让当前线程堵塞.】
(4)加锁成功,则进行第二次的hash计算,计算当前Segment下的HashEntry数组的索引位置,进而得到局部hash表中的某个链表的头部节点
(5)从头结点开始,依次遍历链表,如果找到HashEntry节点的key和put方法的key相同,则将找到的HashEntry节点的value进行替换(put方法的value),最后返回oldValue(找到的HashEntry节点的旧值)
(6)如果链表中没有找到任何一个HashEntry节点的key和put方法的key相同,则node=new HashEntry<K,V>(hash,key,value,first);利用put方法的key和value创建一个新的节点HashEntry,这个新节点插入到链表的头部【头插法】
(7)释放锁
jdk1.8版本
在JDK1.8中为了进一步优化ConcurrentHashMap的性能,去掉了Segment分段锁的设计。
在数据结构方面,则是跟HashMap一样,使用一个哈希表table数组(数组 + 链表 + 红黑树) ;
而线程安全方面是结合CAS机制 + 局部锁实现的,减低锁的粒度,提高性能。
同时在HashMap的基础上,对哈希表table数组和链表节点的value,next指针等使用volatile来修饰,从而实现线程可见性。
ConcurrentHashMap中的重要成员变量
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
// Node数组
transient volatile Node<K,V>[] table;
// Node类的定义
static class Node<K,V> implements Map.Entry<K,V> {
final int hash; // 当前key的hashCode值
final K key; // 键
volatile V val; // 值
volatile Node<K,V> next; // 下一个节点
}
// TreeNode类的定义
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // 父节点
TreeNode<K,V> left; // 左子节点
TreeNode<K,V> right; // 右子节点
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red; // 节点的颜色状态
}
}
对应的结构
当链表的长度超过8并且数组的长度超过64的时候,就会把链表变成树。
ConcurrentHashMap的put方法源码分析
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
// 添加元素
public V put(K key, V value) {
return putVal(key, value, false);
}
// putVal方法定义
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key为null直接抛出异常
if (key == null || value == null) throw new NullPointerException();
// 计算key所对应的hashCode值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 哈希表如果不存在,那么此时初始化哈希表
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//通过hash值计算key在table表中的索引,将其值赋值给变量i,然后根据索引找到对应的Node,如果Node为null,做出处理
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 新增链表头结点,cas方式添加到哈希表table
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// f为链表头结点,使用synchronized加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 节点已经存在,更新value即可
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 该key对应的节点不存在,则新增节点并添加到该链表的末尾
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);//尾插法
break;
}
}
} else if (f instanceof TreeBin) { // 红黑树节点,则往该红黑树更新或添加该节点即可
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 判断是否需要将链表转为红黑树
//当链表的长度超过8并且数组的长度超过64的时候,就会把链表变成树。
// 红黑树是一种自平衡的二叉搜索树。
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
// CAS算法的核心类
private static final sun.misc.Unsafe U;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
...
} catch (Exception e) {
throw new Error(e);
}
}
// 原子获取链表节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// CAS更新或新增链表节点
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
}
标签:Node,JUC,hash,容器,value,链表,key,节点 From: https://www.cnblogs.com/21CHS/p/18528837总结
(1)进行第一个hash计算,得到Node数组的一个索引
(2)从table数组中根据上一步的索引获取到一个Node对像,此时他就是链表的头节点。
(3)synchronized使用同步代码块进行加锁控制,此时锁的粒度(范围),就是一个头节点对象,只锁住了一个链表。多个线程向同一个链表中去put时,才会存在锁的竟争(这里的锁对象就是头节点对象)。这个版本下的锁的粒度最小,并发度最高。
【hashTablet每次锁住整个hash表,java8之前的ConcurrentHashMap锁住一段hash表,java8之后只锁住一个链表】
(4)开始从头结点开始遍历当前这个链表,如果链表中的某个节点Node的key和put方法的key相同,则将当前Node节点的val替换成put方法的vaIue,返回oldVal(当前Node节点的旧值),put操作结束。
(5)如果链表中没有找到key相同的Node节点,则利用put方法的key和value创建一个新的Node节点,挂在链表的最后(尾插法)
(6)由于使用的synchronazed同步代码块进行控制,所以不需要释放锁。