elasticsearch-spark用法

Hadoop允許Elasticsearch在Spark中以兩種方式使用:通過自2.1以來的原生RDD支持,或者通過自2.0以來的Map/Reduce橋接器。從5.0版本開始,elasticsearch-hadoop就支持Spark 2.0。目前spark支持的數(shù)據(jù)源有:
(1)文件系統(tǒng):LocalFS、HDFS、Hive、text、parquet、orc、json、csv
(2)數(shù)據(jù)RDBMS:mysql、oracle、mssql
(3)NOSQL數(shù)據(jù)庫:HBase、ES、Redis
(4)消息對象:Redis

elasticsearch相對hdfs來說,容易搭建、并且有可視化kibana支持,非常方便spark的初學入門,本文主要講解用elasticsearch-spark的入門。

image.png

一、原生RDD支持

1.1 基礎配置

相關庫引入:

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-30_2.13</artifactId>
            <version>8.1.3</version>
        </dependency>

SparkConf配置,更多詳細的請點擊這里或者源碼ConfigurationOptions。

public static SparkConf getSparkConf() {
    SparkConf sparkConf = new SparkConf().setAppName("elasticsearch-spark-demo");
    sparkConf.set("es.nodes", "host")
            .set("es.port", "xxxxxx")
            .set("es.nodes.wan.only", "true")
            .set("es.net.http.auth.user", "elxxxxastic")
            .set("es.net.http.auth.pass", "xxxx")
            .setMaster("local[*]");
    return sparkConf;
}

1.2 讀取es數(shù)據(jù)

這里用的是kibana提供的sample data里面的索引kibana_sample_data_ecommerce,也可以替換成自己的索引。

public static void main(String[] args) {
    SparkConf conf = getSparkConf();
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {

        JavaPairRDD<String, Map<String, Object>> esRDD =
                JavaEsSpark.esRDD(jsc, "kibana_sample_data_ecommerce");
        esRDD.collect().forEach(System.out::println);
    }
}

esRDD同時也支持query語句esRDD(final JavaSparkContext jsc, final String resource, final String query),一般對es的查詢都需要根據(jù)時間篩選一下,不過相對于es的官方sdk,并沒有那么友好的api,只能直接使用原生的dsl語句。

1.3 寫數(shù)據(jù)

支持序列化對象、json,并且能夠使用占位符動態(tài)索引寫入數(shù)據(jù)(使用較少),不過多介紹了。

public static void jsonWrite(){
    String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
    String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";
    JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
    JavaEsSpark.saveJsonToEs(stringRDD, "spark-json");
}

比較常用的讀寫也就這些,更多可以看下官網相關介紹。

二、Spark Streaming

spark的實時處理,es5.0的時候開始支持,Spark Streaming中的DStream編程接口是RDD,我們需要對RDD進行處理,處理起來較為費勁且不美觀。

在spark streaming中,如果我們需要修改流程序的代碼,在修改代碼重新提交任務時,是不能從checkpoint中恢復數(shù)據(jù)的(程序就跑不起來),是因為spark不認識修改后的程序了。

public class EsSparkStreaming extends EsBaseConfig {
    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        SparkConf conf = getSparkConf();
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Seconds.apply(1));

        Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
        Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

        JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
        Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
        microbatches.add(javaRDD);
        JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);

        JavaEsSparkStreaming.saveToEs(javaDStream, "spark-streaming");

        jssc.start();
    }
}

這里沒有執(zhí)行awaitTermination,執(zhí)行代碼后沒有卡住,即可在es上查看

image.png

三、Spark SQL

elasticsearch-hadoop也提供了spark sql的插件,換言之,elasticsearch變成了Spark SQL的原生數(shù)據(jù)源,可以通過Spark SQL顯示調用,下面的例子將kibana_sample_data_ecommerce索引讀取,然后轉化成dataset,在用sql來統(tǒng)計出當前貨幣。

public class EsToMysqlDemo extends EsBaseConfig {
    public static void main(String[] args) {
        SparkConf conf = getSparkConf();
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            SparkSession sparkSession = SparkSession.builder()
                    .config(conf)
                    .getOrCreate();
            JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "kibana_sample_data_ecommerce").values();
            JavaRDD<Row> map = esRDD.map(v -> {
                String currency = v.get("currency").toString();
                String customerFullName = v.get("customer_full_name").toString();
                String productsSku = v.getOrDefault("products", "").toString();

                return RowFactory.create(currency, customerFullName, productsSku);
            });
            Dataset<Row> dataset = sparkSession.createDataFrame(map, StructType.fromDDL("currency string,customer_full_name string,products string"));
            dataset.show(2);

            Dataset<Row> count = dataset.select("currency").groupBy("currency").count();
            count.show(2);


        }
    }
}

第一個show展示了當前的dataset,第二個show展示group by之后的結果。

image.png

四、Spark Structure Streaming

Structured Streaming使用DataFrame、DataSet的編程接口,處理數(shù)據(jù)時可以使用Spark SQL中提供的方法,數(shù)據(jù)的轉換和輸出會變得更加簡單。

在structured streaming中,對于指定的代碼修改操作,是不影響修改后從checkpoint中恢復數(shù)據(jù)的。具體可參見文檔。下面這個例子是從控制臺中讀取數(shù)據(jù),然后根據(jù)","切割,把第一個賦值給name,然后寫入到es的spark-structured-streaming索引中去,啟動程序前需要在控制臺執(zhí)行下命令:nc -lk 9999。

@Data
public static class PersonBean {
    private String name;
    private String surname;
}

public static void main(String[] args) throws StreamingQueryException {
    SparkConf sparkConf = getSparkConf();
    SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();


    Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 9999).load();

    Dataset<PersonBean> people = lines.as(Encoders.STRING())
            .map((MapFunction<String, PersonBean>) value -> {
                String[] split = value.split(",");
                PersonBean personBean = new PersonBean();
                personBean.setName(split[0]);
                return personBean;
            }, Encoders.bean(PersonBean.class));

    StreamingQuery es = people.writeStream().option("checkpointLocation", "./location")
            .format("es").start("spark-structured-streaming");
    es.awaitTermination();
}

checkpointLocation是用來設置檢查點,里面會存儲一些commits、offsets、sinks、metadata的信息。

image.png

執(zhí)行完nc -lk 9999后,在控制臺隨便輸入,即可在es中查看響應的結果。

image.png

相關源代碼:

spark-java-demo

參考:

1.Apache Spark support

2.elasticsearch-hadoop

3.使用SparkSQL操作Elasticsearch - Spark入門教程

4.Spark——Spark Streaming 對比 Structured Streaming

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容