ES集群數(shù)據(jù)遷移

記錄一次ES數(shù)據(jù)庫的遷移

起因是由于老庫中的模板映射不是我們想要的模板,需要將數(shù)據(jù)遷移到新的ES集群中,新集群已經(jīng)執(zhí)行了我們想要的模板

PS:由于之前沒有整過整過東西,所以,走了不少的彎路,研究好幾種方法

Elasticsearch-Migration方法

elasticsearch-migration支持:多個版本間的數(shù)據(jù)遷移,使用scroll+bulk的接口原理
1.安裝和使用

elasticsearch-migration支持linux,windows等不同系統(tǒng),下載解壓后可以直接運(yùn)行
安裝:

源碼地址:https://github.com/medcl/esm
編譯好的地址:https://github.com/medcl/elasticsearch-dump/releases

有各個版本可以選擇
直接下載下來在選定目錄解壓即可

使用示例

./esm -s http://IP:9200 -d http://IP:9200/ -x 索引名稱 -w=5 -b=10 -c 10000
主要幾個參數(shù):
索引名稱可以使用*進(jìn)行模糊匹配

-w 表示線程數(shù)

-b 表示一次bulk請求數(shù)據(jù)大小,單位MB默認(rèn) 5M

-c 一次scroll請求數(shù)量

實(shí)際使用下來,在測試環(huán)境的話,對集群數(shù)據(jù)進(jìn)行遷移,只需要將源和目標(biāo)設(shè)置為兩個集群的主節(jié)點(diǎn)就行了,


1589337148(1).png

但是有兩個集群的服務(wù)器的配置必須對等,如果服務(wù)器配置不均衡的話,會出現(xiàn)數(shù)據(jù)積壓,導(dǎo)致程序中斷。
還有這個工具是用GO編寫的,在部分環(huán)境不支持的服務(wù)器上面還用不了,所以最終沒有采用這種方法。

設(shè)置快照鏡像導(dǎo)入的方法

這個方法使用于大數(shù)據(jù)量的遷移

首先,設(shè)置一個目錄,這是一個備份的存儲目錄,放在源數(shù)據(jù)的服務(wù)器上
sudo mkdir /data/es/data_backup
# 該目錄要是elasticsearch的啟動用戶可訪問的,我們的環(huán)境一般是普通用戶(如:centos)
chown -R centos:centos /data/es/data_backup 
修改elasticsearch的配置文件 elasticsearch.yml
vi ../../elasticsearch.yml
#在文件最后添加一條
path.repo: ["/data/es/data_backup "]
!ps:這條語句一定要注意格式,:后面要接一個空格,字符用英文字符
然后,重啟ES

借助kibana進(jìn)行ES語句的操作

# 創(chuàng)建倉庫,創(chuàng)建一個名為my_backup的倉庫
PUT _snapshot/my_backup
{
  "type": "fs",
  "settings": {
    "location": "/data/es/data_backup ",
    "compress": true
  }
}

# 檢查倉庫是否創(chuàng)建成功(my_backup為之前創(chuàng)建的倉庫名稱) 
GET _snapshot/my_backup

# 刪除倉庫
DELETE _snapshot/my_backup

備份索引數(shù)據(jù)

# 給所有索引創(chuàng)建快照
PUT _snapshot/my_backup/snapshot_name        

# 針對具體的index創(chuàng)建快照備份
# 其中my_backup是倉庫名稱,snapshot_name是快照的名字
# ignore_unavailable在創(chuàng)建快照的過程中會忽略不存在的索引,默認(rèn)情況下,如果沒有設(shè)置,在索引不存在的情況下快照請求將會失敗
# include_global_state能夠防止集群的全局狀態(tài)被作為快照的一部分存儲起來
# 多個索引間不要加空格,否則只會對第一個索引操作
PUT _snapshot/my_backup/snapshot_name?wait_for_completion=true
{
  "indices": "index_1,index_2",
  "ignore_unavailable": true,
  "include_global_state": false
}

# 查看指定倉庫下所有快照
GET _snapshot/my_backup/_all

# 查看具體某一個快照的信息
GET _snapshot/my_backup/snapshot_name/_status

# 刪除快照,要指定倉庫名和快照名
# 也能用于取消一個正在進(jìn)行的快照創(chuàng)建過程,會刪除備份了一半的快照
DELETE _snapshot/my_backup/snapshot_name

然后開始進(jìn)行數(shù)據(jù)遷移

在目標(biāo)服務(wù)器上面重復(fù)1-4步,建立一個相同名稱的目錄,作為數(shù)據(jù)目錄
將源集群的備份內(nèi)容(/data/es/data_backup 里的所有文件),復(fù)制到遷移目標(biāo)的倉庫目錄里,接下來就是類似批量導(dǎo)入了
# 如果索引已經(jīng)存在目標(biāo)的集群,需要先關(guān)閉索引,恢復(fù)數(shù)據(jù)后再開啟
POST /index_name/_close
POST _snapshot/my_backup/snapshot_name/_restore
POST /index_name/_open

# 從快照中恢復(fù)指定的索引,并給索引重命名
# indices里面可以寫多個索引名,中間用逗號隔開
# rename_pattern可以寫完整的索引名,也可以用正則匹配索引名的局部
# rename_replacement將替換rename_pattern中匹配到的局部(如果是正則,不是將整個索引名都替換)
#下面這條語句會把index_1,index_2恢復(fù)為restored_index_1和restored_index_2
POST /_snapshot/my_backup/snapshot_1/_restore
{
  "indices": "index_1,index_2",
  "rename_pattern": "index_(.+)",
  "rename_replacement": "restored_index_$1"
}

# 查看恢復(fù)的狀態(tài)
GET _snapshot/my_backup/snapshot_name/_status 

然后,對于集群數(shù)據(jù)的導(dǎo)入,因?yàn)槟撤N特殊的原因,沒有繼續(xù)研究,故棄之,改用另外的手段

logstash數(shù)據(jù)遷移方法,這也是最終使用的方案

使用這種方法,對于數(shù)據(jù)量不是特別龐大的數(shù)據(jù)源還是比較適用的,因?yàn)橐话愕募憾际鞘褂玫腅LK,這樣也不需要其余操作
只需要改動一下logstash的配置就行了。logstash本身支持從各種源搬運(yùn)數(shù)據(jù)到各種目標(biāo)

#配置修改
input{
  elasticsearch {
    hosts => ["IP:9200","IP:9200"]
    index => "*" # * 代表著所有索引,也可以指定特定的索引,把索引名稱配置好就行了
    size => 1000
  }
}

output {
  elasticsearch {
    hosts => ["IP:9200","IP:9200"]
    index => "%{[@metadata][_index]}"
    #document_type => "%{[@metadata][_type]}"  #這type和id這兩項(xiàng)如果不是同時(shí)遷移多個不同的索引的情況下,建議直接指定
    #document_id => "%{[@metadata][_id]}"
  }
 #  stdout { codec => rubydebug }
}

對于一些特殊情況下的logstash數(shù)據(jù)遷移:

# 在源數(shù)據(jù)和目標(biāo)數(shù)據(jù)之間需要做轉(zhuǎn)換的

input{
  elasticsearch {
    hosts => "IP:9200"
    index => "*"
    query => '{"query": {"exists" : { "field" : "requestTime" }}}'
    # 索引過濾語句
  }
}


filter {
  mutate {
    rename => { "appId" => "s_appId" } #字段修改
 
    convert => { "l_requestTime" => "integer"}  #字段類型修改 (_mapping)
    remove_field => ["@timestamp", "@version"]  #過濾掉logstash 自己加上的字段
    add_field =>["s_serverTime","2018-06-15"]  #新增字段
  }

  ruby{
   code=>"
     time =  Time.at(event.get('l_requestTime')/1000).strftime('%Y-%m-%d')
     event.set('s_serverTimestamp', time.to_s)
     "#索引時(shí)間指定
  }
}


output {
  elasticsearch {
    hosts => "IP:9200"
    document_type => "*"
    index => "*"
  }
#  stdout { codec => rubydebug }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容