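Recently I got a requirement to bring Elasticsearch (ES) log data into HDFS for analysis. After some searching online, I found the available approaches fall roughly into the following categories: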
- Connecting to ES directly from Hive (through the elasticsearch-hadoop connector)
For details, see http://lxw1234.com/archives/2015/12/585.htm
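The drawbacks of this approach:
- (a) For security, ES clusters usually enable user authentication and certificate authentication, which the approach above does not support.
- (b) The Hive table definition must declare field types that match the ES mapping. When the type of a field in an ES document type changes later, Hive does not handle the change well and fails with errors such as type-conversion errors.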
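To make drawback (b) concrete, here is a minimal sketch in Python, the language this post eventually settles on. It compares the column types declared in a Hive table with the field types in an ES mapping. The type table `ES_TO_HIVE` and the function `find_type_drift` are hypothetical illustrations and are not part of Hive or elasticsearch-hadoop. The input is assumed to be the `properties` dict taken from a `GET <index>/_mapping` response.

```python
# Hypothetical ES-type -> Hive-type table; adjust it to the types your DDL uses.
ES_TO_HIVE = {
    "long": "bigint",
    "integer": "int",
    "short": "smallint",
    "byte": "tinyint",
    "double": "double",
    "float": "float",
    "boolean": "boolean",
    "keyword": "string",
    "text": "string",
    "date": "timestamp",
}


def find_type_drift(es_properties, hive_columns):
    """Return Hive columns whose declared type no longer matches ES.

    es_properties: {"field": {"type": "long"}, ...} (assumed mapping shape)
    hive_columns:  {"field": "bigint", ...} taken from the Hive DDL
    Each result is (field, es_type, expected_hive_type, declared_hive_type).
    """
    drift = []
    for field, hive_type in hive_columns.items():
        es_type = es_properties.get(field, {}).get("type")
        if es_type is None:
            # Field is missing in ES, or is an object field without "type".
            drift.append((field, None, None, hive_type))
            continue
        expected = ES_TO_HIVE.get(es_type)
        if expected != hive_type.lower():
            drift.append((field, es_type, expected, hive_type))
    return drift


if __name__ == "__main__":
    hive = {"user_id": "bigint", "msg": "string", "@timestamp": "timestamp"}
    # Suppose user_id was re-mapped from long to keyword in a newer index.
    es_props = {
        "user_id": {"type": "keyword"},
        "msg": {"type": "text"},
        "@timestamp": {"type": "date"},
    }
    for row in find_type_drift(es_props, hive):
        print(row)  # prints ('user_id', 'keyword', 'string', 'bigint')
```

A check like this could run before each load. It would catch a changed mapping before Hive raises a conversion error in the middle of a query.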
- ES provides two Java APIs for working with ES
Official ES client documentation: https://www.elastic.co/guide/en/elasticsearch/client/index.html
- (a) The transport client, which connects over TCP
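Because our cluster has user authentication and certificate authentication enabled, we connected with the code below. Unfortunately the connection never succeeded, and for lack of time I have not solved the problem yet. If anyone has time to help, thanks a lot!
It always fails with the following exception: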
Exception in thread "main" NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{3HUrRF8JQGCz_TlwhQOFiA}{10.17.2.79}{10.17.2.79:9305}]]
Settings settings = Settings.builder()
.put("cluster.name", esDataToText.cluster)
// X-Pack expects "username:password" here
.put("xpack.security.user", esDataToText.userPw)
.put("xpack.ssl.key", esDataToText.keyPath)
.put("xpack.ssl.certificate", esDataToText.crtPath)
.put("xpack.ssl.certificate_authorities", esDataToText.cacrtPath)
.put("xpack.security.transport.ssl.enabled", true)
.put("client.transport.ping_timeout", "100s")
.build();
// esDataToText.port must be the node's transport (TCP) port, not its HTTP port
TransportClient client = null;
try {
client = new PreBuiltXPackTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esDataToText.urls), esDataToText.port));
SearchResponse response = client.prepareSearch("ndf.dlp")
.setQuery(QueryBuilders.matchAllQuery())
.execute().actionGet();
SearchHits resultHits = response.getHits();
// total number of documents matching the query
long resultCnt = resultHits.getTotalHits();
logger.info("Document count: " + resultCnt);
} catch (UnknownHostException e) {
e.printStackTrace();
} finally {
// release the client's threads and connections
if (client != null) {
client.close();
}
}
- (b) The REST client, which accesses ES over HTTP
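This approach talks to ES through its HTTP interface. Because our ES cluster uses SSL, we have to authenticate first.
- (1) Import the certificate into a JKS file, because the official ES client API works with a KeyStore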
keytool -import -v -trustcacerts -file niudingfeng.crt -keystore my_keystore.jks -keypass password -storepass password
- (2) Username/password authentication plus HTTPS (SSL) certificate verification
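//username/password authentication
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("bigdata", "123456qwerty"));
//SSL certificate verification: load the JKS truststore created with keytool in step (1)
final SSLContext sslContext;
try {
KeyStore truststore = KeyStore.getInstance("jks");
try (InputStream is = Files.newInputStream(Paths.get("my_keystore.jks"))) {
truststore.load(is, "password".toCharArray());
}
SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
sslContext = sslBuilder.build();
} catch (GeneralSecurityException | IOException e) {
throw new IllegalStateException("Failed to build the SSLContext", e);
}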
That is all of the authentication code.
- (3) Connect to ES and fetch the data
Note: the HTTP _search endpoint returns only 10 hits by default. To get more, specify from and size.
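Our ES version is too old for the official Java High Level REST Client, which requires at least version 5.6. That left only the low-level RestClient, so I do not recommend this approach.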
RestClient restClient = RestClient.builder(new HttpHost("testelk002.niudingfeng.com", 9205, "https"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setSSLContext(sslContext).setDefaultCredentialsProvider(credentialsProvider);
}
})
.build();
try {
String method = "GET";
String endpoint = "/ndf.dlp/_search";
// range query on @timestamp; without "size" only the first 10 hits come back
String queryStr = "{\"query\": {\"range\": {\"@timestamp\": {"
+ "\"gte\": \"2017-12-27\", \"lte\": \"2017-12-28\"}}}}";
// String queryStr = "{\"query\":{\"match_all\":{}}}";
HttpEntity entity = new NStringEntity(queryStr, ContentType.APPLICATION_JSON);
Response response = restClient.performRequest(method, endpoint, Collections.<String, String>emptyMap(), entity);
String res = EntityUtils.toString(response.getEntity());
// Files.write creates the file or truncates an existing one, so no delete is needed
String resFile = "D:\\java\\es\\res.txt";
Files.write(Paths.get(resFile), res.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
} finally {
// close the client even if the request fails
try {
restClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
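The from/size paging mentioned in step (3) does not depend on the client language. Here is a minimal Python sketch of it. `paginate` and the `search` callable it takes are hypothetical. `search` stands for whatever sends a request body to `_search` and returns the parsed JSON response. The sketch assumes the default `index.max_result_window` of 10,000, and ES rejects from/size requests that go past that limit.

```python
def paginate(search, query, page_size=1000, max_result_window=10000):
    """Yield every hit for `query` by paging with from/size.

    search: hypothetical callable, request body dict -> parsed _search response
    query:  a request body such as {"query": {"match_all": {}}}
    Stops after the last (short) page. Raises ValueError once from + size
    would pass max_result_window, because ES refuses such requests.
    """
    start = 0
    while True:
        size = min(page_size, max_result_window - start)
        if size <= 0:
            # The window is used up; any further hits are unreachable via from/size.
            raise ValueError("from + size exceeds max_result_window; use scroll")
        body = {**query, "from": start, "size": size}  # copy, do not mutate query
        hits = search(body)["hits"]["hits"]
        yield from hits
        if len(hits) < size:
            return  # a short page means there is nothing left
        start += size
```

ES re-runs the query for every page, and each shard has to collect `from + size` hits. This is why deep paging gets slow and why a scroll-based export is the usual choice for full dumps.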
- In the end we implemented it with the Python API
Python also offers two ways to query ES:
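Both methods take the query DSL as a Python dict. Here is a minimal sketch of building that body, assuming logs are exported one day at a time and filtered on `@timestamp` (as in the Java REST example). `day_range_query` is a hypothetical helper.

```python
import datetime
import json


def day_range_query(day, field="@timestamp"):
    """Build a query body selecting documents whose `field` falls on `day`.

    Uses the half-open interval [day, day + 1) so consecutive daily exports
    never pick up the same boundary document twice.
    """
    next_day = day + datetime.timedelta(days=1)
    return {
        "query": {
            "range": {
                field: {
                    "gte": day.isoformat(),
                    "lt": next_day.isoformat(),
                }
            }
        }
    }


if __name__ == "__main__":
    body = day_range_query(datetime.date(2017, 12, 27))
    print(json.dumps(body))
```

The sketch uses `gte`/`lt` instead of `gte`/`lte` so that adjacent days never overlap. ES interprets date strings without a time zone as UTC unless the range query sets `time_zone`.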
- (a) search
res = es_client.search(index='index_name',
doc_type='type_name', body=es_query, request_timeout=999999, params={"search_type": "query_and_fetch"})
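Note: search returns a dict rather than a generator. It is the same result you get when running the query in Sense, so the information is fairly complete.
For large result sets you have to page with from/size. Paging sorts the results and performs poorly, and from + size is also capped by index.max_result_window (10,000 by default).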
- (b) helpers.scan
import elasticsearch as es
from elasticsearch import helpers
es_client = es.Elasticsearch(
[host],
http_auth=(user, pswd),
port=port,
use_ssl=True,
verify_certs=False,
timeout=300)
res = helpers.scan(es_client, index=index, query=query, scroll='1m', request_timeout=999999, preserve_order=False)
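Note: scan walks through every document that matches the query and returns all of them as a generator that you iterate over. scroll sets how long the scroll context stays alive between batches ('1m' means one minute). The results are not sorted. This is the approach I recommend.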
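The goal is to land the data in HDFS. One simple option is to stream the scan generator into a newline-delimited JSON (NDJSON) file and upload that file. The sketch below is minimal and assumes that one JSON document per line is an acceptable input for the downstream Hive/Spark job. `ndjson_lines` and `write_ndjson` are hypothetical helpers.

```python
import json


def ndjson_lines(hits):
    """Turn ES hits (dicts with a "_source" key) into NDJSON lines."""
    for hit in hits:
        # ensure_ascii=False keeps non-ASCII log text (e.g. Chinese) readable
        yield json.dumps(hit["_source"], ensure_ascii=False) + "\n"


def write_ndjson(hits, path):
    """Stream hits into a UTF-8 NDJSON file and return how many were written.

    `hits` may be the generator returned by helpers.scan(). It is consumed
    lazily, so the full result set never has to be held in memory.
    """
    count = 0
    with open(path, "w", encoding="utf-8") as out:
        for line in ndjson_lines(hits):
            out.write(line)
            count += 1
    return count
```

For example, `write_ndjson(helpers.scan(es_client, index=index, query=query, scroll='1m'), 'ndf.dlp.json')` writes the file (the file name is hypothetical). After that, `hdfs dfs -put ndf.dlp.json <hdfs dir>` uploads it, where a Hive external table with a JSON SerDe could read it.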