首页 > 编程语言 >Semaphore源码分析

Semaphore源码分析

时间:2023-04-27 22:37:22浏览次数:34  
标签:分析 int 信号量 获取 源码 线程 Semaphore 节点

1、Semaphore介绍

  计数信号量 - Semaphore,常用来限制访问资源的线程数量。优点类似限流中的令牌桶算法,只有拿到信号量的线程才能执行,与令牌桶算法未拿到令牌不处理请求不同的是,在Semaphore中未拿到信号量的线程会阻塞等待,直到有某个线程释放了持有的信号量。

2、Semaphore使用

 1 import java.util.concurrent.Semaphore;
 2 import java.util.concurrent.TimeUnit;
 3 
 4 public class TestSemaphore {
 5 
 6     public static void main(String[] args) throws InterruptedException {
 7        // 定义计数信号量
 8        final Semaphore semaphore = new Semaphore(1);
 9        // 创建子线程
10        new Thread(() -> {
11             try {
12                 System.out.println(Thread.currentThread().getName() + "== 待获取令牌 == ");
13                 semaphore.acquire();
14                 System.out.println(Thread.currentThread().getName() + "== 获取到令牌 == ");
15                 System.out.println(Thread.currentThread().getName());
16             } catch (InterruptedException e) {
17                 e.printStackTrace();
18             }finally {
19                 // 子线程休眠8s后,释放令牌资源
20                 try {
21                     TimeUnit.SECONDS.sleep(8);
22                 } catch (InterruptedException e) {
23                     e.printStackTrace();
24                 }
25                 semaphore.release();
26                 System.out.println(Thread.currentThread().getName() + "== 释放令牌 == ");
27             }
28         }).start();
29         
30         // 主线程休眠3s后,获取令牌资源
31         TimeUnit.SECONDS.sleep(3);
32         System.out.println(Thread.currentThread().getName() + "== 待获取令牌 == ");
33         semaphore.acquire();
34         System.out.println(Thread.currentThread().getName() + "== 获取令牌 == ");
35         semaphore.release();
36         System.out.println(Thread.currentThread().getName() + "== 释放令牌,执行结束 == ");
37     }
38 }

  执行结果如下:

 

  

  上述代码流程如下:

  

  Thread1通过acquire()先子线程获取计数信号量中的信号量(令牌),执行Thread1的业务逻辑。此时main线程通过acquire()获取不到信号量,main线程被挂起,线程阻塞。

  Thread1业务逻辑执行完成,通过release()方法将Thread1持有到的信号量还给Semaphore,并唤醒挂起的main线程。

  被唤醒的main线程再次尝试获取Semaphore信号量,获取成功,执行main的业务逻辑,执行完成,调用release()将信号量还给Semaphore。

3、Semaphore源码分析

  Semaphore基于 AQS + CAS 实现的,可根据构造参数的布尔值,选择使用公平锁,还是非公平锁。Semaphore默认使用非公平锁。

  Semaphore详情如下:

  

3.1、构造函数

 1 // AQS的实现
 2 private final Sync sync;
 3 
 4 // 默认使用非公平锁
 5 public Semaphore(int permits) {
 6     sync = new NonfairSync(permits);
 7 }
 8 
 9 // 根据fair布尔值选择使用公平锁还是非公平锁 
10 public Semaphore(int permits, boolean fair) {
11     sync = fair ? new FairSync(permits) : new NonfairSync(permits);
12 }

3.2、公平锁与非公平锁

  Semaphore中公平锁与非公平锁的实现,可以在tryAcquireShared()方法中找到两种锁的区别。  

  

1、NonfairSync

  Semaphore#NonfairSync#tryAcquireShared() 详情如下

1 // 非公平锁  获取信号量
2 protected int tryAcquireShared(int acquires) {
3     return nonfairTryAcquireShared(acquires);
4 }

  Semaphore#Sync#nonfairTryAcquireShared() 详情如下

 1 // 非公平锁 获取信号量
 2 final int nonfairTryAcquireShared(int acquires) {
 3     // 自旋
 4     for (;;) {
 5         // 获取Semaphore中可用的信号量数
 6         int available = getState();
 7         // 当前可用信号量数 - acquires
 8         int remaining = available - acquires;
 9         // 可用信号量数不足 或 CAS操作获取信号量失败,返回  当前可用信号量数 - acquires
10         if (remaining < 0 ||
11             compareAndSetState(available, remaining))
12             return remaining;
13     }
14 } 

2、FairSync

  Semaphore#FairSync#tryAcquireShared() 详情如下

 1 protected int tryAcquireShared(int acquires) {
 2    // 自旋
 3     for (;;) {
 4         // 等待队列中挂起线程,返回-1 (根据返回的-1,将当前线程添加到等待队列中)
 5         if (hasQueuedPredecessors())
 6             return -1;
 7         // 尝试获取Semaphore的信号量,下面与非公平锁逻辑相同
 8         int available = getState();
 9         int remaining = available - acquires;
10         if (remaining < 0 ||
11             compareAndSetState(available, remaining))
12             return remaining;
13     }
14 }

3、总结

不难看出,公平锁与非公平锁的区别在于当线程尝试获取Semaphore中的信号量时:

  公平锁,优先判断等待队列中是否有挂起的线程,如果有,则将当前线程添加到等待队列中,等待唤醒后抢夺信号量;

  非公平锁,不管等待队列中是否有挂起线程,优先尝试获取信号量,获取失败,将当前线程添加到等待队列。

3.3、acquire()

  Semaphore默认实现的是非公平锁,acquire()按非公平锁的实现进行源码分析。

  Semaphore 中获取一个信号量,Semaphore#acquire() 详情如下:

1 //  Semaphore 中无信号量,阻塞
2 public void acquire() throws InterruptedException {
3     // 获取 Semaphore 信号量
4     sync.acquireSharedInterruptibly(1);
5 }

  AbstractQueuedSynchronizer#acquireSharedInterruptibly() 详情如下:

 1 public final void acquireSharedInterruptibly(int arg)
 2         throws InterruptedException {
 3     // 线程中断,抛出异常
 4     if (Thread.interrupted())
 5         throw new InterruptedException();
 6     // 尝试获取Semaphore的信号量
 7     if (tryAcquireShared(arg) < 0)
 8         // 尝试获取信号量失败,再次获取Semaphore信号量
 9         doAcquireSharedInterruptibly(arg);
10 }

  获取Semaphore中的信号量,Semaphore#Sync#nonfairTryAcquireShared():

 1 // 非公平锁 获取信号量
 2 final int nonfairTryAcquireShared(int acquires) {
 3     // 自旋
 4     for (;;) {
 5         // 获取Semaphore中可用的信号量数
 6         int available = getState();
 7         // 当前可用信号量数 - acquires
 8         int remaining = available - acquires;
 9         // 可用信号量数不足 或 CAS操作获取信号量失败,返回值 小于 0
10         if (remaining < 0 ||
11             compareAndSetState(available, remaining))
12             return remaining;
13     }
14 }

  获取Semaphore中的信号量,Semaphore#Sync#nonfairTryAcquireShared():

 1 private void doAcquireSharedInterruptibly(int arg)
 2     throws InterruptedException {
 3     final Node node = addWaiter(Node.SHARED);
 4     boolean failed = true;
 5     try {
 6         // 自旋
 7         for (;;) {
 8             final Node p = node.predecessor();
 9              // 当前节点的前驱节点为等待队列头节点
10             if (p == head) {
11                 // 尝试获取信号量
12                 int r = tryAcquireShared(arg);
13                 // 获取信号量成功
14                 if (r >= 0) {
15                     // 唤醒等待队列中的待唤醒线程
16                     setHeadAndPropagate(node, r);
17                     p.next = null; 
18                     failed = false;
19                     return;
20                 }
21             }
22             // 获取信号量失败,挂起线程  ==> 线程阻塞,待唤醒进行下一轮自旋
23             if (shouldParkAfterFailedAcquire(p, node) &&
24                 // 若当前线程被中断,抛出InterruptedException异常
25                 parkAndCheckInterrupt())
26                 throw new InterruptedException();
27         }
28     } finally {
29         if (failed)
30             cancelAcquire(node);
31     }
32 }

  AbstractQueuedSynchronizer#setHeadAndPropagate()

 1 // node: 当前节点;propagate 剩余资源
 2 private void setHeadAndPropagate(Node node, int propagate) {
 3     // 获取等待队列中的头节点
 4     Node h = head; 
 5     // 将当前Node节点设置为等待队列的头节点
 6     setHead(node);
 7     // 剩余资源大于0 || 原等待队列中的头节点为null || 原等待队列中 Node 的 ws 为 -1 或者 -3(共享锁) 
 8     if (propagate > 0 || h == null || h.waitStatus < 0 ||  (h = head) == null || h.waitStatus < 0) {
 9         // 获取当前等待队列头节点的后继节点
10         Node s = node.next;
11         // 当前节点的后继节点为null  或 当前节点的后继节点为共享锁
12         if (s == null || s.isShared())
13             doReleaseShared();
14     }
15 }

3.4、release()

  Semaphore默认实现的是非公平锁,release()按非公平锁的实现进行源码分析。

  归还Semaphore的信号量,Semaphore#release() 详情如下:

1 // 归还Semaphore的信号量
2 public void release() {
3     sync.releaseShared(1);
4 }

  归还信号量,Semaphore#Sync#releaseShared() 详情如下:

 1 public final boolean releaseShared(int arg) {
 2     // 尝试归还信号量
 3     if (tryReleaseShared(arg)) {
 4         // 归还信号量
 5         doReleaseShared();
 6         // 归还成功
 7         return true;
 8     }
 9     // 归还失败
10     return false;
11 }

  归还信号量,Semaphore#Sync#releaseShared() 详情如下:

 1 // 尝试归还信号量
 2 protected final boolean tryReleaseShared(int releases) {
 3     // 自旋
 4     for (;;) {
 5         // 获取Semaphore中可用的信号量数
 6         int current = getState();
 7         // 当前可用信号量数 + 归还的信号量 releases
 8         int next = current + releases;
 9         // 超出了int的最大值,变成了负数
10         if (next < current)
11             throw new Error("Maximum permit count exceeded");
12         // cas操作,将信号量归还给Semaphore
13         if (compareAndSetState(current, next))
14             return true;
15     }
16 }

  归还信号量成功,唤醒等待队列中的挂起线程,AbstractQueuedSynchronizer#doReleaseShared() :

 1 private void doReleaseShared() {
 2     // 自旋
 3     for (;;) {
 4         // 获取等待队列头节点
 5         Node h = head;
 6         // 等待队列中有排队的线程
 7         if (h != null && h != tail) {
 8             int ws = h.waitStatus;
 9             // 等待队列头节点ws = -1,说明其后继节点中有待唤醒的线程
10             if (ws == Node.SIGNAL) {
11                 // cas 操作,等待队列头节点的 ws 由 -1 更新为 0 ,cas失败,继续下一次自旋
12                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
13                     continue;           
14                 // 唤醒头节点的后继节点中待唤醒线程
15                 unparkSuccessor(h);
16             }
17             // 解决共享锁JDK1.5的bug,头节点的 ws 为0,将头节点的 ws 设置为 -3 ,代表后继节点中可能有待唤醒的线程
18             else if (ws == 0 &&
19                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
20                 continue;               
21         }
22         if (h == head)              
23             break;
24     }
25 }
 

标签:分析,int,信号量,获取,源码,线程,Semaphore,节点
From: https://www.cnblogs.com/RunningSnails/p/17360378.html

相关文章

  • 2023国内外毫米波雷达行研分析
    写在前面笔者做这项工作的目的是希望为课题组寻找毫米波雷达+智慧驾驶领域寻找可行的趋势与方向,尽可能贴近工业界需求。在这项工作中,笔者总结了以多级联、集成芯片、级联+虚拟孔径成像为代表方案的国内外相关公司,由于信息繁冗、时间紧张,笔者在本报告中尽可能体现了相关公司的有用......
  • CountDownLatch源码分析
    1、CountDownLatch介绍CountDownLatch让一个或多个线程等待其他线程执行完成后再执行。在创建CountDownLatch对象时,必须指定线程数count,每当一个线程执行完成调用countDown()方法,线程数count减1,当count减到0时,await()方法就不再阻塞。2、CountDownLatch使用1importjava......
  • vue2源码-十六、异步组件
    异步组件Vue中异步组件的写法有很多,主要用作大的组件异步加载的markdown组件editor组件。就是先渲染一个注释标签,等组件加载完毕,最后再重新渲染forceUpdate(图片懒加载)使用异步组件会配合webpack原理:异步组件默认不会调用Vue.extend()方法所有Ctor上没有cid属性,没有cid属......
  • TK Mall:以技术创新与数据分析驱动的全球跨境电商平台
    2022年,TKMall在新加坡成立,是一家以技术创新和数据分析为驱动力的全球跨境电商平台,该平台凭借其高质量的服务和领先的技术优势,在全球范围内迅速崛起。作为一家新兴的跨境电商平台,TKMall主打移动电商业务,其90%的卖家来自中国,同时也是日韩、东南亚最受TikTok主播青睐的带货选品移动......
  • RCU-3——经典(可抢占)RCU代码分析
    基于Linux-5.10一、相关数据结构1.structrcu_statercu_state用于描述RCU全局状态。structrcu_state{structrcu_nodenode[NUM_RCU_NODES];/*Hierarchy.*/structrcu_node*level[RCU_NUM_LVLS+1];/*Hierarchylevels(+1toshutbogusgccwarning)......
  • RCU-4——不可抢占RCU代码分析
    基于Linux-5.10一、不可抢占RCU1.不可抢占RCU不允许进程在读端临界区被其它进程抢占,使用函数rcu_read_lock_sched()/rcu_read_unlock_sched()标记读临界区。前者禁止内核抢占,后者开启内核抢占。staticinlinevoidrcu_read_lock_sched(void)//include/linux/rcupdate.h......
  • nginx出现504错误的原因分析及解决
    线上环境使用的是nginx代理到后端应用(java),对应用进行了一次压测发现nginx出现大量的504代码,即网关超时(GatewayTime-out)错误。 原因分析:首先504是网关超时错误,通常是nginx将请求代理到后端应用时,后端应用没有在规定的时间返回数据,需要开发检查下应用那块有什么耗时的操作,比如:......
  • ubuntu2004 下源码安装boost
    ubuntu2004下源码安装boosthttps://www.aiuai.cn/aifarm1186.htmlhttps://www.boost.org/users/history/version_1_78_0.htmlhttps://cloud.tencent.com/developer/article/1804511https://stackoverflow.com/questions/12578499/how-to-install-boost-on-ubuntuboost版本在......
  • SpringCloud微服务架构分析说明!
    SpringCloud是一个基于SpringBoot的微服务框架,它提供了一系列的工具和组件,用于构建分布式系统中各个微服务之间的通信和互联,实现服务发现、负载均衡、分布式配置等功能。下面我们来具体解析一下SpringCloud微服务架构。服务注册与发现在微服务架构中,服务的数量非常多,因此需要一个机......
  • Python的OS模块分析文件路径层次信息——获取文件路径、全名(文件名+尾缀)
    Code:importosFilePath="/a/bb/ccc/dddd.png"FolderPath,FullFileName=os.path.split(FilePath)Name,Suffix=os.path.splitext(FullFileName)print("文件路径:{}\n文件全名:{}\n文件名:{}\n文件后缀:{}".format(FolderPath,FullFileName,Name,......