首页 > 编程语言 >并发编程五、CAS与AQS原理及源码分析

并发编程五、CAS与AQS原理及源码分析

时间:2022-08-30 11:25:56浏览次数:47  
标签:Node node 加锁 AQS CAS 源码 线程 节点

前言:

  1. 文章内容:线程与进程、线程生命周期、线程中断、线程常见问题总结
  2. 本文章内容来源于笔者学习笔记,内容可能与相关书籍内容重合
  3. 偏向于知识核心总结,非零基础学习文章,可用于知识的体系建立,核心内容复习,如有帮助,十分荣幸
  4. 相关文献:并发编程实战、计算机原理

CAS的概括:

  1. 比较并变换算法,判断需要更新的值是否等于预期值,成立则把需要更新的值的值设置为新值。不成立说明有其他线程修改了原来的值,则当前线程放弃更新
  2. CAS属于CPU层面的原子指令,保证原子性。多个线程CAS操作,只有一个线程成功,失败允许再次尝试或放弃
  3. Java实现CAS算法是基于Unsafe类:位于sum.misc包下,其方法都是public native,表示都是由JVM使用C或C++去实现。JDK提供了一些用于原子操作的类,在java.util.concurrent.atomic包下。包括原子更新基本类型、更新数组、更新引用、更新属性

JDK8对CAS的优化:

  • 大量线程同时并发修改一个AtomicInteger变量,可能存在多个线程不停自旋,进入无限重复循环,导致大量线程循环,自旋转,性能和效率都不好。优化栗子:JDK8使用LongAdder来优化这个问题,采用分段CAS及自动分段迁移的方式,提升多线程高并发执行CAS操作的性能
  • 原理:
  1. 底层有一个base值,多线程不停累加数值都是对base值进行累加。如果发现并发更新线程数量过多,开始施行分段CAS机制,内存生成Cell数组,每个数组是一个数值分段
  2. 让大量线程分别对不同Cell内部的value值进行CAS累加操作,分散CAS计算压力。大幅度降低多线程并发更新同一个数值时,出现的无限循环问题
  3. 自动分段迁移机制,当某个Cell的value执行CAS失败时,会自动去找另一个Cell分段内的value值进行CAS操作,解决线程空旋转,自旋不停等待执行CAS操作的问题
  4. 最后要从LongAdder中获取当前累加的总值,就会把base值和所有Cell分段数值加起来返回

CAS三大问题及解决方案:

  • ABA:即一个值原来是A,变成B后又变回了A,CAS无法检查出变化,但是实际上值被更新了两次。解决思路是在变量前追加版本号或时间戳,Java中提供AtomicStampedReference来解决。该类的compareAndSet方法:先检查当前引用是否等于预期引用,并检查当前时间戳是否等于预期时间戳。两者都相等,才使用CAS更新。
  • 循环时间开销大:CAS多与自旋结合,如果自旋CAS长时间不成功,会占用大量CPU资源。比如在synchronized里面针对CAS自旋有做阈值设置,同时有自旋锁,和自适应自旋锁来优化。
  • 只能保证一个共享变量的原子操作:JDK5后提供AtomicReference类保证对象之间的原子性,把多个变量放入一个对象里进行CAS操作。

 AQS


AQS相关信息:

  1. 抽象队列同步器,用来构建锁和同步器的框架。ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue等皆是基于AQS的。
  2. 一个volatile修饰的int类型state变量,来判断是否可以获取资源,通过CAS修改保证原子性,实现加解锁,可重入。
  3. Node用来封装线程信息,包含线程信息、前驱后驱节点、头尾结点来构成双向链表
  4. 一个加锁线程标识,来记录当前加锁线程信息。
  5. 资源模式分为独占模式和共享模式,资源是独占时,一次只能一个线程获取,如ReentrantLock。资源是共享时,可以被多个线程获取,如Semaphore/CountDownLatch。

AQS加锁原理大致:

下面我们进行源码分析:以ReentrantLock为起点,分析源码

new ReentrantLock();
public ReentrantLock() {
    sync = new NonfairSync();
}
//默认构造函数,创建了一个Sync,NonfairSync,应该是专门用于加锁的组件
//NonfairSync是Sync的子类,非公平锁。
public void lock() {
    sync.lock();
}
//ReentrantLock加锁解锁时,基于底层的Sync进行加锁解锁。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
//Sync是抽象静态内部类,是AQS的子类。AbstractQueuedSynchronizer抽象队列同步器

AQS如何基于CAS实现高效加锁:

默认非公平锁加锁:
final void lock() {
    if (compareAndSetState(0, 1)) //公平锁少了这一步判断,直接执行的acquire(1),所有线程要先来后到。
    //非公平锁加锁,线程进来只要能够执行CAS成功,就能拿到锁。哪怕很多线程在队列中等待,
    //但是某个线程释放锁的一瞬间,可能有其他晚到的线程突然争抢到了锁。而非公平不行,需要进行排队等待操作。
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
compareAndSetState(0, 1):采用CAS,进行state变量的替换,如果state=0,就设置为1。
setExclusiveOwnerThread(Thread.currentThread()):
加锁成功,设置当前线程自己是加了一个独占锁的线程,标识自己是加锁线程

  这里总结一下问题:ReentrantLock为何默认是非公平锁,原因是非公平每次加锁都会直接CAS去加锁,失败后再去执行入队操作。而非公平锁不会直接CAS,而是要先判断前面是否有排队的线程,大概率会加锁失败。在入队后,公平锁会被挂起的,这就导致会带来更多线程上下文切换。

AQS利用state实现可重入:

  通过CAS进行state的修改,同一个线程重复加锁,会进行state的叠加,实现可重入

compareAndSetState(0, 1):重复加锁,cas会失败。就会走acquire(1);
//acquire方法逻辑
if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
//此时tryAcquire(arg)的实现在NonfairSync类中
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
//入参acquires为1
final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取state变量
    int c = getState();
    //如果为0,代表无线程持有锁
    if (c == 0) {
        //CAS进行0变1
        if (compareAndSetState(0, acquires)) {
            //加锁成功,进行加锁线程设置
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //不为0,代表有线程持有锁。判断加锁线程是否是当前线程
    else if (current == getExclusiveOwnerThread()) {
        //进行state的叠加
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //设置state,state这里就是同一个线程重复加锁的总次数   
        setState(nextc);
        //重入加锁成功
        return true;
    }
    //失败,表示加锁线程不是当前线程,其他线程仍持有锁,加锁失败
    return false;
}

 AQS的阻塞等待队列:

if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
//tryAcquire加锁失败,就会把当前线程入队等待。
//先看addWaiter(Node.EXCLUSIVE),EXCLUSIVE独占锁,同一时间只能有一个线程获取到这个锁。
private Node addWaiter(Node mode) {
    //将当前线程封装为Node,并标识为独占锁
    Node node = new Node(Thread.currentThread(), mode);
    ...
}
//我们来看下Node结构
volatile int waitStatus;//核心变量,标识线程在队列中的等待状态。
volatile Node prev;//指向当前Node的上一个Node
volatile Node next;//指向当前Node的下一个Node
volatile Thread thread;//Node里封装的线程
Node nextWaiter;//可以认为是下一个等待的线程,在独占锁下是null
//获取不到锁,处于等待的线程,会被封装为Node,并且有指向。多个阻塞等待状态的线程封装为一个Node双向链表。
从AbstractQueuedSynchronizer来看AQS的组成:Node-自定义数据结构,存储线程信息和组成双写链表即队列。state核心变量,加锁解锁基于这个变量完成。
  • Node SHARED = new Node():标记节点在共享模式下等待
  • Node EXCLUSIVE = null:标记节点在独占模式下等待
  • 节点状态:waitStatus 节点状态作用于对应的线程
  • 1:节点已被取消,当超时或中断会变更为此状态
  • -1:后继节点等待当前节点唤醒,后继节点入队会将前继节点状态更新为-1
  • -2:表示该结点(对应的线程)在等待,其他线程调用了Codition的signal方法后,该结点将从等待队列转移到同步队列,等待获取同步锁
  • -3:共享模式下,表示有资源可用,新头结点需要继续唤醒后继结点
  • 0:新节点入队时的默认状态
  • 负值表示节点处于有效等待状态,正值表示结点已经取消。

多个阻塞等待线程的Node形成一个双向链表:

核心流程图:

加锁失败的入队逻辑:

if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
addWaiter(Node.EXCLUSIVE), arg);//线程入队逻辑
//继续看一下添加Node的逻辑
private Node addWaiter(Node mode) {
    //封装Node,独占锁mode实际是null
    Node node = new Node(Thread.currentThread(), mode);
    //private transient volatile Node tail:标记Node链表中的尾部Node
    Node pred = tail;
    //如果尾部不为null,代表链表不是空的
    if (pred != null) {
        //设置新Node的上一个指针指向队尾Node
        node.prev = pred;
        //CAS:新Node作为链表的尾部Node
        if (compareAndSetTail(pred, node)) {
            //上一个Node的next指针指向新Node
            pred.next = node;
            return node;
        }
    }
    enq(node);
    //返回当前线程的Node
    return node;
}
//如果尾部是null,代表此时执行代码时,没有线程是在队列的
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //队尾指针是null
        if (t == null) { 
            //创建一个空Node CAS把head设置为空Node 此时队尾Node也是队头 即初始化Node队列
            //private transient volatile Node head; 标记Node链表中的头部Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //如果不是空的,可能是已经有其他的线程进行了入队 进行指针变换,并CAS把新Node作为链表的尾部Node
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued(addWaiter(当前线程的Node), 1)

final boolean acquireQueued(final Node node, int arg) {
    //是否成功竞争到锁
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取Node的上一个节点 没有获取到会抛NPE
            final Node p = node.predecessor();
            //如果上一个节点是head的Node,并尝试获取锁
            if (p == head && tryAcquire(arg)) {
                //如果获取成功,就加锁成功,标识当前Node为head,设置Node的上一个指针为null,并清除Node的线程信息
                setHead(node);
                //把当前Node下一个指针设置为null,完成当前Node移除出队列的操作。
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //上一个节点不是head,或加锁失败了。shouldParkAfterFailedAcquire:判断是否需要将当前线程挂起,阻塞等待
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//parkAndCheckInterrupt:利用LockSupport.park进行阻塞
                interrupted = true;
        }
    } finally {
        if (failed)
        //竞争锁失败,被中断取消竞争锁才会执行这个函数,取消同步队列同步队列中的等待结点
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取上一个节点的状态
    int ws = pred.waitStatus;
    //-1:后继节点等待当前节点唤醒 新的Node需要被挂起
    if (ws == Node.SIGNAL)
        return true;
    //大于0:上一个节点已被取消,当超时或中断会变更为此状态。
    //向前遍历寻找状态不大于0的结点,这个过程会回收这些已取消的结点
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //把上一个的ws设置为-1 表示后继节点等待当前节点唤醒
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    //新Node不需要被挂起
    return false;
}

公平锁加锁:设置new ReentrantLock(true),走公平锁FairSync

//公平锁lock方法,也是走acquire(ing arg):
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//其中tryAcquire方法的实现走的是公平锁的实现
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //资源未被占用,当前线程尝试竞争资源
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //锁资源未被释放,判断持有资源的线程是否是当前现在,是的话执行可重入逻辑
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //获取锁失败
    return false;
}
//公平锁核心:每次加锁都要判断,前面有没有排队等待的线程,如果没有就尝试加锁,如果有就不能尝试加锁
public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
//方法返回false才可以去竞争锁。
//h != t:如果h!=t,说明有人在排队,不可参与竞争。h==t链表只存在一个节点才可以去竞争
//(s = h.next) == null :如果head的下一个节点是null,说明没有人在排队,可以竞争锁
//或者排在队头的节点不是当前线程,表示可以尝试加锁

tryLock如何实现加锁等待一段时间后放弃:

  在获取资源时,会获取等待时间。如果获取失败了,计算剩余时间,剩余时间小于0,就放弃加锁。剩余时间大于默认的时间就阻塞剩下的时间

 

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
//我们解析doAcquireNanos
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    //获取等待时间
    final long deadline = System.nanoTime() + nanosTimeout;
    //封装Node
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            //获取Node的上一个节点
            final Node p = node.predecessor();
            //如果是上一节的是头结点,且当前线程获取资源成功
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            //获取剩下还有多少时间
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
            //剩余时间小于0,代表阻塞时间已经过了,返回false,放弃加锁。
                return false;
            // 判断是否需要将当前线程挂起,阻塞等待 并且剩余时间是否大于默认时间。 满足就进行阻塞。
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可重入锁释放

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        //释放资源结束后,如果头结点不为null,且头结点状态不是0,为0代表是新入队的Node
        if (h != null && h.waitStatus != 0)
        //唤醒队头的元素,让阻塞线程尝试抢占锁
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
    //减一次state
    int c = getState() - releases;
    //当前加锁线程不是要释放的线程 抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        //当state=0,代表释放完成
        free = true;
        //设置当前加锁线程为null
        setExclusiveOwnerThread(null);
    }
    //设置state
    setState(c);
    return free;
}
//唤醒后继结点线程,此时后继节点是Node链表中的队头
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        //如果ws小于0 设置状态为0,代表新入队
        compareAndSetWaitStatus(node, ws, 0);
    //获取下一个节点    
    Node s = node.next;
    //下一个节点为null,或者节点状态大于0,没有下一个节点或者下一个节点以及被取消了
    if (s == null || s.waitStatus > 0) {
        s = null;
        //把队列里未被取消的节点进行前移
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //下一个节点不为null,就直接唤醒。
    if (s != null)
        LockSupport.unpark(s.thread);
}

 

 

 

 

 

 

 

 

 

 

 

 

 

标签:Node,node,加锁,AQS,CAS,源码,线程,节点
From: https://www.cnblogs.com/zhangbLearn/p/16638531.html

相关文章

  • Mysql8.0修改lower_case_table_names参数导致重启失败
    GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。事件起因:在测试一个数据迁移工具时,源端orac......
  • 0039-Bytes-bytes源码阅读
    环境Time2022-05-28Rust1.61.0Bytes1.1.0前言说明参考:https://github.com/tokio-rs/bytes目标实现bytes.rs中的一部分方法。线程安全实现了两个线程安全......
  • 0040-Bytes-bytes源码阅读
    环境Time2022-05-29Rust1.61.0Bytes1.1.0前言说明参考:https://github.com/tokio-rs/byteshttps://zhuanlan.zhihu.com/p/109977513目标之前阅读的部分,都......
  • 0041-Bytes-bytes源码阅读
    环境Time2022-05-29Rust1.61.0Bytes1.1.0前言说明参考:https://github.com/tokio-rs/byteshttps://zhuanlan.zhihu.com/p/109977513目标之前阅读的部分,都......
  • 0035-Bytes-bytes源码阅读
    环境Time2022-05-28Rust1.61.0Bytes1.1.0前言说明参考:https://github.com/tokio-rs/bytes目标了解从静态生命周期的字节中创建bytes.rs,以及实现一部分方法。......
  • 0036-Bytes-bytes源码阅读
    环境Time2022-05-28Rust1.61.0Bytes1.1.0前言说明参考:https://github.com/tokio-rs/bytes目标实现bytes.rs中的一部分方法。Drop通过自定义的Vtable来......
  • 0037-Bytes-bytes源码阅读
    环境Time2022-05-28Rust1.61.0Bytes1.1.0前言说明参考:https://github.com/tokio-rs/bytes目标实现bytes.rs中的一部分方法。Deref通过实现Deref来实现......
  • 0038-Bytes-bytes源码阅读
    环境Time2022-05-28Rust1.61.0Bytes1.1.0前言说明参考:https://github.com/tokio-rs/bytes目标实现bytes.rs中的一部分方法。split_off在中间进行切割,分成......
  • 0034-Bytes-bytes源码阅读
    环境Time2022-05-27Rust1.61.0Bytes1.1.0前言说明参考:https://github.com/tokio-rs/bytes目标了解bytes.rs中Bytes的结构定义。lib.rs首先将bytes.rs......
  • 大家都能看得懂的源码 - 那些关于DOM的常见Hook封装(一)
    本文是深入浅出ahooks源码系列文章的第十四篇,该系列已整理成文档-地址。觉得还不错,给个 star 支持一下哈,Thanks。上一篇我们探讨了ahooks对DOM类Hooks使用规范,......