Elasticsearch:Java RestHighLevelClient 客戶端操作es

摘要:ElasticsearchJava

除了HTTP這種跨語言的訪問方式之外,es支持Java,Python等多種語言的API,整理Java客戶的操作es的相關(guān)代碼

  • 寫入操作:單文檔寫入,批量寫入,單文檔更新,批量更新,有則插入無則更新,條件更新插入
  • 讀取操作:term過濾,range過濾,分頁查詢,游標(biāo)查詢,返回指定字段
  • 刪除操作:刪除單條文檔,條件刪除
  • 工具類代碼: 單例模式

依賴準(zhǔn)備

官方推薦使用高級客戶端RestHighLevelClient,屏蔽底層專注于所有業(yè)務(wù)邏輯,依賴如下,本例采用6.7.2的es

    <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.7.2</version>
        </dependency>

Java客戶端連接和快速開始

先看kibana中一條文檔

    {
        "_index" : "hotel",
        "_type" : "_doc",
        "_id" : "001",
        "_score" : 1.0,
        "_source" : {
          "name" : "a賓館",
          "city" : "上海",
          "price" : 3.13
        }

Java客戶端連接查詢該文檔測試代碼如下

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;


import java.io.IOException;
import java.util.Map;

public class ElasticsearchTest {
    public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        SearchRequest searchRequest = new SearchRequest("hotel");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(QueryBuilders.matchQuery("city", "上海"));
        searchRequest.source(sourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        RestStatus restStatus = searchResponse.status();
        System.out.println(restStatus);
        if (restStatus == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            for (SearchHit searchHit: searchHits) {
                System.out.println("id:" + searchHit.getId());
                System.out.println("index:" + searchHit.getIndex());
                System.out.println("score:" + searchHit.getScore());
                Map<String, Object> map = searchHit.getSourceAsMap();
                System.out.println("name:" + (String) map.get("name"));
                System.out.println("city:" + (String) map.get("city"));
                System.out.println("price:" + (Double) map.get("price"));
            }
        }
        restHighLevelClient.close();
    }
}

idea的終端輸出如下

OK
id:001
index:hotel
score:0.2876821
name:a賓館
city:上海
price:3.13

連接部分使用RestHighLevelClientRestClient,HttpHost實現(xiàn),通HTTP請求連接得到es客戶端,再以此構(gòu)建高階客戶端。搜索部分構(gòu)建一個SearchRequest對象,其中SearchSourceBuilder關(guān)鍵搜索語句DSL,然后restHighLevelClient執(zhí)行search操作得到返回SearchResponse,返回對象調(diào)用getHits得到SearchHits,遍歷SearchHits即可拿到對應(yīng)的文檔的字段數(shù)據(jù),嘴周關(guān)閉客戶端連接


寫入操作

(1)根據(jù)_id單條文檔寫入

單挑文檔寫入需要創(chuàng)建IndexRequest對象,設(shè)置索引名稱,類型名稱,id名稱,以及使用source傳入文檔字段的Map對象,在執(zhí)行寫入時使用客戶端的index方法把IndexRequest傳入即可

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        Map<String, Object> map = new HashMap<String, Object>() {{
           put("name", "b賓館");
           put("city", "揚州");
           put("price", 5.12);
        }};
        IndexRequest indexRequest = new IndexRequest("hotel").type("_doc").id("002").source(map);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);

        restHighLevelClient.close();
    }
(2)批量寫入文檔

批量寫入需要創(chuàng)建BulkRequest,多條數(shù)據(jù)每條創(chuàng)建一個IndexRequest對象設(shè)置index,type,id和數(shù)據(jù)Map,將這些IndexRequest對象條件到BulkRequest中,調(diào)用客戶端的bulk方法執(zhí)行即可

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        List<Map<String, Object>> list = new ArrayList<>();
        list.add(new HashMap<String, Object>() {{
            put("id", "003");
           put("name", "c賓館");
           put("city", "綿竹");
           put("price", 7.54);
        }});
        list.add(new HashMap<String, Object>() {{
            put("id", "004");
            put("name", "d賓館");
            put("city", "石家莊");
            put("price", 17.34);
        }});
        list.add(new HashMap<String, Object>() {{
            put("id", "005");
            put("name", "e賓館");
            put("city", "梧州");
            put("price", 21.92);
        }});

        BulkRequest bulkRequest = new BulkRequest();
        list.forEach(s -> bulkRequest.add(new IndexRequest().index("hotel").type("_doc").id(s.get("id").toString()).source(s)));
        bulkRequest.timeout(TimeValue.timeValueSeconds(5));
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

        restHighLevelClient.close();
    }

(3)更新單條文檔

更新單條文檔需要創(chuàng)建UpdateRequest對象,設(shè)置index,type,id和文檔字段數(shù)據(jù),調(diào)用客戶端的update方法執(zhí)行即可

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        UpdateRequest updateRequest = new UpdateRequest("hotel", "_doc", "001")
                .doc(new HashMap<String, Object>() {{
                    put("name", "h賓館");
                    put("price", 0.0);
                }});
        UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        System.out.println(RequestOptions.DEFAULT);
        restHighLevelClient.close();
    }

對于有則更新無則插入的情況,UpdateRequest在設(shè)置doc之后再設(shè)置以下upsert即可,其他一樣

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        UpdateRequest updateRequest = new UpdateRequest("hotel", "_doc", "008");
        updateRequest.doc(new HashMap<String, Object>() {{
            put("name", "k賓館");
            put("price", 3.0);
        }});
        updateRequest.upsert(new HashMap<String, Object>() {{
            put("name", "k賓館");
            put("price", 3.0);
        }});
        UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        System.out.println(RequestOptions.DEFAULT);
        restHighLevelClient.close();
    }

(4)批量更新文檔

創(chuàng)建BulkRequest對象,填裝UpdateRequest,調(diào)用客戶端的bulk即可

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        BulkRequest bulkRequest = new BulkRequest();

        List<Map<String, Object>> list = new ArrayList<>();
        list.add(new HashMap<String, Object>() {{
            put("id", "003");
            put("name", "o賓館");
            put("city", "綿竹");
            put("price", 7.54);
        }});
        list.add(new HashMap<String, Object>() {{
            put("id", "004");
            put("name", "oo賓館");
            put("city", "石家莊");
            put("price", 17.34);
        }});
        list.add(new HashMap<String, Object>() {{
            put("id", "011");
            put("name", "e賓館");
            put("city", "梧州");
            put("price", 21.92);
        }});
        list.forEach(s -> bulkRequest.add(new UpdateRequest("hotel", "_doc", s.get("id").toString())
        .doc(s).upsert(s)));
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

        restHighLevelClient.close();
    }

(5)根據(jù)條件批量更新文檔

創(chuàng)建UpdateByQueryRequest對象,分別設(shè)置setQuerysetScript,分別代表條件和更新語句,最后客戶端調(diào)用updateByQuery更新

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));


        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest("hotel");
        updateByQueryRequest.setQuery(new TermQueryBuilder("city", "蘇州"));
        updateByQueryRequest.setScript(new Script("ctx._source['city']='杭州'"));
        restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);

        restHighLevelClient.close();
    }

搜索操作

(1)term精確搜索

根據(jù)QueryBuilders.termQuery("_id", "001")來找到指定id的文檔

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        SearchRequest searchRequest = new SearchRequest("hotel");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.termQuery("_id", "001"));
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            System.out.println(searchHits.getAt(0).getSourceAsMap().get("city"));
        }

        restHighLevelClient.close();
    }

(2)range范圍搜索

通過QueryBuilders.rangeQuery("price").gte(10.0).lte(100.0)來設(shè)置range查詢的DSL

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        SearchRequest searchRequest = new SearchRequest("hotel");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("price").gte(10.0).lte(100.0));
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            for (SearchHit searchHit: searchHits) {
                System.out.println(searchHit.getSourceAsMap());
            }
        }

        restHighLevelClient.close();
    }

輸出結(jié)果如下

{city=梧州, price=21.92, name=e賓館, id=005}
{city=石家莊, price=17.34, name=oo賓館, id=004}
{city=梧州, price=21.92, name=e賓館, id=011}

(3)分頁查詢

searchSourceBuilder中設(shè)置fromsize參數(shù)即可

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        SearchRequest searchRequest = new SearchRequest("hotel");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().from(1).size(1).query(QueryBuilders.rangeQuery("price").gte(10.0).lte(100.0));
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            for (SearchHit searchHit: searchHits) {
                System.out.println(searchHit.getSourceAsMap());
            }
        }
        restHighLevelClient.close();
    }

查看運行結(jié)果,相比于不加from,size,輸出了從索引第1個開始后總計1個的結(jié)果,就是第二條

{city=石家莊, price=17.34, name=oo賓館, id=004}

(4)游標(biāo)查詢

這種常用于根據(jù)篩選條件之后抽取全部數(shù)據(jù)的場景,scroll API 可以被用來檢索大量的結(jié)果, 甚至所有的結(jié)果 ,注意es的游標(biāo)查詢的是當(dāng)下時刻的數(shù)據(jù)快照,即在游標(biāo)查詢之后的數(shù)據(jù)的變動不會影響游標(biāo)查詢的結(jié)果,默認游標(biāo)查詢根據(jù)_doc字段進行排序,示例代碼如下

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        SearchRequest searchRequest = new SearchRequest("hotel");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("price").gte(5.0).lte(100.0));
        searchSourceBuilder.size(2);
        searchRequest.source(searchSourceBuilder);
        Scroll scroll = new Scroll(timeValueMillis(1L));
        searchRequest.scroll(scroll);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        SearchHit[] hits = searchResponse.getHits().getHits();
        List<SearchHit> resultSearchHit = new ArrayList<>();
        while (hits != null && hits.length > 0) {
            System.out.println(hits.length);
            System.out.println(scrollId);
            resultSearchHit.addAll(Arrays.asList(hits));
            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
            searchScrollRequest.scroll(scroll);
            SearchResponse searchScrollResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
            scrollId = searchScrollResponse.getScrollId();
            hits = searchScrollResponse.getHits().getHits();
        }
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        restHighLevelClient.close();
//        System.out.println(resultSearchHit);
    }

在搜索條件之后使用searchSourceBuilder.size(2)設(shè)置了每次游標(biāo)只抽取2條數(shù)據(jù),設(shè)置每次游標(biāo)的超時時間是1毫秒timeValueMillis,可以適當(dāng)調(diào)高超時時間防止由于超時還沒查完導(dǎo)致游標(biāo)提前結(jié)束。在執(zhí)行游標(biāo)的時候,第一次使用了客戶端的search方法,從第二次開始使用scroll方法,每開始下一次游標(biāo)的時候都通過查看本次游標(biāo)的結(jié)果是否為空searchResponse.getHits().getHits()來判斷是否還要繼續(xù),把每次游標(biāo)的返回結(jié)果收集起來拿到全部數(shù)據(jù)。代碼最后是釋放游標(biāo)資源,觀察以下打印的結(jié)果

2
DnF1ZXJ5VGhlbkZldGNoBQAAAAAAACkkFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApIRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVBAAAAAAAAKSIWX001eFZqdFNRZE9fM2ZUbUF5S2FlQQAAAAAAACkjFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApJRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVB
2
DnF1ZXJ5VGhlbkZldGNoBQAAAAAAACkkFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApIRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVBAAAAAAAAKSIWX001eFZqdFNRZE9fM2ZUbUF5S2FlQQAAAAAAACkjFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApJRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVB
1
DnF1ZXJ5VGhlbkZldGNoBQAAAAAAACkkFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApIRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVBAAAAAAAAKSIWX001eFZqdFNRZE9fM2ZUbUF5S2FlQQAAAAAAACkjFl9NNXhWanRTUWRPXzNmVG1BeUthZUEAAAAAAAApJRZfTTV4Vmp0U1FkT18zZlRtQXlLYWVB

看到一共查詢了3輪,每輪2條數(shù)據(jù),最后一輪1條,一共5條符合篩選條件的數(shù)據(jù),scrollId在每次執(zhí)行游標(biāo)之后返回的都是相同的,es返回的下一個游標(biāo)id可能是一樣的也可能不一樣,雖然是一樣的但是每次游標(biāo)取到的數(shù)據(jù)是不一樣的


(5)返回指定字段

設(shè)置fetchSource參數(shù),傳入兩個數(shù)組,前一個是包含的字段,后一個是排除的字段

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        SearchRequest searchRequest = new SearchRequest("hotel");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                .query(QueryBuilders.termQuery("_id", "001"))
                .fetchSource(new String[] {"city"}, new String[]{});
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            System.out.println(searchHits.getAt(0));
        }

        restHighLevelClient.close();
    }
(6)排序

排序在SearchSourceBuilder對象后構(gòu)建sort參數(shù),通過SortOrder.DESC倒序列和SortOrder.ASC升序

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        SearchRequest searchRequest = new SearchRequest("hotel");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().sort("price", SortOrder.DESC);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            for (SearchHit searchHit : searchHits) {
                System.out.println(searchHit);
            }
        }

        restHighLevelClient.close();
    }

刪除操作

(1)刪除單條文檔

刪除單條文檔使用DeleteRequest對象,傳入index,type,doc_id,客戶端調(diào)用delete方法即可

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        DeleteRequest deleteByQueryRequest = new DeleteRequest("hotel", "_doc", "005");
        restHighLevelClient.delete(deleteByQueryRequest, RequestOptions.DEFAULT);

        restHighLevelClient.close();
    }
(2)批量刪除文檔

批量刪除使用BulkRequest對象,將DeleteRequest傳入BulkRequest中,最后調(diào)用客戶端的bulk提交即可

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(new DeleteRequest("hotel", "_doc", "004"));
        bulkRequest.add(new DeleteRequest("hotel", "_doc", "003"));
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

        restHighLevelClient.close();
    }
(3)根據(jù)條件刪除文檔

使用deleteByQueryRequest對象,設(shè)置setQuery參數(shù)為條件,客戶端調(diào)用deleteByQuery即可

public static void main(String[] args) throws IOException {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));

        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest("hotel");
        deleteByQueryRequest.setQuery(new BoolQueryBuilder().mustNot(new TermQueryBuilder("city", "梧州")));
        restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);

        restHighLevelClient.close();
    }

代碼示例

構(gòu)建單例模式創(chuàng)建一個es工具類

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ESUtils {
    private RestHighLevelClient client;
    private String index;
    private static ESUtils instance;

    public ESUtils() {
        HttpHost[] httpHosts = {new HttpHost("127.0.0.1", 9200, HttpHost.DEFAULT_SCHEME_NAME)};
        client = new RestHighLevelClient(RestClient.builder(httpHosts));
        index = "hotel";
    }

    public static ESUtils getInstance() {
        if (instance == null) {
            synchronized (ESUtils.class) {
                if (instance == null) {
                    instance = new ESUtils();
                }
            }
        }
        return instance;
    }

    /**
     * 根據(jù)id獲取文檔
     * @param id
     * @return
     */
    public Map<String, Object> getDocByID(String id) {
        Map<String, Object> map = new HashMap<>();
        SearchRequest searchRequest = new SearchRequest(index);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.termQuery("_id", id));
        searchRequest.source(searchSourceBuilder);
        try {
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            if (searchResponse.status() == RestStatus.OK) {
                map = searchResponse.getHits().getAt(0).getSourceAsMap();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return map;
    }

    /**
     * 批量upsert
     * @param data
     */
    public void bulkUpsert(List<Map<String, Object>> data) {
        BulkRequest bulkRequest = new BulkRequest();
        for (Map<String, Object> map : data) {
            String id = map.get("id").toString();
            map.remove("id");
            bulkRequest.add(new UpdateRequest(index, "_doc", id).doc(map).upsert(map));
        }
        try {
            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                System.out.println("bulk失敗:" + bulkResponse.buildFailureMessage());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * terms和range查詢返回指定字段
     * @return
     */
    public List<String> getQuery() {
        List<String> list = new ArrayList<>();
        SearchRequest searchRequest = new SearchRequest(index);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
        .query(QueryBuilders.termsQuery("jingjiang", "hangzhou", "zhangzhou"))
        .query(QueryBuilders.rangeQuery("price").gte(3.12).lte(3.13))
        .fetchSource(new String[] {"name"}, new String[] {});
        searchRequest.source(searchSourceBuilder);
        try {
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            if (searchResponse.status() == RestStatus.OK) {
                searchResponse.getHits().forEach(s -> list.add(s.getSourceAsMap().get("name").toString()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return list;
    }

    public void close() {
        if (client != null) {
            try {
                client.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) {
        Map<String, Object> map = ESUtils.getInstance().getDocByID("011");
        System.out.println(map);
        List<Map<String, Object>> list = new ArrayList<>();
        list.add(new HashMap<String, Object>() {{
            put("id", "001");
            put("degree", "low");
        }});
        list.add(new HashMap<String, Object>() {{
            put("id", "002");
            put("degree", "median");
        }});
        list.add(new HashMap<String, Object>() {{
            put("id", "003");
            put("degree", "high");
        }});
        List<String> list1 = ESUtils.getInstance().getQuery();
        System.out.println(list1);
        ESUtils.getInstance().bulkUpsert(list);
        ESUtils.getInstance().close();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容