首页 > 编程语言 >Semaphore源码解析

Semaphore源码解析

时间:2023-02-18 09:23:15浏览次数:69  
标签:许可 permits int 可用 信号量 源码 线程 Semaphore 解析

Semaphore源码解析

描述:

一个计数信号量。从概念上讲,信号量维护一组许可。每个acquire() 方法在必要时阻塞,直到获得许可,然后才能使用它。每次 release() 释放一个许可,潜在地释放一个阻塞获取者。但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。

信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。例如,下面是一个使用信号量来控制对项目池的访问的类:

class Pool {
  private static final int MAX_AVAILABLE = 100;
  private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
  public Object getItem() throws InterruptedException {
    available.acquire();
    return getNextAvailableItem();
  }
  public void putItem(Object x) {
    if (markAsUnused(x))
      available.release();
  }
  
  // 不是一个特别有效的数据结构;只是为了演示
 protected Object[] items = ... 被管理的任何类型的项目
 protected boolean[] used = new boolean[MAX_AVAILABLE];
 protected synchronized Object getNextAvailableItem() {
   for (int i = 0; i < MAX_AVAILABLE; ++i) {
     if (!used[i]) {
        used[i] = true;
        return items[i];
     }
   }
   return null; // not reached
 }
 protected synchronized boolean markAsUnused(Object item) {
   for (int i = 0; i < MAX_AVAILABLE; ++i) {
     if (item == items[i]) {
        if (used[i]) {
          used[i] = false;
          return true;
        } else
          return false;
     }
   }
   return false;
 }
}

在获取一个项目之前,每个线程必须从信号量获得一个许可,以保证项目可以使用。当线程处理完项目后,将其返回到池中,并向信号量返回一个许可,允许另一个线程获取该项目。注意,当调用 acquire() 时,不会持有同步锁,因为这将阻止某个项目返回到池中。信号量封装了限制对池的访问所需的同步,与维护池本身一致性所需的同步分开。

初始化为1的信号量,使用时最多只有一个许可,可以作为互斥锁。这通常被称为 二进制信号量 ,因为它只有两种状态:一个许可证可用,或者零许可证可用。当以这种方式使用时,二进制信号量具有属性(与许多java.util.concurrent.locks Lock 的实现不同),那么" Lock "可以由线程所有者以外的线程释放(因为信号量没有所有权的概念)。这在一些特殊的上下文中很有用,比如死锁恢复。

该类的构造函数可选地接受一个公平性形参。当设置为false时,该类不保证线程获得许可的顺序。特别是,闯入是允许的,也就是说,调用 acquire 的线程可以在已经等待的线程之前分配一个许可 - - 逻辑上,新线程将自己置于等待线程队列的头部。当公平性设置为true时,信号量保证调用 acquire() 方法的线程,会被选择以按照它们调用这些方法的顺序获得许可(先进先出;FIFO)。请注意,FIFO 排序必须应用于这些方法中的特定内部执行点。因此,一个线程可以在另一个线程之前调用 acquire ,但在另一个线程之后到达排序点,从方法返回时也是类似的情况。还要注意,未计时的 tryAcquire() 方法不遵守公平性设置,而是接受任何可用的许可。

通常,用于控制资源访问的信号量应该初始化为公平,以确保没有线程因访问资源而饿死。当将信号量用于其他类型的同步控制时,非公平排序的吞吐量优势通常超过公平性考虑。

这个类还提供了方便的方法来 acquire(int) 和 release(int) 一次释放多个许可证。当没有公平地使用这些方法时,请注意增加无限期延期的风险。

内存一致性影响:线程中调用 “release” 方法之前的操作。例如 release() happen-before acquire

源码:

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    
    /** 所有机制通过AbstractQueuedSynchronizer子类 */
    private final Sync sync;

    /**
     * 信号量的同步实现。使用AQS状态表示许可证。子类分为公平和非公平版本。
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        /**
         * 非公平模式获取许可
         */
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        /**
         * 释放许可
         */        
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        /**
         * 减少许可
         */
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        /**
         * 清空许可(许可证数置为0)
         */        
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    /**
     * 非公平版本
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * 公平版本
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    /**
     * 构造函数:指定许可证个数,采用非公平版本
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * 构造函数:指定许可证个数,指定是否公平版本
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * 从这个信号量获取一个许可,阻塞直到有一个可用,或者线程被中断。
     *
     * 1.获取一个许可证(如果有)并立即返回,将可用许可证的数量减少一个。
     * 2.如果没有可用的许可,那么当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到发生以下两种情况之一:
     * 其他线程为这个信号量调用  release 方法,当前线程下一个将被分配一个许可; 或其他线程中断当前线程。
     * 3.如果当前线程: 在进入此方法时设置其中断状态; 或当等待线程中断时,线程中断 。然后 InterruptedException 被抛出,当前线程的中断状态被清除。
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 从这个信号量获取一个许可,阻塞直到一个可用。
     *
     * 1.获取一个许可证(如果有)并立即返回,将可用许可证的数量减少一个。
     * 2.如果没有可用的许可,则当前线程出于线程调度的目的被禁用,并处于休眠状态,直到其他线程调用这个信号量的 release 方法,当前线程下一个将被分配一个许可。
     * 3.如果当前线程在等待许可时被中断,那么它将继续等待,但与未发生中断时线程收到许可证的时间相比,分配给线程的许可证时间可能会发生变化。当线程从这个方法返回时,它的中断状态将被设置。
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /**
     * 仅当调用时可用时,才从此信号量获取许可。
     *
     * 1.获取一个许可(如果有)并立即返回,值为  true ,将可用的许可数量减少1。
     * 2.如果没有可用的许可,则该方法将立即返回值  false 。
     * 3.即使这个信号量被设置为使用公平排序策略,如果有一个许可可用,调用 tryAcquire() 将立即获得一个许可,无论当前是否有其他线程正在等待。
     *
     * 这个“闯入“ 行为在某些情况下是有用的,即使它破坏了公平。
     * 如果你想遵守公平性设置,那么使用  tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit. seconds),这几乎是等效的(它也检测中断)。
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * 如果在给定的等待时间内有一个信号量可用,并且当前线程没有被中断,则从这个信号量获得一个许可。
     *
     * 1.获取一个许可(如果有)并立即返回,值为  true ,将可用的许可数量减少 1 。
     * 2.如果没有可用的许可,那么当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到发生以下三种情况之一:
     *     其他线程为这个信号量调用 release 方法,当前线程下一个将被分配一个许可; 或一些其他线程中断当前线程; 或等待时间已过。
     * 3.如果获得了许可,则返回值  true 。
     * 4.如果当前线程:在进入此方法时设置其中断状态; 或在等待获取许可证时被中断。 然后  InterruptedException 被抛出,当前线程的中断状态被清除。
     * 5.如果指定的等待时间过去了,则返回值  false 。如果时间小于或等于零,该方法将根本不等待。
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 释放一个许可,并将其返回给信号量。
     *
     * 1.释放一个许可证,可用的许可证数量增加一个。如果有任何线程试图获得许可证,那么将选择一个线程并授予刚刚释放的许可证。出于线程调度的目的,该线程已(重新)启用。
     * 2.没有要求释放许可的线程必须通过调用  acquire 来获得该许可。信号量的正确用法是通过应用程序中的编程约定来确定的。
     */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * 从这个信号量中获取给定数量的许可,阻塞直到所有许可都可用,或者线程被中断。
     *
     * 1.获取给定数量的许可证(如果它们可用),并立即返回,将可用许可证的数量减少给定的数量。
     * 2.如果可用的许可不足,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到发生以下两种情况之一:
     *    其他线程调用这个信号量的  release() 方法之一,当前线程是下一个被分配许可的线程,可用的许可数量满足这个请求; 或其他线程中断当前线程。
     * 3.如果当前线程:在进入该方法时设置其中断状态; 或在等待许可时被中断。 然后 InterruptedException 被抛出,当前线程的中断状态被清除。
     *
     * 任何分配给该线程的许可都会分配给其他试图获取许可的线程,就像通过调用 release()获得了许可一样。
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * 从这个信号量中获取给定数量的许可,直到所有许可都可用为止。
     *
     * 1.获取给定数量的许可证(如果它们可用),并立即返回,将可用许可证的数量减少给定的数量。
     * 2.如果可用的许可数量不足,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到其他线程调用此信号量的 release 方法之一,当前线程将被分配许可,并且可用许可的数量满足此请求。
     * 3.如果当前线程在等待允许时是被中断,那么它将继续等待,并且它在队列中的位置不受影响。当线程从这个方法返回时,它的中断状态将被设置。
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * 仅当调用时所有许可都可用时,才从此信号量获取给定数量的许可。
     *
     * 1.获取给定数量的许可(如果它们可用),并立即返回值 true,将可用许可的数量减少给定的数量。
     * 2.如果可用的许可数量不足,则该方法将立即返回值  false,可用的许可数量不变。
     * 3.即使这个信号量已经被设置为使用公平排序策略,如果有一个许可可用,调用  tryAcquire 将立即获得一个许可,无论当前是否有其他线程正在等待。
     * 这个“闯入”行为在某些情况下是有用的,即使它破坏了公平。
     * 如果你想遵守公平性设置,那么使用 tryAcquire(int, long, TimeUnit) tryAcquire(permissions, 0, TimeUnit. seconds) ,这几乎是等效的(它也检测中断)。
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * 从这个信号量中获取给定数量的许可,如果所有许可都在给定的等待时间内可用,并且当前线程没有被线程中断。
     *
     * 1.获取给定数量的许可,如果它们可用并立即返回,值为  true,将可用许可的数量减少给定的数量。
     * 2.如果可用的许可不足,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到发生以下三种情况之一:
     *     其他线程调用这个信号量的 release() release 方法之一,当前线程是下一个被分配许可的线程,可用的许可数量满足这个请求; 或一些其他线程中断当前线程; 或等待时间已过。
     * 3.如果获得了许可,则返回值  true。
     * 4.如果当前线程: 在进入此方法时设置其中断状态; 或在等待获取许可时,线程中断。然后 InterruptedException 被抛出,当前线程的中断状态被清除。
     *     任何分配给这个线程的许可,都被分配给了其他试图获得许可的线程,就好像这些许可是通过调用 release() 获得的一样。
     * 5.如果指定的等待时间过去了,则返回值  false 。如果时间小于或等于零,该方法将根本不等待。任何分配给这个线程的许可,都被分配给了其他试图获得许可的线程,就好像这些许可是通过调用  release() 获得的一样。
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /**
     * 释放给定数量的许可,并将它们返回给信号量。
     *
     * 1.释放给定数量的许可证,将可用许可证的数量增加该数量。如果有任何线程试图获得许可,那么将选择一个线程并授予刚刚释放的许可。
     * 如果可用许可的数量满足线程的请求,那么该线程将(重新)为线程调度目的启用;否则线程将等待,直到有足够的许可可用。
     * 如果在这个线程的请求被满足之后,仍然有可用的许可证,那么这些许可证将依次分配给试图获得许可证的其他线程。
     * 2.没有要求释放许可证的线程必须通过调用 acquire 来获得许可证。信号量的正确用法是通过应用程序中的编程约定来确定的。
     */
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    /**
     * 返回此信号量中可用的许可证的当前数量。(此方法通常用于调试和测试目的。)
     */
    public int availablePermits() {
        return sync.getPermits();
    }

    /**
     * 获取并返回所有立即可用的许可。
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * 将可用许可证的数量按指定的减少量缩小。此方法在使用信号量跟踪不可用资源的子类中非常有用。此方法与  acquire 的不同之处在于,它不会阻塞等待许可证变得可用。
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /**
     * 如果这个信号量的公平性设置为true,则返回 true 。
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
      * 查询是否有线程正在等待获取。请注意,因为取消可能在任何时候发生,返回  true 并不保证任何其他线程将获得。这种方法主要用于监视系统状态。
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * 返回等待获取的线程数的估计值。该值只是一个估计值,因为当此方法遍历内部数据结构时,线程数可能会动态更改。此方法设计用于监视系统状态,而不是用于同步控制。
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    /**
     * 返回一个包含可能正在等待获取的线程的集合。因为在构造此结果时,实际的线程集可能会动态更改,因此返回的集合只是最佳估计。
     * 返回集合的元素没有特定的顺序。设计此方法是为了便于构造提供更广泛监视功能的子类。
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}

标签:许可,permits,int,可用,信号量,源码,线程,Semaphore,解析
From: https://www.cnblogs.com/coolyang/p/17131013.html

相关文章

  • captura怎样解决FFmpeg解析错误问题
     captura怎样解决FFmpeg解析错误问题?captura软件里大家在进行屏幕录制的工作得时候都会用到captura软件,软件得功能可以满足大家的需求,可以轻松的录制屏幕,进行屏幕截屏等......
  • 解析大型电商网站系统架构分层设计
    DevOps人员需要了解公司的网站架构设计,如果牵涉了具体的高流量高并发的场景,那么,此时也需要提供实际的解决方案,所以了解网站的分层系统架构设计是非常有必要的。网站架构一般......
  • J1周:ResNet-50算法实战与解析
    本周任务根据本文TensorFlow代码,编写出相应的Pytorch代码了解残差结构是否可以将残差模块融入到C3当中(自由探索)一、知识储备深度残差网络ResNet(deepresidualn......
  • python-json解析
    json函数:json.dumps:将python对象解析成jsonjson.loads:将已编码的JSON字符串解码为Python对象json.dumps使用将数组转为json格式数据importjsonif__name__=='__main_......
  • 阅读GitHub上的项目源码有以下几种方法
    GitHub是一个非常流行的代码托管平台,上面有很多优秀的开源项目。阅读这些项目的源码可以帮助我们学习和提高编程技能。阅读GitHub上的项目源码有以下几种方法:1、下载源码到......
  • 多线程等待所有子线程执行完使用总结(3)——CyclicBarrier使用和源码初步分析
    问题背景我们在日常开发和学习过程中,经常会使用到多线程的场景,其中我们经常会碰到,我们代码需要等待某个或者多个线程执行完再开始执行,上一篇文章中(参考https://blog.51cto......
  • 3、TreeMap源码解析
    目录1TreeMap基本介绍2红黑树数据结构回顾3成员变量4内部类Entry5构造函数6重要方法分析6.1get方法分析6.2put方法分析6.3插入调整函数fixAfterInsertion()解析6.......
  • 3D目标检测 | BEVDet系列源码解读
    前言本文介绍了BEVDet实现过程中的代码注释,希望能帮助大家更好地理解如何从论文原理到mmdet3d上代码实现BEVDet。 本文转载自自动驾驶之心作者丨小书童 欢......
  • udhcp源码剖析(一)——DHCP服务器和客户端的工作流程
    DHCP服务器的工作流程udhcpd,即dhcp服务器,在路由器等网关设备中,DHCP服务器启动后用于给LAN侧和无线终端分配IP、租约和其他网络配置。根据RFC2131文档规范,一个标准的DHCP服......
  • DHCP源码分析_dhcpd后台进程子模块
    dhcpd是dhcp服务器端后台进程文件,dhcpd后台程序总是读取配置文件/etc/dhcpd.conf。一,守护进程的基本流程    dhcpd基本流程为: main(){/*设置isc和d......