原文連接 https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html
Windows是無限數(shù)據(jù)流(infinite streams)處理的核心,Windows將一個stream拆分成有限大小的"桶(buckets)",可以在這些桶上做計算操作。
窗口化的Flink程序的一般結(jié)構(gòu)如下,第一個代碼段中是分組的流,第二段是非分組的流。區(qū)別是分組的stream調(diào)用keyBy(...)和window(...),非分組的stream中window(...)換成了windowAll(...)
分組窗口(Keyed Windows)
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
無分組窗口(Non-Keyed Windows)
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
方括號[]內(nèi)的命令是可選的,這表明Flink允許你根據(jù)最符合你的要求來定義自己的window邏輯。
Window生命周期(Window Lifecycle)
簡單地說,當一個屬于window的元素到達之后這個window就創(chuàng)建了,而當時間(event time或者processing time)超過window的創(chuàng)建時間和用戶指定的延遲時間相加時,窗口將被徹底清除。
Flink確保了==只清除基于時間的window,其他類型的window不清除==,例如:Global window。舉個例子:對于一個每5分鐘創(chuàng)建無覆蓋的窗口,允許一個1分鐘的時延的窗口策略,F(xiàn)link將會在12:00到12:05這段時間內(nèi)第一個元素到達時創(chuàng)建窗口,當水?。╳artmark)通過12:06時,移除這個窗口。
此外,每個window都有一個 Trigger 和一個Function(例如:ProcessWindowFunction,ReduceFunction,AggregateFunction和FoldFunction)。Function包含了應(yīng)用于窗口內(nèi)容的計算,Trigger指定了函數(shù)何時被觸發(fā)。一個觸發(fā)策略可以是 "當窗口中的元素個數(shù)超過4個時" 或者 "當水印達到窗口的邊界時"。觸發(fā)器還可以決定在窗口創(chuàng)建和刪除之間的任意時刻,清除窗口的內(nèi)容。清除僅指清除window內(nèi)的元素而不是window的元數(shù)據(jù),新的數(shù)據(jù)還是可以被添加到當前的window中。
除了上面的提到之外,還可以指定一個 Evictor,Evictor可以在觸發(fā)器觸發(fā)之后和在函數(shù)被應(yīng)用之前或者之后,清除窗口中的元素。
分組和非分組Windows(Keyed vs Non-Keyed Windows)
首先,第一件事是==指定stream是分組的還是未分組的,這個必須在定義window之前定義好==。使用keyBy(...)會將stream拆分成邏輯分組的數(shù)據(jù)流,如果keyBy(...)函數(shù)不被調(diào)用的話,stream將不是分組的。
在分組stream中,任何正在傳入的事件的屬性都可以被當做 Specifying Keys,分組stream將通過多任務(wù)并發(fā)執(zhí)行window計算,每一個邏輯分組stream在執(zhí)行中是獨立地進行的。
在非分組stream中,原始stream不會拆分成多個邏輯stream并且所有的window邏輯將在一個任務(wù)中執(zhí)行,并發(fā)度為1。
窗口分配器(Window Assingers)
指定完你的數(shù)據(jù)流是分組的還是非分組的之后,接下來需要定義一個窗口分配器(WindowAssigner)。窗口分配定義了元素如何分配到窗口中,這是通過調(diào)用window(...)或者windowAll(...)時,選擇的窗口分配器(WindowAssigner)來指定的。
WindowAssigner是==負責將每一個到來的元素分配給一個或者多個窗口==,F(xiàn)link提供了一些常用的預定義窗口分配器,即:滾動窗口(tumbling windows)、滑動窗口(sliding windows)、會話窗口(session windows)和全局窗口(globalwindows),也可以通過繼承WindowAssigner類來自定義自己的窗口。除了全局窗口分配器,其他所有的內(nèi)置窗口分配器都是通過時間來分配元素到窗口中的,這個時間要么是event time,要么是processing time。
了解更多processing time和event time的區(qū)別及時間戳(timestamp)和水?。╳atermark)是如何產(chǎn)生的,查看 event time
滾動窗口(Tumbling Windows)
滾動窗口分配器將每個元素分配的一個指定窗口大小的窗口中,==滾動窗口有一個固定的大小,并且不會出現(xiàn)重疊==。例如:指定了一個5分鐘大小的滾動窗口,當前窗口將被評估并將按下圖說明每5分鐘創(chuàng)建一個新的窗口。

val input: DataStream[T] = ...
// 滾動event-time窗口
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// 滾動processing-time窗口
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// 每日偏移8小時的滾動event-time窗口
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
時間間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。
在上面的最后一個例子中,滾動窗口分配器接受了一個可選的偏移參數(shù),可以用來改變窗口的排列。例如:沒有偏移的情況,按小時的滾動窗口將按整點時間對齊,會得到一系列窗口如: 1:00:00 ~ 1:59:59、 2:00:00 ~ 2:59:59 等。如果指定了一個15分鐘的偏移,將得到的窗口如下: 1:15:00 ~ 2:14:59、 2:15:00 ~ 3:14:59 等。
時間偏移一個很大的用處是用來調(diào)準非0時區(qū)的窗口,例如:在中國你需要指定一個8小時的時間偏移。
滑動窗口(Sliding Windows)
滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數(shù)來配置,另一個==窗口滑動參數(shù)控制滑動窗口開始的頻率==。因此,滑動窗口如果滑動參數(shù)小于滾動參數(shù)的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。例如:你有10分鐘的窗口和5分鐘的滑動,那么每個窗口中5分鐘的窗口里包含著上個10分鐘產(chǎn)生的數(shù)據(jù)。

val input: DataStream[T] = ...
// 滑動event-time窗口
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// 滑動processing-time窗口
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// 偏移8小時的滑動processing-time窗口
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
除了需要指定滑動時間參數(shù),其他與滾動窗口類似,也可以指定偏移
會話窗口(Session Windows)
會話窗口分配器==通過session活動來對元素進行分組==,會話窗口跟滾動窗口和滑動窗口相比,==不會有重疊和固定的開始時間和結(jié)束時間的情況==。相應(yīng)的,當它在一個固定的時間周期內(nèi)不再收到元素,即非活動間隔產(chǎn)生,那個這個窗口就會關(guān)閉。
會話窗口可以配置一個靜態(tài)session gap或定義了非活躍周期的session gap提取函數(shù)。當這個非活躍周期產(chǎn)生,當前的會話窗口將關(guān)閉并且后續(xù)的元素將被分配到新的會話窗口中。

val input: DataStream[T] = ...
// event-time Session窗口,固定的Session gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time Session窗口,動態(tài)的Session gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// 確定和返回session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time Session窗口,固定的Session gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time Session窗口,動態(tài)的Session gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// 確定和返回session gap
}
}))
.<windowed transformation>(<window function>)
session windows沒有一個固定的開始和結(jié)束時間。session windows操作為每一個到達的元素創(chuàng)建一個新的窗口,并合間隔時間小于指定時間間隔的窗口。為了進行合并,session windows的操作需要指定一個合并觸發(fā)器(Trigger)和一個合并窗口函數(shù)(Window Function),如:ReduceFunction、AggregateFunction或者ProcessWindowFunction。
全局窗口(Global Windows)
全局窗口分配器==將所有具有相同key的元素分配到同一個全局窗口中==,這個窗口模式++僅適用于用戶還需自定義觸發(fā)器的情況++。否則,由于==全局窗口沒有一個自然的結(jié)尾==,無法執(zhí)行元素的聚合,將不會有計算被執(zhí)行。
[圖片上傳失敗...(image-24f35a-1534762158575)]
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
窗口函數(shù)(Window Functions)
定義完窗口分配器后,我們還需要為每一個窗口指定我們需要執(zhí)行的計算,這是窗口的責任,當系統(tǒng)決定一個窗口已經(jīng)準備好執(zhí)行之后,這個窗口函數(shù)將被用來處理窗口中的每一個元素(可能是分組的)。當一個窗口準備好之后,F(xiàn)link是如何決定的?
window函數(shù)可以是ReduceFunction、AggregateFunction、FoldFunction或ProcessWindowFunction中的一個。前面兩個更高效一些,因為在++每個窗口中增量地對每一個到達的元素執(zhí)行聚合操作++。一個ProcessWindowFunction可以獲取一個窗口中的所有元素的迭代器(Iterable)以及元素所屬窗口的額外元信息。
有ProcessWindowFunction的窗口化操作會比其他的操作效率要差一些,因為Flink內(nèi)部在調(diào)用函數(shù)之前會將窗口中的所有元素都緩存起來。這個可以通過ProcessWindowFunction和ReduceFunction、AggregateFunction、FoldFunction結(jié)合使用來獲取窗口中所有元素的增量聚合和額外的窗口元數(shù)據(jù)
ReduceFunction
ReduceFunction指定了如何==通過兩個輸入的參數(shù)進行合并輸出一個同類型的參數(shù)==的過程,F(xiàn)link使用ReduceFunction來對窗口中的元素進行增量聚合。
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
AggregateFunction
聚合函數(shù)是ReduceFunction的一種廣義函數(shù),具有三種類型:輸入類型(in)、累加器類型(ACC)和輸出類型(out)。輸入類型是輸入流中的元素類型,而聚合函數(shù)有一種將一個輸入元素添加到累加器的方法。該接口還具有用于創(chuàng)建初始累加器的方法,用于將兩個累加器合并為一個累加器,并從累加器中提取輸出。
/**
* 這個AverageAggregate用來持續(xù)計算sum和count,getResult方法計算平均值
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
// 創(chuàng)建初始累加器
override def createAccumulator() = (0L, 0L)
// 將一個輸入元素添加到累加器
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
// 輸出結(jié)果
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
// 合并累加器
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
/* */
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
FoldFunction
1.6.0+已經(jīng)過期
FoldFunction指定了一個輸入元素如何與一個輸出類型的元素合并的過程,這個FoldFunction會被每一個加入到窗口中的元素和當前的輸出值增量地調(diào)用,第一個元素是與一個預定義的類型為輸出類型的初始值合并。
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
// 追加所有輸入的長整型到一個空的字符串中。
.fold("") { (acc, v) => acc + v._2 }
fold()不能應(yīng)用于Session window或者其他可合并的窗口中。
ProcessWindowFunction
一個ProcessWindowFunction獲得一個包含了window中的所有元素的迭代器(Iterable),和一個Context對象包含訪問時間和狀態(tài)信息,提供了更大的靈活性。這些帶來了性能的成本和資源的消耗,因為window中的元素無法進行增量迭代,而是緩存起來直到window被認為是可以處理時。
val input: DataStream[(String, Long)] = ...
input
.keyBy(_._1)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction())
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
上面的例子展示了統(tǒng)計一個window中元素個數(shù),此外,還將window的信息添加到輸出中。
使用ProcessWindowFunction來做簡單的聚合操作,如:計數(shù)操作,性能是相當差的。將ReduceFunction跟ProcessWindowFunction結(jié)合起來,來獲取增量聚合和添加到ProcessWindowFunction中的信息,性能更好。
ProcessWindowFunction with Incremental Aggregation
ProcessWindowFunction可以跟ReduceFunction、AggregateFunction或者FoldFunction結(jié)合來增量地對到達window中的元素進行聚合。當window關(guān)閉之后,ProcessWindowFunction就能提供聚合結(jié)果。當獲取到WindowFunction額外的window元信息后就可以進行增量計算窗口了。
Incremental Window Aggregation with ReduceFunction
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
window: TimeWindow,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((window.getStart, min))
}
)
Incremental Window Aggregation with AggregateFunction
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
// Function definitions
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
...
}
class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {
def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double]): () = {
val average = averages.iterator.next()
out.collect((key, average))
}
}
Incremental Window Aggregation with FoldFunction
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
( key: String,
window: TimeWindow,
counts: Iterable[(String, Long, Int)],
out: Collector[(String, Long, Int)] ) =>
{
val count = counts.iterator.next()
out.collect((key, window.getEnd, count._3))
}
)
觸發(fā)器(Triggers)
觸發(fā)器決定了一個窗口何時可以被窗口函數(shù)處理,每一個窗口分配器都有一個默認的觸發(fā)器,如果默認的觸發(fā)器不能滿足需要,你可以通過調(diào)用trigger(...)來指定一個自定義的觸發(fā)器。
觸發(fā)器的接口有5個方法來允許觸發(fā)器處理不同的事件:
-
onElement()方法,每個元素被添加到窗口時調(diào)用 -
onEventTime()方法,當一個已注冊的事件時間計時器啟動時調(diào)用 -
onProcessingTime()方法,當一個已注冊的處理時間計時器啟動時調(diào)用 -
onMerge()方法,與狀態(tài)性觸發(fā)器相關(guān),當使用會話窗口時,兩個觸發(fā)器對應(yīng)的窗口合并時,合并兩個觸發(fā)器的狀態(tài)。 -
clear()方法,執(zhí)行任何需要清除的相應(yīng)窗口
上面的方法中有兩個需要注意的地方:
- 前三個通過返回一個TriggerResult來決定如何操作調(diào)用他們的事件,這些操作可以是下面操作中的一個:
CONTINUE:什么也不做
FIRE:觸發(fā)計算
PURGE:清除窗口中的數(shù)據(jù)
FIRE_AND_PURGE:觸發(fā)計算并清除窗口中的數(shù)據(jù) - 這些函數(shù)可以注冊 "處理時間定時器" 或者 "事件時間計時器",被用來為后續(xù)的操作使用
觸發(fā)和清除(Fire and Purge)
一旦一個觸發(fā)器決定一個窗口已經(jīng)準備好進行處理,它將觸發(fā)并返回FIRE或者FIRE_AND_PURGE。這是窗口操作==發(fā)送當前窗口結(jié)果的信號==,給定一個擁有一個ProcessWindowFunction的窗口,那么所有的元素都將發(fā)送到ProcessWindowFunction中(可能之后還會發(fā)送到驅(qū)逐器[Evitor]中)。ReduceFunction、AggregateFunction或者FoldFunction的窗口僅僅發(fā)送他們想要的聚合結(jié)果。
當一個觸發(fā)器觸發(fā)時,它可以是FIRE或者FIRE_AND_PURGE,如果是FIRE,將保持window中的內(nèi)容,如果是FIRE_AND_PURGE,會清除window的內(nèi)容。默認情況下,預實現(xiàn)的觸發(fā)器僅僅是FIRE,不會清除window的狀態(tài)。
清除操作僅清除window的內(nèi)容,并留下潛在的窗口元信息和完整的觸發(fā)器狀態(tài)。
默認觸發(fā)器(Default Triggers of WindowAssigners)
默認的觸發(fā)器適用于許多種情況,例如:所有的事件時間分配器都有一個EventTimeTrigger作為默認的觸發(fā)器,這個觸發(fā)器僅在當水印通過窗口的最后時間時觸發(fā)。
GlobalWindow默認的觸發(fā)器是
NeverTrigger,是永遠不會觸發(fā)的,因此,在使用GlobalWindow時,需要定義一個自定義觸發(fā)器。
通過調(diào)用
trigger(...)來指定一個觸發(fā)器,你就重寫了WindowAssigner的默認觸發(fā)器。例如:如果你為TumblingEventTimeWindows指定了一個CountTrigger,就不會再通過時間來獲取觸發(fā)了,而是通過計數(shù)?,F(xiàn)在,如果你想通過時間和計數(shù)來觸發(fā)的話,你需要寫自定義的觸發(fā)器。
內(nèi)置的和自定義的觸發(fā)器(Build-in and Custom Triggers)
Flink有一些內(nèi)置的觸發(fā)器:
- EventTimeTrigger,根據(jù)由水印衡量的事件時間的進度來的
- ProcessingTimeTrigger,根據(jù)處理時間來觸發(fā)
- CountTrigger,一旦窗口中的元素個數(shù)超出了給定的限制就會觸發(fā)
- PurgingTrigger,作為另一個觸發(fā)器的參數(shù)并將它轉(zhuǎn)換成一個清除類型
如果想實現(xiàn)一個自定義的觸發(fā)器,需要使用抽象類Trigger。這個API還在優(yōu)化中,后續(xù)的Flink版本可能會改變。
驅(qū)逐器(Evictors)
Flink的窗口模型允許指定一個除了WindowAssigner和Trigger之外的可選參數(shù)Evitor,這個可以通過調(diào)用evitor(...)方法來實現(xiàn)。這個驅(qū)逐器可以在觸發(fā)器觸發(fā)之前或者之后,或者窗口函數(shù)被應(yīng)用之前清理窗口中的元素。為了達到這個目的,Evitor接口有兩個方法:
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
evitorBefore()方法包含了在window function之前應(yīng)用的驅(qū)逐邏輯,而evitorAfter()方法包含了在window function之后應(yīng)用的驅(qū)逐邏輯。在window function應(yīng)用之前被驅(qū)逐的元素將不會再被window function處理。
Flink有三個預實現(xiàn)的驅(qū)逐器:
- CountEvitor:在窗口中保持一個用戶指定數(shù)量的元素,并在窗口的開始處丟棄剩余的其他元素
- DeltaEvitor:通過一個DeltaFunction和一個閾值,計算窗口緩存中最近的一個元素和剩余的所有元素的delta值,并清除delta值大于或者等于閾值的元素
- TimeEvitor:對于一個給定的窗口,使用一個毫秒級的interval作為參數(shù),它會找出元素中的最大時間戳max_ts,并清除時間戳小于(max_ts - interval)的元素。
默認情況下,所有預實現(xiàn)的evitor都是在window function前應(yīng)用它們的邏輯
指定一個Evitor要防止預聚合,因為窗口中的所有元素必須得在計算之前傳遞到驅(qū)逐器中
Flink 并不保證窗口中的元素是有序的,所以驅(qū)逐器可能從窗口的開始處清除,元素到達的先后不是那么必要。
允許延遲(Allowed Lateness)
當使用event-time的window時,可能會出現(xiàn)元素到達晚了,F(xiàn)link用來與事件時間聯(lián)系的水印(watermark)已經(jīng)過了元素所屬的窗口的最后時間。
默認情況下,當水印已經(jīng)過了窗口的最后時間時,晚到的元素會被丟棄。然而,F(xiàn)link允許為窗口操作指定一個最大允許時延,允許時延指定了元素可以晚到多長時間,默認情況下是0,也就是說水印之后到達的元素將被丟棄。
水印已經(jīng)過了窗口最后時間后才來的元素,如果還未到窗口最后時間加時延時間,那么元素任然添加到窗口中。如果依賴觸發(fā)器的使用的話,晚到但是未丟棄的元素可能會導致窗口再次被觸發(fā)。
為了達到這個目的,F(xiàn)link將保持窗口的狀態(tài)直到允許時延的發(fā)生,一旦發(fā)生,F(xiàn)link將清除Window,刪除window的狀態(tài)。
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>)
當使用GlobalWindows分配器時,沒有數(shù)據(jù)會被認為是延遲的,因為Global Window的最后時間是Long.MAX_VALUE。
以Side Output來獲取延遲數(shù)據(jù)(Getting late data as a side output)
使用Flink的 Side Output 特性,你可以獲得一個已經(jīng)被丟棄的延遲數(shù)據(jù)流。
首先你需要在窗口化的數(shù)據(jù)流中調(diào)用sideOutputLateData(OutputTag)指定你需要獲取延遲數(shù)據(jù)。然后,你就可以在window操作的結(jié)果中獲取到Side output了。
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)