Flink Checkpoint 流程解析
Checkpoint 流程解析
- Flink Checkpoint 流程解析
Checkpint 流程概括
任务运行后 JobMaster 定时执行 Checkpoint,JobMaster 会通过调用 CheckpointCoordinator 对作业进行 Checkpoint。
CheckpointCoordinator 开始进行 Checkpoint,它首先会先创建 PendingCheckpoint,然后开始给 Checkpoint 计时,再关闭网关开始触发 OperatorCoordinator 的 Checkpoint。
如果是 SourceOperatorCoordinator,则这时会调用 Source 的 getSplitSerializer,获取分片序列化器,然后将 SplitAssignmentTracker 中任务运行时分配的分片序列化创建 Snapshot,再将 Snapshot 放入 PendingCheckpoint 中。
OperatorCoordinator 状态触发完后,开始触发 MasterHooks 状态快照,MasterTriggerRestoreHook 由 UDFStreamOperator 内部的实现 WithMasterCheckpointHook 接口的 Function 创建,用于在 Master 触发 Checkpoint 时,Function 需要进行的操作。
MasterHooks 调用完后,CheckpointCoordinator 将给子任务 TaskManager 发送请求,通知它们开始 Checkpoint。
TaskExecutor 获取相应的任务 Task,Task 调用 StreamTask 开始进行 Checkpoint,StreamTask 调用 Mailbox 执行 Checkpoint 事件,Mailbox 执行 Checkpoint 事件时, Source 将不会从数据源读取数据。
Checkpoint 事件开始执行,如果 Checkpoint 需要强制对齐,那么需要异步创建 Channel 和结果分区的数据快照, 随后在执行传播 Barrier 前,SubtaskCheckpointCoordinatorImpl 会调用 OperatorChain 让 Operator 进行 Barrier 前的准备操作,然后开始往下游传播 Barrier。
SubtaskCheckpointCoordinatorImpl 创建 CheckpointBarrier 并将 CheckpointBarrier 发送给 RecordWriterOutput 将 Barrier 传输给下游任务,然后注册 Barrier 对齐超时计时器。
Barrier 传播完后,如果之前创建了 Channel 状态快照 ,那么还需要异步完成 Channel Output 的数据快照。
最后 SubtaskCheckpointCoordinatorImpl 开始对当前子任务的所有算子进行 Checkpoint,这时会进行算子创建快照时的操作,算子状态是存储在 OperatorStateBackend 和 KeyedStateBackend 中的, SubtaskCheckpointCoordinator 将会创建 OperatorStateBackend 和 KeyedStateBackend 的状态快照。
下游任务这时是正常处理上游发送过来的数据的,但是上游正在进行 Checkpoint,数据也是被发送过来的 CheckpointBarrier 分割开了,处理到后面会接收到上游的 CheckpointBarrier,也就表示着当前 Checkpoint 上游快照数据已经处理完,下游也开始进行 Checkpoint 了,下游进行 Checkpoint 的过程也是和上面的一样,继续调用 SubtaskCheckpointCoordinatorImpl 开始进行 Checkpoint。
总的来说,Checkpoint 将创建 Coordinator 状态、托管键值状态、托管算子状态、未处理的键值状态、未处理的算子状态、输入通道状态和结果分区状态的快照。
Checkpoint 触发流程解析 (Flink 1.20)
任务启动后 JobManager 开始定期对任务执行 Checkpoint
Task 任务恢复
Task#restoreAndInvoke
…
更新任务状态为 RUNNING 状态,TaskExecutor 通知 JobMaster 任务状态更新
TaskManagerActions#updateTaskExecutionState
TaskExecutor.TaskManagerActionsImpl#updateTaskExecutionState
JobMasterGateway#updateTaskExecutionState
…
JobMaster 调用 SchedulerBase、DefaultExecutionGraph 更新任务状态,定期触发 Checkpoint
JobMaster#updateTaskExecutionState
SchedulerBase#updateTaskExecutionState
DefaultExecutionGraph#updateState
DefaultExecutionGraph#updateStateInternal
[CheckpointCoordinator 开始定期执行 Checkpoint](#JobManager 使用 CheckpointCoordinator 触发 Checkpoint)
CheckpointCoordinator#startCheckpointScheduler
JobManager 使用 CheckpointCoordinator 触发 Checkpoint
JobMaster 触发 Checkpoint
JobMaster#triggerCheckpoint
调度器触发 Checkpoint
SchedulerNG#triggerCheckpoint
从 ExecutionGraph 中获取 CheckpointCoordinator,创建 CheckpointTriggerRequest,并使用 CheckpointCoordinator 通过 CheckpointRequestDecider 决定需要处理的 Checkpoint 请求触发 Checkpoint
CheckpointCoordinator#triggerCheckpoint
CheckpointRequestDecider#chooseRequestToExecute
CheckpointCoordinator#startTriggeringCheckpoint
触发和通知所有 OperatorCoordinator 开始 Checkpoint
OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
-
CheckpointCoordinator#snapshotMasterState
MasterTriggerRestoreHook#triggerCheckpoint
CheckpointCoordinator 初始化 Checkpoint 所需要的信息
CheckpointCoordinator 初始化 Checkpoint 所需要的信息
- 计算 Checkpoint 执行计划
CheckpointPlanCalculator#calculateCheckpointPlan
校验所有任务是否已经初始化
如果有任务已经完成,那么创建所有任务完成后计算检查点的计划
DefaultCheckpointPlanCalculator#calculateAfterTasksFinished
如果没有任务完成,那么创建当所有任务都在运行时计算检查点的计划,该计划为所有任务都将标记为需要触发 Checkpoint,并将所有任务标记为需要等待和提交
DefaultCheckpointPlanCalculator#calculateWithAllTasksRunning
校验所有任务是否都在运行中
Checkpoint 计数加一
创建待处理的的 Checkpoint
CheckpointCoordinator#createPendingCheckpoint
追溯待处理的 Checkpoint 状态
CheckpointCoordinator#trackPendingCheckpointStats
创建一个新的挂起检查点跟踪器
CheckpointStatsTracker#reportPendingCheckpoint
报告单个子任务的统计信息
CheckpointCoordinator#reportFinishedTasks
创建待处理的的 Checkpoint(PendingCheckpoint)
开始 Checkpoint 计时,时间超时则取消 Checkpoint
返回待处理的的 Checkpoint
初始化 Checkpoint 地址
CheckpointCoordinator#initializeCheckpointLocation
如果该 Checkpoint 类型为 Savepoint,则初始化 Savepoint 地址
CheckpointStorageCoordinatorView#initializeLocationForSavepoint
否则,先初始化 Checkpoint Base 地址,再开始初始化地址
CheckpointStorageCoordinatorView#initializeBaseLocationsForCheckpoint
CheckpointStorageCoordinatorView#initializeLocationForCheckpoint
返回 Checkpoint 地址
触发所有 OperatorCoordinator Checkpoint
触发和通知所有 OperatorCoordinator 开始 Checkpoint
OperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
触发 OperatorCoordinator Checkpoint
OperatorCoordinatorCheckpoints#triggerAllCoordinatorCheckpoints
关闭网关,获取并等待所有事件完成
OperatorCoordinatorHolder#closeGateways
IncompleteFuturesTracker#getCurrentIncompleteAndReset
网关标记当前的 Checkpoint
OperatorCoordinator 触发 Checkpoint
OperatorCoordinator#checkpointCoordinator
根据 Coordinator 的 Checkpoint 后的状态创建并返回 CoordinatorSnapshot
通知所有 CheckpointCoordinator Checkpoint 结果
OperatorCoordinatorCheckpoints#acknowledgeAllCoordinators
触发 MastersHooks 状态快照
触发 MasterHooks 状态快照,MasterTriggerRestoreHook 由 UDFOperator 内部的实现 WithMasterCheckpointHook 接口的 UDF 创建,表示在 Master 触发 Checkpoint 时,UDF 可以做什么。
CheckpointCoordinator#snapshotMasterState
MasterTriggerRestoreHook#triggerCheckpoint
CheckpointCoordinator 通知子任务开始 Checkpoint
CheckpointCoordinator 给子任务发送 Checkpoint 请求
CheckpointCoordinator#triggerCheckpointRequest
发送任务 Checkpoint 请求
CheckpointCoordinator#triggerTasks
向所有的 Exeuction 对应的 Taskmanager 网关发送 Checkpoint 请求,子任务接收到请求后会开始触发 Checkpoint
Execution#triggerCheckpointHelper
TaskManagerGateway#triggerCheckpoint
任务 Checkpoint 请求发送完后取消定时器
子任务开始触发 Checkpoint
TaskManager 触发指定子任务的 Checkpoint
TaskExecutor#triggerCheckpoint
Task#triggerCheckpointBarrier
创建 Checkpoint 元数据 CheckpointMetaData
算子 Mailbox 异步执行 Checkpoint,因为 Checkpoint 在 MailboxProcessor 执行,所以这时将不会有数据传入
CheckpointableTask#triggerCheckpointAsync
StreamTask#triggerCheckpointAsync
如果 InputGateway 分区数据未处理完成,则触发未完成的数据通道 Checkpoint
StreamTask#triggerUnfinishedChannelsCheckpoint
这这情况是考虑已完成任务的 Checkpoint ,如果非 Source 任务成为新的主任务,则可能会通过 RPC 触发检查点。在这种情况下,他们将通知该检查点的 CheckpointBarrierHandle。
创建一个 CheckpointBarrier,并通知所有未完成的 Channel 处理该 Barrier,并尝试触发 Checkpoint
CheckpointBarrierHandler#processBarrier
如果 InputGateway 分区数据已经处理完成,则直接开始触发 Checkpoint
StreamTask#triggerCheckpointAsyncInMailbox
-
SubtaskCheckpointCoordinator#initInputsCheckpoint
SubtaskCheckpointCoordinatorImpl 开始执行 Checkpoint
StreamTask#performCheckpoint
SubtaskCheckpointCoordinator#checkpointState
-
MailBoxProcessor 异步执行 Checkpoint 事件
算子调用 SubtaskCheckpointCoordinator 执行 Checkpoint
SubtaskCheckpointCoordinatorImpl#checkpointState
如果当前 Checkpint 被终止了,那么向下游发送 CancelCheckpointMarker事件,以防下游背压,并结束当前 Checkpoint。
如果 Checkpoint 之前没有对齐过,并且 Checkpoint 配置的对齐类型是强制对齐,那么首先将当前 Checkpoint 类型设置为不再需要对齐了,然后初始化输入端的状态,可见初始化输入端状态
CheckpointOptions#withUnalignedSupported
SubtaskCheckpointCoordinatorImpl#initInputsCheckpoint
准备 Checkpoint,算子执行 Snapshot 和 发送 Barrier 前的操作
OperatorChain#prepareSnapshotPreBarrier
创建 CheckpointBarrier,并往下游发送 CheckpointBarrier 事件,开始 Barrier 对齐操作
OperatorChain#broadcastEvent
注册对齐计时器以在超时时对齐未对齐的 barrier
SubtaskCheckpointCoordinator#registerAlignmentTimer
如果前面进行了 Channel Checkpoint,那么在这里完成状态通道 Writer 快照
ChannelStateWriter#finishOutput
SubtaskCheckpointCoordinator 同步获取算子的所有的状态快照
SubtaskCheckpointCoordinator#takeSnapshotSync
如果 Checkpoint 是可超时和可不对齐的,则从 ChannelStateWriter 中获取通道状态写结果(ChannelStateWriteResult)
解析 Checkpoint 存储地址
SubtaskCheckpointCoordinatorImpl.CachingCheckpointStorageWorkerView#resolveCheckpointStorageLocation
触发 OpeartorChain 状态快照
OperatorChain#snapshotState
如果是 RegularOperatorChain,则获取所有算子,并触发所有算子的状态快照
RegularOperatorChain#buildOperatorSnapshotFutures
**构建 StreamOpeartor 算子状态快照 Future **
StreamOperator#snapshotState
如果算子是主算子或者是尾算子,那么将通道和结果分区的状态快照结果 Future 设置到AsyncCheckpointRunnable 中
如果是FinishedOperatorChain,则只将通道和结果分区的状态快照结果 Future 设置到 OperatorSnapshotFutures 中
向 CheckpointCoordinator 发送已接收 Checkpoint 事件
OperatorChain#sendAcknowledgeCheckpointEvent
清理 Checkpoint 缓存
SubtaskCheckpointCoordinatorImpl.CachingCheckpointStorageWorkerView#clearCacheFor
设置 Checkpoint 持续时间的指标
CheckpointMetricsBuilder#setSyncDurationMillis
如果获取 SnapShot 成功,则异步完成 Checkpoint
SubtaskCheckpointCoordinator#finishAndReportAsync
创建并异步执行 AsyncCheckpointRunnable
AsyncCheckpointRunnable#start
开始状态快照,并等待所有 SnapshotFuture 完成
AsyncCheckpointRunnable#finalizedFinishedSnapshots
AsyncCheckpointRunnable#finalizeNonFinishedSnapshots
计算 Channel 和分区对齐时的状态大小,并设置相关指标
否则,清理 SubtaskCheckpointCoordinator
SubtaskCheckpointCoordinator#cleanup
初始化输入端状态
子任务初始化 Checkpoint
SubtaskCheckpointCoordinatorImpl#initInputsCheckpoint
如果 Checkpoint 可不需要对齐
初始化写状态通道
ChannelStateWriter#start
创建CheckpointStartRequest,并将请求分发到 Writer
ChannelStateWriteRequestDispatcher#dispatch
分发器处理 CheckpointStartRequest
ChannelStateWriteRequestDispatcherImpl#handleCheckpointStartRequest
为该子任务 Writer 注册 ChannelStateWriteResult,用于收集 Checkpoint 过程中传输过来的数据
ChannelStateCheckpointWriter#registerSubtaskResult
准备正在传输中的数据快照,等待输入端的数据达到 Barrier
SubtaskCheckpointCoordinatorImpl#prepareInflightDataSnapshot
准备输入端快照
StreamTask#prepareInputSnapshot
StreamTaskInput#prepareSnapshot
网络输入端准备快照
StreamTaskNetworkInput#prepareSnapshot
获取所有还未处理的 Buffer,并添加到状态写状态通道中
ChannelStateWriter#addInputData
返回所有 Barriers 屏障接受 Future
等所有 Barriers 屏障接受后,完成对给定检查点 id 的通道状态数据的写入
将 CheckpointInProgressRequest 请求提交到通道状态写请求执行器(ChannelStateWriteRequestExecutor)中
通道状态写请求执行器执行对应请求
ChannelStateCheckpointWriter#completeInput
完成状态写入,写入的状态存放在 ChannelStateWriteResult 中,里面存放着写入的状态柄 InputChannelStateHandle 和 ResultSubpartitionStateHandle
ChannelStateCheckpointWriter#finishWriteAndResult
如果 Checkpoint 是可超时的,那么除了上面准备输入端快照那一步骤外,其他步骤都需要进行
触发 StreamOperator 状态快照
触发 StreamOperator 的 Checkpoint
RegularOperatorChain#checkpointStreamOperator
StreamOperatorStateHandler 创建快照
StreamOperatorStateHandler#snapshotState
创建算子快照环境和算子快照 Futures
真正的触发算子快照,该步操作可以通过算子自定义
StreamOperatorStateHandler.CheckpointedStreamOperator#snapshotState
算子和 Keyd 状态后端触发快照
Snapshotable#snapshot
下游算子接收到 CheckpointBarrier 后开始 Checkpoint
下游算子处理上游发送过来的事件
CheckpointedInputGate#handleEvent
如果接收到的事件为 CheckpointBarrier 事件,则开始处理 Barrier,尝试开始 Checkpoint
CheckpointBarrierHandler#processBarrier
CheckpointBarrierHandler 处理 Barrier
CheckpointBarrierHandler 处理 Barrier
CheckpointBarrierHandler#processBarrier
如果该 Barrier Id 大于上一次 PendingCheckpoint 的 Id 并且当前开启的 Channel 只有一个,标记对齐开始和结束,并通知开始 Checkpoint,然后结束该次处理
CheckpointBarrierHandler#markAlignmentStartAndEnd
CheckpointBarrierHandler#notifyCheckpoint
StreamTask#triggerCheckpointOnBarrier
SubtaskCheckpointCoordinator#checkpointState
否则尝试从等待的 Checkpoint 队列中寻找该 CheckpointBarrier
如果找到了,则说明 Barrier 已经对齐,标记已经完成对齐,并开始触发 Checkpoint,可见[MailBoxProcessor 异步执行 Checkpoint 事件](#MailBoxProcessor 异步执行 Checkpoint 事件)
CheckpointBarrierTracker#triggerCheckpointOnAligned
CheckpointBarrierHandler#notifyCheckpoint
StreamTask#triggerCheckpointOnBarrier
SubtaskCheckpointCoordinator#checkpointState
否则将该 Barrier 添加到Checkpoint 队列中,开始对齐
参考:
Flink Stateful Stream Processing:https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/stateful-stream-processing/