目录
1、Semaphore信号量源码解析(基于jdk11)
1.1 Semaphore概述
public class Semaphore
extends Object
implements Serializable
Semaphore直译过来就是信号量,被作为一种多线程并发控制工具来使用。
Semaphore可以控制同时访问共享资源的线程个数,线程通过 acquire方法获取一个信号量,信号量减一,如果没有就等待;通过release方法释放一个信号量,信号量加一。它通过控制信号量的总数量,以及每个线程所需获取的信号量数量,进而控制多个线程对共享资源访问的并发度,以保证合理的使用共享资源。
1.2 Semaphore的原理
1.2.1 基本结构(jdk11)
根据UML类图,可以很明显的看出来Semaphore和CountDownLatch一样都是直接使用AQS实现的。
区别就是Semaphore还分别实现了公平模式FairSync和非公平模式NonfairSync两个内部类。
实际上公平与非公平只是在获取信号量的时候得到体现,它们的释放信号量的方法都是一样的,这就类似于ReentrantLock:公平与非公平只是在获取锁的时候得到体现,它们的释放锁的方法都是一样的。
在构造器部分,如同CountDownLatch 构造函数传递的初始化计数个数count被赋给了AQS 的state 状态变量一样,Semaphore的信号量个数permits同样赋给了AQS 的state 值。
在创建Semaphore时可以使用一个fair变量指定是否使用公平策略,默认是非公平模式。公平模式会确保所有等待的获取信号量的线程按照先进先出FIFO顺序获取信号量,而非公平没有这个保证。
非公平的吞吐量比公平模式的吞吐量更高,而公平模式则可以避免线程饥饿。
1.2.2 可中断获取信号量
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
可中断的获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒该线程或者线程被中断。获取一个信号量就立即返回,将可用的信号量数减 1。
如果调用此方法时已被中断或者等待时被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。
内部调用AQS的acquireSharedInterruptibly方法,这实际上就是共享式可中断获取资源的模版方法,因此Semaphore和CountDownLatch一样都是基于共享资源模式。
/**
* Semaphore的acquire方法
* 从信号量获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒或者线程被中断。
*
* @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
*/
public void acquire() throws InterruptedException {
//内部调用AQS的acquireSharedInterruptibly方法
//这实际上就是共享式可中断获取资源模版方法
sync.acquireSharedInterruptibly(1);
}
/**
* 从信号量获取permits个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒或者线程被中断。
*
* @param permits 需要获取的信号量数量
* @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
//参数就是permits
sync.acquireSharedInterruptibly(permits);
}
/**
1. AQS的acquireSharedInterruptibly方法
2. 共享式可中断获取信号量资源的模版方法
3. 4. @param arg 需要获取的信号量资源数量
5. @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//最开始就检查一次,如果当前线程是被中断状态,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//调用tryAcquireShared尝试获取共享信号量资源,这个方法是子类自己重写的
//如果返回值小于0,表示当前线程共享信号量资源失败,否则表示成功
//Semaphore的FairSync和NonfairSync对tryAcquireShared分别做出了公平和不公平的实现
if (tryAcquireShared(arg) < 0)
//获取不到就执行doAcquireSharedInterruptibly方法
doAcquireSharedInterruptibly(arg);
}
在获取共享信号量资源的时候,Semaphore还实现了公平模式和非公平模式!它们的实现实际上和lock锁的实现中锁资源的公平、非公平获取非常类似!
1.2.3 公平模式
公平模式调用FairSync的tryAcquireShared方法!
根据AQS、ReentrantLock、ReadWriteLock的源码,我们会发现hasQueuedPredecessors方法,这个方法是AQS为实现公平模式的预定义方法,AQS帮我们实现了,该方法用于查询是否有任何线程等待获取信号量资源的时间超过当前线程。
具体步骤为:
- 开启一个死循环
- 调用hasQueuedPredecessors方法,判断是否有线程比当前线程更早地请求获取信号量。如果有,则返回true,由于是公平模式,tryAcquireShared则直接返回-1不应该获取信号量资源,表示获取信号量失败。
- 到这里还没返回,表示当前线程就是最早请求获取信号量资源的,可以尝试获取;
- 获取state的值available,state代表信号量资源数。remaining为available减去需要获取的信号量资源数量之后的差值。
- 如果remaining<0,那么返回remaining值,由于是负数,所以获取失败;如果>0表示可以获取成功,尝试CAS更新state,更新成功后同样返回remaining值;
- 如果remaining大于等于0,但是CAS更新state失败,那么循环重试。
展开:如果remaining大于等于0,但是CAS更新state失败,那么会循环重试,这里为什么要重试呢?
因为可能会有多个线程同时获取信号量资源,但是由于CAS只能保证一次只有一个线程成功,因此其他线程必定失败,但此时,实际上还是存在剩余的信号量资源没有被获取完毕的,因此让其他线程重试,相比于直接加入到同步队列中,对于信号量资源的利用率更高!
/**
* 公平模式
*/
static final class FairSync extends Sync {
/**
* 尝试公平的获取共享信号量资源
*
* @param acquires 获取信号量资源数量
* @return 如果返回值小于0,表示当前线程共享信号量资源失败,否则表示成功
*/
protected int tryAcquireShared(int acquires) {
/*开启一个循环尝试获取共享信号量资源*/
for (; ; ) {
//这是AQS实现公平模式的预定义的方法,AQS帮我们实现好了。该方法用于查询是否有任何线程等待获取信号量资源的时间超过当前线程
//如果该方法返回true,则表示有线程比当前线程更早地请求获取信号量资源。由于是公平的的,因此当前线程不应该获取信号量资源,直接返回-1,表示获取信号量资源失败
if (hasQueuedPredecessors())
return -1;
//到这里,表示当前线程就是最早请求获取信号量资源,可以尝试获取
//获取state的值available,我们知道state代表信号量资源数量
int available = getState();
//remaining为available减去需要获取的信号量资源数量之后的差值
int remaining = available - acquires;
//如果remaining小于0,那么返回remaining值,由于是负数,因此获取失败
//如果大于等于0,那么表示可以获取成功,尝试CAS的更新state,更新成功之后同样返回remaining,由于是大于等于0的数,因此获取成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
//如果remaining大于等于0,但是CAS更新state失败,那么循环重试
}
}
}
1.2.4 非公平模式
非公平模式调用NonfairSync的tryAcquireShared方法
相比于公平模式的实现,少了hasQueuedPredecessors的判断。可以想象:如果某线程A 先调用了aquire()方法获取信号量,但是如果当前信号量个数为0,那么线程A 会被放入AQS 的同步队列阻塞。
另外,非公平模式的具体实现是在父类Sync中的nonfairTryAcquireShared方方法,为什么该方法要实现在父类中的,因为无论是指定的公平模式还是非公平模式,它们的tryAcquire方法都是调用的nonfairTryAcquireShared方法,即非公平的,因此实现在父类中!
1.2.4 不可中断获取信号量
public void acquireUninterruptibly()
不可中断的获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒该线程。获取一个信号量就立即返回,将可用的信号量数减 1。
相比于acquire()方法,该方法不响应中断,不会抛出InterruptedException
实际上内部调用AQS的acquireShared方法,这实际上就是共享式获取资源的模版方法式。
1.2.5 超时可中断获取信号量
public boolean tryAcquire(long timeout, TimeUnit unit)
超时可中断的获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒该线程或者线程被中断或者阻塞超时。获取一个信号量就立即返回,将可用的信号量数减 1。
如果调用此方法时已被中断或者等待时被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。
实际上内部调用AQS的tryAcquireSharedNanos方法,这实际上就是共享式超时可中断获取资源的模版方法。
1.2.6 尝试获取信号量
public boolean tryAcquire()
仅在调用时至少存在至少一个可用信号量,才尝试获取一个信号量。
实际上内部就是直接调用的nonfairTryAcquireShared方法,即公平模式和非公平模式的tryAcquire实现是一样的!并且该方法不会阻塞线程,获取成功返回true,获取失败返回false!
1.2.7 释放信号量
public void release()
释放一个信号量,信号量总数加1。释放成功后,将唤醒在同步队列中等待获取信号量的结点(线程)!
公平模式和非公平模式的信号量的释放都是一样的。实际上内部调用AQS的releaseShared方法,这实际上就是共享式释放资源的模版方法。
1.3 Semaphore的使用
Semaphore可以用来控制多线程对于共享资源访问的并发量!
例:有10台电脑,但是有20个工人,一台电脑只能被一个员工使用,这时就需要限制了
我们可以通过Semaphore与之前的CountDownLatch搭配线程池来轻松实现。我们能发现,采用非公平模式的Semaphore时工人的总工作量大部分情况下要高于采用公平模式的工人总工作量,即非公平模式的执行效率更高(这是不一定的)。我们还能发现,在非公平模式工人的总工作量高于公平模式的工人总工作量时,非公平模式下总会有某些工人工(特别是工人0、1、2)作量更多,而另一些工人工作量更少,这就是线程饥饿!
1.4 Semaphore的总结
Semaphore和CountDownLatch的原理都差不多,都是直接使用AQS的共享模式实现自己的逻辑,都是对于AQS的state资源的利用,但是它们却实现了不同的功能,CountDownLatch中state被看作一个倒计数器,当state变为0时,表示线程可以放开执行。而Semaphore中的state被看作信号量资源,获取不到资源则可能会阻塞,获取到资源则可以访问共享区域,共享区域使用完毕要记得还回信号量。
注意:另外还需要注意的是,如果在AQS的同步队列中队头结点线程需要获取n个资源,目前有m个资源,如果m小于n,那么这个队列中的头结点线程以及后面的所有结点线程都会因为不能获取到资源而继续阻塞
标签:中断,jdk11,信号量,获取,源码,线程,公平,Semaphore From: https://www.cnblogs.com/checkcode/p/Semaphore.html