首页 > 编程语言 >Java同步器之AQS源码分析

Java同步器之AQS源码分析

时间:2022-12-26 16:44:07浏览次数:66  
标签:Node Java AQS int 获取 源码 线程 节点 资源

一、简介

AbstractQueuedSynchronizer(简称AQS),抽象的队列式的同步器,是Java并发包实现的基类。

AQS用来构建锁和同步器的框架,使用AQS能简单且高效地构造出大量的应用广泛的同步器,如常用的ReentrantLockSemaphoreCountDownLatchFutureTask等。

二、框架概述

AQS是基于FIFO的队列实现的,并且内部维护了一个状态变量state,通过原子更新这个状态变量state即可以实现加锁解锁操作。

CLH(Craig, Landin and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个节点(Node)来实现锁的分配。

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

AQS定义两种资源共享方式:

  • Exclusive(独占),只有一个线程能执行,如ReentrantLock
  • Share(共享),多个线程可同时执行,如:SemaphoreCountDownLatch

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点返回true,否则返回false

ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,stateCAS1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

三、源码解析

3.1 内部类

 
static final class Node {
    // 标识一个节点是共享模式
    static final Node SHARED = new Node();
    // 标识一个节点是互斥模式
    static final Node EXCLUSIVE = null;

    // 标识线程已取消
    static final int CANCELLED =  1;
    // 标识后继节点需要唤醒
    static final int SIGNAL    = -1;
    // 标识线程等待在一个条件上
    static final int CONDITION = -2;
    // 标识后面的共享锁需要无条件的传播(共享锁需要连续唤醒读的线程)
    static final int PROPAGATE = -3;
    
    // 当前节点保存的线程对应的等待状态
    volatile int waitStatus;

    // 前一个节点
    volatile Node prev;
    
    // 后一个节点
    volatile Node next;

    // 当前节点保存的线程
    volatile Thread thread;

    // 下一个等待在条件上的节点(Condition锁时使用)
    Node nextWaiter;

    // 是否是共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 获取前一个节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    // 节点的构造方法
    Node() {    // Used to establish initial head or SHARED marker
    }

    // 节点的构造方法
    Node(Thread thread, Node mode) {     // Used by addWaiter
        // 把共享模式还是互斥模式存储到nextWaiter这个字段里面了
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 节点的构造方法
    Node(Thread thread, int waitStatus) { // Used by Condition
        // 等待的状态,在Condition中使用
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

典型的双链表结构,节点中保存着当前线程、前一个节点、后一个节点以及线程的状态等信息。Node节点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。

waitStatus表示当前Node节点的等待状态,共有以下5种:

  • CANCELLED(1):表示当前节点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的节点将不会再变化。
  • SIGNAL(-1):表示后继节点在等待当前节点唤醒。后继节点入队时,会将前继节点的状态更新为SIGNAL
  • CONDITION(-2):表示节点等待在Condition上,当其他线程调用了Conditionsignal()方法后,CONDITION状态的节点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继节点不仅会唤醒其后继节点,同时也可能会唤醒后继的后继节点。
  • 0:新节点入队时的默认状态。

注意,负值表示节点处于有效等待状态,而正值表示节点已被取消。所以源码中很多地方用>0<0来判断节点的状态是否正常。

3.2 属性

 
// 队列的头节点
private transient volatile Node head;
// 队列的尾节点
private transient volatile Node tail;
// 控制加锁解锁的状态变量
private volatile int state;

定义了一个状态变量和一个队列,状态变量用来控制加锁解锁,队列用来放置等待的线程。

注意,这几个变量都要使用volatile关键字来修饰,因为是在多线程环境下操作,要保证它们的值修改之后对其它线程立即可见。

这几个变量的修改是直接使用的Unsafe这个类来操作的:

 
// 获取Unsafe类的实例,注意这种方式仅限于jdk自己使用,普通用户是无法这样调用的
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 状态变量state的偏移量
private static final long stateOffset;
// 头节点的偏移量
private static final long headOffset;
// 尾节点的偏移量
private static final long tailOffset;
// 等待状态的偏移量(Node的属性)
private static final long waitStatusOffset;
// 下一个节点的偏移量(Node的属性)
private static final long nextOffset;

static {
    try {
        // 获取state的偏移量
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        // 获取head的偏移量
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        // 获取tail的偏移量
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        // 获取waitStatus的偏移量
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        // 获取next的偏移量
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

// 调用Unsafe的方法原子更新state
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

3.3 acquire

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。

 
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

函数流程如下:

  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

3.3.1 tryAcquire

此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()

 
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过stateget/set/CAS)!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。

为什么不直接定义成抽象方法呢?

因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底还是尽量减少开发者不必要的工作量。

3.3.2 addWaiter

此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的节点。

 
private Node addWaiter(Node mode) {
    //以给定模式构造节点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);
    
    //尝试快速方式直接放到队尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    
    //上一步失败则通过enq入队。
    enq(node);
    return node;
}

3.3.2.1 enq

此方法用于将node加入队尾。

 
private Node enq(final Node node) {
    //CAS"自旋",直到成功加入队尾
    for (;;) {
        Node t = tail;
        if (t == null) { // 队列为空,创建一个空的标志节点作为head节点,并将tail也指向它。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//正常流程,放入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

如果你看过AtomicInteger.getAndIncrement()函数源码,那么相信你一眼便看出这段代码的精华。CAS自旋volatile变量,是一种很经典的用法。

3.3.3 acquireQueued

通过tryAcquire()addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。进入等待状态,直到其他线程彻底释放资源后唤醒,再拿到资源。

是不是跟医院排队拿号有点相似~acquireQueued()就是干这件事:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。这个函数非常关键。

 
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//标记是否成功拿到资源
    try {
        boolean interrupted = false;//标记等待过程中是否被中断过
        
        //又是一个“自旋”!
        for (;;) {
            final Node p = node.predecessor();//拿到前驱
            //如果前驱是head,即该节点已成老二,那么便有资格去尝试获取资源
            //(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
            if (p == head && tryAcquire(arg)) {
                //拿到资源后,将head指向该节点。所以head所指的标杆节点,就是当前获取到资源的那个节点或null。
                setHead(node);
                // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head节点。
                // 也就意味着之前拿完资源的节点出队了!
                p.next = null; 
                failed = false; // 成功获取资源
                return interrupted;//返回等待过程中是否被中断过
            }
            
            //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。
            // 如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
        }
    } finally {
        // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消节点在队列中的等待。
        if (failed) 
            cancelAcquire(node);
    }
}

到这里了,我们先不急着总结acquireQueued()的函数流程,先看看shouldParkAfterFailedAcquire()parkAndCheckInterrupt()具体干些什么。

3.3.3.1 shouldParkAfterFailedAcquire

此方法主要用于检查状态,看看是否真的可以进入waiting状态,万一队列前边的线程都放弃了只是瞎站着,那也说不定,对吧!

 
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驱的状态
    if (ws == Node.SIGNAL)
        //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
        return true;
    if (ws > 0) {
        /*
         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
         * 注意:那些放弃的节点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被GC回收!
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败(刚刚释放完)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

整个流程中,如果前驱节点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。

3.3.3.2 parkAndCheckInterrupt

如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。

 
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//调用park()使线程进入waiting状态
    return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}

park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是Thread.interrupted()会清除当前线程的中断标记位。

3.3.3.3 小结

总结下acquireQueued该函数的具体流程:

  1. 节点进入队尾后,检查状态,找到安全休息点;
  2. 调用park()进入waiting状态,等待unpark()interrupt()唤醒自己;
  3. 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前节点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。

3.3.4 小结

acquireQueued分析完之后,我们接下来再回到acquire方法

 
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

再来总结下它的流程吧:

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

由于此函数是重中之重,我再用流程图总结一下:

至此,acquire()的流程终于算是告一段落了。这也就是ReentrantLock.lock()的流程。

3.4 release

此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()

 
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到头节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

逻辑并不复杂,调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!

3.4.1 tryRelease

此方法尝试去释放指定量的资源。

 
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。

但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。

3.4.2 unparkSuccessor

此方法用于唤醒等待队列中下一个线程。

 
private void unparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的节点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的节点状态,允许失败。
    compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;//找到下一个需要唤醒的节点s
    if (s == null || s.waitStatus > 0) {//如果为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
            if (t.waitStatus <= 0)//从这里可以看出,<=0的节点,都是还有效的节点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。

这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到headnext节点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆节点,表示自己已经获取到资源了,acquire()也返回了!

3.4.3 小结

release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

如果获取锁的线程在release时异常了,没有unpark队列中的其他节点,这时队列中的其他节点会怎么办?答案是没法再被唤醒!队列中等待锁的线程将永远处于park状态,无法再被唤醒!

那么获取锁的线程在什么情形下会release抛出异常呢?线程突然死掉了?可以通过thread.stop来停止线程的执行,但该函数的执行条件要严苛的多,而且函数注明是非线程安全的,已经标明Deprecated;线程被interrupt了?线程在运行态是不响应中断的,所以也不会抛出异常;目前来看,没有看出能引发异常的情形。

3.5 acquireShared

此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。

 
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。

所以这里acquireShared()的流程就是:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;
  2. 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

3.5.1 doAcquireShared

此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

 
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//加入队列尾部
    boolean failed = true;//是否成功标志
    try {
        boolean interrupted = false;//等待过程中是否被中断过的标志
        for (;;) {
            final Node p = node.predecessor();//前驱
            //如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
            if (p == head) {
                int r = tryAcquireShared(arg);//尝试获取资源
                if (r >= 0) {//成功
                    setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
                    p.next = null; // help GC
                    if (interrupted)//如果等待过程中被打断过,此时将中断补上。
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }

            //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

有木有觉得跟acquireQueued()很相似?对,其实流程并没有太大区别。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外,其实都一样。

跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。

那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。

独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。

3.5.1.1 setHeadAndPropagate

 
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);//head指向自己
    //如果还有剩余量,继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继节点,毕竟是共享模式!

3.5.2 小结

  1. tryAcquireShared()尝试获取资源,成功则直接返回;
  2. 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

其实跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作(这才是共享嘛)。

3.6 releaseShared

此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。

 
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//尝试释放资源
        doReleaseShared();//唤醒后继节点
        return true;
    }
    return false;
}

此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待节点。

例如,资源总量是13A(5)B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒CC一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒CC一看有5个够自己用了,然后C就可以跟AB一起运行。

ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值。

3.6.1 doReleaseShared

此方法主要用于唤醒后继。

 
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);//唤醒后继
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head发生变化
            break;
    }
}

3.7 小结

值得注意的是,acquire()acquireShared()两种方法下,线程在等待队列中都是忽略中断的。AQS也支持响应中断的,acquireInterruptibly()/acquireSharedInterruptibly()即是,相应的源码跟acquire()acquireShared()差不多,这里就不再详解了。

四、案例 - 自定义同步器

4.1 Mutex(互斥锁)

Mutex是一个不可重入的互斥锁实现。锁资源(AQS里的state)只有两种状态:0表示未锁定,1表示锁定。下边是Mutex的核心源码:

 
class Mutex implements Lock, java.io.Serializable {
    
    // 自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判断是否锁定状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 尝试获取资源,立即返回。成功则返回true,否则false。
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 这里限定只能为1个量
            if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
                return true;
            }
            return false;
        }

        // 尝试释放资源,立即返回。成功则为true,否则false。
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 限定为1个量
            if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
                 throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);//释放资源,放弃占有状态
            return true;
        }
    }

    // 真正同步类的实现都依赖继承于AQS的自定义同步器!
    private final Sync sync = new Sync();

    //lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    //tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    
    //unlock<-->release。两者语文一样:释放资源。
    public void unlock() {
        sync.release(1);
    }
    
    //锁是否占有状态
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
    
}

同步类在实现时一般都将自定义同步器(sync)定义为内部类,供自己使用;而同步类自己(Mutex)则实现某个接口,对外服务。当然,接口的实现要直接依赖sync,它们在语义上也存在某种对应关系!而sync只用实现资源state的获取-释放方式tryAcquire-tryRelease,至于线程的排队、等待、唤醒等,上层的AQS都已经实现好了,我们不用关心。

除了MutexReentrantLock/CountDownLatch/Semaphore这些同步类的实现方式都差不多,不同的地方就在获取-释放资源的方式tryAcquire-tryRelease

4.2 TwinsLock(双资源锁)

 
/**
 * 双资源锁
 */
public class TwinsLock implements Lock {

    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -8540764104913403569L;

        private Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must larger than 0");
            }
            // 调用 AQS 设置资源总数
            // 初始化 state 的值,表示可同时访问的线程数量
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            // cas 获取锁
            // 由 AQS 的 acquireShared -> doAcquireShared 调用
            for (; ; ) {
                // 获取当前
                int current = getState();
                // 计算剩余可用数量
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        public boolean tryReleaseShared(int returnCount) {
            // cas 释放锁
            // 由AQS releaseShared -> doReleaseShared 调用
            for (; ; ) {
                int current = getState();
                int newState = current + returnCount;
                if (compareAndSetState(current, newState)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    // 忽略,如要实现,直接调用 AQS
    @Override
    public boolean tryLock() {
        return false;
    }

    // 忽略,如要实现,直接调用 AQS
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    // 忽略,如要实现,直接调用 AQS
    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    // 忽略,如要实现,直接调用 AQS
    @Override
    public Condition newCondition() {
        return null;
    }
}

共享式需要重写tryAcquireShared()tryReleaseShared()方法。TwinsLock类的作用是同时允许两个线程通过,其他线程需要等待。获取和释放的具体逻辑可以看上面代码注释,使用方式如下

 
TwinsLock lock = new TwinsLock();
lock.lock();
try {
    // do sth...
} finaly {
    lock.unlock();
}

转载:https://www.cnblogs.com/ciel717/p/16185051.html

 

  TRANSLATE with x English
Arabic Hebrew Polish
Bulgarian Hindi Portuguese
Catalan Hmong Daw Romanian
Chinese Simplified Hungarian Russian
Chinese Traditional Indonesian Slovak
Czech Italian Slovenian
Danish Japanese Spanish
Dutch Klingon Swedish
English Korean Thai
Estonian Latvian Turkish
Finnish Lithuanian Ukrainian
French Malay Urdu
German Maltese Vietnamese
Greek Norwegian Welsh
Haitian Creole Persian  
  TRANSLATE with COPY THE URL BELOW Back EMBED THE SNIPPET BELOW IN YOUR SITE Enable collaborative features and customize widget: Bing Webmaster Portal Back

标签:Node,Java,AQS,int,获取,源码,线程,节点,资源
From: https://www.cnblogs.com/cainiao-Shun666/p/17006158.html

相关文章

  • Java类MemoryUsage查看虚拟机的使用情况
     Arthas是阿里巴巴开源的一款监控java进程的工具,可以有效监控CPU、内存使用情况,更厉害的是可以帮助开发人员深入排查java代码的问题,比如java进程占用cpu过高是哪一个线程......
  • https Java SSL Exception protocol_version
      在java代码中,使用HttpClient爬取https页面时,遇到了这个bug:javax.net.ssl.SSLException:Receivedfatalalert:protocol_version     先奉上初始的代码:1/**2......
  • javascript使用正则表达式替换或者捕获子字符串
    letstring='mutiFile[{"name":"新建文件夹(2).zip","ext":".zip","size":1675876,"path":"/static/upload/2022December/ba145698fcc99fd414f0f4ec6ea418e5.zip"}]';......
  • java获取stream流
    java获取stream流可以通过以下四种方式获取1通过list集合获取,list.stream()List<String>list=newArrayList<>();list.add("北京");list.add("上海");list.add("......
  • 阿里巴巴2020届秋招最后一班车 企业智能事业部 企业大脑技术部 2020届秋招 Java 开发
    阿里巴巴企业智能事业部企业大脑技术部2020届秋招-JAVA工程师阿里巴巴企业智能事业部,2020年秋季校招最后一班车啦:JAVA开发工程师虚位以待,机会难得,占坑抓紧。入职就发师兄,一......
  • idea java开发给方法上加注释
    打开IDEA开发工具,file->setting->Editor->LiveTemplates点加号选择templategroup随便起一个名字点击加号,选择LiveTemplate依次填上红框中的......
  • JavaScript中的宏任务和微任务
    在JavaScript中,宏任务和微任务是指在执行代码的过程中的两种不同的任务类型。宏任务(macrotask)指的是浏览器在执行代码的过程中会调度的任务,比如事件循环中的每一次迭代......
  • java基础学习
    数据类型强类型语言要求变量的使用要严格符合规定,所有变量必须先定义再使用java基本数据类型基本类型(primitivetype)数值类型整型byte,short,int,long;......
  • Java双亲委派模型:为什么要双亲委派?如何打破它?破在哪里?---todo
    文章目录一、前言二、类加载器三、双亲委派机制1、什么是双亲委派2、为什么要双亲委派?四、破坏双亲委派1、直接自定义类加载器加载2、跳过AppClassLoader和ExtClas......
  • JavaScript学习--Item30 数组进阶全掌握
    在程序语言中数组的重要性不言而喻,JavaScript中数组也是最常使用的对象之一,数组是值的有序集合,由于弱类型的原因,JavaScript中数组十分灵活、强大,不像是Java等强类型高级语......