背景:
需要實現(xiàn)根據(jù)某個條件更新es里面的一批數(shù)據(jù),如果一條條查出來然后去更新的話就效率比較低。
實現(xiàn):
ES接口調(diào)用
POST http://localhost:9200/article/_update_by_query
{
"script": {
"source":"ctx._source['status']=0;"
},
"query": {
"term": {
"userId": 1
}
}
}
Spring data
環(huán)境:jdk8
語言:kotlin
框架:spring boot 2.4.5
寫了個服務,可以用注入的方式調(diào)用方法。方法的入?yún)ㄟ^濾條件和要修改的值。
@Component
class EsSearchService(
private val esClient: RestHighLevelClient
) {
fun updateByQuery(status: Int, userId: String): BulkByScrollResponse {
val termQuery = TermQueryBuilder("userId", userId)
val script = Script(
ScriptType.INLINE,
"painless",
"ctx._source['status'] = params['status'];",
mapOf("status" to status)
)
val request = UpdateByQueryRequest("article")
request.setScript(script).setQuery(termQuery)
request.maxRetries = 10
request.isAbortOnVersionConflict = true
return esClient.updateByQuery(request , RequestOptions.DEFAULT)
}
}
后續(xù)
去GitHub上查看spring-data-elasticsearch的源碼,發(fā)現(xiàn)4.2.x的版本用updateByQuery更加方便了。
官方給出的例子:
https://github.com/spring-projects/spring-data-elasticsearch/blob/4.2.x/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java
void shouldDoUpdateByQueryForExistingDocument() {
// given
final String documentId = nextIdAsString();
final String messageBeforeUpdate = "some test message";
final String messageAfterUpdate = "test message";
final SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message(messageBeforeUpdate)
.version(System.currentTimeMillis()).build();
final IndexQuery indexQuery = getIndexQuery(sampleEntity);
operations.index(indexQuery, index);
indexOperations.refresh();
final NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
final UpdateQuery updateQuery = UpdateQuery.builder(query)
.withScriptType(org.springframework.data.elasticsearch.core.ScriptType.INLINE)
.withScript("ctx._source['message'] = params['newMessage']").withLang("painless")
.withParams(Collections.singletonMap("newMessage", messageAfterUpdate)).withAbortOnVersionConflict(true)
.build();
// when
operations.updateByQuery(updateQuery, index);
// then
SampleEntity indexedEntity = operations.get(documentId, SampleEntity.class, index);
assertThat(indexedEntity.getMessage()).isEqualTo(messageAfterUpdate);
}
可惜想用的話需要升級spring boot版本,項目依賴太多了,沒有升級成功,以下是對應的版本表。官方文檔

image.png