首页 > 其他分享 >18.详解AQS家族的成员:Semaphore

18.详解AQS家族的成员:Semaphore

时间:2023-06-08 09:36:35浏览次数:37  
标签:Node 许可 AQS int 18 节点 Semaphore public

关注:王有志,一个分享硬核Java技术的互金摸鱼侠。
欢迎你加入Java人的提桶跑路群共同富裕的Java人

今天我们来聊一聊AQS家族中另一个重要成员Semaphore,我只收集到了一道关于Semaphore的面试题,问了问“是什么”和“如何实现的”:

  • 什么是Semaphore?它是如何实现的?

按照我们的惯例,依旧是按照“是什么”,“怎么用”和“如何实现的”这3步来分析Semaphore。另外,今天提供了题解

Semaphore的使用

Semaphore直译过来是信号量,是计算机科学中非常Old School的处理同步与互斥的机制与互斥锁不同的是它允许指定数量的线程或进程访问共享资源

Semaphore处理同步与互斥的机制和我们平时过地铁站的闸机非常相似。刷卡打开闸机(acquire操作),通过后(访问临界区)闸机关闭(release操作),后面的人才能够继续刷卡,而在前一个人通过前,后面的人只能排队等候(队列机制)。当然,地铁站不可能只有一个闸机,拥有几个闸机,就允许几个人同时通过。

信号量也是这样的,通过构造函数定义许可数量,使用时申请许可,处理完业务逻辑后释放许可:

// 信号量中定义1个许可
Semaphore semaphore = new Semaphore(1);

// 申请许可
semaphore.acquire();

......

// 释放许可
semaphore.release();

当我们为Semaphore定义一个许可时,它和互斥锁相同,同一时间只允许一个线程进入临界区。但是当我们定义了多个许可时,它与互斥锁的差异就体现出来了:

Semaphore semaphore = new Semaphore(3);
for(int i = 1; i < 5; i++) {
  int finalI = i;
  new Thread(()-> {
    try {
      semaphore.acquire();
      System.out.println("第[" + finalI + "]个线程获取到semaphore");
      TimeUnit.SECONDS.sleep(10);
      semaphore.release();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }).start();
}

执行这段代码可以看到,同一时间3个线程都进入了临界区,只有第4个线程被挡在了临界区外。

Semaphore的实现原理

还记得在《AQS的今生,构建出JUC的基础》中提到的同步状态吗?我们当时说它是某些同步器的计数器:

AQS中,state不仅用作表示同步状态,也是某些同步器实现的计数器,如:Semaphore中允许通过的线程数量,ReentrantLock中可重入特性的实现,都依赖于state作为计数器的特性。

先来看Semaphore与AQS的关系:

与ReentrantLock一样,Semaphore内部实现了继承自AQS的同步器抽象类Sync,并有FairSyncNonfairSync两个实现类。接下来我们就通过剖析Semaphore的源码,来验证我们之前的说法。

构造方法

Semaphore提供了两个构造方法:

public Semaphore(int permits) {
  sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
  sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

可以看到Semaphore和ReentrantLock的设计思路是一致的,Semaphore内部也实现了两个同步器FairSyncNonfairSync,分别实现公平模式和非公平模式,而Semaphore的构造本质上是构造同步器的实现。我们以非公平模式的NonfairSync的实现为例:

public class Semaphore implements java.io.Serializable {
  static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
      super(permits);
    }
  }
  
  abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
      setState(permits);
    }
  }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  protected final void setState(int newState) {
    state = newState;
  }
}

追根溯源,构造器的参数permits最终还是回归到了AQS的state身上,借助了state作为计数器的特性来实现Semaphore的功能。

acquire方法

现在我们已经为Semaphore设置了一定数量的许可(permits),接下来我们就需要通过Semaphore#acquire方法获取许可,进入Semaphore所“守护”的临界区:

public class Semaphore implements java.io.Serializable {
  public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    if (tryAcquireShared(arg) < 0) {
      doAcquireSharedInterruptibly(arg);
    }
  }
}

这两步和ReentrantLock非常相似,先通过tryAcquireShared尝试直接获取许可,失败后通过doAcquireSharedInterruptibly加入到等待队列中。

Semaphore中直接获取许可的逻辑非常简单:

static final class NonfairSync extends Sync {
  protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
  }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
  final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
      // 获取可用许可数量
      int available = getState();
      // 计算许可数量
      int remaining = available - acquires;
      if (remaining < 0 || compareAndSetState(available, remaining)) {
        return remaining;
      }
    }
  }
}

首先是获取并减少可用许可的数量,当许可数量小于0时返回一个负数,或通过CAS更新许可数量成功后,返回一个正数。此时doAcquireSharedInterruptibly会将当前的申请Semaphore许可的线程添加到AQS的等待队列中。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    
 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
   // 创建共享模式的等待节点
   final Node node = addWaiter(Node.SHARED);
   try {
     for (;;) {
       final Node p = node.predecessor();
       if (p == head) {
         // 再次尝试获取许可,并返回剩余许可数量
         int r = tryAcquireShared(arg);
         if (r >= 0) {
           // 获取成功,更新头节点
           setHeadAndPropagate(node, r);
           p.next = null;
           return;
         }
       }
       // 获取失败进入等待状态
       if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
         throw new InterruptedException();
       }
     }
   } catch (Throwable t) {
     cancelAcquire(node);
     throw t;
   }
 }
}

Semaphore的使用的doAcquireSharedInterruptiblyReentrantLock使用的acquireQueued方法核心逻辑一直,但是有细微的实现差别:

  • 创建节点使用Node.SHARED模式;

  • 更新头节点使用了setHeadAndPropagate方法。

private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head;
  setHead(node);
  
  // 是否要唤醒等待中的节点
  if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared()) {
      // 唤醒等待中的节点
      doReleaseShared();
    }
  }
}

我们知道在ReentrantLock中执行acquireQueued,当成功获取锁后,只需要执行setHead(node)即可,那么为什么Semaphore还要再进行唤醒?

假设有3个许可的Semaphore同时有T1,T2,T3和T4总计4个线程竞争:

  • 它们同时进入nonfairTryAcquireShared方法,假设只有T1通过compareAndSetState(available, remaining)成功修改有效的许可数量,T1进入临界区;

  • T2,T3和T4进入doAcquireSharedInterruptibly方法,通过addWaiter(Node.SHARED)构建出AQS的等待队列(参考AQS的今生中关于addWaiter方法的分析);

  • 假设T2成为了头节点的直接后继节点,T2再次执行tryAcquireShared尝试获取许可,T3和T4执行parkAndCheckInterrupt

  • T2成功获取许可并进入临界区,此时Semaphore剩余1个许可,而T3和T4处于暂停状态中。

这种场景中,只有两个许可产生了作用,显然不符合我们对的初衷,因此在执行setHeadAndPropagate更新头节点时,判断剩余许可的数量,当数量大于0时继续唤醒后继节点。

Tips

  • Semaphore在获取许可的流程与ReentrantLock加锁的过程高度相似~~

  • 下文分析doReleaseShared是如何唤醒等待中节点的。

release方法

Semaphore的release方法就非常简单了:

public class Semaphore implements java.io.Serializable {
  public void release() {
    sync.releaseShared(1);
  }
  
  abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        // 计算许可数量
        int next = current + releases;
        if (next < current) {
          throw new Error("Maximum permit count exceeded");
        }
        // 通过CAS更新许可数量
        if (compareAndSetState(current, next)) {
            return true;
        }
      }
    }
  }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
    }
    return false;
  }
  
  private void doReleaseShared() {
    for (;;) {
      Node h = head;
      // 判断AQS的等待队列是否为空
      if (h != null && h != tail) {
        int ws = h.waitStatus;
        // 判断当前节点是否处于待唤醒的状态
        if (ws == Node.SIGNAL) {
          if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)){
            continue;
          }
          unparkSuccessor(h);
        } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) {
          // 状态为0时,更新节点的状态为无条件传播
          continue;
        }
      }
      if (h == head) {
        break;
      }
    }
  }
}

我们可以看到Semaphore的release方法分了两部分:

  • tryReleaseShared方法更新Semaphore的有效许可数量;

  • doReleaseShared唤醒处于等待中的节点。

唤醒的逻辑并不复杂,依旧是对节点状态waitStatus的判断,来确定是否需要执行unparkSuccessor,当状态为ws == 0,会将节点的状态更新为Node.PROPAGAT,即无条件传播。

Tips:与ReentrantLock所不同的是,Semaphore并不支持Node.CONDITION状态,同样的ReentrantLock也不支持Node.PROPAGATE状态。

结语

关于Semaphore的内容到这里就结束了,今天我们只具体分析了非公平模式下核心方法的实现,至于公平模式的实现,以及其它方法的实现,就留个大家自行探索了。

好了,希望本文能够带给你一些帮助,我们下次再见!最后欢迎大家关注王有志的专栏《Java面试都问啥?》。

标签:Node,许可,AQS,int,18,节点,Semaphore,public
From: https://www.cnblogs.com/wyz1994/p/17464793.html

相关文章

  • 【每日一题】Problem 1832B - Maximum Sum
    原题解决思路:类似滑动窗口的方式,窗口大小为k次操作,从中找到更大的结果即可误区:一开始的想法是每次都在前一次的基础上减去最少的,然而在样例的第五个测试点出错,因为10+11>15#include<bits/stdc++.h>intmain(){intt;std::cin>>t;while(t--){......
  • 算法学习day50动态规划part11-123、188
    packageLeetCode.DPpart11;/***123.买卖股票的最佳时机III*给定一个数组,它的第i个元素是一支给定的股票在第i天的价格.*设计一个算法来计算你所能获取的最大利润。你最多可以完成两笔交易。*注意:你不能同时参与多笔交易(你必须在再次购买前出售掉之前的股票)。......
  • 针对 B/S、C/S 架构的 180 个简单测试案例——GUI 和可用性测试场景
    这是一个针对web应用和桌面应用程序的测试清单假设:假定你的应用程序支持下列功能:-带有多种字段的表单-子窗口-与数据库交互-多种查询过滤规则和结果显示-图片上传-邮件发送-数据导出GUI和可用性测试场景1.页面中的所有字段(如:文本框,单选选项,下拉列表)应该适当对齐2.除特殊指定外,数......
  • 2021-05-18:金朝阳上课课堂
     ......
  • 针对 B/S、C/S 架构的 180 个简单测试案例—窗口测试用例
    -测试清单可以提供给开发人员查阅,以保证在开发阶段就避免出现一些常见的问题。几点说明:1)用不同的用户角色执行这些测试场景,如:管理用户,来宾用户等。2)对于web应用,这些场景应该在客户认可的多种浏览器的各个版本上进行测试,如:IE,Firefox,Chrome,Safari等。3)用不同的屏幕分辨率进行测试,如......
  • 针对 B/S、C/S 架构的 180 个简单测试案例——结果表测试场景
    这是一个针对web应用和桌面应用程序的测试清单。假设:假定你的应用程序支持下列功能:-带有多种字段的表单-子窗口-与数据库交互-多种查询过滤规则和结果显示-图片上传-邮件发送-数据导出结果表测试场景1.当结果页面加载时长超过默认时长时,应该显示“页面加载中”之类的提示信息2.检......
  • 针对 B/S、C/S 架构的 180 个简单测试案例——图像上传功能的测试场景(也适用于其他文
    1.检查图片上传路径2.检查图像上传和修改功能3.检查各种扩展图像文件的上传(例如JPEG、PNG、BMP等).4.检查文件名中含有空格或其他可用特殊字符的图片的上传5.检查重复名称图片上传6.图片尺寸大于最大允许值,上传时应该显示适当的错误消息.7.检查上传的图片文件类型外的其它文件......
  • P9286 [ROI 2018] Extraction of radium
    来一发简单做法题目链接:P9286[ROI2018]Extractionofradium通过读题目,我们不难想到,找到既是横向最大值又是纵行最大值的位置,可以单独处理横向和纵向,满足一个方向的最大值就标记一次,那么标记两次的位置就是当前局面的一个可行点。这样静态操作就明晰了。现在考虑动态操作,把......
  • 为企业年中业务添动力,华为云618营销季正式开启​
    当下正值年中企业采购旺季。为在年中采购高峰更好服务企业客户,华为云发起618营销季,旨在让广大中小企业享受到更多优质产品、更大福利优惠,一键省心上云,实现稳定持续增长。华为云618营销季于6月1日正式开启,持续至6月30日。活动期间,华为云聚焦制造、电商、游戏、零售等各行各业,推出18......
  • 【每日一题】LeetCode 1185.一周中的第几天
    题目给你一个日期,请你设计一个算法来判断它是对应一周中的哪一天。输入为三个整数:day、month和year,分别表示日、月、年。您返回的结果必须是这几个值中的一个{“Sunday”,“Monday”,“Tuesday”,“Wednesday”,“Thursday”,“Friday”,“Saturday”}。示例输入:day=31,......