首页 > 其他分享 >从0到1Flink的成长之路(二十)-Flink 高级特性(二)之自动重启策略和恢复 ,固定延迟重启策略(开发中使用)

从0到1Flink的成长之路(二十)-Flink 高级特性(二)之自动重启策略和恢复 ,固定延迟重启策略(开发中使用)

时间:2024-07-03 09:52:52浏览次数:22  
标签:flink 设置 重启 Flink Checkpoint env 1Flink import

从0到1Flink的成长之路(二十)-Flink 高级特性(二)之自动重启策略和恢复,,固定延迟重启策略(开发中使用)

自动重启策略和恢复

1)、重启策略配置方式

配置文件

在flink-conf.yml中可以进行配置,示例如下:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

代码中

可以在代码中针对该任务进行配置,示例如下:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重启次数
Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔
))

2)、重启策略分类

其一、默认重启策略

如果配置Checkpoint,没有配置重启策略,那么代码中出现了非致命错误时,程序会无限重启。

其二、无重启策略

Job直接失败,不会尝试进行重启
设置方式一:
restart-strategy: none
设置方式二:
无重启策略也可以在程序中设置
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

其三、固定延迟重启策略(开发中使用)

设置方式一:
重启策略可以配置flink-conf.yaml的下面配置参数来启用,作为默认的重启策略:
例子:
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
设置方式二:
也可以在程序中设置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 最多重启3次数
Time.of(10, TimeUnit.SECONDS) // 重启时间间隔
))
上面的设置表示:如果job失败,重启3次, 每次间隔10

其四、失败率重启策略(偶尔使用)

设置方式1:
失败率重启策略可以在flink-conf.yaml中设置下面的配置参数来启用:
例子:
restart-strategy:failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate
-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
设置方式2:
失败率重启策略也可以在程序中设置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量时间间隔最大失败次数
Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
Time.of(10, TimeUnit.SECONDS) // 两次连续重启的时间间隔
))
上面的设置表示:如果5分钟内job失败不超过三次,自动重启, 每次间隔10s (如果5分钟内程序失
败超过3次,则程序退出)

3)、代码演示

package xx.xxxxxx.flink.checkpoint; 
import org.apache.flink.api.common.functions.FilterFunction; 
import org.apache.flink.api.common.functions.FlatMapFunction; 
import org.apache.flink.api.common.restartstrategy.RestartStrategies; 
import org.apache.flink.api.common.time.Time; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.runtime.state.filesystem.FsStateBackend; 
import org.apache.flink.streaming.api.CheckpointingMode; 
import org.apache.flink.streaming.api.datastream.DataStreamSource; 
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; 
import org.apache.flink.streaming.api.environment.CheckpointConfig; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.source.SourceFunction; 
import org.apache.flink.util.Collector; 
import java.util.concurrent.TimeUnit; 
/** 
* Flink Checkpoint定时保存状态State,设置应用失败以后,重启策略ReStart Strategy 
*/ 
public class StreamRestartStrategyDemo { 
public static void main(String[] args) throws Exception { 
// 1. 执行环境-env 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
env.setParallelism(2); 
// TODO: ================= 建议必须设置 =================== 
// a. 设置Checkpoint-State的状态后端为FsStateBackend,本地测试时使用本地路径,集群测试时使用传入的HDFS的路径 
if(args.length < 1){ 
env.setStateBackend(new FsStateBackend("file:///D:/datas/ckpt")); 
//env.setStateBackend(new FsStateBackend("hdfs://node1.itcast.cn:8020/flink-checkpoints/checkpoint")); 
}else { 
// 后续集群测试时,传入参数:hdfs://node1.itcast.cn:8020/flink-checkpoints/checkpoint 
env.setStateBackend(new FsStateBackend(args[0])) ; 
} 
/* 
b. 设置Checkpoint时间间隔为1000ms,意思是做 2 个 Checkpoint 的间隔为1000ms。 
Checkpoint 做的越频繁,恢复数据时就越简单,同时 Checkpoint 相应的也会有一些IO消耗。 
*/ 
env.enableCheckpointing(2000) ;// 默认情况下如果不设置时间checkpoint是没有开启的 
/* 
c. 设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms 
为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了 
如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m
*/
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// d. 设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是 false不是
env.getCheckpointConfig().setFailOnCheckpointingErrors(false); // 默认为true
/*
e. 设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值) */
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// ================= 直接使用默认的即可 ===============
// a. 设置checkpoint的执行模式为EXACTLY_ONCE(默认),注意:得需要外部支持,如Source和Sink的支持
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// b. 设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
env.getCheckpointConfig().setCheckpointTimeout(60000);
// c. 设置同一时间有多少个checkpoint可以同时执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 默认为1
//======================配置重启策略==============
// a. 如果设置Checkpoint,而没有配置重启策略,代码中出现了非致命错误时,程序会无限重启
// b. 配置无重启策略
env.setRestartStrategy(RestartStrategies.noRestart()) ;
// c.固定延迟重启策略(开发中使用),如下:如果有异常,每隔5s重启1次,最多3次
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 最多重启3次数
Time.of(5, TimeUnit.SECONDS) // 重启时间间隔
));
// d. 失败率重启策略(开发偶尔使用),如下:5分钟内,最多重启3次,每次间隔10
/*
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量时间间隔最大失败次数
Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
Time.of(10, TimeUnit.SECONDS) // 两次连续重启的时间间隔
))
*/
// 2. 数据源-source
DataStreamSource<String> inputDataStream = env.addSource(new SourceFunction<String>() {
private boolean isRunning = true;
private int counter = 0 ; @Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning){
ctx.collect("flink spark");
TimeUnit.SECONDS.sleep(2);
counter += 1 ;
if(counter % 5 == 0){
throw new RuntimeException("程序程序异常啦.................") ; } }
}@Override
public void cancel() {
isRunning = false ; }
});
// 3. 数据转换-transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
.filter(new FilterFunction<String>() { @Override
public boolean filter(String line) throws Exception {
return null != line && line.trim().length() > 0; }
})
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : line.trim().split("\\W+")) {
out.collect(Tuple2.of(word, 1));
} }
})
.keyBy(0).sum(1);
// 4. 数据终端-sink
resultDataStream.printToErr();
// 5. 触发执行-execute
env.execute(StreamRestartStrategyDemo.class.getSimpleName());
} }

原文链接:https://zhuanlan.zhihu.com/p/386182811

标签:flink,设置,重启,Flink,Checkpoint,env,1Flink,import
From: https://www.cnblogs.com/sunny3158/p/18281033

相关文章

  • 大数据面试题之Flink(1)
    目录Flink架构 Flink的窗口了解哪些,都有什么区别,有哪几种?如何定义? Flink窗口函数,时间语义相关的问题 介绍下Flink的watermark(水位线),watermark需要实现哪个实现类,在何处定义?有什么作用? Flink的窗口(实现)机制 说下Flink的CEP 说一说Flink的Checkpoint机制 ......
  • 大数据面试题之Flink(2)
    Flink中Checkpoint超时原因 Flink的ExactlyOnce语义怎么保证? Flink的端到端ExactlyOnce Flink的水印(Watermark),有哪几种? Flink的时间语义 Flink相比于其它流式处理框架的优点? Flink和Spark的区别?什么情况下使用Flink?有什么优点? FlinkbackPressure反压机......
  • 大数据面试题之Flink(3)
    如何确定Flink任务的合理并行度? Flink任务如何实现端到端一致? Flink如何处理背(反)压? Flink解决数据延迟的问题 Flink消费kafka分区的数据时flink件务并行度之间的关系 使用flink-client消费kafka数据还是使用flink-connector消费 如何动态修改Flink的配置,前提......
  • Ctrl + 空格 快捷键改了,重启又恢复了?
    找到【文本服务和输入语言】,切换【Ctrl+空格】的快捷键为其他,如【Ctrl+空格】当修改了快捷键后,对应的注册表项值也会发生变化(如下图),但重启后又会恢复重点:如果将如下的注册表项值也手动修改为相同的值,则重启后不会恢复......
  • FlinkCDCSQL数据同步mysql->clickhouse
    FlinkCDC(ChangeDataCapture)SQL用于实现数据库的数据变更捕获,并通过SQL接口进行处理。以下是一个基本的示例,全量+增量数据mysql同步到clickhouse,展示如何使用FlinkCDCSQL进行数据同步。首先,确保你有Flink和FlinkCDC的环境配置好。1.mysql测试source表(准备......
  • 54、Flink 测试工具测试 Flink 作业详解
    测试Flink作业a)JUnit规则MiniClusterWithClientResourceApacheFlink提供了一个名为MiniClusterWithClientResource的Junit规则,用于针对本地嵌入式小型集群测试完整的作业。叫做MiniClusterWithClientResource.要使用MiniClusterWithClientResource,需要添加......
  • 53、Flink 测试工具测试用户自定义函数详解
    1.测试用户自定义函数a)单元测试无状态、无时间限制的UDF示例:无状态的MapFunction。publicclassIncrementMapFunctionimplementsMapFunction<Long,Long>{@OverridepublicLongmap(Longrecord)throwsException{returnrecord+1;}......
  • 乌班图Ubuntu 24.04 SSH Server 修改默认端口重启无效
    试用最新的乌班图版本,常规修改ssh端口,修改完毕后重启sshd提示没有找到service,然后尝试去掉d重启ssh后查看状态,端口仍然是默认的22,各种尝试都试了不行,重启服务器后倒是端口修改成功了,心想着不能每台机器都重启吧。百思不得其解后查看官网相关(机翻)意思就是22.10之后的版本使用方......
  • 【Flink metric(3)】chunjun是如何实现脏数据管理的
    文章目录一.基础逻辑二.DirtyManager1.初始化2.收集脏数据并check3.关闭资源三.DirtyDataCollector1.初始化2.收集脏数据并check3.run:消费脏数据4.释放资源四.LogDirtyDataCollector一.基础逻辑脏数据管理模块的基本逻辑是:当数据消费失败时,将脏数据......
  • Ctrl + 空格 快捷键改了,重启又恢复了?
    ​​找到【文本服务和输入语言】,切换【Ctrl+空格】的快捷键为其他,如【Ctrl+空格】当修改了快捷键后,对应的注册表项值也会发生变化(如下图),但重启后又会恢复 重点:如果将如下的注册表项值也手动修改为相同的值,则重启后不会恢复 ......