首页 > 编程语言 >Java并发编程——Semaphore

Java并发编程——Semaphore

时间:2023-02-01 16:03:31浏览次数:65  
标签:Java 许可 int 编程 获取 线程 Semaphore 节点

一、Semaphore

Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

 

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。

 

Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。

 

信号量通过一组许可证来控制对共享资源的访问。

 

如果需要,可以用acquire()方法获取许可,如果许可为0,那么会进行阻塞,通过使用release()方法释放许可,把许可归还给Semaphore,归还之后,阻塞的线程就会醒来尝试获取许可。

 

Semaphore提供给了若干个api对应不同的功能:

  • Semaphore(int permits):非公平模式创建;
  • Semaphore(int permits, boolean fair):可以指定是否公平模式创建;
  • acquire():尝试获取1个许可,如果没有许可则阻塞,可以被中断停止等待;
  • acquire(int permits):跟上一个方法类型,尝试获取permits个许可;
  • acquireUninterruptibly():尝试获取一个许可,不可中断;
  • acquireUninterruptibly(int permits):尝试获取permits个许可,不可中断;
  • tryAcquire():尝试获取一个许可,获取不到则直接返回失败;
  • tryAcquire(int permits):尝试获取permits个许可,获取不到则直接返回失败;
  • tryAcquire(int permits, long timeout, TimeUnit unit):尝试在timeout时间内获取permits个许可,超时则返回false,可被中断;
  • tryAcquire(long timeout, TimeUnit unit):尝试在timeout时间内获取1个许可,超时则返回false,可被中断;
  • release():释放一个许可;
  • release(int permits):释放n个许可;

下面演示基于公平锁的Semaphore,获取锁使用acquireUninterruptibly():

这里设置的许可为2,可以发现,同一时刻最多只能有两个线程获得许可。

二、执行原理

Semaphore的执行原理相对来说比较简单。下面描述了可中断非公平的信号量实现原理,ASQ中的state值就相当于许可的数量:

  • 执行acquire的时候,会尝试让state - acquires,如果发现许可足够,则进行cas更新,扣减许可,否则线程进入等待队列;
  • 执行release的时候,state + releases,把许可加回去。

三、Semaphore用法

/**
 * @Description: 演示Semaphore用法
 */
public class SemaphoreDemo {

    public static Semaphore semaphore = new Semaphore(3,true);

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(50);

        for (int i = 0; i < 100; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"拿到了许可证");
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println(Thread.currentThread().getName()+"释放了许可证");
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}

注意,如果使用的是tryAcquire失败之后直接返回,线程不会进入AQS等待队列。

四、源码

公平信号量 和 非公平信号量 的区别

"公平信号量"和"非公平信号量"的释放信号量的机制是一样的!

 

不同的是它们获取信号量的机制:线程在尝试获取信号量许可时,对于公平信号量而言,如果当前线程不在CLH队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在CLH队列的头部,它都会直接获取信号量。该差异具体的体现在,它们的tryAcquireShared()函数的实现不同。

4.1 Semaphore构造方法

public class Semaphore implements java.io.Serializable {

    private final Sync sync;
	
	public Semaphore(int permits) {
		sync = new NonfairSync(permits);
	}

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
}
  • 1、Semaphore 构造器,permits 为传入的许可证数,默认非公平构造器;

  • 2、Semaphore 构造器,permits 为传入的许可证数,fair 是 boolean 型的,如果传入 true,则公平,否则不公平;

4.2 NonfairSync 和 FairSync源码

public class Semaphore implements java.io.Serializable {

    private final Sync sync;
	
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }	
	
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

两者都继承了 Sync 同步器,初始化时都调用了父类构造器,同时都有一个获取信号的方法,稍后再分析获取信号的区别。

4.3 acquire(获取信号量)

  • 这个方法是从信号量获取一个许可,在获取到许可,或线程中断之前,当前线程阻塞;获取许可后立即返回并将许可数减一
public class Semaphore implements java.io.Serializable {

    private final Sync sync;
	
	/**
	* 如果没有许可可用,则会休眠,直到发生以下两种情况
	* 1、其他调用release方法释放许可,并且当前线程获取到许可
	* 2、其他线程中断了当前线程
	* 	1)当前线程在进入这个方法时设置了中断标志位
	* 	2)等待许可时发生了中断,则抛出中断异常
	*/	
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}
  • acquireSharedInterruptibly 这个方法是直接调用AQS的acquireSharedInterruptibly(int ard)方法;
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
	
   /**
	* 首先检测是否中断.中断后抛出异常
	* 尝试获取许可,成功退出;失败则进入AQS队列,直至成功获取或中断
	*/
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
		// 尝试获取锁,返回剩余共享锁的数量;小于0则加入同步队列,自旋	
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
}

tryAcquireShared(arg)则会调用Semaphore中两个同步器的tryAcquireShared实现方法; 如果获取失败则加入队列等待唤醒;

4.4 非公平模式的实现

非公平实现都是首先查看是否有可获取的许可,如果有则获取成功,没有则进队列等待;利用此可以提高并发量

public class Semaphore implements java.io.Serializable {

    private final Sync sync;
	
    static final class NonfairSync extends Sync {

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
}
  • 直接调用其父类Sync中非公平共享获取
public class Semaphore implements java.io.Serializable {

    private final Sync sync;
	
    abstract static class Sync extends AbstractQueuedSynchronizer {
		
        final int nonfairTryAcquireShared(int acquires) {
			// 自旋直到无许可或者状态位赋值成功
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
				// 如果小于0则直接返回,否则利用CAS给AQS状态位赋值
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
	}
}

通过自旋+CAS来一直尝试获取许可,直到获取成功或者没有许可,返回剩余的许可数

4.5 公平模式的实现

公平与非公平的区别在于始终按照AQS队列FIFO的顺序来的

public class Semaphore implements java.io.Serializable {

    private final Sync sync;
	
    static final class FairSync extends Sync {

        protected int tryAcquireShared(int acquires) {
			//自旋 CAS 实现线程安全
            for (;;) {
				// 判断是否有前置任务排队
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
				// 如果小于0则直接返回,否则利用CAS给AQS状态位赋值
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

如果等待队列不为空,则直接返回-1。 以上两种模式获取失败后都会调用doAcquireSharedInterruptibly(int arg);自旋等待获取锁

  • doAcquireSharedInterruptibly方法:会使得当前线程一直等待,直到当前线程获取到锁(或被中断)才返回
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //创建“当前线程”的Node节点,且node中记录的锁是“共享锁”类型,并将节点添加到CLH队列末尾。
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获取前继节点,如果前继节点是等待锁队列的表头,则尝试获取共享锁
                // 判断新增的节点的前一个节点是否头节点
                final Node p = node.predecessor();
                if (p == head) {
                    // 是头节点,那么在此尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取成功,把当前节点变为新的head节点,
                        //并且检查后续节点是否可以在共享模式下等待,
                        //并且允许继续传播,则调用doReleaseShared继续唤醒下一个节点尝试获取锁
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //前继节点不是头节点,当前线程一直等待,直到获取到锁
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
}
  • shouldParkAfterFailedAcquire方法:判断当前线程获取锁失败之后是否需要挂起
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
 /*说明:4.shouldParkAfterFailedAcquire 返回当前线程是否应该阻塞
    (01) 关于waitStatus请参考下表(中扩号内为waitStatus的值)

    CANCELLED[1]  -- 当前线程已被取消
    SIGNAL[-1]    -- “当前线程的后继线程需要被unpark(唤醒)”。
                        一般发生情况是:当前线程的后继线程处于阻塞状态,
                        而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
    PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
    [0]           -- 当前线程不属于上面的任何一种状态。
    
    (02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。

    规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
    规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
    规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
    */  
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驱节点的状态
        int ws = pred.waitStatus;
        // 如果前驱节点是SIGNAL状态,则意味着当前线程需要unpark唤醒,此时返回true
        if (ws == Node.SIGNAL)
            return true;
            
        // 如果前继节点是取消的状态即前驱节点状态为CANCELLED 
        if (ws > 0) {
            // 从队尾向前寻找第一个状态不为CANCELLED的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 将前驱节点的状态设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }   
}   

4.6 void release()

公平和非公平使用相同的释放 释放许可

public class Semaphore implements java.io.Serializable {

    private final Sync sync;
	
	public void release() {
		sync.releaseShared(1);
	}
}
  • 调用AQS中的releaseShared(int arg)
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
    //目的是让当前线程释放它所持有的共享锁,它首先会通过tryReleaseShared()去尝试释放共享锁。
    //尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
    public final boolean releaseShared(int arg) {
		//释放共享锁
        if (tryReleaseShared(arg)) {
			 //唤醒所有共享节点线程
            doReleaseShared();
            return true;
        }
        return false;
    }
}
  • tryReleaseShared()在Semaphore.Sync中被重写,释放共享锁,将锁计数器加回去
public class Semaphore implements java.io.Serializable {

    private final Sync sync;
	
    abstract static class Sync extends AbstractQueuedSynchronizer {
		
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
				// 获取“锁计数器”的状态
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
				// 通过CAS函数进行赋值。	
                if (compareAndSetState(current, next))
                    return true;
            }
        }
	}
}
  • 如果释放许可成功,则调用AQS中的doReleaseShared()方法来唤醒AQS队列中等待的线程
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
	/**
     * 唤醒同步队列中的一个线程
     */	
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
				//是否需要唤醒后继节点
                if (ws == Node.SIGNAL) {
					//修改状态为初始0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
					//唤醒h.nex节点线程	
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
}
  • 1)获取队列的头节点元素,如果不为null,并且不为尾节点,说白了,就是不止一个人等待,进入判断。

  • 2)如果线程节点是需要唤醒的线程,则进行唤醒,获取资源使用。

  • 3)失败后重试。

  • 4)如果没有后继需要唤醒的节点,则退出,就相当于每人排队上厕所了,让出来资源就空着。

Semaphore 总结

  • 1、Semaphore 内部维护一组信号量,即一个 volatile 的整型 state 变量。

  • 2、Semaphore 分为公平或非公平两种方式,获取信号量或释放信号量的本质是对 state 进行原子的减少或增加操作。

  • 3、获取不到信号的线程放在等待队列里面,释放信号的时候会唤醒后继节点。

  • 4、Semaphore 主要用于对线程数量、公共资源(比如数据库连接池)等进行数量控制。

参考: https://www.itzhai.com/articles/graphical-several-fun-concurrent-helper-classes.html

https://www.cnblogs.com/200911/p/6060359.html

https://juejin.cn/post/6844904119547723789

https://blog.csdn.net/yhl_jxy/article/details/87279383

标签:Java,许可,int,编程,获取,线程,Semaphore,节点
From: https://blog.51cto.com/u_14014612/6031636

相关文章

  • Java并发编程——并发包中锁的AQS通用实现
    一、包结构介绍我们查看下java.util.concurrent.locks包下面,发现主要包含如下类:可以发现ReentrantLock和ReentrantReadWriteLock都是AbstractQueueSynchronizer类。我们......
  • Java并发编程——ArrayBlockingQueue
    一、阻塞队列BlockingQueue在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速......
  • Java并发编程——LinkedBlockingQueue
    一、阻塞队列BlockingQueue在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速......
  • java/Android获取单个文件的MD5值,解决首位0被省略问题,解决超大文件问题,cmd命令行查看
    code来源:https://betheme.net/news/txtlist_i67135v.html?action=onClickcmd命令行查看文件md5码:certutil-hashfilea.txtmd5,不加后面的md5,查看的默认是sha1码。packag......
  • 【Java】自定义mybatis
    处理sqlin多条件搜索时单引号双引号问题StringBuilderpidNoZeroIds=newStringBuilder();IntegernumTmp=0;for(ShequLsDatingTypepidNoZero:pidNoZeroLis......
  • Java线程池中的execute和submit
    一、概述execute和submit都是线程池中执行任务的方法。execute是Executor接口中的方法publicinterfaceExecutor{voidexecute(Runnablecommand);}submit是......
  • Java多线程:Future和FutureTask
    一、FutureFuture是一个接口,所有方法如下:上源码:packagejava.util.concurrent;publicinterfaceFuture<V>{booleancancel(booleanmayInterruptIfRunning);......
  • 使用java python 实现 QI-页面排序-骑马钉
    链接:http://www.cnprint.org/bbs/thread/77/339531/......
  • Spring开启@Async异步方法(javaconfig配置)
    在Spring中,基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。应用场景:某些耗时较长的......
  • JavaScript闭包的概念
    什么是闭包?闭包有什么作用,缺点是什么?闭包的概念:JavaScript中函数会产生闭包(closure)。闭包是函数本身和该函数声明时所处的环境状态的组合;函数能够“记忆住”其定义......