聊聊Flink異步I/O機制的原理

在不久前的這篇文章中,提出了一種用Flink做流處理時join外部維度數(shù)據(jù)的簡單方法。但是它的適用情境畢竟有限,通用的方法則是從Flink 1.2版本引入的異步I/O(Async I/O)機制。

異步I/O專門用來解決Flink計算過程中與外部系統(tǒng)的交互問題。在默認情況下,算子向外部系統(tǒng)發(fā)出請求后即阻塞,等待結(jié)果返回才能發(fā)送下一個請求,可能會造成較大的延遲,吞吐量下降。有了異步I/O之后,就可以并發(fā)地發(fā)出請求和接收響應(yīng),延遲大大降低。下圖來自官方文檔,一看便知。

要享受異步I/O帶來的便利,前提就是我們有一個能異步請求外部系統(tǒng)的客戶端。如果原生沒有提供,就得自己創(chuàng)建有限大小的線程池,將客戶端放到線程池里調(diào)用。

異步I/O的原始設(shè)計文檔見FLIP-12。但是隨著時間的推移,它里面的內(nèi)容與目前最新的Flink 1.9版本的實現(xiàn)有了一定出入,所以就不參考它了,直接講講筆者讀過1.9版本的相關(guān)源碼之后總結(jié)出的東西吧。

在調(diào)用AsyncDataStream.orderedWait()/unorderedWait()方法時,本質(zhì)上是產(chǎn)生了一個AsyncWaitOperator算子,它是異步I/O的核心。每個AsyncWaitOperator都由三個主要的部分組成。

  • AsyncFunction:執(zhí)行異步操作的函數(shù),用戶需要覆寫其asyncInvoke()方法并傳入。
  • StreamElementQueue:包含StreamElementQueueEntry的隊列,底層由ArrayDeque實現(xiàn)。
  • Emitter:單獨的守護線程,將異步調(diào)用完成后的結(jié)果發(fā)送給下游算子。

所謂StreamElementQueueEntry就是StreamElement(Flink基礎(chǔ)概念,可以是流中的一條數(shù)據(jù),或是一個水印等)的簡單封裝,通過j.u.c.CompletableFuture實現(xiàn)異步返回。CompletableFuture是JDK 8提供的新特性,可以認為是非常好用的Future改進版,這里就不再展開講了。

以下是以StreamElementQueueEntry為中心展開的類圖??垂贂⒁獾剿袃煞N實現(xiàn):代表數(shù)據(jù)的StreamRecordQueueEntry,和代表水印的WatermarkQueueEntry。它們都持有CompletableFuture。

AsyncWaitOperator的機制可以用下面的簡圖來表示。

  1. 來自上游的StreamElement進入AsyncWaitOperator的StreamElementQueue,并被封裝成StreamElementQueueEntry。
  2. AsyncWaitOperator調(diào)用傳入的AsyncFunction的asyncInvoke()方法,該方法異步地與外部系統(tǒng)交互。
  3. 異步操作完成后,由asyncInvoke()方法顯式地調(diào)用ResultFuture.complete()方法,將結(jié)果返回;或者調(diào)用completeExceptionally()方法表示出現(xiàn)了異常。ResultFuture就是CompletableFuture的代理接口。
  4. Emitter線程從StreamElementQueue中拉取那些已經(jīng)完成了的StreamElementQueueEntry,并輸出到下游算子。

以上的分析說明了AsyncWaitOperator的工作流程,但是沒有考慮輸出流的順序性。實際上會有以下兩種情況:

  • 調(diào)用AsyncDataStream.orderedWait():創(chuàng)建OrderedStreamElementQueue隊列,保持請求的順序與輸出結(jié)果的順序相同,亦即先進先出。
  • 調(diào)用AsyncDataStream.unorderedWait():創(chuàng)建UnorderedStreamElementQueue隊列,不保持順序。在采用處理時間時,先返回的結(jié)果先輸出。而采用事件時間時,需要額外保證水印的邊界不錯亂。

簡單討論一下。

  • 有序
    有序是最簡單的情況,只需要將元素按照到來的順序放入OrderedStreamElementQueue。只有當(dāng)隊列中的隊頭請求異步操作返回了結(jié)果,才會觸發(fā)Emitter輸出,后面的請求先返回也只能等待。

  • 無序(處理時間)
    這種情況也不難辦。在UnorderedStreamElementQueue中維護兩個子隊列,一個是未完成請求的隊列(uncompletedQueue),一個是已完成請求的隊列(completedQueue)。所有請求都先進入uncompletedQueue并執(zhí)行異步操作,并按照操作完成的順序進到completedQueue中。Emitter從completedQueue拉取并輸出結(jié)果即可。如下圖所示。

  • 無序(事件時間)
    這是比較復(fù)雜的情況:我們允許兩個水印之間的元素亂序,但是水印不能亂。所以在使用兩個隊列的同時,uncompletedQueue中還必須存儲水印,這就是上面的WatermarkQueueEntry的由來。在水印之間存儲的也不再是單個StreamElementQueueEntry,而是它們的集合。只有當(dāng)uncompletedQueue中的隊頭集合有元素的異步操作返回了,才能將其移動到completedQueue里面。這樣就可以保證在通過某個水印之前,它前面的所有異步請求都完成。如下圖所示。

異步I/O的檢查點做起來很容易。由上面的分析可以知道,StreamElementQueue保存的就是尚未完成異步請求的元素,以及已完成異步請求但還沒有送到Emitter發(fā)送的元素,只要遍歷該隊列,并將它們都放入狀態(tài)后端就OK。

Happy Friday night,晚安。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容