Flink sink數(shù)據(jù)到帶有賬號(hào)密碼的ES

話不多說(shuō)直接上代碼

/*********************************** 寫(xiě)數(shù)據(jù)到ElasticSearch ***************************************/

//從配置文件中獲取es的地址

ListhttpHosts =new ArrayList<>();

httpHosts.add(new HttpHost(parameterTool.getRequired("ELASTICSEARCH_HOST"),9200,"http"));

/*//從配置文件中讀取 bulk flush size,代表一次批處理的數(shù)量int bulkSize = parameterTool.getInt("ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS", 40);*/

// 創(chuàng)建elasticsearch Sink

ElasticsearchSink.BuilderesSinkBuilder =new ElasticsearchSink.Builder<>(

httpHosts,

new ElasticsearchSinkFunction() {

public IndexRequest createIndexRequest(String element) {

Mapjson =new HashMap<>();

String line =element.substring(element.split("\\|\\^\\|")[0].length() +3);

KafkaEvent kafkaEvent =new KafkaEvent().fromString(line);

json.put("dts_id",kafkaEvent.getDid().toString());

json.put("business_time",kafkaEvent.getBt());

json.put("protocol_id",kafkaEvent.getPi());

json.put("user_view_status",kafkaEvent.getUvs().longValue());

return Requests.indexRequest()

.id(element.split("\\|\\^\\|")[0])

.index(parameterTool.getRequired("ES_INDEX_NAME"))

.type(parameterTool.getRequired("ES_INDEX_TYPE"))//ES_INDEX_TYPE

? ? ? ? ? ? ? ? ? ? ? ? .source(json);

}

@Override

? ? ? ? ? ? public void process(String element,RuntimeContext ctx,RequestIndexer indexer) {

indexer.add(createIndexRequest(element));

}

}

);

//批處理最大數(shù)

esSinkBuilder.setBulkFlushMaxActions(parameterTool.getInt("ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS",40));

esSinkBuilder.setRestClientFactory(

new RestClientFactory() {

@Override

? ? ? ? ? ? public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {

final CredentialsProvider credentialsProvider =new BasicCredentialsProvider();

credentialsProvider.setCredentials(AuthScope.ANY,

new UsernamePasswordCredentials(parameterTool.getRequired("ELASTICSEARCH_NAME"),

parameterTool.getRequired("ELASTICSEARCH_PASSWD")));

restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {//設(shè)置自定義http客戶(hù)端配置

? ? ? ? ? ? ? ? ? ? public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

httpClientBuilder.disableAuthCaching();

return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

}

})/*.setMaxRetryTimeoutMillis(2000)*/;

}

}

);

//將去重后的數(shù)據(jù)寫(xiě)入到ElasticSearch中

resulted.addSink(esSinkBuilder.build());

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • !/usr/bin/env python -- coding: utf-8 -- '''根據(jù)某個(gè)traceId去調(diào)...
    小七奇奇閱讀 669評(píng)論 0 0
  • 此文是關(guān)于elasticsearch in action書(shū)部分重點(diǎn)讀書(shū)筆記。 Chapter 2 Diving i...
    shamumu閱讀 2,725評(píng)論 0 1
  • 平臺(tái)內(nèi)的產(chǎn)品有一個(gè)數(shù)據(jù)分析,統(tǒng)計(jì)平臺(tái)內(nèi)某個(gè)商戶(hù)某個(gè)時(shí)間段內(nèi)(今天、昨天、7天內(nèi)、30天內(nèi)……)的各種數(shù)據(jù)分析,這種...
    Chting閱讀 713評(píng)論 1 3
  • 英文文檔,一開(kāi)始我也是抗拒的,邊翻譯邊看,也就花費(fèi)了1個(gè)小時(shí)基本就閱讀過(guò)了,我的英文基礎(chǔ)其實(shí)很差。附上鏈接:鏈接:...
    lonecolonel閱讀 10,408評(píng)論 3 1
  • 1 注意import的StreamExecutionEnvironment // java 的頭是 import ...
    君劍閱讀 9,392評(píng)論 3 3

友情鏈接更多精彩內(nèi)容