Flink检查点常用配置:
//配置检查点 env.enableCheckpointing(180000); // 开启checkpoint 每180000ms 一次 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);// 确认 checkpoints 之间的时间会进行 50000 ms env.getCheckpointConfig().setCheckpointTimeout(600000); //设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//设置有且仅有一次模式 目前支持EXACTLY_ONCE/AT_LEAST_ONCE env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置并发checkpoint的数目 env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink-checkpoints/oracle/AC_SUB_REGIST_INFO"); // 这个是存放到hdfs目录下 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 开启在 job 中止后仍然保留的 externalizedcheckpoints env.getCheckpointConfig().enableUnalignedCheckpoints();// 开启checkpoints
Checkpoint与State的关系
State 是 Checkpoint 所做的主要持久化备份的主要数据,而 Checkpoint 是从 source 触发到下游所有节点完成的一次全局操作。
Flink任务恢复,可以从Checkpoint或者savepoint进行实时任务数据恢复;Checkpoint 的实现算法
-
基于 Chandy-Lamport 算法的分布式快照
-
将检查点的保存和数据处理分开,不暂停整个应用
-
检查点分界线(Checkpoint Barrier)
- Flink 的检查点算法用到了一种为分界线(barrier)的特殊数据形式,用来把一条流上的数据按照不同的检查点分开
- Flink 会定时在任务的 Source Task 触发 barrier,barrier是一种特殊的消息事件,会随着消息通道流入到下游的算子中
- barrier 之前到来的数据导致的状态更改,都会被包含在当前 barrier 所属的检查点中
- barrier 之后的数据导致的所有更改,就会被包含在之后的检查点中
- 在某些算子的 Task 有多个输入时,会存在 Barrier 对齐时间,我们可以在Web UI上面看到各个 Task 的Barrier 对齐时间
- 只有当最后 Sink 端的算子接收到 Barrier 并确认该次 Checkpoint 完成时,该次 Checkpoint 才算完成
从检查点启动示例
从checkpoint启动示例:./bin/flink run -s hdfs://ip:8020/user/xx/chk-35 -c xxx_demo ./xxx.jar
从savepoint启动参考:Flink保留savepoint,并从savepoint启动示例
其他参考: Flink 容错机制 保存点和检查点 Flink Checkpoint 原理流程以及常见失败原因分析 标签:Checkpoint,barrier,Flink,getCheckpointConfig,容错,检查点,env From: https://www.cnblogs.com/-courage/p/17583340.html