背景
眾所周知,Elasticsearch是?個實時的分布式搜索引擎,為?戶提供搜索服務(wù)。當我們決定存儲某種數(shù)據(jù),在創(chuàng)建索引的時候就需要將數(shù)據(jù)結(jié)構(gòu),即Mapping確定下來,于此同時索引的設(shè)定和很多固定配置將不能改變。
那如果后續(xù)業(yè)務(wù)發(fā)生變化,需要改變數(shù)據(jù)結(jié)構(gòu)或者更換ES更換分詞器怎么辦呢?為此,Elastic團隊提供了很多通過輔助?具來幫助開發(fā)?員進?重建索引的方案。
如果對 reindex API 不熟悉,那么在遇到重構(gòu)的時候,必然事倍功半,效率低下。反之,就可以方便地進行索引重構(gòu),省時省力。
步驟
假設(shè)之前我們已經(jīng)存在一個blog索引,因為更換分詞器需要對該索引中的數(shù)據(jù)進行重建索引,以便支持業(yè)務(wù)使用新的分詞規(guī)則搜索數(shù)據(jù),并且盡可能使這個變化對外服務(wù)沒有感知,大概分為以下幾個步驟:?
- 新增?個索引
blog_lastest,Mapping數(shù)據(jù)結(jié)構(gòu)與blog索引一致 - 將
blog數(shù)據(jù)同步至blog_lastest - 刪除
blog索引 - 數(shù)據(jù)同步后給
blog_lastest添加別名blog
新建索引
在這里推薦一個ES管理工具Kibana,主要針對數(shù)據(jù)的探索、可視化和分析。

put /blog_lastest/
{
"mappings":{
"properties":{
"title":{
"type":"text",
"analyzer":"ik_max_word"
},
"author":{
"type":"keyword",
"fields":{
"seg":{
"type":"text",
"analyzer":"ik_max_word"
}
}
}
}
}
}
將舊索引數(shù)據(jù)copy到新索引
同步等待
接?將會在 reindex 結(jié)束后返回
POST /_reindex
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest"
}
}
在 kibana 中的使用如下所示

當然高版本(7.1.1)中,ES都有提供對應(yīng)的Java REST Client,比如
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest");
TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);
為了防止贅述,接下來舉例全部以kibana中請求介紹,如果有需要用Java REST Client,可以自行去ES官網(wǎng)查看。
異步執(zhí)?
如果 reindex 時間過?,建議加上 wait_for_completion=false 的參數(shù)條件,這樣 reindex 將直接返回 taskId。
POST /_reindex?wait_for_completion=false
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest"
}
}
返回:
{
"task" : "dpBihNSMQfSlboMGlTgCBA:4728038"
}
op_type 參數(shù)
op_type 參數(shù)控制著寫入數(shù)據(jù)的沖突處理方式,如果把 op_type 設(shè)置為 create【默認值】,在 _reindex API 中,表示寫入時只在 dest index中添加不存在的 doucment,如果相同的 document 已經(jīng)存在,則會報 version confilct 的錯誤,那么索引操作就會失敗?!具@種方式與使用 _create API 時效果一致】
POST _reindex
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest",
"op_type": "create"
}
}
如果這樣設(shè)置了,也就不存在更新數(shù)據(jù)的場景了【沖突數(shù)據(jù)無法寫入】,我們也可以把 op_type 設(shè)置為 index,表示所有的數(shù)據(jù)全部重新索引創(chuàng)建。
conflicts 配置
默認情況下,當發(fā)生 version conflict 的時候,_reindex 會被 abort,任務(wù)終止【此時數(shù)據(jù)還沒有 reindex 完成】,在返回體中的 failures 指標中會包含沖突的數(shù)據(jù)【有時候數(shù)據(jù)會非常多】,除非把 conflicts 設(shè)置為 proceed。
關(guān)于 abort 的說明,如果產(chǎn)生了 abort,已經(jīng)執(zhí)行的數(shù)據(jù)【例如更新寫入的】仍然存在于目標索引,此時任務(wù)終止,還會有數(shù)據(jù)沒有被執(zhí)行,也就是漏數(shù)了。換句話說,該執(zhí)行過程不會回滾,只會終止。如果設(shè)置了 proceed,任務(wù)在檢測到數(shù)據(jù)沖突的情況下,不會終止,會跳過沖突數(shù)據(jù)繼續(xù)執(zhí)行,直到所有數(shù)據(jù)執(zhí)行完成,此時不會漏掉正常的數(shù)據(jù),只會漏掉有沖突的數(shù)據(jù)。
POST _reindex
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest",
"op_type": "create"
},
"conflicts": "proceed"
}
我們可以故意把 op_type 設(shè)置為 create,人為制造數(shù)據(jù)沖突的場景,測試時更容易觀察到?jīng)_突現(xiàn)象。
如果把 conflicts 設(shè)置為 proceed,在返回體結(jié)果中不會再出現(xiàn) failures 的信息,但是通過 version_conflicts 指標可以看到具體的數(shù)量。
批次大小配置
當你發(fā)現(xiàn)reindex的速度有些慢的時候,可以在 query 參數(shù)的同一層次【即 source 參數(shù)中】添加 size 參數(shù),表示 scroll size 的大小【會影響批次的次數(shù),進而影響整體的速度】,如果不顯式設(shè)置,默認是一批 1000 條數(shù)據(jù),在一開始的簡單示例中也看到了。
如下,設(shè)置 scroll size 為 5000:
POST /_reindex?wait_for_completion=false
{
"source": {
"index": "blog",
"size":5000
},
"dest": {
"index": "blog_lastest",
"op_type": "create"
},
"conflicts": "proceed"
}
測試后,速度達到了 30 分鐘 500 萬左右,明顯提升了很多。
根據(jù)taskId可以實時查看任務(wù)的執(zhí)行狀態(tài)
一般來說,如果我們的 source index 很大【比如幾百萬數(shù)據(jù)量】,則可能需要比較長的時間來完成 _reindex 的工作,可能需要幾十分鐘。而在此期間不可能一直等待結(jié)果返回,可以去做其它事情,如果中途需要查看進度,可以通過 _tasks API 進行查看。
GET /_tasks/{taskId}
返回:
{
"completed" : false,
"task" : {
"node" : "dpBihNSMQfSlboMGlTgCBA",
"id" : 4704218,
"type" : "transport",
"action" : "indices:data/write/reindex",
……
}
當執(zhí)行完畢時,completed為true
查看任務(wù)進度以及取消任務(wù),除了根據(jù)taskId查看以外,我們還可以通過查看所有的任務(wù)中篩選本次reindex的任務(wù)。
GET _tasks?detailed=true&actions=*reindex
返回結(jié)果:
{
"nodes" : {
"dpBihNSMQfSlboMGlTgCBA" : {
"name" : "node-16111-9210",
"transport_address" : "192.168.XXX.XXX:9310",
"host" : "192.168.XXX.XXX",
"ip" : "192.168.16.111:9310",
"roles" : [
"ingest",
"master"
],
"attributes" : {
"xpack.installed" : "true",
"transform.node" : "false"
},
"tasks" : {
"dpBihNSMQfSlboMGlTgCBA:6629305" : {
"node" : "dpBihNSMQfSlboMGlTgCBA",
"id" : 6629305,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"total" : 8361421,
"updated" : 0,
"created" : 254006,
"deleted" : 0,
"batches" : 743,
"version_conflicts" : 3455994,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 0,
"requests_per_second" : -1.0,
"throttled_until_millis" : 0
},
"description" : "reindex from [blog] to [blog_lastest][_doc]",
"start_time_in_millis" : 1609338953464,
"running_time_in_nanos" : 1276738396689,
"cancellable" : true,
"headers" : { }
}
}
}
}
}
注意觀察里面的幾個重要指標,例如從 description 中可以看到任務(wù)描述,從 tasks 中可以找到任務(wù)的 id【例如 dpBihNSMQfSlboMGlTgCBA:6629305】,從 cancellable 可以判斷任務(wù)是否支持取消操作。
這個 API 其實就是模糊匹配,同理也可以查詢其它類型的任務(wù)信息,例如使用 GET _tasks?detailed=true&actions=*byquery 查看查詢請求的狀態(tài)。
當集群的任務(wù)太多時我們就可以根據(jù)task_id,也就是上面提到GET /_tasks/task_id 方式更加準確地查詢指定任務(wù)的狀態(tài),避免集群的任務(wù)過多,不方便查看。
如果遇到操作失誤的場景,想取消任務(wù),有沒有辦法呢?
當然有啦,雖然覆水難收,通過調(diào)用
_tasks API:
POST _tasks/task_id/_cancel
這里的 task_id 就是通過上面的查詢?nèi)蝿?wù)接口獲取的任務(wù)id(任務(wù)要支持取消操作,即【cancellable 為 true】時方能收效)。
刪除舊索引
當我們通過 API 查詢發(fā)現(xiàn)任務(wù)完成后,就可以進行后續(xù)操作,我這里是要刪除舊索引,然后再給新索引起別名,用于替換舊索引,這樣才能保證對外服務(wù)沒有任何感知。
DELETE /blog
使用別名
POST /_aliases
{
"actions":[
{
"add":{
"index":"blog_lastest",
"alias":"blog"
}
}
]
}
通過別名訪問新索引
進行過以上操作后,我們可以使用一個簡單的搜索驗證服務(wù)。
POST /blog/_search
{
"query": {
"match": {
"author": "james"
}
}
}
如果搜索結(jié)果達到我們的預(yù)期目標,至此,數(shù)據(jù)索引重建遷移完成。
本文可轉(zhuǎn)載,但需聲明原文出處。 程序員小明,一個很少加班的程序員。歡迎關(guān)注微信公眾號“程序員小明”,獲取更多優(yōu)質(zhì)文章。