Semaphore是一种同步辅助工具,翻译过来就是信号量,用来实现流量控制,它可以控制同一时间内对资源的访问次数.
无论是Synchroniezd还是ReentrantLock,一次都只允许一个线程访问一个资源,但是Semaphore可以指定多个线程同时访问某一个资源.
Semaphore有一个构造函数,可以传入一个int型整数n,表示某段代码最多只有n个线程可以访问,如果超出了n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。
信号量上定义两种操作:
- acquire(获取):当一个线程调用acquire操作时,它要么成功获取到信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时,Semaphore内部会维护一个等待队列用于存储这些被暂停的线程.
- release(释放)实际上会将信号量的值+1,然后唤醒相应Sepmaphore实例的等待队列中的一个任意等待线程.
信号量主要用于两个目的:
用于多个共享资源的互斥使用
用于并发线程数的控制
例子
以下的例子:5个线程抢3个车位,同时最多只有3个线程能抢到车位,等其他线程释放信号量后,才能抢到车位.
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//申请资源
System.out.println(Thread.currentThread().getName()+"抢到车位");
ThreadUtil.sleep(RandomUtil.randomInt(1000,5000));
System.out.println(Thread.currentThread().getName()+"归还车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放资源
semaphore.release();
}
}
},"线程"+i).start();
}
}
注意事项
Semaphore.acquire()和Semaphore.release()总是配对使用的,这点需要由应用代码自身保证.
Semaphore.release()调用应该放在finally块中,已避免应用代码出现异常的情况下,当前线程所获得的信号量无法返还.
如果Semaphore构造器中的参数permits值设置为1,所创建的Semaphore相当于一个互斥锁.与其他互斥锁不同的是,这种互斥锁允许一个线程释放另外一个线程所持有的锁.因为一个线程可以在未执行过Semaphore.acquire()的情况下执行相应的Semaphore.release().
默认情况下,Semaphore采用的是非公平性调度策略.
原理
abstract static class Sync extends AbstractQueuedSynchronizer {
//省略
}
Semaphore内部使用Sync类,Sync又是继承AbstractQueuedSynchronizer,所以Sync底层还是使用AQS实现的.Sync有两个实现类NonfairSync和FairSync,用来指定获取信号量时是否采用公平策略.
初始化方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
如上所示,Semaphore默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造Semaphore对象。
参数permits被传递给AQS的state值,用来表示当前持有的信号量个数.
void acquire()方法
当前线程调用该方法的目的是希望获取一个信号量资源。
如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。否则如果当前信号量个数等0,则当前线程会被放入AQS的阻塞队列。当其他线程调用了当前线程的interrupt()方法中断了当前线程时,则当前线程会抛出InterruptedException异常返回。
//Semaphore方法
public void acquire() throws InterruptedException {
//传递参数为1,说明要获取1个信号量资源
sync.acquireSharedInterruptibly(1);
}
//AQS的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//(1)如果线程被中断,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//(2)否则调用Sync子类方法尝试获取,这里根据构造函数确定使用公平策略
if (tryAcquireShared(arg) < 0)
//如果获取失败则放入阻塞队列.然后再次尝试,如果使用则调用park方法挂起当前线程
doAcquireSharedInterruptibly(arg);
}
由如上代码可知,acquire()在内部调用了Sync的acquireSharedlnterruptibly方法,后者会对中断进行响应(如果当前线程被中断,则抛出中断异常)。尝试获取信号量资源的AQS的方法 tryAcquireShared是由Sync的子类实现的,所以这里分别从两 方面来讨论。
先讨论非公平策略NonfairSync类的tryAcquireShared方法,代码如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;