CountDownLatch源码解析
- countdown是倒计时的意思,latch是门闩的意思,也有门锁的意思,合起来字面意思就是一个倒计树计锁器的意思,先来看一个具体的案例分析大致了解
import java.util.concurrent.CountDownLatch;
public class Main {
public static void main(String[] args) throws InterruptedException {
int N = 3; // 有3个操作需要等待
CountDownLatch latch = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
new Thread(new Worker(latch), "Worker " + i).start();
}
latch.await(); // 主线程在这里等待
System.out.println("所有操作完成,主线程继续执行。");
}
static class Worker implements Runnable {
private final CountDownLatch latch;
Worker(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
// 执行一些操作
System.out.println(Thread.currentThread().getName() + " 完成操作");
} finally {
latch.countDown(); // 计数器减一
}
}
}
}
打印结果如下:
Worker 0 完成操作
Worker 2 完成操作
Worker 1 完成操作
所有操作完成,主线程继续执行。
在这个例子中,主线程创建了三个工作线程,并等待这三个线程完成工作。每个工作线程完成工作后会减少计数器的值。当所有工作线程都完成后,主线程继续执行。
这么一看他有点类似计数器锁的样式,有一说一他确实有这个功能,因此可以把他看成一个辅助工具
1.CountDownLatch的构造实现
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch
又名锁存器,他的实现得以依赖与内部的Sync
这个类,而此类也就是我们AQS
的一个子类,也就是说部分功能依旧通过我们这个AQS
的模板类去实现,因此这里的count实际上是注入到state
这个字段中去,这个字段用来表示锁的一个状态,也可以用来计数.
1.1.Sync同步队列
sync是锁存器中重要的一个内部类,因此了解sync的内部结构是有必要的:他用来辅助我们去处理一些同步状态的问题,其中最核心的两个方法如下:
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
如果有阅读过读写锁的相关源码或者了解过共享模式就能明白tryAcquireShared
方法和tryReleaseShared
方法具体含义,这里共享模式下仅仅只有0和大于0的数,每次释放-1,直到为0说明此时的所有事件皆以完成(参考前面案例).
2.await方法的具体实现
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
可以看出此方法的实现依赖与AQS的源码实现
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
因此实际上传入的参数1没有发挥任何作用,实际上还是取决于AQS
的state字段来判断锁存器中的计数是否存在,如果小于0说明此时还有线程在执行或者某个事件还未完成,因此调用doAcquireSharedInterruptibly
方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这部分的代码是AQS
的底层实现,需要对AQS
有一定的基本认识再去阅读比较容易理解,核心思维便是管理共享资源的获取,大致逻辑如下:
- 添加一个新的节点添加到等待队列中(尾插:具体参考AQS)
addwaiter
; - 下面操作是一个自旋的操作,且每次自旋都会去检查是否需要中断
- 获取新节点的前驱节点,如果前驱节点为头节点(头节点不保存任何具体元素:具体参考AQS)
predecessor
; - 在共享模式下获取状态
state
,如果大于等于0(可以回头看Sync内部的getstate方法,这里其实类似于双重锁进行了一个临时的校验),说明获取成功,调用setHeadAndPropagate
,设置头节点并传播(这个是翻译得来的,具体查看源码),并设置节点域,更改标志位字段,退出自旋状态 - 如果在获取失败后,考虑到性能问题是否需要中断对线程进行检查,后续再来看
因此核心是
setHeadAndPropagate
方法,查看源码:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
- 这段代码的核心在于设置AQS等待队列中的头节点,并且传播的含义就是其他节点的一个唤醒问题,根据条件是否唤醒其他线程,比如
propagate
大于0,表示一个传播条件,以及后续的条件都是在考虑是否需要唤醒其他线程,因此会获取此节点的后继节点,如果他的后续为空,并且为共享模式下,那意味着可以唤醒队列中其他的等待节点,
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
- 先来说他的一个整体设计思想:在共享模式下释放同步状态时使用,它确保状态的释放能够传播到其他等待的线程,所以在更新节点状态的时候底层是一个CAS的操作,下面来解释一下这段代码的具体含义:
- 首先是一个自旋的过程,并且头节点不为null,也不是尾节点,这说明此时等待队列中还存在节点,还有等待的线程
- 接下来获取头节点的状态,然后是一段逻辑的处理来处理不同的状态,如果是singnal状态,则需要唤醒后续节点,
unparkSuccessor
会唤醒后继节点,并且直到更新成功,如果ws == 0
,表示当前没有特定的信号状态,但需要设置为传播状态(PROPAGATE
),以确保释放操作能传播给后继节点。这里同样使用 CAS 操作尝试更新状态,如果失败,则继续循环。(这里请参考AQS等待队列中节点的状态设置,0并不表示特定的状态) if (h == head) { break; }
:检查头节点是否在循环期间改变。如果没有改变,则跳出循环。这是一种优化,以避免在头节点已经被处理的情况下进行无用的循环。
3.countDown方法的具体实现
public void countDown() {
sync.releaseShared(1);
}
这个函数的意义就是去递减锁存器的计数,如果计数为0,则释放所有线程
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
-
和上述流程对应,在共享模式下会去释放对象
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
- 如果状态为0,说明没有线程独占,也就是锁存器为0的情况,其余情况他会自己减1,并且通过CAS更换state字段;
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
而在这里同样去调用此方法的目的是为了唤醒那些在
await
方法中等待的线程.从而实现一种多线程之间的协调有效通知/等待过程.
4.CountDownLatch的设计意义
现有一场景,需要同时满足时,才可以去调用或者实现:例如,在我们的软件开发设计中需要(数据库服务,网络服务,和相关配置服务都同时启动)才能去启动,这么说是否能够理解其中的含义,这其实意味着某几件事中如果可能存在关联的时候需要一定的协调的时候就可以用到这种同步辅助工具