前言:
- 文章内容:线程与进程、线程生命周期、线程中断、线程常见问题总结
- 本文章内容来源于笔者学习笔记,内容可能与相关书籍内容重合
- 偏向于知识核心总结,非零基础学习文章,可用于知识的体系建立,核心内容复习,如有帮助,十分荣幸
- 相关文献:并发编程实战、计算机原理
CAS的概括:
- 比较并变换算法,判断需要更新的值是否等于预期值,成立则把需要更新的值的值设置为新值。不成立说明有其他线程修改了原来的值,则当前线程放弃更新
- CAS属于CPU层面的原子指令,保证原子性。多个线程CAS操作,只有一个线程成功,失败允许再次尝试或放弃
- Java实现CAS算法是基于Unsafe类:位于sum.misc包下,其方法都是public native,表示都是由JVM使用C或C++去实现。JDK提供了一些用于原子操作的类,在java.util.concurrent.atomic包下。包括原子更新基本类型、更新数组、更新引用、更新属性
JDK8对CAS的优化:
- 大量线程同时并发修改一个AtomicInteger变量,可能存在多个线程不停自旋,进入无限重复循环,导致大量线程循环,自旋转,性能和效率都不好。优化栗子:JDK8使用LongAdder来优化这个问题,采用分段CAS及自动分段迁移的方式,提升多线程高并发执行CAS操作的性能
- 原理:
- 底层有一个base值,多线程不停累加数值都是对base值进行累加。如果发现并发更新线程数量过多,开始施行分段CAS机制,内存生成Cell数组,每个数组是一个数值分段
- 让大量线程分别对不同Cell内部的value值进行CAS累加操作,分散CAS计算压力。大幅度降低多线程并发更新同一个数值时,出现的无限循环问题
- 自动分段迁移机制,当某个Cell的value执行CAS失败时,会自动去找另一个Cell分段内的value值进行CAS操作,解决线程空旋转,自旋不停等待执行CAS操作的问题
- 最后要从LongAdder中获取当前累加的总值,就会把base值和所有Cell分段数值加起来返回
CAS三大问题及解决方案:
- ABA:即一个值原来是A,变成B后又变回了A,CAS无法检查出变化,但是实际上值被更新了两次。解决思路是在变量前追加版本号或时间戳,Java中提供AtomicStampedReference来解决。该类的compareAndSet方法:先检查当前引用是否等于预期引用,并检查当前时间戳是否等于预期时间戳。两者都相等,才使用CAS更新。
- 循环时间开销大:CAS多与自旋结合,如果自旋CAS长时间不成功,会占用大量CPU资源。比如在synchronized里面针对CAS自旋有做阈值设置,同时有自旋锁,和自适应自旋锁来优化。
- 只能保证一个共享变量的原子操作:JDK5后提供AtomicReference类保证对象之间的原子性,把多个变量放入一个对象里进行CAS操作。
AQS
AQS相关信息:
- 抽象队列同步器,用来构建锁和同步器的框架。ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue等皆是基于AQS的。
- 一个volatile修饰的int类型state变量,来判断是否可以获取资源,通过CAS修改保证原子性,实现加解锁,可重入。
- Node用来封装线程信息,包含线程信息、前驱后驱节点、头尾结点来构成双向链表
- 一个加锁线程标识,来记录当前加锁线程信息。
- 资源模式分为独占模式和共享模式,资源是独占时,一次只能一个线程获取,如ReentrantLock。资源是共享时,可以被多个线程获取,如Semaphore/CountDownLatch。
AQS加锁原理大致:
下面我们进行源码分析:以ReentrantLock为起点,分析源码
new ReentrantLock(); public ReentrantLock() { sync = new NonfairSync(); } //默认构造函数,创建了一个Sync,NonfairSync,应该是专门用于加锁的组件 //NonfairSync是Sync的子类,非公平锁。 public void lock() { sync.lock(); } //ReentrantLock加锁解锁时,基于底层的Sync进行加锁解锁。 private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { //Sync是抽象静态内部类,是AQS的子类。AbstractQueuedSynchronizer抽象队列同步器
AQS如何基于CAS实现高效加锁:
默认非公平锁加锁: final void lock() { if (compareAndSetState(0, 1)) //公平锁少了这一步判断,直接执行的acquire(1),所有线程要先来后到。 //非公平锁加锁,线程进来只要能够执行CAS成功,就能拿到锁。哪怕很多线程在队列中等待, //但是某个线程释放锁的一瞬间,可能有其他晚到的线程突然争抢到了锁。而非公平不行,需要进行排队等待操作。 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } compareAndSetState(0, 1):采用CAS,进行state变量的替换,如果state=0,就设置为1。 setExclusiveOwnerThread(Thread.currentThread()): 加锁成功,设置当前线程自己是加了一个独占锁的线程,标识自己是加锁线程
这里总结一下问题:ReentrantLock为何默认是非公平锁,原因是非公平每次加锁都会直接CAS去加锁,失败后再去执行入队操作。而非公平锁不会直接CAS,而是要先判断前面是否有排队的线程,大概率会加锁失败。在入队后,公平锁会被挂起的,这就导致会带来更多线程上下文切换。
AQS利用state实现可重入:
通过CAS进行state的修改,同一个线程重复加锁,会进行state的叠加,实现可重入
compareAndSetState(0, 1):重复加锁,cas会失败。就会走acquire(1); //acquire方法逻辑 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); //此时tryAcquire(arg)的实现在NonfairSync类中 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } //入参acquires为1 final boolean nonfairTryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取state变量 int c = getState(); //如果为0,代表无线程持有锁 if (c == 0) { //CAS进行0变1 if (compareAndSetState(0, acquires)) { //加锁成功,进行加锁线程设置 setExclusiveOwnerThread(current); return true; } } //不为0,代表有线程持有锁。判断加锁线程是否是当前线程 else if (current == getExclusiveOwnerThread()) { //进行state的叠加 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //设置state,state这里就是同一个线程重复加锁的总次数 setState(nextc); //重入加锁成功 return true; } //失败,表示加锁线程不是当前线程,其他线程仍持有锁,加锁失败 return false; }
AQS的阻塞等待队列:
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); //tryAcquire加锁失败,就会把当前线程入队等待。 //先看addWaiter(Node.EXCLUSIVE),EXCLUSIVE独占锁,同一时间只能有一个线程获取到这个锁。 private Node addWaiter(Node mode) { //将当前线程封装为Node,并标识为独占锁 Node node = new Node(Thread.currentThread(), mode); ... } //我们来看下Node结构 volatile int waitStatus;//核心变量,标识线程在队列中的等待状态。 volatile Node prev;//指向当前Node的上一个Node volatile Node next;//指向当前Node的下一个Node volatile Thread thread;//Node里封装的线程 Node nextWaiter;//可以认为是下一个等待的线程,在独占锁下是null //获取不到锁,处于等待的线程,会被封装为Node,并且有指向。多个阻塞等待状态的线程封装为一个Node双向链表。从AbstractQueuedSynchronizer来看AQS的组成:Node-自定义数据结构,存储线程信息和组成双写链表即队列。state核心变量,加锁解锁基于这个变量完成。
- Node SHARED = new Node():标记节点在共享模式下等待
- Node EXCLUSIVE = null:标记节点在独占模式下等待
- 节点状态:waitStatus 节点状态作用于对应的线程
- 1:节点已被取消,当超时或中断会变更为此状态
- -1:后继节点等待当前节点唤醒,后继节点入队会将前继节点状态更新为-1
- -2:表示该结点(对应的线程)在等待,其他线程调用了Codition的signal方法后,该结点将从等待队列转移到同步队列,等待获取同步锁
- -3:共享模式下,表示有资源可用,新头结点需要继续唤醒后继结点
- 0:新节点入队时的默认状态
- 负值表示节点处于有效等待状态,正值表示结点已经取消。
多个阻塞等待线程的Node形成一个双向链表:
核心流程图:
加锁失败的入队逻辑:
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); addWaiter(Node.EXCLUSIVE), arg);//线程入队逻辑 //继续看一下添加Node的逻辑 private Node addWaiter(Node mode) { //封装Node,独占锁mode实际是null Node node = new Node(Thread.currentThread(), mode); //private transient volatile Node tail:标记Node链表中的尾部Node Node pred = tail; //如果尾部不为null,代表链表不是空的 if (pred != null) { //设置新Node的上一个指针指向队尾Node node.prev = pred; //CAS:新Node作为链表的尾部Node if (compareAndSetTail(pred, node)) { //上一个Node的next指针指向新Node pred.next = node; return node; } } enq(node); //返回当前线程的Node return node; } //如果尾部是null,代表此时执行代码时,没有线程是在队列的 private Node enq(final Node node) { for (;;) { Node t = tail; //队尾指针是null if (t == null) { //创建一个空Node CAS把head设置为空Node 此时队尾Node也是队头 即初始化Node队列 //private transient volatile Node head; 标记Node链表中的头部Node if (compareAndSetHead(new Node())) tail = head; } else { //如果不是空的,可能是已经有其他的线程进行了入队 进行指针变换,并CAS把新Node作为链表的尾部Node node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued(addWaiter(当前线程的Node), 1)
final boolean acquireQueued(final Node node, int arg) { //是否成功竞争到锁 boolean failed = true; try { boolean interrupted = false; for (;;) { //获取Node的上一个节点 没有获取到会抛NPE final Node p = node.predecessor(); //如果上一个节点是head的Node,并尝试获取锁 if (p == head && tryAcquire(arg)) { //如果获取成功,就加锁成功,标识当前Node为head,设置Node的上一个指针为null,并清除Node的线程信息 setHead(node); //把当前Node下一个指针设置为null,完成当前Node移除出队列的操作。 p.next = null; // help GC failed = false; return interrupted; } //上一个节点不是head,或加锁失败了。shouldParkAfterFailedAcquire:判断是否需要将当前线程挂起,阻塞等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//parkAndCheckInterrupt:利用LockSupport.park进行阻塞 interrupted = true; } } finally { if (failed) //竞争锁失败,被中断取消竞争锁才会执行这个函数,取消同步队列同步队列中的等待结点 cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取上一个节点的状态 int ws = pred.waitStatus; //-1:后继节点等待当前节点唤醒 新的Node需要被挂起 if (ws == Node.SIGNAL) return true; //大于0:上一个节点已被取消,当超时或中断会变更为此状态。 //向前遍历寻找状态不大于0的结点,这个过程会回收这些已取消的结点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //把上一个的ws设置为-1 表示后继节点等待当前节点唤醒 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //新Node不需要被挂起 return false; }
公平锁加锁:设置new ReentrantLock(true),走公平锁FairSync
//公平锁lock方法,也是走acquire(ing arg): public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //其中tryAcquire方法的实现走的是公平锁的实现 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //资源未被占用,当前线程尝试竞争资源 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //锁资源未被释放,判断持有资源的线程是否是当前现在,是的话执行可重入逻辑 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //获取锁失败 return false; } //公平锁核心:每次加锁都要判断,前面有没有排队等待的线程,如果没有就尝试加锁,如果有就不能尝试加锁 public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } //方法返回false才可以去竞争锁。 //h != t:如果h!=t,说明有人在排队,不可参与竞争。h==t链表只存在一个节点才可以去竞争 //(s = h.next) == null :如果head的下一个节点是null,说明没有人在排队,可以竞争锁 //或者排在队头的节点不是当前线程,表示可以尝试加锁
tryLock如何实现加锁等待一段时间后放弃:
在获取资源时,会获取等待时间。如果获取失败了,计算剩余时间,剩余时间小于0,就放弃加锁。剩余时间大于默认的时间就阻塞剩下的时间
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } //我们解析doAcquireNanos private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; //获取等待时间 final long deadline = System.nanoTime() + nanosTimeout; //封装Node final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { //获取Node的上一个节点 final Node p = node.predecessor(); //如果是上一节的是头结点,且当前线程获取资源成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } //获取剩下还有多少时间 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) //剩余时间小于0,代表阻塞时间已经过了,返回false,放弃加锁。 return false; // 判断是否需要将当前线程挂起,阻塞等待 并且剩余时间是否大于默认时间。 满足就进行阻塞。 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
可重入锁释放
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //释放资源结束后,如果头结点不为null,且头结点状态不是0,为0代表是新入队的Node if (h != null && h.waitStatus != 0) //唤醒队头的元素,让阻塞线程尝试抢占锁 unparkSuccessor(h); return true; } return false; } //尝试释放锁 protected final boolean tryRelease(int releases) { //减一次state int c = getState() - releases; //当前加锁线程不是要释放的线程 抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //当state=0,代表释放完成 free = true; //设置当前加锁线程为null setExclusiveOwnerThread(null); } //设置state setState(c); return free; } //唤醒后继结点线程,此时后继节点是Node链表中的队头 private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) //如果ws小于0 设置状态为0,代表新入队 compareAndSetWaitStatus(node, ws, 0); //获取下一个节点 Node s = node.next; //下一个节点为null,或者节点状态大于0,没有下一个节点或者下一个节点以及被取消了 if (s == null || s.waitStatus > 0) { s = null; //把队列里未被取消的节点进行前移 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //下一个节点不为null,就直接唤醒。 if (s != null) LockSupport.unpark(s.thread); }
标签:Node,node,加锁,AQS,CAS,源码,线程,节点 From: https://www.cnblogs.com/zhangbLearn/p/16638531.html