話不多說，直接上代碼：
/*********************************** 寫(xiě)數(shù)據(jù)到ElasticSearch ***************************************/
//從配置文件中獲取es的地址
ListhttpHosts =new ArrayList<>();
httpHosts.add(new HttpHost(parameterTool.getRequired("ELASTICSEARCH_HOST"),9200,"http"));
/*//從配置文件中讀取 bulk flush size,代表一次批處理的數(shù)量int bulkSize = parameterTool.getInt("ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS", 40);*/
// 創(chuàng)建elasticsearch Sink
ElasticsearchSink.BuilderesSinkBuilder =new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction() {
public IndexRequest createIndexRequest(String element) {
Mapjson =new HashMap<>();
String line =element.substring(element.split("\\|\\^\\|")[0].length() +3);
KafkaEvent kafkaEvent =new KafkaEvent().fromString(line);
json.put("dts_id",kafkaEvent.getDid().toString());
json.put("business_time",kafkaEvent.getBt());
json.put("protocol_id",kafkaEvent.getPi());
json.put("user_view_status",kafkaEvent.getUvs().longValue());
return Requests.indexRequest()
.id(element.split("\\|\\^\\|")[0])
.index(parameterTool.getRequired("ES_INDEX_NAME"))
.type(parameterTool.getRequired("ES_INDEX_TYPE"))//ES_INDEX_TYPE
? ? ? ? ? ? ? ? ? ? ? ? .source(json);
}
@Override
? ? ? ? ? ? public void process(String element,RuntimeContext ctx,RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
// Maximum number of actions per bulk flush; read from configuration, defaulting to 40.
esSinkBuilder.setBulkFlushMaxActions(parameterTool.getInt("ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS", 40));

// Customize the REST client so that every request carries the configured
// username/password credentials.
esSinkBuilder.setRestClientFactory(
    new RestClientFactory() {
        @Override
        public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
            // Register the configured user name and password for any auth scope.
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                AuthScope.ANY,
                new UsernamePasswordCredentials(
                    parameterTool.getRequired("ELASTICSEARCH_NAME"),
                    parameterTool.getRequired("ELASTICSEARCH_PASSWD")));

            // Custom HTTP client configuration: disable auth caching and install the
            // credentials provider built above.
            // NOTE: a max retry timeout (setMaxRetryTimeoutMillis(2000)) was previously
            // sketched on this builder call but is intentionally left disabled.
            restClientBuilder.setHttpClientConfigCallback(
                new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        httpClientBuilder.disableAuthCaching();
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
        }
    }
);

// Attach the sink: write the (upstream de-duplicated, per the original note) stream to Elasticsearch.
resulted.addSink(esSinkBuilder.build());