Preface
The previous article covered the concept of windows and their common uses. This article continues with Flink's window API.
Window assignment: the window() method
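- We can use window() to define a window and then process the data based on that window. Note that window() can only be called after keyBy().
- Flink also provides the simpler timeWindow() and countWindow() for defining time windows and count windows.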
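To make countWindow() concrete, here is a plain-Java simulation with no Flink dependency (the class and method names are made up for illustration) of a keyed tumbling count window: each key collects elements until it has `size` of them, then the window fires and that key starts a fresh window. It models the behaviour only; Flink itself builds countWindow(size) on a keyed stream from a global window plus a purging count trigger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation (not Flink code) of keyBy(...).countWindow(size). */
public class CountWindowSketch {
    private final int size;
    // per-key buffer holding the elements of that key's current, not yet full, window
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    public CountWindowSketch(int size) {
        this.size = size;
    }

    /** Adds one element; returns the window's sum when this key's window fills up, else null. */
    public Integer add(String key, int value) {
        List<Integer> buf = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buf.add(value);
        if (buf.size() < size) {
            return null; // window not full yet, nothing is emitted
        }
        int sum = 0;
        for (int v : buf) {
            sum += v;
        }
        buffers.remove(key); // tumbling: the next window for this key starts empty
        return sum;
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(3);
        String[] words = {"a", "b", "a", "a", "b", "b"};
        for (String word : words) {
            Integer fired = window.add(word, 1);
            if (fired != null) {
                System.out.println("(" + word + "," + fired + ")");
            }
        }
    }
}
```

Because the buffers are kept per key, "a" and "b" each fire on their own third element, which is the keyed behaviour that countWindow() has after keyBy().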
The windowAll method
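As mentioned above, window() must come after keyBy(), whereas windowAll() can be applied directly to a DataStream. The catch is that windowAll() puts all data into a single window: it is a global operation that sends every record to the same downstream operator instance, which amounts to running the window with a parallelism of 1. It is therefore not recommended.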
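The difference can be illustrated with a simplified routing sketch in plain Java. The rule `floorMod(hashCode, parallelism)` is a hypothetical stand-in: real Flink hashes each key into a key group (using a murmur hash of its hashCode) and maps key groups to subtasks. The point it shows still holds: with keyBy(), different keys spread over parallel window instances, while windowAll() routes everything to one instance.

```java
import java.util.Map;
import java.util.TreeMap;

/** Simplified illustration (not Flink's key-group assignment) of keyed vs windowAll routing. */
public class KeyedVsAllSketch {
    // hypothetical routing rule for keyBy(): non-negative hash of the key modulo parallelism
    static int keyedTarget(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // windowAll(): the window operator has a single instance, so every record lands on subtask 0
    static int allTarget() {
        return 0;
    }

    /** Counts how many records each subtask receives. */
    static Map<Integer, Integer> load(String[] keys, int parallelism, boolean keyed) {
        Map<Integer, Integer> perSubtask = new TreeMap<>();
        for (String k : keys) {
            int target = keyed ? keyedTarget(k, parallelism) : allTarget();
            perSubtask.merge(target, 1, Integer::sum);
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "d", "a", "b"};
        System.out.println("keyBy + window: " + load(keys, 4, true));
        System.out.println("windowAll     : " + load(keys, 4, false));
    }
}
```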
timeWindow
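With one argument (the window size), timeWindow() creates a tumbling window; with two arguments (the window size and the slide), it creates a sliding window. Whether these are processing-time or event-time windows depends on the time characteristic set on the execution environment.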
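The following plain-Java sketch shows which windows a single timestamp falls into in each case. It follows the start-alignment arithmetic of Flink's tumbling and sliding assigners with the offset assumed to be 0; the class and method names are illustrative only. Windows are half-open intervals [start, end) in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (offset = 0) of tumbling vs sliding window assignment. */
public class WindowAssignSketch {
    /** timeWindow(size): exactly one tumbling window contains each timestamp. */
    static long[] tumbling(long ts, long size) {
        long start = ts - Math.floorMod(ts, size); // align down to a multiple of size
        return new long[] {start, start + size};
    }

    /** timeWindow(size, slide): every sliding window whose range contains ts. */
    static List<long[]> sliding(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 12_000; // an event at t = 12 s
        long[] t = tumbling(ts, 10_000);
        System.out.println("tumbling 10s: [" + t[0] + ", " + t[1] + ")");
        for (long[] w : sliding(ts, 10_000, 5_000)) {
            System.out.println("sliding 10s/5s: [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a 10 s size and 5 s slide, each element belongs to size / slide = 2 windows, which is why sliding windows duplicate data while tumbling windows do not.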
How to process the data collected in a window
Incremental aggregation functions
- Compute as each record arrives, keeping only a small piece of state
- ReduceFunction, AggregateFunction
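The word count example in the previous article is an incremental aggregation: each record is processed as soon as it arrives. Operators such as min() and sum() work the same way and are also incremental aggregations.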
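A minimal plain-Java sketch of the idea, assuming a tumbling window and using a hypothetical class name: the window keeps a single running value per key and folds each new element into it with a ReduceFunction-style binary operator, so nothing but that one value is stored. Swapping the operator from sum to min mirrors the sum()/min() operators mentioned above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Plain-Java sketch (not Flink code) of incremental, reduce-style window aggregation. */
public class IncrementalReduceSketch {
    // the only state: one running value per key
    private final Map<String, Integer> state = new HashMap<>();
    private final BinaryOperator<Integer> reducer;

    IncrementalReduceSketch(BinaryOperator<Integer> reducer) {
        this.reducer = reducer;
    }

    /** Called once per element: folds it into the single stored value for its key. */
    void onElement(String key, int value) {
        state.merge(key, value, reducer);
    }

    /** Called when the window fires: the result is already computed, so it is just read out. */
    Map<String, Integer> fire() {
        Map<String, Integer> result = new HashMap<>(state);
        state.clear(); // tumbling window: the next window starts empty
        return result;
    }

    public static void main(String[] args) {
        IncrementalReduceSketch wordCount = new IncrementalReduceSketch(Integer::sum);
        for (String w : "flink,spark,flink,flink".split(",")) {
            wordCount.onElement(w, 1);
        }
        System.out.println(wordCount.fire()); // per-word counts for this window
    }
}
```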
The previous article already used ReduceFunction for word count, so this one uses AggregateFunction, again implementing word count (the testing steps are omitted).
package com.lxs.flink.realtime.window;

import com.lxs.utils.KafkaUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * User: lixinsong
 * Date: 2021/1/14
 * Description: word count over 10 s tumbling windows using an AggregateFunction
 */
public class AggregateFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // KafkaUtils.initKafka is the author's own helper that builds a Kafka source for topic "lxs"
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(KafkaUtils.initKafka("lxs"))
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        // split each comma-separated line into words and emit (word, 1)
                        for (String a : s.split(",")) {
                            collector.collect(Tuple2.of(a, 1));
                        }
                    }
                });
        // count how often each word occurs in every 10 s window
        DataStream<Tuple2<String, Integer>> wordCount = dataStream
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> createAccumulator() {
                        // start from an empty, non-null accumulator so add() needs no null check
                        return Tuple2.of("", 0);
                    }

                    @Override
                    public Tuple2<String, Integer> add(Tuple2<String, Integer> value, Tuple2<String, Integer> acc) {
                        // called once per element: keep the word and add its count to the running total
                        return Tuple2.of(value.f0, acc.f1 + value.f1);
                    }

                    @Override
                    public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
                        return acc;
                    }

                    @Override
                    public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                        // tumbling windows never call this; merging windows (e.g. session windows) do,
                        // so combine the two partial counts instead of returning null
                        return Tuple2.of(a.f0.isEmpty() ? b.f0 : a.f0, a.f1 + b.f1);
                    }
                });
        wordCount.print("AggregateFunctionTest");
        env.execute("test");
    }
}
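Note that AggregateFunction<IN, ACC, OUT> has three type parameters: the input value, the accumulator, and the output value. All of the methods you implement serve the accumulator: createAccumulator() creates it, add(value, accumulator) folds one input element into it, getResult(accumulator) turns it into the output, and merge(a, b) combines two accumulators.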
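To run this contract without a Flink cluster, the sketch below mirrors the four methods in a hypothetical interface named AggContract (in a real job you would implement org.apache.flink.api.common.functions.AggregateFunction instead). It uses an average rather than word count because there the three types really differ: IN is Integer, ACC is long[]{sum, count}, OUT is Double. runWindow is likewise a hypothetical driver that calls the methods in the order a window would.

```java
import java.util.Arrays;

/** Plain-Java mirror (not Flink code) of the AggregateFunction<IN, ACC, OUT> contract. */
public class AggregateContractSketch {
    interface AggContract<IN, ACC, OUT> {
        ACC createAccumulator();            // empty accumulator for a new window
        ACC add(IN value, ACC accumulator); // fold one input element in
        OUT getResult(ACC accumulator);     // turn the accumulator into the output
        ACC merge(ACC a, ACC b);            // combine two accumulators (merging windows)
    }

    /** IN = Integer, ACC = long[]{sum, count}, OUT = Double. */
    static class Average implements AggContract<Integer, long[], Double> {
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        public long[] add(Integer value, long[] acc) {
            return new long[] {acc[0] + value, acc[1] + 1};
        }

        public Double getResult(long[] acc) {
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }

        public long[] merge(long[] a, long[] b) {
            return new long[] {a[0] + b[0], a[1] + b[1]};
        }
    }

    /** Drives the contract like a window: create once, add per element, getResult on fire. */
    static <IN, ACC, OUT> OUT runWindow(AggContract<IN, ACC, OUT> fn, Iterable<IN> elements) {
        ACC acc = fn.createAccumulator();
        for (IN e : elements) {
            acc = fn.add(e, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        Average avg = new Average();
        System.out.println(runWindow(avg, Arrays.asList(2, 4, 9)));
        long[] merged = avg.merge(avg.add(2, avg.createAccumulator()), avg.add(4, avg.createAccumulator()));
        System.out.println(avg.getResult(merged));
    }
}
```

Only the small accumulator is kept between elements, which is what makes this style of window function incremental.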

Full window functions
- Collect all of the window's data first, then iterate over all of it when the computation runs
- ProcessWindowFunction, WindowFunction
We start with WindowFunction, again using word count.
package com.lxs.flink.realtime.window;

import com.lxs.utils.KafkaUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * User: lixinsong
 * Date: 2021/1/14
 * Description: word count over 10 s tumbling windows using a WindowFunction
 */
public class WindowFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(KafkaUtils.initKafka("lxs"))
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        // split each comma-separated line into words and emit (word, 1)
                        for (String a : s.split(",")) {
                            collector.collect(Tuple2.of(a, 1));
                        }
                    }
                });
        DataStream<Tuple2<String, Integer>> wordCount = dataStream
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                    // full window function: the window's data is collected first and processed in one go,
                    // which is why input is an Iterable
                    @Override
                    public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Integer>> input,
                                      Collector<Tuple2<String, Integer>> out) throws Exception {
                        // keyBy(0) yields a Tuple key whose field 0 is the word
                        String word = key.getField(0);
                        int count = 0;
                        for (Tuple2<String, Integer> s : input) {
                            count += s.f1;
                        }
                        out.collect(Tuple2.of(word, count));
                    }
                });
        wordCount.print("windowFunction test");
        env.execute("test");
    }
}
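Here we call the apply() operator after timeWindow() and implement the WindowFunction interface, whose only method is apply(). Compared with an incremental function, WindowFunction<IN, OUT, KEY, W extends Window> carries more window information: the input data (IN), the output data (OUT), the key (what keyBy() produces), and the window itself (W), all passed to apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out).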
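A plain-Java mirror of that signature, runnable without Flink, shows the "buffer first, compute once" shape. FullWindowFn and Window are hypothetical stand-ins for WindowFunction and TimeWindow, and a java.util.function.Consumer plays the role of the Collector. Note that the word is taken from the key parameter rather than from the elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java sketch (not Flink code) of a full-window function. */
public class FullWindowSketch {
    /** Hypothetical stand-in for Flink's TimeWindow: the half-open interval [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Hypothetical mirror of WindowFunction<IN, OUT, KEY, W>.apply(key, window, input, out). */
    interface FullWindowFn<IN, OUT, KEY> {
        void apply(KEY key, Window window, Iterable<IN> input, Consumer<OUT> out);
    }

    /** Word count as a full-window function: walks every buffered element once the window fires. */
    static final FullWindowFn<Integer, String, String> WORD_COUNT = (key, window, input, out) -> {
        int count = 0;
        for (int c : input) {
            count += c;
        }
        out.accept(key + "," + count + " in [" + window.start + ", " + window.end + ")");
    };

    public static void main(String[] args) {
        // the window operator would buffer every (word, 1) record for key "flink" ...
        List<Integer> buffered = Arrays.asList(1, 1, 1);
        // ... and call the function once, when the window [0, 10000) fires
        WORD_COUNT.apply("flink", new Window(0, 10_000), buffered, System.out::println);
    }
}
```

The trade-off against the incremental version is state size: every element of the window is kept until it fires, in exchange for access to the whole window and its metadata.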

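Other APIs
- trigger(): the trigger, which defines when a window fires, i.e. runs the computation and emits its result
- evictor(): the evictor, which removes elements from the window
- allowedLateness(): allows late data to be processed by closing the window later
- sideOutputLateData(): sends late data to a side output
- getSideOutput(): retrieves the side output stream
Below is a side output example. Note that a side output for late data only makes sense under event-time semantics: without event time we have no way of deciding which data counts as late and should go to the side output.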
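// imports: org.apache.flink.util.OutputTag, org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
// requires event time: dataStream must carry timestamps and watermarks, otherwise nothing is ever "late"
OutputTag<Tuple2<String, Integer>> late = new OutputTag<Tuple2<String, Integer>>("late"){};
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStream.keyBy(0)
        .timeWindow(Time.seconds(10))     // 10 s tumbling window
        .allowedLateness(Time.seconds(2)) // accept data up to 2 s late: the window closes 2 s later
        .sideOutputLateData(late)         // data that arrives after those 2 s goes to the side output
        .sum(1);
sum.getSideOutput(late).print("late");
sum.print("wordCount test");
env.execute("test");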
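The fate of a record in that example can be modelled in a few lines of plain Java. This is a simplified sketch assuming event time with watermarks; the boundary rules are my reading of Flink's window operator and should be treated as approximate: a window [start, end) fires once the watermark reaches end - 1, its state is kept until the watermark reaches end - 1 + allowedLateness, and a record for it arriving after that goes to the side output. The enum and method names are hypothetical.

```java
/** Simplified model (not Flink code) of how an event-time window treats one incoming record. */
public class LatenessSketch {
    enum Fate { ON_TIME, LATE_REFIRES_WINDOW, SIDE_OUTPUT }

    /** windowEnd is exclusive; all values in milliseconds. */
    static Fate classify(long windowEnd, long allowedLateness, long currentWatermark) {
        long maxTimestamp = windowEnd - 1;                 // last timestamp inside the window
        long cleanupTime = maxTimestamp + allowedLateness; // window state is kept until here
        if (currentWatermark < maxTimestamp) {
            return Fate.ON_TIME;             // window has not fired yet
        }
        if (currentWatermark < cleanupTime) {
            return Fate.LATE_REFIRES_WINDOW; // already fired, but within allowedLateness: fires again
        }
        return Fate.SIDE_OUTPUT;             // window already purged: record goes to the "late" tag
    }

    public static void main(String[] args) {
        long end = 10_000;     // window [0, 10 s)
        long lateness = 2_000; // allowedLateness(Time.seconds(2))
        for (long wm : new long[] {8_000, 10_500, 12_500}) {
            System.out.println("watermark " + wm + " -> " + classify(end, lateness, wm));
        }
    }
}
```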
Summary
Keyed Window
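// Keyed Window
stream
    .keyBy(...)                 <- group by a key
    .window(...)                <- assign the stream's elements to windows
   [.trigger(...)]              <- trigger (optional)
   [.evictor(...)]              <- evictor (optional)
   [.allowedLateness(...)]      <- allowed lateness (optional)
   [.sideOutputLateData(...)]   <- side output tag for late data (optional)
    .reduce/aggregate/apply/process()   <- window function
   [.getSideOutput(...)]        <- read the late-data side output (optional)
// Non-Keyed Window
stream
    .windowAll(...)             <- no grouping: assign all of the stream's elements to windows
   [.trigger(...)]              <- trigger (optional)
   [.evictor(...)]              <- evictor (optional)
   [.allowedLateness(...)]      <- allowed lateness (optional)
   [.sideOutputLateData(...)]   <- side output tag for late data (optional)
    .reduce/aggregate/apply/process()   <- window function
   [.getSideOutput(...)]        <- read the late-data side output (optional)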