1.Global Windows
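A global window assigns all elements with the same key to one single window. Each key has its own global window, and the global windows of different keys are independent of each other. For Non-Keyed Windows there is only one global window. A global window has no end boundary, and its default Trigger is NeverTrigger, so unless you specify a trigger for a global window, the window never fires a computation.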
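To make the role of the trigger concrete, here is a minimal plain-Java sketch (no Flink dependency) of per-key global windows combined with a count trigger, which is roughly what countWindow(n) in the examples below relies on. The class name GlobalWindowSketch and its methods are hypothetical and only illustrate the idea; this is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch (hypothetical, not Flink code) of per-key global windows with a count trigger. */
final class GlobalWindowSketch {
    private final int fireAt; // number of elements after which the trigger fires
    private final Map<String, List<Integer>> windows = new HashMap<>(); // one buffer per key

    GlobalWindowSketch(int fireAt) {
        this.fireAt = fireAt;
    }

    /**
     * Adds a value to the window of the given key. Returns the sum of the buffered
     * values when the count trigger fires (the buffer is then purged), or null otherwise.
     */
    Integer add(String key, int value) {
        List<Integer> buffer = windows.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < fireAt) {
            return null; // no firing: the window simply keeps collecting elements
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        buffer.clear(); // purge the window contents after firing
        return sum;
    }

    /** Number of elements currently buffered for the key. */
    int buffered(String key) {
        List<Integer> buffer = windows.get(key);
        return buffer == null ? 0 : buffer.size();
    }

    public static void main(String[] args) {
        GlobalWindowSketch sketch = new GlobalWindowSketch(3);
        String[] keys = {"a", "b", "a", "a", "b"};
        for (int i = 0; i < keys.length; i++) {
            Integer fired = sketch.add(keys[i], i + 1);
            System.out.println(keys[i] + " -> " + (fired == null ? "no firing" : "fired, sum=" + fired));
        }
    }
}
```

Key "a" fires once it has collected three elements, while key "b" keeps waiting. If the threshold is never reached, nothing is ever emitted, which mirrors a global window that has no trigger.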
reduce/sum
public class Reduce {
/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2021-01-16
* Time: 21:31
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<Integer> map = source.map(Integer::parseInt);
AllWindowedStream<Integer, GlobalWindow> windowedStream = map.countWindowAll(5);
SingleOutputStreamOperator<Integer> reduce = windowedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer t2, Integer t1) throws Exception {
return t2 + t1;
}
});
reduce.print();
environment.execute("job");
}
}
Keyby
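Note on the lambda expression: Java erases the generic type arguments of a lambda, so the result type has to be declared explicitly with returns(...): SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(x -> Tuple2.of(x, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));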
public class Keyby {
/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2021-01-16
* Time: 21:31
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(x -> Tuple2.of(x, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
/* SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map (String s) throws Exception {
return Tuple2.of(s, 1);
}
});*/
KeyedStream<Tuple2<String, Integer>, Tuple> stream = map.keyBy(0);
// KeyedStream<Tuple2<String, Integer>, String> keyBy = map.keyBy(x -> x.f0);
WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> window = stream.countWindow(5);
SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = window.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception {
stringIntegerTuple2.f1 = stringIntegerTuple2.f1 + t1.f1;
return stringIntegerTuple2;
}
});
reduce.print();
environment.execute("job");
}
}
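The returns(Types.TUPLE(...)) hint in the example above is needed because a lambda does not carry its generic type arguments at runtime, whereas an anonymous class does. The following plain-Java sketch is only a rough illustration of that JVM behavior using java.util.function.Function; it is not how Flink's type extraction is implemented, and the class and method names are made up for this example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

/** Hypothetical sketch: checks whether a function object still exposes its generic type arguments. */
final class ErasureSketch {

    /** Returns true if the object's class records type arguments for an interface it implements. */
    static boolean keepsTypeArguments(Function<?, ?> f) {
        for (Type t : f.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    /** Anonymous class: the compiler stores Function<String, Integer> in the class file. */
    static Function<String, Integer> anonymousLength() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    /** Lambda: the runtime-generated class only implements the raw Function interface. */
    static Function<String, Integer> lambdaLength() {
        return s -> s.length();
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps type arguments: " + keepsTypeArguments(anonymousLength()));
        System.out.println("lambda keeps type arguments: " + keepsTypeArguments(lambdaLength()));
    }
}
```

This is why the commented-out MapFunction variant in the example works without a type hint, while the lambda variant needs returns(...).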
apply
public class Apply {
/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2021-01-16
* Time: 22:12
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<Integer> map = source.map(Integer::parseInt);
AllWindowedStream<Integer, GlobalWindow> countWindowAll = map.countWindowAll(5);
SingleOutputStreamOperator<Integer> streamOperator = countWindowAll.apply(new AllWindowFunction<Integer, Integer, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
ArrayList<Integer> list = new ArrayList<>();
for (Integer value : values) {
list.add(value);
}
list.sort(new Comparator<Integer>() {
@Override
public int compare(Integer integer, Integer t1) {
return Integer.compare(integer,t1);
}
});
for (Integer integer : list) {
out.collect(integer);
}
}
});
// Note: setParallelism(1) is called after print(), so it applies to the print sink
streamOperator.print().setParallelism(1);
environment.execute("job");
}
}
2.Tumbling Windows
Tumbling windows are windows divided by time. Their Assigner places every incoming element into a window of fixed length according to its time, and the windows roll forward by that same fixed length, so no data overlaps between windows.
If the of method of a tumbling window assigner is given one parameter, new windows are created periodically with the specified length. For example, with TumblingProcessingTimeWindows.of(Time.hours(1)), window start times are aligned to whole hours of the system's ProcessingTime: [1:00:00.000, 1:59:59.999] is one window, [2:00:00.000, 2:59:59.999] is the next, and windows keep being generated. (The 1:00:00.000 notation is only used for readability; window times are actually timestamps.)
The of method can also take two parameters. The second parameter is an offset that shifts the window alignment to a given time zone, so in time zones other than UTC±0 an offset has to be specified. For example, in China (UTC+8) you must specify an offset of Time.hours(-8) for day-long windows to start at local midnight.
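The alignment and offset rules described above come down to a small piece of arithmetic. Here is a plain-Java sketch of it, modeled on how a tumbling assigner can derive the window start from a timestamp. The class and method names are hypothetical and all values are epoch milliseconds.

```java
/** Hypothetical sketch of tumbling-window alignment; all values are epoch milliseconds. */
final class TumblingWindowSketch {

    /** Start of the tumbling window of the given size (shifted by offset) that contains timestamp. */
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % can yield a negative remainder, so handle that case explicitly
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    /** Largest timestamp that still belongs to the window starting at start. */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long day = 24 * hour;

        // 01:30:00.000 UTC falls into the window [01:00:00.000, 01:59:59.999]
        long t = hour + 30 * 60_000L;
        long start = windowStart(t, 0L, hour);
        System.out.println(start + " .. " + maxTimestamp(start, hour)); // 3600000 .. 7199999

        // Day windows with an offset of -8 hours: 02:00 UTC on day 1 is 10:00 in UTC+8,
        // and the window starts at 16:00 UTC on day 0, which is local midnight in UTC+8
        long t2 = day + 2 * hour;
        System.out.println(windowStart(t2, -8 * hour, day)); // 57600000
    }
}
```

With offset 0 the day windows would start at 00:00 UTC, which is 08:00 in China; the negative offset moves the boundary back to local midnight.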
Non-Keyed Tumbling Windows
public class Test1 {
/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2021-01-17
* Time: 19:48
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
AllWindowedStream<String, TimeWindow> stream = source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
SingleOutputStreamOperator<String> streamOperator = stream.reduce((x, y) -> String.valueOf(Integer.parseInt(x) + Integer.parseInt(y))).returns(Types.STRING);
streamOperator.print();
environment.execute("job");
}
}
Keyed Tumbling Windows
public class Test1 {
/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2021-01-17
* Time: 19:48
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<Tuple2<Integer, Integer>> operator = source.map(x -> Tuple2.of(Integer.parseInt(x), 1)).returns(Types.TUPLE(Types.INT, Types.INT));
KeyedStream<Tuple2<Integer, Integer>, Integer> keyBy = operator.keyBy(x -> x.f0);
// Use window(...) instead of windowAll(...), so that every key gets its own window
WindowedStream<Tuple2<Integer, Integer>, Integer, TimeWindow> window = keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<Tuple2<Integer, Integer>> sum = window.sum(1);
sum.print();
environment.execute("job");
}
}
3.Sliding Windows
Sliding windows are windows divided by time. Their Assigner places every incoming element into windows of fixed length according to its time, and an additional slide parameter specifies how often a new window starts (also called the slide step). When the slide step is smaller than the window length, data overlaps between windows.
The of method of a sliding window assigner takes two parameters: the first is the window length and the second is the slide frequency (the slide step). For example, with SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)), window start times are based on the EventTime of the elements and are aligned to integer multiples of the slide step. [1:00:00.000, 1:00:09.999] is one window and [1:00:05.000, 1:00:14.999] is the next; the two windows share data, and windows keep being generated.
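Because sliding windows overlap, a single element can belong to several windows. The plain-Java sketch below enumerates the start times of all windows that contain a given timestamp, assuming no offset. The class and method names are hypothetical and all values are milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of sliding-window assignment; all values are milliseconds. */
final class SlidingWindowSketch {

    /** Start times of every window of length size, started every slide ms, that contains timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to a multiple of the slide step
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Window length 10 s, slide step 5 s: an element at 7 s belongs to two windows
        System.out.println(windowStarts(7_000L, 10_000L, 5_000L)); // [5000, 0]
        // Slide step equal to the window length behaves like a tumbling window
        System.out.println(windowStarts(7_000L, 5_000L, 5_000L)); // [5000]
    }
}
```

In general an element lands in size / slide windows when the window length is a multiple of the slide step, which is why a small slide step multiplies the amount of work per element.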
Non-Keyed Sliding Windows
public class Test2 {
/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2021-01-17
* Time: 20:13
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<Integer> map = source.map(Integer::parseInt);
AllWindowedStream<Integer, TimeWindow> stream = map.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
SingleOutputStreamOperator<Integer> sum = stream.sum(0);
sum.print();
environment.execute("job");
}
}
Keyed Sliding Windows
public class Test2 {
/**
* Created with IntelliJ IDEA.
* Description:
* User:
* Date: 2021-01-17
* Time: 20:13
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<Tuple2<Integer, Integer>> returns = source.map(x -> Tuple2.of(Integer.parseInt(x), 1)).returns(Types.TUPLE(Types.INT, Types.INT));
KeyedStream<Tuple2<Integer, Integer>, Tuple> keyBy = returns.keyBy(0);
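// Use window(...) instead of windowAll(...), so that every key gets its own window
WindowedStream<Tuple2<Integer, Integer>, Tuple, TimeWindow> stream = keyBy.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
// Sum the count in field 1; field 0 is the key
SingleOutputStreamOperator<Tuple2<Integer, Integer>> sum = stream.sum(1);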
sum.print();
environment.execute("job");
}
}
4.Session Windows
Session windows are divided by gaps in time: when no data arrives for longer than the specified gap, a new window is started. A session window has no fixed start or end time, and the data in different windows does not overlap. A session window can be given a fixed gap, or a function that computes a dynamically changing gap from information in the data.
// EventTime session windows
wordAndOne
.keyBy(0) // key selector: group by field 0
.window(EventTimeSessionWindows.withGap(Time.minutes(10))) // fixed session gap of 10 minutes
.sum(1); // when the window fires, sum field 1 of the elements in the window
wordAndOne
.keyBy(0) // key selector: group by field 0
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
return element.f1 * 1000L; // dynamic gap: field f1 multiplied by 1000, returned as a long (milliseconds)
}))
.sum(1); // when the window fires, sum field 1 of the elements in the window
// ProcessingTime session windows
wordAndOne
.keyBy(0) // key selector: group by field 0
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) // fixed session gap of 10 minutes
.sum(1); // when the window fires, sum field 1 of the elements in the window
wordAndOne
.keyBy(0) // key selector: group by field 0
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
return element.f1 * 1000L; // dynamic gap: field f1 multiplied by 1000, returned as a long (milliseconds)
}))
.sum(1); // when the window fires, sum field 1 of the elements in the window
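To see how a fixed gap and a dynamic gap lead to different sessions, here is a plain-Java sketch that groups time-ordered events of one key into sessions. It assumes that each event opens a window [timestamp, timestamp + gap) and that an event arriving no later than the end of the current session is merged into it; the exact boundary handling is an assumption of this sketch. The Event class and the sessions method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical sketch of session-window merging for the events of a single key. */
final class SessionWindowSketch {

    /** Minimal event: a timestamp in milliseconds plus a field f1 used for the dynamic gap. */
    static final class Event {
        final long timestamp;
        final int f1;

        Event(long timestamp, int f1) {
            this.timestamp = timestamp;
            this.f1 = f1;
        }
    }

    /**
     * Groups events (sorted by timestamp) into sessions and returns {start, end} per session.
     * The gap function decides how long each event keeps its session open.
     */
    static List<long[]> sessions(List<Event> events, ToLongFunction<Event> gap) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (Event e : events) {
            long end = e.timestamp + gap.applyAsLong(e);
            if (current == null || e.timestamp > current[1]) {
                current = new long[] {e.timestamp, end}; // gap exceeded: open a new session
                result.add(current);
            } else {
                current[1] = Math.max(current[1], end); // still inside the session: extend it
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0L, 1), new Event(3_000L, 1), new Event(3_500L, 5), new Event(8_000L, 1));

        // Fixed gap of 2 seconds: three sessions
        for (long[] s : sessions(events, e -> 2_000L)) {
            System.out.println("fixed   [" + s[0] + ", " + s[1] + ")");
        }
        // Dynamic gap of f1 * 1000 ms: the event with f1 = 5 keeps its session open longer
        for (long[] s : sessions(events, e -> e.f1 * 1000L)) {
            System.out.println("dynamic [" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With the fixed gap the event at 8 s starts a new session, while with the dynamic gap the event at 3.5 s (f1 = 5) extends its session far enough to absorb it.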