opensearch元數(shù)據(jù)分析

OpenSearch存儲(chǔ)數(shù)據(jù)的默認(rèn)位置是data目錄:


data目錄

存儲(chǔ)的數(shù)據(jù)的種類

nodes/0下的就是節(jié)點(diǎn)存儲(chǔ)的數(shù)據(jù)了, 分為以下幾種:

  1. state元信息
  2. index lucene的索引和段文件等
  3. translog 事務(wù)日志
image.png
image.png

state 元信息

opensearch fork自 elasticsearch7.10, 與它一樣, 關(guān)于集群狀態(tài)的元數(shù)據(jù), 是寫在了lucene的段文件里.
只有具備Master資格的節(jié)點(diǎn)和數(shù)據(jù)節(jié)點(diǎn)可以持久化集群狀態(tài).

  • GatewayMetaState 負(fù)責(zé)元數(shù)據(jù)的接收和持久化
  • GatewayService 負(fù)責(zé)元數(shù)據(jù)的恢復(fù)(在重啟集群)
  1. _state/*.st : 集群層面的元信息: UUID, Settings, template, etc
  2. indices/{index_uuid}/_state/*.st: 索引層面的元信息
  3. indices/{index_uuid}/_state/0/_state/*.st: 分片層面的元信息

關(guān)于 st 后綴的文件, 可以下載一個(gè)16進(jìn)制的查看器, 查看文件的內(nèi)容:
http://hexfiend.com/

image.png

Notice: 持久化的state不包括某個(gè)分片存在于哪個(gè)節(jié)點(diǎn)這種內(nèi)容路由信息,集群完全重啟時(shí),依靠
gateway的recovery過程重建RoutingTable.

image.png

元數(shù)據(jù)的持久化與變更

元數(shù)據(jù)從哪里來: 1. 持久化存儲(chǔ) 2. ClusterState

GatewayMetaState的內(nèi)部類: GatewayClusterApplier extents ClusterStateApplier 會(huì)將接受到的ClusterState的元信息持久化. applyClusterState 的統(tǒng)一調(diào)用入口在ClusterApplierService#callClusterStateAppliers

        public void applyClusterState(ClusterChangedEvent event) {
            if (event.state().blocks().disableStatePersistence()) {
                incrementalClusterStateWriter.setIncrementalWrite(false);
                return;
            }

            try {
                // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
                // that's higher than the last accepted term.
                // TODO: can we get rid of this hack?
                if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {
                    incrementalClusterStateWriter.setCurrentTerm(event.state().term());
                }

                incrementalClusterStateWriter.updateClusterState(event.state());
                incrementalClusterStateWriter.setIncrementalWrite(true);
            } catch (WriteStateException e) {
                logger.warn("Exception occurred when storing new meta data", e);
            }
        }

org.opensearch.gateway.IncrementalClusterStateWriter#updateClusterState 會(huì)負(fù)責(zé)持久化元信息(實(shí)際的寫入動(dòng)作還是委托給了MetaStateService以及MetadataStateFormat, 處理失敗等等:

寫入全局狀態(tài)long globalStateGeneration = writeGlobalState(writer, newMetadata);
然后寫入索引狀態(tài): Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState);
最后寫入Manifest: writeManifest(writer, manifest);Manifest 文件是持久化元信息的入口.

舉例, 寫入全局狀態(tài)

        long writeGlobalState(String reason, Metadata metadata) throws WriteStateException {
            assert finished == false : FINISHED_MSG;
            try { // 1. 設(shè)置回滾清理 2. 寫入 3. 設(shè)置提交清理  兩個(gè)清理的區(qū)別是: 1 會(huì)刪除previousManifest之前的提交記錄 3. 是提交成功的,會(huì)刪除此次提交之前的記錄, 包括previousManifest
                rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration()));
                long generation = metaStateService.writeGlobalState(reason, metadata);
                commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation));
                return generation;
            } catch (WriteStateException e) {
                rollback();
                throw e;
            }
        }

  1. 設(shè)置回滾清理
  2. 寫入
  3. 設(shè)置提交清理 兩個(gè)清理的區(qū)別是: 1 會(huì)刪除previousManifest之前的提交記錄 3. 是提交成功的,會(huì)刪除此次提交之前的記錄, 包括previousManifest.

所謂提交記錄, 是提交點(diǎn). _state 目錄下的帶有對(duì)應(yīng)前綴的都是提交點(diǎn), 數(shù)字后綴是版本. 比如node-130就是node的元信息, 版本130.

而底層執(zhí)行讀寫的邏輯都被封裝在了MetaDataStateFormat<T>中, 主要由write 方法調(diào)用寫(且寫都遵循同樣的邏輯 : 寫臨時(shí)文件 -> 刷盤 -> rename/move 原子操作 ), 不同的元數(shù)據(jù)作為子類, 需要實(shí)現(xiàn)抽象方法:

    /**
     * Writes the given state to the given XContentBuilder
     * Subclasses need to implement this class for theirs specific state.
     */
    public abstract void toXContent(XContentBuilder builder, T state) throws IOException;

    /**
     * Reads a new instance of the state from the given XContentParser
     * Subclasses need to implement this class for theirs specific state.
     */
    public abstract T fromXContent(XContentParser parser) throws IOException;
image.png

元數(shù)據(jù)的恢復(fù)

GatewayService負(fù)責(zé)元數(shù)據(jù)的恢復(fù)動(dòng)作, 在集群狀態(tài)發(fā)生變化的時(shí)候, 負(fù)責(zé)元數(shù)據(jù)的恢復(fù). 調(diào)用點(diǎn): ClusterApplierService在應(yīng)用集群狀態(tài)變更的時(shí)候. GaewayService是一級(jí)的組件, 在 org.opensearch.gateway.GatewayService.doStart 組件啟動(dòng)的時(shí)候 ,將自己作為一個(gè)集群狀態(tài)監(jiān)聽器注冊(cè)到ClusterService中, 這樣, ClusterServiceClusterState發(fā)生變化的時(shí)候, 會(huì)調(diào)用ClusterApplierServicehandleApplyCommit#onNewClusterState#applyChange 來觸發(fā)監(jiān)聽器的調(diào)用. 在zen2模式下集群級(jí)別和索引級(jí)別的元數(shù)據(jù)在集群master選擇出來后就已經(jīng)有了, 而zen1(Gateway)是在更新索引級(jí)別的. 當(dāng)索引級(jí)別的元數(shù)據(jù)也恢復(fù)以后, 就需要恢復(fù)shard級(jí)別的. 從始至終, 任務(wù)都在MasterSerivce的一個(gè)"masterService#updateTask" 的線程池里執(zhí)行,這是個(gè)單線程的線程池, 也就是說,任務(wù)會(huì)阻塞執(zhí)行.

index級(jí)別

GatewayService依賴于:

  1. AllocationService shard分配服務(wù), 元數(shù)據(jù)在cluster和index層面的恢復(fù)是由GatewayService操作的, 而shard級(jí)別的是由AllocationSerivce , 所以完整的恢復(fù)需要依賴AllocationService
  2. ClusterService 集群服務(wù), 負(fù)責(zé)的是集群狀態(tài)的協(xié)調(diào), 狀態(tài)變更的發(fā)生地, GatewayService要感知到集群狀態(tài)的變化, 必須依賴他
  3. TransportNodesListGatewayMetaState todo
  4. Discovery 用來判斷 集群是用的zen1 還是 zen2(修改過的raft)
  5. ThreadPool and Settings

GatewayService 初始化的時(shí)候, 讀取元數(shù)據(jù)恢復(fù)的各類配置項(xiàng).

GatewayService() {
        this.expectedNodes = EXPECTED_NODES_SETTING.get(settings);
        this.expectedDataNodes = EXPECTED_DATA_NODES_SETTING.get(settings);
        this.expectedMasterNodes = EXPECTED_MASTER_NODES_SETTING.get(settings);

        if (RECOVER_AFTER_TIME_SETTING.exists(settings)) {
            recoverAfterTime = RECOVER_AFTER_TIME_SETTING.get(settings);
        } else if (expectedNodes >= 0 || expectedDataNodes >= 0 || expectedMasterNodes >= 0) {
            recoverAfterTime = DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET;
        } else {
            recoverAfterTime = null;
        }
        this.recoverAfterNodes = RECOVER_AFTER_NODES_SETTING.get(settings);
        this.recoverAfterDataNodes = RECOVER_AFTER_DATA_NODES_SETTING.get(settings);
        // default the recover after master nodes to the minimum master nodes in the discovery
        if (RECOVER_AFTER_MASTER_NODES_SETTING.exists(settings)) {
            recoverAfterMasterNodes = RECOVER_AFTER_MASTER_NODES_SETTING.get(settings);
        } else if (discovery instanceof ZenDiscovery) {
            recoverAfterMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1);
        } else {
            recoverAfterMasterNodes = -1;
        }
}
  • expectedNodes: "gateway.expected_nodes" : data和master-elig節(jié)點(diǎn)的個(gè)數(shù), 要達(dá)到這個(gè)數(shù)據(jù)才會(huì)開始恢復(fù)
  • expectedDataNodes: "gateway.expected_data_nodes"
  • expectedMasterNodes: "gateway.expected_master_nodes"
  • recoverAfterTime: "gateway.recover_after_time" : 如果沒有達(dá)到預(yù)期的節(jié)點(diǎn)數(shù)量,則恢復(fù)過程將等待配置的時(shí)間,再嘗試恢復(fù)。
  • recoverAfterNodes: "gateway.recover_after_nodes" : 只要配置數(shù)量的節(jié)點(diǎn)(數(shù)據(jù)節(jié)點(diǎn)或具備Master資格的節(jié)點(diǎn))加入集群就可以開始恢復(fù)
  • recoverAfterDataNodes: "gateway.recover_after_data_nodes": 更精細(xì)化的配置
  • recoverAfterMasterNodes: "gateway.recover_after_master_nodes" : 更精細(xì)化的配置
    expected_nodesrecover_after_timerecover_after_nodes 是 或 的關(guān)系, 只要有一個(gè)先達(dá)到, 就執(zhí)行.

STATE_NOT_RECOVERED_BLOCK : 阻塞態(tài), 如果有, 則表示集群的元數(shù)據(jù)還沒有恢復(fù), 需要執(zhí)行GatewayService的恢復(fù). 如果沒有, 則已經(jīng)恢復(fù), 不用執(zhí)行. 所以, 可以關(guān)注哪些地方會(huì)生成這個(gè)阻塞態(tài):
Discovery的子類zenCoordinatordoStart 方法, 即集群在選主的時(shí)候, 選主完成后(找到最大的term, 最大的version), 找到最新的ClusterState, 然后發(fā)布出去, 二階段提交的第二階段觸發(fā)集群狀態(tài)變化. 然后, 進(jìn)入恢復(fù)流程.

根據(jù)選主算法的不同, 會(huì)采用不同的恢復(fù)流程:

        if (discovery instanceof Coordinator) {
            recoveryRunnable = () -> clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask());
        } else {
            final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
            recoveryRunnable = () -> gateway.performStateRecovery(new GatewayRecoveryListener());
        }

即, 不同選主算法下, 元數(shù)據(jù)的恢復(fù)流程被封裝成了一個(gè) Runnable recoveryRunnable, 在clusterChanged#performStateRecovery的時(shí)候,觸發(fā)調(diào)用. 出發(fā)前要做一系列的校驗(yàn), 判斷前置條件, 如:
執(zhí)行時(shí)間, 節(jié)點(diǎn)個(gè)數(shù) 等

zen1

入口:
org.opensearch.gateway.Gateway#performStateRecovery

步驟:

  1. final String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class); 獲取具有Matser資格的節(jié)點(diǎn)列表
  2. TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet(); 獲取每個(gè)節(jié)點(diǎn)上的 IndexMetaData, 是同步阻塞調(diào)用. TransportNodesAction
  3. final int requiredAllocation = Math.max(1, minimumMasterNodes); zen1模式的缺陷之一: 必須要用戶設(shè)置了過半的個(gè)數(shù)
  4. 從各個(gè)node拿回來的信息, 比較每個(gè)node里version 最大的ClusterState, 取版本最大的. 并獲取所有 master-elig節(jié)點(diǎn)中存儲(chǔ)的所有index信息.
  5. 最大版本號(hào)的ClusterState 可以用來做除了index信息之外的其他信息的基礎(chǔ), index信息, 將通過第四步的獲取結(jié)果重建. 對(duì)從各個(gè)節(jié)點(diǎn)拿回來的信息按照index分組, Index -> List[Node]. 然后找出每個(gè)index在各個(gè)node中最大版本的元數(shù)據(jù)作為此index的元信息.
  6. 刪除無用信息, 不合法的信息, 最終基于最大版本號(hào)的ClusterState(沒有index信息)以及第五步中獲取的信息構(gòu)建新的ClusterState
  7. 構(gòu)建后處理: 調(diào)用成功和失敗的后處理. 成功: GatewayRecoveryListener , 提交一個(gè)"local-gateway-elected-state", new RecoverStateUpdateTask() 的子任務(wù),
    • 任務(wù)會(huì)混合當(dāng)前的ClusterState和6得到的恢復(fù)了的ClusterState
    • 并且重建阻塞態(tài), Settings中設(shè)置的.
    • 最后調(diào)用父類 RecoverStateUpdateTask 的execute方法, 從indices重建 routingTable , 取出 Not recovered 的阻塞態(tài).
    • 啟動(dòng)shard的恢復(fù): allocationService.reroute(newState, "state recovered");

zen2 Coordinator

clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask()

與zen1 模式不同的是, Coordinator不需要再向各個(gè)master-eligible節(jié)點(diǎn)拉取indices的元信息以決定index 的最新元信息是什么, 因?yàn)槔昧薘aft的選主算法, 已經(jīng)可以保證選完主之后發(fā)布的ClusterState就是罪行的. 所以, 直接用他們構(gòu)建routingTable即可, 然后觸發(fā) shard的恢復(fù).

核心邏輯:

        public ClusterState execute(final ClusterState currentState) {
            if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                logger.debug("cluster is already recovered");
                return currentState;
            }

            final ClusterState newState = Function.<ClusterState>identity()
                .andThen(ClusterStateUpdaters::updateRoutingTable)
                .andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
                .apply(currentState);

            return allocationService.reroute(newState, "state recovered");
        }

shard級(jí)別

見其他章節(jié)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容