首页 > 其他分享 >1

1

时间:2023-05-10 10:58:22浏览次数:37  
标签: opt Task flink taskmanager checkpoint docker

flink

一、搭建

1、standalone模式

参考: https://blog.51cto.com/u_11409186/5743198#_Toc85293738

1、jobmanager

1.1 先拉取镜像

docker pull flink:latest

1.2 查看端口占用

netstat -anp | grep 8081

1.3 docker-compose.yml脚本创建

​ 先搭建一个简单的容器,没有volumes;

​ 然后在宿主机准备好文件夹,通过docker cp将文件夹先拉出来,不需要拉取log文件。

docker cp flink-jobmanager:/opt/flink/conf /opt/flink-jobmanager/

​ 然后删除容器,docker-compose.yml添加文件映射。

version: "3"
services:
  jobmanager:
    image: flink:1.17.0
    ports:
      - "8088:8081"
      - "6123:6123"
      - "6124:6124"
      - "6125:6125"
      - "50100-50200:50100-50200"
    volumes:
      - /opt/flink-jobmanager/conf:/opt/flink/conf
      - /opt/flink-jobmanager/log:/opt/flink/log
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=192.168.31.56
    container_name: flink-jobmanager
    restart: always

1.4 启动

docker-compose up -d

1.5 进入容器修改权限

修改文件夹所属用户和用户组

chown flink:flink conf

chown flink:flink log

1.6 重启容器

docker restart flink-jobmanager

1.7 修改conf下workes文件,改为taskmanager的ip,集群往下添加

vim workers

image-20230423114925323

1.8 修改flink.conf,添加

metrics.internal.query-service.port: 50100-50130
taskmanager.rpc.port: 50140-50170

1.9 重启

docker restart flink-jobmanager

2、taskmanager

2.1 先拉取镜像

docker pull flink:latest

2.2 docker-compose.yml脚本创建

​ 先搭建一个简单的容器,没有volumes;

​ 然后在宿主机准备好文件夹,通过docker cp将文件夹先拉出来,不需要拉取log文件。

docker cp flink-jobmanager:/opt/flink/conf /opt/flink-jobmanager/

​ 然后删除容器,docker-compose.yml添加文件映射。

version: "3"
services:
  taskmanager:
    image: flink:1.17.0
    network_mode: host
    command: taskmanager
    volumes:
      - /opt/flink-taskmanager/conf:/opt/flink/conf
      - /opt/flink-taskmanager/log:/opt/flink/log
      - /opt/flink-taskmanager/logs:/opt/flink/logs
    environment:
      - JOB_MANAGER_RPC_ADDRESS=192.168.31.56
    container_name: flink-taskmanager01
    restart: always

2.3 启动

docker-compose up -d

2.4 进入容器修改权限

修改文件夹所属用户和用户组

chown flink:flink conf
chown flink:flink log
chown flink:flink logs

2.5 重启容器

2.6 修改flink.conf,添加

metrics.internal.query-service.port: 50100-50130
taskmanager.rpc.port: 50140-50170

2.7 重启容器

二、概述

image-20230427112709627

Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager

遵循 Master - Slave 架构设计原则,JobManager 为 Master 节点,TaskManager 为 Worker (Slave)节点。

  • JobManager 负责整个 Flink 集群任务的调度以及资源的管理,从客户端中获取提交的应用,决定何时调度下一个Task(或一组Task)、对完成的 task 或执行失败做出反应等。

  • TaskManager 负责具体的任务执行和对应任务在每个节点上的资源申请和管理。

三、并行度与slot

在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。

Solt的数量通常与每个TaskManager节点的可用CPU数量成比例,一般情况下Slot的数量就是每个节点的可用CPU数量。

  • 所有的Flink程序都是由三部分组成的: Source 、Transformation 和 Sink。
    Source 负责读取数据源,Transformation 利用各种算子进行处理加工,Sink 负责输出

  • 对于分布式执行,Flink 将算子的 subtasks 链接tasks。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量

image-20230504173245672

案例分析

image-20230504102209450

image-20230504102226852

image-20230504102240641

image-20230504102247950

四、Taskmanager

1、内存分配

image-20230504172558281

2、内存计算2核4G

关键在于资源情况能不能抗住高峰时期每秒的数据量

案例一:全部默认

image-20230427155242893

Total Process Memory总大小:taskmanager.memory.process.size: 1728m 默认

(1)JVM Metaspace:256mb(默认)

(2)JVM Overhead:192mb

	taskmanager.memory.jvm-overhead.max: 1 gb
	taskmanager.memory.jvm-overhead.min: 192 mb
	taskmanager.memory.jvm-overhead.fraction: 0.1
	公式 1728 * 0.1 = 172.8m 必须在[192m, 1g]之间,所以JVM Overhead的大小是192m

(3)Total Flink Memory:1280mb

totalFlinkMemorySize = taskmanager.memory.process.size - JVM Metaspace - JVM Overhead = 1728 - 256 - 192 = 1280mb

(4)Framework Head:128mb 默认

(5)Framework Off-Head:128mb 默认

(6)Task Off-Heap:0 默认

确定好上面这些参数后,就是最重要的三个指标的计算了:Task Heap,Network,Managed Memory

(7)Managed Memory:512mb

taskmanager.memory.managed.fraction:0.4
公式:totalFlinkMemorySize * 0.4 = 1280 * 0.4 = 512m

(8) Network:128mb

taskmanager.memory.network.max:infinite
taskmanager.memory.network.min:64 mb
taskmanager.memory.network.fraction: 0.1
公式: [64mb, 1g] 0.1 * totalFlinkMemorySize = 0.1 * 1280 = 128mb

(8) Task Heap:384mb (未设置大小)

公式:totalFlinkMemorySize 
			- Managed Memory 
			- Network 
			- Framework Off-Head
			- Framework Head 
			- Task Off-Heap
	= 1280 - 512 - 128 - 128 - 128 - 0 = 384m

案例二:taskHeapMemorySize设置值

image-20230427161618825

参考案例一:Task Heap设置385mb,其余都不变

计算下来

Network:127mb

  • 如果 Task Heap + Task Off-Heap + Framework Off-Head + Framework Head + Managed Memory > totalFlinkMemorySize异常

  • Network等于剩余的大小,之后还会check这块内存是否充足

0任务 3slot free

image-20230427174557411

1任务 2slotfree 1parallelism 同时1300条消息进来

image-20230427175653465

image-20230428134527002

image-20230428134615442

image-20230428134656689

image-20230428134733891

案例三:

image-20230428141725412

调优原则,根据程序运行时每块内存的使用情况来调整。

image-20230506110153663

五、checkpoints与故障恢复

Flink的Checkpoint机制是Flink容错能力的基本保证,能够对流处理运行时的状态进行保存,当故障发生时,能够备份的状态中还原。例如,当Flink读取kafka时,将消费的kafka offset保存下来,如果任务失败,可以从上次消费的offset之后重新消费。

使用案例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
 
// checkpoint的时间间隔,如果状态比较大,可以适当调大该值 
env.enableCheckpointing(1000); 
// 配置处理语义,默认是exactly-once,确保恢复一致或者幂等性保证
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
// 两个checkpoint之间的最小时间间隔,防止因checkpoint时间过长,导致checkpoint积压 
// 如果启动间隔小于Checkpoint过程时间,就相当于程序一直Checkpoint,保证了安全,消耗了资源,所以看业务场景配置。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); 
// checkpoint执行的上限时间,如果超过该阈值,则会中断checkpoint 
env.getCheckpointConfig().setCheckpointTimeout(60000); 
// 最大并行执行的检查点数量,默认为1,可以指定多个,从而同时出发多个checkpoint,提升效率 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
// 设定周期性外部检查点,将状态数据持久化到外部系统中, 
// 使用该方式不会在任务正常停止的过程中清理掉检查点数据 
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置checkpoint的存储位置,默认是存储在内存中,最大5M,不推荐使用
env.getCheckpointConfig().setCheckpointStorage("oss://vis-flink-checkpoints/userflink");
  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。

配置文件

execution.checkpointing.interval: 5000
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
execution.checkpointing.timeout: 600000
execution.checkpointing.min-pause: 500
execution.checkpointing.max-concurrent-checkpoints: 1
state.checkpoints.num-retained: 3
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

从checkpoint中恢复

./bin/flink run -s 该应用的checkpoint路径 ...

Task 故障恢复

当 Task 发生故障时,Flink 需要重启出错的 Task 以及其他受到影响的 Task ,以使得作业恢复到正常执行状态。

Flink 通过重启策略和故障恢复策略来控制 Task 重启:重启策略决定是否可以重启以及重启的间隔;故障恢复策略决定哪些 Task 需要重启。

1、Fixed Delay Restart Strategy

restart-strategy.type: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // 尝试重启的次数
  Time.of(10, TimeUnit.SECONDS) // 延时
));

2、Failure Rate Restart Strategy

故障率重启策略在故障发生之后重启作业,但是当故障率(每个时间间隔发生故障的次数)超过设定的限制时,作业会最终失败。 在连续的两次重启尝试之间,重启策略等待一段固定长度的时间。

通过在 flink-conf.yaml 中设置如下配置参数,默认启用此策略。

restart-strategy.type: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // 每个时间间隔的最大故障次数
  Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
  Time.of(10, TimeUnit.SECONDS) // 延时
));

标签:,opt,Task,flink,taskmanager,checkpoint,docker
From: https://www.cnblogs.com/qiaoxin11/p/17387285.html

相关文章