「Elasticsearch」ES重建索引怎么才能做到數(shù)據(jù)無縫遷移呢?

背景

眾所周知,Elasticsearch是?個實時的分布式搜索引擎,為?戶提供搜索服務(wù)。當我們決定存儲某種數(shù)據(jù),在創(chuàng)建索引的時候就需要將數(shù)據(jù)結(jié)構(gòu),即Mapping確定下來,于此同時索引的設(shè)定和很多固定配置將不能改變。

那如果后續(xù)業(yè)務(wù)發(fā)生變化,需要改變數(shù)據(jù)結(jié)構(gòu)或者更換ES更換分詞器怎么辦呢?為此,Elastic團隊提供了很多通過輔助?具來幫助開發(fā)?員進?重建索引的方案。
如果對 reindex API 不熟悉,那么在遇到重構(gòu)的時候,必然事倍功半,效率低下。反之,就可以方便地進行索引重構(gòu),省時省力。

步驟

假設(shè)之前我們已經(jīng)存在一個blog索引,因為更換分詞器需要對該索引中的數(shù)據(jù)進行重建索引,以便支持業(yè)務(wù)使用新的分詞規(guī)則搜索數(shù)據(jù),并且盡可能使這個變化對外服務(wù)沒有感知,大概分為以下幾個步驟:?

  • 新增?個索引blog_lastest,Mapping數(shù)據(jù)結(jié)構(gòu)與blog索引一致
  • blog數(shù)據(jù)同步至blog_lastest
  • 刪除blog索引
  • 數(shù)據(jù)同步后給blog_lastest添加別名blog

新建索引

在這里推薦一個ES管理工具Kibana,主要針對數(shù)據(jù)的探索、可視化和分析。

官網(wǎng)
put /blog_lastest/
{
    "mappings":{
        "properties":{
            "title":{
                "type":"text",
                "analyzer":"ik_max_word"
            },
            "author":{
                "type":"keyword",
                "fields":{
                    "seg":{
                        "type":"text",
                        "analyzer":"ik_max_word"
                    }
                }
            }
        }
    }
}

將舊索引數(shù)據(jù)copy到新索引

同步等待

接?將會在 reindex 結(jié)束后返回

POST /_reindex
{
    "source": {
        "index": "blog"
    },
    "dest": {
        "index": "blog_lastest"
    }
}

kibana 中的使用如下所示

-w706

當然高版本(7.1.1)中,ES都有提供對應(yīng)的Java REST Client,比如

ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest");
TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);

為了防止贅述,接下來舉例全部以kibana中請求介紹,如果有需要用Java REST Client,可以自行去ES官網(wǎng)查看。

異步執(zhí)?

如果 reindex 時間過?,建議加上 wait_for_completion=false 的參數(shù)條件,這樣 reindex 將直接返回 taskId。

POST /_reindex?wait_for_completion=false
{
    "source": {
        "index": "blog"
    },
    "dest": {
        "index": "blog_lastest"
    }
}

返回:

{
  "task" : "dpBihNSMQfSlboMGlTgCBA:4728038"
}

op_type 參數(shù)

op_type 參數(shù)控制著寫入數(shù)據(jù)的沖突處理方式,如果把 op_type 設(shè)置為 create【默認值】,在 _reindex API 中,表示寫入時只在 dest index中添加不存在的 doucment,如果相同的 document 已經(jīng)存在,則會報 version confilct 的錯誤,那么索引操作就會失敗?!具@種方式與使用 _create API 時效果一致】

POST _reindex
{
  "source": {
    "index": "blog"
  },
  "dest": {
    "index": "blog_lastest",
    "op_type": "create"
  }
}

如果這樣設(shè)置了,也就不存在更新數(shù)據(jù)的場景了【沖突數(shù)據(jù)無法寫入】,我們也可以把 op_type 設(shè)置為 index,表示所有的數(shù)據(jù)全部重新索引創(chuàng)建。

conflicts 配置

默認情況下,當發(fā)生 version conflict 的時候,_reindex 會被 abort,任務(wù)終止【此時數(shù)據(jù)還沒有 reindex 完成】,在返回體中的 failures 指標中會包含沖突的數(shù)據(jù)【有時候數(shù)據(jù)會非常多】,除非把 conflicts 設(shè)置為 proceed。

關(guān)于 abort 的說明,如果產(chǎn)生了 abort,已經(jīng)執(zhí)行的數(shù)據(jù)【例如更新寫入的】仍然存在于目標索引,此時任務(wù)終止,還會有數(shù)據(jù)沒有被執(zhí)行,也就是漏數(shù)了。換句話說,該執(zhí)行過程不會回滾,只會終止。如果設(shè)置了 proceed,任務(wù)在檢測到數(shù)據(jù)沖突的情況下,不會終止,會跳過沖突數(shù)據(jù)繼續(xù)執(zhí)行,直到所有數(shù)據(jù)執(zhí)行完成,此時不會漏掉正常的數(shù)據(jù),只會漏掉有沖突的數(shù)據(jù)。

POST _reindex
{
  "source": {
    "index": "blog"
  },
  "dest": {
    "index": "blog_lastest",
    "op_type": "create"
  },
  "conflicts": "proceed"
}

我們可以故意把 op_type 設(shè)置為 create,人為制造數(shù)據(jù)沖突的場景,測試時更容易觀察到?jīng)_突現(xiàn)象。

如果把 conflicts 設(shè)置為 proceed,在返回體結(jié)果中不會再出現(xiàn) failures 的信息,但是通過 version_conflicts 指標可以看到具體的數(shù)量。

批次大小配置

當你發(fā)現(xiàn)reindex的速度有些慢的時候,可以在 query 參數(shù)的同一層次【即 source 參數(shù)中】添加 size 參數(shù),表示 scroll size 的大小【會影響批次的次數(shù),進而影響整體的速度】,如果不顯式設(shè)置,默認是一批 1000 條數(shù)據(jù),在一開始的簡單示例中也看到了。
如下,設(shè)置 scroll size 為 5000:

POST /_reindex?wait_for_completion=false
{
    "source": {
        "index": "blog",
        "size":5000
    },
    "dest": {
        "index": "blog_lastest",
        "op_type": "create"
    },
    "conflicts": "proceed"
}

測試后,速度達到了 30 分鐘 500 萬左右,明顯提升了很多。

根據(jù)taskId可以實時查看任務(wù)的執(zhí)行狀態(tài)

一般來說,如果我們的 source index 很大【比如幾百萬數(shù)據(jù)量】,則可能需要比較長的時間來完成 _reindex 的工作,可能需要幾十分鐘。而在此期間不可能一直等待結(jié)果返回,可以去做其它事情,如果中途需要查看進度,可以通過 _tasks API 進行查看。

GET /_tasks/{taskId}

返回:

{
  "completed" : false,
  "task" : {
    "node" : "dpBihNSMQfSlboMGlTgCBA",
    "id" : 4704218,
    "type" : "transport",
    "action" : "indices:data/write/reindex",
    ……
}

當執(zhí)行完畢時,completed為true
查看任務(wù)進度以及取消任務(wù),除了根據(jù)taskId查看以外,我們還可以通過查看所有的任務(wù)中篩選本次reindex的任務(wù)。

GET _tasks?detailed=true&actions=*reindex

返回結(jié)果:

{
  "nodes" : {
    "dpBihNSMQfSlboMGlTgCBA" : {
      "name" : "node-16111-9210",
      "transport_address" : "192.168.XXX.XXX:9310",
      "host" : "192.168.XXX.XXX",
      "ip" : "192.168.16.111:9310",
      "roles" : [
        "ingest",
        "master"
      ],
      "attributes" : {
        "xpack.installed" : "true",
        "transform.node" : "false"
      },
      "tasks" : {
        "dpBihNSMQfSlboMGlTgCBA:6629305" : {
          "node" : "dpBihNSMQfSlboMGlTgCBA",
          "id" : 6629305,
          "type" : "transport",
          "action" : "indices:data/write/reindex",
          "status" : {
            "total" : 8361421,
            "updated" : 0,
            "created" : 254006,
            "deleted" : 0,
            "batches" : 743,
            "version_conflicts" : 3455994,
            "noops" : 0,
            "retries" : {
              "bulk" : 0,
              "search" : 0
            },
            "throttled_millis" : 0,
            "requests_per_second" : -1.0,
            "throttled_until_millis" : 0
          },
          "description" : "reindex from [blog] to [blog_lastest][_doc]",
          "start_time_in_millis" : 1609338953464,
          "running_time_in_nanos" : 1276738396689,
          "cancellable" : true,
          "headers" : { }
        }
      }
    }
  }
}

注意觀察里面的幾個重要指標,例如從 description 中可以看到任務(wù)描述,從 tasks 中可以找到任務(wù)的 id【例如 dpBihNSMQfSlboMGlTgCBA:6629305】,從 cancellable 可以判斷任務(wù)是否支持取消操作。
這個 API 其實就是模糊匹配,同理也可以查詢其它類型的任務(wù)信息,例如使用 GET _tasks?detailed=true&actions=*byquery 查看查詢請求的狀態(tài)。
當集群的任務(wù)太多時我們就可以根據(jù)task_id,也就是上面提到GET /_tasks/task_id 方式更加準確地查詢指定任務(wù)的狀態(tài),避免集群的任務(wù)過多,不方便查看。
如果遇到操作失誤的場景,想取消任務(wù),有沒有辦法呢?
當然有啦,雖然覆水難收,通過調(diào)用
_tasks API

POST _tasks/task_id/_cancel

這里的 task_id 就是通過上面的查詢?nèi)蝿?wù)接口獲取的任務(wù)id(任務(wù)要支持取消操作,即【cancellable 為 true】時方能收效)。

刪除舊索引

當我們通過 API 查詢發(fā)現(xiàn)任務(wù)完成后,就可以進行后續(xù)操作,我這里是要刪除舊索引,然后再給新索引起別名,用于替換舊索引,這樣才能保證對外服務(wù)沒有任何感知。

DELETE /blog

使用別名

POST /_aliases
{
    "actions":[
        {
            "add":{
                "index":"blog_lastest",
                "alias":"blog"
            }
        }
    ]
}

通過別名訪問新索引

進行過以上操作后,我們可以使用一個簡單的搜索驗證服務(wù)。

POST /blog/_search
{
    "query": {
        "match": {
            "author": "james"
        }
    }
}

如果搜索結(jié)果達到我們的預(yù)期目標,至此,數(shù)據(jù)索引重建遷移完成。

本文可轉(zhuǎn)載,但需聲明原文出處。 程序員小明,一個很少加班的程序員。歡迎關(guān)注微信公眾號“程序員小明”,獲取更多優(yōu)質(zhì)文章。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容