Watermark
- watermark用于指示事件的進(jìn)展,一個時間戳為T的watermark表示事件時間已經(jīng)推進(jìn)到了T,而且不會有比T更早的事件出現(xiàn)
- watermark一般在數(shù)據(jù)流的源頭創(chuàng)建,并通過流和操作符傳播
-
當(dāng)?shù)竭_(dá)流的末尾是,會發(fā)出一個Long.MAX_VALUE表示終止水印
image.png
前提
所有算子包括Source和Sink都是邏輯數(shù)據(jù)流圖,需要StreamGraphGenerator和JobGraphGenerator生成物理執(zhí)行計劃,根據(jù)information通過translator將算子轉(zhuǎn)為operator。也就是說,flink job的編碼是兩條線,一條明線,即用戶編寫的邏輯,一條暗線是flink內(nèi)部真正封裝了算子行為,水位線傳遞,Checkpoint生成邏輯的物理執(zhí)行計劃。
watermark的源頭
-
TimestampsAndWatermarks是source從事件提取時間戳生成watermark的重要接口 -
WatermarkUpdateListener接口用于監(jiān)聽 -
createMainOutput創(chuàng)建ReaderOutput,用于Source Reader輸出數(shù)據(jù),包含內(nèi)部處理時間戳邏輯和watermark生成邏輯 -
startPeriodicWatermarkEmits和stopPeriodicWatermarkEmits啟動和停止周期性watermark生成 -
emitImmediateWatermark立即發(fā)出watermark - 兩個工廠方法
createProgressiveEventTimeLogic和createNoOpEventTimeLogic,會根據(jù)WatermarkStrategy生成TimestampsAndWatermarks實例-
createProgressiveEventTimeLogic創(chuàng)建一個符合漸進(jìn)式事件時間邏輯的TimestampsAndWatermarks實例,用于流處理 -
createNoOpEventTimeLogic創(chuàng)建一個無操作的TimestampsAndWatermarks實例,用于批處理,跳過watermark生成邏輯
-

image.png
- 接口
TimestampsAndWatermarks被ProgressiveTimestampsAndWatermarks和NoOpTimestampsAndWatermarks實現(xiàn),他們唯一的創(chuàng)建方式是調(diào)用TimestampsAndWatermarks的工程方法創(chuàng)建 - 那么,是誰創(chuàng)建了
TimestampsAndWatermarks了呢,是SourceOperator,Source在物理計劃的實現(xiàn)。在SourceOperator的open方法創(chuàng)建了eventTimeLogic,可以看到watermarkStrategy代表用戶指定的生成策略,我們指定的BoundedOutOfOrdernessWatermarks策略就是在這里被傳遞進(jìn)物理執(zhí)行計劃的

image.png
- 話分兩頭,
SourceOperator需要具有將record和watermark輸出的能力,Source的輸出由ReaderOutput定義,他實現(xiàn)了SourceOutput能力,是SourceReader和下游算子交互的核心接口之一 - 數(shù)據(jù)記錄發(fā)送
-
collect(T record)發(fā)送不帶時間戳的記錄 -
collect(T record, long timestamp)發(fā)送帶時間戳的記錄
-
- watermark發(fā)送,
emitWatermark(Watermark watermark)發(fā)送watermark,標(biāo)記事件進(jìn)展 - idle狀態(tài)管理,
markIdle()標(biāo)記當(dāng)前輸出為idle狀態(tài),表示下游算子無需等待該輸出的watermark - split管理
-
createOutputForSplit(String splitId)為特定分片創(chuàng)建一個獨立的SourceOutput,用于設(shè)置各個分片的創(chuàng)建邏輯,比如創(chuàng)建水位線 -
releaseOutputForSplit(String splitId)釋放特定分片的SourceOutput
-

image.png
-
SourceOperator中,output就是成員對象currentMainOutput,他被上文eventTimeLogic所創(chuàng)建,eventTimeLogic是TimestampsAndWatermarks類型,當(dāng)我們使用流處理時,eventTimeLogic的類型是ProgressiveTimestampsAndWatermarks -
ProgressiveTimestampsAndWatermarks實現(xiàn)了createMainOutput方法,在這個方法中currentMainOutput被賦值為StreamingReaderOutput -
StreamingReaderOutput繼承自SourceOutputWithWatermarks,SourceOutputWithWatermarks實現(xiàn)了SourceOutput - 現(xiàn)在我們就知道了一些事情,
SourceOperator有兩個成員變量,類型為TimestampsAndWatermarks和ReaderOutput,TimestampsAndWatermarks用于創(chuàng)建流處理或者批處理的Watermark,根據(jù)用戶指定watermark生成策略激發(fā)新的watermark,以及創(chuàng)建ReaderOutput,ReaderOutput用于向下游發(fā)送record和watermark
watermark生成
-
WatermarkGenerator有兩個方法onEvent和onPeriodicEmit -
onEvent表示record到達(dá)時,檢查或者記住這個時間戳,參數(shù)里包含WatermarkOutput,可以決定是否發(fā)出watermark -
onPeriodicEmit周期性發(fā)射水印 - 以
BoundedOutOfOrdernessWatermarks為例,record到達(dá)之后會調(diào)用onEvent記住record的時間戳,然后當(dāng)onPeriodicEmit觸發(fā)的時候發(fā)射record時間戳減延遲時間作為watermark
watermark傳遞(KeyedCoProcessFunction為例)
- 一個用戶可以使用或者繼承的算子都有他對應(yīng)的運(yùn)行時operator,負(fù)責(zé)執(zhí)行用戶算子的邏輯,并管理狀態(tài),傳遞watermark。比如
KeyedCoProcessFunction對應(yīng)KeyedCoProcessOperator。 -
KeyedCoProcessOperatorimplementTwoInputStreamOperator,TwoInputStreamOperator里面具有接口函數(shù)processWatermark1和processWatermark2,但是KeyedCoProcessOperator并沒有實現(xiàn)這兩個的方法,他的默認(rèn)實現(xiàn)被放到了父類的父類中。

image.png
-
AbstractUdfStreamOperator是用戶定義function對應(yīng)的operator的基類,KeyedCoProcessOperator繼承了AbstractUdfStreamOperator。

image.png
-
AbstractUdfStreamOperator繼承了AbstractStreamOperator,AbstractStreamOperator是所有stream operator的基類。里面定義了兩個方法processWatermark1和processWatermark2- 處理watermark的邏輯是,處理指定的輸入流的watermark,對于這個兩輸入的算子,就有兩個索引,然后更新這個算子的watermark狀態(tài),更新時間服務(wù)管理器并發(fā)射watermark到下游
- 其中
updateWatermark是關(guān)鍵的方法,他會遍歷所有輸入方向的watermark,然后取其中小的watermark,更新為這個算子的combine watermark。如果所有輸入都是idle,那這個算子也會變?yōu)閕dle狀態(tài) - 如果成功更新了combine watermark,會將這個新的combine watermark更新時間管理服務(wù)以及發(fā)射這個watermark。這涉及兩個東西
InternalTimeServiceManager和Output<StreamRecord<OUT>> -
InternalTimeServiceManager太大了,就不說了 - 這里的
Output其實是WatermarkOutput,他有多種實現(xiàn),有非鏈?zhǔn)剿阕虞敵龊玩準(zhǔn)剿阕虞敵?- 如果是非鏈?zhǔn)剿阕訒atermark寫入網(wǎng)絡(luò)緩沖區(qū),通過網(wǎng)絡(luò)發(fā)給下游算子
- 如果是鏈?zhǔn)剿阕訒苯訉atermark傳遞給下游算子

image.png

image.png
-
IndexedCombinedWatermarkStatus通過索引管理多個輸入流的PartialWatermark,利用CombinedWatermarkStatus計算全局watermark(也就是這個算子對應(yīng)的operator),它是對CombinedWatermarkStatus的封裝,添加了按索引管理輸入流的功能

image.png
-
CombinedWatermarkStatus組合多個PartialWatermark對象,計算全局watermark值和狀態(tài),他維護(hù)了一個PartialWatermark列表-
updateWatermark是更新watermark的關(guān)鍵方法,取小的那個watermark
-

image.png

image.png
-
PartialWatermark表示單個輸入流的watermark狀態(tài)- 每個
PartialWatermark跟蹤一個輸入流的watermark值和idle,active狀態(tài)。 -
onWatermarkUpdate表示收到watermark變動時應(yīng)該做什么動作,這個動作在PartialWatermark初始化的時候會指定
- 每個

image.png
-
WatermarkOutputMultiplexer(watermark輸出多路復(fù)用器)-
WatermarkUpdateListener用來監(jiān)聽某個輸入流的watermark值,發(fā)生變化時會調(diào)用onWatermarkUpdate方法 - 在
WatermarkOutputMultiplexer中當(dāng)某個輸入流watermark更新時,WatermarkOutputMultiplexer會通過WatermarkUpdateListener將更新傳遞到底層的WatermarkOutput -
WatermarkOutputMultiplexer將多個輸入流的watermark更新組合成一個全局的watermark更新。 - 支持兩種類型的輸入:立即輸出和延遲輸出
-

image.png
