首页 > 编程语言 >CyclicBarrier的源码分析

CyclicBarrier的源码分析

时间:2024-09-16 10:49:37浏览次数:3  
标签:分析 AQS 队列 await generation 源码 线程 CyclicBarrier

CyclicBarrier的源码分析

与CountDownLatch、Semaphore直接基于AQS实现不同,CyclicBarrier 是基于 ReentrantLock + ConditionObject 实现的,间接基于AQS实现的。

CyclicBarrier内部结构

  • Generation,静态内部类,持有布尔类型的属性broken,默认为false,只有在重置方法reset()、执行出现异常或中断调用breakBarrier() ,属性会被设置为true。
  • nextGenerate() 重置 CyclicBarrier 的计数器和generation属性。
  • breakBarrier() 任务执行中断、异常、被重置,将Generation中的布尔类型属性设置为true,将Waiter队列中的线程转移到AQS队列中,待执行完unlock方法后,唤醒AQS队列中的挂起线程。
  • await() :CyclicBarrier的核心方法,计数器递减处理。

构造函数

  构造参数重载,最终调用的是CyclicBarrier(int, Runnable),详情如下:

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    // 参数合法性校验
    if (parties <= 0) throw new IllegalArgumentException();
    // final修饰,所有线程执行完成归为或重置时 使用
    this.parties = parties;
    // 在await方法中计数值,表示还有多少线程待执行await
    this.count = parties;
    // 当计数count为0时 ,执行此Runnnable,再唤醒被阻塞的线程
    this.barrierCommand = barrierAction;
}

CyclicBarrier属性

核心方法源码分析

await()

  在CyclicBarrier中,await有重载方法。await()表示会一直等待指定数量的线程未准备就绪(执行await方法);await(timout, unit)表示等待timeout时间后,指定数量的线程未准备就绪,抛出TimeoutException超时异常。

CyclicBarrier#await 详情如下:

// 执行没有超时时间的await
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        // 执行dowait()
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe);
    }
}

// 执行有超时时间的await
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

await最终调用dowait()方法,CyclicBarrier#dowait 详情如下:

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    // 获取锁对象
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 获取generation对象
        final Generation g = generation;

        // 这组线程中在执行过程中是否异常、超时、中断、重置
        if (g.broken)
            throw new BrokenBarrierException();

        // 这组线程被中断,重置标识与计数值,
        //     将Waiter队列中的线程转移到AQS队列,抛出InterruptedException
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        // 计数值 - 1
        int index = --count;
        // 这组线程都已准备就绪
        if (index == 0) {
            // 执行结果标识
            boolean ranAction = false;
            try {
                // 若使用2个参数的有参构造,就传入了自实现任务,index == 0,先执行CyclicBarrier有参的任务
                //     此处设计与 FutureTask 构造参数设计类似
                final Runnable command = barrierCommand;
                if (command != null)
                    // 执行任务
                    command.run();
                // 执行完成,设置为true
                ranAction = true;
                // CyclicBarrier属性归位
                nextGeneration();
                return 0;
            } finally {
                // 执行过程中出现问题
                if (!ranAction)
                    // 重置标识与计数值,将Waiter队列中的线程转移到AQS队列
                    breakBarrier();
            }
        }

        // -- 之后,count不为0,表示还有线程在等待
        // 自旋 直到被中断、超时、异常、count = 0
        for (;;) {
            try {
                // 未设置超时时间
                if (!timed)
                    // 挂起线程,将线程转移到 Condition 队列
                    trip.await();
                // 未达到等待时间
                else if (nanos > 0L)
                    // 挂起线程,并返回剩余等待时间
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 中断异常
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // 线程中断
                    Thread.currentThread().interrupt();
                }
            }

            // 该组线程被中断、执行异常、超时,抛出BrokenBarrierException异常
            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            // 超时,抛出异常TimeoutException
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁资源
        lock.unlock();
    }
}

breakBarrier() - 结束CyclicBarrier的执行

// 结束CyclicBarrier的执行
private void breakBarrier() {
    // 设置线程执行过程中是否异常、中断、重置标识
    generation.broken = true;
    // 重置计数值
    count = parties;
    // 将Condition队列中的Node转移到AQS队列中,等到执行完unlock,AQS队列中的挂起线程会被唤醒
    // 有后继节点的,设置ws = -1;
    // 无后继节点的,设置ws = 0
    trip.signalAll();
}

reset() - 重置CyclicBarrier

// 重置CyclicBarrier
public void reset() {
    // 获取锁对象
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 设置当前generation属性,并将Waiter队列中线程转移到AQS队列
        breakBarrier();
        // 重置generation 属性、计数值
        nextGeneration();
    } finally {
        // 释放锁
        lock.unlock();
    }
}

nextGeneration() - CyclicBarrier归位

private void nextGeneration() {
    // 将Waiter队列中线程转移到AQS队列
    trip.signalAll();
    // 计数值、generation 归位
    count = parties;
    generation = new Generation();
}

总结

  CyclicBarrier基于 ReentrantLock + ConditionObject实现,CyclicBarrier的构造函数中必须指定parties,同时对象generation,内部持有布尔型属性表示当前CyclicBarrier执行过程中是否有超时、异常、中断的情况。

  parties是初始待执行线程数,在构造函数中会将parties赋给计数值count,每当一个线程执行await(),count就会减1。

  当count被减为0时,代表所有线程都准备就绪,此时判断构造函数是否初始化了barrierCommand属性,若对barrierCommand属性做了赋值,优先执行barrierCommand任务;

  barrierCommand任务执行完成,再将Waiter队列中的线程转移到AQS队列中,执行完unlock,唤醒AQS队列中的线程;计数值count、generation归位。

标签:分析,AQS,队列,await,generation,源码,线程,CyclicBarrier
From: https://blog.csdn.net/qq_36324341/article/details/142299861

相关文章

  • 并发容器(Map、List、Set)实战及其原理分析
    1.JUC包下的并发容器Java的集合容器框架中,主要有四大类别:List、Set、Queue、Map,大家熟知的这些集合类ArrayList、LinkedList、HashMap这些容器都是非线程安全的。所以,Java先提供了同步容器供用户使用。同步容器可以简单地理解为通过synchronized来实现同步的容器,比如Vector......
  • 【USB3.0协议学习】Topic3·三种Reset Events分析
    USB3.0中的三种ResetEvents1.PowerOnResetPowerOnReset被用来代指上电复位,当一个device接入到roothub或者外置hub的时候,该device检测到Vbus信号从无效变为有效,会自动执行复位。(注意,selfpowereddevice不通过Vbus供电,但是Vbus发生转变的时候它同样会执行复位)1.1软件设置P......
  • Java计算机毕业设计疫苗接种管理系统的设计与实现(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着全球公共卫生事件的频发,疫苗接种作为预防和控制传染病的有效手段,其重要性日益凸显。然而,传统的手工记录与管理方式已难以满足大规模、高效率的疫......
  • Java计算机毕业设计学生综合管理小程序(开题报告+源码+论文)
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景在数字化时代,高校作为知识传播与创新的前沿阵地,其管理模式正逐步向智能化、便捷化转型。随着学生需求的多元化与个性化发展,传统的学生管理模式已难以......
  • mysql5.5源码主从复制搭建(以两台机器的单实例为例)
    mysql5.5源码主从复制搭建(以两台机器的单实例为例)如果对运维课程感兴趣,可以在b站上、csdn或微信视频号上搜索我的账号:运维实战课程,可以关注我,学习更多免费的运维实战技术视频项目1  配置mysql服务器的主从结构(开始时主从数据不一致时需要找到临界点的主从同步,主库初始......
  • 接口Map源码阅读与分析
      computeIfPresent用途:这是一个针对已经存在的键值对进行更新的方法,如果旧值存在,则使用key和旧值去计算出新值进行更新具体逻辑如下:XX<流程图占位> defaultVcomputeIfPresent(Kkey,BiFunction<?superK,?superV,?extendsV>remappingFunction)......
  • 基于PHP的考研真题资料互助交流平台vue.js【开题实训报告源码论文】
      博主介绍:......
  • 基于PHP的网上订餐平台系统vue.js【开题实训报告源码论文】
      博主介绍:......
  • JVM频繁GC分析
    本文记录一次频繁GC的分析问题查看项目日志发现GC频繁,几乎几秒钟一次查看GC日志[GC(AllocationFailure)[PSYoungGen:6816K->320K(8192K)]82693K->76229K(187904K),0.0032930secs][Times:user=0.01sys=0.00,real=0.00secs][GC(AllocationFailure)[PSYou......
  • 共享单车轨迹数据分析:以厦门市共享单车数据为例(四)
    副标题:共享单车与地铁接驳距离探究——以厦门市为例关于轨道交通站点接驳范围的研究早已屡见不鲜,通常认为以站点为圆心、800米作为地铁站直接的服务范围是合理的。近年来,随着轨道、公交和慢行交通三网融合概念的提出,慢行交通被视为解决城市“最后一公里”问题的最佳方案之一。......