首页 > 其他分享 >AQS 中的 ConditionObject 内部类

AQS 中的 ConditionObject 内部类

时间:2023-02-07 19:13:43浏览次数:55  
标签:node ConditionObject 内部 AQS 队列 Node 线程 interruptMode null

AQS 中的 ConditionObject 内部类

ConditionObject 内部类

主要用作条件等待队列,条件等待队列为单项链表

加入条件队列的过程:等待队列的节点调用 await() 从同步队列移除,追加到条件队列的队尾

从条件队列出队的过程:当其他线程调用 single()/singleAll() 唤醒当前线程后,当前线程从条件队列移除,追加到同步队列队尾

ConditionObject 源码

  public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;

        /**
         * 条件队列的第一个节点
         */
        private transient Node firstWaiter;

        /**
         * 条件队列的最后一个节点
         */
        private transient Node lastWaiter;

        public ConditionObject() { }


        // ==================  内部方法  ==================
        // Internal methods

        /**
         * 向等待队列中添加一个新的等待节点(当前线程)
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            // 如果最后一个等待节点被取消,则清空队列
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 封装该线程
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        /**
         * 唤醒条件队列中等待的节点(unpark),后续添加到同步等待队尾部
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
        }

        /**
         * 唤醒所有等待线程(unpark)
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        /**
         * 从条件队列中取消已取消的等待节点,仅在持有锁时调用。
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        // ==================  公共方法  ==================
        // public methods

        /**
         *  唤醒最长条件等待的线程(也就是队列头的线程)
         */
        public final void signal() {
            // 判断是否 独占锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        /**
         * 唤醒所有等待线程
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        /**
         * 导致当前线程等待(park),直到收到信号
         */
        public final void awaitUninterruptibly() {
            // 添加到条件等待队列
            Node node = addConditionWaiter();
            // 完全释放独占锁
            int savedState = fullyRelease(node);

            //标记中断状态
            boolean interrupted = false;

            //判断节点是否在同步队列中
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            //  以独占不中断模式获取已经在队列中的线程
            //  判断中断状态,重新中断线程
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }


        //==================  对于可中断的等待  ==================

        /**
         *  该模式:在退出等待时重新中断
         */
        private static final int REINTERRUPT =  1;

        /**
         * 该模式:在退出等待时抛出 InterruptedException
         */
        private static final int THROW_IE    = -1;

        /**
         * 检查等待时的中断
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
        }

        /**
         * [根据中断模式,执行具体的操作]
         * 1.THROW_IE 抛出异常
         * 2.REINTERRUPT 重新中断
         * 3.什么也不做
         *
         */
        private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /**
         * 使当前线程等待(park),直到收到信号或被中断
         * 1.获取锁之前先检查中断状态,如果被中断,则抛出异常,结束运行
         * 2.在等待队列中被中断,唤醒获取锁之后被中断
         */
        public final void await() throws InterruptedException {
            // 如果当前线程被中断,则抛出InterruptedException。
            if (Thread.interrupted())
                throw new InterruptedException();

            //将该线程封装添加到条件等待队列
            Node node = addConditionWaiter();

            // 完全释放独占锁
            int savedState = fullyRelease(node);

            // 中断模式
            int interruptMode = 0;

            //判断是否在同步队列中,如果不在,继续执行以下代码
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                // 线程被条件队列(transferForSignal 方法)唤醒之后,继续向下运行

                // 检查是否被中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

            // ========== 尝试在同步等待队列中被唤醒 ,执行完毕之后,继续向下执行 ==========

            //  以独占不中断模式获取已经在队列中的线程
            //  判断线程是否需要重新中断
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;

            // clean up if cancelled  取消时清理
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();

            //响应中断
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        /**
         * 导致当前线程等待,直到发出信号或中断,或指定的等待时间结束。
         */
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }

        /**
         * 使当前线程等待,直到收到信号或中断,或超过指定的截止日期。
         */
        public final boolean awaitUntil(Date deadline)
                throws InterruptedException {
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() > abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        /**
         * 使当前线程等待,直到发出信号或中断,或经过指定的等待时间
         */
        public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }


        // ==================  support for instrumentation  ==================
        //  support for instrumentation

        /**
         * 如果此条件是由给定的同步对象创建的,则返回true。
         */
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        /**
         * 查询是否有线程在此条件下等待。
         */
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        /**
         * 返回在此条件下等待的线程数
         */
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        /**
         * 返回正在等待该条件的线程的集合
         */
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }

标签:node,ConditionObject,内部,AQS,队列,Node,线程,interruptMode,null
From: https://www.cnblogs.com/coolyang/p/17099511.html

相关文章

  • AQS 中的 Node 内部类
    AQS中的Node内部类Node内部类Node的主要作用,作为等待队列中的节点存在Node源码staticfinalclassNode{/***标记:该节点正在共享模式......
  • 《安富莱嵌入式周报》第302期:芯片内部Flash读保护攻击,开源智能手表设计,超棒静电学手册
    往期周报汇总地址:http://www.armbbs.cn/forum.php?mod=forumdisplay&fid=12&filter=typeid&typeid=1042023年的视频专题教程继续开始录制视频版:https://www.bilibili.......
  • 1.1 CPU的内部结构解析
    CPU(中央处理器)相当于计算机的大脑,CPU和内存都是由许多晶体管组成的电子部件,通常称为IC(集成电路)。CPU的内部是有寄存器,运算器,控制器,时钟组成的.CPU所负责的就是解释和运行......
  • 聊聊JUC包下的底层支撑类-AbstractQueuedSynchronizer(AQS)
    聊聊JUC包下的底层支撑类-AbstractQueuedSynchronizer(AQS)juc包下的一堆并发工具类是我们日常开发特别是面试中常被拿来问的八股文之一,为了工作也好,为了面试也罢,今天开始......
  • 厦门海辰储能--内部推荐
    岗位查询:https://wecruit.hotjob.cn/SU6232b13f2f9d244b1b501b6e/mc/position/society?t=1675683452509厦门海辰储能--内部推荐,新能源行业,前景巨大,欢迎社会人才加入。扫描......
  • 'powershell'/'netsh'不是内部或外部命令,也不是可运行的程序或批处理文件
    笔者win10系统;昨天在配置openssh的时候在cmd里遇到了这两个报错。上网查了一下,解决方法分别是:'powershell':在环境变量path后添加powershell的路径(C:\Windows\System......
  • Java多线程并发06—CAS、AQS
    CAS(CompareAndSwap/Set)概念CAS函数,是比较并交换函数,它是原子操作函数。原理CAS是基于乐观锁的原理进行操作的。它总是认为自己可以成功完成操作。当多个线程同时使用CAS......
  • 类的匿名内部类
    1.解释匿名内部类常用在接口的实现上,但是类也可以有匿名内部类.publicclassWrapping{privateinti;publicWrapping(intx){i=x;}publicint......
  • 内部类的 .this 和 .new 关键字
    1.  .this.this关键字可以在内部创建外部类的引用publicclassDotThis{voidf(){System.out.println("DotThis.f()");}publicclassInner{pu......
  • 内部类
    内部类内部类就是在一个类的内部再定义一个类,比如,A类中定义一个B类,那么B类对于A类来说就是内部类,A类对于B类就是外部类;成员内部类静态内部类局部内部类匿名内部类......