首页 > 其他分享 >20.AQS家族的“外门弟子”:CyclicBarrier

20.AQS家族的“外门弟子”:CyclicBarrier

时间:2023-06-19 22:31:51浏览次数:46  
标签:20 AQS await 计数器 屏障 线程 CountDownLatch CyclicBarrier

关注王有志,一个分享硬核Java技术的互金摸鱼侠 欢迎你加入Java人的提桶跑路群共同富裕的Java人

今天我们来学习AQS家族的“外门弟子”:CyclicBarrier。

为什么说CyclicBarrier是AQS家族的“外门弟子”呢?那是因为CyclicBarrier自身和内部类Generation并没有继承AQS,但在源码的实现中却深度依赖AQS家族的成员ReentrantLock。就像修仙小说中,大家族会区分外门和内门,外门弟子通常会借助内门弟子的名声行事,CyclicBarrier正是这样,因此算是AQS家族的“外门弟子”。在实际的面试中,CyclicBarrier的出现的次数较少,通常会出现在与CountDownLatch比较的问题当中

今天我们就逐步拆解CyclicBarrier,来看看它与CountDownLatch之间到底有什么差别。

CyclicBarrier是什么?

先从CyclicBarrier的名字开始入手,Cyclic是形容词,译为“循环的,周期的”,Barrier是名词,译为“屏障,栅栏”,组合起来就是“循环的屏障”,那么该怎么理解“循环的屏障”呢?我们来看CyclicBarrier的注释是怎么解释的:

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarrier是一种同步辅助工具,允许一组线程等待彼此到达共同的屏障点。

The barrier is called cyclic because it can be re-used after the waiting threads are released. 因为在等待线程释放后可以重复使用,所以屏障被称为循环屏障。

看起来与CountDownLatch有些相似,我们通过一张图来展示下CyclicBarrier是怎样工作的:

20.AQS家族的“外门弟子”:CyclicBarrier_System

部分线程到达屏障后,会在屏障处等待,只有全部线程都到达屏障后,才会继续执行。如果以CountDownLatch中越野徒步来举例的话,把老板拿掉,选手之间的互相等待,就是CyclicBarrier了。

另外,注释中说CyclicBarrier是“re-used”,即可重复使用的。回想一下CountDownLatch的实现,并未做任何重置计数器的工作,即当CountDownLatch的计数减为0后不能恢复,也就是说CountDownLatch的功能是一次性的

Tips:实际上,可以用CountDownLatch实现类似于CyclicBarrier的功能。

CyclicBarrier怎么用?

我们用没有老板参加的越野徒步来举例,部分先到的选手要等待后到的选手一起吃午饭,用CyclicBarrier来实现的代码是这样的:

// 初始化CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

for (int i = 0; i < 10; i++) {
  int finalI = i;
  new Thread(() -> {
    try {
      TimeUnit.SECONDS.sleep((finalI + 1));
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    try {
      System.out.println("选手[" + finalI + "]到达终点,等待其他选手!!!");
      
      // 线程在屏障点处等待
      cyclicBarrier.await();
      
      System.out.println("选手[" + finalI + "]开始吃午饭啦!!!");
    } catch (InterruptedException | BrokenBarrierException e) {
      throw new RuntimeException(e);
    }
  }).start();
}

用法和CountDownLatch很相似,构造函数设置CyclicBarrier需要多少个线程达到屏障后统一行动,区别是CyclicBarrier在每个线程中都调用了CyclicBarrier#await,而我们在使用CountDownLatch时只在主线程中调用了一次CountDownLatch#await

那CountDownLatch可以在线程中调用CountDownLatch#await吗?答案是可以的,这样使用的效果和CyclicBarrier是一样的:

CountDownLatch countDownLatch = new CountDownLatch(10);

for (int i = 0; i < 10; i++) {
  int finalI = i;
  new Thread(() -> {
    try {
      TimeUnit.SECONDS.sleep((finalI + 1));
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    System.out.println("选手[" + finalI + "]到达终点!!!");
    countDownLatch.countDown();
    try {
      countDownLatch.await();
      System.out.println("选手[" + finalI + "]开始吃午饭啦!!!");
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }).start();
}

通过上面的例子,我们不难想到CyclicBarrier#await方法是同时具备了CountDownLatch#countDown方法和CountDownLatch#await方法的能力,即执行了计数减1,又执行了暂停线程

CyclicBarrier是怎么实现的?

我们先整体认识一下CyclicBarrier:

20.AQS家族的“外门弟子”:CyclicBarrier_System_02

CyclicBarrier的内部结构比CountDownLatch复杂一些,除了我们前面提到的借助AQS的“内门弟子”ReentrantLock类型的lock和Condition类型的trip外,CyclicBarrier还有两个“特别”的地方:

  • 内部类Generation,直译过来是“代”,它起到什么作用?
  • Runnable类型的成员变量barrierCommand,它又做了些什么?

其余的部分,大部分可以在CountDownLatch中找到对应的方法,或者通过名称我们就很容易得知它们的作用。

CyclicBarrier的构造方法

CyclicBarrier提供了两个(实际是一个)构造方法:

// 需要到达屏障的线程数
private final int parties;

// 所有线程都到达后执行的动作
private final Runnable barrierCommand;

// 计数器
private int count;

public CyclicBarrier(int parties) {
  this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
  if (parties <= 0) {
    throw new IllegalArgumentException();
  }
  this.parties = parties;
  this.count = parties;
  this.barrierCommand = barrierAction;
}

第二个构造函数接收了两个参数:

  • parties:表示需要多少个线程到达屏障处调用CyclicBarrier#await
  • barrierAction:所有线程到达屏障后执行的动作。

构造方法的代码一如既往的简单,只有一处比较容易产生疑惑,partiescount有什么区别?

首先来看成员变量的声明,parties使用了final,表明它是不可变的对象,代表CyclicBarrier需要几个线程共同到达屏障处;而count是计数器,初始值是parties,随着到达屏障处的线程数量增多count会逐步减少至0。

CyclicBarrier的内部类Generation
private static class Generation {
  Generation() {}  
  
  boolean broken;
}

Generation用于标记CyclicBarrier的当前代,Doug Lea是这么解释它的作用的:

Each use of the barrier is represented as a generation instance. The generation changes whenever the barrier is tripped, or is reset. 每次使用屏障(CyclicBarrier)都需要一个Generation实例。无论是通过屏障还是重置屏障,Generation都会发生改变。

Generation中的broken用于标记当前的CyclicBarrier是否被打破,默认为false,值为true时表示当前CyclicBarrier已经被打破,此时CyclicBarrier不能正常使用,需要调用CyclicBarrier#reset方法重置CyclicBarrier的状态。

CyclicBarrier#await方法

前面我们猜测CyclicBarrier#await方法即实现了计数减1,又实现了线程等待的功能,下面我们就通过源码来验证我们的想法:

public int await() throws InterruptedException, BrokenBarrierException {
  try {
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}

public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
  return dowait(true, unit.toNanos(timeout));
}

两个重载方法都指向了CyclicBarrier#dowait方法:

private int dowait(boolean timed, long nanos)  throws InterruptedException, BrokenBarrierException, TimeoutException {
  // 使用ReentrantLock
  final ReentrantLock lock = this.lock;
  lock.lock();
  
  try {
    // 第2部分
    // 获取CyclicBarrier的当前代,并检查CyclicBarrier是否被打破
    final Generation g = generation;
    if (g.broken) {
      throw new BrokenBarrierException();
    }
    
    // 线程被中断时,调用breakBarrier方法
    if (Thread.interrupted()) {
      breakBarrier();
      throw new InterruptedException();
    }
    
    // 第3部分
    //计数器减1
    int index = --count;
    // 计数器为0时表示所有线程都到达了,此时要做的就是唤醒等待中的线程
    if (index == 0) {
      boolean ranAction = false;
      try {
        // 执行唤醒前的操作
        final Runnable command = barrierCommand;
        if (command != null) {
          command.run();
        }
        ranAction = true;
        // CyclicBarrier进入下一代
        nextGeneration();
        return 0;
      } finally {
        if (!ranAction) {
          breakBarrier();
        }
      }
    }
    
    // 第4部分
    // 只有部分线程到达屏障处的情况
    for (;;) {
      try {
        //调用等待逻辑)
        if (!timed) {
          trip.await();
        } else if (nanos > 0L) {
          nanos = trip.awaitNanos(nanos);
        }
      } catch (InterruptedException ie) {
        // 线程被中断时,调用breakBarrier方法
        if (g == generation && ! g.broken) {
          breakBarrier();
          throw ie;
        } else {
          Thread.currentThread().interrupt();
        }
      }
      if (g.broken) {
        throw new BrokenBarrierException();
      }
      // 如果不是当前代,返回计数器的值
      if (g != generation) {
        return index;
      }
      // 如果等待超时,调用breakBarrier方法
      if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
      }
    }
  } finally {
    lock.unlock();
  }
}

CyclicBarrier#dowait方法看起来很长,但如果拆成3部分来看逻辑并不复杂:

  • 第1部分:CyclicBarrier与线程的状态校验;
  • 第2部分:当计数器减1后值为0时,唤醒所有等待中的线程;
  • 第3部分:当计数器减1后值不为0时,线程进入等待状态。

先来看第1部分,CyclicBarrier与线程的状态校验的部分,先是判断CyclicBarrier是否被打破,接着判断当前线程是否为中断状态,如果是则调用CyclicBarrier#breakBarrier方法:

private void breakBarrier() {
  generation.broken = true;
  count = parties;
  trip.signalAll();
}

CyclicBarrier#breakBarrier方法非常简单,只做了3件事:

  • 标记CyclicBarrier被打破;
  • 重置CyclicBarrier的计数器;
  • 唤醒全部等待中的线程。

也就是说,一旦有个线程标记为中断状态,都会直接打破CyclicBarrier的屏障。

我们先跳过第2部分的唤醒逻辑,直接来看第3部分线程进入等待状态的逻辑。根据timed参数选择调用Condition不同的等待方法,随后是对异常的处理和线程中断状态的处理,同样是调用CyclicBarrier#breakBarrier,标记CyclicBarrier不可用。线程进入等待状态的逻辑并不复杂,本质上是通过AQS的Condition来实现的。

最后来看第2部分唤醒所有等待中线程的操作,根据计数器是否为0判断是否需要进行唤醒。如果需要唤醒,最后一个执行CyclicBarrier#await的线程执行barrierCommand(此时尚未执行任何线程唤醒的操作),做通过屏障前的处理操作,接着调用CyclicBarrier#nextGeneration方法:

private void nextGeneration() {
  trip.signalAll();
  count = parties;
  generation = new Generation();
}

CyclicBarrier#nextGeneration方法也做了3件事:

  • 唤醒所有Condition上等待的线程;
  • 重置CyclicBarrier的计数器;
  • 创建新的Generation对象。

很符合进入“下一代”的名字,先唤醒“上一代”所有等待中的线程,然后重置CyclicBarrier的计数器,最后更新CyclicBarrier的Generation对象,对CyclicBarrier进行重置工作,让CyclicBarrier进入下一个纪元。

到这里我们不难发现,CyclicBarrier自身只做了维护计数器和重置计数器的工作,而保证互斥性和线程的等待与唤醒则是依赖AQS家族的成员完成的:

  • ReentrantLock保证了同一时间只有一个线程可以执行CyclicBarrier#await,即同一时间只有一个线程可以维护计数器;
  • Condition为CyclicBarrier提供了条件等待队列,完成了线程的等待与唤醒的工作。
CyclicBarrier#reset方法

最后我们来看CyclicBarrier#reset方法:

public void reset() {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // 主动打破CyclicBarrier
    breakBarrier();
    // 使CyclicBarrier进入下一代
    nextGeneration();
  } finally {
    lock.unlock();
  }
}

CyclicBarrier#reset方法都是老面孔,先是CyclicBarrier#breakBarrier打破上一代CyclicBarrier,既然要重新开始就不要再“怀念”过去了;最后调用CyclicBarrier#nextGeneration开始新的时代。需要注意的是,这里加锁的目的是为了保证执行CyclicBarrier#reset时,没有任何线程正在执行CyclicBarrier#await方法。

好了,到这里CyclicBarrier的核心内容我们就一起分析完了,剩下的方法就非常简单了,相信通过名字大家就可以了解它们的作用,并猜到它们的实现了。

TipsCyclicBarrier#getNumberWaiting中加了锁,这是为什么?

CountDownLatch和Cyclicbarrier有什么区别?

最后的部分,我们来解答下开篇时的面试题,CountDownLatch和Cyclicbarrier有什么区别?

第1点:CyclicBarrier可以重复使用,CountDownLatch不能重复使用

无论是正常使用结束,还是调用CyclicBarrier#reset方法,Cyclicbarrier都可以重置内部的计数器

第2点:Cyclicbarrier只阻塞调用CyclicBarrier#await方法的线程,而CountDownLatch可以阻塞任意一个或多个线程

CountDownLatch将计数减1与阻塞拆分成了CountDownLatch#countDownCountDownLatch#await两个方法,而Cyclicbarrier只通过CyclicBarrier#await完成两步操作。如果在同一个线程中连续CountDownLatch#countDownCountDownLatch#await则实现了与CyclicBarrier#await方法相同的功能。

结语

好了,今天就到这里结束了。如果本文对你有帮助的话,请多多点赞支持。最后欢迎大家关注分享硬核技术的金融摸鱼侠王有志,我们下次再见!

标签:20,AQS,await,计数器,屏障,线程,CountDownLatch,CyclicBarrier
From: https://blog.51cto.com/u_15955875/6517953

相关文章

  • 2021数字中国创新大赛虎符网络安全赛-Writeup
    文章目录Web签到“慢慢做”管理系统Misc你会日志分析吗Web签到http://cn-sec.com/archives/313267.htmlUser-Agentt:zerodiumsystem("cat/flag");“慢慢做”管理系统根据题目提示,这里第一步登录应该利用一些字符串被md5($string,true)之后会形成如下,从而造成注入PSC:\Users\A......
  • BUUCTF:[安洵杯 2019]easy_web
    https://buuoj.cn/challenges#[%E5%AE%89%E6%B4%B5%E6%9D%AF%202019]easy_webTXpVek5UTTFNbVUzTURabE5qYz0经过base64decode->base64decode->hexdecode得到555.png解码编码脚本或者自己利用在线工具编码也行frombinasciiimport*frombase64import*defdecode(param):r......
  • 2021-DASCTF-三月赛-Writeup
    文章目录WEBBestDBez_serializebaby_flaskez_loginMISC签到简单的png隐写雾都孤儿小田的秘密Ascii_art问卷调查和团队的师傅们组队拿了个第十,师傅们带飞,我就是团队的MVP(MostVegetablePeople)WEBBestDB简单的SQL注入/?query=mochu"or/**/1=1%23/?query=mochu"order/**/by/**/......
  • 2020 纵横杯 线上赛 MISC部分Writeup
    文章目录签到马赛克My_Secret问卷调查签到oct_str='[0146,0154,0141,0147,0173,0167,063,0154,0143,0157,0155,0145,0137,0164,0157,0137,062,0157,0156,0147,0137,0150,063,0156,0147,0137,0142,0145,061,0175]'oct_list=oct_str.replace(&q......
  • BUUCTF:[BJDCTF2020]EasySearch
    index.php.swp发现源码<?php ob_start(); functionget_hash(){ $chars='ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()+-'; $random=$chars[mt_rand(0,73)].$chars[mt_rand(0,73)].$chars[mt_rand(0,73)].$chars[mt_ran......
  • BUUCTF:[CISCN2019 华东南赛区]Web11
    注意到了banner中信息说是smarty,并且将XFF输出到页面直接尝试Smarty模板注入{$smarty.version}Smarty3官方手册:https://www.smarty.net/docs/zh_CN/language.function.if.tpl{ifsystem('ls-lha/')}{/if}{ifsystem('cat/flag')}{/if}......
  • premiere下载2023正版下载安装 软件大全
    软件介绍adobepremiereprocs6是由Adobe公司研发出品的一款超强大的视频编辑软件。Premiere功能强大,为您提供采集,剪辑,添加字幕等功能于一体,满足您的工作要求,让你制作出高品质的作品。[下载地址]:后台私信我软件对比adobepremiere和Vegas的对比1、PR的优点在于其专业性,输出的视频......
  • BUUCTF:[WesternCTF2018]shrine
    https://buuoj.cn/challenges#[WesternCTF2018]shrineimportflaskimportosapp=flask.Flask(__name__)app.config['FLAG']=os.environ.pop('FLAG')@app.route('/')defindex(): returnopen(__file__).read() @app.route(&......
  • 2022 RealWorld CTF体验赛Writeup
    文章目录DigitalSouvenirlog4flagBe-a-Database-HackertheSecretsofMemorybabyflaglabFlagConsoleBe-a-Database-Hacker2JavaRemoteDebuggerDigitalSouvenirrwctf{RealWorldIsAwesome}log4flag有一些正则过滤网上bypass方法很多,随便找一个就行${${::-j}ndi:${lower:r......
  • “东华杯”2021年大学生网络安全邀请赛 暨第七届上海市大学生网络安全大赛线上赛MISC-
    文章目录checkinprojectJumpJumpTigerwhere_can_find_code题目附件请自取:链接:https://pan.baidu.com/s/1T9nG-CDg_D8QYQZapuxucg提取码:2wubcheckin+AGYAbABhAGcAewBkAGgAYgBfADcAdABoAH0-UTF-7编码UTF-7在线解码站:http://toolswebtop.com/text/process/decode/utf-7flag{dhb_......