首页 > 编程语言 >Java并发编程——并发包中锁的AQS通用实现

Java并发编程——并发包中锁的AQS通用实现

时间:2023-02-01 16:03:21浏览次数:72  
标签:Node head 中锁 AQS 队列 线程 发包 节点

一、包结构介绍

我们查看下java.util.concurrent.locks包下面,发现主要包含如下类:

可以发现ReentrantLock和ReentrantReadWriteLock都是AbstractQueueSynchronizer类。我们先来介绍下AbstractQueuedSynchronizer类。

二、AbstractQueuedSynchronizer

AbstractQueuedSynchronizer,简写为AQS,抽象队列同步器。它是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效的构造出来,以下都是通过ASQ构造出来的:ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,SynchronousQueue。

三、AQS原理

AQS是通过队列来辅助实现线程同步的,线程并发争夺state资源,争夺失败的则进入等待队列(同步队列)并进入阻塞状态,在state资源被释放之后,从队列头唤醒被阻塞的线程节点,进行state资源的竞争。

 

这样势必会涉及很频繁的队列入队出队操作,以及线程的阻塞唤醒操作。这些操作恰恰是最难编写,最容易出错的,为此AQS把这些操作做了封装,以模板的方式提供出来,我们可以通过实现模板的相关方法,实现不一样的锁或者同步器。

 

AQS使用了模板方法,把同步队列都封装起来了,同时提供了以下五个未实现的方法,用于子类的重写:

  • boolean tryAcquire(int arg):尝试以独占模式进行获取。 此方法应查询对象的状态是否允许以独占模式获取对象,如果允许则获取它。如果获取失败,则将当前线程加入到等待队列,直到其他线程唤醒。

  • boolean tryRelease(int arg):尝试以独占模式释放锁。

  • int tryAcquireShared(int arg):尝试以共享模式获取锁,此方法应查询对象的状态是否允许以共享模式获取对象,如果允许则获取它。如果获取失败,则将当期线程加入到等待队列,直到其他线程唤醒。

  • boolean tryReleaseShared(int arg):尝试以共享模式释放锁。

  • boolean isHeldExclusively():是否独占模式

四、AQS数据结构

4.1 AQS同步器数据结构

AQS同步器数据结构

如上图,AQS中:

  • state:所有线程通过通过CAS尝试给state设值,当state>0时表示被线程占用;同一个线程多次获取state,会叠加state的值,从而实现了可重入;

  • exclusiveOwnerThread:在独占模式下该属性会用到,当线程尝试以独占模式成功给state设值,该线程会把自己设置到exclusiveOwnerThread变量中,表明当前的state被当前线程独占了;

  • 等待队列(同步队列):等待队列中存放了所有争夺state失败的线程,是一个双向链表结构。state被某一个线程占用之后,其他线程会进入等待队列;一旦state被释放(state=0),则释放state的线程会唤醒等待队列中的线程继续尝试cas设值state;

  • head:指向等待队列的头节点,延迟初始化,除了初始化之外,只能通过setHead方法进行修改;

  • tail:指向等待队列的队尾,延迟初始化,只能通过enq方法修改tail,该方法主要是往队列后面添加等待节点。

4.2 AQS队列节点数据结构

AQS队列节点数据结构

  • prev:指向队列中的上一个节点;
  • waitStatus:节点的等待状态,初始化为0,表示正常同步等待:
    • CANCELLED = 1:节点因超时或者被中断而取消时设置为取消状态;
    • SIGNAL = -1:指示当前节点被释放后,需要调用unpark通知后面节点,如果后面节点发生竞争导致获取锁失败,也会将当前节点设置为SIGNAL;
    • CONDITION = -2:指示该线程正在进行条件等待,条件队列中会用到;
    • PROPAGATE = -3:共享模式下释放节点时设置的状态,表示无限传播下去。
  • thread:当前节点操作的线程;
  • nextWaiter:该字段在Condition条件等待中会用到,指向条件队列的下一个节点。或者链接到SHARED常量,表示节点正在以共享模式等待;
  • next:指向队列中的下一个节点。

Node结点

Node结点是AbstractQueuedSynchronizer中的一个静态内部类,我们捡Node的几个重要属性来说一下

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 同步等待队列(双向链表)中的节点
     */ 
    static final class Node {
    
        /** 用于指示节点正在共享模式下等待的标记 */
        static final Node SHARED = new Node();
        
        /** 用于指示节点正在独占模式下等待的标记 */
        static final Node EXCLUSIVE = null;

        /** 线程被取消了 */
        static final int CANCELLED =  1;
        
        /** 
         * 如果前驱节点的等待状态是SIGNAL,表示当前节点将来可以被唤醒,那么当前节点就可以安全的挂起了 
         * 否则,当前节点不能挂起 
         */
        static final int SIGNAL    = -1;
        
        /** 线程正在等待条件 */
        static final int CONDITION = -2;
        
        /**
         * 指示下一个acquireShared应无条件传播
         */
        static final int PROPAGATE = -3;

        //值就是前四个int值和0(CANCELLED/SIGNAL/CONDITION/PROPAGATE/0)
        volatile int waitStatus;

        /**
         * 前驱节点
         */
        volatile Node prev;

        /**
         * 后继节点
         */
        volatile Node next;

        /**
         * 节点中的线程
         */
        volatile Thread thread;

        /**
         * 条件队列中的下一个节点,条件队列为单链表
         */
        Node nextWaiter;
    }
}

LockSupport的park和unpark

AQS中线程的阻塞和唤醒基本上都使用这两个方法实现的。其底层都是依赖Unsafe实现的。

 

LockSupport是用来创建锁和其他同步类的基本线程阻塞的原语。

 

此类与使用它的每个线程关联一个许可(permit: 0表示无许可,1 表示有许可),如果有许可,将立刻返回对park()的调用,并且在此过程化消耗掉它。否则,park()会导致线程进入阻塞;调用 unpark() 可使许可证可用,如果尚不可用。不过与信号量不同的是,许可证不会累加,最多只有一个。

 

该类中常见的两个方法两个方法:

  • park(Object blocker):实现线程的阻塞。除非有许可,否则出于线程调度目的将阻塞线程;如果有许可,则将许可消耗,然后线程往下继续执行;

  • unpark(Thread thread):实现解除线程的阻塞。如果线程在park方法上被阻塞,则调用该方法将取消阻塞。否则,许可变为1,保证下一次调用park方法不会阻塞。

这两个方法底层是调用了Unsafe中的park和unpark的native方法。

CAS

我们知道,计算机中提供了cas相关指令,这是一种乐观的并发策略,需要硬件指令集的发展才能支持,实现了:操作+冲突检测的原子性。

 

IA64 和 X86 使用cmpxchg指令完成CAS功能。

cas 内存位置 旧预期值 新值

CAS存在ABA问题,可以使用版本号进行控制,保证其正确性。

JDK中的CAS,相关类:Unsafe里面的compareAndSwapInt()以及compareAndSwapLong()等几个方法包装提供。只有启动类加载器加载的class才能访问他,或者通过反射获取。

五、AQS中的一般处理流程

为了弄清楚AQS中是如何进行队列同步的,我们先从一个简单的独占加锁方法说起。

5.1 public final void acquire(int arg)

这个方法是使用独占模式获取锁,忽略中断。通过至少调用一次tryAcquire成功返回来实现。 否则,线程将排队,并可能反复阻塞和解除阻塞,并调用tryAcquire直到成功。

 

我们先看一下这个方法的入口代码:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	
	public final void acquire(int arg) {
		// 尝试获取锁,这里是一个在AQS中未实现的方法,具体由子类实现
		if (!tryAcquire(arg) &&  
		
			// 获取不到锁,则 1.添加到等待队列 2.不断循环等待重试
			acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  
			selfInterrupt();
	}
}

5.2 tryAcquire(int arg)

一开始,会尝试调用AQS中未实现的方法tryAcquire()尝试获取锁,获取成功则表示获取锁了,该方法的实现一般通过CAS进行设置state尝试获取锁:

不同的锁可以有不同的tryAcquire()实现,所以,你可以看到ReentrantLock锁里面会有非公平锁和公平锁的实现方式。

ReentrantLock公平锁的实现代码在获取锁之前多了一个判断:!hasQueuedPredecessors(),这个是判断如果当前线程节点之前没有其他节点了,那么我们才可以尝试获取锁,这就是公平锁的体现。

5.3 private Node addWaiter(Node mode)

获取锁失败之后,则会进入这一步,这里会尝试把线程节点追加到等待队列后面,是通过CAS进行追加的,追加失败的情况下,会循环重试,直至追加成功为止。如果追加的时候,发现head节点还不存在,则先初始化一个head节点,然后追加上去:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	
    private Node addWaiter(Node mode) {
		// 将当期线程构造成Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
			// 将原来尾节点设置为新节点的上一个节点
            node.prev = pred;
			// 尝试用新节点取代原来的尾节点
            if (compareAndSetTail(pred, node)) {
				// 取代成功,则将原来尾指针的下一个节点指向新节点
                pred.next = node;
                return node;
            }
        }
		// 如果当前尾指针为空,则调用enq方法
        enq(node);
        return node;
    }
}

5.4 final boolean acquireQueued(final Node node, int arg)

加入等待队列之后,会执行该方法,不断循环地判断当前线程节点是否在head后面一位,如果是则调用tryAcquire()获取锁,如果获取成功,则把线程节点作为Node head,并把原Node head的next设置为空,断开原来的Node head。注意这个Node head只是占位作用,每次处理的都是Node head的下一个节点:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    /**
     * 已经入队的线程尝试获取锁
     */ 
    final boolean acquireQueued(final Node node, int arg) {
        //标记是否成功获取锁
        boolean failed = true;
        try {
            //标记线程是否被中断过
            boolean interrupted = false;
            for (;;) {
                //获取前驱节点
                final Node p = node.predecessor();
                //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    // 获取成功,将当前节点设置为head节点
                    setHead(node);
                    // 原head节点出队,在某个时间点被GC回收
                    p.next = null; // help GC
                    //获取成功
                    failed = false;
                    //返回是否被中断过
                    return interrupted;
                }
				// 判断是否需要阻塞线程,该方法中会把取消状态的节点移除掉,并且把当前节点的前一个节点设置为SIGNAL
                // 判断获取失败后是否可以挂起,若可以则挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 线程若被中断,设置interrupted为true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}

如果当前节点的pre不是head,或者争抢失败,则会将前面节点的状态设置为SIGNAL。

 

如果前面的节点状态大于0,表示节点被取消,这个时候会把该节点从队列中移除掉。

 

下图为尝试CAS争抢锁,但失败了,然后把head节点状态设置为SIGNAL:

然后再会循环一次尝试获取锁,如果获取失败了,就调用LockSupport.park(this)挂起线程。

 

那么时候才会触发唤起线程呢?这个时候我们得先看看释放锁是怎么做的了。

5.5 public final boolean release(int arg)

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    public final boolean release(int arg) {
        //如果成功释放锁
        if (tryRelease(arg)) {
            //获取头节点:(注意:这里的头节点就是当前正在释放锁的节点)
            Node h = head;
            //头结点存在且等待状态不是取消
            if (h != null && h.waitStatus != 0)
                //唤醒距离头节点最近的一个非取消的节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
}

tryRelease()具体由子类实现。一般处理流程是让state减1。

 

如果释放锁成功,并且头节点waitStatus!=0,那么会调用unparkSuccessor()通知唤醒后续的线程节点进行处理。

 

注意:在遍历队列查找唤醒下一个节点的过程中,如果发现下一个节点状态是CANCELLED那么就会忽略这个节点,然后从队列尾部向前遍历,找到与头结点最近的没有被取消的节点进行唤醒操作。

唤醒之后,节点对应的线程2又从acquireQueued()方法的阻塞处醒来继续参与争抢锁。并且争抢成功了,那么会把head节点的下一个节点设置为null,让自己所处的节点变为head节点:

这样一个AQS独占式、非中断的抢占锁的流程就结束了。

5.6 完整流程

最后我们再以另一个维度的流程来演示下这个过程。

 

首先有4个线程争抢锁,线程1,成功了,其他三个失败了,分别依次入等待队列:

线程2、线程3依次入队列:

现在突然发生了点事情,假设线程3用的是带有超时时间的tryLock,超过了等待时间,线程3状态变为取消状态了,这个时候,线程4追加到等待队列中后,发现前一个节点的状态是1取消状态,那么会执行操作把线程3节点从队列中移除掉:

最后,线程1释放了锁,然后把head节点ws设置为0,并且找到了离head最靠近的一个waitStatus<=0的线程并唤醒,然后参与竞争获取锁:

最终,线程2获取到了锁,然后把自己变为了Head节点,并取代了原来的Head节点:

接着就一直这样循环。

六、组合大于继承原则

我们可以看到,许多同步器(比如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock)是通过委托AQS的子类Sync、FairSync或NonfairSync来调用AQS的方法,而不是直接扩展AQS,这样做可以避免ASQ中的方法污染了锁的API,破坏锁接口的简洁性。

 

参考: https://www.itzhai.com/articles/aqs-and-lock-implementation-in-concurrent-packages.html

标签:Node,head,中锁,AQS,队列,线程,发包,节点
From: https://blog.51cto.com/u_14014612/6031637

相关文章

  • Java并发JUC——AQS
    为什么需要AQS锁和协作类有共同点:闸门像ReentrantLock和Semaphore有一些共同点,并且很相似事实上,不仅仅是ReentrantLock和Semaphore,包括CountDownLatch、ReentrantReadW......
  • 带你熟悉3种AQS的线程并发工具的用法
    摘要:AQS的全称为(AbstractQueuedSynchronizer),AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器。本文分享自华为云社区《【高并......
  • AQS从应用到原理
    从高层来看AQS,即AbstractQueuedSynchronizer类,无论是听起来还是看起来,它都很令人畏惧,但抛离它的实现原理,站在AQS的用户——比如Mutex、CountDownLatch这些类——的视角来......
  • 【高并发】AQS中的CountDownLatch、Semaphore与CyclicBarrier用法总结
    CountDownLatch概述同步辅助类,通过它可以阻塞当前线程。也就是说,能够实现一个线程或者多个线程一直等待,直到其他线程执行的操作完成。使用一个给定的计数器进行初始化,该......
  • 常用go开发包
    前言随着时间的推移,语言爱好者已经构建和共享了许多Go框架和库。这些包执行不同的功能,从开发微服务到制作discord机器人,一直到构建Web应用程序!在本文中,我将尝试让您......
  • 认真学习MySQL中锁机制(二)
    接上文​​认真学习MySQL中锁机制(一)​​我们继续学习MySQL中的锁机制。【5】按加锁的方式划分:显示锁、隐式锁①隐式锁一个事务在执行insert操作时,如果即将插入的间隙已经被......
  • AQS抽象队列同步器
    AbstractQueuedSynchronizer抽象的队列同步器AQS是volatile+CAS机制实现的锁模板,保证了代码的同步性和可见性。AQS定义了一套多线程访问共享资源的同步器框架,封装了线程......
  • Java同步器之AQS源码分析
    一、简介AbstractQueuedSynchronizer(简称AQS),抽象的队列式的同步器,是Java并发包实现的基类。AQS用来构建锁和同步器的框架,使用AQS能简单且高效地构造出大量的应用广泛......
  • AQS互斥锁入队和出队的过程
    互斥锁:同步队列的初始化构建互斥锁:同步队列的出队过程......
  • AQS获取和释放独占锁流程
    非阻塞的获取和释放独占锁的流程,acquire方法可中断式获取和释放独占锁。acquireInterruptibly方法超时获取和释放独占锁。tryAcquireNanos方法......