從上篇中可以看到了 MemorySegmentPool,MemoryPoolFactory, 各種 buffer ,概念比較多再來(lái)重新梳理下整個(gè) writer 的構(gòu)建過(guò)程同時(shí)也關(guān)注下 MemorySegmentPool, buffer, 看看 MemorySegmentPool,MemoryPoolFactory 是算子級(jí)別( subtask 的所有 writer 共用 MemorySegmentPool)的 還是 writer 級(jí)別(每個(gè) write 獨(dú)享一個(gè) MemorySegmentPool)的
RowDataStoreWriteOperator 有成員變量 MemorySegmentPool memoryPool 、StoreSinkWrite write
1.1 memoryPool 在 setup 方法時(shí) 根據(jù)是否 配置了 sink.use-managed-memory-allocator 會(huì)創(chuàng)建 FlinkMemorySegmentPool, 如果沒(méi)有配置則為空,所以默認(rèn)是空的, MemorySegmentPool 主要有兩個(gè)實(shí)現(xiàn)一個(gè)是基于managed memory 的 FlinkMemorySegmentPool 一個(gè)是基于內(nèi)存的 HeapMemorySegmentPool
1.2 initializeState 時(shí)創(chuàng)建了 StoreSinkWrite 創(chuàng)建 StoreSinkWrite 會(huì)把 memoryPool 傳入進(jìn)去。StoreSinkWrite 會(huì)根據(jù)是否要合并分為 StoreSinkWriteImpl 和 GlobalFullCompactionSinkWrite 兩種實(shí)現(xiàn), 同時(shí) GlobalFullCompactionSinkWrite 是繼承自 StoreSinkWriteImplStoreSinkWriteImpl 有成員變量 MemorySegmentPool memoryPool、MemoryPoolFactory memoryPoolFactory、TableWriteImpl<?> write
2.1 memoryPool 從 RowDataStoreWriteOperator 傳下來(lái)的可能為空 對(duì)于 配置了 sink.use-managed-memory-allocator 會(huì)有值
2.2 memoryPoolFactory 是空
2.3 write 是 StoreSinkWriteImpl 構(gòu)造函數(shù)里面創(chuàng)建的最終是通過(guò) FileStoreTable 的 newWrite 方法創(chuàng)建 TableWriteImpl,創(chuàng)建完 TableWriteImpl 后會(huì)緊接著通過(guò) withMemoryPool 設(shè)置 memoryPool,對(duì)于 memoryPool 為空的場(chǎng)景則會(huì)初始化一個(gè) HeapMemorySegmentPool, 到此 memoryPool 就都有值了
2.4 創(chuàng)建完 TableWriteImpl 后會(huì)緊接著通過(guò) withMemoryPoolFactory 設(shè)置 memoryPoolFactory, 因?yàn)?memoryPoolFactory 是空的所以忽略TableWriteImpl 有屬性 FileStoreWrite<T> write
3.1 他是在構(gòu)建 TableWriteImpl 時(shí)創(chuàng)建的,對(duì)于主鍵表創(chuàng)建的是 KeyValueFileStoreWrite
3.2 在 TableWriteImpl 調(diào)用 withMemoryPool 設(shè)置 memoryPool 實(shí)際還是調(diào)用的 KeyValueFileStoreWrite withMemoryPool 這樣 memoryPool 不為空了也傳導(dǎo)到了 KeyValueFileStoreWrite
3.3 KeyValueFileStoreWrite 執(zhí)行 withMemoryPool 時(shí)其實(shí)是 new MemoryPoolFactory(memoryPool) 然后調(diào)用
withMemoryPoolFactory 把這個(gè) MemoryPoolFactory 賦值給 KeyValueFileStoreWrite 的 writeBufferPool 屬性
3.4 這樣 KeyValueFileStoreWrite 就有 MemoryPoolFactory, MemoryPoolFactory 里面放的是 memoryPool
此時(shí) MemoryPoolFactory 里面的 memoryPool 叫做 innerPool
3.5 MemoryPoolFactory 里面一個(gè) owners 其實(shí)對(duì)應(yīng)的 KeyValueFileStoreWrite 的 writers,同一個(gè) subTask 針對(duì)不同的 分區(qū)和 bucket 都會(huì)創(chuàng)建一個(gè) write 所以 KeyValueFileStoreWrite 里面有多個(gè) writer
3.6 所以到這里大概可以猜測(cè)出 KeyValueFileStoreWrite 是一個(gè) subTask 一個(gè), MemoryPoolFactory 也是一個(gè),又因?yàn)?MemoryPoolFactory 里面只有一個(gè) innerPool 看上去也是 subTask 一個(gè) innerPoolKeyValueFileStoreWrite 有屬性 MemoryPoolFactory writeBufferPool; 有一個(gè) createWriter 方法
4.1 writeBufferPool 來(lái)自于 KeyValueFileStoreWrite 新建的 MemoryPoolFactory
4.2 createWriter 方法針對(duì)每個(gè)分區(qū)和 bucket 創(chuàng)建一個(gè) MergeTreeWriter, 創(chuàng)建完 MergeTreeWriter 之后會(huì)給這個(gè) MergeTreeWriter 創(chuàng)建一個(gè) OwnerMemoryPool, OwnerMemoryPool 和 MergeTreeWriter 是 一一 對(duì)應(yīng)的,但是操作的還是 MemoryPoolFactory 的 innerPool。之后還會(huì)初始化 MergeTreeWriter 的 writeBuffer
此時(shí) writeBuffer 是 new SortBufferWriteBuffer , 構(gòu)造 SortBufferWriteBuffer 會(huì)把 OwnerMemoryPool 傳進(jìn)去MergeTreeWriter 有屬性 WriteBuffer writeBuffer
5.1 在 KeyValueFileStoreWrite 創(chuàng)建完成 MergeTreeWriter 后會(huì)創(chuàng)建一個(gè) OwnerMemoryPool, writeBuffer 里面會(huì)與這個(gè) OwnerMemoryPool
5.2 writeBuffer 是 SortBufferWriteBufferSortBufferWriteBuffer 有屬性 SortBuffer buffer
6.1 SortBuffer 根據(jù)是否 spill 會(huì)生成 BinaryExternalSortBuffer 或者 BinaryInMemorySortBuffer
6.2 SortBuffer 里面會(huì)用到 memoryPool 也就是 OwnerMemoryPool
畫(huà)個(gè)圖總結(jié)如下
