Flink 1.19 Watermark源碼分析

Watermark

  • watermark用于指示事件的進(jìn)展,一個時間戳為T的watermark表示事件時間已經(jīng)推進(jìn)到了T,而且不會有比T更早的事件出現(xiàn)
  • watermark一般在數(shù)據(jù)流的源頭創(chuàng)建,并通過流和操作符傳播
  • 當(dāng)?shù)竭_(dá)流的末尾是,會發(fā)出一個Long.MAX_VALUE表示終止水印


    image.png

前提

所有算子包括Source和Sink都是邏輯數(shù)據(jù)流圖,需要StreamGraphGeneratorJobGraphGenerator生成物理執(zhí)行計劃,根據(jù)information通過translator將算子轉(zhuǎn)為operator。也就是說,flink job的編碼是兩條線,一條明線,即用戶編寫的邏輯,一條暗線是flink內(nèi)部真正封裝了算子行為,水位線傳遞,Checkpoint生成邏輯的物理執(zhí)行計劃。

watermark的源頭

  • TimestampsAndWatermarks是source從事件提取時間戳生成watermark的重要接口
  • WatermarkUpdateListener接口用于監(jiān)聽
  • createMainOutput 創(chuàng)建ReaderOutput,用于Source Reader輸出數(shù)據(jù),包含內(nèi)部處理時間戳邏輯和watermark生成邏輯
  • startPeriodicWatermarkEmitsstopPeriodicWatermarkEmits 啟動和停止周期性watermark生成
  • emitImmediateWatermark 立即發(fā)出watermark
  • 兩個工廠方法createProgressiveEventTimeLogiccreateNoOpEventTimeLogic ,會根據(jù) WatermarkStrategy 生成TimestampsAndWatermarks實例
    • createProgressiveEventTimeLogic 創(chuàng)建一個符合漸進(jìn)式事件時間邏輯的TimestampsAndWatermarks 實例,用于流處理
    • createNoOpEventTimeLogic 創(chuàng)建一個無操作的TimestampsAndWatermarks 實例,用于批處理,跳過watermark生成邏輯
image.png
  • 接口TimestampsAndWatermarksProgressiveTimestampsAndWatermarksNoOpTimestampsAndWatermarks實現(xiàn),他們唯一的創(chuàng)建方式是調(diào)用TimestampsAndWatermarks的工程方法創(chuàng)建
  • 那么,是誰創(chuàng)建了TimestampsAndWatermarks了呢,是SourceOperator,Source在物理計劃的實現(xiàn)。在SourceOperatoropen方法創(chuàng)建了eventTimeLogic,可以看到watermarkStrategy代表用戶指定的生成策略,我們指定的BoundedOutOfOrdernessWatermarks策略就是在這里被傳遞進(jìn)物理執(zhí)行計劃的
image.png
  • 話分兩頭,SourceOperator需要具有將record和watermark輸出的能力,Source的輸出由ReaderOutput 定義,他實現(xiàn)了SourceOutput 能力,是SourceReader和下游算子交互的核心接口之一
  • 數(shù)據(jù)記錄發(fā)送
    • collect(T record) 發(fā)送不帶時間戳的記錄
    • collect(T record, long timestamp) 發(fā)送帶時間戳的記錄
  • watermark發(fā)送,emitWatermark(Watermark watermark) 發(fā)送watermark,標(biāo)記事件進(jìn)展
  • idle狀態(tài)管理,markIdle() 標(biāo)記當(dāng)前輸出為idle狀態(tài),表示下游算子無需等待該輸出的watermark
  • split管理
    • createOutputForSplit(String splitId) 為特定分片創(chuàng)建一個獨立的SourceOutput ,用于設(shè)置各個分片的創(chuàng)建邏輯,比如創(chuàng)建水位線
    • releaseOutputForSplit(String splitId) 釋放特定分片的SourceOutput
image.png
  • SourceOperator 中,output就是成員對象currentMainOutput,他被上文eventTimeLogic所創(chuàng)建,eventTimeLogicTimestampsAndWatermarks類型,當(dāng)我們使用流處理時,eventTimeLogic的類型是ProgressiveTimestampsAndWatermarks
  • ProgressiveTimestampsAndWatermarks實現(xiàn)了createMainOutput方法,在這個方法中currentMainOutput被賦值為StreamingReaderOutput
  • StreamingReaderOutput繼承自SourceOutputWithWatermarks,SourceOutputWithWatermarks實現(xiàn)了SourceOutput
  • 現(xiàn)在我們就知道了一些事情,SourceOperator有兩個成員變量,類型為TimestampsAndWatermarksReaderOutput,TimestampsAndWatermarks用于創(chuàng)建流處理或者批處理的Watermark,根據(jù)用戶指定watermark生成策略激發(fā)新的watermark,以及創(chuàng)建ReaderOutput,ReaderOutput用于向下游發(fā)送record和watermark

watermark生成

  • WatermarkGenerator有兩個方法onEventonPeriodicEmit
  • onEvent表示record到達(dá)時,檢查或者記住這個時間戳,參數(shù)里包含WatermarkOutput,可以決定是否發(fā)出watermark
  • onPeriodicEmit 周期性發(fā)射水印
  • BoundedOutOfOrdernessWatermarks為例,record到達(dá)之后會調(diào)用onEvent記住record的時間戳,然后當(dāng)onPeriodicEmit觸發(fā)的時候發(fā)射record時間戳減延遲時間作為watermark

watermark傳遞(KeyedCoProcessFunction為例)

  • 一個用戶可以使用或者繼承的算子都有他對應(yīng)的運(yùn)行時operator,負(fù)責(zé)執(zhí)行用戶算子的邏輯,并管理狀態(tài),傳遞watermark。比如KeyedCoProcessFunction對應(yīng)KeyedCoProcessOperator
  • KeyedCoProcessOperatorimplementTwoInputStreamOperator,TwoInputStreamOperator里面具有接口函數(shù)processWatermark1processWatermark2,但是KeyedCoProcessOperator并沒有實現(xiàn)這兩個的方法,他的默認(rèn)實現(xiàn)被放到了父類的父類中。
image.png
  • AbstractUdfStreamOperator 是用戶定義function對應(yīng)的operator的基類,KeyedCoProcessOperator繼承了AbstractUdfStreamOperator
image.png
  • AbstractUdfStreamOperator繼承了AbstractStreamOperator ,AbstractStreamOperator是所有stream operator的基類。里面定義了兩個方法processWatermark1processWatermark2
    • 處理watermark的邏輯是,處理指定的輸入流的watermark,對于這個兩輸入的算子,就有兩個索引,然后更新這個算子的watermark狀態(tài),更新時間服務(wù)管理器并發(fā)射watermark到下游
    • 其中updateWatermark是關(guān)鍵的方法,他會遍歷所有輸入方向的watermark,然后取其中小的watermark,更新為這個算子的combine watermark。如果所有輸入都是idle,那這個算子也會變?yōu)閕dle狀態(tài)
    • 如果成功更新了combine watermark,會將這個新的combine watermark更新時間管理服務(wù)以及發(fā)射這個watermark。這涉及兩個東西InternalTimeServiceManagerOutput<StreamRecord<OUT>>
    • InternalTimeServiceManager 太大了,就不說了
    • 這里的Output 其實是WatermarkOutput,他有多種實現(xiàn),有非鏈?zhǔn)剿阕虞敵龊玩準(zhǔn)剿阕虞敵?
      • 如果是非鏈?zhǔn)剿阕訒atermark寫入網(wǎng)絡(luò)緩沖區(qū),通過網(wǎng)絡(luò)發(fā)給下游算子
      • 如果是鏈?zhǔn)剿阕訒苯訉atermark傳遞給下游算子
image.png
image.png
  • IndexedCombinedWatermarkStatus 通過索引管理多個輸入流的PartialWatermark,利用CombinedWatermarkStatus計算全局watermark(也就是這個算子對應(yīng)的operator),它是對CombinedWatermarkStatus的封裝,添加了按索引管理輸入流的功能
image.png
  • CombinedWatermarkStatus 組合多個PartialWatermark對象,計算全局watermark值和狀態(tài),他維護(hù)了一個PartialWatermark列表
    • updateWatermark是更新watermark的關(guān)鍵方法,取小的那個watermark
image.png
image.png
  • PartialWatermark表示單個輸入流的watermark狀態(tài)
    • 每個PartialWatermark跟蹤一個輸入流的watermark值和idle,active狀態(tài)。
    • onWatermarkUpdate表示收到watermark變動時應(yīng)該做什么動作,這個動作在PartialWatermark初始化的時候會指定
image.png
  • WatermarkOutputMultiplexer(watermark輸出多路復(fù)用器)
    • WatermarkUpdateListener用來監(jiān)聽某個輸入流的watermark值,發(fā)生變化時會調(diào)用onWatermarkUpdate方法
    • WatermarkOutputMultiplexer中當(dāng)某個輸入流watermark更新時,WatermarkOutputMultiplexer會通過WatermarkUpdateListener將更新傳遞到底層的WatermarkOutput
    • WatermarkOutputMultiplexer將多個輸入流的watermark更新組合成一個全局的watermark更新。
    • 支持兩種類型的輸入:立即輸出和延遲輸出
image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容