首页 > 编程语言 >Java同步器之辅助类Semaphore

Java同步器之辅助类Semaphore

时间:2022-12-27 18:12:06浏览次数:57  
标签:调用 Java AQS 同步器 int 线程 Semaphore 方法

一、概述

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

二、使用案例

可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。

 
/**
 * 类说明:演示Semaphore用法,一个数据库连接池的实现
 */
public class SemaphoreExample {

    private final static int POOL_SIZE = 10;
    
    //两个指示器,分别表示池子还有可用连接和已用连接
    private final Semaphore useful, useless;
    
    //存放数据库连接的容器
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    //初始化池
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
    }

    public DBPoolSemaphore() {
        this.useful = new Semaphore(10);
        this.useless = new Semaphore(0);
    }

    /*归还连接*/
    public void returnConnect(Connection connection) throws InterruptedException {
        if (connection != null) {
            System.out.println("当前有" + useful.getQueueLength() + "个线程等待数据库连接!!"
                    + "可用连接数:" + useful.availablePermits());
            useless.acquire();
            synchronized (pool) {
                pool.addLast(connection);
            }
            useful.release();
        }
    }

    /*从池子拿连接*/
    public Connection takeConnect() throws InterruptedException {
        useful.acquire();
        Connection connection;
        synchronized (pool) {
            connection = pool.removeFirst();
        }
        useless.release();
        return connection;
    }
}

三、实现原理

3.1 内部类

Semaphore同样由AQS实现,总共有三个内部类,并且三个内部类是紧密相关的,SemaphoreReentrantLock的内部类的结构相同,类内部总共存在SyncNonfairSyncFairSync三个类,NonfairSyncFairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。

 
//内部类,继承自AQS
abstract static class Sync extends AbstractQueuedSynchronizer {

    private static final long serialVersionUID = 1192457210091910933L;
    
	// 构造函数,初始化同步资源 
    Sync(int permits) {
	    // 设置状态数
        setState(permits);
    }

    // 获取许可数
    final int getPermits() {
        return getState();
    }

    // 非公平策略获取同步资源
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
			// 剩余的许可
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))// 许可小于0或者有许可直接CAS尝试设置state的值
                return remaining;
        }
    }

    // 共享模式下进行释放
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
			// 可用的许可
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))// 比较并进行设置成功
                return true;
        }
    }

    // 根据指定的缩减量减小可用许可的数目
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    // 获取并返回立即可用的所有许可
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))// 许可为0或者比较并设置成功
                return current;
        }
    }
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    // 共享模式下获取
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

/**
 * Fair version
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())// 同步队列中存在其他节点
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

Semaphore自身只有两个属性,最重要的是sync属性,基于Semaphore对象的操作绝大多数都转移到了对sync的操作
NonfairSyncFairSync都继承了Sync类,NonfairSync表示采用非公平策略获取资源,FairSync表示采用公平策略获取资源,它们都重写了AQStryAcquireShared方法,NonfairSync会调用父类SyncnonfairTryAcquireShared方法,表示按照非公平策略进行资源的获取。FairSync使用公平策略来获取资源,它会判断同步队列中是否存在其他的等待节点。

3.2 构造函数

 
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

默认也是非公平锁,并发度(permits)赋值给了AQSvolatile int state, 并发度就是AQS的state的初始值

3.3 acquire方法

此方法从信号量获取一个(多个)许可,在提供一个许可前一直将线程阻塞,或者线程被中断,其源码如下

 
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

该方法中将会调用Sync对象的acquireSharedInterruptibly(从AQS继承而来的方法)方法。

3.4 release方法

此方法释放一个(多个)许可,将其返回给信号量,其实是AQSstate加1,释放成功后,唤醒AQS队列中等待的线程,从被阻塞的位置开始执行,源码如下。

 
public void release() {
    sync.releaseShared(1);
}

该方法中将会调用Sync对象的releaseShared(从AQS继承而来的方法)方法。

3.5 小结

Semaphore的内部工作流程也是基于AQS,并且不同于CyclicBarrierReentrantLock,单独使用Semaphore是不会使用到AQS的条件队列的,其实,只有进行await操作才会进入条件队列,其他的都是在同步队列中,只是当前线程会被park

3.5.1 非公平模式下的总结

  1. 当线程要获取许可时,可以直接调用Semaphoreacquire()tryAcquire()方法。
  2. 如果调用acquire()方法,那么将会直接调用AQSacquireShared()方法,该方法又会调用非公平同步器的tryAcquireShared()方法,该方法会直接调用抽象同步器提供的nonfairTryAcquireShared()方法(用当前同步资源的个数 - 要获取的同步资源个数,如果大于等于0则表示获取同步资源成功,则通过CAS更新同步状态的值,然后返回剩余的可用资源个数,否则表示获取同步资源失败,返回负数)
  3. 如果调用tryAcquire()方法,那么将会直接调用抽象同步器的提供的nonfairTryAcquireShared()方法尝试获取同步资源。
  4. 当线程要释放许可时,将会调用Semaphorerelease()方法,该方法会直接调用AQSreleaseShared()方法,releaseShared()方法又会调用抽象同步器的tryReleaseShared()方法,将当前同步资源的个数 + 要释放的同步资源个数 ,然后通过CAS更新同步状态的值。

3.5.2 公平模式下的总结

  1. 当线程要获取许可时,可以直接调用Semaphoreacquire()tryAcquire()方法。
  2. 如果调用acquire()方法,那么将会直接调用AQSacquireShared()方法,该方法又会调用公平同步器的tryAcquireShared()方法,该方法只有当当前线程是等待队列中的头节点的后继节点所封装的线程,或者当前等待队列为空或只有一个节点时,才允许尝试获取同步资源(用当前同步资源的个数 - 要获取的同步资源个数,如果大于等于0则表示获取同步资源成功,则通过CAS更新同步状态的值,然后返回剩余的可用资源个数,否则表示获取同步资源失败,返回负数)
  3. 如果调用tryAcquire()方法,那么将会直接调用抽象同步器的提供的nonfairTryAcquireShared()方法尝试获取同步资源。
  4. 当线程要释放许可时,将会调用Semaphorerelease()方法,该方法会直接调用AQSreleaseShared()方法,releaseShared()方法又会调用抽象同步器的tryReleaseShared()方法,将当前同步资源的个数 + 要释放的同步资源个数,然后通过CAS更新同步状态的值。

四、总结

Semaphore基于AQS实现,分为公平和非公平模式,需要拿到许可才能执行,state是可执行线程的数量,当获取锁时,state-1,执行完毕时state+1,当state小于0时,新加入的线程需要等待直到有其他线程之心完毕释放为止。

  转载: https://www.cnblogs.com/ciel717/p/16190775.html  

标签:调用,Java,AQS,同步器,int,线程,Semaphore,方法
From: https://www.cnblogs.com/cainiao-Shun666/p/17008679.html

相关文章

  • 严重漏洞攻击:影响PHP、Java和ASP.NET
    安全研究员AlexanderKlink和JulianWalde发现了一个严重的漏洞,这个漏洞影响到大多数网络服务器。针对这个漏洞的攻击只需要一个HTTP请求,这个特殊设定......
  • Java8中Optional类入门-替代null避免冗杂的非空校验
    场景Java核心工具库Guava介绍以及Optional和Preconditions使用进行非空和数据校验:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/127683387上面在讲Guava......
  • 程序员必须掌握的java进制转换(全网最详细讲解)
    前言在上一篇文章中,壹哥给大家讲了Java里的各种运算符。其中在讲解位运算符时,我给大家提到了计算机中进制的概念。但是现在很多小白同学,对进制的概念还不够了解,比如二进制......
  • java throws异常处理
        throws异常,程序不能继续执行,直接中断,要想程序继续执行,还必须用try...catch     ......
  • 跟光磊学Java-Windows版Java17开发环境搭建
    Java语言核心技术  如果想要开发Java程序/Java项目之前,必须要安装和配置JDK,这里的JDK表示Java17,不过下载软件的时候,强烈推荐大家一定要去软件的官网下载,因为官......
  • 跟光磊学Java-macOS版Java8开发环境搭建(基于ARM 64-bit)
    Java语言核心技术  日常办公和软件开发除了可以使用Windows系统以外,还可以使用macOS系统,至于具体使用什么系统取决于你入职公司之后公司给你发的什么电脑,如果是......
  • java编译异常和运行时异常
     运行时异常:   编译时异常:处理方案:try...catch() ......
  • 跟光磊学Java-Java概述
     Java语言核心技术  1.Java的发展简史现在人们谈论Java的时候,它所表示的含义发生了一些变化,Java不只是一门高级计算机语言,更是最流行的开发平台和运行平台......
  • 理解JavaScript中的“面向对象”
    理解JavaScript中的“面向对象”一引子面向对象,是程序开发者再熟悉不过的一个概念。一说到它,你首先会想到的是什么?类?继承?方法与属性?不同技术栈的开发者或许有不同的第一反......
  • java try...catch
    解释:    ......