內(nèi)存管理

TaskManager 的內(nèi)存布局

Flink 內(nèi)部并非直接將對(duì)象存儲(chǔ)在堆上,而是將對(duì)象序列化到一個(gè)個(gè)預(yù)先分配的 MemorySegment 中。MemorySegment 是一段固定長(zhǎng)度的內(nèi)存(默認(rèn)32KB),也是 Flink 中最小的內(nèi)存分配單元。MemorySegment 提供了高效的讀寫方法,它的底層可以是堆上的 byte[], 也可以是堆外(off-heap)ByteBuffer??梢园?MemorySegment 看作 Java NIO 中的 ByteBuffer,F(xiàn)link 還實(shí)現(xiàn)了 Java 的 java.io.DataOutput 和 java.io.DataInput 接口,分別是 AbstractPagedInputView 和 AbstractPagedOutputView, 可以通過一種邏輯視圖的方式來(lái)操作連續(xù)的多塊 MemorySegment。

在 Flink 中,TaskManager 負(fù)責(zé)任務(wù)的實(shí)際運(yùn)行,通常一個(gè) TaskManager 對(duì)應(yīng)一個(gè) JVM 進(jìn)程(非 MiniCluster 模式)。拋開 JVM 內(nèi)存模型,單從 TaskManager 內(nèi)存的主要使用方式來(lái)看,TaskManager 的內(nèi)存主要分為三個(gè)部分:

  • Network Buffers:一定數(shù)量的 MemorySegment, 主要用于網(wǎng)絡(luò)傳輸。在 TaskManager 啟動(dòng)時(shí)分配, 通過 NetworkEnvironment 和 NetworkBufferPool 進(jìn)行管理
  • Managed Memory:由 MemoryManager 管理的一組 MemorySegment 集合, 主要用于 Batch 模式下的 sorting, hashing, 和 cache 等。
  • Remaining JVM heap:余下的堆內(nèi)存留給 TaskManager 的數(shù)據(jù)結(jié)構(gòu)以及用戶代碼處理數(shù)據(jù)時(shí)使用。TaskManager 自身的數(shù)據(jù)結(jié)構(gòu)并不會(huì)占用太多內(nèi)存,因而主要都是供用戶代碼使用,用戶代碼創(chuàng)建的對(duì)象通常生命周期都較短

需要注意的是,上面所說的三部分的內(nèi)存并非都是 JVM 堆上的內(nèi)存,因?yàn)?MemorySegment 底層的內(nèi)存可以在堆上,也可以在堆外(不由 JVM 管理)。對(duì)于 Network Buffers,這一部分內(nèi)存就是在堆外(off-heap)進(jìn)行分配的;對(duì)于 Managed Memory,這一部分內(nèi)存可以配置在堆上,也可以配置在堆外。另外還需要注意的一點(diǎn)是,Managed Memory 主要是在 Batch 模式下使用,在 Streaming 模式下這一部分內(nèi)存并不會(huì)預(yù)分配,因而空閑出來(lái)的內(nèi)存其實(shí)都是可以給用戶自定義函數(shù)使用的。

通過二進(jìn)制數(shù)據(jù)管理對(duì)象

我們已經(jīng)知道,F(xiàn)link 是通過 MemorySegment 來(lái)管理數(shù)據(jù)對(duì)象的,因而對(duì)象首先需要被序列化保存到 MemorySegment 中。 Flink 實(shí)現(xiàn)了一套自己的序列化框架。這主要是出于以下考慮:首先,比較和操作二進(jìn)制數(shù)據(jù)需要準(zhǔn)確了解序列化的布局,針對(duì)二進(jìn)制數(shù)據(jù)的操作來(lái)配置序列化的布局可以顯著提升性能;其次,對(duì)于 Flink 應(yīng)用而言,它所處理的數(shù)據(jù)對(duì)象類型通常是完全已知的,由于數(shù)據(jù)集對(duì)象的類型固定,對(duì)于數(shù)據(jù)集可以只保存一份對(duì)象 Schema 信息,可以進(jìn)一步節(jié)省存儲(chǔ)空間。

Flink 可以處理任意的 Java 或 Scala 對(duì)象,而不必實(shí)現(xiàn)特定的接口。對(duì)于 Java 實(shí)現(xiàn)的 Flink 程序,F(xiàn)link 會(huì)通過反射框架獲取用戶自定義函數(shù)返回的類型;而對(duì)于 Scala 實(shí)現(xiàn)的 Flink 程序,則通過 Scala Compiler 分析用戶自定義函數(shù)返回的類型。每一種數(shù)據(jù)類型都對(duì)應(yīng)一個(gè) TypeInfomation。

  • BasicTypeInfo: 基本類型(裝箱的)或 String 類型
  • BasicArrayTypeInfo: 基本類型數(shù)組(裝箱的)或 String 數(shù)組
  • WritableTypeInfo: 任意 Hadoop Writable 接口的實(shí)現(xiàn)類
  • TupleTypeInfo: 任意的 Flink Tuple 類型 (支持Tuple1 to Tuple25)
  • CaseClassTypeInfo: 任意的 Scala CaseClass (包括 Scala tuples)
  • PojoTypeInfo: 任意的 POJO (Java or Scala),Java對(duì)象的所有成員變量,要么是 public 修飾符定義,要么有 getter/setter 方法
  • GenericTypeInfo: 任意無(wú)法匹配之前幾種類型的類

通過 TypeInfomation 可以獲取到對(duì)應(yīng)數(shù)據(jù)類型的序列化器 TypeSerializer。對(duì)于 BasicTypeInfo,F(xiàn)link 提供了對(duì)應(yīng)的序列化器;對(duì)于 WritableTypeInfo, Flink 會(huì)將序列化和反序列化操作委托給 Hadoop Writable 接口的 write() and readFields();對(duì)于 GenericTypeInfo, Flink 默認(rèn)使用 “”Kyro 進(jìn)行序列化“”;而 TupleTypeInfo、CaseClassTypeInfo 和 PojoTypeInfo 是一種組合類型,序列化時(shí)分別委托給成員的序列化器進(jìn)行序列化即可。

對(duì)于可以用作 key 的數(shù)據(jù)類型,TypeInfomation 還可以生成 TypeComparator,用來(lái)直接在序列化后的二進(jìn)制數(shù)據(jù)上進(jìn)行 compare、hash 等操作

Flink 的類型和序列化系統(tǒng)也可以方便地進(jìn)行擴(kuò)展,用戶可以提供自定義的序列化器和比較器,具體可以參考 Flink 官方提供的文檔 Data Types & Serialization。

在批處理的場(chǎng)景下,諸如 group, sort, 和 join 等操作都需要訪問大量的數(shù)據(jù)。借助于 MemorySegment 并直接操作二進(jìn)制數(shù)據(jù),F(xiàn)link 可以高效地完成這些操作,避免了頻繁地序列化/反序列化,并且這些操作是緩存友好的。具體可以參考 Flink 團(tuán)隊(duì)的文章Juggling with Bits and Bytes。

這種基于 MemorySegment 和二進(jìn)制數(shù)據(jù)直接管理數(shù)據(jù)對(duì)象的方式可以帶來(lái)如下好處:

- 保證內(nèi)存安全:由于分配的 MemorySegment 的數(shù)量是固定的,因而可以準(zhǔn)確地追蹤 MemorySegment 的使用情況。在 Batch 模式下,如果 MemorySegment 資源不足,會(huì)將一批 MemorySegment 寫入磁盤,需要時(shí)再重新讀取。這樣有效地減少了 OOM 的情況。
  • 減少了 GC 的壓力:因?yàn)榉峙涞?MemorySegment 是長(zhǎng)生命周期的對(duì)象,數(shù)據(jù)都以二進(jìn)制形式存放,且 MemorySegment 可以回收重用,所以 MemorySegment 會(huì)一直保留在老年代不會(huì)被 GC;而由用戶代碼生成的對(duì)象基本都是短生命周期的,Minor GC 可以快速回收這部分對(duì)象,盡可能減少 Major GC 的頻率。此外,MemorySegment 還可以配置為使用堆外內(nèi)存,進(jìn)而避免 GC。
  • 節(jié)省內(nèi)存空間:數(shù)據(jù)對(duì)象序列化后以二進(jìn)制形式保存在 MemorySegment 中,減少了對(duì)象存儲(chǔ)的開銷。
  • 高效的二進(jìn)制操作和緩存友好的計(jì)算:可以直接基于二進(jìn)制數(shù)據(jù)進(jìn)行比較等操作,避免了反復(fù)進(jìn)行序列化于反序列;另外,二進(jìn)制形式可以把相關(guān)的值,以及 hash 值,鍵值和指針等相鄰地放進(jìn)內(nèi)存中,這使得數(shù)據(jù)結(jié)構(gòu)可以對(duì)高速緩存更友好。

MemorySegment

前面已經(jīng)介紹了,MemorySegment 是一段固定長(zhǎng)度的內(nèi)存,也是 Flink 中最小的內(nèi)存分配單元。在早期版本的實(shí)現(xiàn)中,MemorySegment 使用的都是堆上的內(nèi)存。盡管 Flink 的內(nèi)存管理機(jī)制已經(jīng)做了很多優(yōu)化,但是 Flink 團(tuán)隊(duì)仍然加入了對(duì)堆外內(nèi)存的支持。主要是考慮到以下幾個(gè)方面:

啟動(dòng)很大堆內(nèi)存(100s of GBytes heap memory)的 JVM 需要很長(zhǎng)時(shí)間,GC 停留時(shí)間也會(huì)很長(zhǎng)(分鐘級(jí))。使用堆外內(nèi)存的話,JVM 只需要分配較少的堆內(nèi)存(只需要分配 Remaining Heap 那一塊)。
堆外內(nèi)存在寫磁盤或網(wǎng)絡(luò)傳輸時(shí)是可以利用 zero-copy 特性,I/O 和網(wǎng)絡(luò)傳輸?shù)男矢摺?堆外內(nèi)存是進(jìn)程間共享的,也就是說,即使 JVM 進(jìn)程崩潰也不會(huì)丟失數(shù)據(jù)。這可以用來(lái)做故障恢復(fù)。Flink暫時(shí)沒有利用起這個(gè),不過未來(lái)有可能會(huì)利用這個(gè)特性。

但是使用堆外內(nèi)存同樣存在一些潛在的問題:

堆內(nèi)存可以很方便地進(jìn)行監(jiān)控和分析,相較而言堆外內(nèi)存則更加難以控制;
Flink 有時(shí)可能需要短生命周期的 MemorySegment,在堆上申請(qǐng)開銷會(huì)更??;
一些操作在堆內(nèi)存上會(huì)更快一些

Flink 將原來(lái)的 MemorySegment 變成了抽象類,并提供了兩個(gè)具體的子類:HeapMemorySegment 和 HybridMemorySegment。前者是用于分配堆內(nèi)存,后者用來(lái)分配堆外內(nèi)存和堆內(nèi)存的。

sun.misc包含了低級(jí)(native硬件級(jí)別的原子操作)、不安全的操作集合。
Java無(wú)法直接訪問到操作系統(tǒng)底層(如系統(tǒng)硬件等),為此Java使用native方法來(lái)擴(kuò)展Java程序的功能。Unsafe類提供了硬件級(jí)別的原子操作,提供了一些繞開JVM的更底層功能,由此提高效率

MemorySegment 的實(shí)現(xiàn):

public abstract class MemorySegment {
    protected final byte[] heapMemory; //堆內(nèi)存引用
    protected long address; //堆外內(nèi)存地址

    //基于堆內(nèi)存創(chuàng)建MemorySegment
    MemorySegment(byte[] buffer, Object owner) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        this.heapMemory = buffer;
        this.address = BYTE_ARRAY_BASE_OFFSET;
        this.size = buffer.length;
        this.addressLimit = this.address + this.size;
        this.owner = owner;
    }

    //基于堆外內(nèi)存創(chuàng)建MemorySegment
    MemorySegment(long offHeapAddress, int size, Object owner) {
        if (offHeapAddress <= 0) {
            throw new IllegalArgumentException("negative pointer or size");
        }
        if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) {
            // this is necessary to make sure the collapsed checks are safe against numeric overflows
            throw new IllegalArgumentException("Segment initialized with too large address: " + offHeapAddress
                    + " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1));
        }
        this.heapMemory = null;
        this.address = offHeapAddress;
        this.addressLimit = this.address + size;
        this.size = size;
        this.owner = owner;
    }

    public boolean isOffHeap() {
        return heapMemory == null;
    }

    public final long getLong(int index) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 8) {
            //這是能夠在一個(gè)實(shí)現(xiàn)中同時(shí)操作對(duì)內(nèi)存和堆外內(nèi)存的關(guān)鍵
            return UNSAFE.getLong(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    //......
}

堆外方式的MemorySegment

public final class HybridMemorySegment extends MemorySegment {
    private final ByteBuffer offHeapBuffer;

    //堆外內(nèi)存初始化
    HybridMemorySegment(ByteBuffer buffer, Object owner) {
        super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
        this.offHeapBuffer = buffer;
    }

    //堆內(nèi)內(nèi)存初始化
    HybridMemorySegment(byte[] buffer, Object owner) {
        super(buffer, owner);
        this.offHeapBuffer = null;
    }

    //......
}

堆內(nèi)方式的MemorySegment

public final class HeapMemorySegment extends MemorySegment {
    private byte[] memory;

    HeapMemorySegment(byte[] memory, Object owner) {
        super(Objects.requireNonNull(memory), owner);
        this.memory = memory;
    }

    //......
}

MemorySegment 的管理

在 TaskManager 的內(nèi)存布局中我們說過,TaskManager 的內(nèi)存主要分為三個(gè)部分,其中 Network Buffers 和 Managed Memory 都是一組 MemorySegment 的集合。下面就分別介紹下這兩塊內(nèi)存是如何管理的。
Buffer 和 NetworkBufferPool

Buffer 接口是對(duì)池化的 MemorySegment 的包裝,帶有引用計(jì)數(shù),類似與 Netty 的 ByteBuf。Buffer也使用兩個(gè)指針分別表示寫入的位置和讀取的位置。Buffer 的具體實(shí)現(xiàn)實(shí)現(xiàn)類 NetworkBuffer 繼承自 Netty 的 AbstractReferenceCountedByteBuf,這使得它很容易地集成了引用計(jì)數(shù)和讀寫指針的功能。同時(shí),在非 Netty 場(chǎng)景下使用時(shí),Buffer 也提供了 java.nio.ByteBuffer 的包裝,但需要手動(dòng)設(shè)置讀寫指針的位置。ReadOnlySlicedNetworkBuffer 則提供了只讀模式的 buffer 的包裝。

BufferBuilder 和 BufferConsumer 構(gòu)成了寫入和消費(fèi) buffer 的通用模式:通過 BufferBuilder 向底層的 MemorySegment 寫入數(shù)據(jù),再通過 BufferConsumer 生成只讀的 Buffer,讀取 BufferBuilder 寫入的數(shù)據(jù)。這兩個(gè)類都不是線程安全的,但可以實(shí)現(xiàn)一個(gè)線程寫入,另一個(gè)線程讀取的效果。

BufferPool 接口繼承了 BufferProvider 和 BufferRecycler 接口,提供了申請(qǐng)以及回收 Buffer 的功能。LocalBufferPool 是 BufferPool 的具體實(shí)現(xiàn),LocalBufferPool 中 Buffer 的數(shù)量是可以動(dòng)態(tài)調(diào)整的。

BufferPoolFactory 接口是 BufferPool 的工廠,用于創(chuàng)建及銷毀 BufferPool。NetworkBufferPool 是 BufferPoolFactory 的具體實(shí)現(xiàn)類。所以按照 BufferPoolFactory -> BufferPool -> Buffer 這樣的結(jié)構(gòu)進(jìn)行組織。NetworkBufferPool 在初始化的時(shí)候創(chuàng)建一組 MemorySegment,這些 MemorySegment 會(huì)在所有的 LocalBufferPool 之間進(jìn)行均勻分配。

class NetworkBufferPool implements BufferPoolFactory {
    //所有可用的MemorySegment,阻塞隊(duì)列
    private final ArrayBlockingQueue<MemorySegment> availableMemorySegments;

    public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
        this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
        this.memorySegmentSize = segmentSize;
        final long sizeInLong = (long) segmentSize;

        try {
            this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
        }
        catch (OutOfMemoryError err) {
            throw new OutOfMemoryError("Could not allocate buffer queue of length "
                    + numberOfSegmentsToAllocate + " - " + err.getMessage());
        }
        try {
            for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
                //NetworkBufferPool 使用的 MemorySegment 全是堆外內(nèi)存
                availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
            }
        }
        catch (OutOfMemoryError err) {
            ......
        }
        .......
    }
}

MemoryManager

MemoryManager 是管理 Managed Memory 的類,這部分主要是在 Batch 模式下使用,在 Streaming 模式下這一塊內(nèi)存不會(huì)分配。MemoryManager 主要通過內(nèi)部接口 MemoryPool 來(lái)管理所有的 MemorySegment。Managed Memory 和管理相比于 Network Buffers 的管理更為簡(jiǎn)單,因?yàn)椴恍枰?Buffer 的那一層封裝。直接來(lái)看下相關(guān)

public class MemoryManager {
    //管理所有的 MemorySegment
    private final MemoryPool memoryPool;


    public MemoryManager(long memorySize, int numberOfSlots, int pageSize,
                            MemoryType memoryType, boolean preAllocateMemory) {
        // sanity checks
        ......
        this.memoryType = memoryType;
        this.memorySize = memorySize;
        this.numberOfSlots = numberOfSlots;

        // assign page size and bit utilities
        this.pageSize = pageSize;
        this.roundingMask = ~((long) (pageSize - 1));

        final long numPagesLong = memorySize / pageSize;
        if (numPagesLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes (" + memorySize
                    + ") corresponds to more than MAX_INT pages.");
        }
        //所有可用的 MemorySegment 數(shù)量
        this.totalNumPages = (int) numPagesLong;
        if (this.totalNumPages < 1) {
            throw new IllegalArgumentException("The given amount of memory amounted to less than one page.");
        }

        this.allocatedSegments = new HashMap<Object, Set<MemorySegment>>();
        this.isPreAllocated = preAllocateMemory;

        this.numNonAllocatedPages = preAllocateMemory ? 0 : this.totalNumPages;
        //是否需要預(yù)分配內(nèi)存,Streaming 不會(huì)預(yù)分配
        final int memToAllocate = preAllocateMemory ? this.totalNumPages : 0;

        switch (memoryType) {
            case HEAP:
            //堆上
                this.memoryPool = new HybridHeapMemoryPool(memToAllocate, pageSize);
                break;
            case OFF_HEAP:
            //堆外
                if (!preAllocateMemory) {
                    LOG.warn("It is advisable to set 'taskmanager.memory.preallocate' to true when" +
                        " the memory type 'taskmanager.memory.off-heap' is set to true.");
                }
                this.memoryPool = new HybridOffHeapMemoryPool(memToAllocate, pageSize);
                break;
            default:
                throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
        }
    }

    abstract static class MemoryPool {
        abstract int getNumberOfAvailableMemorySegments();
        abstract MemorySegment allocateNewSegment(Object owner);
        abstract MemorySegment requestSegmentFromPool(Object owner);
        abstract void returnSegmentToPool(MemorySegment segment);
        abstract void clear();
    }

    static final class HybridHeapMemoryPool extends MemoryPool {
        private final ArrayDeque<byte[]> availableMemory;

        HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;
            for (int i = 0; i < numInitialSegments; i++) {
                //堆上直接使用byte數(shù)組
                this.availableMemory.add(new byte[segmentSize]);
            }
        }
    }

    static final class HybridOffHeapMemoryPool extends MemoryPool {
        private final ArrayDeque<ByteBuffer> availableMemory;

        HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;
            //堆外使用 DirectByteBuffer
            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
            }
        }
    }
}

轉(zhuǎn)自
https://blog.jrwang.me/2019/flink-source-code-memory-management/
http://wuchong.me/blog/2016/04/29/flink-internals-memory-manage/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容