(2) Spark Streaming: Tumbling Window and Sliding Window Demo

I. Tumbling Windows

A tumbling window has a fixed size and slices the data stream into uniform, consecutive chunks. Windows neither overlap nor leave gaps: each window starts exactly where the previous one ends. A tumbling window can be defined by time or by element count, and it takes a single parameter, the window size.
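As a minimal sketch of the definition above (not Spark code), the following plain-Java class assigns event timestamps to time-based tumbling windows. The class name `TumblingWindowSketch`, its method names, and the sample timestamps are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    /** Returns the start (in ms) of the tumbling window that contains the timestamp. */
    public static long windowStart(long timestampMs, long windowSizeMs) {
        // floorMod keeps the result aligned even for negative timestamps
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Groups timestamps by the start of their window; every timestamp lands in exactly one window. */
    public static Map<Long, List<Long>> assign(long[] timestampsMs, long windowSizeMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestampsMs) {
            windows.computeIfAbsent(windowStart(ts, windowSizeMs), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical event times in milliseconds, grouped into 10-second windows
        long[] events = {1_000L, 4_000L, 9_999L, 10_000L, 15_000L, 27_000L};
        System.out.println(assign(events, 10_000L));
        // prints {0=[1000, 4000, 9999], 10000=[10000, 15000], 20000=[27000]}
    }
}
```

Because the window start is derived from the timestamp alone, the windows are back to back and each event belongs to exactly one of them.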


1.png

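In Spark Streaming, a tumbling window is configured by setting both the window size and the slide interval. Both must be integer multiples of the StreamingContext batch interval, and for a tumbling window the two are equal. For example:
.window(Seconds(10), Seconds(10)) a 10-second window with a 10-second slide, so consecutive windows do not overlap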

package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Created by lj on 2022-07-12.
 */
public class SparkSql_Socket_Tumble {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        // Initialize SparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

        // Create the JavaStreamingContext with a 1-minute batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));

        /**
         * Set the log level to ERROR to reduce log noise
         */
        ssc.sparkContext().setLogLevel("ERROR");

        // Read lines from the socket source
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
            private static final long serialVersionUID = 1L;

            public WaterSensor call(String s) throws Exception {
                String[] cols = s.split(",");
                WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
                return waterSensor;
            }
        }).window(Durations.minutes(3), Durations.minutes(3));      // Tumbling window: window size equals slide interval; both must be multiples of the batch interval

        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
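                // JavaSparkSessionSingleton is a helper class that is not shown in this listing; it is assumed to
                // lazily create one shared SparkSession, as in Spark's JavaSqlNetworkWordCount example
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());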

                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // Register a temporary view
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + "=========");
                // Print the first 20 rows
                result.show();
            }
        });


        // Start the job
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}

The code defines a 3-minute window with a 3-minute slide interval. The output shows that no record appears in more than one window, which is the tumbling-window behavior:
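The listing imports com.pojo.WaterSensor but does not show it. The sketch below is one possible shape for that class, assuming only what the listing reveals: a constructor taking a String, a long, and an int, parsed from a comma-separated line. The field names `id`, `ts`, and `vc`, the `parse` helper, and the sample input are hypothetical, and the package declaration is omitted so the file compiles on its own.

```java
import java.io.Serializable;

// Hypothetical stand-in for com.pojo.WaterSensor; field names are assumptions.
public class WaterSensor implements Serializable {
    private static final long serialVersionUID = 1L;

    private String id;  // sensor identifier (assumed meaning)
    private long ts;    // event timestamp (assumed meaning)
    private int vc;     // sensor reading (assumed meaning)

    // No-arg constructor plus getters and setters make this a JavaBean,
    // which is what spark.createDataFrame(rdd, WaterSensor.class) inspects.
    public WaterSensor() {
    }

    public WaterSensor(String id, long ts, int vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public long getTs() { return ts; }
    public void setTs(long ts) { this.ts = ts; }
    public int getVc() { return vc; }
    public void setVc(int vc) { this.vc = vc; }

    /** Parses a line such as "sensor_1,1657600000,35"; rejects lines without exactly three fields. */
    public static WaterSensor parse(String line) {
        String[] cols = line.split(",");
        if (cols.length != 3) {
            throw new IllegalArgumentException("Expected 3 comma-separated fields: " + line);
        }
        return new WaterSensor(cols[0].trim(), Long.parseLong(cols[1].trim()), Integer.parseInt(cols[2].trim()));
    }

    @Override
    public String toString() {
        return "WaterSensor{id=" + id + ", ts=" + ts + ", vc=" + vc + "}";
    }

    public static void main(String[] args) {
        System.out.println(parse("sensor_1,1657600000,35"));
    }
}
```

The field-count check is a design choice of this sketch: the map function in the listing would instead fail with an ArrayIndexOutOfBoundsException on a short line.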


2.png

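II. Sliding Windows

Like a tumbling window, a sliding window has a fixed size. The difference is that consecutive windows are not placed end to end; they can be offset from each other by some amount. Viewed as a single moving window, it slides forward in small steps. A sliding window takes two parameters: the window size and the slide step (window slide), which determines how often the window is computed.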
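To make the overlap concrete, here is a minimal plain-Java sketch (not Spark code) that lists every time-based sliding window containing a given timestamp. The class name `SlidingWindowSketch`, the method `windowStarts`, and the sample values are assumptions for illustration; each window is taken to cover the half-open range from its start up to start + size.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SlidingWindowSketch {

    /**
     * Returns the start times (ms) of all windows [start, start + size) that contain the timestamp,
     * in ascending order. Window starts are aligned to multiples of the slide step.
     */
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        Collections.reverse(starts);
        return starts;
    }

    public static void main(String[] args) {
        // 10-second window, 5-second slide: the event falls into two overlapping windows
        System.out.println(windowStarts(7_000L, 10_000L, 5_000L));   // prints [0, 5000]
        // Size equal to slide degenerates into a tumbling window: exactly one window
        System.out.println(windowStarts(7_000L, 10_000L, 10_000L));  // prints [0]
    }
}
```

With a slide step smaller than the window size, each element is counted in size / slide windows, which is the overlap the paragraph above describes.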


3.png

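In Spark Streaming, a sliding window also requires a window size and a slide interval, both integer multiples of the StreamingContext batch interval. Here the two differ: the slide interval is smaller than the window size, so consecutive windows overlap. For example:
.window(Seconds(10), Seconds(5)) a 10-second window with a 5-second slide, so consecutive windows overlap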
package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by lj on 2022-07-12.
 */
public class SparkSql_Socket {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        // Initialize SparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

        // Create the JavaStreamingContext with a 1-minute batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));

        /**
         * Set the log level to ERROR to reduce log noise
         */
        ssc.sparkContext().setLogLevel("ERROR");

        // Read lines from the socket source
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
            private static final long serialVersionUID = 1L;

            public WaterSensor call(String s) throws Exception {
                String[] cols = s.split(",");
                WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
                return waterSensor;
            }
        }).window(Durations.minutes(4), Durations.minutes(2));      // Sliding window: window size and slide interval must both be integer multiples of the batch interval

        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());

                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // Register a temporary view
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + "=========");
                // Print the first 20 rows
                result.show();
            }
        });


        // Start the job
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}
4.png

How the data evolves from window to window:


5.png
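The evolution for the 4-minute window with a 2-minute slide can also be traced without running Spark. The sketch below is a simplified model of DStream window timing, not Spark's implementation: each batch is labelled by the minute at which it ends, a window is emitted at every multiple of the slide interval, and it covers the batches that ended within the last window-size minutes. The class name `WindowTraceSketch` and the method `trace` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowTraceSketch {

    /**
     * Lists, for each emission time up to untilMin, the batches (labelled by end minute) in the window.
     * All arguments are in minutes; window and slide must be multiples of the batch interval.
     */
    public static List<List<Integer>> trace(int batchMin, int windowMin, int slideMin, int untilMin) {
        if (windowMin % batchMin != 0 || slideMin % batchMin != 0) {
            throw new IllegalArgumentException("window and slide must be multiples of the batch interval");
        }
        List<List<Integer>> emitted = new ArrayList<>();
        for (int t = slideMin; t <= untilMin; t += slideMin) {
            List<Integer> batches = new ArrayList<>();
            // Earliest batch still inside the window; clamped because no batch exists before the first one
            int first = Math.max(batchMin, t - windowMin + batchMin);
            for (int b = first; b <= t; b += batchMin) {
                batches.add(b);
            }
            emitted.add(batches);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Sliding example: 1-minute batches, 4-minute window, 2-minute slide
        System.out.println(trace(1, 4, 2, 8));  // prints [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6], [5, 6, 7, 8]]
        // Tumbling example: 1-minute batches, 3-minute window, 3-minute slide
        System.out.println(trace(1, 3, 3, 6));  // prints [[1, 2, 3], [4, 5, 6]]
    }
}
```

In the sliding trace every batch after the start-up phase appears in two consecutive windows, and the first window is only partially filled; in the tumbling trace each batch appears exactly once.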