按條件批量更新elasticsearch中的某個字段的值

1.附一個數據示例:


image.png

2.第一種方法

/**
     * Renames one value of the category / tags / types field by loading every matching
     * document, modifying it in memory and saving it back through the repository.
     *
     * @param type  which field to rewrite: 1 = category, 2 = tags, 3 = types; any other value is a no-op
     * @param old   the value to look for
     * @param reset the value that replaces {@code old}
     */
    private void batchUpdateTask(int type, String old, String reset) {
        log.info("-----開始更新ES 類型:" + type + "-----" + old);
        // Build the match condition for the field selected by type.
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        switch (type) {
            case 1:
                boolQueryBuilder.must(QueryBuilders.matchQuery("category", old));
                break;
            case 2:
                boolQueryBuilder.must(QueryBuilders.matchQuery("tags.keyword", old));
                break;
            case 3:
                boolQueryBuilder.must(QueryBuilders.matchQuery("types.keyword", old));
                break;
            default:
                return;
        }
        NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(boolQueryBuilder).build();

        int updated = 0;
        // searchForStream walks through every hit; a plain search() only returns the
        // query's default page, which would leave most matching documents untouched.
        try (org.springframework.data.elasticsearch.core.SearchHitsIterator<EsDataInfo> hits =
                     elasticsearchTemplate.searchForStream(searchQuery, EsDataInfo.class)) {
            while (hits.hasNext()) {
                EsDataInfo info = hits.next().getContent();
                boolean changed = false;
                switch (type) {
                    case 1:
                        String category = info.getCategory();
                        // The match query is analyzed, so a hit does not guarantee the literal substring.
                        if (category != null && category.contains(old)) {
                            info.setCategory(category.replace(old, reset));
                            changed = true;
                        }
                        break;
                    case 2:
                        List<String> infoTags = info.getTags();
                        changed = replaceListValue(infoTags, old, reset);
                        if (changed) {
                            info.setTags(infoTags);
                        }
                        break;
                    case 3:
                        List<String> infoTypes = info.getTypes();
                        changed = replaceListValue(infoTypes, old, reset);
                        if (changed) {
                            info.setTypes(infoTypes);
                        }
                        break;
                    default:
                }
                // Only write back (and count) documents that were actually modified.
                if (changed) {
                    esDataInfoRepository.save(info);
                    updated++;
                }
            }
        }
        if (updated > 0) {
            log.info("更新了:" + updated + "條");
        }
        log.info("-----結束更新ES 類型:" + type + "-----" + old);
    }

    /**
     * Removes every occurrence of {@code old} from {@code values} and appends {@code reset}
     * unless it is already present, so a rename never produces a duplicate entry.
     *
     * @return {@code true} if {@code old} was found (the list was modified), {@code false}
     *         if the list is {@code null} or does not contain {@code old}
     */
    private boolean replaceListValue(List<String> values, String old, String reset) {
        if (values == null || !values.removeIf(old::equals)) {
            return false;
        }
        if (!values.contains(reset)) {
            values.add(reset);
        }
        return true;
    }

3.第二種方法

// Injected client holder; updateByQuery obtains the RestHighLevelClient from it via getRHLClient().
@Resource
    private ElasticsearchRestClient elasticsearchRestClient;

    /**
     * Renames one value of the category / tags / types field with a server-side
     * update-by-query and a Painless script, without loading documents into the application.
     *
     * <p>The old and new values are handed to the script as {@code params} instead of being
     * concatenated into the script source, so values containing quotes cannot break or alter
     * the script and the compiled script can be reused across calls.
     *
     * @param type  which field to rewrite: 1 = category, 2 = tags, 3 = types; any other value is a no-op
     * @param old   the value to look for
     * @param reset the value that replaces {@code old}
     */
    private void batchUpdateTask2(int type, String old, String reset) {
        log.info("-----batchUpdateTask2開始更新ES 類型:" + type + "-----" + old);

        // Renaming a value to itself must not touch anything; the tags script would
        // otherwise see the "new" value as already present and delete the tag.
        if (old != null && old.equals(reset)) {
            log.info("-----batchUpdateTask2結束更新ES 類型:" + type + "-----" + old);
            return;
        }

        // Values referenced from the scripts as params.old / params.reset.
        java.util.Map<String, Object> params = new java.util.HashMap<>();
        params.put("old", old);
        params.put("reset", reset);

        // Query: first argument is the field name, second is the value to match.
        switch (type) {
            case 1:
                MatchPhraseQueryBuilder category = new MatchPhraseQueryBuilder("category", old);
                Script script1 = new Script(ScriptType.INLINE, "painless",
                        "if (ctx._source.category == params.old) {ctx._source.category = params.reset;}",
                        params);
                updateByQuery(category, script1);
                break;
            case 2:
                MatchPhraseQueryBuilder tags = new MatchPhraseQueryBuilder("tags.keyword", old);
                // If the new value already exists in the list, only the old value is removed.
                // After a removal the index is stepped back so the element that slid into
                // position i is not skipped.
                Script script2 = new Script(ScriptType.INLINE, "painless",
                        "if(ctx._source.tags!=null && ctx._source.tags.length!=0 && ctx._source.tags.contains" +
                                "(params.old)){ \n" +
                                "       for (int i = 0; i < ctx._source.tags.length; ++i) { \n" +
                                "           if (ctx._source.tags[i] == params.old){\n" +
                                "               if (ctx._source.tags.contains(params.reset)) { \n" +
                                "                   ctx._source.tags.remove(i); \n" +
                                "                   --i; \n" +
                                "               } else{" +
                                "                   ctx._source.tags[i] = params.reset;" +
                                "               }" +
                                "           } " +
                                "       }" +
                                "}",
                        params);
                updateByQuery(tags, script2);
                break;
            case 3:
                MatchPhraseQueryBuilder types = new MatchPhraseQueryBuilder("types.keyword", old);
                // Every occurrence of the old value is overwritten in place.
                Script script3 = new Script(ScriptType.INLINE, "painless",
                        "if(ctx._source.types!=null && ctx._source.types.length!=0 && ctx._source.types.contains" +
                                "(params.old)){ \n" +
                                "for (int i = 0; i < ctx._source.types.length; ++i) { \n" +
                                "if (ctx._source.types[i] == params.old){\n" +
                                "ctx._source.types[i] = params.reset;} " +
                                "}" +
                                "}",
                        params);
                updateByQuery(types, script3);
                break;
            default:
                return;
        }
        log.info("-----batchUpdateTask2結束更新ES 類型:" + type + "-----" + old);
    }

    /**
     * Runs an update-by-query against the {@code datainfo_dev} index, applying {@code script}
     * to every document matched by {@code queryBuilder}.
     *
     * <p>I/O errors are logged and not rethrown. Version conflicts, bulk/search failures and
     * timeouts reported in the response are logged as a warning so that a partially applied
     * update does not go unnoticed.
     *
     * @param queryBuilder selects the documents to update
     * @param script       the script executed against each matching document
     */
    private void updateByQuery(QueryBuilder queryBuilder, Script script) {
        // Argument is the index name; it may be omitted, or one or several names may be given.
        UpdateByQueryRequest request = new UpdateByQueryRequest("datainfo_dev");
        // On a version conflict, keep going instead of aborting the whole request.
        request.setConflicts("proceed");
        // Query that selects the documents.
        request.setQuery(queryBuilder);
        // Upper bound on the number of documents processed by one call.
        request.setMaxDocs(9000);
        // Batch size.
        request.setBatchSize(1000);
        // Script to execute.
        request.setScript(script);

        // Parallelism (number of slices).
        request.setSlices(2);
        // How long the search context is kept alive between scroll batches.
        request.setScroll(TimeValue.timeValueMinutes(10));
        // If routing is provided it is copied to the scroll query, limiting the
        // process to the shards that match that routing value.
        //      request.setRouting("=cat");

        // Optional parameters
        // Timeout
        request.setTimeout(TimeValue.timeValueMinutes(2));
        // Refresh the index after the update.
        request.setRefresh(true);

        RestHighLevelClient client = elasticsearchRestClient.getRHLClient();
        try {
            BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);
            // Report documents actually updated, not merely the number processed.
            log.info("更新了:" + response.getUpdated() + "條");
            boolean incomplete = response.isTimedOut()
                    || response.getVersionConflicts() > 0
                    || !response.getBulkFailures().isEmpty()
                    || !response.getSearchFailures().isEmpty();
            if (incomplete) {
                log.warn("updateByQuery incomplete: total=" + response.getTotal()
                        + ", updated=" + response.getUpdated()
                        + ", versionConflicts=" + response.getVersionConflicts()
                        + ", timedOut=" + response.isTimedOut()
                        + ", bulkFailures=" + response.getBulkFailures()
                        + ", searchFailures=" + response.getSearchFailures());
            }
        } catch (IOException e) {
            log.error("updateByQuery error!", e);
        }
    }

4.調用示例

/**
     * Batch-updates category, tag or type values stored in Elasticsearch.
     *
     * <p>The work is handed to the executor returned by
     * {@code SingleThreadPool.getExecutorService()} and runs after a 3-second delay, so this
     * method returns immediately. Failures are logged and not propagated to the caller.
     *
     * @param type  1 = category, 2 = tag, 3 = type
     * @param old   the old value
     * @param reset the new value
     */
    public void batchUpdateName(int type, String old, String reset) {
        ExecutorService executorService = SingleThreadPool.getExecutorService();
        // Run on the background executor.
        executorService.execute(() -> {
            try {
                // NOTE(review): presumably this delay lets the caller's preceding write
                // finish before Elasticsearch is updated - confirm.
                Thread.sleep(3 * 1000);
                batchUpdateTask2(type, old, reset);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the executor can observe the interruption.
                Thread.currentThread().interrupt();
                log.error("batchUpdateName error.", e);
            } catch (Exception e) {
                log.error("batchUpdateName error.", e);
            }
        });
    }

參考鏈接:Elasticsearch:Painless scripting 編程實踐 (136.la)

最后編輯于
©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容