TaskManager 的內(nèi)存布局
Flink 內(nèi)部并非直接將對(duì)象存儲(chǔ)在堆上,而是將對(duì)象序列化到一個(gè)個(gè)預(yù)先分配的 MemorySegment 中。MemorySegment 是一段固定長(zhǎng)度的內(nèi)存(默認(rèn)32KB),也是 Flink 中最小的內(nèi)存分配單元。MemorySegment 提供了高效的讀寫方法,它的底層可以是堆上的 byte[], 也可以是堆外(off-heap)ByteBuffer??梢园?MemorySegment 看作 Java NIO 中的 ByteBuffer,F(xiàn)link 還實(shí)現(xiàn)了 Java 的 java.io.DataOutput 和 java.io.DataInput 接口,分別是 AbstractPagedInputView 和 AbstractPagedOutputView, 可以通過一種邏輯視圖的方式來(lái)操作連續(xù)的多塊 MemorySegment。
在 Flink 中,TaskManager 負(fù)責(zé)任務(wù)的實(shí)際運(yùn)行,通常一個(gè) TaskManager 對(duì)應(yīng)一個(gè) JVM 進(jìn)程(非 MiniCluster 模式)。拋開 JVM 內(nèi)存模型,單從 TaskManager 內(nèi)存的主要使用方式來(lái)看,TaskManager 的內(nèi)存主要分為三個(gè)部分:
- Network Buffers:一定數(shù)量的 MemorySegment, 主要用于網(wǎng)絡(luò)傳輸。在 TaskManager 啟動(dòng)時(shí)分配, 通過 NetworkEnvironment 和 NetworkBufferPool 進(jìn)行管理
- Managed Memory:由 MemoryManager 管理的一組 MemorySegment 集合, 主要用于 Batch 模式下的 sorting, hashing, 和 cache 等。
- Remaining JVM heap:余下的堆內(nèi)存留給 TaskManager 的數(shù)據(jù)結(jié)構(gòu)以及用戶代碼處理數(shù)據(jù)時(shí)使用。TaskManager 自身的數(shù)據(jù)結(jié)構(gòu)并不會(huì)占用太多內(nèi)存,因而主要都是供用戶代碼使用,用戶代碼創(chuàng)建的對(duì)象通常生命周期都較短
需要注意的是,上面所說的三部分的內(nèi)存并非都是 JVM 堆上的內(nèi)存,因?yàn)?MemorySegment 底層的內(nèi)存可以在堆上,也可以在堆外(不由 JVM 管理)。對(duì)于 Network Buffers,這一部分內(nèi)存就是在堆外(off-heap)進(jìn)行分配的;對(duì)于 Managed Memory,這一部分內(nèi)存可以配置在堆上,也可以配置在堆外。另外還需要注意的一點(diǎn)是,Managed Memory 主要是在 Batch 模式下使用,在 Streaming 模式下這一部分內(nèi)存并不會(huì)預(yù)分配,因而空閑出來(lái)的內(nèi)存其實(shí)都是可以給用戶自定義函數(shù)使用的。
通過二進(jìn)制數(shù)據(jù)管理對(duì)象
我們已經(jīng)知道,F(xiàn)link 是通過 MemorySegment 來(lái)管理數(shù)據(jù)對(duì)象的,因而對(duì)象首先需要被序列化保存到 MemorySegment 中。 Flink 實(shí)現(xiàn)了一套自己的序列化框架。這主要是出于以下考慮:首先,比較和操作二進(jìn)制數(shù)據(jù)需要準(zhǔn)確了解序列化的布局,針對(duì)二進(jìn)制數(shù)據(jù)的操作來(lái)配置序列化的布局可以顯著提升性能;其次,對(duì)于 Flink 應(yīng)用而言,它所處理的數(shù)據(jù)對(duì)象類型通常是完全已知的,由于數(shù)據(jù)集對(duì)象的類型固定,對(duì)于數(shù)據(jù)集可以只保存一份對(duì)象 Schema 信息,可以進(jìn)一步節(jié)省存儲(chǔ)空間。
Flink 可以處理任意的 Java 或 Scala 對(duì)象,而不必實(shí)現(xiàn)特定的接口。對(duì)于 Java 實(shí)現(xiàn)的 Flink 程序,F(xiàn)link 會(huì)通過反射框架獲取用戶自定義函數(shù)返回的類型;而對(duì)于 Scala 實(shí)現(xiàn)的 Flink 程序,則通過 Scala Compiler 分析用戶自定義函數(shù)返回的類型。每一種數(shù)據(jù)類型都對(duì)應(yīng)一個(gè) TypeInfomation。
- BasicTypeInfo: 基本類型(裝箱的)或 String 類型
- BasicArrayTypeInfo: 基本類型數(shù)組(裝箱的)或 String 數(shù)組
- WritableTypeInfo: 任意 Hadoop Writable 接口的實(shí)現(xiàn)類
- TupleTypeInfo: 任意的 Flink Tuple 類型 (支持Tuple1 to Tuple25)
- CaseClassTypeInfo: 任意的 Scala CaseClass (包括 Scala tuples)
- PojoTypeInfo: 任意的 POJO (Java or Scala),Java對(duì)象的所有成員變量,要么是 public 修飾符定義,要么有 getter/setter 方法
- GenericTypeInfo: 任意無(wú)法匹配之前幾種類型的類
通過 TypeInfomation 可以獲取到對(duì)應(yīng)數(shù)據(jù)類型的序列化器 TypeSerializer。對(duì)于 BasicTypeInfo,F(xiàn)link 提供了對(duì)應(yīng)的序列化器;對(duì)于 WritableTypeInfo, Flink 會(huì)將序列化和反序列化操作委托給 Hadoop Writable 接口的 write() and readFields();對(duì)于 GenericTypeInfo, Flink 默認(rèn)使用 “”Kyro 進(jìn)行序列化“”;而 TupleTypeInfo、CaseClassTypeInfo 和 PojoTypeInfo 是一種組合類型,序列化時(shí)分別委托給成員的序列化器進(jìn)行序列化即可。
對(duì)于可以用作 key 的數(shù)據(jù)類型,TypeInfomation 還可以生成 TypeComparator,用來(lái)直接在序列化后的二進(jìn)制數(shù)據(jù)上進(jìn)行 compare、hash 等操作。
Flink 的類型和序列化系統(tǒng)也可以方便地進(jìn)行擴(kuò)展,用戶可以提供自定義的序列化器和比較器,具體可以參考 Flink 官方提供的文檔 Data Types & Serialization。
在批處理的場(chǎng)景下,諸如 group, sort, 和 join 等操作都需要訪問大量的數(shù)據(jù)。借助于 MemorySegment 并直接操作二進(jìn)制數(shù)據(jù),F(xiàn)link 可以高效地完成這些操作,避免了頻繁地序列化/反序列化,并且這些操作是緩存友好的。具體可以參考 Flink 團(tuán)隊(duì)的文章Juggling with Bits and Bytes。
這種基于 MemorySegment 和二進(jìn)制數(shù)據(jù)直接管理數(shù)據(jù)對(duì)象的方式可以帶來(lái)如下好處:
- 保證內(nèi)存安全:由于分配的 MemorySegment 的數(shù)量是固定的,因而可以準(zhǔn)確地追蹤 MemorySegment 的使用情況。在 Batch 模式下,如果 MemorySegment 資源不足,會(huì)將一批 MemorySegment 寫入磁盤,需要時(shí)再重新讀取。這樣有效地減少了 OOM 的情況。
- 減少了 GC 的壓力:因?yàn)榉峙涞?MemorySegment 是長(zhǎng)生命周期的對(duì)象,數(shù)據(jù)都以二進(jìn)制形式存放,且 MemorySegment 可以回收重用,所以 MemorySegment 會(huì)一直保留在老年代不會(huì)被 GC;而由用戶代碼生成的對(duì)象基本都是短生命周期的,Minor GC 可以快速回收這部分對(duì)象,盡可能減少 Major GC 的頻率。此外,MemorySegment 還可以配置為使用堆外內(nèi)存,進(jìn)而避免 GC。
- 節(jié)省內(nèi)存空間:數(shù)據(jù)對(duì)象序列化后以二進(jìn)制形式保存在 MemorySegment 中,減少了對(duì)象存儲(chǔ)的開銷。
- 高效的二進(jìn)制操作和緩存友好的計(jì)算:可以直接基于二進(jìn)制數(shù)據(jù)進(jìn)行比較等操作,避免了反復(fù)進(jìn)行序列化于反序列;另外,二進(jìn)制形式可以把相關(guān)的值,以及 hash 值,鍵值和指針等相鄰地放進(jìn)內(nèi)存中,這使得數(shù)據(jù)結(jié)構(gòu)可以對(duì)高速緩存更友好。
MemorySegment
前面已經(jīng)介紹了,MemorySegment 是一段固定長(zhǎng)度的內(nèi)存,也是 Flink 中最小的內(nèi)存分配單元。在早期版本的實(shí)現(xiàn)中,MemorySegment 使用的都是堆上的內(nèi)存。盡管 Flink 的內(nèi)存管理機(jī)制已經(jīng)做了很多優(yōu)化,但是 Flink 團(tuán)隊(duì)仍然加入了對(duì)堆外內(nèi)存的支持。主要是考慮到以下幾個(gè)方面:
啟動(dòng)很大堆內(nèi)存(100s of GBytes heap memory)的 JVM 需要很長(zhǎng)時(shí)間,GC 停留時(shí)間也會(huì)很長(zhǎng)(分鐘級(jí))。使用堆外內(nèi)存的話,JVM 只需要分配較少的堆內(nèi)存(只需要分配 Remaining Heap 那一塊)。
堆外內(nèi)存在寫磁盤或網(wǎng)絡(luò)傳輸時(shí)是可以利用 zero-copy 特性,I/O 和網(wǎng)絡(luò)傳輸?shù)男矢摺?堆外內(nèi)存是進(jìn)程間共享的,也就是說,即使 JVM 進(jìn)程崩潰也不會(huì)丟失數(shù)據(jù)。這可以用來(lái)做故障恢復(fù)。Flink暫時(shí)沒有利用起這個(gè),不過未來(lái)有可能會(huì)利用這個(gè)特性。
但是使用堆外內(nèi)存同樣存在一些潛在的問題:
堆內(nèi)存可以很方便地進(jìn)行監(jiān)控和分析,相較而言堆外內(nèi)存則更加難以控制;
Flink 有時(shí)可能需要短生命周期的 MemorySegment,在堆上申請(qǐng)開銷會(huì)更??;
一些操作在堆內(nèi)存上會(huì)更快一些
Flink 將原來(lái)的 MemorySegment 變成了抽象類,并提供了兩個(gè)具體的子類:HeapMemorySegment 和 HybridMemorySegment。前者是用于分配堆內(nèi)存,后者用來(lái)分配堆外內(nèi)存和堆內(nèi)存的。
sun.misc包含了低級(jí)(native硬件級(jí)別的原子操作)、不安全的操作集合。
Java無(wú)法直接訪問到操作系統(tǒng)底層(如系統(tǒng)硬件等),為此Java使用native方法來(lái)擴(kuò)展Java程序的功能。Unsafe類提供了硬件級(jí)別的原子操作,提供了一些繞開JVM的更底層功能,由此提高效率
MemorySegment 的實(shí)現(xiàn):
public abstract class MemorySegment {
protected final byte[] heapMemory; //堆內(nèi)存引用
protected long address; //堆外內(nèi)存地址
//基于堆內(nèi)存創(chuàng)建MemorySegment
MemorySegment(byte[] buffer, Object owner) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
this.heapMemory = buffer;
this.address = BYTE_ARRAY_BASE_OFFSET;
this.size = buffer.length;
this.addressLimit = this.address + this.size;
this.owner = owner;
}
//基于堆外內(nèi)存創(chuàng)建MemorySegment
MemorySegment(long offHeapAddress, int size, Object owner) {
if (offHeapAddress <= 0) {
throw new IllegalArgumentException("negative pointer or size");
}
if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) {
// this is necessary to make sure the collapsed checks are safe against numeric overflows
throw new IllegalArgumentException("Segment initialized with too large address: " + offHeapAddress
+ " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1));
}
this.heapMemory = null;
this.address = offHeapAddress;
this.addressLimit = this.address + size;
this.size = size;
this.owner = owner;
}
public boolean isOffHeap() {
return heapMemory == null;
}
public final long getLong(int index) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit - 8) {
//這是能夠在一個(gè)實(shí)現(xiàn)中同時(shí)操作對(duì)內(nèi)存和堆外內(nèi)存的關(guān)鍵
return UNSAFE.getLong(heapMemory, pos);
}
else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}
//......
}
堆外方式的MemorySegment
public final class HybridMemorySegment extends MemorySegment {
private final ByteBuffer offHeapBuffer;
//堆外內(nèi)存初始化
HybridMemorySegment(ByteBuffer buffer, Object owner) {
super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
}
//堆內(nèi)內(nèi)存初始化
HybridMemorySegment(byte[] buffer, Object owner) {
super(buffer, owner);
this.offHeapBuffer = null;
}
//......
}
堆內(nèi)方式的MemorySegment
public final class HeapMemorySegment extends MemorySegment {
private byte[] memory;
HeapMemorySegment(byte[] memory, Object owner) {
super(Objects.requireNonNull(memory), owner);
this.memory = memory;
}
//......
}
MemorySegment 的管理
在 TaskManager 的內(nèi)存布局中我們說過,TaskManager 的內(nèi)存主要分為三個(gè)部分,其中 Network Buffers 和 Managed Memory 都是一組 MemorySegment 的集合。下面就分別介紹下這兩塊內(nèi)存是如何管理的。
Buffer 和 NetworkBufferPool
Buffer 接口是對(duì)池化的 MemorySegment 的包裝,帶有引用計(jì)數(shù),類似與 Netty 的 ByteBuf。Buffer也使用兩個(gè)指針分別表示寫入的位置和讀取的位置。Buffer 的具體實(shí)現(xiàn)實(shí)現(xiàn)類 NetworkBuffer 繼承自 Netty 的 AbstractReferenceCountedByteBuf,這使得它很容易地集成了引用計(jì)數(shù)和讀寫指針的功能。同時(shí),在非 Netty 場(chǎng)景下使用時(shí),Buffer 也提供了 java.nio.ByteBuffer 的包裝,但需要手動(dòng)設(shè)置讀寫指針的位置。ReadOnlySlicedNetworkBuffer 則提供了只讀模式的 buffer 的包裝。
BufferBuilder 和 BufferConsumer 構(gòu)成了寫入和消費(fèi) buffer 的通用模式:通過 BufferBuilder 向底層的 MemorySegment 寫入數(shù)據(jù),再通過 BufferConsumer 生成只讀的 Buffer,讀取 BufferBuilder 寫入的數(shù)據(jù)。這兩個(gè)類都不是線程安全的,但可以實(shí)現(xiàn)一個(gè)線程寫入,另一個(gè)線程讀取的效果。
BufferPool 接口繼承了 BufferProvider 和 BufferRecycler 接口,提供了申請(qǐng)以及回收 Buffer 的功能。LocalBufferPool 是 BufferPool 的具體實(shí)現(xiàn),LocalBufferPool 中 Buffer 的數(shù)量是可以動(dòng)態(tài)調(diào)整的。
BufferPoolFactory 接口是 BufferPool 的工廠,用于創(chuàng)建及銷毀 BufferPool。NetworkBufferPool 是 BufferPoolFactory 的具體實(shí)現(xiàn)類。所以按照 BufferPoolFactory -> BufferPool -> Buffer 這樣的結(jié)構(gòu)進(jìn)行組織。NetworkBufferPool 在初始化的時(shí)候創(chuàng)建一組 MemorySegment,這些 MemorySegment 會(huì)在所有的 LocalBufferPool 之間進(jìn)行均勻分配。
class NetworkBufferPool implements BufferPoolFactory {
//所有可用的MemorySegment,阻塞隊(duì)列
private final ArrayBlockingQueue<MemorySegment> availableMemorySegments;
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
this.memorySegmentSize = segmentSize;
final long sizeInLong = (long) segmentSize;
try {
this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
}
catch (OutOfMemoryError err) {
throw new OutOfMemoryError("Could not allocate buffer queue of length "
+ numberOfSegmentsToAllocate + " - " + err.getMessage());
}
try {
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
//NetworkBufferPool 使用的 MemorySegment 全是堆外內(nèi)存
availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
}
}
catch (OutOfMemoryError err) {
......
}
.......
}
}
MemoryManager
MemoryManager 是管理 Managed Memory 的類,這部分主要是在 Batch 模式下使用,在 Streaming 模式下這一塊內(nèi)存不會(huì)分配。MemoryManager 主要通過內(nèi)部接口 MemoryPool 來(lái)管理所有的 MemorySegment。Managed Memory 和管理相比于 Network Buffers 的管理更為簡(jiǎn)單,因?yàn)椴恍枰?Buffer 的那一層封裝。直接來(lái)看下相關(guān)
public class MemoryManager {
//管理所有的 MemorySegment
private final MemoryPool memoryPool;
public MemoryManager(long memorySize, int numberOfSlots, int pageSize,
MemoryType memoryType, boolean preAllocateMemory) {
// sanity checks
......
this.memoryType = memoryType;
this.memorySize = memorySize;
this.numberOfSlots = numberOfSlots;
// assign page size and bit utilities
this.pageSize = pageSize;
this.roundingMask = ~((long) (pageSize - 1));
final long numPagesLong = memorySize / pageSize;
if (numPagesLong > Integer.MAX_VALUE) {
throw new IllegalArgumentException("The given number of memory bytes (" + memorySize
+ ") corresponds to more than MAX_INT pages.");
}
//所有可用的 MemorySegment 數(shù)量
this.totalNumPages = (int) numPagesLong;
if (this.totalNumPages < 1) {
throw new IllegalArgumentException("The given amount of memory amounted to less than one page.");
}
this.allocatedSegments = new HashMap<Object, Set<MemorySegment>>();
this.isPreAllocated = preAllocateMemory;
this.numNonAllocatedPages = preAllocateMemory ? 0 : this.totalNumPages;
//是否需要預(yù)分配內(nèi)存,Streaming 不會(huì)預(yù)分配
final int memToAllocate = preAllocateMemory ? this.totalNumPages : 0;
switch (memoryType) {
case HEAP:
//堆上
this.memoryPool = new HybridHeapMemoryPool(memToAllocate, pageSize);
break;
case OFF_HEAP:
//堆外
if (!preAllocateMemory) {
LOG.warn("It is advisable to set 'taskmanager.memory.preallocate' to true when" +
" the memory type 'taskmanager.memory.off-heap' is set to true.");
}
this.memoryPool = new HybridOffHeapMemoryPool(memToAllocate, pageSize);
break;
default:
throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
}
}
abstract static class MemoryPool {
abstract int getNumberOfAvailableMemorySegments();
abstract MemorySegment allocateNewSegment(Object owner);
abstract MemorySegment requestSegmentFromPool(Object owner);
abstract void returnSegmentToPool(MemorySegment segment);
abstract void clear();
}
static final class HybridHeapMemoryPool extends MemoryPool {
private final ArrayDeque<byte[]> availableMemory;
HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
this.availableMemory = new ArrayDeque<>(numInitialSegments);
this.segmentSize = segmentSize;
for (int i = 0; i < numInitialSegments; i++) {
//堆上直接使用byte數(shù)組
this.availableMemory.add(new byte[segmentSize]);
}
}
}
static final class HybridOffHeapMemoryPool extends MemoryPool {
private final ArrayDeque<ByteBuffer> availableMemory;
HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
this.availableMemory = new ArrayDeque<>(numInitialSegments);
this.segmentSize = segmentSize;
//堆外使用 DirectByteBuffer
for (int i = 0; i < numInitialSegments; i++) {
this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
}
}
}
}
轉(zhuǎn)自
https://blog.jrwang.me/2019/flink-source-code-memory-management/
http://wuchong.me/blog/2016/04/29/flink-internals-memory-manage/