1. Tumbling Windows

A tumbling window has a fixed size and slices the data stream into uniform, non-overlapping segments: there is no overlap and no gap between windows, which sit end to end. Tumbling windows can be defined by time or by element count, and they take only one parameter, the window size.

In Spark Streaming, a window is configured with a window size and a slide interval, both of which must be multiples of the StreamingContext batch interval; for a tumbling window the two are equal, e.g.:
.window(Durations.seconds(10), Durations.seconds(10)) // 10-second window, 10-second slide: no overlap between windows
package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Created by lj on 2022-07-12.
 */
public class SparkSql_Socket_Tumble {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        // Initialize the SparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
        // Create the JavaStreamingContext with a 1-minute batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
        // Raise the log level to keep the console output readable
        ssc.sparkContext().setLogLevel("ERROR");
        // Read lines from the socket source
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
            private static final long serialVersionUID = 1L;

            public WaterSensor call(String s) throws Exception {
                String[] cols = s.split(",");
                return new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
            }
        }).window(Durations.minutes(3), Durations.minutes(3)); // Tumbling window: size and slide must both be multiples of the batch interval, and for a tumbling window they are equal.

        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());
                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // Register the batch as a temporary view
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + " =========");
                // Print the first 20 rows
                result.show();
            }
        });

        // Start the job
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}
The code defines a 3-minute window with a 3-minute slide. In the run output, no record appears in more than one window, which is exactly the tumbling-window behaviour.
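The listing above depends on the `com.pojo.WaterSensor` bean, which the post does not show. Here is a minimal sketch of what it presumably looks like; the field names `id`, `ts`, and `vc` are assumptions inferred from the constructor call, and the getters are required because `spark.createDataFrame(rdd, beanClass)` derives the schema from the bean's public accessors:

```java
import java.io.Serializable;

// Hypothetical reconstruction of the WaterSensor POJO (lives in com.pojo
// in the original project). Field names id/ts/vc are assumed. The class
// must be Serializable because it is captured by DStream closures, and it
// needs a no-arg constructor plus getters/setters for bean encoding.
public class WaterSensor implements Serializable {
    private String id;   // sensor id (first CSV column)
    private Long ts;     // event timestamp (second CSV column)
    private Integer vc;  // water level reading (third CSV column)

    public WaterSensor() { }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }
}
```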

2. Sliding Windows

Like a tumbling window, a sliding window has a fixed size. The difference is that consecutive windows are not placed end to end; each one is offset from the previous by some amount, as if the window were sliding forward in small steps. A sliding window therefore takes two parameters: besides the window size, a slide (window slide) that determines how often the window is evaluated.

In Spark Streaming, a sliding window is likewise configured with a window size and a slide interval, both multiples of the StreamingContext batch interval, but here the two values differ, e.g.:
.window(Durations.seconds(10), Durations.seconds(5)) // 10-second window, 5-second slide: consecutive windows overlap
package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Created by lj on 2022-07-12.
 */
public class SparkSql_Socket {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        // Initialize the SparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
        // Create the JavaStreamingContext with a 1-minute batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
        // Raise the log level to keep the console output readable
        ssc.sparkContext().setLogLevel("ERROR");
        // Read lines from the socket source
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
            private static final long serialVersionUID = 1L;

            public WaterSensor call(String s) throws Exception {
                String[] cols = s.split(",");
                return new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
            }
        }).window(Durations.minutes(4), Durations.minutes(2)); // Sliding window: size and slide must both be integer multiples of the batch interval; here a 4-minute window slides every 2 minutes.

        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());
                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // Register the batch as a temporary view
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + " =========");
                // Print the first 20 rows
                result.show();
            }
        });

        // Start the job
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}
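Both listings call `JavaSparkSessionSingleton.getInstance(...)`, a helper class the post does not show. It is presumably the lazily instantiated singleton SparkSession from the Spark Streaming programming guide's DataFrame-on-DStream example; a sketch:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// Lazily instantiated singleton SparkSession, as in the Spark Streaming
// programming guide: foreachRDD runs once per batch on the driver, so the
// session is created on the first batch and reused for all later batches.
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder()
                    .config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}
```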

Data evolution: with a 1-minute batch interval, a 4-minute window, and a 2-minute slide, the job emits the last 4 minutes of data every 2 minutes, so consecutive windows overlap by 2 minutes and each record appears in two consecutive window outputs.
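The window arithmetic can be made concrete with a small stand-alone sketch (plain Java, no Spark): assuming windows aligned to multiples of the slide (a simplified model, not necessarily Spark's exact internal alignment), a record at time t falls into every window [start, start + size) with t - size < start <= t. Times below are in minutes:

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAssignment {
    // Returns the start times of all windows [start, start + size) that
    // contain time t, for windows aligned to multiples of `slide`.
    static List<Long> windowsContaining(long t, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start strictly greater than t - size:
        long first = slide * (Math.floorDiv(t - size, slide) + 1);
        for (long s = first; s <= t; s += slide) {
            if (s >= 0) {  // ignore windows from before the stream started
                starts.add(s);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        // Sliding window: 4-minute size, 2-minute slide -> each instant is
        // covered by two overlapping windows.
        System.out.println(windowsContaining(5, 4, 2)); // [2, 4]
        // Tumbling window: size == slide -> exactly one window per instant.
        System.out.println(windowsContaining(5, 3, 3)); // [3]
    }
}
```

With size == slide every record lands in exactly one window (tumbling); with size == 2 * slide, as in the listing above, every record lands in exactly two.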
