本文僅為筆者平日學(xué)習(xí)記錄之用,侵刪
原文:https://mp.weixin.qq.com/s/GuA9o09EEue66fEpGgoGaQ
本文是 Flink 源碼解析系列,通過閱讀本文你能 get 到以下點(diǎn):
- StreamTask 類的基本功能及其職責(zé)
- StreamTask 初始化詳細(xì)流程
- StateBackend 與 keyedStateBackend 以及 operatorStateBackend 之間的關(guān)系
這里先留一個(gè)思考題:如下代碼所示,開發(fā) Flink Job 時(shí) dataStream keyBy 后連續(xù)跟 map、filter、flatMap 三個(gè)算子,請(qǐng)問這三個(gè)自定義的 Function 內(nèi)都可以使用 Flink 的 ValueState 嗎?
dataStream.keyby(_._1)
.map(new MyMapFunction())
.filter(new MyFilterFunction())
.flatMap(new MyFlatMapFunction())
一、 StreamTask 介紹
Flink 中數(shù)據(jù)的整個(gè)處理流程都是圍繞 StreamTask 來(lái)做的,所以先介紹一下 StreamTask 這個(gè)類。StreamTask 類的 doc 如下所示:
注釋的大概意思是:StreamTask 是所有 Streaming Task 的基類,是由 TM 部署并執(zhí)行的本地處理單元。每個(gè) StreamTask 運(yùn)行一個(gè)或多個(gè) Chain 在一起的 StreamOperator,這些 Operator 將會(huì)在一個(gè)線程內(nèi)同步執(zhí)行。常見的 case:map、flatmap、filter 三個(gè)算子連續(xù)的算子。
通俗的講,StreamTask 就是對(duì)應(yīng)一個(gè) subtask 實(shí)例。如下圖 Job 的 ExecutionGraph 所示,Source 算子和 map 算子 Chain 在一起,組成一個(gè) OperatorChain,所以這兩個(gè)算子運(yùn)行在一個(gè) subtask 里,同時(shí)這兩個(gè)算子的并行度為 2,所以在對(duì)應(yīng)兩個(gè) subtask。圖中后續(xù)的算子也是類似,圖中任務(wù)如果運(yùn)行起來(lái),就會(huì)對(duì)應(yīng) 5 個(gè) subtask,也就是對(duì)應(yīng) 5 個(gè) StreamTask。
從資源角度講,每個(gè) TaskManager 內(nèi)部有多個(gè) slot,每個(gè) slot 內(nèi)部運(yùn)行著一個(gè) subtask,也就是說(shuō)每個(gè) slot 內(nèi)部運(yùn)行著一個(gè) StreamTask。
看完這個(gè)案例,再回顧一遍源碼中注釋,應(yīng)該比較容易理解了:
- StreamTask 是由 TM 部署并執(zhí)行的本地處理單元
- 每個(gè) StreamTask 運(yùn)行一個(gè)或多個(gè) Chain 在一塊的 StreamOperator,即:Source 算子和 map 算子就是 Chain 在一起的 Operator
- 這些 Operator 將會(huì)在一個(gè)線程內(nèi)同步的執(zhí)行。即:線程中 Source 算子和 map 算子不能同時(shí)執(zhí)行。
二、StreamTask 職責(zé)簡(jiǎn)介
如源碼注釋所示,StreamTask 的生命周期如下所示:
簡(jiǎn)單概括分為三個(gè)階段:初始化、run、close。
初始化階段包括:Operator 的配置、task 特定的初始化、初始化算子的 State、open-operators。
做 Flink 開發(fā)的同學(xué)應(yīng)該都知道:自定義一個(gè) Function 時(shí)可以實(shí)現(xiàn) RichFunction,實(shí)現(xiàn) open 方法,然后 Job 啟動(dòng)時(shí),就會(huì)調(diào)用 open 方法做一些初始化操作。
open-operators 指的是 StreamTask 在初始化階段,會(huì)調(diào)用所有實(shí)現(xiàn)了 RichFunction 算子 的 open 方法。
run 階段:主要就是數(shù)據(jù)處理了。
close 階段:做一些關(guān)閉操作,例如調(diào)用算子的 close 方法等,并做一些清理工作。
三、StreamTask 的初始化
StreamTask 的整個(gè)流程都在 invoke 方法中,直接從 invoke 方法開始分析。invoke 方法就是上面介紹的三個(gè)階段:初始化、run、close。初始化階段做了很多事情,有些直接略過了(例如:創(chuàng)建線程池等),當(dāng)然初始化階段重要的操作會(huì)深入分析。
invoke 中初始化相關(guān)的代碼做了部分刪減,如下所示:
asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
// 創(chuàng)建 StateBackend, 優(yōu)先從 app 的設(shè)置中去加載,再去 config 中去加載,
// 都沒有配置,則創(chuàng)建默認(rèn)的 MemoryStateBackend
stateBackend = createStateBackend();
checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();
// task 特定的初始化,例如 當(dāng)前 StreamTask 有 input 的情況下,會(huì)初始化 inputProcessor
init();
synchronized (lock) {
// 循環(huán)遍歷,對(duì)該 task 所有 Operator 進(jìn)行狀態(tài)初始化,
// 包括初始化 StateBackend ,并調(diào)用 udf 的 initializeState 方法
initializeState();
openAllOperators();
}
初始化部分代碼較多,下面主要介紹幾部分:
- createStateBackend
- init
- initializeState();
- openAllOperators();
1. createStateBackend
見名之意,該方法就是創(chuàng)建 StateBackend,F(xiàn)link 目前支持三種 StateBackend:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。該方法決定了當(dāng)前 Job 具體要?jiǎng)?chuàng)建哪種 StateBackend。
源碼如下:
// (1) the application defined state backend has precedence
// 代碼中創(chuàng)建了 StateBackend,則按照代碼中配置來(lái)
if (fromApplication != null) {
// see if this is supposed to pick up additional configuration parameters
if (fromApplication instanceof ConfigurableStateBackend) {
// needs to pick up configuration
if (logger != null) {
logger.info("Configuring application-defined state backend with job/cluster config");
}
backend = ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
}
else {
// keep as is!
backend = fromApplication;
}
}
else {
// (2) check if the config defines a state backend
// 代碼中沒有配置,按照 配置文件來(lái):即按照 flink-conf.yaml 文件中的配置來(lái)
final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
if (fromConfig != null) {
backend = fromConfig;
}
else {
// (3) use the default
// 代碼中沒有配置,配置文件也沒有配置,則創(chuàng)建默認(rèn)的 MemoryStateBackend
backend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
}
}
整體流程比較簡(jiǎn)單:
- 代碼中創(chuàng)建了 StateBackend,則按照代碼中配置來(lái)
- 代碼中沒有配置,按照 配置文件來(lái):即按照 flink-conf.yaml 文件中的配置來(lái)
- 代碼中沒有配置,配置文件也沒有配置,則創(chuàng)建默認(rèn)的 MemoryStateBackend
2. init
init 運(yùn)行 task 特定的初始化,例如當(dāng)前 StreamTask 有 input 的情況下,會(huì)初始化 inputProcessor 讀取并處理數(shù)據(jù)。關(guān)于 inputProcessor 會(huì)在 run 部分重點(diǎn)介紹,這里先略過。
3. initializeState
initializeState 方法源碼:
// 循環(huán)遍歷,對(duì)該 task 所有 Operator 進(jìn)行狀態(tài)初始化,
// 包括初始化 StateBackend ,并調(diào)用 udf 的 initializeState 方法
private void initializeState() throws Exception {
StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
for (StreamOperator<?> operator : allOperators) {
if (null != operator) {
operator.initializeState();
}
}
}
源碼邏輯比較簡(jiǎn)單:直接調(diào)用當(dāng)前 StreamTask 的 operatorChain 中所有 StreamOperator 的 initializeState 方法。假如當(dāng)前 operatorChain 包含了 MapFunction、FilterFunction,兩個(gè)算子將會(huì)被封裝在 StreamMap 和 StreamFilter 中,那么此時(shí)就會(huì)調(diào)用這兩個(gè)算子所對(duì)應(yīng)的 StreamOperator 的 initializeState 方法,根據(jù)繼承,最后調(diào)用的是 AbstractStreamOperator 的無(wú)參 initializeState() 方法。
這里專門強(qiáng)調(diào)無(wú)參 initializeState() 方法,是因?yàn)?AbstractStreamOperator 中還有一個(gè)有參的 initializeState(StateInitializationContext context) 方法,不要混淆。
注:有參的 initializeState 方法參數(shù)類型較長(zhǎng),下文將縮寫為 initializeState(context) ;無(wú)參的 initializeState 方法繼續(xù)用 initializeState() 表示。
AbstractStreamOperator 類的 initializeState() 方法介紹
initializeState 方法的簡(jiǎn)潔版源碼如下:
public final void initializeState() throws Exception {
final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
// 創(chuàng)建 StreamTaskStateInitializerImpl
final StreamTaskStateInitializer streamTaskStateManager =
Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
// 使用 StreamTaskStateInitializerImpl 初始化
// 各種 operatorStateBackend 和 keyedStateBackend,
// 并從 Checkpoint 處恢復(fù) State
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(XXX);
try {
// new Context
StateInitializationContext initializationContext = new
StateInitializationContextImpl(XXX);
/**
* 重點(diǎn)關(guān)注 AbstractUdfStreamOperator,它重寫了 initializeState(context) 方法,
* 去真正調(diào)用 各個(gè) udf 的 initializeState 方法,
*/
initializeState(initializationContext);
} finally {
XXX
}
}
initializeState 方法主要完成兩個(gè)工作:
- 1、初始化 KeyedStateBackend 和 OperatorStateBackend,并從 Checkpoint 處恢復(fù) State
- 2、如果封裝了 udf,則調(diào)用 udf 的 initializeState 方法(前提是 userFunction 實(shí)現(xiàn)了 CheckpointedFunction 接口)
源碼流程:創(chuàng)建 StreamTaskStateInitializer。StreamTaskStateInitializer 只有一個(gè)實(shí)現(xiàn)類:StreamTaskStateInitializerImpl,所以會(huì)創(chuàng)建 StreamTaskStateInitializerImpl。創(chuàng)建時(shí),將之前初始化好的 StateBackend 傳遞給 StreamTaskStateInitializerImpl,然后調(diào)用 streamOperatorStateContext 方法初始化 KeyedStateBackend 和 OperatorStateBackend。
下面重點(diǎn)關(guān)注 StreamTaskStateInitializerImpl 類的 streamOperatorStateContext 方法,源碼如下所示:
public StreamOperatorStateContext streamOperatorStateContext(XXX){
// -------------- 初始化 Keyed State Backend --------------
keyedStatedBackend = keyedStatedBackend(XXX);
// -------------- 初始化 Operator State Backend --------------
operatorStateBackend = operatorStateBackend(XXX);
// -------------- Raw State 相關(guān)操作 --------------
rawKeyedStateInputs = rawKeyedStateInputs(XXX);
streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
rawOperatorStateInputs = rawOperatorStateInputs(XXX);
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
// -------------- Internal Timer Service Manager --------------
timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);
// -------------- Preparing return value --------------
return new StreamOperatorStateContextImpl(
prioritizedOperatorSubtaskStates.isRestored(),
operatorStateBackend,
keyedStatedBackend,
timeServiceManager,
rawOperatorStateInputs,
rawKeyedStateInputs);
}
首先會(huì)初始化 Keyed State Backend 和 Operator State Backend,F(xiàn)link 還支持 Raw 類型的 State,基本用不到,除非 Flink 內(nèi)的 Managed State 不能滿足作業(yè)的需求。重點(diǎn)關(guān)注 Keyed State Backend 和 Operator State Backend 的初始化。keyedStatedBackend 方法用于初始化 keyedStatedBackend,operatorStateBackend 方法用于初始化 operatorStateBackend。
StateBackend 與 keyedStateBackend 以及 operatorStateBackend 之間的關(guān)系
介紹初始化源碼之前,先介紹一下 StateBackend 與 keyedStateBackend 以及 operatorStateBackend 之間的關(guān)系。我們都知道 Flink 中支持兩種類型的 State,即:KeyedState 和 OperatorState;Flink 目前支持三種狀態(tài)后端存儲(chǔ),即:Memory、Fs 和 RocksDB。所以 StateBackend 有三種實(shí)現(xiàn)即:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。每種 StateBackend 都要支持 KeyedState 和 OperatorState,所以每種 StateBackend 要負(fù)責(zé)創(chuàng)建出自己相應(yīng)的 keyedStateBackend 以及 operatorStateBackend。具體 KeyedState 與存儲(chǔ)系統(tǒng)如何交互是由 keyedStateBackend 完成的,具體 OperatorState 與存儲(chǔ)系統(tǒng)如何交互是由 operatorStateBackend。例如 RocksDBStateBackend 會(huì)創(chuàng)建出 RocksDBKeyedStateBackend,每個(gè) RocksDBKeyedStateBackend 會(huì)持有 RocksDB 數(shù)據(jù)庫(kù)實(shí)例,然后 Flink 引擎就可以與 RocksDB 進(jìn)行交互了。
簡(jiǎn)言之:根據(jù)用戶配置創(chuàng)建出不同類型的 StateBackend,然后不同的 StateBackend 再創(chuàng)建出對(duì)應(yīng)的 keyedStateBackend 以及 operatorStateBackend,keyedStateBackend 和 operatorStateBackend 會(huì)真正的存儲(chǔ)狀態(tài)數(shù)據(jù)。
每種 StateBackend 到底會(huì)創(chuàng)建出哪種 keyedStateBackend 和哪種 operatorStateBackend 呢?這里引用 Flink 社區(qū)分享的圖:
圖中可以看出,Memory 和 Fs 會(huì)創(chuàng)建出 HeapKeyedStateBackend,RocksDB 會(huì)創(chuàng)建出 RocksDBKeyedStateBackend。無(wú)論哪種 StateBackend,都會(huì)使用 DefaultOperatorStateBackend。這里也驗(yàn)證了一點(diǎn):RocksDB 數(shù)據(jù)庫(kù)中只會(huì)存儲(chǔ) KeyedState,不會(huì)存儲(chǔ) OperatorState。因?yàn)?RocksDBStateBackend 對(duì)應(yīng)的 OperatorState 的存儲(chǔ)也是基于內(nèi)存的。讀到這里,讀者應(yīng)該理解 StateBackend 與 keyedStateBackend 以及 operatorStateBackend 之間的關(guān)系了。
初始化 keyedStateBackend 流程
下面重點(diǎn)關(guān)注初始化 keyedStateBackend 的流程,keyedStatedBackend 方法源碼如下所示:
protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(XXX){
// 如果不是 KeyedStream 直接就返回,即:不創(chuàng)建 keyedStatedBackend
if (keySerializer == null) {
return null;
}
// 計(jì)算當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange
final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
taskInfo.getMaxNumberOfParallelSubtasks(),
taskInfo.getNumberOfParallelSubtasks(),
taskInfo.getIndexOfThisSubtask());
BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle>
backendRestorer =
new BackendRestorerProcedure<>(
// 這里是函數(shù)式接口,并不是去 create KeyedStateBackend
(stateHandles) -> stateBackend.createKeyedStateBackend(XXX),
backendCloseableRegistry,
logDescription);
try {
// 這里去 create StateBackend 并恢復(fù)狀態(tài)文件
return backendRestorer.createAndRestore(
// 獲取 StateHandle 的集合
prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
} finally {
if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
IOUtils.closeQuietly(cancelStreamRegistryForRestore);
}
}
}
可以看到方法第一行:if (keySerializer == null) {return null;} 表示如果不是 KeyedStream 直接就返回,即:不創(chuàng)建 keyedStatedBackend。計(jì)算當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange,然后創(chuàng)建 BackendRestorerProcedure 類型的 backendRestorer,且將 (stateHandles) -> stateBackend.createKeyedStateBackend(XXX) 傳遞給 BackendRestorerProcedure 構(gòu)造器的 instanceSupplier,instanceSupplier 是一個(gè)函數(shù)式接口,用于創(chuàng)建 KeyedStateBackend。
backendRestorer.createAndRestore 方法會(huì)循環(huán)調(diào)用 attemptCreateAndRestore 恢復(fù)一個(gè)個(gè) State, attemptCreateAndRestore 方法中調(diào)用 instanceSupplier(函數(shù)式接口)真正的創(chuàng)建 keyedStatedBackend。
instanceSupplier 函數(shù)式接口的工作:調(diào)用相應(yīng) StateBackend 的 createKeyedStateBackend 方法創(chuàng)建 AbstractKeyedStateBackend。如果 stateBackend 是 RocksDBStateBackend,就會(huì)創(chuàng)建出 RocksDBKeyedStateBackend。如果是 Memory 或 Fs 則會(huì)創(chuàng)建出 HeapKeyedStateBackend。在創(chuàng)建完 KeyedStateBackend 的過程中,會(huì)從 Checkpoint 中恢復(fù)狀態(tài)到 Flink 引擎。
注:具體 KeyedStateBackend 恢復(fù)狀態(tài)的流程比較復(fù)雜,每種 StateBackend 的恢復(fù)流程都不同,同時(shí)還牽扯到從 dfs 中拉取狀態(tài)數(shù)據(jù)用于恢復(fù),所以后續(xù)會(huì)有單獨(dú)的博客介紹恢復(fù)流程。
初始化 operatorStateBackend 流程
operatorStateBackend 方法用于初始化 operatorStateBackend。operatorStateBackend 初始化流程與 keyedStateBackend 比較類似,區(qū)別在于最后調(diào)用的是 stateBackend.createOperatorStateBackend()。
三種 StateBackend 的 createOperatorStateBackend 方法非常相似,源碼如下:
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry) throws Exception {
return new DefaultOperatorStateBackendBuilder(
env.getUserClassLoader(),
env.getExecutionConfig(),
isUsingAsynchronousSnapshots(),
stateHandles,
cancelStreamRegistry).build();
}
這里無(wú)論是何種 StateBackend,都會(huì)創(chuàng)建出 DefaultOperatorStateBackend。也就驗(yàn)證了一點(diǎn):RocksDB 只支持 KeyedState,OperatorState 都是按照 Heap 的方案。
具體 new DefaultOperatorStateBackend 的過程由建造器 DefaultOperatorStateBackendBuilder 完成,build 的功能是創(chuàng)建出 OperatorStateBackend,并從 Checkpoint 中將 State 恢復(fù)到 Flink 引擎端。整體流程比較復(fù)雜,這里不闡述會(huì)在后面博客中單獨(dú)介紹。
如果當(dāng)前是 udf,則調(diào)用 udf 的 initializeState 方法
接下來(lái)重點(diǎn)又回到了 AbstractStreamOperator 類的 initializeState() 方法中,根據(jù)創(chuàng)建好的 operatorStateBackend 和 keyedStateStore 構(gòu)造 Context。然后調(diào)用 initializeState(Context) 方法,之前說(shuō)過要區(qū)分有參和無(wú)參的 initializeState 方法,現(xiàn)在執(zhí)行到了有參的 initializeState(Context) 方法。
前面介紹過所有自定義的 UDF 都被包裝起來(lái),例如 MapFunction 都被 StreamMap 類包裝起來(lái),且這些 UDF 的包裝類都繼承自 AbstractUdfStreamOperator,AbstractStreamOperator 類的 initializeState(Context) 方法沒有任何實(shí)現(xiàn),這里重點(diǎn)關(guān)注 AbstractUdfStreamOperator 重寫的 initializeState(Context) 方法。
AbstractUdfStreamOperator 類的 initializeState 方法源碼:
@Override
public void initializeState(StateInitializationContext context) throws Exception {
// super 表示 AbstractStreamOperator 類
super.initializeState(context);
// 調(diào)用 udf 的 initializeState 方法
StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}
super 表示 AbstractStreamOperator 類,即調(diào)用 AbstractStreamOperator 類的 initializeState(context) 空方法,重點(diǎn)在于工具類 StreamingFunctionUtils 的 restoreFunctionState(context, userFunction) 方法,restoreFunctionState 方法內(nèi)會(huì)對(duì)包裝的 udf 進(jìn)行解包裝,然后執(zhí)行 tryRestoreFunction 方法。
tryRestoreFunction 方法部分源碼如下所示:
private static boolean tryRestoreFunction(
StateInitializationContext context,
Function userFunction) throws Exception {
// 調(diào)用 udf 的 initializeState 方法,
// 前提是 userFunction instanceof CheckpointedFunction
if (userFunction instanceof CheckpointedFunction) {
((CheckpointedFunction) userFunction).initializeState(context);
return true;
}
}
tryRestoreFunction 會(huì)對(duì) userFunction 進(jìn)行判斷,如果實(shí)現(xiàn)了 CheckpointedFunction 接口,就調(diào)用 userFunction 的 initializeState(context) 對(duì)狀態(tài)進(jìn)行初始化。使用過 CheckpointedFunction 接口的同學(xué)應(yīng)該清楚:自定義的 Function 可以實(shí)現(xiàn) CheckpointedFunction 接口,重寫 initializeState 方法,做一些狀態(tài)的初始化操作。例如在 initializeState 方法申請(qǐng)創(chuàng)建 OperatorState。
udf 使用 initializeState 的經(jīng)典案例就是 FlinkKafkaConsumerBase 類,F(xiàn)linkKafkaConsumerBase 類實(shí)現(xiàn)了 CheckpointedFunction 接口,在 initializeState 方法中定義了 OperatorState 類型的 ListState,將 Flink 消費(fèi) Kafka 的 offset 信息維護(hù)在 ListState 中。每次啟動(dòng)任務(wù)時(shí),都會(huì)從 ListState 中恢復(fù)之前的 offset,并從 offset 處繼續(xù)消費(fèi)。
initializeState 小結(jié)
initializeState 過程比較復(fù)雜,總的來(lái)說(shuō)就兩個(gè)事情:
- 1、 創(chuàng)建相應(yīng)的 keyedStateBackend 和 OperatorStateBackend,并從 Checkpoint 處恢復(fù) State(具體恢復(fù)流程后續(xù)講述)
- 2、 如果 udf 實(shí)現(xiàn)了 CheckpointedFunction 接口,則調(diào)用 udf 的 initializeState 方法
4. openAllOperators
此時(shí)回到了 StreamTask 初始化流程的下一步:openAllOperators。openAllOperators 方法比較簡(jiǎn)單,源碼如下所示:
private void openAllOperators() throws Exception {
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
operator.open();
}
}
}
openAllOperators 方法會(huì)調(diào)用 OperatorChain 中所有 StreamOperator 的 open 方法,通過繼承關(guān)系,最后調(diào)用的仍然是 AbstractUdfStreamOperator 類的 open 方法。AbstractUdfStreamOperator 類的 open 方法源碼如下所示:
// AbstractUdfStreamOperator 類的 open 方法
@Override
public void open() throws Exception {
super.open();
FunctionUtils.openFunction(userFunction, new Configuration());
}
// FunctionUtils 類的 openFunction 方法
public static void openFunction(Function function
, Configuration parameters) {
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
richFunction.open(parameters);
}
}
AbstractUdfStreamOperator 類的 open 方法調(diào)用 FunctionUtils 類的 openFunction 方法,openFunction 方法中會(huì)判斷當(dāng)前 userFunction 是否實(shí)現(xiàn)了 RichFunction 接口,如果實(shí)現(xiàn)了 RichFunction 接口,則調(diào)用 userFunction 的 open 方法。
openAllOperators 小結(jié)
openAllOperators 的流程比較簡(jiǎn)單,就是判斷 userFunction 是否實(shí)現(xiàn)了 RichFunction 接口,在 Flink 中實(shí)現(xiàn)了 RichFunction 表示富函數(shù),可以定義 open 和 close 相關(guān)的邏輯,在算子初始化或者關(guān)閉時(shí)會(huì)被調(diào)用。
四、 思考題
如下代碼所示,開發(fā) Flink Job 時(shí) dataStream keyBy 后連續(xù)跟 map、filter、flatMap 三個(gè)算子,請(qǐng)問這三個(gè)自定義的 Function 內(nèi)都可以使用 Flink 的 ValueState 嗎?
dataStream.keyby(_._1)
.map(new MyMapFunction())
.filter(new MyFilterFunction())
.flatMap(new MyFlatMapFunction())
先說(shuō)答案吧:在 MyMapFunction 中可以使用 ValueState,在 MyFilterFunction 和 MyFlatMapFunction 中不能使用 ValueState。如果在 MyFilterFunction 和 MyFlatMapFunction 中定義 ValueState 或 MapState,都會(huì)報(bào)錯(cuò),會(huì)顯示 keyedStateBackend 為 null。為什么呢?
首先 Flink 中只有 KeyedState 才支持 ValueState 和 MapState,OperatorState 不支持 ValueState 和 MapState。只要對(duì) KeyedStream 的操作才能使用 KeyedState,KeyedState 表示相同的 key 共享同一個(gè) State,普通的 DataStream 中沒有 key 的概念不能使用 KeyedState。
DataStream 的 keyBy 方法源碼如下所示,由源碼 DataStream 可以看到,DataStream 的 keyBy 方法會(huì)返回 KeyedStream,KeyedStream 是 DataStream 的子類,KeyedStream 經(jīng)過 map 轉(zhuǎn)換后又會(huì)變成 DataStream。所以上圖中只有 MyMapFunction 是基于 KeyedStream 操作的,MyFilterFunction 和 MyFlatMapFunction 都是基于 DataStream 操作的,沒有 key 的概念,因此不能使用 KeyedState,即不能使用 ValueState。
// DataStream 的 keyBy 方法源碼
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
Preconditions.checkNotNull(key);
return new KeyedStream<>(this, clean(key));
}
MyFilterFunction 和 MyFlatMapFunction 中定義 ValueState 或 MapState 時(shí),為什么會(huì)報(bào)出 keyedStateBackend 為 null 呢?
回顧一下創(chuàng)建 keyedStateBackend 的流程,第一步就是 if (keySerializer == null) {return null;},如果不是 KeyedStream 直接就返回,即:不創(chuàng)建 keyedStatedBackend。所以出現(xiàn)了上述現(xiàn)象。
五、 總結(jié)
本文介紹了 StreamTask 類的基本功能,StreamTask 映射到 ExecutionGraph 中對(duì)應(yīng)的是一個(gè) subtask,每個(gè) StreamTask 運(yùn)行一個(gè)或多個(gè) Chain 在一起的 StreamOperator,這些 Operator 將會(huì)在一個(gè)線程內(nèi)同步執(zhí)行。隨后介紹了 StreamTask 的生命周期,主要包括了初始化、run、close 三個(gè)流程。后半部分重點(diǎn)描述了 StreamTask 初始化的過程,主要是:createStateBackend、init、initializeState()、openAllOperators() 四個(gè)過程。
后續(xù)會(huì)給大家詳細(xì)介紹 initializeState 部分如何從 Checkpoint 中恢復(fù) State,也會(huì)詳細(xì)介紹 run 流程到底是如何處理一條條數(shù)據(jù)的。