本文僅為筆者平日學(xué)習(xí)記錄之用,侵刪
原文:https://mp.weixin.qq.com/s/eaALnpd_qHQg6fxI12fQjg
本文會詳細分析 TM 端恢復(fù)及創(chuàng)建 KeyedState 的流程,恢復(fù)過程會分析 RocksDB 和 Fs 兩種 StateBackend 的恢復(fù)流程,創(chuàng)建流程會介紹 Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來。
一、 RocksDBKeyedStateBackend 創(chuàng)建流程
從 RocksDBStateBackend 類的 createKeyedStateBackend 方法開始,createKeyedStateBackend 方法源碼主要加載一些配置和創(chuàng)建 RocksDBKeyedStateBackend,就不貼出來了。簡單介紹一下 createKeyedStateBackend 方法的功能:
- 加載 RocksDB JNI library
- 初始化 RocksDB 的本地數(shù)據(jù)目錄,對應(yīng)的是 RocksDB
state.backend.rocksdb.localdir參數(shù)配置的目錄 - new RocksDBKeyedStateBackendBuilder 的構(gòu)造器,所有狀態(tài)的恢復(fù)及初始化都封裝在 build 方法中。
RocksDBKeyedStateBackendBuilder 的 build 方法刪減后源碼如下所示:
@Override
public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
// 維護 StateName 與 StateInfo 的映射關(guān)系
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo>
kvStateInformation = new LinkedHashMap<>();
RocksDB db = null;
AbstractRocksDBRestoreOperation restoreOperation = null;
SnapshotStrategy<K> snapshotStrategy;
try {
// 保存 CheckpointID 與 sst 的映射
SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
long lastCompletedCheckpointId = -1L;
// 準(zhǔn)備 instanceBasePath 目錄,用于本地狀態(tài)存儲
prepareDirectories();
// 根據(jù) restoreStateHandles 決定三種恢復(fù)模式:
// 1、 RocksDB 無需恢復(fù),直接啟動
// 2、 RocksDB 增量 Checkpoint 的狀態(tài)恢復(fù)
// 3、 RocksDB 全量 Checkpoint 的狀態(tài)恢復(fù)
restoreOperation = getRocksDBRestoreOperation(XXX);
// 恢復(fù)狀態(tài),并打開 db,具體 執(zhí)行三種不同恢復(fù)流程
RocksDBRestoreResult restoreResult = restoreOperation.restore();
db = restoreResult.getDb();
// RocksDB 的增量 Checkpoint 模式,則獲取 sst 文件,
if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) {
backendUID = restoreResult.getBackendUID();
materializedSstFiles = restoreResult.getRestoredSstFiles();
lastCompletedCheckpointId = restoreResult.getLastCompletedCheckpointId();
}
// 初始化 Savepoint 和 Checkpoint 的 snapshot 策略
snapshotStrategy = initializeSavepointAndCheckpointStrategies(XXX);
} catch (Throwable e) {
// State 初始化異常,做一些清理操作
}
InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(
keyGroupRange,
numberOfKeyGroups
);
// kvStateInformation 會傳遞給 RocksDBKeyedStateBackend
return new RocksDBKeyedStateBackend<>(XXX);
}
build 方法中同樣定義了一個 Map kvStateInformation 用于維護 StateName 與 StateInfo 的映射關(guān)系。之后會準(zhǔn)備本地目錄,用于本地狀態(tài)存儲。
getRocksDBRestoreOperation 方法會創(chuàng)建 RestoreOperation,源碼如下所示:
// 根據(jù) restoreStateHandles 決定三種恢復(fù)模式:
// 1、 RocksDB 直接啟動無需恢復(fù)
// 2、 增量 Checkpoint 的 RocksDB 恢復(fù)
// 3、 全量 Checkpoint 的 RocksDB 恢復(fù)
private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation() {
// restoreStateHandles 為空表示沒有 State 需要恢復(fù),構(gòu)造 NoneRestoreOperation
if (restoreStateHandles.isEmpty()) {
return new RocksDBNoneRestoreOperation<>(X);
}
KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
// StateHandle 是 Increment 模式,則構(gòu)造 IncrementalRestoreOperation
// 否則構(gòu)造 FullRestoreOperation
if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
return new RocksDBIncrementalRestoreOperation<>(XXX);
} else {
return new RocksDBFullRestoreOperation<>(XXX);
}
}
getRocksDBRestoreOperation 方法會根據(jù) restoreStateHandles 決定三種恢復(fù)模式:
restoreStateHandles 為空表示沒有 State 需要恢復(fù),構(gòu)造 NoneRestoreOperation,即:不恢復(fù) State 的方式,直接啟動
restoreStateHandles 不為空的情況下,判斷 StateHandle 的類型, StateHandle 是 Increment 模式,則構(gòu)造 IncrementalRestoreOperation;否則構(gòu)造 FullRestoreOperation
build 方法下一步就會執(zhí)行具體 RocksDBRestoreOperation 的 restore 方法了,三種 RocksDBRestoreOperation 的 restore 流程完全不同,且比較復(fù)雜,后續(xù)單獨介紹。后續(xù)會執(zhí)行 initializeSavepointAndCheckpointStrategies 方法初始化 Savepoint 和 Checkpoint 的 snapshot 策略,方法的返回值是 SnapshotStrategy 類型,SnapshotStrategy 封裝了 Checkpoint 和 Savepoint 兩個策略。如果用戶觸發(fā) Checkpoint,則執(zhí)行 Checkpoint 策略,觸發(fā) Savepoint,則執(zhí)行 Checkpoint 策略。
initializeSavepointAndCheckpointStrategies 方法源碼如下所示:
// SnapshotStrategy 封裝了 Checkpoint 和 Savepoint 兩個策略。
class SnapshotStrategy<K> {
final RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy;
final RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy;
}
private SnapshotStrategy<K> initializeSavepointAndCheckpointStrategies(XXX) {
// 創(chuàng)建 Savepoint 的 snapshot 類為 Full Snapshot 策略
RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy =
new RocksFullSnapshotStrategy<>(XXX);
RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy;
// 如果開啟了增量 Checkpoint,
// 則 Checkpoint 的 snapshot 類為 Increment Snapshot 策略
if (enableIncrementalCheckpointing) {
checkpointSnapshotStrategy = new RocksIncrementalSnapshotStrategy<>(XXX);
} else {
// 未開始增量 Checkpoint,
// 則 Checkpoint 的 snapshot 為 Savepoint 的 snapshot 策略
checkpointSnapshotStrategy = savepointSnapshotStrategy;
}
// 封裝兩個策略到 SnapshotStrategy 中
return new SnapshotStrategy<>(checkpointSnapshotStrategy, savepointSnapshotStrategy);
}
可以看到 RocksDB Savepoint 的 snapshot 類永遠為 Full Snapshot 策略,如果開啟增量 Checkpoint,則 Checkpoint 的 snapshot 類為 Increment Snapshot 策略。未開始增量 Checkpoint,則 Checkpoint 的 snapshot 為 Savepoint 的 snapshot 策略,最后封裝兩個策略到 SnapshotStrategy 中。
這塊可以得出一個結(jié)論:使用 RocksDBStateBackend 時,如果不開啟增量 Checkpoint,那么觸發(fā) Savepoint 和 Checkpoint 都是相同的策略,即:都是 Full Snapshot 模式。
build 方法中上述流程如果任何階段拋出異常,都認(rèn)為 State 初始化異常,會做一些清理操作,并認(rèn)為本次任務(wù)恢復(fù)失敗。如果一切都成功,最后會構(gòu)建出 RocksDBKeyedStateBackend。
build 方法結(jié)束!下面重點關(guān)注三種不同的 RestoreOperation 具體是怎么 restore 的。
二、 RocksDB 的 NoneRestoreOperation 恢復(fù)流程
NoneRestoreOperation 表示沒有 State 需要恢復(fù),直接啟動,所以該模式下 restore 流程特別簡單。
restore 方法源碼如下所示:
@Override
public RocksDBRestoreResult restore() throws Exception {
openDB();
return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
nativeMetricMonitor, -1, null, null);
}
直接打開一個空的 RocksDB 就恢復(fù)完成返回結(jié)果。
三、 RocksDB 的 IncrementalRestoreOperation 恢復(fù)流程
RocksDBIncrementalRestoreOperation 類的 restore 方法源碼如下所示:
@Override
public RocksDBRestoreResult restore() throws Exception {
if (restoreStateHandles == null || restoreStateHandles.isEmpty()) {
return null;
}
final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();
// restoreStateHandles 數(shù)量大于 1,
// 或者 恢復(fù)的 keyGroupRange 與當(dāng)前負(fù)責(zé)的 keyGroupRange 不同,
// 則使用 Rescaling 模式。如果沒有改并發(fā),則關(guān)閉 Rescaling 模式
boolean isRescaling = (restoreStateHandles.size() > 1 ||
!Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));
if (isRescaling) {
// Rescaling 開啟的恢復(fù)模式,相當(dāng)于改并發(fā)恢復(fù),需要依賴 KeyGroup 恢復(fù)
restoreWithRescaling(restoreStateHandles);
} else {
// Rescaling 關(guān)閉的恢復(fù)模式,相當(dāng)于沒有改變并發(fā),直接恢復(fù) sst 即可
// 沒有改并發(fā)就只有一個 StateHandle,所以這里只需要將 firstStateHandle 當(dāng)做參數(shù)傳遞即可
restoreWithoutRescaling(theFirstStateHandle);
}
return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
nativeMetricMonitor, lastCompletedCheckpointId, backendUID, restoredSstFiles);
}
如果 restoreStateHandles 為 null 或者集合為空,直接返回 null。否則開始后面的恢復(fù)流程。
恢復(fù)流程第一步檢測是否是 rescale 模式,換言之:檢測是否新舊 Job 之間修改并發(fā)了。isRescaling 為 true 表示修改并發(fā)了,isRescaling 為 false 表示沒有修改并發(fā)。
判斷是否修改并發(fā)的邏輯:
代碼中判斷邏輯:如果 restoreStateHandles 集合中元素數(shù)量大于 1 或者恢復(fù)的 keyGroupRange 與當(dāng)前負(fù)責(zé)的 keyGroupRange 不同,則開啟 Rescaling 模式。否則關(guān)閉 Rescaling 模式。
分析一波:如果不修改并發(fā),那么新 Job 的 subtask 與舊 Job 的 subtask 是一對一的關(guān)系,每個 subtask 只會恢復(fù)舊 Job 對應(yīng)的那一個 subtask 的 StateHandle,且新舊 subtask 負(fù)責(zé)的 KeyGroupRange 是相同的。
代碼中 restoreStateHandles 集合中元素數(shù)量表示要恢復(fù)的 KeyedStateHandle 的數(shù)據(jù),數(shù)量大于 1 表示當(dāng)前 subtask 要恢復(fù)舊 Job 的多個 subtask 的 KeyedStateHandle。所以 restoreStateHandles.size() > 1 必然修改了并發(fā)。
代碼中拿要恢復(fù)的 StateHandle 的 KeyGroupRange 與當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange 進行比較,兩者不同則表示新舊 subtask 負(fù)責(zé)的 KeyGroupRange 不是完全相同的,也可以推斷出一定修改并發(fā)了。
例如:舊的 subtask 0 負(fù)責(zé)的 KeyGroupRange(0,9),新的 subtask 0 負(fù)責(zé)的 KeyGroupRange(0,6),雖然 restoreStateHandles 集合中只有一個 StateHandle,但是 KeyGroupRange 變了,也可以推斷出并發(fā)改變了。
判斷完是否修改并發(fā),就會按照是否修改并發(fā),進行兩種不同的模式開始恢復(fù)流程。如源碼所示,restoreWithoutRescaling 方法表示為修改并發(fā)的恢復(fù)模式,這里有個小細節(jié),restoreWithoutRescaling 方法的參數(shù)是 KeyedStateHandle 類型,而不用傳整個集合,因為不修改并發(fā)只會恢復(fù)一個 KeyedStateHandle。而 restoreWithRescaling 方法的參數(shù)就是 KeyedStateHandle 的集合類型。
下面詳細介紹兩種恢復(fù)模式。
未修改并發(fā)的恢復(fù)流程
restoreWithoutRescaling 方法源碼如下所示:
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
// 遠程的 KeyedStateHandle 恢復(fù)
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
(IncrementalRemoteKeyedStateHandle) keyedStateHandle;
// 保存 StateHandle 對應(yīng)的那一次 Checkpoint 對應(yīng)的文件狀態(tài),
// 包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系
restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
/**
* 從遠程恢復(fù) State 的過程:
* 1、 本地創(chuàng)建 tmp 目錄
* 2、 從遠程拉取 sst 文件到本地,將 遠程的 StateHandle 轉(zhuǎn)換為 本地 的 StateHandle
* 3、 調(diào)用 restoreFromLocalState 方法,從 local 恢復(fù) State
* 4、 清理 tmp 文件
*/
restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
} else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
// Local 的 KeyedStateHandle 恢復(fù)
IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
(IncrementalLocalKeyedStateHandle) keyedStateHandle;
// 保存 StateHandle 對應(yīng)的那一次 Checkpoint 對應(yīng)的文件狀態(tài),
// 包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系
restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
// 從 local 恢復(fù) State 的方法
restoreFromLocalState(incrementalLocalKeyedStateHandle);
} else {
throw new BackendBuildingException("XXX");
}
}
restoreWithoutRescaling 方法中首先會區(qū)分 KeyedStateHandle 到底是 IncrementalRemoteKeyedStateHandle 類型還是 IncrementalLocalKeyedStateHandle,區(qū)別在于一個是 Remote 一個是 Local。正常從 dfs 上恢復(fù)就屬于 Remote 模式,但是 RocksDB 增量 Checkpoint 有個 local-recovery 的優(yōu)化。
local-recovery 是指:Checkpoint 時會在本地目錄保留一份狀態(tài)快照,當(dāng)任務(wù)重啟時避免了從 dfs 上拉取狀態(tài)文件的過程,加速任務(wù)恢復(fù)。
Local 模式的恢復(fù)流程:
restorePreviousIncrementalFilesStatus 方法保存 StateHandle 對應(yīng)的那一次 Checkpoint 對應(yīng)的文件狀態(tài),包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系。
然后調(diào)用 restoreFromLocalState 方法從 Local 恢復(fù) State 即可。
Remote 模式的恢復(fù)流程:
同樣也是調(diào)用 restorePreviousIncrementalFilesStatus 方法保存 StateHandle 對應(yīng)的那一次 Checkpoint 對應(yīng)的文件狀態(tài),包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系。
區(qū)別在于調(diào)用 restoreFromRemoteState 方法從 Remote 恢復(fù) State,但其實 restoreFromRemoteState 最后還是調(diào)用的 restoreFromLocalState。restoreFromRemoteState 方法源碼如下所示:
private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) {
// 創(chuàng)建臨時目錄
final Path tmpRestoreInstancePath = new Path(
instanceBasePath.getAbsolutePath(),
UUID.randomUUID().toString());
try {
// 從本地恢復(fù)
restoreFromLocalState(
// 從遠程拉取 State 文件到本地
transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
} finally {
cleanUpPathQuietly(tmpRestoreInstancePath);
}
}
遠程恢復(fù) State 的過程如下所示:
- 本地創(chuàng)建 tmp 目錄
- 從遠程拉取 sst 文件到本地,將 遠程的 StateHandle 轉(zhuǎn)換為 本地 的 StateHandle
- 調(diào)用 restoreFromLocalState 方法,從 local 恢復(fù) State
- 清理 tmp 文件
所以得出的結(jié)論是:Remote 模式相比 Local 模式而言,只是多了一個從 dfs 上下載文件到本地的過程,下載到本地后就轉(zhuǎn)換成了 Local 模式進行恢復(fù)。所以先看一下 transferRemoteStateToLocalDirectory 方法是如何下載文件的,之后重點關(guān)注 restoreFromLocalState 即可。
下載狀態(tài)文件到本地流程:
transferRemoteStateToLocalDirectory 方法源碼如下所示:
private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(
Path temporaryRestoreInstancePath,
IncrementalRemoteKeyedStateHandle restoreStateHandle) throws Exception {
// try with resource 的方式創(chuàng)建 RocksDBStateDownloader
try (RocksDBStateDownloader rocksDBStateDownloader =
new RocksDBStateDownloader(numberOfTransferringThreads)) {
// 具體的從 dfs 上 Download 數(shù)據(jù)到本地
// 使用線程池,多線程拉取 所有的 sst 文件和 RocksDB 數(shù)據(jù)庫的元數(shù)據(jù)
rocksDBStateDownloader.transferAllStateDataToDirectory(
restoreStateHandle,
temporaryRestoreInstancePath,
cancelStreamRegistry);
}
// 將 Remote 的 StateHandle 重新構(gòu)建成 Local 的 StateHandle
return new IncrementalLocalKeyedStateHandle(
restoreStateHandle.getBackendIdentifier(),
restoreStateHandle.getCheckpointId(),
// 使用 DirectoryStateHandle,目錄就是之前創(chuàng)建的臨時數(shù)據(jù)目錄
new DirectoryStateHandle(temporaryRestoreInstancePath),
restoreStateHandle.getKeyGroupRange(),
restoreStateHandle.getMetaStateHandle(),
restoreStateHandle.getSharedState().keySet());
}
創(chuàng)建 RocksDBStateDownloader 類,見名之意,用于下載 RocksDB 狀態(tài)文件的類。RocksDBStateDownloader 的構(gòu)造參數(shù)是拉取文件的線程數(shù),具體可以進行配置的。然后使用 RocksDBStateDownloader 去 dfs 上多線程 Download 所有的 sst 文件和 RocksDB 數(shù)據(jù)庫的元數(shù)據(jù)數(shù)據(jù)到本地的 temporaryRestoreInstancePath 臨時目錄下。
拉取完成后,將 Remote 的 StateHandle 重新構(gòu)建成 Local 的 StateHandle,并且使用 DirectoryStateHandle,這里的目錄就是之前創(chuàng)建的臨時數(shù)據(jù)目錄,剛才下載的數(shù)據(jù)也在該目錄下。然后就開始
Increment 模式不修改并發(fā),從 Local 恢復(fù) State 流程
restoreFromLocalState 方法源碼如下所示:
// 從本地 State 文件中恢復(fù)狀態(tài)
private void restoreFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
// 從 State 中獲取元數(shù)據(jù)
KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(
localKeyedStateHandle.getMetaDataState());
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy
.getStateMetaInfoSnapshots();
// 根據(jù) State 的元數(shù)據(jù),創(chuàng)建或注冊 CF 描述符
columnFamilyDescriptors =
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
// 準(zhǔn)備數(shù)據(jù)目錄
restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);
// 打開 DB,打開 DB 時會將 columnFamilyHandles 填滿,
// 即獲取到 columnFamilyHandles 集合
columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
openDB();
// 將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù) 映射信息保存到 kvStateInformation 中
registerColumnFamilyHandles(stateMetaInfoSnapshots);
}
首先從 State 中獲取元數(shù)據(jù),根據(jù) State 的元數(shù)據(jù),創(chuàng)建或注冊 CF 描述符,然后 restoreInstanceDirectoryFromPath 方法用于準(zhǔn)備數(shù)據(jù)目錄,準(zhǔn)備數(shù)據(jù)目錄就是將那些從 Checkpoint 處拉取的文件從臨時目錄拷貝到真正的 DB 目錄。restoreInstanceDirectoryFromPath 方法在拷貝數(shù)據(jù)時做了一個優(yōu)化,以 .sst 結(jié)尾的文件不是真正拷貝,而是做了一個 link,其他文件真正的拷貝過去。RocksDB 真正存儲數(shù)據(jù)的是 .sst,這些才是占用空間較大的文件,向其他的元數(shù)據(jù)占用空間較小,所以直接拷貝。
最后 registerColumnFamilyHandles 方法,將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù)映射信息保存到 kvStateInformation 中。kvStateInformation 是一個 Map,key 為 StateName,value 為 RocksDbKvStateInfo 類型。
RocksDbKvStateInfo 相關(guān)類源碼如下所示:
public static class RocksDbKvStateInfo implements AutoCloseable{
public final ColumnFamilyHandle columnFamilyHandle;
public final RegisteredStateMetaInfoBase metaInfo;
}
RocksDbKvStateInfo 封裝了 ColumnFamilyHandle 和 RegisteredStateMetaInfoBase,ColumnFamilyHandle 表示 RocksDB CF 句柄,RegisteredStateMetaInfoBase 中維護了 State 的 name、類型、序列化信息等。有了 kvStateInformation,就可以根據(jù) StateName 拿到當(dāng)前 State 對應(yīng)的 CF 句柄讀到數(shù)據(jù),并拿到其對應(yīng)的序列化規(guī)則對讀到的數(shù)據(jù)進行反序列化。
到這里 RocksDB 的 Increment 模式在不改變并發(fā)的情況下,無論是 Remote 還是 Local,數(shù)據(jù)都正?;謴?fù)了(數(shù)據(jù)在本地的 RocksDB 實例中,可以根據(jù) kvStateInformation 中維護的信息從 RocksDB 中讀取到數(shù)據(jù))。
修改并發(fā)的恢復(fù)流程
修改并發(fā)的情況,新的 subtask 可能要恢復(fù)多個 StateHandle 的數(shù)據(jù),也就是多個 RocksDB 實例的數(shù)據(jù)。最笨的方法是將多個 RocksDB 的數(shù)據(jù)全拉取的本地,建立多個 RocksDB 實例,從中讀取出當(dāng)前 subtask 對應(yīng) KeyGroup 的數(shù)據(jù),寫入到一個新的 RocksDB 實例中。這樣存在一個問題,所有數(shù)據(jù)都是一條條從舊的 RocksDB get 出來,再一條條 put 到一個新的 RocksDB 中 。
恢復(fù) RocksDB 優(yōu)化
FLINK-8790 基于上述方案做了一個優(yōu)化:首先從多個 RocksDB 實例中選取一個最優(yōu)的 RocksDB 實例,最優(yōu)的標(biāo)準(zhǔn)是:RocksDB 負(fù)責(zé) KeyGroupRange 與當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange 的交集占 RocksDB 負(fù)責(zé)的 KeyGroupRange 的百分比最高。
舉個例子:RocksDB a 存儲的 KeyGroupRange(0,9) 的數(shù)據(jù),當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange(0,7) 的數(shù)據(jù),那么交集就是 KeyGroupRange(0,7)。KeyGroupRange(0,7) 中包含 8 個 KeyGroup,KeyGroupRange(0,9) 包含 10 個 KeyGroup,8 /10 為 80%。
此時就認(rèn)為 RocksDB a 上有 80% 的數(shù)據(jù)是有效的,所有要恢復(fù)的 RocksDB 都做一次上述運算,挑選出分?jǐn)?shù)最高的 RocksDB 實例。源碼中還加了一層限制,重疊率低于 75% 的 RocksDB 會直接被過濾掉。通過上述篩選,可能會得到一個相對來講最優(yōu)的 RocksDB 做為最終的 RocksDB,但是要對其進行裁剪。就拿上述例子來講,RocksDB 中存儲的 KeyGroupRange(0,9) 的數(shù)據(jù),但只需要 KeyGroupRange(0,7) 的數(shù)據(jù),所以會將 KeyGroupRange(8,9) 的數(shù)據(jù)裁掉。當(dāng)然裁剪效率相對較高,RocksDB 中 key 的設(shè)計都是以 KeyGroup 開頭的,LSM Tree 的底層存儲都是按照 key 有序存儲,所以直接按照前綴即可高效裁剪。
篩選最優(yōu) StateHandle 的代碼,參考 RocksDBIncrementalCheckpointUtils 的 chooseTheBestStateHandleForInitial 方法和 STATE_HANDLE_EVALUATOR 函數(shù)式接口,源碼如下所示:
public static KeyedStateHandle chooseTheBestStateHandleForInitial(
@Nonnull Collection<KeyedStateHandle> restoreStateHandles,
@Nonnull KeyGroupRange targetKeyGroupRange) {
KeyedStateHandle bestStateHandle = null;
double bestScore = 0;
// 遍歷所有 KeyedStateHandle
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
// 計算分?jǐn)?shù)
double handleScore = STATE_HANDLE_EVALUATOR.apply(rawStateHandle, targetKeyGroupRange);
if (handleScore > bestScore) {
// 保存最高分 及 對應(yīng)的 KeyedStateHandle
bestStateHandle = rawStateHandle;
bestScore = handleScore;
}
}
return bestStateHandle;
}
private static final BiFunction<KeyedStateHandle, KeyGroupRange, Double>
STATE_HANDLE_EVALUATOR = (stateHandle, targetKeyGroupRange) -> {
final KeyGroupRange handleKeyGroupRange = stateHandle.getKeyGroupRange();
// 計算當(dāng)前 StateHandle 與 目標(biāo) Handle 在 KeyGroup 上的交集
final KeyGroupRange intersectGroup = handleKeyGroupRange.
getIntersection(targetKeyGroupRange);
// 計算 當(dāng)前 StateHandle 對應(yīng)的狀態(tài)文件上 有 百分之多少的數(shù)據(jù)應(yīng)該在當(dāng)前 subtask 上
final double overlapFraction = (double) intersectGroup.getNumberOfKeyGroups() /
handleKeyGroupRange.getNumberOfKeyGroups();
// 概率小于 0.75 返回 -1,意味著該 StateHandle 是肯定會被丟棄的
if (overlapFraction < OVERLAP_FRACTION_THRESHOLD) {
return -1.0;
}
return intersectGroup.getNumberOfKeyGroups()
* overlapFraction * overlapFraction;
};
修改并發(fā)的恢復(fù)主流程
restoreWithRescaling 源碼如下所示:
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) {
// 選取一個最好的 StateHandle 用于數(shù)據(jù)初始化,會有一個選擇標(biāo)準(zhǔn)打分,分?jǐn)?shù)最高,則被選中
// 分?jǐn)?shù)的計算規(guī)則:主要依賴 StateHandle 的 KeyGroup 與 當(dāng)前 subtask 處理的 KeyGroup 求一個交集,看重疊率
KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.
chooseTheBestStateHandleForInitial(restoreStateHandles, keyGroupRange);
// Init base DB instance
if (initialHandle != null) {
// 打開選取的認(rèn)為最好的 StateHandle 對應(yīng)的 db,并對其多余的 KeyGroup 進行裁剪
restoreStateHandles.remove(initialHandle);
initDBWithRescaling(initialHandle);
} else {
// open 一個空的 db
openDB();
}
// 將 target 的 startKey 和 endKey 轉(zhuǎn)換成 byte 形式
byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(),
startKeyGroupPrefixBytes);
byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1,
stopKeyGroupPrefixBytes);
// 將所有要恢復(fù)的 StateHandle 中對應(yīng)的 RocksDB 恢復(fù),
// 并將 target 的 startKey 和 endKey 之間的數(shù)據(jù) put 到目標(biāo) db
// 同時還需要將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù) 映射信息保存到 kvStateInformation 中
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
XXX
}
}
首先根據(jù)分?jǐn)?shù),挑選一個最優(yōu)的 StateHandle 作為 RocksDB 初始 DB,initDBWithRescaling 方法會對多余的 KeyGroup 進行裁剪。如果沒有挑選出來說明都不太優(yōu),會直接創(chuàng)建一個空的 RocksDB 作為初始 DB。
根據(jù)當(dāng)前 subtask 負(fù)責(zé)的 keyGroupRange 計算出 RocksDB 的 startKey 和 endKey,把其他剩余的所有 StateHandle 對應(yīng) RocksDB 數(shù)據(jù)庫一一恢復(fù),從中讀取出 key 位于 startKey 和 endKey 之間的數(shù)據(jù)插入到初始 DB 中。從 RocksDB 讀取數(shù)據(jù)時可以直接通過 startKey seek 到指定位置,因為是全局有序的,所以遍歷過程中一旦讀到 endKey 以外的數(shù)據(jù),就認(rèn)為遍歷結(jié)束了,直接退出循環(huán)。
同時在恢復(fù)過程中,需要將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù) 映射信息保存到 kvStateInformation 中。
小優(yōu)化思考
后續(xù)多個 RocksDB 實例恢復(fù)時的流程基本是串行操作,即:從 dfs 上拉取第一個 RocksDB 數(shù)據(jù)文件、本地構(gòu)建 RocksDB 數(shù)據(jù)庫,依次讀出 startKey 和 endKey 之間的數(shù)據(jù)插入到新的 RocksDB。再拉取第二個、構(gòu)建、讀數(shù)據(jù)、寫數(shù)據(jù)。再拉取第三個。。。
思考:所有從 dfs 上拉取 RocksDB 數(shù)據(jù)文件的過程,能不能完全異步化,即:讀寫第一個 RocksDB 的過程中,就開始拉取第二個、第三個等。
四、 RocksDB 的 FullRestoreOperation 恢復(fù)流程
RocksDBFullRestoreOperation 類的 restore 方法的源碼如下所示:
@Override
public RocksDBRestoreResult restore()
throws IOException, StateMigrationException, RocksDBException {
// 打開空的 DB
openDB();
// 遍歷所有的 restoreStateHandles
for (KeyedStateHandle keyedStateHandle : restoreStateHandles) {
if (keyedStateHandle != null) {
// RocksDB 的 Full 模式與 Savepoint 模式保存的狀態(tài)文件都是 Flink 自己序列化好的問題,
// 其對應(yīng)的 KeyedStateHandle 必然是 KeyGroupsStateHandle。
if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
throw new IllegalStateException("Unexpected state handle type, " +
"expected: " + KeyGroupsStateHandle.class +
", but found: " + keyedStateHandle.getClass());
}
this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
// 根據(jù) StateHandle 恢復(fù)
restoreKeyGroupsInStateHandle();
}
}
return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
nativeMetricMonitor, -1, null, null);
}
restore 方法會打開一個空的 RocksDB,遍歷所有的 restoreStateHandles,之前強調(diào)過 Full 模式的 KeyStateHandle 對應(yīng)的是 KeyGroupsStateHandle 類型。所以這里進行了判斷,如果不是 KeyGroupsStateHandle 類型,直接拋出異常,恢復(fù)失敗。然后 restoreKeyGroupsInStateHandle 方法用于恢復(fù)當(dāng)前 keyedStateHandle 對應(yīng)的數(shù)據(jù)。
restoreKeyGroupsInStateHandle 方法源碼如下所示:
private void restoreKeyGroupsInStateHandle()
throws IOException, StateMigrationException, RocksDBException {
try {
// KeyGroupsStateHandle 的場景,并不會直接拉回文件,而是建立一個遠程的輸入流
currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
// 注冊 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù)
// 映射信息保存到 kvStateInformation 中
restoreKVStateMetaData();
// 將當(dāng)前 StateHandle 中屬于當(dāng)前 KeyGroupRange 的數(shù)據(jù) put 到 db 中
restoreKVStateData();
} finally {
if (cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
IOUtils.closeQuietly(currentStateHandleInStream);
}
}
}
restoreKeyGroupsInStateHandle 依然會將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù)映射信息保存到 kvStateInformation 中,并將 StateHandle 中屬于當(dāng)前 KeyGroupRange 的數(shù)據(jù) put 到 db 中。KeyGroupsStateHandle 中能拿到狀態(tài)文件的輸入流,且有保存的每個 KeyGroup 在文件中的 offset,所以可以直接讀取到數(shù)據(jù)并 put 到剛創(chuàng)建的 RocksDB 中。
小結(jié)
到這里 RocksDB 的三種模式都恢復(fù)完成,RocksDB 的三種恢復(fù)模式下,都會將 StateName 與具體 State 的信息維護在 kvStateInformation 中。后期在創(chuàng)建 State 的過程中,也會通過 kvStateInformation 將創(chuàng)建的 State 與 Checkpoint 中恢復(fù)的 State 進行關(guān)聯(lián)。
下面分析 FsStateBackend 模式下的恢復(fù)流程。
五、 HeapKeyedStateBackend 創(chuàng)建恢復(fù)流程
FsStateBackend 模式下,createKeyedStateBackend 方法創(chuàng)建的是 HeapKeyedStateBackend,最后調(diào)用的 HeapKeyedStateBackendBuilder 的 build 方法。
build 方法源碼如下所示:
public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
// Map of registered Key/Value states
Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<>();
// Map of registered priority queue set states
Map<String, HeapPriorityQueueSnapshotRestoreWrapper>
registeredPQStates = new HashMap<>();
HeapSnapshotStrategy<K> snapshotStrategy = initSnapshotStrategy(XXX);
InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(
keyGroupRange,
numberOfKeyGroups
);
HeapRestoreOperation<K> restoreOperation = new HeapRestoreOperation<>(XXX);
try {
// 恢復(fù)流程
restoreOperation.restore();
} catch (Exception e) {
throw new BackendBuildingException("XXX", e);
}
// 構(gòu)建 HeapKeyedStateBackend
return new HeapKeyedStateBackend<>(XXX);
}
build 方法中首先創(chuàng)建出 Map 類型的 registeredKVStates,用于保存 StateName 及對應(yīng)的 StateTable,每個 State 對應(yīng)一個 StateTable 存儲狀態(tài)數(shù)據(jù)。將 registeredKVStates 傳遞給 HeapRestoreOperation 用于恢復(fù),最后再傳遞給 HeapKeyedStateBackend 用于后續(xù)使用。
HeapRestoreOperation 類的 restore 方法會遍歷所有的 StateHandle 恢復(fù) State 信息,維護映射關(guān)系到 registeredKVStates 中,并恢復(fù) State 信息到具體的 StateTable 中。StateTable 是 Heap 模式真正存儲 State 的集合。
小結(jié)
HeapKeyedStateBackend 會將 StateName 與具體 State 的信息維護在 registeredKVStates 中。后期在創(chuàng)建 State 的過程中,也會通過 registeredKVStates 將創(chuàng)建的 State 與 Checkpoint 中恢復(fù)的 State 進行關(guān)聯(lián)。
下面詳細分析 KeyedState 的創(chuàng)建流程。
六、 用戶定義的 KeyedState 創(chuàng)建流程
可以拿 IntervalJoinOperator 的例子來分析 KeyedState 的創(chuàng)建流程,IntervalJoin 用于對兩個輸入流的數(shù)據(jù)進行關(guān)聯(lián),兩個流先到的數(shù)據(jù)會放到 buffer 中,左右兩個流分別有各自的 buffer,使用 Flink 的 MapState 充當(dāng) buffer。
IntervalJoinOperator 類的 initializeState 方法源碼如下所示:
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
// 左流的 buffer
this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
LEFT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
));
// 右流的 buffer
this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
RIGHT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
));
}
initializeState 方法會創(chuàng)建兩個 State,即:LEFT_BUFFER 和 RIGHT_BUFFER。
context.getKeyedStateStore().getMapState 實際調(diào)用 DefaultKeyedStateStore 類的 getMapState 方法,整個創(chuàng)建 State 第一階段的方法調(diào)用時序圖如下所示:
調(diào)用關(guān)系從 DefaultKeyedStateStore 類的 getMapState 方法到 KeyedStateFactory 的 createInternalState 方法。
如下圖所示,KeyedStateFactory 有兩個子類,即:HeapKeyedStateBackend 和 RocksDBKeyedStateBackend。
如下圖所示,HeapKeyedStateBackend 會對應(yīng) MemoryStateBackend 和 FsStateBackend,RocksDBKeyedStateBackend 對應(yīng) RocksDBStateBackend。
下面詳細介紹 HeapKeyedStateBackend 和 RocksDBKeyedStateBackend 的 createInternalState 方法是如何創(chuàng)建 State 的。
HeapKeyedStateBackend 創(chuàng)建 State 流程
HeapKeyedStateBackend 類的 createInternalState 方法源碼如下所示:
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(XXX) {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
// 注冊和恢復(fù) StateTable
StateTable<K, N, SV> stateTable = tryRegisterStateTable(
namespaceSerializer, stateDesc,
getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
// 根據(jù) stateDesc、StateTable 和 序列化信息,創(chuàng)建具體的 State
return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
}
createInternalState 方法中首先會執(zhí)行 tryRegisterStateTable 方法「注冊和恢復(fù)」 StateTable,然后根據(jù) stateDesc、StateTable 和 序列化信息,創(chuàng)建具體的 State。
重點的恢復(fù)邏輯就在 tryRegisterStateTable 方法中,tryRegisterStateTable 方法源碼如下所示:
private <N, V> StateTable<K, N, V> tryRegisterStateTable(XXX) {
// 根據(jù) StateName 從 registeredKVStates 中獲取 StateTable
StateTable<K, N, V> stateTable = (StateTable<K, N, V>)
registeredKVStates.get(stateDesc.getName());
// stateTable 不為空,表示從 Checkpoint 中恢復(fù)了當(dāng)前 State
if (stateTable != null) {
RegisteredKeyValueStateBackendMetaInfo<N, V> restoredKvMetaInfo =
stateTable.getMetaInfo();
// 主要對 State 的兼容性進行校驗,校驗包括:StateName、狀態(tài)類型、序列化校驗
// 如果創(chuàng)建的 State 與 Checkpoint 恢復(fù)的 State 不匹配,
// 則拋出異常,不能成功恢復(fù)
restoredKvMetaInfo.updateSnapshotTransformFactory(snapshotTransformFactory);
TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
// 檢查 State 的 name 和 Type 是否可以匹配
restoredKvMetaInfo.checkStateMetaInfo(stateDesc);
TypeSerializerSchemaCompatibility<V> stateCompatibility =
restoredKvMetaInfo.updateStateSerializer(newStateSerializer);
if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException("XXX");
}
stateTable.setMetaInfo(restoredKvMetaInfo);
} else {
// 沒有從 Checkpoint 恢復(fù),則創(chuàng)建 StateTable,存放到 registeredKVStates 中
RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new
RegisteredKeyValueStateBackendMetaInfo<>(XXX);
stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
registeredKVStates.put(stateDesc.getName(), stateTable);
}
return stateTable;
}
tryRegisterStateTable 方法首先會根據(jù) StateName 從 registeredKVStates 中獲取 StateTable 保存到 stateTable 中。
如果 stateTable 不為空,表示 Checkpoint 中有當(dāng)前 StateName 對應(yīng)的狀態(tài),應(yīng)該恢復(fù),此時會對新舊 Job 的 State 匹配性進行檢測,校驗項包括:StateName、狀態(tài)類型、序列化校驗。
否則 stateTable 為空,表示當(dāng)前 StateName 不需要從 Checkpoint 恢復(fù),直接創(chuàng)建一個新的 StateTable,存放到 registeredKVStates 中。
RocksDBKeyedStateBackend 創(chuàng)建 State 流程
RocksDBKeyedStateBackend 類的 createInternalState 方法源碼如下所示:
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(XXX) {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
// 注冊和恢復(fù) State 元信息
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>
registerResult = tryRegisterKvStateInformation(
stateDesc, namespaceSerializer, snapshotTransformFactory);
// 根據(jù) stateDesc、State 元信息 和 RocksDBKeyedStateBackend,創(chuàng)建具體的 State
return stateFactory.createState(stateDesc, registerResult,
RocksDBKeyedStateBackend.this);
}
createInternalState 方法中首先會執(zhí)行 tryRegisterKvStateInformation 方法「注冊和恢復(fù)」 State 元信息,然后根據(jù) stateDesc、State 元信息 和 RocksDBKeyedStateBackend,創(chuàng)建具體的 State。
tryRegisterKvStateInformation 與上述 HeapKeyedStateBackend 類的恢復(fù)邏輯類似,所以不貼代碼了,tryRegisterKvStateInformation 方法的整體邏輯就是:
首先會根據(jù) StateName 從 kvStateInformation 中獲取 State 的元信息保存到 oldStateInfo 中。
如果 stateTable 不為空,表示 Checkpoint 中有當(dāng)前 StateName 對應(yīng)的狀態(tài),應(yīng)該恢復(fù),此時會對新舊 Job 的 State 匹配性進行校驗。
否則 stateTable 為空,表示當(dāng)前 StateName 不需要從 Checkpoint 恢復(fù),直接在 RocksDB 中創(chuàng)建一個新的 ColumnFamily 存儲當(dāng)前 State 的數(shù)據(jù)。
小結(jié)
在恢復(fù)過程中主要依賴之前創(chuàng)建的 Map,Map 中保存的從 Checkpoint 中恢復(fù)出來的狀態(tài)數(shù)據(jù)。如果 Map 中有對應(yīng) StateName 的數(shù)據(jù),則對其進行校驗并恢復(fù);如果 Map 中找不到,則創(chuàng)建新的。
七、 總結(jié)
本文首先介紹了 RocksDBKeyedStateBackend 創(chuàng)建流程,并分別介紹 RocksDB 三種模式下的 State 恢復(fù)流程,分別是:NoneRestoreOperation、IncrementalRestoreOperation、FullRestoreOperation 三種模式。之后介紹 HeapKeyedStateBackend 恢復(fù)流程。最后介紹了用戶定義的 KeyedState 創(chuàng)建流程,創(chuàng)建流程會介紹 Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來。