首页 > 编程语言 >【并发编程】并发工具类

【并发编程】并发工具类

时间:2023-01-20 11:35:08浏览次数:373  
标签:Thread 获取 int 编程 并发 线程 CountDownLatch 工具 public


文章目录

  • ​​1.并发工具CountDownLatch类​​
  • ​​1.1.CountDownLatch简介​​
  • ​​1.2.CountDownLatch数据结构​​
  • ​​3.CountDownLatch源码分析​​
  • ​​4.CountDownLatch案例​​
  • ​​2.并发工具Semaphore类​​
  • ​​2.1.Semaphore是什么​​
  • ​​2.2.Semaphore常用的方法​​
  • ​​2.3.Semaphore案例实战​​
  • ​​3.并发工具CyclicBarrier类​​
  • ​​3.1.CyclicBarrier是什么​​
  • ​​3.2.CyclicBarrier常用的方法​​
  • ​​3.3.CyclicBarrier案例实战​​
  • ​​4.并发工具Exchanger类​​
  • ​​4.1.Exchanger是什么​​
  • ​​4.2.Exchanger核心方法​​
  • ​​4.3.Exchanger案例实战​​

1.并发工具CountDownLatch类

1.1.CountDownLatch简介

  • CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的的操作之前,他允许一个或者多个线程一直等待。
  • CountDownLatch函数列表
#构造一个给定计数初始化的CountDownLatch
CountDownLatch(int count)

#使用当前线程在锁存器倒计数至零之前一直等待,除非线程被中断
void await()

#使当前线程在锁存器倒计时至零之前一直等待,除非线程中断或者超出了指定的等待时间。
boolean await(long timeout,TimeUnit unit)

#递减锁存器的计数,如果计数达到零,则释放所有等待的线程
void countDown()

#返回当前计数
long getCount()

#返回标识此锁存器及器状态的字符串
String toString()

1.2.CountDownLatch数据结构

  • CountDownLatch的UML类图:

【并发编程】并发工具类_共享锁

  • Count的数据结构很简单,它通过共享锁实现的 。它包含了sync对象,sync是Sync类型的实例,他继承于AQS。

3.CountDownLatch源码分析

  • CountDownLatch的源码是基于JDK1.7以上的

(1)CountDownLatch(int count)

public CountDownLatch(int count){
if(count<0) throw new IllegalArgumentException("count < 0");
this.sync = nw Sync(count);
}
  • **说明:**该函数是创建一个Sync对象,而Sync是继承于AQS类。Sync构造函数如下:
Sync(int count){
setState(count);
}
  • setState()在AQS中实现,源码如下:
protected final void setState(long newState){
state=newState;
}
  • **说明:**在AQS中,state是一个private volatitle long 类型的对象。对于CountDownLatch而言,state标识的“锁计数器”。CountDownLatch中getCount()最终调用的是AQS中的getState(),返回的state对象,及锁计数器。

(2)await

public void await() throw InterruptedException{
sync.acquireSharedInterruptibly(1);
}
  • **说明:**该函数实际上是调用AQS的acquireSharedInterruptibly(1);
  • AQS中的acquireSharedInterruptibly()源码如下:
public final void acquireSharedInterruptibly(long arg) throw InterruptedException{
if(Thread.interrupted())
throw new InterruptedException();
if(tryAcauireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
  • **说明:**acquireSharedInterruptibly()的作用是获取共享锁。
  • 如果当前线程是中断状态,则抛出异常InterruptedException。否则。调用tryAcquireShared(arg)尝试获取共享锁,尝试成功则返回,否则就调用doAcquireSharedInterruptibily()。doAcquireSharedInterruptibly()会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。
  • tryAcquireShared()在CountLownLatch.java中被重写,源码如下:
protected int tryAcquireSShared(int acquiress){
return (getState() == 0)?1:-1;
}
  • 说明:tryAcquireShared()的作用是尝试获取共享锁。
  • 如果"锁计数器=0",即锁是可获取状态,则返回1;否则,锁是不可获取状态,则返回-1。
private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 创建"当前线程"的Node节点,且Node中记录的锁是"共享锁"类型;并将该节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点。
// 如果上一节点是CLH队列的表头,则"尝试获取共享锁"。
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// (上一节点不是CLH队列的表头) 当前线程一直等待,直到获取到共享锁。
// 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
  • addWaiter(Node.SHARED)的作用是,创建”当前线程“的Node节点,且Node中记录的锁的类型是”共享锁“(Node.SHARED);并将该节点添加到CLH队列末尾。
  • node.predecessor()的作用是,获取上一个节点。如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
  • houldParkAfterFailedAcquire()的作用和它的名称一样,如果在尝试获取锁失败之后,线程应该等待,则返回true;否则,返回false。
  • 当shouldParkAfterFailedAcquire()返回ture时,则调用parkAndCheckInterrupt(),当前线程会进入等待状态,直到获取到共享锁才继续运行。

(3)countDown()

public void countDown(){
sync.releaseShared(1);
}
  • 说明:该函数实际上嗲用releaseShared(1)释放共享锁。
  • releaseShared()在AQS中实现,源码如下:
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}
  • 说明:releaseShared()的目的是让当前线程释放它锁持有的共享锁。
  • 它首先会通过tryReleaseShared()去尝试释放共享锁,尝试成功,则直接返回成功,尝试失败,则通过doReleaseShared()取释放共享锁。
  • tryReleaseShared()在CountDownLatch.java中被重写,源码如下:
protected boolean tryReleaseShared(int releases){
for(;;){
//获取“锁计数器”的状态
int c = getState();
if( c == 0)
return false;
//“锁计数器”-1
int nextc = c-1;
//通过CAS函数进行赋值
if(compareAndSetState(c,nextc))
return nextc == 0;
}
}
  • **说明:**tryReleaseShared()的作用是释放共享锁,将“锁计数器”的值-1;
  • **总结:**CountDownLatch是通过“共享锁”实现的。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态,表示该“共享锁”最多能被count个线程同时获取。当某线程调用该CountDownLatch对象的await()方法时,该线程会等待“共享锁”可用时,才能获取“共享锁”而运行。而“共享锁”可用条件,就是“锁计数器”的值为0,而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1,通过这种方式,必须有count个线程调用countDown()之后,"锁计数器"才为0,等待的线程才能继续运行。

4.CountDownLatch案例

  • 启动20个线程,只有当20个线程执行完成之后,才会执行目标线程。
public class CountDownLatchDemo{

private static CountDownLatch countDownLatch = new CountDownLatch(20);

public static void main(String[] args) {

new Thread(()->{
try {
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+":等待其他线程全部运行完成,完成解锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();


for (int i = 0; i < 10; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+":线程运行");
countDownLatch.countDown();
}).start();
}

for (int i = 0; i < 10; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+":线程运行");
countDownLatch.countDown();
}).start();
}
}

}
  • 运行结果:

【并发编程】并发工具类_后端_02

2.并发工具Semaphore类

2.1.Semaphore是什么

  • Semaphore通常我们叫它信号量,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
  • 可以把他简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车辆为0时,停车场入口的栏杆就不会在打开,车辆就无法进入停车场,直到有一辆车从停车场出去为止。
  • Semaphore使用场景
  • 通常用于那些资源有明确访问数量限制的场景,常用于限流。
  • 比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获取到数据库连接。

2.2.Semaphore常用的方法

//获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态
acquire()

//获取一个令牌,在获取令牌、或者被其他线程调用中断、或超时之前线程处于阻塞状态
acquire(int permits)

//获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)
acquireUninterruptibly()

//尝试获取令牌,返回获取令牌成功或失败,不阻塞线程
tryAcquire()

//尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程
tryAcquire(long timeout,TimeUnit unit)

//释放一个令牌,唤醒一个获取令牌不成功的阻塞线程
release()

//等待队列里是否还存在等待线程
hasQueuedThreads()

//获取等待队列里阻塞的线程数
getQueueLength()

//清空令牌把可用令牌数置为0,返回清空令牌数量
drainPermits()

//返回可用的令牌数量
availablePermits()

2.3.Semaphore案例实战

  • 每个停车场入口都有一个提示牌,上面显示着停车场的剩余车位还有多少,当剩余车位为0时,不允许车辆进入停车场,直到停车场里面有车离开停车场,这时提示牌上会显示新的剩余车位数。
  • 业务场景:
  • 停车场容纳总停车量10。
  • 当一辆车进入停车场后,显示牌的剩余车位数响应的减1.
  • 每有一辆车驶出停车场后,显示牌的剩余车位数响应的加1。
  • 停车场剩余车位不足时,车辆只能在外面等待。
  • 编码实战
public class SemaphoreDemo {
private static Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
//模拟100辆汽车进入停车场
for (int i = 0; i < 100; i++) {
Thread thread = new Thread(()->{
try{
System.out.println("===="+Thread.currentThread().getName()+"来到停车场");
if (semaphore.availablePermits() == 0){
System.out.println("车位不足,请耐心等待");
}
semaphore.acquire(); //获取令牌进入停车场
System.out.println(Thread.currentThread().getName()+"成功进入停车场");
Thread.sleep(10000L);
System.out.println(Thread.currentThread().getName()+"驶出停车场");
semaphore.release(); //释放令牌。腾出停车场车位
}catch(Exception e){
e.printStackTrace();
}
},i+"号车");
thread.start();
}
}
}

【并发编程】并发工具类_i++_03


【并发编程】并发工具类_i++_04

3.并发工具CyclicBarrier类

3.1.CyclicBarrier是什么

  • 从字面上的意思就可以知道,这个类的中文意思是”循环栅栏“。大概的意思就是一个可循环利用的屏障。
  • 它的作用就是会让所有线程都等待完成之后才会继续执行下一步行动。
  • 举个例子,就像生活中我们会约朋友到某个餐厅一起吃饭,有些朋友会早到,有些朋友会晚到,但是这个餐厅规定必须人到齐了才可以让我们进去,这里的朋友们就是各个线程,餐厅就是CyclicBarrier。

3.2.CyclicBarrier常用的方法

(1)构造方法

public CyclicBarrier(int parties)

public CyclicBarrier(int parties,Runnable barrierAction)
  • parties是参与线程的个数
  • 第二个构造方法有一个Runnable参数,这个参数的意思是最后一个到达线程要做的任务

(2)常用方法

//线程调用 await() 表示自己已经到达栅栏
public int await() throws InterruptedException, BrokenBarrierException

//线程调用 await() 表示自己已经到达栅栏
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

//返回当前在屏障处等待的参与者数目
int getNumberWaiting()

//返回要求启动此barrier的参与者数目
int getParties()

//查询此屏障是否处于损坏状态
boolean isBroken()

//将屏障设为初始化状态
void reset()
  • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时。
  • 线程调用 await() 表示自己已经到达栅栏。

3.3.CyclicBarrier案例实战

(1)案例实战

  • 一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务
public class CyclicBarrierDemo extends Thread {

private CyclicBarrier cyclicBarrier;

public CyclicBarrierDemo(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(getName() + " 达到栅栏 A");
cyclicBarrier.await();
System.out.println(getName() + " 冲破栅栏 A");

Thread.sleep(2000);
System.out.println(getName() + " 达到栅栏 B");
cyclicBarrier.await();
System.out.println(getName() + " 冲破栅栏 B");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {

int threadNum = 5;

CyclicBarrier barrier = new CyclicBarrier(threadNum,()->{
System.out.println(Thread.currentThread().getName()+" 完成最后任务");
});
for (int i = 0; i < threadNum; i++) {
new CyclicBarrierDemo(barrier).start();
}

}
}

【并发编程】并发工具类_System_05

4.并发工具Exchanger类

4.1.Exchanger是什么

  • Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供了一个共同点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程限制性exchange方法,他会一直等待第二个线程也执行exchange,当两个线程都达到同步点时,这两个线程就可以交换数据,将本线程产出来的数据传递给对方。

4.2.Exchanger核心方法

//值传递,将数据传给对方线程
exchange(V x);

//值传递,将数据传给对方线程,超过指定的时间内,没有传递成功,报超时异常
exchange(V x,long timeout,TimeUnit unit)

4.3.Exchanger案例实战

  • 模拟买家支付,商家发货,数据交换。
public class ExchangerDemo {

//Exchanger实例
private static final Exchanger<String> exchanger = new Exchanger<>();

public static void main(String[] args) {
//商家线程
new Thread(()->{
try {
String phone = "iphone13";
System.out.println(Thread.currentThread().getName()+":商品正在等待卖家付款");
//阻塞,模拟买家支付的过程
Thread.sleep(2000);
String money = exchanger.exchange(phone);
System.out.println(Thread.currentThread().getName()+":收到买家支付的金额:"+money);
}catch (Exception e){
e.printStackTrace();
}
}).start();

new Thread(()->{
try {
String money = "1000";
System.out.println(Thread.currentThread().getName()+":等待商家发货商品");
//阻塞,模拟商家发货的过程
Thread.sleep(2000);
String phone = exchanger.exchange(money);
System.out.println(Thread.currentThread().getName()+":收到商家的产品:"+phone);
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
}

【并发编程】并发工具类_i++_06

  • Exchanger也可以用于校对工作,比如我们需要将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对这两个Excel数据进行校对,看看是否录入的一致。
public class ExchangerDemo {    

private static final Exchanger<String> exchanger = new Exchanger<>();

private static ExecutorService executorService = Executors.newFixedThreadPool(2);

public static void main(String[] args) {

executorService.execute(()->{
try {
String A = "银行流水A"; //A录入银行的流水数据
exchanger.exchange(A);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

executorService.execute(()->{
try {
String B= "银行流水B";
String A = exchanger.exchange(B);
System.out.println("A和B的数据是否一致:"+A.equals(B));
}catch (InterruptedException e){
e.printStackTrace();
}
});

executorService.shutdown();

}
}

【并发编程】并发工具类_java_07


标签:Thread,获取,int,编程,并发,线程,CountDownLatch,工具,public
From: https://blog.51cto.com/u_15646271/6020579

相关文章

  • 【并发编程】原子类
    文章目录​​1.什么是原子类​​​​2.原子更新基本类型​​​​3.原子更新数组类型​​​​4.原子地更新属性​​1.什么是原子类(1)原子类简介一度认为原子是不可分割的最小......
  • 一种面向业务配置基于JSF广播定时生效的工具
    作者:京东物流王北永姚再毅李振1背景目前,ducc实现了实时近乎所有配置动态生效的场景,但是配置是否实时生效,不能直观展示每个机器上jvm内对象对应的参数是否已变更为准......
  • Redis 官方可视化工具
    转载:blog.csdn.net/weixin_46902396/article/details/120807629RedisInsight是一个直观高效的RedisGUI管理工具,它可以对Redis的内存、连接数、命中率以及正常运行时......
  • 截图工具,QQ截图独立版,可以脱离QQ使用的QQ截图小工具,有人把QQ截图功能单独拆出来了,真的
    QQ自带的截图功能真的很强大,而且非常方便,包含了多种实用的功能,可以在截图上进行标记,可以截图进行文字提取等。现在有人把这个功能从QQ上分离出来了,在没有网络不登录QQ的情......
  • 读函数式编程思维笔记01_演化的语言
    1. 范式转变1.1. 学习一种全新的编程范式,困难并不在于掌握新的语言1.1.1. 真正考验人的,是怎么学会用另一种方式去思考1.2. 计算机科学的间歇式进步,好思路有时搁置......
  • 资深程序员在编程中有哪些特殊的习惯或方法?
    知乎上有一个问答:高级程序员在编程中有哪些特殊的习惯或方法?我是一个有着22年编程经验的老程序员,谈不上高级,我来谈谈自己的三点心得。一定要有自己的代码库以前有程......
  • 17种编程语言实现排序算法-快速排序
    开源地址https://gitee.com/lblbc/simple-works/tree/master/sort/1.安卓Java版privatestaticvoidsort(int[]array){sortMe(array,0,array.length-1);......
  • CAD 卸载工具,完美彻底卸载清除干净cad各种残留注册表和文件【转载】
    CAD卸载工具完全彻底删除干净CAD各种残留注册表和文件。有些同学由于一些原因想把cad重新安装,但是cad安装到一半就失败了或者显示已安装或者未完成,还有的同学会遇到“安装......
  • 链式编程
    目录一、什么是链式编程二、链式编程的实现(指针)三、链式编程(引用)一、什么是链式编程链式编程在C++中使用的地方很多,比如说输出的时候可以使用很多的<<,这就使用了链式编程......
  • 代码影响范围工具探索
    作者:京东零售 田创新、耿蕾一、背景1.祖传代码不敢随意改动,影响范围无法评估。并且组内时常有因为修改了某块代码,导致其他业务受到影响,产生bug,影响生产。2.研发提测完......