首页 > 编程语言 >并发编程笔记三-ConditionObject源码深度解析

并发编程笔记三-ConditionObject源码深度解析

时间:2024-12-15 12:01:44浏览次数:5  
标签:Node ConditionObject node AQS 编程 源码 线程 唤醒 节点

 一. ConditionObject概述

        synchronized提供了wait和notify的方法实现线程在持有锁时,可以实现挂起,唤醒的操作。其实ReentrantLock也拥有这个功能,ReentrantLock提供了await和signal方法去实现类似wait和notify的功能。同样的,想执行await或者是signal就必须先持有lock锁的资源。此外,await和signal方法离不开AQS中的一个内部类——ConditionObject,就是它内部提供了如await,signal等方法帮我们实现了线程在持有锁时,可以实现挂起,唤醒的操作。此外,后续的源码讲解需要大家对ReentrantLock源码已经有了研读基础,如果没有的,请务必先看完并发编程笔记一和二。

二. ConditionObject的应用

        还记得java并发线程的一个经典应用案例吗,就是开启两个线程,一个线程打印偶数,一个线程打印奇数,最后实现顺序打印1-100的功能。以往我们用synchronized中的wait以及notify方法可以轻松实现,同理,我们使用ConditionObject,也同样的可以实现,如下代码。

package com.js.practice.thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author jiangsong
 * @classPath com.js.practice.thread
 * @datetime 2024/12/9 23:24
 * @description
 **/
public class PrintNums03 {

    public static int i = 1;

    public static boolean isEVEN = false;

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition evencondition = lock.newCondition();
        Condition oddCondition = lock.newCondition();
        new Thread(()->{
            while(i < 100) {
                lock.lock();
                while (isEVEN) {
                    try {
                        oddCondition.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                System.out.println("奇数:" + i++);
                isEVEN = true;
                evencondition.signal();
                lock.unlock();
            }
        }).start();

        new Thread(()->{
            while(i < 100) {
                lock.lock();
                while(!isEVEN) {
                    try {
                        evencondition.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                System.out.println("偶数:" + i++);
                isEVEN = false;
                oddCondition.signal();
                lock.unlock();
            }
        }).start();
    }

}

        建议看不明白的同学们可以直接先复制我这个代码,自己去执行看看,验证一下如ReentrantLock也能轻松实现synchronized中wait和notify的效果,然后再自己去写出来,只有实践才能让自己更加深刻的感受和体会。上述代码我们可以看到,想要执行如await或者signal方法,同样需要线程先持有锁,只不过此处持有的时Lock锁。

三. ConditionObject的构建方式和核心属性

        由第二章节的应用,我们在通过lock锁对象执行newCondition方法时,本质就是直接new的AQS提供的ConditionObject对象,如下代码。

final ConditionObject newCondition() {
    return new ConditionObject();
}

         由上,我们还可以得出一个结论,Lock锁可以同时拥有多个ConditionObject对象,并且各个ConditionObject的操作是独立的,互不影响。

        接着,再往下看,ConditionObject实际上只有两个核心属性,如下代码块。

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

        虽然Node对象有prev和next,但是在ConditionObject中是不会使用这两个属性的,只要在Condition队列中,这两个属性都是null。在ConditionObject中只会使用Node节点的nextWaiter的属性实现单向链表的效果。

四. await方法前置分析

        到这里,咱们进行一个从简单再到复杂的拆解讲法。先分析在持有了Lock锁后的线程调用了await方法做了什么,其实很简单,有如下操作:

  • 判断线程是否中断,如果中断了,什么都不做。
  • 没有中断,就将当前线程封装为Node添加到Condition的单向链表中
  • 一次性释放掉锁资源。
  • 如果当前线程没有在AQS队列,就正常执行LockSupport.park(this)挂起线程。

        接下来,基于我们对于上述操作的了解,一起来看一下它的源码实现,此处只做了await方法的前置分析,为了便于理解,后置分析在讲完signal操作后会再详解,大家不用着急,一步步跟上来。

// await方法的前置分析,只分析到线程挂起
public final void await() throws InterruptedException {
    // 先判断线程的中断标记位是否是true
    if (Thread.interrupted())
        // 如果是true,就没必要执行后续操作挂起了。
        throw new InterruptedException();
    // 在线程挂起之前,先将当前线程封装为Node,并且添加到Condition队列中
    Node node = addConditionWaiter();
    // fullyRelease在释放锁资源,一次性将锁资源全部释放,并且保留重入的次数
    int savedState = fullyRelease(node);
    // 省略一行代码……
    // 当前Node是否在AQS队列中?
    // 执行fullyRelease方法后,线程就释放锁资源了,如果线程刚刚释放锁资源,其他线程就立即执行了signal方法,
    // 此时当前线程就被放到了AQS的队列中,这样一来线程就不需要执行LockSupport.park(this);去挂起线程了
    while (!isOnSyncQueue(node)) {
        // 如果没有在AQS队列中,正常在Condition单向链表里,正常挂起线程。
        LockSupport.park(this);
        // 省略部分代码……
    }
    // 省略部分代码……
}

// 线程挂起先,添加到Condition单向链表的业务~~
private Node addConditionWaiter() {
    // 拿到尾节点。
    Node t = lastWaiter;
    // 如果尾节点有值,并且尾节点的状态不正常,不是-2,尾节点可能要拜拜了~
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 如果尾节点已经取消了,需要干掉取消的尾节点~
        unlinkCancelledWaiters();
        // 重新获取lastWaiter
        t = lastWaiter;
    }
    // 构建当前线程的Node,并且状态设置为-2
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 如果last节点为null。直接将当前节点设置为firstWaiter
    if (t == null)
        firstWaiter = node;
    else
        // 如果last节点不为null,说明有值,就排在lastWaiter的后面
        t.nextWaiter = node;
    // 把当前节点设置为最后一个节点
    lastWaiter = node;
    // 返回当前节点
    return node;
}

// 干掉取消的尾节点。
private void unlinkCancelledWaiters() {
    // 拿到头节点
    Node t = firstWaiter;
    // 声明一个节点,爱啥啥~~~
    Node trail = null;
    // 如果t不为null,就正常执行~~
    while (t != null) {
        // 拿到t的next节点
        Node next = t.nextWaiter;
        // 如果t的状态不为-2,说明有问题
        if (t.waitStatus != Node.CONDITION) {
            // t节点的next为null
            t.nextWaiter = null;
            // 如果trail为null,代表头结点状态就是1,
            if (trail == null)
                // 将头结点指向next节点
                firstWaiter = next;
            else
                // 如果trail有值,说明不是头结点位置
                trail.nextWaiter = next;
            // 如果next为null,说明单向链表遍历到最后了,直接结束
            if (next == null)
                lastWaiter = trail;
        }
        // 如果t的状态是-2,一切正常
        else {
            // 临时存储t
            trail = t;
        }
        // t指向之前的next
        t = next;
    }
}

// 一次性释放锁资源
final int fullyRelease(Node node) {
    // 标记位,释放锁资源默认失败!
    boolean failed = true;
    try {
        // 拿到现在state的值
        int savedState = getState();
        // 一次性释放干净全部锁资源
        if (release(savedState)) {
            // 释放锁资源失败了么? 没有!
            failed = false;
            // 返回对应的锁资源信息
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            // 如果释放锁资源失败,将节点状态设置为取消
            node.waitStatus = Node.CANCELLED;
    }
}

五. signal方法分析

此处老套路,我将signal操作也分为了如下几个部分:

  • 确保执行signal方法的是持有锁的线程
  • 脱离Condition的队列
  • 将Node状态从-2改为0
  • 将Node添加到AQS队列
  • 为了避免当前Node无法在AQS队列正常唤醒做了一些判断和操作

接下来,基于我们对于上述操作的理解,来看看它的源码又是怎么实现的呢?

// 线程挂起后,可以基于signal唤醒~
public final void signal() {
    // 在ReentrantLock中,如果执行signal的线程没有持有锁资源,直接扔异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 拿到排在Condition首位的Node
    Node first = firstWaiter;
    // 有Node在排队,才需要唤醒,如果没有,直接告辞~~
    if (first != null)
        doSignal(first);
}

// 开始唤醒Condition中的Node中的线程
private void doSignal(Node first) {
    // 先一波do-while走你~~~
    do {
        // 获取到第二个节点,并且将第二个节点设置为firstWaiter
        if ( (firstWaiter = first.nextWaiter) == null)
            // 说明就一个节点在Condition队列中,那么直接将firstWaiter和lastWaiter置位null
            lastWaiter = null;
        // 如果还有nextWaiter节点,因为当前节点要被唤醒了,脱离整个Condition队列。将nextWaiter置位null
        first.nextWaiter = null;
        // 如果transferForSignal返回true,一切正常,退出while循环
    } while (!transferForSignal(first) &&
            // 如果后续节点还有,往后面继续唤醒,如果没有,退出while循环
             (first = firstWaiter) != null);
}

// 准备开始唤醒在Condition中排队的Node
final boolean transferForSignal(Node node) {
    // 将在Condition队列中的Node的状态从-2,改为0,代表要扔到AQS队列了。
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        // 如果失败了,说明在signal之前应当是线程被中断了,从而被唤醒了。
        return false;
    // 如果正常的将Node的状态从-2改为0,这是就要将Condition中的这个Node扔到AQS的队列。
    // 将当前Node扔到AQS队列,返回的p是当前Node的prev
    Node p = enq(node);
    // 获取上一个Node的状态
    int ws = p.waitStatus;
    // 如果ws > 0 ,说明这个Node已经被取消了。
    // 如果ws状态不是取消,将prev节点的状态改为-1,。
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 如果prev节点已经取消了,可能会导致当前节点永远无法被唤醒。立即唤醒当前节点,基于acquireQueued方法,
            // 让当前节点找到一个正常的prev节点,并挂起线程
        // 如果prev节点正常,但是CAS修改prev节点失败了。证明prev节点因为并发原因导致状态改变。还是为了避免当前
            // 节点无法被正常唤醒,提前唤醒当前线程,基于acquireQueued方法,让当前节点找到一个正常的prev节点,并挂起线程
        LockSupport.unpark(node.thread);
    // 返回true
    return true;
}

六. await方法后置分析

我将await方法的后置分析,分为了如下几个部分:

  • 唤醒之后,要先确认是中断唤醒还是signal唤醒,还是signal唤醒后被中断
  • 确保当前线程的Node已经在AQS队列中
  • 执行acquireQueued方法,等待锁资源。
  • 在获取锁资源后,要确认是否在获取锁资源的阶段被中断过,如果被中断过,并且不是THROW_IE,那就确保interruptMode是REINTERRUPT。
  • 确认当前Node已经不在Condition队列中了
  • 最终根据interruptMode来决定具体做的事情
    • 0:嘛也不做。
    • THROW_IE:抛出异常
    • REINTERRUPT:执行线程的interrupt方法

接下来,我们再进行源码的研读,如下代码块。

// 现在分析await方法的后半部分
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // 中断模式~
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 如果线程执行到这,说明现在被唤醒了。
        // 线程可以被signal唤醒。(如果是signal唤醒,可以确认线程已经在AQS队列中)
        // 线程可以被interrupt唤醒,线程被唤醒后,没有在AQS队列中。
        // 如果线程先被signal唤醒,然后线程中断了。。。。(做一些额外处理)
        // checkInterruptWhileWaiting可以确认当前中如何唤醒的。
        // 返回的值,有三种
        // 0:正常signal唤醒,没别的事(不知道Node是否在AQS队列)
        // THROW_IE(-1):中断唤醒,并且可以确保在AQS队列
        // REINTERRUPT(1):signal唤醒,但是线程被中断了,并且可以确保在AQS队列
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Node一定在AQS队列
    // 执行acquireQueued,尝试在ReentrantLock中获取锁资源。
    // acquireQueued方法返回true:代表线程在AQS队列中挂起时,被中断过
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        // 如果线程在AQS队列排队时,被中断了,并且不是THROW_IE状态,确保线程的interruptMode是REINTERRUPT
        // REINTERRUPT:await不是中断唤醒,但是后续被中断过!!!
        interruptMode = REINTERRUPT;
    // 如果当前Node还在condition的单向链表中,脱离Condition的单向链表
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    // 如果interruptMode是0,说明线程在signal后以及持有锁的过程中,没被中断过,什么事都不做!
    if (interruptMode != 0)
        // 如果不是0~
        reportInterruptAfterWait(interruptMode);
}
// 判断当前线程被唤醒的模式,确认interruptMode的值。
private int checkInterruptWhileWaiting(Node node) {
    // 判断线程是否中断了。
    return Thread.interrupted() ?
        // THROW_IE:代表线程是被interrupt唤醒的,需要向上排除异常
        // REINTERRUPT:代表线程是signal唤醒的,但是在唤醒之后,被中断了。

        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        // 线程是正常的被signal唤醒,并且线程没有中断过。
        0;
}

// 判断线程到底是中断唤醒的,还是signal唤醒的!
final boolean transferAfterCancelledWait(Node node) {
    // 基于CAS将Node的状态从-2改为0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 说明是中断唤醒的线程。因为CAS成功了。
        // 将Node添加到AQS队列中~(如果是中断唤醒的,当前线程同时存在Condition的单向链表以及AQS的队列中)
        enq(node);
        // 返回true
        return true;
    }
    // 判断当前的Node是否在AQS队列(signal唤醒的,但是可能线程还没放到AQS队列)
    // 等到signal方法将线程的Node扔到AQS队列后,再做后续操作
    while (!isOnSyncQueue(node))
        // 如果没在AQS队列上,那就线程让步,稍等一会,Node放到AQS队列再处理(看CPU)
        Thread.yield();
    // signal唤醒的,返回false
    return false;
}

// 确认Node是否在AQS队列上
final boolean isOnSyncQueue(Node node) {
    // 如果线程状态为-2,肯定没在AQS队列
    // 如果prev节点的值为null,肯定没在AQS队列
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        // 返回false
        return false;
    // 如果节点的next不为null。说明已经在AQS队列上。、
    if (node.next != null) 
        // 确定AQS队列上有!
        return true;
    // 如果上述判断都没有确认节点在AQS队列上,在AQS队列中寻找一波
    return findNodeFromTail(node);
}
// 在AQS队列中找当前节点
private boolean findNodeFromTail(Node node) {
    // 拿到尾节点
    Node t = tail;
    for (;;) {
        // tail是否是当前节点,如果是,说明在AQS队列
        if (t == node)
            // 可以跳出while循环
            return true;
        // 如果节点为null,AQS队列中没有当前节点
        if (t == null)
            // 进入while,让步一手
            return false;
        // t向前引用
        t = t.prev;
    }
}

private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    // 如果是中断唤醒的await,直接抛出异常!
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    // 如果是REINTERRUPT,signal后被中断过
    else if (interruptMode == REINTERRUPT)
        // 确认线程的中断标记位是true
        // Thread.currentThread().interrupt();
        selfInterrupt();
}

七.awaitNanos&signalAll方法分析

        awaitNanos:仅仅是在await方法的基础上,做了一内内的改变,整体的逻辑思想都是一样的。挂起线程时,传入要阻塞的时间,时间到了,自动唤醒,走添加到AQS队列的逻辑。

// await指定时间,多了个时间到了自动醒。
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // deadline:当前线程最多挂起到什么时间点
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // nanosTimeout的时间小于等于0,直接告辞!!
        if (nanosTimeout <= 0L) {
            // 正常扔到AQS队列
            transferAfterCancelledWait(node);
            break;
        }
        // nanosTimeout的时间大于1000纳秒时,才可以挂起线程
        if (nanosTimeout >= spinForTimeoutThreshold)
            // 如果大于,正常挂起
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 计算剩余的挂起时间,可能需要重新的走while循环,再次挂起线程
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    // 剩余的挂起时间
    return deadline - System.nanoTime();
}

        signalAll方法。这个方法一看就懂,之前signal是唤醒1个,这个是全部唤醒,跟signal方法几乎是一模一样的,只是唤醒一个和唤醒多个的区别。

// 以do-while的形式,将Condition单向链表中的所有Node,全部唤醒并扔到AQS队列
private void doSignalAll(Node first) {
    // 将头尾都置位null~
    lastWaiter = firstWaiter = null;
    do {
        // 拿到next节点的引用
        Node next = first.nextWaiter;
        // 断开当前Node的nextWaiter
        first.nextWaiter = null;
        // 修改Node状态,扔AQS队列,是否唤醒!
        transferForSignal(first);
        // 指向下一个节点
        first = next;
    } while (first != null);
}

八. 总结

        本次课程主要是围绕AQS中的内部类ConditionObject做了一个深层次源码解析,我们可以不用将每行代码全部读懂,但是希望大家起码要理解它做了些什么,又是怎么做的。只有理解了这部分,后续我们在阅读java.util.concurrent并发工具包的其他源码时才能得心应手,希望大家多看,多写,多去自己体会,最好是可以做到跟我一样,自己去拆解ConditionObject类是什么,又能帮我们做什么,怎么去做的,然后最好还能自己对着源码写下自己的注解。下节课,我将对ReentrantReadWriteLock做源码深度解析,如果有前面课程的基础,尤其是第一节课的同学,那这个ReentrantReadWriteLock的源码解析对于你来说将会很轻松,因为它实际上和ReentrantLock实现大部分一致。

标签:Node,ConditionObject,node,AQS,编程,源码,线程,唤醒,节点
From: https://blog.csdn.net/qq_41191331/article/details/144484445

相关文章

  • 【2025最新计算机毕业设计】基于SprintBoot+Vue爱之屋摄影在线预约管理系统【提供源码
    作者简介:✌CSDN新星计划导师、Java领域优质创作者、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流。✌ 主要内容:......
  • 说说你对自顶向下的编程方法的理解?
    自顶向下的编程方法(Top-DownProgramming)是一种软件开发策略,特别是在前端开发领域,这种方法强调从高层次、整体结构开始设计,逐步细化到具体实现细节。以下是对自顶向下编程方法在前端开发中的详细理解:1.高层设计自顶向下的方法首先关注的是系统的整体结构和功能需求。在前端开发......
  • B4X编程语言:B4X字符串生成器StringBuilder
            SQL查询字符串、B4J控件的Style属性字符串等有时候会很长,不但影响代码结构的可读性,而且易出错、不方便输入和维护。我们通常会拆分为两个或多个字符串变量,输入后再连接合并使用。        B4X为我们提供了一个很好用的字符串操作对象:StringBuilder字符......
  • Python 编程中的优秀代码框架与设计模式
    Python编程中的优秀代码框架与设计模式Python是一种高级编程语言,因其简洁、易读的语法和强大的库支持而广受开发者欢迎。在开发过程中,选择合适的代码框架和遵循良好的设计模式可以大大提高项目的可维护性、扩展性和效率。本文将介绍一些常用的Python代码框架以及推荐的......
  • 泷羽sec-shell编程(7)
    shell(7)声明!学习视频来自B站up主泷羽sec有兴趣的师傅可以关注一下,如涉及侵权马上删除文章,笔记只是方便各位师傅的学习和探讨,文章所提到的网站以及内容,只做学习交流,其他均与本人以及泷羽sec团队无关,切勿触碰法律底线,否则后果自负!!!!有兴趣的小伙伴可以点击下面连接进入b站主......
  • Qt网络编程知识体系
    Qt网络编程基础Qt直接提供网络编程模块,基于TCP/IP客户端和服务器相关各种类。TCP通信(QTcpSocket/QTcpServer)。UDP通信(QUdpSocket)。还有部分实现HTTP、FTP等网络协议的高级类。如QNetworkRequest/QNetworkAccessManager等。我们开发过程中,UDP、TCP、HTTP通信类等,必须在程序......
  • 基于java的SpringBoot/SSM+Vue+uniapp的校园新闻网站的详细设计和实现(源码+lw+部署文
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我代码参考数据库参考源码获取前言......
  • 基于java的SpringBoot/SSM+Vue+uniapp的新生宿舍管理系统的详细设计和实现(源码+lw+部
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我代码参考数据库参考源码获取前言......
  • python面向对象高级编程:使用元类
    在Python中,元类(Metaclass)是创建类的“类”。换句话说,元类是用来控制类的行为的。虽然元类在Python中不常用,但在某些高级编程场景中,它们可以提供强大的功能,如自动注册类、验证类定义、修改类属性等。1.导入必要的模块虽然元类不需要导入额外的模块,但你需要了解如何使用内置的......
  • python面向对象高级编程:使用枚举类
    在Python中,枚举类(Enum)是一种特殊的数据类型,它允许我们定义一组命名的常量。使用枚举类可以使代码更加清晰和易于维护,特别是在处理一组相关常量时。Python的enum模块提供了创建枚举类的功能。以下是如何在Python中使用枚举类的一些高级编程技巧:1.导入enum模块首先,我们需要导......