首页 > 其他分享 >AQS

AQS

时间:2023-06-19 17:11:39浏览次数:22  
标签:node Node head AQS tail 线程 节点

什么是AOS

AQS抽象队列同步器(AbstractQueuedSynchronizer)。是java juc里提供的一个实现锁同步机制的框架。它提供同步状态、阻塞和唤醒线程函数以及队列模型的原子管理。大多数同步类(Lock、Semaphore、ReentrantLock等)都是基于AQS实现的。AbstractQueuedSynchronizer是一个抽象类,实现了锁操作的大部分框架方法,还有一些具体实现留给子类实现。

实现原理

来看下AQS定义的几个主要属性

public abstract class AbstractQueuedSynchronizer{
    private volatile int state;
    private transient volatile Node head;
    private transient volatile Node tail;
    static final class Node {
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
  }
}

state用来标识当前是否锁空闲。0表示空闲,>0表示有线程占用锁。如果这个时候有新线程进入,则会进入等待队列。等待队列使用链表结构形式存储。AQS里记录链表的head指针和tail指针。Node节点是AQS一个内部类。除了链表结构需要的前驱(prev)和后继(next)属性外。还有一个thread用来记录当前线程,waitStatus用来记录当前线程的等待状态。
waitStatus的可选值有:

DEFAULT 0 默认值
SIGNAL -1 当前节点的后续节点被park阻塞了。当前节点释放锁后需要unpark唤醒其后继节点
CONDITION -2
CANCELLED 1 该节点不需要获取锁,已经取消,可以出队队列操作过程中会不断修改Node节点的这几个属性。

开始以ReentrantLock为例进行源码阅读。ReentrantLock里有两个实现类FairSync和NonfairSync继承自AQS对获取锁和释放锁方法进行了实现。类结构如下

class ReentrantLock implements Lock
{
    private final Sync sync;
    class Sync extends AbstractQueuedSynchronizer;
    class NonfairSync extends Sync;
    class FairSync extends Sync
}

公平锁与非公平锁

FairSync是公平锁,NonfairSync是非公平锁。ReentrantLock默认使用非公平锁,这里也从非公平锁开始看。
公平锁&非公平锁
公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。
非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。

ReentrantLock使用lock()加锁,unlock()方法解锁。先从加锁开始。

加锁

加锁会调用lock()方法。然后lock调用Sync的lock()方法
调用NonfairSync.lock()方法

static final class NonfairSync extends Sync {
     /**
     * 
     * 2、先获取锁,就是调用compareAndSetState方法成功即标识拿到锁。
     * 否则调用acquire方法。acquire是AbstractQueuedSynchronizer类中实现方法
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    /**
    * AbstractQueuedSynchronizer的acquire方法if首先会执行该方法
    */
    protected final boolean tryAcquire(int acquires) {
        //nonfairTryAcquire方法在Sync中实现
        return nonfairTryAcquire(acquires);
    }
}

compareAndSetState()方法是AQS基类里的final方法,使用CAS方式来修改state的值。如果修改成功则获取锁成功,然后设置当前占用锁的线程为当前线程。(后面可重入使用)。如果设置state失败,则进入else调用acquire()方法再次尝试获取锁。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire()方法是AQS基类的方法,首先调用tryAcquire()方法,tryAcquire方法AQS中没有实现,是Sync里具体实现,非公平Sync会调用nonfairTryAcquire()方法。

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {//state是0表示当前没有线程暂用,再次CAS尝试获取锁
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    /**
     *没有获取成功,但是当前占用锁的线程和当前线程是同一个线程.
     * 那么锁信号量state累加赋值,实现了锁的可重入。
     * 多次加锁要对应相同次数的解锁
    */
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //还是没有获取到锁
    return false;
}

如果还是没有获取到锁,会继续执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))。首先来看addWaiter方法。就是线程入队操作。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            //tail不为空,设置当前节点的前置节点为tail
            node.prev = pred;
            //设置整个AQS的tail节点为当前节点
            if (compareAndSetTail(pred, node)) {
                //设置前置节点的下一节点为当前节点
                pred.next = node;
                //到这里 tail 和node建立了双向链表。然后更新AQS的tail为当前node
                return node;
            }
        }
        //tail为空,进入初始化队列操作
        enq(node);
        return node;
    }
    /**
    enq方法和addWaiter方法主要逻辑差不多,只有tail为空,队列中没有元素的时候
    执行到enq进行必要的初始化
    */
     private Node enq(final Node node) {
        /**
         * 入队操作
         * 死循环
         * 第一次:tail节点是个null,进if 生成 head = tail = new Node() 空节点
         * 第二次:进else ,, Node() <-> node (tail) 第二次可能存在并发,如果compareAndSetTail失败会执行多次
         */
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //设置node的前置节点为第一次的空节点
                node.prev = t;
                //设置AQS的tail为当前节点
                if (compareAndSetTail(t, node)) {
                    //设置tail的下一节点为当前节点,完成双向连表设置
                    t.next = node;
                    return t;
                }
            }
        }
    }

这里第一次tail为空的时候进入到enq方法,enq方法第一次循环创建了一个空node节点然后,head和tail都指向该节点。第二次将队列tail和tail建立指向,入队。然后将整个队列的tail指向该节点。

如果有T3线程加入排队,当前节点的前线修改为tail,设置当前队列的tail指向当前node,设置原来tail的后续为当前节点,完成node3的入队

继续跟随方法调用栈,addWaiter返回node,然后执行acquireQueued(node, arg))方法。来看acquireQueued方法。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {//自旋
                final Node p = node.predecessor();
                //前序节点是head节点,进行获取锁操作
                if (p == head && tryAcquire(arg)) {
                    //获取锁成功,当前节点出队
                    setHead(node);
                    //释放原head节点对当前节点的引用
                    p.next = null; // help GC
                    //修改标识为获取锁成功
                    failed = false;
                    //返回当前线程未被阻塞
                    return interrupted;
                }
                /**
                 * 获取锁失败,执行shouldParkAfterFailedAcquire判断是否当前线程需要进行park阻塞。防止不必要的空悬
                 * 返回true,则执行parkAndCheckInterrupt方法park当前线程,返回当前线程被终端
                 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //如果获取锁成功,取消正在进行的acquire操作,将当前节点status置为CANCELLED
            if (failed)
                cancelAcquire(node);
        }
    }
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

acquireQueued方法这里一直死循环自旋操作。
拿到当前node的前序节点,判断前序节点是不是head节点
1、是head节点。再次调用tryAcquire尝试获取锁操作,如果获取锁成功,设置当前节点为head节点,解除引用关系。
当前节点的(thread、prev)置空,当前节点的前序节点的next置空

这样Node1节点相当于出队了,没有任何引用,可以进行垃圾回收
2、前序节点不是头节点或者获取锁失败。就要调用shouldParkAfterFailedAcquire方法判断当前线程是否需要park阻塞。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //前置节点waitStatus是SIGNAL 需要park当前线程
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {//前置节点是CANCELLED的
            /**
             * 循环向前查找,跳过所有CANCELLED的node,剔除队列(链表取消指向,当前节点指向找到第一个不是canceled的,然后将该节点的next指向当前节点)
             * 原理是  node1<-> node2(CANCELD)<-> node3(CANCELD)<->node(current)
             * 变成  node1<-> node(current)
             * 不挂起,进入下一次自旋
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //前置节点waitStatus是默认值或PROPAGATE,修改前序节点为SIGNAL。 进入下一次自旋,下一次再来判断前序就是SIGNAL状态,就需要park当前线程了
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

判断当前节点需不需要park的条件是其前序节点waitStatus是SIGNAL状态。
前序节点状态
case1、SIGNAL 当前节点需要park
case2、CANCELLED
如果其前序节点是CANCELLED取消状态,则继续递归的往前找前序的前序节点,直到找到不是取消状态的节点作为当前节点的前序进行连线。然后中间取消的节点相当于直接移除队列。这里过滤掉取消状态的线程节点后也不挂起,进入下一次自旋判断。
case3、其它状态
可能为初始状态,修改前序节点状态为SIGNAL,下次自旋时候判断就需要park当前线程了。

这里以上面队列中的T3为例,首先其prex节点是Node2对应T2不是head节点。就会进入是否需要挂起当前线程的判断方法。然后其prex节点Node2的waitStatus状态是初始值0,上面的第3种情况。会设置其prex的等待状态值为SIGNAL(-1)。第一次自旋结束。线程没有被挂起。继续回到acquireQueued方法开始第二次自旋。这个时候其prex节点Node2还不是head节点,但是其waitStatus状态已经由上次改为SIGNAL了。所以这一次自旋T3线程就需要被park阻塞挂起了。parkAndCheckInterrupt()方法会被执行。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

当前节点的自旋会修改其prex节点的waitStatus为SIGNAL。当前节点需不需要被park又依赖其prex节点的WaitStatus是不是SIGNAL。所以队列中的线程基本上经过两次自旋后都会被park挂起。然后等待被其prex线程唤醒。减少不必要的CPU资源消耗。
当T2节点拿到锁时自己会变成head节点

解锁

解锁要从unlock()方法开始。unlock会调用AOS的release方法,release首先尝试调用Sync实现的tryRelease方法。

//AQS
public final boolean release(int arg) {
    if (tryRelease(arg)) {//释放锁
        Node h = head;
        //判断head节点不为空且head的waitStatus不为初始状态0
        //为什么要判断head的waitStatus?
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒head线程的后继节点
        return true;
    }
    return false;
}
//唤醒线程,node参数是head
private void unparkSuccessor(Node node) {
    //判断head的waitStatus是否小于0,否则修改为0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     *找到当前head节点的后继节点,判断不是null且waitStatus不是CANCELED,否则一直找
     * 注意这里for循环是从tail往前找,一直找到最前一个符合条件的
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒当前节点对应线程
}

//Sync
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //判断拿锁线程时当前线程
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {//本次锁释放后,state回位0,所有锁都已释放
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

锁的释放要做两件基本事情。
1、释放锁资源(修改state状态)
释放AQS的state为拿锁时锁定的acquire量,如果state变为0,则表示当前锁空闲,可以唤醒下一个等待线程。
2、唤醒其next等待线程。
唤醒线程从队列的head开始出队,

唤醒head的后继为什么要判断head的waitStatus不等于0?
因为前面等待入队的时候当前线程自选需要park阻塞的前提时其prex节点waitStatus是SIGNAL(-1)。如果head节点waitStatus状态是0初始值,则其next对应线程还未进行park挂起操作。这个时候对其unpark会报错。
为什么找符合条件的node从tail开始往前找?
因为前面入队addWaiter时候双向链表建立prex,next关系时候不是原子操作。先设置的prev后设置的next,所以这里也从后往前找,先拿node.prev。

if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
    }
}

总结

获取锁:多线程会竞争获取AQS中state资源。获取到的线程拿到锁。获取不到的进入等待队列。等待队列是双向链表格式,初始head节点是一个空Node对象,每个Node存有当前线程引用,还保有一个waitStatus状态值,用来标识当前线程状态。放入等待队列的线程会不断自旋去获取锁,可以调用aquire方法获取锁的条件是当前线程的前驱节点是head节点,因此公平锁一般还是先进先出。但是一般两次获取不到自己就会被挂起。是否需要挂起判断依据是当前节点的前驱waitStatus是不是SIGNAL状态。如果获取锁成功,当前队列head节点会指向该节点,然后解除其与原prex节点也就是原head节点的链接(原head节点出队),原head节点没有任何引用,所有引用属性都会值空,可以被回收。
释放锁:释放锁做两件事情,第一归还acquire的state数量。从队列head开始找下一个需要被唤醒的等待线程。释放完锁后当前线程虽然解锁但是还未完成出队,线程的出队在其next拿到锁后逻辑处理。
一个线程可以多次获取锁,对state累加每次的acquire量。实现锁的重入。多次加锁要对应相同次数解锁。
等待线程队列的维护基本上在AQS基类都已实现。大部分方法是final或private,一般继承AQS重写tryAcquire和tryRelease方法定义state修改的acquire量即可。

锁的实现类型

ReentrantLock
公平锁实现:新来线程要先判断当前队列是否有等待队列,如果有当前线程只能入队等待,不能尝试获取锁(修改state)。
非公平锁:新来的线程不管当前队列是否有数据,上来就先搞一次获取锁操作,获取不到再入队等待。
释放锁都是同样的实现,减去锁住的state量。
CountDownLatch
共享锁:CountDownLatch latch = new CountDownLatch(5);指定state数量,然后每个线程执行latch.countDown()释放一次锁state-1。主线程latch.await()会自旋加入等待队列被唤醒。当countDown()多次state置为0时,执行唤醒主线程方法。
Semaphore
共享锁:初始指定固定数量的共享资源,每个线程来到后先获取共享资源,成功后继续执行,否则进入等待队列。共享资源数为0的时候新来的线程获取通行证就会失败,必须等待其它有锁线程释放公共资源后才能再获取同行证。达到并发数控制的目的。

QA:

1、非公平锁,如果新来的线程抢占了刚被队列中unpark的等待线程的锁。当前线程运行结束,怎么唤醒其他线程,当前线程未入队。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放的时候都是从head开始找的,这个不会有问题,下次还是能找到他。
2、什么时候等待线程变成CANCELLED状态?

标签:node,Node,head,AQS,tail,线程,节点
From: https://www.cnblogs.com/bird2048/p/17491596.html

相关文章

  • 19.详解AQS家族的成员:CountDownLatch
    关注王有志,一个分享硬核Java技术的互金摸鱼侠欢迎你加入Java人的提桶跑路群:共同富裕的Java人今天我们来聊一聊AQS家族中的另一个重要成员CountDownLatch。关于CountDownLatch的面试题并不多,除了问“是什么”和“如何实现的“外,CountDownLatch还会和CyclicBarrier进行对比:什......
  • AQS的实现原理
    AQS的实现原理原文:https://www.cnblogs.com/sglx/p/15190246.html一、简介AQS全称为AbstractQueuedSynchronizer,它提供了一个FIFO(FirstinFirstout先入先出)队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件,常见的有:ReentrantLock、CountDownLatch等......
  • 18.详解AQS家族的成员:Semaphore
    关注:王有志,一个分享硬核Java技术的互金摸鱼侠。欢迎你加入Java人的提桶跑路群:共同富裕的Java人今天我们来聊一聊AQS家族中另一个重要成员Semaphore,我只收集到了一道关于Semaphore的面试题,问了问“是什么”和“如何实现的”:什么是Semaphore?它是如何实现的?按照我们的惯例,依......
  • AQS源码详解
    AQS源码详解可重入锁:同一个线程可重复获取同一把锁对象locksupport:用来创建锁和其他同步类的基本线程阻塞原语park()和unpark()为什么会引出locksupport?像传统的synchorized和lock,他们的wait()和notify()方法,await()和singal()方法使用不方便,必须在同步代码块或者锁内使用,并......
  • 从ReentrantLock角度解剖AQS----未完成,持续更新中。。
    具体如何流转,何时进入队列,状态何时变化,何时被唤醒等流程待更新中。。。AQS重要属性publicabstractclassAbstractQueuedSynchronizerextendsAbstractOwnableSynchronizerimplementsjava.io.Serializable{ //内部类node staticfinalclassNode{ //等待状......
  • JUC:AQS
    AQS是JUC的基石,提供了数据结构和底层实现方法,比如获取锁的方式由子类实现完成出入队、唤醒线程由功能。这里只分析AQS已经实现了的功能逻辑,如果要分析完成的功能需要配合具体的子类比如ReentrantLock核心思想如果共享资源空闲,当前线程就工作,并锁住资源。如果共享资源被占......
  • AQS源码解读----AbstractQueuedSynchronizer
    36packagecn.com.pep;37importjava.util.concurrent.TimeUnit;38importjava.util.concurrent.locks.AbstractOwnableSynchronizer;39importjava.util.concurrent.locks.Condition;40importjava.util.concurrent.locks.LockSupport;41importjava.......
  • 什么是AQS
    AQS(AbstractQueuedSynchronizer),即队列同步器,它是构建锁或者其他同步组件的基础框架,如ReentrantLock、ReentrantReadWriteLock、Semaphore,CountDownLatch等。AQS是一个抽象类,主要是通过继承方式使用,本身没有实现任何接口,仅仅是定义了同步状态的获取和释放的方法。AQS解决了了之类......
  • AQS源码分析
    AQS源码分析--哔哩哔哩(通俗易懂)AQS实现原理看这一篇就够了 ......
  • AQS
    AQS队列同步器java.util.concurrent.locks.AbstractQueuedSynchronizer是一个同步器+阻塞锁的基本架构,用于控制加锁和释放锁,并在内部维护一个FIFO的线程等待队列,juc包下的锁,屏障等同步器多数是基于它实现的.AQS每当有新的线程请求资源时,该线程就会进入一个等待队列(Waiter......