首页 > 编程语言 >并发编程系列-AQS

并发编程系列-AQS

时间:2023-09-21 11:55:27浏览次数:39  
标签:Node return AQS 编程 public 并发 线程 final

AbstractQueuedSynchronizer(AQS)是一个抽象队列同步器,它用于构建依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器的框架。该类的目的在于提供基本功能的封装,适用于大多数需要使用单个原子int值表示同步状态的同步器。举例来说,ReentrantLock、Semaphore以及FutureTask等都是基于AQS实现的,我们也可以通过继承AQS来实现自定义的同步器。

核心思想

网络上常见的解释是:

如果所请求的共享资源是空闲的,那么当前请求资源的线程将被设置为有效的工作线程,并且共享资源将被锁定。然而,如果所请求的共享资源已被占用,则需要一套机制来阻塞等待线程并在适当时分配锁。AQS采用了CLH队列锁的实现方式来处理这种情况,即将无法立即获取到锁的线程添加到队列中进行排队等待。

我理解,可以把AQS视为一种锁,并将其用于管理共享资源的状态和线程请求。当有线程请求资源时,AQS会记录该线程作为当前工作线程,并将自身设置为锁定状态。如果其他线程请求AQS,则它们将被记录到等待队列中,无法获取到锁的线程进入阻塞等待状态。

为什么需要 AQS

在深入研究AQS之前,我们可能会问为什么需要AQS? 其中synchronized关键字和CAS原子类已经提供了丰富的同步方案。

然而,在实际需求中,我们对同步的需求各不相同。例如,如果我们需要对锁设置超时时间,单独依靠synchronized关键字或CAS是无法实现的,需要进行二次封装。而JDK中提供了丰富的同步方案,比如使用基于AQS实现的ReentrantLock。

用法

要将AQS用作同步器的基础,请根据具体使用情况重新定义以下方法,可以使用getState、setState和/或compareAndSetState来检查和/或修改同步状态:

  • tryAcquire

  • tryRelease

  • tryAcquireShared

  • tryReleaseShared

  • isHeldExclusively

这些方法的默认实现会抛出UnsupportedOperationException。这些方法的实现必须是线程安全的,并且通常应该是短暂而不是阻塞的。定义这些方法是使用此类的唯一受支持的方式。所有其他方法都被声明为最终方法,因为它们不应该被修改。

你可能还会发现从AbstractOwnableSynchronizer继承的方法对于跟踪拥有独占同步器的线程非常有用。我们鼓励您使用这些方法,这样监控和诊断工具可以帮助用户确定哪些线程持有锁。

尽管这个类基于内部FIFO队列,但它并不自动执行FIFO调度策略。独占同步的核心形式为:

   Acquire:
       while (!tryAcquire(arg)) {
          enqueue thread if it is not already queued;
          possibly block current thread;
       }
  
   Release:
       if (tryRelease(arg))
          unblock the first queued thread;

AQS是一种底层技术,用于实现锁机制。它提供了一种通过队列的方式,将争用锁的线程进行排队等待的机制。与此类似的是,共享模式也是通过类似的机制来实现。然而,共享模式可能涉及到级联信号的问题。

在获取操作之前,新获取的线程有可能会在其他被阻塞和排队的线程之前获得锁资源。如果需要的话,可以通过定义tryAcquire和/或tryAcquireShared方法来禁止插队,从而提供公平的FIFO获取顺序。具体来说,大多数公平同步器可以在tryAcquire返回false时,通过调用hasQueuedPredecessors方法返回true,以保证公平性。还有其他修改的可能性。

默认的插入策略通常是贪婪、放弃和避免护送策略,这种策略可以提供最高的吞吐量和可扩展性。尽管这不能保证公平性或无饥饿,但它允许较早排队的线程在较晚排队的线程之前重新竞争,并且每次重新竞争都有公平机会与传入线程成功竞争。此外,虽然获取操作不是传统意义上的自旋,但它们可能会在阻塞之前多次调用tryAcquire,并在其中插入其他计算。这样做可以提供自旋带来的许多好处,而不会增加太多的负担。如果需要的话,可以通过预先调用具有"快速路径"检查的方法来增加这一点,例如预先检查hasContended和/或hasQueuedThreads,以便仅在同步器可能不会争用的情况下执行自旋操作。

AQS提供了高效且可扩展的同步基础,适用于依赖于整数状态、获取和释放参数以及内部FIFO等待队列的同步器。如果这还不够,可以使用原子类、自定义java.util.Queue类和LockSupport阻塞支持从较低级别构建同步器。

用法示例

这是一个不可重入互斥锁类,它使用值 0 表示未锁定状态,使用值 1 表示锁定状态。虽然不可重入锁并不严格要求记录当前所有者线程,但无论如何,此类都会这样做以使使用情况更易于监控。它还支持条件并公开一些检测方法:

class Mutex implements Lock, java.io.Serializable {

   // Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     // Acquires the lock if state is zero
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // Releases the lock by setting state to zero
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (!isHeldExclusively())
         throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // Reports whether in locked state
     public boolean isLocked() {
       return getState() != 0;
     }

     public boolean isHeldExclusively() {
       // a data race, but safe due to out-of-thin-air guarantees
       return getExclusiveOwnerThread() == Thread.currentThread();
     }

     // Provides a Condition
     public Condition newCondition() {
       return new ConditionObject();
     }

     // Deserializes properly
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
     }
   }

   // The sync object does all the hard work. We just forward to it.
   private final Sync sync = new Sync();

   public void lock()              { sync.acquire(1); }
   public boolean tryLock()        { return sync.tryAcquire(1); }
   public void unlock()            { sync.release(1); }
   public Condition newCondition() { return sync.newCondition(); }
   public boolean isLocked()       { return sync.isLocked(); }
   public boolean isHeldByCurrentThread() {
     return sync.isHeldExclusively();
   }
   public boolean hasQueuedThreads() {
     return sync.hasQueuedThreads();
   }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

这是一个类似于 CountDownLatch 的锁存器类,只是它只需要一个信号即可触发。因为锁存器是非独占的,所以它使用共享的获取和释放方法。

 class BooleanLatch {

   private static class Sync extends AbstractQueuedSynchronizer {
     boolean isSignalled() { return getState() != 0; }

     protected int tryAcquireShared(int ignore) {
       return isSignalled() ? 1 : -1;
     }

     protected boolean tryReleaseShared(int ignore) {
       setState(1);
       return true;
     }
   }

   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
   }
 }

AQS 底层原理

父类 AbstractOwnableSynchronizer

AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer ,后者逻辑十分简单:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;
  // 设置当前持有锁的线程
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AbstractOwnableSynchronizer 只是定义了设置持有锁的线程的能力。

CLH 队列

AQS(AbstractQueuedSynchronizer)的等待队列是CLH(Craig, Landin, and Hagersten)锁定队列的一种变体。CLH锁通常用于自旋锁。AQS通过将每个请求共享资源的线程封装为一个CLH节点来实现。这个节点的定义如下:

   /** CLH Nodes */
    abstract static class Node {
        volatile Node prev;       // initially attached via casTail
        volatile Node next;       // visibly nonnull when signallable
        Thread waiter;            // visibly nonnull when enqueued
        volatile int status;      // written by owner, atomic bit ops by others

        // methods for atomic operations
        final boolean casPrev(Node c, Node v) {  // for cleanQueue
            return U.weakCompareAndSetReference(this, PREV, c, v); // 通过 CAS 确保同步设置 prev 的值
        }
        final boolean casNext(Node c, Node v) {  // for cleanQueue
            return U.weakCompareAndSetReference(this, NEXT, c, v);
        }
        final int getAndUnsetStatus(int v) {     // for signalling
            return U.getAndBitwiseAndInt(this, STATUS, ~v);
        }
        final void setPrevRelaxed(Node p) {      // for off-queue assignment
            U.putReference(this, PREV, p);
        }
        final void setStatusRelaxed(int s) {     // for off-queue assignment
            U.putInt(this, STATUS, s);
        }
        final void clearStatus() {               // for reducing unneeded signals
            U.putIntOpaque(this, STATUS, 0);
        }
        private static final long STATUS = U.objectFieldOffset(Node.class, "status");
        private static final long NEXT = U.objectFieldOffset(Node.class, "next");
        private static final long PREV = U.objectFieldOffset(Node.class, "prev");
    }

CLH节点的数据结构是一个双向链表的节点,其中每个操作都经过CAS(Compare and Swap)来确保线程安全。要将其加入CLH锁队列,您可以将其自动拼接为新的尾节点;要出队,则需要设置头节点,以便下一个满足条件的等待节点成为新的头节点:

  +------+  prev +-------+  prev +------+
 |      | <---- |       | <---- |      |
 | head | next  | first | next  | tail |
 |      | ----> |       | ----> |      |
 +------+       +-------+       +------+

Node 中的 status 字段表示当前节点代表的线程的状态。status 存在三种状态:

     static final int WAITING   = 1;          // must be 1
    static final int CANCELLED = 0x80000000; // must be negative 
    static final int COND      = 2;          // in a condition wait
  • WAITING:表示等待状态,值为 1。

  • CANCELLED:表示当前线程被取消,为 0x80000000。

  • COND:表示当前节点在等待条件,也就是在条件等待队列中,值为 2。

在上面的 COND 中,提到了一个条件等待队列的概念。首先,Node 是一个静态抽象类,它在 AQS 中存在三种实现类:

  • ExclusiveNode

  • SharedNode

  • ConditionNode

前两者都是空实现:

     static final class ExclusiveNode extends Node { }
    static final class SharedNode extends Node { }

而最后的 ConditionNode 多了些内容:

   static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
        ConditionNode nextWaiter; 
        // 检查线程是否中断或当前线程的状态已取消等待。
        public final boolean isReleasable() {
            return status <= 1 || Thread.currentThread().isInterrupted();
        }

        public final boolean block() {
            while (!isReleasable()) LockSupport.park();
            return true;
        }
    }

ConditionNode 拓展了两个方法:

  • 检查线程状态是否处于等待。

  • 阻塞当前线程:当前线程正在等待执行,通过 LockSupport.park() 阻塞当前线程。这里通过 while 循环持续重试,尝试阻塞线程。

而到这一步,所有的信息都指向了一个相关的类 Condition 。

总结

AQS是锁机制实现的底层技术,它通过队列的方式将争用锁的线程进行排队等待。与此不同的是,Condition代替了Object中的同步方法能力。

当Condition进入等待状态时,它会在等待队列的队尾插入新的节点,并且通过release方法释放锁资源。在释放锁资源后,它会通过acquire开启一个自旋尝试重新获取锁资源。当自旋一定时间后,如果未成功获取到锁资源,就会通过LockSupport的API进入阻塞状态。

对于依赖单个原子int值来表示同步状态的解读,Node通过status属性来控制与之关联的线程的等待状态,而AQS的state用于控制同步状态。

顶尖架构师栈

关注回复关键字

【C01】超10G后端学习面试资源

【IDEA】最新IDEA激活工具和码及教程

【JetBrains软件名】 最新软件激活工具和码及教程

工具&码&教程

转载于:https://mp.weixin.qq.com/s/rM7eJ48_FuYzquJgSyrpjA

标签:Node,return,AQS,编程,public,并发,线程,final
From: https://www.cnblogs.com/dc-s/p/17719588.html

相关文章

  • Win32编程之函数转发注入DLL(十五)
    一、创建目标DLL文件DLL名称:targetdll.dll头文件(targetdll.h):#pragmaonce__declspec(dllexport)void__stdcallhello();__declspec(dllexport)int__stdcalladd(inta,intb);源文件(targetdll.cpp)#include<stdio.h>#include"targetdll.h"void_......
  • HNU 结对编程 对队友代码的分析 中小学数学卷子自动生成程序
    基本功能实现一、主要内容认真学习和阅读同伴的代码,分析优劣。二、题目要求个人项目:中小学数学卷子自动生成程序用户:小学、初中和高中数学老师。功能:1、命令行输入用户名和密码,两者之间用空格隔开(程序预设小学、初中和高中各三个账号,具体见附表),如果用户名和密码都正确,将根......
  • 并发编程系列 - ReadWriteLock
    实际工作中,为了优化性能,我们经常会使用缓存,例如缓存元数据、缓存基础数据等,这就是一种典型的读多写少应用场景。缓存之所以能提升性能,一个重要的条件就是缓存的数据一定是读多写少的,例如元数据和基础数据基本上不会发生变化(写少),但是使用它们的地方却很多(读多)。针对读多写少这种并......
  • java并发
    符合死锁的四个条件:互斥条件:一个时刻一个线程一个资源请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。不剥夺条件:线程已获得的资源,在未用完之前,不能被其他线程剥夺。循环等待条件:若干线程形成头尾相接的循环等待资源关系。如何预防和避免线程死......
  • 【赠书活动 -第01期】-〖Java编程思想(第四版)〗
    【赠书活动-第01期】-〖Java编程思想(第四版)〗活动时间:2023年9月21日~2023年9月30日参与方式:https://m.hlcode.cn/?id=NK1fWUR......
  • SHELL编程开发:如何轻松一键部署Nginx脚本
    实现思路下载Nginx源码包解压源码包进入解压后的目录配置编译选项编译并安装Nginx启动Nginx服务代码实现以下是脚本内容及注释:#!/bin/bash#一键部署Nginx脚本#下载Nginx源码包wgethttp://nginx.org/download/nginx-1.20.1.tar.gz#解压源码包tar-zxvfnginx-1.20.1.tar.......
  • 论文查重-第一次个人编程
    1、github链接:https://github.com/lanzeye7/lanzeye72、PSP表格PSP2.1PersonalSoftwareProcessStages预估耗时(分钟)实际耗时(分钟)Planning计划 60 85·Estimate·估计这个任务需要多少时间 180 210Development开发......
  • 日常编程奇葩又常规问题总结
    相信大家在日常编程中遇到过不少奇葩问题,第一反应就是不可能,怎么可能会出现这个问题呢?于是抓耳挠腮,带着充满疑惑的问题,四处寻找答案。其实遇到问题并不可怕,可怕的是我们缺少解决问题的信心,正所谓只要思想不滑坡,办法总比困难多。解决问题才是打工人的价值所在,也是个人价值展现......
  • Java学习之路--GUI编程06
    packagecom.gui.lesson06;importjavax.swing.*;importjava.awt.*;//2023.3.25/3.26GUI编程--下拉框学习(Combobox)//这个程序最终运行结果不美观,正常情况下下拉框我们放在一个面板里面再添加到容器中。这里就只是演示下拉框是什么样子publicclassTestComboboxDemo01extendsJ......
  • Java学习之路--GUI编程--贪吃蛇小游戏
    贪吃蛇小游戏界面实现四步走1.定义数据2.面板里将数据画上去3.监听事件键盘事件packagecom.gui.snake;importjavax.swing.*;importjava.net.URL;//2023.3.28GUI编程--贪吃蛇小游戏的实现--数据中心(只有一堆图片的数据)--贪吃蛇各个部分的导入--图片文件夹有两种导入......