Flink Source Code Notes --- checkpoint
Continuing from the previous post:
Flink-1.10 Source Code Notes: checkpoint - 2
Sending the snapshot acknowledgement message
AsyncCheckpointRunnable.run() calls reportCompletedSnapshotStates to report the snapshot. Inside it, taskEnvironment.getTaskStateManager() returns the TaskStateManager (the concrete type is TaskStateManagerImpl), and reportTaskStateSnapshots is then called on it to report the state.
private void reportCompletedSnapshotStates(
TaskStateSnapshot acknowledgedTaskStateSnapshot,
TaskStateSnapshot localTaskStateSnapshot,
long asyncDurationMillis) { // duration of the asynchronous part of the snapshot
boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
boolean hasLocalState = localTaskStateSnapshot.hasState();
Preconditions.checkState(hasAckState || !hasLocalState,
"Found cached state but no corresponding primary state is reported to the job " +
"manager. This indicates a problem.");
// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
// to stateless tasks on restore. This enables simple job modifications that only concern
// stateless without the need to assign them uids to match their (always empty) states.
// todo: report the task's snapshot state
// When a checkpoint or savepoint is triggered on a task, it snapshots all the stream operator instances it owns. The operator snapshots from the task are then all reported through this interface.
// A typical implementation dispatches and forwards the reported state information to the interested parties, such as the checkpoint coordinator or a local state store.
taskEnvironment.getTaskStateManager().reportTaskStateSnapshots( // this call resolves to the TaskStateManagerImpl implementation
checkpointMetaData,
checkpointMetrics,
hasAckState ? acknowledgedTaskStateSnapshot : null,
hasLocalState ? localTaskStateSnapshot : null);
LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
taskName, checkpointMetaData.getCheckpointId(), asyncDurationMillis);
LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
taskName, checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
}
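The two rules in this method (report null instead of an empty snapshot, and never report local state without primary state) can be illustrated with a minimal sketch. It is not Flink code: `SnapshotReportSketch` and its methods are hypothetical names, and a plain `Map` stands in for `TaskStateSnapshot`.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in: a Map from operator name to state size replaces TaskStateSnapshot. */
final class SnapshotReportSketch {

    /** Stateless tasks are signalled by null, so nothing is assigned to them on restore. */
    static Map<String, Long> orNullIfEmpty(Map<String, Long> snapshot) {
        return snapshot.isEmpty() ? null : snapshot;
    }

    /** Same shape as the precondition above: local state without primary state is a bug. */
    static void checkConsistent(Map<String, Long> acknowledged, Map<String, Long> local) {
        boolean hasAckState = !acknowledged.isEmpty();
        boolean hasLocalState = !local.isEmpty();
        if (!(hasAckState || !hasLocalState)) {
            throw new IllegalStateException(
                "Found cached state but no corresponding primary state");
        }
    }

    public static void main(String[] args) {
        Map<String, Long> empty = Collections.emptyMap();
        Map<String, Long> primary = Collections.singletonMap("map-operator", 128L);
        checkConsistent(primary, empty);          // fine: primary state only
        checkConsistent(empty, empty);            // fine: stateless task
        System.out.println(orNullIfEmpty(empty)); // prints null
    }
}
```

The only combination the check rejects is an empty primary snapshot together with a non-empty local one.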
Step into TaskStateManagerImpl, which implements the TaskStateManager interface:
public class TaskStateManagerImpl implements TaskStateManager
Now look at reportTaskStateSnapshots. It first stores the local snapshot state, then calls acknowledgeCheckpoint on checkpointResponder to send the message.
checkpointResponder is of type RpcCheckpointResponder.
/**
* @param checkpointMetaData metadata of the checkpoint request.
* @param checkpointMetrics task-level metrics for the checkpoint (used for monitoring).
* @param acknowledgedState the reported state to acknowledge to the job manager.
* @param localState the reported state kept for local recovery.
*/
@Override
public void reportTaskStateSnapshots(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nonnull CheckpointMetrics checkpointMetrics,
@Nullable TaskStateSnapshot acknowledgedState,
@Nullable TaskStateSnapshot localState) {
long checkpointId = checkpointMetaData.getCheckpointId();
// store the local snapshot state
localStateStore.storeLocalState(checkpointId, localState);
// send the snapshot acknowledgement
checkpointResponder.acknowledgeCheckpoint( // resolves to the method in RpcCheckpointResponder
jobId,
executionAttemptID,
checkpointId,
checkpointMetrics,
acknowledgedState);
}
RpcCheckpointResponder.acknowledgeCheckpoint mainly calls acknowledgeCheckpoint on checkpointCoordinatorGateway, which ends up in JobMaster.acknowledgeCheckpoint:
@Override
public void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState) {
// delegates to the method of the same name
// JobMaster --> JobMasterGateway --> JobMasterOperatorEventGateway: JobMaster implements JobMasterGateway, whose parent interface is JobMasterOperatorEventGateway
// JobMaster is responsible for executing a single JobGraph
// this RpcCheckpointResponder is created when the connection to the JobManager is established, after the JobManager has been elected leader
checkpointCoordinatorGateway.acknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
subtaskState);
}
JobMaster extends FencedRpcEndpoint and implements the JobMasterGateway and JobMasterService interfaces.
The JobMasterGateway interface in turn extends JobMasterOperatorEventGateway.
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService
Step into checkpointCoordinatorGateway.acknowledgeCheckpoint:
/**
* @param jobID job ID of the running job
* @param executionAttemptID execution attempt ID of the running task
* @param checkpointId ID of this checkpoint
* @param checkpointMetrics metrics of this checkpoint
* @param checkpointState state handles of the checkpoint
*/
@Override
public void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
// todo: schedulerNG is the object responsible for scheduling the Flink job
// resolves to the method of the SchedulerBase implementation class
schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
}
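The calls so far form a plain delegation chain: responder, then gateway, then scheduler. The sketch below shows only that shape. Every type in it is a hypothetical stand-in; in Flink the gateway held by the responder is an RPC proxy rather than a direct object reference, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of the responder -> gateway -> scheduler delegation chain. */
final class AckChainSketch {

    /** Stand-in for the checkpoint coordinator gateway interface. */
    interface Gateway {
        void acknowledgeCheckpoint(long checkpointId, String attemptId);
    }

    /** Stand-in for RpcCheckpointResponder: forwards to whatever gateway it was given. */
    static final class Responder {
        private final Gateway gateway;
        Responder(Gateway gateway) { this.gateway = gateway; }
        void acknowledgeCheckpoint(long checkpointId, String attemptId) {
            gateway.acknowledgeCheckpoint(checkpointId, attemptId);
        }
    }

    /** Stand-in for the scheduler: the last hop before the coordinator. */
    static final class Scheduler {
        final List<String> received = new ArrayList<>();
        void acknowledgeCheckpoint(long checkpointId, String attemptId) {
            received.add(attemptId + "@" + checkpointId);
        }
    }

    /** Stand-in for JobMaster: implements the gateway by delegating to the scheduler. */
    static final class Master implements Gateway {
        private final Scheduler scheduler;
        Master(Scheduler scheduler) { this.scheduler = scheduler; }
        @Override
        public void acknowledgeCheckpoint(long checkpointId, String attemptId) {
            scheduler.acknowledgeCheckpoint(checkpointId, attemptId);
        }
    }

    public static void main(String[] args) {
        Scheduler scheduler = new Scheduler();
        new Responder(new Master(scheduler)).acknowledgeCheckpoint(7L, "attempt-1");
        System.out.println(scheduler.received); // prints [attempt-1@7]
    }
}
```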
Step into schedulerNG's acknowledgeCheckpoint.
This method fetches the checkpointCoordinator, builds an AcknowledgeCheckpoint object, looks up the TaskManager location, and finally calls receiveAcknowledgeMessage on the checkpointCoordinator.
@Override
public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
// assert that we are running in the main thread
mainThreadExecutor.assertRunningInMainThread();
// get the checkpoint coordinator from the execution graph
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
/*
AcknowledgeCheckpoint: this message is sent from the {@link org.apache.flink.runtime.taskexecutor} to the {@link org.apache.flink.runtime.jobmaster} to signal that the checkpoint of an individual task is completed.
*/
final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
checkpointState);
// look up the TaskManager location
final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);
if (checkpointCoordinator != null) {
ioExecutor.execute(() -> {
try {
// receives the AcknowledgeCheckpoint message and returns whether the message was associated with a pending checkpoint
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
} catch (Throwable t) {
log.warn("Error while processing checkpoint acknowledgement message", t);
}
});
} else {
String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
if (executionGraph.getState() == JobStatus.RUNNING) {
log.error(errorMessage, jobGraph.getJobID());
} else {
log.debug(errorMessage, jobGraph.getJobID());
}
}
}
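The method builds the message on the main thread but hands the coordinator call to ioExecutor, catching any Throwable so a failing acknowledgement is only logged. A minimal sketch of that hand-off follows, using a plain `ExecutorService`; `AckDispatchSketch` and its counters are hypothetical and only make the two outcomes observable.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: offload the coordinator call and swallow (count) failures. */
final class AckDispatchSketch {
    private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
    final AtomicInteger processed = new AtomicInteger();
    final AtomicInteger failures = new AtomicInteger();

    /** Runs the coordinator call on the I/O executor; a failure never reaches the caller. */
    void acknowledge(Runnable coordinatorCall) {
        ioExecutor.execute(() -> {
            try {
                coordinatorCall.run();
                processed.incrementAndGet();
            } catch (Throwable t) {
                failures.incrementAndGet(); // the real code logs a warning here
            }
        });
    }

    /** Stops accepting work and waits for the queued calls to finish. */
    void shutdownAndWait() {
        ioExecutor.shutdown();
        try {
            ioExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AckDispatchSketch sketch = new AckDispatchSketch();
        sketch.acknowledge(() -> { });
        sketch.acknowledge(() -> { throw new IllegalStateException("boom"); });
        sketch.shutdownAndWait();
        // prints "1 processed, 1 failed"
        System.out.println(sketch.processed.get() + " processed, " + sketch.failures.get() + " failed");
    }
}
```

Keeping the call off the main thread matters because completing a checkpoint can involve writing metadata to storage.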
Now step into the checkpointCoordinator's receiveAcknowledgeMessage method:
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
if (shutdown || message == null) {
return false;
}
if (!job.equals(message.getJob())) {
LOG.error("Received wrong AcknowledgeCheckpoint message for job {} from {} : {}", job, taskManagerLocationInfo, message);
return false;
}
// get the checkpoint ID
final long checkpointId = message.getCheckpointId();
// take the lock so that only one thread at a time runs this section
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
// make sure the coordinator has not been shut down
if (shutdown) {
return false;
}
// look up the pending checkpoint
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
// acknowledgeTask acknowledges the task with the given execution attempt ID and the given subtask state
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
case SUCCESS: // acknowledged successfully
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
// the ack succeeded; check whether all tasks have now acknowledged their snapshots
if (checkpoint.areTasksFullyAcknowledged()) {
// complete the pending checkpoint
completePendingCheckpoint(checkpoint);
}
break;
case DUPLICATE: // duplicate acknowledgement
LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
break;
case UNKNOWN: // unknown execution attempt
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
"because the task's execution attempt id was unknown. Discarding " +
"the state handle to avoid lingering state.", message.getCheckpointId(),
message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
break;
case DISCARDED: // the pending checkpoint was already discarded
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
"because the pending checkpoint had been discarded. Discarding the " +
"state handle tp avoid lingering state.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}
return true;
} else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint " + checkpointId);
} else {
boolean wasPendingCheckpoint;
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
wasPendingCheckpoint = true;
LOG.warn("Received late message for now expired checkpoint attempt {} from task " +
"{} of job {} at {}.", checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
} else {
LOG.debug("Received message for an unknown checkpoint {} from task {} of job {} at {}.",
checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
wasPendingCheckpoint = false;
}
// try to discard the state so that we don't have lingering state lying around
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
return wasPendingCheckpoint;
}
}
}
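The last branch separates a late message for a recently expired checkpoint from a message for a checkpoint that was never known, by consulting recentPendingCheckpoints. One way to keep such a "recent IDs" memory bounded is a fixed-capacity deque, sketched below. This is an assumption about the general technique, not a copy of the coordinator's code; `RecentIdsSketch` and the capacity used are hypothetical.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch: remember only the most recent checkpoint IDs. */
final class RecentIdsSketch {
    private final int capacity;
    private final ArrayDeque<Long> recent;

    RecentIdsSketch(int capacity) {
        this.capacity = capacity;
        this.recent = new ArrayDeque<>(capacity);
    }

    /** Records an ID, evicting the oldest one once the capacity is reached. */
    void remember(long checkpointId) {
        if (recent.size() == capacity) {
            recent.removeFirst();
        }
        recent.addLast(checkpointId);
    }

    /** True if the ID belongs to a checkpoint that was pending recently. */
    boolean wasPending(long checkpointId) {
        return recent.contains(checkpointId);
    }

    public static void main(String[] args) {
        RecentIdsSketch ids = new RecentIdsSketch(2);
        ids.remember(1L);
        ids.remember(2L);
        ids.remember(3L); // evicts 1
        System.out.println(ids.wasPending(1L)); // prints false
        System.out.println(ids.wasPending(3L)); // prints true
    }
}
```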
Inside this method, the main logic runs only if the pending checkpoint is not null and has not been discarded.
First look at the call in the switch statement: the checkpoint's acknowledgeTask method.
It returns a result that depends on the state of the task being acknowledged:
public TaskAcknowledgeResult acknowledgeTask(
ExecutionAttemptID executionAttemptId,
TaskStateSnapshot operatorSubtaskStates,
CheckpointMetrics metrics) {
synchronized (lock) {
// if already discarded, return DISCARDED
if (discarded) {
return TaskAcknowledgeResult.DISCARDED;
}
// remove the acknowledged task from notYetAcknowledgedTasks
// notYetAcknowledgedTasks holds all tasks that have not acknowledged yet
final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
if (vertex == null) {
// the task is not in notYetAcknowledgedTasks
// but it is in acknowledgedTasks (the already-acknowledged set)
// so this is a duplicate acknowledgement: return DUPLICATE
if (acknowledgedTasks.contains(executionAttemptId)) {
return TaskAcknowledgeResult.DUPLICATE;
} else {
// otherwise return UNKNOWN
return TaskAcknowledgeResult.UNKNOWN;
}
} else {
// the task was still pending: add it to the acknowledged set
acknowledgedTasks.add(executionAttemptId);
}
List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
int subtaskIndex = vertex.getParallelSubtaskIndex();
long ackTimestamp = System.currentTimeMillis();
long stateSize = 0L;
// this part stores the snapshot state of each operator
if (operatorSubtaskStates != null) {
for (OperatorID operatorID : operatorIDs) {
// returns the subtask state for the given operator ID (or null if not contained)
OperatorSubtaskState operatorSubtaskState =
operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);
// if no real operatorSubtaskState was reported, we insert an empty state
if (operatorSubtaskState == null) {
operatorSubtaskState = new OperatorSubtaskState();
}
// get the OperatorState collected so far for this operator
OperatorState operatorState = operatorStates.get(operatorID);
if (operatorState == null) {
// no OperatorState exists yet for this operator: create one
operatorState = new OperatorState(
operatorID,
vertex.getTotalNumberOfParallelSubtasks(),
vertex.getMaxParallelism());
// register the newly created state in the map
operatorStates.put(operatorID, operatorState);
}
// record this subtask's state under its parallel subtask index
operatorState.putState(subtaskIndex, operatorSubtaskState);
stateSize += operatorSubtaskState.getStateSize();
}
}
++numAcknowledgedTasks; // number of acknowledged tasks
// publish the checkpoint statistics
// to prevent null-pointers from concurrent modification, copy reference onto stack
// this part reports the subtask's checkpoint statistics
final PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
// Do this in millis because the web frontend works with them
long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
long checkpointStartDelayMillis = metrics.getCheckpointStartDelayNanos() / 1_000_000;
SubtaskStateStats subtaskStateStats = new SubtaskStateStats(
subtaskIndex,
ackTimestamp,
stateSize,
metrics.getSyncDurationMillis(),
metrics.getAsyncDurationMillis(),
alignmentDurationMillis,
checkpointStartDelayMillis);
statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
}
// finally, return SUCCESS
return TaskAcknowledgeResult.SUCCESS;
}
}
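Stripped of state handling and statistics, acknowledgeTask is bookkeeping over two collections. The sketch below reproduces only that part, with `String` attempt IDs in place of `ExecutionAttemptID` and a set in place of the `notYetAcknowledgedTasks` map; `PendingAckSketch` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the four acknowledgement outcomes. */
final class PendingAckSketch {
    enum Result { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledged;
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded;

    PendingAckSketch(Collection<String> tasksToAcknowledge) {
        this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
    }

    synchronized Result acknowledge(String attemptId) {
        if (discarded) {
            return Result.DISCARDED;
        }
        // remove() returns false when the task was not waiting for an ack
        if (!notYetAcknowledged.remove(attemptId)) {
            return acknowledged.contains(attemptId) ? Result.DUPLICATE : Result.UNKNOWN;
        }
        acknowledged.add(attemptId);
        return Result.SUCCESS;
    }

    /** Same condition as areTasksFullyAcknowledged(): nothing outstanding, not discarded. */
    synchronized boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty() && !discarded;
    }

    synchronized void discard() {
        discarded = true;
    }

    public static void main(String[] args) {
        PendingAckSketch pending = new PendingAckSketch(Arrays.asList("a", "b"));
        System.out.println(pending.acknowledge("a"));      // prints SUCCESS
        System.out.println(pending.acknowledge("a"));      // prints DUPLICATE
        System.out.println(pending.acknowledge("x"));      // prints UNKNOWN
        System.out.println(pending.isFullyAcknowledged()); // prints false
    }
}
```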
When the switch matches SUCCESS, completePendingCheckpoint may be called, but only after checkpoint.areTasksFullyAcknowledged() returns true. As the code shows, completePendingCheckpoint is entered only when every task has acknowledged and the checkpoint has not been discarded.
boolean areTasksFullyAcknowledged() {
// no task is still waiting to acknowledge, and the checkpoint is not discarded
return notYetAcknowledgedTasks.isEmpty() && !discarded;
}
Now follow the call into completePendingCheckpoint.
This method first registers the operators' state with the shared state registry and then runs the logic that finalizes the checkpoint. Once it returns, the checkpoint process is finished.
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
final long checkpointId = pendingCheckpoint.getCheckpointId();
final CompletedCheckpoint completedCheckpoint;
// As a first step to complete the checkpoint, we register its state with the registry
// register the state of all operators with sharedStateRegistry
Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
sharedStateRegistry.registerAll(operatorStates.values());
try {
try {
// run the logic that finalizes the pendingCheckpoint
completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
// reset the count of failed checkpoints
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
} catch (Exception e1) {
// abort the current pending checkpoint if we fail to finalize the pending checkpoint.
if (!pendingCheckpoint.isDiscarded()) {
abortPendingCheckpoint(
pendingCheckpoint,
new CheckpointException(
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
}
throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
}
// the pending checkpoint must be discarded after the finalization:
// finalizeCheckpoint leaves the pendingCheckpoint in the discarded state
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
try {
// store the completed checkpoint
completedCheckpointStore.addCheckpoint(completedCheckpoint);
} catch (Exception exception) {
// we failed to store the completed checkpoint. Let's clean up
executor.execute(new Runnable() {
@Override
public void run() {
try {
completedCheckpoint.discardOnFailedStoring();
} catch (Throwable t) {
LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
}
}
});
throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
}
} finally {
// remove this checkpoint from the set of pending checkpoints
pendingCheckpoints.remove(checkpointId);
// resume periodic triggering if it was paused
resumePeriodicTriggering();
}
// remember the ID of this recent checkpoint
rememberRecentCheckpointId(checkpointId);
// drop those pending checkpoints that are at prior to the completed one
// i.e. abort every pending checkpoint with an ID smaller than checkpointId (forced checkpoints are not dropped)
dropSubsumedCheckpoints(checkpointId);
// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
// Example output (as seen via the Flink web monitoring UI):
// Completed checkpoint 2 for job d36ca92c353c5fc9794d42fdff834b5d (9013 bytes in 106 ms).
LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Checkpoint state: ");
for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
builder.append(state);
builder.append(", ");
}
// Remove last two chars ", "
builder.setLength(builder.length() - 2);
LOG.debug(builder.toString());
}
// send the "notify complete" call to all vertices, coordinators, etc.
sendAcknowledgeMessages(checkpointId, completedCheckpoint.getTimestamp());
}
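The dropSubsumedCheckpoints step can be pictured as a filtered removal over the pending checkpoints: everything older than the checkpoint that just completed is dropped, unless it is not allowed to be subsumed. The sketch below is a simplified illustration, not the coordinator's implementation; `SubsumeSketch` and the boolean "can be subsumed" flag are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: drop pending checkpoints older than the completed one. */
final class SubsumeSketch {
    /** Checkpoint ID mapped to whether it may be subsumed (false for forced checkpoints). */
    private final Map<Long, Boolean> pending = new LinkedHashMap<>();

    void addPending(long checkpointId, boolean canBeSubsumed) {
        pending.put(checkpointId, canBeSubsumed);
    }

    /** Removes every subsumable pending checkpoint with a smaller ID; returns the dropped IDs. */
    List<Long> dropSubsumed(long completedCheckpointId) {
        List<Long> dropped = new ArrayList<>();
        Iterator<Map.Entry<Long, Boolean>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Boolean> entry = it.next();
            if (entry.getKey() < completedCheckpointId && entry.getValue()) {
                dropped.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return dropped;
    }

    Set<Long> pendingIds() {
        return pending.keySet();
    }

    public static void main(String[] args) {
        SubsumeSketch sketch = new SubsumeSketch();
        sketch.addPending(1L, true);
        sketch.addPending(2L, false); // forced: must survive
        sketch.addPending(3L, true);
        sketch.addPending(5L, true);
        System.out.println(sketch.dropSubsumed(4L)); // prints [1, 3]
        System.out.println(sketch.pendingIds());     // prints [2, 5]
    }
}
```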
Finally, let's look at how a PendingCheckpoint becomes a CompletedCheckpoint. pendingCheckpoint.finalizeCheckpoint is called first; it persists the checkpoint metadata, among other things, and builds a CompletedCheckpoint from the pendingCheckpoint. After that, the pendingCheckpoint is marked as discarded and the completed checkpoint is returned.
public CompletedCheckpoint finalizeCheckpoint() throws IOException {
synchronized (lock) {
checkState(!isDiscarded(), "checkpoint is discarded");
checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet");
// make sure we fulfill the promise with an exception if something fails
try {
// write out the metadata
// create a CheckpointMetadata object, which encapsulates the metadata of a checkpoint or savepoint
final CheckpointMetadata savepoint = new CheckpointMetadata(checkpointId, operatorStates.values(), masterStates);
final CompletedCheckpointStorageLocation finalizedLocation;
// persist the checkpoint metadata to the file system
// createMetadataOutputStream creates the output stream that the checkpoint metadata is persisted to
try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
Checkpoints.storeCheckpointMetadata(savepoint, out);
// after all metadata is written, close the stream and finalize the checkpoint location
finalizedLocation = out.closeAndFinalizeCheckpoint();
}
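// create a CompletedCheckpoint object
// A CompletedCheckpoint describes a checkpoint after all required tasks acknowledged it (with their state) and that is considered successful.
// The CompletedCheckpoint class contains all the metadata of the checkpoint, i.e., checkpoint ID, timestamps, and the handles to all states that are part of the checkpoint.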
CompletedCheckpoint completed = new CompletedCheckpoint(
jobId,
checkpointId,
checkpointTimestamp,
System.currentTimeMillis(),
operatorStates,
masterStates,
props,
finalizedLocation);
// complete the CompletableFuture with the completedCheckpoint
onCompletionPromise.complete(completed);
// to prevent null-pointers from concurrent modification, copy reference onto stack
// set the discardCallback on the completedCheckpoint
PendingCheckpointStats statsCallback = this.statsCallback;
if (statsCallback != null) {
// Finalize the statsCallback and give the completed checkpoint a
// callback for discards.
CompletedCheckpointStats.DiscardCallback discardCallback =
statsCallback.reportCompletedCheckpoint(finalizedLocation.getExternalPointer());
// set the callback used for tracking when this checkpoint is discarded
completed.setDiscardCallback(discardCallback);
}
// mark this pending checkpoint as disposed, but do NOT drop the state
dispose(false);
return completed;
}
catch (Throwable t) {
onCompletionPromise.completeExceptionally(t);
ExceptionUtils.rethrowIOException(t);
return null; // silence the compiler
}
}
}
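finalizeCheckpoint guarantees that onCompletionPromise is fulfilled either way: with the completed checkpoint on success, or exceptionally when anything in the try block fails. That pattern is sketched below with a plain `CompletableFuture`. `FinalizeSketch` is hypothetical, a `String` stands in for the completed checkpoint, and the `Callable` stands in for writing the metadata.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: always fulfil the promise, with a value or with the failure. */
final class FinalizeSketch {
    final CompletableFuture<String> onCompletionPromise = new CompletableFuture<>();
    private boolean discarded;

    String finalizeWith(Callable<String> writeMetadata) throws IOException {
        if (discarded) {
            throw new IllegalStateException("checkpoint is discarded");
        }
        try {
            String completed = writeMetadata.call(); // may fail, e.g. on storage errors
            onCompletionPromise.complete(completed);
            discarded = true;                        // the pending side is now disposed
            return completed;
        } catch (Throwable t) {
            onCompletionPromise.completeExceptionally(t);
            throw new IOException("Could not finalize the pending checkpoint", t);
        }
    }

    boolean isDiscarded() {
        return discarded;
    }

    public static void main(String[] args) throws IOException {
        FinalizeSketch sketch = new FinalizeSketch();
        System.out.println(sketch.finalizeWith(() -> "completed-checkpoint")); // prints completed-checkpoint
        System.out.println(sketch.isDiscarded());                              // prints true
    }
}
```

Anyone waiting on the promise therefore learns about a failed finalization without having called the method directly.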
Once the pendingCheckpoint has been turned into a completedCheckpoint, failureManager.handleCheckpointSuccess is called.
This method simply resets the checkpoint failure count:
public void handleCheckpointSuccess(@SuppressWarnings("unused") long checkpointId) {
clearCount();
}
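Resetting on every success means the counter tracks consecutive failures rather than total failures. The sketch below illustrates that idea. The tolerance threshold and all names in `FailureCounterSketch` are assumptions added for illustration; the source above only shows the reset.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: count consecutive checkpoint failures, reset on success. */
final class FailureCounterSketch {
    private final int tolerableFailures; // assumed threshold, not taken from the source
    private final AtomicInteger continuousFailures = new AtomicInteger();

    FailureCounterSketch(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Returns true once the run of consecutive failures exceeds the tolerated number. */
    boolean handleFailure() {
        return continuousFailures.incrementAndGet() > tolerableFailures;
    }

    /** Counterpart of handleCheckpointSuccess: clears the count. */
    void handleSuccess() {
        continuousFailures.set(0);
    }

    int count() {
        return continuousFailures.get();
    }

    public static void main(String[] args) {
        FailureCounterSketch counter = new FailureCounterSketch(2);
        counter.handleFailure();
        counter.handleFailure();
        counter.handleSuccess();                     // a success wipes the run of failures
        System.out.println(counter.count());         // prints 0
        System.out.println(counter.handleFailure()); // prints false
    }
}
```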
Previous checkpoint posts
Flink-1.10 Source Code Notes: checkpoint - 1
Flink-1.10 Source Code Notes: checkpoint - 2
Corrections are welcome if you spot any mistakes!