相關(guān)環(huán)境
Elasticsearch 6.4.2
問題
服務(wù)需要對ES的rest-high-level-client的進(jìn)行bulk代碼的封裝.
public Boolean batchInsertOrUpdate(BulkOpeRequest bulkOpeRequest) throws IOException {
List<BatchRequest> docs = bulkOpeRequest.getDocs();
if (CollectionUtil.isEmpty(docs)) {
throw new RuntimeException("BatchRequest list is empty !");
}
BulkRequest bulkRequest = new BulkRequest();
docs.forEach(batchRequest ->
bulkRequest.add(
new IndexRequest(bulkOpeRequest.getIndex(), bulkOpeRequest.getType(), batchRequest.getId())
.source(XContentType.JSON, batchRequest.getData())
)
);
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (response.hasFailures()) {
log.error(response.buildFailureMessage());
return Boolean.FALSE;
}
return Boolean.TRUE;
}
再進(jìn)行調(diào)用的該方法插入數(shù)據(jù)的時候, 提示出錯, 報錯信息: The number of object passed must be even but was [1], 如圖所示:

異常提示
找了一些網(wǎng)上的解決方案 , 無非就是把什么source的參數(shù)轉(zhuǎn)成map, 什么轉(zhuǎn)成字符串, 都并沒有什么用處, 于是自己找找看相應(yīng)問題.
解決步驟
首先此處根據(jù)source的源碼, 可以看到, source需要傳入的參數(shù)必須要成雙, 這個是異常出現(xiàn)的關(guān)鍵.
public IndexRequest source(XContentType xContentType, Object... source) {
if (source.length % 2 != 0) {
throw new IllegalArgumentException("The number of object passed must be even but was [" + source.length + "]");
}
// other code
}
結(jié)合官方文檔 給出的簡單的例子來猜出, 應(yīng)該是把需要的field和value平鋪下來傳入進(jìn)去, 比如{"name": "zhangsan"}, 需要轉(zhuǎn)化成 "name", "zhangsan" 這種形式. 于是對代碼做了一個修改.
// other code...
/**
* 批量插入/更新文檔
* @return
*/
public Boolean batchInsertOrUpdate(BulkOpeRequest bulkOpeRequest) throws IOException {
List<BatchRequest> docs = bulkOpeRequest.getDocs();
if (CollectionUtil.isEmpty(docs)) {
throw new RuntimeException("BatchRequest list is empty !");
}
BulkRequest bulkRequest = new BulkRequest();
docs.forEach(batchRequest ->
bulkRequest.add(
new IndexRequest(bulkOpeRequest.getIndex(), bulkOpeRequest.getType(), batchRequest.getId())
.source(XContentType.JSON, toArgs(batchRequest.getData()))
)
);
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (response.hasFailures()) {
log.error(response.buildFailureMessage());
return Boolean.FALSE;
}
return Boolean.TRUE;
}
/**
* 將批量數(shù)據(jù)轉(zhuǎn)換成es參數(shù)數(shù)組
* @param data
* @return
*/
private Object[] toArgs(Map<String, Object> data) {
List<Object> args = Lists.newArrayList();
if (CollectionUtil.isEmpty(data)) {
throw new RuntimeException("批量操作數(shù)據(jù)不可為空 !");
}
data.forEach((key, value) -> {
args.add(key);
args.add(value);
});
return args.toArray();
}
// other code...
經(jīng)過測試, 問題解決.