1. Checkpointing
Flink 容错机制的核心部分是对分布式数据流和操作符状态绘制一致的快照。这些快照作为一致的检查点,系统可以在发生故障时回退到这些检查点。Flink 绘制这些快照的机制在分布式数据流的轻量级异步快照中有详细描述。该机制受标准的 Chandy-Lamport 算法启发,专门针对 Flink 的执行模型进行了定制。请记住,与检查点相关的所有操作都可以异步进行。检查点barriers不会同步传输,操作可以异步地快照其状态。自 Flink 1.11 版本起,检查点可以选择是否进行对齐。
1.1 Barriers
Flink 分布式快照的核心元素是barrier。这些barrier被注入到数据流中,并与记录一起流动,作为数据流的一部分。屏障永远不会超过记录,它们严格按顺序流动。一个barrier将数据流中的记录分为两部分:一部分进入当前快照,另一部分进入下一个快照。每个barrier携带其前面推动的记录所属的快照 ID。barrier不会中断数据流的传输,因此非常轻量。来自不同快照的多个barrier可以同时存在于数据流中,这意味着多个快照可以并发发生。
barrier被注入到并行数据流的流源中。快照 n 的屏障注入点(我们称之为 Sn)是源流中快照覆盖的数据的终止位置。例如,在 Apache Kafka 中,这个位置将是分区中最后一条记录的偏移量。这个位置 Sn 会被报告给检查点协调器(Flink 的 JobManager)。
这些barrier随后向下游传播。当一个中间算子从它的所有输入流中接收到快照 n 的屏障后,它会将快照 n 的屏障发送到所有的输出流中。一旦一个sink算子(流处理 DAG 的终点)从它的所有输入流中接收到快照 n 的barrier,它会向检查点协调器确认快照 n。当所有的sink算子都确认了一个快照时,该快照被认为已完成。
一旦快照 n 完成,作业将不会再向源请求快照 n 之前的记录,因为此时这些记录(以及它们的派生记录)已经通过了整个数据流。
接收多个输入流的算子需要在快照屏障上对输入流进行对齐。上图说明了这一点:一旦算子从某个输入流接收到快照 n 的屏障,它就不能再处理该流中的任何后续记录,直到它也从其他输入流接收到快照 n 的屏障。否则,它会混合属于快照 n 的记录和属于快照 n+1 的记录。
当最后一个输入流接收到快照 n 的屏障时,算子会将所有待处理的输出记录发送出去,然后自己也发送快照 n 的屏障。接下来,算子会快照其状态,并恢复处理所有输入流中的记录,优先处理输入缓冲区中的记录,然后才处理来自流中的记录。最后,算子会异步将状态写入状态后端。
需要注意的是,所有具有多个输入的算子以及在洗牌后消费多个上游子任务输出流的算子都需要进行对齐。
1.2 Snapshotting Operator State
当算子包含任何形式的状态时,这些状态也必须包含在快照中。算子会在收到来自输入流的所有快照barrier后,并在将barrier发送到输出流之前,快照其状态。在这个时刻,所有来自barrier之前记录的状态更新都已经完成,并且没有依赖于barrier之后记录的更新。由于快照的状态可能很大,它会存储在一个可配置的状态后端中。默认情况下,这是 JobManager 的内存,但在生产环境中应该配置分布式可靠存储(如 HDFS)。状态存储完成后,算子确认检查点,发送快照屏障到输出流,并继续处理。
生成的快照现在包含以下内容:
- 对于每个并行流数据源,快照开始时该流中的偏移量/位置
- 对于每个算子,指向作为快照一部分存储的状态的指针
1.3 Recovery
在这种机制下,恢复过程非常简单:发生故障时,Flink 会选择最近完成的检查点 k。系统随后重新部署整个分布式数据流,并将检查点 k 的状态分配给每个算子。数据源则会从位置 Sk开始读取流。例如,在 Apache Kafka 中,这意味着通知消费者从偏移量 Sk开始获取数据。如果状态是以增量方式快照的,算子会从最新的完整快照状态开始,并对该状态应用一系列增量快照更新。
2. Unaligned Checkpointing
检查点也可以以非对齐的方式执行。其基本思想是,只要处理中的数据(in-flight data)成为算子状态的一部分,检查点就可以越过所有的处理数据。需要注意的是,这种方法实际上更接近于 Chandy-Lamport 算法,但 Flink 仍然会在数据源中插入barrier,以避免检查点协调器的过载。
下图描述了算子如何处理非对齐检查点的屏障:
- 算子对存储在其输入缓冲区中的第一个barrier作出反应。
- 它会立即将barrier转发到下游算子,在输出缓冲区的末尾添加该barrier。
- 算子标记所有被越过的记录(overtaken records)以异步存储,并对其自身状态创建快照。
- 因此,算子只会短暂地暂停输入处理,用于标记缓冲区、转发barrier,以及创建其其他状态的快照。
非对齐检查点能够确保barrier尽可能快地到达sink。这特别适用于存在至少一条缓慢数据路径的应用场景,在这些场景中,常规对齐的时间可能需要几个小时。然而,由于这种方法增加了额外的 I/O 压力,当状态后端的 I/O 是瓶颈时,它并不能提供帮助。需要注意的是,保存点savepoints始终是对齐的。
2.1 Unaligned Recovery
在非对齐检查点中,算子会在开始处理任何来自上游算子的输入数据之前,优先恢复处理中的数据(in-flight data)。除此之外,其恢复过程与对齐检查点的恢复步骤相同。
3. Savepoints
所有使用检查点的程序都可以从保存点(savepoint)恢复执行。保存点允许在不丢失任何状态的情况下更新程序或 Flink 集群。保存点是手动触发的检查点,会对程序进行快照并将其写入状态后端。保存点依赖于常规的检查点机制来实现。保存点与检查点类似,不同之处在于它们由用户触发,并且在较新的检查点完成后不会自动过期。为了正确使用保存点,
4. Exactly Once vs. At Least Once
对齐步骤可能会为流式程序增加延迟。通常,这种额外延迟只有几毫秒,但我们也遇到过某些异常情况下延迟明显增加的情况。对于要求所有记录都具备超低延迟(几毫秒)的应用,Flink 提供了一个选项,可以在检查点期间跳过流对齐。检查点快照仍会在算子从每个输入接收到检查点屏障后立即生成。当跳过对齐时,即使某些检查点 n 的屏障已经到达,算子仍会继续处理所有输入流。这样,算子在为检查点 n生成状态快照之前,也会处理属于检查点 n+1的数据。在恢复时,这些记录可能会出现重复,因为它们既包含在检查点 n的状态快照中,又会作为检查点 n之后的数据重新播放。对齐仅发生在具有多个前驱(如 join 操作)的算子以及具有多个发送者(例如流重分区/洗牌后)的算子中。因此,仅包含完全并行流式操作(如 map()
、flatMap()
、filter()
等)的数据流,即使在至少一次(at-least-once)模式下,也可以实现精确一次(exactly-once)的保证。
5. State and Fault Tolerance in Batch Programs
在 BATCH 执行模式中,Flink 将批处理程序作为流处理程序的一种特殊情况来执行,此时流是有界的(即元素数量有限)。因此,上述概念同样适用于批处理程序,就像它们适用于流处理程序一样,但有一些小的例外:
-
批处理程序的容错机制不使用检查点(checkpointing)。恢复是通过完全重放流来实现的。这是可能的,因为输入是有界的。这样做的好处是将成本更多地转移到恢复阶段,同时降低了常规处理的成本,因为避免了创建检查点的开销。
-
在批处理执行模式下,状态后端使用的是简化的内存/外存数据结构,而不是键/值索引。
6. 开启与配置 Checkpoint
默认情况下 checkpoint 是禁用的。通过调用
StreamExecutionEnvironment
的 enableCheckpointing(n)
来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。
Checkpoint 其他的属性包括:
- Checkpoint 存储: 你可以设置检查点快照的持久化位置。默认情况下,Flink将使用JobManager的堆。建议在生产部署中改为使用持久性文件系统。 有关作业范围和集群范围配置的可用选项的更多详细信息,请参阅Checkpoint 存储。
- 精确一次(exactly-once)对比至少一次(at-least-once):你可以选择向
enableCheckpointing(long interval, CheckpointingMode mode)
方法中传入一个模式来选择使用两种保证等级中的哪一种。 对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。 - checkpoint 超时:如果 checkpoint 执行的时间超过了该配置的阈值,还在进行中的 checkpoint 操作就会被抛弃。
- checkpoints 之间的最小时间:该属性定义在 checkpoint 之间需要多久的时间,以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000, 无论 checkpoint 持续时间与间隔是多久,在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。
- 往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints 之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。注意这个值也意味着并发 checkpoint 的数目是一。
- checkpoint 可容忍连续失败次数:该属性定义可容忍多少次连续的 checkpoint 失败。超过这个阈值之后会触发作业错误 fail over。 默认次数为“0”,这意味着不容忍 checkpoint 失败,作业将在第一次 checkpoint 失败时fail over。 可容忍的checkpoint失败仅适用于下列情形:Job Manager的IOException,TaskManager做checkpoint时异步部分的失败, checkpoint超时等。TaskManager做checkpoint时同步部分的失败会直接触发作业fail over。其它的checkpoint失败(如一个checkpoint被另一个checkpoint包含)会被忽略掉。
- 并发 checkpoint 的数目: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。该选项不能和 “checkpoints 间的最小时间"同时使用。
- externalized checkpoints: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。 这种方式下,如果你的 job 失败,你将会有一个现有的 checkpoint 去恢复。更多的细节请看 保留 checkpoints 的部署官方文档。
- 非对齐 checkpoints: 你可以启用非对齐 checkpoints 以在背压时大大减少创建checkpoint的时间。这仅适用于精确一次(exactly-once)checkpoints 并且只有一个并发检查点。
- 部分任务结束的 checkpoints: 默认情况下,即使DAG的部分已经处理完它们的所有记录,Flink也会继续执行 checkpoints。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointRetention(
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
6.1 任务结束前等待最后一次 Checkpoint
为了保证使用两阶段提交的算子可以提交所有的数据,任务会在所有算子都调用 finish()
方法后等待下一次 checkpoint 成功后退出。 需要注意的是,这一行为可能会延长任务运行的时间,如果 checkpoint 周期比较大,这一延迟会非常明显。 极端情况下,如果 checkpoint 的周期被设置为 Long.MAX_VALUE
,那么任务永远不会结束,因为下一次 checkpoint 不会进行。