OpenSearch存儲(chǔ)數(shù)據(jù)的默認(rèn)位置是data目錄:

存儲(chǔ)的數(shù)據(jù)的種類
nodes/0下的就是節(jié)點(diǎn)存儲(chǔ)的數(shù)據(jù)了, 分為以下幾種:
- state元信息
- index lucene的索引和段文件等
- translog 事務(wù)日志


state 元信息
opensearch fork自 elasticsearch7.10, 與它一樣, 關(guān)于集群狀態(tài)的元數(shù)據(jù), 是寫在了lucene的段文件里.
只有具備Master資格的節(jié)點(diǎn)和數(shù)據(jù)節(jié)點(diǎn)可以持久化集群狀態(tài).
-
GatewayMetaState負(fù)責(zé)元數(shù)據(jù)的接收和持久化 -
GatewayService負(fù)責(zé)元數(shù)據(jù)的恢復(fù)(在重啟集群)
- _state/*.st : 集群層面的元信息: UUID, Settings, template, etc
- indices/{index_uuid}/_state/*.st: 索引層面的元信息
- indices/{index_uuid}/_state/0/_state/*.st: 分片層面的元信息
關(guān)于 st 后綴的文件, 可以下載一個(gè)16進(jìn)制的查看器, 查看文件的內(nèi)容:
http://hexfiend.com/

Notice: 持久化的state不包括某個(gè)分片存在于哪個(gè)節(jié)點(diǎn)這種內(nèi)容路由信息,集群完全重啟時(shí),依靠
gateway的recovery過程重建RoutingTable.

元數(shù)據(jù)的持久化與變更
元數(shù)據(jù)從哪里來: 1. 持久化存儲(chǔ) 2. ClusterState
GatewayMetaState的內(nèi)部類: GatewayClusterApplier extents ClusterStateApplier 會(huì)將接受到的ClusterState的元信息持久化. applyClusterState 的統(tǒng)一調(diào)用入口在ClusterApplierService#callClusterStateAppliers
public void applyClusterState(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {
incrementalClusterStateWriter.setIncrementalWrite(false);
return;
}
try {
// Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
// that's higher than the last accepted term.
// TODO: can we get rid of this hack?
if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {
incrementalClusterStateWriter.setCurrentTerm(event.state().term());
}
incrementalClusterStateWriter.updateClusterState(event.state());
incrementalClusterStateWriter.setIncrementalWrite(true);
} catch (WriteStateException e) {
logger.warn("Exception occurred when storing new meta data", e);
}
}
org.opensearch.gateway.IncrementalClusterStateWriter#updateClusterState 會(huì)負(fù)責(zé)持久化元信息(實(shí)際的寫入動(dòng)作還是委托給了MetaStateService以及MetadataStateFormat, 處理失敗等等:
先寫入全局狀態(tài)long globalStateGeneration = writeGlobalState(writer, newMetadata);
然后寫入索引狀態(tài): Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState);
最后寫入Manifest: writeManifest(writer, manifest); 而Manifest 文件是持久化元信息的入口.
舉例, 寫入全局狀態(tài)
long writeGlobalState(String reason, Metadata metadata) throws WriteStateException {
assert finished == false : FINISHED_MSG;
try { // 1. 設(shè)置回滾清理 2. 寫入 3. 設(shè)置提交清理 兩個(gè)清理的區(qū)別是: 1 會(huì)刪除previousManifest之前的提交記錄 3. 是提交成功的,會(huì)刪除此次提交之前的記錄, 包括previousManifest
rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration()));
long generation = metaStateService.writeGlobalState(reason, metadata);
commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation));
return generation;
} catch (WriteStateException e) {
rollback();
throw e;
}
}
- 設(shè)置回滾清理
- 寫入
- 設(shè)置提交清理 兩個(gè)清理的區(qū)別是: 1 會(huì)刪除previousManifest之前的提交記錄 3. 是提交成功的,會(huì)刪除此次提交之前的記錄, 包括previousManifest.
所謂提交記錄, 是提交點(diǎn). _state 目錄下的帶有對(duì)應(yīng)前綴的都是提交點(diǎn), 數(shù)字后綴是版本. 比如node-130就是node的元信息, 版本130.
而底層執(zhí)行讀寫的邏輯都被封裝在了MetaDataStateFormat<T>中, 主要由write 方法調(diào)用寫(且寫都遵循同樣的邏輯 : 寫臨時(shí)文件 -> 刷盤 -> rename/move 原子操作 ), 不同的元數(shù)據(jù)作為子類, 需要實(shí)現(xiàn)抽象方法:
/**
* Writes the given state to the given XContentBuilder
* Subclasses need to implement this class for theirs specific state.
*/
public abstract void toXContent(XContentBuilder builder, T state) throws IOException;
/**
* Reads a new instance of the state from the given XContentParser
* Subclasses need to implement this class for theirs specific state.
*/
public abstract T fromXContent(XContentParser parser) throws IOException;

元數(shù)據(jù)的恢復(fù)
GatewayService負(fù)責(zé)元數(shù)據(jù)的恢復(fù)動(dòng)作, 在集群狀態(tài)發(fā)生變化的時(shí)候, 負(fù)責(zé)元數(shù)據(jù)的恢復(fù). 調(diào)用點(diǎn): ClusterApplierService在應(yīng)用集群狀態(tài)變更的時(shí)候. GaewayService是一級(jí)的組件, 在 org.opensearch.gateway.GatewayService.doStart 組件啟動(dòng)的時(shí)候 ,將自己作為一個(gè)集群狀態(tài)監(jiān)聽器注冊(cè)到ClusterService中, 這樣, ClusterService在ClusterState發(fā)生變化的時(shí)候, 會(huì)調(diào)用ClusterApplierService 的 handleApplyCommit#onNewClusterState#applyChange 來觸發(fā)監(jiān)聽器的調(diào)用. 在zen2模式下集群級(jí)別和索引級(jí)別的元數(shù)據(jù)在集群master選擇出來后就已經(jīng)有了, 而zen1(Gateway)是在更新索引級(jí)別的. 當(dāng)索引級(jí)別的元數(shù)據(jù)也恢復(fù)以后, 就需要恢復(fù)shard級(jí)別的. 從始至終, 任務(wù)都在MasterSerivce的一個(gè)"masterService#updateTask" 的線程池里執(zhí)行,這是個(gè)單線程的線程池, 也就是說,任務(wù)會(huì)阻塞執(zhí)行.
index級(jí)別
GatewayService依賴于:
-
AllocationServiceshard分配服務(wù), 元數(shù)據(jù)在cluster和index層面的恢復(fù)是由GatewayService操作的, 而shard級(jí)別的是由AllocationSerivce, 所以完整的恢復(fù)需要依賴AllocationService -
ClusterService集群服務(wù), 負(fù)責(zé)的是集群狀態(tài)的協(xié)調(diào), 狀態(tài)變更的發(fā)生地,GatewayService要感知到集群狀態(tài)的變化, 必須依賴他 -
TransportNodesListGatewayMetaStatetodo -
Discovery用來判斷 集群是用的zen1 還是 zen2(修改過的raft) -
ThreadPoolandSettings
GatewayService 初始化的時(shí)候, 讀取元數(shù)據(jù)恢復(fù)的各類配置項(xiàng).
GatewayService() {
this.expectedNodes = EXPECTED_NODES_SETTING.get(settings);
this.expectedDataNodes = EXPECTED_DATA_NODES_SETTING.get(settings);
this.expectedMasterNodes = EXPECTED_MASTER_NODES_SETTING.get(settings);
if (RECOVER_AFTER_TIME_SETTING.exists(settings)) {
recoverAfterTime = RECOVER_AFTER_TIME_SETTING.get(settings);
} else if (expectedNodes >= 0 || expectedDataNodes >= 0 || expectedMasterNodes >= 0) {
recoverAfterTime = DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET;
} else {
recoverAfterTime = null;
}
this.recoverAfterNodes = RECOVER_AFTER_NODES_SETTING.get(settings);
this.recoverAfterDataNodes = RECOVER_AFTER_DATA_NODES_SETTING.get(settings);
// default the recover after master nodes to the minimum master nodes in the discovery
if (RECOVER_AFTER_MASTER_NODES_SETTING.exists(settings)) {
recoverAfterMasterNodes = RECOVER_AFTER_MASTER_NODES_SETTING.get(settings);
} else if (discovery instanceof ZenDiscovery) {
recoverAfterMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1);
} else {
recoverAfterMasterNodes = -1;
}
}
- expectedNodes: "gateway.expected_nodes" : data和master-elig節(jié)點(diǎn)的個(gè)數(shù), 要達(dá)到這個(gè)數(shù)據(jù)才會(huì)開始恢復(fù)
- expectedDataNodes: "gateway.expected_data_nodes"
- expectedMasterNodes: "gateway.expected_master_nodes"
- recoverAfterTime: "gateway.recover_after_time" : 如果沒有達(dá)到預(yù)期的節(jié)點(diǎn)數(shù)量,則恢復(fù)過程將等待配置的時(shí)間,再嘗試恢復(fù)。
- recoverAfterNodes: "gateway.recover_after_nodes" : 只要配置數(shù)量的節(jié)點(diǎn)(數(shù)據(jù)節(jié)點(diǎn)或具備Master資格的節(jié)點(diǎn))加入集群就可以開始恢復(fù)
- recoverAfterDataNodes: "gateway.recover_after_data_nodes": 更精細(xì)化的配置
- recoverAfterMasterNodes: "gateway.recover_after_master_nodes" : 更精細(xì)化的配置
expected_nodes 和 recover_after_time 和 recover_after_nodes 是 或 的關(guān)系, 只要有一個(gè)先達(dá)到, 就執(zhí)行.
STATE_NOT_RECOVERED_BLOCK : 阻塞態(tài), 如果有, 則表示集群的元數(shù)據(jù)還沒有恢復(fù), 需要執(zhí)行GatewayService的恢復(fù). 如果沒有, 則已經(jīng)恢復(fù), 不用執(zhí)行. 所以, 可以關(guān)注哪些地方會(huì)生成這個(gè)阻塞態(tài):
Discovery的子類zen和Coordinator 的 doStart 方法, 即集群在選主的時(shí)候, 選主完成后(找到最大的term, 最大的version), 找到最新的ClusterState, 然后發(fā)布出去, 二階段提交的第二階段觸發(fā)集群狀態(tài)變化. 然后, 進(jìn)入恢復(fù)流程.
根據(jù)選主算法的不同, 會(huì)采用不同的恢復(fù)流程:
if (discovery instanceof Coordinator) {
recoveryRunnable = () -> clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask());
} else {
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
recoveryRunnable = () -> gateway.performStateRecovery(new GatewayRecoveryListener());
}
即, 不同選主算法下, 元數(shù)據(jù)的恢復(fù)流程被封裝成了一個(gè) Runnable recoveryRunnable, 在clusterChanged#performStateRecovery的時(shí)候,觸發(fā)調(diào)用. 出發(fā)前要做一系列的校驗(yàn), 判斷前置條件, 如:
執(zhí)行時(shí)間, 節(jié)點(diǎn)個(gè)數(shù) 等
zen1
入口:
org.opensearch.gateway.Gateway#performStateRecovery
步驟:
-
final String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);獲取具有Matser資格的節(jié)點(diǎn)列表 -
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();獲取每個(gè)節(jié)點(diǎn)上的 IndexMetaData, 是同步阻塞調(diào)用.TransportNodesAction -
final int requiredAllocation = Math.max(1, minimumMasterNodes);zen1模式的缺陷之一: 必須要用戶設(shè)置了過半的個(gè)數(shù) - 從各個(gè)node拿回來的信息, 比較每個(gè)node里version 最大的ClusterState, 取版本最大的. 并獲取所有 master-elig節(jié)點(diǎn)中存儲(chǔ)的所有index信息.
- 最大版本號(hào)的
ClusterState可以用來做除了index信息之外的其他信息的基礎(chǔ), index信息, 將通過第四步的獲取結(jié)果重建. 對(duì)從各個(gè)節(jié)點(diǎn)拿回來的信息按照index分組, Index -> List[Node]. 然后找出每個(gè)index在各個(gè)node中最大版本的元數(shù)據(jù)作為此index的元信息. - 刪除無用信息, 不合法的信息, 最終基于最大版本號(hào)的
ClusterState(沒有index信息)以及第五步中獲取的信息構(gòu)建新的ClusterState - 構(gòu)建后處理: 調(diào)用成功和失敗的后處理. 成功:
GatewayRecoveryListener, 提交一個(gè)"local-gateway-elected-state", new RecoverStateUpdateTask()的子任務(wù),- 任務(wù)會(huì)混合當(dāng)前的
ClusterState和6得到的恢復(fù)了的ClusterState - 并且重建阻塞態(tài), Settings中設(shè)置的.
- 最后調(diào)用父類 RecoverStateUpdateTask 的execute方法, 從indices重建 routingTable , 取出
Not recovered的阻塞態(tài). -
啟動(dòng)shard的恢復(fù):
allocationService.reroute(newState, "state recovered");
- 任務(wù)會(huì)混合當(dāng)前的
zen2 Coordinator
clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask()
與zen1 模式不同的是, Coordinator不需要再向各個(gè)master-eligible節(jié)點(diǎn)拉取indices的元信息以決定index 的最新元信息是什么, 因?yàn)槔昧薘aft的選主算法, 已經(jīng)可以保證選完主之后發(fā)布的ClusterState就是罪行的. 所以, 直接用他們構(gòu)建routingTable即可, 然后觸發(fā) shard的恢復(fù).
核心邏輯:
public ClusterState execute(final ClusterState currentState) {
if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
logger.debug("cluster is already recovered");
return currentState;
}
final ClusterState newState = Function.<ClusterState>identity()
.andThen(ClusterStateUpdaters::updateRoutingTable)
.andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
.apply(currentState);
return allocationService.reroute(newState, "state recovered");
}
shard級(jí)別
見其他章節(jié)