一、Semaphore
Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。
Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。
信号量通过一组许可证来控制对共享资源的访问。
如果需要,可以用acquire()方法获取许可,如果许可为0,那么会进行阻塞,通过使用release()方法释放许可,把许可归还给Semaphore,归还之后,阻塞的线程就会醒来尝试获取许可。
Semaphore提供给了若干个api对应不同的功能:
- Semaphore(int permits):非公平模式创建;
- Semaphore(int permits, boolean fair):可以指定是否公平模式创建;
- acquire():尝试获取1个许可,如果没有许可则阻塞,可以被中断停止等待;
- acquire(int permits):跟上一个方法类型,尝试获取permits个许可;
- acquireUninterruptibly():尝试获取一个许可,不可中断;
- acquireUninterruptibly(int permits):尝试获取permits个许可,不可中断;
- tryAcquire():尝试获取一个许可,获取不到则直接返回失败;
- tryAcquire(int permits):尝试获取permits个许可,获取不到则直接返回失败;
- tryAcquire(int permits, long timeout, TimeUnit unit):尝试在timeout时间内获取permits个许可,超时则返回false,可被中断;
- tryAcquire(long timeout, TimeUnit unit):尝试在timeout时间内获取1个许可,超时则返回false,可被中断;
- release():释放一个许可;
- release(int permits):释放n个许可;
下面演示基于公平锁的Semaphore,获取锁使用acquireUninterruptibly():
这里设置的许可为2,可以发现,同一时刻最多只能有两个线程获得许可。
二、执行原理
Semaphore的执行原理相对来说比较简单。下面描述了可中断非公平的信号量实现原理,ASQ中的state值就相当于许可的数量:
- 执行acquire的时候,会尝试让state - acquires,如果发现许可足够,则进行cas更新,扣减许可,否则线程进入等待队列;
- 执行release的时候,state + releases,把许可加回去。
三、Semaphore用法
/**
* @Description: 演示Semaphore用法
*/
public class SemaphoreDemo {
public static Semaphore semaphore = new Semaphore(3,true);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"拿到了许可证");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(Thread.currentThread().getName()+"释放了许可证");
semaphore.release();
}
});
}
executorService.shutdown();
}
}
注意,如果使用的是tryAcquire失败之后直接返回,线程不会进入AQS等待队列。
四、源码
公平信号量 和 非公平信号量 的区别
"公平信号量"和"非公平信号量"的释放信号量的机制是一样的!
不同的是它们获取信号量的机制:线程在尝试获取信号量许可时,对于公平信号量而言,如果当前线程不在CLH队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在CLH队列的头部,它都会直接获取信号量。该差异具体的体现在,它们的tryAcquireShared()函数的实现不同。
4.1 Semaphore构造方法
public class Semaphore implements java.io.Serializable {
private final Sync sync;
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
}
-
1、Semaphore 构造器,permits 为传入的许可证数,默认非公平构造器;
-
2、Semaphore 构造器,permits 为传入的许可证数,fair 是 boolean 型的,如果传入 true,则公平,否则不公平;
4.2 NonfairSync 和 FairSync源码
public class Semaphore implements java.io.Serializable {
private final Sync sync;
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
两者都继承了 Sync 同步器,初始化时都调用了父类构造器,同时都有一个获取信号的方法,稍后再分析获取信号的区别。
4.3 acquire(获取信号量)
- 这个方法是从信号量获取一个许可,在获取到许可,或线程中断之前,当前线程阻塞;获取许可后立即返回并将许可数减一
public class Semaphore implements java.io.Serializable {
private final Sync sync;
/**
* 如果没有许可可用,则会休眠,直到发生以下两种情况
* 1、其他调用release方法释放许可,并且当前线程获取到许可
* 2、其他线程中断了当前线程
* 1)当前线程在进入这个方法时设置了中断标志位
* 2)等待许可时发生了中断,则抛出中断异常
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
- acquireSharedInterruptibly 这个方法是直接调用AQS的acquireSharedInterruptibly(int ard)方法;
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 首先检测是否中断.中断后抛出异常
* 尝试获取许可,成功退出;失败则进入AQS队列,直至成功获取或中断
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁,返回剩余共享锁的数量;小于0则加入同步队列,自旋
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
}
tryAcquireShared(arg)则会调用Semaphore中两个同步器的tryAcquireShared实现方法; 如果获取失败则加入队列等待唤醒;
4.4 非公平模式的实现
非公平实现都是首先查看是否有可获取的许可,如果有则获取成功,没有则进队列等待;利用此可以提高并发量
public class Semaphore implements java.io.Serializable {
private final Sync sync;
static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
}
- 直接调用其父类Sync中非公平共享获取
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
// 自旋直到无许可或者状态位赋值成功
for (;;) {
int available = getState();
int remaining = available - acquires;
// 如果小于0则直接返回,否则利用CAS给AQS状态位赋值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
通过自旋+CAS来一直尝试获取许可,直到获取成功或者没有许可,返回剩余的许可数
4.5 公平模式的实现
公平与非公平的区别在于始终按照AQS队列FIFO的顺序来的
public class Semaphore implements java.io.Serializable {
private final Sync sync;
static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
//自旋 CAS 实现线程安全
for (;;) {
// 判断是否有前置任务排队
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
// 如果小于0则直接返回,否则利用CAS给AQS状态位赋值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
如果等待队列不为空,则直接返回-1。 以上两种模式获取失败后都会调用doAcquireSharedInterruptibly(int arg);自旋等待获取锁
- doAcquireSharedInterruptibly方法:会使得当前线程一直等待,直到当前线程获取到锁(或被中断)才返回
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//创建“当前线程”的Node节点,且node中记录的锁是“共享锁”类型,并将节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取前继节点,如果前继节点是等待锁队列的表头,则尝试获取共享锁
// 判断新增的节点的前一个节点是否头节点
final Node p = node.predecessor();
if (p == head) {
// 是头节点,那么在此尝试获取共享锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取成功,把当前节点变为新的head节点,
//并且检查后续节点是否可以在共享模式下等待,
//并且允许继续传播,则调用doReleaseShared继续唤醒下一个节点尝试获取锁
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//前继节点不是头节点,当前线程一直等待,直到获取到锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
- shouldParkAfterFailedAcquire方法:判断当前线程获取锁失败之后是否需要挂起
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/*说明:4.shouldParkAfterFailedAcquire 返回当前线程是否应该阻塞
(01) 关于waitStatus请参考下表(中扩号内为waitStatus的值)
CANCELLED[1] -- 当前线程已被取消
SIGNAL[-1] -- “当前线程的后继线程需要被unpark(唤醒)”。
一般发生情况是:当前线程的后继线程处于阻塞状态,
而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
[0] -- 当前线程不属于上面的任何一种状态。
(02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。
规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的状态
int ws = pred.waitStatus;
// 如果前驱节点是SIGNAL状态,则意味着当前线程需要unpark唤醒,此时返回true
if (ws == Node.SIGNAL)
return true;
// 如果前继节点是取消的状态即前驱节点状态为CANCELLED
if (ws > 0) {
// 从队尾向前寻找第一个状态不为CANCELLED的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前驱节点的状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
}
4.6 void release()
公平和非公平使用相同的释放 释放许可
public class Semaphore implements java.io.Serializable {
private final Sync sync;
public void release() {
sync.releaseShared(1);
}
}
- 调用AQS中的releaseShared(int arg)
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//目的是让当前线程释放它所持有的共享锁,它首先会通过tryReleaseShared()去尝试释放共享锁。
//尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//唤醒所有共享节点线程
doReleaseShared();
return true;
}
return false;
}
}
- tryReleaseShared()在Semaphore.Sync中被重写,释放共享锁,将锁计数器加回去
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取“锁计数器”的状态
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 通过CAS函数进行赋值。
if (compareAndSetState(current, next))
return true;
}
}
}
}
- 如果释放许可成功,则调用AQS中的doReleaseShared()方法来唤醒AQS队列中等待的线程
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 唤醒同步队列中的一个线程
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//是否需要唤醒后继节点
if (ws == Node.SIGNAL) {
//修改状态为初始0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒h.nex节点线程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}
-
1)获取队列的头节点元素,如果不为null,并且不为尾节点,说白了,就是不止一个人等待,进入判断。
-
2)如果线程节点是需要唤醒的线程,则进行唤醒,获取资源使用。
-
3)失败后重试。
-
4)如果没有后继需要唤醒的节点,则退出,就相当于每人排队上厕所了,让出来资源就空着。
Semaphore 总结
-
1、Semaphore 内部维护一组信号量,即一个 volatile 的整型 state 变量。
-
2、Semaphore 分为公平或非公平两种方式,获取信号量或释放信号量的本质是对 state 进行原子的减少或增加操作。
-
3、获取不到信号的线程放在等待队列里面,释放信号的时候会唤醒后继节点。
-
4、Semaphore 主要用于对线程数量、公共资源(比如数据库连接池)等进行数量控制。
参考: https://www.itzhai.com/articles/graphical-several-fun-concurrent-helper-classes.html
https://www.cnblogs.com/200911/p/6060359.html
https://juejin.cn/post/6844904119547723789
https://blog.csdn.net/yhl_jxy/article/details/87279383
标签:Java,许可,int,编程,获取,线程,Semaphore,节点 From: https://blog.51cto.com/u_14014612/6031636