Elasticsearch: Document

Document

  • Elasticsearch is document-oriented; a document is the smallest unit of searchable data.
  • A Document is comparable to a row in a relational database. Each document is serialized as JSON (JavaScript Object Notation), a ubiquitous data-interchange format, and saved in Elasticsearch. Multiple Documents are stored in one index (Index).
  • Every document has a unique ID.

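Document metadata

  • Metadata: fields that describe the document itself.
  • _index: the name of the index the document belongs to.
  • _type: the name of the type the document belongs to. Types are deprecated as of 7.0, and the typeless APIs use _doc.
  • _id: the document's primary key. It can be supplied at write time; if it is not, the system generates a unique ID automatically.
  • _source: the document's original JSON.
  • _version: the document's version number. Elasticsearch uses it to make sure changes to a document are applied in the correct order, so that out-of-order writes do not lose data.
  • _score: the relevance score.
  • _seq_no: a strictly increasing sequence number, assigned per document write at the Shard level, so a Doc written later always has a larger _seq_no than a Doc written earlier.
  • _primary_term: an integer, like _seq_no. It is incremented by 1 each time the Primary Shard is reassigned, for example after a restart or a Primary election.
  • found: true when the requested ID exists; if the ID is wrong, no data is returned and found is false.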

Generating document IDs

Manually assigned

Scenario: the data is imported from a database or another system and already has a unique primary key.
Usage: PUT /index/_doc/id

PUT /test_index/_doc/1

{
    "test_field": "test"
}

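Automatically generated

Usage: POST /index/_doc (PUT requires an explicit ID, so auto-generation uses POST)

POST /test_index/_doc

{
    "test_field": "test"
}

Characteristics of auto-generated IDs: 20 characters long, URL-safe, Base64-encoded, GUID-style, and generated without conflicts across distributed nodes.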
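
The 20-character length follows from Base64 arithmetic: 15 bytes (120 bits) encode to exactly 20 URL-safe Base64 characters with no padding. The sketch below only illustrates that arithmetic with random bytes. It is not Elasticsearch's own generator, which to my understanding derives the bytes from a timestamp, a sequence number and a node address. The class name AutoIdSketch is hypothetical.

```java
import java.security.SecureRandom;
import java.util.Base64;

public class AutoIdSketch {

    private static final SecureRandom RANDOM = new SecureRandom();

    // Returns a 20-character, URL-safe id built from 15 random bytes.
    // Assumption: random bytes stand in for whatever bytes a real generator would use.
    public static String newId() {
        byte[] bytes = new byte[15];
        RANDOM.nextBytes(bytes);
        // 15 bytes = 120 bits = 20 Base64 characters, so no '=' padding is needed
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }

    public static void main(String[] args) {
        System.out.println(newId());
    }
}
```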

The _source field

Meaning: all the fields and values supplied when the document was indexed. A GET returns them unchanged in the _source field.

Selecting the returned fields: GET /index/_doc/id?_source_includes=field1,field2
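
As a rough illustration of what _source_includes does to the response, the sketch below filters a source map down to a set of field names. It is a simplification under stated assumptions: it matches exact top-level field names only, whereas Elasticsearch also accepts wildcards and nested paths. SourceFilterSketch and filter are hypothetical names, not part of any client API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class SourceFilterSketch {

    // Keeps only the top-level fields named in includes.
    // An empty includes set means "no filtering" and returns a copy of the whole source.
    public static Map<String, Object> filter(Map<String, Object> source, Set<String> includes) {
        if (includes.isEmpty()) {
            return new LinkedHashMap<>(source);
        }
        Map<String, Object> filtered = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            if (includes.contains(entry.getKey())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("user", "tomas");
        source.put("postDate", "2020-10-25");
        source.put("message", "trying out es");

        Set<String> includes = new HashSet<>(Arrays.asList("user", "message"));
        System.out.println(filter(source, includes));
    }
}
```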

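Replacing and deleting documents

Full replacement

Run the following request twice: the version number (_version) in the response increases each time. This is a full replacement.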

PUT /test_index/_doc/1

{
    "test_field": "test"
}

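What actually happens: the old document is not removed immediately. It is only marked as deleted, and the cluster physically removes such documents later, at an appropriate time.

Forced create

To avoid overwriting existing data, a new document can be written with a forced create, which fails instead of overwriting when the ID already exists.
Syntax: PUT /index/_create/id (the 7.x form; the older PUT /index/_doc/id/_create is deprecated)

PUT /test_index/_create/1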

{
    "test_field": "test"
}

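Delete

Syntax: DELETE /index/_doc/id
What actually happens: the old document is not removed immediately. It is only marked as deleted, and the cluster physically removes it later, at an appropriate time (lazy delete).

Partial update

PUT /index/_doc/id replaces the whole document, so the request has to carry every field.
A partial update only sends the fields that changed.

Usage:

POST /index/_update/id

{
    "doc": {
        "field": "value"
    }
}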
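How it works internally

Internally it is the same as a full replacement: the old document is marked as deleted and a new document is created.

Steps:

  • 1. es fetches the old document internally.
  • 2. The fields in the request are merged into the old data (in memory).
  • 3. The old document is marked as deleted.
  • 4. A new document is created.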
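
The four steps above can be sketched with plain maps. This is a toy model, not Elasticsearch code: the Doc class, its fields and partialUpdate are hypothetical, and the merge is shallow, whereas a real partial update also merges nested objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartialUpdateSketch {

    // Hypothetical stand-in for a stored document: source fields, a version and a deleted flag.
    public static final class Doc {
        public final Map<String, Object> source;
        public final long version;
        public boolean deleted;

        public Doc(Map<String, Object> source, long version) {
            this.source = source;
            this.version = version;
        }
    }

    public static Doc partialUpdate(Doc old, Map<String, Object> changes) {
        // Steps 1-2: take the old source and merge the changed fields into a copy, in memory
        Map<String, Object> merged = new LinkedHashMap<>(old.source);
        merged.putAll(changes);
        // Step 3: mark the old document as deleted
        old.deleted = true;
        // Step 4: create the new document with the next version number
        return new Doc(merged, old.version + 1);
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("user", "tomas");
        source.put("message", "trying out es");
        Doc old = new Doc(source, 1);

        Map<String, Object> changes = new LinkedHashMap<>();
        changes.put("user", "tomas Lee");

        Doc updated = partialUpdate(old, changes);
        System.out.println(updated.source + " version=" + updated.version);
    }
}
```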

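Advantages:

  • Far fewer network round trips and much less traffic than fetching the document, changing it on the client and sending it back, which improves performance.
  • A lower probability of concurrency conflicts, because the read-modify-write window is shorter.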

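Updating with scripts

es has built-in scripting for complex operations, for example painless scripts.
Note: groovy scripts were removed in es 6.0 because they consumed a lot of memory and were unsafe (remote code injection vulnerabilities).

Inline scripts

Insert data

PUT /test_index/_doc/6

{
    "num": 0
}

Run the script:

POST /test_index/_update/6

{
    "script": "ctx._source.num+=1"
}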

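External scripts

Painless is built in and enabled by default. A script can be passed to es in several ways, including inline through the REST API or as a stored script. Older versions could also load script files from the config/scripts directory; file scripts were removed in 6.0.

Note: scripts perform poorly and are prone to injection.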

Official documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html

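Concurrency in elasticsearch

As in a flash sale, es runs into concurrency conflicts when many threads write at the same time.

Version control based on _version

es creates, updates and deletes documents based on the _version number.

  • 1. Index the same document several times; the returned version number increases
PUT /test_index/_doc/3
{
  "test_field": "test"
}
  • 2. Delete the document
DELETE /test_index/_doc/3

Response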

{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "3",
  "_version" : 4,
  "result" : "deleted",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 3,
  "_primary_term" : 1
}
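  • 3. Index the document again after the delete

The version number keeps increasing, which confirms the lazy-delete strategy.

If es physically removed a document the moment it was deleted, every shard and replica would have to remove it immediately, which would put too much pressure on the cluster.

Concurrency control for es internal primary-replica sync

Primary-to-replica sync inside es is multi-threaded and asynchronous, and relies on optimistic locking.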
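
Optimistic locking can be sketched as a compare-then-write on a version number: a writer states which version it read, and the write is rejected if the stored version has moved on in the meantime. The class below is a hypothetical in-memory model of that idea, not the es implementation. Recent es versions expose the same pattern to clients through the if_seq_no and if_primary_term request parameters instead of _version.

```java
public class OptimisticLockSketch {

    private long version = 1;
    private String value;

    public OptimisticLockSketch(String initialValue) {
        this.value = initialValue;
    }

    // Applies the write only if the caller saw the current version; returns false on a conflict.
    public synchronized boolean update(long expectedVersion, String newValue) {
        if (expectedVersion != version) {
            return false; // someone else wrote first: the caller must re-read and retry
        }
        value = newValue;
        version++;
        return true;
    }

    public synchronized long version() {
        return version;
    }

    public synchronized String value() {
        return value;
    }

    public static void main(String[] args) {
        OptimisticLockSketch doc = new OptimisticLockSketch("initial");
        long seenByA = doc.version();
        long seenByB = doc.version();
        System.out.println("A wrote: " + doc.update(seenByA, "from A"));
        System.out.println("B wrote: " + doc.update(seenByB, "from B"));
        System.out.println("B retry wrote: " + doc.update(doc.version(), "from B"));
    }
}
```

A rejected writer re-reads the document and retries, which is what retryOnConflict automates for update requests.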

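Managing documents with the Java API

Characteristics of es

  • 1. es is unusual among distributed systems: the code is easy to write, and the hard part is understanding the concepts.
  • 2. The most important part of es is its REST API, which is language-independent. In real production use, querying and analyzing data through the REST API is more convenient.

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.3/java-rest-overview.html

Getting data with a simple Java client

Add the Maven dependencies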
<dependencies>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>7.3.0</version>
    </dependency>
</dependencies>
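Steps
  • 1. Get a client connection
  • 2. Build the request
  • 3. Execute
  • 4. Read the result
import java.io.IOException;

import org.apache.http.HttpHost;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class TestDemo {

    public static void main(String[] args) throws IOException {
        // 1. Get a client connection; try-with-resources closes it when done
        try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
                new HttpHost("localhost", 9200, "http")))) {

            // 2. Build the request
            GetRequest request = new GetRequest("book", "1");

            // 3. Execute
            GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);

            // 4. Read the result
            System.out.println(getResponse.getId());
            System.out.println(getResponse.getVersion());
            System.out.println(getResponse.getSourceAsString());
        }
    }
}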

Testing document queries with spring-boot-starter-test

Add the Maven dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.10.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>7.3.0</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.74</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Configure application.properties
spring.application.name=service-search

# separate multiple nodes with commas
elasticsearch.hostlist=127.0.0.1:9200
Create the configuration class ElasticsearchConfig
@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.hostlist}")
    private String hostlist;

    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient(){
        String[] split = hostlist.split(",");
        HttpHost[] httpHost = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String[] item = split[i].split(":");
            httpHost[i] = new HttpHost(item[0],Integer.parseInt(item[1]),"http");
        }
        return new RestHighLevelClient(RestClient.builder(httpHost));
    }
}
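
The configuration class above assumes every entry is exactly host:port with no surrounding spaces. A slightly more forgiving parser might look like the sketch below, which uses only the standard library. HostListSketch and HostPort are hypothetical names, and falling back to port 9200 when no port is given is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class HostListSketch {

    // Hypothetical holder for one parsed entry.
    public static final class HostPort {
        public final String host;
        public final int port;

        public HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Assumption for this sketch: entries without a port use 9200.
    private static final int DEFAULT_PORT = 9200;

    public static List<HostPort> parse(String hostlist) {
        List<HostPort> hosts = new ArrayList<>();
        for (String item : hostlist.split(",")) {
            String trimmed = item.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas and blank entries
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                hosts.add(new HostPort(trimmed, DEFAULT_PORT));
            } else {
                String host = trimmed.substring(0, colon).trim();
                int port = Integer.parseInt(trimmed.substring(colon + 1).trim());
                hosts.add(new HostPort(host, port));
            }
        }
        return hosts;
    }

    public static void main(String[] args) {
        for (HostPort hp : parse("127.0.0.1:9200, es-node-2:9201 ,es-node-3")) {
            System.out.println(hp.host + " -> " + hp.port);
        }
    }
}
```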

Writing the test code

Simple synchronous query
@SpringBootTest(classes = SearchApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class TestDocument {

    @Autowired
    private RestHighLevelClient client;

    @Test
    public void testGet() throws IOException {
        // 1. Build the request
        GetRequest request = new GetRequest("book","1");

        // =========== Optional parameters; many more can be set ================
        String[] includes = new String[]{"user","message"}; // fields to return
        String[] excludes = Strings.EMPTY_ARRAY; // fields to leave out; an empty array for now
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true,includes,excludes);
        request.fetchSourceContext(fetchSourceContext);

        // 2. Execute
        // synchronous query
        GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);

        // 3. Read the result
        if(getResponse.isExists()){
            System.out.println(getResponse.getId());
            System.out.println(getResponse.getVersion());
            System.out.println(getResponse.getSourceAsString()); // source as a String
            System.out.println(getResponse.getSourceAsBytes()); // source as bytes
            System.out.println(getResponse.getSourceAsMap()); // source as a Map
        }else {
            log.info("No result found");
        }
    }
}
Simple asynchronous query
@SpringBootTest(classes = SearchApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
public class TestDocument {

    @Autowired
    private RestHighLevelClient client;

    @Test
    public void testGet() throws IOException {
        // 1. Build the request
        GetRequest request = new GetRequest("book","1");

        // =========== Optional parameters; many more can be set ================
        String[] includes = new String[]{"user","message"}; // fields to return
        String[] excludes = Strings.EMPTY_ARRAY; // fields to leave out; an empty array for now
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true,includes,excludes);
        request.fetchSourceContext(fetchSourceContext);

        // 2. Execute
        // asynchronous query
        ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
            // called on success
            @Override
            public void onResponse(GetResponse getResponse) {
                System.out.println(getResponse.getId());
                System.out.println(getResponse.getVersion());
                System.out.println(getResponse.getSourceAsString()); // source as a String
            }
            // called on failure
            @Override
            public void onFailure(Exception e) {
                log.error("error",e);
            }
        };
        client.getAsync(request, RequestOptions.DEFAULT,listener);

        // keep the test thread alive long enough for the callback to run
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
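
A fixed Thread.sleep(5000) either wastes time or is too short. One common alternative in tests is to block on a CountDownLatch that the callback releases. The sketch below shows the pattern with the standard library only: Listener and getAsync are hypothetical stand-ins for ActionListener and client.getAsync, so the example runs without an es cluster.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncWaitSketch {

    // Hypothetical callback interface standing in for ActionListener<T>.
    public interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Hypothetical async call: answers on another thread, like client.getAsync(...).
    static void getAsync(String id, Listener<String> listener) {
        new Thread(() -> listener.onResponse("doc-" + id)).start();
    }

    // Waits until the callback fires or the timeout expires, instead of sleeping blindly.
    public static String fetchAndWait(String id, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();

        getAsync(id, new Listener<String>() {
            @Override
            public void onResponse(String response) {
                result.set(response);
                done.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                done.countDown();
            }
        });

        try {
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("timed out waiting for the callback");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(fetchAndWait("1", 2000));
    }
}
```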

Indexing (adding) documents

Four ways to build the document source
  • 1. As a JSON string
IndexRequest request = new IndexRequest("book");
request.id("2");

String json = "{\"user\":\"tomas\",\"postDate\":\"2020-10-25\",\"message\":\"trying out es\"}";
request.source(json, XContentType.JSON);
  • 2. As a Map, which is converted to JSON automatically.
IndexRequest request = new IndexRequest("book");
request.id("2");

Map<String,Object> map = new HashMap<>();
map.put("user","tom");
map.put("postDate","2020-10-25");
map.put("message","trying out es");
request.source(map);
  • 3. As an XContentBuilder object, the Elasticsearch built-in helper for generating JSON content.
IndexRequest request = new IndexRequest("book");
request.id("2");

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user","tomas");
    // the date is already a String, so write it with field(); timeField() expects a date object such as java.util.Date
    builder.field("postDate","2020-10-25");
    builder.field("message","trying out es");
}
builder.endObject();
request.source(builder);
  • 4. As Object key-value pairs, which are converted to JSON.
IndexRequest request = new IndexRequest("book");
request.id("2");

request.source("user","tomas",
        "postDate","2020-10-25",
        "message","trying out es");
Synchronous index
@Test
public void testAdd() throws IOException {
    // 1. Build the request
    IndexRequest request = new IndexRequest("book");
    request.id("3");
    // =============== Four ways to build the document source =======================
    // Method 1
    String json = "{\"user\":\"tomas\",\"postDate\":\"2020-10-25\",\"message\":\"trying out es\"}";
    request.source(json, XContentType.JSON);

    // Method 2
    /*Map<String,Object> map = new HashMap<>();
    map.put("user","tom");
    map.put("postDate","2020-10-25");
    map.put("message","trying out es");
    request.source(map);

    // Method 3
    XContentBuilder builder = XContentFactory.jsonBuilder();
    builder.startObject();
    {
        builder.field("user","tomas");
        builder.field("postDate","2020-10-25");
        builder.field("message","trying out es");
    }
    builder.endObject();
    request.source(builder);

    // Method 4
    request.source("user","tomas",
            "postDate","2020-10-25",
            "message","trying out es");*/

    // ===================== Optional parameters ========================
    // Set the timeout (either form works)
    request.timeout("1s");
    request.timeout(TimeValue.timeValueSeconds(1));

    // Maintain the version number manually
    request.version(2);
    request.versionType(VersionType.EXTERNAL);

    // 2. Execute
    // synchronous call
    IndexResponse response = client.index(request, RequestOptions.DEFAULT);

    // 3. Read the result
    log.info("index: {}",response.getIndex());
    log.info("id: {}",response.getId());
    if(response.getResult() == DocWriteResponse.Result.CREATED){
        log.info("Created, result: {}",response.getResult());
    }else if(response.getResult() == DocWriteResponse.Result.UPDATED){
        log.info("Updated, result: {}",response.getResult());
    }else {
        log.info("Operation failed, result: {}",response.getResult());
    }

    ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
    if(shardInfo.getTotal() != shardInfo.getSuccessful()){
        log.info("Fewer shards succeeded than the total number of shards");
    }

    if(shardInfo.getFailed() > 0){
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            String reason = failure.reason(); // the reason for each failure
            log.info(reason);
        }
    }
}
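
With VersionType.EXTERNAL, es accepts a write only when the supplied version is strictly greater than the stored one, which is why running the test above a second time with version(2) ends in a version conflict. The rule can be sketched as follows. ExternalVersionSketch is a hypothetical in-memory model, and treating 0 as "no document stored yet" is an assumption of the sketch.

```java
public class ExternalVersionSketch {

    // Assumption for this sketch: 0 means that no document has been stored yet.
    private long storedVersion = 0;

    // Accepts the write only if the external version is strictly greater than the stored one.
    public synchronized boolean index(long externalVersion) {
        if (externalVersion <= storedVersion) {
            return false; // version conflict: the stored document is at least as new
        }
        storedVersion = externalVersion;
        return true;
    }

    public synchronized long storedVersion() {
        return storedVersion;
    }

    public static void main(String[] args) {
        ExternalVersionSketch doc = new ExternalVersionSketch();
        System.out.println("version 2 accepted: " + doc.index(2));
        System.out.println("version 2 again accepted: " + doc.index(2));
        System.out.println("version 3 accepted: " + doc.index(3));
    }
}
```
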
Asynchronous index
@Test
public void testAdd() throws IOException {
    // 1. Build the request
    IndexRequest request = new IndexRequest("book");
    request.id("3");
    // =============== Four ways to build the document source =======================
    // Method 1
    String json = "{\"user\":\"tomas\",\"postDate\":\"2020-10-25\",\"message\":\"trying out es\"}";
    request.source(json, XContentType.JSON);

    // Method 2
    /*Map<String,Object> map = new HashMap<>();
    map.put("user","tom");
    map.put("postDate","2020-10-25");
    map.put("message","trying out es");
    request.source(map);

    // Method 3
    XContentBuilder builder = XContentFactory.jsonBuilder();
    builder.startObject();
    {
        builder.field("user","tomas");
        builder.field("postDate","2020-10-25");
        builder.field("message","trying out es");
    }
    builder.endObject();
    request.source(builder);

    // Method 4
    request.source("user","tomas",
            "postDate","2020-10-25",
            "message","trying out es");*/

    // ===================== Optional parameters ========================
    // Set the timeout (either form works)
    request.timeout("1s");
    request.timeout(TimeValue.timeValueSeconds(1));

    // Maintain the version number manually
    request.version(2);
    request.versionType(VersionType.EXTERNAL);

    // 2. Execute

    // asynchronous call
    ActionListener<IndexResponse> actionListener = new ActionListener<IndexResponse>() {
        // called on success
        @Override
        public void onResponse(IndexResponse indexResponse) {
            log.info("index: {}",indexResponse.getIndex());
            log.info("id: {}",indexResponse.getId());
            if(indexResponse.getResult() == DocWriteResponse.Result.CREATED){
                log.info("Created, result: {}",indexResponse.getResult());
            }else if(indexResponse.getResult() == DocWriteResponse.Result.UPDATED){
                log.info("Updated, result: {}",indexResponse.getResult());
            }else {
                log.info("Operation failed, result: {}",indexResponse.getResult());
            }
        }

        // called on failure
        @Override
        public void onFailure(Exception e) {
            log.error("error",e);
        }
    };
    client.indexAsync(request,RequestOptions.DEFAULT,actionListener);

    // keep the test thread alive long enough for the callback to run
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Updating documents

Full replacement: indexing a document with the same ID again is a full replacement.

Synchronous partial update
@Test
public void testUpdate() throws IOException {
    // 1. Build the request
    UpdateRequest request = new UpdateRequest("book","3");
    Map<String,Object> map = new HashMap<>();
    map.put("user","tomas Lee");
    request.doc(map);

    // ========= Optional parameters ==========
    request.timeout("1s");
    request.retryOnConflict(3); // number of retries on a version conflict

    // 2. Execute
    // synchronous call
    UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);

    // 3. Read the result
    log.info("index: {}",updateResponse.getIndex());
    log.info("id: {}",updateResponse.getId());
    if(updateResponse.getResult() == DocWriteResponse.Result.CREATED){
        log.info("Created, result: {}",updateResponse.getResult());
    }else if(updateResponse.getResult() == DocWriteResponse.Result.UPDATED){
        log.info("Updated, result: {}",updateResponse.getResult());
    }else if(updateResponse.getResult() == DocWriteResponse.Result.DELETED){
        log.info("Deleted, result: {}",updateResponse.getResult());
    }else if(updateResponse.getResult() == DocWriteResponse.Result.NOOP){
        log.info("NOOP, nothing changed, result: {}",updateResponse.getResult());
    }
}
Asynchronous partial update
@Test
public void testUpdate() throws IOException {
    // 1. Build the request
    UpdateRequest request = new UpdateRequest("book","3");
    Map<String,Object> map = new HashMap<>();
    map.put("user","tomas Lee");
    request.doc(map);

    // ========= Optional parameters ==========
    request.timeout("1s");
    request.retryOnConflict(3); // number of retries on a version conflict

    // 2. Execute
    // asynchronous call
    ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
        @Override
        public void onResponse(UpdateResponse updateResponse) {
            log.info("index: {}",updateResponse.getIndex());
            log.info("id: {}",updateResponse.getId());
            if(updateResponse.getResult() == DocWriteResponse.Result.CREATED){
                log.info("Created, result: {}",updateResponse.getResult());
            }else if(updateResponse.getResult() == DocWriteResponse.Result.UPDATED){
                log.info("Updated, result: {}",updateResponse.getResult());
            }else if(updateResponse.getResult() == DocWriteResponse.Result.DELETED){
                log.info("Deleted, result: {}",updateResponse.getResult());
            }else if(updateResponse.getResult() == DocWriteResponse.Result.NOOP){
                log.info("NOOP, nothing changed, result: {}",updateResponse.getResult());
            }
        }

        @Override
        public void onFailure(Exception e) {
            log.error("error",e);
        }
    };
    client.updateAsync(request, RequestOptions.DEFAULT,listener);

    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Synchronous document delete

@Test
public void testDelete() throws IOException {
    // 1. Build the request
    DeleteRequest request = new DeleteRequest("book","3");

    // 2. Execute
    DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);

    // 3. Read the result
    log.info("index: {}",deleteResponse.getIndex());
    log.info("id: {}",deleteResponse.getId());
    log.info("result: {}",deleteResponse.getResult());
}

Asynchronous document delete

@Test
public void testDelete() throws IOException {
    // 1. Build the request
    DeleteRequest request = new DeleteRequest("book","3");

    // 2. Execute
    ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
        @Override
        public void onResponse(DeleteResponse deleteResponse) {
            log.info("index: {}",deleteResponse.getIndex());
            log.info("id: {}",deleteResponse.getId());
            log.info("result: {}",deleteResponse.getResult());
        }

        @Override
        public void onFailure(Exception e) {
            log.error("error",e);
        }
    };
    client.deleteAsync(request, RequestOptions.DEFAULT,listener);

    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Bulk create, update and delete (bulk)

@Test
public void testBulk() throws IOException {
    // 1. Build the request
    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.add(new IndexRequest("book").id("2").source(XContentType.JSON,"field","1"));
    bulkRequest.add(new IndexRequest("book").id("3").source(XContentType.JSON,"field","2"));
    bulkRequest.add(new UpdateRequest("book","3").doc(XContentType.JSON,"field","3"));
    bulkRequest.add(new DeleteRequest("book").id("2"));

    // 2. Execute
    BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);

    // 3. Read the result of each item
    for (BulkItemResponse bulkItemResponse : bulkResponse) {
        // a failed item has no response object, so check for failure first
        if (bulkItemResponse.isFailed()) {
            log.error("{} failed: {}",bulkItemResponse.getOpType(),bulkItemResponse.getFailureMessage());
            continue;
        }

        DocWriteResponse response = bulkItemResponse.getResponse();
        log.info("index: {}",response.getIndex());
        log.info("id: {}",response.getId());
        // getOpType() is one of INDEX, CREATE, UPDATE or DELETE
        log.info("{} result: {}",bulkItemResponse.getOpType(),response.getResult());
    }
}

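The unusual JSON format of the bulk api

bulk api syntax

{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n

For comparison, a conventional JSON array would look like this:

[{
  "action": {

  },
  "data": {

  }
}]
  • 1. Each operation in a bulk request may have to be forwarded to a shard on a different node.
  • 2. Suppose a conventional, well-formed JSON array were used.

It allows arbitrary line breaks and is very readable, but once es received a standard JSON string like that it would have to process it as follows:

  • 1) Parse the JSON array into a JSONArray object. At this point the whole payload exists twice in memory: once as JSON text and once as the JSONArray object.
  • 2) Parse each JSON element in the array and route the document of each request.
  • 3) Create a request array for the requests routed to the same shard.
  • 4) Serialize that request array.
  • 5) Send the serialized request array to the corresponding node.

3. More memory and more JVM GC overhead
The recommended bulk size discussed earlier is usually a few thousand documents, at roughly 10MB per request. Now suppose 100 bulk requests arrive at one node, each 10MB: 100 requests x 10MB = 1000MB, about 1GB. If the JSON of every request is copied into a JSONArray object, memory usage doubles to 2GB, and probably more, because building the JSONArray also creates additional data structures.

The extra memory may squeeze the memory available to other requests, such as the all-important search and analytics requests, and their performance can drop sharply.

In addition, higher memory usage means the JVM garbage-collects more often, has more garbage objects to reclaim each time and spends longer doing it, so the es JVM pauses its worker threads for longer.

4. The current unusual format

{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n
  • 1) There is no conversion into a JSON object, so no duplicate copy of the data appears in memory; the JSON is simply split on newlines.
  • 2) For each pair of JSON lines, read the meta and route the document.
  • 3) Send the corresponding JSON straight to the node.

5. The biggest advantage is that the JSON array never has to be parsed into a JSONArray object, which would create a large copy of the data and waste memory. This keeps performance as high as possible.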
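
The newline-based handling described above can be sketched as a single pass over the request body that pairs each action line with the data line that follows it, without building a tree for the whole payload. This is an illustration only, not the es parser: BulkLineSplitSketch and Op are hypothetical names, and the sketch assumes every action is followed by a data line, which holds for index, create and update but not for delete.

```java
import java.util.ArrayList;
import java.util.List;

public class BulkLineSplitSketch {

    // Hypothetical pair of one action/meta line and its data line.
    public static final class Op {
        public final String meta;
        public final String data;

        public Op(String meta, String data) {
            this.meta = meta;
            this.data = data;
        }
    }

    // Walks the body line by line and pairs each meta line with the next data line.
    // Assumption: every action has a data line; a trailing unpaired meta line is ignored.
    public static List<Op> split(String body) {
        List<Op> ops = new ArrayList<>();
        String pendingMeta = null;
        int pos = 0;
        while (pos < body.length()) {
            int newline = body.indexOf('\n', pos);
            if (newline < 0) {
                newline = body.length();
            }
            String line = body.substring(pos, newline);
            pos = newline + 1;
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            if (pendingMeta == null) {
                pendingMeta = line; // first line of a pair: the action and its meta
            } else {
                ops.add(new Op(pendingMeta, line)); // second line: the document data
                pendingMeta = null;
            }
        }
        return ops;
    }

    public static void main(String[] args) {
        String body = "{\"index\":{\"_id\":\"1\"}}\n{\"field\":\"a\"}\n"
                + "{\"index\":{\"_id\":\"2\"}}\n{\"field\":\"b\"}\n";
        for (Op op : split(body)) {
            System.out.println(op.meta + " => " + op.data);
        }
    }
}
```

Only the short meta line of each pair would need to be parsed to pick the target shard; the data line can be forwarded as it is.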

