Flink網(wǎng)絡(luò)緩沖區(qū)

一、Flink TaskManager內(nèi)存模型

Flink TaskManager內(nèi)存模型
  • Task堆上內(nèi)存:用戶代碼執(zhí)行過程中產(chǎn)生的Java對象,如中間數(shù)據(jù)、算子結(jié)果數(shù)據(jù)、緩存數(shù)據(jù)等。
  • Task堆外內(nèi)存:用戶代碼中顯式調(diào)用內(nèi)存分配接口創(chuàng)建的內(nèi)存,比如用戶代碼直接與文件系統(tǒng)或網(wǎng)絡(luò)交互時用到的緩沖區(qū)(DMA技術(shù),如內(nèi)存文件映射、內(nèi)存網(wǎng)絡(luò)映射等)。
  • 托管內(nèi)存:流處理中,算子狀態(tài)數(shù)據(jù)如果存儲到RocksDB,表示RocksDB使用的內(nèi)存。
  • 網(wǎng)絡(luò)緩沖內(nèi)存:Task之間數(shù)據(jù)交換時使用的內(nèi)存(包括線程間的數(shù)據(jù)交換還是跨網(wǎng)絡(luò)的數(shù)據(jù)交換)。

內(nèi)存模型中,除了托管內(nèi)存和網(wǎng)絡(luò)緩沖區(qū)內(nèi)存是為了特殊目的存在,并且需要高處理性能以外,其余內(nèi)存基本屬于JVM進(jìn)程的標(biāo)配。托管內(nèi)存和網(wǎng)絡(luò)內(nèi)存緩沖區(qū)都使用MemorySegment作為底層數(shù)據(jù)結(jié)構(gòu)。

二、網(wǎng)絡(luò)緩沖區(qū)相關(guān)數(shù)據(jù)結(jié)構(gòu)

1. MemorySegment

MemorySegment,是Flink管理的內(nèi)存抽象的最小分配單元。默認(rèn)情況下,一個MemorySegment對應(yīng)著一個32KB大小的內(nèi)存塊。這塊內(nèi)存既可以是堆上內(nèi)存(Java的byte數(shù)組),也可以是堆外內(nèi)存(基于Netty的DirectByteBuffer)。

MemorySegment同時也提供了對二進(jìn)制數(shù)據(jù)進(jìn)行讀取和寫入的方法。對于Java基本數(shù)據(jù)類型,如short、int、long等,MemorySegment 內(nèi)置了方法,可以直接返回或者寫入數(shù)據(jù),對于其他數(shù)據(jù)類型,讀取二進(jìn)制數(shù)組byte[]后進(jìn)行反序列化,序列化為二進(jìn)制數(shù)組byte[]后寫入。

MemorySegment的類關(guān)系

HeapMemorySegment用來分配堆上內(nèi)存,HybridMemorySegment用 來分配堆外內(nèi)存和堆上內(nèi)存。實(shí)際上在2017年之后的Flink中,并沒有使用HeapMemorySegment,而是使用HybridMemorySegment這個類來同 時實(shí)現(xiàn)堆上和堆外內(nèi)存的分配

2. Buffer和NetworkBuffer

Task算子處理數(shù)據(jù)完畢,將結(jié)果交給下游的時候,使用的抽象或 者說內(nèi)存對象是Buffer。Buffer接口是網(wǎng)絡(luò)層面上傳輸數(shù)據(jù)和事件的統(tǒng)一抽象,其實(shí)現(xiàn)類是NetworkBuffer。Flink在各個TaskManager之間傳遞數(shù)據(jù)時,使用的是這一層的抽象。1個NetworkBuffer中包裝了1個 MemorySegment。

Buffer的類關(guān)系

3. BufferPool、LocalBufferPool和NetworkBufferPool

Buffer資源池在Flink中叫作BufferPool。BufferPool用來管理Buffer,包含Buffer的申請、釋放、銷毀、可用Buffer通知等,其實(shí)現(xiàn)類是LocalBufferPool,每個Task擁有自己的LocalBufferPool。

BufferPool的類關(guān)系

為了方便對 BufferPool 的管 理 , Flink 設(shè)計了 BufferPoolFactory,提供BufferPool的創(chuàng)建和銷毀,其唯一的實(shí)現(xiàn)類是NetworkBufferPool。

每個TaskManager只有一個NetworkBufferPool , 同一個TaskManager上的Task共享NetworkBufferPool。

NetworkBufferPool持有該TaskManager在進(jìn)行數(shù)據(jù)傳遞時所能夠使用的所有內(nèi)存,每個Task的 LocalBufferPool 所需要的內(nèi)存都是從 NetworkBufferPool 申請而來的。

三、網(wǎng)絡(luò)緩沖區(qū)內(nèi)存的管理

網(wǎng)絡(luò)緩沖區(qū)(NetworkBuffer)是網(wǎng)絡(luò)交換數(shù)據(jù)的包裝,當(dāng)結(jié)果分區(qū)(ResultParition)開始向下游Task發(fā)送數(shù)據(jù)的時候,需要向LocalBufferPool申請Buffer資源,將數(shù)據(jù)寫入MemorySegment。

1. 內(nèi)存申請

LocalBufferPool的大小是動態(tài)的,在最小MemorySegment數(shù)量與最大MemorySegment數(shù)量之間浮動。使用NetworkBufferPool創(chuàng)建LocalBufferPool時, 如果該TaskManager的內(nèi)存無法滿足所有Task所需的最小 MemorySegment的數(shù)量總和,則會發(fā)生錯誤。

(1)Buffer的申請
結(jié)果分區(qū)(ResultParition)申請Buffer進(jìn)行數(shù)據(jù)寫入,代碼如下:

ResultParition申請Buffer

LocalBufferPool首先從自身持有的MemorySegment中分配可用 的,如果沒有可用的,則從TaskManager的NetworkBufferPool中申 請,如果沒有,則阻塞等待可用的MemorySegment,代碼如下:

LocalBuffer分配MemorySegment

(2)MemorySegment的申請
申請Buffer本質(zhì)上來說就是申請MemorySegment,如果在 LocalBufferPool中,則申請新的堆外內(nèi)存MemorySegment,代碼如下:

LocalBufferPool分配、申請MemorySegment

2. 內(nèi)存釋放

Buffer使用了引用計數(shù)機(jī)制來判斷什么時候可以釋放Buffer到可用資源池。每創(chuàng)建一個BufferConsumer,就會對Buffer的引用計數(shù) +1,每個Buffer被消費(fèi)完,就會對Buffer的引用計數(shù)-1,當(dāng)Buffer引用計數(shù)為0的時候就可以回收了。

(1)Buffer回收
前邊介紹過Buffer的主要實(shí)現(xiàn)類是NetworkBuffer,同時繼承了 AbstractReferenceCountedByteBuf。當(dāng)Buffer被消費(fèi)一次后,就會對 Buffer的引用計數(shù)-1,代碼如下:

NetworkBuffer更新引用計數(shù)

Buffer 回收之后 ,并不會釋放MemorySegment , 此時 MemorySegment仍然在LocalBufferPool的資源池中,除非TaskManager 級別內(nèi)存不足,才會釋放回TaskManager持有的全局資源池。

釋放MemorySegment的時候,同樣要根據(jù)MemorySegment的類型來進(jìn)行,并且要在不低于保留內(nèi)存的情況下,將內(nèi)存釋放回內(nèi)存段中,變?yōu)榭捎脙?nèi)存,后續(xù)申請MemorySegment的時候,可以重復(fù)利用該內(nèi)存片段。

(2)MemorySegment釋放
當(dāng)NetworkBufferPool關(guān)閉的時候進(jìn)行內(nèi)存的釋放,交還給操作系統(tǒng)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容