Flink checkpoint 源码分析- Checkpoint snapshot 处理流程

发布于:2024-05-09 ⋅ 阅读:(17) ⋅ 点赞:(0)

背景

在上一篇博客中我们分析了代码中barrier的是如何流动改的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客

最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 

现在我们接着跟踪相应代码,观察是如何算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义(At least once, exactly once)对于checkpoint的影响是怎样的。

代码分析

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。

processBarrier方法实现上就可以看出,flink barrier的处理分成两种。

在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。

通过往上追溯barrierHandler的生成,我们跟踪到方法:org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler, 如果checkpoint 模式是AT_LEAST_ONCE, 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中,EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler, 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。

org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler

这里针对checkpoint类型做了区分,主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。

对比这两个方法参数的差异,发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时时,会切换为unaligned checkpoint。这里可以先按下不表,回到最开始的处理历程。

总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker, 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。

At least once 下 barrier是如何处理的

at least once 下对于barrier的处理是在以下的方法中实现的。

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier

public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {
		final long barrierId = receivedBarrier.getId();

		// fast path for single channel trackers
		if (totalNumberOfInputChannels == 1) {
			markAlignmentStartAndEnd(receivedBarrier.getTimestamp());
			notifyCheckpoint(receivedBarrier);
			return;
		}

		// general path for multiple input channels
		if (LOG.isDebugEnabled()) {
			LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelInfo);
		}

		// find the checkpoint barrier in the queue of pending barriers
		CheckpointBarrierCount barrierCount = null;
		int pos = 0;

		for (CheckpointBarrierCount next : pendingCheckpoints) {
			if (next.checkpointId == barrierId) {
				barrierCount = next;
				break;
			}
			pos++;
		}

		if (barrierCount != null) {
			// add one to the count to that barrier and check for completion
			int numBarriersNew = barrierCount.incrementBarrierCount();
			if (numBarriersNew == totalNumberOfInputChannels) {
				// checkpoint can be triggered (or is aborted and all barriers have been seen)
				// first, remove this checkpoint and all all prior pending
				// checkpoints (which are now subsumed)
				for (int i = 0; i <= pos; i++) {
					pendingCheckpoints.pollFirst();
				}

				// notify the listener
				if (!barrierCount.isAborted()) {
					if (LOG.isDebugEnabled()) {
						LOG.debug("Received all barriers for checkpoint {}", barrierId);
					}
					markAlignmentEnd();
					notifyCheckpoint(receivedBarrier);
				}
			}
		}
		else {
			// first barrier for that checkpoint ID
			// add it only if it is newer than the latest checkpoint.
			// if it is not newer than the latest checkpoint ID, then there cannot be a
			// successful checkpoint for that ID anyways
			if (barrierId > latestPendingCheckpointID) {
				markAlignmentStart(receivedBarrier.getTimestamp());
				latestPendingCheckpointID = barrierId;
				pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

				// make sure we do not track too many checkpoints
				if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
					pendingCheckpoints.pollFirst();
				}
			}
		}
	}

如果只有一个inputchannel的情况下,在收到这一个barrier的时候,就可以做snapshot.

在这个中间会经过triggerCheckpointOnBarrier 等方法, 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState ,看到这里其实这很长的链路实际是一个循环,下一个算子会生成barrier,接着传递这个barrier。

实际情况是作业并行度不唯一,一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。

这里面当收取到第一个barrier,会将这个barrier信息存在个队列中。

当收取到时非第一个barrier的时候会进行计数,当收取到的是最后一个barrier的时候就会将barrier队列中在这个barrier之前的barrier全部清除,之后就可以通知做checkpoint snapshot, 这个流程就和之前的一个信道的checkpoint流程是一致的。

总结而言:at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。

Exactly once checkpoint是如何处理的

首先看这一段的代码

@Override
	public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {
		long barrierId = barrier.getId();
		LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);

        if (currentCheckpointId > barrierId
                || (currentCheckpointId == barrierId && !isCheckpointPending())) {
            if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
                inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);
            }
			return;
		}

        checkNewCheckpoint(barrier);
        checkState(currentCheckpointId == barrierId);

        if (numBarriersReceived++ == 0) {
            if (getNumOpenChannels() == 1) {
                markAlignmentStartAndEnd(barrier.getTimestamp());
            } else {
                markAlignmentStart(barrier.getTimestamp());
            }
		}

        // we must mark alignment end before calling currentState.barrierReceived which might
        // trigger a checkpoint with unfinished future for alignment duration
        if (numBarriersReceived == numOpenChannels) {
            if (getNumOpenChannels() > 1) {
                markAlignmentEnd();
            }
        }

        try {
            currentState = currentState.barrierReceived(context, channelInfo, barrier);
        } catch (CheckpointException e) {
            abortInternal(barrier.getId(), e);
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        }

        if (numBarriersReceived == numOpenChannels) {
			numBarriersReceived = 0;
			lastCancelledOrCompletedCheckpointId = currentCheckpointId;
			LOG.debug(
				"{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);
			resetAlignmentTimer();
			allBarriersReceivedFuture.complete(null);
			}
		}

这里需要关注一下currentState, 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier, 因此可以可以看这个方法具体是现实。

这里可以看到这里会block 住收到barrier的信道,如果barrier 都收齐了,之后会检查是不是unaligned的checkpoint, 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。

这里的下一个分支是超时转化,比如设置为30s,前30s是做aligned checkpoint, 如果30s还没有完成,就会转化为unaligned checkpoint。 当然,你如果不想有超时时间,可以直接设置为0.

如果是unaligned checkpoint, 会将channel 里面的数据也写会到远端。

这个中间会有一些状态转化,每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说,uc的第一个barrier到达了,就会触发一次global checkpoint。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived

org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived

最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。

总结一下:在exactly once的语义下,aligned checkpoint的做法是,收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。

unaligned的做法是,收到barrier的时候,第一步就会触发一次checkpoint, 之后会不断上传channel state, 当收到最后一个barrier则表示checkpoint结束。


网站公告

今日签到

点亮在社区的每一天
去签到