flink的event time機(jī)制

時間概念

Flink在流程序中支持三種時間概念:

  • 處理時間(Processing Time):處理時間是指執(zhí)行相應(yīng)操作的機(jī)器的系統(tǒng)時間。

當(dāng)流程序在處理時間上運(yùn)行時,所有基于時間的操作(如時間窗口)將使用運(yùn)行各自操作符的機(jī)器的系統(tǒng)時間。

處理時間是最簡單的時間概念,不需要流和機(jī)器之間的協(xié)調(diào)。它提供最佳性能和最低延遲。但是,在分布式和異步環(huán)境中,處理時間不提供確定性,因?yàn)樗苋菀资艿接涗浀竭_(dá)系統(tǒng)的速度,記錄在系統(tǒng)內(nèi)部操作符之間流動的速度以及中斷的影響。

  • 事件時間(Event Time):事件時間是每個單獨(dú)事件在其生成設(shè)備上發(fā)生的時間。

這個時間通常在記錄輸入Flink之前嵌入到記錄中,并且可以從每個記錄中提取事件時間戳。在事件時間中,時間的進(jìn)展取決于數(shù)據(jù),而不是任何掛鐘。

事件時間程序必須指定如何生成事件時間Watermarks,這是表示事件時間進(jìn)度的機(jī)制。

  • 攝入時間(Ingestion time):攝入時間是事件進(jìn)入Flink的時間。

在源操作符中,每個記錄以時間戳的形式獲取源的當(dāng)前時間,基于時間的操作(如時間窗口)引用該時間戳。

從概念上講,攝入時間介于事件時間和處理時間之間。與事件時間相比,攝入時間程序不能處理任何無序事件或延遲數(shù)據(jù),但程序不必指定如何生成Watermarks,因?yàn)樵趦?nèi)部,它自動進(jìn)行時間戳分配和自動Watermarks生成。

設(shè)定時間特征

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

event time和watermarks

設(shè)置時間特征為事件時間的流處理器需要一種方法來衡量事件時間的進(jìn)度,F(xiàn)link衡量event time 進(jìn)度的機(jī)制是watermarks,watermark 帶有一個時間戳,作為數(shù)據(jù)流的一部分隨數(shù)據(jù)流流動,Watermark(t) 表示event time 小于等于 t 的都已經(jīng)到達(dá),watermarks可以解決實(shí)時系統(tǒng)中最常見的問題:亂序與延遲。

生成watermark的方法:

  • 在source中,直接生成watermark,由source生成的watermark 優(yōu)先級比較低,可以被另一個方法產(chǎn)生的watermark覆蓋掉。
  • 通過時間戳分配器(timestamp assigner)來生成水印(watermark)。時間戳分配器分兩種:Periodic: 周期性(一定時間間隔或一定數(shù)據(jù)量)產(chǎn)生watermark;Punctuated: 間斷的 watermark,一般根據(jù)event 決定是否產(chǎn)生新watermark。

方法一:直接在source中生成watermark

override def run(ctx: SourceContext[MyType]): Unit = {
    while (/* condition */) {
        val next: MyType = getNext()
        ctx.collectWithTimestamp(next, next.eventTimestamp)

        if (next.hasWatermarkTime) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
        }
    }
}

方法二:通過時間戳分配器生成watermark
首先需要指定時間戳分配器,時間戳分配器通常在數(shù)據(jù)源之后立即指定,但并非嚴(yán)格要求這樣做。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

周期性時間戳分配器,如下所示:可以看出,自定義的時間戳分配器需要實(shí)現(xiàn)AssignerWithPeriodicWatermarks 接口,其中g(shù)etCurrentWatermark 產(chǎn)生新的watermark,如果返回非空且大于原來的watermark,則生成了新的watermark;另外,extractTimestamp 用于給數(shù)據(jù)加上時間戳,這個時間戳在后續(xù)所有基于event time的計算中使用。
ExecutionConfig.setAutoWatermarkInterval(...) 定義了watermark產(chǎn)生的時間間隔,單位是毫秒。

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = _

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

間斷性的時間戳分配器,如下所示:根據(jù)event來確定是否需要產(chǎn)生新的watermark,定義Punctuated 分配器需要實(shí)現(xiàn)AssignerWithPunctuatedWatermarks接口,包括函數(shù)extractTimestamp,checkAndGetNextWatermark,其中extractTimestamp 同Periodic Assigner,首先調(diào)用;然后調(diào)用checkAndGetNextWatermark ,用于確定是否需要產(chǎn)生新的watermark,當(dāng)checkAndGetNextWatermark 產(chǎn)生一個非空且大于上一個watermark時就產(chǎn)生了新的watermark。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}

Flink允許程序員分配自己的時間戳并發(fā)出自己的watermarks。更具體地說,可以通過實(shí)現(xiàn)其中一個AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks接口來實(shí)現(xiàn),具體取決于用例。簡而言之,第一個會周期性地發(fā)出watermarks,而第二個會根據(jù)傳入記錄的某些屬性發(fā)出watermarks。為了進(jìn)一步簡化此類任務(wù)的編程工作,F(xiàn)link提供了兩種已經(jīng)實(shí)現(xiàn)的時間戳分配器:AscendingTimestampExtractor和BoundedOutOfOrdernessTimestampExtractor。

  • AscendingTimestampExtractor: 這是AssignerWithPeriodicWatermarks 的最簡單的情況,數(shù)據(jù)流是按時間戳升序到達(dá)Flink的,這種情況下,數(shù)據(jù)里的時間戳就可以作為watermark。
val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
  • BoundedOutOfOrdernessTimestampExtractor: 這也是一個AssignerWithPeriodicWatermarks 的實(shí)現(xiàn),表示已知數(shù)據(jù)的最大延遲,在丟棄元素之前允許元素延遲的最長時間,大于最大延遲時長的元素將被丟棄。
val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容