Flink內(nèi)存管理

前言

參考 : <Flink內(nèi)核原理與實踐>

1. 一個java對象內(nèi)存大小

java所有數(shù)據(jù)類型對應(yīng)的字節(jié)大小

類型 大小(byte) 說明
Long 8
Double 8
Int 4
Float 4
Char 2
Short 2
Byte 1
Boolean 1
Class Point 8 保存對象的引用指針,指針壓縮后為4個字節(jié)
Mark Word 8 保存運行時的數(shù)據(jù) : hash,鎖狀態(tài)等.
Array length 4 如果對象是數(shù)組需要額外的4個字節(jié)記錄數(shù)組長度

上面是按照64位的,如果是32位有所不同

java對象的組成 : 對象頭,實例數(shù)據(jù),對齊部分

類型 說明
對象頭 由Markword和類指針組成,運行時存儲對象運行時數(shù)據(jù){hash code,gc年齡,鎖標志等..},32系統(tǒng)頭大小8byte,64系統(tǒng)為16byte開啟指針壓縮為12byte
實例數(shù)據(jù) 當前對象中的實例字段,有上面的數(shù)據(jù)類型組成
對齊 JVM要求對象大小比須是8的倍數(shù),為了使對象達到8的倍數(shù)而補充的數(shù)據(jù)

JVM關(guān)于壓縮指針的參數(shù)

-XX:-+UseCompressedOops : 普通對象指針壓縮(OOP即ordinary object pointer)

-XX:-+UseCompressedClassPointers : 類型指針壓縮,即針對klass pointer的指針壓縮

class A {
  int i;
  String s;
}   //                                                是否開啟指針壓縮
// 那么一個new A()所占用的大小 =  16/12(對象頭) + 4(int) + 8/4(ref) + 4/4(對齊) = 32/24byte;

測試對象大小的代碼,具體展示結(jié)果可自行測試,下面是具體測試代碼

/**
 * -XX:-+UseCompressedClassPointers 壓縮class指針的. 對象頭中使用 開啟這個參數(shù)需要開啟下個參數(shù)才可生效
 * -XX:-+UseCompressedOops 壓縮對象值指針的,
 *  maven opjdk提供的
            <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.10</version>
        </dependency>
 * @author xuzhiwen
 */
public class Test {
  public static void main(String[] args) throws NoSuchFieldException {
    extracted(new Object(), "Object ");
    extracted(new int[1], " int[1]");
    extracted(new int[10], " int[10]");
    extracted(new int[0], "int[0]");
    extracted(new Integer[1], "Integer[1] ");
    extracted(new Integer[0], "Integer[0]");
    extracted(new T(new T()), "T");
    extracted(new T2(), "T2");
  }

  private static void extracted(Object o, String s) {
    System.out.println(s + " 占用大小");
    System.out.println(ClassLayout.parseInstance(o).toPrintable());
    System.out.println();
  }

  static class T2 {}
  static class T {
    public static T t2 = new T();
    int i;
    T t;
    public T(T t) {this.t = t;}
    public T() {}
  }
}

jvm 序列化缺點

  • 無法跨語言 : java序列化的對象,其他語言無法反序列化,除非實現(xiàn)java序列化協(xié)議
  • 易被攻擊 : java無法保證序列化和反序列化的安全性,需要手動添加代碼黑白名單,判斷等方式防止被攻擊
  • 流太大,性能差 : 序列化后的字節(jié)比目前主流的protobuf,kryo等序列化框架要大很多,并且耗時要久一點

可以看到j(luò)ava的序列化有很多不足的地方,所以在很多框架中都是選擇自己實現(xiàn)序列化或者使用一些主流的開源框架

2. heap & off-heap

  • heap (堆內(nèi)存)

    • jvm的內(nèi)存區(qū)域中,占用內(nèi)存空間最大的一部分叫做堆'heap',也就是我們所說的堆內(nèi)存. jvm中的heap主要是存放所有對象的實例,這一塊區(qū)域在jvm啟動的時候被創(chuàng)建,并被所有的線程所共享,同時也是垃圾收集器的主要工作區(qū)域
  • off-heap (非堆內(nèi)存/堆外內(nèi)存)

    • 堆外內(nèi)存意味著把一些對象的實例分配在Java虛擬機堆內(nèi)存以外的內(nèi)存區(qū)域,這些內(nèi)存直接受操作系統(tǒng)而不是jvm管理,這樣做的結(jié)果就是能保持一個較小的堆,以減少垃圾收集對程序的影響
    • 為了解決heap過大導(dǎo)致gc停頓過長的問題,java可以通過off-heap來緩解這個問題, off-heap可以通過零拷貝, 以減少數(shù)據(jù)從jvm內(nèi)存到系統(tǒng)之間的拷貝次數(shù)

3. Flink為什么自己管理內(nèi)存

  • java序列化問題
    • 序列化效率低,序列化時間過久
    • 序列化密度低,序列化后的字節(jié)數(shù)組過大
  • GC & oom問題
    • 由于大數(shù)據(jù)情況下,jvm啟動會開辟較大空間,導(dǎo)致full gc的時候時間較久,極大的影響任務(wù)性能
    • 數(shù)據(jù)量大的情況下,當jvm對象分配的大小超過jvm內(nèi)存(heap)時,會拋出oom,導(dǎo)致jvm停止,影響框架健壯性和整體性能

4. Flink自主管理內(nèi)存的好處

  • 將內(nèi)存抽象成MemorySegment,每個MemorySegment(底層就是一個字節(jié)數(shù)組)默認大小32K
  • segment會存儲在老年代,segment會被持續(xù)引用,不會被ygc的時候回收掉,那么老年代的內(nèi)存有很大一部分是不變的,如果full gc 那么一定是新生代對象移動到老年代中造成,這樣對于管理老年代和控制full gc就會變得輕松一些.在新生代中創(chuàng)建的對象很大一部分會被flink通過序列化存入segment中,只要控制好非序列化的對象即可,這樣內(nèi)存管理會變得簡單很多. memorySegment在后面會介紹
  • 自定義序列化.彌補java序列化的不足,在flink中處理的對象都是經(jīng)過序列化的,然后存入segment中,所以user創(chuàng)建對象大部分都被在老年代中,由于flink的序列化性能較高,會使內(nèi)存占用降低,這樣老年代的對象也很穩(wěn)定,發(fā)生full gc的次數(shù)就會被減少,對象存儲在segment的字節(jié)數(shù)組中,當對象被釋放的時候我們只需要通過調(diào)整數(shù)組的指針即可
  • 直接操作二進制數(shù)據(jù).flink可以直接對序列化后的字節(jié)進行計算,所以內(nèi)存使用和計算效率非常高.二進制數(shù)據(jù)的操作結(jié)果依然是字節(jié),同樣保存在segment中,只有在需要的時候才會反序列化成對象.序列化是消耗cpu的主要操作,因此flink的計算避免了大量的序列化操作

一.Flink內(nèi)存模型

內(nèi)存模型.png

上面圖為TaskManager內(nèi)存模型,左邊為細分的內(nèi)存模型,右邊為整體內(nèi)存模型,該圖摘自Flink官網(wǎng)

內(nèi)存可以分為三部分

heap內(nèi)存

  • TaskManager使用的heap內(nèi)存,作為flink(Runtime)框架使用內(nèi)存的一部分
  • Task使用的heap內(nèi)存,用于用戶代碼使用

non-heap內(nèi)存

  • 托管內(nèi)存 : 實時用于rocksDB狀態(tài)后端,離線則用于排序,hash,中間結(jié)果緩存,不預(yù)分配參數(shù) managed memory
    taskmanager.memory.preallocate: false
  • 直接內(nèi)存 :
    • TaskManager使用的直接內(nèi)存,作為flink(Runtime)框架使用內(nèi)存的一部分
    • Task使用的直接內(nèi)存,用于用戶代碼使用
    • 用于網(wǎng)絡(luò)傳輸使用的直接內(nèi)存

jvm使用內(nèi)存

  • JVM 元空間,即jvm的方法區(qū),可以通過jvm參數(shù)進行配置
  • JVM開銷保留的內(nèi)存,如線程堆棧,jit緩存,gc

關(guān)于內(nèi)存的判斷

  • 若是 Flink 有硬限制的分區(qū),Flink 會報該分區(qū)內(nèi)存不足。否則進入下一步。
  • 若該分區(qū)屬于 JVM 管理的分區(qū),在其實際值增長導(dǎo)致 JVM 分區(qū)也內(nèi)存耗盡時,JVM 會報其所屬的 JVM 分區(qū)的 OOM (比如 java.lang.OutOfMemoryError: Jave heap space)。否則進入下一步。
  • 該分區(qū)內(nèi)存持續(xù)溢出,最終導(dǎo)致進程總體內(nèi)存超出容器內(nèi)存限制。在開啟嚴格資源控制的環(huán)境下,資源管理器(YARN/k8s 等)會 kill 掉該進程,在該情況下通常由開啟了rocksDB導(dǎo)致的

heap內(nèi)存在jvm啟動的時候申請的一塊不變的內(nèi)存區(qū)域,該內(nèi)存實際上是Flink和task公用的一塊區(qū)域,在flink層面通過控制來區(qū)分框架使用和task內(nèi)存,heap內(nèi)存管理起來是比較容易的,實際上non-heap的內(nèi)存是難管理的一塊,如果管理不當或者使用不當可能造成內(nèi)存泄漏或者內(nèi)存無限增長等問題

內(nèi)存參數(shù)配置

# ------------------- 堆內(nèi) ------------------------------

#flink框架使用內(nèi)存,默認128,不計入slot
taskmanager.memory.framework.heap.size = xxxMB 
#task使用的內(nèi)存,即用戶代碼
taskmanager.memory.task.heap.size = xxxMB 

# ------------------- 堆外 -------------------------------

#flink框架使用 默認128
taskmanager.memory.framework.off-heap.size = xxxMB 
#task使用 , 默認0
taskmanager.memory.task.heap.size = xxxMB 
# network使用的內(nèi)存大小 默認 min=64mb max=1gb fraction=0.1
# 通過fraction計算出來值若比最小值小或比最大值大,就會限制到最小值或者最大值,比例是按照總內(nèi)存計算
taskmanager.memory.network.off-heap.[min/max/fraction] = xxxMB/xxxMB/0.x 
# 托管內(nèi)存 ManagedMemory 計算同上,默認0.4
taskmanager.memory.managed.off-heap.[size/fraction]= 0.x

#------------------- jvm使用 -----------------------------

# jvm元空間使用
taskmanager.memory.jvm-metaspace = xxxMB 
# jvm執(zhí)行的開銷,堆棧,io編譯換成等使用的內(nèi)存
taskmanager.memory.jvm-overhead = xxxMB 

# ------------------ 總內(nèi)存  ------------------------------

# 綜上框架使用的內(nèi)存堆和堆外內(nèi)存,通過該參數(shù)控制,
taskmanager.memory.flink.size = xxxMB 
# flink任務(wù)進程的使用的內(nèi)存
taskmanager.memory.porcess.size = xxxMB 

# ------------------ jvm線束 ------------------------------
# 對應(yīng)于上面的jvm相關(guān)參數(shù),實際上都是根據(jù)上面內(nèi)存配置計算出來的jvm參數(shù),在啟動tm的時候會指定對應(yīng)的jvm參數(shù)
-Xmx / -Xms 配置jvm堆內(nèi)存大小
-XX:MaxDirectMemorySize jvm直接內(nèi)存
-XX:MeataspaceSize 元空間使用內(nèi)存

二.Flink內(nèi)存管理

1.存在的問題

1). jvm問題

  • 數(shù)據(jù)密度低
    • 上面我們已經(jīng)介紹了java對象在內(nèi)存的分配,由于有大量的額外對齊數(shù)據(jù),會導(dǎo)致jvm中有效數(shù)據(jù)信息密度低,如果在大量對象的情況下會占用過多額外的內(nèi)存空間
  • gc影響
    • jvm的gc雖然內(nèi)存泄漏的可能已經(jīng)開發(fā)人員的工作量,但是gc回收是不可控的,在tb,pb級的數(shù)據(jù)計算需要大量內(nèi)存,在內(nèi)存中生成大量的對象導(dǎo)致在full gc的時候會用時較久直接影響性能,甚至還會導(dǎo)致flink心跳超時問題
  • oom影響
    • 如果出現(xiàn)oom,則jvm直接崩潰,影響flink健壯性和性能問題
  • 緩存未命中
    • 由于java對象在堆上存儲不是連續(xù)的,所以從內(nèi)存讀取java對象時,鄰近的數(shù)據(jù)通常不是cpu下一步需要計算的數(shù)據(jù),這就是緩存未命中,此時cpu等待從內(nèi)存重新讀取數(shù)據(jù),此時cpu空轉(zhuǎn),cpu的速度和內(nèi)存速度差距比較大,那么執(zhí)行效率也會隨之下降

2). 自主管理

  • 自定義序列化工具 : 將數(shù)據(jù)序列化成二進制數(shù)據(jù)存入在MemorySegment中,并且提供一些高效的讀寫方法,一些計算可以直接操作二進制,減少序列化,如果需要序列化直接序列化需要計算的數(shù)據(jù)即可,無需全部序列化
  • 合理使用堆外內(nèi)存 : 堆外在寫磁盤和網(wǎng)絡(luò)io中是通過零拷貝的,而堆內(nèi)需要在用戶態(tài)復(fù)制一次,提高io效率
  • 緩存友好的數(shù)據(jù)結(jié)構(gòu)和算法 : 通過友好的緩存,以較少緩存未命中,提高cpu的執(zhí)行效率

3). 堆外不足

  • 堆外出現(xiàn)問題是難以排查,操作不當容易內(nèi)存泄漏

  • 對于聲明周期短的MemorySegment,如果分配在堆外,開銷跟更高,分配時間要比堆內(nèi)分配響應(yīng)慢很多

    通過下面代碼測試堆內(nèi)和堆外內(nèi)存的分配效率

    public void test(){
        for (int i = 1; i < 60; i++) {
          System.out.println("第" + i + "循環(huán)");
          alloc(1024 * 1024, 1024);
        }
    private static void alloc(int size, int s) {
        printTakesTime( s, "分配堆內(nèi)存用時 = ",val1 -> {
              byte[] bytes = new byte[size];});
        printingTakesTime( s, "分配直接內(nèi)存用時 = ", val1 -> {
              ByteBuffer buf = ByteBuffer.allocateDirect(size);});
        System.gc();
      }
     
    private static void printTakesTime( int s, String str, Consumer<Integer> consumer) {
        long start = System.currentTimeMillis();
        for (int i = 0; i < s; i++) {
          consumer.accept(null);
        }
        System.out.println(str + (System.currentTimeMillis() - start) + "ms ");
      }
    }
    

2. 內(nèi)存數(shù)據(jù)結(jié)構(gòu)

1).MemorySegment (內(nèi)存段)

在flink中對內(nèi)存進行了抽象成了MemorySegment,?默認情況下,一個 MemorySegment 對應(yīng)著一個 32KB 大小的內(nèi)存塊,這塊內(nèi)存既可以是堆上內(nèi)存( byte數(shù)組) ,也可以是堆外內(nèi)存(nio的ByteBufferr ) .

同時MemorySegment也提供了對二進制數(shù)據(jù)的操作方法,以及讀取字節(jié)數(shù)組序列化以及序列化字節(jié)數(shù)組的方法等

下面是類繼承圖,該類有兩MemorySegment實現(xiàn)類有兩個分別為使用heap的以及混合的即有heap和non-heap,對于內(nèi)存的訪問有子類具體的實現(xiàn)


內(nèi)存段繼承圖

在MemorySegment類的注釋中有兩段話,我們解釋一下

Note on efficiency: For best efficiency, the code that uses this class should make sure that only one subclass is loaded, or that the methods that are abstract in this class are used only from one of the subclasses (either the HeapMemorySegment, or the HybridMemorySegment).

That way, all the abstract methods in the MemorySegment base class have only one loaded actual implementation. This is easy for the JIT to recognize through class hierarchy analysis, or by identifying that the invocations are monomorphic (all go to the same concrete method implementation). Under these conditions, the JIT can perfectly inline methods.

解釋一下上面的兩句話

為了提升效率應(yīng)該只加載一個MemorySegment的子類或者,調(diào)用抽象方法的時候只調(diào)用其中一個子類的,避免交叉使用

這樣MemorySegment的抽象方法只有一個已加載的實際發(fā)現(xiàn),通過類的層次分析或確定是單態(tài)(所有調(diào)用都指向一個具

體實現(xiàn))的,這樣JIT很容易識別并(去虛化)進行方法內(nèi)聯(lián)

JIT(Just In Time)優(yōu)化

jvm是編譯和解釋

  • 熱點代碼優(yōu)化 : 對執(zhí)行的熱點代碼進行編譯成本地代碼,執(zhí)行編譯后的代碼效率會更快,編譯后的代碼會比膨脹,所以只有熱點代碼才會被編譯
  • 完全去虛化 : 通過類型推導(dǎo)或者類層次分析,識別虛方法調(diào)用的唯一目標方法,將其轉(zhuǎn)換為直接調(diào)用
   static class A{
   void a(){System.out.println("a");}
   }
   static class A1 extends A{
   @Override
   void a() { System.out.println("a1");}
   }
   static class A2 extends A{
   @Override
   void a() { System.out.println("a2");}
   }
   public static void main(String[] args) throws Exception {
   // 子類實現(xiàn)了父類的方法,構(gòu)建對象的時候通過子類引用指向父類
   // 在調(diào)用方法的時候只能調(diào)用父類擁有的方法,但是子類有重寫或者實現(xiàn)了對應(yīng)的方法
   // 在編譯時調(diào)用父類方法,運行時調(diào)用具體實現(xiàn)類方法
   // 那么這個方法就是虛方法
   
   // 如果只有A1.class被加載,通過層次分析,很容易識別到目標方法,
   // 如果A2.class也被加載,那么就會通過具體的類--------------
   final A a = new A1();
   a.a();
   }
  }
  • 方法內(nèi)聯(lián) : 將目標方法代碼轉(zhuǎn)移至當前代碼中,避免入棧和出棧不必要的開銷
   public void t1(){
   int r = sum(1,2)+ sum(2,3);
   // 優(yōu)化后 , 代碼直接嵌入
   // int r = 1+2+2+3;
   }
   public int sum(int a,int b){
   return a + b;
   }
  }
  • ......等
public abstract class MemorySegment {
 /** unsafe對象,用于操作heap an non-heap內(nèi)存 */
 @SuppressWarnings("restriction")
 protected static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
 /* 數(shù)組對象的偏移量,基本為16,因為是相對于數(shù)組對象而言,前面是對象頭,后面是數(shù)據(jù),當然不完全是,通常情況下而已 */
 @SuppressWarnings("restriction")
 protected static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
 /*  判斷是大端法還是小端法,不同的cpu會使用不同的字節(jié)順序 大端 : 低地址存放最高有效字節(jié), 小端與大端相反   */
 private static final boolean LITTLE_ENDIAN =
 (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
 // ------------------------------------------------------------------------
 /* 如果使用heap的情況下為字節(jié)數(shù)組對象,non-heap情況下為null */
 protected final byte[] heapMemory;
 /* 內(nèi)存地址,heapMemory =null 則為絕對地址,否則為heapMemory的相對地址 */
 protected long address;
 /*  標識地址結(jié)束位置, address+size   */
 protected final long addressLimit;
 /** memorySegment的大小 heap情況為字節(jié)數(shù)組長度,non-heap為byteBuffer的capacity */
 protected final int size;
 /**  memory segment owner */
 private final Object owner;

 // ----------------------------------------- 構(gòu)造方法 ---------------------------------------------
 // 是用heap情況下的構(gòu)造方法
 MemorySegment(byte[] buffer, Object owner) {
 this.heapMemory = buffer;
 this.address = BYTE_ARRAY_BASE_OFFSET; // address為字節(jié)數(shù)組的相對偏移量
 this.size = buffer.length; // 字節(jié)長度
 this.addressLimit = this.address + this.size; // address結(jié)束地址
 this.owner = owner;
 }
 // non-heap的構(gòu)造方法
 MemorySegment(long offHeapAddress, int size, Object owner) {
 this.heapMemory = null; // heapMemory為null
 this.address = offHeapAddress; // 堆外對絕對地址,
 this.addressLimit = this.address + size;
 this.size = size; // 堆外申請的內(nèi)存大小
 this.owner = owner;
 }
 // ------------------------------------------------------------------------------------------------

}

2).DataInputView/DataOutputView

MemorySemgent是flink內(nèi)存分配的最小單元了,對于數(shù)據(jù)夸MemorySemgent保存,那么對于上層的使用者來說,需要考慮考慮所有的細節(jié),由于過于繁瑣,所以在MemorySemgent上又抽象了一層內(nèi)存也,內(nèi)存也是在MemorySemgent數(shù)據(jù)訪問上的視圖,對數(shù)據(jù)輸入和輸出分別抽象為DataInputView/DataOutputView,有了這一層,上層使用者無需關(guān)心跨MemorySemgent的細節(jié)問題,內(nèi)存也對自動處理跨MemorySemgent的內(nèi)存操作

DataInputView

DataInputView繼承DataInput,DataInputView是對MemorySemgent讀取的抽象視圖,提供一系列讀取二進制數(shù)據(jù)不同類型的方法,AbstractPageInputView是DataInputView的一個抽象實現(xiàn)類,并且基本所有InputView都實現(xiàn)了該類,即所有實現(xiàn)該類的InputView都支持Page

InputView持有了多個MemorySemgent的引用(可以基于數(shù)組,list,deque等),這些MemorySemgent被視為一個內(nèi)存頁,可以順序,隨機等方式讀取數(shù)據(jù),要基于不同的實現(xiàn)類,實現(xiàn)類不同讀取方式不同

方法圖

dataInput方法圖.png

類繼承圖
DataInput繼承關(guān)系.png

DataOutputView

與DataInputView相對應(yīng),繼承Output,并有一個擁有Page功能的抽象類(AbstractPagedOutputView),其大部outputView的實現(xiàn)都是繼承自該抽象類,對一組MemorySemgent提供一個基于頁的寫入功能

方法圖

view方法

類繼承圖

DataOutputView繼承關(guān)系.png

3).Buffer

用于網(wǎng)絡(luò)io數(shù)據(jù)的包裝,每個buffer持有一個MemorySegment的引用,resultPartition寫數(shù)據(jù)的時候,會向LocalBufferPool申請Buffer,會返回BufferBuilder,通過BufferBuilder想Buffer<實際寫入的是MemorySegment>寫數(shù)據(jù)

BufferBuilder是在上游Task中,負責想Buffer寫入數(shù)據(jù),BufferConsumer位于下游,與BufferBuilder相對應(yīng),用于消費Buffer的數(shù)據(jù),每個bufferBuilder對應(yīng)一個bufferConsumer

常用參數(shù)介紹

public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Buffer {
    /* buffer中持有的內(nèi)存段 */
    private final MemorySegment memorySegment;
    /* 用于回收MemorySegment的回收器 */
    private final BufferRecycler recycler;
    /* buffer分配器,netty會使用到*/
    private ByteBufAllocator allocator;
    /* 當前buffer的容量 */
    private int currentSize;
      
  // 釋放buffer,引用計數(shù)-1, 引用計數(shù)=0則調(diào)用deallocate方法回收buffer
      @Override
    public void recycleBuffer() {
        release();
    }
    // 保留buffer,原理就是引用計數(shù)+1
    @Override
    public NetworkBuffer retainBuffer() {
        return (NetworkBuffer) super.retain();
    }
  // 回收MemorySegment,放入pool
    @Override
    protected void deallocate() {
        recycler.recycle(memorySegment);
    }
  // ....... 忽略
}

buffer申請

// ------------------ resultPartition ------------------------------------------
// BufferWritingResultPartition.java
// 請求Buffer
private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition)
            throws IOException {
  // bufferPool = LocalBufferPool,請求本地buffer,如果LocalBufferPool沒有memorySegment則會向全局的資源池申請memorySegment
  // 實際上請求buffer就是請求memorySegment的過程,memorySegment被bufferBuilder包裝了一下,方便后面使用
        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition);
        if (bufferBuilder != null) {
            return bufferBuilder;
        }

        final long start = System.currentTimeMillis();
        try {
            // 當前面請求不到的時候,則會通過block的形式請求,阻塞直到有可用buffer返回
            bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
            idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
            return bufferBuilder;
        } catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for buffer");
        }
    }

// ------------------- 

buffer回收

當buffer用完之后需要進行回收比如在netty的clientHandler收到響應(yīng)之后進行處理就會把buffer回收掉,buffer回收之后并不會釋放memorySegment,而是放回池中,變?yōu)榭捎脙?nèi)存,反復(fù)使用

  // NetworkBuffer.java
   @Override
    public void recycleBuffer() {
      // 由于繼承了netty的AbstractReferenceCountedByteBuf類,所以也具有引用計數(shù)功能
      // 調(diào)用了release之后會使引用計數(shù)-1,當count=0的時候就會回收buffer了,buffer回收了并不會釋放memorySegment
        release();
    }

// -----------------------------------------------------------------------------

// LocalBufferPool.java
// 廢品回收站
private void recycle(MemorySegment segment, int channel) {
        BufferListener listener;
        CompletableFuture<?> toNotify = null;
        NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
        while (!notificationResult.isBufferUsed()) {
            synchronized (availableMemorySegments) {
                if (channel != UNKNOWN_CHANNEL) {
                    if (subpartitionBuffersCount[channel]-- == maxBuffersPerChannel) {
                        unavailableSubpartitionsCount--;
                    }
                }

                if (isDestroyed || hasExcessBuffers()) {
                  // 返回給全局bufferPool
                    returnMemorySegment(segment);
                    return;
                } else {
                    listener = registeredListeners.poll();
                    if (listener == null) {
                      // 返回給localBufferPool
                        availableMemorySegments.add(segment);                              
                        break;
                    }
                }

                checkConsistentAvailability();
            }
            notificationResult = fireBufferAvailableNotification(listener, segment);
        }
        mayNotifyAvailable(toNotify);
    }

4).BufferBuilder & BufferConsumer

省略 ...................

就是寫入和消費MemorySegment的

3.MemroyManager

flink托管的內(nèi)存,托管內(nèi)存使用堆外內(nèi)存,用于批處理緩存排序等以及提供rocksDB內(nèi)存

內(nèi)存申請
class MemoryManager {
      public List<MemorySegment> allocatePages(Object owner, int numPages)
            throws MemoryAllocationException {
        List<MemorySegment> segments = new ArrayList<>(numPages);
        allocatePages(owner, segments, numPages);
        return segments;
    }
    // 申請內(nèi)存
     public void allocatePages(Object owner, Collection<MemorySegment> target, int numberOfPages)
            throws MemoryAllocationException {

        long memoryToReserve = numberOfPages * pageSize;
        try {
            memoryBudget.reserveMemory(memoryToReserve);
        } catch (MemoryReservationException e) {
            throw new MemoryAllocationException( String.format("Could not allocate %d pages", numberOfPages), e);
        }
                
       // 注冊一個釋放內(nèi)存的清理函數(shù)
        Runnable gcCleanup = memoryBudget.getReleaseMemoryAction(getPageSize());
       // 申請內(nèi)存直接內(nèi)存,并將釋放內(nèi)存的runnable傳入,用于回收內(nèi)存
        allocatedSegments.compute(
                owner,
        (o, currentSegmentsForOwner) -> {
                    Set<MemorySegment> segmentsForOwner =
                            currentSegmentsForOwner == null
                                    ? new HashSet<>(numberOfPages)
                                    : currentSegmentsForOwner;
                    for (long i = numberOfPages; i > 0; i--) {
                        MemorySegment segment =
                          // 申請一個HybridMemorySegment內(nèi)存
                                allocateOffHeapUnsafeMemory(getPageSize(), owner, gcCleanup);
                        target.add(segment);
                        segmentsForOwner.add(segment);
                    }
                    return segmentsForOwner;
                });
    }
}

    // MemorySegmentFactory
    public static MemorySegment allocateOffHeapUnsafeMemory(
            int size, Object owner, Runnable gcCleanupAction) {
        long address = MemoryUtils.allocateUnsafe(size);
      // 通過unsafe分配一個directBytebuffer
        ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
      // 用于回收內(nèi)存的清理器
        Runnable cleaner =
                MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, gcCleanupAction);
        return new HybridMemorySegment(offHeapBuffer, owner, false, cleaner);
    }
    
        // MemoryUtils
    static ByteBuffer wrapUnsafeMemoryWithByteBuffer(long address, int size) {
        try {
            ByteBuffer buffer = (ByteBuffer) UNSAFE.allocateInstance(DIRECT_BYTE_BUFFER_CLASS);
          // 為byteBuffer的addree 和 capacity賦值
            UNSAFE.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address);
            UNSAFE.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size);
            buffer.clear();
            return buffer;
        } catch (Throwable t) {
            throw new Error("Failed to wrap unsafe off-heap memory with ByteBuffer", t);
        }
    }
內(nèi)存釋放
public final class HybridMemorySegment extends MemorySegment {
      @Override
    public void free() {
      // 調(diào)用父類的free方法
        super.free();
      // 執(zhí)行清理器,cleaner會執(zhí)行到下面的兩個方法
        if (cleaner != null) {
            cleaner.run();
        }
      // 切斷引用,gc直接回收
        offHeapBuffer = null;
    }
}

    // UnsafeMemoryBudget.java    --- MemoryManager的一個參數(shù),上面申請內(nèi)存的時候用到了
    // 通過cas的方式循環(huán)釋放,實際的上釋放并不是真正意義上的釋放,只是修改了指針的位置
    // 這里釋放的是manager的內(nèi)存,將要釋放的內(nèi)存大小加至到可用內(nèi)存大小中,完整釋放
  // 注意實際上Memory是不存儲內(nèi)存的,通過參數(shù)來控制內(nèi)存的申請,最底層的內(nèi)存申請還是通過unsafe來做的
    void releaseMemory(@Nonnegative long size) {
        if (size == 0) {return; }
        boolean released = false;
        long currentAvailableMemorySize = 0L;
      // cas方式釋放
        while (!released && totalMemorySize >= (currentAvailableMemorySize = availableMemorySize.get()) + size){
            released = availableMemorySize
              .compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize + size);
        }
        if (!released) {
            throw new IllegalStateException(String.format("Trying to release more managed memory (%d bytes) than has been allocated (%d bytes), the total size is %d bytes",size, currentAvailableMemorySize, totalMemorySize));
        } 
      
     // MemoryUtils.java 
     // 這里是實際的釋放內(nèi)存,我們在MemoryManager申請內(nèi)存的時候,首先通過unsafe來申請一塊內(nèi)存即size,long address =    unsafe.allocateMemory(size)返回的是堆外的實際地址,通過申請的內(nèi)存來實例化一個ByteBuffer對象
     // 最終釋放的時候仍需要使用unsafe來做,直接通過unsafe.freeMemory(address)即可
     private static void releaseUnsafe(long address) {
        UNSAFE.freeMemory(address);
    }
rocksDB申請資源

MemoryManager更多的是用戶管理,來控制rocksDB的內(nèi)存使用,通過rocksDB block cache和writerBufferManager參數(shù)來限制,具體值通過TM內(nèi)存配置計算得來,最終還是有rocksDB自己來負責運行過程中內(nèi)存的申請和釋放,所以對于rocks真實的內(nèi)存使用,flink并不能完全的掌握,也就導(dǎo)致了flink 任務(wù)被yarn/k8s給kill掉,這里rocksDB的內(nèi)存申請是通過jni來申請,對于其申請的原理目前我也不是特別清楚,后面我查閱相關(guān)資料.研究一下

下面的代碼看看就行,后面是調(diào)入了rocksDB的代碼,因為我個人也不是很了解rocksDB,這里就不多做介紹,這里我們只需要了解MemoryManager是用于提供rocksDB內(nèi)存以及批處理相關(guān)的工作即可,這里不必太深究<我也不太懂,就不多說了>

   public <T extends AutoCloseable> OpaqueMemoryResource<T> getExternalSharedMemoryResource(
            String type, LongFunctionWithException<T, Exception> initializer, long numBytes)
            throws Exception {

        final Object leaseHolder = new Object();

        final SharedResources.ResourceAndSize<T> resource =
                sharedResources.getOrAllocateSharedResource(
                        type, leaseHolder, initializer, numBytes);

        final ThrowingRunnable<Exception> disposer =
                () -> sharedResources.release(type, leaseHolder);

        return new OpaqueMemoryResource<>(resource.resourceHandle(), resource.size(), disposer);
    }

  public <T extends AutoCloseable> OpaqueMemoryResource<T> getSharedMemoryResourceForManagedMemory(
                    String type,
                    LongFunctionWithException<T, Exception> initializer,
                    double fractionToInitializeWith)
                    throws Exception {

        final long numBytes = computeMemorySize(fractionToInitializeWith);
        final LongFunctionWithException<T, Exception> reserveAndInitialize =
                (size) -> {
                    try {
                        reserveMemory(type, size);
                    } catch (MemoryReservationException e) {
                        throw new MemoryAllocationException("Could not created the shared memory resource of size "+ size + ". Not enough memory left to reserve from the slot's managed memory.",e);
                    }

                    try {
                        return initializer.apply(size);
                    } catch (Throwable t) {
                        releaseMemory(type, size);
                        throw t;
                    }
                };

        final Consumer<Long> releaser = (size) -> releaseMemory(type, size);
        final Object leaseHolder = new Object();

        final SharedResources.ResourceAndSize<T> resource =
                sharedResources.getOrAllocateSharedResource(type, leaseHolder, reserveAndInitialize, numBytes);

        final long size = resource.size();
        final ThrowingRunnable<Exception> disposer =() -> sharedResources.release(type, leaseHolder, releaser);
        return new OpaqueMemoryResource<>(resource.resourceHandle(), size, disposer);
    }


三.源碼分析

NetworkBufferPool

NetworkBufferPool是一個固定大小的MemorySegment實例吃,用于網(wǎng)絡(luò)棧中,NettyBufferPool會為每個ResultPartition創(chuàng)建屬于自己的LocalBufferPool,NettyBufferPool會作為全局的pool來提供內(nèi)存,LocalBufferPool會通過限制來控制自己內(nèi)存的申請,防止過多申請

// 代碼沒啥 就是提前申請了用于network的代碼,這塊代碼在啟動的時候進行申請
LocalBufferPool

LocalBufferPool繼承關(guān)系,實現(xiàn)了bufferRecycler的接口,用于回收自己持有的buffer

LocalBufferPool繼承圖

每個Task擁有一個自己的LocalBufferPool,在數(shù)據(jù)接收和數(shù)據(jù)發(fā)送的過程中,會向LocalBufferPool請求buffer,將數(shù)據(jù)存儲在buffer中的,如果LocalBufferPool持有的buffer用盡,則會想全局的nettyBufferPool請求buffer,為了防止單個Task導(dǎo)致整個TM的反壓,會限制每個LocalBufferPool請求全局BufferPool的數(shù)量

在數(shù)據(jù)接收的時候會將數(shù)據(jù)封裝成NettyBuffer,在數(shù)據(jù)發(fā)送的時候會通過BufferBilder向MemorySegment寫入數(shù)據(jù),然后通過BufferConsumer讀取MemorySegment的數(shù)據(jù)

class LocalBufferPool implements BufferPool {
    // 全局bufferPool,當localBufferPool的buffer用完之后會向全局bufferPool申請buffer
    private final NetworkBufferPool networkBufferPool;



    // 可用的memorySegment,memorySegment不會單獨提供使用,會被NetworkBuffer進行封裝后使用
    private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

    // 監(jiān)聽器,實際上就是bufferManager,在bufferManager內(nèi)存不夠用的時候會注冊監(jiān)聽器,然后當內(nèi)存可用的時候監(jiān)聽器會通知buffer可用
    // 然后將memorySegment封裝成nettyBuffer添加到bufferManager的浮動隊列中
    private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();

    // 所需要的最小memorySegemt的數(shù)量
    private final int numberOfRequiredMemorySegments;
    // 最大分配數(shù)量
    private final int maxNumberOfMemorySegments;
    // 當前pool的大小
    private int currentPoolSize;
    // 從全局bufferPool請求的數(shù)量
    private int numberOfRequestedMemorySegments;
        // 每個channel最大使用的buffer數(shù)量
    private final int maxBuffersPerChannel;
  
        // 每個結(jié)果子分區(qū)使用的buffer數(shù)量
    @GuardedBy("availableMemorySegments")
    private final int[] subpartitionBuffersCount;
        // 每個結(jié)果子分區(qū)的buffer回收器
    private final BufferRecycler[] subpartitionBufferRecyclers;

    LocalBufferPool(
            NetworkBufferPool networkBufferPool, 
            int numberOfRequiredMemorySegments,
            int maxNumberOfMemorySegments,
            int numberOfSubpartitions,
            int maxBuffersPerChannel) {
        this.networkBufferPool = networkBufferPool;
        this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
        this.currentPoolSize = numberOfRequiredMemorySegments;
        this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;

        this.subpartitionBuffersCount = new int[numberOfSubpartitions];
        subpartitionBufferRecyclers = new BufferRecycler[numberOfSubpartitions];
        for (int i = 0; i < subpartitionBufferRecyclers.length; i++) {
            subpartitionBufferRecyclers[i] = new SubpartitionBufferRecycler(i, this);
        }
        this.maxBuffersPerChannel = maxBuffersPerChannel;
            
        synchronized (this.availableMemorySegments) {
          // 檢查buffer的可用性,如果可用的buffer,如果buffer沒有超過當前pool的大小,則會想全局bufferPool申請一個buffer
            if (checkAvailability()) {
               // 可用性設(shè)置成可用,廢話文學(xué)
                availabilityHelper.resetAvailable();
            }
                    // 檢查可用性是否保持一致
            checkConsistentAvailability();
        }
    }

        // 請求buffer,在bufferManager中沒有可用buffer的時候會向LocalBufferPool請求浮動buffer
    public Buffer requestBuffer() {
      // 請求一個memorySegment并將其封裝到NettyBuffer
        return toBuffer(requestMemorySegment());
    }


  // 請求bufferBuilder,用于task處理完數(shù)據(jù)發(fā)送下游時候使用,主要用于將處理完的數(shù)據(jù)放入buffer中發(fā)送到下游
  // BufferWritingResultPartition中會請求bufferBuilder存放record
  // bufferBuilder前面介紹過,主要是用于將數(shù)據(jù)寫入到memorySegment中
    @Override
    public BufferBuilder requestBufferBuilder(int targetChannel) {
        return toBufferBuilder(requestMemorySegment(targetChannel), targetChannel);
    }

    // 阻塞的形式請求bufferBuilder,在上面請求bufferBuilder的時候沒有獲取到,則會通過這個方法進行阻塞的請求
    public BufferBuilder requestBufferBuilderBlocking(int targetChannel)
            throws InterruptedException {
        return toBufferBuilder(requestMemorySegmentBlocking(targetChannel), targetChannel);
    }

    // 將memory封裝成buffer
    private Buffer toBuffer(MemorySegment memorySegment) {
        return new NetworkBuffer(memorySegment, this);
    }
    // 將memory封裝成bufferBuilder
    private BufferBuilder toBufferBuilder(MemorySegment memorySegment, int targetChannel) {
        if (targetChannel == UNKNOWN_CHANNEL) {
            return new BufferBuilder(memorySegment, this);
        } else {
            return new BufferBuilder(memorySegment, subpartitionBufferRecyclers[targetChannel]);
        }
    }
        
    // 阻塞的請求memorySegment
    private MemorySegment requestMemorySegmentBlocking(int targetChannel)
            throws InterruptedException {
        MemorySegment segment;
        while ((segment = requestMemorySegment(targetChannel)) == null) {
            try {
                getAvailableFuture().get();
            } catch (ExecutionException e) {
                ExceptionUtils.rethrow(e);
            }
        }
        return segment;
    }

    // 請求memorySegment
    private MemorySegment requestMemorySegment(int targetChannel) {
        MemorySegment segment;
        synchronized (availableMemorySegments) {
            if (targetChannel != UNKNOWN_CHANNEL
                    && subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel) {
                return null;
            }

            segment = availableMemorySegments.poll();
            if (segment == null) {
                return null;
            }

            if (targetChannel != UNKNOWN_CHANNEL) {
                if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {
                    unavailableSubpartitionsCount++;
                }
            }

            if (!checkAvailability()) {
                availabilityHelper.resetUnavailable();
            }
            checkConsistentAvailability();
        }
        return segment;
    }

    
        // 從全局bufferPool請求memorySegment
    private boolean requestMemorySegmentFromGlobal() {
        if (isRequestedSizeReached()) {
            return false;
        }

        // 如果請求成功則加入到可用的memorySegment隊列
        MemorySegment segment = networkBufferPool.requestMemorySegment();
        if (segment != null) {
            availableMemorySegments.add(segment);
          // 將請求memorySegment數(shù)+!
            numberOfRequestedMemorySegments++;
            return true;
        }
        return false;
    }
 
        // 檢查可用,順便請求一個memorySegment
    private boolean checkAvailability() {
        if (!availableMemorySegments.isEmpty()) {
            return unavailableSubpartitionsCount == 0;
        }
        if (!isRequestedSizeReached()) {
            // 全局bufferPool請求一個內(nèi)存段
            if (requestMemorySegmentFromGlobal()) {
                return unavailableSubpartitionsCount == 0;
            } else {
                requestMemorySegmentFromGlobalWhenAvailable();
                return shouldBeAvailable();
            }
        }
        return false;
    }

        // 釋放內(nèi)存段
    // 這里可以對應(yīng)上BufferManager
    @Override
    public void recycle(MemorySegment segment) {
        recycle(segment, UNKNOWN_CHANNEL);
    }
    // listener實際上就是memoryManager
    private void recycle(MemorySegment segment, int channel) {
        BufferListener listener;
        CompletableFuture<?> toNotify = null;
        NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
        while (!notificationResult.isBufferUsed()) {
            synchronized (availableMemorySegments) {
                if (channel != UNKNOWN_CHANNEL) {
                    if (subpartitionBuffersCount[channel]-- == maxBuffersPerChannel) {
                        unavailableSubpartitionsCount--;
                    }
                }
                                
                // 如果請求的buffer已經(jīng)大于了當前pool的大小,則將這個memorySegment返回給全局bufferPool
                if (isDestroyed || hasExcessBuffers()) {
                    returnMemorySegment(segment);
                    return;
                } else {
                    // 從lintener隊列中取一個listener,如果listener==null說明bufferManager不需要浮動內(nèi)存了
                    // 會直接將memorySegment加入當前pool的可用隊列
                    listener = registeredListeners.poll();
                    if (listener == null) {
                        availableMemorySegments.add(segment);
                        // only need to check unavailableSubpartitionsCount here because
                        // availableMemorySegments is not empty
                        if (!availabilityHelper.isApproximatelyAvailable()
                                && unavailableSubpartitionsCount == 0) {
                            toNotify = availabilityHelper.getUnavailableToResetAvailable();
                        }
                        break;
                    }
                }

                checkConsistentAvailability();
            }
            // 如果listener不等于null,通知通知給listener buffer可用,
            notificationResult = fireBufferAvailableNotification(listener, segment);
        }

        mayNotifyAvailable(toNotify);
    }

    private NotificationResult fireBufferAvailableNotification(
            BufferListener listener, MemorySegment segment) {
            
        // 調(diào)用到bufferManager的notifyBufferAvailable()方法,將buffer加入到bufferManager的浮動buffer中
        NotificationResult notificationResult =
                listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
        // 如果bufferManager還需要更多的buffer,則會將listener再次加入到listener隊列中,等待下次buffer被回收
        if (notificationResult.needsMoreBuffers()) {
            synchronized (availableMemorySegments) {
                if (isDestroyed) {
                    listener.notifyBufferDestroyed();
                } else {
                    registeredListeners.add(listener);
                }
            }
        }
        return notificationResult;
    }

        
  // bufferManager會調(diào)用,當bufferManager的buffer不夠用的時候會通過監(jiān)聽器等待localBufferPool回收buufer
    public boolean addBufferListener(BufferListener listener) {
        synchronized (availableMemorySegments) {
            if (!availableMemorySegments.isEmpty() || isDestroyed) {
                return false;
            }

            registeredListeners.add(listener);
            return true;
        }
    }

        // 將內(nèi)存段返回給全局bufferPool
    private void returnMemorySegment(MemorySegment segment) {
        assert Thread.holdsLock(availableMemorySegments);
                // 返回給pool后會將請求的內(nèi)存段數(shù)量-1
        numberOfRequestedMemorySegments--;
        networkBufferPool.recycle(segment);
    }
        
    // localBufferPool為每個結(jié)果子分區(qū)分配的內(nèi)存回收器,回收器會持有當前LocalBufferPool的引用,調(diào)用到當前pool的內(nèi)存回收方法
    // 用于結(jié)果子分區(qū)回收buffer,最終還是釋放給localBufferPool
    private static class SubpartitionBufferRecycler implements BufferRecycler {
        private int channel;
        private LocalBufferPool bufferPool;

        SubpartitionBufferRecycler(int channel, LocalBufferPool bufferPool) {
            this.channel = channel;
            this.bufferPool = bufferPool;
        }
        @Override
        public void recycle(MemorySegment memorySegment) {
            bufferPool.recycle(memorySegment, channel);
        }
    }
}
BufferManager

BufferManager主要用于為RemoteInputChannel提供buffer的,bufferManager在啟動的時候會向全局bufferPool請求自己的獨有buffer,當bufferManager的buffer不夠的時候,則會向localBufferPool請求buffer,此時請求的buffer為浮動buffer

實際上提供的buffer是給到netty的handler了,在netty接收到server響應(yīng)的消息后,會請求buffer解析message,將消息封裝到buffer中,在請求buffer的過程中,實際上就是向bufferManager請求buffer的過程

在后面我們會介紹到LocalBufferPool

public class BufferManager implements BufferListener, BufferRecycler {

    // 持有浮動buffer和獨占buffer的一個隊列
    private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();
    // 用于向全局bufferPool申請buffer,以及釋放buffer
    private final MemorySegmentProvider globalPool;
    //標記,是在等待浮動buffer
    @GuardedBy("bufferQueue") // 注解表示該變量被bufferQueue保護
    private boolean isWaitingForFloatingBuffers;
    //input channel所需的buffer總數(shù)
    @GuardedBy("bufferQueue")
    private int numRequiredBuffers;
  
    // 請求buffer
    // 該方法會在clinet端收到server端的數(shù)據(jù)后,會請求buffer封裝server端的response message
    // 將解析好的msg發(fā)送到下游,然后釋放掉netty的byteBuf
     Buffer requestBuffer() {
        synchronized (bufferQueue) {
            return bufferQueue.takeBuffer();
        }
    }
    
  // 這里是屬于bufferManager獨有buffer,單獨進行管理,是有全局bufferPool申請的,當remoteInputChannel啟動的時候在setup()方法中,一次性請求bufferManager所獨有的buffer
    void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
      // 從全局bufferPool中請求buffer
        Collection<MemorySegment> segments = globalPool.requestMemorySegments(numExclusiveBuffers);
        synchronized (bufferQueue) {
            // 將請求到的buffer加入到bufferManager的獨有buffer隊列匯總
            for (MemorySegment segment : segments) {
                bufferQueue.addExclusiveBuffer(
                        new NetworkBuffer(segment, this), numRequiredBuffers);
            }
        }
    }
  
  // 這個方法有onSenderBacklog()方法進行調(diào)用,當收到server的消息后,會根據(jù)server的背壓數(shù)量來請求buffer
  // 如果請求到buffer,則將請求到的buffer數(shù)量通過credit的形式發(fā)送給server,這里涉及到了flink的背壓通信機制
  
  // 請求浮動buffer,buffer是從localBufferPool請求的,該buffer請求在被回收的時候會選擇性的返回,返回給               bufferManager/localBufferPool/全局bufferPool
   int requestFloatingBuffers(int numRequired) {
        int numRequestedBuffers = 0;
        synchronized (bufferQueue) {
            if (inputChannel.isReleased()) {
                return numRequestedBuffers;
            }
            numRequiredBuffers = numRequired;

            while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers
                    && !isWaitingForFloatingBuffers) {
              // 從localBufferPool請求buffer
                BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
                Buffer buffer = bufferPool.requestBuffer();
                if (buffer != null) {
                    bufferQueue.addFloatingBuffer(buffer);
                    numRequestedBuffers++;
                } 
              // 如果localBufferPool沒有buffer則會將自己加入的localBufferPool的listener隊列中
              // 這里思考一下為什么要這么做,因為沒有buffer了,所以通過一個監(jiān)聽器來監(jiān)聽buffer什么時候被釋放
              // localBufferPool的buffer釋放之后,會判斷是否有l(wèi)istener,如果有l(wèi)istener則說明bufferManager需要buffer
              // 就會將回收之后的buffer加入到bufferManager的浮動buffer隊列
              // ↓↓↓  --- 下面的notifyBufferAvailable()中有介紹
              else if (bufferPool.addBufferListener(this)) {
                    isWaitingForFloatingBuffers = true;
                    break;
                }
            }
        }
     // 返回請求到的buffer數(shù)量
        return numRequestedBuffers;
    }
   
  
  // BufferManager的recycle方法,與LocalBufferPool實現(xiàn)不同
  // 該方法被調(diào)用都是bufferManager獨有的buffer使用完成了,所以會將獨有的buffer返回到自己管理的buffer隊列中,
  // 不會像LocalBufferPool通過一系列條件判斷來決定buffer返回給誰
    public void recycle(MemorySegment segment) {
        @Nullable Buffer releasedFloatingBuffer = null;
        synchronized (bufferQueue) {
            try {
                // Similar to notifyBufferAvailable(), make sure that we never add a buffer
                // after channel released all buffers via releaseAllResources().
                if (inputChannel.isReleased()) {
                    globalPool.recycleMemorySegments(Collections.singletonList(segment));
                    return;
                } else {
                  // 釋放多余的浮動buffer
                    releasedFloatingBuffer = bufferQueue.addExclusiveBuffer(
                        new NetworkBuffer(segment, this), numRequiredBuffers);
                }
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            } finally {
                bufferQueue.notifyAll();
            }
        }

        if (releasedFloatingBuffer != null) {
          // 如果有多余的浮動buffer,則釋放掉
            releasedFloatingBuffer.recycleBuffer();
        } else {
            try {
              // 有buffer釋放,就可以通知channel有buffer可用,就會向server反饋信用
                inputChannel.notifyBufferAvailable(1);
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            }
        }
    }

  // ------------------------------------------------------------------------------------
  
  // 這個方法比較有意思,也比較復(fù)雜,讓我們看一下方法名,通過翻譯方法得到 中文方法名 : 通知 buffer 可用
  // 提前說明 :  bufferPool和bufferManager都實現(xiàn)了BufferRecycler接口,實現(xiàn)了recycle方法,用于回收buffer,重新使用的
  /*
  實際上這個方法是有LocalBufferPool調(diào)用,在調(diào)用了recycle方法后會對memorySegment釋放,因為浮動buffer就是從localBuffer請求的,在創(chuàng)建(NettyBuffer=buffer)的時候需要一個BufferRecycler,在bufferManager創(chuàng)建buffer的時候傳入的是自己,而LocalBufferPool創(chuàng)建的時候傳入的是自己,所以,這里是由localBufferPool通過recycle方法調(diào)用,并且bufferManager的recycle的方法與localBufferPool實現(xiàn)不同,所以這里就會放入的浮動buffer的隊列中
  在這個方法被調(diào)用的時候會有判斷,判斷是否需要更多的buffer,如果需要更多的buffer,如果需要更多的buffer會將listener(實際上就是BufferManager,因為BufferManager實現(xiàn)了BufferListener)加入的listener隊列中(lintener會被循環(huán)利用),然后當buffer使用完成了,對buffer進行回收的時候,會選擇是根據(jù)一些條件來判斷是否返回到全局的bufferPool中或者返回到localBufferPool中
  返回全局bufferPool的條件是判斷LocalBufferPool請求的buffer數(shù)量已經(jīng)超過了LocalBufferPool的核心buffer數(shù)量,如果超過了則返回給全局bfferPool中
  返回LocalBufferPool的條件是,當不返回全局bufferPool則會判斷是否存在listener,如果沒有l(wèi)istener則將buffer返回給localBufferPool,這樣的原因是因為,在請求buffer的時候如果沒有可用buffer,就會添加listener到listener隊列中,當buffer用完之后就會根據(jù)listener是否存在決定是否還需要更多的buffer
  
 localBufferPool 后面在介紹
   */
  public BufferListener.NotificationResult notifyBufferAvailable(Buffer buffer) {
       BufferListener.NotificationResult notificationResult = BufferListener.NotificationResult.BUFFER_NOT_USED;
        if (inputChannel.isReleased()) {
            return notificationResult;
        }

        try {
            synchronized (bufferQueue) {
                if (inputChannel.isReleased() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
                    isWaitingForFloatingBuffers = false;
                    return notificationResult;
                }
                                // 將buffer方法放入浮動buffer隊列
                bufferQueue.addFloatingBuffer(buffer);
               // 喚醒在隊列等待的線程
                bufferQueue.notifyAll();
                                
                if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
                    isWaitingForFloatingBuffers = false;             
                    notificationResult = BufferListener.NotificationResult.BUFFER_USED_NO_NEED_MORE;
                } else {
                    notificationResult = BufferListener.NotificationResult.BUFFER_USED_NEED_MORE;
                }
            }
            
            // 實際上這個判斷永遠都會進入,因為同步代碼塊的內(nèi)容已經(jīng)保證了result一定不等于BUFFER_NOT_USED
            // 要注意調(diào)用這個方法說明有buffer調(diào)用了recycle,說明buffer釋放了,那么就可以被重新使用了
            if (notificationResult != NotificationResult.BUFFER_NOT_USED) {
               // 通過netty向server發(fā)送addCreditMessage,通知自己的信用
               // 在netty server端收到credit(信用)后會記錄對應(yīng)channel的信用
               // 當server向下游發(fā)送數(shù)據(jù)的時候,會根據(jù)下游的信用值來確定發(fā)送多少數(shù)據(jù)甚至不發(fā)送
               // 這樣就不會因為某一個task的反壓導(dǎo)致整個taskManger的反壓
                inputChannel.notifyBufferAvailable(1);
            }
        } catch (Throwable t) {
            inputChannel.setError(t);
        }
        return notificationResult;
    }
  
  // 靜態(tài)內(nèi)部類,用于維護可用buffer
  static final class AvailableBufferQueue {
    
        // 從localBufferPool申請的buffer,優(yōu)先使用
        final ArrayDeque<Buffer> floatingBuffers;
        // 從全局bufferPool申請的buffer,為channel獨有
        final ArrayDeque<Buffer> exclusiveBuffers;

        AvailableBufferQueue() {
            this.exclusiveBuffers = new ArrayDeque<>();
            this.floatingBuffers = new ArrayDeque<>();
        }


        // 從全局buffer pool申請的該channel獨占的buffer
        Buffer addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
            exclusiveBuffers.add(buffer);
            // 如果可用的buffer大于bufferManager的必須的buffer數(shù)量,則會釋放掉多余的浮動buffer
            if (getAvailableBufferSize() > numRequiredBuffers) {
                return floatingBuffers.poll();
            }
            return null;
        }
            
       // 申請的浮動buffer,可以認為是獨占的buffer用完了,開始申請臨時buffer
        void addFloatingBuffer(Buffer buffer) {
            floatingBuffers.add(buffer);
        }

       // 在返回的時候優(yōu)先返回浮動buffer
       // 為什么先請求浮動buffer呢,因為只有獨有buffer用完之后才會請求浮動buffer,如果浮動buffer
       // 有buffer,則一定說明了獨有buffer用完了,你們覺得呢
        Buffer takeBuffer() {
            if (floatingBuffers.size() > 0) {
                return floatingBuffers.poll();
            } else {
                return exclusiveBuffers.poll();
            }
        }
  }
}

四. tm和jm內(nèi)存分配代碼 或者 內(nèi)存分配源碼

了解即可,如果感興趣可以自行看代碼,等我想起來在補齊

public abstract class AbstractContainerizedClusterClientFactory<ClusterID>
        implements ClusterClientFactory<ClusterID> {

    @Override
    public ClusterSpecification getClusterSpecification(Configuration configuration) {
        checkNotNull(configuration);

        final int jobManagerMemoryMB =
                JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                                configuration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
                        .getTotalProcessMemorySize()
                        .getMebiBytes();

        final int taskManagerMemoryMB =
                TaskExecutorProcessUtils.processSpecFromConfig(
                                TaskExecutorProcessUtils
                                        .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                                                configuration,
                                                TaskManagerOptions.TOTAL_PROCESS_MEMORY))
                        .getTotalProcessMemorySize()
                        .getMebiBytes();

        int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);

        return new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(jobManagerMemoryMB)
                .setTaskManagerMemoryMB(taskManagerMemoryMB)
                .setSlotsPerTaskManager(slotsPerTaskManager)
                .createClusterSpecification();
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 背景 1)java對象的存儲密度比較低,對象主要包含 對象頭,對象數(shù)據(jù),對齊填充。 其中對齊填充是沒用的,純粹是為...
    LQC_gogogo閱讀 1,066評論 0 1
  • 在大數(shù)據(jù)面前,JVM的內(nèi)存結(jié)構(gòu)和GC機制往往會成為掣肘 1. 對象開銷:在HotSpot中,每個對象占用的內(nèi)存空間...
    aiguang2016閱讀 8,815評論 1 10
  • 如今,大數(shù)據(jù)領(lǐng)域的開源框架(Hadoop,Spark,Storm)都使用的 JVM,當然也包括 Flink?;?...
    尼小摩閱讀 1,782評論 0 16
  • Flink內(nèi)存管理 1.簡介 自從2003-2006年,Google發(fā)表了三篇著名的大數(shù)據(jù)相關(guān)論文(Google ...
    寇寇寇先森閱讀 1,380評論 0 0
  • TaskManager 的內(nèi)存布局 Flink 內(nèi)部并非直接將對象存儲在堆上,而是將對象序列化到一個個預(yù)先分配的 ...
    專職掏大糞閱讀 529評論 0 0

友情鏈接更多精彩內(nèi)容