1. Watermark概念
watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個(gè)隱藏屬性。通常基于Event Time的數(shù)據(jù),自身都包含一個(gè)timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)。我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說(shuō)late element)。但是對(duì)于late element,我們又不能無(wú)限期的等下去,必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了。此時(shí)就是watermark發(fā)揮作用了,它表示當(dāng)達(dá)到watermark到達(dá)之后,在watermark之前的數(shù)據(jù)已經(jīng)全部達(dá)到(即使后面還有延遲的數(shù)據(jù)).watermark的示意圖如下.

2. 生成EventTime和Watermark
以下這個(gè)程序的功能是實(shí)現(xiàn)計(jì)算相同時(shí)間窗口出現(xiàn)相同單詞的統(tǒng)計(jì).在這個(gè)過程中,自定義實(shí)現(xiàn)了時(shí)間戳和Watermark.
public class DataStreamDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //設(shè)置時(shí)間分配器
env.setParallelism(1); //設(shè)置并行度
env.getConfig().setAutoWatermarkInterval(9000);//每9秒發(fā)出一個(gè)watermark
DataStream<String> text = env.socketTextStream("localhost", 9900);
DataStream<Tuple3<String, Long, Integer>> counts = text.filter(new FilterClass()).map(new LineSplitter())
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {
private long currentMaxTimestamp = 0l;
private final long maxOutOfOrderness = 10000l; //這個(gè)控制失序已經(jīng)延遲的度量
//獲取EventTime
@Override
public long extractTimestamp(Tuple3<String, Long, Integer> element, long previousElementTimestamp) {
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
System.out.println(
"get timestamp is " + timestamp + " currentMaxTimestamp " + currentMaxTimestamp);
return timestamp;
}
//獲取Watermark
@Override
public Watermark getCurrentWatermark() {
System.out.println("wall clock is " + System.currentTimeMillis() + " new watermark "
+ (currentMaxTimestamp - maxOutOfOrderness));
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}).keyBy(0).timeWindow(Time.seconds(20))
// .allowedLateness(Time.seconds(10))
.sum(2);
counts.print();
env.execute("Window WordCount");
}
// 自定義獲取timesStamp
// private static class MyTimestamp extends AscendingTimestampExtractor<Tuple3<String, Long, Integer>> {
//
// private static final long serialVersionUID = 1L;
//
// public long extractAscendingTimestamp(Tuple3<String, Long, Integer> element) {
//
// return element.f1;
// }
//
// }
//構(gòu)造出element以及它的event time.然后把次數(shù)賦值為1
public static final class LineSplitter implements MapFunction<String, Tuple3<String, Long, Integer>> {
@Override
public Tuple3<String, Long, Integer> map(String value) throws Exception {
// TODO Auto-generated method stub
String[] tokens = value.toLowerCase().split("\\W+");
long eventtime = Long.parseLong(tokens[1]);
return new Tuple3<String, Long, Integer>(tokens[0], eventtime, 1);
}
}
//過濾掉為null和whitespace的字符串
public static final class FilterClass implements FilterFunction<String> {
@Override
public boolean filter(String value) throws Exception {
if (StringUtils.isNullOrWhitespaceOnly(value)) {
return false;
} else {
return true;
}
}
}
}
- maxOutOfOrderness 這個(gè)參數(shù)在設(shè)置的時(shí)候往往根據(jù)經(jīng)驗(yàn)來(lái).MaxOutOfOrderness設(shè)置的太小,而自身數(shù)據(jù)發(fā)送時(shí)由于網(wǎng)絡(luò)等原因?qū)е聛y序或者late太多,那么最終的結(jié)果就是會(huì)有很多單條的數(shù)據(jù)在window中被觸發(fā),數(shù)據(jù)的正確性影響太大。如果設(shè)置太大,導(dǎo)致設(shè)置的Watermark太小,使得Watermark沒有用,因?yàn)樵驹诤芏痰臅r(shí)間內(nèi),一個(gè)窗口的所有的數(shù)據(jù)都到達(dá)了,但是不得不等Watermark一點(diǎn)點(diǎn)變大, 才能觸發(fā)計(jì)算.
3. EventTime按順序的情況
運(yùn)行上述程序.初始化的時(shí)候,由于沒有輸入,watermark為-10000

在9900監(jiān)聽端口,輸入aa 1522827199000(2018-04-04 15:33:19),重復(fù)3次.得到如下圖所示結(jié)果.現(xiàn)在的watermark是1522827189000(2018-04-04 15:33:09),即為最大的currentMaxTimestamp-10000.在這里生命下,aa 的時(shí)間2018-04-04 15:33:19在2018-04-04 15:33:0-2018-04-04 15:33:20這個(gè)窗口中.因?yàn)槲以O(shè)置的窗口為20s.不管怎樣怎樣,窗口是確定的.初始化設(shè)置后,就一直不會(huì)變.而且窗口是左閉右開的區(qū)間.

接著輸入bb 1522827299000(2018-04-04 15:34:59),此時(shí)的watermark為1522827289000(2018-04-04 15:34:49),此時(shí)的watermark超過了aa所在窗口的endtime(2018-04-04 15:33:20).那么會(huì)觸發(fā)計(jì)算,從而會(huì)有下面的輸出.在這里強(qiáng)調(diào)下,觸發(fā)計(jì)算的時(shí)間點(diǎn)是
- watermark超過了window的endtime.
- 在該window中有數(shù)據(jù).
只有同時(shí)滿足這兩個(gè)條件,就會(huì)觸發(fā)計(jì)算.

4. EventTime不按順序的情況
輸入如下數(shù)據(jù):
aa 1522827261000(2018-04-04 15:34:21)
aa 1522827251000(2018-04-04 15:34:11)
aa 1522827252000(2018-04-04 15:34:12)
ee 1522827291000(2018-04-04 15:34:51)
由于最后一個(gè)ee的輸入,改變了watermark,使得當(dāng)前的時(shí)間戳為2018-04-04 15:34:41.那么此時(shí)會(huì)觸發(fā)前面兩個(gè)窗口的計(jì)算.計(jì)算結(jié)果如下.

Note:此時(shí),如果還輸入之前的時(shí)間窗口的aa 1522827261000.是不會(huì)觸發(fā)計(jì)算的,它會(huì)丟棄掉數(shù)據(jù),這在后面會(huì)引入allowedLateness參數(shù).原窗口中的內(nèi)容不會(huì)立即被刪除,而是會(huì)再次等待一段時(shí)間,即watermark小于end-time + allowedLateness時(shí),后續(xù)的該窗口的數(shù)據(jù)到達(dá)時(shí)會(huì)納入到原窗口,再次觸發(fā)計(jì)算.而watermark >= end_time + allowedLateness,后續(xù)的還有屬于該窗口的數(shù)據(jù)到達(dá)時(shí),那么這種數(shù)據(jù)只能被刪除了,因?yàn)橄到y(tǒng)不會(huì)無(wú)限制的等下去,這既會(huì)增加window buffer的大小,也會(huì)引起不必要的性能下降.
5. 總結(jié)
通過以上例子,我們可以看到如何通過watermark對(duì)順序以及亂序數(shù)據(jù)的處理.以及如何在watermark觸發(fā)之后還能通過 allowedLateness對(duì)延遲做一些補(bǔ)償.watermark的設(shè)計(jì)思想和用途很光,我也只是淺嘗輒止.
參考文章:
flink watermark介紹
Flink流計(jì)算編程--watermark(水位線)簡(jiǎn)介