6.Maven項目中使用Spark 2.x讀寫Elasticsearch中的數(shù)據(jù)

一、使用spark寫數(shù)據(jù)到Elasticsearch中
  • 連接spark,配置es(前提是maven中所需的基本Spark環(huán)境都配置好了,否則還需要添加spark-core的依賴等)
    注:如果是在聯(lián)網(wǎng)的環(huán)境下,在項目的配置文件pom.xml文件中添加依賴,這里我安裝的Elasticsearch版本為7.6.2,這里對應(yīng)自己Elasticsearch的版本,相關(guān)依賴的版本可以在官網(wǎng)上查看。
<dependency>
    <groupId>org.apche.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apche.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apche.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>16.0.1</version>
</dependency>

如果是在脫機狀態(tài)下,則還需要自己先根據(jù)自己Spark版本和Elasticsearch版本下載好對應(yīng)的包,然后放入到C:\用戶\用戶名.m2\repository\中對應(yīng)文件夾下,然后強制更新maven項目。

import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import scala.Tuple2;
import java.util.Map
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaPairRDD;

// es.node默認(rèn)是localhost,如果你的es集群不在本地,把ip換成你集群中master節(jié)點所在機器的ip地址
//es.port是設(shè)置es集群中master節(jié)點的端口,如果沒有進行更改,就是9200,不知道可以去服務(wù)器上安裝路徑下查看config/elasticsearch.yml文件中http.port參數(shù)的值
SparkSession session=SparkSession.builder().master("local[*]")
                   .appName("yourappName")
                   .config("es.node","ip")
                   .config("es.port","9200")
                   .config("es.resource","index/type")
                   .config("es.index.auto.create",true)
//                 .config("es.mapping.id","id")
                   .config("es.nodes.wan.only", true)
                   .getOrCreate();
session.sparkContext().setLogLevel("WARN");
JavaSparkContext jsc=new JavaSparkContext(session.sparkContext());

其他參數(shù)配置可查看官方文檔

  • JavaEsSpark中有兩個寫入方法:saveToEs和saveJsonToEs
    • JavaEsSpark.saveToEs(要寫入的數(shù)據(jù)是Map格式)
    • JavaEsSpark.saveJsonToEs(要寫入的數(shù)據(jù)是Json格式,這里使用字符串拼接的方式,其實可以用例如阿里的fastjson包的JSON對象,方便,出錯概率也少)
  • 使用saveJsonToEs寫入數(shù)據(jù)到ES
    1.上傳的樣例數(shù)據(jù)
    待上傳數(shù)據(jù).png
    2.spark代碼,這里也可以引入com.alibaba.fastjson.JSONObject包,將需要寫入的數(shù)據(jù)轉(zhuǎn)換成json格式,避免字符串拼接時出現(xiàn)出錯。
JavaRDD<String> readRDD=jsc.textFile("file:///E:/data/user");       
JavaRDD<String> writeRDD=readRDD.map(new Function<String, String>() {
    /**
     * 將文本數(shù)據(jù)寫入到ES索引中
    */
    private static final long serialVersionUID = 1L;
    public String call(String v1) throws Exception {
        // TODO Auto-generated method stub
        String[] info=v1.split("\t",-1);
        String fournum;
        fournum="{\"name\":\""+info[0]+"\",\"sex\":\""+info[1]+"\",\"age\":\""+info[2]+"\",\"num\":\""+info[3]+"\"}";
        return fournum;
    }
            
});
JavaEsSpark.saveJsonToEs(writeRDD,"test2/four");
  • 使用saveToEs將本地文本文件寫入到Elasticsearch中
//先讀取文本數(shù)據(jù)到JavaRDD中,然后將String轉(zhuǎn)換成Map格式,id自動生成
JavaRDD<String> readRDD=jsc.textFile("file:///E:/data/user");       
JavaRDD<Map<String, Object>> writeRDD=readRDD.map(new Function<String, Map<String,Object>>() {
    public Map<String, Object> call(String v1) throws Exception {
        String[] info=v1.split("    ");
        Map<String,Object> users=ImmutableMap.of("name",info[0],"sex",info[1],"age",info[2],"num",info[3]);         
        return users;
    }
});
//需要數(shù)據(jù)是Map格式
JavaEsSpark.saveToEs(writeRDD, "users/admin");

上傳結(jié)果可以在安裝的可視化工具中查看,也可用curl查看。這里在head插件里可以看到。

head插件中查看上傳后的數(shù)據(jù).png

注:
1.這里的索引是自動創(chuàng)建的。在插入數(shù)據(jù)前先看es中是否存在同名的索引,如果有如果結(jié)構(gòu)一樣,就不需要再自動創(chuàng)建,如果存在的不需要,可以使用curl命令刪除:curl -XDELETE "http://localhost:9200/index"。也可以在es中先根據(jù)需求創(chuàng)建好索引,然后直接插入數(shù)據(jù)。
2.插入數(shù)據(jù)時自動生成(如果想指定id,在spark中配置es.mapping.id那一句注釋刪掉)
3.分片數(shù)和備份數(shù)都是1,這個是可以隨時修改更新(可使用官方給的kibana工具,官網(wǎng)上有資料,方便快捷)的。
另外還有很多問題后面深入研究,例如:如何在繼續(xù)往該索引中插入數(shù)據(jù)時,保證同一內(nèi)容的數(shù)據(jù)不重復(fù)生成id插入


二、查詢Elasticsearch中的數(shù)據(jù)
  • 在所有字段上查詢符合條件的數(shù)據(jù),例如前面已經(jīng)導(dǎo)入的數(shù)據(jù):test索引中user類的數(shù)據(jù)。
    注:如果只是查詢已經(jīng)在Elasticsearch中的數(shù)據(jù),spark創(chuàng)建連接時,可以不用配置:“es.resource”參數(shù),把“es.index.auto.create”參數(shù)設(shè)置為false
//查詢數(shù)據(jù)中包含25的記錄,網(wǎng)頁上的請求地址:http://masterIP:9200/users/admin/_search/?q="25"&pretty
// 如果要查詢索引中的所有數(shù)據(jù),可以不傳入第三個參數(shù):"?q=\"25\""
JavaPairRDD<String, Map<String, Object>> searchRDD=JavaEsSpark.esRDD(jsc,"users/admin","?q=\"25\"");
searchRDD.foreach(new VoidFunction<Tuple2<String,Map<String,Object>>>() {
    /**
     * searchRDD中的第一個string的值是每條記錄的在Elasticsearch索引中的id值
     * 使用http://masterIP:9200/users/admin/id值?pretty 也可查看數(shù)據(jù)
     */
    private static final long serialVersionUID = 1L;

    public void call(Tuple2<String, Map<String, Object>> t) throws Exception {
        // TODO Auto-generated method stub
        System.out.println(t._1+"---------------"+t._2);
    }
});
通過http請求查詢符合條件的數(shù)據(jù).png
通過id值查看數(shù)據(jù)內(nèi)容.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容