首页 > 其他分享 >高并发(AQS)

高并发(AQS)

时间:2024-04-18 15:55:21浏览次数:35  
标签:node 结点 AQS 队列 next 并发 线程 唤醒

AQS 抽象的队列同步器框架,主要通过程序来构建锁和同步器

AQS 的全称为 AbstractQueuedSynchronizer ,翻译过来的意思就是抽象队列同步器,它和Java的Synchronized作用和一样,用来同步加锁;

特性对比

ReentrantLock Synchronized
锁实现 依赖AQS 监视器Monitor模式
灵活性 支持响应中断、超时、尝试获取锁 不灵活
释放形式 必须显示的调用unlock()释放锁 自动释放监视器
锁类型 公平锁&非公平锁 非公平锁
条件队列 可关联多个队列 关联一个条件队列
可重入性 可重入 可重入

可以看出,AQS比系统锁更加灵活管理。性能上synchronized 在1.6版本后也引入了锁升级的概念,整体差异不大

为什么需要 AQS

性能是否可以成为“重复造轮子”的理由呢?Java1.5 中 synchronized 性能不如 AQS,但 1.6 之后,synchronized 做了很多优化,将性能追了上来。显然性能不能重复造轮子的理由,因为性能问题优化一下就可以了,完全没必要“重复造轮子”。

在前面在介绍死锁问题的时候,我们知道可以通过破坏死锁产生的条件从而避免死锁,但这个方案 synchronized 没有办法解决。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态,也释放不了线程已经占有的资源。我们需要新的方案解决这问题。

如果我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?AQS 提供了以下方案:

  1. 能够响应中断。synchronized 一旦进入阻塞状态,就无法被中断。但如果阻塞状态的线程能够响应中断信号,能够被唤醒。这样就破坏了不可抢占条件了。
  2. 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  3. 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

这三种方案可以全面弥补 synchronized 的问题。这三个方案就是“重复造轮子”的主要原因,体现在 API 上,就是 Lock 接口的三个方法。详情如下:

// 支持中断的API
void lockInterruptibly() throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();

AQS 必备知识

实现原理:管程

推荐文章:锁原理 - 信号量 vs 管程:JDK 为什么选择管程?

并发编程的两大核心问题:一是互斥,即同一时刻只允许一个线程访问共享资源;二是同步,即线程之间的 "wait-notify" 机制。管程能够解决这两大问题。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。

AQS 可见性问题:volatile

你可能认为,锁的可见性不是显而易见的问题吗?还真没这么简单,这涉及到 JMM 提供的 happens-before 原则。

  1. 顺序性规则:单线程中,每个操作 happens-before 于任意后续操作。
  2. synchronized 规则:对一个锁的解锁,happens-before 于随后对这个锁的加锁。
  3. volatile 规则:对一个 volatile 域的写,happens-before 于任意后续对这个 volatile 域的读。
  4. 传递性规则:如果A happens-before B,且B happens-before C,那么A happens-before C。

synchronized 之所以能够保证有序性,是因为满足了 synchronized 规则,那 AQS 又满足了那条规则呢?如果我们仔细分析一下 JUC 并发工具类,可以发现一个通用化的实现模式:

  1. 声明共享变量为 volatile。
  2. 使用 CAS 的原子条件更新来实现线程之间的同步。
  3. 配合以 volatile 的读/写和 CAS 所具有的 volatile 读和写的内存语义来实现线程之间的通信。

说明: AQS 就是根据上述模式实现的,在 lock.lock() 获取锁时,会读取 volatile 变量,同时 lock.unlock() 释放锁时,会修改 volatile 变量。这样满足了 volatile 规则,所以 AQS 能够保证可见性。

volatile 最开始只能保证可见性,禁止指定重排这个语义是在 JSR-133 加强的。禁止指定重排指,对一个 volatile 变量的读,总是能看到(任意线程)对这个 volatile 变量最后的写入。这样,volatile 读其实和 lock.lock() 具有相同的语义,volatile 写具有 lock.unlock() 语义。

CLH 队列锁

CLH 队列锁是一种利用 CAS 实现的无锁队列。

  1. 为什么选择链表。二叉树相对链表的操作要复杂很多,需要左旋右旋来保持树的平衡,也是说二叉树需要锁住很多结点才行。但链表非常简单,通常只需要操作一个结点即可。
  2. 插入:和普通插入一样,一次 CAS 即可。
  3. 删除:和普通删除不一样,node 结点删除时,如果直接设置 pre.next = next,可能有结点正在插入到 node.next,这样会造成数据不安全。既然一次 CAS 不行,那就两次 CAS:第一次是逻辑删除,先标记 node 已经删除;第二次是物理删除,真正从链表上删除结点。

AQS 中的同步队列在 CLH 的基础上做了改进。CLH 是单身链表,但 AQS 使用双向链表,但要注意的是目前并不存在双向链表的原子性算法,AQS 也保证 node.prev 域的原子性,并不能保证 node.next 域的原子性。如果通过 tail 反向遍历可以查找到所有的结点,但从 head 正向遍历则不一定了,node.next 域只是起辅助作用。

AQS 源码分析 - Lock

AQS 包含 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。

img

锁状态

AQS 使用共享变量 state 来管理锁的状态,很显然 state 必须使用 volatile 修辞,AQS 提供的如下三个方法来访问或修改同步状态:

  • getState():获取当前同步状态。
  • setState(int newState):设置当前同步状态。
  • compareAndSetState(int expect, int update):使用 CAS 设置当前状态,该方法能够保证状态
    设置的原子性。

思考1:state 为什么要提供 setState 和 compareAndSetState 两种修改状态的方法?

这个问题,关键是修改状态时是否存在数据竞争,如果有则必须使用 compareAndSetState。

  • lock.lock() 获取锁时会发生数据竞争,必须使用 CAS 来保障线程安全,也就是 compareAndSetState 方法。
  • lock.unlock() 释放锁时,线程已经获取到锁,没有数据竞争,也就可以直接使用 setState 修改锁的状态。

同步队列

同步队列(syncQueue)结构

img

说明: 首先,需要注意的是:如果没有锁竞争,线程可以直接获取到锁,就不会进入同步队列。也就说,没有锁竞争时,同步队列(syncQueue)是空的,当存在锁竞争时,线程会进入到同步队列中。一旦进入到同步队列中,就会有线程切换。

同步队列特点:

  • 同步队列头结点是哨兵结点,表示获取锁对应的线程结点。
  • 当获取锁时,其前驱结点必定为头结点。获取锁后,需要将头结点指向当前线程对应的结点。
  • 当释放锁时,需要通过 unparkSuccessor 方法唤醒头结点的后继结点。

标准的 CHL 无锁队列是单向链表,同步队列(syncQueue) 在 CHL 基础上做了改进:

  1. 同步队列是双向链表。事实上,和二叉树一样,双向链表目前也没有无锁算法的实现。双向链表需要同时设置前驱和后继结点,这两次操作只能保证一个是原子性的。
  2. node.pre 一定可以遍历所有结点,是线程安全的,而后继结点 node.next 则是线程不安全的。也就是说,node.pre 一定可以遍历整个链表,而 node.next 则不一定。至于为什么选择前驱结点而不是后继结点,会在 "第五部分 - AQS 无锁队列" 中进一步分析。

线程状态

volatile int waitStatus; // 结点状态
volatile Node prev;      // 同步队列:互斥等待队列 Lock
volatile Node next;      // 同步队列
volatile Thread thread;  // 阻塞的线程
Node nextWaiter;         // 等待队列:条件等待 Condition

Node 结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。变量 waitStatus 则表示当前 Node 结点的等待状态,共有 5 种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、INITIAL。

  • CANCELLED(1):表示当前结点已取消调度。因为超时或者中断,结点会被设置为取消状态,进入该状态后的结点将不会再变化。注意,只有 CANCELLED 是正值,因此正值表示结点已被取消,而负值表示有效等待状态。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL。
  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • INITIAL(0):新结点入队时的默认状态。

Node 结点的主要状态变化过程

img

说明: 大致可以分为两种场景,Lock 和 Condition 两种。

  1. Lock 对应的状态变化:
    • lock.lock() 获取锁时:
      • 如果线程能获取到锁,就不会进行等待队列,进而也就不会有之后的各种状态变化。
      • 如果不能获取到锁,此时就会进行同步队列(syncQueue),进行同步队列后会前其驱结点的状态改为 SIGNAL。注意,是修改前驱结点的状态为 SIGNAL,表示需要唤醒后继结点。
      • 当然,在其前驱结点的状态改为 SIGNAL 前,线程可能就被中断、超时、唤醒。此时,会直接修改当前结点的状态为 CANCELLED。
    • lock.unlock() 释放锁时:
      • 释放锁时,需要通过 unparkSuccessor 方法唤醒后继结点。唤醒后继结点后,会将 head 指针移动到该后继结点,也就删除头结点。
  2. Condition 对应的状态变化:
    • condition.await 进行等待队列时:首先,线程会释放锁并唤醒后继结点。然后,将当前线程进入到等待队列(watiQueue)中,同时结点的状态变成 CONDITION。
    • condition.signal 唤醒等待线程:首先,将结点的状态修改为 INITIAL,如果失败则说明结点已经取消,不需要处理,继续轮询下一个结点。然后,将该结点的前驱节点状态修改为 SIGNAL,否则直接唤醒该线程。

AQS 支持阻塞、响应中断、锁超时、非阻塞获取锁四种场景,我们就以最常用的阻塞方式获取锁为例。

与获取锁有关的方法如下:

  • acquire:lock.lock() 方法用于获取锁。
  • tryAcquire:具体获取锁的策略,由子类实现。
  • addWaiter:通过 enq 方法添加到同步队列中。需要注意的是 addWaiter 方法会尝试一次添加到同步队列中,如果不成功,再调用 enq 自旋添加到同步队列中。
  • acquireQueued:线程进入同步队列后,会将该线程挂起,直到有甚至线程唤醒该线程。
  • shouldParkAfterFailedAcquire:将前驱结点的状态修改成 SIGNAL,同时会清理已经 CANCELLED 的结点。注意,只有前驱结点的状态为 SIGNAL,当它释放锁时才会唤醒后继结点。
  • parkAndCheckInterrupt:挂起线程,并判断线程在自旋过程中,是否被中断过。

与释放锁有关的方法如下:

  • release:lock.unlock() 方法用于释放锁。
  • tryRelease:具体释放锁的策略,由子类实现。
  • unparkSuccessor:唤醒后继结点。

acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

说明: tryAcquire 尝试获取锁,如果成功,就不用进入同步队列。否则,就需要通过 acquireQueued 进入等待队列。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {   // 自旋修改前驱结点的状态为SIGNAL,然后挂起线程,直到被唤醒和抢占锁成功
            final Node p = node.predecessor();     // p表示前驱结点
            if (p == head && tryAcquire(arg)) {    // 1. 抢占锁成功,唤醒该线程
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && // 2. 自旋修改前驱结点的状态为SIGNAL
                parkAndCheckInterrupt())                 // 3. 挂起当前线程,并判断是否被中断
                interrupted = true;
        }
    } finally {
        if (failed)   // 异常则将结点状态设置为CANCELLED
            cancelAcquire(node);
    }
}

说明: acquireQueued 进入同步队列中,直到线程被唤醒。

  1. 修改前驱结点状态为 SIGNAL:只有前驱结点的状态为 SIGNAL 时,才能唤醒后继结点。shouldParkAfterFailedAcquire 修改前驱结点状态成功返回 true,否则不断尝试。
  2. 挂起当前线程:parkAndCheckInterrupt 方法通过 LockSupport.park 挂起当前线程,并返回线程是否被中断,也就是可以响应中断操作。
  3. 线程被唤醒:当其它线程释放锁时,会唤醒后继结点。如果这个线程的前驱结点是头结点,并抢占锁成功,就会被唤醒,否则继续被挂起。

release

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

说明: 释放锁相比获取锁要简单一些,因为此时线程已经获取到锁,可以不使用 CAS 原子性操作。

private void unparkSuccessor(Node node) {
  
    // 1. node表示需要删除的结点,将其状态重新设置为INITIAL。允许失败?
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 2. 这一段比较难理解,涉及到同步队列的线程安全问题,目前就记住一点就可以:
    //    node.prev是线程安全的,而node.next则不是线程安全的
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 3. 唤醒同步线程,当然这个线程不一定能抢占到锁。比如非公平锁
    if (s != null)
        LockSupport.unpark(s.thread);
}

说明: unparkSuccessor 方法用于唤醒 node 的后继结点,有几个小细节需要关注一下:

  1. node 表示头结点,也就是当前获取锁的线程。
  2. 虽然,通过 LockSupport.unpark 唤醒了后继结点,但该线程不一定能争抢到锁。
  3. 一旦后继结点争抢到锁,就会向头指针向后移动。

AQS 源码分析 - Condition

管程的两个主要功能,我们已经分析了 Lock 是如何解决互斥问题,下面再看一下 Condition 是如何解决同步问题。条件同步 Condition 的典型用法如下:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

// 等待
public void conditionWait() throws InterruptedException {
    lock.lock();
    try {
        while(条件不满足)
            condition.await();
    } finally {
        lock.unlock();
    }
}

// 通知
public void conditionSignal() throws InterruptedException {
    lock.lock();
    try {
        condition.signalAll();
    } finally {
        lock.unlock();
    }
}

等待队列

Condition 等待队列(waitQueue)要比 Lock 同步队列(syncQueue)简单很多,最重要的原因是 waitQueue 的操作都是在获取锁的线程中执行,不存在数据竞争的问题。

Condition 等待队列结构

img

ConditionObject 重要的方法说明:

  • await:阻塞线程并放弃锁,加入到等待队列中。
  • signal:唤醒等待线程,没有特殊的要求,尽量使用 signalAll。
  • addConditionWaiter:将结点(状态为 CONDITION)添加到等待队列 waitQueue 中,不存在锁竞争。
  • fullyRelease:释放锁,并唤醒后继等待线程。
  • isOnSyncQueue:根据结点是否在同步队列上,判断等待线程是否已经被唤醒。
  • acquireQueued:Lock 接口中的方法,通过同步队列方法竞争锁。
  • unlinkCancelledWaiters:清理取消等待的线程。

await

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();           // 1. 添加到等待队列中
    int savedState = fullyRelease(node);        // 2. 释放锁
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {              // 3. 判断线程是否在同步队列中
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4. 重新竞争锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

说明: 条件同步调用 await 方法后主要完成以下几件事。

  1. 添加到等待队列:将该线程添加到等待队列后,初始状态为 CONDITION。
  2. 释放锁:调用 unparkSuccessor 唤醒后继结点。
  3. 阻塞:如果调用 signal 唤醒等待线程,该线程就会从等待队列移动到同步队列。isOnSyncQueue 判断该结点是否已经在 syncQueue 中。
  4. 重新竞争锁:acquireQueued 在分析 Lock 接口时已经分析过,重新竞争锁。

signal

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&     // 如果唤醒失败,就一直向下唤醒
             (first = firstWaiter) != null);
}

说明: doSignal 唤醒等待的线程,transferForSignal 都是真正唤醒等待线程的方法。如果该线程已经被唤醒或取消,则继续唤醒下一个线程。

final boolean transferForSignal(Node node) {
    // 1. 结点的状态必须是CONDITION。如果是其它状态,则要么已经唤醒,或已经取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 2. 重新加入到同步队列中,竞争锁
    Node p = enq(node);
    int ws = p.waitStatus;
    // 3. 设置前驱结点的状态为SIGNAL
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

说明: transferForSignal 唤醒等待的线程,重新加入到 syncQueue 同步队列来竞争锁。

  1. 等待线程的状态必须是 CONDITION,否则该等待线程已经被唤醒或取消。
  2. enq 方法重新加入到同步队列中,竞争锁。
  3. 前驱结点的状态必须是 SIGNAL。如果线程不能获取到锁,acquireQueued 自旋过程中会通过 shouldParkAfterFailedAcquire 修改其前驱结点的状态直到 SIGNAL。

AQS 无锁同步队列

AQS 中同步队列是双向链表,node.prev 和 node.next 不可能同时通过 CAS 保证其原子性。AQS 中选择了 node.prev 前驱结点的原子性,而 node.next 后继结点则是辅助结点。

同步队列(syncQueue)结构

img

为什么是前驱

思考1:AQS 为什么选择 node.prev 前驱结点的原子性,而 node.next 后继结点则是辅助结点?

  • next 域:需要修改二处来保证原子性,一是 tail.next;二是 tail 指针。
  • prev 域:只需要修改一处来保证原子性,就是 tail 指针。你可能会说不需要修改 node.prev 吗?当然需要,但 node 还没添加到链表中,其 node.prev 修改并没有锁竞争的问题,将 tail 指针指向 node 时,如果失败会通过自旋不断尝试。

前驱和后驱原子性操作对比

img

说明: 通过上图,前驱结点只需要一次原子性操作就可以,而后继结点则需要二次原子性操作,复杂性就会大提升,这就是 AQS 选择前驱结点进行原子性操作的原因。

思考2:AQS 明知道 node.next 有可见性问题,为什么还要设计成双向链表?

唤醒同步线程时,如果有后继结点,那么时间复杂为 O(1)。否则只能只反向遍历,时间复杂度为 O(n)。

以下两种情况,则认为 node.next 不可靠,需要从 tail 反向遍历。

  1. node.next=null:可能结点刚刚插入链表中,node.next 仍为空。此时有其它线程通过 unparkSuccessor 来唤醒该线程。
  2. node.next.waitStatus>0:结点已经取消,next 值可能已经改变。

思考3:AQS 同步队列什么时候删除结点?

  • 入队:lock.lock() 获取锁失败时,会将线程添加到同步队列中。tail 结点总是存在锁竞争的问题。
  • 出队:即物理删除。acquireQueued 自旋(图 5)时:
    • 获取锁成功时,会将头结点移除,同时将 head 重新指向新结点。也就是 node.prev 链打断了。head 结点基本上不存在锁竞争问题。因为只有在初始化时 head 头结点存在锁竞争,之后都是持有锁的线程在修改 head 结点。
    • 结点自旋时,如果 shouldParkAfterFailedAcquire 和 cancelAcquire 方法,发现结点已经被取消,则会剔除已经被取消的结点,node.prev 链同样被打断了。需要注意的是,node 结点自旋时,修改 node 自身的属性没有锁竞争,但如果修改其它结点的属性则会存在锁竞争。
  • 取消:即逻辑删除。如果线程被中断、超时,那么会将线程的状态修改为 CANCELLED,查找时会忽略该结点。同时会修改 node.next,但不会修改 node.prev,直到其它线程获取锁重新设置 head 才会打断 node.prev 链。

总结: 物理删除,acquireQueued 自旋时会修改 node.prev。而逻辑删除,会先标记为 CANCELLED 状态,并修改 node.next。

添加结点

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;                     // ① 不存在锁竞争(因为还没有添加到链表中)
            if (compareAndSetTail(t, node)) {  // ② node.prev一定是对其它线程可见的
                t.next = node;                 // ③ 可能存在并发操作,此时t.next=null
                return t;
            }
        }
    }
}

说明: 线程进入等待队列时,node.prev 是绝对线程安全的,但 node.next 就不一定了。如果线程只好在此时被唤醒,unparkSuccessor 通过 prev.next 就无法查找到该结点,只能反向遍历。

删除结点

删除结点,这里指的是物理删除,修改 node.prev 域,只有在 prev 链中断开才能真正的删除。

(1)acquireQueued 获取锁时,需要重新设置头结点,会将 node.prev=null:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

说明: head 头结点只有在获取锁才会更新,所以不需要 CAS 原子性操作。

(2)shouldParkAfterFailedAcquire(cancelAcquire 方法类似) 清除取消结点代码如下:

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;

说明: node 结点所在的线程自旋时,修改自身的属性 node.prev 不存在锁竞争,但如果修改其它结点的属性(eg pred.next)则会存在锁竞争。

取消结点

取消结点,这里指的是逻辑删除,将结点的状态标记为 CANCELLED,同时修改 node.next 域。取消结点操作比较复杂,因为要考虑取消的结点可能为尾结点、中间结点、头结点三种情况。

private void cancelAcquire(Node node) {
    node.thread = null;

    // 1. 查找前驱结点:忽略CANCELLED结点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    
    // 2. 逻辑删除:此时已经无法从同步队列中查找到该结点,直到其它线程获取锁时会真正物理删除
    node.waitStatus = Node.CANCELLED;

    // 3.1 被取消的结点是尾结点:直接将 tail.next=null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 3.2 被取消的结点是中间结点:前驱结点必须改成SIGNAL状态,否则直接唤醒线程
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        // 3.3 被取消的结点是头结点:直接唤醒后继结点
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

说明: cancelAcquire 唯一可以确定的是将 node.waitStatus 修改成 CANCELLED。如果被取消的是头结点时,需要唤醒后继结点。至于取消的结点是尾结点或中间结点,并不能保证操作成功与否。

img

从上图可以看到:

  • 取消尾结点:设置 tail=pred 且 pred.next=null,但这两个操作都不能保证成功。
  • 取消中间结点:确保 pred.waitStatus=SIGNAL,如果成功则设置 pred.next=node.next,否则直接唤醒后继结点。
  • 取消头结点或设置前驱结点状态为 SIGNAL 失败:直接唤醒后继结点。

总结: cancelAcquire 方法只是逻辑删除,将结点状态标记为 CANCELLED,同时可以修改 node.next 域。从这我们也可以看到为什么 unparkSuccessor 方法唤醒后继结点时,如果后继结点已经 CANCELLED,就需要从 tail 反向遍历结点,因为 next 域可能已经被修改。

唤醒结点

唤醒结点时,需要查找后继有效结点。如果 next=null 或 next.waitStatus>0 则需要反向遍历。

private void unparkSuccessor(Node node) {
    ...
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
}

说明: node.next 是辅助结点,存在可见性问题,但 node.prev 一定可以遍历所有的结点。

参考:

相关博客:

https://www.cnblogs.com/binarylei/p/12555166.html

标签:node,结点,AQS,队列,next,并发,线程,唤醒
From: https://www.cnblogs.com/luojw/p/18143679

相关文章

  • Java并发(二十五)----异步模式之生产者/消费者
    1.定义要点与Java并发(二十二)----同步模式之保护性暂停中的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应这样的好处是消费队列可以用来平衡生产和消费的线程资源生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据......
  • 高并发(锁)
    锁是用于控制多个线程对共享资源的访问的机制,防止出现程序对共享资源的竞态关系线程安全在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况线程的竞态条件竞态条件(racecondition)竞态......
  • 基于K8s+Docker+Openresty+Lua+SpringCloudAlibaba的高并发秒杀系统——与京东淘宝同
    ​介绍基于K8s+Docker+Openresty+Lua+SpringCloudAlibaba的高并发高性能商品秒杀系统,本系统实测单台(16核32G主频2.2GHz)openresty(nginx)的QPS可高达6w并发,如果您需要应对100w的并发,则需要100w/6w=17台openresty服务器,17台服务器同时接收并处理这100w的并发流量呢?当然是商业......
  • 测试Netty高并发工具
    测试Netty应用程序的高并发性能工具JMeterJMeter:ApacheJMeter是一个功能强大的用于性能测试的工具,可以模拟大量用户对Netty服务器的并发请求。你可以创建各种测试计划来模拟不同负载条件下的性能表现。wrkwrk:wrk是一个现代的HTTP基准测试工具,它可以轻松地对Netty服务器进......
  • 我们如何实现最基础的并发?
    OS的目标是在1保持控制权下2高性能的并发。因此现在我们有两大问题需要解决:1.如何高性能地并发?2.如何保持OS对计算机的控制权?(我们姑且只讨论在单CPU的机器上,运行微内核OS) 这篇博客中,我们先来回答这个问题:我们如何实现最基础的并发?       首先我们来回答一下什......
  • QPS才算高并发
    QPS才算高并发高并发场景QPS等专业指标揭秘大全与调优实战  合集-三“高”架构设计与调优(1) 1.高并发场景QPS等专业指标揭秘大全与调优实战04-14收起 高并发场景QPS等专业指标揭秘大全与调优实战最近经常有小伙伴问及高并发场景下QPS的一些问题,特意结合......
  • 高并发 (线程)
    我们都知道现在硬件水平越来越强,执行速度也越来越快。为充分利用CPU资源,我们可以通过多线程来执行分片任务提升整体的执行性能。如何创建线程在Java中通过newThread().start()方法来开启一个线程,实际是调用系统层的native方法privatenativevoidstart0();Java使用线程常用......
  • netcore 并发锁 多线程中使用SemaphoreSlim
    SemaphoreSlim是一个用于同步和限制并发访问的类,和它类似的还有Semaphore,只是SemaphoreSlim更加的轻量、高效、好用。今天说说它,以及如何使用,在什么时候去使用,使用它将会带来什么优势。代码的业务是:在多线程下进行数据的统计工作,简单点的说就是累加数据。1.首先我们建立一个程......
  • Docker安装部署Jenkins并发布NetCore应用
    Docker安装Jenkins#拉取镜像dockerpulljenkins/jenkins#查看镜像dockerimages#运行jenkins#8080端口为jenkinsWeb界面的默认端口13152是映射到外部:前面的是映射外部#50000端口为jenkins的默认代理节点(Agent)通信端口13153是映射到外部#--restart=on-fa......
  • Java并发编程实战读书笔记
    1.线程池模型    netty实战中讲到的线程池模型可以描述为:1.从线程池中选择一个空间的线程去执行任务,2.任务完成时,把线程归还给线程池。这个模型与连接池类似。    根据jdk源码的研究,具体的实现模型是,线程池ThreadPoolExecutor中有一个静态内部类Worker,使用装饰器模式扩......