首页 > 编程语言 >AbstractQueuedSynchronizer源码阅读

AbstractQueuedSynchronizer源码阅读

时间:2023-12-27 11:34:29浏览次数:38  
标签:node AQS 队列 AbstractQueuedSynchronizer 源码 线程 阅读 null 节点

AbstractQueuedSynchronizer源码阅读

目录

本人的源码阅读主要聚焦于类的使用场景,一般只在java层面进行分析,没有深入到一些native方法的实现。并且由于知识储备不完整,很可能出现疏漏甚至是谬误,欢迎指出共同学习

本文基于corretto-17.0.9源码,参考本文时请打开相应的源码对照,否则你会不知道我在说什么

例子

AbstractQueuedSynchronizer简称AQS。在JUC中基于AQS实现了各种各样的锁和同步工具(信号量、事件等),继承它的类只需要维护一个状态变量,剩下的队列操作以及阻塞机制都由AQS来完成。

什么叫继承它的类只需要维护一个状态变量,看一个官方文档的说明:

To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState, setState and/or compareAndSetState:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

意思是我们只需要重写AQS的这几个方法,并且通过AQS提供的getState, setState and/or compareAndSetState这几个方法对状态变量进行修改,就能达到我们的想要的各种效果。

看一下官方文档给出的使用AQS实现不可重入互斥锁的例子就明白了,这里定义状态为1表示锁被占用,0表示锁空闲:

public class Mutex implements Lock, Serializable {

  private static class Sync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
      assert acquires == 1;
      if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }

    @Override
    protected boolean tryRelease(int releases) {
      assert releases == 1;
      if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
      }
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }

    public boolean isLocked() {
      return getState() != 0;
    }

    @Override
    protected boolean isHeldExclusively() {
      return getExclusiveOwnerThread() == Thread.currentThread();
    }

    public Condition newCondition() {
      return new ConditionObject();
    }

    private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
      s.defaultReadObject();;
      setState(0);
    }
  }

  private final Sync sync = new Sync();

  @Override
  public void lock() {
    sync.acquire(1);
  }

  @Override
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }

  @Override
  public boolean tryLock() {
    return sync.tryAcquire(1);
  }

  @Override
  public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(time));
  }

  @Override
  public void unlock() {
    sync.release(1);
  }

  @NotNull
  @Override
  public Condition newCondition() {
    return sync.newCondition();
  }

  public boolean isHeldByCurrentThread() {
    return sync.isHeldExclusively();
  }

  public boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
  }
}

首先AQS的子类通常作为 基于AQS的同步/互斥工具类 的私有静态内部类,这里是Sync类,然后再声明一个Sync类型的变量供我们的Mutex使用,而不是直接继承AQS。设计模式上来说,像是 适配器模式 吧。

然后看到Sync重写AQS的几个方法:

  • tryAcquire:通过CAS将状态设为1(占有锁),成功的话设置当前线程为持有锁的线程,否则返回false表示失败
  • tryRelease:检查当前线程不是占有锁的线程的话,直接抛异常。否则将状态设为0(释放锁)
  • isHeldExclusively:检查当前线程是否持有当前锁的线程

另外因为这里Mutex是独占锁而不是共享锁,因此不用实现

  • tryAcquireShared
  • tryReleaseShared

然后Mutex就可以拿着Sync调用方法进行lock、unlock等操作了,整个实现的代码十分简单,因为大部分活AQS都帮我们干了。而且那么多同步工具类都依赖于AQS,足以说明了AQS提供了很多的功能,十分强大,下面将进行分析。

Overview-CLH锁

上网一搜AQS解析,必定离不开CLH锁。CLH作为AQS阻塞队列的节点,一个线程绑定一个节点。注意只有发生竞争锁时才会有节点入队。比如只有单线程获取锁的话,根本不会阻塞,更没有竞争锁。

目前网上大多是基于JDK14之前的AQS分析,而JDK14之后AQS进行了重构,JDK17与14的AQS是差不多的,因此我将根据版本17的来分析,不再讨论旧的设计。另外在这一小节过后,CLH这个词也许不会在出现,通通称为”节点“、“队列”

下面的梳理是基于AQS源码中CLH Nodes的Overview注释进行,你也可以直接看文档的Overview,不过可能比较抽象,能理解多少理解多少,结合代码分析后再回来看Overview会非常清晰。

CLH单个节点的成员变量很好理解:

abstract static class Node {
    volatile Node prev;       // initially attached via casTail
    volatile Node next;       // visibly nonnull when signallable
    Thread waiter;            // visibly nonnull when enqueued
    volatile int status;      // written by owner, atomic bit ops by others
}

prev和next将节点链成双链表,waiter是节点绑定的线程,status成员用于记录节点状态(比如是否正在被阻塞),通过位操作可以记录多种状态。下面看一下CLH队列的结构。

首先CLH队列永远会有一个dummy node作为头节点并用变量head保存,当然,dummy node是懒初始化的,也就是在入队第一个节点的时候才会被初始化。下图是初始化了dummy node后的状态,tail变量指向队尾节点。

+-------+       +------+
|       |       |      |
| head  | <---- | tail |
|(dummy)|       |      |
+-------+       +------+

当第一个节点first入队后的状态图如下:

+-------+  prev +-------+       +------+
|       | <---- |       |       |      |
| head  |       | first | <---- | tail |
|(dummy)| ----> |       |       |      |
+-------+  next +-------+       +------+

这部分对应的代码如下,第一次循环懒初始化dummy(如果还没初始化的话),第二次才入队:

private void tryInitializeHead() {
    Node h = new ExclusiveNode();
    if (U.compareAndSetReference(this, HEAD, null, h))
        tail = h;
}

final void enqueue(Node node) {
    if (node != null) {
        for (;;) {
            Node t = tail;
            node.setPrevRelaxed(t);        // avoid unnecessary fence
            if (t == null)                 // initialize
                tryInitializeHead();
            else if (casTail(t, node)) {
                t.next = node;
                if (t.status < 0)          // wake up to clean link
                    LockSupport.unpark(node.waiter);
                break;
            }
        }
    }
}

接下来是唤醒操作(signalling),AQS采取一种像Dekker算法那样的策略:尝试获取资源的线程不断尝试获取锁,自旋结束还获取不到的话,就park睡眠,直到被释放锁的线程清除WAITING状态并用unpark唤醒,然后重复上述步骤。

这里提到的Dekker算法是操作系统中的互斥算法,类似的还有Peterson算法,这里简单穿插进来了解一下:

// dekker algorithm
process_p {
  pturn = true;
  while (qturn) {
    if (turn == 2) {
      pturn = false;
      while (turn == 2);
    }
    pturn = true;
  }
  // critical section
  turn = 2;
  pturn = false;
}

process_q {
  qturn = true;
  while (pturn) {
    if (turn == 1) {
      qturn = false;
      while (turn == 1);
    }
    qturn = true;
  }
  // critical section
  turn = 1;
  qturn = false;
}

Dekker用于两个进程互斥地访问临界区:pturn和qturn分别代表两个进程进入临界区的意愿,turn代表谁能真正进入临界区。当检测到对方有意愿,并且不是自己的turn时直接放弃自己的意愿,然后不断检测对方是否放弃turn,最后重新声明自己的意愿,并检测turn。

文档说CLH的signalling是Dekker-like的,我想是因为AQS.state就好比turn,自旋+阻塞就好比dekker的忙等。

Overview的剩余内容

AQS支持独占和共享模式,在共享模式下,尝试获取锁的操作会通知下一个等待线程也去获取锁。

在条件变量上等待的线程会单独形成一条单链表,await方法将其加入这条链表,被唤醒之后将其从链表摘下,并入队到主队列中(CLH锁队列)。

AQS在共享变量上进行了很细致的设计,比如对AQS的head, tail, state成员总是volatile读/写,即总是加上full fence,我想这一点是因为像state这样的变量是子类能直接使用的,而AQS不知道子类会怎么加fence,因此直接full fence避免内存可见性等问题。而Node类的status, prev, next变量虽然也是volatile,但是对于他们的访问使用了更宽松的fence策略,因为这些变量对外不可见的,AQS完全可以控制,比如Node.waiter变量的访问甚至没加volatile限制,因为对他的访问夹在两个原子访问中,相当于前后都已经加了fence。(这块是在我之前碎片化学习了内存屏障后的浅薄理解,可能有纰漏)。这些更宽松的访问是直接用Unsafe实现的,在JDK 9后提供了更安全的VarHandle,文档说没有使用它是为了 avoid potential VM bootstrap issues...不懂。

代码分析

从大段注释到代码一路读下来,第一个遇到的核心函数是

final int acquire(Node node, int arg, boolean shared,
                  boolean interruptible, boolean timed, long time)

但又遇到了比较复杂的函数cleanQueue,因此先分析这个cleanQueue

cleanQueue

根据doc可以知道,cleanQueue有两个作用;

  • 从tail开始遍历队列,清除已取消的节点
  • 唤醒那些新成为队头的节点去获得锁(与dummy节点区分开来,这里指的是head.next)

代码用了两层循环,外部那层用于当队列结构改变的时候,重新开始遍历,因此重点在于内层循环。先来看内循环第一行:

for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples

一上来先定义四个让人直呼acm高手的变量名,分别是q s p n,并且后面注释(p, q, s) triples,再联系下面的代码大概可以猜到这四个变量分别代表:

  • q:当前节点
  • s:q的后继节点successor
  • p:q的前驱节点predecessor
  • n:临时的后继节点next

明白变量的含义之后,接着看下去:

if (q == null || (p = q.prev) == null)
  return;                      // end of list

如果队列没有节点了,或者当前节点是第一个节点(dummy节点)则已处理完毕。这是唯一的函数出口。在这之后的代码都保证了q和p不为空。

if (s == null ? tail != q : (s.prev != q || s.status < 0))
  break;                       // inconsistent
  • 后继为空:tail != q的话,说明有其他线程修改了tail,队列结构改变,因此直接break然后重新开始遍历。
  • 后继不为空:后继的前驱不是q(也就是q的后继的前驱不是q的话,说明s已经不再是q的后继)或者后继已经被取消(s是已经被处理过的节点,这个函数就是要清除已取消的节点,因此得重新开始遍历,去处理这个s)的话,也说明队列结构改变了,直接break。
if (q.status < 0) {              // cancelled
  if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
    q.prev == p) {
    p.casNext(q, s);         // OK if fails
    if (p.prev == null)
      signalNext(p);
  }
  break;
}

到此终于来到了处理节点的代码,根据常量的定义,status<0代表节点已被取消(最高位为1的int类型必然是负数):

// Node status bits, also used as argument and return values
static final int WAITING   = 1;          // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND      = 2;          // in a condition wait

首先还是根据后继是否为空分情况讨论:

  • 后继为空:直接将tail指向前驱节点
  • 后继不为空:将后继的前驱指向q的前驱

上述两种情况用的都是CAS操作,在赋值的同时检测了队列结构是否改变(这里称CAS为testAndSet更合理),如果队列结构改变的话返回false,这样就可以直接break重新开始遍历了。

设置成功后将p.next设成s,至此从结构上已经完成了将节点q从队列删除(学过双链表就不难脑补出来)。

然后最后一个if判断,如果p是第一个节点的话,说明p是dummy节点,而原先的第一个有效节点(dummy.next)已经被删除,下一个节点成为了第一个有效节点,此时它可以去获取锁,所以调用signalNext唤醒它。

由于删除q是否成功都意味着队列结构的改变,因此最后break重新开始遍历。

if ((n = p.next) != q) {         // help finish
  if (n != null && q.prev == p) {
    p.casNext(n, q);
    if (p.prev == null)
      signalNext(p);
  }
  break;
}

这里检测p.next是否指向q(q的前驱的后继是否是q),如果是的话,说明队列结构没被改变继续处理,否则进入下一步判断:

如果n(p当前的后继)不是null,并且当前q的前驱为p,说明当前为这样的状态:

ef0357c65804f40fc6e774b9939c0fce

会出现这样的状态是因为并发导致p.next被其他线程所修改,后面将p.next强行恢复为指向q即可。然后检测如果第一个有效节点发生改变(检测到p是dummy),那么则唤醒这个节点去获得锁。最后也是由于队列结构改变直接break。

TODO:这里我暂时存疑,什么时候会出现这样的情况呢?强行将它恢复成cleanQueue需要的结构的话,不会有什么问题吗。话说回来,其实也没必要太纠结,因为AQS在关于并发、吞吐量以及其他的一些权衡上的细枝末节非常多,分析整体的思想即可。

s = q;
q = q.prev;

这是最后两行代码,走到这里也就说明本次循环没有找到已取消的节点,队列结构也没发生改变,因此往head方向继续遍历。

总之cleanQueue核心功能就是用来清理一些已取消的节点,期间可能会出现这样的情况:已经被遍历过的节点被取消了,但是没有被发现(q已经差不多前进到head了),因此本次cleanQueue不会被清除。这种情况是正常的,可以类比GC,节点的清除是批量的,不是一被取消就会被清除的,可以提高性能,没删除的节点可以等下一批删除。

acquire

这个函数的功能是,尝试通过tryAcquire获取锁,自旋数次获取不到之后,就主动调用park进入阻塞状态。如果某次tryAcquire成功则返回正数,被中断则返回负数,超时返回0。

先看下函数签名以及对应参数含义:

final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time)
  • node:只有在条件变量中acquire才会传入
  • arg:用户自定义参数,最终会传给tryAquire/tryAcquireShared
  • shared:是否共享模式的acquire
  • interruptible:如果为true,那么在中断后返回负数
  • timed:是否有限等待
  • time:有限等待时长,单位是纳秒

为了简单起见但是又不失根据地分析代码,我们注意到在之前Mutex例子中的lock方法中直接调用的是acquire(1),因此从这个调用入手去分析理解,并且Mutex对应的是独占模式,因此下面先暂时忽略共享模式的代码。

我们分析最简单的情况:只有一个线程尝试去获取已经被其他线程占用的锁,即对tryAcquire的调用通通返回false

public final void acquire(int arg) {
  if (!tryAcquire(arg))
    acquire(null, arg, false, false, false, 0L);
}

这个acquire的重载首先调用了tryAcquire(这个函数在Mutex示例中有,是由子类自定义的尝试获取锁的方法),如果尝试获取失败的话,再调用acquire。实际上acquire里也会数次调用tryAcquire

然后进入acquire(null, arg, false, false, false, 0L)

Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null;                // predecessor of node when enqueued
  • current:当前线程
  • spins:下次唤醒后的最大自旋次数
  • postSpins:暂存最大自选次数
  • interrupted:记录线程是否被中断
  • first:node是否为第一个有效节点(即是否为head.next指向的节点)
  • node:当前线程所绑定的节点

最外边是一个无限for循环,这个写法在之前enqueue方法里也见过,其实非常常见:循环里写了很多 if 分支根据当前的运行状态选择不同的处理逻辑。比如在enqueue方法中,首次循环时tail还没初始化(状态1)先去初始化tail,第二次循环时tail已经初始化(状态2),此时将节点入队。明白了这个写法之后,就不会被下面一堆的 if 弄乱了。

首先看第一块 if:

// 分支1
if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {
  if (pred.status < 0) {
    cleanQueue();           // predecessor cancelled
    continue;
  } else if (pred.prev == null) {
    Thread.onSpinWait();    // ensure serialization
    continue;
  }
}

这个代码块主要用于清理已经被取消的前驱节点,目的是让node尽量往队头靠,最好变成第一个有效节点,这样就有机会去获取锁了。根据变量的含义,可知这条 if 判断的是:如果node不是空,并且不是head,并且不是第一个有效节点。换句话说就是:当node是非第一个有效节点时为 true,那么进入这个分支:

  • 当前驱节点被取消(pred.status < 0),用cleanQueue清除被取消的节点,然后重新循环。
  • 当节点是第一个有效节点(pred.prev == null),则重新开始循环。如果走到这一步的话,说明在它之前的节点在别的线程中被清理了,当前节点变成了第一个有效节点。

下面看第二块 if:

// 分支2
if (first || pred == null) {
  boolean acquired;
  try {
    if (shared)
      acquired = (tryAcquireShared(arg) >= 0);
    else
      acquired = tryAcquire(arg);
  } catch (Throwable ex) {
    cancelAcquire(node, interrupted, false);
    throw ex;
  }
  if (acquired) {
    if (first) {
      node.prev = null;
      head = node;
      pred.next = null;
      node.waiter = null;
      if (shared)
        signalNextIfShared(node);
      if (interrupted)
        current.interrupt();
    }
    return 1;
  }
}

这个代码块的功能是使用tryAcquire尝试获取锁,由于循环可能会执行多次,对应的会多次tryAcquire。另外因为要满足FIFO特性,因此只有第一个有效节点才去竞争这个锁(first=true),或者是一开始还没生成节点的时候也会去尝试一次tryAcquire(pred==null)。如果获取锁成功,那么清除该节点(如果有的话),并返回一个正数表示获取成功。

否则,如果上一步tryAcquire不成功,那么继续往下走。

下面是一个大的if-ifelse-else体,每个分支对应不同的处理逻辑,梳理一下各个分支的功能,写在注释里:

if (node == null) {                 // allocate; retry before enqueue
  // 分支3. 为当前线程生成节点
  if (shared)
    node = new SharedNode();
  else
    node = new ExclusiveNode();
} else if (pred == null) {          // try to enqueue
  // 分支4. 将节点入队
  node.waiter = current;
  Node t = tail;
  node.setPrevRelaxed(t);         // avoid unnecessary fence
  if (t == null)
    tryInitializeHead();
  else if (!casTail(t, node))
    node.setPrevRelaxed(null);  // back out
  else
    t.next = node;
} else if (first && spins != 0) {
  // 分支5. 如果节点是第一个有效节点则自旋获取锁,即重新再来一轮循环,运行第二个 if 块中的tryAcquire
  --spins;                        // reduce unfairness on rewaits
  Thread.onSpinWait();
} else if (node.status == 0) {
  // 分支6. 设置节点的状态为阻塞ing,到这一步的话,说明自旋次数用完了还没获取到锁,需要准备阻塞了,不然浪费CPU资源,很明显在设置完这个状态之后还会去挣扎一次tryAcquire
  node.status = WAITING;          // enable signal and recheck
} else {
  // 分支7. 真正开始阻塞节点对应的线程,阻塞之前增加节点下次唤醒后的最大自旋次数。被唤醒之后,清除阻塞ing的状态,如果被中断的话则退出循环
  long nanos;
  spins = postSpins = (byte)((postSpins << 1) | 1);
  if (!timed)
    LockSupport.park(this);
  else if ((nanos = time - System.nanoTime()) > 0L)
    LockSupport.parkNanos(this, nanos);
  else
    break;
  node.clearStatus();
  if ((interrupted |= Thread.interrupted()) && interruptible)
    break;
}

循环结束后只有一句:

return cancelAcquire(node, interrupted, interruptible);

cancelAcquire用于清理acquire失败的节点,只有两种情况会被调用:

  • tryAcquire抛出异常(对应分支2中的异常处理)
  • 超时或者被中断(对应分支7的两个break)
private int cancelAcquire(Node node, boolean interrupted, boolean interruptible) {
  if (node != null) {
    node.waiter = null;
    node.status = CANCELLED;
    if (node.prev != null)
      cleanQueue();
  }
  if (interrupted) {
    if (interruptible)
      return CANCELLED;
    else
      Thread.currentThread().interrupt();
  }
  return 0;
}

acquire总结

acquire完成的功能其实就是不断自旋获取锁,获取不到就阻塞,醒来后继续自旋,然后阻塞,如此往复。为了实现公平的特性(FIFO),队列中只有第一个有效节点才会去tryAcquire,并且是多次tryAcquire,比如在还没生成节点前会tryAcquire一次,阻塞前tryAcquire至少一次,唤醒后会tryAcquire一次。

一些疑问点:

  • 为什么第一个有效节点被正常唤醒后(非超时/中断)却还有可能获取不到锁,不是FIFO的吗?

    因为唤醒之后到重新获取锁这段时间内,可能有其它根本不在队列中的线程这个时候tryAcquire抢先拿到了锁。从这个角度看,AQS的FIFO公平性只是针对在队列中的线程来说的。因此AQS采取了线程唤醒后增加最大自旋次数来提高公平性(对应的分支5也有一句源代码注释reduce unfairness on rewaits

  • Thread.onSpinWait() 与 Thread.yield() 有什么区别

    直接看onSpinWait的官方描述:on some architectures the Java Virtual Machine may issue the processor instructions to address such code patterns in a more beneficial way....总之不明觉厉,跟具体处理器有关,具体区别得去问hotspot开发者们,反正busy-waiting的时候他就会用onSpinWait。

acquire共享模式补充

release

相比acquire方法,这个方法就“友好”多了。

public final boolean release(int arg) {
  if (tryRelease(arg)) {
    signalNext(head);
    return true;
  }
  return false;
}

尝试释放锁,释放成功后再唤醒队列第一个等待的节点去获得锁。

内部类ConditionObject

到此,AQS还剩共享模式以及ConditionObject没分析。至于AQS其他的方法像hasQueuedPredecessorsisQueued等,经过前面的分析理解阻塞队列的结构以及行为之后就不难读懂了。下面直接看ConditionObject。

AQS还提供了内部类ConditionObject,实现了Condition接口,作为基于AQS的锁的条件变量,使得线程可以在不同的阻塞在不同的队列中。

在这里插入图片描述

条件变量无非就是await和signal

  • await:释放锁并阻塞在等待队列上,等待被signal,唤醒后会自动再次获取锁
  • signal:唤醒被阻塞在等待队列上的线程

signal比较简单,先看signal:

public final void signal() {
  ConditionNode first = firstWaiter;
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  if (first != null)
    doSignal(first, false);
}

可以看到不是随便一个线程都能调用signal的,必须要拥有独占锁的线程才能用signal唤醒其他线程。signal通过doSignal这个方法实现:

/**
 * Removes and transfers one or all waiters to sync queue.
 */
private void doSignal(ConditionNode first, boolean all) {
  while (first != null) {
    ConditionNode next = first.nextWaiter;
    if ((firstWaiter = next) == null)
      lastWaiter = null;
    if ((first.getAndUnsetStatus(COND) & COND) != 0) {
      enqueue(first);
      if (!all)
      	break;
    }
    first = next;
  }
}

doSignal很简单,只干了一件事,或者说signal只干了一件事:将节点从条件变量队列摘下来,重新入队到AQS队列。

接下来再看看await方法,另外里面有一个enableWait方法比较重要,先看这个enableWait:

/**
 * Adds node to condition list and releases lock.
 *
 * @param node the node
 * @return savedState to reacquire after wait
 */
private int enableWait(ConditionNode node) {
  // 检查当前线程有没有独占该锁
  if (isHeldExclusively()) {
    // 绑定节点与当前线程
    node.waiter = Thread.currentThread();
    node.setStatusRelaxed(COND | WAITING);
    // 将节点加入该条件变量的等待队列中
    ConditionNode last = lastWaiter;
    if (last == null)
      firstWaiter = node;
    else
      last.nextWaiter = node;
    lastWaiter = node;
    // 保存状态,并释放锁
    int savedState = getState();
    if (release(savedState))
      return savedState;
  }
  // 如果线程没有独占该锁,则直接抛异常
  node.status = CANCELLED; // lock not held or inconsistent
  throw new IllegalMonitorStateException();
}

结合注释可以知道,enableWait的功能是将该线程对应的节点加入条件变量的等待队列,并释放其对应的锁。

下面再来看await方法:

public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  ConditionNode node = new ConditionNode(); // 为当前线程生成节点
  int savedState = enableWait(node); // 释放锁并将节点加入条件变量的阻塞队列
  LockSupport.setCurrentBlocker(this); // for back-compatibility,记录线程的阻塞源
  boolean interrupted = false, cancelled = false, rejected = false;
  while (!canReacquire(node)) { // 检测节点有没有被signal,还没有的话就不能acquire,这个函数返回false,进入循环体
    if (interrupted |= Thread.interrupted()) {
      if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
          break;              // 如果节点阻塞在条件变量上,并且被中断的话,退出循环,下面会检测到并且抛出中断异常
    } else if ((node.status & COND) != 0) {
      // 阻塞线程
      try {
        if (rejected)
          node.block();
        else
          ForkJoinPool.managedBlock(node);
      } catch (RejectedExecutionException ex) {
        rejected = true;
      } catch (InterruptedException ie) {
        interrupted = true;
      }
    } else
      // 可能还没完成入队,自旋等一下
      Thread.onSpinWait();    // awoke while enqueuing
  }
  LockSupport.setCurrentBlocker(null);
  node.clearStatus();
  // 重新获取锁
  acquire(node, savedState, false, false, false, 0L);
  // 如果之前被中断了,那么就清除条件变量队列上的这个节点,包括其他non-waiting的节点,并重新抛出中断异常
  if (interrupted) {
    if (cancelled) {
      unlinkCancelledWaiters(node);
      throw new InterruptedException();
    }
    Thread.currentThread().interrupt();
  }
}

await流程如下:

  1. 生成线程对应的节点
  2. 将节点加入条件变量的等待队列,并释放其对应的锁
  3. 不断调用canReacquire,通过节点在不在AQS队列中获知线程有没有被signal,如果不在的话就继续阻塞线程
  4. 当线程被signal或者被中断后退出循环,并重新尝试获得锁
  5. 最后处理中断

await需要注意的地方也不少:

  • 阻塞线程有两种方式,node.block()和ForkJoinPool.managedBlock(node)。后者使用ForkPoolManager管理。TODO:阅读ForkJoinPool
  • 无论是被signal还是被中断,await返回之前都会再次调用acquire获取锁,并且注意到参数interruptable=false,说明即使await是可中断的,但前提是得获取到锁才能返回。Condition.await方法的接口文档也说明了这一点:When the thread returns it is guaranteed to hold this lock
  • while循环里的else分支啥也没干,因此相当于自旋,之前说过这种情况下调用Thread.onSpinWait有利于优化运行。

参考链接

Jdk17 AQS cleanQueue方法源码分析

LockSupport源码阅读

AQS-基于JDK17

标签:node,AQS,队列,AbstractQueuedSynchronizer,源码,线程,阅读,null,节点
From: https://www.cnblogs.com/nosae/p/17930211.html

相关文章

  • 阅读笔记二
    相较于一般的技术类书籍,这本书的厚度属于偏薄的。而其内容所涵盖的又比较多,从技艺,思想和以COLA作为范例的实践,三个角度去分享自己的经验。这就势必使得这本书不会纠结于技术细节。技艺篇中,作者从最基础的命名开始,延伸到规范、函数、设计原则、设计规范以及模型和DDD,从细部到大......
  • 2023年秋季个人阅读计划9
    《见微知著:从软件实践到软件工程》读后感在信息时代,软件工程的重要性日益凸显。作为IT学生,我深知掌握软件工程知识对于未来的职业生涯至关重要。最近,我阅读了《见微知著:从软件实践到软件工程》这本书,它以酒店信息管理系统的开发为例,深入浅出地介绍了软件工程的全过程。软件工程......
  • ThreadLocal底层源码解析
    ThreadLocal底层源码解析ThreadLocal:顾名思义的意思是本地线程或者局部线程的意思,其真正含义是希望多个线程之间拥有自己的局部变量,多个线程间拥有自己的私人变量,在多线程间不被共享,被线程单独享用,这就是ThreadLocal设计之初的原衷因此,无论是操作系统级别还是编程语言......
  • openjdk源码-java是如何执行shell命令的
    一般我们在java中调用shell脚本的方式如下publicintexecuteLinuxCmd(Stringcmd){LOGGER.info("cmd:{}",cmd);Runtimerun=Runtime.getRuntime();try{Processprocess=run.exec(cmd);InputStreamin=proce......
  • 12.26阅读笔记
    读《需求工程——软件建模与分析》有感今天大致的看了一下这本书,对软件需求分析有了初步的了解,我认为学习软件需求分析需要掌握的内容主要包括五个方面:需求基础与过程、需求获取、需求分析、需求的文档化和验证、需求管理与工程管理。一、需求的基础与过程这一部......
  • APP开发详解:数字药店系统源码
    数字药店系统的兴起,不仅为消费者提供了更加便捷的购药体验,也为药店管理和药品销售带来了全新的机遇。一、明确系统的基本功能:1.用户注册与登录2.药品浏览与搜索3.购物车与结算。4.在线支付与订单管理二、开发环境与技术栈选择前端开发环境通常使用React、Vue或Angular等流行的前端......
  • 大语言模型生成模型的源码结构复习
    modeling_gpt2.py:1099iflabelsisnotNone:#movelabelstocorrectdevicetoenablemodelparallelismlabels=labels.to(lm_logits.device)#Shiftsothattokens<npredictnshift_logits=lm......
  • 基于SpringBoot+Vue的毕业设计系统的开发设计实现(源码+lw+部署文档+讲解等)
    (文章目录)前言:heartpulse:博主介绍:✌全网粉丝10W+,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌:heartpulse:......
  • Spring MVC 源码分析 - 一个请求的旅行过程
    在上一篇《WebApplicationContext容器的初始化》文档中分析了SpringMVC是如何创建两个容器的,其中创建RootWebApplicationContext 后,调用其refresh()方法会触发刷新事件,完成SpringIOC初始化相关工作,会初始化各种SpringBean到当前容器中,该系列文档暂不分析我们先来了解一......
  • 【源码系列#04】Vue3侦听器原理(Watch)
    专栏分享:vue2源码专栏,vue3源码专栏,vuerouter源码专栏,玩具项目专栏,硬核......