ES的讀寫流程

原文鏈接:https://yq.aliyun.com/articles/581875?spm=a2c4e.11153959.blogcont576223.9.4b5f7ba228akfF

目前的Elasticsearch有兩個明顯的身份,一個是分布式搜索系統(tǒng),另一個是分布式NoSQL數(shù)據(jù)庫,對于這兩種不同的身份,讀寫語義基本類似,但也有一點差異。

werite1.png

寫操作

  • 實時性:

    • 搜索系統(tǒng)的Index一般都是NRT(Near Real Time),近實時的,比如Elasticsearch中,Index的實時性是由refresh控制的,默認是1s,最快可到100ms,那么也就意味著Index doc成功后,需要等待一秒鐘后才可以被搜索到。
    • NoSQL數(shù)據(jù)庫的Write基本都是RT(Real Time),實時的,寫入成功后,立即是可見的。Elasticsearch中的Index請求也能保證是實時的,因為Get請求會直接讀內存中尚未Flush到存儲介質的TransLog。
  • 可靠性:

    • 搜索系統(tǒng)對可靠性要求都不高,一般數(shù)據(jù)的可靠性通過將原始數(shù)據(jù)存儲在另一個存儲系統(tǒng)來保證,當搜索系統(tǒng)的數(shù)據(jù)發(fā)生丟失時,再從其他存儲系統(tǒng)導一份數(shù)據(jù)過來重新rebuild就可以了。在Elasticsearch中,通過設置TransLog的Flush頻率可以控制可靠性,要么是按請求,每次請求都Flush;要么是按時間,每隔一段時間Flush一次。一般為了性能考慮,會設置為每隔5秒或者1分鐘Flush一次,F(xiàn)lush間隔時間越長,可靠性就會越低。
    • NoSQL數(shù)據(jù)庫作為一款數(shù)據(jù)庫,必須要有很高的可靠性,數(shù)據(jù)可靠性是生命底線,決不能有閃失。如果把Elasticsearch當做NoSQL數(shù)據(jù)庫,此時需要設置TransLog的Flush策略為每個請求都要Flush,這樣才能保證當前Shard寫入成功后,數(shù)據(jù)能盡量持久化下來。

上面簡單介紹了下NoSQL數(shù)據(jù)庫和搜索系統(tǒng)的一些異同,我們會在后面有一篇文章,專門用來介紹Elasticsearch作為NoSQL數(shù)據(jù)庫時的一些局限和特點。

讀操作

下一篇《Elasticsearch內核解析 - 查詢篇》中再詳細介紹。

上面大概對比了下搜索和NoSQL在寫方面的特點,接下來,我們看一下Elasticsearch 6.0.0版本中寫入流程都做了哪些事情,希望能對大家有用。

寫操作的關鍵點

在考慮或分析一個分布式系統(tǒng)的寫操作時,一般需要從下面幾個方面考慮:

  • 可靠性:或者是持久性,數(shù)據(jù)寫入系統(tǒng)成功后,數(shù)據(jù)不會被回滾或丟失。
  • 一致性:數(shù)據(jù)寫入成功后,再次查詢時必須能保證讀取到最新版本的數(shù)據(jù),不能讀取到舊數(shù)據(jù)。
  • 原子性:一個寫入或者更新操作,要么完全成功,要么完全失敗,不允許出現(xiàn)中間狀態(tài)。
  • 隔離性:多個寫入操作相互不影響。
  • 實時性:寫入后是否可以立即被查詢到。
  • 性能:寫入性能,吞吐量到底怎么樣。

Elasticsearch作為分布式系統(tǒng),也需要在寫入的時候滿足上述的四個特點,我們在后面的寫流程介紹中會涉及到上述四個方面。

接下來,我們一層一層剖析Elasticsearch內部的寫機制。

Lucene的寫

眾所周知,Elasticsearch內部使用了Lucene完成索引創(chuàng)建和搜索功能,Lucene中寫操作主要是通過IndexWriter類實現(xiàn),IndexWriter提供三個接口:

 public long addDocument(); public long updateDocuments(); public long deleteDocuments();

通過這三個接口可以完成單個文檔的寫入,更新和刪除功能,包括了分詞,倒排創(chuàng)建,正排創(chuàng)建等等所有搜索相關的流程。只要Doc通過IndesWriter寫入后,后面就可以通過IndexSearcher搜索了,看起來功能已經(jīng)完善了,但是仍然有一些問題沒有解:

  • 上述操作是單機的,而不是我們需要的分布式。
  • 文檔寫入Lucene后并不是立即可查詢的,需要生成完整的Segment后才可被搜索,如何保證實時性?
  • Lucene生成的Segment是在內存中,如果機器宕機或掉電后,內存中的Segment會丟失,如何保證數(shù)據(jù)可靠性 ?
  • Lucene不支持部分文檔更新,但是這又是一個強需求,如何支持部分更新?

上述問題,在Lucene中是沒有解決的,那么就需要Elasticsearch中解決上述問題。

Elasticsearch在解決上述問題時,除了我們在上一篇《Elasticsearch數(shù)據(jù)模型簡介》中介紹的幾種系統(tǒng)字段外,在引擎架構上也引入了多重機制來解決問題。我們再來看Elasticsearch中的寫機制。

Elasticsearch的寫

Elasticsearch采用多Shard方式,通過配置routing規(guī)則將數(shù)據(jù)分成多個數(shù)據(jù)子集,每個數(shù)據(jù)子集提供獨立的索引和搜索功能。當寫入文檔的時候,根據(jù)routing規(guī)則,將文檔發(fā)送給特定Shard中建立索引。這樣就能實現(xiàn)分布式了。

此外,Elasticsearch整體架構上采用了一主多副的方式:

write2.png

每個Index由多個Shard組成,每個Shard有一個主節(jié)點和多個副本節(jié)點,副本個數(shù)可配。但每次寫入的時候,寫入請求會先根據(jù)_routing規(guī)則選擇發(fā)給哪個Shard,Index Request中可以設置使用哪個Filed的值作為路由參數(shù),如果沒有設置,則使用Mapping中的配置,如果mapping中也沒有配置,則使用_id作為路由參數(shù),然后通過_routing的Hash值選擇出Shard(在OperationRouting類中),最后從集群的Meta中找出出該Shard的Primary節(jié)點。

請求接著會發(fā)送給Primary Shard,在Primary Shard上執(zhí)行成功后,再從Primary Shard上將請求同時發(fā)送給多個Replica Shard,請求在多個Replica Shard上執(zhí)行成功并返回給Primary Shard后,寫入請求執(zhí)行成功,返回結果給客戶端。

這種模式下,寫入操作的延時就等于latency = Latency(Primary Write) + Max(Replicas Write)。只要有副本在,寫入延時最小也是兩次單Shard的寫入時延總和,寫入效率會較低,但是這樣的好處也很明顯,避免寫入后,單機或磁盤故障導致數(shù)據(jù)丟失,在數(shù)據(jù)重要性和性能方面,一般都是優(yōu)先選擇數(shù)據(jù),除非一些允許丟數(shù)據(jù)的特殊場景。

采用多個副本后,避免了單機或磁盤故障發(fā)生時,對已經(jīng)持久化后的數(shù)據(jù)造成損害,但是Elasticsearch里為了減少磁盤IO保證讀寫性能,一般是每隔一段時間(比如5分鐘)才會把Lucene的Segment寫入磁盤持久化,對于寫入內存,但還未Flush到磁盤的Lucene數(shù)據(jù),如果發(fā)生機器宕機或者掉電,那么內存中的數(shù)據(jù)也會丟失,這時候如何保證?

對于這種問題,Elasticsearch學習了數(shù)據(jù)庫中的處理方式:增加CommitLog模塊,Elasticsearch中叫TransLog。

write0.png

在每一個Shard中,寫入流程分為兩部分,先寫入Lucene,再寫入TransLog。

寫入請求到達Shard后,先寫Lucene文件,創(chuàng)建好索引,此時索引還在內存里面,接著去寫TransLog,寫完TransLog后,刷新TransLog數(shù)據(jù)到磁盤上,寫磁盤成功后,請求返回給用戶。這里有幾個關鍵點,一是和數(shù)據(jù)庫不同,數(shù)據(jù)庫是先寫CommitLog,然后再寫內存,而Elasticsearch是先寫內存,最后才寫TransLog,一種可能的原因是Lucene的內存寫入會有很復雜的邏輯,很容易失敗,比如分詞,字段長度超過限制等,比較重,為了避免TransLog中有大量無效記錄,減少recover的復雜度和提高速度,所以就把寫Lucene放在了最前面。二是寫Lucene內存后,并不是可被搜索的,需要通過Refresh把內存的對象轉成完整的Segment后,然后再次reopen后才能被搜索,一般這個時間設置為1秒鐘,導致寫入Elasticsearch的文檔,最快要1秒鐘才可被從搜索到,所以Elasticsearch在搜索方面是NRT(Near Real Time)近實時的系統(tǒng)。三是當Elasticsearch作為NoSQL數(shù)據(jù)庫時,查詢方式是GetById,這種查詢可以直接從TransLog中查詢,這時候就成了RT(Real Time)實時系統(tǒng)。四是每隔一段比較長的時間,比如30分鐘后,Lucene會把內存中生成的新Segment刷新到磁盤上,刷新后索引文件已經(jīng)持久化了,歷史的TransLog就沒用了,會清空掉舊的TransLog。

上面介紹了Elasticsearch在寫入時的兩個關鍵模塊,Replica和TransLog,接下來,我們看一下Update流程:


Write5.png

Lucene中不支持部分字段的Update,所以需要在Elasticsearch中實現(xiàn)該功能,具體流程如下:

  1. 收到Update請求后,從Segment或者TransLog中讀取同id的完整Doc,記錄版本號為V1。
  2. 將版本V1的全量Doc和請求中的部分字段Doc合并為一個完整的Doc,同時更新內存中的VersionMap。獲取到完整Doc后,Update請求就變成了Index請求。
  3. 加鎖。
  4. 再次從versionMap中讀取該id的最大版本號V2,如果versionMap中沒有,則從Segment或者TransLog中讀取,這里基本都會從versionMap中獲取到。
  5. 檢查版本是否沖突(V1==V2),如果沖突,則回退到開始的“Update doc”階段,重新執(zhí)行。如果不沖突,則執(zhí)行最新的Add請求。
  6. 在Index Doc階段,首先將Version + 1得到V3,再將Doc加入到Lucene中去,Lucene中會先刪同id下的已存在doc id,然后再增加新Doc。寫入Lucene成功后,將當前V3更新到versionMap中。
  7. 釋放鎖,部分更新的流程就結束了。
  8. 介紹完部分更新的流程后,大家應該從整體架構上對Elasticsearch的寫入有了一個初步的映象,接下來我們詳細剖析下寫入的詳細步驟。

Elasticsearch寫入請求類型

Elasticsearch中的寫入請求類型,主要包括下列幾個:Index(Create),Update,Delete和Bulk,其中前3個是單文檔操作,后一個Bulk是多文檔操作,其中Bulk中可以包括Index(Create),Update和Delete。

在6.0.0及其之后的版本中,前3個單文檔操作的實現(xiàn)基本都和Bulk操作一致,甚至有些就是通過調用Bulk的接口實現(xiàn)的。估計接下來幾個版本后,Index(Create),Update,Delete都會被當做Bulk的一種特例化操作被處理。這樣,代碼和邏輯都會更清晰一些。

下面,我們就以Bulk請求為例來介紹寫入流程。

Elasticsearch寫入流程圖

1.png
  • 紅色:Client Node。
  • 綠色:Primary Node。
  • 藍色:Replica Node。

注冊Action

在Elasticsearch中,所有action的入口處理方法都是注冊在ActionModule.java中,比如Bulk Request有兩個注冊入口,分別是Rest和Transport入口:


2.png
3.png

如果請求是Rest請求,則會在RestBulkAction中Parse Request,構造出BulkRequest,然后發(fā)給后面的TransportAction處理。

TransportShardBulkAction的基類TransportReplicationAction中注冊了對Primary,Replica等的不同處理入口:

4.png

這里對原始請求,Primary Node請求和Replica Node請求各自注冊了一個handler處理入口。

Client Node

Client Node 也包括了前面說過的Parse Request,這里就不再贅述了,接下來看一下其他的部分。

1. Ingest Pipeline

在這一步可以對原始文檔做一些處理,比如HTML解析,自定義的處理,具體處理邏輯可以通過插件來實現(xiàn)。在Elasticsearch中,由于Ingest Pipeline會比較耗費CPU等資源,可以設置專門的Ingest Node,專門用來處理Ingest Pipeline邏輯。

如果當前Node不能執(zhí)行Ingest Pipeline,則會將請求發(fā)給另一臺可以執(zhí)行Ingest Pipeline的Node。

2. Auto Create Index

判斷當前Index是否存在,如果不存在,則需要自動創(chuàng)建Index,這里需要和Master交互。也可以通過配置關閉自動創(chuàng)建Index的功能。

3. Set Routing

設置路由條件,如果Request中指定了路由條件,則直接使用Request中的Routing,否則使用Mapping中配置的,如果Mapping中無配置,則使用默認的_id字段值。

在這一步中,如果沒有指定id字段,則會自動生成一個唯一的_id字段,目前使用的是UUID。

4. Construct BulkShardRequest

由于Bulk Request中會包括多個(Index/Update/Delete)請求,這些請求根據(jù)routing可能會落在多個Shard上執(zhí)行,這一步會按Shard挑揀Single Write Request,同一個Shard中的請求聚集在一起,構建BulkShardRequest,每個BulkShardRequest對應一個Shard。

5. Send Request To Primary

這一步會將每一個BulkShardRequest請求發(fā)送給相應Shard的Primary Node。

Primary Node

Primary 請求的入口是在PrimaryOperationTransportHandler的messageReceived,我們來看一下相關的邏輯流程。

1. Index or Update or Delete

循環(huán)執(zhí)行每個Single Write Request,對于每個Request,根據(jù)操作類型(CREATE/INDEX/UPDATE/DELETE)選擇不同的處理邏輯。

其中,Create/Index是直接新增Doc,Delete是直接根據(jù)_id刪除Doc,Update會稍微復雜些,我們下面就以Update為例來介紹。

2. Translate Update To Index or Delete

這一步是Update操作的特有步驟,在這里,會將Update請求轉換為Index或者Delete請求。首先,會通過GetRequest查詢到已經(jīng)存在的同_id Doc(如果有)的完整字段和值(依賴_source字段),然后和請求中的Doc合并。同時,這里會獲取到讀到的Doc版本號,記做V1。

3. Parse Doc

這里會解析Doc中各個字段。生成ParsedDocument對象,同時會生成uid Term。在Elasticsearch中,_uid = type # _id,對用戶,_Id可見,而Elasticsearch中存儲的是_uid。這一部分生成的ParsedDocument中也有Elasticsearch的系統(tǒng)字段,大部分會根據(jù)當前內容填充,部分未知的會在后面繼續(xù)填充ParsedDocument。

4. Update Mapping

Elasticsearch中有個自動更新Mapping的功能,就在這一步生效。會先挑選出Mapping中未包含的新Field,然后判斷是否運行自動更新Mapping,如果允許,則更新Mapping。

5. Get Sequence Id and Version

由于當前是Primary Shard,則會從SequenceNumber Service獲取一個sequenceID和Version。SequenceID在Shard級別每次遞增1,SequenceID在寫入Doc成功后,會用來初始化LocalCheckpoint。Version則是根據(jù)當前Doc的最大Version遞增1。

6. Add Doc To Lucene

這一步開始的時候會給特定_uid加鎖,然后判斷該_uid對應的Version是否等于之前Translate Update To Index步驟里獲取到的Version,如果不相等,則說明剛才讀取Doc后,該Doc發(fā)生了變化,出現(xiàn)了版本沖突,這時候會拋出一個VersionConflict的異常,該異常會在Primary Node最開始處捕獲,重新從“Translate Update To Index or Delete”開始執(zhí)行。

如果Version相等,則繼續(xù)執(zhí)行,如果已經(jīng)存在同id的Doc,則會調用Lucene的UpdateDocument(uid, doc)接口,先根據(jù)uid刪除Doc,然后再Index新Doc。如果是首次寫入,則直接調用Lucene的AddDocument接口完成Doc的Index,AddDocument也是通過UpdateDocument實現(xiàn)。

這一步中有個問題是,如何保證Delete-Then-Add的原子性,怎么避免中間狀態(tài)時被Refresh?答案是在開始Delete之前,會加一個Refresh Lock,禁止被Refresh,只有等Add完后釋放了Refresh Lock后才能被Refresh,這樣就保證了Delete-Then-Add的原子性。

Lucene的UpdateDocument接口中就只是處理多個Field,會遍歷每個Field逐個處理,處理順序是invert index,store field,doc values,point dimension,后續(xù)會有文章專門介紹Lucene中的寫入。

7. Write Translog

寫完Lucene的Segment后,會以keyvalue的形式寫TransLog,Key是_id,Value是Doc內容。當查詢的時候,如果請求是GetDocByID,則可以直接根據(jù)_id從TransLog中讀取到,滿足NoSQL場景下的實時性要去。

需要注意的是,這里只是寫入到內存的TransLog,是否Sync到磁盤的邏輯還在后面。

這一步的最后,會標記當前SequenceID已經(jīng)成功執(zhí)行,接著會更新當前Shard的LocalCheckPoint。

8. Renew Bulk Request

這里會重新構造Bulk Request,原因是前面已經(jīng)將UpdateRequest翻譯成了Index或Delete請求,則后續(xù)所有Replica中只需要執(zhí)行Index或Delete請求就可以了,不需要再執(zhí)行Update邏輯,一是保證Replica中邏輯更簡單,性能更好,二是保證同一個請求在Primary和Replica中的執(zhí)行結果一樣。

9. Flush Translog

這里會根據(jù)TransLog的策略,選擇不同的執(zhí)行方式,要么是立即Flush到磁盤,要么是等到以后再Flush。Flush的頻率越高,可靠性越高,對寫入性能影響越大。

10. Send Requests To Replicas

這里會將剛才構造的新的Bulk Request并行發(fā)送給多個Replica,然后等待Replica的返回,這里需要等待所有Replica返回后(可能有成功,也有可能失?。?,Primary Node才會返回用戶。如果某個Replica失敗了,則Primary會給Master發(fā)送一個Remove Shard請求,要求Master將該Replica Shard從可用節(jié)點中移除。

這里,同時會將SequenceID,PrimaryTerm,GlobalCheckPoint等傳遞給Replica。

發(fā)送給Replica的請求中,Action Name等于原始ActionName + [R],這里的R表示Replica。通過這個[R]的不同,可以找到處理Replica請求的Handler。

11. Receive Response From Replicas

Replica中請求都處理完后,會更新Primary Node的LocalCheckPoint。

Replica Node

Replica 請求的入口是在ReplicaOperationTransportHandler的messageReceived,我們來看一下相關的邏輯流程。

1. Index or Delete

根據(jù)請求類型是Index還是Delete,選擇不同的執(zhí)行邏輯。這里沒有Update,是因為在Primary Node中已經(jīng)將Update轉換成了Index或Delete請求了。

2. Parse Doc

3. Update Mapping

以上都和Primary Node中邏輯一致。

4. Get Sequence Id and Version

Primary Node中會生成Sequence ID和Version,然后放入ReplicaRequest中,這里只需要從Request中獲取到就行。

5. Add Doc To Lucene

由于已經(jīng)在Primary Node中將部分Update請求轉換成了Index或Delete請求,這里只需要處理Index和Delete兩種請求,不再需要處理Update請求了。比Primary Node會更簡單一些。

6. Write Translog

7. Flush Translog

以上都和Primary Node中邏輯一致。

最后

上面詳細介紹了Elasticsearch的寫入流程及其各個流程的工作機制,我們在這里再次總結下之前提出的分布式系統(tǒng)中的六大特性:

  1. 可靠性:由于Lucene的設計中不考慮可靠性,在Elasticsearch中通過Replica和TransLog兩套機制保證數(shù)據(jù)的可靠性。
  2. 一致性:Lucene中的Flush鎖只保證Update接口里面Delete和Add中間不會Flush,但是Add完成后仍然有可能立即發(fā)生Flush,導致Segment可讀。這樣就沒法保證Primary和所有其他Replica可以同一時間Flush,就會出現(xiàn)查詢不穩(wěn)定的情況,這里只能實現(xiàn)最終一致性。
  3. 原子性:Add和Delete都是直接調用Lucene的接口,是原子的。當部分更新時,使用Version和鎖保證更新是原子的。
  4. 隔離性:仍然采用Version和局部鎖來保證更新的是特定版本的數(shù)據(jù)。
    實時性:使用定期Refresh Segment到內存,并且Reopen Segment方式保證搜索可以在較短時間(比如1秒)內被搜索到。通過將未刷新到磁盤數(shù)據(jù)記入TransLog,保證對未提交數(shù)據(jù)可以通過ID實時訪問到。
  5. 性能:性能是一個系統(tǒng)性工程,所有環(huán)節(jié)都要考慮對性能的影響,在Elasticsearch中,在很多地方的設計都考慮到了性能,一是不需要所有Replica都返回后才能返回給用戶,只需要返回特定數(shù)目的就行;二是生成的Segment現(xiàn)在內存中提供服務,等一段時間后才刷新到磁盤,Segment在內存這段時間的可靠性由TransLog保證;三是TransLog可以配置為周期性的Flush,但這個會給可靠性帶來傷害;四是每個線程持有一個Segment,多線程時相互不影響,相互獨立,性能更好;五是系統(tǒng)的寫入流程對版本依賴較重,讀取頻率較高,因此采用了versionMap,減少熱點數(shù)據(jù)的多次磁盤IO開銷。Lucene中針對性能做了大量的優(yōu)化。后面我們也會有文章專門介紹Lucene中的優(yōu)化思路。

到此,Elasticsearch的寫入流程介紹完了,下一篇《Elasticsearch內核解析 - 查詢篇》中再見。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容