Flink Source Code Analysis Series — Table of Contents
Please see: Flink Source Code Analysis Series — Table of Contents
From JobGraph to ExecutionGraph
A JobGraph is submitted through the Dispatcher.submitJob method, which is the entry point for everything that follows. It calls Dispatcher.internalSubmitJob, which in turn calls Dispatcher.persistAndRunJob.
Dispatcher.persistAndRunJob persists the job and then runs it, as shown below:
private void persistAndRunJob(JobGraph jobGraph) throws Exception {
jobGraphWriter.putJobGraph(jobGraph);
runJob(jobGraph, ExecutionType.SUBMISSION);
}
Dispatcher.runJob takes two parameters: the JobGraph and the execution type. There are two execution types: submitting a job (SUBMISSION) and recovering a job (RECOVERY).
private void runJob(JobGraph jobGraph, ExecutionType executionType) {
// Ensure the job with this JobID is not currently running, to avoid duplicate submission
Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
// Record the start timestamp
long initializationTimestamp = System.currentTimeMillis();
// Create the JobManagerRunner here
// The JobManagerRunner will then construct the JobManager
CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
createJobManagerRunner(jobGraph, initializationTimestamp);
// Wrap the JobGraph-related information for use by the Dispatcher
DispatcherJob dispatcherJob =
DispatcherJob.createFor(
jobManagerRunnerFuture,
jobGraph.getJobID(),
jobGraph.getName(),
initializationTimestamp);
// Add the current job's ID to the runningJobs collection
// to mark the job as running
runningJobs.put(jobGraph.getJobID(), dispatcherJob);
final JobID jobId = jobGraph.getJobID();
// Handle the result of dispatching the job
final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
dispatcherJob
.getResultFuture()
.handleAsync(
(dispatcherJobResult, throwable) -> {
Preconditions.checkState(
runningJobs.get(jobId) == dispatcherJob,
"The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");
if (dispatcherJobResult != null) {
return handleDispatcherJobResult(
jobId, dispatcherJobResult, executionType);
} else {
return dispatcherJobFailed(jobId, throwable);
}
},
getMainThreadExecutor());
// When the job stops, remove its JobID from runningJobs
final CompletableFuture<Void> jobTerminationFuture =
cleanupJobStateFuture
.thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
.thenCompose(Function.identity());
// Register the job ID and its termination future in the dispatcherJobTerminationFutures map
FutureUtils.assertNoException(jobTerminationFuture);
registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
}
Next comes the Dispatcher.createJobManagerRunner method.
The JobManager is created in the Dispatcher and then started. The creation logic lives in createJobManagerRunner, shown below:
CompletableFuture<JobManagerRunner> createJobManagerRunner(
JobGraph jobGraph, long initializationTimestamp) {
final RpcService rpcService = getRpcService();
return CompletableFuture.supplyAsync(
() -> {
try {
// Create the JobManager through a factory,
// passing in the JobGraph and the high-availability services
JobManagerRunner runner =
jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(
jobManagerMetricGroup),
fatalErrorHandler,
initializationTimestamp);
// Start the JobManager
// This actually starts the JobManager's leader election service, which elects the JM leader
runner.start();
return runner;
} catch (Exception e) {
throw new CompletionException(
new JobInitializationException(
jobGraph.getJobID(),
"Could not instantiate JobManager.",
e));
}
},
ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on
// JobManager creation
}
At this point the JobManager begins leader election. To ensure the JobManager is not a single point of failure, Flink supports JobManager high availability, allowing multiple JobManager instances to run at the same time. In Standalone deployments, JobManager leader election is implemented with Zookeeper. In Yarn cluster mode, JobManager high availability is instead provided by Yarn automatically restarting a failed ApplicationMaster. For details on leader election, see the post "Flink 源碼之leader選舉(Zookeeper方式)".
Once the leader JM has been elected, the election service calls grantLeadership on that JM. The method is shown below:
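The contract between the election service and the elected JobManager can be illustrated with a minimal, self-contained sketch. All names below are hypothetical and greatly simplified, not Flink's actual classes: the service issues a session UUID when granting leadership, and the holder must later confirm leadership with that same UUID, which acts as a fencing token against stale leaders.

```java
import java.util.UUID;
import java.util.function.Consumer;

// Toy election service; hypothetical names, not Flink's real API.
public class ToyLeaderElection {
    private UUID currentSession;
    private Consumer<UUID> contender;

    // A contender registers a callback, analogous to grantLeadership(UUID)
    public void register(Consumer<UUID> grantLeadership) {
        this.contender = grantLeadership;
    }

    // Electing a leader issues a fresh session id and notifies the contender
    public UUID elect() {
        currentSession = UUID.randomUUID();
        contender.accept(currentSession);
        return currentSession;
    }

    // Confirmation succeeds only while the session id is still current,
    // mirroring the idea behind confirmLeaderSessionIdIfStillLeader
    public boolean confirmLeadership(UUID sessionId) {
        return sessionId.equals(currentSession);
    }
}
```

A stale session id from an earlier grant fails confirmation, which is exactly why the real grantLeadership carries a leaderSessionID through the whole start-up path.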
@Override
public void grantLeadership(final UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
log.debug(
"JobManagerRunner cannot be granted leadership because it is already shut down.");
return;
}
leadershipOperation =
leadershipOperation.thenRun(
ThrowingRunnable.unchecked(
() -> {
synchronized (lock) {
// The main logic is here:
// verify the job's scheduling status and start the JobManager
verifyJobSchedulingStatusAndStartJobManager(
leaderSessionID);
}
}));
handleException(leadershipOperation, "Could not start the job manager.");
}
}
Next we follow verifyJobSchedulingStatusAndStartJobManager.
@GuardedBy("lock")
private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId)
throws FlinkException {
// If the JobManagerRunner has been shut down, return immediately
if (shutdown) {
log.debug("Ignoring starting JobMaster because JobManagerRunner is already shut down.");
return;
}
// Get the job's scheduling status from the running jobs registry
final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
getJobSchedulingStatus();
// If the job has already finished,
// invoke the job-already-done logic (in fact, the logic for a job not completed by the current JobManager)
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
jobAlreadyDone();
} else {
// Start the JobMaster
startJobMaster(leaderSessionId);
}
}
The flow now reaches JobManagerRunnerImpl.startJobMaster.
This method starts the JobMaster: it registers the JobGraph, starts the JobMaster services, and confirms this JobMaster as leader.
@GuardedBy("lock")
private void startJobMaster(UUID leaderSessionId) throws FlinkException {
log.info(
"JobManager runner for job {} ({}) was granted leadership with session id {}.",
jobGraph.getName(),
jobGraph.getJobID(),
leaderSessionId);
try {
// Register the JobGraph
// Depending on the deployment mode (Standalone, Zookeeper or K8s), the JobID is stored in different ways
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
} catch (IOException e) {
throw new FlinkException(
String.format(
"Failed to set the job %s to running in the running jobs registry.",
jobGraph.getJobID()),
e);
}
// Start the JobMaster services
startJobMasterServiceSafely(leaderSessionId);
// Confirm that this JobMaster holds leadership
if (jobMasterService != null) {
confirmLeaderSessionIdIfStillLeader(jobMasterService, leaderSessionId);
}
}
JobManagerRunnerImpl.startJobMasterServiceSafely then creates the JobMaster and starts its RPC service via DefaultJobMasterServiceFactory.createJobMasterService.
Next, the JobMaster constructor contains the logic for building Flink's job scheduler. JobMaster.createScheduler calls DefaultSlotPoolServiceSchedulerFactory.createScheduler to create the scheduler, which in turn calls the scheduler factory method DefaultSchedulerFactory.createInstance to create the Scheduler instance.
The flow then arrives at DefaultScheduler, the default implementation of Flink's job scheduler. It extends SchedulerBase, which in turn implements the SchedulerNG interface.
The SchedulerBase constructor calls the createAndRestoreExecutionGraph method.
SchedulerBase.createAndRestoreExecutionGraph looks like this:
private ExecutionGraph createAndRestoreExecutionGraph(
JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp,
ComponentMainThreadExecutor mainThreadExecutor,
JobStatusListener jobStatusListener)
throws Exception {
// Create the ExecutionGraph
ExecutionGraph newExecutionGraph =
createExecutionGraph(
currentJobManagerJobMetricGroup,
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
shuffleMaster,
partitionTracker,
executionDeploymentTracker,
initializationTimestamp);
// Get the CheckpointCoordinator created along with the ExecutionGraph
// The creation of the CheckpointCoordinator is covered in a later section
final CheckpointCoordinator checkpointCoordinator =
newExecutionGraph.getCheckpointCoordinator();
if (checkpointCoordinator != null) {
// check whether we find a valid checkpoint
// Check whether a recent checkpoint exists
if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
// check whether we can restore from a savepoint
// If not, try to restore from a savepoint instead
tryRestoreExecutionGraphFromSavepoint(
newExecutionGraph, jobGraph.getSavepointRestoreSettings());
}
}
// Set the internal task failure listener
newExecutionGraph.setInternalTaskFailuresListener(
new UpdateSchedulerNgOnInternalFailuresListener(this));
// Register the job status listener
newExecutionGraph.registerJobStatusListener(jobStatusListener);
// Attach the JobMaster main thread executor
newExecutionGraph.start(mainThreadExecutor);
return newExecutionGraph;
}
SchedulerBase.createExecutionGraph calls DefaultExecutionGraphBuilder to create the ExecutionGraph. The code is shown below:
private ExecutionGraph createExecutionGraph(
JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp)
throws JobExecutionException, JobException {
// Create the execution deployment listener
ExecutionDeploymentListener executionDeploymentListener =
new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
// Create the execution state update listener
ExecutionStateUpdateListener executionStateUpdateListener =
(execution, newState) -> {
if (newState.isTerminal()) {
executionDeploymentTracker.stopTrackingDeploymentOf(execution);
}
};
// Build the ExecutionGraph
return DefaultExecutionGraphBuilder.buildGraph(
jobGraph,
jobMasterConfiguration,
futureExecutor,
ioExecutor,
userCodeLoader,
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
rpcTimeout,
currentJobManagerJobMetricGroup,
blobWriter,
log,
shuffleMaster,
partitionTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
jobGraph.getJobType()),
executionDeploymentListener,
executionStateUpdateListener,
initializationTimestamp,
new DefaultVertexAttemptNumberStore());
}
ExecutionGraph Concepts
The ExecutionGraph is the physical execution plan of a Flink job. It coordinates the distributed execution of the data flow.
Unlike the StreamGraph and the JobGraph, the ExecutionGraph is generated in the JobManager.
The ExecutionGraph also has the notion of a vertex. A vertex in the ExecutionGraph is an ExecutionJobVertex, corresponding to a JobVertex in the JobGraph. Going from JobGraph to ExecutionGraph introduces parallelism: an ExecutionJobVertex contains all the parallel tasks of its corresponding JobVertex. Each parallel task within an ExecutionJobVertex is represented by an ExecutionVertex, so an ExecutionJobVertex with parallelism n contains n ExecutionVertex instances.
An ExecutionVertex may be executed one or more times (due to recovery, recomputation, or reconfiguration). Each execution of an ExecutionVertex produces an Execution object, which tracks the state transitions and resource usage of that task attempt.
An IntermediateResult corresponds to an IntermediateDataSet of a JobVertex in the JobGraph; it represents the staging point for data transferred between two adjacent ExecutionJobVertex instances. IntermediateResults are created when the ExecutionJobVertex is constructed, and each is divided into IntermediateResultPartitions whose number matches the vertex's parallelism.
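The vertex and attempt relationships above can be made concrete with a small toy model (hypothetical names, not Flink's classes): one ExecutionJobVertex expands into `parallelism` ExecutionVertex instances, and each run of an ExecutionVertex appends a new attempt, playing the role of an Execution object.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of ExecutionJobVertex -> ExecutionVertex -> Execution; hypothetical names.
public class ToyExecutionModel {
    static class ToyExecutionVertex {
        final int subtaskIndex;
        final List<String> attempts = new ArrayList<>(); // one entry per "Execution"

        ToyExecutionVertex(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }

        // Each run of the vertex creates a new attempt, like a new Execution object
        String newAttempt() {
            String id = "subtask-" + subtaskIndex + "-attempt-" + attempts.size();
            attempts.add(id);
            return id;
        }
    }

    // An ExecutionJobVertex with parallelism n contains n ExecutionVertex instances
    static List<ToyExecutionVertex> expand(int parallelism) {
        List<ToyExecutionVertex> vertices = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            vertices.add(new ToyExecutionVertex(i));
        }
        return vertices;
    }
}
```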
The DefaultExecutionGraphBuilder.buildGraph Method
public static ExecutionGraph buildGraph(
JobGraph jobGraph,
Configuration jobManagerConfig,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
ClassLoader classLoader,
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
Time rpcTimeout,
MetricGroup metrics,
BlobWriter blobWriter,
Logger log,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
ExecutionDeploymentListener executionDeploymentListener,
ExecutionStateUpdateListener executionStateUpdateListener,
long initializationTimestamp,
VertexAttemptNumberStore vertexAttemptNumberStore)
throws JobExecutionException, JobException {
checkNotNull(jobGraph, "job graph cannot be null");
// Get the job name and job ID
final String jobName = jobGraph.getName();
final JobID jobId = jobGraph.getJobID();
// Create the JobInformation
// JobInformation encapsulates the job-level configuration used by the ExecutionGraph
final JobInformation jobInformation =
new JobInformation(
jobId,
jobName,
jobGraph.getSerializedExecutionConfig(),
jobGraph.getJobConfiguration(),
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths());
// Get the maximum number of prior execution attempts kept in history
final int maxPriorAttemptsHistoryLength =
jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
// Get the IntermediateResultPartition release strategy factory
final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
jobManagerConfig);
// create a new execution graph, if none exists so far
// Create the ExecutionGraph; analyzed in a later section
final DefaultExecutionGraph executionGraph;
try {
executionGraph =
new DefaultExecutionGraph(
jobInformation,
futureExecutor,
ioExecutor,
rpcTimeout,
maxPriorAttemptsHistoryLength,
classLoader,
blobWriter,
partitionReleaseStrategyFactory,
shuffleMaster,
partitionTracker,
partitionLocationConstraint,
executionDeploymentListener,
executionStateUpdateListener,
initializationTimestamp,
vertexAttemptNumberStore);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
// set the basic properties
try {
// Set the JSON-formatted execution plan
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
} catch (Throwable t) {
log.warn("Cannot create JSON plan for job", t);
// give the graph an empty plan
// If generating the JSON plan from the jobGraph fails, fall back to an empty plan
executionGraph.setJsonPlan("{}");
}
// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
final long initMasterStart = System.nanoTime();
log.info("Running initialization on master for job {} ({}).", jobName, jobId);
for (JobVertex vertex : jobGraph.getVertices()) {
// Get the invokable class name of the vertex, i.e. the vertex's task
String executableClass = vertex.getInvokableClassName();
// Ensure every vertex has an invokable class
if (executableClass == null || executableClass.isEmpty()) {
throw new JobSubmissionException(
jobId,
"The vertex "
+ vertex.getID()
+ " ("
+ vertex.getName()
+ ") has no invokable class.");
}
try {
// Run the vertex's master-side initialization logic, which depends on the vertex type
vertex.initializeOnMaster(classLoader);
} catch (Throwable t) {
throw new JobExecutionException(
jobId,
"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
t);
}
}
log.info(
"Successfully ran initialization on master in {} ms.",
(System.nanoTime() - initMasterStart) / 1_000_000);
// topologically sort the job vertices and attach the graph to the existing one
// 按照拓?fù)浣Y(jié)構(gòu)(數(shù)據(jù)流的順序)排序,獲取所有的Job頂點(diǎn)
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
log.debug(
"Adding {} vertices from job graph {} ({}).",
sortedTopology.size(),
jobName,
jobId);
}
// Attach all job vertices to the executionGraph
executionGraph.attachJobGraph(sortedTopology);
if (log.isDebugEnabled()) {
log.debug(
"Successfully created execution graph from job graph {} ({}).", jobName, jobId);
}
// configure the state checkpointing
// Configure checkpointing,
// if checkpointing is enabled
if (isCheckpointingEnabled(jobGraph)) {
// Get the checkpoint settings from the JobGraph
// snapshotSettings is configured in StreamingJobGraphGenerator
JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
// Get all vertices that trigger checkpoints, i.e. all source vertices
List<ExecutionJobVertex> triggerVertices =
idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
// Get all vertices that must acknowledge checkpoints, i.e. all vertices
List<ExecutionJobVertex> ackVertices =
idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
// Get all vertices that receive checkpoint commit notifications, i.e. all vertices
List<ExecutionJobVertex> confirmVertices =
idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
// Maximum number of remembered checkpoints
// Get the maximum number of checkpoints remembered in history
int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
// Create the checkpoint stats tracker, which works together with the CheckpointCoordinator
CheckpointStatsTracker checkpointStatsTracker =
new CheckpointStatsTracker(
historySize,
ackVertices,
snapshotSettings.getCheckpointCoordinatorConfiguration(),
metrics);
// load the state backend from the application settings
// Get the state backend configuration
final StateBackend applicationConfiguredBackend;
final SerializedValue<StateBackend> serializedAppConfigured =
snapshotSettings.getDefaultStateBackend();
if (serializedAppConfigured == null) {
applicationConfiguredBackend = null;
} else {
try {
// Obtain the state backend from the application configuration
applicationConfiguredBackend =
serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(
jobId, "Could not deserialize application-defined state backend.", e);
}
}
final StateBackend rootBackend;
try {
// Resolve the state backend:
// if no state backend is configured in the application, use the one from the configuration file;
// if none is configured there either, fall back to the default
rootBackend =
StateBackendLoader.fromApplicationOrConfigOrDefault(
applicationConfiguredBackend, jobManagerConfig, classLoader, log);
} catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
throw new JobExecutionException(
jobId, "Could not instantiate configured state backend", e);
}
// load the checkpoint storage from the application settings
// Load the checkpoint storage configuration from the application settings
final CheckpointStorage applicationConfiguredStorage;
final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
snapshotSettings.getDefaultCheckpointStorage();
if (serializedAppConfiguredStorage == null) {
applicationConfiguredStorage = null;
} else {
try {
applicationConfiguredStorage =
serializedAppConfiguredStorage.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(
jobId,
"Could not deserialize application-defined checkpoint storage.",
e);
}
}
// As with the state backend, load the checkpoint storage from the application settings and the Flink configuration file
final CheckpointStorage rootStorage;
try {
rootStorage =
CheckpointStorageLoader.load(
applicationConfiguredStorage,
null,
rootBackend,
jobManagerConfig,
classLoader,
log);
} catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
throw new JobExecutionException(
jobId, "Could not instantiate configured checkpoint storage", e);
}
// instantiate the user-defined checkpoint hooks
// Instantiate the user-defined checkpoint hooks
// These hooks run when a snapshot is restored or triggered
final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
snapshotSettings.getMasterHooks();
final List<MasterTriggerRestoreHook<?>> hooks;
if (serializedHooks == null) {
hooks = Collections.emptyList();
} else {
final MasterTriggerRestoreHook.Factory[] hookFactories;
try {
hookFactories = serializedHooks.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(
jobId, "Could not instantiate user-defined checkpoint hooks", e);
}
final Thread thread = Thread.currentThread();
final ClassLoader originalClassLoader = thread.getContextClassLoader();
thread.setContextClassLoader(classLoader);
try {
hooks = new ArrayList<>(hookFactories.length);
for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
}
} finally {
thread.setContextClassLoader(originalClassLoader);
}
}
// Get the checkpoint coordinator configuration
final CheckpointCoordinatorConfiguration chkConfig =
snapshotSettings.getCheckpointCoordinatorConfiguration();
// Apply the checkpoint configuration to the executionGraph
executionGraph.enableCheckpointing(
chkConfig,
triggerVertices,
ackVertices,
confirmVertices,
hooks,
checkpointIdCounter,
completedCheckpointStore,
rootBackend,
checkpointStatsTracker,
checkpointsCleaner);
}
// create all the metrics for the Execution Graph
// Create gauges that track the job's uptime, restart time, and downtime
metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
return executionGraph;
}
The main steps of creating an ExecutionGraph are roughly:
- Collect the job information
- Create the ExecutionGraph itself
- Attach the JobGraph vertices
- Configure checkpointing
- Configure the state backend
- Configure the checkpoint storage
- Configure the checkpoint hooks
- Register the job metrics
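Both the state backend and the checkpoint storage in the steps above are resolved with the same three-tier fallback: the application-configured value wins, then the cluster configuration file, then a built-in default. A minimal sketch of that pattern (names, keys and default values here are hypothetical):

```java
import java.util.Map;

// Toy version of the application -> config file -> default resolution
// used for the state backend and checkpoint storage; names are hypothetical.
public class ToyBackendResolver {
    static String resolve(String applicationConfigured, Map<String, String> clusterConfig) {
        if (applicationConfigured != null) {
            return applicationConfigured; // application settings win
        }
        // otherwise fall back to the cluster configuration, then to a default
        return clusterConfig.getOrDefault("state.backend", "hashmap");
    }
}
```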
The ExecutionGraph Constructor
- jobInformation: a wrapper around job information, including the job ID, name, configuration, user code and classpath.
- futureExecutor: thread pool for asynchronous operations.
- ioExecutor: thread pool for IO operations.
- rpcTimeout: timeout for RPC calls.
- maxPriorAttemptsHistoryLength: maximum number of prior execution attempts kept in history.
- classLoader: the user-code class loader.
- blobWriter: writes data to the blob server.
- partitionReleaseStrategyFactory: factory for the IntermediateResultPartition release strategy.
- shuffleMaster: registers IntermediateResultPartitions (intermediate result partitions), i.e. registers data partitions and their producers with the JobMaster.
- partitionTracker: tracks and releases partitions.
- partitionLocationConstraint: whether partition locations may be unknown at deployment time. In batch mode, partition locations may be unknown; in streaming mode they must be known.
- executionDeploymentListener: listener for execution deployments.
- executionStateUpdateListener: listener for execution state updates.
- initializationTimestamp: the initialization timestamp.
- vertexAttemptNumberStore: stores the attempt count of each job vertex.
The attachJobGraph Method
This method attaches the JobGraph to the ExecutionGraph.
@Override
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
// Assert we are running in the JobMaster main thread
assertRunningInJobMasterMainThread();
LOG.debug(
"Attaching {} topologically sorted vertices to existing job graph with {} "
+ "vertices and {} intermediate results.",
topologiallySorted.size(),
tasks.size(),
intermediateResults.size());
// Create the collection that will hold the new ExecutionJobVertex instances
final ArrayList<ExecutionJobVertex> newExecJobVertices =
new ArrayList<>(topologiallySorted.size());
final long createTimestamp = System.currentTimeMillis();
for (JobVertex jobVertex : topologiallySorted) {
// If any vertex is an input vertex and is not stoppable,
// mark the ExecutionGraph's source tasks as not stoppable
if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
this.isStoppable = false;
}
// create the execution job vertex and attach it to the graph
// Create the ExecutionJobVertex
ExecutionJobVertex ejv =
new ExecutionJobVertex(
this,
jobVertex,
maxPriorAttemptsHistoryLength,
rpcTimeout,
createTimestamp,
this.initialAttemptCounts.getAttemptCounts(jobVertex.getID()));
// Hand the predecessors' IntermediateResults to the current ejv,
// connecting it to its predecessors, as the method name says
ejv.connectToPredecessors(this.intermediateResults);
// Put the job vertex ID and the ejv into the ExecutionGraph as a key-value pair
ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
// If previousTask is non-null, two JobGraph vertices share the same ID, which is an error
if (previousTask != null) {
throw new JobException(
String.format(
"Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
jobVertex.getID(), ejv, previousTask));
}
// Iterate over the IntermediateResults created by the ejv
// They are created in the ExecutionJobVertex constructor
// from the IntermediateDataSets of the ejv's corresponding JobVertex
for (IntermediateResult res : ejv.getProducedDataSets()) {
IntermediateResult previousDataSet =
this.intermediateResults.putIfAbsent(res.getId(), res);
// Likewise, check that result IDs are not duplicated
if (previousDataSet != null) {
throw new JobException(
String.format(
"Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
res.getId(), res, previousDataSet));
}
}
// Save the ejv in the collection that keeps vertices in creation order
this.verticesInCreationOrder.add(ejv);
// Count the total number of vertices: at runtime one vertex is deployed per parallel subtask,
// so we accumulate the parallelism of each ejv
this.numVerticesTotal += ejv.getParallelism();
newExecJobVertices.add(ejv);
}
// Register all ExecutionVertex instances and the IntermediateResultPartitions they produce
// An ExecutionVertex is a physical execution node: an ExecutionJobVertex contains one ExecutionVertex per unit of parallelism
registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);
// the topology assigning should happen before notifying new vertices to failoverStrategy
// 創(chuàng)建執(zhí)行拓?fù)? //執(zhí)行拓?fù)浒蠩xecutionVertex,ResultPartition以及PipelinedRegion
executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
// Create the partition release strategy
partitionReleaseStrategy =
partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
Pipelined Region
When building the executionTopology we encounter the notion of a pipelined region. Before explaining it, we need the distinction between a pipelined result and a blocking result.
A pipelined result streams data out continuously; downstream tasks can start consuming as soon as the upstream starts producing. A pipelined result may produce data indefinitely. This type corresponds to streaming jobs.
A blocking result can only be consumed once the upstream has finished producing all of its data. Blocking results are always finite. The typical scenario is batch jobs.
A pipelined region is a part of the ExecutionGraph: a set of tasks connected consecutively by pipelined data exchanges (producing pipelined results). An ExecutionGraph can therefore be partitioned into multiple pipelined regions, connected to each other by blocking data exchanges.
The point of a pipelined region is that all consumers inside a region must continuously consume what the upstream producers emit, to avoid blocking the producers or causing backpressure. Consequently, all tasks within the same pipelined region must be scheduled together at startup and restarted together on failure.
The official explanation of pipelined region scheduling is at: https://flink.apache.org/2020/12/15/pipelined-region-sheduling.html
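The partitioning into pipelined regions can be sketched with a union-find over the pipelined edges: vertices connected by pipelined exchanges merge into one region, while blocking exchanges separate regions. This is a heavy simplification of Flink's actual algorithm; all names below are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Toy pipelined-region computation; hypothetical simplification of Flink's logic.
public class ToyRegions {
    // Union-find with path compression
    static int find(int[] parent, int x) {
        return parent[x] == x ? x : (parent[x] = find(parent, parent[x]));
    }

    /** edges: {from, to, 1 for pipelined / 0 for blocking}; returns the region count. */
    static int regionCount(int numVertices, int[][] edges) {
        int[] parent = new int[numVertices];
        for (int i = 0; i < numVertices; i++) parent[i] = i;
        for (int[] e : edges) {
            if (e[2] == 1) { // only pipelined edges merge regions
                parent[find(parent, e[0])] = find(parent, e[1]);
            }
        }
        Set<Integer> roots = new HashSet<>();
        for (int i = 0; i < numVertices; i++) roots.add(find(parent, i));
        return roots.size();
    }
}
```

For example, a chain source → map (pipelined) → sink (blocking) splits into two regions, which is why the map can be co-scheduled with the source but the sink can be scheduled later.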
The enableCheckpointing Method
This method initializes the checkpoint-related configuration of the ExecutionGraph. Its main job is to create and configure the CheckpointCoordinator. The code is shown below:
@Override
public void enableCheckpointing(
CheckpointCoordinatorConfiguration chkConfig,
List<MasterTriggerRestoreHook<?>> masterHooks,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
StateBackend checkpointStateBackend,
CheckpointStorage checkpointStorage,
CheckpointStatsTracker statsTracker,
CheckpointsCleaner checkpointsCleaner) {
// The job must be in CREATED state
checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
// The CheckpointCoordinator must not exist yet, to avoid enabling checkpointing twice
checkState(checkpointCoordinator == null, "checkpointing already enabled");
// Collect the OperatorCoordinators of each ExecutionJobVertex
// An OperatorCoordinator runs in the JobManager and is associated with an operator of a job vertex; it is used to communicate with that operator
final Collection<OperatorCoordinatorCheckpointContext> operatorCoordinators =
buildOpCoordinatorCheckpointContexts();
checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
// Create the checkpoint failure manager, which runs the failure-handling logic when a checkpoint fails
CheckpointFailureManager failureManager =
new CheckpointFailureManager(
chkConfig.getTolerableCheckpointFailureNumber(),
new CheckpointFailureManager.FailJobCallback() {
@Override
public void failJob(Throwable cause) {
getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
}
@Override
public void failJobDueToTaskFailure(
Throwable cause, ExecutionAttemptID failingTask) {
getJobMasterMainThreadExecutor()
.execute(
() ->
failGlobalIfExecutionIsStillRunning(
cause, failingTask));
}
});
// Create the timer the CheckpointCoordinator uses to trigger periodic checkpoints
checkState(checkpointCoordinatorTimer == null);
checkpointCoordinatorTimer =
Executors.newSingleThreadScheduledExecutor(
new DispatcherThreadFactory(
Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
// Create the CheckpointCoordinator, which coordinates the checkpointing of all operators across the cluster: it triggers checkpoints and commits them
// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator =
new CheckpointCoordinator(
jobInformation.getJobId(),
chkConfig,
operatorCoordinators,
checkpointIDCounter,
checkpointStore,
checkpointStorage,
ioExecutor,
checkpointsCleaner,
new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
SharedStateRegistry.DEFAULT_FACTORY,
failureManager,
createCheckpointPlanCalculator(),
new ExecutionAttemptMappingProvider(getAllExecutionVertices()));
// register the master hooks on the checkpoint coordinator
// Register the master hooks, which are called back when a checkpoint is created or restored from
for (MasterTriggerRestoreHook<?> hook : masterHooks) {
if (!checkpointCoordinator.addMasterHook(hook)) {
LOG.warn(
"Trying to register multiple checkpoint hooks with the name: {}",
hook.getIdentifier());
}
}
// Configure the checkpoint stats tracker
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
// interval of max long value indicates disable periodic checkpoint,
// the CheckpointActivatorDeactivator should be created only if the interval is not max
// value
// If periodic checkpointing is not disabled, register a job status listener
// used by the CheckpointCoordinator to observe job status changes
if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}
// Record the state backend name and the checkpoint storage name
this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
this.checkpointStorageName = checkpointStorage.getClass().getSimpleName();
}
From ExecutionGraph to Task Deployment
Each execution of an ExecutionVertex creates an Execution object.
Once the JobManager has started, it instructs the scheduler to begin scheduling (by calling SchedulerBase.startScheduling). After a fairly involved chain of intermediate calls (omitted here), the flow finally reaches Execution.deploy.
Execution.deploy contains the actual deployment logic: from the task's assigned resources and the ExecutionVertex it builds a task deployment descriptor. This descriptor carries the task's execution configuration and is sent to the TaskManager over RPC, which then creates a task matching the description.
public void deploy() throws JobException {
// Assert we are running in the JobMaster main thread
assertRunningInJobMasterMainThread();
// Get the assigned slot
final LogicalSlot slot = assignedResource;
checkNotNull(
slot,
"In order to deploy the execution we first have to assign a resource via tryAssignResource.");
// Check if the TaskManager died in the meantime
// This only speeds up the response to TaskManagers failing concurrently to deployments.
// The more general check is the rpcTimeout of the deployment call
if (!slot.isAlive()) {
throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
}
// make sure exactly one deployment call happens from the correct state
// note: the transition from CREATED to DEPLOYING is for testing purposes only
// Read the previous state and switch the state to DEPLOYING
ExecutionState previous = this.state;
if (previous == SCHEDULED || previous == CREATED) {
if (!transitionState(previous, DEPLOYING)) {
// race condition, someone else beat us to the deploying call.
// this should actually not happen and indicates a race somewhere else
throw new IllegalStateException(
"Cannot deploy task: Concurrent deployment call race.");
}
} else {
// vertex may have been cancelled, or it was already scheduled
throw new IllegalStateException(
"The vertex must be in CREATED or SCHEDULED state to be deployed. Found state "
+ previous);
}
// Check that this slot was actually assigned to the current execution
if (this != slot.getPayload()) {
throw new IllegalStateException(
String.format(
"The execution %s has not been assigned to the assigned slot.", this));
}
try {
// race double check, did we fail/cancel and do we need to release the slot?
// Double-check that the state is still DEPLOYING
if (this.state != DEPLOYING) {
slot.releaseSlot(
new FlinkException(
"Actual state of execution "
+ this
+ " ("
+ state
+ ") does not match expected state DEPLOYING."));
return;
}
LOG.info(
"Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}",
vertex.getTaskNameWithSubtaskIndex(),
attemptNumber,
vertex.getCurrentExecutionAttempt().getAttemptId(),
getAssignedResourceLocation(),
slot.getAllocationId());
// Create a task deployment descriptor
// carrying the ExecutionVertex and its assigned resources, among other information
final TaskDeploymentDescriptor deployment =
TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex, attemptNumber)
.createDeploymentDescriptor(
slot.getAllocationId(),
taskRestore,
producedPartitions.values());
// null taskRestore to let it be GC'ed
taskRestore = null;
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();
getVertex().notifyPendingDeployment(this);
// We run the submission in the future executor so that the serialization of large TDDs
// does not block
// the main thread and sync back to the main thread once submission is completed.
// Here we call taskManagerGateway asynchronously, notifying the TaskManager over RPC
// and sending it the task deployment descriptor
// The TaskManager starts deploying the task once it receives the descriptor
CompletableFuture.supplyAsync(
() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
.thenCompose(Function.identity())
.whenCompleteAsync(
(ack, failure) -> {
if (failure == null) {
vertex.notifyCompletedDeployment(this);
} else {
if (failure instanceof TimeoutException) {
String taskname =
vertex.getTaskNameWithSubtaskIndex()
+ " ("
+ attemptId
+ ')';
markFailed(
new Exception(
"Cannot deploy task "
+ taskname
+ " - TaskManager ("
+ getAssignedResourceLocation()
+ ") not responding after a rpcTimeout of "
+ rpcTimeout,
failure));
} else {
markFailed(failure);
}
}
},
jobMasterMainThreadExecutor);
} catch (Throwable t) {
markFailed(t);
}
}
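The submit-then-classify-failure pattern in deploy can be reduced to a self-contained sketch (hypothetical names): run the RPC call off the main thread, bound it with a timeout, and distinguish a timeout from other failures, as deploy does with `failure instanceof TimeoutException`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy version of the asynchronous submission in Execution.deploy; hypothetical names.
public class ToySubmit {
    static String submit(Callable<String> rpcCall, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture
                    .supplyAsync(() -> {
                        try {
                            return rpcCall.call(); // the "submitTask" RPC
                        } catch (Exception e) {
                            throw new CompletionException(e);
                        }
                    }, pool)
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                    .handle((ack, failure) -> {
                        if (failure == null) {
                            return "DEPLOYED:" + ack;
                        }
                        Throwable cause = failure instanceof CompletionException
                                ? failure.getCause() : failure;
                        // a timeout means the TaskManager did not respond in time
                        return cause instanceof TimeoutException ? "TIMEOUT" : "FAILED";
                    })
                    .join();
        } finally {
            pool.shutdownNow();
        }
    }
}
```

As in deploy, the expensive work runs on a separate executor so the JobMaster main thread is never blocked on serialization or the RPC round trip.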
The method above invokes TaskExecutor.submitTask through the TaskManagerGateway.
@Override
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
// ...
// Create the Task
Task task =
new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
memoryManager,
taskExecutorServices.getIOManager(),
taskExecutorServices.getShuffleEnvironment(),
taskExecutorServices.getKvStateService(),
taskExecutorServices.getBroadcastVariableManager(),
taskExecutorServices.getTaskEventDispatcher(),
externalResourceInfoProvider,
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
taskOperatorEventGateway,
aggregateManager,
classLoaderHandle,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
getRpcService().getExecutor());
// ...
}
At this point the concrete Task object has been created inside the TaskManager. This completes our analysis of how a JobGraph becomes an ExecutionGraph and is finally deployed as Tasks.
This post is original work by the author. Discussion and corrections are welcome. Please credit the source when reposting.