一、前言
最近工作中有這樣一個ElasticSearch(以下簡稱ES)寫入的場景,F(xiàn)link處理完數(shù)據(jù)實(shí)時寫入ES?,F(xiàn)在需要將一批歷史數(shù)據(jù)通過Flink加載到到ES,有兩個點(diǎn)需要保證:
- 對于歷史數(shù)據(jù),ES已有文檔,則舍棄舊數(shù)據(jù),ES沒有則插入歷史數(shù)據(jù)。
- 對于新數(shù)據(jù),能對現(xiàn)有的ES數(shù)據(jù)進(jìn)行更新。
參考ElasticSearch進(jìn)階篇(一)--版本控制,可以使用ES的版本實(shí)現(xiàn)該需求的開發(fā)。
二、代碼實(shí)現(xiàn)及驗(yàn)證
代碼實(shí)現(xiàn)
請求寫數(shù)據(jù)時加入version和version_type參數(shù),主要代碼如下:
IndexRequest indexRequest = Requests.indexRequest()
.index(indexName)
.id("1")
// 指定版本比較的業(yè)務(wù)字段,具體業(yè)務(wù)具體分析,一般取時間戳較為合適
.version(Long.parseLong(dataMap1.get("create").toString()))
// 指定使用外部版本號
.versionType(VersionType.EXTERNAL)
.source(dataMap);
驗(yàn)證
驗(yàn)證demo可使用當(dāng)前時間的時間戳作為版本比較依據(jù)。驗(yàn)證思路如下:
- 運(yùn)行demo程序,在當(dāng)前時間戳下,插入一條數(shù)據(jù),通過kibana等工具檢驗(yàn)數(shù)據(jù)是否插入成功。并記錄當(dāng)前的時間戳。
- 更改某些字段值對數(shù)據(jù)進(jìn)行更新,再次運(yùn)行程序,檢驗(yàn)數(shù)據(jù)是否更新成功。
- 將時間版本比較的字段值固定為第一次執(zhí)行程序的時間戳,檢驗(yàn)數(shù)據(jù)是否更新成功。
驗(yàn)證結(jié)果如下圖:

insert_success.png

update_success.png

version_conflict.png
三、總結(jié)
由截圖可看到,第一步和第二步都能執(zhí)行成功,第三步執(zhí)行會出現(xiàn)版本沖突的異常,根據(jù)提示很方便能識別出原因,即ElasticSearch進(jìn)階篇(一)--版本控制中得出的結(jié)論,使用version和version_type=EXTERNAL進(jìn)行版本控制時,只有要寫入文檔的版本號大于已有文檔的版本號才能更新成功。
案例代碼參考:elasticsearch_demo