ES 導(dǎo)入 導(dǎo)出
java代碼
導(dǎo)入
public class Input {
/**
* 文件保存路徑
*/
private static final String filePath = "F:\\數(shù)據(jù)文件\\p1\\p1_10.json";
/**
* 索引名稱
*/
private static final String indexName = "dwd-p1";
/**
* 類型名稱
*/
private static final String typeName = "dwdata";
public static void main(String[] args) throws UnknownHostException {
Settings settings = Settings.settingsBuilder().put("cluster.name", "hinge-es").build();
TransportClient client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.2"), 9300));
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader(new File(filePath)));
String json;
int count = 0;
long total = 0;
BulkRequestBuilder bulkRequest = client.prepareBulk();
while ((json = br.readLine()) != null) {
count++;
total++;
bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(json));
if (count == 500) {
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
System.err.println("############ 出錯了!!!??!");
}
bulkRequest = client.prepareBulk();
count = 0;
System.out.println("已經(jīng)導(dǎo)入:" + total);
}
}
if (count != 0) {
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
System.err.println("############ 出錯了!?。。?!");
}
System.out.println("已經(jīng)導(dǎo)入:" + total);
}
System.out.println("導(dǎo)入結(jié)束");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
導(dǎo)出
public class Output {
/**
* 一批獲取數(shù)據(jù)
*/
private static final int BATCH_SIZE = 10000;
/**
* 文件記錄數(shù)
*/
private static final int FILE_RECORD = 300000;
/**
* 文件保存路徑
*/
private static final String filePath = "F:\\數(shù)據(jù)文件\\dwd-p1\\";
/**
* 索引名稱
*/
private static final String indexName = "dwd-p1";
/**
* 類型名稱
*/
private static final String typeName = "dwdata";
public static void main(String[] args) throws UnknownHostException {
Settings settings = Settings.settingsBuilder().put("cluster.name", "hinge-es").build();
TransportClient client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.2"), 9300));
SearchResponse scrollResp = client.prepareSearch(indexName).setTypes(typeName).setQuery(QueryBuilders.matchAllQuery()).setSize(BATCH_SIZE).setScroll(new TimeValue(600000))
.execute().actionGet();
int fileIndex = 0;
String outputFile = getFileName(fileIndex);
BufferedWriter out = null;
long totalSize = 0;
try {
out = new BufferedWriter(new FileWriter(outputFile));
while (true) {
for (SearchHit hit : scrollResp.getHits().getHits()) {
totalSize++;
out.write(hit.getId());// ID
out.write("\r\n");
out.write(hit.getSourceAsString());// 數(shù)據(jù)
out.write("\r\n");
out.flush();
}
System.out.println("已經(jīng)導(dǎo)出:" + totalSize);
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
if (totalSize > FILE_RECORD) {
out.flush();
out.close();
out = null;
System.out.println(outputFile);
System.out.println("導(dǎo)出結(jié)束,總共導(dǎo)出數(shù)據(jù):" + totalSize);
totalSize = 0;
fileIndex++;
outputFile = getFileName(fileIndex);
out = new BufferedWriter(new FileWriter(outputFile));
}
}
System.out.println(outputFile);
System.out.println("導(dǎo)出結(jié)束,總共導(dǎo)出數(shù)據(jù):" + totalSize);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (out != null) {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
System.out.println("查詢結(jié)束");
}
private static String getFileName(int fileIndex) {
String outputFile = filePath + indexName + "#" + typeName + "_" + fileIndex + ".json";
return outputFile;
}
}