Semaphore源码解读
注意,阅读本文需要了解AQS,AQS采用了模板设计模式。后续本人会完善这篇文章
Semaphore的方法
- acquire()
阻塞获得一个许可,会阻塞,直到得到一个可用许可或被中断
重载版本 acquire(n) :尝试获取n个许可 - acquireUninterruptibly()
类acquire,但不可中断 - tryAcquire()
非阻塞版本,只尝试一次,可被中断
重载版本tryAcquire(n),tryAcquire(timeout,TimeUnit) 限时尝试,没抢到许可会等待直到得到许可或被中断 。tryAcquire(n,timeout,TimeUnit) - release()
释放一个许可。重载:release(n) - drainPermits()
当前线程剩余的许可 - hasQueuedThreads()
判断是否队列中有等待许可的线程 - getQueuedLength()
等待队列的长度
Semaphore总体框架
和ReentrantLock内部类似,Semaphore内部定义了一个抽象同步器Sync抽象类,它继承了AbstractQueuedSynchronizer。Sync有两个子类:NofairSync和FairSync,分别实现了非公平和公平两种版本的实现。Semaphore的各种acquire方法都是委托给Sync对象调用的。
关于permit
Sync的构造方法。permit的本质是AQS的state
Sync(int permits) {
// AQS的setState方法,
setState(permits);
}
Semaphore的 acquire方法
// Semaphore中
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
AQS的模板方法acquireSharedInterruptibly,调用了tr·yAcquireShared钩子方法
//AQS
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
FairSync
FairSync重写了AQS的tryAcquireShared方法
protected int tryAcquireShared(int acquires) {
for (;;) {
// 若有前驱,则申请失败。保证申请者为第一个节点
if (hasQueuedPredecessors())
return -1;
// 获取当前剩余许可量
int available = getState();
int remaining = available - acquires;
// 若剩余许可<0,或cas成功,返回剩余量。
// 为什么剩余量<0也要照样返回呢?
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
再看AQS的方法
//AQS
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
第二个if块有两种情况:
- 钩子方法tryAcquireShared返回负数
即remaining<0,申请量大于许可量,则进入方法doAcquireSharedInterruptibly(arg)。由于钩子方法中使用短路或,remaining<0则不会cas交换 - remaining>0,cas成功,得到许可,则直接跳出方法,线程则继续下面的代码。在tryAcquireShared钩子方法中,进行过一次cas,若cas成功,返回remain,remain必然>=0,如果cas失败,则重新自旋,重复流程。
AQS的doAcquireSharedInterruptibly方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//进入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 死循环,
for (;;) {
//获得前驱节点
final Node p = node.predecessor();
// 若前驱为head,则进入申请方法
// 也就是说,非队首节点不能进入该申请方法
if (p == head) {
// 调用钩子方法
int r = tryAcquireShared(arg);
//r>0则成功得到许可,跳出方法
if (r >= 0) {
// 该方法中,将当前节点设为head,并改变状态
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//进行挂起判断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
总结申请方法
Semaphore的 acquire调用了AQS的模板方法acquireSharedInterruptibly,acquireSharedInterruptibly进一步调用了钩子方法tryAcquireShared进行申请许可,在钩子方法中的死循环中,钩子方法退出循环的方式有两种:(1)remaining<0;(2)remaining<=0且cas成功。若计算出剩余许可remaining<0,则直接返回remaining,若remaining>=0,则进行CAS,若成功,返回r,该r也必然>=0。
在模板方法acquireSharedInterruptibly中,只有所需许可>可用许可时,才会进入方法doAcquireSharedInterruptibly,该方法封装线程节点入队,并且只有队首节点(前驱为head)才能够进行tryAcquireShared钩子调用,申请成功则它成为head,并且unpark后驱。
Acquire时的入队条件
- 模板的if中,调用tryAcquireShared钩子方法,返回负数(申请量大于可用量)。在FairSync中,队列中有其他节点时,会直接返回-1以进入队列
- 只要在钩子方法中,remaining>0,则会重复循环,直到成功或者r<0进入队列。
- NofairSync与FairSync相比,只是钩子方法去掉了队列中是否有结点的判断