Flink水位線不觸發(fā)問題
窗口計(jì)算時(shí)遇到好幾次水位線不觸發(fā)的情況,簡(jiǎn)單總結(jié)下。
首先,介紹下Flink的事件時(shí)間(EventTime)和水位線(Watermarks)的概念。
一、處理時(shí)間
如果要構(gòu)造一個(gè)實(shí)時(shí)的流式應(yīng)用,或早或晚都會(huì)接觸到EventTime這個(gè)概念?,F(xiàn)實(shí)場(chǎng)景中也會(huì)遇到消息亂序到達(dá),這里會(huì)介紹到為什么需要事件時(shí)間和如何去處理亂序到達(dá)的數(shù)據(jù)。
ProcessingTime是Flink系統(tǒng)處理這條消息的時(shí)間,EventTime可以理解成是這條消息真實(shí)發(fā)生的時(shí)間。
舉個(gè)例子,創(chuàng)建一個(gè)SlidingWindow,窗口大小為10秒,步長(zhǎng)為5秒。關(guān)于窗口的更多概念,可以參考Flink官方文檔——Windows。
案例1:消息都按時(shí)到達(dá)
val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
counts.print
senv.execute("ProcessingTime processing example")
如果source中有三條消息,對(duì)應(yīng)的事件時(shí)間分別為13秒、13秒和16秒:
[站外圖片上傳中...(image-1e2304-1550219427900)]
它們會(huì)落到正確的窗口上,如下圖所示。13秒產(chǎn)生的兩條消息會(huì)落到窗口1[5s-15s]和窗口2[10s-20s]上,16秒產(chǎn)生的消息會(huì)落到窗口2[10s-20s]和窗口3[15s-25s]上。最后窗口fire掉時(shí),三個(gè)窗口的count值分別為:(a,2), (a,3) and (a,1) ,和預(yù)期一致。
案例2:消息delay
其中一條13秒產(chǎn)生的消息晚到了6s,那按上面的代碼邏輯,這些消息會(huì)落到下面的窗口中:
延遲的消息會(huì)落到窗口2[10s-20s]和窗口3[15s-25s]上。這看起來對(duì)窗口2沒有影響,因?yàn)榻Y(jié)果都是3,但窗口3的結(jié)果卻不一致了。
二、事件時(shí)間
因此此處我們采用事件時(shí)間,這里水位線的事件時(shí)間為當(dāng)前系統(tǒng)的時(shí)間,當(dāng)然你可以改成數(shù)據(jù)中的某個(gè)時(shí)間。
class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
e.split(",")(1).toLong
}
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis)
}
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
.assignTimestampsAndWatermarks(new TimestampExtractor)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
counts.print
senv.execute("EventTime processing example")
結(jié)果如下圖所示:
[站外圖片上傳中...(image-b2feb1-1550219427900)]
結(jié)果看起來好很多,窗口2和3都正確了,但窗口1卻丟了一條數(shù)據(jù)。
Flink沒有將delay的數(shù)據(jù)分配給窗口3是因?yàn)楝F(xiàn)在是檢查消息的事件時(shí)間,因此不會(huì)放入窗口三中。而沒有分配給窗口1的原因是delayed的消息到達(dá)系統(tǒng)的時(shí)間是19秒,窗口1已經(jīng)fire掉了。此處就需要watermarks了。
三、水位線(水?。?/h2>
我認(rèn)為水位線是很重要和有趣的一個(gè)概念,我這里會(huì)大概描述下,如果想了解更多可以看Google一場(chǎng)精彩的talk,也可以看這個(gè)dataArtisans的blog。水位線簡(jiǎn)單理解就是一個(gè)timestamp,當(dāng)Flink收到這個(gè)水印時(shí),F(xiàn)link理解會(huì)收到來自這個(gè)時(shí)間點(diǎn)之后的消息,也可以理解成告訴Flink運(yùn)行到哪個(gè)事件時(shí)間了。
在這個(gè)案例中,其實(shí)就是告訴Flink一條消息可以遲到多久。
我現(xiàn)在設(shè)置水位線為現(xiàn)在的事件提前5秒,相當(dāng)于告訴Flink我的消息可以遲到五秒。
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis - 5000)
}
結(jié)果變成下圖所示:
[站外圖片上傳中...(image-6dfb6e-1550219427900)]
四、允許延遲(Lateness)
如果采用“watermark - delay”,如果水位線不超過window_length + delay是不會(huì)被fire掉的,所以此刻可以采用allowedLateness方法。在window_end_time + allowed lateness之前,F(xiàn)link都不會(huì)丟棄這條數(shù)據(jù)。
當(dāng)消息到達(dá)時(shí),F(xiàn)link會(huì)提取它的時(shí)間,然后判斷它是否在有效的延遲時(shí)間內(nèi),然后去判斷是否fire掉窗口。
但是通過這種途徑,一個(gè)窗口有可能被fire掉多次,如果需要exactly once processing的話,需要保證sink是冪等的。
五、水位線怎么不觸發(fā)?
數(shù)據(jù)一直有序得進(jìn)來,為什么沒有窗口被fire掉?沒有結(jié)果產(chǎn)出?
case1:提取時(shí)間失敗
筆者和上游約定好了數(shù)據(jù)格式,extractTimestamp中提取的是某個(gè)字段為事件時(shí)間。研究數(shù)據(jù)發(fā)現(xiàn)約定好的字段突然不發(fā)了。
case2:提取時(shí)間有了,但是照樣失敗
上游按約定發(fā)了該字段后,系統(tǒng)在測(cè)試環(huán)境運(yùn)行了一段時(shí)間,又沒有結(jié)果產(chǎn)出了。
調(diào)試發(fā)現(xiàn)提取時(shí)間正常,checkAndGetNextWatermark也正常,但是為什么窗口沒被fire掉呢。
因?yàn)闀r(shí)間的format變了,由到毫秒的timestamp變成了yyyymmddHHmmss,需要轉(zhuǎn)成timestamp。
case3:一切正常,窗口不fire
EventTimeTrigger.java
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
看了下EventTimeTrigger的源碼,只有當(dāng)當(dāng)前的水位線越過窗口,即時(shí)間大于窗口的endTime才會(huì)觸發(fā)Fire的操作。我們的處理流程沒有觸發(fā),那就說明我們的水位線沒有更新到合適的值。調(diào)試后發(fā)現(xiàn)當(dāng)前的水位線一直停留在初始的最小的long值。
BoundedOutOfOrdernessTimestampExtractor.java
@Override
public final Watermark getCurrentWatermark() {
// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
debug發(fā)現(xiàn)lastEmittedWatermark確實(shí)有更新,這說明這個(gè)地方是觸發(fā)了Watermark的值。但是debug的過程中,發(fā)現(xiàn)時(shí)不時(shí)會(huì)出現(xiàn)初始值的水位線。
SystemProcessingTimeService.java
TimestampsAndPeriodicWatermarksOperator.java
@Override
public void onProcessingTime(long timestamp) throws Exception {
// register next timer
Watermark newWatermark = userFunction.getCurrentWatermark();
if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
currentWatermark = newWatermark.getTimestamp();
// emit watermark
output.emitWatermark(newWatermark);
}
long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
TimestampsAndPeriodicWatermarksOperator會(huì)做判斷:如果新的水位線小于當(dāng)前的水位線,就不會(huì)更新了。
終于,順著StreamInputProcessor–>StatusWatermarkValve理了下來,看見這樣的處理邏輯:
StreamInputProcessor.java
StatusWatermarkValve.java
private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
long newMinWatermark = Long.MAX_VALUE;
// determine new overall watermark by considering only watermark-aligned channels across all channels
for (InputChannelStatus channelStatus : channelStatuses) {
if (channelStatus.isWatermarkAligned) {
newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
}
}
// we acknowledge and output the new overall watermark if it is larger than the last output watermark
if (newMinWatermark > lastOutputWatermark) {
lastOutputWatermark = newMinWatermark;
outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
}
}
這里會(huì)將所有的channel status的水位線做個(gè)匯總:取最小的水位線。那是不是問題出在這里?后面debug了下看看,確實(shí),這個(gè)地方有的channel status下的水位線一直是最小的long值那個(gè)不正常的水位線,進(jìn)而導(dǎo)致整體的水位線發(fā)送不出去。
那么為什么會(huì)出現(xiàn)這種情況呢,百思不得其解。
當(dāng) [window_start_time,window_end_time) 有數(shù)據(jù),watermark Time大于等于window_end_time時(shí),會(huì)觸發(fā)window trigger。
因?yàn)橹斑\(yùn)行都是正常的,檢查了數(shù)據(jù)也沒問題。去翻改動(dòng),有影響的可能就是改了一些算子的并行度。
assigntimestampandwatermarks和map的并行度一樣了就不能生成水位線了?
于是修改了assigntimestampandwatermarks的并行度,window正常fire掉了。
分析原因:
Flink source用到了FlinkKafkaConsumer010,沒有指定KafkaPartitioner的話,會(huì)通過FixedPartitioner來給出默認(rèn)的partitioner方法:
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
return partitions[this.parallelInstanceId % partitions.length];
}
parallelInstanceId代表著Flink consumer程序的并行度ID,假如FlinkKafkaConsumer010的并行度是12,那么這12個(gè)線程的ID分別是1-12.
parallelInstances代表著總的并行度,即12。
partitions是一個(gè)kafka partition的數(shù)組,kafka的topic的partitions是4(因?yàn)樾阅軠y(cè)試,換到只有一個(gè)節(jié)點(diǎn)的kafka)。
Flink partition的規(guī)則,就是Flink的并行度ID除以kafka partition length取余。
因此kafka編號(hào)為1-4的partition分別對(duì)應(yīng)source node的1-4的partition,那么source node5-12的partition就為空了。
默認(rèn)的partition策略是按照Flink的并行度ID與kafka中partition的數(shù)量取余的方法分配的,而與數(shù)據(jù)本身沒有關(guān)系。
source node的partition為5-12的接收不到數(shù)據(jù),
Watermark的生成是數(shù)據(jù)驅(qū)動(dòng)的,只有當(dāng)TimestampAndWatermarkAssigner”看到”數(shù)據(jù)時(shí),watermark才會(huì)生效。
而map和assigntimestampandwatermarks并發(fā)度一樣的話,這八個(gè)partition的watermark不會(huì)修改,所以會(huì)出現(xiàn)watermark的初始值一直存在的情況。
當(dāng)assigntimestampandwatermarks的并行度修改后,事件會(huì)被shuffled across(洗牌),因此到了TimestampAndWatermarkAssigner不會(huì)有空的partition存在了。
以上。
五 實(shí)際定位
- 問題:發(fā)現(xiàn)程序沒有輸出,沒有任何報(bào)錯(cuò)。
- 可以先去任務(wù)的Appmaster上看watermark運(yùn)行情況.來定位是否是watermarke的問題

- 如果是顯示no watermark ,按照上述的case 進(jìn)行診斷。
- 在assignTimestampsAndWatermarks 后setParallelism(2) 。具體并行度根據(jù)實(shí)際需求設(shè)置,在這里kafkatopic只有2個(gè)分區(qū),并行度設(shè)置為2

