前言
平時在研究ES的分布式Doc(文檔)寫入操作時,我們已經(jīng)知道對將要寫入的Doc,ES首先會計算其應該寫入到索引的哪個分片,然后在根據(jù)集群metaData中的路由信息判斷此分片所在的ES節(jié)點,最后將寫入請求發(fā)送到這個節(jié)點并完成最終的寫入操作。寫入流程說明如下:

源碼分析
當前ES版本為5.6.16,確定待寫入Doc的Shard編號的主要代碼部分如下:
1. # TransportBulkAction.java
protected void doRun() throws Exception {
final ClusterState clusterState = observer.setAndGetObservedState();
if (handleBlockExceptions(clusterState)) {
return;
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
MetaData metaData = clusterState.metaData();
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest request = bulkRequest.requests.get(i);
if (request == null) {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
// 根據(jù)路由,找出doc寫入的目標shard id
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}
}
1. # OperationRouting.java
public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
return shards(clusterState, index, id, routing).shardsIt();
}
protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
int shardId = generateShardId(indexMetaData(clusterState, index), id, routing);
return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
}
static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
final String effectiveRouting;
final int partitionOffset;
if (routing == null) {
assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
effectiveRouting = id;
} else {
effectiveRouting = routing;
}
if (indexMetaData.isRoutingPartitionedIndex()) {
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
} else {
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
partitionOffset = 0;
}
return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}
private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
// of original index to hash documents
return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
TransportBulkAction類的doRun()方法中,ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId(); 這行代碼獲取最終Doc的ShardID信息。clusterService.operationRouting()方法返回OperationRouting對象,然后緊接著調(diào)用其indexShards(...)方法,接著進入shards(...)方法,最后可看到int shardId = generateShardId(indexMetaData(clusterState, index), id, routing); 這行代碼。這里最終得到分片編號shardId,所以我們重點關注的邏輯就在generateShardId(...)方法中。generateShardId(...)方法接受indexMetaData(索引元數(shù)據(jù))、id(文檔Doc的id號,即為此次寫入請求的id號)、routing(寫入時自定義的routing信息)。下面我們重點看下generateShardId(...)方法內(nèi)部的邏輯。
static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
final String effectiveRouting;
final int partitionOffset;
if (routing == null) {
assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
effectiveRouting = id;
} else {
effectiveRouting = routing;
}
if (indexMetaData.isRoutingPartitionedIndex()) {
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
} else {
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
partitionOffset = 0;
}
return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}
首先方法內(nèi)聲明了String類型的effectiveRouting與int類型的partitionOffset。ES在Doc寫入操作時通常有兩種方式:一種是指定routing,另一種是不指定routing。
指定routing的寫入方式類似為:
POST my_index/doc?routing=tony
{
"name": "tony",
"age": 10
}
routing的設置可以使得寫入的數(shù)據(jù)分布到當前索引下的具體的某些分片中,引入routing機制也是為了更好的搜索性能,使得遍歷的分片范圍可以進一步的縮??;當然同時要面臨著數(shù)據(jù)分布傾斜的風險。在routing機制下ES提供了一個有意義的設置項index.routing_partition_size,此參數(shù)在索引創(chuàng)建時結合著routing一起使用。其意義是使得寫入的數(shù)據(jù)能夠集中的落入到routing_partition_size個分片集合中。比如索引my_index包含3個分片,若此時routing_partition_size的值設為2,那經(jīng)過routing寫入到my_index的數(shù)據(jù)只會落入其中的兩個分片,而另一個會處于閑置狀態(tài)。ES官網(wǎng)指出routing_partition_size的值通常設置為大于1且小于number_of_shards。
當寫入時不帶有routing機制(對應到代碼routing==null, effectiveRouting=id),此時數(shù)據(jù)會經(jīng)過hash(doc_id) % number_primary_shards的方式均勻的寫入到各個主分片中;通過routing機制寫入,想要達到數(shù)據(jù)分布均勻,則上一種計算公式就不能滿足條件了,需要結合doc_id以及routing值重新計算。只是平時大部分的時候我們在寫入ES時并沒有指定routing,在ES內(nèi)部處理上默認會把doc_id當做_routing,因此我們對hash(doc_id) % number_primary_shards這個公式比較熟悉。帶有routing的寫入,effectiveRouting被賦予routing值。接下來代碼中會判斷當前索引是否設置了routing_partition_size選項,若存在則partitionOffset = hash(doc_id) % routing_partition_size值,否則partitionOffset=0。接著到了calculateScaledShardId(...)方法,方法如下:
private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
// of original index to hash documents
return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
方法中兩行代碼,本質上對應著ES官網(wǎng)上的公式shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards;hash(_routing) + hash(_id) % routing_partition_size等價于Murmur3HashFunction.hash(effectiveRouting) + partitionOffset,然后再對num_primary_shards做取模運算。整個公式的意思即_routing字段用于計算索引內(nèi)的分片子集,然后_id用于選擇該分片子集內(nèi)的一個分片。這樣就完整的結合了routing與doc_id信息計算出具體的分片編號。認真分析代碼我們會發(fā)現(xiàn)以下兩點可疑的地方:
- 取模計算使用的是indexMetaData.getRoutingNumShards這個值,而不是number_of_primary對應的值
- 取模計算后接著又做了除以indexMetaData.getRoutingFactor的除法運算
為啥此處要這樣做呢?經(jīng)過代碼注釋與實踐發(fā)現(xiàn),這個其實是包含索引shrink功能的計算方法。索引shrink允許我們將一個索引由比如原來的8個分片,shrink成為4、2、1三種數(shù)量的分片索引,是一個比較有用的功能。關于factor,這里做個簡單的說明,比如數(shù)字6,存在6、3、2、1四個因子(Factor)。從索引shrink的角度看factor,比如8個shards同時存在4、2、1三個factor(8意義不大),所以indexMetaData.getRoutingFactor的值獲取的就是這個因子數(shù)。另外這里一個重要的知識點是假定一個包含m(偶數(shù))個分片的索引A,經(jīng)過shrink之后(假定shrink為m/2個分片,自然factor=2)變?yōu)樗饕鼴,但此時索引B的getRoutingNumShards值依然為m,而非m/2。有了這個知識點作鋪墊之后,我們就理解了為啥整個公式的計算結果后面要除以indexMetaData.getRoutingFactor的值了。因為公式中除數(shù)getRoutingNumShards沒有做同步的減小,因此中間的計算結果需要同步除以getRoutingFactor的值。通常索引的getRoutingFactor的值默認為1,這個能夠理解,因為通常索引都是沒有做shrink操作的。到此,我們就分析完了shardID的整個計算過程了,計算的本質沒有變化,因為要考慮routing以及shrink的功能,所以計算公式稍微變得復雜了些。
小結
到此結合著代碼,我們分析完了ES內(nèi)部計算一個將要寫入的Doc對應的分片編號的整個過程。計算的本質當然是為了使得數(shù)據(jù)能夠均勻的分布在滿足條件的每個分片上。為了友好的支持其他的功能,計算會綜合考慮到其他的一些影響因素,比如shrink,routing。但計算的本質沒有發(fā)生變化。對于routing與shrink功能,文章中沒有貼出具體的詳細的實踐步驟,這塊希望大家后面動手實踐起來,同時也結合著代碼一起研究起來,一起學習ES,一起進步。