Elasticsearch shard split analysis (Part 4)

The previous article ended by noting that once index creation completes, a cluster state change event is generated, and that event is sent to every node in the cluster via clusterStatePublisher. This article continues the shard split analysis by looking at how the underlying data of the target shards is recovered.
clusterStatePublisher is a functional interface instance, and it is bound to the publish method of the ZenDiscovery module.

ZenDiscovery

public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) {
        ClusterState newState = clusterChangedEvent.state();
        assert newState.getNodes().isLocalNodeElectedMaster() : "Shouldn't publish state when not master " + clusterChangedEvent.source();

        // state got changed locally (maybe because another master published to us)
        if (clusterChangedEvent.previousState() != this.committedState.get()) {
            throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");
        }

        pendingStatesQueue.addPending(newState);

        try {
           // Send this cluster state change to the other nodes. Internally, the change is marked as
           // committed only after at least minimumMasterNodes nodes have responded; a commit message
           // is then sent to the other nodes.
            publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
        } catch (FailedToCommitClusterStateException t) {
            // cluster service logs a WARN message
            logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
                newState.version(), electMaster.minimumMasterNodes());

            synchronized (stateMutex) {
                pendingStatesQueue.failAllStatesAndClear(
                    new ElasticsearchException("failed to publish cluster state"));

                rejoin("zen-disco-failed-to-publish");
            }
            throw t;
        }
        // Reaching this point means the cluster state has been committed.
        final DiscoveryNode localNode = newState.getNodes().getLocalNode();
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicBoolean processedOrFailed = new AtomicBoolean();
        pendingStatesQueue.markAsCommitted(newState.stateUUID(),
            new PendingClusterStatesQueue.StateProcessedListener() {
                @Override
                public void onNewClusterStateProcessed() {
                    processedOrFailed.set(true);
                    latch.countDown();
                    ackListener.onNodeAck(localNode, null);
                }

                @Override
                public void onNewClusterStateFailed(Exception e) {
                    processedOrFailed.set(true);
                    latch.countDown();
                    ackListener.onNodeAck(localNode, e);
                    logger.warn(
                        (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                            "failed while applying cluster state locally [{}]",
                            clusterChangedEvent.source()),
                        e);
                }
            });

        synchronized (stateMutex) {
            if (clusterChangedEvent.previousState() != this.committedState.get()) {
                throw new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes");
            }
           // Start applying this cluster state change. Only the publishing node (the master) applies it
           // here; the other nodes call this method after receiving the commit message.
            boolean sentToApplier = processNextCommittedClusterState("master " + newState.nodes().getMasterNode() +
                " committed version [" + newState.version() + "] source [" + clusterChangedEvent.source() + "]");
            if (sentToApplier == false && processedOrFailed.get() == false) {
                assert false : "cluster state published locally neither processed nor failed: " + newState;
                logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed",
                    newState.version());
                return;
            }
        }
        // indefinitely wait for cluster state to be applied locally
        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.debug(
                (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                    "interrupted while applying cluster state locally [{}]",
                    clusterChangedEvent.source()),
                e);
            Thread.currentThread().interrupt();
        }
    }

We will not cover the details of how the cluster state is published here. In short, this method sends the new cluster state to every node in the cluster except the local one. Once at least minimumMasterNodes nodes have responded, the change is marked as committed, and a commit message is sent to the other nodes. When a node receives the commit message, it calls processNextCommittedClusterState to process the committed cluster state. That method is where the cluster state change is actually applied, ensuring that the underlying data is consistent with the committed cluster state.
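To make the two-phase flow concrete, here is a minimal, hypothetical sketch of the protocol just described: phase one sends the state and counts acknowledgements, and only once enough master-eligible nodes have acked is the commit broadcast. The names here (Transport, sendState, sendCommit) are invented stand-ins, not the actual PublishClusterStateAction API, and the sketch assumes the local master counts as one of the required acks.

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TwoPhasePublish {
    // Stand-in for the transport layer; in Elasticsearch this role is played
    // by the transport machinery inside PublishClusterStateAction.
    interface Transport {
        void sendState(String nodeId, byte[] state, Runnable onAck);
        void sendCommit(String nodeId, String stateUUID);
    }

    private final Transport transport;

    TwoPhasePublish(Transport transport) {
        this.transport = transport;
    }

    void publish(List<String> otherNodes, byte[] state, String stateUUID,
                 int minMasterNodes, long timeoutMillis) throws InterruptedException {
        // Phase 1: send the new state to every other node and count the acks.
        // Assumption in this sketch: the local master counts as one ack.
        CountDownLatch neededAcks = new CountDownLatch(Math.max(0, minMasterNodes - 1));
        for (String nodeId : otherNodes) {
            transport.sendState(nodeId, state, neededAcks::countDown);
        }
        if (!neededAcks.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            // mirrors the FailedToCommitClusterStateException path in the real code
            throw new IllegalStateException("not enough nodes acknowledged, cannot commit");
        }
        // Phase 2: the change is now committed; tell everyone to apply it.
        for (String nodeId : otherNodes) {
            transport.sendCommit(nodeId, stateUUID);
        }
    }
}

With that picture in mind, back to processNextCommittedClusterState, which each node runs once the state is committed: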

boolean processNextCommittedClusterState(String reason) {
        assert Thread.holdsLock(stateMutex);

       ...
       // The code omitted above is mostly error and state checking, and it also defines the
       // newClusterState used below; this is the entry point for applying the cluster state.
        clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
            this::clusterState,
            new ClusterStateTaskListener() {
                @Override
                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                    try {
                        pendingStatesQueue.markAsProcessed(newClusterState);
                    } catch (Exception e) {
                        onFailure(source, e);
                    }
                }

                @Override
                public void onFailure(String source, Exception e) {
                    logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure applying [{}]", reason), e);
                    try {
                        // TODO: use cluster state uuid instead of full cluster state so that we don't keep reference to CS around
                        // for too long.
                        pendingStatesQueue.markAsFailed(newClusterState, e);
                    } catch (Exception inner) {
                        inner.addSuppressed(e);
                        logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
                    }
                }
            });

        return true;
    }

Some exception-handling code is omitted here so we can go straight to the entry point for applying the cluster state: the call to clusterApplier's onNewClusterState function. clusterApplier is a ClusterApplierService object. A listener is also passed in; when processing completes, its clusterStateProcessed function is called to mark this cluster state change as processed.

ClusterApplierService

public void onNewClusterState(final String source, final java.util.function.Supplier<ClusterState> clusterStateSupplier,
                                  final ClusterStateTaskListener listener) {
        Function<ClusterState, ClusterState> applyFunction = currentState -> {
            ClusterState nextState = clusterStateSupplier.get();
            if (nextState != null) {
                return nextState;
            } else {
                return currentState;
            }
        };
        submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), applyFunction, listener);
    }
private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config,
                                       final Function<ClusterState, ClusterState> executor,
                                       final ClusterStateTaskListener listener) {
        if (!lifecycle.started()) {
            return;
        }
        try {
            UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterStateTaskListener(listener, logger), executor);
            if (config.timeout() != null) {
                threadPoolExecutor.execute(updateTask, config.timeout(),
                    () -> threadPool.generic().execute(
                        () -> listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source))));
            } else {
                threadPoolExecutor.execute(updateTask);
            }
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }

This should look familiar from the index-creation task execution described earlier. The difference is that index-creation tasks are submitted through ClusterService, whereas cluster-state-application tasks are submitted through ClusterApplierService. Index creation runs as an asynchronous task whose end result is a new cluster state; once that new state is generated and committed, another asynchronous task, submitted through ClusterApplierService, brings the underlying data in line with the new cluster state.
As before, an UpdateTask is created. The config passed in uses Priority.HIGH and sets no timeout, so threadPoolExecutor's execute method is called directly. This threadPoolExecutor is also a PrioritizedEsThreadPoolExecutor, so task execution works exactly as in the index-creation case and will not be analyzed again; a small JDK-level demo of priority-ordered execution follows below. After that, we look at UpdateTask's run method.
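The following self-contained sketch illustrates the behavior such a prioritized executor provides, using only JDK classes: a single worker thread draining a PriorityBlockingQueue, so a later-submitted HIGH task overtakes an earlier-submitted NORMAL one. This is a plain illustration of the idea, not the PrioritizedEsThreadPoolExecutor implementation.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityExecutorDemo {
    // A runnable that carries a priority; PriorityBlockingQueue orders by compareTo.
    static class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority; // lower value = more urgent, like Priority.HIGH vs Priority.NORMAL
        final Runnable delegate;
        PrioritizedTask(int priority, Runnable delegate) {
            this.priority = priority;
            this.delegate = delegate;
        }
        @Override public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
        @Override public void run() { delegate.run(); }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        CountDownLatch gate = new CountDownLatch(1);
        // Occupy the single worker so the next two tasks pile up in the queue.
        executor.execute(new PrioritizedTask(1, () -> {
            try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }));
        executor.execute(new PrioritizedTask(2, () -> System.out.println("NORMAL priority task")));
        executor.execute(new PrioritizedTask(0, () -> System.out.println("HIGH priority task")));
        gate.countDown(); // release the worker: HIGH runs before NORMAL despite being submitted later
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}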

ClusterApplierService.UpdateTask

public void run() {
            runTask(this);
        }

ClusterApplierService

protected void runTask(UpdateTask task) {
        if (!lifecycle.started()) {
            logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
            return;
        }

        logger.debug("processing [{}]: execute", task.source);
        final ClusterState previousClusterState = state.get();

        long startTimeNS = currentTimeInNanos();
        final ClusterState newClusterState;
        try {
           // obtain the new cluster state to be applied
            newClusterState = task.apply(previousClusterState);
        } catch (Exception e) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
            if (logger.isTraceEnabled()) {
                logger.trace(
                    (Supplier<?>) () -> new ParameterizedMessage(
                        "failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
                        executionTime,
                        previousClusterState.version(),
                        task.source,
                        previousClusterState.nodes(),
                        previousClusterState.routingTable(),
                        previousClusterState.getRoutingNodes()),
                    e);
            }
            warnAboutSlowTaskIfNeeded(executionTime, task.source);
            task.listener.onFailure(task.source, e);
            return;
        }

        if (previousClusterState == newClusterState) {
            task.listener.clusterStateProcessed(task.source, newClusterState, newClusterState);
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
            logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
            warnAboutSlowTaskIfNeeded(executionTime, task.source);
        } else {
            // the new cluster state differs from the previous one
            if (logger.isTraceEnabled()) {
                logger.trace("cluster state updated, source [{}]\n{}", task.source, newClusterState);
            } else if (logger.isDebugEnabled()) {
                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
            }
            try {
                applyChanges(task, previousClusterState, newClusterState);
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
                logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
                    executionTime, newClusterState.version(),
                    newClusterState.stateUUID());
                warnAboutSlowTaskIfNeeded(executionTime, task.source);
            } catch (Exception e) {
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
                final long version = newClusterState.version();
                final String stateUUID = newClusterState.stateUUID();
                final String fullState = newClusterState.toString();
                logger.warn(
                    (Supplier<?>) () -> new ParameterizedMessage(
                        "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
                        executionTime,
                        version,
                        stateUUID,
                        task.source,
                        fullState),
                    e);
                // TODO: do we want to call updateTask.onFailure here?
            }
        }
    }

When the new cluster state differs from the old one, the applyChanges method is called:

private void applyChanges(UpdateTask task, ClusterState previousClusterState, ClusterState newClusterState) {
        // wrap this change in a ClusterChangedEvent
        ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(task.source, newClusterState, previousClusterState);
        // new cluster state, notify all listeners
        final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
        if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
            String summary = nodesDelta.shortSummary();
            if (summary.length() > 0) {
                logger.info("{}, reason: {}", summary, task.source);
            }
        }
       // make sure we can connect to all nodes in the new cluster state
        nodeConnectionsService.connectToNodes(newClusterState.nodes());

        logger.debug("applying cluster state version {}", newClusterState.version());
        try {
            // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
            if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
                final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
                clusterSettings.applySettings(incomingSettings);
            }
        } catch (Exception ex) {
            logger.warn("failed to apply cluster settings", ex);
        }
       // this is the entry point we care about
        logger.debug("apply cluster state with version {}", newClusterState.version());
        callClusterStateAppliers(clusterChangedEvent);

        nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());

        logger.debug("set locally applied cluster state to version {}", newClusterState.version());
        state.set(newClusterState);

        callClusterStateListeners(clusterChangedEvent);

        task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
    }

Go straight into the callClusterStateAppliers function:

private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
        clusterStateAppliers.forEach(applier -> {
            try {
                logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
                applier.applyClusterState(clusterChangedEvent);
            } catch (Exception ex) {
                logger.warn("failed to notify ClusterStateApplier", ex);
            }
        });
    }

This calls applyClusterState once on every applier registered in clusterStateAppliers; note that each call is wrapped in its own try/catch, so one failing applier does not prevent the others from running. We will look directly at IndicesClusterStateService, which is also a ClusterStateApplier; this is where the shard-level work is ultimately handled.

IndicesClusterStateService

public synchronized void applyClusterState(final ClusterChangedEvent event) {
        if (!lifecycle.started()) {
            return;
        }

        final ClusterState state = event.state();

        // we need to clean the shards and indices we have on this node, since we
        // are going to recover them again once state persistence is disabled (no master / not recovered)
        // TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
        if (state.blocks().disableStatePersistence()) {
            for (AllocatedIndex<? extends Shard> indexService : indicesService) {
                indicesService.removeIndex(indexService.index(), NO_LONGER_ASSIGNED,
                    "cleaning index (disabled block persistence)"); // also cleans shards
            }
            return;
        }

        updateFailedShardsCache(state);

        deleteIndices(event); // also deletes shards of deleted indices

        removeUnallocatedIndices(event); // also removes shards of removed indices

        failMissingShards(state);

        removeShards(state);   // removes any local shards that doesn't match what the master expects

        updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache

        createIndices(state);

        createOrUpdateShards(state);
    }

This method handles all the state transitions in one place: deleting indices, removing shards, and so on. Since a split is essentially a form of index creation, we go straight into the createIndices function.

private void createIndices(final ClusterState state) {
        // we only create indices for shards that are allocated
       // as seen here, each node only creates indices for the shards allocated to it
        RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
        if (localRoutingNode == null) {
            return;
        }
        // create map of indices to create with shards to fail if index creation fails
        final Map<Index, List<ShardRouting>> indicesToCreate = new HashMap<>();
        for (ShardRouting shardRouting : localRoutingNode) {
            // iterate over every shard allocated to this node
            if (failedShardsCache.containsKey(shardRouting.shardId()) == false) {
                final Index index = shardRouting.index();
                // if this node has not created the index yet, add it to indicesToCreate
                if (indicesService.indexService(index) == null) {
                    indicesToCreate.computeIfAbsent(index, k -> new ArrayList<>()).add(shardRouting);
                }
            }
        }

        for (Map.Entry<Index, List<ShardRouting>> entry : indicesToCreate.entrySet()) {
            final Index index = entry.getKey();
            final IndexMetaData indexMetaData = state.metaData().index(index);
            logger.debug("[{}] creating index", index);

            AllocatedIndex<? extends Shard> indexService = null;
            try {
                // create the index here; the process is the same as described earlier and is not analyzed again
                indexService = indicesService.createIndex(indexMetaData, buildInIndexListener);
                if (indexService.updateMapping(indexMetaData) && sendRefreshMapping) {
                    nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(),
                        new NodeMappingRefreshAction.NodeMappingRefreshRequest(indexMetaData.getIndex().getName(),
                            indexMetaData.getIndexUUID(), state.nodes().getLocalNodeId())
                    );
                }
            } catch (Exception e) {
                final String failShardReason;
                if (indexService == null) {
                    failShardReason = "failed to create index";
                } else {
                    failShardReason = "failed to update mapping for index";
                    indicesService.removeIndex(index, FAILURE, "removing index (mapping update failed)");
                }
                for (ShardRouting shardRouting : entry.getValue()) {
                    sendFailShard(shardRouting, failShardReason, e, state);
                }
            }
        }
    }

One might wonder why the index is created again here when it was already created earlier. The earlier creation happened on the master node, which then generated a cluster state change event and published it to the other nodes. At that point the other nodes had not yet created the index; once a node receives the cluster state change event, it reaches this step and creates the index inside createIndices. The creation again goes through IndicesService's createIndex function, exactly as described in the previous articles.
Once the index is created, control returns to applyClusterState, where one more step creates the shards. Next, let's see how createOrUpdateShards is implemented.

private void createOrUpdateShards(final ClusterState state) {
        // likewise, only create the shards allocated to this node
        RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
        if (localRoutingNode == null) {
            return;
        }

        DiscoveryNodes nodes = state.nodes();
        RoutingTable routingTable = state.routingTable();

        for (final ShardRouting shardRouting : localRoutingNode) {
           // iterate over each shard
            ShardId shardId = shardRouting.shardId();
            if (failedShardsCache.containsKey(shardId) == false) {
                AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
               // the index must have been created before its shards
                assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices";
                Shard shard = indexService.getShardOrNull(shardId.id());
                if (shard == null) {
                    assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";
                    // create the shard
                    createShard(nodes, routingTable, shardRouting, state);
                } else {
                    updateShard(nodes, shardRouting, shard, routingTable, state);
                }
            }
        }
    }

This iterates over every shard allocated to the local node; if a shard has not been created yet, createShard is called to create it.

private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
        assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;

        DiscoveryNode sourceNode = null;
        if (shardRouting.recoverySource().getType() == Type.PEER)  {
           // if the shard recovers from a peer, find the source node to recover from; as discussed earlier,
           // the primary shards of an index created by a split use local-shards recovery, i.e. they recover
           // from shards on the local node
            sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting);
            if (sourceNode == null) {
                logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
                return;
            }
        }

        try {
            logger.debug("{} creating shard", shardRouting.shardId());
            RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
            indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting),
                repositoriesService, failedShardHandler, globalCheckpointSyncer);
        } catch (Exception e) {
            failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
        }
    }

This directly calls IndicesService's createShard function to create the shard:

IndicesService

public IndexShard createShard(ShardRouting shardRouting, RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
                                  PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
                                  Consumer<IndexShard.ShardFailure> onShardFailure,
                                  Consumer<ShardId> globalCheckpointSyncer) throws IOException {
        ensureChangesAllowed();
        IndexService indexService = indexService(shardRouting.index());
        // construct the IndexShard object
        IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer);
        indexShard.addShardFailureCallback(onShardFailure);
        // start recovering the shard's data
        indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
            (type, mapping) -> {
                assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS:
                    "mapping update consumer only required by local shards recovery";
                try {
                    client.admin().indices().preparePutMapping()
                        .setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid
                        .setType(type)
                        .setSource(mapping.source().string(), XContentType.JSON)
                        .get();
                } catch (IOException ex) {
                    throw new ElasticsearchException("failed to stringify mapping source", ex);
                }
            }, this);
        return indexShard;
    }

IndexService's createShard is called first; it merely constructs an IndexShard object and settles some of the shard's metadata, such as the shard's directory, then stores the new shard in IndexService's shards member. At this point the shard's metadata is ready but its underlying data is not, so IndexShard's startRecovery function is called to prepare the shard's data.

IndexShard

public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
                              PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
                              BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
                              IndicesService indicesService) {
        // TODO: Create a proper object to encapsulate the recovery context
        // all of the current methods here follow a pattern of:
        // resolve context which isn't really dependent on the local shards and then async
        // call some external method with this pointer.
        // with a proper recovery context object we can simply change this to:
        // startRecovery(RecoveryState recoveryState, ShardRecoverySource source ) {
        //     markAsRecovery("from " + source.getShortDescription(), recoveryState);
        //     threadPool.generic().execute()  {
        //           onFailure () { listener.failure() };
        //           doRun() {
        //                if (source.recover(this)) {
        //                  recoveryListener.onRecoveryDone(recoveryState);
        //                }
        //           }
        //     }}
        // }
        assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
        switch (recoveryState.getRecoverySource().getType()) {
            ...
            // the other recovery types are omitted, since the primary shard of a split recovers via LOCAL_SHARDS
            case LOCAL_SHARDS:
                final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
                final Index resizeSourceIndex = indexMetaData.getResizeSourceIndex();
                final List<IndexShard> startedShards = new ArrayList<>();
                final IndexService sourceIndexService = indicesService.indexService(resizeSourceIndex);
                final Set<ShardId> requiredShards;
                final int numShards;
                if (sourceIndexService != null) {
                    // select the source shards to recover from
                    requiredShards = IndexMetaData.selectRecoverFromShards(shardId().id(),
                        sourceIndexService.getMetaData(), indexMetaData.getNumberOfShards());
                    for (IndexShard shard : sourceIndexService) {
                        if (shard.state() == IndexShardState.STARTED && requiredShards.contains(shard.shardId())) {
                           // collect the required source shards that are in STARTED state
                            startedShards.add(shard);
                        }
                    }
                    numShards = requiredShards.size();
                } else {
                    numShards = -1;
                    requiredShards = Collections.emptySet();
                }
               // recovery proceeds only if every required source shard is STARTED; otherwise an exception is thrown below
                if (numShards == startedShards.size()) {
                    assert requiredShards.isEmpty() == false;
                    markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread

                  // kick off the actual data recovery on a separate thread
                    threadPool.generic().execute(() -> {
                        try {
                            if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
                                .filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()))) {
                                recoveryListener.onRecoveryDone(recoveryState);
                            }
                        } catch (Exception e) {
                            recoveryListener.onRecoveryFailure(recoveryState,
                                new RecoveryFailedException(recoveryState, null, e), true);
                        }
                    });
                } else {
                    final RuntimeException e;
                    if (numShards == -1) {
                        e = new IndexNotFoundException(resizeSourceIndex);
                    } else {
                        e = new IllegalStateException("not all required shards of index " + resizeSourceIndex
                            + " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard "
                            + shardId());
                    }
                    throw e;
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
        }
    }
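It is worth pausing on what selectRecoverFromShards computes for a split. Below is a small, self-contained sketch of the arithmetic, reimplemented here from the routing-factor behavior rather than copied from IndexMetaData, so treat it as illustrative: the target shard count must be a multiple of the source shard count, and each target shard recovers from exactly one source shard.

final class SplitShardSelection {
    // For a split: routingFactor = targetShards / sourceShards, and target shard N
    // recovers from source shard N / routingFactor.
    static int sourceShardForSplitTarget(int targetShardId, int sourceShards, int targetShards) {
        if (targetShards % sourceShards != 0) {
            throw new IllegalArgumentException("target shard count must be a multiple of the source shard count");
        }
        int routingFactor = targetShards / sourceShards;
        return targetShardId / routingFactor;
    }

    public static void main(String[] args) {
        // Splitting a 2-shard index into 4 shards: targets 0 and 1 recover from
        // source shard 0; targets 2 and 3 recover from source shard 1.
        for (int target = 0; target < 4; target++) {
            System.out.println("target " + target + " <- source "
                + sourceShardForSplitTarget(target, 2, 4));
        }
    }
}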

Data recovery starts on a new thread, which executes the recoverFromLocalShards function:

public boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, List<IndexShard> localShards) throws IOException {
        assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
        assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS : "invalid recovery type: " + recoveryState.getRecoverySource();
        final List<LocalShardSnapshot> snapshots = new ArrayList<>();
        try {
            for (IndexShard shard : localShards) {
                snapshots.add(new LocalShardSnapshot(shard));
            }

            // we are the first primary, recover from the gateway
            // if its post api allocation, the index should exists
            assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
            StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
            return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots);
        } finally {
            IOUtils.close(snapshots);
        }
    }

A StoreRecovery object is used to perform the recovery.

StoreRecovery

boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, final IndexShard indexShard, final List<LocalShardSnapshot> shards) throws IOException {
        if (canRecover(indexShard)) {
            ...
            // execute the recovery
            return executeRecovery(indexShard, () -> {
                logger.debug("starting recovery from local shards {}", shards);
                try {
                    final Directory directory = indexShard.store().directory(); // don't close this directory!!
                    final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new);
                    final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong();
                    final long maxUnsafeAutoIdTimestamp =
                            shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong();
                    // this call builds the shard's underlying Lucene index
                    addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp,
                        indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit, hasNested);
                    internalRecoverFromStore(indexShard);
                    // just trigger a merge to do housekeeping on the
                    // copied segments - we will also see them in stats etc.
                    indexShard.getEngine().forceMerge(false, -1, false, false, false);
                } catch (IOException ex) {
                    throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
                }

            });
        }
        return false;
    }
void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources,
            final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split,
            boolean hasNested) throws IOException {

        // clean target directory (if previous recovery attempt failed) and create a fresh segment file with the proper lucene version
        Lucene.cleanLuceneIndex(target);
        assert sources.length > 0;
        final int luceneIndexCreatedVersionMajor = Lucene.readSegmentInfos(sources[0]).getIndexCreatedVersionMajor();
        new SegmentInfos(luceneIndexCreatedVersionMajor).commit(target);
        // if the filesystem supports hard links, the target index's files are hard-linked to the
        // source index's files; otherwise the source index's files have to be copied
        final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);

        IndexWriterConfig iwc = new IndexWriterConfig(null)
            .setCommitOnClose(false)
            // we don't want merges to happen here - we call maybe merge on the engine
            // later once we stared it up otherwise we would need to wait for it here
            // we also don't specify a codec here and merges should use the engines for this index
            .setMergePolicy(NoMergePolicy.INSTANCE)
            .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
        if (indexSort != null) {
            iwc.setIndexSort(indexSort);
        }

        try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) {
           // add the files under the source index's directories to the target index
            writer.addIndexes(sources);
            if (split) {
                // delete-by-query removes the documents that do not belong to this shard;
                // ShardSplittingQuery matches the documents in the source shard that, after
                // the split, do not route to this target shard
                writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId, hasNested));
            }
            /*
             * We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on
             * the source shards. This ensures that history after this maximum sequence number can advance and we have correct
             * document-level semantics.
             */
            writer.setLiveCommitData(() -> {
                final HashMap<String, String> liveCommitData = new HashMap<>(3);
                liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
                liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
                liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
                return liveCommitData.entrySet().iterator();
            });
            writer.commit();
        }
    }
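For intuition, the predicate that ShardSplittingQuery effectively evaluates can be sketched as follows. This is a hypothetical simplification: the real class works at the Lucene level over the _id and _routing fields and uses Murmur3HashFunction, while hashRouting below is a stand-in.

final class SplitDocFilter {
    // routingNumShards is fixed at index creation; routingFactor maps the
    // routing hash space onto the actual shard count.
    static int shardIdFor(String routing, int routingNumShards, int numberOfShards) {
        int routingFactor = routingNumShards / numberOfShards;
        return Math.floorMod(hashRouting(routing), routingNumShards) / routingFactor;
    }

    // A document survives in the split target shard only if it routes there
    // under the *new* shard count; every other document is deleted by the query.
    static boolean belongsToTargetShard(String routing, int routingNumShards,
                                        int newNumberOfShards, int targetShardId) {
        return shardIdFor(routing, routingNumShards, newNumberOfShards) == targetShardId;
    }

    // Stand-in hash; Elasticsearch uses Murmur3HashFunction.hash(routing).
    private static int hashRouting(String routing) {
        return routing.hashCode();
    }
}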

After addIndices constructs the shard's underlying Lucene index, internalRecoverFromStore creates the shard's engine, and the shard is essentially recovered. Back in IndicesClusterStateService's createShard: when IndicesService's createShard was called, a RecoveryListener object was passed in.

private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {

        private final ShardRouting shardRouting;

        private RecoveryListener(ShardRouting shardRouting) {
            this.shardRouting = shardRouting;
        }

        @Override
        public void onRecoveryDone(RecoveryState state) {
            shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
        }

        @Override
        public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
            handleRecoveryFailure(shardRouting, sendShardFailure, e);
        }
    }

當恢復完畢后就會調(diào)用該對象的onRecoveryDone方法,在這方法里邊會發(fā)起一個shardStarted命令,將該shard的狀態(tài)從INITIALIZING狀態(tài)變?yōu)镾TARTED狀態(tài)。
至此,shard就已經(jīng)創(chuàng)建完畢,可以接受外邊的請求了
