Document APIs
This section describes the following CRUD APIs:
I. Single document APIs
1 Index API
The index API allows you to add a typed JSON document into a specific index, making it searchable.
Generating JSON documents
There are several ways to generate a JSON document:
- Manually, using a native byte[] or a String
- Using a Map that will be automatically converted to its JSON equivalent
- Using a third-party library such as Jackson to serialize your beans
- Using the built-in helper XContentFactory.jsonBuilder()
Internally, each method is converted to byte[] (so a String is converted to a byte[]). Therefore, if the object is already a byte[], you can use it directly. The jsonBuilder is a highly optimized JSON generator that constructs a byte[] directly.
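Since every path ends in a byte[], the String case amounts to a plain UTF-8 encoding. A minimal JDK-only sketch of that round trip (no Elasticsearch classes involved):

```java
import java.nio.charset.StandardCharsets;

String json = "{\"user\":\"kimchy\"}";
// what "a String is converted to a byte[]" means in practice
byte[] source = json.getBytes(StandardCharsets.UTF_8);
// the conversion is lossless for any valid string
String back = new String(source, StandardCharsets.UTF_8);
```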
Each of these methods is detailed below.
Manually
Note that you need to encode dates according to the date format:
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
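One way to get the date encoding right is to format the date with the JDK's SimpleDateFormat before splicing it into the string. A sketch, assuming the yyyy-MM-dd pattern used in the example above (your mapping may expect a different format):

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// format the date to match the expected date format in the mapping
SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
String postDate = fmt.format(new Date());
String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"" + postDate + "\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
```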
Using a Map
Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
Serializing your beans
You can use Jackson to serialize your beans to JSON. Add the Jackson Databind dependency to your project, then use the ObjectMapper to serialize your beans:
import com.fasterxml.jackson.databind.*;
// instantiate a JSON mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
Using Elasticsearch helpers
import static org.elasticsearch.common.xcontent.XContentFactory.*;
XContentBuilder builder = jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject();
Note: you can also add arrays with the startArray(String) and endArray() methods. The field method accepts many object types; you can directly pass numbers, dates, and even other XContentBuilder objects.
If you need to see the generated JSON content, you can use the string() method:
String json = builder.string();
Example
The following example indexes a JSON document into an index called twitter, under a type called tweet, with id 1:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
.get();
You can also index your document as a JSON string; in that case you don't have to give an id:
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
IndexResponse response = client.prepareIndex("twitter", "tweet")
.setSource(json)
.get();
The IndexResponse object gives you a report:
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status of the operation (e.g. CREATED)
RestStatus status = response.status();
For more information on the index operation, check out the REST index docs.
Operation Threading
The index API allows you to set the threading model the operation will be performed on when the actual execution of the API happens on the same node.
The options are to execute the operation on a different thread, or to execute it on the calling thread. By default,
operationThreaded is set to true, which means the operation is executed on a different thread.
2 Get API
The get API allows you to get a typed JSON document from an index by id. The following example gets the document with index twitter, type tweet, and id 1:
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
For more information on the get operation, check out the REST get docs.
Operation Threading
The get API allows you to set the threading model the operation will be performed on when the actual execution of the API happens on the same node.
The options are to execute the operation on a different thread, or to execute it on the calling thread. By default,
operationThreaded is set to true, which means the operation is executed on a different thread. Here is an example of setting it to false:
GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
3 Delete API
The delete API allows you to delete a typed JSON document from an index by id. The following example deletes the document with index twitter, type tweet, and id 1:
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
For more information on the delete operation, check out the REST delete docs.
Delete By Query API
The delete-by-query API deletes all documents matching a given query:
BulkIndexByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) // query
.source("persons") // index
.get(); // execute the operation
long deleted = response.getDeleted(); // number of deleted documents
As it can be a long-running operation, if you wish to execute it asynchronously, you can call execute instead of get and provide a listener like this:
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //query
.source("persons") //index
.execute(new ActionListener<BulkIndexByScrollResponse>() { //listener
@Override
public void onResponse(BulkIndexByScrollResponse response) {
long deleted = response.getDeleted(); //number of deleted documents
}
@Override
public void onFailure(Exception e) {
// Handle the exception
}
});
Update API
You can either create an UpdateRequest and send it to the client:
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
Or you can use the prepareUpdate() method:
client.prepareUpdate("ttl", "doc", "1")
.setScript(new Script("ctx._source.gender = \"male\"", ScriptService.ScriptType.INLINE, null, null)) // 1
.get();
client.prepareUpdate("ttl", "doc", "1")
.setDoc(jsonBuilder() //2
.startObject()
.field("gender", "male")
.endObject())
.get();
Note 1: Your own script. It could also be a locally stored script name; in that case, you'll need to use ScriptService.ScriptType.FILE.
Note 2: The document which will be merged into the existing one.
Note that you can't provide both script and doc.
Update by script
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
.script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();
Update by merging documents
The update API also allows passing a partial document, which will be merged into the existing document:
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();
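The merge semantics can be pictured with plain maps: fields from the partial document are added to the existing source, and keys that already exist are overwritten. A simplified JDK-only illustration, not how Elasticsearch applies the merge internally:

```java
import java.util.HashMap;
import java.util.Map;

// the _source of the stored document
Map<String, Object> existing = new HashMap<>();
existing.put("name", "Joe Dalton");

// the partial document passed to doc(...)
Map<String, Object> partial = new HashMap<>();
partial.put("gender", "male");

// merge: new fields are added, overlapping fields are overwritten
existing.putAll(partial);
```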
Upsert
Upsert is also supported. If the document does not exist, the content of the upsert element will be used to index the fresh document:
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
.source(jsonBuilder()
.startObject()
.field("name", "Joe Smith")
.field("gender", "male")
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.upsert(indexRequest); //1
client.update(updateRequest).get();
Note 1:
// If the document does not exist, the one in indexRequest will be added. The document will look like:
{
"name" : "Joe Smith",
"gender": "male"
}
// If the document index/type/1 already exists, after this operation the document will be:
{
"name" : "Joe Dalton",
"gender": "male"
}
II. Multi-document APIs
1 Multi Get API
The multi get API allows you to get a list of documents based on their index, type and id:
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("twitter", "tweet", "1") // get by a single id
.add("twitter", "tweet", "2", "3", "4")//or by a list of ids for the same index / type
.add("another", "type", "foo") //you can also get from another index
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
//iterate over the result set
GetResponse response = itemResponse.getResponse();
if (response.isExists()) { // you can check if the document exists
String json = response.getSourceAsString(); //access to the _source field
}
}
For more information on the multi get operation, check out the REST multi get docs.
2 Bulk API
The bulk API allows you to index and delete several documents in a single request. Here is a sample usage:
import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
}
Using Bulk Processor
The BulkProcessor class offers a simple interface to flush bulk operations automatically, based on the number or size of the requests, or after a given period.
To use it, first create a BulkProcessor instance:
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
client, // 1
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... } //2
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... } //3
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... }//4
})
.setBulkActions(10000)//5
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//6
.setFlushInterval(TimeValue.timeValueSeconds(5))//7
.setConcurrentRequests(1) //8
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//9
.build();
Note 1: Add your Elasticsearch client.
Note 2: This method is called just before bulk execution. You can, for example, see the numberOfActions with request.numberOfActions().
Note 3: This method is called after bulk execution. You can, for example, check whether there were some failing requests with response.hasFailures().
Note 4: This method is called when the bulk failed and raised a Throwable.
Note 5: We want to execute the bulk every 10,000 requests.
Note 6: We want to flush the bulk every 5mb.
Note 7: We want to flush the bulk every 5 seconds, whatever the number of requests.
Note 8: Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
Note 9: Set a custom backoff policy which will initially wait for 100ms, increase exponentially and retries up to three times. A retry is attempted whenever one or more bulk item requests have failed with an EsRejectedExecutionException which indicates that there were too little compute resources available for processing the request. To disable backoff, pass BackoffPolicy.noBackoff().
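The exact delay progression is an implementation detail of BackoffPolicy, but a doubling schedule gives the general shape of an exponential policy. A JDK-only sketch with an assumed doubling factor, using the 100ms / 3 retries values from the example; the real policy's steps may differ:

```java
long initialDelayMillis = 100;
int maxRetries = 3;
long[] delays = new long[maxRetries];
long total = 0;
for (int i = 0; i < maxRetries; i++) {
    // each retry waits twice as long as the previous one: 100, 200, 400
    delays[i] = initialDelayMillis << i;
    total += delays[i];
}
```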
By default, BulkProcessor:
sets bulkActions to 1000
sets bulkSize to 5mb
does not set flushInterval
sets concurrentRequests to 1, which means an asynchronous execution of the flush operation.
sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.
Add requests
Once the BulkProcessor is created, you can add your requests to it:
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
Closing the Bulk Processor
When all documents are loaded into the BulkProcessor, it can be closed using awaitClose or close:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
or
bulkProcessor.close();
Both methods flush any remaining documents and disable all other scheduled flushes (those scheduled by setting flushInterval). If concurrent requests were enabled, the awaitClose method waits for up to the specified timeout for all bulk requests to complete, then returns true; if the specified waiting time elapses before all bulk requests complete, it returns false.
The close method doesn't wait for any remaining bulk requests to complete and exits immediately.
Using Bulk Processor in tests
If you are running tests with Elasticsearch and are using the BulkProcessor to populate your dataset,
you should set the number of concurrent requests to 0,
so that the flush operation of the bulk will be executed in a synchronous manner:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();
// Add your requests
bulkProcessor.add(/* Your requests */);
// Flush any remaining requests
bulkProcessor.flush();
// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();
// Refresh your indices
client.admin().indices().prepareRefresh().get();
// Now you can start searching!
client.prepareSearch().get();