Flink 源碼:StreamTask 介紹及初始化過程詳解

本文僅為筆者平日學(xué)習(xí)記錄之用,侵刪
原文:https://mp.weixin.qq.com/s/GuA9o09EEue66fEpGgoGaQ

本文是 Flink 源碼解析系列,通過閱讀本文你能 get 到以下點(diǎn):

  • StreamTask 類的基本功能及其職責(zé)
  • StreamTask 初始化詳細(xì)流程
  • StateBackend 與 keyedStateBackend 以及 operatorStateBackend 之間的關(guān)系

這里先留一個(gè)思考題:如下代碼所示,開發(fā) Flink Job 時(shí) dataStream keyBy 后連續(xù)跟 map、filter、flatMap 三個(gè)算子,請(qǐng)問這三個(gè)自定義的 Function 內(nèi)都可以使用 Flink 的 ValueState 嗎?

dataStream.keyby(_._1)
  .map(new MyMapFunction())
  .filter(new MyFilterFunction())
  .flatMap(new MyFlatMapFunction())

一、 StreamTask 介紹

Flink 中數(shù)據(jù)的整個(gè)處理流程都是圍繞 StreamTask 來(lái)做的,所以先介紹一下 StreamTask 這個(gè)類。StreamTask 類的 doc 如下所示:

StreamTask 類 doc

注釋的大概意思是:StreamTask 是所有 Streaming Task 的基類,是由 TM 部署并執(zhí)行的本地處理單元。每個(gè) StreamTask 運(yùn)行一個(gè)或多個(gè) Chain 在一起的 StreamOperator,這些 Operator 將會(huì)在一個(gè)線程內(nèi)同步執(zhí)行。常見的 case:map、flatmap、filter 三個(gè)算子連續(xù)的算子。

通俗的講,StreamTask 就是對(duì)應(yīng)一個(gè) subtask 實(shí)例。如下圖 Job 的 ExecutionGraph 所示,Source 算子和 map 算子 Chain 在一起,組成一個(gè) OperatorChain,所以這兩個(gè)算子運(yùn)行在一個(gè) subtask 里,同時(shí)這兩個(gè)算子的并行度為 2,所以在對(duì)應(yīng)兩個(gè) subtask。圖中后續(xù)的算子也是類似,圖中任務(wù)如果運(yùn)行起來(lái),就會(huì)對(duì)應(yīng) 5 個(gè) subtask,也就是對(duì)應(yīng) 5 個(gè) StreamTask。

Job 的 ExecutionGraph

從資源角度講,每個(gè) TaskManager 內(nèi)部有多個(gè) slot,每個(gè) slot 內(nèi)部運(yùn)行著一個(gè) subtask,也就是說(shuō)每個(gè) slot 內(nèi)部運(yùn)行著一個(gè) StreamTask。

看完這個(gè)案例,再回顧一遍源碼中注釋,應(yīng)該比較容易理解了:

  • StreamTask 是由 TM 部署并執(zhí)行的本地處理單元
  • 每個(gè) StreamTask 運(yùn)行一個(gè)或多個(gè) Chain 在一塊的 StreamOperator,即:Source 算子和 map 算子就是 Chain 在一起的 Operator
  • 這些 Operator 將會(huì)在一個(gè)線程內(nèi)同步的執(zhí)行。即:線程中 Source 算子和 map 算子不能同時(shí)執(zhí)行。

二、StreamTask 職責(zé)簡(jiǎn)介

如源碼注釋所示,StreamTask 的生命周期如下所示:

StreamTask 的生命周期

簡(jiǎn)單概括分為三個(gè)階段:初始化、run、close。

初始化階段包括:Operator 的配置、task 特定的初始化、初始化算子的 State、open-operators。

做 Flink 開發(fā)的同學(xué)應(yīng)該都知道:自定義一個(gè) Function 時(shí)可以實(shí)現(xiàn) RichFunction,實(shí)現(xiàn) open 方法,然后 Job 啟動(dòng)時(shí),就會(huì)調(diào)用 open 方法做一些初始化操作。
open-operators 指的是 StreamTask 在初始化階段,會(huì)調(diào)用所有實(shí)現(xiàn)了 RichFunction 算子 的 open 方法。

run 階段:主要就是數(shù)據(jù)處理了。

close 階段:做一些關(guān)閉操作,例如調(diào)用算子的 close 方法等,并做一些清理工作。

三、StreamTask 的初始化

StreamTask 的整個(gè)流程都在 invoke 方法中,直接從 invoke 方法開始分析。invoke 方法就是上面介紹的三個(gè)階段:初始化、run、close。初始化階段做了很多事情,有些直接略過了(例如:創(chuàng)建線程池等),當(dāng)然初始化階段重要的操作會(huì)深入分析。

invoke 中初始化相關(guān)的代碼做了部分刪減,如下所示:

asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));

// 創(chuàng)建 StateBackend, 優(yōu)先從 app 的設(shè)置中去加載,再去 config 中去加載,
// 都沒有配置,則創(chuàng)建默認(rèn)的 MemoryStateBackend
stateBackend = createStateBackend();
checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();

// task 特定的初始化,例如 當(dāng)前 StreamTask 有 input 的情況下,會(huì)初始化 inputProcessor
init();

synchronized (lock) {
 // 循環(huán)遍歷,對(duì)該 task 所有 Operator 進(jìn)行狀態(tài)初始化,
 // 包括初始化 StateBackend ,并調(diào)用 udf 的 initializeState 方法
 initializeState();
 openAllOperators();
}

初始化部分代碼較多,下面主要介紹幾部分:

  • createStateBackend
  • init
  • initializeState();
  • openAllOperators();

1. createStateBackend

見名之意,該方法就是創(chuàng)建 StateBackend,F(xiàn)link 目前支持三種 StateBackend:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。該方法決定了當(dāng)前 Job 具體要?jiǎng)?chuàng)建哪種 StateBackend。

源碼如下:

// (1) the application defined state backend has precedence
// 代碼中創(chuàng)建了 StateBackend,則按照代碼中配置來(lái)
if (fromApplication != null) {
 // see if this is supposed to pick up additional configuration parameters
 if (fromApplication instanceof ConfigurableStateBackend) {
  // needs to pick up configuration
  if (logger != null) {
   logger.info("Configuring application-defined state backend with job/cluster config");
  }

  backend = ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
 }
 else {
  // keep as is!
  backend = fromApplication;
 }
}
else {
 // (2) check if the config defines a state backend
 // 代碼中沒有配置,按照 配置文件來(lái):即按照 flink-conf.yaml 文件中的配置來(lái)
 final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
 if (fromConfig != null) {
  backend = fromConfig;
 }
 else {
  // (3) use the default
  // 代碼中沒有配置,配置文件也沒有配置,則創(chuàng)建默認(rèn)的 MemoryStateBackend
  backend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
 }
}

整體流程比較簡(jiǎn)單:

  • 代碼中創(chuàng)建了 StateBackend,則按照代碼中配置來(lái)
  • 代碼中沒有配置,按照 配置文件來(lái):即按照 flink-conf.yaml 文件中的配置來(lái)
  • 代碼中沒有配置,配置文件也沒有配置,則創(chuàng)建默認(rèn)的 MemoryStateBackend

2. init

init 運(yùn)行 task 特定的初始化,例如當(dāng)前 StreamTask 有 input 的情況下,會(huì)初始化 inputProcessor 讀取并處理數(shù)據(jù)。關(guān)于 inputProcessor 會(huì)在 run 部分重點(diǎn)介紹,這里先略過。

3. initializeState

initializeState 方法源碼:

// 循環(huán)遍歷,對(duì)該 task 所有 Operator 進(jìn)行狀態(tài)初始化,
// 包括初始化 StateBackend ,并調(diào)用 udf 的 initializeState 方法
private void initializeState() throws Exception {

 StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

 for (StreamOperator<?> operator : allOperators) {
  if (null != operator) {
   operator.initializeState();
  }
 }
}

源碼邏輯比較簡(jiǎn)單:直接調(diào)用當(dāng)前 StreamTask 的 operatorChain 中所有 StreamOperator 的 initializeState 方法。假如當(dāng)前 operatorChain 包含了 MapFunction、FilterFunction,兩個(gè)算子將會(huì)被封裝在 StreamMap 和 StreamFilter 中,那么此時(shí)就會(huì)調(diào)用這兩個(gè)算子所對(duì)應(yīng)的 StreamOperator 的 initializeState 方法,根據(jù)繼承,最后調(diào)用的是 AbstractStreamOperator 的無(wú)參 initializeState() 方法。

這里專門強(qiáng)調(diào)無(wú)參 initializeState() 方法,是因?yàn)?AbstractStreamOperator 中還有一個(gè)有參的 initializeState(StateInitializationContext context) 方法,不要混淆。

注:有參的 initializeState 方法參數(shù)類型較長(zhǎng),下文將縮寫為 initializeState(context) ;無(wú)參的 initializeState 方法繼續(xù)用 initializeState() 表示。

AbstractStreamOperator 類的 initializeState() 方法介紹

initializeState 方法的簡(jiǎn)潔版源碼如下:

public final void initializeState() throws Exception {
 final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

  // 創(chuàng)建 StreamTaskStateInitializerImpl
 final StreamTaskStateInitializer streamTaskStateManager =
  Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());

 // 使用 StreamTaskStateInitializerImpl 初始化
  // 各種 operatorStateBackend 和 keyedStateBackend,
 // 并從 Checkpoint 處恢復(fù) State
 final StreamOperatorStateContext context =
  streamTaskStateManager.streamOperatorStateContext(XXX);

 try {
    // new Context 
  StateInitializationContext initializationContext = new 
      StateInitializationContextImpl(XXX);

  /**
   * 重點(diǎn)關(guān)注 AbstractUdfStreamOperator,它重寫了 initializeState(context) 方法,
   * 去真正調(diào)用 各個(gè) udf 的 initializeState 方法,
   */
  initializeState(initializationContext);
 } finally {
    XXX
 }
}

initializeState 方法主要完成兩個(gè)工作:

  • 1、初始化 KeyedStateBackend 和 OperatorStateBackend,并從 Checkpoint 處恢復(fù) State
  • 2、如果封裝了 udf,則調(diào)用 udf 的 initializeState 方法(前提是 userFunction 實(shí)現(xiàn)了 CheckpointedFunction 接口)

源碼流程:創(chuàng)建 StreamTaskStateInitializer。StreamTaskStateInitializer 只有一個(gè)實(shí)現(xiàn)類:StreamTaskStateInitializerImpl,所以會(huì)創(chuàng)建 StreamTaskStateInitializerImpl。創(chuàng)建時(shí),將之前初始化好的 StateBackend 傳遞給 StreamTaskStateInitializerImpl,然后調(diào)用 streamOperatorStateContext 方法初始化 KeyedStateBackend 和 OperatorStateBackend。

下面重點(diǎn)關(guān)注 StreamTaskStateInitializerImpl 類的 streamOperatorStateContext 方法,源碼如下所示:

public StreamOperatorStateContext streamOperatorStateContext(XXX){
// -------------- 初始化 Keyed State Backend --------------
 keyedStatedBackend = keyedStatedBackend(XXX);

 // -------------- 初始化 Operator State Backend --------------
 operatorStateBackend = operatorStateBackend(XXX);

 // -------------- Raw State 相關(guān)操作 --------------
 rawKeyedStateInputs = rawKeyedStateInputs(XXX);
 streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

 rawOperatorStateInputs = rawOperatorStateInputs(XXX);
 streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

 // -------------- Internal Timer Service Manager --------------
 timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);

 // -------------- Preparing return value --------------

 return new StreamOperatorStateContextImpl(
  prioritizedOperatorSubtaskStates.isRestored(),
  operatorStateBackend,
  keyedStatedBackend,
  timeServiceManager,
  rawOperatorStateInputs,
  rawKeyedStateInputs);
}

首先會(huì)初始化 Keyed State Backend 和 Operator State Backend,F(xiàn)link 還支持 Raw 類型的 State,基本用不到,除非 Flink 內(nèi)的 Managed State 不能滿足作業(yè)的需求。重點(diǎn)關(guān)注 Keyed State Backend 和 Operator State Backend 的初始化。keyedStatedBackend 方法用于初始化 keyedStatedBackend,operatorStateBackend 方法用于初始化 operatorStateBackend。

StateBackend 與 keyedStateBackend 以及 operatorStateBackend 之間的關(guān)系

介紹初始化源碼之前,先介紹一下 StateBackend 與 keyedStateBackend 以及 operatorStateBackend 之間的關(guān)系。我們都知道 Flink 中支持兩種類型的 State,即:KeyedState 和 OperatorState;Flink 目前支持三種狀態(tài)后端存儲(chǔ),即:Memory、Fs 和 RocksDB。所以 StateBackend 有三種實(shí)現(xiàn)即:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。每種 StateBackend 都要支持 KeyedState 和 OperatorState,所以每種 StateBackend 要負(fù)責(zé)創(chuàng)建出自己相應(yīng)的 keyedStateBackend 以及 operatorStateBackend。具體 KeyedState 與存儲(chǔ)系統(tǒng)如何交互是由 keyedStateBackend 完成的,具體 OperatorState 與存儲(chǔ)系統(tǒng)如何交互是由 operatorStateBackend。例如 RocksDBStateBackend 會(huì)創(chuàng)建出 RocksDBKeyedStateBackend,每個(gè) RocksDBKeyedStateBackend 會(huì)持有 RocksDB 數(shù)據(jù)庫(kù)實(shí)例,然后 Flink 引擎就可以與 RocksDB 進(jìn)行交互了。

簡(jiǎn)言之:根據(jù)用戶配置創(chuàng)建出不同類型的 StateBackend,然后不同的 StateBackend 再創(chuàng)建出對(duì)應(yīng)的 keyedStateBackend 以及 operatorStateBackend,keyedStateBackend 和 operatorStateBackend 會(huì)真正的存儲(chǔ)狀態(tài)數(shù)據(jù)。

每種 StateBackend 到底會(huì)創(chuàng)建出哪種 keyedStateBackend 和哪種 operatorStateBackend 呢?這里引用 Flink 社區(qū)分享的圖:

StateBackend 與 keyedStateBackend 以及 operatorStateBackend 的映射關(guān)系

圖中可以看出,Memory 和 Fs 會(huì)創(chuàng)建出 HeapKeyedStateBackend,RocksDB 會(huì)創(chuàng)建出 RocksDBKeyedStateBackend。無(wú)論哪種 StateBackend,都會(huì)使用 DefaultOperatorStateBackend。這里也驗(yàn)證了一點(diǎn):RocksDB 數(shù)據(jù)庫(kù)中只會(huì)存儲(chǔ) KeyedState,不會(huì)存儲(chǔ) OperatorState。因?yàn)?RocksDBStateBackend 對(duì)應(yīng)的 OperatorState 的存儲(chǔ)也是基于內(nèi)存的。讀到這里,讀者應(yīng)該理解 StateBackend 與 keyedStateBackend 以及 operatorStateBackend 之間的關(guān)系了。

初始化 keyedStateBackend 流程

下面重點(diǎn)關(guān)注初始化 keyedStateBackend 的流程,keyedStatedBackend 方法源碼如下所示:

protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(XXX){
 // 如果不是 KeyedStream 直接就返回,即:不創(chuàng)建 keyedStatedBackend
 if (keySerializer == null) {
  return null;
 }
  // 計(jì)算當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange
 final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
  taskInfo.getMaxNumberOfParallelSubtasks(),
  taskInfo.getNumberOfParallelSubtasks(),
  taskInfo.getIndexOfThisSubtask());

 BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> 
    backendRestorer =
  new BackendRestorerProcedure<>(
   // 這里是函數(shù)式接口,并不是去 create KeyedStateBackend
   (stateHandles) -> stateBackend.createKeyedStateBackend(XXX),
   backendCloseableRegistry,
   logDescription);

 try {
  // 這里去 create StateBackend 并恢復(fù)狀態(tài)文件
  return backendRestorer.createAndRestore(
   // 獲取 StateHandle 的集合
   prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
 } finally {
  if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
   IOUtils.closeQuietly(cancelStreamRegistryForRestore);
  }
 }
}

可以看到方法第一行:if (keySerializer == null) {return null;} 表示如果不是 KeyedStream 直接就返回,即:不創(chuàng)建 keyedStatedBackend。計(jì)算當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange,然后創(chuàng)建 BackendRestorerProcedure 類型的 backendRestorer,且將 (stateHandles) -> stateBackend.createKeyedStateBackend(XXX) 傳遞給 BackendRestorerProcedure 構(gòu)造器的 instanceSupplier,instanceSupplier 是一個(gè)函數(shù)式接口,用于創(chuàng)建 KeyedStateBackend。

backendRestorer.createAndRestore 方法會(huì)循環(huán)調(diào)用 attemptCreateAndRestore 恢復(fù)一個(gè)個(gè) State, attemptCreateAndRestore 方法中調(diào)用 instanceSupplier(函數(shù)式接口)真正的創(chuàng)建 keyedStatedBackend。

instanceSupplier 函數(shù)式接口的工作:調(diào)用相應(yīng) StateBackend 的 createKeyedStateBackend 方法創(chuàng)建 AbstractKeyedStateBackend。如果 stateBackend 是 RocksDBStateBackend,就會(huì)創(chuàng)建出 RocksDBKeyedStateBackend。如果是 Memory 或 Fs 則會(huì)創(chuàng)建出 HeapKeyedStateBackend。在創(chuàng)建完 KeyedStateBackend 的過程中,會(huì)從 Checkpoint 中恢復(fù)狀態(tài)到 Flink 引擎。

注:具體 KeyedStateBackend 恢復(fù)狀態(tài)的流程比較復(fù)雜,每種 StateBackend 的恢復(fù)流程都不同,同時(shí)還牽扯到從 dfs 中拉取狀態(tài)數(shù)據(jù)用于恢復(fù),所以后續(xù)會(huì)有單獨(dú)的博客介紹恢復(fù)流程。

初始化 operatorStateBackend 流程

operatorStateBackend 方法用于初始化 operatorStateBackend。operatorStateBackend 初始化流程與 keyedStateBackend 比較類似,區(qū)別在于最后調(diào)用的是 stateBackend.createOperatorStateBackend()。

三種 StateBackend 的 createOperatorStateBackend 方法非常相似,源碼如下:

public OperatorStateBackend createOperatorStateBackend(
 Environment env,
 String operatorIdentifier,
 @Nonnull Collection<OperatorStateHandle> stateHandles,
 CloseableRegistry cancelStreamRegistry) throws Exception {

 return new DefaultOperatorStateBackendBuilder(
  env.getUserClassLoader(),
  env.getExecutionConfig(),
  isUsingAsynchronousSnapshots(),
  stateHandles,
  cancelStreamRegistry).build();
}

這里無(wú)論是何種 StateBackend,都會(huì)創(chuàng)建出 DefaultOperatorStateBackend。也就驗(yàn)證了一點(diǎn):RocksDB 只支持 KeyedState,OperatorState 都是按照 Heap 的方案。

具體 new DefaultOperatorStateBackend 的過程由建造器 DefaultOperatorStateBackendBuilder 完成,build 的功能是創(chuàng)建出 OperatorStateBackend,并從 Checkpoint 中將 State 恢復(fù)到 Flink 引擎端。整體流程比較復(fù)雜,這里不闡述會(huì)在后面博客中單獨(dú)介紹。

如果當(dāng)前是 udf,則調(diào)用 udf 的 initializeState 方法

接下來(lái)重點(diǎn)又回到了 AbstractStreamOperator 類的 initializeState() 方法中,根據(jù)創(chuàng)建好的 operatorStateBackend 和 keyedStateStore 構(gòu)造 Context。然后調(diào)用 initializeState(Context) 方法,之前說(shuō)過要區(qū)分有參和無(wú)參的 initializeState 方法,現(xiàn)在執(zhí)行到了有參的 initializeState(Context) 方法。

前面介紹過所有自定義的 UDF 都被包裝起來(lái),例如 MapFunction 都被 StreamMap 類包裝起來(lái),且這些 UDF 的包裝類都繼承自 AbstractUdfStreamOperator,AbstractStreamOperator 類的 initializeState(Context) 方法沒有任何實(shí)現(xiàn),這里重點(diǎn)關(guān)注 AbstractUdfStreamOperator 重寫的 initializeState(Context) 方法。

AbstractUdfStreamOperator 類的 initializeState 方法源碼:

@Override
public void initializeState(StateInitializationContext context) throws Exception {
 // super 表示 AbstractStreamOperator 類
 super.initializeState(context);
 // 調(diào)用 udf 的 initializeState 方法
 StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

super 表示 AbstractStreamOperator 類,即調(diào)用 AbstractStreamOperator 類的 initializeState(context) 空方法,重點(diǎn)在于工具類 StreamingFunctionUtils 的 restoreFunctionState(context, userFunction) 方法,restoreFunctionState 方法內(nèi)會(huì)對(duì)包裝的 udf 進(jìn)行解包裝,然后執(zhí)行 tryRestoreFunction 方法。

tryRestoreFunction 方法部分源碼如下所示:

private static boolean tryRestoreFunction(
  StateInitializationContext context,
  Function userFunction) throws Exception {
 // 調(diào)用 udf 的  initializeState 方法,
  // 前提是 userFunction instanceof CheckpointedFunction
 if (userFunction instanceof CheckpointedFunction) {
  ((CheckpointedFunction) userFunction).initializeState(context);
  return true;
 }
}

tryRestoreFunction 會(huì)對(duì) userFunction 進(jìn)行判斷,如果實(shí)現(xiàn)了 CheckpointedFunction 接口,就調(diào)用 userFunction 的 initializeState(context) 對(duì)狀態(tài)進(jìn)行初始化。使用過 CheckpointedFunction 接口的同學(xué)應(yīng)該清楚:自定義的 Function 可以實(shí)現(xiàn) CheckpointedFunction 接口,重寫 initializeState 方法,做一些狀態(tài)的初始化操作。例如在 initializeState 方法申請(qǐng)創(chuàng)建 OperatorState。

udf 使用 initializeState 的經(jīng)典案例就是 FlinkKafkaConsumerBase 類,F(xiàn)linkKafkaConsumerBase 類實(shí)現(xiàn)了 CheckpointedFunction 接口,在 initializeState 方法中定義了 OperatorState 類型的 ListState,將 Flink 消費(fèi) Kafka 的 offset 信息維護(hù)在 ListState 中。每次啟動(dòng)任務(wù)時(shí),都會(huì)從 ListState 中恢復(fù)之前的 offset,并從 offset 處繼續(xù)消費(fèi)。

initializeState 小結(jié)

initializeState 過程比較復(fù)雜,總的來(lái)說(shuō)就兩個(gè)事情:

  • 1、 創(chuàng)建相應(yīng)的 keyedStateBackend 和 OperatorStateBackend,并從 Checkpoint 處恢復(fù) State(具體恢復(fù)流程后續(xù)講述)
  • 2、 如果 udf 實(shí)現(xiàn)了 CheckpointedFunction 接口,則調(diào)用 udf 的 initializeState 方法

4. openAllOperators

此時(shí)回到了 StreamTask 初始化流程的下一步:openAllOperators。openAllOperators 方法比較簡(jiǎn)單,源碼如下所示:

private void openAllOperators() throws Exception {
 for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
  if (operator != null) {
   operator.open();
  }
 }
}

openAllOperators 方法會(huì)調(diào)用 OperatorChain 中所有 StreamOperator 的 open 方法,通過繼承關(guān)系,最后調(diào)用的仍然是 AbstractUdfStreamOperator 類的 open 方法。AbstractUdfStreamOperator 類的 open 方法源碼如下所示:

// AbstractUdfStreamOperator 類的 open 方法
@Override
public void open() throws Exception {
 super.open();
 FunctionUtils.openFunction(userFunction, new Configuration());
}

// FunctionUtils 類的 openFunction 方法
public static void openFunction(Function function
                                , Configuration parameters) {
 if (function instanceof RichFunction) {
  RichFunction richFunction = (RichFunction) function;
  richFunction.open(parameters);
 }
}

AbstractUdfStreamOperator 類的 open 方法調(diào)用 FunctionUtils 類的 openFunction 方法,openFunction 方法中會(huì)判斷當(dāng)前 userFunction 是否實(shí)現(xiàn)了 RichFunction 接口,如果實(shí)現(xiàn)了 RichFunction 接口,則調(diào)用 userFunction 的 open 方法。

openAllOperators 小結(jié)

openAllOperators 的流程比較簡(jiǎn)單,就是判斷 userFunction 是否實(shí)現(xiàn)了 RichFunction 接口,在 Flink 中實(shí)現(xiàn)了 RichFunction 表示富函數(shù),可以定義 open 和 close 相關(guān)的邏輯,在算子初始化或者關(guān)閉時(shí)會(huì)被調(diào)用。

四、 思考題

如下代碼所示,開發(fā) Flink Job 時(shí) dataStream keyBy 后連續(xù)跟 map、filter、flatMap 三個(gè)算子,請(qǐng)問這三個(gè)自定義的 Function 內(nèi)都可以使用 Flink 的 ValueState 嗎?

dataStream.keyby(_._1)
  .map(new MyMapFunction())
  .filter(new MyFilterFunction())
  .flatMap(new MyFlatMapFunction())

先說(shuō)答案吧:在 MyMapFunction 中可以使用 ValueState,在 MyFilterFunction 和 MyFlatMapFunction 中不能使用 ValueState。如果在 MyFilterFunction 和 MyFlatMapFunction 中定義 ValueState 或 MapState,都會(huì)報(bào)錯(cuò),會(huì)顯示 keyedStateBackend 為 null。為什么呢?

首先 Flink 中只有 KeyedState 才支持 ValueState 和 MapState,OperatorState 不支持 ValueState 和 MapState。只要對(duì) KeyedStream 的操作才能使用 KeyedState,KeyedState 表示相同的 key 共享同一個(gè) State,普通的 DataStream 中沒有 key 的概念不能使用 KeyedState。

DataStream 的 keyBy 方法源碼如下所示,由源碼 DataStream 可以看到,DataStream 的 keyBy 方法會(huì)返回 KeyedStream,KeyedStream 是 DataStream 的子類,KeyedStream 經(jīng)過 map 轉(zhuǎn)換后又會(huì)變成 DataStream。所以上圖中只有 MyMapFunction 是基于 KeyedStream 操作的,MyFilterFunction 和 MyFlatMapFunction 都是基于 DataStream 操作的,沒有 key 的概念,因此不能使用 KeyedState,即不能使用 ValueState。

// DataStream 的 keyBy 方法源碼
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
 Preconditions.checkNotNull(key);
 return new KeyedStream<>(this, clean(key));
}

MyFilterFunction 和 MyFlatMapFunction 中定義 ValueState 或 MapState 時(shí),為什么會(huì)報(bào)出 keyedStateBackend 為 null 呢?

回顧一下創(chuàng)建 keyedStateBackend 的流程,第一步就是 if (keySerializer == null) {return null;},如果不是 KeyedStream 直接就返回,即:不創(chuàng)建 keyedStatedBackend。所以出現(xiàn)了上述現(xiàn)象。

五、 總結(jié)

本文介紹了 StreamTask 類的基本功能,StreamTask 映射到 ExecutionGraph 中對(duì)應(yīng)的是一個(gè) subtask,每個(gè) StreamTask 運(yùn)行一個(gè)或多個(gè) Chain 在一起的 StreamOperator,這些 Operator 將會(huì)在一個(gè)線程內(nèi)同步執(zhí)行。隨后介紹了 StreamTask 的生命周期,主要包括了初始化、run、close 三個(gè)流程。后半部分重點(diǎn)描述了 StreamTask 初始化的過程,主要是:createStateBackend、init、initializeState()、openAllOperators() 四個(gè)過程。

后續(xù)會(huì)給大家詳細(xì)介紹 initializeState 部分如何從 Checkpoint 中恢復(fù) State,也會(huì)詳細(xì)介紹 run 流程到底是如何處理一條條數(shù)據(jù)的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容