Flink Window Triggers 觸發(fā)器 和 EVICTORS 清理器

前言

觸發(fā)器定義了window何時(shí)會(huì)被求值以及何時(shí)發(fā)送求值結(jié)果。觸發(fā)器可以到了特定的時(shí)間觸發(fā)也可以碰到特定的事件觸發(fā)。例如:觀察到事件數(shù)量符合一定條件或者觀察到了特定的事件。
清理器是一個(gè)可選的組件,可以被注入到ProcessWindowFunction之前或者之后調(diào)用。evictor可以清除掉window中收集的元素。由于evictor需要迭代所有的元素,所以evictor只能使用在沒有增量聚合函數(shù)作為參數(shù)的情況下。

如果對(duì)window 的概念或者window的分配器不熟悉的話,可以看下前面的文章Flink Window

首先來回顧下關(guān)于Flink Window 操作流程

Flink window operator 流程

當(dāng)一個(gè)事件來到窗口操作符,首先將會(huì)傳給 WindowAssigner 來處理。WindowAssigner 決定
了事件將被分配到哪些窗口。如果窗口不存在,WindowAssigner 將會(huì)創(chuàng)建一個(gè)新的窗口。
如果一個(gè) window operator 接受了一個(gè)增量聚合函數(shù)作為參數(shù),例如 ReduceFunction 或者
AggregateFunction,新到的元素將會(huì)立即被聚合,而聚合結(jié)果 result 將存儲(chǔ)在 window 中。如
果 window operator 沒有使用增量聚合函數(shù),那么新元素將被添加到 ListState 中,ListState 中
保存了所有分配給窗口的元素。
新元素被添加到窗口時(shí),這個(gè)新元素同時(shí)也被傳給了 window 的 trigger。trigger 定義了 window
何時(shí)準(zhǔn)備好求值,何時(shí) window 被清空。trigger 可以基于 window 被分配的元素和注冊(cè)的定
時(shí)器來對(duì)窗口的所有元素求值或者在特定事件清空 window 中所有的元素。

官網(wǎng)提供的關(guān)于WIndows 的算子流程

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

-----------------------------------------------------------------------------------------------------------------------

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Triggers

Trigger確定了窗口(由窗口分配器形成)什么時(shí)候準(zhǔn)備好執(zhí)行窗口函數(shù)處理。每個(gè)WindowAssigner都有一個(gè)默認(rèn)值Trigger。如果默認(rèn)觸發(fā)器不符合您的需求,則可以使用指定自定義觸發(fā)器trigger()。


觸發(fā)器作用

默認(rèn)的觸發(fā)器將會(huì)在兩種情況下觸發(fā)
處理時(shí)間:機(jī)器時(shí)間到達(dá)處理時(shí)間
事件時(shí)間:水位線超過了窗口的結(jié)束時(shí)間

觸發(fā)器可以訪問流的時(shí)間屬性以及定時(shí)器,還可以對(duì)state狀態(tài)編程。所以觸發(fā)器和process function一樣強(qiáng)大。
例如我們可以實(shí)現(xiàn)一個(gè)觸發(fā)邏輯:當(dāng)窗口接收到一定數(shù)量的元素時(shí),觸發(fā)器觸發(fā)。
再比如當(dāng)窗口接收到一個(gè)特定元素時(shí),觸發(fā)器觸發(fā)。
還有就是當(dāng)窗口接收到的元素里面包含特定模式(5秒鐘內(nèi)接收到了兩個(gè)同樣類型的事件),觸發(fā)器也可以觸發(fā)。
在一個(gè)事件時(shí)間的窗口中,一個(gè)自定義的觸發(fā)器可以提前(在水位線沒過窗口結(jié)束時(shí)間之前)計(jì)算和發(fā)射計(jì)算結(jié)果。這是一個(gè)常見的低延遲計(jì)算策略,盡管計(jì)算不完全,但不像默認(rèn)的那樣需要等待水位線沒過窗口結(jié)束時(shí)間。

Trigger API

我們看一下Trigger API:

onElement()  對(duì)于添加到窗口中的每個(gè)元素,都會(huì)調(diào)用該方法。
onEventTime()  當(dāng)注冊(cè)的事件時(shí)間計(jì)時(shí)器觸發(fā)時(shí),將調(diào)用該方法。
onProcessingTime() 當(dāng)注冊(cè)的處理時(shí)間計(jì)時(shí)器觸發(fā)時(shí),將調(diào)用該方法。
onMerge() 與有狀態(tài)觸發(fā)器相關(guān),并且在兩個(gè)觸發(fā)器的相應(yīng)窗口合并時(shí)(例如,在使用會(huì)話窗口時(shí))合并兩個(gè)觸發(fā)器的狀態(tài)。
clear() 執(zhí)行刪除相應(yīng)窗口后所需的任何操作。

關(guān)于上述方法,需要注意兩件事:

  1. 前面的三個(gè)方法都會(huì)產(chǎn)生一個(gè)TriggerResult來決定窗口接下來發(fā)生什么。TriggerResult可以取以下結(jié)果:

    • CONTINUE:什么都不做
    • FIRE:如果window operator有ProcessWindowFunction這個(gè)參數(shù),將會(huì)調(diào)用這個(gè)0 ProcessWindowFunction。如果窗口僅有增量聚合函數(shù)(ReduceFunction或者AggregateFunction)作為參數(shù),那么當(dāng)前的聚合結(jié)果將會(huì)被發(fā)送。窗口的state不變。
    • PURGE:窗口所有內(nèi)容包括窗口的元數(shù)據(jù)都將被丟棄。
    • FIRE_AND_PURGE:先對(duì)窗口進(jìn)行求值,再將窗口中的內(nèi)容丟棄。
  2. 這些方法中的任何一種都可用于注冊(cè)處理或事件時(shí)間計(jì)時(shí)器以用于將來的操作。

TriggerResult可能的取值使得我們可以實(shí)現(xiàn)很復(fù)雜的窗口邏輯。一個(gè)自定義觸發(fā)器可以觸發(fā)多次,可以計(jì)算或者更新結(jié)果,可以在發(fā)送結(jié)果之前清空窗口。

Fire and Purge

當(dāng)Trigger確定窗口已準(zhǔn)備好進(jìn)行處理,返回FIRE或FIRE_AND_PURGE。window operator根據(jù) FIRE或FIRE_AND_PURGE 去執(zhí)行當(dāng)前窗口的操作。像ProcessWindowFunction全窗口函數(shù)會(huì)處理 all elements,ReduceFunction,AggregateFunction或,F(xiàn)oldFunction增量聚合函數(shù)只會(huì)增量聚合

當(dāng)觸發(fā)器觸發(fā)時(shí),它可以是FIRE或FIRE_AND_PURGE。在FIRE保留窗口內(nèi)容的同時(shí),F(xiàn)IRE_AND_PURGE刪除其內(nèi)容。默認(rèn)情況下,預(yù)先實(shí)現(xiàn)的觸發(fā)器僅觸發(fā)FIRE而不會(huì)清除窗口狀態(tài)。

注意: Purge 將僅刪除窗口的內(nèi)容,并將保留有關(guān)該窗口的任何潛在元信息以及任何trigger state 。

當(dāng)在trigger中使用per-window state時(shí),這里我們需要保證當(dāng)窗口被刪除時(shí)state也要被刪除,否則隨著時(shí)間的推移,window operator將會(huì)積累越來越多的數(shù)據(jù),最終可能使應(yīng)用崩潰。

當(dāng)窗口被刪除時(shí),為了清空所有狀態(tài),觸發(fā)器的clear()方法需要需要?jiǎng)h掉所有的自定義per-window state,以及使用TriggerContext對(duì)象將處理時(shí)間和事件時(shí)間的定時(shí)器都刪除。

WindowAssigners的默認(rèn)觸發(fā)器

默認(rèn)Trigger的WindowAssigner是適用于很多情況。例如,所有事件時(shí)間窗口分配器都有EventTimeTrigger默認(rèn)觸發(fā)器。一旦WaterMark通過窗口的閉合時(shí)間,此trigger便會(huì)觸發(fā)。

注意: GlobalWindow的 默認(rèn)觸發(fā)器是NeverTrigger永不觸發(fā)。因此,使用時(shí),GlobalWindow使用時(shí)必須定義一個(gè)自定義觸發(fā)器
注意: 通過使用指定觸發(fā)器,trigger()您將覆蓋的默認(rèn)觸發(fā)器WindowAssigner。例如,如果您指定為 CountTrigger,則TumblingEventTimeWindows您將不再基于時(shí)間進(jìn)度而是僅通過計(jì)數(shù)獲得窗口觸發(fā)?,F(xiàn)在,如果要基于時(shí)間和計(jì)數(shù)做出反應(yīng),則必須編寫自己的自定義觸發(fā)器。

FLink 自帶的和自定義Triggers

  • Flink帶有一些內(nèi)置觸發(fā)器。
 EventTimeTrigger根據(jù)事件時(shí)間(由WaterMark判斷)的進(jìn)度觸發(fā)。
 ProcessingTimeTrigger基于處理時(shí)間的Trigger。
 CountTrigger一旦窗口中的元素?cái)?shù)量超過給定的限制,就會(huì)觸發(fā)。
 PurgingTrigger
  • 通過Trigger 類實(shí)現(xiàn)自定義觸發(fā)器

請(qǐng)注意,API仍在不斷更新,并可能在Flink的未來版本中更改。

Triggers 使用案例

下面的例子展示了一個(gè)觸發(fā)器在窗口結(jié)束時(shí)間之前觸發(fā)。當(dāng)?shù)谝粋€(gè)事件被分配到窗口時(shí),這個(gè)觸發(fā)器注冊(cè)了一個(gè)定時(shí)器,定時(shí)時(shí)間為水位線之前一秒鐘。當(dāng)定時(shí)事件執(zhí)行,將會(huì)注冊(cè)一個(gè)新的定時(shí)事件,這樣,這個(gè)觸發(fā)器每秒鐘最多觸發(fā)一次。

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TriggerExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .socketTextStream("localhost", 9999, '\n')
      .map(line => {
        val arr = line.split(" ")
        (arr(0), arr(1).toLong)
      })
      .assignAscendingTimestamps(_._2)
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .trigger(new OneSecondIntervalTrigger)
      .process(new WindowCount)

    stream.print()

    env.execute()
  }

  // 只在整數(shù)秒和窗口結(jié)束時(shí)間時(shí)觸發(fā)窗口計(jì)算!
  class OneSecondIntervalTrigger extends Trigger[(String, Long), TimeWindow] {
    // 每來一條數(shù)據(jù)都要調(diào)用一次!
    override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
      // 默認(rèn)值為false
      // 當(dāng)?shù)谝粭l事件來的時(shí)候,會(huì)在后面的代碼中將firstSeen置為true
      val firstSeen = ctx.getPartitionedState(
        new ValueStateDescriptor[Boolean]("first-seen", Types.of[Boolean])
      )

      // 當(dāng)?shù)谝粭l數(shù)據(jù)來的時(shí)候,!firstSeen.value()為true
      // 僅對(duì)第一條數(shù)據(jù)注冊(cè)定時(shí)器
      // 這里的定時(shí)器指的是:onEventTime函數(shù)!
      if (!firstSeen.value()) {
        // 第一條數(shù)據(jù)`a 1234`來的時(shí)候,水位線是:-9223372036854775808
        // 過了200ms,插入了一個(gè)水位線1234 - 1 = 1233
        // 水位線后面的整數(shù)秒是:-9223372036854774000
        println("第一條數(shù)據(jù)來了!當(dāng)前水位線是:" + ctx.getCurrentWatermark)
        val t = ctx.getCurrentWatermark + (1000 - (ctx.getCurrentWatermark % 1000))
        println("第一條數(shù)據(jù)來了以后,注冊(cè)的定時(shí)器的整數(shù)秒的時(shí)間戳是:" + t)
        ctx.registerEventTimeTimer(t) // 在第一條數(shù)據(jù)的時(shí)間戳之后的整數(shù)秒注冊(cè)一個(gè)定時(shí)器
        ctx.registerEventTimeTimer(window.getEnd) // 在窗口結(jié)束事件注冊(cè)一個(gè)定時(shí)器
        firstSeen.update(true)
      }
      TriggerResult.CONTINUE
    }

    override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
      TriggerResult.CONTINUE
    }

    // 定時(shí)器函數(shù),在水位線到達(dá)time時(shí),觸發(fā)
    override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
      // 在onElement函數(shù)中,我們注冊(cè)過窗口結(jié)束時(shí)間的定時(shí)器
      if (time == window.getEnd) {
        // 在窗口閉合時(shí),觸發(fā)計(jì)算并清空窗口
        TriggerResult.FIRE_AND_PURGE
      } else {
        // 1233ms后面的整數(shù)秒是2000ms
        val t = ctx.getCurrentWatermark + (1000 - (ctx.getCurrentWatermark % 1000))
        // 保證t小于窗口結(jié)束時(shí)間
        if (t < window.getEnd) {
          println("注冊(cè)的定時(shí)器的整數(shù)秒的時(shí)間戳是:" + t)
          // 這里注冊(cè)的定時(shí)器還是onEventTime函數(shù)
          ctx.registerEventTimeTimer(t)
        }
        // 觸發(fā)窗口計(jì)算
        println("在 " + time + " 觸發(fā)了窗口計(jì)算!")
        TriggerResult.FIRE
      }
    }

    override def clear(window: TimeWindow, ctx: TriggerContext): Unit = {
      // 狀態(tài)變量是一個(gè)單例!
      val firstSeen = ctx.getPartitionedState(
        new ValueStateDescriptor[Boolean]("first-seen", Types.of[Boolean])
      )

      firstSeen.clear()
    }
  }

  class WindowCount extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
      out.collect("窗口中有 " + elements.size + " 條數(shù)據(jù)!窗口結(jié)束時(shí)間是" + context.window.getEnd)
    }
  }
}

Evictors

evictor可以在window function求值之前或者之后移除窗口中的元素。

我們看一下Evictor的接口定義:

public interface Evictor<T, W extends Window>  extends Serializable {
  void evictBefore(
    Iterable<TimestampedValue<T>> elements,
    int size,
    W window,
    EvictorContext evictorContext);

  void evictAfter(
    Iterable<TimestampedValue<T>> elements,
    int size,
    W window,
    EvictorContext evictorContext);

  interface EvictorContext {

    long getCurrentProcessingTime();

    long getCurrentWatermark();
  }
}

evictBefore()和evictAfter()分別在window function計(jì)算之前或者之后調(diào)用。Iterable迭代器包含了窗口所有的元素,size為窗口中元素的數(shù)量,window object和EvictorContext可以訪問當(dāng)前處理時(shí)間和水位線??梢詫?duì)Iterator調(diào)用remove()方法來移除窗口中的元素。

evictor也經(jīng)常被用在GlobalWindow上,用來清除部分元素,而不是將窗口中的元素全部清空。

參考flink 官網(wǎng) flink Windows trigger

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容