一、使用spark寫數(shù)據(jù)到Elasticsearch中
- 連接spark,配置es(前提是maven中所需的基本Spark環(huán)境都配置好了,否則還需要添加spark-core的依賴等)
注:如果是在聯(lián)網(wǎng)的環(huán)境下,在項目的配置文件pom.xml文件中添加依賴,這里我安裝的Elasticsearch版本為7.6.2,這里對應(yīng)自己Elasticsearch的版本,相關(guān)依賴的版本可以在官網(wǎng)上查看。
<dependency>
<groupId>org.apche.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apche.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apche.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>7.6.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
如果是在脫機狀態(tài)下,則還需要自己先根據(jù)自己Spark版本和Elasticsearch版本下載好對應(yīng)的包,然后放入到C:\用戶\用戶名.m2\repository\中對應(yīng)文件夾下,然后強制更新maven項目。
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2;
import java.util.Map
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaPairRDD;
// es.node默認(rèn)是localhost,如果你的es集群不在本地,把ip換成你集群中master節(jié)點所在機器的ip地址
//es.port是設(shè)置es集群中master節(jié)點的端口,如果沒有進行更改,就是9200,不知道可以去服務(wù)器上安裝路徑下查看config/elasticsearch.yml文件中http.port參數(shù)的值
SparkSession session=SparkSession.builder().master("local[*]")
.appName("yourappName")
.config("es.node","ip")
.config("es.port","9200")
.config("es.resource","index/type")
.config("es.index.auto.create",true)
// .config("es.mapping.id","id")
.config("es.nodes.wan.only", true)
.getOrCreate();
session.sparkContext().setLogLevel("WARN");
JavaSparkContext jsc=new JavaSparkContext(session.sparkContext());
其他參數(shù)配置可查看官方文檔
- JavaEsSpark中有兩個寫入方法:saveToEs和saveJsonToEs
- JavaEsSpark.saveToEs(要寫入的數(shù)據(jù)是Map格式)
- JavaEsSpark.saveJsonToEs(要寫入的數(shù)據(jù)是Json格式,這里使用字符串拼接的方式,其實可以用例如阿里的fastjson包的JSON對象,方便,出錯概率也少)
- 使用saveJsonToEs寫入數(shù)據(jù)到ES
1.上傳的樣例數(shù)據(jù)2.spark代碼,這里也可以引入待上傳數(shù)據(jù).pngcom.alibaba.fastjson.JSONObject包,將需要寫入的數(shù)據(jù)轉(zhuǎn)換成json格式,避免字符串拼接時出現(xiàn)出錯。
JavaRDD<String> readRDD=jsc.textFile("file:///E:/data/user");
JavaRDD<String> writeRDD=readRDD.map(new Function<String, String>() {
/**
* 將文本數(shù)據(jù)寫入到ES索引中
*/
private static final long serialVersionUID = 1L;
public String call(String v1) throws Exception {
// TODO Auto-generated method stub
String[] info=v1.split("\t",-1);
String fournum;
fournum="{\"name\":\""+info[0]+"\",\"sex\":\""+info[1]+"\",\"age\":\""+info[2]+"\",\"num\":\""+info[3]+"\"}";
return fournum;
}
});
JavaEsSpark.saveJsonToEs(writeRDD,"test2/four");
- 使用saveToEs將本地文本文件寫入到Elasticsearch中
//先讀取文本數(shù)據(jù)到JavaRDD中,然后將String轉(zhuǎn)換成Map格式,id自動生成
JavaRDD<String> readRDD=jsc.textFile("file:///E:/data/user");
JavaRDD<Map<String, Object>> writeRDD=readRDD.map(new Function<String, Map<String,Object>>() {
public Map<String, Object> call(String v1) throws Exception {
String[] info=v1.split(" ");
Map<String,Object> users=ImmutableMap.of("name",info[0],"sex",info[1],"age",info[2],"num",info[3]);
return users;
}
});
//需要數(shù)據(jù)是Map格式
JavaEsSpark.saveToEs(writeRDD, "users/admin");
上傳結(jié)果可以在安裝的可視化工具中查看,也可用curl查看。這里在head插件里可以看到。

head插件中查看上傳后的數(shù)據(jù).png
注:
1.這里的索引是自動創(chuàng)建的。在插入數(shù)據(jù)前先看es中是否存在同名的索引,如果有如果結(jié)構(gòu)一樣,就不需要再自動創(chuàng)建,如果存在的不需要,可以使用curl命令刪除:
curl -XDELETE "http://localhost:9200/index"。也可以在es中先根據(jù)需求創(chuàng)建好索引,然后直接插入數(shù)據(jù)。2.插入數(shù)據(jù)時自動生成(如果想指定id,在spark中配置
es.mapping.id那一句注釋刪掉)3.分片數(shù)和備份數(shù)都是1,這個是可以隨時修改更新(可使用官方給的kibana工具,官網(wǎng)上有資料,方便快捷)的。
另外還有很多問題后面深入研究,例如:如何在繼續(xù)往該索引中插入數(shù)據(jù)時,保證同一內(nèi)容的數(shù)據(jù)不重復(fù)生成id插入
二、查詢Elasticsearch中的數(shù)據(jù)
- 在所有字段上查詢符合條件的數(shù)據(jù),例如前面已經(jīng)導(dǎo)入的數(shù)據(jù):test索引中user類的數(shù)據(jù)。
注:如果只是查詢已經(jīng)在Elasticsearch中的數(shù)據(jù),spark創(chuàng)建連接時,可以不用配置:“es.resource”參數(shù),把“es.index.auto.create”參數(shù)設(shè)置為false
//查詢數(shù)據(jù)中包含25的記錄,網(wǎng)頁上的請求地址:http://masterIP:9200/users/admin/_search/?q="25"&pretty
// 如果要查詢索引中的所有數(shù)據(jù),可以不傳入第三個參數(shù):"?q=\"25\""
JavaPairRDD<String, Map<String, Object>> searchRDD=JavaEsSpark.esRDD(jsc,"users/admin","?q=\"25\"");
searchRDD.foreach(new VoidFunction<Tuple2<String,Map<String,Object>>>() {
/**
* searchRDD中的第一個string的值是每條記錄的在Elasticsearch索引中的id值
* 使用http://masterIP:9200/users/admin/id值?pretty 也可查看數(shù)據(jù)
*/
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Map<String, Object>> t) throws Exception {
// TODO Auto-generated method stub
System.out.println(t._1+"---------------"+t._2);
}
});

通過http請求查詢符合條件的數(shù)據(jù).png

通過id值查看數(shù)據(jù)內(nèi)容.png
