Flink(三) Time & Watermark

1. Time

Flink中的時間(Time)主要分為三種:

  • Event Time:每條數(shù)據(jù)真實的產(chǎn)生時間,這就要求每條進入Flink應(yīng)用的數(shù)據(jù)都要自己帶有時間戳,標明數(shù)據(jù)產(chǎn)生時間;

  • Ingestion Time:是介于Event time和 Processing Time之間的時間。在數(shù)據(jù)通過Source Function 進入Flink應(yīng)用之后,他就會獲取Source Operator的本地時間作為時間戳;

  • Processing Time:即數(shù)據(jù)被處理的時間。當(dāng)我們的Flink程序是使用這個時間進行處理的時候,所有基于時間的操作都會使用當(dāng)前機器的系統(tǒng)時鐘來做為時間戳。

image.png

在應(yīng)用中指定時間類型

在Flink中默認情況下使用的是Processing Time,如果我們使用了Event time或者Ingestion time那么就需要在創(chuàng)建StreamExecutionEnvironment之后調(diào)用setStreamTimeCharacteristic來設(shè)置基準時間。這個設(shè)置指定了數(shù)據(jù)的時間分配,以及窗口操作所使用的時間類型。

下面的這段代碼就指定數(shù)據(jù)的時間類型為Processing Time,窗口大小為1小時。

val env = StreamExecutionEnvironment.getExecutionEnvironment
?
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
?
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
?
val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
?
stream
 .keyBy( _.getUser )
 .timeWindow(Time.hours(1))
 .reduce( (a, b) => a.add(b) )
 .addSink(...)

2. Event Time和WaterMark

我們知道Flink支持無界數(shù)據(jù)流的處理,同時也支持窗口操作。Flink的并行度設(shè)置在橫向上對數(shù)據(jù)流進行了分割,而窗口就是在縱向上對數(shù)據(jù)流進行了分割。Flink中窗口的作用是將無限的數(shù)據(jù)流劃分成一個個有限的數(shù)據(jù)集。所以基于窗口的操作都是針對這些有限的數(shù)據(jù)集進行的。

在使用Event time時,我們需要思考一個問題,對于一個無限的數(shù)據(jù)流,窗口大小的情況下,如何確定窗口內(nèi)的數(shù)據(jù)都已經(jīng)全部都到了?例如,現(xiàn)在的窗口大小是1小時。對于有序的數(shù)據(jù)流而言,我們只需要判斷數(shù)據(jù)的時間即可。08:01的數(shù)據(jù)一定是在08:02之前進入應(yīng)用,當(dāng)09:00的數(shù)據(jù)到達時,F(xiàn)link就知道可以操作08:00~09:00的數(shù)據(jù)了。

但是在我們實際的應(yīng)用環(huán)境中,大部分的數(shù)據(jù)流都是無序的,而且影響因素可能有很多。在這種情況下,8:58的數(shù)據(jù)可能是在9點之后才到的,這種情況下,我們的窗口操作又該在何時執(zhí)行呢?


亂序數(shù)據(jù)流

上面的問題總結(jié)一下就是:1. Flink如何確定窗口內(nèi)的數(shù)據(jù)全部都到齊了? 2. Flink如何對待數(shù)據(jù)流中遲到的數(shù)據(jù)?

為了解決上面的問題,需要用到Flink中的Watermark(時間水印)機制。Watermark能夠衡量數(shù)據(jù)進度,確保數(shù)據(jù)在亂序情況下也能被正常處理,得出連續(xù)的結(jié)果。Watermark作為數(shù)據(jù)流中一部分隨數(shù)據(jù)流入下游,當(dāng)一個watermark(t)到達下游時就表示后面的數(shù)據(jù)時間都是遲于t。

在Flink中用戶可以配置最大延遲的時間間隔,F(xiàn)link會用最新的數(shù)據(jù)時間減去這個間隔來更新watermark。當(dāng)watermark時間大于窗口結(jié)束時間,且窗口中有數(shù)據(jù)時,就會立刻觸發(fā)窗口計算。例如,我們以30分鐘做為最大延遲間隔,窗口大小為1個小時,那么窗口時間就應(yīng)該為(00:00-01:00),(02:00-03:00)...(23:00-00:00)。假設(shè)現(xiàn)在有一條03:31的數(shù)據(jù)進入應(yīng)用,它減去半個小時就是03:01大于(02:00-03:00)的結(jié)束時間,那么就認為沒有數(shù)據(jù)時間遲于03:00了,此時如果窗口內(nèi)有數(shù)據(jù)就會立馬觸發(fā)窗口計算。這個計算需要通過延遲間隔和最新的數(shù)據(jù)計算,判斷是否已經(jīng)超過了窗口允許的延遲時間。設(shè)置半個小時就意味著每個窗口的數(shù)據(jù)可以遲到半個小時。如果真的有數(shù)據(jù)超過了這個延遲時間,那我們就需要指定這類遲到數(shù)據(jù)的處理策略了。

2.1 順序數(shù)據(jù)流中的watermark

watermark.png

在數(shù)據(jù)有序的情況下,10:00的數(shù)據(jù)到達時,我們就知道09:0010:00的窗口可以操作了,因為不會有比10點還早的數(shù)據(jù)了,所有09:0010:00窗口內(nèi)的時間都已經(jīng)到了。但是因為我們甚至了30分鐘的watermark,10點減去半個小時為09:30小于窗口的結(jié)束時間,所以它會等,一直等到10:31數(shù)據(jù)來了之后,10:31減半個小時大于10:00。原本早就可以執(zhí)行的計算現(xiàn)在多等了半個小時,所以在數(shù)據(jù)流有序的情況,并不能很好的發(fā)揮watermark的作用,反而會增加應(yīng)用的延遲。

2.2 亂序數(shù)據(jù)流中的watermark

在實際環(huán)境中使用event time,我們也會遇到因為網(wǎng)絡(luò)阻塞或者其他原因?qū)е碌臒o序數(shù)據(jù)流。在這種情況下watermark便可以保證窗口內(nèi)的數(shù)據(jù)按照指定的窗口大小和延遲時間進行計算。值得注意的是,F(xiàn)link的延遲時間是相對于event time而言的,不是根據(jù)系統(tǒng)時間來匹配的。就是說,如果我們設(shè)置的窗口大小為1個小時,延遲時間是10分鐘。對于(09:00~10:00)的窗口而言,它不一定是會在系統(tǒng)時間超過10:10后計算,因為此刻不一定有時間戳大于10:10的數(shù)據(jù)到來。只有當(dāng)watermark大于窗口結(jié)束時間時才會進行窗口操作,watermark一般都是根據(jù)event time計算的。

watermark

如上,假設(shè)窗口大小為1小時,延遲時間設(shè)為10分鐘。明顯,數(shù)據(jù)09:38已經(jīng)遲到,但它依然會被正確計算,只有當(dāng)有數(shù)據(jù)時間大于10:10的數(shù)據(jù)到達之后(即對應(yīng)的watermark大于10:10-10min) 09:00~10:00的窗口才會執(zhí)行計算。

2.3 并行數(shù)據(jù)流中的watermark

對應(yīng)并行度大于1的source task,它每個獨立的subtask都會生成各自的watermark。這些watermark會隨著流數(shù)據(jù)一起分發(fā)到下游算子,并覆蓋掉之前的watermark。當(dāng)有多個watermark同時到達下游算子的時候,flink會選擇較小的watermark進行更新。當(dāng)一個task的watermark大于窗口結(jié)束時間時,就會立馬觸發(fā)窗口操作。

image.png

2.4 在代碼中生成timeStamp和watermark

2.4.1 自定義Source中的timeStamp和watermark

下面通過一個簡單的demo來看一下如何生成數(shù)據(jù)的event time和watermark:

object WordCount {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setParallelism(1)
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
?
 val source: DataStream[String] = env.addSource(new MySource)
 val transformatted = source.map(element => {
 val word = element.split("_")(0)
 (word, 1)
 }
 )
 .keyBy(0)
 .timeWindow(Time.seconds(5))
 .sum(1)
?
 transformatted.print()
 env.execute("Word Count")
?
 }
}
?
class MySource extends SourceFunction[String] {
?
 val sdf = new SimpleDateFormat("HH:mm:ss")
 var isRunning = true

 override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
 while (isRunning) {
 Thread.sleep(1000)

 val letter = (65 + new Random().nextInt(25)).toChar
 val timeMills = System.currentTimeMillis() - 3000L
 val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
?
 println(element)
 //生成帶event time的數(shù)據(jù)
 ctx.collectWithTimestamp(element, timeMills)
 //生成指定時間間隔為1s的watermark
 ctx.emitWatermark(new Watermark(timeMills - 1000))
 }
 }
?
 override def cancel(): Unit = {
 isRunning = false
 }
}

這個demo類似與wordcount,統(tǒng)計5秒內(nèi)字母出現(xiàn)的次數(shù),延遲時間為1s中。我們實現(xiàn)了自定義的SourceFunction,每隔1s生成一個數(shù)據(jù),并通過emiterWatermarkcollectWithTimestamp來生成數(shù)據(jù)的watermark和時間戳,這里的watermark減去了1秒鐘,表示最大延遲時間為1秒鐘。需要注意的是,emitWatermark是需要結(jié)合Event time來使用的。

  • 當(dāng)我們指定了程序中的基準時間為Event time時,則我們需要生成數(shù)據(jù)的Event time和watermark來指定數(shù)據(jù)的延遲時間;

  • 當(dāng)我們指定程序中的基準時間為Ingestion time時,watermark會被自動生成的ingestion time watermarks代替;

  • 當(dāng)我們指定程序中的基準時間為Processing time時,watermark會被忽略掉,因為此刻對于每個task而言,數(shù)據(jù)都是有序的(按到達的先后指定時間)。

我們先看看程序的部分輸出:

M_09:56:40
I_09:56:41
K_09:56:42
K_09:56:43
F_09:56:44
O_09:56:45
I_09:56:46
(M,1)
(I,1)
(F,1)
(K,2)
O_09:56:47
U_09:56:48
C_09:56:49
N_09:56:50
V_09:56:51
(I,1)
(O,2)
(C,1)
(U,1)
T_09:56:52
M_09:56:53
G_09:56:54
U_09:56:55
X_09:56:56
(V,1)
(N,1)
(M,1)
(G,1)
(T,1)
X_09:56:57
O_09:56:58
T_09:56:59
U_09:57:00
N_09:57:01
(X,2)
(U,1)
(T,1)
(O,1)
E_09:57:02

我們來分析一波,第一個窗口算出的結(jié)果為:

(M,1)
(I,1)
(F,1)
(K,2)

它只計算了下面的幾個元素:

M_09:56:40
I_09:56:41
K_09:56:42
K_09:56:43
F_09:56:44

正好是5秒5條數(shù)據(jù),但是實際上此刻的數(shù)據(jù)時間已經(jīng)到達了09:56:46,即此刻Source產(chǎn)生的全部數(shù)據(jù)如下:

M_09:56:40
I_09:56:41
K_09:56:42
K_09:56:43
F_09:56:44
O_09:56:45
I_09:56:46

比窗口時間多了2秒。這是因為我們設(shè)置的延遲時間為1s,ctx.emitWatermark(new Watermark(timeMills - 1000))。那么對于(09:56:40~09:56:44)的窗口而言,watermark需要大于09:56:44才會觸發(fā)這個窗口的計算,09:56:44+1s=09:56:45,09:56:44+2s=09:56:46>09:56:44觸發(fā)窗口計算。而45和46的數(shù)據(jù)會被放入下個窗口計算,我們可以推算出,下個窗口是在09:56:51之后才觸發(fā)計算的,實際上也的確如此。

2.4.2 Timestamp Assigner

如果我們使用的是Flink自帶的外部數(shù)據(jù)源,那我們就不可以通過SourceFunction來生成數(shù)據(jù)的timestamp和watermark。這種情況下我們就要借助Flink自帶的Timestamp Assigner來管理數(shù)據(jù)流中的timestamp和watermark了。Time Assigner一般是在source function之后使用的,也可以在我們的后續(xù)的算子之后添加,只要保證它是在第一個時間相關(guān)操作之前被使用就都行。如果我們同時使用了Source Function和Timestamp Assigner,那么后面Timestamp Assigner生成的timestamp和watermark就會覆蓋之前生成的。

Flink中Timestamp Assigner生成watermark的兩種類型:

  • Periodic Watermark: 根據(jù)設(shè)定的時間間隔周期性地生成watermark,通過AssignerWithPeriodicWatermarks接口定義;

  • Punctuated Watermark:根據(jù)特定的數(shù)據(jù)生成watermark,通過AssignerWithPunctuatedWatermarks接口定義。

2.4.2.1 Periodic Watermark

Periodic Watermark Assigner 也有兩種不同的實現(xiàn):升序模式指定間隔

升序模式下,會將數(shù)據(jù)中的指定字段提取出來做為timestamp,而且不需要顯式的指定watermark,默認會使用當(dāng)前的timestamp作為watermark。就跟在Source Function中將watermark也設(shè)為timestamp一樣。因為沒有指定延時間隔,也就是意味著不允許有遲到數(shù)據(jù),所以這種方式比較適合有序的數(shù)據(jù)流。

下面看看如何通過Ascending Timestamp Assigner指定timestamp和watermark:

object PeriodicAssignerWordCount {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setParallelism(1)
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
?
 val source: DataStream[(String, Long)] = env.addSource(new MySource)
 //指定timestamp,會使用這個時間作為watermark
 val withTimestampAndWatermark: DataStream[(String, Long)] = source.assignAscendingTimestamps(_._2)
 val transformatted = withTimestampAndWatermark.map(element => (element._1.split("_")(0), 1))
 .keyBy(0)
 .timeWindow(Time.seconds(5))
 .sum(1)
?
 transformatted.print()
 env.execute("Ascending Timestamp Assigner Word Count")
 }
}
?
class MySource extends SourceFunction[(String, Long)] {
?
 val sdf = new SimpleDateFormat("HH:mm:ss")
 var isRunning = true
?
 override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = {
 while (isRunning) {
 Thread.sleep(1000)
 val letter = (65 + new Random().nextInt(25)).toChar
 val timeMills = System.currentTimeMillis()
 val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
 println(element)
 ctx.collect((element, timeMills))
 }
 }
?
 override def cancel(): Unit = {
 isRunning = false
 }
}

下面是代碼的部分輸出:

R_14:26:54
L_14:26:55
(R,1)
U_14:26:56
V_14:26:57
O_14:26:58
O_14:26:59
V_14:27:00
(L,1)
(U,1)
(O,2)
(V,1)
R_14:27:01
C_14:27:02
D_14:27:03
D_14:27:04
V_14:27:05
(V,1)
(R,1)
(D,2)
(C,1)

我們看第二個計算窗口:

(L,1)
(U,1)
(O,2)
(V,1)

執(zhí)行這個窗口操作前它接受到的數(shù)據(jù)為:

L_14:26:55
U_14:26:56
V_14:26:57
O_14:26:58
O_14:26:59
V_14:27:00

最后一個數(shù)據(jù)時間,同時也是watermark超過了這個窗口的結(jié)束時間14:26:59,所以觸發(fā)了計算。

以上跟source function的輸出相似,只是少了1s延遲,這里不做贅述。

而指定時間間隔則需要實現(xiàn)抽象類BoundedOutOfOrdernessTimestampExtractor來定義Assigner。這個類的構(gòu)造器接受一個時間作為指定的時間間隔,而抽象方法extractTimestamp則是需要用戶自己定義timestamp抽取邏輯。以下實現(xiàn)了最開始的功能,計算5秒內(nèi)的wordcount,同時指定延遲時間為1s,watermark根據(jù)當(dāng)前timestamp 減去固定延遲時間生成:

object SpecifiedTimeIntervalAssigner {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
?
 env.setParallelism(1)
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
?
 val source: DataStream[(String, Long)] = env.addSource(new MySource)
 //定義timestamp抽取邏輯,同時指定延遲時間為1s
 val withTimestampAndWatermark: DataStream[(String, Long)] = source.assignTimestampsAndWatermarks(
 new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(1)) {
 override def extractTimestamp(element: (String, Long)): Long = element._2
 })
 val transformatted = withTimestampAndWatermark.map(element => (element._1.split("_")(0), 1))
 .keyBy(0)
 .timeWindow(Time.seconds(5))
 .sum(1)
?
 transformatted.print()
 env.execute("Ascending Timestamp Assigner Word Count")
?
 }
}
?
class MySource extends SourceFunction[(String, Long)] {
?
 val sdf = new SimpleDateFormat("HH:mm:ss")
 var isRunning = true
?
 override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = {
 while (isRunning) {
 Thread.sleep(1000)
 val letter = (65 + new Random().nextInt(25)).toChar
 val timeMills = System.currentTimeMillis()
 val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
 println(element)
 ctx.collect((element, timeMills))
 }
 }
?
 override def cancel(): Unit = {
 isRunning = false
 }
}

以下是部分輸出,與第一個demo一樣,不做贅述。

N_14:53:03
C_14:53:04
R_14:53:05
L_14:53:06
(N,1)
(C,1)
Q_14:53:07
E_14:53:08
N_14:53:09
M_14:53:10
X_14:53:11
(R,1)
(L,1)
(E,1)
(N,1)
(Q,1)
D_14:53:12
D_14:53:13
K_14:53:14
H_14:53:15
W_14:53:16
(X,1)
(M,1)
(K,1)
(D,2)
2.4.2.2 Punctuated Watermark

我們可以根據(jù)數(shù)據(jù)流中的特殊元素來指定watermark的生成。如果狀態(tài)為1則生成watermark,反之則不生成。生成Punctuated Watermark邏輯需要通過實現(xiàn)AssignerWithPunctuatedWatermarks接口,并在其中指定watermark的生成邏輯和timestamp的抽取邏輯。下面實現(xiàn)了只有當(dāng)“A”出現(xiàn)時才生成watermark的邏輯:

object PunctuatedAssigner {
 def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
?
 env.setParallelism(1)
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
?
 val source: DataStream[(String, Long)] = env.addSource(new MyPunctuatedSource)
 //定義timestamp抽取邏輯,同時指定延遲時間為1s
 val withTimestampAndWatermark: DataStream[(String, Long)] = source.assignTimestampsAndWatermarks(
 new AssignerWithPunctuatedWatermarks[(String, Long)] {
 override def checkAndGetNextWatermark(lastElement: (String, Long), extractedTimestamp: Long): Watermark = {
 if (lastElement._1.split("_")(0).equals("A")) {
 new Watermark(lastElement._2)
 } else {
 null
 }
 }
?
?
 override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = element._2
 }
 )
 val transformatted = withTimestampAndWatermark.map(element => (element._1.split("_")(0), 1))
 .keyBy(0)
 .timeWindow(Time.seconds(5))
 .sum(1)
?
 transformatted.print()
 env.execute("Ascending Timestamp Assigner Word Count")
?
 }
}
?
?
class MyPunctuatedSource extends SourceFunction[(String, Long)] {
?
 val sdf = new SimpleDateFormat("HH:mm:ss")
 var isRunning = true
?
 override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = {
 while (isRunning) {
 Thread.sleep(1000)
 val letter = (65 + new Random().nextInt(25)).toChar
 val timeMills = System.currentTimeMillis()
 val element = s"${letter}_${sdf.format(new Date(timeMills))}".toString
 println(element)
 ctx.collect((element, timeMills))
 }
 }
?
 override def cancel(): Unit = {
 isRunning = false
 }
}

輸出結(jié)果如下:

K_15:15:54
G_15:15:55
V_15:15:56
S_15:15:57
Q_15:15:58
B_15:15:59
G_15:16:00
J_15:16:01
I_15:16:02
K_15:16:03
O_15:16:04
F_15:16:05
V_15:16:06
X_15:16:07
R_15:16:08
N_15:16:09
D_15:16:10
Y_15:16:11
F_15:16:12
D_15:16:13
D_15:16:14
T_15:16:15
G_15:16:16
G_15:16:17
X_15:16:18
G_15:16:19
Y_15:16:20
E_15:16:21
S_15:16:22
A_15:16:23
(K,1)
(G,1)
(S,1)
(Q,1)
(V,1)
(B,1)
(J,1)
(I,1)
(K,1)
(O,1)
(G,1)
(N,1)
(R,1)
(F,1)
(V,1)
(X,1)
(D,3)
(Y,1)
(F,1)
(G,3)
(T,1)
(X,1)

可以看到,當(dāng)“A”出現(xiàn)時,才觸發(fā)窗口計算。需要注意的時,觸發(fā)窗口計算的條件不是“A”出現(xiàn),是“A”的watermark大于窗口的結(jié)束時間。

3. 總結(jié)

在Flink中支持多中時間類型,處理起來最靈活同時也是復(fù)雜的是Event time。當(dāng)我們使用它作為基準時間的時候,我們就需要指定他的生成邏輯。而當(dāng)亂序數(shù)據(jù)流出現(xiàn)的時候,如何區(qū)別出時間窗口何時結(jié)束,進而觸發(fā)基于時間窗口的操作,這就要借助watermark的幫助。使用watermark我們可以自己定義允許數(shù)據(jù)遲到的時間間隔,根據(jù)指定數(shù)據(jù)制定生成邏輯等。值得注意的是,F(xiàn)link中的時間都是相對而言的,當(dāng)我們使用event time是,時間是根據(jù)數(shù)據(jù)的event time而言的,如果大于窗口結(jié)束時間的watermark不出現(xiàn),那便一直不會觸發(fā)窗口操作。當(dāng)然,使用watermark我沒也沒法避免數(shù)據(jù)遲到,這個在于我們自己的取舍。確保數(shù)據(jù)的準確性,我們可能需要設(shè)置較大的延遲時間,這樣數(shù)據(jù)的時效性就可不到保證;如果要確保降低延遲,數(shù)據(jù)的準確性也沒法就沒法保證。而對于這些最終遲到的數(shù)據(jù),F(xiàn)link中也可以指定不同的處理策略,后面再做整理。

以下是本文的思維導(dǎo)圖:


文章思維導(dǎo)圖

參考資料:

《Flink原理、實戰(zhàn)與性能優(yōu)化》 張利兵 著
Flink 官網(wǎng)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容