首页 > 其他分享 >多线程限流工具类-Semaphore

多线程限流工具类-Semaphore

时间:2024-03-03 16:25:38浏览次数:17  
标签:permits int sync 信号量 限流 线程 Semaphore 多线程 public

Semaphore介绍

Semaphore(信号量)是JAVA多线程中的一个工具类,它可以通过指定参数来控制执行线程数量,一般用于限流访问某个资源时使用。

Semaphore使用示例

需求场景:用一个核心线程数为6,最大线程数为20的线程池执行任务,但是要求最多只能同时运行3个线程

代码:

public class demo {

    //创建线程池,核心线程数:2;最大线程数:4;时间:5;时间单位:秒;阻塞队列:ArrayBlockingQueue,最大容量为10;线程工厂:默认;拒绝策略:默认
    static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(6, 20, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(3);//指定线程数量
        for (int i = 0; i < 10; i++) {
            poolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName() + " start...");
                        Thread.sleep(2000);
                        //用来表明当前线程结束
                        System.out.println(Thread.currentThread().getName() + " end...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        }
        poolExecutor.shutdown();
    }
}

(结果分析)从输出结果可以看出:最多只能同时开启3个线程(创建Semaphore时指定线程数量),只有等最先开启的三个线程中的某个结束了才会开启新的线程,但同时运行的总量始终保持在3个以内!

pool-1-thread-1 start...
pool-1-thread-2 start...
pool-1-thread-3 start...
pool-1-thread-2 end...
pool-1-thread-3 end...
pool-1-thread-4 start...
pool-1-thread-1 end...
pool-1-thread-5 start...
pool-1-thread-6 start...
pool-1-thread-5 end...
pool-1-thread-6 end...
pool-1-thread-4 end...
pool-1-thread-6 start...
pool-1-thread-3 start...
pool-1-thread-2 start...
pool-1-thread-6 end...
pool-1-thread-2 end...
pool-1-thread-3 end...
pool-1-thread-1 start...
pool-1-thread-1 end...

Process finished with exit code 0

Semaphore实现原理

源码:

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;

    //继承AQS的内部类
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        //构造函数,传入的参数为信号量permits
        Sync(int permits) {
            setState(permits);
        }

        //获取信号量
        final int getPermits() {
            return getState();
        }

        //以非公平的方式尝试获取信号量
        final int nonfairTryAcquireShared(int acquires) {
            //自旋
            for (; ; ) {
                //当前信号量
                int available = getState();
                //获取acquires个信号量后的剩余信号量
                int remaining = available - acquires;
                //如果剩余信号量小于0(获取失败),或者成功把剩余信号量更新为当前信号量(获取成功)都会退出自旋并返回剩余信号量
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        //尝试释放信号量
        protected final boolean tryReleaseShared(int releases) {
            for (; ; ) {
                //当前信号量
                int current = getState();
                //下个信号量,即当前信号量+释放的信号量(线程运行结束将信号量还给Semaphore,所以相加)
                int next = current + releases;
                //如果下个信号量小于当前信号量则有越界的情况,报错!
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //如果没问题就用CAS更新当前信号量,并结束自旋
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        //减少信号量
        final void reducePermits(int reductions) {
            for (; ; ) {
                int current = getState();
                //下个信号量为当前信号量-减少信号量
                int next = current - reductions;
                //如果没有那么多可减少的信号则抛出异常
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                //如果没问题就更新信号量并结束自旋
                if (compareAndSetState(current, next))
                    return;
            }
        }

        //清空信号量
        final int drainPermits() {
            for (; ; ) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    //非公平
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    //公平
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        //公平的方式尝试获取信号量
        protected int tryAcquireShared(int acquires) {
            for (; ; ) {
                //如果队列当前节点已经有任务则结束自旋
                if (hasQueuedPredecessors())
                    return -1;
                //当前信号量
                int available = getState();
                //剩余信号量=当先信号量-获取信号量
                int remaining = available - acquires;
                //如果剩余信号量小于0(获取失败),或者成功把剩余信号量更新为当前信号量(获取成功)都会退出自旋并返回剩余信号量
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    //构造方法,默认以非公平的方式设置信号量
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    //构造方法,自定义以公平还是非公平的方式设置信号量
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    //获取信号量,如果当前线程已中止(interrupted)就抛出异常
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    //获取信号量,无论当前线程是否中止都尝试获取
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    //尝试获取信号量
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    //尝试获取信号量并设置超时时间
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    //释放信号量
    public void release() {
        sync.releaseShared(1);
    }

    //获取指定数量的信号量
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    //获取指定数量的信号量
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    //尝试获取指定数量的信号量
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    //尝试获取指定数量的信号量,并设置超时时间
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    //释放指定数量的信号量
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    //获取当前信号数量
    public int availablePermits() {
        return sync.getPermits();
    }

    //清空信号量
    public int drainPermits() {
        return sync.drainPermits();
    }

    //减少指定数量的信号量
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    //判断是否为公平
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    //判断是否有队列(有阻塞线程时才会产生队列,即判断是否有阻塞线程)
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    //获取阻塞线程数量
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    //获取阻塞线程并封装成集合返回
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }


    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}

核心方法:

1、acquire()

    //获取信号量,如果当前线程已中止(interrupted)就抛出异常
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果当前线程已经终止则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试获取信号量(调用内部类Sync的方法)
        if (tryAcquireShared(arg) < 0)
            //获取信号量失败时会将当前线程封装成node加入到阻塞队列中
            doAcquireSharedInterruptibly(arg);
    }

2、release()

    //释放信号量
    public void release() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        //调用内部类Sync尝试释放信号量
        if (tryReleaseShared(arg)) {
            //释放成功后唤醒阻塞队列的next节点
            doReleaseShared();
            return true;
        }
        return false;
    }

 

标签:permits,int,sync,信号量,限流,线程,Semaphore,多线程,public
From: https://www.cnblogs.com/Bernard94/p/18047338

相关文章

  • C#多线程
    在C#编程中,多线程是实现高效并发编程的关键技术之一。通过创建多个线程,程序可以同时执行多个任务,从而充分利用多核处理器的计算能力。本文将带你快速回顾C#多线程的基础知识,通过10分钟的学习,你将能够掌握多线程的核心概念,并学会使用C#语言创建和管理线程。一、多线程基础概念在C......
  • 并发编程补充:基于多线程实现并发的套接字通信
    服务端:fromsocketimport*fromthreadingimportThreaddefcommunicate(conn):whileTrue:try:data=conn.recv(1024)ifnotdata:breakconn.send(data.upper())exceptConnectionResetError:......
  • 对于需要实时处理的代码语句 就用定时器中断模式,实现多线程模式,建议不要用查询模式。
    对于需要实时处理的代码语句就用定时器中断模式,实现多线程模式,建议不要用查询模式。 示例代码1:查看代码#include"delay.h"#include"sysInt.h"#include"intrins.h"charSMGDuan[]={0x5B,0x3F,0x5B,0x66, 0x40,0x40, 0x3F,0x3F}; //2024--MMcharsegDuan[]={0x3F,0......
  • pyqt5中多线程爬虫
       设立爬虫Class,继承pyqt5中的Thread函数中使用普通线程  整体代码:importsysimportpandasaspdimportjson,requests,time,threadingfromPyQt5.QtWidgetsimportQMainWindow,QApplication,QVBoxLayout,QMessageBoxfromui.ui_testimportUi_MainWind......
  • c++多线程按行读取同一个每行长度不规则文件
    对于非常大的比如上百G的大文件读取,单线程读是非常非常慢的,需要考虑用多线程读,多个线程读同一个文件时不用加锁的,每个线程打开一个独立的文件句柄多线程读同一个文件实现思路思路1先打开一个文件句柄,获取整个文件大小file_size确定要采用线程读取的部分大小read_size和......
  • c++多线程编程
    c++线程库:<thread>创建线程:需要可调用的函数或者函数对象作为线程入口点例:std::threadthreadname(function_name,args...)在C++中,当使用std::thread创建线程并传递类的成员函数时,需要使用&来获取成员函数的地址,同时还需要传递对象的指针(或引用)作为第一个参数。......
  • C++ 多线程笔记2 线程同步
    C++多线程笔记2线程同步并发(Concurrency)和并行(Parallelism)并发是指在单核CPU上,通过时间片轮转的方式,让多个任务看起来像是同时进行的。实际上,CPU在一个时间段内只会处理一个任务,但是由于切换时间非常快,用户感觉像是多个任务同时在进行。这种方式的优点是可以充分利用CPU资源,......
  • VSCode编写多线程程序碰到 mutex 和 thread 未定义的报错问题
    硬件:ThinkBook16G5+IRH系统:Win11家庭中文版22H2如果碰到在线安装MinGW-w64失败的问题可以参考以下链接在线安装MinGW-w64失败下载mingw-std-threads文件夹目前MinGWGCC缺少标准的C++11线程类,该库补充实现有关thread和mutex的内容https://github.com/mega......
  • envoy&istio 对接ratelimit 实现限流之ratelimit简介
    23年的时候公司因调用企业微信接口超限,导致业务问题。架构组经过协商后决定上一个限流服务。限流这块自然而然就落到我负责的网关这块,小公司我一个人负责api网关这块。之前基于istio给公司上线了一个本地的限流(我给公司开发了一个devops管理工具,可以用来管理k8s、istio、jenki......
  • 多线程文件拷贝
    多线程文件拷贝#include<stdio.h>#include<pthread.h>#include<unistd.h>#include<stdlib.h>#include<assert.h>#include<sys/mman.h>#include<sys/types.h>#include<sys/stat.h>#include<fcntl.h>#inclu......