一. 什么是Flink
- Apache Flink 是一個(gè)分布式流處理引擎, 提供了直觀且極豐富表達(dá)力的Api來(lái)實(shí)現(xiàn)有狀態(tài)的流式處理。
-
用于無(wú)界和有界數(shù)據(jù)流上的有狀態(tài)計(jì)算。Flink被設(shè)計(jì)為在所有常見(jiàn)的集群環(huán)境中運(yùn)行,以?xún)?nèi)存中的速度和任何規(guī)模執(zhí)行計(jì)算。
image.png
二.Flink應(yīng)用場(chǎng)景
A. 事件驅(qū)動(dòng)型應(yīng)用
- 事件驅(qū)動(dòng)型應(yīng)用是一類(lèi)具有狀態(tài)的應(yīng)用,它從一個(gè)或多個(gè)事件流提取數(shù)據(jù),并根據(jù)到來(lái)的事件觸發(fā)計(jì)算、狀態(tài)更新或其他外部動(dòng)作。
-
事件驅(qū)動(dòng)型應(yīng)用無(wú)須查詢(xún)遠(yuǎn)程數(shù)據(jù)庫(kù),本地?cái)?shù)據(jù)訪問(wèn)使得它具有更高的吞吐和更低的延遲。
image.png
B. 數(shù)據(jù)分析應(yīng)用
- 數(shù)據(jù)分析任務(wù)需要從原始數(shù)據(jù)中提取有價(jià)值的信息和指標(biāo)。
- 傳統(tǒng)的分析方式通常是利用批查詢(xún),或?qū)⑹录涗浵聛?lái)并基于此有限數(shù)據(jù)集構(gòu)建應(yīng)用來(lái)完成。為了得到最新數(shù)據(jù)的分析結(jié)果,必須先將它們加入分析數(shù)據(jù)集并重新執(zhí)行查詢(xún)或運(yùn)行應(yīng)用,隨后將結(jié)果寫(xiě)入存儲(chǔ)系統(tǒng)或生成報(bào)告。
- Flink可以實(shí)時(shí)地進(jìn)行數(shù)據(jù)分析。和傳統(tǒng)模式下讀取有限數(shù)據(jù)集不同,流式查詢(xún)或應(yīng)用會(huì)接入實(shí)時(shí)事件流,并隨著事件消費(fèi)持續(xù)產(chǎn)生和更新結(jié)果。這些結(jié)果數(shù)據(jù)可能會(huì)寫(xiě)入外部數(shù)據(jù)庫(kù)系統(tǒng)或以?xún)?nèi)部狀態(tài)的形式維護(hù)。儀表展示應(yīng)用可以相應(yīng)地從外部數(shù)據(jù)庫(kù)讀取數(shù)據(jù)或直接查詢(xún)應(yīng)用的內(nèi)部狀態(tài)。
-
流式分析省掉了周期性的數(shù)據(jù)導(dǎo)入和查詢(xún)過(guò)程,因此從事件中獲取指標(biāo)的延遲更低。
image.png
C. 數(shù)據(jù)管道應(yīng)用
- 提取-轉(zhuǎn)換-加載(ETL)是一種在存儲(chǔ)系統(tǒng)之間進(jìn)行數(shù)據(jù)轉(zhuǎn)換和遷移的常用方法。ETL 作業(yè)通常會(huì)周期性地觸發(fā),將數(shù)據(jù)從事務(wù)型數(shù)據(jù)庫(kù)拷貝到分析型數(shù)據(jù)庫(kù)或數(shù)據(jù)倉(cāng)庫(kù)。
- 數(shù)據(jù)管道和 ETL 作業(yè)的用途相似,都可以轉(zhuǎn)換、豐富數(shù)據(jù),并將其從某個(gè)存儲(chǔ)系統(tǒng)移動(dòng)到另一個(gè)。但數(shù)據(jù)管道是以持續(xù)流模式運(yùn)行,而非周期性觸發(fā)。因此它支持從一個(gè)不斷生成數(shù)據(jù)的源頭讀取記錄,并將它們以低延遲移動(dòng)到終點(diǎn)。
-
和周期性 ETL 作業(yè)相比,持續(xù)數(shù)據(jù)管道可以明顯降低將數(shù)據(jù)移動(dòng)到目的端的延遲。此外,由于它能夠持續(xù)消費(fèi)和發(fā)送數(shù)據(jù),因此用途更廣,支持用例更多。
image.png
三. Flink特點(diǎn)
- Deploy Applications Anywhere
集成了所有常見(jiàn)的集群資源管理器,如Hadoop YARN、Apache Mesos 和 Kubernetes,也可作為一個(gè)獨(dú)立的集群運(yùn)行。 - Run Applications at any Scale
設(shè)計(jì)目的是在任何規(guī)模上運(yùn)行有狀態(tài)流應(yīng)用程序。應(yīng)用程序可能被并行化為數(shù)千個(gè)任務(wù),這些任務(wù)分布在集群中并同時(shí)執(zhí)行。所以應(yīng)用程序能夠充分利用無(wú)盡的 CPU、內(nèi)存、磁盤(pán)和網(wǎng)絡(luò) IO。而且 Flink 很容易維護(hù)非常大的應(yīng)用程序狀態(tài)。其異步和增量的檢查點(diǎn)算法對(duì)處理延遲產(chǎn)生最小的影響,同時(shí)保證精確一次狀態(tài)的一致性。 -
Leverage In-Memory Performance (利用內(nèi)存性能)
有狀態(tài)的 Flink 程序針對(duì)本地狀態(tài)訪問(wèn)進(jìn)行了優(yōu)化。任務(wù)的狀態(tài)始終保留在內(nèi)存中,如果狀態(tài)大小超過(guò)可用內(nèi)存,則會(huì)保存在能高效訪問(wèn)的磁盤(pán)數(shù)據(jù)結(jié)構(gòu)中。任務(wù)通過(guò)訪問(wèn)本地(通常在內(nèi)存中)狀態(tài)來(lái)進(jìn)行所有的計(jì)算,從而產(chǎn)生非常低的處理延遲。Flink 通過(guò)定期和異步地對(duì)本地狀態(tài)進(jìn)行持久化存儲(chǔ)來(lái)保證故障場(chǎng)景下精確一次的狀態(tài)一致性。
image.png
四. Flink 組件
A. Flink 基本組件
- Dispatcher:負(fù)責(zé)接收用戶(hù)提供的作業(yè),并且負(fù)責(zé)為這個(gè)新提交的作業(yè)拉起一個(gè)新的 JobManager 服務(wù)
- ResourceManager:負(fù)責(zé)資源的管理,在整個(gè) Flink 集群中只有一個(gè) ResourceManager,資源相關(guān)的內(nèi)容都由這個(gè)服務(wù)負(fù)責(zé) (TaskManager 處理槽)
- JobManager:控控制單個(gè)應(yīng)用的執(zhí)行,JobManager將JobGraph 轉(zhuǎn)換為 ExecutionGraph. JobManager向ResourceManager申請(qǐng)任務(wù)槽,一旦收到足夠數(shù)量的TaskManager任務(wù)槽,就會(huì)將ExecutionGraph任務(wù)分發(fā)給TaskManager。 JobManager 還要負(fù)責(zé)所有集中需要協(xié)調(diào)的工作,如創(chuàng)建檢查點(diǎn)。
-
TaskManager:是JVM工作進(jìn)程,每個(gè)TaskManager提供一定數(shù)量的工作槽
image.png
B. JobManager 數(shù)據(jù)結(jié)構(gòu)
- 在作業(yè)執(zhí)行期間,JobManager 會(huì)持續(xù)跟蹤各個(gè) task,決定何時(shí)調(diào)度下一個(gè)或一組 task,處理已完成的 task 或執(zhí)行失敗的情況
- JobManager 會(huì)接收到一個(gè) JobGraph,用來(lái)描述由多個(gè)算子頂點(diǎn) (JobVertex) 組成的數(shù)據(jù)流圖,以及中間結(jié)果數(shù)據(jù) (IntermediateDataSet)。
- JobManager 會(huì)將 JobGraph 轉(zhuǎn)成 ExecutionGraph。
- ExecutionVertex 會(huì)跟蹤子 task 的執(zhí)行狀態(tài)。ExecutionJobVertex 持有同一個(gè) JobVertext 的所有 ExecutionVertex ,并跟蹤整個(gè)算子的運(yùn)行狀態(tài)。

C. JobManager & TaskManager

D. Task slot
Task slot: 用于定義執(zhí)行資源, 一個(gè)TaskManager 有一個(gè)到多個(gè)Task slot
每個(gè)Task slot代表TaskManager的資源的固定子集。例如,具有三個(gè)插槽的TaskManager會(huì)將其托管內(nèi)存的1/3專(zhuān)用于每個(gè)插槽。分配資源意味著子任務(wù)不會(huì)與其他作業(yè)的子任務(wù)競(jìng)爭(zhēng)托管內(nèi)存,而是具有一定數(shù)量的保留托管內(nèi)存。請(qǐng)注意,此處未發(fā)生任何CPU隔離。當(dāng)前插槽僅將任務(wù)的托管內(nèi)存分開(kāi)。
對(duì)于分布式執(zhí)行,F(xiàn)link將操作符子任務(wù)連接到一起,形成多個(gè)任務(wù)。每個(gè)任務(wù)由一個(gè)線程執(zhí)行。將操作員鏈接到任務(wù)是一個(gè)有用的優(yōu)化:它減少了線程到線程的切換和緩沖的開(kāi)銷(xiāo),并增加了總體吞吐量,同時(shí)減少了延遲。
默認(rèn)情況下,F(xiàn)link允許子任務(wù)共享插槽,即使它們是不同任務(wù)的子任務(wù)也是如此,只要它們來(lái)自同一作業(yè)即可。結(jié)果是一個(gè)插槽可以容納整個(gè)作業(yè)管道。
默認(rèn)任務(wù)鏈?zhǔn)情_(kāi)啟的。


E. 算子、任務(wù)、處理槽

F. Job 狀態(tài)變化

G. Task 狀態(tài)變化

五. Execution Plan
- 獲取執(zhí)行計(jì)劃JSON:env.getExecutionPlan()
- 可視化執(zhí)行計(jì)劃:https://flink.apache.org/visualizer/
六
六. DataStream API 與DataSet API區(qū)別
DataSet API
- 批式處理,其接口封裝類(lèi)似于Spark的Dataset,支持豐富的函數(shù)操作,比如map/fliter/join/cogroup等
- 數(shù)據(jù)源創(chuàng)建初始數(shù)據(jù)集,例如來(lái)自文件或Java集合等靜態(tài)數(shù)據(jù)
- 所有的操作為Operator的子類(lèi),實(shí)現(xiàn)具體邏輯,比如Join邏輯是在JoinOperator中實(shí)現(xiàn)
- Dataset的實(shí)現(xiàn)在flink-javamodule中
DataStram API
- 流式處理,其結(jié)構(gòu)封裝實(shí)現(xiàn)輸入流的處理,其也實(shí)現(xiàn)了豐富的函數(shù)支持
- 所有的操作為StreamOperator的子類(lèi),實(shí)現(xiàn)具體邏輯,比如Join邏輯是在IntervalJoinOperator中實(shí)現(xiàn)的
- DataStream的實(shí)現(xiàn)在flink-streaming-java中
七. Flink 時(shí)間語(yǔ)義
A. Time時(shí)間
- Event Time:事件時(shí)間戳可以從每個(gè)event中提取。在事件時(shí)間,時(shí)間的進(jìn)展取決于數(shù)據(jù),而不取決于任何掛鐘。事件時(shí)間程序必須指定如何生成事件時(shí)間水印,這是在事件時(shí)間中發(fā)出進(jìn)展信號(hào)的機(jī)制。
- Ingestion Time:攝入時(shí)間是事件進(jìn)入Flink的時(shí)間。在內(nèi)部,攝取時(shí)間與事件時(shí)間很相似,但是具有自動(dòng)時(shí)間戳分配和自動(dòng)水印生成。
- Processing Time:處理時(shí)間是指執(zhí)行相應(yīng)操作(算子)的機(jī)器的系統(tǒng)時(shí)間。它提供了最佳的性能和最低的延遲。與事件時(shí)間相比,攝取時(shí)間程序不能處理任何無(wú)序的事件或延遲的數(shù)據(jù),但程序不必指定如何生成水印。
B. 事件時(shí)間處理
- Timestamps
- Flink事件時(shí)間流應(yīng)用程序處理的所有記錄都必須帶有時(shí)間戳。
- 隨著流的前進(jìn),流記錄的時(shí)間戳大致是升序?;旧显谒袑?shí)際案列中時(shí)間戳都會(huì)出現(xiàn)一定程度的亂序。
- 當(dāng)Flink以事件時(shí)間模式處理數(shù)據(jù)流時(shí),它基于記錄的時(shí)間戳評(píng)估基于時(shí)間的算子(eg. 窗口算子,會(huì)在事件時(shí)間超過(guò)窗口結(jié)束邊界時(shí)觸發(fā)計(jì)算推動(dòng)進(jìn)度前進(jìn))。Flink將時(shí)間戳編碼為16字節(jié)長(zhǎng)的值,并將它們作為元數(shù)據(jù)附加到記錄中。
- Watermarks
- Watermarks 是一種全局進(jìn)度度量,它指示我們確信不會(huì)再出現(xiàn)延遲事件的時(shí)間點(diǎn)。即:Watermarks 時(shí)間 T < 接下來(lái)接收到的事件時(shí)間T'
- Watermarks 允許我們?cè)诮Y(jié)果的精確性和延遲之間做出取舍。
- Watermarks 必須是單調(diào)遞增的。
- 對(duì)于事件時(shí)間窗口和處理無(wú)序事件的算子來(lái)說(shuō),Watermarks 都是必不可少的。一旦接收到 Watermarks,算子就會(huì)被通知某個(gè)時(shí)間間隔內(nèi)的所有時(shí)間戳都已被觀察到,并且要么觸發(fā)計(jì)算,要么接收事件。
C. 分配時(shí)間戳與生成水位線
- 水位線用于告訴算子不必再等待那些時(shí)間戳小于或等于水位線的事件。
- 水位線和時(shí)間戳都是通過(guò)自1970-01-01 00:00:00 以來(lái)的毫秒數(shù)指定。
- 通過(guò)SourceFunction分配時(shí)間戳及生成水位線
- 時(shí)間戳分配及水位線生成器
- 周期性水位分配器 : AssignerWithPeriodicWatermarks (默認(rèn) 200ms)
- AscendingTimestampExtractor:?jiǎn)握{(diào)遞
- BoundedOutOfOrdernessTimestampExtractor(maxOutOfOrderness):最大允許的亂序時(shí)間
- 定點(diǎn)水位分配器: AssignerWithPunctuatedWatermarks
- 周期性水位分配器 : AssignerWithPeriodicWatermarks (默認(rèn) 200ms)
PS: 時(shí)間蹉分配與水位線的生成盡可能的接近數(shù)據(jù)源。如果某些初始化的過(guò)濾或者其他轉(zhuǎn)換操作不會(huì)引起元素的重新分發(fā),可以考慮在分配時(shí)間戳之前使用它。重新分發(fā)可能會(huì)導(dǎo)致數(shù)據(jù)亂序 (keyBy())。
D. 多輸入流Watermark
一個(gè)任務(wù)為它每個(gè)輸入分區(qū)都維護(hù)一個(gè)分區(qū)水位線。當(dāng)收到某個(gè)分區(qū)傳來(lái)的水位線后,會(huì)以當(dāng)前值和接收值較大的去更新相應(yīng)分區(qū)水位線值。然后,任務(wù)會(huì)把事件時(shí)間時(shí)鐘調(diào)整為所有分區(qū)水位線中最小的值。
對(duì)于有多條輸入流的算子(union, coFlatMap),它們的任務(wù)也是利用全部分區(qū)最小值來(lái)計(jì)算事件時(shí)間時(shí)鐘的。
E. 處理遲到數(shù)據(jù)
- 遲到是指元素到達(dá)算子后,本應(yīng)參加與貢獻(xiàn)的算子已經(jīng)計(jì)算完成。在事件時(shí)間窗口算子的環(huán)境下,如果事件時(shí)間到達(dá)算子時(shí)窗口分配器為其分配的窗口已經(jīng)因?yàn)樗阕铀痪€超過(guò)了它的結(jié)束時(shí)間而計(jì)算完畢,那么事件就是遲到的。
- 通過(guò)比較事件時(shí)間戳和當(dāng)前水位線來(lái)識(shí)別遲到事件。
- 丟棄事件:事件時(shí)間窗口默認(rèn)行為
- 重定向遲到事件:sideOutputLateData(OutputTag<T> outputTag) ;getSideOutput(OutputTag<T> outputTag)
- 基于遲到事件更新結(jié)果:對(duì)不完整的結(jié)果進(jìn)行重新計(jì)算并發(fā)出更新
- 算子中保存用于再次計(jì)算結(jié)果的狀態(tài)
- 受結(jié)果更新影響的下游算子或者外部系統(tǒng)能夠處理更新
- 指定延遲容忍度:allowedLateness, 窗口算子在水位線超過(guò)窗口結(jié)束時(shí)間戳之后不會(huì)立即刪除窗口,而是會(huì)將窗口繼續(xù)保留該延遲容忍度時(shí)間。在這段額外時(shí)間內(nèi)到達(dá)的遲到元素會(huì)像按時(shí)到達(dá)的元素一樣交給觸發(fā)器處理。水位線超過(guò)窗口時(shí)間加延遲容忍度間隔,窗口才會(huì)被刪除,此后所有的遲到數(shù)據(jù)都會(huì)被丟棄。
水位線、延遲及完整性
- 水位線用于平衡延遲和結(jié)果的完整性
- 水位線遠(yuǎn)落后于已處理記錄的時(shí)間蹉,導(dǎo)致延遲增大,數(shù)據(jù)準(zhǔn)確性較高。
- 生成的水位線過(guò)于緊迫,水位線可能大于部分后來(lái)的數(shù)據(jù)的時(shí)間蹉,計(jì)算可能已經(jīng)觸發(fā),導(dǎo)致數(shù)據(jù)不夠完整和精確,但具體較低的延遲。
八. flink 消息投遞機(jī)制
At most once 至多一次:隨意丟棄事件,沒(méi)有保障
At least once 至少一次:不丟失事件,事件可能會(huì)被處理多次。源頭會(huì)在緩沖區(qū)需要支持重放
Exactly once 精確一次:不丟失事件,每個(gè)事件對(duì)于內(nèi)部狀態(tài)的更新都只有一次。數(shù)據(jù)重放。(檢查點(diǎn))
九. checkPoint檢查點(diǎn)和savePoint保存點(diǎn)
Flink 狀態(tài)
只有在每一個(gè)單獨(dú)的事件上進(jìn)行轉(zhuǎn)換操作的應(yīng)用才不需要狀態(tài),換言之,每一個(gè)具有一定復(fù)雜度的流處理應(yīng)用都是有狀態(tài)的。任何運(yùn)行基本業(yè)務(wù)邏輯的流處理應(yīng)用都需要在一定時(shí)間內(nèi)存儲(chǔ)所接收的事件或中間結(jié)果,以供后續(xù)的某個(gè)時(shí)間點(diǎn)(例如收到下一個(gè)事件或者經(jīng)過(guò)一段特定時(shí)間)進(jìn)行訪問(wèn)并進(jìn)行后續(xù)處理。
checkPoint
- 周期性生成,根據(jù)配置的策略自動(dòng)丟棄
- 應(yīng)用被手動(dòng)停止后,檢查點(diǎn)會(huì)被刪除。
- 可以通過(guò)配置來(lái)保留 checkpoint,這些被保留的 checkpoint 在作業(yè)失敗或取 消時(shí)不會(huì)被清除
- 保證應(yīng)用在出現(xiàn)故障時(shí)可以順利重啟。
savePoint
- 用戶(hù)觸發(fā),也可周期性生成
- Flink 不會(huì)自動(dòng)清理
- 擴(kuò)縮容,遷移,暫停應(yīng)用,應(yīng)用狀態(tài)歸檔,回到過(guò)去
Flink checkPoint算法
- Flink 使用 Chandy-Lamport algorithm 算法的一種變體,稱(chēng)為異步 barrier 快照(asynchronous barrier snapshotting)
- 當(dāng) checkpoint coordinator(job manager 的一部分)指示 task manager 開(kāi)始 checkpoint 時(shí),它會(huì)讓所有 sources 記錄它們的偏移量,并將編號(hào)的 checkpoint barriers 插入到它們的流中。這些 barriers 流經(jīng) job graph,標(biāo)注每個(gè) checkpoint 前后的流部分。
image.png
image.png
checkPoint對(duì)性能的影響 - Flink 檢查點(diǎn)算法能夠在不停止整個(gè)應(yīng)用的情況下為流式應(yīng)用生成一致的分布式檢查點(diǎn),但是會(huì)增加應(yīng)用的處理延遲
- 將狀態(tài)存入檢查點(diǎn)的過(guò)程中,會(huì)處于阻塞狀態(tài),此時(shí)的輸入會(huì)進(jìn)入緩沖區(qū),由于狀態(tài)可能非常大,還可能通過(guò)網(wǎng)絡(luò)寫(xiě)入遠(yuǎn)程存儲(chǔ)系統(tǒng),因此會(huì)持續(xù)很長(zhǎng)一段時(shí)間。
- 檢查點(diǎn)由狀態(tài)后端負(fù)責(zé)生成,具體的拷貝過(guò)程由狀態(tài)后端決定。文件系統(tǒng)狀態(tài)后端和RocksDB狀態(tài)后端支持異步生成檢查點(diǎn)。當(dāng)狀態(tài)后端將當(dāng)前狀態(tài)的本地拷貝完成后,任務(wù)就可以繼續(xù)執(zhí)行。后臺(tái)進(jìn)程會(huì)異步的將本地狀態(tài)快照拷貝到遠(yuǎn)程,然后在完成檢查點(diǎn)后通知任務(wù)。異步生成檢查點(diǎn)可以有效的降低任務(wù)恢復(fù)數(shù)據(jù)處理所需要的時(shí)間。 RocksDB還可以進(jìn)行增量生成檢查點(diǎn),可以降低傳輸?shù)臄?shù)據(jù)量。
- 對(duì)于那些需要極低延遲且能容忍至少一次狀態(tài)保障的應(yīng)用,可以配置讓Flink在分隔符對(duì)齊的過(guò)程中不緩存那些已經(jīng)收到分隔符所對(duì)應(yīng)分區(qū)的記錄,而是直接處理他。
Task 故障恢復(fù)之重啟策略
- 固定間隔策略 (Fixed delay)
- restart-strategy: fixed-delay
- restart-strategy.fixed-delay.attempts: 3
- restart-strategy.fixed-delay.delay: 10 s
- 失敗率策略 (Failure rate)
- restart-strategy: failure-rate
- restart-strategy.failure-rate.max-failures-per-interval: 3
- restart-strategy.failure-rate.failure-rate-interval: 5 min
- restart-strategy.failure-rate.delay: 10 s
- 回退重啟策略(Fallback Restart)
- 使用集群定義的重啟策略。
- 無(wú)重啟 (No restart)
- restart-strategy: none








