首页 > 其他分享 >Flink优化----数据倾斜

Flink优化----数据倾斜

时间:2024-12-25 09:27:17浏览次数:11  
标签:Flink 聚合 flink ---- 数据量 倾斜 数据

目录

判断是否存在数据倾斜

数据倾斜的解决

keyBy 后的聚合操作存在数据倾斜

为什么不能直接用二次聚合来处理

使用 LocalKeyBy 的思想

DataStream API 自定义实现的案例

keyBy 之前发生数据倾斜

keyBy 后的窗口聚合操作存在数据倾斜

实现思路

提交原始案例

提交两阶段聚合的案例

总结


        在大数据流式处理的场景中,Apache Flink 发挥着极为重要的作用。然而,数据倾斜问题就像隐藏在暗处的绊脚石,常常会对 Flink 作业的性能产生严重影响,导致任务执行效率低下、资源利用不均衡等状况。了解如何精准判断数据倾斜是否存在,并掌握有效的解决策略,对于保障 Flink 作业的平稳、高效运行意义重大。无论是开发人员还是运维人员,都需要深入探究这一关键问题,以便更好地应对实际工作中可能遇到的各种挑战,让我们一同走进 Flink 数据倾斜相关内容的学习。

判断是否存在数据倾斜

        相同 Task 的多个 Subtask 中,个别 Subtask 接收到的数据量明显大于其他 Subtask 接收到的数据量,通过 Flink Web UI 可以精确地看到每个 Subtask 处理了多少数据,即可判断出 Flink 任务是否存在数据倾斜。通常,数据倾斜也会引起反压。

        另外,有时 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。

数据倾斜的解决

keyBy 后的聚合操作存在数据倾斜

提交案例:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SkewDemo1 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--local-keyby false

查看 webui:

为什么不能直接用二次聚合来处理

        Flink 是实时流处理,如果 keyby 之后的聚合操作存在数据倾斜,且没有开窗口(没攒批)的情况下,简单的认为使用两阶段聚合,是不能解决问题的。因为这个时候 Flink 是来一条处理一条,且向下游发送一条结果,对于原来 keyby 的维度(第二阶段聚合)来讲,数据量并没有减少,且结果重复计算(非 FlinkSQL,未使用回撤流),如下图所示:

使用 LocalKeyBy 的思想

        在 keyBy 上游算子数据发送之前,首先在上游算子的本地对数据进行聚合后,再发送到下游,使下游接收到的数据量大大减少,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈。类似 MapReduce 中 Combiner 的思想,但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。从 Flink LocalKeyBy 实现原理来讲,必然会存在一个积攒批次的过程,在上游算子中必须攒够一定的数据量,对这些数据聚合后再发送到下游。
实现方式:

  • DataStreamAPI 需要自己写代码实现
  • SQL 可以指定参数,开启 miniBatch 和 LocalGlobal 功能(推荐,后续介绍)

DataStream API 自定义实现的案例

以计算每个 mid 出现的次数为例,keyby 之前,使用 flatMap 实现 LocalKeyby 功能

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class LocalKeyByFlatMapFunc extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> implements CheckpointedFunction {

    //Checkpoint 时为了保证 Exactly Once,将 buffer 中的数据保存到该 ListState 中
    private ListState<Tuple2<String, Long>> listState;

    //本地 buffer,存放 local 端缓存的 mid 的 count 信息
    private HashMap<String, Long> localBuffer;

    //缓存的数据量大小,即:缓存多少数据再向下游发送
    private int batchSize;

    //计数器,获取当前批次接收的数据量
    private AtomicInteger currentSize;


    //构造器,批次大小传参
    public LocalKeyByFlatMapFunc(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        // 1、将新来的数据添加到 buffer 中
        Long count = localBuffer.getOrDefault(value, 0L);
        localBuffer.put(value.f0, count + 1);

        // 2、如果到达设定的批次,则将 buffer 中的数据发送到下游
        if (currentSize.incrementAndGet() >= batchSize) {
            // 2.1 遍历 Buffer 中数据,发送到下游
            for (Map.Entry<String, Long> midAndCount : localBuffer.entrySet()) {
                out.collect(Tuple2.of(midAndCount.getKey(), midAndCount.getValue()));
            }

            // 2.2 Buffer 清空,计数器清零
            localBuffer.clear();
            currentSize.set(0);
        }

    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 将 buffer 中的数据保存到状态中,来保证 Exactly Once
        listState.clear();
        for (Map.Entry<String, Long> midAndCount : localBuffer.entrySet()) {
            listState.add(Tuple2.of(midAndCount.getKey(), midAndCount.getValue()));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 从状态中恢复 buffer 中的数据
        listState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<Tuple2<String, Long>>(
                        "localBufferState",
                        Types.TUPLE(Types.STRING, Types.LONG)
                )
        );
        localBuffer = new HashMap();
        if (context.isRestored()) {
            // 从状态中恢复数据到 buffer 中
            for (Tuple2<String, Long> midAndCount : listState.get()) {
                // 如果出现 pv!= 0,说明改变了并行度,ListState 中的数据会被均匀分发到新的 subtask中
                // 单个 subtask 恢复的状态中可能包含多个相同的 mid 的 count数据
                // 所以每次先取一下buffer的值,累加再put
                long count = localBuffer.getOrDefault(midAndCount.f0, 0L);
                localBuffer.put(midAndCount.f0, count + midAndCount.f1);
            }
            // 从状态恢复时,默认认为 buffer 中数据量达到了 batchSize,需要向下游发
            currentSize = new AtomicInteger(batchSize);
        } else {
            currentSize = new AtomicInteger(0);
        }

    }
}

提交 localkeyby 案例:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SkewDemo1 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--local-keyby true

查看 webui:

可以看到每个 subtask 处理的数据量基本均衡,另外处理的数据量相比原先少了很多。

keyBy 之前发生数据倾斜

        如果 keyBy 之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况可能是因为数据源的数据本身就不均匀,例如由于某些原因 Kafka 的 topic 中某些 partition 的数据量较大,某些 partition 的数据量较少。对于不存在 keyBy 的 Flink 任务也会出现该情况。

        这种情况,需要让 Flink 任务强制进行 shuffle。使用 shuffle、rebalance 或 rescale 算子即可将数据均匀分配,从而解决数据倾斜的问题。

keyBy 后的窗口聚合操作存在数据倾斜

        因为使用了窗口,变成了有界数据(攒批)的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:

实现思路

  • 第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
    注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起)
  • 第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合

提交原始案例

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SkewDemo2 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--two-phase false

查看 WebUI:

提交两阶段聚合的案例

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SkewDemo2 \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--two-phase true \
--random-num 16

查看 WebUI:可以看到第一次打散的窗口聚合,比较均匀

第二次聚合,也比较均匀:

随机数范围,需要自己去测,因为 keyby 的分区器是(两次 hash * 下游并行度 / 最大并行度)
SQL 写法参考:https://zhuanlan.zhihu.com/p/197299746

总结

        本文聚焦 Flink 数据倾斜这一关键要点,系统地介绍了判断与解决相关问题的重要知识。

        在判断数据倾斜方面,明确指出可依据 Flink Web UI 中 Subtask 的数据处理量差异,以及 Checkpoint detail 里 SubTask 的 State size 情况来判断是否存在数据倾斜,为及时发现问题提供了有效途径。

        而在解决数据倾斜问题上,针对 keyBy 后的聚合操作存在倾斜,介绍了通过代码实现(如 DataStreamAPI 自定义代码及 SQL 开启相关功能)来均衡数据量;对于 keyBy 之前发生倾斜,利用 shuffle、rebalance 或 rescale 算子强制数据均匀分配;keyBy 后的窗口聚合操作倾斜时,则采用两阶段聚合的方式,详细说明了各阶段的操作要点及注意事项,并给出相应案例查看效果。

        总之,掌握这些数据倾斜相关的判断方法与解决策略,能帮助使用者更好地优化 Flink 作业,提升整体性能,避免因数据倾斜带来的诸多不良影响,保障数据处理工作顺利开展。

标签:Flink,聚合,flink,----,数据量,倾斜,数据
From: https://blog.csdn.net/weixin_64726356/article/details/144632283

相关文章

  • 【Java基础面试题046】Java中的注解原理是什么?
    注解其实就是一个标记,是一种提供元数据的机制,用于给代码添加说明信息。可以标记在类上、方法上、属性上等,标记自身也可以设置一些值。注解本身不影响程序的逻辑执行,但可以通过工具或框架来利用这些信息进行特定的处理,如代码生成、编译时检查、运行时处理等。扩展知识自定义......
  • Alluxio 单机和集群部署教程
    目录Alluxio单机和集群部署教程第一部分:Alluxio概述Alluxio的关键特点:第二部分:Alluxio单机部署教程1.安装Alluxio1.1下载并解压Alluxio1.2配置Alluxio1.3启动Alluxio1.4测试功能2.单机案例代码实现(Python)2.1Python示例代码3.常见问题及解决方法3.1A......
  • Python 列表(详解)
     列表列表的特点:有序,可重复,长度可变(增删改查),异构,可切片,可遍历。列表的基本语法:列表名=[元素]list=['apple','banana','pineapple']列表的作⽤是⼀次性存储多个数据,并且列表可以存储不同类型的数据一:列表的增删改查:增加:append():增加指定数据到列表中names=['1',......
  • Logstash 单机与集群部署教程
    目录Logstash单机与集群部署教程第一部分:Logstash概述第二部分:Logstash单机部署教程1.安装Logstash1.1安装依赖1.2配置Logstash1.3启动Logstash2.单机案例代码实现(Python)3.常见问题及解决方法3.1Logstash启动失败3.2无法连接到Elasticsearch第三部分:L......
  • 开发安卓设备端应用
    开发安卓设备端应用在安卓设备端,开发者需要运行一个安卓应用(文档中也称为小程序Launcher),用来进行设备注册、运行小程序进行VOIP通话等操作。1.接入WMPF并运行小程序在安卓平台上,小程序视频通话能力是在小程序中实现的。需要由设备端运行的安卓应用拉起开发者开发的小程序......
  • 支付刷脸模式
    校园场景支付刷脸模式对于部分存量的支付刷脸设备,我们额外支持通过微信支付人脸识别的用户身份来发起通话。支付刷脸设备的通话存在以下限制:只支持微信支付刷脸设备使用,具体的开通方式请参考微信支付的相关文档;只支持安卓设备,WMPF<=2.0版本;只支持设备发起呼叫,不支持手机......
  • VoIP 插件错误码
    VoIP插件错误码1.后台返回错误码errCode描述1roomId错误2设备deviceId错误3voip_id错误4voipToken错误(刷脸模式)5生成voip房间错误7openId错误8openId未授权(刷脸模式)9openId未授权设备(硬件模式)或不是userId联系人(刷脸......
  • 你有阅读过哪些类库的源码吗?你是如何阅读的?
    一、选择合适的类库在开始阅读源码之前,首先要选择一个合适的类库。可以根据自己的工作需求、技术兴趣或学习目标来选择。一些流行的前端类库如React、Vue、Angular等,它们的源码都是公开可查的,而且有着丰富的社区资源和文档支持。二、了解类库的整体架构在开始深入阅读源码之前,......
  • getIotBindContactList
    getIotBindContactList(Objectreq)本接口为异步接口,返回Promise对象。根据openId,查询指定用户是否授权某台设备。参数Objectreq属性类型默认值必填说明snstring是设备SNmodel_idstring是设备的model_idopenid_liststring[]是要查询的......
  • setVoipEndPagePath
    voidsetVoipEndPagePath(Objectreq)设置插件功能执行完成后的跳转页面路径。参数Objectreq属性类型默认值必填说明最低版本urlstring是跳转页面的路径keystring是业务类型,参见下文optionsstring否跳转页面的queryString。最终跳转的路径......