首页 > 编程语言 >并发编程(Phaser)

并发编程(Phaser)

时间:2024-04-25 16:47:16浏览次数:25  
标签:Phaser Thread 编程 并发 线程 阶段 parties phase

Phaser,翻译为移相器(阶段),它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的任务可以多个线程并发执行,但是必须上一个阶段的任务都完成了才可以执行下一个阶段的任务

Phaser使用方法

Phaser同时包含CyclicBarrier和CountDownLatch两个类的功能。

Phaser的arrive方法将将计数器加1,awaitAdvance将线程阻塞,直到计数器达到目标,这两个方法与CountDownLatch的countDown和await方法相对应;

Phaser的arriveAndAwaitAdvance方法将计数器加1的同时将线程阻塞,直到计数器达到目标后继续执行,这个方法对应CyclicBarrier的await方法。

除了包含以上两个类的功能外,Phaser还提供了更大的灵活性。CyclicBarrier和CountdownLatch在构造函数指定目标后就无法修改,而Phaser提供了register和deregister方法可以对目标进行动态修改。

下面看一个最简单的使用案例:

package com.tools;

import java.util.concurrent.Phaser;

/**
 * Phaser示例
 */
public class PhaserRunner {
    // 定义每个阶段需要执行3个小任务
    public static final int PARTIES = 3;
    // 定义需要4个阶段完成的大任务
    public static final int PHASES = 4;

    public static void main(String[] args) {

        Phaser phaser = new Phaser(PARTIES) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("=======phase: " + phase + " finished=============");
                return super.onAdvance(phase, registeredParties);
            }
        };

        for (int i = 0; i < PARTIES; i++) {
            new Thread(() -> {
                for (int j = 0; j < PHASES; j++) {
                    System.out.println(String.format("%s: phase: %d", Thread.currentThread().getName(), j));
                    phaser.arriveAndAwaitAdvance();
                }
            }, "Thread " + i).start();
        }
    }
}

这里我们定义个需要4个阶段完成的大任务,每个阶段需要3个小任务,针对这些小任务,我们分别起3个线程来执行这些小任务,查看输出结果为:

ini复制代码Thread 2: phase: 0
Thread 0: phase: 0
Thread 1: phase: 0
=======phase: 0 finished=============
Thread 2: phase: 1
Thread 1: phase: 1
Thread 0: phase: 1
=======phase: 1 finished=============
Thread 1: phase: 2
Thread 2: phase: 2
Thread 0: phase: 2
=======phase: 2 finished=============
Thread 1: phase: 3
Thread 0: phase: 3
Thread 2: phase: 3
=======phase: 3 finished=============

可以看到,每个阶段都是三个线程都完成来才进入下一个阶段

结合AQS的原理,大概猜测一下Phaser的实现原理:

  • 首先,需要存储当前阶段phase、当前阶段的任务数(参与者)parties、未完成参与者的数量,这三个变量我们可以放在一个变量state中存储。
  • 其次,需要一个队列存储先完成的参与者,当最后一个参与者完成任务时,需要唤醒队列中的参与者。

结合上面的案例带入:初始时当前阶段为0,参与者为3个,未完成参与者数为3;

  • 第一个线程执行到 phaser.arriveAndAwaitAdvance(); 时进入队列;
  • 第二个线程执行到 phaser.arriveAndAwaitadvance(); 时进入队列;
  • 第三个线程执行到 phaser.arriveAndAwaitadvance(); 时先执行这个阶段的总结 onAdvance(), 再唤醒签名两个线程继续执行下一个阶段的任务

Phaser分析

img

主要方法

register(),增加一个参与者,需要同时增加parties和unarrived两个数值,也就是state中的16位和低16位

onAdvance(int phase, int registeredParties),当前阶段所有线程完成时,会调用OnAdvance()

bulkRegister(int parties),指定参与者数目注册到Phaser中,同时增加parties和unarrived两个数值

arrive(),作用使parties值加1,并且不在屏障处等待,直接运行下面的代码

awaitAdvance(int phase),如果传入的参数与当前阶段一致,这个方法会将当前线程置于休眠,直到这个阶段的参与者都完成运行。如果传入的阶段参数与当前阶段不一致,立即返回

arriveAndAwaitAdvance(),当前线程当前阶段执行完毕,等待其它线程完成当前阶段

arriveAndDeregister(),当一个线程调用来此方法时,parties将减1,并且通知这个线程已经完成来当前预警,不会参加到下一个阶段中,因此Phaser对象在开始下一个阶段时不会等待这个线程。

awaitAdvanceInterruptibly(int phase),这个方法跟awaitAdvance(int phase)一样,不同之处是,如果这个方法中休眠的线程被中断,它将抛出InterruptedException异常。

Phaser 的监控方法

getPhase():返回当前的 phase,前面说了,phase 从 0 开始计算,最大值是 Integer.MAX_VALUE,超过又从 0 开始

getRegisteredParties():当前有多少 parties,随着不断地有 register 和 deregister,这个值会发生变化

getArrivedParties():有多少个 party 已经到达当前 phase 的栅栏

getUnarrivedParties():还没有到达当前栅栏的 party 数

Phaser 的分层结构

为什么要把多个 Phaser 实例构造成一棵树,解决什么问题?有什么优点?

Phaser 内部用一个 state 来管理状态变化,随着 parties 的增加,并发问题带来的性能影响会越来越严重。

/**
 * 0-15: unarrived    还没有抵达屏障的参与者的个数 (bits 0-15)
 * 16-31: parties,   所以一个 phaser 实例最大支持 2^16-1=65535 个 parties
 * 32-62: phase,     屏障所处的阶段  31 位,那么最大值是 Integer.MAX_VALUE,达到最大值后又从 0 开始
 * 63: terminated    屏障的结束标记    
 */
private volatile long state;

通常我们在说 0-15 位这种,说的都是从低位开始的

state 的各种操作依赖于 CAS,典型的无锁操作,但是,在大量竞争的情况下,可能会造成很多的自旋

而构造一棵树就是为了降低每个节点(每个 Phaser 实例)的 parties 的数量,从而有效降低单个 state 值的竞争。

总结

Phaser

  1. Phaser适用于多阶段多任务的场景,每个阶段的任务都可以控制的很细;
  2. Phaser内部使用state变量及队列实现整个逻辑;
  3. state的高32位存储当前阶段phase,中16位存储当前阶段参与者(任务)的数量parties,低16位存储未完成参与者的数量unarrived;
  4. 队列会根据当前阶段的奇偶性选择不同的队列;
  5. 当不是最后一个参与者到达时,会自旋或者进入队列排队来等待所有参与者完成任务;
  6. 当最后一个参与者完成任务时,会唤醒队列中的线程并进入下一阶段。

Phaser相对于CyclicBarrier和CountDownLatch的优势?

优势主要有两点:

  1. Phaser可以完成多阶段,而一个CyclicBarrier 或者CountDownLatch一般只能控制一到两个阶段的任务;
  2. Phaser每个阶段的任务数量可以控制,而一个CyclicBarrier 或者 CountDownLatch任务数量一旦确定不可修改。

相关博客:

https://juejin.cn/post/6902060644661952525
https://pdai.tech/md/java/thread/java-thread-x-juc-tool-phaser.html
https://www.cnblogs.com/youthlql/articles/14027047.html

标签:Phaser,Thread,编程,并发,线程,阶段,parties,phase
From: https://www.cnblogs.com/luojw/p/18158019

相关文章

  • Java之oop(面向对象编程)
    目录面向对象编程(OOP)一、面向过程与面向对象二、Java基本元素:类和对象三、对象的创建与使用1.对象的使用2.内存解析3.匿名对象四、类的成员1.属性1.1概念1.2分类2.方法2.1声明格式2.2方法的重载2.3可变形参的方法2.4方法参数的值传递机制2.5递归方法3.构造器4.代码块5.......
  • 在实际编程中,如何有效利用模块规格对象?
    在实际编程中,模块规格对象(`ModuleSpec`)通常用于需要动态导入模块的场景,例如插件系统、扩展框架、测试框架等。以下是一些有效利用模块规格对象的方法:1.**动态导入模块**:当你需要根据运行时的情况来决定导入哪个模块时,可以使用模块规格对象。例如,根据用户的输入或配置文件来动......
  • 第八周结对编程报告
    一.结对情况本人:2252407结对搭档:2252438二.实现方式由c++(2252407)和c(2252438)混合编写实现,3位数的四则运算三.程序完成功能点首先程序要求如下:具体实现的功能有:由用户制定计划,确定练习的总题量和练习的天数,由此得出每天练习的题量生成每天的算术表达式,0-100以内的三个数字......
  • 结对编程
    1.主要内容在本节实验课上,两名同学组队,一个同学编码,另一名同学在旁边审核代码,检查错误,之后再交换角色。这节课上,开发一个小程序,能够进行四则运算,其中包括两个运算符,100以内的数字,然后输入答案,判断答案是否正确。2.代码点击查看代码#include<iostream>#include<cstdlib>#......
  • Windows编程系列:设备I/O
    Windows设备在Windows平台下,设备被定义为能够与之进行通信的任何东西。最常见的I/O设备包括:文件、文件流、目录、物理磁盘、卷、控制台缓冲区、磁带驱动器、通信资源、mailslot和管道等。平常我们使用的文件,目录都可以称之为设备。 本文是介绍设备的通用操作,以文件操作进......
  • 软件开发与创新第二次实验———结对编程:计算出题系统
    一.结对信息2252418盛宇伟2252436董朝二.题目要求小学老师要每周给同学出300道四则运算练习题。这个程序有很多种实现方式:C/C++C#/VB.net/JavaExcelUnixShellEmacs/Powershell/VbscriptPerlPython两个运算符,100以内的数字,不需要写答案。需要检查答案是否正确,并......
  • 结对编程
    点击查看代码#include<iostream>#include<stdio.h>#include<time.h>#include<stdlib.h>usingnamespacestd;intj=0;charyunsuanfu(){ chara[]={'+','-','*','/'}; intn=4; inti; i=rand()%n; r......
  • 使用“数据库提供的事务管理机制来控制并发访问”处理事务
    在数据库中,事务管理机制用于确保一系列操作要么全部完成,要么全部不发生,以保持数据的一致性和完整性。在SQLite中,可以使用`BEGINTRANSACTION`,`COMMIT`,和`ROLLBACK`语句来管理事务。以下是一个使用SQLite的事务管理机制来处理并发访问的示例:```pythonimportthreadingfromD......
  • 结对编程
    #include<iostream>#include<cstdlib>#include<ctime>#include<fstream>#include<limits>//函数原型声明intgetRandomNumber(intmin,intmax);intmain(){intn;std::cout<<"请输入题目的数量:"<<st......
  • Golang - 并发同步更新全局切片失败的原因以及解决方案
    当多个协程同时访问和修改同一个共享资源(如切片)时,如果没有适当的同步机制,可能会导致数据竞争和不一致的结果。packagemainimport("fmt""sync")funcprocessChunk(chunk[]int64,wg*sync.WaitGroup,failedList[]int64){deferwg.Done()fmt.Print......