flink
一、搭建
1、standalone模式
参考: https://blog.51cto.com/u_11409186/5743198#_Toc85293738
1、jobmanager
1.1 先拉取镜像
docker pull flink:latest
1.2 查看端口占用
netstat -anp | grep 8081
1.3 docker-compose.yml脚本创建
先搭建一个简单的容器,没有volumes;
然后在宿主机准备好文件夹,通过docker cp将文件夹先拉出来,不需要拉取log文件。
docker cp flink-jobmanager:/opt/flink/conf /opt/flink-jobmanager/
然后删除容器,docker-compose.yml添加文件映射。
version: "3"
services:
jobmanager:
image: flink:1.17.0
ports:
- "8088:8081"
- "6123:6123"
- "6124:6124"
- "6125:6125"
- "50100-50200:50100-50200"
volumes:
- /opt/flink-jobmanager/conf:/opt/flink/conf
- /opt/flink-jobmanager/log:/opt/flink/log
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=192.168.31.56
container_name: flink-jobmanager
restart: always
1.4 启动
docker-compose up -d
1.5 进入容器修改权限
修改文件夹所属用户和用户组
chown flink:flink conf
chown flink:flink log
1.6 重启容器
docker restart flink-jobmanager
1.7 修改conf下workes文件,改为taskmanager的ip,集群往下添加
vim workers
1.8 修改flink.conf,添加
metrics.internal.query-service.port: 50100-50130
taskmanager.rpc.port: 50140-50170
1.9 重启
docker restart flink-jobmanager
2、taskmanager
2.1 先拉取镜像
docker pull flink:latest
2.2 docker-compose.yml脚本创建
先搭建一个简单的容器,没有volumes;
然后在宿主机准备好文件夹,通过docker cp将文件夹先拉出来,不需要拉取log文件。
docker cp flink-jobmanager:/opt/flink/conf /opt/flink-jobmanager/
然后删除容器,docker-compose.yml添加文件映射。
version: "3"
services:
taskmanager:
image: flink:1.17.0
network_mode: host
command: taskmanager
volumes:
- /opt/flink-taskmanager/conf:/opt/flink/conf
- /opt/flink-taskmanager/log:/opt/flink/log
- /opt/flink-taskmanager/logs:/opt/flink/logs
environment:
- JOB_MANAGER_RPC_ADDRESS=192.168.31.56
container_name: flink-taskmanager01
restart: always
2.3 启动
docker-compose up -d
2.4 进入容器修改权限
修改文件夹所属用户和用户组
chown flink:flink conf
chown flink:flink log
chown flink:flink logs
2.5 重启容器
2.6 修改flink.conf,添加
metrics.internal.query-service.port: 50100-50130
taskmanager.rpc.port: 50140-50170
2.7 重启容器
二、概述
Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。
遵循 Master - Slave 架构设计原则,JobManager 为 Master 节点,TaskManager 为 Worker (Slave)节点。
-
JobManager 负责整个 Flink 集群任务的调度以及资源的管理,从客户端中获取提交的应用,决定何时调度下一个Task(或一组Task)、对完成的 task 或执行失败做出反应等。
-
TaskManager 负责具体的任务执行和对应任务在每个节点上的资源申请和管理。
三、并行度与slot
在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。
Solt的数量通常与每个TaskManager节点的可用CPU数量成比例,一般情况下Slot的数量就是每个节点的可用CPU数量。
-
所有的Flink程序都是由三部分组成的: Source 、Transformation 和 Sink。
Source 负责读取数据源,Transformation 利用各种算子进行处理加工,Sink 负责输出 -
对于分布式执行,Flink 将算子的 subtasks 链接成 tasks。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量
案例分析
四、Taskmanager
1、内存分配
2、内存计算2核4G
关键在于资源情况能不能抗住高峰时期每秒的数据量
案例一:全部默认
Total Process Memory总大小:taskmanager.memory.process.size: 1728m 默认
(1)JVM Metaspace:256mb(默认)
(2)JVM Overhead:192mb
taskmanager.memory.jvm-overhead.max: 1 gb
taskmanager.memory.jvm-overhead.min: 192 mb
taskmanager.memory.jvm-overhead.fraction: 0.1
公式 1728 * 0.1 = 172.8m 必须在[192m, 1g]之间,所以JVM Overhead的大小是192m
(3)Total Flink Memory:1280mb
totalFlinkMemorySize = taskmanager.memory.process.size - JVM Metaspace - JVM Overhead = 1728 - 256 - 192 = 1280mb
(4)Framework Head:128mb 默认
(5)Framework Off-Head:128mb 默认
(6)Task Off-Heap:0 默认
确定好上面这些参数后,就是最重要的三个指标的计算了:Task Heap,Network,Managed Memory
(7)Managed Memory:512mb
taskmanager.memory.managed.fraction:0.4
公式:totalFlinkMemorySize * 0.4 = 1280 * 0.4 = 512m
(8) Network:128mb
taskmanager.memory.network.max:infinite
taskmanager.memory.network.min:64 mb
taskmanager.memory.network.fraction: 0.1
公式: [64mb, 1g] 0.1 * totalFlinkMemorySize = 0.1 * 1280 = 128mb
(8) Task Heap:384mb (未设置大小)
公式:totalFlinkMemorySize
- Managed Memory
- Network
- Framework Off-Head
- Framework Head
- Task Off-Heap
= 1280 - 512 - 128 - 128 - 128 - 0 = 384m
案例二:taskHeapMemorySize设置值
参考案例一:Task Heap设置385mb,其余都不变
计算下来
Network:127mb
-
如果 Task Heap + Task Off-Heap + Framework Off-Head + Framework Head + Managed Memory > totalFlinkMemorySize异常
-
Network等于剩余的大小,之后还会check这块内存是否充足
0任务 3slot free
1任务 2slotfree 1parallelism 同时1300条消息进来
案例三:
调优原则,根据程序运行时每块内存的使用情况来调整。
五、checkpoints与故障恢复
Flink的Checkpoint机制是Flink容错能力的基本保证,能够对流处理运行时的状态进行保存,当故障发生时,能够备份的状态中还原。例如,当Flink读取kafka时,将消费的kafka offset保存下来,如果任务失败,可以从上次消费的offset之后重新消费。
使用案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoint的时间间隔,如果状态比较大,可以适当调大该值
env.enableCheckpointing(1000);
// 配置处理语义,默认是exactly-once,确保恢复一致或者幂等性保证
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 两个checkpoint之间的最小时间间隔,防止因checkpoint时间过长,导致checkpoint积压
// 如果启动间隔小于Checkpoint过程时间,就相当于程序一直Checkpoint,保证了安全,消耗了资源,所以看业务场景配置。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoint执行的上限时间,如果超过该阈值,则会中断checkpoint
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 最大并行执行的检查点数量,默认为1,可以指定多个,从而同时出发多个checkpoint,提升效率
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 设定周期性外部检查点,将状态数据持久化到外部系统中,
// 使用该方式不会在任务正常停止的过程中清理掉检查点数据
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置checkpoint的存储位置,默认是存储在内存中,最大5M,不推荐使用
env.getCheckpointConfig().setCheckpointStorage("oss://vis-flink-checkpoints/userflink");
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。
配置文件
execution.checkpointing.interval: 5000
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
execution.checkpointing.timeout: 600000
execution.checkpointing.min-pause: 500
execution.checkpointing.max-concurrent-checkpoints: 1
state.checkpoints.num-retained: 3
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
从checkpoint中恢复
./bin/flink run -s 该应用的checkpoint路径 ...
Task 故障恢复
当 Task 发生故障时,Flink 需要重启出错的 Task 以及其他受到影响的 Task ,以使得作业恢复到正常执行状态。
Flink 通过重启策略和故障恢复策略来控制 Task 重启:重启策略决定是否可以重启以及重启的间隔;故障恢复策略决定哪些 Task 需要重启。
1、Fixed Delay Restart Strategy
restart-strategy.type: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 延时
));
2、Failure Rate Restart Strategy
故障率重启策略在故障发生之后重启作业,但是当故障率(每个时间间隔发生故障的次数)超过设定的限制时,作业会最终失败。 在连续的两次重启尝试之间,重启策略等待一段固定长度的时间。
通过在 flink-conf.yaml
中设置如下配置参数,默认启用此策略。
restart-strategy.type: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个时间间隔的最大故障次数
Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
Time.of(10, TimeUnit.SECONDS) // 延时
));
标签:,opt,Task,flink,taskmanager,checkpoint,docker
From: https://www.cnblogs.com/qiaoxin11/p/17387285.html