ES Source Code Notes: Routing

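When indexing, ES by default distributes documents evenly across shards based on the document identifier _id. When searching, it queries all shards by default and merges the results, so the caller does not need to know which shard actually holds the data.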

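The routing algorithm computes the target shard ID from the routing value and the document ID.
In the general case, the shard is computed with the following formula: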

shard_num = hash(_routing) % num_primary_shards

By default, the _routing value is the document ID.
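
The formula above can be sketched in a few lines of Java. This is only an illustration: `BasicRoutingSketch` and `shardFor` are hypothetical names, and `String.hashCode()` is a stand-in for the Murmur3 hash that ES applies to the routing value, so the shard numbers it prints will not match a real cluster. It uses `Math.floorMod` rather than `%` so that a negative hash still maps to a valid shard number.

```java
public class BasicRoutingSketch {

    // Stand-in hash (assumption): ES hashes the routing value with Murmur3,
    // String.hashCode() is used here only to keep the sketch self-contained.
    public static int hash(String routing) {
        return routing.hashCode();
    }

    // shard_num = hash(_routing) % num_primary_shards.
    // floorMod keeps the result in [0, numPrimaryShards) even for negative hashes.
    public static int shardFor(String routing, int numPrimaryShards) {
        return Math.floorMod(hash(routing), numPrimaryShards);
    }

    public static void main(String[] args) {
        int numPrimaryShards = 5;
        // With no custom routing, the document ID itself is the routing value.
        for (String id : new String[] {"doc-1", "doc-2", "doc-3"}) {
            System.out.println(id + " -> shard " + shardFor(id, numPrimaryShards));
        }
    }
}
```
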
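ES relies on random IDs and a hash function to spread documents evenly across shards. With a custom ID or routing value, the values may not be random enough, which skews the data and leaves some shards oversized. In that case, the index.routing_partition_size setting can reduce the risk of skew: the larger routing_partition_size is, the more evenly the data is distributed.
When index.routing_partition_size is set, the formula becomes: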

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards

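In other words, for a given routing value, hash(_routing) is fixed, while hash(_id) % routing_partition_size can take routing_partition_size possible values. Combined, the documents that share one routing value can land on up to routing_partition_size different shards, that is, on a set of shards rather than a single one.
index.routing_partition_size should be greater than 1 and less than index.number_of_shards.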
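
A minimal sketch of the partitioned formula, assuming a stand-in hash (`String.hashCode()` instead of Murmur3) and hypothetical names (`PartitionedRoutingSketch`, `shardFor`, `shardsUsed`). It indexes many documents under one routing value and collects the shards they land on, which should never be more than routing_partition_size shards.

```java
import java.util.Set;
import java.util.TreeSet;

public class PartitionedRoutingSketch {

    // Stand-in hash (assumption): not the Murmur3 function ES uses.
    public static int hash(String value) {
        return value.hashCode();
    }

    // shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
    public static int shardFor(String routing, String id, int partitionSize, int numPrimaryShards) {
        int partitionOffset = Math.floorMod(hash(id), partitionSize);
        return Math.floorMod(hash(routing) + partitionOffset, numPrimaryShards);
    }

    // Collects the distinct shards hit by docCount documents that share one routing value.
    public static Set<Integer> shardsUsed(String routing, int docCount, int partitionSize, int numPrimaryShards) {
        Set<Integer> shards = new TreeSet<>();
        for (int i = 0; i < docCount; i++) {
            shards.add(shardFor(routing, "doc-" + i, partitionSize, numPrimaryShards));
        }
        return shards;
    }

    public static void main(String[] args) {
        // 1000 documents, routing "beijing", 3 partitions, 10 primary shards.
        System.out.println(shardsUsed("beijing", 1000, 3, 10));
    }
}
```

With a partition size of 1 every document for the routing value falls on the same shard, which is the behaviour of the simpler formula.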

The computation is implemented as follows:

    private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
        final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

        // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
        // of original index to hash document
        return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
    }
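
The comment in the method hints at why the result is divided by the routing factor: the hash is taken modulo the routing shard count fixed for the original index, then scaled down to the current shard count. A small worked sketch, assuming the routing factor equals routingNumShards / numberOfShards (an assumption about getRoutingFactor that the excerpt itself does not show); `ScaledShardIdSketch` and `scaledShardId` are hypothetical names:

```java
public class ScaledShardIdSketch {

    // Mirrors the arithmetic of calculateScaledShardId with plain ints.
    // Assumption: routingFactor = routingNumShards / numberOfShards.
    public static int scaledShardId(int hash, int routingNumShards, int numberOfShards) {
        int routingFactor = routingNumShards / numberOfShards;
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        int routingNumShards = 8; // shard count the hash space was sized for
        for (int hash : new int[] {13, -3, 42}) {
            int original = scaledShardId(hash, routingNumShards, 8); // index with 8 shards
            int shrunk = scaledShardId(hash, routingNumShards, 4);   // same index shrunk to 4 shards
            System.out.println("hash " + hash + ": shard " + original + " -> shard " + shrunk + " after shrink");
        }
    }
}
```

For a hash of 13, floorMod(13, 8) is 5, so the document sits on shard 5 of the 8-shard index and on shard 5 / 2 = 2 after shrinking to 4 shards. Original shards 4 and 5 both collapse into shard 2, so routing stays consistent without rehashing.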

How does a search find the target shards from the routing value?

Example

GET /{index}/{type}/_search?routing=beijing

This sends a search request with routing set to beijing.

1. Parsing flow

When ES receives the request above, it is handled by org.elasticsearch.rest.action.search.RestSearchAction, whose prepareRequest method parses the request into a SearchRequest:

    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        SearchRequest searchRequest = new SearchRequest();
        IntConsumer setSize = size -> searchRequest.source().size(size);
        request.withContentOrSourceParamParserOrNull(parser ->
            parseSearchRequest(searchRequest, request, parser, setSize));

        return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
    }

    public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
                                          XContentParser requestContentParser,
                                          IntConsumer setSize) throws IOException {
        String searchType = request.param("search_type");
        parseSearchSource(searchRequest.source(), request, setSize);
        searchRequest.requestCache(request.paramAsBoolean("request_cache", null));
        String scroll = request.param("scroll");
        searchRequest.routing(request.param("routing")); // read the routing parameter and store it on the searchRequest
        searchRequest.preference(request.param("preference"));
        searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
        searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", true));
        checkRestTotalHits(request, searchRequest);
    }

2. Building the target shard list

After prepareRequest builds the request, it is passed through the transport module to org.elasticsearch.action.search.TransportSearchAction for handling:

    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                               OriginalIndices localIndices, String[] concreteIndices, Map<String, Set<String>> routingMap,
                               Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
                               List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections,
                               ClusterState clusterState, ActionListener<SearchResponse> listener, SearchResponse.Clusters clusters) {

        Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
        GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
                concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
        GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
            searchRequest.getLocalClusterAlias(), remoteShardIterators);

        // ...
    }

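This merges the list of local-cluster shards involved in the request with the list of remote-cluster shards (remote clusters are used for cross-cluster search).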

The step that maps routing values to specific shards lives in org.elasticsearch.cluster.routing.OperationRouting.searchShards(ClusterState, String[], Map<String, Set<String>>, String, ResponseCollectorService, Map<String, Long>):

    public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState,
                                                           String[] concreteIndices,
                                                           @Nullable Map<String, Set<String>> routing,
                                                           @Nullable String preference,
                                                           @Nullable ResponseCollectorService collectorService,
                                                           @Nullable Map<String, Long> nodeCounts) {
        final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
        // ...
    }

    private Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterState, String[] concreteIndices,
                                                              @Nullable Map<String, Set<String>> routing) {
   
        for (String index : concreteIndices) {
            // ...
            if (effectiveRouting != null) {
                for (String r : effectiveRouting) {
                    // ...
                        set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetaData, r, partitionOffset)));
                    // ...
                }
            }
            // ...
        }
        return set;
    }

In the end, calculateScaledShardId is called to compute the target shards.
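
The search-side lookup can be sketched as follows. The excerpt elides how partitionOffset is produced, so this sketch assumes the search enumerates every offset in [0, routing_partition_size), which is what the partitioned formula implies because the document IDs are unknown at search time. All names (`TargetedShardsSketch`, `targetedShards`) are hypothetical, and `String.hashCode()` again stands in for the Murmur3 hash.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

public class TargetedShardsSketch {

    // Stand-in hash (assumption): not the Murmur3 function ES uses.
    public static int hash(String value) {
        return value.hashCode();
    }

    // Same arithmetic as calculateScaledShardId, with plain ints instead of IndexMetaData.
    public static int scaledShardId(String routing, int partitionOffset, int routingNumShards, int routingFactor) {
        int h = hash(routing) + partitionOffset;
        return Math.floorMod(h, routingNumShards) / routingFactor;
    }

    // Assumption: every partition offset is tried for each routing value.
    // A set is used because different routing values or offsets may map to the same shard.
    public static Set<Integer> targetedShards(Set<String> routingValues, int partitionSize,
                                              int routingNumShards, int routingFactor) {
        Set<Integer> shards = new TreeSet<>();
        for (String routing : routingValues) {
            for (int offset = 0; offset < partitionSize; offset++) {
                shards.add(scaledShardId(routing, offset, routingNumShards, routingFactor));
            }
        }
        return shards;
    }

    public static void main(String[] args) {
        Set<String> routing = Collections.singleton("beijing");
        System.out.println("partition size 1: " + targetedShards(routing, 1, 5, 1));
        System.out.println("partition size 3: " + targetedShards(routing, 3, 5, 1));
    }
}
```

With a partition size of 1 the search touches a single shard; a larger partition size widens the search to that many shards per routing value, which is the price paid for the more even data distribution.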
