The index creation process in Elasticsearch

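Introduction

I wrote this article in a laid-back frame of mind: I found that rushing still didn't help me understand the code, so I told myself to take it easy. The goal is to explain clearly how an Elasticsearch index is created (the index itself, not the process of adding documents to it). I hope it helps.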


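What ES Meta consists of

Meta is data that describes data. In ES, an index's mapping structure, its settings and its persisted state are meta data, and so is part of the cluster's configuration. Roughly speaking, ES meta information consists of ClusterState, MetaData and IndexMetaData.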

ClusterState
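Every node in the cluster keeps the current ClusterState in memory; it represents the various states of the cluster. ClusterState contains a MetaData structure. What MetaData stores matches the notion of meta more closely, and everything that must be persisted lives in MetaData. The remaining fields can be regarded as transient state that is built dynamically while the cluster runs.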

public class ClusterState implements ToXContentFragment, Diffable<ClusterState> {
    public static final String UNKNOWN_UUID = "_na_";
    public static final long UNKNOWN_VERSION = -1;
    private final long version;
    private final String stateUUID;
    private final RoutingTable routingTable;
    private final DiscoveryNodes nodes;
    private final MetaData metaData;
    private final ClusterBlocks blocks;
    private final ImmutableOpenMap<String, Custom> customs;
    private final ClusterName clusterName;
    private final boolean wasReadFromDiff;
}



MetaData
MetaData mainly holds the cluster-level settings plus the meta of every index, every template and every custom metadata entry in the cluster.

public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, ToXContentFragment {
    private final String clusterUUID;
    private final long version;
    private final Settings transientSettings;
    private final Settings persistentSettings;
    private final Settings settings;
    private final ImmutableOpenMap<String, IndexMetaData> indices;
    private final ImmutableOpenMap<String, IndexTemplateMetaData> templates;
    private final ImmutableOpenMap<String, Custom> customs;
    private final transient int totalNumberOfShards; 
    private final int numberOfShards;
    private final String[] allIndices;
    private final String[] allOpenIndices;
    private final String[] allClosedIndices;
    private final SortedMap<String, AliasOrIndex> aliasAndIndexLookup;
}



IndexMetaData
IndexMetaData is the meta of one specific index, such as its shards, replicas and mappings.

public class IndexMetaData implements Diffable<IndexMetaData>, ToXContentFragment {
    private final int routingNumShards;
    private final int routingFactor;
    private final int routingPartitionSize;
    private final int numberOfShards;
    private final int numberOfReplicas;
    private final Index index;
    private final long version;
    private final long[] primaryTerms;
    private final State state;
    private final ImmutableOpenMap<String, AliasMetaData> aliases;
    private final Settings settings;
    private final ImmutableOpenMap<String, MappingMetaData> mappings;
    private final ImmutableOpenMap<String, Custom> customs;
    private final ImmutableOpenIntMap<Set<String>> inSyncAllocationIds;
    private final transient int totalNumberOfShards;
    private final DiscoveryNodeFilters requireFilters;
    private final DiscoveryNodeFilters includeFilters;
    private final DiscoveryNodeFilters excludeFilters;
    private final DiscoveryNodeFilters initialRecoveryFilters;
    private final Version indexCreatedVersion;
    private final Version indexUpgradedVersion;
    private final ActiveShardCount waitForActiveShards;
}
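The fields routingNumShards and routingFactor in IndexMetaData decide which shard a document is routed to. As far as I understand the ES 6.x routing code, the shard is computed as floorMod(hash(routing), routingNumShards) / routingFactor, with routingFactor = routingNumShards / numberOfShards, which is why routingNumShards has to be set (or inherited from a source index) when an index is created. The sketch below is hypothetical: ShardRoutingSketch is not an ES class, and String.hashCode() stands in for the Murmur3 hash ES actually uses.

```java
// Hypothetical sketch of shard selection; String.hashCode() is a stand-in for ES's Murmur3 hash.
final class ShardRoutingSketch {
    private ShardRoutingSketch() {}

    /** routingFactor = routingNumShards / numberOfShards; this sketch requires an exact multiple. */
    static int routingFactor(int routingNumShards, int numberOfShards) {
        if (numberOfShards <= 0 || routingNumShards <= 0 || routingNumShards % numberOfShards != 0) {
            throw new IllegalArgumentException(
                "routingNumShards must be a positive multiple of numberOfShards");
        }
        return routingNumShards / numberOfShards;
    }

    /** Maps a routing value (the document _id by default) to a shard in [0, numberOfShards). */
    static int shardId(String routing, int routingNumShards, int numberOfShards) {
        int hash = routing.hashCode(); // stand-in for Murmur3HashFunction.hash(routing)
        return Math.floorMod(hash, routingNumShards) / routingFactor(routingNumShards, numberOfShards);
    }
}
```

With routingNumShards = 8, a 2-shard index and a 4-shard index split from it route every id consistently: the 4-shard result divided by 2 always equals the 2-shard result, so each document lands in one of the two sub-shards of its original shard.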


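Index creation process

The work is done by IndexCreationTask.execute() in MetaDataCreateIndexService, which runs on the master node. It breaks down into the following stages.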

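Stage 1: parameter validation

  • Check whether the index already exists in the current ClusterState (currentState): whether the routingTable or the metaData already contains the index, whether an alias with that name exists, and so on. The aliases passed in the request are validated as well.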

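Stage 2: configuration merging

  • Find the templates matching the index name and merge their mappings, customs and aliases with those passed in the request; where both define the same thing, the request takes precedence over the template. Templates are only applied when the index is not created from a source index (recoverFrom == null).

  • Merge the template and request settings; here too the request takes precedence over the template.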
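Merging a template mapping into a request mapping (the XContentHelper.mergeDefaults(mappings.get(cursor.key), ...) call in the source below) can be pictured as a recursive map merge in which the request's values win and the template only fills in missing keys. This is a simplified, hypothetical sketch of that idea on plain Java maps, not the ES implementation; as far as I know the real mergeDefaults also has special handling for lists, which is left out here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified version of the idea behind XContentHelper.mergeDefaults:
// values already in `content` (the request) win; keys only in `defaults` (a template) are added.
final class MappingMergeSketch {
    private MappingMergeSketch() {}

    @SuppressWarnings("unchecked")
    static void mergeDefaults(Map<String, Object> content, Map<String, Object> defaults) {
        for (Map.Entry<String, Object> e : defaults.entrySet()) {
            String key = e.getKey();
            Object defaultValue = e.getValue();
            if (!content.containsKey(key)) {
                // Key only exists in the template: copy it in.
                content.put(key, defaultValue);
            } else if (content.get(key) instanceof Map && defaultValue instanceof Map) {
                // Both sides are objects: merge them recursively.
                mergeDefaults((Map<String, Object>) content.get(key), (Map<String, Object>) defaultValue);
            }
            // Otherwise the request's value is kept unchanged.
        }
    }

    /** Small helper for building mutable nested maps in examples. */
    static Map<String, Object> map(Object... keyValues) {
        Map<String, Object> m = new HashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            m.put((String) keyValues[i], keyValues[i + 1]);
        }
        return m;
    }
}
```

A request mapping that declares title as text keeps it even if a template declares title as keyword, while fields and keys that only the template defines are added.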

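Stage 3: building the index settings

  • Build the Settings.Builder object indexSettingsBuilder by merging the template and request settings, then fill in defaults for anything still unset: number_of_shards falls back to the node setting or 5, number_of_replicas to the node setting or 1, and SETTING_VERSION_CREATED, SETTING_CREATION_DATE, SETTING_INDEX_PROVIDED_NAME and a fresh SETTING_INDEX_UUID are added.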
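The precedence rules for settings can be sketched with plain maps: templates are applied from last to first so that the first template in the list returned by findTemplates wins (the source comment says the first ones are better matching), the request's settings then override all templates, and number_of_shards / number_of_replicas fall back to the node settings or 5 / 1 only if still unset. IndexSettingsSketch is a hypothetical class, and only these two defaults are modelled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the stage-3 settings precedence, using plain maps instead of Settings.Builder.
final class IndexSettingsSketch {
    private IndexSettingsSketch() {}

    static Map<String, String> merge(List<Map<String, String>> templatesBestFirst,
                                     Map<String, String> requestSettings,
                                     Map<String, String> nodeSettings) {
        Map<String, String> merged = new LinkedHashMap<>();
        // Apply templates from last to first so the first (best-matching) template overwrites the others.
        for (int i = templatesBestFirst.size() - 1; i >= 0; i--) {
            merged.putAll(templatesBestFirst.get(i));
        }
        // Request settings override every template.
        merged.putAll(requestSettings);
        // Defaults apply only when neither templates nor the request set a value.
        merged.putIfAbsent("index.number_of_shards",
            nodeSettings.getOrDefault("index.number_of_shards", "5"));
        merged.putIfAbsent("index.number_of_replicas",
            nodeSettings.getOrDefault("index.number_of_replicas", "1"));
        return merged;
    }
}
```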

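Stage 4: building the temporary IndexMetaData

  • Create the IndexMetaData.Builder object tmpImdBuilder, set its routingNumShards, and bind the actualIndexSettings built from indexSettingsBuilder.

  • Call tmpImdBuilder.build() to build the IndexMetaData object tmpImd.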
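Right after tmpImd is built, the code checks that wait_for_active_shards is not larger than the number of shard copies (number_of_replicas + 1) and otherwise throws the IllegalArgumentException seen in the source. A hypothetical sketch of that rule: it uses strings where ES uses the ActiveShardCount class, and it assumes that "default" resolves to the index's own setting (as far as I know index.write.wait_for_active_shards in ES 6.x, which tmpImd.getWaitForActiveShards() reflects).

```java
// Hypothetical sketch of the wait_for_active_shards check; ES itself models this with ActiveShardCount.
final class WaitForActiveShardsSketch {
    private WaitForActiveShardsSketch() {}

    /**
     * @param requested    value from the request: "default", "all", or a non-negative integer
     * @param indexDefault value used when the request says "default" (assumed index-level setting)
     * @param replicas     index.number_of_replicas
     */
    static void validate(String requested, String indexDefault, int replicas) {
        String effective = "default".equals(requested) ? indexDefault : requested;
        if ("all".equals(effective)) {
            return; // "all" means every copy, so it can never exceed the number of copies
        }
        int count = Integer.parseInt(effective);
        int copies = replicas + 1; // one primary plus its replicas
        if (count > copies) {
            throw new IllegalArgumentException("invalid wait_for_active_shards[" + requested
                + "]: cannot be greater than number of shard copies [" + copies + "]");
        }
    }
}
```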

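Stage 5: creating the IndexService

  • Temporarily create an IndexService: IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList()). It only exists on the master to validate the mappings and aliases, and it is removed again in the finally block at the end of execute().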

Stage 6: obtaining the Index and the MapperService

  • Get the newly created index and its mapper service: Index createdIndex = indexService.index() and MapperService mapperService = indexService.mapperService().

Stage 7: updating the mappings in the MapperService

  • mapperService.merge() applies the final mappings produced by merging the request and the templates.

  • Build Map<String, MappingMetaData> mappingsMetaData, keyed by the type the mapping was created for.

Stage 8: building the final IndexMetaData

  • Create the IndexMetaData.Builder object indexMetaDataBuilder and bind actualIndexSettings and routingNumShards to it.

  • On indexMetaDataBuilder, set the primary terms with primaryTerm, the mappings (mappingMd) with putMapping, the aliasMetaData from both the templates and the request with putAlias, the customIndexMetaData with putCustom, and the index state with state.

  • Create the indexMetaData object with indexMetaData = indexMetaDataBuilder.build().

Stage 9: updating the MetaData with the IndexMetaData

  • Put the IndexMetaData object into a new MetaData and get the latest MetaData back: MetaData newMetaData = MetaData.builder(currentState.metaData()).put(indexMetaData, false).build().

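Stage 10: updating the ClusterState with the MetaData

  • Put the new MetaData object newMetaData into a new ClusterState: ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build(). If the index is created in the OPEN state, it is also added to the RoutingTable with addAsNew, and allocationService.reroute() is called so that its shards get allocated.
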
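Stages 9 and 10 never modify currentState in place: MetaData.builder(...) and ClusterState.builder(...) copy the existing objects, change one part, and build new immutable ones, so the old state stays valid until the new one is published. The copy-on-write idea can be sketched with two hypothetical, heavily simplified classes; MetaSketch and StateSketch are not ES classes, and index metadata is just a String here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MetaData: an immutable map of index name -> index metadata.
final class MetaSketch {
    final Map<String, String> indices;

    MetaSketch(Map<String, String> indices) {
        this.indices = Collections.unmodifiableMap(new HashMap<>(indices));
    }

    /** Like MetaData.builder(old).put(indexMetaData, false).build(): returns a new object. */
    MetaSketch withIndex(String name, String indexMeta) {
        Map<String, String> copy = new HashMap<>(indices);
        copy.put(name, indexMeta);
        return new MetaSketch(copy);
    }
}

// Hypothetical stand-in for ClusterState: unchanged parts (clusterName) are shared, not copied.
final class StateSketch {
    final String clusterName;
    final MetaSketch metaData;

    StateSketch(String clusterName, MetaSketch metaData) {
        this.clusterName = clusterName;
        this.metaData = metaData;
    }

    /** Like ClusterState.builder(current).metaData(newMetaData).build(). */
    StateSketch withMetaData(MetaSketch newMetaData) {
        return new StateSketch(clusterName, newMetaData);
    }
}
```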
public class MetaDataCreateIndexService extends AbstractComponent {

    static class IndexCreationTask extends AckedClusterStateUpdateTask<ClusterStateUpdateResponse> {

        /**
         * Runs on the master node.
         * @param currentState the current ClusterState of the cluster
         */
        @Override
        public ClusterState execute(ClusterState currentState) throws Exception {
            Index createdIndex = null;
            String removalExtraInfo = null;
            IndexRemovalReason removalReason = IndexRemovalReason.FAILURE;
            try {
                // Validate that the index does not already exist in the ClusterState: neither the
                // routingTable nor the metaData may contain it, and no alias may already use its name.
                validator.validate(request, currentState);

                // Validate the aliases passed in the request
                for (Alias alias : request.aliases()) {
                    aliasValidator.validateAlias(alias, request.index(), currentState.metaData());
                }

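                // The following merges the request's configuration with that of the matching templates
                // Find the index templates in the current ClusterState that match this index name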
                List<IndexTemplateMetaData> templates = findTemplates(request, currentState);

                // Hold the merged customs, mappings and template aliases, plus the names of the applied templates
                Map<String, Custom> customs = new HashMap<>();
                Map<String, Map<String, Object>> mappings = new HashMap<>();
                Map<String, AliasMetaData> templatesAliases = new HashMap<>();
                List<String> templateNames = new ArrayList<>();

                // Parse the request's mappings into the temporary map mappings
                for (Map.Entry<String, String> entry : request.mappings().entrySet()) {
                    mappings.put(entry.getKey(), MapperService.parseMapping(xContentRegistry, entry.getValue()));
                }

                // Copy the request's customs into the temporary map customs
                for (Map.Entry<String, Custom> entry : request.customs().entrySet()) {
                    customs.put(entry.getKey(), entry.getValue());
                }

                final Index recoverFromIndex = request.recoverFrom();

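                // recoverFrom is only set when the index is created from an existing source index by a
                // resize (shrink/split); templates are applied only for a normal index creation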
                if (recoverFromIndex == null) {
                    // Merge the templates' mappings, customs and aliases into the request's parameters
                    for (IndexTemplateMetaData template : templates) {
                        templateNames.add(template.getName());

                        // Merge the template's mappings with the request's
                        for (ObjectObjectCursor<String, CompressedXContent> cursor : template.mappings()) {
                            String mappingString = cursor.value.string();
                            // If the request already has a mapping of this type, merge them:
                            // the request's mapping wins and the template only fills in what is missing
                            if (mappings.containsKey(cursor.key)) {
                                XContentHelper.mergeDefaults(mappings.get(cursor.key),
                                    MapperService.parseMapping(xContentRegistry, mappingString));
                            } else {
                                // If the request has no mapping of this type, just add the template's mapping
                                mappings.put(cursor.key,
                                    MapperService.parseMapping(xContentRegistry, mappingString));
                            }
                        }
                        // handle custom
                        for (ObjectObjectCursor<String, Custom> cursor : template.customs()) {
                            String type = cursor.key;
                            IndexMetaData.Custom custom = cursor.value;
                            IndexMetaData.Custom existing = customs.get(type);
                            if (existing == null) {
                                customs.put(type, custom);
                            } else {
                                IndexMetaData.Custom merged = existing.mergeWith(custom);
                                customs.put(type, merged);
                            }
                        }

                        // The request's aliases take precedence; merge in the template's aliases
                        for (ObjectObjectCursor<String, AliasMetaData> cursor : template.aliases()) {
                            AliasMetaData aliasMetaData = cursor.value;

                            if (request.aliases().contains(new Alias(aliasMetaData.alias()))) {
                                continue;
                            }

                            if (templatesAliases.containsKey(cursor.key)) {
                                continue;
                            }

                            if (aliasMetaData.alias().contains("{index}")) {
                                String templatedAlias = aliasMetaData.alias().replace("{index}", request.index());
                                aliasMetaData = AliasMetaData.newAliasMetaData(aliasMetaData, templatedAlias);
                            }

                            aliasValidator.validateAliasMetaData(aliasMetaData, request.index(), currentState.metaData());
                            templatesAliases.put(aliasMetaData.alias(), aliasMetaData);
                        }
                    }
                }

                // The following builds the index settings used to create the index
                // Merge the templates' settings
                Settings.Builder indexSettingsBuilder = Settings.builder();
                if (recoverFromIndex == null) {
                    // apply templates, here, in reverse order, since first ones are better matching
                    for (int i = templates.size() - 1; i >= 0; i--) {
                        indexSettingsBuilder.put(templates.get(i).settings());
                    }
                }

                // Merge the request's settings, overriding the templates' settings
                indexSettingsBuilder.put(request.settings());
                if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
                    indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
                }
                if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
                    indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
                }
                if (settings.get(SETTING_AUTO_EXPAND_REPLICAS) != null && 
                    indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS) == null) {
                    indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS));
                }

                if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
                    DiscoveryNodes nodes = currentState.nodes();
                    final Version createdVersion = Version.min(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
                    indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
                }

                if (indexSettingsBuilder.get(SETTING_CREATION_DATE) == null) {
                    indexSettingsBuilder.put(SETTING_CREATION_DATE, new DateTime(DateTimeZone.UTC).getMillis());
                }
                indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName());
                indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());

                // The following prepares the IndexMetaData builder
                // Create the IndexMetaData builder
                final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());

                final int routingNumShards;

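                // Normal creation: read index.number_of_routing_shards from the merged settings
                // (falls back to index.number_of_shards when unset); resize: inherit it from the source index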
                if (recoverFromIndex == null) {
                    Settings idxSettings = indexSettingsBuilder.build();
                    routingNumShards = IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.get(idxSettings);
                } else {
                    assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
                        : "index.number_of_routing_shards should be present on the target index on resize";
                    final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
                    routingNumShards = sourceMetaData.getRoutingNumShards();
                }
                // Drop the routing_shards setting; the value is carried by setRoutingNumShards() instead
                indexSettingsBuilder.remove(IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.getKey());
                tmpImdBuilder.setRoutingNumShards(routingNumShards);

                if (recoverFromIndex != null) {
                    assert request.resizeType() != null;
                    prepareResizeIndexSettings(
                        currentState, mappings.keySet(), 
                        indexSettingsBuilder, recoverFromIndex, request.index(), request.resizeType());
                }

                // The actual index settings
                final Settings actualIndexSettings = indexSettingsBuilder.build();

                // Add the actual index settings to the IndexMetaData builder
                tmpImdBuilder.settings(actualIndexSettings);

                if (recoverFromIndex != null) {

                    final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(recoverFromIndex);
                    final long primaryTerm =
                        IntStream
                            .range(0, sourceMetaData.getNumberOfShards())
                            .mapToLong(sourceMetaData::primaryTerm)
                            .max()
                            .getAsLong();
                    for (int shardId = 0; shardId < tmpImdBuilder.numberOfShards(); shardId++) {
                        tmpImdBuilder.primaryTerm(shardId, primaryTerm);
                    }
                }

                // Build the temporary IndexMetaData object
                final IndexMetaData tmpImd = tmpImdBuilder.build();
                ActiveShardCount waitForActiveShards = request.waitForActiveShards();
                if (waitForActiveShards == ActiveShardCount.DEFAULT) {
                    waitForActiveShards = tmpImd.getWaitForActiveShards();
                }
                if (waitForActiveShards.validate(tmpImd.getNumberOfReplicas()) == false) {
                    throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() +
                        "]: cannot be greater than number of shard copies [" +
                        (tmpImd.getNumberOfReplicas() + 1) + "]");
                }

                // Create the IndexService
                final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList());
                createdIndex = indexService.index();

                // Get the MapperService of the newly created IndexService
                MapperService mapperService = indexService.mapperService();
                try {
                    // 合并request和template合并后生成的最新的mappings
                    mapperService.merge(mappings, MergeReason.MAPPING_UPDATE, request.updateAllTypes());
                } catch (Exception e) {
                    removalExtraInfo = "failed on parsing default mapping/mappings on index creation";
                    throw e;
                }

                if (request.recoverFrom() == null) {
                    indexService.getIndexSortSupplier().get();
                }

                final QueryShardContext queryShardContext = indexService.newQueryShardContext(0, null, () -> 0L, null);

                for (Alias alias : request.aliases()) {
                    if (Strings.hasLength(alias.filter())) {
                        aliasValidator.validateAliasFilter(alias.name(), alias.filter(), queryShardContext, xContentRegistry);
                    }
                }
                for (AliasMetaData aliasMetaData : templatesAliases.values()) {
                    if (aliasMetaData.filter() != null) {
                        aliasValidator.validateAliasFilter(aliasMetaData.alias(), aliasMetaData.filter().uncompressed(),
                            queryShardContext, xContentRegistry);
                    }
                }

                // Build the mappingsMetaData map, keyed by mapping type
                Map<String, MappingMetaData> mappingsMetaData = new HashMap<>();
                for (DocumentMapper mapper : mapperService.docMappers(true)) {
                    MappingMetaData mappingMd = new MappingMetaData(mapper);
                    mappingsMetaData.put(mapper.type(), mappingMd);
                }

                // The following creates the final IndexMetaData
                // Create the indexMetaDataBuilder object that does the real creation
                final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index())
                    .settings(actualIndexSettings)
                    .setRoutingNumShards(routingNumShards);

                for (int shardId = 0; shardId < tmpImd.getNumberOfShards(); shardId++) {
                    indexMetaDataBuilder.primaryTerm(shardId, tmpImd.primaryTerm(shardId));
                }

                for (MappingMetaData mappingMd : mappingsMetaData.values()) {
                    indexMetaDataBuilder.putMapping(mappingMd);
                }

                for (AliasMetaData aliasMetaData : templatesAliases.values()) {
                    indexMetaDataBuilder.putAlias(aliasMetaData);
                }
                for (Alias alias : request.aliases()) {
                    AliasMetaData aliasMetaData = AliasMetaData.builder(alias.name()).filter(alias.filter())
                        .indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build();
                    indexMetaDataBuilder.putAlias(aliasMetaData);
                }

                for (Map.Entry<String, Custom> customEntry : customs.entrySet()) {
                    indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue());
                }

                indexMetaDataBuilder.state(request.state());

                // Create the indexMetaData object
                final IndexMetaData indexMetaData;
                try {
                    indexMetaData = indexMetaDataBuilder.build();
                } catch (Exception e) {
                    removalExtraInfo = "failed to build index metadata";
                    throw e;
                }

                indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetaData.getIndex(),
                    indexMetaData.getSettings());

                // Create the new MetaData
                MetaData newMetaData = MetaData.builder(currentState.metaData())
                    .put(indexMetaData, false)
                    .build();

                ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
                if (!request.blocks().isEmpty()) {
                    for (ClusterBlock block : request.blocks()) {
                        blocks.addIndexBlock(request.index(), block);
                    }
                }
                blocks.updateBlocks(indexMetaData);

                // Build the new ClusterState with the updated blocks and MetaData
                ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();

                if (request.state() == State.OPEN) {
                    RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
                        .addAsNew(updatedState.metaData().index(request.index()));
                    updatedState = allocationService.reroute(
                        ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
                        "index [" + request.index() + "] created");
                }
                // The IndexService was only needed on the master for validation; the finally block removes it
                removalExtraInfo = "cleaning up after validating index on master";
                removalReason = IndexRemovalReason.NO_LONGER_ASSIGNED;
                return updatedState;
            } finally {
                if (createdIndex != null) {
                    indicesService.removeIndex(createdIndex, removalReason, removalExtraInfo);
                }
            }
        }
    }
}
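The template alias loop in the source above follows three rules: an alias the request also defines is skipped, an alias already supplied by an earlier (better-matching) template is skipped, and a {index} placeholder is replaced with the index name. Below is a hypothetical sketch with plain strings instead of AliasMetaData; validation is omitted, and for simplicity it checks duplicates by the resolved alias name, while the ES code checks the template's alias key (cursor.key).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of how template aliases are collected in IndexCreationTask.execute().
final class TemplateAliasSketch {
    private TemplateAliasSketch() {}

    /** Returns resolved alias name -> alias as declared in the template. */
    static Map<String, String> collect(List<List<String>> templateAliasesBestFirst,
                                       Set<String> requestAliases,
                                       String indexName) {
        Map<String, String> result = new LinkedHashMap<>();
        for (List<String> aliases : templateAliasesBestFirst) {
            for (String alias : aliases) {
                if (requestAliases.contains(alias)) {
                    continue; // the request's definition of this alias wins
                }
                String resolved = alias.replace("{index}", indexName);
                if (result.containsKey(resolved)) {
                    continue; // an earlier, better-matching template already supplied it
                }
                result.put(resolved, alias);
            }
        }
        return result;
    }
}
```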


Creating the IndexService

I haven't studied this part in depth yet; it is included here only as a pointer for further reading.

public class IndicesService extends AbstractLifecycleComponent
    implements IndicesClusterStateService.AllocatedIndices<IndexShard, IndexService>, IndexService.ShardStoreDeleter {

    public synchronized IndexService createIndex(
            final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners) throws IOException {
        ensureChangesAllowed();
        if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
            throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
        }
        final Index index = indexMetaData.getIndex();
        if (hasIndex(index)) {
            throw new ResourceAlreadyExistsException(index);
        }
        List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
        final IndexEventListener onStoreClose = new IndexEventListener() {
            @Override
            public void onStoreClosed(ShardId shardId) {
                indicesQueryCache.onClose(shardId);
            }
        };
        finalListeners.add(onStoreClose);
        finalListeners.add(oldShardsStats);
        final IndexService indexService =
                createIndexService(
                        "create index",
                        indexMetaData,
                        indicesQueryCache,
                        indicesFieldDataCache,
                        finalListeners,
                        indexingMemoryController);
        boolean success = false;
        try {
            indexService.getIndexEventListener().afterIndexCreated(indexService);
            indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
            success = true;
            return indexService;
        } finally {
            if (success == false) {
                indexService.close("plugins_failed", true);
            }
        }
    }


    private synchronized IndexService createIndexService(final String reason,
                                                         IndexMetaData indexMetaData,
                                                         IndicesQueryCache indicesQueryCache,
                                                         IndicesFieldDataCache indicesFieldDataCache,
                                                         List<IndexEventListener> builtInListeners,
                                                         IndexingOperationListener... indexingOperationListeners) throws IOException {
        final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopeSetting);
        logger.debug("creating Index [{}], shards [{}]/[{}] - reason [{}]",
            indexMetaData.getIndex(),
            idxSettings.getNumberOfShards(),
            idxSettings.getNumberOfReplicas(),
            reason);

        final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
        for (IndexingOperationListener operationListener : indexingOperationListeners) {
            indexModule.addIndexOperationListener(operationListener);
        }
        pluginsService.onIndexModule(indexModule);
        for (IndexEventListener listener : builtInListeners) {
            indexModule.addIndexEventListener(listener);
        }
        return indexModule.newIndexService(
                nodeEnv,
                xContentRegistry,
                this,
                circuitBreakerService,
                bigArrays,
                threadPool,
                scriptService,
                client,
                indicesQueryCache,
                mapperRegistry,
                indicesFieldDataCache,
                namedWriteableRegistry
        );
    }
}
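Although I haven't dug into createIndex() in depth, its registration pattern is visible in the code above: a synchronized writer rejects an index that already exists, publishes a new immutable indices map instead of mutating the old one, and closes the service if a listener fails (the success flag checked in the finally block). A hypothetical sketch of that pattern; IndexRegistrySketch and IndexServiceLike are illustrative names, not ES classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical stand-in for IndexService; only close(reason) matters for this sketch. */
interface IndexServiceLike {
    void close(String reason);
}

final class IndexRegistrySketch<S extends IndexServiceLike> {
    // index UUID -> service; replaced as a whole on every change, never mutated in place
    private volatile Map<String, S> indices = Collections.emptyMap();

    synchronized S register(String uuid, Supplier<S> factory, Consumer<S> afterCreated) {
        if (indices.containsKey(uuid)) {
            // like the ResourceAlreadyExistsException thrown by createIndex()
            throw new IllegalStateException("index [" + uuid + "] already exists");
        }
        S service = factory.get();
        boolean success = false;
        try {
            afterCreated.accept(service);                // like afterIndexCreated(indexService)
            Map<String, S> copy = new HashMap<>(indices);
            copy.put(uuid, service);
            indices = Collections.unmodifiableMap(copy); // readers see either the old or the new map
            success = true;
            return service;
        } finally {
            if (!success) {
                service.close("plugins_failed");        // like indexService.close("plugins_failed", true)
            }
        }
    }

    S get(String uuid) {
        return indices.get(uuid);
    }
}
```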


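References

Elasticsearch分布式一致性原理剖析(二)-Meta篇 (An analysis of Elasticsearch's distributed consistency principles, part 2: Meta)
elasticsearch index 之 create index(二) (elasticsearch index: create index, part 2)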


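Job openings

【We're hiring】

If you love technology and love life, you are welcome to become our colleague and grow together with Beibei.

Beibei Group is hiring for algorithms, big data, BI, Java, PHP, Android, iOS, QA, operations, DBA and other positions. If you are interested, send your résumé to zhi.wang@beibei.com.

Founded in 2011, Beibei Group operates platforms including Beibei.com, Beidian and Beidai, and aims to become the world's leading family consumption platform.

Beibei's founding team comes from Alibaba and has raised several hundred million US dollars of venture capital from IDG Capital, Gaorong Capital, Capital Today, New Horizon Capital, Northern Light and others.

Address: Donggu Entrepreneurship Park, No. 9 Pusheng Lane, Jianggan District, Hangzhou (several shuttle buses for commuting)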

© Copyright belongs to the author. Please contact the author for reprints or content cooperation.