首页 > 其他分享 >常用的并发工具类

常用的并发工具类

时间:2022-11-08 18:40:24浏览次数:55  
标签:常用 System 并发 线程 CountDownLatch println 工具 public out

CountDownLatch概念

CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

CountDownLatch的用法

CountDownLatch典型用法:

1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

CountDownLatch典型用法:

2、实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。

CountDownLatch的不足

CountDownLatch是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

CountDownLatch(倒计时计算器)使用说明

方法说明

public void countDown()

  递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少.

public boolean await(long timeout,TimeUnit unit) throws InterruptedException

  使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回true值。

  如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下三种情况之一前,该线程将一直出于休眠状态:

  由于调用countDown()方法,计数到达零;或者其他某个线程中断当前线程;或者已超出指定的等待时间。

  • 如果计数到达零,则该方法返回true值。
  • 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
  • 如果超出了指定的等待时间,则返回值为false。如果该时间小于等于零,则该方法根本不会等待。

参数:

  timeout-要等待的最长时间

  unit-timeout 参数的时间单位

返回:

  如果计数到达零,则返回true;如果在计数到达零之前超过了等待时间,则返回false

抛出:

  InterruptedException-如果当前线程在等待时被中断

例子1:

  主线程等待子线程执行完成在执行

package com.example.demo.CountDownLatchDemo;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/**
* 主线程等待子线程执行完成再执行
*/
public class CountdownLatchTest1 {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(3);
final CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("子线程" + Thread.currentThread().getName() + "开始执行");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("子线程"+Thread.currentThread().getName()+"执行完成");
latch.countDown();//当前线程调用此方法,则计数减一
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}


try {
System.out.println("主线程"+Thread.currentThread().getName()+"等待子线程执行完成...");
latch.await();//阻塞当前线程,直到计数器的值为0
System.out.println("主线程"+Thread.currentThread().getName()+"开始执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

例子2:

  百米赛跑,4名运动员选手到达场地等待裁判口令,裁判一声口令,选手听到后同时起跑,当所有选手到达终点,裁判进行汇总排名

package com.example.demo.CountDownLatchDemo;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class CountdownLatchTest2 {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch cdOrder = new CountDownLatch(1);
final CountDownLatch cdAnswer = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("选手" + Thread.currentThread().getName() + "正在等待裁判发布口令");
cdOrder.await();
System.out.println("选手" + Thread.currentThread().getName() + "已接受裁判口令");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("选手" + Thread.currentThread().getName() + "到达终点");
cdAnswer.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("裁判"+Thread.currentThread().getName()+"即将发布口令");
cdOrder.countDown();
System.out.println("裁判"+Thread.currentThread().getName()+"已发送口令,正在等待所有选手到达终点");
cdAnswer.await();
System.out.println("所有选手都到达终点");
System.out.println("裁判"+Thread.currentThread().getName()+"汇总成绩排名");
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
}
}

CountDownLatch和CyclicBarrier

CyclicBarrier在用法上其实跟CountDownLatch十分相似,但是前者功能更加强大。
CountDownLatch举例:

CountDownLatch countDownLatch=new CountDownLatch(n);

当程序多次执行countDownLatch.countDown();导致计数器n=0时,阻塞的线程都将同时被唤醒。但是此时的n已经是等于0了,也就是说这个计数器就是一次性的。

CyclicBarrier举例:

CyclicBarrier cyclicBarrier = new CyclicBarrier(n);

CyclicBarrier也能实现countDownLatch的功能,并且它的计数器n是可以被重置的,也就是说n=0线程被唤醒后,n又能重新回到原有值。

CyclicBarrier实现CountDownLatch的功能

public class CycleBarrierTest {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
// 当计数器为0时,立即执行
@Override
public void run() {
System.out.println("汇总线程:" + Thread.currentThread().getName() + " 任务合并。");
}
});


public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);


// 将线程A添加到线程池
executorService.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程A:" + Thread.currentThread().getName() + "执行任务。");
System.out.println("线程A:到达屏障点");
cyclicBarrier.await();
System.out.println("线程A:退出屏障点");
} catch (Exception e) {
e.printStackTrace();
}
}
});


// 将线程B添加到线程池
executorService.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程B:" + Thread.currentThread().getName() + "执行任务。");
System.out.println("线程B:到达屏障点");
cyclicBarrier.await();
System.out.println("线程B:退出屏障点");
} catch (Exception e) {
e.printStackTrace();
}
}
});


// 关闭线程池
executorService.shutdown();
}
}

执行结果:

线程A:pool-1-thread-1执行任务。
线程A:到达屏障点
线程B:pool-1-thread-2执行任务。
线程B:到达屏障点
汇总线程:pool-1-thread-2 任务合并。
线程B:退出屏障点
线程A:退出屏障点

结果分析:

CyclicBarrier指定了计数器为2,线程A执行cyclicBarrier.await();到达屏障点阻塞,此时计数器等于2-1=1,线程B也执行cyclicBarrier.await();到达屏障点阻塞,此时计数器等于0,CyclicBarrier里的任务会立即执行,执行完成后,线程A和B也被同时唤醒继续执行。

CyclicBarrier相对CountDownLatch的优化

public class CycleBarrierTest2 {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);


public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);


// 将线程A添加到线程池
executorService.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程A:" + Thread.currentThread().getName() + "执行第1步。");
cyclicBarrier.await();
System.out.println("线程A:" + Thread.currentThread().getName() + "执行第2步。");
cyclicBarrier.await();
System.out.println("线程A:" + Thread.currentThread().getName() + "执行第3步。");
} catch (Exception e) {
e.printStackTrace();
}
}
});


// 将线程B添加到线程池
executorService.submit(new Runnable() {
@Override
public void run() {
try {
System.out.println("线程B:" + Thread.currentThread().getName() + "执行第1步。");
cyclicBarrier.await();
System.out.println("线程B:" + Thread.currentThread().getName() + "执行第2步。");
cyclicBarrier.await();
System.out.println("线程B:" + Thread.currentThread().getName() + "执行第3步。");
} catch (Exception e) {
e.printStackTrace();
}
}
});


// 关闭线程池
executorService.shutdown();
}
}

执行结果:


线程A:pool-1-thread-1执行第1步。
线程B:pool-1-thread-2执行第1步。
线程B:pool-1-thread-2执行第2步。
线程A:pool-1-thread-1执行第2步。
线程A:pool-1-thread-1执行第3步。
线程B:pool-1-thread-2执行第3步。

结果分析:
CyclicBarrier指定了计数器为2,线程A和线程B都分别执行了2次cyclicBarrier.await();这样导致的效果就是:每个线程分阶段完成任务,等所有线程都完成了第1步,然后才能接着执行第2步,等所有线程都完成了第2步,才能执行第3步。

图解:

常用的并发工具类_主线程


CyclicBarrier的特性就在于它的计数器是可以重置的,这也就有了上图那种多屏障的情况,虽然第一次调用await使得计数器等于0屏障1失效,但是后续如果继续调用await,屏障还能继续使用,这也就是计数器重置的好处。而CountDownLatch第一次屏障失效后,就结束了。


Semaphore,信号量,一般用于控制同时访问资源的线程数量。可以认为Synchronized代表的是一把锁,那么Semaphore就是多把锁。

常用方法

public class Semaphore implements java.io.Serializable {
//构造方法,传入令牌数,默认实例化一个非公平锁
public Semaphore(int permits);
//获取一个令牌,在获取成功之前,以及被其他线程中断之前,当前线程会被阻塞
public void acquire() throws InterruptedException;
//获取一个令牌,在获取成功之前,当前线程会被阻塞(中断被忽略)
public void acquireUninterruptibly() ;
//尝试获取令牌,立即返回获取成功与否,不阻塞当前线程
public boolean tryAcquire();
//释放一个令牌
public void release();
//返回当前可用的令牌数
public int availablePermits();
}

现在有这样的一个例子:

某卫生间只有3个坑位,把坑前面的挡门理解为令牌,因此这里有3个令牌,现在模拟5个人抢坑位的场景。

package com.xue.testSemaphore;

import java.util.concurrent.Semaphore;

public class Main {
public static void main(String[] args) {
//最多支持3个人同时蹲坑
Semaphore semaphore = new Semaphore(3);
//5个人来抢坑位
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "已经在蹲坑");
//模拟蹲坑时长
Thread.sleep((long) (Math.random() * 10 * 1000));
//离开坑位
System.out.println(Thread.currentThread().getName() + "即将离开坑位");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, i + "号").start();
}
}
}

输出如下:

常用的并发工具类_ide_02

首先0、1、2号已经抢完了所有的坑位,3与4号只能在外面等候,对的他们没排队(默认实例化了一个非公平锁)。2号出来后,3号才能进去。接着0号出来,4号才能进去。

这个例子虽然有点俗,这确实能让人印象深刻呀。

原理解析

类图

常用的并发工具类_主线程_03

Semaphore有2个内部类,FairSync与NonfairSync,它们都继承自Sync,Sync又继承自AQS。可以看的出来,Semaphore与CountDownLatch的结构类似,都需要借助于AQS。

构造方法


public Semaphore(int permits) {
sync = new NonfairSync(permits);
}


public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

默认实例化了一个非公平锁,当然也可以进行指定。这里的permits最终会传到AQS的state变量中,代表当前可用的令牌数。

acquire()

获取一个令牌,获取到线程可以继续执行,否则将会被阻塞。

public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

调用了AQS中的模版方法

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取arg个令牌,该方法返回可用令牌数-需求数,如果小于0,则进行阻塞
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

其中tryAcquireShared()由具体的子类(AQS的子类Sync的子类NonfailSync)进行实现

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

这里又调用了父类Sync的方法

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

Exchanger 

public class ExchangerDemo {

static Exchanger<String> exchanger = new Exchanger <>();

public static void main(String[] args) {
String t1String = "T1";
String t2String = "T2";

new Thread(()->{
try {
System.out.println("T1交换之前的值:" + t1String);
String exchange = exchanger.exchange(t1String);
System.out.println("T1交换之后的值:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();

new Thread(()->{
try {
System.out.println("T2交换之前的值:" + t2String);
String exchange = exchanger.exchange(t2String);
System.out.println("T2交换之后的值:" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}

案例输出:T1交换之前的值:T1

                 T2交换之前的值:T2

                 T2交换之后的值:T1

                 T1交换之后的值:T2


      丛输出结果可以看出T1、T2线程实现了数据交换,当T1线程调用exchanger.exchange(t1String)的时候就表示它把t1String拿出来跟其他线程做交换,当有T2调用exchanger.exchange(t2String)的时候就表示把t2String拿出来跟其他线程做交互,当有两条线程要做交换的时候,Exchanger就满足了交换的条件,就会进行值的交换,并且把交换后的值返回。

使用注意点:

      1、需要交换值的线程必须是成对的出现。

      2、当有两条线程将值交换完成后,Exchanger实例可重复使用,其他线程也可以使用此Exchanger实例来进行数据交换。

Exchanger的实现原理:

      Exchanger交换值的原理是使用了ThreadLocal来实现的,至于线程阻塞的部分是使用java中的Usafe类park()/unpark()来实现的。


关注公众号 soft张三丰 

常用的并发工具类_java_04

标签:常用,System,并发,线程,CountDownLatch,println,工具,public,out
From: https://blog.51cto.com/u_15501087/5834149

相关文章

  • 可视化图形工具Portainer
    Portainer介绍Portainer是一个可视化的容器镜像的图形管理工具,利用Portainer可以轻松构建,管理和维护Docker环境。而且完全免费,基于容器化的安装方式,方便高效部署。官方站点:h......
  • FigrCollage for Mac(照片拼贴工具)3.3.5直装版mac/win
    FigrCollageforMac是应用在Mac上的一款照片拼贴工具,从任何字母表制作一个心拼贴,数字拼贴,字母拼贴,单词拼贴或拼贴。绝对任何人都可以在几分钟内从形状或文本中制作拼贴画,......
  • 12个面向前端开发者真正有用的 VSCode 插件工具
    英文|https://medium.com/frontend-canteen/most-useful-vscode-extensions-for-frontend-developer-7c0f7ce5ebc4翻译|杨小爱如果你不知道如何安装VSCode插件,可以查......
  • RestTemplateUtils 转发工具类
    @ComponentpublicclassRestTemplateUtils{@AutowiredprivateRestTemplaterestTemplate;privatestaticRestTemplateUtilsrtu;@PostConstr......
  • Sublime Text 常用且比较实用的插件
    Sublimetext3安装ControlPackage插件管理器1、按键盘上的Ctrl+~打开控制面板,并粘贴复制以下代码。importurllib.request,os,hashlib;h='817937144c34c84c88cd43b......
  • vconsole手机前端开发调试工具
    功能:可以在手机端看到cosole打印的内容、接口请求、storage信息、页面元素信息使用方法:1.npm安装npminstallvconsole2.引入页面importVConsolefrom'vconsole'......
  • 14个前端开发人员必备的有用工具
    英文| https://javascript.plainenglish.io/14-useful-tools-for-faster-and-easier-web-development-9fd0ebc1f3f8翻译|web前端开发我从事网站开发多年,我不认为保留自......
  • 13个常用的​JavaScript代码片段
    JavaScript可以做很多好玩的事,从复杂的框架到处理API,有太多的东西需要学习。但是,它也能让我们只用一行就能做一些了不起的事情。1、获得一个随机的布尔值(true/false)该函数......
  • Python工具箱系列(十三)
    上文介绍了使用AES算法进行文件加解密的代码。但是如果在代码中写死了(hardcode)文件名,每次要加解密文件都要去改python源代码,显然有些太笨了。为此,可以使用命令行参数来在......
  • 推荐 4 个开源工具
    今天推荐4个登上GitHub热搜的开源项目,它们分别是:1.炫酷的UI工具:glslViewer2.Textual3.ToolJet:开源的低代码开发框架4. Linux命令大全搜索工具01炫酷的UI......