1. 所需驅(qū)動和工具
- 安裝mssql和mysql的jdbc驅(qū)動請參考上一篇:elasticsearch環(huán)境搭建過程
- 如果在windows下使用導(dǎo)elasticsearch的命令:推薦使用cygwin
- 用kopf插件查看導(dǎo)數(shù)據(jù)進(jìn)度,若導(dǎo)入中斷或錯誤,也可使用kopf刪除該索引,重新導(dǎo)入
2. 導(dǎo)elasticsearch參數(shù)對應(yīng)名稱
elasticsearch對應(yīng)參數(shù):
elasticsearch服務(wù)器IP:192.168.1.1
索引名稱:indexname #可以將其看成關(guān)系型數(shù)據(jù)庫中的數(shù)據(jù)庫名稱
類型名稱:typename #可以將其看成關(guān)系型數(shù)據(jù)庫中的表名稱
映射:mapping #可以將其看成關(guān)系型數(shù)據(jù)庫中的表結(jié)構(gòu),默認(rèn)分詞,可進(jìn)行設(shè)置
elasticsearch端口號:9200
who_jdbc_river: 每次導(dǎo)入數(shù)據(jù),此處不能相同
關(guān)系型數(shù)據(jù)庫對應(yīng)參數(shù):
服務(wù)器IP:192.168.1.2
用戶:此處分別以 mssql的sa 和 mysql的root舉例
數(shù)據(jù)庫名稱:dbname
表名稱/視圖名:tbname
密碼:dbpasswd
3. 根據(jù)需求創(chuàng)建映射
視圖(mapping)相當(dāng)于關(guān)系型數(shù)據(jù)庫的表結(jié)構(gòu),在執(zhí)行之后導(dǎo)入命令前最好先創(chuàng)建索引和mapping,設(shè)置好數(shù)據(jù)類型和是否分詞等關(guān)鍵參數(shù),以免以后調(diào)用數(shù)據(jù)或者使用數(shù)據(jù)時陷入困境,最后不得不重新導(dǎo)入。
創(chuàng)建索引indexname:
curl -XPUT 192.168.1.1:9200/indexname -d '{}'
創(chuàng)建映射
curl -XPUT 192.168.1.1:9200/indexname/typename/_mapping -d '{
"typename" : {
"properties" : {
"domain" : { #數(shù)據(jù)字段名稱
"type" : "string", #指定類型
"index": "not_analyzed" #指定不分詞
},
"record_type" : {
"type" : "string",
"index": "not_analyzed"
},
"record_value" : {
"type" : "string",
"index": "not_analyzed"
}
}
}
}'
4. 從mysql數(shù)據(jù)庫導(dǎo)入到elasticsearch
curl -XPUT '192.168.1.1:9200/_river/who_jdbc_river/_meta' -d '{
"type" : "jdbc",
"shedule" : null,
"jdbc" : {
"url" : "jdbc:mysql://192.168.1.2:3306/dbname",
"user" : "root",
"password" : "dbpasswd",
"sql" : "select * from tbname",
"index" : "indexname",
"type" : "typename"
}
}'
5. 從mssql數(shù)據(jù)庫導(dǎo)入到elasticsearch
curl -XPUT '192.168.1.1:9200/_river/who_jdbc_river/_meta' -d '
{
"type" : "jdbc",
"jdbc": {
"url":"jdbc:sqlserver://192.168.1.2:1433;databaseName=dbname",
"user":"sa",
"password":"dbpasswd",
"sql":"select * from dbo.tbname",
"index" : "indexname",
"type" : "typename"
}
}'
6. csv或txt格式數(shù)據(jù)導(dǎo)入到elasticsearch
注意事項:
- 如果文件中的第一行未設(shè)置字段名稱可用下方命令行添加,或者在導(dǎo)入命令中修改指定參數(shù)添加
sed -i '1 s/^/username,email,password\n/' user.txt #為user.txt首行添加字段名username,email,password
- 在導(dǎo)入過程中,要導(dǎo)入的csv文件的文件名 如user.csv 會變成user.csv.processing,導(dǎo)入成功后user.csv.processing.imported
如果導(dǎo)入過程中出現(xiàn)錯誤中途斷開,在重新導(dǎo)數(shù)據(jù)前記得先將文件名user.csv.processing.imported 改成user.csv 否則會提示找不到文件
curl -XPUT 192.168.1.1:9200/_river/who_jdbc_river/_meta -d '
{
"type" : "csv", #指定文件類型
"csv_file" : {
"folder" : "http://home//black3y//", #要導(dǎo)入的文件的文件路徑,注意注釋斜杠,window下:D://isc//b
"filename_pattern" : "user.csv", #待導(dǎo)入數(shù)據(jù)文件名稱(txt或csv),支持同類型所有文件(.*\\.csv$)
"poll":"5m",
"fields" : [
"username",
"email",
"password"
],
"first_line_is_header" : "false", #true:將第一行作為字段名,false:將fields中的信息作為字段名
"field_separator" : "\t", #tab對應(yīng)\t,逗號直接寫,
"field_id" : "id",
"field_id_include" : "false",
"concurrent_requests" : "1",
"charset" : "UTF-8",
"script_before_all": "/path/to/before_all.sh",
"script_after_all": "/path/to/after_all.sh",
"script_before_file": "/path/to/before_file.sh",
"script_after_file": "/path/to/after_file.sh"
},
"index" : {
"index" : "indexname",
"type" : "typename",
"bulk_size" : 100,
"bulk_threshold" : 10
}
}'
也可使用head插件執(zhí)行上面的命令

使用head插件導(dǎo)入csv數(shù)據(jù)
7. json格式數(shù)據(jù)導(dǎo)入elasticsearch
贈上現(xiàn)成好用的python腳本 ( json2es.py )
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from itertools import islice
import json , sys
from elasticsearch import Elasticsearch , helpers
import threading
_index = 'indexname' #修改為索引名
_type = 'typename' #修改為類型名
es_url = 'http://192.168.1.1:9200/' #修改為elasticsearch服務(wù)器
reload(sys)
sys.setdefaultencoding('utf-8')
es = Elasticsearch(es_url)
#es.indices.create(index='webinfo', ignore=400,body = mapping)
es.indices.create(index=_index, ignore=400)
chunk_len = 10
num = 0
def bulk_es(chunk_data):
bulks=[]
try:
for i in xrange(chunk_len):
bulks.append({
"_index": _index,
"_type": _type,
"_source": chunk_data[i]
})
helpers.bulk(es, bulks)
except:
pass
with open(sys.argv[1]) as f:
while True:
lines = list(islice(f, chunk_len))
num =num +chunk_len
sys.stdout.write('\r' + 'num:'+'%d' % num)
sys.stdout.flush()
bulk_es(lines)
if not lines:
print "\n"
print "task has finished"
break
使用方法:

json2es.py使用方法