Flink 源碼:TM 端恢復及創(chuàng)建 OperatorState 的流程

本文僅為筆者平日學習記錄之用,侵刪
原文:https://mp.weixin.qq.com/s/6Oi_1tP-7Jns3ZguMW7wLg

在之前《StreamTask 初始化流程》的文章中,省略掉了 TM 端恢復 State 的詳細過程,本文主要講述:

  • OperatorState 的恢復和創(chuàng)建流程
  • Checkpoint 處恢復的 State 如何與代碼中創(chuàng)建的 State 關聯起來

一、 TM 端恢復 OperatorState 的流程

StateBackend 創(chuàng)建 OperatorStateBackend 時 TM 端會恢復 OperatorState。目前 Flink 支持的三種 StateBackend 都對應同一種 OperatorStateBackend,即:DefaultOperatorStateBackend,具體 new DefaultOperatorStateBackend 的過程由建造器 DefaultOperatorStateBackendBuilder 完成。

三種 StateBackend 的 createOperatorStateBackend 方法非常相似,源碼如下:

public OperatorStateBackend createOperatorStateBackend(
 Environment env,
 String operatorIdentifier,
 @Nonnull Collection<OperatorStateHandle> stateHandles,
 CloseableRegistry cancelStreamRegistry) throws Exception {

 return new DefaultOperatorStateBackendBuilder(
  env.getUserClassLoader(),
  env.getExecutionConfig(),
  isUsingAsynchronousSnapshots(),
  stateHandles,
  cancelStreamRegistry).build();
}

所有的初始化流程都在 DefaultOperatorStateBackendBuilder 類的 build 方法中,build 方法源碼如下所示:

@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
 AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
  new DefaultOperatorStateBackendSnapshotStrategy(XXX);
 OperatorStateRestoreOperation restoreOperation = 
    new OperatorStateRestoreOperation(XXX);
 try {
  // OperatorState 恢復流程
  restoreOperation.restore();
 } catch (Exception e) {
  IOUtils.closeQuietly(cancelStreamRegistryForBackend);
  throw new BackendBuildingException("XXX", e);
 }
 return new DefaultOperatorStateBackend(XXX);
}

build 方法中除了構造了幾個對象以外,重點執(zhí)行了 OperatorStateRestoreOperation 的 restore 方法,restore 方法就是恢復流程。

先介紹 OperatorStateRestoreOperation 類中兩個重要的 Map:

  • registeredOperatorStates 用于保存 StateName 和 ListState 的映射關系;
  • registeredBroadcastStates 用于保存 StateName 和 BroadcastState 的映射關系

restore 源碼如下所示:

// OperatorStateRestoreOperation 類中兩個重要的 Map
// 保存 StateName 和 ListState 的映射關系
private final Map<String, PartitionableListState<?>> registeredOperatorStates;

// 保存 StateName 和 BroadcastState 的映射關系
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

// Operator State 真正的 restore 流程
@Override
public Void restore() throws Exception {
  // stateHandles 為空,表示沒有要恢復的 State
 if (stateHandles.isEmpty()) {
  return null;
 }

 // 遍歷所有 stateHandles
 for (OperatorStateHandle stateHandle : stateHandles) {
  // 通過 stateHandle 可以獲取 InputStream 讀取數據
  FSDataInputStream in = stateHandle.openInputStream();
  try {
   List<StateMetaInfoSnapshot> restoredOperatorMetaInfoSnapshots =
    backendSerializationProxy.getOperatorStateMetaInfoSnapshots();

   // 從元數據中創(chuàng)建 PartitionableListStates,并沒有恢復真正的 State
   for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
    final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
     new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
        
    // registeredOperatorStates 中維護的 StateName 與 ListState 的映射關系
    PartitionableListState<?> listState = registeredOperatorStates
          .get(restoredSnapshot.getName());
        
    // listState == null 表示當前 State 還未創(chuàng)建,則創(chuàng)建,并保存到 map 中
    if (null == listState) {
     // 這里只是依賴 MetaInfo 創(chuàng)建了 PartitionableListState,并沒有恢復真正的 State 數據
     listState = new PartitionableListState<>(restoredMetaInfo);
     // 創(chuàng)建出的 State 數據 put 到 registeredOperatorStates 中
     registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
    }
   }

   // 真正恢復 State 的操作
   for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
    stateHandle.getStateNameToPartitionOffsets().entrySet()) {
    final String stateName = nameToOffsets.getKey();
        // 通過 StateName 從 registeredOperatorStates 中獲取 ListState
        // 因為之前已經根據元數據創(chuàng)建了 State,
        // 所以這里 get 不到,只能是因為當前的 StateName 屬于 BroadcastState
    PartitionableListState<?> listStateForName = 
            registeredOperatorStates.get(stateName);
        
    // listState 為 null,表示恢復 Broadcast 相關的 State
    if (listStateForName == null) {
     BackendWritableBroadcastState<?, ?> broadcastStateForName = 
            registeredBroadcastStates.get(stateName);
     deserializeBroadcastStateValues(broadcastStateForName, 
                                          in, nameToOffsets.getValue());
    } else {
     // 恢復 ListState,將恢復出來的元素 add 到 ListState 中
     deserializeOperatorStateValues(listStateForName, 
                                         in, nameToOffsets.getValue());
    }
   }
  } finally {
   Thread.currentThread().setContextClassLoader(restoreClassLoader);
   if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
    IOUtils.closeQuietly(in);
   }
  }
 }
 return null;
}

restore 方法中拿到的就是 JM 分配給當前 subtask 的 stateHandles,如果 stateHandles 為空表示沒有要恢復的 State 則直接返回 null,可能是因為任務是直接啟動,而不是從 Checkpoint 處恢復。否則 stateHandles 不為空的情況,就遍歷一個個 OperatorStateHandle,通過 stateHandle 可以獲取 InputStream 讀取數據。

首先讀出元數據,用于創(chuàng)建 PartitionableListState,并沒有真正恢復 State 數據,PartitionableListState 是 OperatorState 對 ListState 的具體實現。ListState 維護在 registeredOperatorStates 這個 Map 中,通過 StateName 從 registeredOperatorStates 中 get,get 不到時,通過元數據創(chuàng)建 State,并存放在 registeredOperatorStates 中。

代碼中省略了 BroadcastState 的創(chuàng)建流程,整體流程與 ListState 流程類似,只不過 BroadcastState 維護在 registeredBroadcastStates 中。

最后真正的恢復 State 數據,對于 ListState 而言將恢復出來的元素 add 到 ListState 中?;謴?State 數據的過程其實用反序列化器對狀態(tài)數據反序列化生成對象的過程。反序列化器維護在 PartitionableListState 的元數據中。

到這里 OperatorState 就恢復完成,此時映射關系已經保存到 OperatorStateRestoreOperation 類的兩個 Map 集合中?,F在又回到 DefaultOperatorStateBackendBuilder 類的 build 方法中,就會發(fā)現其實這兩個 Map 是好多地方共享的。這里再貼一下 build 方法的完整源碼,重點關注兩個 Map:

@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
 // 保存 StateName 和 ListState 的映射關系
 Map<String, PartitionableListState<?>> registeredOperatorStates = new HashMap<>();
 // 保存 StateName 和 BroadcastState 的映射關系
 Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates = 
    new HashMap<>();

 CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
  
 OperatorStateRestoreOperation restoreOperation = new OperatorStateRestoreOperation(
  cancelStreamRegistry,
  userClassloader,
  // 將兩個 Map 傳遞進去,即:restore 過程中,映射關系會存儲在這兩個 Map 中
  registeredOperatorStates,
  registeredBroadcastStates,
  restoreStateHandles
 );
 try {
  // OperatorState 恢復流程
  restoreOperation.restore();
 } catch (Exception e) {
  IOUtils.closeQuietly(cancelStreamRegistryForBackend);
  throw new BackendBuildingException("XXX", e);
 }
 AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
  new DefaultOperatorStateBackendSnapshotStrategy(
   userClassloader,
   asynchronousSnapshots,
   // 將兩個 Map 傳遞給 DefaultOperatorStateBackendSnapshotStrategy
   registeredOperatorStates,
   registeredBroadcastStates,
   cancelStreamRegistryForBackend);
  
 return new DefaultOperatorStateBackend(
  executionConfig,
  cancelStreamRegistryForBackend,
  // 再將兩個 Map 傳遞給 DefaultOperatorStateBackend
  registeredOperatorStates,
  registeredBroadcastStates,
  new HashMap<>(),
  new HashMap<>(),
  snapshotStrategy
 );
}

可以看到 build 方法剛開始會 new 兩個 Map,然后傳遞給了 OperatorStateRestoreOperation,之后 OperatorStateRestoreOperation 的 restore 流程(也就是上述分析的恢復流程)實際上將 Checkpoint 中恢復出來的映射關系保存到了這兩個 Map 中。之后兩個 Map 又傳遞給了 DefaultOperatorStateBackendSnapshotStrategy 和 DefaultOperatorStateBackend。

所以得出結論:DefaultOperatorStateBackend 中持有從 Checkpoint 處恢復出來的 StateName 與具體 State 的映射關系。

到這里 DefaultOperatorStateBackend 就創(chuàng)建完成了,同時留兩個問題:

  • 上面流程雖然將 OperatorState 從 Checkpoint 中恢復了,但用戶在算子中創(chuàng)建的 State 如何與 Checkpoint 中恢復的 OperatorState 關聯起來呢?

  • 另外對于直接啟動,不從 Checkpoint 處恢復的任務,OperatorState 又是如何創(chuàng)建出來的?

帶著這兩個問題閱讀下面流程。

二、 用戶定義的 OperatorState 創(chuàng)建流程

Flink 源碼中最典型的使用 OperatorState 的場景就是 FlinkConsumer 使用 ListState 去維護 Kafka 的 offset 信息,所以本文就從這塊源碼入手,看一下這個 ListState 創(chuàng)建流程。

FlinkKafkaConsumerBase 類的 initializeState 方法中用到了 getUnionListState 創(chuàng)建一個 ListState,簡潔版源碼如下所示:

@Override
public final void initializeState(FunctionInitializationContext context) {
 OperatorStateStore stateStore = context.getOperatorStateStore();
 unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
   OFFSETS_STATE_NAME,
   TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
}

這里調用的 OperatorStateStore 的 getUnionListState 方法。OperatorStateStore 是個接口,它只有一個實現類,就是前面創(chuàng)建出來的 DefaultOperatorStateBackend。所以這里會調用 DefaultOperatorStateBackend 類的 getUnionListState 方法。不過 DefaultOperatorStateBackend 中還有一個 getListState(ListStateDescriptor stateDescriptor) 方法,這也就是 OperatorState 類型的 ListState 兩種獲取方式??梢钥匆幌逻@兩個方法的源碼:

@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) {
 return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}

@Override
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) {
 return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
}

源碼中可以看到,無論業(yè)務使用的是 getListState 還是 getUnionListState 方法獲取 ListState ,最后都會調用同一個方法,即:getListState(ListStateDescriptor stateDescriptor, OperatorStateHandle.Mode mode)。加了一個參數 OperatorStateHandle.Mode 用于區(qū)分 OperatorState 的模式:

  • getListState 對應 SPLIT_DISTRIBUTE 模式
  • getUnionListState 對應 UNION 模式

getListState(stateDescriptor, mode) 方法源碼如下所示:

// 無論是 getListState 還是 getUnionListState 方法都會調用這里,
// 只不過傳遞的 Mode 參數不同而已
private <S> ListState<S> getListState(
  ListStateDescriptor<S> stateDescriptor,
  OperatorStateHandle.Mode mode) throws StateMigrationException {

 String name = Preconditions.checkNotNull(stateDescriptor.getName());

 TypeSerializer<S> partitionStateSerializer = 
    Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

 PartitionableListState<S> partitionableListState = (PartitionableListState<S>) 
    registeredOperatorStates.get(name);

 // registeredOperatorStates 中維護的是 Checkpoint 中恢復的 StateName 和 ListState 的映射關系
 // 如果 partitionableListState == null 表示從 Checkpoint 中沒有恢復出這個 State,
 // 即:這是一個新的 State,則新建一個 PartitionableListState,并維護在 Map 中
 if (null == partitionableListState) {
  partitionableListState = new PartitionableListState<>(
   new RegisteredOperatorStateBackendMetaInfo<>(
    name,
    partitionStateSerializer,
    mode));

  registeredOperatorStates.put(name, partitionableListState);
 } else {
  // State 已經從 Checkpoint 中恢復了,檢查兼容性問題
  // 這里會檢查 StateName 和 AssignmentMode 是否可以匹配
  checkStateNameAndMode(
    partitionableListState.getStateMetaInfo().getName(),
    name,
    partitionableListState.getStateMetaInfo().getAssignmentMode(),
    mode);

  RegisteredOperatorStateBackendMetaInfo<S> restoredPartitionableListStateMetaInfo =
   partitionableListState.getStateMetaInfo();

  // 檢查 序列化是否兼容
  TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();

  TypeSerializerSchemaCompatibility<S> stateCompatibility =
   restoredPartitionableListStateMetaInfo.
      updatePartitionStateSerializer(newPartitionStateSerializer);
  //  不兼容,則拋出異常
  if (stateCompatibility.isIncompatible()) {
   throw new StateMigrationException("XXX.");
  }
  partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
 }

 accessedStatesByName.put(name, partitionableListState);
 // 返回 State
 return partitionableListState;
}

getListState(stateDescriptor, mode) 方法首先通過 name 從 registeredOperatorStates 中 get 對應的 ListState 保存到 partitionableListState 中,registeredOperatorStates 維護的是 Checkpoint 中恢復的 StateName 和 ListState 的映射關系。所以 partitionableListState == null 表示從 Checkpoint 中沒有恢復出這個 State,即:這是一個新的 State,所以新建一個 PartitionableListState,并保存在 registeredOperatorStates 中。

反之,partitionableListState != null 表示 State 已經從 Checkpoint 中恢復了,開始檢查兼容性,首先會檢查 Checkpoint 中恢復的 State 和用戶新申請的 StateName 和 AssignmentMode 是否可以匹配。

  • StateName 和 name 肯定是匹配的,因為 partitionableListState 是根據 name get 出來的。
  • AssignmentMode 枚舉用于區(qū)分應用層使用的 getListState 恢復還是 getUnionListState 恢復,getListState 表示 SPLIT_DISTRIBUTE 模式,getUnionListState 表示 UNION 模式。如果 State 中存儲的是 SPLIT_DISTRIBUTE 模式,但任務恢復時,代碼改成了 getUnionListState,實際上 State 不能正常恢復的。

StateName 和 AssignmentMode 檢查完畢后,會檢查序列化是否兼容,不兼容,則拋出異常。兼容則會返回 State。

上述流程就回答了最開始提的兩個問題:

  1. OperatorState 從 Checkpoint 中恢復后,用戶在算子中創(chuàng)建的 State 如何與 Checkpoint 中恢復的 OperatorState 關聯起來呢?

    答:依賴 registeredOperatorStates 這個 Map 維護了 StateName 和 ListState 的映射關系,用戶創(chuàng)建 State 是通過 StateName 從 registeredOperatorStates 中查找,如果能找到,對其進行兼容性檢查,檢查通過就會返回從 Checkpoint 中恢復的 ListState,從而完成了關聯。

  2. 對于直接啟動,不從 Checkpoint 處恢復的任務,OperatorState 又是如何創(chuàng)建出來的?

    答:對于直接啟動的任務,registeredOperatorStates 肯定是空的。創(chuàng)建 State 時,從 registeredOperatorStates 中 get 不到,所以就創(chuàng)建一個新的 PartitionableListState,并保存在 registeredOperatorStates 中。

到這里,OperatorState 就完成了恢復,且用戶的 State 也正常的創(chuàng)建出來了。

三、總結

文中首先介紹了 OperatorState 的恢復和創(chuàng)建流程,并介紹從 Checkpoint 處恢復的 State 如何與代碼中創(chuàng)建的 State 關聯起來的。后續(xù)將會詳細介紹 KeyedState 的恢復創(chuàng)建流程以及如何將 Checkpoint 處恢復的 State 如何與代碼中創(chuàng)建的 State 關聯起來。

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容