Document APIs之 Index API

文檔API Document APIs

這個部分主要描述了以下的CRUD API

一 Single document APIs

1 Index API

index API 允許我們添加某種類型的JSON文檔到特定的index ,并使之可搜索.

生成JSON文檔

生成JSON文檔的方式如下:

  • 手動使用native byte[] or as a String
  • 使用一個可以自動轉(zhuǎn)換為對應(yīng)JSON 的Map
  • 使用第三方庫如 Jackson序列化 beans
  • 使用內(nèi)置的 XContentFactory.jsonBuilder()

實(shí)際上, 每種方法都是轉(zhuǎn)換成byte[] (so a String is converted to a byte[]). 所以如果一個對象已經(jīng)是byte[] 就可直接使用. jsonBuilder是一個高度優(yōu)化的可以直接創(chuàng)建一個byte[]的JSON 生成器.

各種生成JSON文檔方法的具體說明

Manually

注意要根據(jù)日期格式對日期進(jìn)行編碼

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

Using Map

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");

序列化Beans

可以使用 Jackson 進(jìn)行序列化.
要先添加Jackson Databind 到應(yīng)用項目.然后使用ObjectMapper進(jìn)行序列化

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);

Use Elasticsearchhelpers

import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject()

注意 : 你也可以用方法startArray(String)endArray()添加數(shù)組. field 方法可以接受很多類型的對象. 你可以直接傳入 數(shù)據(jù), 日期 甚至其他XContentBuilder 對象.

如果你想查看生產(chǎn)的JSON內(nèi)容,你可以使用方法string()

String json = builder.string();

應(yīng)用實(shí)例

以下演示 在index為twitter ;類型為 tweet; id為1 加入一個JSON文檔

import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

你可以以一個json string的形式 加入json文檔,此時你不需要給出id

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json)
        .get();

IndexResponse 對象會反饋一個報告

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();

更多有關(guān)index的操作,請查看REST index 文檔.

操作線程

當(dāng)操作是在同一個節(jié)點(diǎn)執(zhí)行時,index API 允許我們設(shè)置操作的執(zhí)行方式為線程模式.

另一個方式是可以在不同的線程執(zhí)行這次操作,或者在調(diào)用線程執(zhí)行, 默認(rèn)情況下,
operationThreaded設(shè)置為true 表示操作是在不同的線程執(zhí)行的.

2 Get API

get API 允許我們從index下根據(jù)id獲取某個類型的JSON 文檔. 以下實(shí)例演示了index為 twitter, type 為 tweet, id為1 獲取數(shù)據(jù):

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

更多 get操作,請查看 REST get 文檔.

操作線程

當(dāng)操作是在同一個節(jié)點(diǎn)執(zhí)行時,get API 允許我們設(shè)置操作的執(zhí)行方式為線程模式.

另一個方式是可以在不同的線程執(zhí)行這次操作,或者在調(diào)用線程執(zhí)行, 默認(rèn)情況下,
operationThreaded設(shè)置為true 表示操作是在不同的線程執(zhí)行的. 以下是設(shè)置為false的例子

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

3 Delete API

Delete API 允許我們從index下根據(jù)id刪除某個類型的JSON 文檔. 以下實(shí)例演示了index為 twitter, type 為 tweet, id為1 刪除數(shù)據(jù):

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

更多 get操作,請查看 RESTdelete API文檔.

Delete By Query API

刪除一個查詢出來的數(shù)據(jù)

BulkIndexByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male"))  //   query
.source("persons")  //  index
.get(); //  execute the operation
long deleted = response.getDeleted(); //  number of deleted documents

也可以持續(xù)操作, 如果你想異步執(zhí)行這些操作,你可以調(diào)用execute 而非get 兵器提供如下監(jiān)聽

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //query
 .source("persons") //index
.execute(new ActionListener<BulkIndexByScrollResponse>() { //listener
 @Override
        public void onResponse(BulkIndexByScrollResponse response) {
             long deleted = response.getDeleted(); //number of deleted documents
       }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });

Update API

創(chuàng)建UpdateRequest 并發(fā)送到 client

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

或者 使用prepareUpdate()方法

client.prepareUpdate("ttl", "doc", "1") 
    .setScript(new Script("ctx._source.gender = \"male\"" //  1, ScriptService.ScriptType.INLINE, null, null))
    .get();
client.prepareUpdate("ttl", "doc", "1")
    .setDoc(jsonBuilder()   //2
         .startObject() 
                .field("gender", "male")
         .endObject()) 
   .get();

說明1:你自己的script. 也可以是本地存儲的script文件名稱. 這種情況下你需要使用, you’ll ScriptService.ScriptType.FILE

說明2:被增加的文檔

注意: 你不能同時提供scriptdoc

根據(jù)script 修改

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

Update by merging documents

運(yùn)行傳入部分會被添加進(jìn)已知文檔的文檔

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

Upsert

支持upsert如果文檔不存在 ,會使用upsert元素去增加新文檔

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())

         .upsert(indexRequest); //1

client.update(updateRequest).get();

說明1:
//如果文檔不存在, the one in indexRequest will be added. 文檔內(nèi)容如下:

{
    "name" : "Joe Smith",
    "gender": "male"
}

//如果index/type/1 文檔已存在, 該操作后文檔內(nèi)容如下:

{ "name" : "Joe Dalton",
 "gender": "male" 
}

二 Multi-document APIs

multi get API可以根據(jù)index, type 和 id獲取一系列的文檔數(shù)據(jù)

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")  // get by a single id   
    .add("twitter", "tweet", "2", "3", "4")//or by a list of ids for the same index / type
    .add("another", "type", "foo") //you can also get from another index
    .get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
//iterate over the result set
GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {   // you can check if the document exists        
           String json = response.getSourceAsString(); //access to the _source field
    }
}

更多有關(guān)multi get 操作,請參照REST multi get 文檔.

1 Bulk API

使用bulk API可以在一個請求中索引和刪除幾個文檔 ,使用實(shí)例如下:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

Using Bulk Processor

BulkProcessor提供一個基于請求數(shù)量和大小或者某個特定時間之后的自動刷新批處理操作接口.

使用如下:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client   1,new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                       BulkRequest request) { ... } //2
          @Override
            public void afterBulk(long executionId,
                        BulkRequest request,
                        BulkResponse response) { ... } //3
         @Override
            public void afterBulk(long executionId,
                         BulkRequest request,
                         Throwable failure) { ... }//4

    })
        .setBulkActions(10000)//5
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//6
        .setFlushInterval(TimeValue.timeValueSeconds(5))//7
        .setConcurrentRequests(1) //8
        .setBackoffPolicy(  
           BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//9
        .build();

說明1:Add your elasticsearch client
說明2:bulk執(zhí)行前會調(diào)用這個方法. 例如:你可以通過這個方法使用request.numberOfActions()查看 numberOfActions
說明3:bulk執(zhí)行后會調(diào)用這個方法.例如你可以通過這個方法結(jié)合response.hasFailures() 查看失敗的請求
說明4:當(dāng)bulk執(zhí)行失敗或產(chǎn)生異常的時候會去調(diào)用這個方法
說明5:表示每個bulk要執(zhí)行1000條請求
說明6:表示每到5mb大小的時候執(zhí)行bulk
說明7:不論請求的數(shù)量如何,我們每5s刷新bulk
說明8:設(shè)置當(dāng)前并發(fā)請求數(shù)量,0表示運(yùn)行執(zhí)行單個請求,1表示當(dāng)在累計一個新bulk請求時,一個并發(fā)請求允許被執(zhí)行(不知道在講什么)
Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
說明9:Set a custom backoff policy which will initially wait for 100ms, increase exponentially and retries up to three times. A retry is attempted whenever one or more bulk item requests have failed with an EsRejectedExecutionException which indicates that there were too little compute resources available for processing the request. To disable backoff, pass BackoffPolicy.noBackoff().

默認(rèn)·BulkProcessor·參數(shù)設(shè)置:

sets bulkActions to 1000
sets bulkSize to 5mb
does not set flushInterval
sets concurrentRequests to 1, which means an asynchronous execution of the flush operation.
sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.

Add requests

創(chuàng)建了BulkProcessor后可以往里面添加請求

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

Closing the Bulk Processor

當(dāng)所有文檔都被加載入了BulkProcessor可以使用awaitClose或close方法將它關(guān)閉

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

or

bulkProcessor.close();

兩個方法都 flush任何剩余的文檔,并且使所有通過flushInterval規(guī)定的flushes失效,如果concurrent requests啟動了,awaitClose 方法會等待特定時間直至所有bulk請求完成,然后返回true,如果在這些請求執(zhí)行完成前,設(shè)定的時間已到,那么則返回false.
close不等待剩余請求執(zhí)行完畢,立馬退出.

Using Bulk Processor in tests

使用elasticsearch做測試并且使用BulkProcessor
往dataset添加數(shù)據(jù)時, 你最好把concurrent requests設(shè)置為0
于是bulk的flush operation將會用異步的方式執(zhí)行:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,568評論 19 139
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,058評論 25 709
  • 一、位運(yùn)算符 位運(yùn)算符就是把數(shù)值的二進(jìn)制里面的位上面的0和1來比較或運(yùn)算。0為false,1為true。 &(與)...
    凱哥學(xué)堂閱讀 288評論 0 2
  • 《好好說話》 說話也是一門學(xué)問,需要我們用心琢磨。 01.在對方時間很緊張的情況下,一定要學(xué)會在短時間內(nèi)把話說清楚...
    環(huán)盈閱讀 165評論 0 0
  • 20161206童磊感恩日記 【原來人的性格是可以改變的】 今天早上我一下火車就直接地鐵前往通州,參加12月卓越父...
    童磊心理閱讀 561評論 0 0

友情鏈接更多精彩內(nèi)容