首页 > 编程语言 >CyclicBarrier源码解析

CyclicBarrier源码解析

时间:2023-02-17 07:11:36浏览次数:48  
标签:CyclicBarrier 屏障 lock private 源码 线程 new 解析 final

CyclicBarrier源码解析

描述:

一个同步帮助,允许一组线程互相等待到达一个共同的屏障点。Cyclicbarrier 在涉及固定大小的线程组的程序中非常有用,这些线程必须偶尔相互等待。这个屏障称为cyclic,因为它可以在等待的线程被释放后被重用。

A CyclicBarrier支持一个可选的Runnable命令,每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程释放之前。 在任何一方继续进行之前,此 屏障操作 对更新共享状态很有用。

示例用法:以下是在并行分解设计中使用障碍的示例:

 class Solver {
   final int N;
   final float[][] data;
   final CyclicBarrier barrier;
   class Worker implements Runnable {
     int myRow;
     Worker(int row) { myRow = row; }
     public void run() {
       while (!done()) {
         processRow(myRow);
         try {
           barrier.await();
         } catch (InterruptedException ex) {
           return;
         } catch (BrokenBarrierException ex) {
           return;
         }
       }
     }
   }
   public Solver(float[][] matrix) {
     data = matrix;
     N = matrix.length;
     Runnable barrierAction =
       new Runnable() { public void run() { mergeRows(...); }};
     barrier = new CyclicBarrier(N, barrierAction);
     List<Thread> threads = new ArrayList<Thread>(N);
     for (int i = 0; i < N; i++) {
       Thread thread = new Thread(new Worker(i));
       threads.add(thread);
       thread.start();
     }
     // wait until done
     for (Thread thread : threads)
       thread.join();
   }
 }

这里,每个工作线程处理矩阵的一行,然后等待屏障,直到所有行都被处理。 当处理所有行时,执行提供的Runnable屏障操作并合并行。 如果合并确定已经找到解决方案,那么done()将返回true ,并且每个工作人员将终止。

如果屏障操作不依赖于执行方暂停的各方,那么该方可以在释放任何线程时执行该操作。 为了方便这一点,每次调用await()返回该线程在屏障上的到达索引。 然后,您可以选择哪个线程应该执行屏障操作,例如:

if (barrier.await() == 0) {
  // log the completion of this iteration
}

CyclicBarrier对失败的同步尝试使用all-or-none断裂模型:如果线程由于中断,故障或超时而过早离开障碍点,那么在该障碍点等待的所有其他线程也将通过BrokenBarrierException(或InterruptedException)异常离开如果他们也在同一时间被打断)。

内存一致性效果:线程中调用的行动之前, await() happen-before 行动是屏障操作的一部分,进而发生,之前的动作之后,从相应的成功返回await()其他线程。

源码:

public class CyclicBarrier {
    /**
     * 屏障的每次使用都表示为一个生成实例。每当障碍被跳闸或复位时,生成就会改变。
     * 使用barrier的线程可能会关联许多代 —— 由于分配锁给等待线程的不确定方式 —— 但一次只能有一个代是活动的(  count 适用的那个),其余的都是中断或触发的。
     * 如果发生了中断,但没有后续的重置,则不需要活动代。
     */
    private static class Generation {
        boolean broken = false;
    }

    /**
     * 用于保护屏障入口的锁
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * 条件等待,直到跳闸
     */
    private final Condition trip = lock.newCondition();

    /**
     * 参与方数量
     */
    private final int parties;

    /**
     * 跳闸时运行的命令
     */
    private final Runnable barrierCommand;

    /**
     *  当前一代
     */
    private Generation generation = new Generation();

    /**
     * 仍在等待的参与方数。每一代从参与方倒数到 0。
     * 它在每一代人中或在被破坏时都会被重置。
     */
    private int count;

    /**
     * 更新障碍旅行状态并唤醒所有人。仅在持有锁时调用。
     */
    private void nextGeneration() {
        // 上一代的信号完成
        trip.signalAll();
        // 建立下一代
        count = parties;
        generation = new Generation();
    }

    /**
     * 设置当前屏障生成为破碎,并唤醒所有人。仅在持有锁时调用。
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * 主要壁垒代码,涵盖各种政策。
     */
    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 创建一个新的  CyclicBarrier,当给定数量的方(线程)等待它时,它将跳闸,
     * 当barrier被跳闸时,它将执行给定的barrier动作,由最后一个进入barrier的线程执行。
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * 创建一个新的 CyclicBarrier ,当给定数量的方(线程)等待它时,它将跳闸,并且在跳闸时不执行预定义的操作。
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * 返回跳过此屏障所需的参与方数。
     */
    public int getParties() {
        return parties;
    }

    /**
     * 等待,直到所有  parties  都在此屏障上调用了 await 。
     *
     * 1.如果当前线程不是最后一个到达的,那么出于线程调度的目的,它将被禁用,并处于休眠状态,直到发生以下事情之一:
     * 最后一个到了; 或其他线程中断当前线程; 或其他线程中断了其中一个等待线程; 或其他线程在等待barrier时超时; 或其他一些线程在这个barrier上调用reset。
     * 2.如果当前线程:在进入此方法时设置其中断状态; 或等待时中断;然后 InterruptedException 被抛出,当前线程的中断状态被清除。
     * 3.如果屏障是  reset ,而任何线程正在等待,或如果屏障 isBroken被打破时被调用,或当任何线程正在等待,那么  BrokenBarrierException 将被抛出。
     * 4.如果任何线程在等待时被中断,那么所有其他等待线程将抛出  BrokenBarrierException , barrier将处于broken状态。
     * 5.如果当前线程是到达的最后一个线程,并且构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。
     *
     * 如果在barrier操作期间发生异常,则该异常将在当前线程中传播,并且barrier处于断开状态。
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    /**
     * 等待,直到所有  parties  都在此屏障上调用了 await 。或者指定的等待时间过去。
     *
     * 1.如果当前线程不是最后一个到达的,那么出于线程调度的目的,它将被禁用,并处于休眠状态,直到发生以下事情之一:
     * 最后一个到达; 或指定的超时时间已过; 或一些其他线程中断当前线程; 或其他线程中断了其中一个等待线程; 或其他线程在等待barrier时超时; 或其他一些线程在这个barrier上调用 reset。
     * 2.如果当前线程: 在进入此方法时设置其中断状态; 或等待时中断;然后 InterruptedException 被抛出,当前线程的中断状态被清除。
     * 3.如果指定的等待时间超过,则抛出 TimeoutException 。如果时间小于或等于零,该方法将根本不等待。
     * 4.如果屏障是  reset,而任何线程正在等待,或如果屏障 isBroken被打破时被调用,或当任何线程正在等待,那么 BrokenBarrierException 将被抛出。
     * 5.如果任何线程在等待时被中断,那么所有其他等待线程将抛出 BrokenBarrierException , barrier将处于broken状态。
     * 6.如果当前线程是到达的最后一个线程,并且构造函数中提供了非空屏障操作,则当前线程在允许其他线程继续之前运行该操作。
     *
     * 如果在barrier操作期间发生异常,则该异常将在当前线程中传播,并且barrier处于断开状态。
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     *  查询该屏障是否处于损坏状态。
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 将屏障重置为初始状态。
     * 如果任何一方当前正在栅栏旁等待,他们将返回一个 BrokenBarrierException。
     * 请注意,由于其他原因发生断裂后,重置<em>可能会比较复杂;线程需要以其他方式重新同步,并选择一个来执行重置。
     * 可能更可取的是,为后续使用创建一个新的屏障。
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * 返回当前在屏障处等待的参与方数量。此方法主要用于调试和断言。
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

设计思路:

主要使用 await() 方法,使各线程处于阻塞状态,直到达到一定的数量,调用 signalAll() 释放所有线程继续执行。往复使用该操作

标签:CyclicBarrier,屏障,lock,private,源码,线程,new,解析,final
From: https://www.cnblogs.com/coolyang/p/17126883.html

相关文章

  • CountDownLatch源码解析
    CountDownLatch源码解析描述:一种同步辅助工具,允许一个或多个线程等待在其他线程中执行的一组操作完成。用给定的count初始化CountDownLatch。因为调用了countDown()......
  • Llvm 源码结构及测试基础
    Llvm源码结构及测试基础Llvm源码目录功能简介了解一下Llvm源码工程目录介绍、Llvm相关lib库的介绍、Llvm生成的工具链介绍,方便我们后期学习Llvm。LLVM源码工程目录介......
  • 深入探索Android 启动优化(七) - JetPack App Startup 使用及源码浅析
    本文首发我的微信公众号:徐公,想成为一名优秀的Android开发者,需要一份完备的知识体系,在这里,让我们一起成长,变得更好~。前言前一阵子,写了几篇Android启动优化的文章......
  • 13 drf-解析器
    之前使用request.data获取请求体中的数据。这个reqeust.data的数据怎么来的呢?其实在drf内部是由解析器,根据请求者传入的数据格式+请求头来进行处理。 解析器可......
  • SpringMVC源码(八):Controller控制器执行流程
    在MVC请求流程中,获取到HandlerAdapter适配器后,会执行handler处理器(Controller控制器)的相关逻辑,通过适配器的handle()方法,完成目标Controller处理器的调用。在源码(七......
  • dom4j解析和生成xml文件
    解析xml大致步骤:1:创建SAXReader;2:使用SAXReader解析指定的xml文档信息,并返回对应Document对象。Document对象中就包含了该xml文中的所有信息以及结构了。3:根据文档......
  • Epoll原理解析--网卡接收数据说起
     转至 https://blog.csdn.net/armlinuxww/article/details/92803381 太重要了怕丢失,冒昧转一下  从网卡接收数据说起下边是一个典型的计算机结构图,计算机由CPU......
  • 解析MYSQL建表语句,生成表结构的JSON
    根据建表语句解析表结构,并将表结构解析为JSON。根据MYSQL的建表语句,建表语句:CREATETABLE`TEST`(`ID`varchar(56)NOTNULL,`CREAETE_TIME`datetimeN......
  • 数据解析-正则匹配
    一、正则基础1、为什么使用正则需求判断一个字符串是否是手机号解决编写一个函数,给函数一个字符串,如果是手机号则返回True,否则返回False代码defisPhone(phon......
  • 【android】音视频开发五:学习MediaExtractor 和 MediaMuxer,知道如何解析和封装 mp4 文
    MediaExtractorMediaExtractor顾名思义就是多媒体提取器,主要负责:获取媒体文件的格式,包括音视频轨道,编码格式,宽高,采样率,声道数等,分离音频流,视频流,读取分离后的音视频数据......