对齐检查点 (Aligned Checkpoint)
Flink 的分布式快照机制受到 Chandy-Lamport 算法的启发。 其核心元素是数据流中的屏障(Barrier)。
Barrier 注入 :JobManager 中的 Checkpoint Coordinator 指示 Source 任务开始 Checkpoint。Source 任务在数据流中注入 Barrier。这些 Barrier 携带 Checkpoint ID,将数据流分割成属于本次快照的记录和属于下次快照的记录。
Barrier 对齐 :当一个算子(Operator)有多个输入流时,它必须等待所有输入流的 Barrier 都到达后,才会进行自己的状态快照,并向下游广播 Barrier。 在对齐过程中,已经接收到 Barrier 的输入通道会被阻塞,算子会继续处理来自尚未接收到 Barrier 的通道的数据。
状态快照 :一旦所有 Barrier 到达,算子就进行本地状态快照,并异步上传到持久化存储。
Checkpoint 完成 :当所有 Sink 任务接收到所有输入流的 Barrier 并完成自己的快照后,会通知 JobManager,该 Checkpoint 完成。
对齐检查点的局限性 (尤其在反压情况下)
在应用产生反压时,对齐检查点会面临以下问题:
Barrier 流动缓慢 :由于反压,Buffer 中缓存了大量数据,导致 Barrier 在数据流中流动缓慢。
处理阻塞 :对于已经接收到 Barrier 的 Channel,由于需要等待其他 Channel 的 Barrier 进行对齐,其上游数据处理会被阻塞。
Checkpoint 完成时间长 :Barrier 可能需要很长时间才能到达 Sink,导致 Checkpoint 完成时间过长。一个耗时过长的 Checkpoint 在完成时可能已经“过时”了。
恶性循环 :长时间的 Checkpoint 可能导致任务超时、崩溃,然后从一个较旧的 Checkpoint 恢复,这可能加剧反压,形成恶性循环,使得任务几乎没有进展。
非对齐检查点 (Unaligned Checkpoint) 的工作原理
为了解决上述问题,Flink 引入了非对齐检查点(FLIP-76)。其核心思想是取消中间算子的 Barrier 对齐过程。
Barrier 注入 :与对齐检查点类似,Barrier 仍然在 Source 端注入。
无需对齐,立即转发 :中间算子在接收到任何一个输入流的 Barrier 后,不再等待其他输入流的 Barrier。它会:
短暂阻塞 任务。
标记 Buffer :记录当前 Buffer 中的数据(这些数据属于当前 Checkpoint)。
转发 Barrier :立即将 Barrier 向下游算子转发。
创建状态快照 :进行本地状态快照,这个快照包含了算子自身的状态以及在其输入 Buffer 中、尚未被处理但在 Barrier 之前到达的数据(即所谓的“in-flight data”)。
快速到达 Sink :由于 Barrier 不再需要等待对齐,它们可以非常快速地传递到 Sink。
Sink 端对齐(可选) :在某些描述中,提到非对齐检查点只在 Sink 端进行对齐,而中间算子则不进行对齐。
怎么保证 Barrier 不会超过之前的数据,和不会被之后的数据超过?
这是通过 Flink 数据传输和处理的有序性来保证的,主要依赖以下几点:
发送端的有序性:
- 当一个上游算子处理完一批数据后,如果接下来需要发送 Checkpoint Barrier N,它会确保先将这批数据发送出去,然后再发送 Barrier N。之后产生的数据,则会在 Barrier N 之后发送。
- 这意味着在逻辑上,数据记录和 Barrier 在发送端是被串行化处理和发送的。
Flink 网络栈的有序性保证 (Channel 级别):
- Flink 的 TaskManager 内部(对于 chained operators)的数据传输通道(
InputChannel
和ResultSubpartition
)被设计为先进先出 (FIFO) 的。 - 无论是普通的数据记录还是特殊的 Checkpoint Barrier,一旦被写入一个输出通道,它们就会按照写入的顺序被下游的输入通道读取。
- 对于跨 TaskManager 的网络传输,Flink 通常依赖 TCP/IP。在单个 TCP 连接内,TCP 协议本身保证了数据包的有序传输。 Flink 会为有数据交互的 Task Subtask 之间建立逻辑连接,这些连接底层可能复用物理 TCP 连接,但 Flink 的网络层会确保逻辑上的数据顺序。
- Flink 的 TaskManager 内部(对于 chained operators)的数据传输通道(
算子处理的有序性:
- 算子从其输入通道读取数据时,也是按顺序读取的。它不会跳过前面的数据去处理后面的数据(在一个通道内)。
总结一下保证顺序的关键:
- 上游按序发送:数据记录和 Barrier 在源头就是按逻辑顺序发送的。
- 通道保证 FIFO:Flink 的数据传输通道(无论是内存中的还是跨网络的)保证了消息的先进先出。
- 下游按序接收和处理:下游算子按顺序从通道中读取数据。
当然,如果一个算子有多个输入通道(例如来自不同的上游算子,或者同一个上游算子的不同并行实例),那么不同通道上的 Barrier N 到达时间确实可能因为网络延迟、处理速度等原因而不同。这正是 Barrier Alignment (屏障对齐) 机制要解决的问题:算子会等待所有输入通道的 Barrier N 都到达后,才进行自己的状态快照,以确保快照的一致性。
PipelinedSubpartition
PipelinedSubpartition
使用 PrioritizedDeque
使得 Flink 能够在保证普通数据流的 FIFO 特性的基础上,有效地处理需要优先响应的特殊事件。这比单纯的 ArrayDeque
提供了更灵活的控制流管理。
PrioritizedDeque
是一个很有意思的数据结构,它结合了队列和优先级处理的能力:
- 基本行为: 它仍然是一个双端队列 (Deque),可以从头部和尾部添加或移除元素。
- 优先级处理:
- 当一个具有优先级的
BufferConsumer
(例如,一个非对齐检查点的CheckpointBarrier
,或者其他一些控制事件)被添加到PipelinedSubpartition
时,它通常会被放入PrioritizedDeque
的队首(通过类似addPriorityElement
的方法,具体实现在PrioritizedDeque
内部)。 - 普通的、非优先级的
BufferConsumer
(即大部分数据 Buffer)则会被添加到PrioritizedDeque
的队尾(通过buffers.add(...)
,这通常是 Deque 的标准addLast
行为)。
- 当一个具有优先级的
- 数据消费:
- 当数据被消费时(例如通过
pollBuffer()
方法),元素总是从PrioritizedDeque
的队首被取出。
- 当数据被消费时(例如通过
这对 FIFO 意味着什么?
- 对于普通数据流: 如果没有优先事件插入,普通的数据 Buffer 遵循严格的 FIFO 顺序。它们被加入队尾,然后按顺序从队首被消费。
- 当优先事件发生时: 如果一个优先事件(比如一个需要立即处理的 Barrier)到达,它会被插入到队首。这意味着它会在任何已在队列中但尚未被消费的普通数据 Buffer 之前被处理。这对于确保像非对齐 Checkpoint Barrier 这样的控制信令能够及时响应是至关重要的。一旦所有优先元素被处理完毕,队列会继续从队首消费那些之前按 FIFO 顺序排列的普通数据 Buffer。
Flink 在恢复时怎么知道快照包含的“物理部分”是什么
主要是通过以下信息和机制:
算子标识 (Operator ID / UID):
- 在 Flink 作业中,每个有状态的算子(Operator)都有一个唯一的标识符。这个标识符可以是 Flink 自动生成的,也可以是用户通过
uid(String)
方法指定的。 - 当进行 Checkpoint 时,每个算子产生的状态都会与这个唯一的算子标识符关联起来并存储。
- 在 Flink 作业中,每个有状态的算子(Operator)都有一个唯一的标识符。这个标识符可以是 Flink 自动生成的,也可以是用户通过
子任务索引 (Subtask Index):
- Flink 作业中的算子通常会以一定的并行度执行。每个并行实例被称为一个子任务(Subtask),并且它们都有一个从 0 到
parallelism-1
的索引。 - Checkpoint 快照会分别记录下属于每一个算子的每一个子任务的状态。你当前正在查看的
AcknowledgeCheckpoint.java
文件中的TaskStateSnapshot subtaskState
字段,就代表了一个特定子任务的状态快照。
- Flink 作业中的算子通常会以一定的并行度执行。每个并行实例被称为一个子任务(Subtask),并且它们都有一个从 0 到
键组 (Key Groups) - 针对 Keyed State:
- 对于 Keyed State(例如,在
keyBy
之后使用的ValueState
,MapState
等),数据是根据 key 的哈希值被划分到逻辑的“键组”(Key Groups)中的。 - 一个作业的最大并行度(
maxParallelism
)决定了总共有多少个键组。 - 在 Checkpoint 时,每个键组的状态都会被保存下来。
- 在恢复时,每个(可能新的)子任务会被确定性地分配一组它需要负责的键组。这个分配算法保证了无论并行度如何变化(在
maxParallelism
范围内),每个 key 始终属于同一个键组,并且每个键组始终会被分配给一个确定的子任务。因此,子任务可以准确地知道应该从快照中加载哪些键组的状态。
- 对于 Keyed State(例如,在
快照元数据 (Checkpoint Metadata):
- 当一个 Checkpoint 成功完成后,JobManager 会将关于这个 Checkpoint 的所有元数据信息持久化。这些元数据通常包含:
- Checkpoint ID。
- 每个算子(通过其唯一标识符识别)的状态信息。
- 对于每个算子,其每个子任务(通过其索引识别)的状态句柄(State Handle),这些句柄指向实际存储状态数据的位置(例如 HDFS 上的文件路径)。
- 对于 Keyed State,会记录每个键组范围的状态句柄。
- 当一个 Checkpoint 成功完成后,JobManager 会将关于这个 Checkpoint 的所有元数据信息持久化。这些元数据通常包含:
恢复过程如何利用这些信息:
当 Flink 作业从一个 Checkpoint 恢复时:
- JobManager 首先从持久化存储中读取选定的 Checkpoint 的元数据。
- JobManager 根据当前的作业拓扑和每个算子的并行度,为每个新启动的 Task(即算子的子任务实例)分配任务。
- 对于每一个需要恢复状态的 Task:
- JobManager 会在其元数据中查找与该 Task 对应的算子标识符和子任务索引(或者它负责的键组范围)。
- 通过这些信息,JobManager 可以定位到该 Task 在 Checkpoint 中对应的状态句柄。
- Task 接收到这些状态句柄后,就会从持久化存储中读取并加载属于自己的那部分状态数据。
所以,所谓的“物理部分”其实就是指特定算子的特定并行实例(子任务)所拥有的那部分状态数据。
Flink 通过在 Checkpoint 时精确记录这种“逻辑算子/子任务”到“实际状态数据存储位置”的映射关系,并在恢复时利用这种映射关系,来确保每个新的 Task 实例都能正确地加载其先前保存的状态。
什么是非对齐检查点 (Unaligned Checkpoint)?
在标准的对齐检查点(Aligned Checkpoint)模式下,当一个算子接收到来自上游某个输入通道的检查点屏障 (Checkpoint Barrier) 时,它会暂停处理该通道的数据,直到接收到所有输入通道的屏障。这个过程称为“对齐”。对齐的目的是确保所有算子在同一时刻对数据流进行快照,从而保证精确一次 (Exactly-Once) 的处理语义。然而,在高背压的情况下,对齐过程可能会非常耗时,因为某些通道的屏障可能需要等待很长时间才能到达,这会导致检查点时长增加,甚至超时。
非对齐检查点通过允许检查点屏障“越过”通道中正在传输的数据来解决这个问题。当屏障到达算子时,算子会立即开始进行快照,并将通道中尚未处理的数据(即所谓的“飞行中”数据)也作为检查点状态的一部分保存下来。
非对齐检查点是如何工作的?
核心思想是:
- 屏障超越数据 (Barrier Overtaking Data):检查点屏障不再需要等待所有数据处理完毕。当屏障到达一个算子时,该算子会立即开始其快照过程。
- 飞行中数据作为状态 (In-flight Data as State):在屏障之后、算子处理之前的数据(即飞行中数据)会被捕获并存储为检查点状态的一部分。这意味着,当从检查点恢复时,这些飞行中数据也会被恢复并重新处理,就好像它们从未被屏障越过一样。
- 源端插入屏障:尽管非对齐检查点在概念上更接近 Chandy-Lamport 算法,但 Flink 仍然在数据源端插入屏障,以避免检查点协调器过载。
在代码层面,CheckpointOptions.java
文件定义了不同的对齐类型,包括 UNALIGNED
和 FORCED_ALIGNED
:CheckpointConfig.java
中有启用非对齐检查点的方法。
当启用非对齐检查点时,如果设置了 alignedCheckpointTimeout
,检查点会先尝试对齐。如果在超时时间内未能完成对齐,则会自动切换到非对齐模式。如果 alignedCheckpointTimeout
设置为0,则检查点会直接以非对齐方式启动。
为什么可以非对齐?
非对齐检查点之所以能够实现,关键在于它改变了对“一致性快照”的实现方式。
- 传统对齐检查点:通过确保所有算子在逻辑上的同一时间点(即所有输入屏障都到达时)进行快照,来保证数据的一致性。这意味着在屏障到达之前的数据都已被处理并反映在算子状态中,屏障之后的数据则不包含在当前快照中。
- 非对齐检查点:它放宽了“同一逻辑时间点”的严格要求。通过将飞行中的数据(即那些在屏障已经通过但尚未被下游算子处理的数据)也包含在检查点状态中,它依然能够保证在恢复时,所有数据不多不少恰好被处理一次。当从非对齐检查点恢复时,这些被保存的飞行中数据会被重新注入到数据流中,就好像它们从未被屏障“跳过”一样。
优势:
- 减少背压影响:在高背压场景下,数据处理缓慢,导致屏障对齐时间过长。非对齐检查点由于不需要等待数据处理,可以显著缩短检查点时间。
- 提高检查点频率:由于检查点时间缩短,可以更频繁地进行检查点,从而减少故障恢复时需要重放的数据量。
限制和注意事项:
- 精确一次语义:非对齐检查点仅在精确一次 (Exactly-Once) 语义下可用。
- Savepoint:Savepoint 通常不能是非对齐的,因为它们通常用于版本升级或作业迁移,需要一个完全对齐的状态。
- 状态大小:由于飞行中的数据也被包含在状态中,非对齐检查点的状态大小可能会比对齐检查点更大。
- Sink 的特殊处理:如 Flink 2.0 的发布说明中提到 (
docs/content/release-notes/flink-2.0.md
),对于 Sink 拓扑中的操作(如 committer),非对齐检查点会被禁用,以确保 committable 能够在notifyCheckpointComplete
时位于相应的算子,从而正确提交所有副作用。
总而言之,非对齐检查点通过将飞行中数据纳入状态管理,实现了在不牺牲精确一次语义的前提下,大幅优化高背压场景下检查点性能的目标。