首页 > 其他分享 >Flink-checkpoint配置及重启策略

Flink-checkpoint配置及重启策略

时间:2022-09-20 21:36:42浏览次数:59  
标签:重启 Flink getCheckpointConfig checkpoint RestartStrategies env 间隔

Flink-checkpoint配置及重启策略

val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    //--------------- checkpoint配置 ----------------
    env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointStorage("hdfs://localhost:9083/flink/checkpoint")
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 执行checkpoint,JobManager向source发送barrier直到checkpoint生成的最大时间间隔
    env.getCheckpointConfig.setCheckpointTimeout(60000L)
    // 最大并行checkpoint,最多允许出现几个checkpoint
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    /**
      * 两次checkpoint最小间隔时间(尾到头的时间),如果两次CP间隔为1s,第一次cp耗时800ms,
      * 为保证两次最小间隔为500ms,第二次cp需要向后推移300ms
      * 配置了最小间隔,会使setMaxConcurrentCheckpoints配置失效
      */
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    /**
      * 倾向于从checkpoint中进行恢复,即使savepoint数据比checkpoint更近
      * 但是此配置Flink官方已经不推荐使用【https://issues.apache.org/jira/browse/FLINK-20427】
      * 因为使用这种方式会造成数据重复sink,在一些较为严谨的使用场景下,会造成数据异常
      * 不建议使用
      */
    //env.getCheckpointConfig.setPreferCheckpointForRecovery(true)

    /**
      * 最多容忍几次checkpoint失败
      * 如果为0则不容忍任何checkpoint失败,checkpoint失败即任务失败
      */
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(5)

    //--------------- 重启策略 ----------------
    /**
      * RestartStrategies.RestartStrategyConfiguration fixedDelayRestart(restartAttempts: Int, delayBetweenAttempts: Long)
      * restartAttempts:尝试重启次数
      * delayBetweenAttempts:重启间隔
      * RestartStrategies.fixedDelayRestart(3, 60000L): restartAttempts为尝试重启次数,delayBetweenAttempts为重启间隔
      */
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 60000L))

    /**
      * RestartStrategies.FailureRateRestartStrategyConfiguration failureRateRestart(failureRate: Int, failureInterval: Time, delayInterval: Time)
      * failureRate:失败次数
      * failureInterval:失败时间间隔(总的)
      * delayInterval:两次尝试重启的时间间隔
      */
    env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.minutes(5), Time.seconds(10)))

 

标签:重启,Flink,getCheckpointConfig,checkpoint,RestartStrategies,env,间隔
From: https://www.cnblogs.com/EnzoDin/p/16712609.html

相关文章

  • linux用户管理,关机重启
    1.关机重启shoutdown-hnow立即关机shudown-h1一分钟后关机shudown-rnow重启halt关机reboot重启sync把内存的数据同步到磁盘注意:关机和重启首先要......
  • k8s 容器自动重启 错误 代码
    参考文章https://betterprogramming.pub/understanding-docker-container-exit-codes-5ee79a1d58f6ExitCodesCommonexitcodesassociatedwithdockercontainersar......
  • 重启右脑
    一脑一世界精于计算,富于想象;关注细节,望到远景;遵循规则,寻找可能;专注事实,探寻意义;保守控制,开放尝试;喜用逻辑,善借感知;单线分析,看到关联;强于数理,富有审美。......
  • springboot+Flink 接收、处理数据20220919
     1、pom.xml<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot......
  • 设置Resin服务器定时重启(一次性)
    说明:【】中的内容为需替换成你自己环境下的内容1.新建一个bat后缀的文件,例如restart.bat。restart.bat文件内容为【resin.exe服务器所在的全路径】/resin.exerestart......
  • 【Azure 事件中心】Flink消费Event Hub中事件, 使用Azure默认示例代码,始终获取新产生
    问题描述根据AzureEventHub示例文档,[将ApacheFlink与适用于ApacheKafka的Azure事件中心配合使用],配置好 consumer.config文件后,为什么不能自动消费EventHub......
  • Windows fiarwall 启动失败(0x8007042c)重启防火墙
    1.请把下面代码保存为【Repair.bat】2.右键点击【以管理员身份运行】3.重启机器,看看效果4.同时按【Win+R】键,输入【services.msc】,在【服务】里,尝试启动Windows......
  • Flink基础概念入门
    Flink概述什么是Flink    ApacheApacheFlink是一个开源的流处理框架,应用于分布式、高性能、高可用的数据流应用程序。可以处理有限数据流和无限数据,即能够处理......
  • Flink的时间和窗口
    Flink中的时间及时流处理是有状态流处理的扩展,实现及时流处理的时间起到了很大的作用。在Flink的时间概念中主要分为下面两种:事件时间:事件时间是每个单独事件在其生......
  • 第 17 题:A、B 机器正常连接后,B 机器突然重启,问 A 此时处于 TCP 什么状态
    如果A与B建立了正常连接后,从未相互发过数据,这个时候B突然机器重启,问A此时处于TCP什么状态?如何消除服务器程序中的这个状态?  问题定义A->B发起TCP请求,A......