首页 > 其他分享 >flink优化

flink优化

时间:2023-10-10 23:14:16浏览次数:38  
标签:count keyby 聚合 flink 并行度 state 优化

1、时间定义、事件时间和 处理时间
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/concepts/time_attributes/#defining-in-ddl-1

2、自定义函数
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/#table-functions

3、窗口聚合
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/window-agg/
幂等性和两阶段提交(有事务)

4、资源配置
一个core:一个slot或者两个slot
一个slot 2-8G大小
需要考虑 container大小和配置

并行度: 任务并行度10以下,测试单个并行度的处理上限,QPS / 单任务并行度的处理能力
最后 并行度*1.2

transform端并行度配置
不太重的操作,map、filter、flatmap一般和source并行度保持一致

keyby之后的双子(keyGroup最小值为128)
如果并发比较大,建议设置并行度为2的整数次幂
小的并发任务不一定要设置成2的整数次幂
大并发任务如果没有keyby,并行度也无需设置成2的整数次幂

sink端并行度的配置
根据sink端的数据量及下游的服务抗压能力进行评估
如果是kafka,这设置为kafka对应topic的分区数

5、RocksDB大状态调优
RocksDB是基于LSM Tree实现的,类似于(Hbase),写数据都是先换存到内存中,所以RocksDB的写请求效率比较高。
RocksDB使用 内存 结合磁盘的方式来存储数据,每次读取数据时,先从 blockcache中查找,如果内存中没有再去磁盘中查询。
优化后差不多 单 并行度tps 5000record/s。 性能瓶颈主要 在于RocksDB对磁盘的读请求。

设置本地RocksDB多目录 ***
在flink-conf.yaml中配置 : state.backend.rocksdb.localdir: /data/flink/rocksdb,/data2/flink/rocksdb
增量检查点: ***
state.backend.incremental: true,默认为false
硬盘模式:
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM 设置为机械硬盘+内存模式,
有条件 SSD,指定为 FLASH_SSD_OPTIMIZED
内存cache设置:
state.backend.rocksdb.block.cache-size: 整个RocksDB共享一个block cache,参数越大读数据命中率越高,默认为8M,建议设置为64~256MB
flush 和 合并 sst文件的线程数,
state.backend.rocksdb.thread.num, 默认为1,建议调大,机械硬盘用户可以改为4等更大的值

############ 写缓存
state.backend.rocksdb.writebuffer.size
RocksDB中,每个state使用一个Column Family,每个Column Family使用独占的write buffer,建议调大,如32M
state.backend.rocksdb.writebuffer.count:
每个Column Family对应的writebuffer数量,默认值是2,对于机械硬盘来说,如果内存够大,可以调大到5左右。
state.backend.rocksdb.writebuffer.number-to-merge:
将数据从writebuffer中flush到磁盘时,需要合并的writebuffer数量,默认为1,可以调成3.

state.backend.local-recovery:
设置本地恢复,当flink任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从hdfs拉去数据

6.CheckPoint设置
Checkpoint时间间隔设置为分钟级别,你如1分钟、3分钟,对于状态很大的任务每次CheckPoint访问HDFS比较耗时,可以设置为5-10分钟一次,
并且调大两次CheckPoint之间的暂停间隔,间隔时间至少暂停4或8分钟。

7、使用Flink ParameterTool读取配置
读取参数:
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
读取配置文件:
ParameterTool parameterTool = ParameterTool.fromPropertiesFile("./");
String s = parameterTool.get("");

6、反压配置
ok: 0<=比例<=0.10
low:0.10<比例<=0.5
high: 0.5<比例 <=1 ,表示被反压

6.1资源不足导致的反压

6.2负载不均衡
6.3外部依赖

7、数据倾斜
7.1 keyby之前发生的数据倾斜,可以使用reblance、shuffle或者rescale将数据均匀分配。推荐使用reblance。
7.2 keyby后的聚合操作导致的数据倾斜,
keyby后直接聚合:
a.加随机数实现双重聚合-最后结果依然倾斜
ds.map(id-> id + "-" + "随机数")
.keyby(id).reduce()
.map(id + "-" +随机数 ->id)
.keyby(id)
.reduce()
b.预聚合,定时器 + 状态
keyby后开窗聚合,两阶段聚合。
第一阶段聚合,key拼接随机数前缀或后缀,进行keyby、开窗、聚合
聚合完不再是windowedStream,要获取WindowsEnd作为窗口标记,作为第二阶段分组一句,避免不同窗口的结果聚到一起
第二阶段,去掉随机数前缀或后缀,按照原来的key及windowEnd作为keyby、聚合

8、KafkaSource调优
8.1动态发现分区,通过Properties指定参数开启(单位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,多久检测一次是否有新创建的partition。默认值是Long的最小值,标识不开启,大于0标识开启
8.2 从Kafka数据源处生成WaterMark,用处不是很大
8.3 设置空闲等待

由于下游算子WaterMark的计算方式是取所有不同的上游并行数据源WaterMark的最小值,则其WaterMark将不会发生变化,导致窗口、定时器等不会触发。
为了解决这个问题,可以使用WatermarkStrategy来监测空闲输入并将其标记为空闲状态。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(2)).withIdleNess(Duration.ofMinutes(5))
8.4 Kafka的Offset消费策略
FlinkKafkaConsumer可以调用以下API,注意与“auto.offset.reset"区分开
a. setStartFromGroupOffsets,默认消费策略,默认读取上次保存的offset信息,如果是第一次启动,读取不到offset信息,已根据auto.offset.reset值消费
建议使用这个
b. setStartFromEarliest(),从最早的数据开始进行消费,忽略存储的offset信息
c. setStartFromLatest(),从最新的数据进行消费,忽略存储的offset信息
d. setStartFromSpecificOffsets(Map),从指定位置开始消费
e. setStartFromTimestamp(long),从topic中指定的时间点开始消费,指定时间点之前的数据忽略
f. 当checkpoint机制开启的时候, kafkaConsumer会定期把kafka的offset信息还有其他的operator的状态信息一块保存起来。当Job失败重启的时候,
Flink会从最近一次的CheckPoint中恢复数据。
g. 为了能够使用支持容错的kafka Consumer,需要开启CheckPoint。
9、Flink SQL优化
参数配置列表: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/
9.1 Group Aggregate优化
a. 开启MiniBatch(提升吞吐)
MiniBatch是微批处理,原理是缓存一定的数据后再触发处理,以减少对State的访问从而提升吞吐并减少数据的输出量。
MiniBatch主要依靠在每个Task上注册的Timer线程来触发微批,需要消耗一定的线程调度性能。

Configuration conf = tevn.getConfig().getConfiguration;
// 开启MiniBatch
conf.setString("table.exec.mini-batch.enabled","true");
// 批量输出的时间间隔
conf.setString("table.exec.mini-batch.allow-latency","5s");
// 防止OOM设置每个批次最多缓存数据的条数,可以设置为2万条
conf.setString("table.exec.mini-batch.size","20000");

使用场景,微批处理通过增加延迟换取高吞吐,如果有超低延迟的要求,不建议开启微批处理。通常对于聚合的场景,微批处理可以显著提升性能,建议开启。

b. 开启LocalGlobal(解决常见数据热点问题)
LocalGlobal优化将原先的Aggregate分成Local + Global两阶段聚合,即MapReduce模型中的Combine + Reduce处理模式。
LocalGlobal本质上能够靠LocalAgg的聚合筛除部分倾斜数据,从而降低GlobalAgg的热点,提升性能。

LocalGlobal开启方式:
a.需要开启MiniBatch,依赖于MiniBatch参数
b.table.optimizer.agg-phase-strategy,聚合策略。默认AUTO,
支持参数AUTO、
TWO_PHASE(使用localGlobal两阶段聚合)、
ONE_PHASE(仅使用Global一阶段聚合)

c. 开启Split Distinct(解决Count distinct热点问题)
LocalGlobal优化针对普通聚合(如:sum、count、max、min和avg)有较好效果,对于count distinct收效不明显。
从Flink 1.9.0版本开始,提供了 count distinct自动打散功能。
d. 改写 agg with filter语法(提升大量 count distinct 场景性能)
在某些场景下,可能需要从不同维度来统计uv,可能会用到case when语法。
select day,count(distinct user_id) as total_uv,
count(distinct case when flag in ('a','b') then user_id else null end) as app_Uv
from t
group by day
在这种情况下,建议使用filter语法,目前的flink SQL优化器可以识别同一唯一键的不同filter参数。
select day,count(distinct user_id) as total_uv,
count(distinct user_id) filter(where flag in ('a','b')) as app_Uv
from t
group by day

9.2 TopN优化
a. 使用最有算法
使用SQL满足一定条件,会自动启用算法。
UpdateFastRank算法,需要满足两个条件。
a.1 输入流有PK(primary key)信息,例如Group By avg。
a.2 排序字段的更新是单调的,而且单调方向于排序方向相反。 如 order by count/count_distinct/sum(证书) desc
不建议生产环境使用该算法。
b. 无排名优化(解决数据膨胀问题)
select col1,col2,col3,col4
from (
select *,
row_number() over(partition by col1,col2 order by col1 asc,col2 desc) rownum
from t
) t
where rownum <= N and col1>2
c. 增加TopN的Cache大小
TopN为了提升性能,有一个State Cache层,Cache层能提升对State的访问效率。
TopN的Cache命中率计算公式: cache_hit = cache_size * parallelism / top_n / partition_key_num
例如,Top100可以缓存1w条,并发50.当 parttion by key为10w时,cache命中率 1w*50/10w/100 = 0.05,此时需要增加cache size,
也需要适当增加topn节点的Heap Memory。
Configuration conf = tevn.getConfig().getConfiguration;
conf.setString("table.exec.topn.cache-size","2000000")
d.PartitionBy的字段要有时间类型字段
要带上Day字段,否则会由于State ttl出现错误。跨天问题。
9.3 内置函数
慎用regexp函数

标签:count,keyby,聚合,flink,并行度,state,优化
From: https://www.cnblogs.com/wind-man/p/17755978.html

相关文章

  • 前端面试八股文 工程化+性能优化+计算机基础
    前端面试八股文工程化+性能优化+计算机基础前端页面性能如何优化?以下是一些前端性能优化的常用方法:减少资源加载时间:这可以通过多种方式实现,比如压缩和合并CSS和JavaScript文件,使用CDN(内容分发网络)来快速传输资源,以及使用浏览器缓存来避免重复下载。优化图片:图片通常是网页......
  • 学习笔记:斜率优化
    引入有时候我们会遇见一些dp式子\[f_i=\min(f_j+a_i\timesb_i)(j\leqi-1)\]这些式子和\(j\)没有任何关系可以前缀处理最小值\(O(n)\)快速解决但是有些式子是这样的\[f_i=\min(f_j+a_i\timesb_j+c_i)\]这种问题可以使用斜率优化至\(O(n\logn)\)例题传送门很......
  • LntonGBS针对数据库删除级联数据后的无效数据进行的优化
    LntonGBS国标视频云服务可支持通过国标GB28181协议将设备接入,实现视频的实时监控直播、录像、语音对讲、云存储、告警、级联等功能,同时也支持将接入的视频流进行全终端、全平台分发,分发的视频流包括RTSP、RTMP、FLV、HLS、WebRTC等格式。同时LntonGBS平台也支持海康Ehome协议及SDK......
  • Ansible 执行过程分析、异步、效率优化
    直观观察任务执行速度插件callback_whitelist=profile_tasks,profile_roles,timer该插件可以直接统计每个task,每个role,每个play执行的时间,方面观察出哪些任务耗时Ansible执行过程分析下面是2.9的默认执行过程分析(简单分析,具体请vvv):获取用户家目录,此处为/root在家目录下创建临......
  • ControlNet-trt优化总结4:onnx图修改与重建
    ControlNet-trt优化总结4:onnx图修改与重建在这一节中,主要总结网络层面的优化,针对于算子插件优化,主要聚焦于以下几点:修改onnx图,添加不支持的算子插件增加前后处理部分,前后处理导出为onnx图onnx图surgeon原有的graph中存在大量的GN操作,正常fp32的时候没有问题,但是当使用fp16......
  • 编译优化概念:Canonicalization
      编译优化概念:Canonicalization-知乎(zhihu.com) anonicalization(规范化)是编译器IR(intermediaterepresentation)设计中的一个重要部分,它使代码转换(transformations)变得简单高效。大多数编译器都有canonicalizationpass,对于后续进行编译器优化也起到很大作......
  • 可视大盘 + 健康分机制,火山引擎 DataLeap 为企业降低资源优化门槛!
    更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群 随着数仓及研发技术团队维护的数据量大、资源使用量大、成本越高、优化压力越大。如何主动发现无效或低效使用的资源,并且可以周期性高效的进行主动治理变为团队治理目标核心诉求之一。在传......
  • CS2 优化——避免手感粘滞
    最重要的一个,禁用CS2.exe的全屏优化其次nvidia设置是这样游戏里“nvidiareflex低延迟”选择“已启用+加速”......
  • Python信贷风控模型:梯度提升Adaboost,XGBoost,SGD, GBOOST, SVC,随机森林, KNN预测金
    原文链接:http://tecdat.cn/?p=26184 原文出处:拓端数据部落公众号最近我们被客户要求撰写关于信贷风控模型的研究报告,包括一些图形和统计输出。在此数据集中,我们必须预测信贷的违约支付,并找出哪些变量是违约支付的最强预测因子?以及不同人口统计学变量的类别,拖欠还款的概率如何......
  • SqlServer 删除的性能优化
    SqlServer删除的性能优化最近遇到个SqlServer删除性能的问题。假设我们有如下的表定义CreateTableTree(IdINT,NameNVARCHAR(MAX),ParentIdINT,PRIMARYKEY(Id),FOREIGNKEY(ParentId)REFERENCESTree(Id))当我们Tree表的数据量比较大的时候,我们删......