1、时间定义、事件时间和 处理时间标签:count,keyby,聚合,flink,并行度,state,优化 From: https://www.cnblogs.com/wind-man/p/17755978.html
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/concepts/time_attributes/#defining-in-ddl-1
2、自定义函数
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/#table-functions
3、窗口聚合
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/window-agg/
幂等性和两阶段提交(有事务)
4、资源配置
一个core:一个slot或者两个slot
一个slot 2-8G大小
需要考虑 container大小和配置
并行度: 任务并行度10以下,测试单个并行度的处理上限,QPS / 单任务并行度的处理能力
最后 并行度*1.2
transform端并行度配置
不太重的操作,map、filter、flatmap一般和source并行度保持一致
keyby之后的双子(keyGroup最小值为128)
如果并发比较大,建议设置并行度为2的整数次幂
小的并发任务不一定要设置成2的整数次幂
大并发任务如果没有keyby,并行度也无需设置成2的整数次幂
sink端并行度的配置
根据sink端的数据量及下游的服务抗压能力进行评估
如果是kafka,这设置为kafka对应topic的分区数
5、RocksDB大状态调优
RocksDB是基于LSM Tree实现的,类似于(Hbase),写数据都是先换存到内存中,所以RocksDB的写请求效率比较高。
RocksDB使用 内存 结合磁盘的方式来存储数据,每次读取数据时,先从 blockcache中查找,如果内存中没有再去磁盘中查询。
优化后差不多 单 并行度tps 5000record/s。 性能瓶颈主要 在于RocksDB对磁盘的读请求。
设置本地RocksDB多目录 ***
在flink-conf.yaml中配置 : state.backend.rocksdb.localdir: /data/flink/rocksdb,/data2/flink/rocksdb
增量检查点: ***
state.backend.incremental: true,默认为false
硬盘模式:
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM 设置为机械硬盘+内存模式,
有条件 SSD,指定为 FLASH_SSD_OPTIMIZED
内存cache设置:
state.backend.rocksdb.block.cache-size: 整个RocksDB共享一个block cache,参数越大读数据命中率越高,默认为8M,建议设置为64~256MB
flush 和 合并 sst文件的线程数,
state.backend.rocksdb.thread.num, 默认为1,建议调大,机械硬盘用户可以改为4等更大的值
############ 写缓存
state.backend.rocksdb.writebuffer.size
RocksDB中,每个state使用一个Column Family,每个Column Family使用独占的write buffer,建议调大,如32M
state.backend.rocksdb.writebuffer.count:
每个Column Family对应的writebuffer数量,默认值是2,对于机械硬盘来说,如果内存够大,可以调大到5左右。
state.backend.rocksdb.writebuffer.number-to-merge:
将数据从writebuffer中flush到磁盘时,需要合并的writebuffer数量,默认为1,可以调成3.
state.backend.local-recovery:
设置本地恢复,当flink任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从hdfs拉去数据
6.CheckPoint设置
Checkpoint时间间隔设置为分钟级别,你如1分钟、3分钟,对于状态很大的任务每次CheckPoint访问HDFS比较耗时,可以设置为5-10分钟一次,
并且调大两次CheckPoint之间的暂停间隔,间隔时间至少暂停4或8分钟。
7、使用Flink ParameterTool读取配置
读取参数:
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
读取配置文件:
ParameterTool parameterTool = ParameterTool.fromPropertiesFile("./");
String s = parameterTool.get("");
6、反压配置
ok: 0<=比例<=0.10
low:0.10<比例<=0.5
high: 0.5<比例 <=1 ,表示被反压
6.1资源不足导致的反压
6.2负载不均衡
6.3外部依赖
7、数据倾斜
7.1 keyby之前发生的数据倾斜,可以使用reblance、shuffle或者rescale将数据均匀分配。推荐使用reblance。
7.2 keyby后的聚合操作导致的数据倾斜,
keyby后直接聚合:
a.加随机数实现双重聚合-最后结果依然倾斜
ds.map(id-> id + "-" + "随机数")
.keyby(id).reduce()
.map(id + "-" +随机数 ->id)
.keyby(id)
.reduce()
b.预聚合,定时器 + 状态
keyby后开窗聚合,两阶段聚合。
第一阶段聚合,key拼接随机数前缀或后缀,进行keyby、开窗、聚合
聚合完不再是windowedStream,要获取WindowsEnd作为窗口标记,作为第二阶段分组一句,避免不同窗口的结果聚到一起
第二阶段,去掉随机数前缀或后缀,按照原来的key及windowEnd作为keyby、聚合
8、KafkaSource调优
8.1动态发现分区,通过Properties指定参数开启(单位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,多久检测一次是否有新创建的partition。默认值是Long的最小值,标识不开启,大于0标识开启
8.2 从Kafka数据源处生成WaterMark,用处不是很大
8.3 设置空闲等待
由于下游算子WaterMark的计算方式是取所有不同的上游并行数据源WaterMark的最小值,则其WaterMark将不会发生变化,导致窗口、定时器等不会触发。
为了解决这个问题,可以使用WatermarkStrategy来监测空闲输入并将其标记为空闲状态。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(2)).withIdleNess(Duration.ofMinutes(5))
8.4 Kafka的Offset消费策略
FlinkKafkaConsumer可以调用以下API,注意与“auto.offset.reset"区分开
a. setStartFromGroupOffsets,默认消费策略,默认读取上次保存的offset信息,如果是第一次启动,读取不到offset信息,已根据auto.offset.reset值消费
建议使用这个
b. setStartFromEarliest(),从最早的数据开始进行消费,忽略存储的offset信息
c. setStartFromLatest(),从最新的数据进行消费,忽略存储的offset信息
d. setStartFromSpecificOffsets(Map),从指定位置开始消费
e. setStartFromTimestamp(long),从topic中指定的时间点开始消费,指定时间点之前的数据忽略
f. 当checkpoint机制开启的时候, kafkaConsumer会定期把kafka的offset信息还有其他的operator的状态信息一块保存起来。当Job失败重启的时候,
Flink会从最近一次的CheckPoint中恢复数据。
g. 为了能够使用支持容错的kafka Consumer,需要开启CheckPoint。
9、Flink SQL优化
参数配置列表: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/
9.1 Group Aggregate优化
a. 开启MiniBatch(提升吞吐)
MiniBatch是微批处理,原理是缓存一定的数据后再触发处理,以减少对State的访问从而提升吞吐并减少数据的输出量。
MiniBatch主要依靠在每个Task上注册的Timer线程来触发微批,需要消耗一定的线程调度性能。
Configuration conf = tevn.getConfig().getConfiguration;
// 开启MiniBatch
conf.setString("table.exec.mini-batch.enabled","true");
// 批量输出的时间间隔
conf.setString("table.exec.mini-batch.allow-latency","5s");
// 防止OOM设置每个批次最多缓存数据的条数,可以设置为2万条
conf.setString("table.exec.mini-batch.size","20000");
使用场景,微批处理通过增加延迟换取高吞吐,如果有超低延迟的要求,不建议开启微批处理。通常对于聚合的场景,微批处理可以显著提升性能,建议开启。
b. 开启LocalGlobal(解决常见数据热点问题)
LocalGlobal优化将原先的Aggregate分成Local + Global两阶段聚合,即MapReduce模型中的Combine + Reduce处理模式。
LocalGlobal本质上能够靠LocalAgg的聚合筛除部分倾斜数据,从而降低GlobalAgg的热点,提升性能。
LocalGlobal开启方式:
a.需要开启MiniBatch,依赖于MiniBatch参数
b.table.optimizer.agg-phase-strategy,聚合策略。默认AUTO,
支持参数AUTO、
TWO_PHASE(使用localGlobal两阶段聚合)、
ONE_PHASE(仅使用Global一阶段聚合)
c. 开启Split Distinct(解决Count distinct热点问题)
LocalGlobal优化针对普通聚合(如:sum、count、max、min和avg)有较好效果,对于count distinct收效不明显。
从Flink 1.9.0版本开始,提供了 count distinct自动打散功能。
d. 改写 agg with filter语法(提升大量 count distinct 场景性能)
在某些场景下,可能需要从不同维度来统计uv,可能会用到case when语法。
select day,count(distinct user_id) as total_uv,
count(distinct case when flag in ('a','b') then user_id else null end) as app_Uv
from t
group by day
在这种情况下,建议使用filter语法,目前的flink SQL优化器可以识别同一唯一键的不同filter参数。
select day,count(distinct user_id) as total_uv,
count(distinct user_id) filter(where flag in ('a','b')) as app_Uv
from t
group by day
9.2 TopN优化
a. 使用最有算法
使用SQL满足一定条件,会自动启用算法。
UpdateFastRank算法,需要满足两个条件。
a.1 输入流有PK(primary key)信息,例如Group By avg。
a.2 排序字段的更新是单调的,而且单调方向于排序方向相反。 如 order by count/count_distinct/sum(证书) desc
不建议生产环境使用该算法。
b. 无排名优化(解决数据膨胀问题)
select col1,col2,col3,col4
from (
select *,
row_number() over(partition by col1,col2 order by col1 asc,col2 desc) rownum
from t
) t
where rownum <= N and col1>2
c. 增加TopN的Cache大小
TopN为了提升性能,有一个State Cache层,Cache层能提升对State的访问效率。
TopN的Cache命中率计算公式: cache_hit = cache_size * parallelism / top_n / partition_key_num
例如,Top100可以缓存1w条,并发50.当 parttion by key为10w时,cache命中率 1w*50/10w/100 = 0.05,此时需要增加cache size,
也需要适当增加topn节点的Heap Memory。
Configuration conf = tevn.getConfig().getConfiguration;
conf.setString("table.exec.topn.cache-size","2000000")
d.PartitionBy的字段要有时间类型字段
要带上Day字段,否则会由于State ttl出现错误。跨天问题。
9.3 内置函数
慎用regexp函数