ElasticSearch Bulk 源碼解析

本來(lái)應(yīng)該先有這篇文章,后有如何提高ElasticSearch 索引速度才對(duì)。不過(guò)當(dāng)時(shí)覺(jué)得后面一篇文章會(huì)更有實(shí)際意義一些,所以先寫(xiě)了后面那篇文章。結(jié)果現(xiàn)在這篇文章晚了20多天。

前言

讀這篇文章前,建議先看看ElasticSearch Rest/RPC 接口解析,有利于你把握ElasticSearch接受處理請(qǐng)求的脈絡(luò)。對(duì)于RPC類的調(diào)用,我會(huì)在后文簡(jiǎn)單提及,只是endpoint不一樣,內(nèi)部處理邏輯還是一樣的。這篇只會(huì)講IndexRequest,其他如DeleteRequest,UpdateRequest之類的,我們暫時(shí)不涉及。

類處理路徑

RestBulkAction -> 
            TransportBulkAction -> 
                       TransportShardBulkAction

其中TransportShardBulkAction比較特殊,有個(gè)繼承結(jié)構(gòu):

   TransportShardBulkAction < TransportReplicationAction < TransportAction

主入口是TransportAction,具體的業(yè)務(wù)邏輯實(shí)現(xiàn)分布到子類(TransportReplicationAction)和孫子類(TransportShardBulkAction)里了。

另外,我們也會(huì)提及org.elasticsearch.index.engine.Engine相關(guān)的東西,從而讓大家清楚的了解ES是如何和Lucene關(guān)聯(lián)上的。

RestBulkAction

入口自然是org.elasticsearch.rest.action.bulk.RestBulkAction,一個(gè)請(qǐng)求會(huì)構(gòu)建一個(gè)BulkRequest對(duì)象,BulkRequest.add方法會(huì)解析你提交的文本。對(duì)于類型為index或者create的(還記得bulk提交的文本格式是啥樣子的么?),都會(huì)被構(gòu)建出IndexRequest對(duì)象,這些解析后的對(duì)象會(huì)被放到BulkRequest對(duì)象的屬性requests里。當(dāng)然如果是update,delete等則會(huì)構(gòu)建出其他對(duì)象,但都會(huì)放到requests里。

public class BulkRequest extends ActionRequest<BulkRequest> implements CompositeIndicesRequest {
    //這個(gè)就是前面提到的requests
    final List<ActionRequest> requests = new ArrayList<>();  

//這個(gè)復(fù)雜的方法就是通過(guò)http請(qǐng)求參數(shù)解析出
//IndexRequest,DeleteRequest,UpdateRequest等然后放到requests里
public BulkRequest add(BytesReference data, 
@Nullable String defaultIndex, 
@Nullable String defaultType, 
@Nullable String defaultRouting, 
@Nullable String[] defaultFields, 
@Nullable Object payload, boolean allowExplicitIndex) throws Exception {
        XContent xContent = XContentFactory.xContent(data);
        int line = 0;
        int from = 0;
        int length = data.length();
        byte marker = xContent.streamSeparator();
        while (true) {

接著通過(guò)NodeClient將請(qǐng)求發(fā)送到TransportBulkAction類(回憶下之前文章里提到的映射關(guān)系,譬如 *** Transport*Action,兩層映射關(guān)系解析 ** )。對(duì)應(yīng)的方法如下:

//這里的client其實(shí)是NodeClient
client.bulk(bulkRequest, new RestBuilderListener<BulkResponse>(channel) {

TransportBulkAction

看這個(gè)類的簽名:

public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {

實(shí)現(xiàn)了HandledTransportAction,說(shuō)明這個(gè)類同時(shí)也是RPC接口的邏輯處理類。如果你點(diǎn)進(jìn)HandledTransportAction就能看到ES里經(jīng)典的messageReceived方法了。這個(gè)是題外話

該類對(duì)應(yīng)的入口是:

protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {

這里的bulkRequest 就是前面RestBulkAction組裝好的。該方法第一步是判斷是不是需要自動(dòng)建索引,如果索引不存在,就自動(dòng)創(chuàng)建了。

接著通過(guò)executeBulk方法進(jìn)入原來(lái)的流程。在該方法中,對(duì)bulkRequest.requests 進(jìn)行了兩次for循環(huán)。

第一次判定如果是IndexRequest就調(diào)用IndexRequest.process方法,主要是為了解析出timestamp,routing,id,parent 等字段。

第二次是為了對(duì)數(shù)據(jù)進(jìn)行分揀。大致是為了形成這么一種結(jié)構(gòu):

//這里的BulkItemRequest來(lái)源于 IndexRequest等
Map[ShardId, List[BulkItemRequest]]

接著對(duì)新形成的這個(gè)結(jié)構(gòu)(ShardId -> List[BulkItemRequest])做循環(huán),也就是針對(duì)每個(gè)ShardId里的數(shù)據(jù)進(jìn)行統(tǒng)一處理。有了ShardId,bulkRequest,List[BulkItemRequest]等信息后,統(tǒng)一封裝成BulkShardRequest。從名字看就很好理解,就是對(duì)屬于同一ShardId的數(shù)據(jù)構(gòu)建一個(gè)新的類似BulkRequest的對(duì)象。

接著就到TransportShardBulkAction,TransportReplicationAction,TransportAction 三代人出場(chǎng)了:

//這里的shardBulkAction 是TransportShardBulkAction
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {

TransportReplicationAction/TransportShardBulkAction

TransportAction是一個(gè)通用的主類,具體邏輯還是其子類來(lái)實(shí)現(xiàn)。雖然前面提到shardBulkActionTransportShardBulkAction,但其實(shí)流程邏輯還是TransportReplicationAction來(lái)完成的。入口在該類的doExecute方法:

@Override
    protected void doExecute(Request request, ActionListener<Response> listener) {
        new PrimaryPhase(request, listener).run();
    }

我們知道在ES里有主從分片的概念,所以一條數(shù)據(jù)被索引后需要經(jīng)過(guò)兩個(gè)階段:

  1. 將數(shù)據(jù)寫(xiě)入Primary(主分片)
  2. 將數(shù)據(jù)寫(xiě)入Replication(從分片)

至于為什么不直接從Primary進(jìn)行復(fù)制,而是將數(shù)據(jù)分別寫(xiě)入到Primary和Replication我覺(jué)得主要考慮如果一旦Primary是損壞的,不至于影響到Replication(考慮下,如果Primary是損壞的文件,然后所有的Replication如果是直接復(fù)制過(guò)來(lái),就都?jí)牧耍?/p>

又扯遠(yuǎn)了。我們看到doExecute 首先是進(jìn)入PrimaryPhase階段,也就是寫(xiě)主分片。

Primary Phase

PrimaryPhase.doRun方法里,你會(huì)看到兩行代碼:

final ShardIterator shardIt = shards(observer.observedState(), internalRequest);
final ShardRouting primary = resolvePrimary(shardIt);

其中這個(gè)ShardIterator是類似 shardId->ShardGroup 的結(jié)構(gòu)。不管這個(gè)shardId是什么,它一定是個(gè)Replication或者Primary的shardId, ShardGroup 就是Replication和Primary的集合。resolvePrimary方法則是遍歷這個(gè)集合,然后找出Primary的過(guò)程。

知道Primary后就可以判斷是轉(zhuǎn)發(fā)到別的Node或者直接在本Node處理了:

routeRequestOrPerformLocally(primary, shardIt);

如果Primary就在本節(jié)點(diǎn),直接就處理了:

//我去掉了一些無(wú)關(guān)代碼哈
if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
                try {
                    threadPool.executor(executor).execute(new AbstractRunnable() {
                         @Override
                        protected void doRun() throws Exception {
                            performOnPrimary(primary, shardsIt);
                        }
            }

這里用上了線程池。前面對(duì)每個(gè)shardId對(duì)應(yīng)的數(shù)據(jù)集合做處理,其實(shí)是順序循環(huán)執(zhí)行的,這里實(shí)現(xiàn)了將數(shù)據(jù)處理異步化。

performOnPrimary方法中,BulkShardRequest被轉(zhuǎn)化成了PrimaryOperationRequest,理由也很簡(jiǎn)單,更加specific了,因?yàn)榫褪轻槍?duì)主分片的Request。接著進(jìn)入shardOperationOnPrimary 方法,該方法是在孫子類TransportShardBulkAction類里實(shí)現(xiàn)的。

protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(
ClusterState clusterState, 
PrimaryOperationRequest shardRequest) {

到該方法,有兩個(gè)比較重要的概念會(huì)出現(xiàn):

//偉大的版本號(hào),實(shí)現(xiàn)了對(duì)并發(fā)修改的支持
long[] preVersions = new long[request.items().length];
VersionType[] preVersionTypes = new VersionType[request.items().length];
//事物日志,為Shard Recovery以及
//避免過(guò)多的Index Commit做出突出貢獻(xiàn),
//同時(shí)也是是實(shí)現(xiàn)了GetById的實(shí)時(shí)性
Translog.Location location = null;

上面兩個(gè)概念成就了ES從一個(gè)簡(jiǎn)單的全文檢索引擎到類No-SQL的轉(zhuǎn)型(好吧,我好像又扯遠(yuǎn)了)

接著就是for循環(huán)了:

//這里的request是BulkShardRequest
//對(duì)應(yīng)的items則是BulkItemRequest集合
for (int requestIndex = 0;
 requestIndex < request.items().length; 
requestIndex++) {

循環(huán)會(huì)根據(jù)BulkItemRequest的不同類型而有了分支。其實(shí)就是IndexRequest,DeleteRequest,UpdateRequest,我們這里依然只討論IndexRequest。如果發(fā)現(xiàn)BulkItemRequestIndexRequest,進(jìn)行如下操作:

WriteResult<IndexResponse> result = shardIndexOperation(request, 
indexRequest, 
clusterState, 
indexShard, 
true);

shardIndexOperation里嵌套的核心方法是executeIndexRequestOnPrimary,該方法第一步是獲取到Operation對(duì)象,

Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);

Engine對(duì)象是比較底層的一個(gè)對(duì)象了,是對(duì)Lucene的IndexWriter,Searcher之類的封裝。這里的Engine.IndexingOperation對(duì)應(yīng)的是Create或者Index類。你可以把這兩個(gè)類理解為待索引的Document,只是還帶上了動(dòng)作。

第二步是判斷索引的Mapping是不是要?jiǎng)討B(tài)更新,如果是,則更新。

第三步執(zhí)行實(shí)際的建索引操作:

final boolean created = operation.execute(indexShard);

operation.execute 額外引出的話題

我們會(huì)暫時(shí)深入到operate.execute方法里,但這個(gè)不是主線,看完后記得回到上面那行代碼上。

剛才我們說(shuō)了operation可能是Create或者Index,我們會(huì)以Create為主線進(jìn)行分析。所謂CreateIndex,你可以理解為一個(gè)待索引的Document,只是帶上動(dòng)作的語(yǔ)義。

上面對(duì)應(yīng)的execute 方法簽名是:

@Overridepublic boolean execute(IndexShard shard) {     shard.create(this);   
 return true;
}

我們看到這里是反向調(diào)用indexShard對(duì)象的create方法來(lái)進(jìn)行索引的創(chuàng)建。我們來(lái)看看IndexShardcreate方法:

//我依然做了刪減,體現(xiàn)一些核心代碼
public void create(Engine.Create create) {        
        engine().create(create);
    }

engine()方法返回的是InternalEngine實(shí)例,InternalEngine .innerCreate方法執(zhí)行到構(gòu)建索引的操作。這個(gè)方法值得分析一下,所以我就貼了一坨的代碼。

private void innerCreate(Create create) throws IOException {
        if (engineConfig.isOptimizeAutoGenerateId() && create.autoGeneratedId() && !create.canHaveDuplicates()) {
            // We don't need to lock because this ID cannot be concurrently updated:
            innerCreateNoLock(create, Versions.NOT_FOUND, null);
        } else {
            synchronized (dirtyLock(create.uid())) {
                final long currentVersion;
                final VersionValue versionValue;
                versionValue = versionMap.getUnderLock(create.uid().bytes());
                if (versionValue == null) {
                    currentVersion = loadCurrentVersionFromIndex(create.uid());
                } else {
                    if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
                        currentVersion = Versions.NOT_FOUND; // deleted, and GC
                    } else {
                        currentVersion = versionValue.version();
                    }
                }
                innerCreateNoLock(create, currentVersion, versionValue);
            }
        }
    }

首先,如果滿足如下三個(gè)條件就無(wú)需進(jìn)行版本檢查:

  1. index.optimize_auto_generated_id 被設(shè)置為true(默認(rèn)是false,話說(shuō)注釋上說(shuō)是默認(rèn)是true,但是我看著覺(jué)得像是false)
  2. id設(shè)置為自動(dòng)生成(沒(méi)有人工設(shè)置id)
  3. create.canHaveDuplicates == false ,該參數(shù)一般是false

提這個(gè)是主要為了說(shuō)明,譬如一般的運(yùn)維日志啥的,就不要自己生成ID了,采用自動(dòng)生成的ID,可以跳過(guò)版本檢查,從而提高入庫(kù)的效率。

第二個(gè)指的說(shuō)的是,如果對(duì)應(yīng)文檔在緩存中沒(méi)有找到(versionMap),那么就會(huì)由如下的代碼執(zhí)行實(shí)際磁盤(pán)查詢操作:

currentVersion = loadCurrentVersionFromIndex(create.uid());

通過(guò)對(duì)比create對(duì)象里的版本號(hào)和從索引文件里加載的版本號(hào) ,最終決定是進(jìn)行update還是create操作。

innerCreateNoLock 方法里,你會(huì)看到熟悉的Lucene操作,譬如:

indexWriter.addDocument(index.docs().get(0));
//或者
indexWriter.updateDocument(index.uid(), index.docs().get(0));

現(xiàn)在回到TransportShardBulkAction的主線上。執(zhí)行完下面的代碼后:

final boolean created = operation.execute(indexShard);

就能獲得對(duì)應(yīng)文檔的版本等信息,這些信息會(huì)更新對(duì)應(yīng)的IndexRequest等對(duì)象。

到目前為止,Primay Phase 完成,接著開(kāi)始Replication Phase

replicationPhase = new ReplicationPhase(shardsIt, 
primaryResponse.v2(), 
primaryResponse.v1(), 
observer, 
primary, 
internalRequest, 
listener, 
indexShardReference);
finishAndMoveToReplication(replicationPhase);

最后一行代碼會(huì)啟動(dòng)replicationPhase階段。

Replication Phase

Replication Phase 流程大致和Primary Phase 相同,就不做過(guò)詳細(xì)的解決,我這里簡(jiǎn)單提及一下。

ReplicationPhasedoRun方法是入口,核心方法是performOnReplica,如果發(fā)現(xiàn)Replication shardId所屬的節(jié)點(diǎn)就是自己的話,異步執(zhí)行shardOperationOnReplica,大體邏輯如下:

threadPool.executor(executor).execute(new AbstractRunnable() {
                        @Override
                        protected void doRun() {
                            try {
                                shardOperationOnReplica(shard.shardId(), replicaRequest);
                                onReplicaSuccess();
                            } catch (Throwable e) {
                                onReplicaFailure(nodeId, e);
                                failReplicaIfNeeded(shard.index(), shard.id(), e);
                            }
                        }

在Replication階段,shardOperationOnReplica 該方法完成了索引內(nèi)容解析,mapping動(dòng)態(tài)新增,最后進(jìn)入索引(和就是前面提到的operation.execute)等動(dòng)作,所以還是比Primary 階段更緊湊些。

另外,在Primary Phase 和 Replication Phase, 一個(gè)BulkShardRequest 處理完成后(也就是一個(gè)Shard 對(duì)應(yīng)的數(shù)據(jù)集合)才會(huì)刷寫(xiě)Translog日志。所以如果發(fā)生數(shù)據(jù)丟失,則可能是多條數(shù)據(jù)。

總結(jié)

這篇文章以流程分析為主,很多細(xì)節(jié)我們依然沒(méi)有講解詳細(xì),比如Translog和Version。這些爭(zhēng)取能夠在后續(xù)文章中進(jìn)一步闡述。另外錯(cuò)誤之處在所難免,請(qǐng)大家在評(píng)論處提出。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Elasticsearch 架構(gòu)以及源碼概覽 Elasticsearch是最近兩年異軍突起的一個(gè)兼有搜索引擎和No...
    meng_philip123閱讀 4,419評(píng)論 1 36
  • Elasticsearch 架構(gòu)以及源碼概覽 Elasticsearch是最近兩年異軍突起的一個(gè)兼有搜索引擎和No...
    meng_philip123閱讀 2,441評(píng)論 2 47
  • Solr&ElasticSearch原理及應(yīng)用 一、綜述 搜索 http://baike.baidu.com/it...
    樓外樓V閱讀 7,637評(píng)論 1 17
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評(píng)論 19 139
  • ' 今天托付老師發(fā)了這階段孩子在托付學(xué)習(xí)階段測(cè)試視頻,孩子得了托付班測(cè)試成績(jī)比較優(yōu)秀的,掌握知識(shí)點(diǎn)比較多的...
    兆木兆木閱讀 104評(píng)論 0 0

友情鏈接更多精彩內(nèi)容