記錄一次ES數(shù)據(jù)庫的遷移
起因是由于老庫中的模板映射不是我們想要的模板,需要將數(shù)據(jù)遷移到新的ES集群中,新集群已經(jīng)執(zhí)行了我們想要的模板
PS:由于之前沒有整過整過東西,所以,走了不少的彎路,研究好幾種方法
Elasticsearch-Migration方法
elasticsearch-migration支持:多個版本間的數(shù)據(jù)遷移,使用scroll+bulk的接口原理
1.安裝和使用
elasticsearch-migration支持linux,windows等不同系統(tǒng),下載解壓后可以直接運(yùn)行
安裝:
源碼地址:https://github.com/medcl/esm
編譯好的地址:https://github.com/medcl/elasticsearch-dump/releases
有各個版本可以選擇
直接下載下來在選定目錄解壓即可
使用示例
./esm -s http://IP:9200 -d http://IP:9200/ -x 索引名稱 -w=5 -b=10 -c 10000
主要幾個參數(shù):
索引名稱可以使用*進(jìn)行模糊匹配
-w 表示線程數(shù)
-b 表示一次bulk請求數(shù)據(jù)大小,單位MB默認(rèn) 5M
-c 一次scroll請求數(shù)量
實(shí)際使用下來,在測試環(huán)境的話,對集群數(shù)據(jù)進(jìn)行遷移,只需要將源和目標(biāo)設(shè)置為兩個集群的主節(jié)點(diǎn)就行了,

但是有兩個集群的服務(wù)器的配置必須對等,如果服務(wù)器配置不均衡的話,會出現(xiàn)數(shù)據(jù)積壓,導(dǎo)致程序中斷。
還有這個工具是用GO編寫的,在部分環(huán)境不支持的服務(wù)器上面還用不了,所以最終沒有采用這種方法。
設(shè)置快照鏡像導(dǎo)入的方法
這個方法使用于大數(shù)據(jù)量的遷移
首先,設(shè)置一個目錄,這是一個備份的存儲目錄,放在源數(shù)據(jù)的服務(wù)器上
sudo mkdir /data/es/data_backup
# 該目錄要是elasticsearch的啟動用戶可訪問的,我們的環(huán)境一般是普通用戶(如:centos)
chown -R centos:centos /data/es/data_backup
修改elasticsearch的配置文件 elasticsearch.yml
vi ../../elasticsearch.yml
#在文件最后添加一條
path.repo: ["/data/es/data_backup "]
!ps:這條語句一定要注意格式,:后面要接一個空格,字符用英文字符
然后,重啟ES
借助kibana進(jìn)行ES語句的操作
# 創(chuàng)建倉庫,創(chuàng)建一個名為my_backup的倉庫
PUT _snapshot/my_backup
{
"type": "fs",
"settings": {
"location": "/data/es/data_backup ",
"compress": true
}
}
# 檢查倉庫是否創(chuàng)建成功(my_backup為之前創(chuàng)建的倉庫名稱)
GET _snapshot/my_backup
# 刪除倉庫
DELETE _snapshot/my_backup
備份索引數(shù)據(jù)
# 給所有索引創(chuàng)建快照
PUT _snapshot/my_backup/snapshot_name
# 針對具體的index創(chuàng)建快照備份
# 其中my_backup是倉庫名稱,snapshot_name是快照的名字
# ignore_unavailable在創(chuàng)建快照的過程中會忽略不存在的索引,默認(rèn)情況下,如果沒有設(shè)置,在索引不存在的情況下快照請求將會失敗
# include_global_state能夠防止集群的全局狀態(tài)被作為快照的一部分存儲起來
# 多個索引間不要加空格,否則只會對第一個索引操作
PUT _snapshot/my_backup/snapshot_name?wait_for_completion=true
{
"indices": "index_1,index_2",
"ignore_unavailable": true,
"include_global_state": false
}
# 查看指定倉庫下所有快照
GET _snapshot/my_backup/_all
# 查看具體某一個快照的信息
GET _snapshot/my_backup/snapshot_name/_status
# 刪除快照,要指定倉庫名和快照名
# 也能用于取消一個正在進(jìn)行的快照創(chuàng)建過程,會刪除備份了一半的快照
DELETE _snapshot/my_backup/snapshot_name
然后開始進(jìn)行數(shù)據(jù)遷移
在目標(biāo)服務(wù)器上面重復(fù)1-4步,建立一個相同名稱的目錄,作為數(shù)據(jù)目錄
將源集群的備份內(nèi)容(/data/es/data_backup 里的所有文件),復(fù)制到遷移目標(biāo)的倉庫目錄里,接下來就是類似批量導(dǎo)入了
# 如果索引已經(jīng)存在目標(biāo)的集群,需要先關(guān)閉索引,恢復(fù)數(shù)據(jù)后再開啟
POST /index_name/_close
POST _snapshot/my_backup/snapshot_name/_restore
POST /index_name/_open
# 從快照中恢復(fù)指定的索引,并給索引重命名
# indices里面可以寫多個索引名,中間用逗號隔開
# rename_pattern可以寫完整的索引名,也可以用正則匹配索引名的局部
# rename_replacement將替換rename_pattern中匹配到的局部(如果是正則,不是將整個索引名都替換)
#下面這條語句會把index_1,index_2恢復(fù)為restored_index_1和restored_index_2
POST /_snapshot/my_backup/snapshot_1/_restore
{
"indices": "index_1,index_2",
"rename_pattern": "index_(.+)",
"rename_replacement": "restored_index_$1"
}
# 查看恢復(fù)的狀態(tài)
GET _snapshot/my_backup/snapshot_name/_status
然后,對于集群數(shù)據(jù)的導(dǎo)入,因?yàn)槟撤N特殊的原因,沒有繼續(xù)研究,故棄之,改用另外的手段
logstash數(shù)據(jù)遷移方法,這也是最終使用的方案
使用這種方法,對于數(shù)據(jù)量不是特別龐大的數(shù)據(jù)源還是比較適用的,因?yàn)橐话愕募憾际鞘褂玫腅LK,這樣也不需要其余操作
只需要改動一下logstash的配置就行了。logstash本身支持從各種源搬運(yùn)數(shù)據(jù)到各種目標(biāo)
#配置修改
input{
elasticsearch {
hosts => ["IP:9200","IP:9200"]
index => "*" # * 代表著所有索引,也可以指定特定的索引,把索引名稱配置好就行了
size => 1000
}
}
output {
elasticsearch {
hosts => ["IP:9200","IP:9200"]
index => "%{[@metadata][_index]}"
#document_type => "%{[@metadata][_type]}" #這type和id這兩項(xiàng)如果不是同時(shí)遷移多個不同的索引的情況下,建議直接指定
#document_id => "%{[@metadata][_id]}"
}
# stdout { codec => rubydebug }
}
對于一些特殊情況下的logstash數(shù)據(jù)遷移:
# 在源數(shù)據(jù)和目標(biāo)數(shù)據(jù)之間需要做轉(zhuǎn)換的
input{
elasticsearch {
hosts => "IP:9200"
index => "*"
query => '{"query": {"exists" : { "field" : "requestTime" }}}'
# 索引過濾語句
}
}
filter {
mutate {
rename => { "appId" => "s_appId" } #字段修改
convert => { "l_requestTime" => "integer"} #字段類型修改 (_mapping)
remove_field => ["@timestamp", "@version"] #過濾掉logstash 自己加上的字段
add_field =>["s_serverTime","2018-06-15"] #新增字段
}
ruby{
code=>"
time = Time.at(event.get('l_requestTime')/1000).strftime('%Y-%m-%d')
event.set('s_serverTimestamp', time.to_s)
"#索引時(shí)間指定
}
}
output {
elasticsearch {
hosts => "IP:9200"
document_type => "*"
index => "*"
}
# stdout { codec => rubydebug }
}