小白解讀 Apache Flink 核心技術(shù) - 學(xué)習(xí)筆記

背景知識(shí)

低延遲 vs 高吞吐

流處理系統(tǒng)與批處理系統(tǒng)最大不同在于節(jié)點(diǎn)間的數(shù)據(jù)傳輸方式。

對(duì)于一個(gè)流處理系統(tǒng),當(dāng)一條數(shù)據(jù)被處理完成后,序列化到緩存中,然后立刻通過網(wǎng)絡(luò)傳輸?shù)较乱粋€(gè)節(jié)點(diǎn),由下一個(gè)節(jié)點(diǎn)繼續(xù)處理。而對(duì)于一個(gè)批處理系統(tǒng),其節(jié)點(diǎn)間數(shù)據(jù)傳輸?shù)臉?biāo)準(zhǔn)模型是:當(dāng)一條數(shù)據(jù)被處理完成后,序列化到緩存中,并不會(huì)立刻通過網(wǎng)絡(luò)傳輸?shù)较乱粋€(gè)節(jié)點(diǎn),當(dāng)緩存寫滿,就持久化到本地硬盤上,當(dāng)所有數(shù)據(jù)都被處理完成后,才開始將處理后的數(shù)據(jù)通過網(wǎng)絡(luò)傳輸?shù)较乱粋€(gè)節(jié)點(diǎn)。

這兩種數(shù)據(jù)傳輸模式是兩個(gè)極端,對(duì)應(yīng)的是流處理系統(tǒng)對(duì)低延遲的要求和批處理系統(tǒng)對(duì)高吞吐量的要求。?

Flink以固定的緩存塊為單位進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)傳輸,用戶可以通過緩存塊超時(shí)值指定緩存塊的傳輸時(shí)機(jī)。如果緩存塊的超時(shí)值為0,則Flink的數(shù)據(jù)傳輸方式類似上文所提到流處理系統(tǒng)的標(biāo)準(zhǔn)模型,此時(shí)系統(tǒng)可以獲得最低的處理延遲。如果緩存塊的超時(shí)值為無限大,則Flink的數(shù)據(jù)傳輸方式類似上文所提到批處理系統(tǒng)的標(biāo)準(zhǔn)模型,此時(shí)系統(tǒng)可以獲得最高的吞吐量。同時(shí)緩存塊的超時(shí)值也可以設(shè)置為0到無限大之間的任意值。緩存塊的超時(shí)閾值越小,則Flink流處理執(zhí)行引擎的數(shù)據(jù)處理延遲越低,但吞吐量也會(huì)降低,反之亦然。通過調(diào)整緩存塊的超時(shí)閾值,用戶可根據(jù)需求靈活地權(quán)衡系統(tǒng)延遲和吞吐量

Stream & Transformation & Operator

用戶實(shí)現(xiàn)的Flink程序是由Stream和Transformation這兩個(gè)基本構(gòu)建塊組成,其中Stream是一個(gè)中間結(jié)果數(shù)據(jù),而Transformation是一個(gè)操作,它對(duì)一個(gè)或多個(gè)輸入Stream進(jìn)行計(jì)算處理,輸出一個(gè)或多個(gè)結(jié)果Stream。當(dāng)一個(gè)Flink程序被執(zhí)行的時(shí)候,它會(huì)被映射為Streaming Dataflow。一個(gè)Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似于一個(gè)DAG圖,在啟動(dòng)的時(shí)候從一個(gè)或多個(gè)Source Operator開始,結(jié)束于一個(gè)或多個(gè)Sink Operator。

下面是一個(gè)由Flink程序映射為Streaming Dataflow的示意圖:

Parallel Dataflow

在Flink中,程序天生是并行和分布式的:一個(gè)Stream可以被分成多個(gè)Stream分區(qū)(Stream Partitions),一個(gè)Operator可以被分成多個(gè)Operator Subtask,每一個(gè)Operator Subtask是在不同的線程中獨(dú)立執(zhí)行的。一個(gè)Operator的并行度,等于Operator Subtask的個(gè)數(shù),一個(gè)Stream的并行度總是等于生成它的Operator的并行度。

在Flink分布式執(zhí)行環(huán)境中,會(huì)將多個(gè)Operator Subtask串起來組成一個(gè)Operator Chain,實(shí)際上就是一個(gè)執(zhí)行鏈,每個(gè)執(zhí)行鏈會(huì)在TaskManager上一個(gè)獨(dú)立的線程中執(zhí)行

組件棧

Flink是一個(gè)分層架構(gòu)的系統(tǒng),每一層所包含的組件都提供了特定的抽象,用來服務(wù)于上層組件。Flink分層的組件棧如下圖所示:

Flink on YARN的部署模式?

Flink YARN Client負(fù)責(zé)與YARN RM(Resouce manager)通信協(xié)商資源請(qǐng)求,F(xiàn)link JobManager和Flink TaskManager分別申請(qǐng)到Container去運(yùn)行各自的進(jìn)程。通過上圖可以看到,YARN AM(application master)與Flink JobManager在同一個(gè)Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請(qǐng)Container去啟動(dòng)Flink TaskManager。待Flink成功運(yùn)行在YARN集群上,F(xiàn)link YARN Client就可以提交Flink Job到Flink JobManager,并進(jìn)行后續(xù)的映射、調(diào)度和計(jì)算處理?

Runtime層: 提供了支持Flink計(jì)算的全部核心實(shí)現(xiàn),比如:支持分布式Stream處理、JobGraph到ExecutionGraph的映射、調(diào)度等等,為上層API層提供基礎(chǔ)服務(wù)。

API層: 主要實(shí)現(xiàn)了面向無界Stream的流處理和面向Batch的批處理API,其中面向流處理對(duì)應(yīng)DataStream API,面向批處理對(duì)應(yīng)DataSet API。

Libraries層: 該層也可以稱為Flink應(yīng)用框架層,根據(jù)API層的劃分,在API層之上構(gòu)建的滿足特定應(yīng)用的實(shí)現(xiàn)計(jì)算框架,也分別對(duì)應(yīng)于面向流處理和面向批處理兩類。面向流處理支持:CEP(復(fù)雜事件處理)、基于SQL-like的操作(基于Table的關(guān)系操作);面向批處理支持:FlinkML(機(jī)器學(xué)習(xí)庫)、Gelly(圖處理)。

基本架構(gòu)

Flink系統(tǒng)的架構(gòu)與Spark類似,是一個(gè)基于Master-Slave風(fēng)格的架構(gòu),如下圖所示:

Flink集群啟動(dòng)時(shí),會(huì)啟動(dòng)一個(gè)JobManager進(jìn)程、至少一個(gè)TaskManager進(jìn)程。在Local模式下,會(huì)在同一個(gè)JVM內(nèi)部啟動(dòng)一個(gè)JobManager進(jìn)程和TaskManager進(jìn)程。當(dāng)Flink程序提交后,會(huì)創(chuàng)建一個(gè)Client來進(jìn)行預(yù)處理,并轉(zhuǎn)換為一個(gè)并行數(shù)據(jù)流,這是對(duì)應(yīng)著一個(gè)Flink Job,從而可以被JobManager和TaskManager執(zhí)行。在實(shí)現(xiàn)上,F(xiàn)link基于Actor實(shí)現(xiàn)了JobManager和TaskManager,所以JobManager與TaskManager之間的信息交換,都是通過事件的方式來進(jìn)行處理。


如上圖所示,F(xiàn)link系統(tǒng)主要包含如下3個(gè)主要的進(jìn)程:

? ??JobManager:?系統(tǒng)的協(xié)調(diào)者,它負(fù)責(zé)接收Flink Job,調(diào)度組成Job的多個(gè)Task的執(zhí)行。同時(shí),JobManager還負(fù)責(zé)收集Job的狀態(tài)信息,并管理Flink集群中從節(jié)點(diǎn)TaskManager

? ??TaskManager:?是一個(gè)Actor,它是實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的Worker,在其上執(zhí)行Flink Job的一組Task。每個(gè)TaskManager負(fù)責(zé)管理其所在節(jié)點(diǎn)上的資源信息,如內(nèi)存、磁盤、網(wǎng)絡(luò),在啟動(dòng)的時(shí)候?qū)①Y源的狀態(tài)向JobManager匯報(bào)。TaskManager端可以分成兩個(gè)階段:

????????注冊(cè)階段: TaskManager會(huì)向JobManager注冊(cè),發(fā)送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然后TaskManager就可以進(jìn)行初始化過程。

????????可操作階段: 該階段TaskManager可以接收并處理與Task有關(guān)的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager無法連接到JobManager,這是TaskManager就失去了與JobManager的聯(lián)系,會(huì)自動(dòng)進(jìn)入“注冊(cè)階段”,只有完成注冊(cè)才能繼續(xù)處理Task相關(guān)的消息。

? ??????Client?當(dāng)用戶提交一個(gè)Flink程序時(shí),會(huì)首先創(chuàng)建一個(gè)Client,該Client首先會(huì)對(duì)用戶提交的Flink程序進(jìn)行預(yù)處理,并提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,并建立到JobManager的連接,將Flink Job提交給JobManager。Client會(huì)將用戶提交的Flink程序組裝一個(gè)JobGraph, 并且是以JobGraph的形式提交的。一個(gè)JobGraph是一個(gè)Flink Dataflow,它由多個(gè)JobVertex組成的DAG。其中,一個(gè)JobGraph包含了一個(gè)Flink程序的如下信息:JobID、Job名稱、配置信息、一組JobVertex等。

JobGraph是一個(gè)Job的用戶邏輯視圖表示,將一個(gè)用戶要對(duì)數(shù)據(jù)流進(jìn)行的處理表示為單個(gè)DAG圖(對(duì)應(yīng)于JobGraph),DAG圖由頂點(diǎn)(JobVertex)和中間結(jié)果集(IntermediateDataSet)組成,其中JobVertex表示了對(duì)數(shù)據(jù)流進(jìn)行的轉(zhuǎn)換操作,比如map、flatMap、filter、keyBy等操作

調(diào)度機(jī)制

在JobManager,?會(huì)將一個(gè)JobGraph轉(zhuǎn)換映射為一個(gè)ExecutionGraph.?ExecutionGraph是JobGraph的并行表示.?

Execution,是一個(gè)ExecutionVertex的一次運(yùn)行Attempt,一個(gè)ExecutionVertex可能對(duì)應(yīng)多個(gè)運(yùn)行狀態(tài)的Execution.?每個(gè)Execution通過ExecutionAttemptID來唯一標(biāo)識(shí),在TaskManager和JobManager之間進(jìn)行Task狀態(tài)的交換都是通過ExecutionAttemptID來實(shí)現(xiàn)的

迭代機(jī)制

在機(jī)器學(xué)習(xí)和圖計(jì)算應(yīng)用中,都會(huì)使用到迭代計(jì)算,F(xiàn)link通過在迭代Operator中定義Step函數(shù)來實(shí)現(xiàn)迭代算法,這種迭代算法包括

?????????Iterate (全量更新) : 每次迭代輸入完整的上次迭代的數(shù)據(jù)集

????????和Delta Iterate (只更新增量)


比如,我們現(xiàn)在已知一個(gè)Solution集合中保存的是,已有的商品分類大類中購買量最多的商品,而Workset輸入的是來自線上實(shí)時(shí)交易中最新達(dá)成購買的商品的人數(shù),經(jīng)過計(jì)算會(huì)生成新的商品分類大類中商品購買量最多的結(jié)果,如果某些大類中商品購買量突然增長,它需要更新Solution Set中的結(jié)果(原來購買量最多的商品,經(jīng)過增量迭代計(jì)算,可能已經(jīng)不是最多),最后會(huì)輸出最終商品分類大類中購買量最多的商品結(jié)果集合

Backpressure監(jiān)控

一個(gè)Stream上進(jìn)行處理的多個(gè)Operator之間,它們處理速度和方式可能非常不同,所以就存在上游Operator如果處理速度過快,下游Operator處可能機(jī)會(huì)堆積Stream記錄.如果下游Operator能夠?qū)⒆约禾幚頎顟B(tài)傳播給上游Operator,使得上游Operator處理速度慢下來就會(huì)緩解上述問題

Flink Web界面上提供了對(duì)運(yùn)行Job的Backpressure行為的監(jiān)控,它通過使用Sampling線程對(duì)正在運(yùn)行的Task進(jìn)行堆棧跟蹤采樣來實(shí)現(xiàn)


如果getStackTrace 發(fā)現(xiàn)task 一直suck in certain method call, then it indicates there is a backpressure for the task

Flink流處理容錯(cuò)

批處理系統(tǒng)比較容易實(shí)現(xiàn)容錯(cuò)機(jī)制,由于文件可以重復(fù)訪問,當(dāng)某個(gè)任務(wù)失敗后,重啟該任務(wù)即可。但是到了流處理系統(tǒng),F(xiàn)link基于分布式快照與可部分重發(fā)的數(shù)據(jù)源實(shí)現(xiàn)了容錯(cuò)。 主要的核心為Asynchronous Barrier Snapshots (即Chandy Lamport Algorithm 算法的變種https://yq.aliyun.com/articles/448900)

按照用戶自定義的分布式快照間隔時(shí)間,F(xiàn)link會(huì)定時(shí)在所有數(shù)據(jù)源中插入一種特殊的快照標(biāo)記(barrier)消息,這些快照標(biāo)記消息和其他消息一樣在DAG中流動(dòng),每一個(gè)快照標(biāo)記消息都將其所在的數(shù)據(jù)流分成兩部分:本次快照(snapshot)數(shù)據(jù)和下次快照數(shù)據(jù)。快照標(biāo)記消息沿著DAG流經(jīng)各個(gè)操作符,當(dāng)操作符處理到快照標(biāo)記消息時(shí),會(huì)對(duì)自己的狀態(tài)進(jìn)行快照,并存儲(chǔ)起來。當(dāng)一個(gè)操作符有多個(gè)輸入的時(shí)候,F(xiàn)link會(huì)將先抵達(dá)的快照標(biāo)記消息及其之后的消息緩存起來(stream aligning),當(dāng)所有的輸入中對(duì)應(yīng)該次快照的快照標(biāo)記消息全部抵達(dá)后,操作符對(duì)自己的狀態(tài)快照并存儲(chǔ),之后處理所有快照標(biāo)記消息之后的已緩存消息。操作符對(duì)自己的狀態(tài)快照并存儲(chǔ)可以是異步與增量的操作,并不需要阻塞消息的處理. 一旦最后一個(gè)Stream接收到Barrier n,Operator會(huì)emit所有暫存在Buffer中的記錄,然后向Checkpoint Coordinator發(fā)送Snapshot n

Lightweight approach of storing the state of all operators without pausing the execution. Implemented using?barriers?flowing?through the topology

當(dāng)然,因?yàn)镾tream Aligning?為了排列對(duì)齊Barrier,會(huì)暫時(shí)緩存一部分Stream的記錄到Buffer中,會(huì)造成一部分延遲。在Flink中,提供了一個(gè)開關(guān),選擇是否使用Stream Aligning,如果關(guān)掉則Exactly Once會(huì)變成At least once

這種實(shí)現(xiàn)擁有如下的優(yōu)勢:?

1.? 低延遲。由于操作符狀態(tài)的存儲(chǔ)可以異步,所以進(jìn)行快照的過程基本上不會(huì)阻塞消息的處理,因此不會(huì)對(duì)消息延遲產(chǎn)生負(fù)面影響。

2.? 高吞吐量。當(dāng)操作符狀態(tài)較少時(shí),對(duì)吞吐量基本沒有影響。當(dāng)操作符狀態(tài)較多時(shí),相對(duì)于其他的容錯(cuò)機(jī)制,分布式快照的時(shí)間間隔是用戶自定義的,所以用戶可以權(quán)衡錯(cuò)誤恢復(fù)時(shí)間和吞吐量要求來調(diào)整分布式快照的時(shí)間間隔。

????錯(cuò)誤恢復(fù)代價(jià)。分布式快照的時(shí)間間隔越短,錯(cuò)誤恢復(fù)的時(shí)間越少,與吞吐量負(fù)相關(guān)。

3. 與業(yè)務(wù)邏輯的隔離。Flink的分布式快照機(jī)制與用戶的業(yè)務(wù)邏輯是完全隔離的,用戶的業(yè)務(wù)邏輯不會(huì)依賴或是對(duì)分布式快照產(chǎn)生任何影響。


將所有task的狀態(tài)做一個(gè)快照(snapshot), 即為checkpoint,然后存儲(chǔ)到memory/file syste, with a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, …) and a (relatively small) meta data file

Snapshot并不僅僅是對(duì)數(shù)據(jù)流做了一個(gè)狀態(tài)的Checkpoint(一個(gè)Flink Job,在一個(gè)特定時(shí)刻的一份全局狀態(tài)快照),它也包含了一個(gè)Operator內(nèi)部所持有的狀態(tài),這樣才能夠在保證在流處理系統(tǒng)失敗時(shí)能夠正確地恢復(fù)數(shù)據(jù)流處理。也就是說,如果一個(gè)Operator包含任何形式的狀態(tài),這種狀態(tài)必須是Snapshot的一部分

State(state. Streaming應(yīng)用的狀態(tài), Element in window is state, 一個(gè)具體的task/operator的狀態(tài)) 會(huì)被存儲(chǔ)到一個(gè)可配置的存儲(chǔ)系統(tǒng)中,例如HDFS。在一個(gè)Checkpoint執(zhí)行過程中,存儲(chǔ)的狀態(tài)信息及其交互

SavePoint

Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered

A Savepoint is a consistent image of the execution state of a streaming job, created via Flink’s?checkpointing mechanism. You can use Savepoints to stop-and-resume, fork, or update your Flink jobs. Savepoints consist of two parts: a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, …) and a (relatively small) meta data file

Flink’s Savepoints are different from Checkpoints in a similar way that backups are different from recovery logs in traditional database systems. The primary purpose of Checkpoints is to provide a recovery mechanism in case of unexpected job failures. A Checkpoint’s lifecycle is managed by Flink

Savepoints are created, owned, and deleted by the user. Their use-case is for planned, manual backup and resume

Out of the box, Flink bundles these state backends:?MemoryStateBackend,?FsStateBackend,?RocksDBStateBackend,?If nothing else is configured, the system will use the MemoryStateBackend.

? How to do something with the data? Windowing

? How does the system handle large windows? Managed state?

? How do operate such a system 24x7? Savepoints?

? How to ensure correct results across failures? Checkpoints, Master HA?

Savepoint: Create an addressable copy of a job’s current state.Restart a job from any savepoint.

Flink流處理的時(shí)間窗口

對(duì)于流處理系統(tǒng)來說,流入的消息不存在上限,所以對(duì)于聚合或是連接等操作,流處理系統(tǒng)需要對(duì)流入的消息進(jìn)行分段,然后基于每一段數(shù)據(jù)進(jìn)行聚合或是連接。消息的分段即稱為窗口(window),流處理系統(tǒng)支持的窗口有很多類型,最常見的就是時(shí)間窗口,基于時(shí)間間隔對(duì)消息進(jìn)行分段處理.

有關(guān)窗口操作的不同類型,可以分為如下幾種:傾斜窗口(Tumbling Windows,記錄沒有重疊)、滑動(dòng)窗口(Slide Windows,記錄有重疊)、會(huì)話窗口(Session Windows)

由于不同節(jié)點(diǎn)的時(shí)鐘可能不同,以及消息在流經(jīng)各個(gè)節(jié)點(diǎn)的延遲不同,在某個(gè)節(jié)點(diǎn)屬于同一個(gè)時(shí)間窗口處理的消息,流到下一個(gè)節(jié)點(diǎn)時(shí)可能被切分到不同的時(shí)間窗口中,從而產(chǎn)生不符合預(yù)期的結(jié)果。

Flink支持3種類型的時(shí)間窗口,分別適用于用戶對(duì)于時(shí)間窗口不同類型的要求:

????1.? Operator Time。根據(jù)Task所在節(jié)點(diǎn)的本地時(shí)鐘來切分的時(shí)間窗口。

????2.? Event Time。消息自帶時(shí)間戳,根據(jù)消息的時(shí)間戳進(jìn)行處理,確保時(shí)間戳在同一個(gè)時(shí)間窗口的所有消息一定會(huì)被正確處理。由于消息可能亂序流入Task,所以Task需要緩存當(dāng)前時(shí)間窗口消息處理的狀態(tài),直到確認(rèn)屬于該時(shí)間窗口的所有消息都被處理,才可以釋放,如果亂序的消息延遲很高會(huì)影響分布式系統(tǒng)的吞吐量和延遲。

????3.? Ingress Time。有時(shí)消息本身并不帶有時(shí)間戳信息,但用戶依然希望按照消息而不是節(jié)點(diǎn)時(shí)鐘劃分時(shí)間窗口,例如避免上面提到的第二個(gè)問題,此時(shí)可以在消息源流入Flink時(shí)由Flink自動(dòng)生成時(shí)間戳,之后處理的流程與Event Time相同。Ingress Time可以看成是Event Time的一個(gè)特例,由于其在消息源處時(shí)間戳一定是有序的,所以在流處理系統(tǒng)中,相對(duì)于Event Time,其亂序的消息延遲不會(huì)很高,因此對(duì)Flink分布式系統(tǒng)的吞吐量和延遲的影響也會(huì)更小。

Event Time時(shí)間窗口的實(shí)現(xiàn) - WaterMark

Flink借鑒了Google的MillWheel項(xiàng)目,通過WaterMark來支持基于Event Time的時(shí)間窗口。

但是由于消息可能是亂序的,所以操作符無法直接確認(rèn)何時(shí)所有屬于該時(shí)間窗口的消息全部流入此操作符。WaterMark包含一個(gè)時(shí)間戳,F(xiàn)link使用WaterMark標(biāo)記所有小于該時(shí)間戳的消息都已流入,

Flink的數(shù)據(jù)源在確認(rèn)所有小于某個(gè)時(shí)間戳的消息都已輸出到Flink流處理系統(tǒng)后,會(huì)生成一個(gè)包含該時(shí)間戳的WaterMark,插入到消息流中輸出到Flink流處理系統(tǒng)中,F(xiàn)link操作符按照時(shí)間窗口緩存所有流入的消息,當(dāng)操作符處理到WaterMark時(shí),它對(duì)所有小于該WaterMark時(shí)間戳的時(shí)間窗口數(shù)據(jù)進(jìn)行處理并發(fā)送到下一個(gè)操作符節(jié)點(diǎn)

為了保證能夠處理所有屬于某個(gè)時(shí)間窗口的消息,操作符必須等到大于這個(gè)時(shí)間窗口的WaterMark之后才能開始對(duì)該時(shí)間窗口的消息進(jìn)行處理,相對(duì)于基于Operator Time的時(shí)間窗口,F(xiàn)link需要占用更多內(nèi)存,且會(huì)直接影響消息處理的延遲時(shí)間。對(duì)此,一個(gè)可能的優(yōu)化措施是,對(duì)于聚合類的操作符,可以提前對(duì)部分消息進(jìn)行聚合操作,當(dāng)有屬于該時(shí)間窗口的新消息流入時(shí),基于之前的部分聚合結(jié)果繼續(xù)計(jì)算

基于時(shí)間戳的排序

在Flink流處理系統(tǒng)中,基于WaterMark,F(xiàn)link實(shí)現(xiàn)了基于時(shí)間戳的全局排序。排序的實(shí)現(xiàn)思路如下:排序操作符緩存所有流入的消息,當(dāng)其接收到WaterMark時(shí),對(duì)時(shí)間戳小于該WaterMark的消息進(jìn)行排序

由于WaterMark保證了在其之后不會(huì)出現(xiàn)時(shí)間戳比它小的消息,所以可以保證排序的正確性。需要注意的是,如果排序操作符有多個(gè)節(jié)點(diǎn),只能保證每個(gè)節(jié)點(diǎn)的流出消息是有序的,節(jié)點(diǎn)之間的消息不能保證有序,要實(shí)現(xiàn)全局有序,則只能有一個(gè)排序操作符節(jié)點(diǎn)。

Flink內(nèi)存管理

flink 基于大數(shù)據(jù)計(jì)算場景設(shè)計(jì)的特殊內(nèi)存管理。 而不是向JAVA 這種面向多種目的的編程語言。與之類似的為Spark Tungsten project 也在逐步實(shí)現(xiàn)采用類似的技術(shù)

顯式內(nèi)存管理的前提步驟就是序列化,將Java對(duì)象序列化成二進(jìn)制數(shù)據(jù)存儲(chǔ)在內(nèi)存上。

????on heap:數(shù)據(jù)存儲(chǔ)在 java heap 上, 會(huì)被GC

? ? 或是off-heap:不在JVM但還在內(nèi)存,不會(huì)被GC?

1. 所有的JVM由統(tǒng)一的內(nèi)存管理器管理,避免內(nèi)存碎片。 且數(shù)據(jù)都以二進(jìn)制存儲(chǔ), 垃圾回收壓力低

2.?只將操作相關(guān)的數(shù)據(jù)連續(xù)存儲(chǔ),可以最大化的利用L1/L2/L3緩存,減少Cache miss的概率。以排序?yàn)槔?,?)將所有排序數(shù)據(jù)的Key與Value分開存儲(chǔ),排序時(shí)只需對(duì)key 和指向value的指針進(jìn)行交換 ,(2) 并對(duì)Key連續(xù)存儲(chǔ),那么訪問Key時(shí)的Cache命中率會(huì)大大提高

3. 定制化的序列工具:

? ? a. 由于處理的數(shù)據(jù)流為同一類型, 可只保存一份對(duì)象的schema

? ? b. 對(duì)于固定大小的類型, 可通過固定的偏移位置存取。這樣在訪問對(duì)象成員變量時(shí), 只需要通過偏移量反序列化特定的對(duì)象成員變量。并不需要反序列化整個(gè)Java對(duì)象, 這樣能夠大大減少Java對(duì)象的創(chuàng)建開銷,以及內(nèi)存數(shù)據(jù)的拷貝大小

????c. 對(duì)數(shù)據(jù)集的類型信息進(jìn)行分析,然后自動(dòng)生成定制的序列化工具類,.同時(shí)自動(dòng)生成TypeComparator,用來輔助直接對(duì)序列化后的二進(jìn)制數(shù)據(jù)直接進(jìn)行compare、hash等操作. (java.io.Serializable將Java對(duì)象及其成員變量的所有元信息作為其序列化數(shù)據(jù)的一部分,序列化后的數(shù)據(jù)包含了所有反序列化所需的信息。這在某些場景中十分必要,但是對(duì)于Flink這樣的分布式計(jì)算框架來說,這些元數(shù)據(jù)信息可能是冗余數(shù)據(jù))

4. 顯示內(nèi)存管理

Flink implements its algorithms not against Java objects, arrays, or lists, but actually against a data structure similar to?java.nio.ByteBuffer. Flink uses its own specialized version, called?MemorySegment?on which algorithms put and get at specific positions ints, longs, byte arrays, etc, and compare and copy memory

Flink將內(nèi)存分為3個(gè)部分:

? ? (1)Network buffers: 一些以32KB Byte數(shù)組為單位的buffer,主要被網(wǎng)絡(luò)模塊用于數(shù)據(jù)的網(wǎng)絡(luò)傳輸。

? ? (2)Memory Manager pool 大量32KB (默認(rèn),相當(dāng)大的空間) Byte數(shù)組為單位的內(nèi)存池,所有的運(yùn)行時(shí)算法(例如Sort/Shuffle/Join)都從這個(gè)內(nèi)存池申請(qǐng)內(nèi)存,并將序列化后的數(shù)據(jù)存儲(chǔ)其中,結(jié)束后釋放回內(nèi)存池。

? ? (3)Remaining (Free) Heap主要留給UDF中用戶自己創(chuàng)建的Java對(duì)象,由JVM管理。

所有的運(yùn)行時(shí)數(shù)據(jù)結(jié)構(gòu)和算法只能通過內(nèi)存池申請(qǐng)內(nèi)存,保證了其使用的內(nèi)存大小是固定的, ?進(jìn)行對(duì)象大小檢查和閾值設(shè)置(spillling to disk), 不會(huì)因?yàn)檫\(yùn)行時(shí)數(shù)據(jù)結(jié)構(gòu)和算法而發(fā)生OOM。

off-heap的內(nèi)存管理支持。好處有:(http://sungsoo.github.io/2015/09/24/fllink-off-heap.html)

啟動(dòng)分配了大內(nèi)存(例如100G)的JVM很耗費(fèi)時(shí)間,垃圾回收也很慢。如果采用off-heap,剩下的Network buffer和Remaining heap都會(huì)很小,垃圾回收也不用考慮MemorySegment中的Java對(duì)象了。

更有效率的IO操作。在off-heap下,將MemorySegment寫到磁盤或是網(wǎng)絡(luò)可以支持zeor-copy技術(shù),而on-heap的話則至少需要一次內(nèi)存拷貝. (Zero-copy 指CPU不需要先將數(shù)據(jù)從某處內(nèi)存復(fù)制到另一個(gè)特定區(qū)域。 在Java 7 中新加入的零拷貝機(jī)制,使原來將數(shù)據(jù)從磁盤寫入網(wǎng)卡需要經(jīng)過四次拷貝,縮減到了三次。零拷貝機(jī)制能夠省去其中從用戶端到內(nèi)核端的數(shù)據(jù)拷貝過程。關(guān)于零復(fù)制的介紹,可以參照下面的資料https://www.ibm.com/developerworks/cn/java/j-zerocopy/?)

off-heap可用于錯(cuò)誤恢復(fù),比如JVM崩潰,在on-heap時(shí)數(shù)據(jù)也隨之丟失,但在off-heap下,off-heap的數(shù)據(jù)可能還在。此外,off-heap上的數(shù)據(jù)還可以和其他程序和進(jìn)程共享, 暫時(shí)Flink 還沒有利用這點(diǎn) (https://flink.apache.org/news/2015/09/16/off-heap-memory.html)


參考文獻(xiàn):

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525

http://sungsoo.github.io/2015/09/24/fllink-off-heap.html

http://www.itdecent.cn/p/c4d6a7230973

https://qconlondon.com/london-2016/system/files/presentation-slides/robertmetzger.pdf

https://www.oreilly.com/library/view/stream-processing-with/9781491974285/ch01.html

http://www.itdecent.cn/p/b74909d47fb9

http://shiyanjun.cn/archives/1508.html

https://flink.apache.org/news/2015/09/16/off-heap-memory.html

https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/iterations.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容