首页 > 编程语言 >CAS与AQS源码简析

CAS与AQS源码简析

时间:2023-08-03 19:36:25浏览次数:50  
标签:node Node CAS 节点 int 简析 源码 null final


什么是CAS?



CAS(Compare And Swap),顾名思义就是比较并交换。用于解决多线程使用锁带来的性能损耗的问题,是一种非阻塞算法,其交换原理如下图:



 


CAS与AQS源码简析_等待队列


 



 



CAS用法:



- 数据库中的乐观锁:即表字段+version字段,然后每次更新时就比较当前version版本是否一致,一直才更新并且升级version=version+1。



 



- unsafe的用法:



 



java.util.concurrent.atomic.*



 




什么是AQS?



AQS(AbstractQueuedSynchronizer),顾名思义就是抽象队列同步器。由FIFO(先进先出)的阻塞队列和相关同步器组成。这是在concurrent包(并发处理)下。



 


    AbstractQueuedSynchronizer为锁机制维护了一个队列,需要获取锁的线程们排在队列中,只有排在队首的线程才有资格获取锁。


 



 



首先看张图,取自《Java并发编程的艺术》:



 


CAS与AQS源码简析_queue_02


 



 



然后看如下,AbstractQueuedSynchronizer源码及其分析如下:



/**



* 提供一个阻塞锁和相关依赖FIFO等待队列同步器的实现。



* 这个类支持排他和共享模式。排他模式下当一个已获取到了,其他线程尝试获取不可能成功。共享模式可以被多个线程获取。通常子类实现仅支持其中一种,但是也有两种的支持的如ReadWriteLock。



* 这个类定义了一个实现了Condition的内部类ConditionObject,用于排他模式。    



* 使用一个基础的同步器需要重新定义以下方法:


* <li> {@link #tryAcquire}
 
  
* <li> {@link #tryRelease}
 
  
* <li> {@link #tryAcquireShared}
 
  
* <li> {@link #tryReleaseShared}
 
  
* <li> {@link #isHeldExclusively}
 
  
* 以上的每个方法均默认抛出{UnsupportedOperationException}错误,所以以上的几个方法没有提供默认实现,需要子类重写。
 
  
* 这个类提供了一个有效的、可伸缩的基础给同步器如状态、acquire获取和的同步器释放参数、内部FIFO等待队。当这些不够用时,可使用atomic、Queue、LockSupport。
 
  
*/
 
  
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
 
  
    private static final long serialVersionUID = 7373984972572414691L;
 
  

    
  
 
  
    protected AbstractQueuedSynchronizer() { }
 
  

    
  
 
  
    /**
 
  
     * 等待队列的node class。Node作为等待队列的节点
 
  
     * 这个等待队列是CLH的变体,CLH一般用于自旋锁。使用其代替一般的同步器,但也用了相同的策略来控制。 
 
  
     */
 
  
    static final class Node {
 
  
        /** 共享模式 */
 
  
        static final Node SHARED = new Node();
 
  
        /** 排他模式 */
 
  
        static final Node EXCLUSIVE = null;
 
  
        /** 当前线程被取消 */
 
  
        static final int CANCELLED =  1;
 
  
        /** 当前节点的后继节点包含的线程需要运行*/
 
  
        static final int SIGNAL    = -1;
 
  
        /** 当前结点在condition队列中 */
 
  
        static final int CONDITION = -2;
 
  
       当前场景下后续的acquireShared能够得以执行  */
 
  
        static final int PROPAGATE = -3;
 
  

    
  
 
  
        /** 当前节点的状态。*/
 
  
        volatile int waitStatus;
 
  
        /** 前驱结点 */
 
  
        volatile Node prev;
 
  
        /** 后继节点 */
 
  
        volatile Node next;
 
  
        /** 入队线程 */
 
  
        volatile Thread thread;
 
  
        /** 存储condition队列中的后继节点 */
 
  
        Node nextWaiter;
 
  

    
  
 
  
        final boolean isShared() { return nextWaiter == SHARED;}
 
  
        /**
 
  
         * 返回前驱节点
 
  
         */
 
  
        final Node predecessor() throws NullPointerException {
 
  
            Node p = prev;
 
  
            if (p == null)
 
  
                throw new NullPointerException();
 
  
            else
 
  
                return p;
 
  
        }
 
  
        ..............................
 
  
    }
 
  
    /**
 
  
     * 仅用于初始化等待队列的head。只能通过setHead修改,当这个head还存在时不能将waitStatus=>cancelled
 
  
     */
 
  
    private transient volatile Node head;
 
  
    /**Tail节点初始化,仅能通过enq追加新的wait node*/
 
  
    private transient volatile Node tail;
 
  
synchronization state. */
 
  
    private volatile int state;
 
  

    
  
 
  
    /** CAS原子性的修改 synchronization state ,拉到代码最下面可见其值的设置*/
 
  
    protected final boolean compareAndSetState(int expect, int update) {
 
  
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
 
  
    }
 
  
    static final long spinForTimeoutThreshold = 1000L;
 
  

    
  
 
  
    /**
 
  
     * 为队列追加node节点
 
  
     */
 
  
    private Node enq(final Node node) {
 
  
        for (;;) {//一直循环入队,直到成功
 
  
            Node t = tail;
 
  
            if (t == null) { //同样获取尾节点,并且如果为空就将尾节点初始化为头结点head一样
 
  
                if (compareAndSetHead(new Node()))
 
  
                    tail = head;
 
  
            } else { //尾节点不为空就执行addWaiter一样的过程把新的node加到最后
 
  
                node.prev = t;
 
  
                if (compareAndSetTail(t, node)) {
 
  
                    t.next = node;
 
  
                    return t;
 
  
                }
 
  
            }
 
  
        }
 
  
    }
 
  

    
  
 
  
    /**
 
  
新建Node并入队
 
  
     */
 
  
    private Node addWaiter(Node mode) {
 
  
        //新建一个Node
 
  
        Node node = new Node(Thread.currentThread(), mode);
 
  
        // 存储当前尾节点(当作旧的尾节点)
 
  
        Node pred = tail;
 
  
        if (pred != null) {  //如果当前尾节点不为空
 
  
            node.prev = pred;  //将新建的节点的前驱节点执行旧的为节点
 
  
            if (compareAndSetTail(pred, node)) {//CAS原子替换当前尾节点从旧的替换到新建node的位置
 
  
                pred.next = node;//将旧的尾节点位置的后置节点执行新建的节点
 
  
                return node;
 
  
            }
 
  
        }
 
  
        //如果上面入队失败则调用enq方法入队
 
  
        enq(node);
 
  
        return node;
 
  
    }
 
  
    private void setHead(Node node) {
 
  
        head = node; //将头结点指向node
 
  
        node.thread = null;  //线程置空
 
  
        node.prev = null;//因为是头节点了,不用需要前驱结点
 
  
    }
 
  

    
  
 
  
    /**
 
  
     * 唤醒后续节点
 
  
     */
 
  
    private void unparkSuccessor(Node node) {
 
  
        int ws = node.waitStatus;
 
  
        if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
 
  
        Node s = node.next;
 
  
        //如果后置节点是尾节点或Cancelled状态
 
  
        if (s == null || s.waitStatus > 0) {
 
  
            s = null;  //将当前后置节点置为null
 
  
            for (Node t = tail; t != null && t != node; t = t.prev)
 
  
                if (t.waitStatus <= 0)
 
  
                    s = t;
 
  
        }
 
  
        if (s != null)
 
  
            LockSupport.unpark(s.thread);
 
  
    }
 
  

    
  
 
  
    /**
 
  
     * 共享模式下
 
  
     */
 
  
    private void doReleaseShared() {
 
  
        for (;;) {
 
  
            Node h = head;
 
  
            if (h != null && h != tail) {
 
  
                int ws = h.waitStatus;
 
  
                if (ws == Node.SIGNAL) {
 
  
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 
  
                        continue;            // loop to recheck cases
 
  
                    unparkSuccessor(h);
 
  
                }
 
  
                else if (ws == 0 &&
 
  
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
 
  
                    continue;                // loop on failed CAS
 
  
            }
 
  
            if (h == head)                   // loop if head changed
 
  
                break;
 
  
        }
 
  
    }
 
  

    
  
 
  
    private void setHeadAndPropagate(Node node, int propagate) {
 
  
        Node h = head; // Record old head for check below
 
  
        setHead(node);
 
  
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
 
  
            (h = head) == null || h.waitStatus < 0) {
 
  
            Node s = node.next;
 
  
            if (s == null || s.isShared())
 
  
                doReleaseShared();
 
  
        }
 
  
    }
 
  

    
  
 
  
    /**
 
  
     * 取消正在尝试获取锁的节点
 
  
     */
 
  
    private void cancelAcquire(Node node) {
 
  
        if (node == null) return;
 
  
//cancel一个节点时会将当前节点thread置为null
 
  
        node.thread = null;
 
  
        // 循环跳过已设置了cancelled状态的节点
 
  
        Node pred = node.prev;
 
  
        while (pred.waitStatus > 0) node.prev = pred = pred.prev;
 
  
        //存储上面得到的节点前驱节点
 
  
        Node predNext = pred.next;
 
  
        //将当前要cancel的节点状态设置CANCELLED
 
  
        node.waitStatus = Node.CANCELLED;
 
  

    
  
 
  
        //1 如果当前节点node是尾节点。更新尾节点为pred.next指向null,相当于删除了node(和pred到node间为cancel的节点)
 
  
        if (node == tail && compareAndSetTail(node, pred)) {
 
  
            compareAndSetNext(pred, predNext, null);
 
  
        } else {
 
  
            int ws;
 
  
            //2 当前既不是尾节点,也不是head后继节点。设置node的前驱节点waitStatus为SIGNAL,node前驱节点指向后继节点,相当于删除node
 
  
            if (pred != head &&
 
  
                ((ws = pred.waitStatus) == Node.SIGNAL ||
 
  
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
 
  
                pred.thread != null) {
 
  
                Node next = node.next;
 
  
                if (next != null && next.waitStatus <= 0)
 
  
                    compareAndSetNext(pred, predNext, next);
 
  
            } else {
 
  
                //3 如果node是head的后继节点。则直接唤醒node的后继节点。在head后面的节点有资格尝试获取锁,但是当前node放齐了当前资格,所以会唤醒其后续的节点
 
  
                unparkSuccessor(node);
 
  
            }
 
  

    
  
 
  
            node.next = node; // help GC
 
  
        }
 
  
    }
 
  

    
  
 
  
    /**
 
  
     * 判断当前节点是否挂起
 
  
     */
 
  
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 
  
        int ws = pred.waitStatus;
 
  
        if (ws == Node.SIGNAL)  //当前状态下挂起
 
  
            return true;
 
  
        if (ws > 0) {
 
  
//跳过已被设置了cancelled的前驱节点
 
  
                node.prev = pred = pred.prev;
 
  
            } while (pred.waitStatus > 0);
 
  
            pred.next = node;
 
  
        } else {
 
  
            /** 将上级的等待状态设为SIGNAL */
 
  
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 
  
        }
 
  
        return false;
 
  
    }
 
  

    
  
 
  
    static void selfInterrupt() {
 
  
        Thread.currentThread().interrupt();
 
  
    }
 
  

    
  
 
  
    private final boolean parkAndCheckInterrupt() {
 
  
        LockSupport.park(this);
 
  
        return Thread.interrupted();
 
  
    }
 
  

    
  
 
  
    /**
 
  
     * 尝试获取锁
 
  
     */
 
  
    final boolean acquireQueued(final Node node, int arg) {
 
  
        boolean failed = true;
 
  
        try {
 
  
            boolean interrupted = false;
 
  
            for (;;) {  //进入循环后会不断的尝试获取
 
  
                final Node p = node.predecessor();//获取当前节点的头结点!!只有head头结点才持有锁!!
 
  
                //如果当前的前驱节点是头结点则尝试获取锁。
 
  
                //如果尝试成功则将当前node设为头结点,并将旧的head设为null便于回收
 
  
                //获取失败看是否需要挂起,如果需要挂起则挂起线程等待下一次被唤醒时继续尝试获取锁。
 
  
                if (p == head && tryAcquire(arg)) {
 
  
                    setHead(node);
 
  
                    p.next = null; // 帮助GC
 
  
                    failed = false;
 
  
                    return interrupted;
 
  
                }
 
  
                //判断是否挂起(根据Node的状态=-3就会挂起),然后调用刮起的方法(里面调了Thread.interrupted();)
 
  
                if (shouldParkAfterFailedAcquire(p, node) &&
 
  
                    parkAndCheckInterrupt())  
 
  
                    interrupted = true;
 
  
            }
 
  
        } finally {
 
  
            if (failed)
 
  
                cancelAcquire(node);
 
  
        }
 
  
    }
 
  
    。。。。。。。。。。。。。。。
 
  
    // Main exported methods
 
  

    
  
 
  
子类实现的尝试获取锁的方法
 
  
    protected boolean tryAcquire(int arg) {
 
  
        throw new UnsupportedOperationException();
 
  
    }
 
  

    
  
 
  
    /** 子类实现尝试释放锁的方法 */
 
  
    protected boolean tryRelease(int arg) {
 
  
        throw new UnsupportedOperationException();
 
  
    }
 
  

    
  
 
  
    /** 子类实现尝试获取共享锁的方法  */
 
  
    protected int tryAcquireShared(int arg) {
 
  
        throw new UnsupportedOperationException();
 
  
    }
 
  

    
  
 
  
/** 子类实现尝试释放共享锁的方法  */
 
  
    protected boolean tryReleaseShared(int arg) {
 
  
        throw new UnsupportedOperationException();
 
  
    }
 
  

    
  
 
  
 /** 子类实现排他模式下状态是否占用  */
 
  
    protected boolean isHeldExclusively() {
 
  
        throw new UnsupportedOperationException();
 
  
    }
 
  

    
  
 
  
    /**
 
  
     * 排他模式,获取互斥锁
 
  
     */
 
  
    public final void acquire(int arg) {
 
  
        //尝试获取锁(tryAcquire在此类中是抛异常的,应在子类实现),
 
  
acquireQueued再次尝试获取锁,addWaiter适用于新建一个新node
 
  
        if (!tryAcquire(arg) &&
 
  
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 
  
            selfInterrupt();
 
  
    }
 
  

    
  
 
  
    public final void acquireInterruptibly(int arg)
 
  
             throws InterruptedException {}
 
  
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
 
  
            throws InterruptedException {}
 
  

    
  
 
  
    public final boolean release(int arg) {
 
  
        if (tryRelease(arg)) {//尝试释放锁成功
 
  
            Node h = head;//获取当前被释放了锁的head头节点
 
  
//如果头节点不为空且当前节点状态正常就唤醒当前节点的后续节点
 
  
            if (h != null && h.waitStatus != 0)
 
  
                unparkSuccessor(h);
 
  
            return true;
 
  
        }
 
  
        return false;
 
  
    }
 
  

    
  
 
  
    public final void acquireShared(int arg) {
 
  
        if (tryAcquireShared(arg) < 0)
 
  
            doAcquireShared(arg);
 
  
    }


 



   

public final void acquireSharedInterruptibly(int arg)
 
  
            throws InterruptedException {... }
 
  

    
  
 
  
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
 
  
            throws InterruptedException {........ }
 
  

    
  
 
  
    public final boolean releaseShared(int arg) {
 
  
        if (tryReleaseShared(arg)) {
 
  
            doReleaseShared();
 
  
            return true;
 
  
        }
 
  
        return false;
 
  
    }

 



    // 以下是队列检查方法

public final boolean hasQueuedThreads() {
 
  
        return head != tail;//队列是否存在
 
  
    }
 
  
    public final boolean hasContended() {
 
  
        return head != null;  //头结点是否为空
 
  
    }
 
  
    public final Thread getFirstQueuedThread() {
 
  
        return (head == tail) ? null : fullGetFirstQueuedThread();
 
  
    }
 
  
。。。。。。。。。。
 
  
    //工具和监控方法
 
  
    public final int getQueueLength() {
 
  
        int n = 0;//拿到队列长度
 
  
        for (Node p = tail; p != null; p = p.prev) {
 
  
            if (p.thread != null)
 
  
                ++n;
 
  
        }
 
  
        return n;
 
  
    }
 
  
//获取当前node queue的所有线程
 
  
    public final Collection<Thread> getQueuedThreads() {
 
  
        ArrayList<Thread> list = new ArrayList<Thread>();
 
  
        for (Node p = tail; p != null; p = p.prev) {
 
  
            Thread t = p.thread;
 
  
            if (t != null)
 
  
                list.add(t);
 
  
        }
 
  
        return list;
 
  
    }
 
  
......的工具类..............................
 
  

    
  
 
  
    /**



     * condition queue, 单向队列。线程拿到锁,但条件不足时,会放到这个队列等待被唤醒

*/
 
  
    public class ConditionObject implements Condition, java.io.Serializable {
 
  
        private static final long serialVersionUID = 1173984872572414699L;
 
  
        private transient Node firstWaiter; //头结点
 
  
        private transient Node lastWaiter;//尾节点
 
  

    
  
 
  
        public ConditionObject() { }
 
  

    
  
 
  
        private Node addConditionWaiter() {
 
  
            Node t = lastWaiter;
 
  
            // If lastWaiter is cancelled, clean out.
 
  
            if (t != null && t.waitStatus != Node.CONDITION) {
 
  
                unlinkCancelledWaiters();
 
  
                t = lastWaiter;
 
  
            }
 
  
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
 
  
            if (t == null)
 
  
                firstWaiter = node;
 
  
            else
 
  
                t.nextWaiter = node;
 
  
            lastWaiter = node;
 
  
            return node;
 
  
        }
 
  

    
  
 
  
        private void doSignal(Node first) {
 
  
            do {
 
  
                if ( (firstWaiter = first.nextWaiter) == null)
 
  
                    lastWaiter = null;
 
  
                first.nextWaiter = null;
 
  
            } while (!transferForSignal(first) &&
 
  
                     (first = firstWaiter) != null);
 
  
        }
 
  

    
  
 
  
        private void doSignalAll(Node first) {
 
  
            lastWaiter = firstWaiter = null;
 
  
            do {
 
  
                Node next = first.nextWaiter;
 
  
                first.nextWaiter = null;
 
  
                transferForSignal(first);
 
  
                first = next;
 
  
            } while (first != null);
 
  
        }
 
  

    
  
 
  
        private void unlinkCancelledWaiters() {
 
  
            Node t = firstWaiter;
 
  
            Node trail = null;
 
  
            while (t != null) {
 
  
                Node next = t.nextWaiter;
 
  
                if (t.waitStatus != Node.CONDITION) {
 
  
                    t.nextWaiter = null;
 
  
                    if (trail == null)
 
  
                        firstWaiter = next;
 
  
                    else
 
  
                        trail.nextWaiter = next;
 
  
                    if (next == null)
 
  
                        lastWaiter = trail;
 
  
                }
 
  
                else
 
  
                    trail = t;
 
  
                t = next;
 
  
            }
 
  
        }
 
  

    
  
 
  
        public final void signal() {
 
  
            if (!isHeldExclusively())
 
  
                throw new IllegalMonitorStateException();
 
  
            Node first = firstWaiter;
 
  
            if (first != null)
 
  
                doSignal(first);
 
  
        }
 
  

    
  
 
  
        public final void signalAll() {
 
  
            if (!isHeldExclusively())
 
  
                throw new IllegalMonitorStateException();
 
  
            Node first = firstWaiter;
 
  
            if (first != null)
 
  
                doSignalAll(first);
 
  
        }
 
  

    
  
 
  
        public final void awaitUninterruptibly() {
 
  
            Node node = addConditionWaiter();
 
  
            int savedState = fullyRelease(node);
 
  
            boolean interrupted = false;
 
  
            while (!isOnSyncQueue(node)) {
 
  
                LockSupport.park(this);
 
  
                if (Thread.interrupted())
 
  
                    interrupted = true;
 
  
            }
 
  
            if (acquireQueued(node, savedState) || interrupted)
 
  
                selfInterrupt();
 
  
        }
 
  

    
  
 
  
        private static final int REINTERRUPT =  1;
 
  
        private static final int THROW_IE    = -1;
 
  

    
  
 
  
        private int checkInterruptWhileWaiting(Node node) {
 
  
            return Thread.interrupted() ?
 
  
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
 
  
                0;
 
  
        }
 
  

    
  
 
  
        private void reportInterruptAfterWait(int interruptMode)
 
  
            throws InterruptedException {
 
  
            if (interruptMode == THROW_IE)
 
  
                throw new InterruptedException();
 
  
            else if (interruptMode == REINTERRUPT)
 
  
                selfInterrupt();
 
  
        }
 
  

    
  
 
  
        public final void await() throws InterruptedException {
 
  
            if (Thread.interrupted())
 
  
                throw new InterruptedException();
 
  
            Node node = addConditionWaiter();
 
  
            int savedState = fullyRelease(node);
 
  
            int interruptMode = 0;
 
  
            while (!isOnSyncQueue(node)) {
 
  
                LockSupport.park(this);
 
  
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 
  
                    break;
 
  
            }
 
  
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 
  
                interruptMode = REINTERRUPT;
 
  
            if (node.nextWaiter != null) // clean up if cancelled
 
  
                unlinkCancelledWaiters();
 
  
            if (interruptMode != 0)
 
  
                reportInterruptAfterWait(interruptMode);
 
  
        }
 
  

    
  
 
  
        public final long awaitNanos(long nanosTimeout)
 
  
                throws InterruptedException {
 
  
            if (Thread.interrupted())
 
  
                throw new InterruptedException();
 
  
            Node node = addConditionWaiter();
 
  
            int savedState = fullyRelease(node);
 
  
            final long deadline = System.nanoTime() + nanosTimeout;
 
  
            int interruptMode = 0;
 
  
            while (!isOnSyncQueue(node)) {
 
  
                if (nanosTimeout <= 0L) {
 
  
                    transferAfterCancelledWait(node);
 
  
                    break;
 
  
                }
 
  
                if (nanosTimeout >= spinForTimeoutThreshold)
 
  
                    LockSupport.parkNanos(this, nanosTimeout);
 
  
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 
  
                    break;
 
  
                nanosTimeout = deadline - System.nanoTime();
 
  
            }
 
  
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 
  
                interruptMode = REINTERRUPT;
 
  
            if (node.nextWaiter != null)
 
  
                unlinkCancelledWaiters();
 
  
            if (interruptMode != 0)
 
  
                reportInterruptAfterWait(interruptMode);
 
  
            return deadline - System.nanoTime();
 
  
        }
 
  

    
  
 
  
        public final boolean awaitUntil(Date deadline)
 
  
                throws InterruptedException {
 
  
            long abstime = deadline.getTime();
 
  
            if (Thread.interrupted())
 
  
                throw new InterruptedException();
 
  
            Node node = addConditionWaiter();
 
  
            int savedState = fullyRelease(node);
 
  
            boolean timedout = false;
 
  
            int interruptMode = 0;
 
  
            while (!isOnSyncQueue(node)) {
 
  
                if (System.currentTimeMillis() > abstime) {
 
  
                    timedout = transferAfterCancelledWait(node);
 
  
                    break;
 
  
                }
 
  
                LockSupport.parkUntil(this, abstime);
 
  
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 
  
                    break;
 
  
            }
 
  
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 
  
                interruptMode = REINTERRUPT;
 
  
            if (node.nextWaiter != null)
 
  
                unlinkCancelledWaiters();
 
  
            if (interruptMode != 0)
 
  
                reportInterruptAfterWait(interruptMode);
 
  
            return !timedout;
 
  
        }
 
  

    
  
 
  
        public final boolean await(long time, TimeUnit unit)
 
  
                throws InterruptedException {
 
  
            long nanosTimeout = unit.toNanos(time);
 
  
            if (Thread.interrupted())
 
  
                throw new InterruptedException();
 
  
            Node node = addConditionWaiter();
 
  
            int savedState = fullyRelease(node);
 
  
            final long deadline = System.nanoTime() + nanosTimeout;
 
  
            boolean timedout = false;
 
  
            int interruptMode = 0;
 
  
            while (!isOnSyncQueue(node)) {
 
  
                if (nanosTimeout <= 0L) {
 
  
                    timedout = transferAfterCancelledWait(node);
 
  
                    break;
 
  
                }
 
  
                if (nanosTimeout >= spinForTimeoutThreshold)
 
  
                    LockSupport.parkNanos(this, nanosTimeout);
 
  
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 
  
                    break;
 
  
                nanosTimeout = deadline - System.nanoTime();
 
  
            }
 
  
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 
  
                interruptMode = REINTERRUPT;
 
  
            if (node.nextWaiter != null)
 
  
                unlinkCancelledWaiters();
 
  
            if (interruptMode != 0)
 
  
                reportInterruptAfterWait(interruptMode);
 
  
            return !timedout;
 
  
        }
 
  

       。。。。。。。。。。之类的工具类。。。。。。。。
  
 
  
    }
 
  

    
  
 
  
    private static final Unsafe unsafe = Unsafe.getUnsafe();
 
  
    private static final long stateOffset;
 
  
    private static final long headOffset;
 
  
    private static final long tailOffset;
 
  
    private static final long waitStatusOffset;
 
  
    private static final long nextOffset;
 
  

    
  
 
  
    static {
 
  
        try {
 
  
            stateOffset = unsafe.objectFieldOffset
 
  
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
 
  
            headOffset = unsafe.objectFieldOffset
 
  
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
 
  
            tailOffset = unsafe.objectFieldOffset
 
  
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
 
  
            waitStatusOffset = unsafe.objectFieldOffset
 
  
                (Node.class.getDeclaredField("waitStatus"));
 
  
            nextOffset = unsafe.objectFieldOffset
 
  
                (Node.class.getDeclaredField("next"));
 
  

    
  
 
  
        } catch (Exception ex) { throw new Error(ex); }
 
  
    }
 
  

    
  
 
  
    private final boolean compareAndSetHead(Node update) {
 
  
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
 
  
    }



    。。。。。一堆CAS方法。。。。。。。。


}



 



 



 



参考:



http://ifeve.com/introduce-abstractqueuedsynchronizer/



以下四篇:



https://www.jianshu.com/p/9ebca222513b



https://www.jianshu.com/p/c806dd7f60bc



https://www.jianshu.com/p/01f2046aab64



https://www.jianshu.com/p/134f494d76d8


 



 


 

标签:node,Node,CAS,节点,int,简析,源码,null,final
From: https://blog.51cto.com/u_13854513/6952283

相关文章

  • 软件第三方测评机构简析,软件检测证明材料的作用和意义
    软件检测证明材料是由信息产业部授权或省级软件产业主管部门认可的软件第三方测评机构出具的检测证明材料,对于用户来说非常重要,是从软件的开发过程、测试结果和用户评价等方面来判断软件的质量和安全性,它可以帮助用户判断软件的质量和安全性,提供选择和使用软件的依据。软件......
  • RTSP流媒体服务器LntonNVR(源码版)平台前端打包出现“UglifyJsPlugin”报错的问题解决
    LntonNVR既有软件版也有硬件版,平台基于RTSP/Onvif协议将前端设备接入,可实现的视频能力有视频监控直播、录像、视频转码分发、检索与回放、云存储、智能告警、国标级联等。平台可将接入的视频流进行转码分发,对外输出的视频流格式包括RTSP、RTMP、HTTP-FLV、WS-FLV、HLS、WebRTC等。......
  • Spark Core源码分析: RDD基础
    RDD RDD初始参数:上下文和一组依赖1.abstractclass2.@transientprivate3.@transientprivate4.extends 以下需要仔细理清:AlistofPartitionsFunctiontocomputesplit(subRDDimpl)AlistofDependenciesPartitionerforK-VRDDs(Optional)Preferredl......
  • 国标GB28181平台LntonGBS(源码版)国标视频平台在连接MySQL数据库时提示“can’t connect
    LntonGBS国标视频云服务平台不仅支持无缝、完整接入内网或者公网的国标设备,还能够实现全平台、全终端输出。该平台支持将GB/T28181的设备/平台推送的PS流转成ES流,并提供RTSP、RTMP、FLV、HLS、WebRTC等多种格式视频流的分发服务,实现Web浏览器、手机浏览器、微信端、PC客户端等各终......
  • 国标GB28181视频平台LntonGBS(源码版)国标平台在登录页添加登录验证的具体操作步骤
    国标视频云服务LntonGBS支持设备/平台通过国标GB28181协议注册接入,实现视频的实时监控直播、录像、检索与回看、语音对讲、云存储、告警、平台级联等功能。该平台部署简单,可拓展性强,能够将接入的视频流进行全终端、全平台分发,支持的视频流格式包括RTSP、RTMP、FLV、HLS、WebRTC等。......
  • 海外直播APP源码的开发给商家们带来了什么
    目前,在海外市场中,TikTok和Facebook无疑是最受欢迎的社交网络直播平台,尤其是欧美社交APP,虽然直播内容更注重娱乐性,但其他地区例如东南亚,海外社交直播渗透率还较低,那么开发海外直播APP源码给商家们带来了哪些好处利益呢?1.海外直播APP源码开发本身具有较大的优势可以提高海外用户的参......
  • 实验室LIMS系统源码
    实验室LIMS系统采用国际规范的业务管理流程和严格的质量控制体系,对每个检测流程节点采用“人、机、料、法、环、测”进行质量控制,可记录,可追溯。强大的数据查询和统计分析能力,提高工作效率;自动化地采集实验室原始数据及处理结果,可减轻繁重的手工抄录数据和减少人为干扰检测结果,使......
  • StoneDB 源码解读系列|Tianmu 引擎工具类模块源码详解(一)
    StoneDB源码解读系列文章正式开启,预计以周更的形式跟大家见面,请多多支持~本篇源码解读内容已进行直播分享,可在视频号观看直播回放,也可点击阅读原文跳转至B站观看回放视频。PPT内容可在社区论坛中查看下载:https://forum.stonedb.io/t/topic/89各个工具类属于Tianmu引擎的......
  • Android手部检测和手势识别(含训练代码+Android源码+手势识别数据集)
    Android手部检测和手势识别(含训练代码+Android源码+手势识别数据集)目录Android实时手势动作识别(含训练代码++手势识别数据集)1.前言2.手势识别的方法(1)基于多目标检测的手势识别方法(2)基于手部检测+手势分类识别方法3.手势识别数据集说明(1)HaGRID手势识别数据集(2)自定义数据集4.基于......
  • github代码外泄监控——可用来提供源码泄露检测服务,数据泄露场景,原理就是在github搜索
     Hawkeye监控github代码库,及时发现员工托管公司代码到GitHub行为并预警,降低代码泄露风险。特点优点邮箱告警通知黑名单添加爬虫任务设置缺点spider通过关键词在github进行模糊搜索,搜索结果会比较杂依赖Python3.x(Hawkeye支持Python3.xonLinuxandmacOS;2.x兼容性需自行修改测试......