前言
觸發(fā)器定義了window何時(shí)會(huì)被求值以及何時(shí)發(fā)送求值結(jié)果。觸發(fā)器可以到了特定的時(shí)間觸發(fā)也可以碰到特定的事件觸發(fā)。例如:觀察到事件數(shù)量符合一定條件或者觀察到了特定的事件。
清理器是一個(gè)可選的組件,可以被注入到ProcessWindowFunction之前或者之后調(diào)用。evictor可以清除掉window中收集的元素。由于evictor需要迭代所有的元素,所以evictor只能使用在沒有增量聚合函數(shù)作為參數(shù)的情況下。
如果對(duì)window 的概念或者window的分配器不熟悉的話,可以看下前面的文章Flink Window
首先來回顧下關(guān)于Flink Window 操作流程
Flink window operator 流程
當(dāng)一個(gè)事件來到窗口操作符,首先將會(huì)傳給 WindowAssigner 來處理。WindowAssigner 決定
了事件將被分配到哪些窗口。如果窗口不存在,WindowAssigner 將會(huì)創(chuàng)建一個(gè)新的窗口。
如果一個(gè) window operator 接受了一個(gè)增量聚合函數(shù)作為參數(shù),例如 ReduceFunction 或者
AggregateFunction,新到的元素將會(huì)立即被聚合,而聚合結(jié)果 result 將存儲(chǔ)在 window 中。如
果 window operator 沒有使用增量聚合函數(shù),那么新元素將被添加到 ListState 中,ListState 中
保存了所有分配給窗口的元素。
新元素被添加到窗口時(shí),這個(gè)新元素同時(shí)也被傳給了 window 的 trigger。trigger 定義了 window
何時(shí)準(zhǔn)備好求值,何時(shí) window 被清空。trigger 可以基于 window 被分配的元素和注冊(cè)的定
時(shí)器來對(duì)窗口的所有元素求值或者在特定事件清空 window 中所有的元素。
官網(wǎng)提供的關(guān)于WIndows 的算子流程
Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
-----------------------------------------------------------------------------------------------------------------------
Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Triggers
Trigger確定了窗口(由窗口分配器形成)什么時(shí)候準(zhǔn)備好執(zhí)行窗口函數(shù)處理。每個(gè)WindowAssigner都有一個(gè)默認(rèn)值Trigger。如果默認(rèn)觸發(fā)器不符合您的需求,則可以使用指定自定義觸發(fā)器trigger()。
觸發(fā)器作用
默認(rèn)的觸發(fā)器將會(huì)在兩種情況下觸發(fā)
處理時(shí)間:機(jī)器時(shí)間到達(dá)處理時(shí)間
事件時(shí)間:水位線超過了窗口的結(jié)束時(shí)間
觸發(fā)器可以訪問流的時(shí)間屬性以及定時(shí)器,還可以對(duì)state狀態(tài)編程。所以觸發(fā)器和process function一樣強(qiáng)大。
例如我們可以實(shí)現(xiàn)一個(gè)觸發(fā)邏輯:當(dāng)窗口接收到一定數(shù)量的元素時(shí),觸發(fā)器觸發(fā)。
再比如當(dāng)窗口接收到一個(gè)特定元素時(shí),觸發(fā)器觸發(fā)。
還有就是當(dāng)窗口接收到的元素里面包含特定模式(5秒鐘內(nèi)接收到了兩個(gè)同樣類型的事件),觸發(fā)器也可以觸發(fā)。
在一個(gè)事件時(shí)間的窗口中,一個(gè)自定義的觸發(fā)器可以提前(在水位線沒過窗口結(jié)束時(shí)間之前)計(jì)算和發(fā)射計(jì)算結(jié)果。這是一個(gè)常見的低延遲計(jì)算策略,盡管計(jì)算不完全,但不像默認(rèn)的那樣需要等待水位線沒過窗口結(jié)束時(shí)間。
Trigger API
我們看一下Trigger API:
onElement() 對(duì)于添加到窗口中的每個(gè)元素,都會(huì)調(diào)用該方法。
onEventTime() 當(dāng)注冊(cè)的事件時(shí)間計(jì)時(shí)器觸發(fā)時(shí),將調(diào)用該方法。
onProcessingTime() 當(dāng)注冊(cè)的處理時(shí)間計(jì)時(shí)器觸發(fā)時(shí),將調(diào)用該方法。
onMerge() 與有狀態(tài)觸發(fā)器相關(guān),并且在兩個(gè)觸發(fā)器的相應(yīng)窗口合并時(shí)(例如,在使用會(huì)話窗口時(shí))合并兩個(gè)觸發(fā)器的狀態(tài)。
clear() 執(zhí)行刪除相應(yīng)窗口后所需的任何操作。
關(guān)于上述方法,需要注意兩件事:
-
前面的三個(gè)方法都會(huì)產(chǎn)生一個(gè)TriggerResult來決定窗口接下來發(fā)生什么。TriggerResult可以取以下結(jié)果:
- CONTINUE:什么都不做
- FIRE:如果window operator有ProcessWindowFunction這個(gè)參數(shù),將會(huì)調(diào)用這個(gè)0 ProcessWindowFunction。如果窗口僅有增量聚合函數(shù)(ReduceFunction或者AggregateFunction)作為參數(shù),那么當(dāng)前的聚合結(jié)果將會(huì)被發(fā)送。窗口的state不變。
- PURGE:窗口所有內(nèi)容包括窗口的元數(shù)據(jù)都將被丟棄。
- FIRE_AND_PURGE:先對(duì)窗口進(jìn)行求值,再將窗口中的內(nèi)容丟棄。
這些方法中的任何一種都可用于注冊(cè)處理或事件時(shí)間計(jì)時(shí)器以用于將來的操作。
TriggerResult可能的取值使得我們可以實(shí)現(xiàn)很復(fù)雜的窗口邏輯。一個(gè)自定義觸發(fā)器可以觸發(fā)多次,可以計(jì)算或者更新結(jié)果,可以在發(fā)送結(jié)果之前清空窗口。
Fire and Purge
當(dāng)Trigger確定窗口已準(zhǔn)備好進(jìn)行處理,返回FIRE或FIRE_AND_PURGE。window operator根據(jù) FIRE或FIRE_AND_PURGE 去執(zhí)行當(dāng)前窗口的操作。像ProcessWindowFunction全窗口函數(shù)會(huì)處理 all elements,ReduceFunction,AggregateFunction或,F(xiàn)oldFunction增量聚合函數(shù)只會(huì)增量聚合
當(dāng)觸發(fā)器觸發(fā)時(shí),它可以是FIRE或FIRE_AND_PURGE。在FIRE保留窗口內(nèi)容的同時(shí),F(xiàn)IRE_AND_PURGE刪除其內(nèi)容。默認(rèn)情況下,預(yù)先實(shí)現(xiàn)的觸發(fā)器僅觸發(fā)FIRE而不會(huì)清除窗口狀態(tài)。
注意: Purge 將僅刪除窗口的內(nèi)容,并將保留有關(guān)該窗口的任何潛在元信息以及任何trigger state 。
當(dāng)在trigger中使用per-window state時(shí),這里我們需要保證當(dāng)窗口被刪除時(shí)state也要被刪除,否則隨著時(shí)間的推移,window operator將會(huì)積累越來越多的數(shù)據(jù),最終可能使應(yīng)用崩潰。
當(dāng)窗口被刪除時(shí),為了清空所有狀態(tài),觸發(fā)器的clear()方法需要需要?jiǎng)h掉所有的自定義per-window state,以及使用TriggerContext對(duì)象將處理時(shí)間和事件時(shí)間的定時(shí)器都刪除。
WindowAssigners的默認(rèn)觸發(fā)器
默認(rèn)Trigger的WindowAssigner是適用于很多情況。例如,所有事件時(shí)間窗口分配器都有EventTimeTrigger默認(rèn)觸發(fā)器。一旦WaterMark通過窗口的閉合時(shí)間,此trigger便會(huì)觸發(fā)。
注意: GlobalWindow的 默認(rèn)觸發(fā)器是NeverTrigger永不觸發(fā)。因此,使用時(shí),GlobalWindow使用時(shí)必須定義一個(gè)自定義觸發(fā)器
注意: 通過使用指定觸發(fā)器,trigger()您將覆蓋的默認(rèn)觸發(fā)器WindowAssigner。例如,如果您指定為 CountTrigger,則TumblingEventTimeWindows您將不再基于時(shí)間進(jìn)度而是僅通過計(jì)數(shù)獲得窗口觸發(fā)?,F(xiàn)在,如果要基于時(shí)間和計(jì)數(shù)做出反應(yīng),則必須編寫自己的自定義觸發(fā)器。
FLink 自帶的和自定義Triggers
- Flink帶有一些內(nèi)置觸發(fā)器。
EventTimeTrigger根據(jù)事件時(shí)間(由WaterMark判斷)的進(jìn)度觸發(fā)。
ProcessingTimeTrigger基于處理時(shí)間的Trigger。
CountTrigger一旦窗口中的元素?cái)?shù)量超過給定的限制,就會(huì)觸發(fā)。
PurgingTrigger
- 通過Trigger 類實(shí)現(xiàn)自定義觸發(fā)器
請(qǐng)注意,API仍在不斷更新,并可能在Flink的未來版本中更改。
Triggers 使用案例
下面的例子展示了一個(gè)觸發(fā)器在窗口結(jié)束時(shí)間之前觸發(fā)。當(dāng)?shù)谝粋€(gè)事件被分配到窗口時(shí),這個(gè)觸發(fā)器注冊(cè)了一個(gè)定時(shí)器,定時(shí)時(shí)間為水位線之前一秒鐘。當(dāng)定時(shí)事件執(zhí)行,將會(huì)注冊(cè)一個(gè)新的定時(shí)事件,這樣,這個(gè)觸發(fā)器每秒鐘最多觸發(fā)一次。
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object TriggerExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
.socketTextStream("localhost", 9999, '\n')
.map(line => {
val arr = line.split(" ")
(arr(0), arr(1).toLong)
})
.assignAscendingTimestamps(_._2)
.keyBy(_._1)
.timeWindow(Time.seconds(10))
.trigger(new OneSecondIntervalTrigger)
.process(new WindowCount)
stream.print()
env.execute()
}
// 只在整數(shù)秒和窗口結(jié)束時(shí)間時(shí)觸發(fā)窗口計(jì)算!
class OneSecondIntervalTrigger extends Trigger[(String, Long), TimeWindow] {
// 每來一條數(shù)據(jù)都要調(diào)用一次!
override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
// 默認(rèn)值為false
// 當(dāng)?shù)谝粭l事件來的時(shí)候,會(huì)在后面的代碼中將firstSeen置為true
val firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor[Boolean]("first-seen", Types.of[Boolean])
)
// 當(dāng)?shù)谝粭l數(shù)據(jù)來的時(shí)候,!firstSeen.value()為true
// 僅對(duì)第一條數(shù)據(jù)注冊(cè)定時(shí)器
// 這里的定時(shí)器指的是:onEventTime函數(shù)!
if (!firstSeen.value()) {
// 第一條數(shù)據(jù)`a 1234`來的時(shí)候,水位線是:-9223372036854775808
// 過了200ms,插入了一個(gè)水位線1234 - 1 = 1233
// 水位線后面的整數(shù)秒是:-9223372036854774000
println("第一條數(shù)據(jù)來了!當(dāng)前水位線是:" + ctx.getCurrentWatermark)
val t = ctx.getCurrentWatermark + (1000 - (ctx.getCurrentWatermark % 1000))
println("第一條數(shù)據(jù)來了以后,注冊(cè)的定時(shí)器的整數(shù)秒的時(shí)間戳是:" + t)
ctx.registerEventTimeTimer(t) // 在第一條數(shù)據(jù)的時(shí)間戳之后的整數(shù)秒注冊(cè)一個(gè)定時(shí)器
ctx.registerEventTimeTimer(window.getEnd) // 在窗口結(jié)束事件注冊(cè)一個(gè)定時(shí)器
firstSeen.update(true)
}
TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
// 定時(shí)器函數(shù),在水位線到達(dá)time時(shí),觸發(fā)
override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
// 在onElement函數(shù)中,我們注冊(cè)過窗口結(jié)束時(shí)間的定時(shí)器
if (time == window.getEnd) {
// 在窗口閉合時(shí),觸發(fā)計(jì)算并清空窗口
TriggerResult.FIRE_AND_PURGE
} else {
// 1233ms后面的整數(shù)秒是2000ms
val t = ctx.getCurrentWatermark + (1000 - (ctx.getCurrentWatermark % 1000))
// 保證t小于窗口結(jié)束時(shí)間
if (t < window.getEnd) {
println("注冊(cè)的定時(shí)器的整數(shù)秒的時(shí)間戳是:" + t)
// 這里注冊(cè)的定時(shí)器還是onEventTime函數(shù)
ctx.registerEventTimeTimer(t)
}
// 觸發(fā)窗口計(jì)算
println("在 " + time + " 觸發(fā)了窗口計(jì)算!")
TriggerResult.FIRE
}
}
override def clear(window: TimeWindow, ctx: TriggerContext): Unit = {
// 狀態(tài)變量是一個(gè)單例!
val firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor[Boolean]("first-seen", Types.of[Boolean])
)
firstSeen.clear()
}
}
class WindowCount extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
out.collect("窗口中有 " + elements.size + " 條數(shù)據(jù)!窗口結(jié)束時(shí)間是" + context.window.getEnd)
}
}
}
Evictors
evictor可以在window function求值之前或者之后移除窗口中的元素。
我們看一下Evictor的接口定義:
public interface Evictor<T, W extends Window> extends Serializable {
void evictBefore(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);
void evictAfter(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);
interface EvictorContext {
long getCurrentProcessingTime();
long getCurrentWatermark();
}
}
evictBefore()和evictAfter()分別在window function計(jì)算之前或者之后調(diào)用。Iterable迭代器包含了窗口所有的元素,size為窗口中元素的數(shù)量,window object和EvictorContext可以訪問當(dāng)前處理時(shí)間和水位線??梢詫?duì)Iterator調(diào)用remove()方法來移除窗口中的元素。
evictor也經(jīng)常被用在GlobalWindow上,用來清除部分元素,而不是將窗口中的元素全部清空。
參考flink 官網(wǎng) flink Windows trigger