
1、ES 客戶端選擇一個node節(jié)點發(fā)送寫入請求,這個節(jié)點就是協(xié)調(diào)節(jié)點coordinator node,ES的任意節(jié)點都可以作為協(xié)調(diào)節(jié)點。協(xié)調(diào)節(jié)點處理過程如下:
1.1 處理injest pipeline
查看本次請求是否符合某個pipeline的模式匹配,符合則執(zhí)行pipeline中的邏輯,進行預處理:格式變換、新增字段、設(shè)置某個字段值、字段默認值處理等等。如果當前節(jié)點沒有injest角色,則需要將請求轉(zhuǎn)發(fā)給有injest角色的節(jié)點。
1.2 創(chuàng)建索引
如果開啟了自動創(chuàng)建索引,則索引不存在自動創(chuàng)建,否則報錯。
1.3 routing獲取
獲取請求url或者mapping配置中的_routing信息,如果沒有則使用_id,一般業(yè)務中不手動維護_id,使用ES自動生成的全局唯一ID。
1.4 構(gòu)建BulkShardRequest
Bulk Request中包含多種(Index/Update/Delete)請求,這些請求分別需要到不同的shard上執(zhí)行,因此協(xié)調(diào)節(jié)點,會將請求按照shard分開,同一個shard上的請求聚合到一起,構(gòu)建BulkShardRequest。
1.5 路由請求到primary shard
通過_routing字段按照公式 shard_num = hash(\routing) % num_primary_shards 計算出文檔要分配到的分片,并從集群元數(shù)據(jù)中找出對應primary shard,將請求轉(zhuǎn)發(fā)給primary shard。
1.6 等待primary shard處理返回

2、協(xié)調(diào)節(jié)點通過_routing字段進行路由,找到對應的primary shard,并將請求轉(zhuǎn)發(fā)給primary shard。primary shard處理過程如下:
2.1 判斷操作類型
遍歷bulk請求中的子請求,根據(jù)類型走不同處理邏輯分支。
2.2 update操作轉(zhuǎn)換
將update轉(zhuǎn)換成index和delete操作,獲取文檔的當前內(nèi)容,與update內(nèi)容合并生成新文檔,然后將update請求轉(zhuǎn)換成index請求,此處文檔設(shè)置一個version v1。
2.3 解析文檔
解析文檔的各個字段,添加_uid等ES相關(guān)的系統(tǒng)字段。
2.4 更新Mapping
對于新增字段會根據(jù)dynamic mapping或dynamic template生成對應的mapping,如果mapping中有dynamic mapping相關(guān)設(shè)置則按設(shè)置處理,如忽略或拋出異常。
2.5 獲取sequence Id和Version
從SequcenceNumberService獲取一個sequenceID和Version。SequcenID用于初始化LocalCheckPoint, verion是根據(jù)當前Versoin+1用于防止并發(fā)寫導致數(shù)據(jù)不一致。
2.6 寫入lucene
這一步開始會對文檔uid加鎖,然后判斷uid對應的version v2和之前update轉(zhuǎn)換時的versoin v1是否一致,不一致則返回第二步重新執(zhí)行。 如果version一致,如果同id的doc已經(jīng)存在,則調(diào)用lucene的updateDocument接口,如果是新文檔則調(diào)用lucene的addDoucument. 這里有個問題,如何保證Delete-Then-Add的原子性,ES是通過在Delete之前會加上已refresh鎖,禁止被refresh,只有等待Add完成后釋放了Refresh Lock, 這樣就保證了這個操作的原子性。
2.7 寫入translog
寫入Lucene的Segment后,會以key value的形式寫Translog, Key是Id, Value是Doc的內(nèi)容。當查詢的時候,如果請求的是GetDocById則可以直接根據(jù)_id從translog中獲取。滿足nosql場景的實時性。
2.8 重構(gòu)bulk request
因為primary shard已經(jīng)將update操作轉(zhuǎn)換為index操作或delete操作,因此要對之前的bulkrequest進行調(diào)整,只包含index或delete操作,不需要再進行update的處理操作。
2.9 flush translog
默認情況下,translog要在此處落盤完成,如果對可靠性要求不高,可以設(shè)置translog異步,那么translog的fsync將會異步執(zhí)行,但是落盤前的數(shù)據(jù)有丟失風險。
2.10 發(fā)送請求給replicas
將構(gòu)造好的bulkrequest并發(fā)發(fā)送給各replicas,等待replica返回,這里需要等待所有的replicas返回,響應請求給協(xié)調(diào)節(jié)點。如果某個shard執(zhí)行失敗,則primary會給master發(fā)請求remove該shard。這里會同時把sequenceID, primaryTerm, GlobalCheckPoint等傳遞給replica。
2.11 等待replica響應
當所有的replica返回請求時,更細primary shard的LocalCheckPoint。
3、primary shard完成寫入后,將寫入并發(fā)發(fā)送給各replica, raplica執(zhí)行寫入操作后返回給primary shard, primary shard再將請求返回給協(xié)調(diào)節(jié)點。副本分片的執(zhí)行流程如下(與主分片基本一致):
3.1 判斷操作類型
replica收到的寫如請求只會有add和delete,因update在primary shard上已經(jīng)轉(zhuǎn)換為add或delete了。根據(jù)不同的操作類型執(zhí)行對應的操作
3.2 Parse Doc
3.3 更新mapping
3.4 獲取sequenceId和Version 直接使用primary shard發(fā)送過來的請求中的內(nèi)容即可
3.5 寫入lucene
3.6 寫Translog