RocketMQ-CountDownLatch2
/**
 * A count-down latch that, unlike a one-shot latch, can be put back to its
 * initial count through {@link #reset()}.
 *
 * <p>All blocking and waking is delegated to an internal
 * {@link AbstractQueuedSynchronizer} subclass whose state holds the
 * remaining count.
 */
public class CountDownLatch2 {

    /** Synchronizer that owns the counter and the queue of waiting threads. */
    private final Sync sync;

    /**
     * Creates a latch that opens after {@code count} calls to {@link #countDown}.
     *
     * @param count how many times {@link #countDown} has to be called before
     *              {@link #await} lets threads through
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch2(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        this.sync = new Sync(count);
    }

    /**
     * Blocks the calling thread until the count is zero or the thread is
     * {@linkplain Thread#interrupt interrupted}.
     *
     * <p>Returns at once when the count is already zero. Otherwise the thread
     * waits until either {@link #countDown} brings the count to zero or another
     * thread interrupts it.
     *
     * <p>If the interrupted status is set on entry, or the thread is interrupted
     * while it waits, {@link InterruptedException} is thrown and the interrupted
     * status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Blocks the calling thread until the count is zero, the thread is
     * {@linkplain Thread#interrupt interrupted}, or the timeout expires.
     *
     * <p>Returns {@code true} at once when the count is already zero, and
     * {@code true} as soon as {@link #countDown} brings it to zero. Returns
     * {@code false} when the waiting time runs out first; a timeout that is
     * zero or negative means the method does not wait at all.
     *
     * <p>If the interrupted status is set on entry, or the thread is interrupted
     * while it waits, {@link InterruptedException} is thrown and the interrupted
     * status is cleared.
     *
     * @param timeout the maximum time to wait
     * @param unit    the time unit of {@code timeout}
     * @return {@code true} if the count reached zero, {@code false} if the
     *         waiting time elapsed first
     * @throws InterruptedException if the current thread is interrupted while waiting
     */
    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
        final long nanosToWait = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(1, nanosToWait);
    }

    /**
     * Lowers the count by one and releases every waiting thread when the
     * count becomes zero.
     *
     * <p>A count that is already zero is left untouched.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Reports the current count.
     *
     * <p>Mainly useful for debugging and tests.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Puts the count back to the value this latch was constructed with.
     *
     * <p>This only overwrites the counter; no signalling of waiting threads is
     * performed by this call.
     */
    public void reset() {
        sync.reset();
    }

    /**
     * Describes this latch together with its state: the default object string
     * followed by {@code "[Count = n]"}, where {@code n} is the current count.
     *
     * @return a string identifying this latch, as well as its state
     */
    @Override
    public String toString() {
        final String identity = super.toString();
        return identity + "[Count = " + sync.getCount() + "]";
    }

    /**
     * Synchronization control for {@code CountDownLatch2}.
     * The AQS state is the remaining count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        /** Count supplied at construction; {@link #reset()} restores it. */
        private final int startCount;

        Sync(int count) {
            this.startCount = count;
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /**
         * Shared acquire: succeeds only once the count is zero.
         *
         * @return {@code 1} when the state is zero, {@code -1} otherwise
         */
        @Override
        protected int tryAcquireShared(int acquires) {
            if (getState() == 0) {
                return 1;
            }
            return -1;
        }

        /**
         * Shared release: lowers the count by one. The CAS can lose a race
         * with another thread, so it is retried in a loop until it lands.
         *
         * @return {@code true} only for the decrement that takes the count to
         *         zero; {@code false} if the count was already zero
         */
        @Override
        protected boolean tryReleaseShared(int releases) {
            while (true) {
                final int current = getState();
                if (current == 0) {
                    // Already open: nothing to decrement, nobody to signal.
                    return false;
                }
                final int decremented = current - 1;
                if (compareAndSetState(current, decremented)) {
                    // Signal waiters only on the transition to zero.
                    return decremented == 0;
                }
            }
        }

        /** Overwrites the state with the count given at construction. */
        protected void reset() {
            setState(startCount);
        }
    }
}
上文分析了 AQS 的源码、加锁释放锁的逻辑,其实 AQS 在许多框架中有自己的实现,比如在 RocketMQ 中封装的 CountDownLatch2
就是利用了 AQS 加锁失败时线程释放CPU资源并等待的原理。
加锁操作只有在内部资源数为0 时才会成功,对于 CountDownLatch2
只有在计数被减到0 时资源数才会变为0,所以在此之前所有线程的加锁都会失败,那么会进入自旋进而调用 LockSupport.park()
方法进入等待状态,该状态下线程会让出CPU资源。
// Backs await(): a blocking wait. While the latch count is non-zero,
// tryAcquireShared() returns -1, so control passes to
// doAcquireSharedInterruptibly(), which keeps retrying the acquire.
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // Thread.interrupted() also clears the flag before the exception is raised.
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    final boolean acquiredImmediately = tryAcquireShared(arg) >= 0;
    if (!acquiredImmediately) {
        doAcquireSharedInterruptibly(arg);
    }
}
// Attempts the shared acquire in a loop, throwing InterruptedException
// if the wait is reported as interrupted.
//
// arg is passed unchanged to tryAcquireShared().
// On any exit that did not acquire (including the exception path) the
// node is handed to cancelAcquire().
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Obtain this thread's wait node in shared mode.
final Node node = addWaiter(Node.SHARED);
// Stays true unless the acquire succeeds; drives the cleanup in finally.
boolean failed = true;
try {
// Keep looping here, retrying the acquire until it succeeds; for the latch,
// the acquire succeeds only when the count is 0.
for (;;) {
final Node p = node.predecessor();
// Only retry the acquire when this node's predecessor is the head.
if (p == head) {
int r = tryAcquireShared(arg);
// A non-negative result means the acquire succeeded.
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// NOTE(review): judging by the names, the first helper decides whether
// to park and the second parks and reports interruption — confirm in AQS.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// Reached without acquiring: cancel this node's acquire attempt.
if (failed)
cancelAcquire(node);
}
}
解锁操作就是将资源数减一,减为0 时标志着需要唤醒 CountDownLatch2
等待的线程,此时再执行加锁操作 tryAcquireShared()
会返回1,即加锁成功,那么会退出 doAcquireSharedInterruptibly()
方法,结束自旋加锁操作,await()
方法会立即结束。