首页 > 其他分享 >Flink(十一): DataStream API (八) Checkpointing

Flink(十一): DataStream API (八) Checkpointing

时间:2025-01-23 17:03:16浏览次数:3  
标签:DataStream 快照 barrier Flink checkpoint API 检查点 算子 对齐

1. Checkpointing

Flink 容错机制的核心部分是对分布式数据流和操作符状态绘制一致的快照。这些快照作为一致的检查点,系统可以在发生故障时回退到这些检查点。Flink 绘制这些快照的机制在分布式数据流的轻量级异步快照中有详细描述。该机制受标准的 Chandy-Lamport 算法启发,专门针对 Flink 的执行模型进行了定制。请记住,与检查点相关的所有操作都可以异步进行。检查点barriers不会同步传输,操作可以异步地快照其状态。自 Flink 1.11 版本起,检查点可以选择是否进行对齐。

1.1 Barriers

Flink 分布式快照的核心元素是barrier。这些barrier被注入到数据流中,并与记录一起流动,作为数据流的一部分。屏障永远不会超过记录,它们严格按顺序流动。一个barrier将数据流中的记录分为两部分:一部分进入当前快照,另一部分进入下一个快照。每个barrier携带其前面推动的记录所属的快照 ID。barrier不会中断数据流的传输,因此非常轻量。来自不同快照的多个barrier可以同时存在于数据流中,这意味着多个快照可以并发发生。

barrier被注入到并行数据流的流源中。快照 n 的屏障注入点(我们称之为 Sn)是源流中快照覆盖的数据的终止位置。例如,在 Apache Kafka 中,这个位置将是分区中最后一条记录的偏移量。这个位置 Sn 会被报告给检查点协调器(Flink 的 JobManager)。

这些barrier随后向下游传播。当一个中间算子从它的所有输入流中接收到快照 n 的屏障后,它会将快照 n 的屏障发送到所有的输出流中。一旦一个sink算子(流处理 DAG 的终点)从它的所有输入流中接收到快照 n 的barrier,它会向检查点协调器确认快照 n。当所有的sink算子都确认了一个快照时,该快照被认为已完成。

一旦快照 n 完成,作业将不会再向源请求快照 n 之前的记录,因为此时这些记录(以及它们的派生记录)已经通过了整个数据流。

接收多个输入流的算子需要在快照屏障上对输入流进行对齐。上图说明了这一点:一旦算子从某个输入流接收到快照 n 的屏障,它就不能再处理该流中的任何后续记录,直到它也从其他输入流接收到快照 n 的屏障。否则,它会混合属于快照 n 的记录和属于快照 n+1 的记录。

当最后一个输入流接收到快照 n 的屏障时,算子会将所有待处理的输出记录发送出去,然后自己也发送快照 n 的屏障。接下来,算子会快照其状态,并恢复处理所有输入流中的记录,优先处理输入缓冲区中的记录,然后才处理来自流中的记录。最后,算子会异步将状态写入状态后端。

需要注意的是,所有具有多个输入的算子以及在洗牌后消费多个上游子任务输出流的算子都需要进行对齐。

1.2 Snapshotting Operator State

当算子包含任何形式的状态时,这些状态也必须包含在快照中。算子会在收到来自输入流的所有快照barrier后,并在将barrier发送到输出流之前,快照其状态。在这个时刻,所有来自barrier之前记录的状态更新都已经完成,并且没有依赖于barrier之后记录的更新。由于快照的状态可能很大,它会存储在一个可配置的状态后端中。默认情况下,这是 JobManager 的内存,但在生产环境中应该配置分布式可靠存储(如 HDFS)。状态存储完成后,算子确认检查点,发送快照屏障到输出流,并继续处理。

生成的快照现在包含以下内容:

  • 对于每个并行流数据源,快照开始时该流中的偏移量/位置
  • 对于每个算子,指向作为快照一部分存储的状态的指针

1.3 Recovery

在这种机制下,恢复过程非常简单:发生故障时,Flink 会选择最近完成的检查点 k。系统随后重新部署整个分布式数据流,并将检查点 k 的状态分配给每个算子。数据源则会从位置 Sk开始读取流。例如,在 Apache Kafka 中,这意味着通知消费者从偏移量 Sk开始获取数据。如果状态是以增量方式快照的,算子会从最新的完整快照状态开始,并对该状态应用一系列增量快照更新。

2. Unaligned Checkpointing

检查点也可以以非对齐的方式执行。其基本思想是,只要处理中的数据(in-flight data)成为算子状态的一部分,检查点就可以越过所有的处理数据。需要注意的是,这种方法实际上更接近于 Chandy-Lamport 算法,但 Flink 仍然会在数据源中插入barrier,以避免检查点协调器的过载。

下图描述了算子如何处理非对齐检查点的屏障:

  • 算子对存储在其输入缓冲区中的第一个barrier作出反应。
  • 它会立即将barrier转发到下游算子,在输出缓冲区的末尾添加该barrier
  • 算子标记所有被越过的记录(overtaken records)以异步存储,并对其自身状态创建快照。
  • 因此,算子只会短暂地暂停输入处理,用于标记缓冲区、转发barrier,以及创建其其他状态的快照。

非对齐检查点能够确保barrier尽可能快地到达sink。这特别适用于存在至少一条缓慢数据路径的应用场景,在这些场景中,常规对齐的时间可能需要几个小时。然而,由于这种方法增加了额外的 I/O 压力,当状态后端的 I/O 是瓶颈时,它并不能提供帮助。需要注意的是,保存点savepoints始终是对齐的

2.1 Unaligned Recovery

在非对齐检查点中,算子会在开始处理任何来自上游算子的输入数据之前,优先恢复处理中的数据(in-flight data)。除此之外,其恢复过程与对齐检查点的恢复步骤相同。

3. Savepoints

所有使用检查点的程序都可以从保存点(savepoint)恢复执行。保存点允许在不丢失任何状态的情况下更新程序或 Flink 集群。保存点是手动触发的检查点,会对程序进行快照并将其写入状态后端。保存点依赖于常规的检查点机制来实现。保存点与检查点类似,不同之处在于它们由用户触发,并且在较新的检查点完成后不会自动过期。为了正确使用保存点,

4. Exactly Once vs. At Least Once

对齐步骤可能会为流式程序增加延迟。通常,这种额外延迟只有几毫秒,但我们也遇到过某些异常情况下延迟明显增加的情况。对于要求所有记录都具备超低延迟(几毫秒)的应用,Flink 提供了一个选项,可以在检查点期间跳过流对齐。检查点快照仍会在算子从每个输入接收到检查点屏障后立即生成。当跳过对齐时,即使某些检查点 n 的屏障已经到达,算子仍会继续处理所有输入流。这样,算子在为检查点 n生成状态快照之前,也会处理属于检查点 n+1的数据。在恢复时,这些记录可能会出现重复,因为它们既包含在检查点 n的状态快照中,又会作为检查点 n之后的数据重新播放。对齐仅发生在具有多个前驱(如 join 操作)的算子以及具有多个发送者(例如流重分区/洗牌后)的算子中。因此,仅包含完全并行流式操作(如 map()flatMap()filter() 等)的数据流,即使在至少一次(at-least-once)模式下,也可以实现精确一次(exactly-once)的保证。

5. State and Fault Tolerance in Batch Programs

BATCH 执行模式中,Flink 将批处理程序作为流处理程序的一种特殊情况来执行,此时流是有界的(即元素数量有限)。因此,上述概念同样适用于批处理程序,就像它们适用于流处理程序一样,但有一些小的例外:

  1. 批处理程序的容错机制不使用检查点(checkpointing)。恢复是通过完全重放流来实现的。这是可能的,因为输入是有界的。这样做的好处是将成本更多地转移到恢复阶段,同时降低了常规处理的成本,因为避免了创建检查点的开销。

  2. 在批处理执行模式下,状态后端使用的是简化的内存/外存数据结构,而不是键/值索引。

6. 开启与配置 Checkpoint

默认情况下 checkpoint 是禁用的。通过调用 

StreamExecutionEnvironment 的 enableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。

Checkpoint 其他的属性包括:

  • Checkpoint 存储: 你可以设置检查点快照的持久化位置。默认情况下,Flink将使用JobManager的堆。建议在生产部署中改为使用持久性文件系统。 有关作业范围和集群范围配置的可用选项的更多详细信息,请参阅Checkpoint 存储
  • 精确一次(exactly-once)对比至少一次(at-least-once):你可以选择向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入一个模式来选择使用两种保证等级中的哪一种。 对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。
  • checkpoint 超时:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。
  • checkpoints 之间的最小时间:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。
  • 往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints 之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。注意这个值也意味着并发 checkpoint 的数目是
  • checkpoint 可容忍连续失败次数:该属性定义可容忍多少次连续的 checkpoint 失败。超过这个阈值之后会触发作业错误 fail over。 默认次数为“0”,这意味着不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over。 可容忍的checkpoint失败仅适用于下列情形:Job Manager的IOException,TaskManager做checkpoint时异步部分的失败, checkpoint超时等。TaskManager做checkpoint时同步部分的失败会直接触发作业fail over。其它的checkpoint失败(如一个checkpoint被另一个checkpoint包含)会被忽略掉。
  • 并发 checkpoint 的数目: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。该选项不能和 “checkpoints 间的最小时间"同时使用。
  • externalized checkpoints: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。更多的细节请看 保留 checkpoints 的部署官方文档
  • 非对齐 checkpoints: 你可以启用非对齐 checkpoints 以在背压时大大减少创建checkpoint的时间。这仅适用于精确一次(exactly-once)checkpoints 并且只有一个并发检查点。
  • 部分任务结束的 checkpoints: 默认情况下,即使DAG的部分已经处理完它们的所有记录,Flink也会继续执行 checkpoints。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);

// 高级选项:

// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointRetention(
        ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);

// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();

6.1 任务结束前等待最后一次 Checkpoint

为了保证使用两阶段提交的算子可以提交所有的数据,任务会在所有算子都调用 finish() 方法后等待下一次 checkpoint 成功后退出。 需要注意的是,这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显。 极端情况下,如果 checkpoint 的周期被设置为 Long.MAX_VALUE,那么任务永远不会结束,因为下一次 checkpoint 不会进行。

标签:DataStream,快照,barrier,Flink,checkpoint,API,检查点,算子,对齐
From: https://blog.csdn.net/weixin_41914554/article/details/145244013

相关文章

  • pyinstaller package fastapi application with Gunicorn
    使用Gunicorn部署FastAPI应用程序:快速而强大的组合https://juejin.cn/post/7348003004123463717本地部署本地开发调试过程中,我通常是这样启动Fastapi服务的在终端中运行:uvicornmain:app--host0.0.0.0--port80当然,也可以python脚本启动:importuvicorn​uvi......
  • APISIX-API服务网关
    一、简介apisix是一款云原生微服务API网关,可以为API提供终极性能、安全性、开源和可扩展的平台。apisix基于Nginx和etcd实现,与传统API网关相比,apisix具有动态路由和插件热加载,特别适合微服务系统下的API管理。Apisix的诞生主要是为了是解决Nginx的动态配置问题以及网关功能扩......
  • Apache Flink 2.0介绍与部署(最新版本)
    软件概述ApacheFlink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。在德语中,Flink一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为logo,这不仅是因为松鼠具有快速和灵巧......
  • 国税发票查验-电子发票真伪查验API查验结果示例
    现如今,随着发票在业务场景中的广泛使用与电子发票的不断普及,企业面临的挑战之一是如何确保财务数据的高效与准确。在面对大量的财务发票时,通过传统手动录入与核对发票真伪的方法让财务工作者头疼不已,熬夜加班是财务的常态,且手动核验的方式极易出现人为误差,导致虚假发票进入......
  • C# WebAPI 插件热插拔
    背景WebAPI插件热插拔是指在不重启应用程序的情况下,能够动态地加载、更新或卸载功能模块(即插件)的能力。这种设计模式在软件开发中非常有用,尤其是在需要频繁更新或扩展功能的大型系统中。通过实现插件架构,可以将系统的不同部分解耦,使得它们可以独立开发、测试和部署。对于WebAPI......
  • docker-py:在Python中轻松使用Docker引擎API,更加灵活地管理和使用容器性
    Docker是一种流行的容器技术,让开发者能够在各种环境中快速地构建、部署和管理应用程序。而docker-py是一个强大的Python库,可以让你通过Python代码与Docker引擎API进行互动,实现与Docker命令相同的功能。本文将详细介绍docker-py的安装、使用以及一些常见的操作示例,帮助你更好地利用......
  • 【Java开发】magic-api:一个Java接口快速开发框架
    今天给小伙伴们介绍一个Java接口快速开发框架-magic-api简介magic-api是一个基于Java的接口快速开发框架,编写接口将通过magic-api提供的UI界面完成,自动映射为HTTP接口,无需定义Controller、Service、Dao、Mapper、XML、VO等Java对象即可完成常见的HTTPAPI接口开......
  • 混元API的加密机制与原生集成实战
    今天,我们将重点讨论在对接混元大模型时需要特别关注的几个要点。首先,最为关键的一点是,混元大模型的加密方式相比于其他大模型更为复杂和严密。在对接过程中,我们通常避免使用混元官方提供的SDK进行集成,主要是因为官方SDK的应用场景存在一定的限制。若能实现原生对接,将能够提供更加......
  • asyncAPI
    async.cu#include<stdio.h>#include<cuda_runtime.h>#include<cuda_profiler_api.h>template<typenameT>voidcheck(Tresult,charconst*constfunc,constchar*constfile,intconstline){if(result){......
  • API 设计之禅
    API设计之禅译者按:本文翻译自HowtodesignagoodAPIandwhyitmatters。根据笔者经历,很多大厂程序员所写的代码和大厂内部封装的各种中间件、类库,毫不客气地说,90%都是没有经过仔细考虑的,经常有各种各样的性能、拓展、可读性、一致性等问题。本文总结深刻,建议反复阅读学习......