Chain分隔
文章目录
- Chain分隔
- 如何切断任务链?
- startNewChain 与 disableChaining区别
- 全局切断任务链(chain)
- web端效果
- 查看隔离后依赖链忙碌程度
- 什么是Backpressured(被压/反压)?
- 代码样例
- 参考文献
如何切断任务链?
由于共享slot的存在,当一个任务链的计算量特别庞大时,且只在一个slot上执行,对于slot的压力过大,需要拆开。
1)disableChaining:如果对一个算子使用了disableChaining(取消任务链),那么该算子就会与前后算子隔离开,不参与任务链的组合,独用一个slot。
2)startNewChain:如果一个任务链只是过于庞大,但是算子之间的操作简单,仅想拆开为2个任务链,并且参与任务链组合,就需要对算子使用startNewChain(开启一个新的任务链),意思就是之前该怎么合并就怎么合并,map之后重新合成任务链。
3)env.disableOperatorchaining ,全局切断任务链。所有的算子都是独立的任务,再按照并行度拆分开。
4)slotSharingGroup:取消任务链后,还需要打破slot共享,使用单独的slot。这时需要使用slotSharingGroup,之前的算子不管,从当前算子开始共享slot,并可以为共享组命名。同一个共享组内的算子可以共享一个slot,不同共享组的slot必须分配到不同的slot。且可以设置多个共享组,跨算子设置。
startNewChain 与 disableChaining区别
stream.startNewChain()
从当前算子开启新的链,与前面的链断开,后面的链不断开
stream.disableChaining()
当前算子独立一个链,前后都分开,从stream算子开始到结束禁用Chain,单独划出来,将不会合并链。
全局切断任务链(chain)
env.disableOperatorchaining
全局切断任务链。所有的算子都是独立的任务,再按照并行度拆分开
web端效果
查看隔离后依赖链忙碌程度
Backpressured(背压/反压): 从字面上可以理解,后端的压力,这里代表当前算子的下一个环节压力,也可以理解为,下一个环节有这个Backpressured比例数据未处理
busy(忙碌): 代表当前环节忙碌程度
**Idle(空闲):**代表当前环节空闲程度
什么是Backpressured(被压/反压)?
概括来说,反压就是Job Graph中的某些operator处理数据的速率低于接收数据的速率,造成数据积压,积压的数据填充到这些operator子任务的输入缓冲区。一旦输入缓冲区满了,反压就会传播到上游子任务的输出缓冲区。上游子任务也会被迫降低自身数据处理速度,以匹配下游opeartor的处理速度。由此类推,反压一步一步向上游传递,直至到达数据源operator端。
具体关于Backpressured,参考《Flink BackPressure详细介绍》
代码样例
val writeHbaseResult = userChangeWriteHbase.process(new HbaseProcessFunction).name("write hbase").disableChaining()
val writeHbaseResult = userChangeWriteHbase.process(new HbaseProcessFunction).name("write hbase").startNewChain()
参考文献
https://www.kancloud.cn/zhangpn/flink/1743325
https://cdn.modb.pro/db/128767
https://www.163.com/dy/article/H2VSPV1T05529EM4.html
https://v.youku.com/v_show/id_XNDU2MjU5NjI4OA==.html
https://cloud.tencent.com/developer/article/1797980