Phaser
,翻译为移相器(阶段),它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的任务可以多个线程并发执行,但是必须上一个阶段的任务都完成了才可以执行下一个阶段的任务
Phaser使用方法
Phaser同时包含CyclicBarrier和CountDownLatch两个类的功能。
Phaser的arrive方法将将计数器加1,awaitAdvance将线程阻塞,直到计数器达到目标,这两个方法与CountDownLatch的countDown和await方法相对应;
Phaser的arriveAndAwaitAdvance方法将计数器加1的同时将线程阻塞,直到计数器达到目标后继续执行,这个方法对应CyclicBarrier的await方法。
除了包含以上两个类的功能外,Phaser还提供了更大的灵活性。CyclicBarrier和CountdownLatch在构造函数指定目标后就无法修改,而Phaser提供了register和deregister方法可以对目标进行动态修改。
下面看一个最简单的使用案例:
package com.tools;
import java.util.concurrent.Phaser;
/**
* Phaser示例
*/
public class PhaserRunner {
// 定义每个阶段需要执行3个小任务
public static final int PARTIES = 3;
// 定义需要4个阶段完成的大任务
public static final int PHASES = 4;
public static void main(String[] args) {
Phaser phaser = new Phaser(PARTIES) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("=======phase: " + phase + " finished=============");
return super.onAdvance(phase, registeredParties);
}
};
for (int i = 0; i < PARTIES; i++) {
new Thread(() -> {
for (int j = 0; j < PHASES; j++) {
System.out.println(String.format("%s: phase: %d", Thread.currentThread().getName(), j));
phaser.arriveAndAwaitAdvance();
}
}, "Thread " + i).start();
}
}
}
这里我们定义个需要4个阶段完成的大任务,每个阶段需要3个小任务,针对这些小任务,我们分别起3个线程来执行这些小任务,查看输出结果为:
ini复制代码Thread 2: phase: 0
Thread 0: phase: 0
Thread 1: phase: 0
=======phase: 0 finished=============
Thread 2: phase: 1
Thread 1: phase: 1
Thread 0: phase: 1
=======phase: 1 finished=============
Thread 1: phase: 2
Thread 2: phase: 2
Thread 0: phase: 2
=======phase: 2 finished=============
Thread 1: phase: 3
Thread 0: phase: 3
Thread 2: phase: 3
=======phase: 3 finished=============
可以看到,每个阶段都是三个线程都完成来才进入下一个阶段
结合AQS的原理,大概猜测一下Phaser的实现原理:
- 首先,需要存储当前阶段phase、当前阶段的任务数(参与者)parties、未完成参与者的数量,这三个变量我们可以放在一个变量state中存储。
- 其次,需要一个队列存储先完成的参与者,当最后一个参与者完成任务时,需要唤醒队列中的参与者。
结合上面的案例带入:初始时当前阶段为0,参与者为3个,未完成参与者数为3;
- 第一个线程执行到
phaser.arriveAndAwaitAdvance();
时进入队列; - 第二个线程执行到
phaser.arriveAndAwaitadvance();
时进入队列; - 第三个线程执行到
phaser.arriveAndAwaitadvance();
时先执行这个阶段的总结onAdvance()
, 再唤醒签名两个线程继续执行下一个阶段的任务
Phaser分析
主要方法
register()
,增加一个参与者,需要同时增加parties和unarrived两个数值,也就是state中的16位和低16位
onAdvance(int phase, int registeredParties)
,当前阶段所有线程完成时,会调用OnAdvance()
bulkRegister(int parties)
,指定参与者数目注册到Phaser中,同时增加parties和unarrived两个数值
arrive()
,作用使parties值加1,并且不在屏障处等待,直接运行下面的代码
awaitAdvance(int phase)
,如果传入的参数与当前阶段一致,这个方法会将当前线程置于休眠,直到这个阶段的参与者都完成运行。如果传入的阶段参数与当前阶段不一致,立即返回
arriveAndAwaitAdvance()
,当前线程当前阶段执行完毕,等待其它线程完成当前阶段
arriveAndDeregister()
,当一个线程调用来此方法时,parties将减1,并且通知这个线程已经完成来当前预警,不会参加到下一个阶段中,因此Phaser对象在开始下一个阶段时不会等待这个线程。
awaitAdvanceInterruptibly(int phase)
,这个方法跟awaitAdvance(int phase)一样,不同之处是,如果这个方法中休眠的线程被中断,它将抛出InterruptedException异常。
Phaser 的监控方法
getPhase()
:返回当前的 phase,前面说了,phase 从 0 开始计算,最大值是 Integer.MAX_VALUE,超过又从 0 开始
getRegisteredParties()
:当前有多少 parties,随着不断地有 register 和 deregister,这个值会发生变化
getArrivedParties()
:有多少个 party 已经到达当前 phase 的栅栏
getUnarrivedParties()
:还没有到达当前栅栏的 party 数
Phaser 的分层结构
为什么要把多个 Phaser 实例构造成一棵树,解决什么问题?有什么优点?
Phaser 内部用一个 state
来管理状态变化,随着 parties 的增加,并发问题带来的性能影响会越来越严重。
/**
* 0-15: unarrived 还没有抵达屏障的参与者的个数 (bits 0-15)
* 16-31: parties, 所以一个 phaser 实例最大支持 2^16-1=65535 个 parties
* 32-62: phase, 屏障所处的阶段 31 位,那么最大值是 Integer.MAX_VALUE,达到最大值后又从 0 开始
* 63: terminated 屏障的结束标记
*/
private volatile long state;
通常我们在说 0-15 位这种,说的都是从低位开始的
state 的各种操作依赖于 CAS,典型的无锁操作,但是,在大量竞争的情况下,可能会造成很多的自旋
而构造一棵树就是为了降低每个节点(每个 Phaser 实例)的 parties 的数量,从而有效降低单个 state 值的竞争。
总结
Phaser
- Phaser适用于多阶段多任务的场景,每个阶段的任务都可以控制的很细;
- Phaser内部使用state变量及队列实现整个逻辑;
- state的高32位存储当前阶段phase,中16位存储当前阶段参与者(任务)的数量parties,低16位存储未完成参与者的数量unarrived;
- 队列会根据当前阶段的奇偶性选择不同的队列;
- 当不是最后一个参与者到达时,会自旋或者进入队列排队来等待所有参与者完成任务;
- 当最后一个参与者完成任务时,会唤醒队列中的线程并进入下一阶段。
Phaser相对于CyclicBarrier和CountDownLatch的优势?
优势主要有两点:
- Phaser可以完成多阶段,而一个CyclicBarrier 或者CountDownLatch一般只能控制一到两个阶段的任务;
- Phaser每个阶段的任务数量可以控制,而一个CyclicBarrier 或者 CountDownLatch任务数量一旦确定不可修改。
相关博客:
https://juejin.cn/post/6902060644661952525
https://pdai.tech/md/java/thread/java-thread-x-juc-tool-phaser.html
https://www.cnblogs.com/youthlql/articles/14027047.html