Flink Time和Watermark的理解
1. Time
背景
在實(shí)際開發(fā)過程中,我們可能需要接入各種流數(shù)據(jù)源,比如在線業(yè)務(wù)用戶點(diǎn)擊流數(shù)據(jù)、監(jiān)控系實(shí)時(shí)收集到的事件流數(shù)據(jù)、從傳感器采集到的實(shí)時(shí)數(shù)據(jù),等等,為了處理方便他們可能會(huì)寫入Kafka消息中間件集群中某個(gè)/某些topic中,或者選擇其它的緩沖/存儲(chǔ)系統(tǒng)。這些數(shù)據(jù)源中數(shù)據(jù)元素具有固定的時(shí)間屬性,是在流數(shù)據(jù)處理系統(tǒng)之外的其它系統(tǒng)生成的。比如,上億用戶通過手機(jī)終端操作觸發(fā)生成的事件數(shù)據(jù),都具有對(duì)應(yīng)的事件時(shí)間;再特殊一點(diǎn),可能我們希望回放(Replay)上一年手機(jī)終端用戶的歷史行為數(shù)據(jù),與當(dāng)前某個(gè)流數(shù)據(jù)集交叉分析才能夠得到支持某類業(yè)務(wù)的特定結(jié)果,這種情況下,基于數(shù)據(jù)所具有的事件時(shí)間進(jìn)行處理,就具有很重要的意義了。
下面,我們先從Flink支持的3個(gè)與流數(shù)據(jù)處理相關(guān)的時(shí)間概念(Time Notion):ProcessTime、EventTime、IngestionTime。有些系統(tǒng)對(duì)時(shí)間概念的抽象有其它叫法,比如,Google Cloud Dataflow中稱為時(shí)間域(Time Domain)。在Flink中,基于不同的Time Notion來處理流數(shù)據(jù),具有不同的意義和結(jié)果,所以了解這3個(gè)Time Notion非常關(guān)鍵。
Time Notion
我們先看下,Apache Flink官網(wǎng)文檔給出的一張概念圖,非常形象地展示了Process Time、Event Time、Ingestion Time這三個(gè)時(shí)間分別所處的位置,如下圖所示:

下面,分別對(duì)這3個(gè)Time Notion進(jìn)行說明如下:
ProcessTime--事件被處理時(shí)當(dāng)前系統(tǒng)的時(shí)間
Flink中有對(duì)數(shù)據(jù)處理的操作進(jìn)行抽象,稱為Transformation Operator,而對(duì)于整個(gè)Dataflow的開始和結(jié)束分別對(duì)應(yīng)著Source Operator和Sink Operator,這些Operator都是在Flink集群系統(tǒng)所在的主機(jī)節(jié)點(diǎn)上,所以在基于ProcessTime的Notion進(jìn)行與時(shí)間相關(guān)的數(shù)據(jù)處理時(shí),數(shù)據(jù)處理依賴于Flink程序運(yùn)行所在的主機(jī)節(jié)點(diǎn)系統(tǒng)時(shí)鐘(System Clock)。
因?yàn)槲覀冴P(guān)心的是數(shù)據(jù)處理時(shí)間(Process Time),比如進(jìn)行Time Window操作,對(duì)Window的指派就是基于當(dāng)前Operator所在主機(jī)節(jié)點(diǎn)的系統(tǒng)時(shí)鐘。也就是說,每次創(chuàng)建一個(gè)Window,計(jì)算Window對(duì)應(yīng)的起始時(shí)間和結(jié)束時(shí)間都使用Process Time,它與外部進(jìn)入的數(shù)據(jù)元素的事件時(shí)間無關(guān)。那么,后續(xù)作用于Window的操作(Function)都是基于具有Process Time特性的Window進(jìn)行的。
使用ProcessTime的場(chǎng)景,比如,我們需要對(duì)某個(gè)App應(yīng)用的用戶行為進(jìn)行實(shí)時(shí)統(tǒng)計(jì)分析與監(jiān)控,由于用戶可能使用不同的終端設(shè)備,這樣可能會(huì)造成數(shù)據(jù)并非是實(shí)時(shí)的(如用戶手機(jī)沒電,導(dǎo)致2小時(shí)以后才會(huì)將操作行為記錄批量上傳上來)。而此時(shí),如果我們按照每分鐘的時(shí)間粒度做實(shí)時(shí)統(tǒng)計(jì)監(jiān)控,那么這些數(shù)據(jù)記錄延遲的太嚴(yán)重,如果為了等到這些記錄上傳上來(無法預(yù)測(cè),具體什么時(shí)間能獲取到這些數(shù)據(jù))再做統(tǒng)計(jì)分析,對(duì)每分鐘之內(nèi)的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析的結(jié)果恐怕要到幾個(gè)小時(shí)甚至幾天后才能計(jì)算并輸出結(jié)果,這不是我們所希望的。而且,數(shù)據(jù)處理系統(tǒng)可能也沒有這么大的容量來處理海量數(shù)據(jù)的情況。結(jié)合業(yè)務(wù)需求,其實(shí)我們只需要每分鐘時(shí)間內(nèi)進(jìn)入的數(shù)據(jù)記錄,依賴當(dāng)前數(shù)據(jù)處理系統(tǒng)的處理時(shí)間(Process Time)生成每分鐘的Window,指派數(shù)據(jù)記錄到指定Window并計(jì)算結(jié)果,這樣就不用考慮數(shù)據(jù)元素本身自帶的事件時(shí)間了。
EventTime--事件產(chǎn)生的時(shí)間,它通常由事件中的時(shí)間戳描述
流數(shù)據(jù)中的數(shù)據(jù)元素可能會(huì)具有不變的事件時(shí)間(Event Time)屬性,該事件時(shí)間是數(shù)據(jù)元素所代表的行為發(fā)生時(shí)就不會(huì)改變。最簡(jiǎn)單的情況下,這也最容易理解:所有進(jìn)入到Flink處理系統(tǒng)的流數(shù)據(jù),都是在外部的其它系統(tǒng)中產(chǎn)生的,它們產(chǎn)生后具有了事件時(shí)間,經(jīng)過傳輸后,進(jìn)入到Flink處理系統(tǒng),理論上(如果所有系統(tǒng)都具有相同系統(tǒng)時(shí)鐘)該事件時(shí)間對(duì)應(yīng)的時(shí)間戳要早于進(jìn)入到Flink處理系統(tǒng)中進(jìn)行處理的時(shí)間戳,但實(shí)際應(yīng)用中會(huì)出現(xiàn)數(shù)據(jù)記錄亂序、延遲到達(dá)等問題,這也是非常普遍的。
基于EventTime的Notion,處理數(shù)據(jù)的進(jìn)度(Progress)依賴于數(shù)據(jù)本身,而不是當(dāng)前Flink處理系統(tǒng)中Operator所在主機(jī)節(jié)點(diǎn)的系統(tǒng)時(shí)鐘。所以,需要有一種機(jī)制能夠控制數(shù)據(jù)處理的進(jìn)度,比如一個(gè)基于事件時(shí)間的Time Window創(chuàng)建后,具體怎么確定屬于該Window的數(shù)據(jù)元素都已經(jīng)到達(dá)?如果確定都到達(dá)了,然后就可以對(duì)屬于這個(gè)Window的所有數(shù)據(jù)元素做滿足需要的處理(如匯總、分組等)。這就要用到WaterMark機(jī)制,它能夠衡量數(shù)據(jù)處理進(jìn)度(表達(dá)數(shù)據(jù)到達(dá)的完整性)。
WaterMark帶有一個(gè)時(shí)間戳,假設(shè)為X,進(jìn)入到數(shù)據(jù)處理系統(tǒng)中的數(shù)據(jù)元素具有事件時(shí)間,記為Y,如果Y<X,則所有的數(shù)據(jù)元素均已到達(dá),可以計(jì)算并輸出結(jié)果。反過來說,可能更容易理解一些:要想觸發(fā)對(duì)當(dāng)前Window中的數(shù)據(jù)元素進(jìn)行計(jì)算,必須保證對(duì)所有進(jìn)入到系統(tǒng)的數(shù)據(jù)元素,其事件時(shí)間Y>=X。如果數(shù)據(jù)元素的事件時(shí)間是有序的,那么當(dāng)出現(xiàn)一個(gè)數(shù)據(jù)元素的事件時(shí)間Y<X,則觸發(fā)對(duì)當(dāng)前Window計(jì)算,并創(chuàng)建另一個(gè)新的Window來指派事件時(shí)間Y<X的數(shù)據(jù)元素到該新的Window中。
可以看到,有了WaterMark機(jī)制,對(duì)基于事件時(shí)間的流數(shù)據(jù)處理會(huì)變得特別靈活,可以根據(jù)實(shí)際業(yè)務(wù)需要選擇各種組件和處理策略。比如,上面我們說到,當(dāng)Y<X則觸發(fā)當(dāng)前Window計(jì)算,記為t1時(shí)刻,如果流數(shù)據(jù)元素是亂序的,經(jīng)過一段時(shí)間,假設(shè)t2時(shí)刻有一個(gè)數(shù)據(jù)元素的事件時(shí)間Y>=X,這時(shí)該怎么辦呢?如果t1時(shí)刻的Window已經(jīng)不存在了,但我們還是希望新出現(xiàn)的亂序數(shù)據(jù)元素加入到t1時(shí)刻Window的計(jì)算中,這時(shí)可以實(shí)現(xiàn)自定義的Trigger來滿足各種業(yè)務(wù)場(chǎng)景的需要。
IngestionTime--事件進(jìn)入Flink的時(shí)間
IngestionTime是數(shù)據(jù)進(jìn)入到Flink流數(shù)據(jù)處理系統(tǒng)的時(shí)間,該時(shí)間依賴于Source Operator所在主機(jī)節(jié)點(diǎn)的系統(tǒng)時(shí)鐘,會(huì)為到達(dá)的數(shù)據(jù)記錄指派Ingestion Time?;贗ngestionTime的Notion,存在多個(gè)Source Operator的情況下,每個(gè)Source Operator會(huì)使用自己本地系統(tǒng)時(shí)鐘指派Ingestion Time。后續(xù)基于時(shí)間相關(guān)的各種操作,都會(huì)使用數(shù)據(jù)記錄中的Ingestion Time。
與EventTime相比,IngestionTime不能處理亂序、延遲到達(dá)事件的應(yīng)用場(chǎng)景,它也就不用必須指定如何生成WaterMark。
設(shè)定時(shí)間特性
Flink DataStream 程序的第一部分通常是設(shè)置基本時(shí)間特性。 該設(shè)置定義了數(shù)據(jù)流源的行為方式(例如:它們是否將分配時(shí)間戳),以及像 **KeyedStream.timeWindow(Time.seconds(30)) ** 這樣的窗口操作應(yīng)該使用上面哪種時(shí)間概念。
以下示例顯示了一個(gè) Flink 程序,該程序在每小時(shí)時(shí)間窗口中聚合事件。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 其他
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
2. Watermark
Watermark的類型
EventTime和Watermarks
在使用eventTime的時(shí)候如何處理亂序數(shù)據(jù)?
我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的,但是也不排除由于網(wǎng)絡(luò)延遲等原因,導(dǎo)致亂序的產(chǎn)生,特別是使用kafka的話,多個(gè)分區(qū)的數(shù)據(jù)無法保證有序。所以在進(jìn)行window計(jì)算的時(shí)候,我們又不能無限期的等下去,必須要有個(gè)機(jī)制來保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了。這個(gè)特別的機(jī)制,就是watermark,watermark是用于處理亂序事件的。
watermark可以翻譯為水位線
有序的流的watermarks

無序的流的watermarks

多并行度流的watermarks
注意:多并行度的情況下,watermark對(duì)齊會(huì)取所有channel最小的watermark

在Apache Flink中使用watermark的4個(gè)理解
當(dāng)人們第一次使用Flink時(shí),經(jīng)常會(huì)對(duì)watermark感到困惑。但其實(shí)watermark并不復(fù)雜。讓我們通過一個(gè)簡(jiǎn)單的例子來說明為什么我們需要watermark,以及它的工作機(jī)制是什么樣的。
在下文中的例子中,我們有一個(gè)帶有時(shí)間戳的事件流,但是由于某種原因它們并不是按順序到達(dá)的。圖中的數(shù)字代表事件發(fā)生的時(shí)間戳。第一個(gè)到達(dá)的事件發(fā)生在時(shí)間4,然后它后面跟著的是發(fā)生在更早時(shí)間(時(shí)間2)的事件,以此類推:

注意這是一個(gè)按照事件時(shí)間處理的例子,這意味著時(shí)間戳反映的是事件發(fā)生的時(shí)間,而不是處理事件的時(shí)間。事件時(shí)間(Event-Time)處理的強(qiáng)大之處在于,無論是在處理實(shí)時(shí)的數(shù)據(jù)還是重新處理歷史的數(shù)據(jù),基于事件時(shí)間創(chuàng)建的流計(jì)算應(yīng)用都能保證結(jié)果是一樣的。
現(xiàn)在假設(shè)我們正在嘗試創(chuàng)建一個(gè)流計(jì)算排序算子。也就是處理一個(gè)亂序到達(dá)的事件流,并按照事件時(shí)間的順序輸出事件。
理解1
數(shù)據(jù)流中的第一個(gè)元素的時(shí)間是4,但是我們不能直接將它作為排序后數(shù)據(jù)流的第一個(gè)元素并輸出它。因?yàn)閿?shù)據(jù)是亂序到達(dá)的,也許有一個(gè)更早發(fā)生的數(shù)據(jù)還沒有到達(dá)。事實(shí)上,我們能預(yù)見一些這個(gè)流的未來,也就是我們的排序算子至少要等到2這條數(shù)據(jù)的到達(dá)再輸出結(jié)果。
有緩存,就必然有延遲。
理解2
如果我們做錯(cuò)了,我們可能會(huì)永遠(yuǎn)等待下去。首先,我們的應(yīng)用程序從看到時(shí)間4的數(shù)據(jù),然后看到時(shí)間2的數(shù)據(jù)。是否會(huì)有一個(gè)比時(shí)間2更早的數(shù)據(jù)到達(dá)呢?也許會(huì),也許不會(huì)。我們可以一直等下去,但可能永遠(yuǎn)看不到1。
最終,我們必須勇敢地輸出 2 作為排序流的第一個(gè)結(jié)果
理解3
我們需要的是某種策略,它定義了對(duì)于任何帶時(shí)間戳的事件流,何時(shí)停止等待更早數(shù)據(jù)的到來。
這正是 watermark 的作用,他們定義了何時(shí)不再等待更早的數(shù)據(jù)。
Flink中的事件時(shí)間處理依賴于一種特殊的帶時(shí)間戳的元素,成為watermark,它們會(huì)由數(shù)據(jù)源或是watermark生成器插入數(shù)據(jù)流中。具有時(shí)間戳t的watermark可以被理解為斷言了所有時(shí)間戳小于或等于t的事件都(在某種合理的概率上)已經(jīng)到達(dá)了。
注:此處原文是“小于”,譯者認(rèn)為應(yīng)該是 “小于或等于”,因?yàn)?Flink 源碼中采用的是 “小于或等于” 的機(jī)制。
何時(shí)我們的排序算子應(yīng)該停止等待,然后將事件2作為首個(gè)元素輸出?答案是當(dāng)收到時(shí)間戳為2(或更大)的watermark時(shí)。
理解4
我們可以設(shè)想不同的策略來生成watermark。
我們知道每個(gè)事件都會(huì)延遲一段時(shí)間才到達(dá),而這些延遲差異會(huì)比較大,所以有些事件會(huì)比其他事件延遲更多。一種簡(jiǎn)單的方法是假設(shè)這些延遲不會(huì)超過某個(gè)最大值。Flink 把這種策略稱作 “有界無序生成策略”(bounded-out-of-orderness)。當(dāng)然也有很多更復(fù)雜的方式去生成watermark,但是對(duì)于大多數(shù)應(yīng)用來說,固定延遲的方式已經(jīng)足夠了。
如果想要構(gòu)建一個(gè)類似排序的流應(yīng)用,可以使用Flink的ProcessFunction。它提供了對(duì)事件時(shí)間計(jì)時(shí)器(基于watermark觸發(fā)回調(diào))的訪問,還提供了可以用來緩存數(shù)據(jù)的托管狀態(tài)接口。
Watermark案例
1.watermarks的生成方式
通常,在接收到source的數(shù)據(jù)后,應(yīng)該立刻生成watermark;但是,也可以在source后,應(yīng)用簡(jiǎn)單的map或者filter操作后,再生成watermark。
注意:如果指定多次watermark,后面指定的會(huì)覆蓋前面的值。
生成方式
-
With Periodic Watermarks
周期性的觸發(fā)watermark的生成和發(fā)送,默認(rèn)是100ms
每隔N秒自動(dòng)向流里注入一個(gè)WATERMARK
時(shí)間間隔由ExecutionConfig.setAutoWatermarkInterval 決定.
每次調(diào)用getCurrentWatermark 方法, 如果得到的WATERMARK
不為空并且比之前的大就注入流中
可以定義一個(gè)最大允許亂序的時(shí)間,這種比較常用
實(shí)現(xiàn)AssignerWithPeriodicWatermarks接口
-
With Punctuated Watermarks
基于某些事件觸發(fā)watermark的生成和發(fā)送
基于事件向流里注入一個(gè)WATERMARK,每一個(gè)元素都有機(jī)會(huì)判斷是否生成一個(gè)WATERMARK.
如果得到的WATERMARK 不為空并且比之前的大就注入流中
實(shí)現(xiàn)AssignerWithPunctuatedWatermarks接口
2.watermark和window案例
這里寫了一個(gè)watermark&window的flink程序,從socket讀取數(shù)據(jù)
代碼:
public class StreamingWindowWatermark {
private static final Logger log = LoggerFactory.getLogger(StreamingWindowWatermark.class);
public static void main(String[] args) throws Exception {
//定義socket的端口號(hào)
int port = 9000;
//獲取運(yùn)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設(shè)置使用eventtime,默認(rèn)是使用processtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設(shè)置并行度為1,默認(rèn)并行度是當(dāng)前機(jī)器的cpu數(shù)量
env.setParallelism(1);
//連接socket獲取輸入的數(shù)據(jù)
DataStream<String> text = env.socketTextStream("zzy", port, "\n");
//解析輸入的數(shù)據(jù),每行數(shù)據(jù)按逗號(hào)分隔
DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(",");
return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
}
});
//抽取timestamp和生成watermark
DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
Long currentMaxTimestamp = 0L;
final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時(shí)間是10s
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/**
* 定義生成watermark的邏輯,比當(dāng)前最大時(shí)間戳晚10s
* 默認(rèn)100ms被調(diào)用一次
*/
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
//定義如何提取timestamp
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
//設(shè)置多并行度時(shí)獲取線程id
long id = Thread.currentThread().getId();
log.info("extractTimestamp=======>" + ",currentThreadId:" + id + ",key:" + element.f0 + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1) + "]," +
"currentMaxTimestamp:[" + currentMaxTimestamp + "|" +
sdf.format(currentMaxTimestamp) + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]");
// System.out.println("currentThreadId:" + id + ",key:" + element.f0 + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1) + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" +
// sdf.format(currentMaxTimestamp) + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]");
return timestamp;
}
});
DataStream<String> window = waterMarkStream.keyBy(0)//分組
.window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和調(diào)用TimeWindow效果一樣
.apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
/**
* 對(duì)window內(nèi)的數(shù)據(jù)進(jìn)行排序,保證數(shù)據(jù)的順序
* @param tuple
* @param window
* @param input
* @param out
* @throws Exception
*/
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
String key = tuple.toString();
List<Long> arrarList = new ArrayList<Long>();
Iterator<Tuple2<String, Long>> it = input.iterator();
while (it.hasNext()) {
Tuple2<String, Long> next = it.next();
//時(shí)間戳放到了arrarList里
arrarList.add(next.f1);
}
Collections.sort(arrarList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
out.collect(result);
}
});
//測(cè)試-把結(jié)果打印到控制臺(tái)即可
window.print();
//注意:因?yàn)閒link是懶加載的,所以必須調(diào)用execute方法,上面的代碼才會(huì)執(zhí)行
env.execute("eventtime-watermark");
}
}
啟動(dòng)程序StreamingWindowWatermark
打印日志:

2019-02-14 11:57:36,715 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction] [INFO] - Connecting to server socket zzy:9000
2019-02-14 11:57:36,741 [Window(TumblingEventTimeWindows(3000), EventTimeTrigger, WindowFunction$3) -> Sink: Print to Std. Out (1/1)] [org.apache.flink.runtime.state.heap.HeapKeyedStateBackend] [INFO] - Initializing heap keyed state backend with stream factory.
首先,我們開啟socket,輸入第一條數(shù)據(jù),數(shù)據(jù)格式是(id,時(shí)間戳):
? /data nc -l 9000
0001,1550116440000
輸出如下:

019-02-14 11:58:48,690 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116440000|2019-02-14 11:54:00.000],currentMaxTimestamp:[1550116440000|2019-02-14 11:54:00.000],watermark:[1550116430000|2019-02-14 11:53:50.000]
匯總下表:

此時(shí),wartermark的時(shí)間按照邏輯,已經(jīng)落后于currentMaxTimestamp10秒了。
我們繼續(xù)輸入:
0001,1550116444000
輸出內(nèi)容如下:
2019-02-14 12:08:25,474 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116444000|2019-02-14 11:54:04.000],currentMaxTimestamp:[1550116444000|2019-02-14 11:54:04.000],watermark:[1550116434000|2019-02-14 11:53:54.000]
再次匯總表:

繼續(xù)輸入:
0001,1550116450000
輸出內(nèi)容如下:
2019-02-14 14:30:27,480 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116450000|2019-02-14 11:54:10.000],currentMaxTimestamp:[1550116450000|2019-02-14 11:54:10.000],watermark:[1550116440000|2019-02-14 11:54:00.000]
匯總下表:

到這里,window仍然沒有被觸發(fā),此時(shí)watermark的時(shí)間已經(jīng)等于了第一條數(shù)據(jù)的Event Time了。那么window到底什么時(shí)候被觸發(fā)呢?我們?cè)俅屋斎耄?br>
0001,1550116451000
輸出內(nèi)容如下:
2019-02-14 14:36:01,479 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116451000|2019-02-14 11:54:11.000],currentMaxTimestamp:[1550116451000|2019-02-14 11:54:11.000],watermark:[1550116441000|2019-02-14 11:54:01.000]
匯總?cè)缦?

可以看到window仍然沒有觸發(fā),此時(shí),我們的數(shù)據(jù)已經(jīng)發(fā)到2019-02-14 11:54:11.000了,最早的數(shù)據(jù)已經(jīng)過去了11秒了,還沒有開始計(jì)算。那是不是要等到13(10+3)秒過去了,才開始觸發(fā)window呢?答案是否定的。
我們?cè)俅卧黾?秒,輸入:
0001,1550116452000
輸出內(nèi)容如下:
2019-02-14 14:40:50,332 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116452000|2019-02-14 11:54:12.000],currentMaxTimestamp:[1550116452000|2019-02-14 11:54:12.000],watermark:[1550116442000|2019-02-14 11:54:02.000]
匯總?cè)缦?

Window依舊沒有觸發(fā)
我們?cè)俅卧黾?s,輸入:
0001,1550116453000
輸出內(nèi)容如下:
2019-02-14 14:51:10,020 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116453000|2019-02-14 11:54:13.000],currentMaxTimestamp:[1550116453000|2019-02-14 11:54:13.000],watermark:[1550116443000|2019-02-14 11:54:03.000]
(0001),1,2019-02-14 11:54:00.000,2019-02-14 11:54:00.000,2019-02-14 11:54:00.000,2019-02-14 11:54:03.000
可以看到觸發(fā)了window操作,打印數(shù)據(jù)到控制臺(tái)了

String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
out.collect(result);
匯總?cè)缦拢?/p>

到這里,我們做一個(gè)說明:
window的觸發(fā)機(jī)制,是先按照自然時(shí)間將window劃分,如果window大小是3秒,那么1分鐘內(nèi)會(huì)把window劃分為如下的形式(注意window是左閉右開的):
[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)
如果window大小是10秒,則window會(huì)被分為如下的形式:
[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)
window的設(shè)定無關(guān)數(shù)據(jù)本身,而是系統(tǒng)定義好了的。
輸入的數(shù)據(jù)中,根據(jù)自身的Event Time,將數(shù)據(jù)劃分到不同的window中,如果window中有數(shù)據(jù),則當(dāng)watermark時(shí)間>=Event Time時(shí),就符合了window觸發(fā)的條件了,最終決定window觸發(fā),還是由數(shù)據(jù)本身的Event Time所屬的window中的window_end_time決定。
上面的測(cè)試中,最后一條數(shù)據(jù)到達(dá)后,其水位線已經(jīng)升至19:34:24秒,正好是最早的一條記錄所在window的window_end_time,所以window就被觸發(fā)了。
為了驗(yàn)證window的觸發(fā)機(jī)制,我們繼續(xù)輸入數(shù)據(jù):
0001,1550116455000
輸出內(nèi)容如下:
2019-02-14 15:00:58,535 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116455000|2019-02-14 11:54:15.000],currentMaxTimestamp:[1550116455000|2019-02-14 11:54:15.000],watermark:[1550116445000|2019-02-14 11:54:05.000]

匯總表:

此時(shí),watermark時(shí)間雖然已經(jīng)達(dá)到了第二條數(shù)據(jù)的時(shí)間,但是由于其沒有達(dá)到第二條數(shù)據(jù)所在window的結(jié)束時(shí)間,所以window并沒有被觸發(fā)。那么,第二條數(shù)據(jù)所在的window時(shí)間是:
[2019/2/14 11:54:03, 2019/2/14 11:54:06)
也就是說,我們必須輸入一個(gè)11:54:06秒的數(shù)據(jù),第二條數(shù)據(jù)所在的window才會(huì)被觸發(fā)。
我們繼續(xù)輸入:
0001,1550116456000
輸出如下:
2019-02-14 15:07:48,879 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116456000|2019-02-14 11:54:16.000],currentMaxTimestamp:[1550116456000|2019-02-14 11:54:16.000],watermark:[1550116446000|2019-02-14 11:54:06.000]
(0001),1,2019-02-14 11:54:04.000,2019-02-14 11:54:04.000,2019-02-14 11:54:03.000,2019-02-14 11:54:06.000

可以看到是有觸發(fā)windows操作的
匯總:

下面劃重點(diǎn)了
watermark觸發(fā)條件
此時(shí),我們已經(jīng)看到,window的觸發(fā)要符合以下幾個(gè)條件:
1、watermark時(shí)間 >= window_end_time
2、在[window_start_time,window_end_time)中有數(shù)據(jù)存在
同時(shí)滿足了以上2個(gè)條件,window才會(huì)觸發(fā)。
而且,這里要強(qiáng)調(diào)一點(diǎn),watermark是一個(gè)全局的值,不是某一個(gè)key下的值,所以即使不是同一個(gè)key的數(shù)據(jù),其warmark也會(huì)增加,例如:
0002,1550116458000
輸出如下:
2019-02-14 15:22:04,219 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:39,key:0002,eventtime:[1550116458000|2019-02-14 11:54:18.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
我們看到,currentMaxTimestamp也增加到2019-02-14 11:54:08.000了。
watermark+window處理亂序
我們上面的測(cè)試,數(shù)據(jù)都是按照時(shí)間順序遞增的,現(xiàn)在,我們輸入一些亂序的(late)數(shù)據(jù),看看watermark結(jié)合window機(jī)制,是如何處理亂序的。
輸入:
0001,1550116440000
0001,1550116441000
0001,1550116442000
0001,1550116443000
0001,1550116444000
0001,1550116445000
0001,1550116446000
0001,1550116450000
0001,1550116451000
0001,1550116452000
0001,1550116453000
0001,1550116456000
0001,1550116460000
0001,1550116461000
0001,1550116462000
0001,1550116464000
輸出如下:
2019-02-14 15:34:49,469 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116452000|2019-02-14 11:54:12.000],currentMaxTimestamp:[1550116452000|2019-02-14 11:54:12.000],watermark:[1550116442000|2019-02-14 11:54:02.000]
2019-02-14 15:34:50,276 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116453000|2019-02-14 11:54:13.000],currentMaxTimestamp:[1550116453000|2019-02-14 11:54:13.000],watermark:[1550116443000|2019-02-14 11:54:03.000]
(0001),3,2019-02-14 11:54:00.000,2019-02-14 11:54:02.000,2019-02-14 11:54:00.000,2019-02-14 11:54:03.000
2019-02-14 15:35:05,916 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116456000|2019-02-14 11:54:16.000],currentMaxTimestamp:[1550116456000|2019-02-14 11:54:16.000],watermark:[1550116446000|2019-02-14 11:54:06.000]
(0001),3,2019-02-14 11:54:03.000,2019-02-14 11:54:05.000,2019-02-14 11:54:03.000,2019-02-14 11:54:06.000
2019-02-14 15:35:17,804 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116460000|2019-02-14 11:54:20.000],currentMaxTimestamp:[1550116460000|2019-02-14 11:54:20.000],watermark:[1550116450000|2019-02-14 11:54:10.000]
2019-02-14 15:35:17,804 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116461000|2019-02-14 11:54:21.000],currentMaxTimestamp:[1550116461000|2019-02-14 11:54:21.000],watermark:[1550116451000|2019-02-14 11:54:11.000]
2019-02-14 15:35:17,804 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116462000|2019-02-14 11:54:22.000],currentMaxTimestamp:[1550116462000|2019-02-14 11:54:22.000],watermark:[1550116452000|2019-02-14 11:54:12.000]
(0001),1,2019-02-14 11:54:06.000,2019-02-14 11:54:06.000,2019-02-14 11:54:06.000,2019-02-14 11:54:09.000
(0001),2,2019-02-14 11:54:10.000,2019-02-14 11:54:11.000,2019-02-14 11:54:09.000,2019-02-14 11:54:12.000
2019-02-14 15:35:48,356 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116464000|2019-02-14 11:54:24.000],currentMaxTimestamp:[1550116464000|2019-02-14 11:54:24.000],watermark:[1550116454000|2019-02-14 11:54:14.000]
再輸入:
0001,1550116454000
輸出如下:
2019-02-14 15:40:41,051 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116454000|2019-02-14 11:54:14.000],currentMaxTimestamp:[1550116464000|2019-02-14 11:54:24.000],watermark:[1550116454000|2019-02-14 11:54:14.000]
匯總:

可以看到,雖然我們輸入了一個(gè)2019/2/14 11:54:14的數(shù)據(jù),但是currentMaxTimestamp和watermark都沒變。
此時(shí),按照我們上面提到的公式:
1、watermark時(shí)間 >= window_end_time
2、在[window_start_time,window_end_time)中有數(shù)據(jù)存在
那如果我們?cè)俅屋斎胍粭l2019/2/14 11:54:25的數(shù)據(jù),此時(shí)watermark時(shí)間會(huì)升高到19:34:33,這時(shí)的window一定就會(huì)觸發(fā)了,我們?cè)囈辉嚕?br>
輸入:
0001,1550116465000
輸出如下:
2019-02-14 15:48:07,322 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116465000|2019-02-14 11:54:25.000],currentMaxTimestamp:[1550116465000|2019-02-14 11:54:25.000],watermark:[1550116455000|2019-02-14 11:54:15.000]
(0001),3,2019-02-14 11:54:12.000,2019-02-14 11:54:14.000,2019-02-14 11:54:12.000,2019-02-14 11:54:15.000
可以看到觸發(fā)了window操作,打印了2019/2/14 11:54:14這條數(shù)據(jù)
匯總:

上邊的結(jié)果,已經(jīng)表明,對(duì)于out-of-order的數(shù)據(jù),F(xiàn)link可以通過watermark機(jī)制結(jié)合window的操作,來處理一定范圍內(nèi)的亂序數(shù)據(jù)。那么對(duì)于“遲到”太多的數(shù)據(jù),F(xiàn)link是怎么處理的呢?
late element的處理
運(yùn)行代碼:StreamingWindowWatermark2
public class StreamingWindowWatermark2 {
private static final Logger log = LoggerFactory.getLogger(StreamingWindowWatermark2.class);
public static void main(String[] args) throws Exception {
//定義socket的端口號(hào)
int port = 9000;
//獲取運(yùn)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設(shè)置使用eventtime,默認(rèn)是使用processtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設(shè)置并行度為1,默認(rèn)并行度是當(dāng)前機(jī)器的cpu數(shù)量
env.setParallelism(1);
//連接socket獲取輸入的數(shù)據(jù)
DataStream<String> text = env.socketTextStream("zzy", port, "\n");
//解析輸入的數(shù)據(jù)
DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(",");
return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
}
});
//抽取timestamp和生成watermark
DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
Long currentMaxTimestamp = 0L;
final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時(shí)間是10s--亂序時(shí)間
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/**
* 定義生成watermark的邏輯
* 默認(rèn)100ms被調(diào)用一次
*/
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
//定義如何提取timestamp
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
log.info("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
// System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
// sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
return timestamp;
}
});
//保存被丟棄的數(shù)據(jù)
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
//注意,由于getSideOutput方法是SingleOutputStreamOperator子類中的特有方法,所以這里的類型,不能使用它的父類dataStream。
SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和調(diào)用TimeWindow效果一樣
//.allowedLateness(Time.seconds(2))//允許數(shù)據(jù)遲到2秒--延遲時(shí)間
.sideOutputLateData(outputTag)
.apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
/**
* 對(duì)window內(nèi)的數(shù)據(jù)進(jìn)行排序,保證數(shù)據(jù)的順序
* @param tuple
* @param window
* @param input
* @param out
* @throws Exception
*/
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
String key = tuple.toString();
List<Long> arrarList = new ArrayList<Long>();
Iterator<Tuple2<String, Long>> it = input.iterator();
while (it.hasNext()) {
Tuple2<String, Long> next = it.next();
arrarList.add(next.f1);
}
Collections.sort(arrarList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String result = "key:" + key + ",size:" + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
out.collect(result);
}
});
//window.getSideOutput獲取遲到的數(shù)據(jù),把遲到的數(shù)據(jù)暫時(shí)打印到控制臺(tái),實(shí)際中可以保存到其他存儲(chǔ)介質(zhì)中
DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
sideOutput.flatMap(new FlatMapFunction<Tuple2<String,Long>, Tuple2<String,String>>() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Override
public void flatMap(Tuple2<String, Long> stringLongTuple2, Collector<Tuple2<String, String>> collector) throws Exception {
collector.collect(new Tuple2<>(stringLongTuple2.f0,"eventtime:" + stringLongTuple2.f1 + "|"
+ sdf.format(stringLongTuple2.f1)));
}
}).print();
// sideOutput.print();
//測(cè)試-把結(jié)果打印到控制臺(tái)即可
window.print();
//注意:因?yàn)閒link是懶加載的,所以必須調(diào)用execute方法,上面的代碼才會(huì)執(zhí)行
env.execute("eventtime-watermark-late-data");
}
}
我們輸入一個(gè)亂序很多的數(shù)據(jù)來測(cè)試下:
輸入:
? /data nc -l 9000
0001,1550116440000
0001,1550116443000
0001,1550116444000
0001,1550116445000
0001,1550116446000
0001,1550116450000
0001,1550116451000
0001,1550116452000
0001,1550116453000
0001,1550116441000
0001,1550116454000
0001,1550116455000
0001,1550116455000
0001,1550116457000
0001,1550116458000
輸出如下:
2019-02-14 16:34:27,881 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116455000|2019-02-14 11:54:15.000],currentMaxTimestamp:[1550116455000|2019-02-14 11:54:15.000],watermark:[1550116445000|2019-02-14 11:54:05.000]
2019-02-14 16:34:27,881 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116455000|2019-02-14 11:54:15.000],currentMaxTimestamp:[1550116455000|2019-02-14 11:54:15.000],watermark:[1550116445000|2019-02-14 11:54:05.000]
2019-02-14 16:34:27,882 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116457000|2019-02-14 11:54:17.000],currentMaxTimestamp:[1550116457000|2019-02-14 11:54:17.000],watermark:[1550116447000|2019-02-14 11:54:07.000]
key:(0001),size:3,2019-02-14 11:54:03.000,2019-02-14 11:54:05.000,2019-02-14 11:54:03.000,2019-02-14 11:54:06.000
2019-02-14 16:34:28,420 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116458000|2019-02-14 11:54:18.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
輸入數(shù)據(jù):
0001,1550116447000
0001,1550116446000
輸出如下:
2019-02-14 16:35:25,902 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116447000|2019-02-14 11:54:07.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
2019-02-14 16:39:11,450 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116446000|2019-02-14 11:54:06.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]

沒有觸發(fā)window
550116446000|2019-02-14 11:54:06.000 對(duì)應(yīng)的window是
[2019-02-14 11:54:06.000, 2019-02-14 11:54:09.000)
而現(xiàn)在的watermark是2019-02-14 11:54:08.000 比2019-02-14 11:54:09.000小,輸入eventtime是1550116445000|2019-02-14 11:54:05.000的事件
輸入:
0001,1550116445000
輸出:
2019-02-14 16:40:14,721 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116445000|2019-02-14 11:54:05.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
(0001,eventtime:1550116445000|2019-02-14 11:54:05.000)

我們輸入數(shù)據(jù):
0001,1550116444000
輸出:
2019-02-14 16:47:38,607 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116444000|2019-02-14 11:54:04.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
(0001,eventtime:1550116444000|2019-02-14 11:54:04.000)
可以看出來是有觸發(fā)window的

總結(jié)
1.Flink如何處理亂序?
watermark+window機(jī)制,window中可以對(duì)input進(jìn)行按照Event Time排序,使得完全按照Event Time發(fā)生的順序去處理數(shù)據(jù),以達(dá)到處理亂序數(shù)據(jù)的目的。-
- Flink何時(shí)觸發(fā)window?
1、watermark時(shí)間 >= window_end_time(對(duì)于out-of-order以及正常的數(shù)據(jù)而言)
2、在[window_start_time,window_end_time)中有數(shù)據(jù)存在
3.Flink應(yīng)該如何設(shè)置最大亂序時(shí)間?
這個(gè)要結(jié)合自己的業(yè)務(wù)以及數(shù)據(jù)情況去設(shè)置。如果maxOutOfOrderness設(shè)置的太小,而自身數(shù)據(jù)發(fā)送時(shí)由于網(wǎng)絡(luò)等原因?qū)е聛y序或者late太多,那么最終的結(jié)果就是會(huì)有很多單條的數(shù)據(jù)在window中被觸發(fā),數(shù)據(jù)的正確性影響太大。
參考:
http://shiyanjun.cn/archives/1785.html
http://wuchong.me/blog/2018/11/18/flink-tips-watermarks-in-apache-flink-made-easy/