本文僅為筆者平日學習記錄之用,侵刪
原文:https://mp.weixin.qq.com/s/6Oi_1tP-7Jns3ZguMW7wLg
在之前《StreamTask 初始化流程》的文章中,省略掉了 TM 端恢復 State 的詳細過程,本文主要講述:
- OperatorState 的恢復和創(chuàng)建流程
- Checkpoint 處恢復的 State 如何與代碼中創(chuàng)建的 State 關聯起來
一、 TM 端恢復 OperatorState 的流程
StateBackend 創(chuàng)建 OperatorStateBackend 時 TM 端會恢復 OperatorState。目前 Flink 支持的三種 StateBackend 都對應同一種 OperatorStateBackend,即:DefaultOperatorStateBackend,具體 new DefaultOperatorStateBackend 的過程由建造器 DefaultOperatorStateBackendBuilder 完成。
三種 StateBackend 的 createOperatorStateBackend 方法非常相似,源碼如下:
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry) throws Exception {
return new DefaultOperatorStateBackendBuilder(
env.getUserClassLoader(),
env.getExecutionConfig(),
isUsingAsynchronousSnapshots(),
stateHandles,
cancelStreamRegistry).build();
}
所有的初始化流程都在 DefaultOperatorStateBackendBuilder 類的 build 方法中,build 方法源碼如下所示:
@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
new DefaultOperatorStateBackendSnapshotStrategy(XXX);
OperatorStateRestoreOperation restoreOperation =
new OperatorStateRestoreOperation(XXX);
try {
// OperatorState 恢復流程
restoreOperation.restore();
} catch (Exception e) {
IOUtils.closeQuietly(cancelStreamRegistryForBackend);
throw new BackendBuildingException("XXX", e);
}
return new DefaultOperatorStateBackend(XXX);
}
build 方法中除了構造了幾個對象以外,重點執(zhí)行了 OperatorStateRestoreOperation 的 restore 方法,restore 方法就是恢復流程。
先介紹 OperatorStateRestoreOperation 類中兩個重要的 Map:
- registeredOperatorStates 用于保存 StateName 和 ListState 的映射關系;
- registeredBroadcastStates 用于保存 StateName 和 BroadcastState 的映射關系
restore 源碼如下所示:
// OperatorStateRestoreOperation 類中兩個重要的 Map
// 保存 StateName 和 ListState 的映射關系
private final Map<String, PartitionableListState<?>> registeredOperatorStates;
// 保存 StateName 和 BroadcastState 的映射關系
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
// Operator State 真正的 restore 流程
@Override
public Void restore() throws Exception {
// stateHandles 為空,表示沒有要恢復的 State
if (stateHandles.isEmpty()) {
return null;
}
// 遍歷所有 stateHandles
for (OperatorStateHandle stateHandle : stateHandles) {
// 通過 stateHandle 可以獲取 InputStream 讀取數據
FSDataInputStream in = stateHandle.openInputStream();
try {
List<StateMetaInfoSnapshot> restoredOperatorMetaInfoSnapshots =
backendSerializationProxy.getOperatorStateMetaInfoSnapshots();
// 從元數據中創(chuàng)建 PartitionableListStates,并沒有恢復真正的 State
for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
// registeredOperatorStates 中維護的 StateName 與 ListState 的映射關系
PartitionableListState<?> listState = registeredOperatorStates
.get(restoredSnapshot.getName());
// listState == null 表示當前 State 還未創(chuàng)建,則創(chuàng)建,并保存到 map 中
if (null == listState) {
// 這里只是依賴 MetaInfo 創(chuàng)建了 PartitionableListState,并沒有恢復真正的 State 數據
listState = new PartitionableListState<>(restoredMetaInfo);
// 創(chuàng)建出的 State 數據 put 到 registeredOperatorStates 中
registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
}
}
// 真正恢復 State 的操作
for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
stateHandle.getStateNameToPartitionOffsets().entrySet()) {
final String stateName = nameToOffsets.getKey();
// 通過 StateName 從 registeredOperatorStates 中獲取 ListState
// 因為之前已經根據元數據創(chuàng)建了 State,
// 所以這里 get 不到,只能是因為當前的 StateName 屬于 BroadcastState
PartitionableListState<?> listStateForName =
registeredOperatorStates.get(stateName);
// listState 為 null,表示恢復 Broadcast 相關的 State
if (listStateForName == null) {
BackendWritableBroadcastState<?, ?> broadcastStateForName =
registeredBroadcastStates.get(stateName);
deserializeBroadcastStateValues(broadcastStateForName,
in, nameToOffsets.getValue());
} else {
// 恢復 ListState,將恢復出來的元素 add 到 ListState 中
deserializeOperatorStateValues(listStateForName,
in, nameToOffsets.getValue());
}
}
} finally {
Thread.currentThread().setContextClassLoader(restoreClassLoader);
if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
IOUtils.closeQuietly(in);
}
}
}
return null;
}
restore 方法中拿到的就是 JM 分配給當前 subtask 的 stateHandles,如果 stateHandles 為空表示沒有要恢復的 State 則直接返回 null,可能是因為任務是直接啟動,而不是從 Checkpoint 處恢復。否則 stateHandles 不為空的情況,就遍歷一個個 OperatorStateHandle,通過 stateHandle 可以獲取 InputStream 讀取數據。
首先讀出元數據,用于創(chuàng)建 PartitionableListState,并沒有真正恢復 State 數據,PartitionableListState 是 OperatorState 對 ListState 的具體實現。ListState 維護在 registeredOperatorStates 這個 Map 中,通過 StateName 從 registeredOperatorStates 中 get,get 不到時,通過元數據創(chuàng)建 State,并存放在 registeredOperatorStates 中。
代碼中省略了 BroadcastState 的創(chuàng)建流程,整體流程與 ListState 流程類似,只不過 BroadcastState 維護在 registeredBroadcastStates 中。
最后真正的恢復 State 數據,對于 ListState 而言將恢復出來的元素 add 到 ListState 中?;謴?State 數據的過程其實用反序列化器對狀態(tài)數據反序列化生成對象的過程。反序列化器維護在 PartitionableListState 的元數據中。
到這里 OperatorState 就恢復完成,此時映射關系已經保存到 OperatorStateRestoreOperation 類的兩個 Map 集合中?,F在又回到 DefaultOperatorStateBackendBuilder 類的 build 方法中,就會發(fā)現其實這兩個 Map 是好多地方共享的。這里再貼一下 build 方法的完整源碼,重點關注兩個 Map:
@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
// 保存 StateName 和 ListState 的映射關系
Map<String, PartitionableListState<?>> registeredOperatorStates = new HashMap<>();
// 保存 StateName 和 BroadcastState 的映射關系
Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates =
new HashMap<>();
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
OperatorStateRestoreOperation restoreOperation = new OperatorStateRestoreOperation(
cancelStreamRegistry,
userClassloader,
// 將兩個 Map 傳遞進去,即:restore 過程中,映射關系會存儲在這兩個 Map 中
registeredOperatorStates,
registeredBroadcastStates,
restoreStateHandles
);
try {
// OperatorState 恢復流程
restoreOperation.restore();
} catch (Exception e) {
IOUtils.closeQuietly(cancelStreamRegistryForBackend);
throw new BackendBuildingException("XXX", e);
}
AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
new DefaultOperatorStateBackendSnapshotStrategy(
userClassloader,
asynchronousSnapshots,
// 將兩個 Map 傳遞給 DefaultOperatorStateBackendSnapshotStrategy
registeredOperatorStates,
registeredBroadcastStates,
cancelStreamRegistryForBackend);
return new DefaultOperatorStateBackend(
executionConfig,
cancelStreamRegistryForBackend,
// 再將兩個 Map 傳遞給 DefaultOperatorStateBackend
registeredOperatorStates,
registeredBroadcastStates,
new HashMap<>(),
new HashMap<>(),
snapshotStrategy
);
}
可以看到 build 方法剛開始會 new 兩個 Map,然后傳遞給了 OperatorStateRestoreOperation,之后 OperatorStateRestoreOperation 的 restore 流程(也就是上述分析的恢復流程)實際上將 Checkpoint 中恢復出來的映射關系保存到了這兩個 Map 中。之后兩個 Map 又傳遞給了 DefaultOperatorStateBackendSnapshotStrategy 和 DefaultOperatorStateBackend。
所以得出結論:DefaultOperatorStateBackend 中持有從 Checkpoint 處恢復出來的 StateName 與具體 State 的映射關系。
到這里 DefaultOperatorStateBackend 就創(chuàng)建完成了,同時留兩個問題:
上面流程雖然將 OperatorState 從 Checkpoint 中恢復了,但用戶在算子中創(chuàng)建的 State 如何與 Checkpoint 中恢復的 OperatorState 關聯起來呢?
另外對于直接啟動,不從 Checkpoint 處恢復的任務,OperatorState 又是如何創(chuàng)建出來的?
帶著這兩個問題閱讀下面流程。
二、 用戶定義的 OperatorState 創(chuàng)建流程
Flink 源碼中最典型的使用 OperatorState 的場景就是 FlinkConsumer 使用 ListState 去維護 Kafka 的 offset 信息,所以本文就從這塊源碼入手,看一下這個 ListState 創(chuàng)建流程。
FlinkKafkaConsumerBase 類的 initializeState 方法中用到了 getUnionListState 創(chuàng)建一個 ListState,簡潔版源碼如下所示:
@Override
public final void initializeState(FunctionInitializationContext context) {
OperatorStateStore stateStore = context.getOperatorStateStore();
unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
}
這里調用的 OperatorStateStore 的 getUnionListState 方法。OperatorStateStore 是個接口,它只有一個實現類,就是前面創(chuàng)建出來的 DefaultOperatorStateBackend。所以這里會調用 DefaultOperatorStateBackend 類的 getUnionListState 方法。不過 DefaultOperatorStateBackend 中還有一個 getListState(ListStateDescriptor stateDescriptor) 方法,這也就是 OperatorState 類型的 ListState 兩種獲取方式??梢钥匆幌逻@兩個方法的源碼:
@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) {
return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}
@Override
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) {
return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
}
源碼中可以看到,無論業(yè)務使用的是 getListState 還是 getUnionListState 方法獲取 ListState ,最后都會調用同一個方法,即:getListState(ListStateDescriptor stateDescriptor, OperatorStateHandle.Mode mode)。加了一個參數 OperatorStateHandle.Mode 用于區(qū)分 OperatorState 的模式:
- getListState 對應 SPLIT_DISTRIBUTE 模式
- getUnionListState 對應 UNION 模式
getListState(stateDescriptor, mode) 方法源碼如下所示:
// 無論是 getListState 還是 getUnionListState 方法都會調用這里,
// 只不過傳遞的 Mode 參數不同而已
private <S> ListState<S> getListState(
ListStateDescriptor<S> stateDescriptor,
OperatorStateHandle.Mode mode) throws StateMigrationException {
String name = Preconditions.checkNotNull(stateDescriptor.getName());
TypeSerializer<S> partitionStateSerializer =
Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
PartitionableListState<S> partitionableListState = (PartitionableListState<S>)
registeredOperatorStates.get(name);
// registeredOperatorStates 中維護的是 Checkpoint 中恢復的 StateName 和 ListState 的映射關系
// 如果 partitionableListState == null 表示從 Checkpoint 中沒有恢復出這個 State,
// 即:這是一個新的 State,則新建一個 PartitionableListState,并維護在 Map 中
if (null == partitionableListState) {
partitionableListState = new PartitionableListState<>(
new RegisteredOperatorStateBackendMetaInfo<>(
name,
partitionStateSerializer,
mode));
registeredOperatorStates.put(name, partitionableListState);
} else {
// State 已經從 Checkpoint 中恢復了,檢查兼容性問題
// 這里會檢查 StateName 和 AssignmentMode 是否可以匹配
checkStateNameAndMode(
partitionableListState.getStateMetaInfo().getName(),
name,
partitionableListState.getStateMetaInfo().getAssignmentMode(),
mode);
RegisteredOperatorStateBackendMetaInfo<S> restoredPartitionableListStateMetaInfo =
partitionableListState.getStateMetaInfo();
// 檢查 序列化是否兼容
TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
TypeSerializerSchemaCompatibility<S> stateCompatibility =
restoredPartitionableListStateMetaInfo.
updatePartitionStateSerializer(newPartitionStateSerializer);
// 不兼容,則拋出異常
if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException("XXX.");
}
partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
}
accessedStatesByName.put(name, partitionableListState);
// 返回 State
return partitionableListState;
}
getListState(stateDescriptor, mode) 方法首先通過 name 從 registeredOperatorStates 中 get 對應的 ListState 保存到 partitionableListState 中,registeredOperatorStates 維護的是 Checkpoint 中恢復的 StateName 和 ListState 的映射關系。所以 partitionableListState == null 表示從 Checkpoint 中沒有恢復出這個 State,即:這是一個新的 State,所以新建一個 PartitionableListState,并保存在 registeredOperatorStates 中。
反之,partitionableListState != null 表示 State 已經從 Checkpoint 中恢復了,開始檢查兼容性,首先會檢查 Checkpoint 中恢復的 State 和用戶新申請的 StateName 和 AssignmentMode 是否可以匹配。
- StateName 和 name 肯定是匹配的,因為 partitionableListState 是根據 name get 出來的。
- AssignmentMode 枚舉用于區(qū)分應用層使用的 getListState 恢復還是 getUnionListState 恢復,getListState 表示 SPLIT_DISTRIBUTE 模式,getUnionListState 表示 UNION 模式。如果 State 中存儲的是 SPLIT_DISTRIBUTE 模式,但任務恢復時,代碼改成了 getUnionListState,實際上 State 不能正常恢復的。
StateName 和 AssignmentMode 檢查完畢后,會檢查序列化是否兼容,不兼容,則拋出異常。兼容則會返回 State。
上述流程就回答了最開始提的兩個問題:
-
OperatorState 從 Checkpoint 中恢復后,用戶在算子中創(chuàng)建的 State 如何與 Checkpoint 中恢復的 OperatorState 關聯起來呢?
答:依賴 registeredOperatorStates 這個 Map 維護了 StateName 和 ListState 的映射關系,用戶創(chuàng)建 State 是通過 StateName 從 registeredOperatorStates 中查找,如果能找到,對其進行兼容性檢查,檢查通過就會返回從 Checkpoint 中恢復的 ListState,從而完成了關聯。
-
對于直接啟動,不從 Checkpoint 處恢復的任務,OperatorState 又是如何創(chuàng)建出來的?
答:對于直接啟動的任務,registeredOperatorStates 肯定是空的。創(chuàng)建 State 時,從 registeredOperatorStates 中 get 不到,所以就創(chuàng)建一個新的 PartitionableListState,并保存在 registeredOperatorStates 中。
到這里,OperatorState 就完成了恢復,且用戶的 State 也正常的創(chuàng)建出來了。
三、總結
文中首先介紹了 OperatorState 的恢復和創(chuàng)建流程,并介紹從 Checkpoint 處恢復的 State 如何與代碼中創(chuàng)建的 State 關聯起來的。后續(xù)將會詳細介紹 KeyedState 的恢復創(chuàng)建流程以及如何將 Checkpoint 處恢復的 State 如何與代碼中創(chuàng)建的 State 關聯起來。