首页 > 编程语言 >AQS源码解析

AQS源码解析

时间:2024-01-13 17:45:14浏览次数:32  
标签:Node node AQS int final 源码 线程 解析 节点

AQS 结构特性

  • 内部包含 NodeConditionObject 静态内部类,Node 用来存储没竞争到锁的线程状态、CondidtionObject 是对条件变量的封装;
  • volatile int state 变量记录锁的状态,1 表示锁被持有、0 表示锁被释放,同时对应三个方法来更改/获取锁的状态:getState()setState(int newState)compareAndSetState(int expect, int update)
  • AQS 内部维护了一个双向链表实现的线程等待队列,称为 CLH 队列;
  • AQS 支持两种模式的资源访问(独占/共享):独占模式是指同一时间只允许一个线程访问资源,例如 ReentrantLock,共享模式是指可以多线程访问资源 CountDownLatchSemaphore

AQS 中 CLH 队列结构如下:

  • 比如说此时恰好有个线程A 持有资源,持有资源的线程一定位于 Head 节点;
  • 此时另一个线程B 想要获取锁资源,但是获取锁失败,将B线程封装为 Node节点存到队列尾;
  • 线程B 被挂起,并通知线程A(将线程A 的 waitStatus 状态设置为 SIGNAL),在 A释放资源时通知其它线程;
  • 线程A 释放资源,将自身 Node 设置为 null 方便 GC回收,然后通知线程 B;
  • 线程B 尝试获取锁资源(公平锁大概率成功获取,非公平锁不一定)。

此外,AQS 还预留了一些接口给子类,由子类实现锁的释放和获取:

//尝试获取排他锁
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
//尝试释放排他锁
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
//尝试获取共享锁
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
//尝试释放共享锁
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
//判定当前线程获得的资源是否是排他资源
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

1、Node 静态内部常量类

static final class Node {
        /** 节点处于共享模式标记 */
        static final Node SHARED = new Node();
        /** 节点处于独占模式标记 */
        static final Node EXCLUSIVE = null;

        /** 当前节点作废,需要移除队列 */
        static final int CANCELLED =  1;
        /** 待唤醒后继节点,当前节点释放资源后一定会唤醒后继节点,因此后继节点可以安心挂起 */
        static final int SIGNAL    = -1;
        /** 当前节点线程获取了资源,但是执行过程中又主动放弃了资源,被移入 Condition 队列中(注意不是等待队列) */
        static final int CONDITION = -2;
        /**
         * Head 节点才持有,标记着连续唤醒队列中处于共享模式的节点,让他们并发获取共享资源
         */
        static final int PROPAGATE = -3;
        /** 当前节点的等待状态,标记为上述的几个值 */
        volatile int waitStatus;
        /** 前驱节点 */
        volatile Node prev;
        /** 后继节点 */
        volatile Node next;
        /** 当前节点绑定的线程 */
        volatile Thread thread;
        /** 等待条件的下一个节点 */
        Node nextWaiter;
}

Node 结构如上图所示,Node 就是绑定某一个线程,并存储该线程相关信息的结构。

2、ConditionObject 条件变量类


3、核心排他锁加锁方法

这里以 ReentrantLock.lock() 方法切入,我们最终会发现调用的还是 AQS 类的 acquire() 方法:

    // Reentrant.lock()
    public void lock() {
        sync.lock();
    }

    // FairSync.lock()
    final void lock() {
        acquire(1);
    }

    // AQS.acquire()
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire() 方法的逻辑如下:

  • 调用由子类实现的 tryAcquire() 方法获取锁,如果成功就结束,否则失败了返回 false;
  • 如果失败了继续执行以下步骤:
    • 调用 addWaiter(Node) 方法封装当前线程对应的 Node 结构,然后通过 CAS 操作添加到队尾并修改队尾节点;
    • 调用 acquireQueued() 方法,先找到当前节点的前驱节点 PrevNode:
      • 如果 PrevNode 是队列头节点,就再执行一次 tryAcquire() 方法获取锁,因为可能前驱节点已经释放了锁资源,相当于一次重试。如果成功获取锁,就将当前节点设置为头节点并将前驱节点置空帮助 GC 回收;
      • 如果 PrevNode 不是头节点,就执行 shouldParkAfterFailedAcquire() 方法判断是否将当前节点对应的线程挂起,如果需要挂起就调用 LockSupport.park() 方法挂起线程。如何判断是否需要挂起?根据前驱节点的 waitStatus 标志,如果是 SIGNAL 就挂起,否则设置前驱节点为 SIGNAL 并等待下一次自选时挂起。
  • 调用 selfInterrupt() 方法,只有在尝试加锁失败且 acquireQueued() 方法标识为 true 时才执行。

3.1 tryAcquire() 方法

这里以 ReentrantLock.FairSync.tryAcquire() 来分析:

// ReentranLock.Sync 内部类继承自 AQS
abstract static class Sync extends AbstractQueuedSynchronizer {.....}

// FairSync 继承自 Sync
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    // 加锁操作
    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 获取锁状态:0——未加锁;1——已有线程加锁
        int c = getState();
        if (c == 0) {
            // hasQueuedPredecessors():判断等待队列中是否有线程在排队,已有返回 true,否则 false
            // 如果没有线程排队就尝试 CAS 加锁
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // 加锁成功,设置为当前独占线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 否则已经有线程获取了锁,判断是不是当前线程自身,如果是就是重入加锁,累加 state 变量
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

3.2 addWaiter() 方法

private Node addWaiter(Node mode) {
    // 构造当前线程对应的节点
    Node node = new Node(Thread.currentThread(), mode);
    // 判断队列尾是否为空,不为空 CAS 插入
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 否则队列未初始化,执行队列初始化然后插入节点
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter() 方法最终返回成功加入队列尾的节点。

3.3 acquireQueued() 方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 注意这里会一直自旋
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点为头节点,当前线程尝试重新获取锁
            if (p == head && tryAcquire(arg)) {
                // 成功后 CAS 设置头节点,并协助 GC
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 否则挂起当前线程,等待下一次自旋
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
        // 前驱节点状态为 SIGNAL,安心挂起
        return true;
    if (ws > 0) {
        /*
            * Predecessor was cancelled. Skip over predecessors and
            * indicate retry.
            */
        // 前驱节点状态为 CANCELED,循环删除直到找到找到节点状态正常的前驱节点为止
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
            * waitStatus must be 0 or PROPAGATE.  Indicate that we
            * need a signal, but don't park yet.  Caller will need to
            * retry to make sure it cannot acquire before parking.
            */
        // 否则设置前驱节点状态为 SIGNAL,那么下一次循环就会挂起当前线程
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private final boolean parkAndCheckInterrupt() {
    // 挂起当前线程
    LockSupport.park(this);

    // Thread.interrupt() 检测当前线程是否可以被中断(检查中断标志),并返回一个 boolean 并清除中断状态,第二次调用时中断状态已被清除,将返回一个 false
    // interrupt() 并不中断线程,结果可能为 true/false
    return Thread.interrupted();
}

3.4 加排他锁方法总结

总结下 AQS 添加排他锁的逻辑:

  • 首先判断 CLH 队列中是否有等待资源的线程,如果没有直接 CAS 加锁,具体来说就是将当前节点设置为头节点;否则判断持有锁的线程和当前线程是不是为同一个线程,若为同一个,即重入操作,那么修改锁信息(+1)即可;
  • 反之当前线程可能需要等待,这取决于它的前驱节点的状态:
    • 若前驱节点为头节点,那么再重复尝试一次,这样能保证资源最大限度地利用。如果成功获取锁,就将头节点置空方便垃圾回收,并且当前线程设置为头节点;
    • 否则当前线程需要插入到队尾等待,但是此时线程是可能分配到CPU时间片的,所以我们还需要释放线程占有的系统资源,但是释放资源有个前提,就是前驱节点释放锁时能够唤醒我。所以自选检查前驱节点状态,如果不是 SIGNAL 就需要更新为 SIGNALSIGNAL 保证了前驱节点释放锁时一定会唤醒后继节点。这里检查的同时还清理了队列中需要丢弃的节点(CANCELLED)。

4、核心排他锁解锁方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

释放锁会首先尝试释放锁,如果释放成功,就唤醒后继第一个状态正常(非CANCELLED)的线程。

4.1 tryRelease() 方法

参考 ReentrantLock.Sync.tryRelease()

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 如果当前释放锁的线程不是独占锁的线程,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        // 设置独占锁线程为空
        setExclusiveOwnerThread(null);
    }
    // 更新锁状态
    setState(c);
    return free;
}

4.2 unparkSuccessor() 方法

唤醒后继节点的方法:

private void unparkSuccessor(Node node) {
    /*
        * If status is negative (i.e., possibly needing signal) try
        * to clear in anticipation of signalling.  It is OK if this
        * fails or if status is changed by waiting thread.
        */
    // 将当前头节点的状态更新为 0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
        * Thread to unpark is held in successor, which is normally
        * just the next node.  But if cancelled or apparently null,
        * traverse backwards from tail to find the actual
        * non-cancelled successor.
        */
    // 获取后继节点,如果后继节点为空或者状态为 CANCELLED,反而从队列尾反向查找第一个状态正常的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
    // 找到后唤醒线程,线程会进入竞争状态
        LockSupport.unpark(s.thread);
}

线程在被成功唤醒后会进入资源竞争状态,此时就对应了前面加锁时的 acquireQueued() 方法,方法中包含一个死循环,循环会一直重复调用 tryAcquire() 方法,这里举个例子方便理解:

  • 假设在 t1 时刻,线程A 获取了锁资源,线程B 也尝试获取锁,但是被线程A 占用,所以线程B被搞到了等待队列中(此时线程B 的前驱节点就是头节点也即线程A),线程B 会在acquireQueued的for(;;)中 不断自旋!
  • 如果 t2 时刻线程A 释放了锁资源,那么 unparkSuccessor() 方法会唤醒线程B 节点;
  • 接着在 t3 时刻,线程B 自旋到 if(p==head && tryAcquire(arg)) 方法时,将线程B 设置为头节点,此时 B持有锁资源;
  • 问题是如果B 的前驱节点不是头节点,参考 shouldParkAfterFailedAcquire() 方法,程序会循环向前遍历,将所有状态为 CANCELLED 的前驱节点剔除掉,这样队列中的所有节点都有很大概率是有效状态的节点。

4.3 释放锁总结

线程释放锁资源用到了一个特别的策略,在寻找第一个有效后继节点时(状态非 CANCELLED),如果第一个节点无效(null 或者 CANCELLED),就转而从队列尾开始向前查找。

相应的,队列中等待获取锁的线程会执行自旋操作,自旋操作过程中会不断清理队列中状态为 CANCELLED 的线程节点。

5、核心共享锁加锁方法

Semaphore 为例讲解 AQS 共享锁加锁过程:

// Semaphore.acquire() 加锁
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// Sync 继承 AQS类,调用 AQS类方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 若没有多余的剩余资源
    if (tryAcquireShared(arg) < 0)
        // 进入等待队列排队
        doAcquireSharedInterruptibly(arg);
}

// tryAcquireShared() 方法由 Semaphore 实现
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

5.1 tryAcquireShared() 函数

加锁后返回剩余可用的锁资源数:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    // 共享锁加锁,返回加锁完毕后剩余的可用资源数
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}


final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 计算剩余资源是否可用
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

5.2 doAcquireSharedInterruptibly() 函数

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 当前线程节点添加到队列尾
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 如果前驱节点是 Head,重新尝试加锁
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 当前节点成功获取锁,向下传播,判断后续节点能否获取锁资源
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 否则调整前驱节点,挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

5.3 setHeadAndPropagate() 函数

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
        * Try to signal next queued node if:
        *   Propagation was indicated by caller,
        *     or was recorded (as h.waitStatus either before
        *     or after setHead) by a previous operation
        *     (note: this uses sign-check of waitStatus because
        *      PROPAGATE status may transition to SIGNAL.)
        * and
        *   The next node is waiting in shared mode,
        *     or we don't know, because it appears null
        *
        * The conservatism in both of these checks may cause
        * unnecessary wake-ups, but only when there are multiple
        * racing acquires/releases, so most need signals now or soon
        * anyway.
        */
    // 如果还有剩余锁资源或者头节点为空或头节点状态无效,尝试唤醒共享模式的后继节点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}


private void doReleaseShared() {
    /*
        * Ensure that a release propagates, even if there are other
        * in-progress acquires/releases.  This proceeds in the usual
        * way of trying to unparkSuccessor of head if it needs
        * signal. But if it does not, status is set to PROPAGATE to
        * ensure that upon release, propagation continues.
        * Additionally, we must loop in case a new node is added
        * while we are doing this. Also, unlike other uses of
        * unparkSuccessor, we need to know if CAS to reset status
        * fails, if so rechecking.
        */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 头节点状态为 SIGNAL,说明头节点释放后一定会唤醒后继节点
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 只要头节点CAS成功改变状态为0,就一定会唤醒后继节点
                unparkSuccessor(h);
            }
            // 若头节点是初始状态,就更改状态为 PROPAGATE,以便后续传播
            else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

什么情况下节点的 waitStatus=0

  • 节点进入队列后,如果要挂起一定会先将前驱节点状态调整为 SIGNAL,若此时前驱节点还未调整完,后继节点的状态为 0;
  • 在上面 doReleaseShared() 中,如果后继节点被唤醒,前驱节点需要先通过 CAS 将状态转换为 0;

唤醒逻辑梳理下如下:

  • 判断头节点的状态,如果是 SIGNAL 说明需要唤醒后继节点;
  • 如果头节点状态为 0,则告知头节点,在释放后不仅需要唤醒后继节点,还需要不断传播下去唤醒更多共享模式的节点。

标签:Node,node,AQS,int,final,源码,线程,解析,节点
From: https://www.cnblogs.com/istitches/p/17962659

相关文章

  • 服务器安全性漏洞和常见攻击方式解析
    服务器安全性是当今互联网信息安全的重要组成部分。在网络安全领域中,常见的威胁之-就是服务器安全性漏洞。本文将深入探讨服务器安全性漏洞的本质,并分析常见的攻击方式并提供一些建议以加强服务器的安全性。一、服务器安全性漏洞的本质服务器安全性漏洞指的是服务器系统中存在的缺......
  • 从零开始的源码搭建:详解连锁餐饮行业中的点餐小程序开发
    时下,点餐小程序成为了许多餐饮企业引入的一种创新工具,不仅方便了顾客的用餐体验,同时也提高了餐厅的运营效率。本文将详细探讨如何从零开始搭建一个源码,并深入解析连锁餐饮行业中的点餐小程序开发过程。 一、需求分析与规划在开始源码搭建之前,首先需要明确点餐小程序的具体需求。这......
  • 互联网医院系统|北京线上问诊|线上问诊系统功能解析
    随着科技的不断发展,线上问诊系统作为一种快速、便捷的医疗服务方式在近年来越来越受欢迎。本文将重点介绍线上问诊系统的开发功能及其优势,帮助读者更好地了解这一医疗服务方式的价值和好处。 一、线上问诊系统的开发功能:1、患者注册与登录:患者可以通过线上问诊系统进行注册和登录,......
  • 源码开发实战:连锁餐饮数字化转型中的点餐小程序
    如今,商家通过引入点餐小程序,不仅可以提高服务速度,还能够增加用户粘性,实现数字化运营的目标。为了实现这一愿景,源码开发成为一种高效的手段。 一、技术选型在开发点餐小程序时,选择合适的技术是关键一环,结合小程序开发框架,实现了前后端分离,提高了开发效率。此外,数据库采用了高性能的......
  • Android 14 新特性代码 UUID.fromString & Matcher.matches 的细节改动(扒源码)
    文章目录前言UUID处理的更改正则表达式的更改结束前言Android14已经出来好久好久了…今天其他的暂且不论,单纯的讲一下OpenJDK17更新的两点变更(扒源代码)~对正则表达式的更改UUID处理首先,正则表达式的更改:现在,为了更严格地遵循OpenJDK的语义,不允许无效的组引用。您可能会......
  • github 修改hosts DNS解析
    第一步打开在线dns解析github.comhttp://www.ip33.com/dns.html修改hostswindow下首先找到域名文件,一般在C:\Windows\System32\drivers\etc下,然后打开hosts文件,在里面增加一条域名解析记录,修改成功后刷新DNS解析,一般使用ipconfig/flushdns,在命令台输入便可。刷新成功后可......
  • Python逆向爬虫入门教程: 酷狗音乐加密参数signature逆向解析
    数据来源分析......
  • Chrome 浏览器插件 V3 版本 Manifest.json 文件中 Action 的类型(Types)、方法(Methods)和
    一、类型(Types)一、OpenPopupOptions1.属性windowId:number可选打开操作弹出式窗口的窗口ID。如果未指定,则默认为当前活动窗口。二、TabDetails1.属性tabId:number可选要查询其状态的标签页ID。如果未指定标签页,则返回非标签页专属状态。三、UserSettin......
  • 基于SpringBoot+Vue的居家养老系统设计实现(源码+lw+部署文档+讲解等)
    (文章目录)前言:heartpulse:博主介绍:✌全网粉丝10W+,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌:heartpulse:......
  • 解析Navicate密码
    <?phpclassNavicatPassword{protected$version=0;protected$aesKey='libcckeylibcckey';protected$aesIv='libccivlibcciv';protected$blowString='3DC5CA39';protected$blowKey=null;pr......