背景知識(shí)
低延遲 vs 高吞吐
流處理系統(tǒng)與批處理系統(tǒng)最大不同在于節(jié)點(diǎn)間的數(shù)據(jù)傳輸方式。
對(duì)于一個(gè)流處理系統(tǒng),當(dāng)一條數(shù)據(jù)被處理完成后,序列化到緩存中,然后立刻通過網(wǎng)絡(luò)傳輸?shù)较乱粋€(gè)節(jié)點(diǎn),由下一個(gè)節(jié)點(diǎn)繼續(xù)處理。而對(duì)于一個(gè)批處理系統(tǒng),其節(jié)點(diǎn)間數(shù)據(jù)傳輸?shù)臉?biāo)準(zhǔn)模型是:當(dāng)一條數(shù)據(jù)被處理完成后,序列化到緩存中,并不會(huì)立刻通過網(wǎng)絡(luò)傳輸?shù)较乱粋€(gè)節(jié)點(diǎn),當(dāng)緩存寫滿,就持久化到本地硬盤上,當(dāng)所有數(shù)據(jù)都被處理完成后,才開始將處理后的數(shù)據(jù)通過網(wǎng)絡(luò)傳輸?shù)较乱粋€(gè)節(jié)點(diǎn)。
這兩種數(shù)據(jù)傳輸模式是兩個(gè)極端,對(duì)應(yīng)的是流處理系統(tǒng)對(duì)低延遲的要求和批處理系統(tǒng)對(duì)高吞吐量的要求。?
Flink以固定的緩存塊為單位進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)傳輸,用戶可以通過緩存塊超時(shí)值指定緩存塊的傳輸時(shí)機(jī)。如果緩存塊的超時(shí)值為0,則Flink的數(shù)據(jù)傳輸方式類似上文所提到流處理系統(tǒng)的標(biāo)準(zhǔn)模型,此時(shí)系統(tǒng)可以獲得最低的處理延遲。如果緩存塊的超時(shí)值為無限大,則Flink的數(shù)據(jù)傳輸方式類似上文所提到批處理系統(tǒng)的標(biāo)準(zhǔn)模型,此時(shí)系統(tǒng)可以獲得最高的吞吐量。同時(shí)緩存塊的超時(shí)值也可以設(shè)置為0到無限大之間的任意值。緩存塊的超時(shí)閾值越小,則Flink流處理執(zhí)行引擎的數(shù)據(jù)處理延遲越低,但吞吐量也會(huì)降低,反之亦然。通過調(diào)整緩存塊的超時(shí)閾值,用戶可根據(jù)需求靈活地權(quán)衡系統(tǒng)延遲和吞吐量
Stream & Transformation & Operator
用戶實(shí)現(xiàn)的Flink程序是由Stream和Transformation這兩個(gè)基本構(gòu)建塊組成,其中Stream是一個(gè)中間結(jié)果數(shù)據(jù),而Transformation是一個(gè)操作,它對(duì)一個(gè)或多個(gè)輸入Stream進(jìn)行計(jì)算處理,輸出一個(gè)或多個(gè)結(jié)果Stream。當(dāng)一個(gè)Flink程序被執(zhí)行的時(shí)候,它會(huì)被映射為Streaming Dataflow。一個(gè)Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似于一個(gè)DAG圖,在啟動(dòng)的時(shí)候從一個(gè)或多個(gè)Source Operator開始,結(jié)束于一個(gè)或多個(gè)Sink Operator。
下面是一個(gè)由Flink程序映射為Streaming Dataflow的示意圖:

Parallel Dataflow
在Flink中,程序天生是并行和分布式的:一個(gè)Stream可以被分成多個(gè)Stream分區(qū)(Stream Partitions),一個(gè)Operator可以被分成多個(gè)Operator Subtask,每一個(gè)Operator Subtask是在不同的線程中獨(dú)立執(zhí)行的。一個(gè)Operator的并行度,等于Operator Subtask的個(gè)數(shù),一個(gè)Stream的并行度總是等于生成它的Operator的并行度。
在Flink分布式執(zhí)行環(huán)境中,會(huì)將多個(gè)Operator Subtask串起來組成一個(gè)Operator Chain,實(shí)際上就是一個(gè)執(zhí)行鏈,每個(gè)執(zhí)行鏈會(huì)在TaskManager上一個(gè)獨(dú)立的線程中執(zhí)行
組件棧
Flink是一個(gè)分層架構(gòu)的系統(tǒng),每一層所包含的組件都提供了特定的抽象,用來服務(wù)于上層組件。Flink分層的組件棧如下圖所示:

Flink on YARN的部署模式?
Flink YARN Client負(fù)責(zé)與YARN RM(Resouce manager)通信協(xié)商資源請(qǐng)求,F(xiàn)link JobManager和Flink TaskManager分別申請(qǐng)到Container去運(yùn)行各自的進(jìn)程。通過上圖可以看到,YARN AM(application master)與Flink JobManager在同一個(gè)Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請(qǐng)Container去啟動(dòng)Flink TaskManager。待Flink成功運(yùn)行在YARN集群上,F(xiàn)link YARN Client就可以提交Flink Job到Flink JobManager,并進(jìn)行后續(xù)的映射、調(diào)度和計(jì)算處理?

Runtime層: 提供了支持Flink計(jì)算的全部核心實(shí)現(xiàn),比如:支持分布式Stream處理、JobGraph到ExecutionGraph的映射、調(diào)度等等,為上層API層提供基礎(chǔ)服務(wù)。
API層: 主要實(shí)現(xiàn)了面向無界Stream的流處理和面向Batch的批處理API,其中面向流處理對(duì)應(yīng)DataStream API,面向批處理對(duì)應(yīng)DataSet API。
Libraries層: 該層也可以稱為Flink應(yīng)用框架層,根據(jù)API層的劃分,在API層之上構(gòu)建的滿足特定應(yīng)用的實(shí)現(xiàn)計(jì)算框架,也分別對(duì)應(yīng)于面向流處理和面向批處理兩類。面向流處理支持:CEP(復(fù)雜事件處理)、基于SQL-like的操作(基于Table的關(guān)系操作);面向批處理支持:FlinkML(機(jī)器學(xué)習(xí)庫)、Gelly(圖處理)。
基本架構(gòu)
Flink系統(tǒng)的架構(gòu)與Spark類似,是一個(gè)基于Master-Slave風(fēng)格的架構(gòu),如下圖所示:

如上圖所示,F(xiàn)link系統(tǒng)主要包含如下3個(gè)主要的進(jìn)程:
? ??JobManager:?系統(tǒng)的協(xié)調(diào)者,它負(fù)責(zé)接收Flink Job,調(diào)度組成Job的多個(gè)Task的執(zhí)行。同時(shí),JobManager還負(fù)責(zé)收集Job的狀態(tài)信息,并管理Flink集群中從節(jié)點(diǎn)TaskManager
? ??TaskManager:?是一個(gè)Actor,它是實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的Worker,在其上執(zhí)行Flink Job的一組Task。每個(gè)TaskManager負(fù)責(zé)管理其所在節(jié)點(diǎn)上的資源信息,如內(nèi)存、磁盤、網(wǎng)絡(luò),在啟動(dòng)的時(shí)候?qū)①Y源的狀態(tài)向JobManager匯報(bào)。TaskManager端可以分成兩個(gè)階段:
????????注冊(cè)階段: TaskManager會(huì)向JobManager注冊(cè),發(fā)送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然后TaskManager就可以進(jìn)行初始化過程。
????????可操作階段: 該階段TaskManager可以接收并處理與Task有關(guān)的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager無法連接到JobManager,這是TaskManager就失去了與JobManager的聯(lián)系,會(huì)自動(dòng)進(jìn)入“注冊(cè)階段”,只有完成注冊(cè)才能繼續(xù)處理Task相關(guān)的消息。
? ??????Client?當(dāng)用戶提交一個(gè)Flink程序時(shí),會(huì)首先創(chuàng)建一個(gè)Client,該Client首先會(huì)對(duì)用戶提交的Flink程序進(jìn)行預(yù)處理,并提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,并建立到JobManager的連接,將Flink Job提交給JobManager。Client會(huì)將用戶提交的Flink程序組裝一個(gè)JobGraph, 并且是以JobGraph的形式提交的。一個(gè)JobGraph是一個(gè)Flink Dataflow,它由多個(gè)JobVertex組成的DAG。其中,一個(gè)JobGraph包含了一個(gè)Flink程序的如下信息:JobID、Job名稱、配置信息、一組JobVertex等。

調(diào)度機(jī)制
在JobManager,?會(huì)將一個(gè)JobGraph轉(zhuǎn)換映射為一個(gè)ExecutionGraph.?ExecutionGraph是JobGraph的并行表示.?
Execution,是一個(gè)ExecutionVertex的一次運(yùn)行Attempt,一個(gè)ExecutionVertex可能對(duì)應(yīng)多個(gè)運(yùn)行狀態(tài)的Execution.?每個(gè)Execution通過ExecutionAttemptID來唯一標(biāo)識(shí),在TaskManager和JobManager之間進(jìn)行Task狀態(tài)的交換都是通過ExecutionAttemptID來實(shí)現(xiàn)的
迭代機(jī)制
在機(jī)器學(xué)習(xí)和圖計(jì)算應(yīng)用中,都會(huì)使用到迭代計(jì)算,F(xiàn)link通過在迭代Operator中定義Step函數(shù)來實(shí)現(xiàn)迭代算法,這種迭代算法包括
?????????Iterate (全量更新) : 每次迭代輸入完整的上次迭代的數(shù)據(jù)集
????????和Delta Iterate (只更新增量)

Backpressure監(jiān)控
一個(gè)Stream上進(jìn)行處理的多個(gè)Operator之間,它們處理速度和方式可能非常不同,所以就存在上游Operator如果處理速度過快,下游Operator處可能機(jī)會(huì)堆積Stream記錄.如果下游Operator能夠?qū)⒆约禾幚頎顟B(tài)傳播給上游Operator,使得上游Operator處理速度慢下來就會(huì)緩解上述問題
Flink Web界面上提供了對(duì)運(yùn)行Job的Backpressure行為的監(jiān)控,它通過使用Sampling線程對(duì)正在運(yùn)行的Task進(jìn)行堆棧跟蹤采樣來實(shí)現(xiàn)

Flink流處理容錯(cuò)
批處理系統(tǒng)比較容易實(shí)現(xiàn)容錯(cuò)機(jī)制,由于文件可以重復(fù)訪問,當(dāng)某個(gè)任務(wù)失敗后,重啟該任務(wù)即可。但是到了流處理系統(tǒng),F(xiàn)link基于分布式快照與可部分重發(fā)的數(shù)據(jù)源實(shí)現(xiàn)了容錯(cuò)。 主要的核心為Asynchronous Barrier Snapshots (即Chandy Lamport Algorithm 算法的變種https://yq.aliyun.com/articles/448900)
按照用戶自定義的分布式快照間隔時(shí)間,F(xiàn)link會(huì)定時(shí)在所有數(shù)據(jù)源中插入一種特殊的快照標(biāo)記(barrier)消息,這些快照標(biāo)記消息和其他消息一樣在DAG中流動(dòng),每一個(gè)快照標(biāo)記消息都將其所在的數(shù)據(jù)流分成兩部分:本次快照(snapshot)數(shù)據(jù)和下次快照數(shù)據(jù)。快照標(biāo)記消息沿著DAG流經(jīng)各個(gè)操作符,當(dāng)操作符處理到快照標(biāo)記消息時(shí),會(huì)對(duì)自己的狀態(tài)進(jìn)行快照,并存儲(chǔ)起來。當(dāng)一個(gè)操作符有多個(gè)輸入的時(shí)候,F(xiàn)link會(huì)將先抵達(dá)的快照標(biāo)記消息及其之后的消息緩存起來(stream aligning),當(dāng)所有的輸入中對(duì)應(yīng)該次快照的快照標(biāo)記消息全部抵達(dá)后,操作符對(duì)自己的狀態(tài)快照并存儲(chǔ),之后處理所有快照標(biāo)記消息之后的已緩存消息。操作符對(duì)自己的狀態(tài)快照并存儲(chǔ)可以是異步與增量的操作,并不需要阻塞消息的處理. 一旦最后一個(gè)Stream接收到Barrier n,Operator會(huì)emit所有暫存在Buffer中的記錄,然后向Checkpoint Coordinator發(fā)送Snapshot n

當(dāng)然,因?yàn)镾tream Aligning?為了排列對(duì)齊Barrier,會(huì)暫時(shí)緩存一部分Stream的記錄到Buffer中,會(huì)造成一部分延遲。在Flink中,提供了一個(gè)開關(guān),選擇是否使用Stream Aligning,如果關(guān)掉則Exactly Once會(huì)變成At least once

這種實(shí)現(xiàn)擁有如下的優(yōu)勢:?
1.? 低延遲。由于操作符狀態(tài)的存儲(chǔ)可以異步,所以進(jìn)行快照的過程基本上不會(huì)阻塞消息的處理,因此不會(huì)對(duì)消息延遲產(chǎn)生負(fù)面影響。
2.? 高吞吐量。當(dāng)操作符狀態(tài)較少時(shí),對(duì)吞吐量基本沒有影響。當(dāng)操作符狀態(tài)較多時(shí),相對(duì)于其他的容錯(cuò)機(jī)制,分布式快照的時(shí)間間隔是用戶自定義的,所以用戶可以權(quán)衡錯(cuò)誤恢復(fù)時(shí)間和吞吐量要求來調(diào)整分布式快照的時(shí)間間隔。
????錯(cuò)誤恢復(fù)代價(jià)。分布式快照的時(shí)間間隔越短,錯(cuò)誤恢復(fù)的時(shí)間越少,與吞吐量負(fù)相關(guān)。
3. 與業(yè)務(wù)邏輯的隔離。Flink的分布式快照機(jī)制與用戶的業(yè)務(wù)邏輯是完全隔離的,用戶的業(yè)務(wù)邏輯不會(huì)依賴或是對(duì)分布式快照產(chǎn)生任何影響。
將所有task的狀態(tài)做一個(gè)快照(snapshot), 即為checkpoint,然后存儲(chǔ)到memory/file syste, with a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, …) and a (relatively small) meta data file
Snapshot并不僅僅是對(duì)數(shù)據(jù)流做了一個(gè)狀態(tài)的Checkpoint(一個(gè)Flink Job,在一個(gè)特定時(shí)刻的一份全局狀態(tài)快照),它也包含了一個(gè)Operator內(nèi)部所持有的狀態(tài),這樣才能夠在保證在流處理系統(tǒng)失敗時(shí)能夠正確地恢復(fù)數(shù)據(jù)流處理。也就是說,如果一個(gè)Operator包含任何形式的狀態(tài),這種狀態(tài)必須是Snapshot的一部分
State(state. Streaming應(yīng)用的狀態(tài), Element in window is state, 一個(gè)具體的task/operator的狀態(tài)) 會(huì)被存儲(chǔ)到一個(gè)可配置的存儲(chǔ)系統(tǒng)中,例如HDFS。在一個(gè)Checkpoint執(zhí)行過程中,存儲(chǔ)的狀態(tài)信息及其交互
SavePoint
Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered
A Savepoint is a consistent image of the execution state of a streaming job, created via Flink’s?checkpointing mechanism. You can use Savepoints to stop-and-resume, fork, or update your Flink jobs. Savepoints consist of two parts: a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, …) and a (relatively small) meta data file
Flink’s Savepoints are different from Checkpoints in a similar way that backups are different from recovery logs in traditional database systems. The primary purpose of Checkpoints is to provide a recovery mechanism in case of unexpected job failures. A Checkpoint’s lifecycle is managed by Flink
Savepoints are created, owned, and deleted by the user. Their use-case is for planned, manual backup and resume
Out of the box, Flink bundles these state backends:?MemoryStateBackend,?FsStateBackend,?RocksDBStateBackend,?If nothing else is configured, the system will use the MemoryStateBackend.
? How to do something with the data? Windowing
? How does the system handle large windows? Managed state?
? How do operate such a system 24x7? Savepoints?
? How to ensure correct results across failures? Checkpoints, Master HA?

對(duì)于流處理系統(tǒng)來說,流入的消息不存在上限,所以對(duì)于聚合或是連接等操作,流處理系統(tǒng)需要對(duì)流入的消息進(jìn)行分段,然后基于每一段數(shù)據(jù)進(jìn)行聚合或是連接。消息的分段即稱為窗口(window),流處理系統(tǒng)支持的窗口有很多類型,最常見的就是時(shí)間窗口,基于時(shí)間間隔對(duì)消息進(jìn)行分段處理.
有關(guān)窗口操作的不同類型,可以分為如下幾種:傾斜窗口(Tumbling Windows,記錄沒有重疊)、滑動(dòng)窗口(Slide Windows,記錄有重疊)、會(huì)話窗口(Session Windows)
由于不同節(jié)點(diǎn)的時(shí)鐘可能不同,以及消息在流經(jīng)各個(gè)節(jié)點(diǎn)的延遲不同,在某個(gè)節(jié)點(diǎn)屬于同一個(gè)時(shí)間窗口處理的消息,流到下一個(gè)節(jié)點(diǎn)時(shí)可能被切分到不同的時(shí)間窗口中,從而產(chǎn)生不符合預(yù)期的結(jié)果。
Flink支持3種類型的時(shí)間窗口,分別適用于用戶對(duì)于時(shí)間窗口不同類型的要求:
????1.? Operator Time。根據(jù)Task所在節(jié)點(diǎn)的本地時(shí)鐘來切分的時(shí)間窗口。
????2.? Event Time。消息自帶時(shí)間戳,根據(jù)消息的時(shí)間戳進(jìn)行處理,確保時(shí)間戳在同一個(gè)時(shí)間窗口的所有消息一定會(huì)被正確處理。由于消息可能亂序流入Task,所以Task需要緩存當(dāng)前時(shí)間窗口消息處理的狀態(tài),直到確認(rèn)屬于該時(shí)間窗口的所有消息都被處理,才可以釋放,如果亂序的消息延遲很高會(huì)影響分布式系統(tǒng)的吞吐量和延遲。
????3.? Ingress Time。有時(shí)消息本身并不帶有時(shí)間戳信息,但用戶依然希望按照消息而不是節(jié)點(diǎn)時(shí)鐘劃分時(shí)間窗口,例如避免上面提到的第二個(gè)問題,此時(shí)可以在消息源流入Flink時(shí)由Flink自動(dòng)生成時(shí)間戳,之后處理的流程與Event Time相同。Ingress Time可以看成是Event Time的一個(gè)特例,由于其在消息源處時(shí)間戳一定是有序的,所以在流處理系統(tǒng)中,相對(duì)于Event Time,其亂序的消息延遲不會(huì)很高,因此對(duì)Flink分布式系統(tǒng)的吞吐量和延遲的影響也會(huì)更小。
Event Time時(shí)間窗口的實(shí)現(xiàn) - WaterMark
Flink借鑒了Google的MillWheel項(xiàng)目,通過WaterMark來支持基于Event Time的時(shí)間窗口。
但是由于消息可能是亂序的,所以操作符無法直接確認(rèn)何時(shí)所有屬于該時(shí)間窗口的消息全部流入此操作符。WaterMark包含一個(gè)時(shí)間戳,F(xiàn)link使用WaterMark標(biāo)記所有小于該時(shí)間戳的消息都已流入,
Flink的數(shù)據(jù)源在確認(rèn)所有小于某個(gè)時(shí)間戳的消息都已輸出到Flink流處理系統(tǒng)后,會(huì)生成一個(gè)包含該時(shí)間戳的WaterMark,插入到消息流中輸出到Flink流處理系統(tǒng)中,F(xiàn)link操作符按照時(shí)間窗口緩存所有流入的消息,當(dāng)操作符處理到WaterMark時(shí),它對(duì)所有小于該WaterMark時(shí)間戳的時(shí)間窗口數(shù)據(jù)進(jìn)行處理并發(fā)送到下一個(gè)操作符節(jié)點(diǎn)
為了保證能夠處理所有屬于某個(gè)時(shí)間窗口的消息,操作符必須等到大于這個(gè)時(shí)間窗口的WaterMark之后才能開始對(duì)該時(shí)間窗口的消息進(jìn)行處理,相對(duì)于基于Operator Time的時(shí)間窗口,F(xiàn)link需要占用更多內(nèi)存,且會(huì)直接影響消息處理的延遲時(shí)間。對(duì)此,一個(gè)可能的優(yōu)化措施是,對(duì)于聚合類的操作符,可以提前對(duì)部分消息進(jìn)行聚合操作,當(dāng)有屬于該時(shí)間窗口的新消息流入時(shí),基于之前的部分聚合結(jié)果繼續(xù)計(jì)算
在Flink流處理系統(tǒng)中,基于WaterMark,F(xiàn)link實(shí)現(xiàn)了基于時(shí)間戳的全局排序。排序的實(shí)現(xiàn)思路如下:排序操作符緩存所有流入的消息,當(dāng)其接收到WaterMark時(shí),對(duì)時(shí)間戳小于該WaterMark的消息進(jìn)行排序
由于WaterMark保證了在其之后不會(huì)出現(xiàn)時(shí)間戳比它小的消息,所以可以保證排序的正確性。需要注意的是,如果排序操作符有多個(gè)節(jié)點(diǎn),只能保證每個(gè)節(jié)點(diǎn)的流出消息是有序的,節(jié)點(diǎn)之間的消息不能保證有序,要實(shí)現(xiàn)全局有序,則只能有一個(gè)排序操作符節(jié)點(diǎn)。
Flink內(nèi)存管理
flink 基于大數(shù)據(jù)計(jì)算場景設(shè)計(jì)的特殊內(nèi)存管理。 而不是向JAVA 這種面向多種目的的編程語言。與之類似的為Spark Tungsten project 也在逐步實(shí)現(xiàn)采用類似的技術(shù)
顯式內(nèi)存管理的前提步驟就是序列化,將Java對(duì)象序列化成二進(jìn)制數(shù)據(jù)存儲(chǔ)在內(nèi)存上。
????on heap:數(shù)據(jù)存儲(chǔ)在 java heap 上, 會(huì)被GC
? ? 或是off-heap:不在JVM但還在內(nèi)存,不會(huì)被GC?
1. 所有的JVM由統(tǒng)一的內(nèi)存管理器管理,避免內(nèi)存碎片。 且數(shù)據(jù)都以二進(jìn)制存儲(chǔ), 垃圾回收壓力低
2.?只將操作相關(guān)的數(shù)據(jù)連續(xù)存儲(chǔ),可以最大化的利用L1/L2/L3緩存,減少Cache miss的概率。以排序?yàn)槔?,?)將所有排序數(shù)據(jù)的Key與Value分開存儲(chǔ),排序時(shí)只需對(duì)key 和指向value的指針進(jìn)行交換 ,(2) 并對(duì)Key連續(xù)存儲(chǔ),那么訪問Key時(shí)的Cache命中率會(huì)大大提高
3. 定制化的序列工具:
? ? a. 由于處理的數(shù)據(jù)流為同一類型, 可只保存一份對(duì)象的schema
? ? b. 對(duì)于固定大小的類型, 可通過固定的偏移位置存取。這樣在訪問對(duì)象成員變量時(shí), 只需要通過偏移量反序列化特定的對(duì)象成員變量。并不需要反序列化整個(gè)Java對(duì)象, 這樣能夠大大減少Java對(duì)象的創(chuàng)建開銷,以及內(nèi)存數(shù)據(jù)的拷貝大小
????c. 對(duì)數(shù)據(jù)集的類型信息進(jìn)行分析,然后自動(dòng)生成定制的序列化工具類,.同時(shí)自動(dòng)生成TypeComparator,用來輔助直接對(duì)序列化后的二進(jìn)制數(shù)據(jù)直接進(jìn)行compare、hash等操作. (java.io.Serializable將Java對(duì)象及其成員變量的所有元信息作為其序列化數(shù)據(jù)的一部分,序列化后的數(shù)據(jù)包含了所有反序列化所需的信息。這在某些場景中十分必要,但是對(duì)于Flink這樣的分布式計(jì)算框架來說,這些元數(shù)據(jù)信息可能是冗余數(shù)據(jù))
4. 顯示內(nèi)存管理
Flink implements its algorithms not against Java objects, arrays, or lists, but actually against a data structure similar to?java.nio.ByteBuffer. Flink uses its own specialized version, called?MemorySegment?on which algorithms put and get at specific positions ints, longs, byte arrays, etc, and compare and copy memory
Flink將內(nèi)存分為3個(gè)部分:
? ? (1)Network buffers: 一些以32KB Byte數(shù)組為單位的buffer,主要被網(wǎng)絡(luò)模塊用于數(shù)據(jù)的網(wǎng)絡(luò)傳輸。
? ? (2)Memory Manager pool 大量以32KB (默認(rèn),相當(dāng)大的空間) Byte數(shù)組為單位的內(nèi)存池,所有的運(yùn)行時(shí)算法(例如Sort/Shuffle/Join)都從這個(gè)內(nèi)存池申請(qǐng)內(nèi)存,并將序列化后的數(shù)據(jù)存儲(chǔ)其中,結(jié)束后釋放回內(nèi)存池。
? ? (3)Remaining (Free) Heap主要留給UDF中用戶自己創(chuàng)建的Java對(duì)象,由JVM管理。
所有的運(yùn)行時(shí)數(shù)據(jù)結(jié)構(gòu)和算法只能通過內(nèi)存池申請(qǐng)內(nèi)存,保證了其使用的內(nèi)存大小是固定的, ?進(jìn)行對(duì)象大小檢查和閾值設(shè)置(spillling to disk), 不會(huì)因?yàn)檫\(yùn)行時(shí)數(shù)據(jù)結(jié)構(gòu)和算法而發(fā)生OOM。
off-heap的內(nèi)存管理支持。好處有:(http://sungsoo.github.io/2015/09/24/fllink-off-heap.html)
啟動(dòng)分配了大內(nèi)存(例如100G)的JVM很耗費(fèi)時(shí)間,垃圾回收也很慢。如果采用off-heap,剩下的Network buffer和Remaining heap都會(huì)很小,垃圾回收也不用考慮MemorySegment中的Java對(duì)象了。
更有效率的IO操作。在off-heap下,將MemorySegment寫到磁盤或是網(wǎng)絡(luò)可以支持zeor-copy技術(shù),而on-heap的話則至少需要一次內(nèi)存拷貝. (Zero-copy 指CPU不需要先將數(shù)據(jù)從某處內(nèi)存復(fù)制到另一個(gè)特定區(qū)域。 在Java 7 中新加入的零拷貝機(jī)制,使原來將數(shù)據(jù)從磁盤寫入網(wǎng)卡需要經(jīng)過四次拷貝,縮減到了三次。零拷貝機(jī)制能夠省去其中從用戶端到內(nèi)核端的數(shù)據(jù)拷貝過程。關(guān)于零復(fù)制的介紹,可以參照下面的資料https://www.ibm.com/developerworks/cn/java/j-zerocopy/?)
off-heap可用于錯(cuò)誤恢復(fù),比如JVM崩潰,在on-heap時(shí)數(shù)據(jù)也隨之丟失,但在off-heap下,off-heap的數(shù)據(jù)可能還在。此外,off-heap上的數(shù)據(jù)還可以和其他程序和進(jìn)程共享, 暫時(shí)Flink 還沒有利用這點(diǎn) (https://flink.apache.org/news/2015/09/16/off-heap-memory.html)
參考文獻(xiàn):
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
http://sungsoo.github.io/2015/09/24/fllink-off-heap.html
http://www.itdecent.cn/p/c4d6a7230973
https://qconlondon.com/london-2016/system/files/presentation-slides/robertmetzger.pdf
https://www.oreilly.com/library/view/stream-processing-with/9781491974285/ch01.html
http://www.itdecent.cn/p/b74909d47fb9
http://shiyanjun.cn/archives/1508.html
https://flink.apache.org/news/2015/09/16/off-heap-memory.html
https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/iterations.html