Flink's checkpoint mechanism is the foundation of its fault tolerance: it saves the state of a running streaming job, so that when a failure occurs the job can be restored from the backed-up state. For example, when Flink reads from Kafka, the consumed Kafka offsets are saved as part of the checkpoint; if the task fails, consumption can resume from the last checkpointed offset.
Flink's checkpoint mechanism can be understood from the following aspects.
Barrier
A barrier is a lightweight piece of data inserted into the original data stream according to a schedule. It does not degrade the performance of normal data processing and does not change the order of the original records. Barriers split the stream into consecutive segments, somewhat like the per-batch data in Spark Streaming's micro batches.

Barriers flow through the tasks together with the data. When an operator receives a barrier, it may assume that all data preceding that barrier has already been processed, and at that point it triggers a checkpoint.
If an operator has multiple concurrent input streams, then when it receives a barrier for a checkpoint it must wait for the corresponding barriers of that checkpoint from all of its other inputs before proceeding. This barrier alignment works as follows.
As soon as the operator receives barrier n from one upstream input, it must stop processing the data that follows the barrier on that stream, until it has received barrier n from all of its other inputs.
Meanwhile, the streams that have already delivered barrier n are not processed at all; records read from them are not handled but placed into an input buffer instead.
Once barrier n has arrived from the last upstream input, the operator emits barrier n to its downstream operators.
After that, the operator resumes processing data from all inputs, draining the records in the input buffer before processing new upstream data.
The overall process is shown in the figure below.
[Figure: barrier.png]
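The alignment steps above can be sketched in plain Java. This is an illustrative toy model under stated assumptions, not Flink's actual BarrierBuffer; the class and method names are invented for the sketch.

```java
import java.util.*;

/** Toy model of barrier alignment for a multi-input operator (invented names, not Flink classes). */
class BarrierAligner {
    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<>();      // channels that already delivered barrier n
    private final List<String> inputBuffer = new ArrayList<>(); // records held back during alignment
    final List<String> output = new ArrayList<>();              // what the operator forwards downstream

    BarrierAligner(int numChannels) { this.numChannels = numChannels; }

    /** A normal record arrives on a channel. */
    void onRecord(int channel, String record) {
        if (blocked.contains(channel)) {
            inputBuffer.add(record);   // this channel already saw barrier n: buffer until alignment completes
        } else {
            output.add(record);        // this channel has not seen barrier n yet: process as usual
        }
    }

    /** Barrier n arrives on a channel. */
    void onBarrier(int channel, long checkpointId) {
        blocked.add(channel);
        if (blocked.size() == numChannels) {           // all inputs aligned
            output.add("barrier-" + checkpointId);     // forward the barrier downstream (checkpoint runs here)
            output.addAll(inputBuffer);                // then drain the buffered records first
            inputBuffer.clear();
            blocked.clear();                           // resume normal processing on all channels
        }
    }
}
```

Note how a record arriving on an already-blocked channel is buffered, while records on the still-open channel keep flowing, exactly as in the alignment steps above.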
Barrier generation
Flink checkpoints are initiated by the JobMaster, which triggers the Source Tasks at a fixed interval to generate barriers.
The JobMaster triggers checkpoints periodically.
See the startCheckpointScheduler() method in the CheckpointCoordinator class:
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        // trigger checkpoints at the fixed interval baseInterval
        periodicScheduling = true;
        long initialDelay = ThreadLocalRandom.current().nextLong(
            minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
            new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
    }
}
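The scheduling above is ordinary fixed-rate scheduling with a randomized initial delay. The sketch below mimics it with a plain ScheduledExecutorService; CheckpointScheduler and its fields are invented names for illustration, but the initial-delay computation mirrors the snippet.

```java
import java.util.concurrent.*;

/** Sketch of the periodic checkpoint trigger (invented class; mirrors startCheckpointScheduler above). */
class CheckpointScheduler {
    final long baseIntervalMillis;   // checkpoint interval
    final long minPauseNanos;        // minimum pause between checkpoints, in nanoseconds

    CheckpointScheduler(long baseIntervalMillis, long minPauseNanos) {
        this.baseIntervalMillis = baseIntervalMillis;
        this.minPauseNanos = minPauseNanos;
    }

    /** Random initial delay in [minPause in ms, baseInterval], as in the snippet. */
    long computeInitialDelay() {
        return ThreadLocalRandom.current().nextLong(
            minPauseNanos / 1_000_000L, baseIntervalMillis + 1L);
    }

    /** Schedule the trigger at a fixed rate, like timer.scheduleAtFixedRate(new ScheduledTrigger(), ...). */
    ScheduledFuture<?> start(ScheduledExecutorService timer, Runnable trigger) {
        return timer.scheduleAtFixedRate(
            trigger, computeInitialDelay(), baseIntervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

The randomized initial delay spreads the first checkpoints of many jobs over time instead of firing them all at once.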
Following ScheduledTrigger leads to triggerCheckpoint, the method through which the JobMaster triggers a checkpoint. After a series of preparations, this method generates a unique checkpoint ID and creates a pending checkpoint, then calls the RPC method execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions) to make the Task side generate barriers and perform the checkpoint.
checkpointID = checkpointIdCounter.getAndIncrement();

final PendingCheckpoint checkpoint = new PendingCheckpoint(
    job,
    checkpointID,
    timestamp,
    ackTasks,
    props,
    checkpointStorageLocation,
    executor);

// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        // invoke the TaskManager RPC taskManagerGateway.triggerCheckpoint
        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
            "no longer running.");
    }
}
The Source generates barriers after receiving the trigger
When the TaskManager receives the RPC that triggers the checkpoint, it triggers barrier generation in the Source Task. triggerCheckpointBarrier creates a dedicated thread to generate the barrier.
public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {

    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            // set safety net from the task's context for checkpointing thread
            LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

            try {
                boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                if (!success) {
                    checkpointResponder.declineCheckpoint(
                        getJobID(), getExecutionId(), checkpointID,
                        new CheckpointDeclineTaskNotReadyException(taskName));
                }
            }
            // ... (exception handling and the rest of the method omitted)
The method that actually runs in the Source Task is performCheckpoint in StreamTask. It does two things: first it generates a barrier carrying the checkpoint ID and sends it to all downstream operators, then it saves the Task's own state. This is how barriers come to propagate through the whole streaming job.
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            // The pre-barrier work should be nothing or minimal in the common case.
            operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

            // Step (2): Send the checkpoint barrier downstream
            operatorChain.broadcastCheckpointBarrier(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            // impact progress of the streaming topology
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        }
        // ... (remainder of the method omitted)
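The ordering that performCheckpoint enforces, barrier out first and snapshot second, can be modeled with a toy task. All names below are invented for illustration; in Flink the snapshot in step (3) is largely asynchronous, while here it is synchronous for brevity.

```java
import java.util.*;

/** Toy model of a task that broadcasts a barrier to all outputs before snapshotting (invented names). */
class SourceTaskSketch {
    final List<List<String>> outputChannels = new ArrayList<>(); // one queue per downstream channel
    final List<Long> snapshots = new ArrayList<>();              // checkpoint ids we snapshotted for
    long recordsProcessed = 0;

    SourceTaskSketch(int numOutputs) {
        for (int i = 0; i < numOutputs; i++) outputChannels.add(new ArrayList<>());
    }

    void processRecord(String record) {
        recordsProcessed++;
        for (List<String> ch : outputChannels) ch.add(record);
    }

    boolean performCheckpoint(long checkpointId) {
        // Step (2) above: broadcast the barrier downstream as early as possible,
        // so downstream alignment is not delayed by our own snapshot.
        for (List<String> ch : outputChannels) ch.add("barrier-" + checkpointId);
        // Step (3): take the state snapshot (synchronous here; asynchronous in Flink).
        snapshots.add(checkpointId);
        return true;
    }
}
```

Every downstream channel sees the barrier at the same position in its stream, which is what makes the barrier a consistent cut through the dataflow.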
Barrier propagation
As described above, a Source Task inserts barriers into the original data at the interval dictated by the JobMaster and forwards them downstream.
A non-Source Task does not trigger checkpoints periodically while processing data; instead, it triggers a checkpoint whenever it encounters a barrier in the stream.
In the code, this is driven by getNextNonBlocked in BarrierBuffer.
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        // process buffered BufferOrEvents before grabbing new ones
        Optional<BufferOrEvent> next;
        if (currentBuffered == null) {
            next = inputGate.getNextBufferOrEvent();
        }
        else {
            next = Optional.ofNullable(currentBuffered.getNext());
            if (!next.isPresent()) {
                completeBufferedSequence();
                return getNextNonBlocked();
            }
        }

        if (!next.isPresent()) {
            if (!endOfStream) {
                // end of input stream. stream continues with the buffered data
                endOfStream = true;
                releaseBlocksAndResetBarriers();
                return getNextNonBlocked();
            }
            else {
                // final end of both input and buffered data
                return null;
            }
        }

        BufferOrEvent bufferOrEvent = next.get();
        if (isBlocked(bufferOrEvent.getChannelIndex())) {
            // if the channel is blocked, we just store the BufferOrEvent
            bufferBlocker.add(bufferOrEvent);
            checkSizeLimit();
        }
        else if (bufferOrEvent.isBuffer()) {
            return bufferOrEvent;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            if (!endOfStream) {
                // process barriers only if there is a chance of the checkpoint completing
                processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            }
        }
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
        }
        else {
            if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                processEndOfPartition();
            }
            return bufferOrEvent;
        }
    }
}
If the current stream has not yet ended, the barrier is handled in processBarrier, which checks whether the Task has multiple inputs and therefore needs barrier alignment. Once the barriers are aligned (or immediately, for a single input), notifyCheckpoint is called to trigger the checkpoint; that call reaches triggerCheckpointOnBarrier, and from there the process is the same as for a Source Task.
Operator checkpoint
As described above, when a Task performs a checkpoint it first generates and broadcasts the barrier for that checkpoint, and only then checkpoints its own state. The executeCheckpointing method calls the operator's snapshotState to save its state; each operator implements snapshotState according to its own needs.
For example, in the Kafka consumer operator provided by Flink, the snapshotState of FlinkKafkaConsumerBase saves the offset consumed so far for each partition of the topic.
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        unionOffsetStates.clear();

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }
        } else {
            HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
            }

            for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                unionOffsetStates.add(
                    Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
            }
        }

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // truncate the map of pending offsets to commit, to prevent infinite growth
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0);
            }
        }
    }
}
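Stripped of the Flink-specific machinery, the offset bookkeeping above reduces to a small sketch. The names below are invented for illustration; the real consumer additionally stores the offsets in Flink managed state so they survive restarts.

```java
import java.util.*;

/** Toy model of checkpointed Kafka-offset bookkeeping with a bounded pending-commit map (invented names). */
class OffsetSnapshotter {
    static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    final Map<String, Long> currentOffsets = new HashMap<>();   // partition -> last consumed offset
    final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();

    /** A record was consumed from a partition at this offset. */
    void record(String partition, long offset) { currentOffsets.put(partition, offset); }

    /** Called on checkpoint: remember the current offsets under this checkpoint id. */
    void snapshotState(long checkpointId) {
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
        // truncate the map of pending offsets, to prevent unbounded growth
        while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            Iterator<Long> it = pendingOffsetsToCommit.keySet().iterator();
            it.next();
            it.remove();   // drop the oldest pending entry
        }
    }

    /** Called when the checkpoint completes: these offsets are now safe to commit back to Kafka. */
    Map<String, Long> offsetsToCommit(long checkpointId) {
        return pendingOffsetsToCommit.remove(checkpointId);
    }
}
```

The pending map is keyed by checkpoint id so that offsets are only committed to Kafka after the checkpoint that covers them actually completes.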
From pending checkpoint to completed
When an operator finishes its checkpoint, it reports the completion to the JobMaster, which moves the operator from the not-yet-acknowledged set to the acknowledged set. Once all operators have reported completion, the JobMaster changes the checkpoint from the pending state to the completed state.
On the Task side:
executeCheckpointing -> AsyncCheckpointRunnable -> reportCompletedSnapshotStates -> taskStateManager.reportTaskStateSnapshots, which ends up calling the following method in TaskStateManagerImpl:
public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {

    long checkpointId = checkpointMetaData.getCheckpointId();

    localStateStore.storeLocalState(checkpointId, localState);

    checkpointResponder.acknowledgeCheckpoint(
        jobId,
        executionAttemptID,
        checkpointId,
        checkpointMetrics,
        acknowledgedState);
}
The JobMaster is notified via the actor/RPC mechanism.
On the JobMaster side:
acknowledgeCheckpoint -> checkpointCoordinator.receiveAcknowledgeMessage(ackMessage) -> checkpoint.acknowledgeTask -> completePendingCheckpoint(checkpoint);
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    // As a first step to complete the checkpoint, we register its state with the registry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
        }
        catch (Exception e1) {
            // abort the current pending checkpoint if we fail to finalize the pending checkpoint.
            if (!pendingCheckpoint.isDiscarded()) {
                pendingCheckpoint.abortError(e1);
            }
            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1);
        }

        // the pending checkpoint must be discarded after the finalization
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // we failed to store the completed checkpoint. Let's clean up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });

            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
        }
    } finally {
        pendingCheckpoints.remove(checkpointId);
        triggerQueuedRequests();
    }

    rememberRecentCheckpointId(checkpointId);

    // drop those pending checkpoints that are prior to the completed one
    dropSubsumedCheckpoints(checkpointId);

    // record the time when this was completed, to calculate
    // the 'min delay between checkpoints'
    lastCheckpointCompletionNanos = System.nanoTime();

    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
        completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

    if (LOG.isDebugEnabled()) {
        StringBuilder builder = new StringBuilder();
        builder.append("Checkpoint state: ");
        for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
            builder.append(state);
            builder.append(", ");
        }
        // Remove last two chars ", "
        builder.setLength(builder.length() - 2);

        LOG.debug(builder.toString());
    }

    // send the "notify complete" call to all vertices
    final long timestamp = completedCheckpoint.getTimestamp();

    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
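The JobMaster-side bookkeeping described in this section reduces to counting acknowledgements per pending checkpoint: the checkpoint completes exactly when the last task reports in. A toy sketch with invented names:

```java
import java.util.*;

/** Toy model of a pending checkpoint that completes once every task has acknowledged (invented names). */
class PendingCheckpointSketch {
    final long checkpointId;
    final Set<String> notYetAcknowledged;   // tasks that still have to report completion

    PendingCheckpointSketch(long checkpointId, Collection<String> tasks) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasks);
    }

    /** Handle one task's acknowledgement; returns true when the checkpoint just became complete. */
    boolean acknowledgeTask(String taskId) {
        notYetAcknowledged.remove(taskId);
        return notYetAcknowledged.isEmpty();   // all tasks reported: pending -> completed
    }
}
```

In Flink, the transition from pending to completed additionally finalizes and stores the checkpoint, as completePendingCheckpoint above shows, before notifying all tasks of the completion.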
