线程同步的几个概念
synchronized关键字
synchronized, wait, notify 是任何对象都具有的同步工具。wait/notify必须存在于synchronized块中。详情如下:
方法或代码块的互斥性来完成实际上的一个原子操作。(方法或代码块在被一个线程调用时,其他线程处于等待状态)
所有的Java对象都有一个与synchronzied关联的监视器对象(monitor),允许线程在该监视器对象上进行加锁和解锁操作。【在非多线程编码时该监视器不发挥作用,反之如果在synchronized 范围内,监视器发挥作用。】
a、静态方法:Java类对应的Class类的对象所关联的监视器对象。
b、实例方法:当前对象实例所关联的监视器对象。
c、代码块:代码块声明中的对象所关联的监视器对象。
Object类的wait、notify和notifyAll方法
生产者和消费者模式,判断缓冲区是否满来消费,缓冲区是否空来生产的逻辑。如果用while 和 volatile也可以做,不过本质上会让线程处于忙等待,占用CPU时间,对性能造成影响。
- wait: 将当前线程放入,该对象的等待池中,线程A调用了B对象的wait()方法,线程A进入B对象的等待池,并且释放B的锁。(这里,线程A必须持有B的锁,所以调用的代码必须在synchronized修饰下,否则直接抛出java.lang.IllegalMonitorStateException异常)。
- notify:将该对象中等待池中的线程,随机选取一个放入对象的锁池,当当前线程结束后释放掉锁, 锁池中的线程即可竞争对象的锁来获得执行机会。
- notifyAll:将对象中等待池中的线程,全部放入锁池。
- (notify锁唤醒的线程选择由虚拟机实现来决定,不能保证一个对象锁关联的等待集合中的线程按照所期望的顺序被唤醒,很可能一个线程被唤醒之后,发现他所要求的条件并没有满足,而重新进入等待池。因为当等待池中包含多个线程时,一般使用notifyAll方法,不过该方法会导致线程在没有必要的情况下被唤醒,之后又马上进入等待池,对性能有影响,不过能保证程序的正确性)
volatile关键词:
volatile关键词:用来对共享变量的访问进行同步,上一次写入操作的结果对下一次读取操作是肯定可见的。(在写入volatile变量值之后,CPU缓存中的内容会被写回内存;在读取volatile变量时,CPU缓存中的对应内容会被置为失效,重新从主存中进行读取),volatile不使用锁,性能优于synchronized关键词。
用来确保对一个变量的修改被正确地传播到其他线程中。
但是注意,一般多线程出现数据共享问题一般出现在修改中,而volatile只保证数据可见,即在使用该数据时去主存中刷新最新的值,当后面需要修改的时候,并不能保证该值没有被其他线程修改,所以需要修改数据的时候,还需要其他同步手段,一般结合cas使用。
ThreadLocal类
用处:保存线程的独立变量。对一个线程类(继承自Thread)
当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。常用于用户登录控制,如记录session信息。
实现:每个Thread都持有一个TreadLocalMap类型的变量(该类是一个轻量级的Map,功能与map一样,区别是桶里放的是entry而不是entry的链表。功能还是一个map。)以本身为key,以目标为value。
主要方法是get()和set(T a),set之后在map里维护一个threadLocal -> a,get时将a返回。ThreadLocal是一个特殊的容器。
ThreadLocal的接口方法
ThreadLocal类接口很简单,只有4个方法:
-
void set(Object value)设置当前线程的线程局部变量的值。
-
public Object get()该方法返回当前线程所对应的线程局部变量。
-
public void remove()将当前线程局部变量的值删除,目的是为了减少内存的占用,该方法是JDK 5.0新增的方法。需要指出的是,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。
-
protected Object initialValue()返回该线程局部变量的初始值,该方法是一个protected的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第1次调用get()或set(Object)时才执行,并且仅执行1次。ThreadLocal中的缺省实现直接返回一个null。
相关lock类
java.util.concurrent这个包给提供了很多同步相关的类,后面会有两章来讲其相关类,这里以ReentrantLock为例,ReentrantLock类是可重入、互斥、实现了Lock接口的锁,它与使用synchronized方法和代码块具有相同的基本行为和语义,并且扩展了其能力。主要应用
lock() : 获得锁
unlock() : 释放锁synchronized和ReentrantLock对比
1.synchronized是java自带的关键字,直接作用与jvm层面,跟obeject类紧密相关,不会生产死锁;ReentrantLock是以一个java类,加锁和解锁必须成对出现,否者会出现死锁的可能。
2.Lock是可以中断锁,Synchronized是非中断锁,必须等待线程执行完成释放锁。
3.lock比synchronized更灵活,lock可实现公平锁,读写锁等等应用场景更灵活。
常用集合的线程安全问题
不知道大家用没用过迭代器删除元素,没有的话可以试试,大家就知道了。
Arraylist与hashmap集合类线程使用的时候都存在问题:
Arraylist底层是数组,在多线程环境下极其容易发生越界
hashmap底层是数组+链表,多线程环境下本身就容易发生数据丢失(比如两个线程同时往里面put,两者的key相同)
1.7 由于链表元素使用头插法,所以容易发生链表元素你指我,我指你的情况。
(根据我查到的信息,1.7使用头插法是因为最近插入的元素回被经常使用,但事实证明这玩意不行,所以1.8就改成了尾插法 注:1.7之前不知道是什么方法)
古老线程安全版本的Arraylist和Hashmap Vector和HashTable
Araylist和HashMap加个锁就是Vector和HashTable
这两个几乎所有方法都使用了synchronized锁,只有一些细节上的区别。
注:之所以说是古老,是因为ArrayList和hashMap一直在改进,这两个就成了历史了,反而现在问它们的区别其实意义不大了。
Vector与Arraylist的区别
主要是扩容不一样
HashTable和Hashmap的区别
- HashMap允许将 null 作为一个 entry 的 key 或者 value,而 Hashtable 不允许。
- HashTable 继承自 Dictionary 类,而 HashMap 是 Java1.2 引进的 Map interface 的一个实现。
- Hashtable 和 HashMap 采用的 hash/rehash 算法都不一样。
现在线程安全版本的Arraylist和Hashmap CopyOnWriteList和ConrrentHashMap
CopyOnWriteList
CopyOnWriteList的核心就是写入的时候加锁,保证线程安全,读取的时候不加锁。
简单点说就是:
在进行扩容的时候他会在新建一个数组,然后把数据复制过来,在此期间原来的数组依旧可以进行读,但不可写
ConrrentHashMap
ConrrentHashMap是一个线程安全版本的hashmap,他在1.7与1.8的实现是不一样的
1.7版本的ConrrentHashMap
put
JDK1.7 中的 ConcurrentHashMap 是由 Segment
数组结构和 HashEntry
数组结构组成,即 ConcurrentHashMap 把哈希桶数组切分成小数组(Segment ),每个小数组有 n 个 HashEntry 组成。
如下图所示,首先将数据分为一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问,实现了真正的并发访问。
需要注意的是segment本身其实是继承了ReentrantLock,所以 Segment 是一种可重入锁,扮演锁的角色。Segment 默认为 16,也就是并发度为 16。
存放元素的 HashEntry,也是一个静态内部类,主要的组成如下:
他的put方法因为他的独特结构,所以:
先定位到相应的 Segment ,然后再进行 put 操作。
首先会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut()
自旋获取锁。
- 尝试自旋获取锁。
- 如果重试的次数达到了
MAX_SCAN_RETRIES
则改为阻塞锁获取,保证能获取成功。
get方法
首先,根据 key 计算出 hash 值定位到具体的 Segment ,再根据 hash 值获取定位 HashEntry 对象,并对 HashEntry 对象进行链表遍历,找到对应元素。
由于 HashEntry 涉及到的共享变量都使用 volatile 修饰,volatile 可以保证内存可见性,所以每次获取时都是最新值。
1.8版本的ConrrentHashMap
在数据结构上, JDK1.8 中的ConcurrentHashMap 选择了与 HashMap 相同的Node数组+链表+红黑树结构;在锁的实现上,抛弃了原有的 Segment 分段锁,采用CAS + synchronized
实现更加细粒度的锁。
将锁的级别控制在了更细粒度的哈希桶数组元素级别,也就是说只需要锁住这个链表头节点(红黑树的根节点),就不会影响其他的哈希桶数组元素的读写,大大提高了并发度。
JDK1.8 中为什么使用内置锁 synchronized替换 可重入锁 ReentrantLock?
- 在 JDK1.6 中,对 synchronized 锁的实现引入了大量的优化,并且 synchronized 有多种锁状态,会从无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁一步步转换。
- 减少内存开销 。假设使用可重入锁来获得同步支持,那么每个节点都需要通过继承 AQS 来获得同步支持。但并不是每个节点都需要获得同步支持的,只有链表的头节点(红黑树的根节点)需要同步,这无疑带来了巨大内存浪费。
put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //计算哈希值
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) //判断是否需要初始化
tab = initTable();//初始化put
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//根据hash值求出下标,查询指定位置,如果为空,那么就通过cas的方式尝试添加
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;// no lock when adding to empty bin
}
//弱国f.hash = MOVED = -1,说明正在扩容,参与一起扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//如果都不满足,那么synchronized锁住f节点,判断链表还是红黑树,遍历插入
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//链表长度超过8,数组扩容或者链表转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//记数
return null;
}
- 根据 key 计算出 hash 值;
- 判断是否需要进行初始化;
- 定位到 Node,拿到首节点 f,判断首节点 f:
- 如果为 null ,则通过 CAS 的方式尝试添加;
- 如果为
f.hash = MOVED = -1
,说明其他线程在扩容,参与一起扩容; - 如果都不满足 ,synchronized 锁住 f 节点,判断是链表还是红黑树,遍历插入;
注意:当在链表长度达到 8 的时候,数组扩容或者将链表转换为红黑树。
总结:如果在数组上这个位置还没有值,就用cas插入,如果在链表和红黑树,就先用那么synchronized锁住这个节点
get方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());//计算哈希值
if ((tab = table) != null && (n = tab.length) > 0 && //判断数组是否为空
(e = tabAt(tab, (n - 1) & h)) != null) {//查看数组下标有没有元素
//如果在数组上,就直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果是红黑树,就在红黑树查
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//如果是链表,就循环遍历
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- 根据 key 计算出 hash 值,判断数组是否为空;
- 如果是首节点,就直接返回;
- 如果是红黑树结构,就从红黑树里面查询;
- 如果是链表结构,循环遍历判断。
1.8版本下的扩容transfer
什么时候扩容
1、当前容量超过阈值
2、当链表中元素个数超过默认设定(8个),当数组的大小还未超过64的时候,此时进行数组的扩容,如果超过则将链表转化成红黑树
3、当发现其他线程扩容时,帮其扩容
如果准备加入扩容的线程,发现以下情况,放弃扩容,直接返回。
a、发现transferIndex=0,即所有node均已分配
b、发现扩容线程已经达到最大扩容线程数
ConcurrentHashMap在多线程环境下,如果发现正在扩容,就会辅助扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//计算需要迁移多少个hash桶(MIN_TRANSFER_STRIDE该值作为下限,以避免扩容线程过多)
//每核处理的量小于16,则强制赋值16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")//扩容一倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //构建一个nextTable对象,其容量为原来容量的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 连接点指针,用于标志位(fwd的hash值为-1,fwd.nextTable=nextTab)
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 当advance == true时,表明该节点已经处理过了
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//1 逆序迁移已经获取到的hash桶集合,如果迁移完毕,则更新transferIndex,获取下一批待迁移的hash桶
//2 如果transferIndex=0,表示所以hash桶均被分配,将i置为-1,准备退出transfer方法
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 控制 --i ,遍历原hash表中的节点
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 用CAS计算得到的transferIndex
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 已经完成所有节点复制了
if (finishing) {
nextTable = null;
table = nextTab;// table 指向nextTable
sizeCtl = (n << 1) - (n >>> 1);// sizeCtl阈值为原来的1.5倍
return;//跳出死循环
}
// CAS 更扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 遍历的节点为null,则放入到ForwardingNode 指针节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// f.hash == -1 表示遍历到了ForwardingNode节点,意味着该节点已经处理过了
// 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 节点加锁
synchronized (f) {
// 节点复制工作
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh >= 0 ,表示为链表节点
if (fh >= 0) {
// 构造两个链表 一个是原链表 另一个是原链表的反序排列
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 在nextTable i 位置处插上链表
setTabAt(nextTab, i, ln);
// 在nextTable i + n 位置处插上链表
setTabAt(nextTab, i + n, hn);
// 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
setTabAt(tab, i, fwd);
// advance = true 可以执行--i动作,遍历节点
advance = true;
}
// 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 扩容后树节点个数若<=6,将树转链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
nextTable
扩容期间,将table数组中的元素 迁移到 nextTable
sizeCtl属性
private transient volatile int sizeCtl;
多线程之间,以volatile的方式读取sizeCtl属性,来判断ConcurrentHashMap当前所处的状态。通过cas设置sizeCtl属性,告知其他线程ConcurrentHashMap的状态变更。
不同状态,sizeCtl所代表的含义也有所不同。
未初始化:sizeCtl=0:表示没有指定初始容量。sizeCtl>0:表示初始容量。
初始化中:sizeCtl=-1,标记作用,告知其他线程,正在初始化
正常状态:sizeCtl=0.75n,扩容阈值
扩容中:sizeCtl < 0 : 表示有其他线程正在执行扩容
sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT)+2 表示此时只有一个线程在执行扩容
123456
transferIndex 扩容索引
扩容索引,表示已经分配给扩容线程的table数组索引位置。主要用来协调多个线程,并发安全地获取迁移任务(hash桶)。
private transient volatile int transferIndex;
private static final int MIN_TRANSFER_STRIDE = 16; //扩容线程每次最少要迁移16个hash桶
12
1、在扩容之前,transferIndex 在数组的最右边 。此时有一个线程发现已经到达扩容阈值,准备开始扩容。
2、扩容线程,在迁移数据之前,首先要将transferIndex右移(以CAS的方式修改 transferIndex=transferIndex-stride(要迁移hash桶的个数)),获取迁移任务。每个扩容线程都会通过for循环+CAS的方式设置transferIndex,因此可以确保多线程扩容的并发安全。
换个角度,我们可以将待迁移的table数组,看成一个任务队列,transferIndex看成任务队列的头指针。而扩容线程,就是这个队列的消费者。扩容线程通过CAS设置transferIndex索引的过程,就是消费者从任务队列中获取任务的过程。为了性能考虑,我们当然不会每次只获取一个任务(hash桶),因此ConcurrentHashMap规定,每次至少要获取16个迁移任务(迁移16个hash桶,MIN_TRANSFER_STRIDE = 16)
ForwardingNode节点
1、标记作用,表示其他线程正在扩容,并且此节点已经扩容完毕
2、关联了nextTable,扩容期间可以通过find方法,访问已经迁移到了nextTable中的数据
扩容遍历与计数
ConcurrentHashMap实现原理:扩容遍历与计数
http://www.zijin.net/news/tech/339960.html
在此基础上,为了与扩容操作并发执行,遍历操作这样执行:
1) 依然是从前往后逐个访问每个bin,
2) 如果遇到FordwardingNode,则把当前table引用、当前bin的访问位置和当前table总长度保存到table stack中,然后跳转到FordwardingNode所指向的新table,
3) 当前的索引index保持不变,在新table中按这个index访问(因为map每次扩容都是大小扩展为原来的2倍,每个Node在新table中的索引要么保持不变要么后移)。
4) 访问完新table中的index位置的bin之后,再访问index+baseSize这个位置上的bin (baseSize是老table的总长度);
5) 从table stack还原出之前的table引用、index访问位置和table总长度,继续向后遍历。
这样就可以在有扩容操作也在进行的条件下,同时支持遍历操作,且保证每个Node只被访问一次。因为Node在老table和新table之间有固定对应关系,用这个条件保证。
其他
无锁的执行者-CAS
CAS的全称是Compare And Swap 即比较交换,其算法核心思想如下
执行函数:CAS(V,E,N)
其包含3个参数:V表示要更新的变量;E表示预期值;N表示新值
123
如果V值等于E值,则将V的值设为N。若V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。
通俗的理解就是CAS操作需要我们提供一个期望值,当期望值与当前线程的变量值相同时,说明还没线程修改该值,当前线程可以进行修改,也就是执行CAS操作,但如果期望值与当前线程不符,则说明该值已被其他线程修改,此时不执行更新操作,但可以选择重新读取该变量再尝试再次修改该变量,也可以放弃操作。
由于CAS操作属于乐观派,它总认为自己可以成功完成操作,当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作,这点从图中也可以看出来。基于这样的原理,CAS操作即使没有锁,同样知道其他线程对共享资源操作影响,并执行相应的处理措施。
同时从这点也可以看出,由于无锁操作中没有锁的存在,因此不可能出现死锁的情况,也就是说无锁操作天生免疫死锁。
TreeNodes
和HashMap 结构类似,当链表节点数超过指定阈值8的话,会转换成红黑树。
8这个阈值定义在HashMap中,这段注释只说明了8是bin(bin就是bucket,即HashMap中hashCode值一样的元素保存的地方)从链表转成树的阈值.
TreeNodes占用空间是普通Nodes的两倍,所以只有当bin包含足够多的节点时才会转成TreeNodes,而是否足够多就是由TREEIFY_THRESHOLD的值决定的。当bin中节点数变少时,又会转成普通的bin。并且我们查看源码的时候发现,链表长度达到8就转成红黑树,当长度降到6就转成普通bin。
/**
* The bin count threshold for using a tree rather than list for a
* bin. Bins are converted to trees when adding an element to a
* bin with at least this many nodes. The value must be greater
* than 2 and should be at least 8 to mesh with assumptions in
* tree removal about conversion back to plain bins upon shrinkage.
*/
static final int TREEIFY_THRESHOLD = 8;
原文链接:https://blog.csdn.net/wo1901446409/article/details/97388971
1234567891011121314
为什么要使用CAS+Synchronized取代Segment+ReentrantLock
假设你对CAS,Synchronized,ReentrantLock这些知识很了解,并且知道AQS,自旋锁,偏向锁,轻量级锁,重量级锁这些知识,也知道Synchronized和ReentrantLock在唤醒被挂起线程竞争的时候有什么区别。
Synchronized上锁的对象,请记住,Synchronized是靠对象的对象头和此对象对应的monitor来保证上锁的,也就是对象头里的重量级锁标志指向了monitor,而monitor内部则保存了一个当前线程,也就是抢到了锁的线程.
那么这里的f是什么呢?它是Node链表里的每一个node,也就是说,Synchronized是将每一个node对象作为了一个锁,这样做的好处是将锁细化了,也就是说,除非两个线程同时操作一个node,注意,是一个node而不是一个Node链表,那么才会争抢同一把锁.
如果使用ReentrantLock其实也可以将锁细化成这样的,只要让Node类继承ReentrantLock就行了,这样的话调用f.lock()就能做到和Synchronized(f)同样的效果,但为什么不这样做呢?
试想一下,锁已经被细化到这种程度了,那么出现并发争抢的可能性还高吗?哪怕出现争抢了,只要线程可以在30到50次自旋里拿到锁,那么Synchronized就不会升级为重量级锁,而等待的线程也就不用被挂起,我们也就少了挂起和唤醒这个上下文切换的过程开销.
但如果是ReentrantLock,它只有在线程没有抢到锁,然后新建Node节点后再尝试一次而已,不会自旋,而是直接被挂起,这样一来就很容易多出线程上下文开销的代价.当然,你也可以使用tryLock(),但是这样又出现了一个问题,你怎么知道tryLock的时间呢?在时间范围里还好,假如超过了呢?
所以,在锁被细化到如此程度上,使用Synchronized是最好的选择了.这里再补充一句,Synchronized和ReentrantLock他们的开销差距是在释放锁时唤醒线程的数量,Synchronized是唤醒锁池里所有的线程+刚好来访问的线程,而ReentrantLock则是当前线程后进来的第一个线程+刚好来访问的线程.
如果是线程并发量不大的情况下,那么Synchronized因为自旋锁,偏向锁,轻量级锁的原因,不用将等待线程挂起,偏向锁甚至不用自旋,所以在这种情况下要比ReentrantLock高效.
参考资料
深入分析ConcurrentHashMap1.8的扩容实现
https://www.jianshu.com/p/f6730d5784ad
Java的ConcurrentHashMap
https://www.jianshu.com/p/5dbaa6707017
ConcurrentHashMap 1.8为什么要使用CAS+Synchronized
https://www.cnblogs.com/yangfeiORfeiyang/p/9694383.html
ConcurrentHashMap实现原理:扩容遍历与计数
http://www.zijin.net/news/tech/339960.html
ConcurrentHashMap的红黑树实现分析
https://www.jianshu.com/p/23b84ba9a498