1. Flink架构
Flink的角色
- Client:获取、转换、提交代码给jm.
- JM:对job做任务调度,再对job进一步处理转换,然后分发给TM.
- TM:数据处理.
部署模式
区别:集群的生命周期和资源的分配方法,代码的main方式在client执行还是JM执行。
- session会话:启动一个集群保持会话,通过client提交作业给jm.资源共享适合小而多的job.
yarn-session.sh -d -nm test
flink run -d -c com.wsl.test wc2.jar # core1 7777
- per-job单作业:每个job启动一个集群,job完成集群关闭资源释放。需要资源调度框架yarn、k8s.
flink run -d -m yarn-cluster -c com.wsl.day00.base.test wc2.jar -ynm wc # < 1.0
flink run -d -m yarn-cluster flink-1.13.0/examples/batch/WordCount.jar -input hdfs://master1:8020/input/wc.txt -output hdfs://master1:8020/output/result2
flink run -d -t yarn-per-job -c com.wsl.day00.base.test wc2.jar # > 1.0 #flink1.17
flink run -d -Dyarn.application.name=flink1.17 -t yarn-per-job -c org.example.App flink1.17.jar
/data/module/flink-1.17.0/bin/flink run -d -t yarn-per-job -Dyarn.application.name=waf_log -c com.sevenxnetworks.Start /data/jar/waf-log-jar-with-dependencies.jar
flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
/data/module/hadoop-3.2.4/bin/yarn application -list
yarn application -list 2>/dev/null | awk '{print $2}' | grep dpi | wc -l #任务名
yarn logs -applicationId application_1627612938926_0005 #查看任务的log
- application应用:不在client上执行代码,直接提交到JM上执行,每个job启动一个JM.
flink run-application -t yarn-application -c xxx xxx.jar
2. 核心概念
Window分类
- TimeWindow
- CountWindow
- SlidingPreocessing、TumblingProcessing、Session、Global
//按键分区(Keyed)相同的key会发送到同一并行子任务,每个Key都定义了一组窗口单独计算。
stream.keyBy(...)
.window(...)
//非按键分区(Non-Keyed)并行度=1
stream.windowAll(...)
聚合函数:
- 全量聚合 apply() process()
- 增量聚合 aggregate() reduce()
- 富函数:侧输出流、运行时上下文可做状态编程、生命周期方法
- Process函数:侧输出流、运行时上下文可做状态编程、生命周期、定时器
WaterMark
.assignTimestampsAndWatermarks(forBoundedOutOfOrderness(1))) :到达窗口结束时间触发当前计算,但不关闭窗口,以后每来一次迟到数据触发一次计算.
.allowedLateness(Time.seconds(3)) :当允许迟到时间过了才会真正关闭窗口。
.getSideOutput(0) :窗口关闭了后又来了迟到数据,通过测输出流输出。
状态
- Row State
- Managed State
- Keyed State
- Operator State (BroadCastState))
状态后端
本地状态的管理
- 默认的 HashMapStateBackend:TM的jvm里
- 内嵌RocksDB EmbeddedRocksDBStateBackend:持久化到TM的本地数据目录里,序列化和反序列化
配置
- flink-conf.yaml state.backend: hashmap
- 代码里为每个job配置
checkpoint
定时存档,遇到故障从检查点读档恢复之前的状态。
- 周期性触发保存:代码设置
- 保存的时间点(barrier):当所有的task处理到同一数据后,将此时的状态保存下来做一次快照。
barrier
触发检查点保存的时间点
JM定时向TM发送指令,触发保持检查点(带检查点id),向source中插入一条barrier,source任务将自己的状态保存起来,做一次ckp,barrier继续向下游传递,之后source就可以继续读入新数据了,然后后面的算子任务继续做ckp。遇到keyby分区,barrier需要广播到每个并行度,所以这个下游算子会收到多个barrier,需要做执行“分界线对齐”操作,即需要等到所有并行分区的barrier都到齐,才可以开始状态的保存。
- Barrier对齐:下游任务等到所有并行度的barrier到齐的过程中,barrier已经到的上游任务又收到了数据(这已经是下次的ckp):
- 至少一次:直接计算。重新启动两次ckp之间的数据会重复计算。
- 精准一次:两次ckp之间的数据不会直接计算,而且缓存起来。
- Barrier不对齐:直接把数据和barrier缓存起来
- 精准一次
端到端一致性
标签:flink,入门,barrier,Flink,jar,yarn,application,job From: https://www.cnblogs.com/xiao-hua-sheng/p/18112696