flink 的學(xué)習(xí)與介紹

一. 什么是Flink

  1. Apache Flink 是一個(gè)分布式流處理引擎, 提供了直觀且極豐富表達(dá)力的Api來(lái)實(shí)現(xiàn)有狀態(tài)的流式處理。
  2. 用于無(wú)界和有界數(shù)據(jù)流上的有狀態(tài)計(jì)算。Flink被設(shè)計(jì)為在所有常見(jiàn)的集群環(huán)境中運(yùn)行,以?xún)?nèi)存中的速度和任何規(guī)模執(zhí)行計(jì)算。


    image.png

二.Flink應(yīng)用場(chǎng)景

A. 事件驅(qū)動(dòng)型應(yīng)用
  1. 事件驅(qū)動(dòng)型應(yīng)用是一類(lèi)具有狀態(tài)的應(yīng)用,它從一個(gè)或多個(gè)事件流提取數(shù)據(jù),并根據(jù)到來(lái)的事件觸發(fā)計(jì)算、狀態(tài)更新或其他外部動(dòng)作。
  2. 事件驅(qū)動(dòng)型應(yīng)用無(wú)須查詢(xún)遠(yuǎn)程數(shù)據(jù)庫(kù),本地?cái)?shù)據(jù)訪問(wèn)使得它具有更高的吞吐和更低的延遲。


    image.png
B. 數(shù)據(jù)分析應(yīng)用
  1. 數(shù)據(jù)分析任務(wù)需要從原始數(shù)據(jù)中提取有價(jià)值的信息和指標(biāo)。
  2. 傳統(tǒng)的分析方式通常是利用批查詢(xún),或?qū)⑹录涗浵聛?lái)并基于此有限數(shù)據(jù)集構(gòu)建應(yīng)用來(lái)完成。為了得到最新數(shù)據(jù)的分析結(jié)果,必須先將它們加入分析數(shù)據(jù)集并重新執(zhí)行查詢(xún)或運(yùn)行應(yīng)用,隨后將結(jié)果寫(xiě)入存儲(chǔ)系統(tǒng)或生成報(bào)告。
  3. Flink可以實(shí)時(shí)地進(jìn)行數(shù)據(jù)分析。和傳統(tǒng)模式下讀取有限數(shù)據(jù)集不同,流式查詢(xún)或應(yīng)用會(huì)接入實(shí)時(shí)事件流,并隨著事件消費(fèi)持續(xù)產(chǎn)生和更新結(jié)果。這些結(jié)果數(shù)據(jù)可能會(huì)寫(xiě)入外部數(shù)據(jù)庫(kù)系統(tǒng)或以?xún)?nèi)部狀態(tài)的形式維護(hù)。儀表展示應(yīng)用可以相應(yīng)地從外部數(shù)據(jù)庫(kù)讀取數(shù)據(jù)或直接查詢(xún)應(yīng)用的內(nèi)部狀態(tài)。
  4. 流式分析省掉了周期性的數(shù)據(jù)導(dǎo)入和查詢(xún)過(guò)程,因此從事件中獲取指標(biāo)的延遲更低。


    image.png
C. 數(shù)據(jù)管道應(yīng)用
  1. 提取-轉(zhuǎn)換-加載(ETL)是一種在存儲(chǔ)系統(tǒng)之間進(jìn)行數(shù)據(jù)轉(zhuǎn)換和遷移的常用方法。ETL 作業(yè)通常會(huì)周期性地觸發(fā),將數(shù)據(jù)從事務(wù)型數(shù)據(jù)庫(kù)拷貝到分析型數(shù)據(jù)庫(kù)或數(shù)據(jù)倉(cāng)庫(kù)。
  2. 數(shù)據(jù)管道和 ETL 作業(yè)的用途相似,都可以轉(zhuǎn)換、豐富數(shù)據(jù),并將其從某個(gè)存儲(chǔ)系統(tǒng)移動(dòng)到另一個(gè)。但數(shù)據(jù)管道是以持續(xù)流模式運(yùn)行,而非周期性觸發(fā)。因此它支持從一個(gè)不斷生成數(shù)據(jù)的源頭讀取記錄,并將它們以低延遲移動(dòng)到終點(diǎn)。
  3. 和周期性 ETL 作業(yè)相比,持續(xù)數(shù)據(jù)管道可以明顯降低將數(shù)據(jù)移動(dòng)到目的端的延遲。此外,由于它能夠持續(xù)消費(fèi)和發(fā)送數(shù)據(jù),因此用途更廣,支持用例更多。


    image.png

三. Flink特點(diǎn)

  1. Deploy Applications Anywhere
    集成了所有常見(jiàn)的集群資源管理器,如Hadoop YARN、Apache Mesos 和 Kubernetes,也可作為一個(gè)獨(dú)立的集群運(yùn)行。
  2. Run Applications at any Scale
    設(shè)計(jì)目的是在任何規(guī)模上運(yùn)行有狀態(tài)流應(yīng)用程序。應(yīng)用程序可能被并行化為數(shù)千個(gè)任務(wù),這些任務(wù)分布在集群中并同時(shí)執(zhí)行。所以應(yīng)用程序能夠充分利用無(wú)盡的 CPU、內(nèi)存、磁盤(pán)和網(wǎng)絡(luò) IO。而且 Flink 很容易維護(hù)非常大的應(yīng)用程序狀態(tài)。其異步和增量的檢查點(diǎn)算法對(duì)處理延遲產(chǎn)生最小的影響,同時(shí)保證精確一次狀態(tài)的一致性。
  3. Leverage In-Memory Performance (利用內(nèi)存性能)
    有狀態(tài)的 Flink 程序針對(duì)本地狀態(tài)訪問(wèn)進(jìn)行了優(yōu)化。任務(wù)的狀態(tài)始終保留在內(nèi)存中,如果狀態(tài)大小超過(guò)可用內(nèi)存,則會(huì)保存在能高效訪問(wèn)的磁盤(pán)數(shù)據(jù)結(jié)構(gòu)中。任務(wù)通過(guò)訪問(wèn)本地(通常在內(nèi)存中)狀態(tài)來(lái)進(jìn)行所有的計(jì)算,從而產(chǎn)生非常低的處理延遲。Flink 通過(guò)定期和異步地對(duì)本地狀態(tài)進(jìn)行持久化存儲(chǔ)來(lái)保證故障場(chǎng)景下精確一次的狀態(tài)一致性。


    image.png

四. Flink 組件

A. Flink 基本組件
  1. Dispatcher:負(fù)責(zé)接收用戶(hù)提供的作業(yè),并且負(fù)責(zé)為這個(gè)新提交的作業(yè)拉起一個(gè)新的 JobManager 服務(wù)
  2. ResourceManager:負(fù)責(zé)資源的管理,在整個(gè) Flink 集群中只有一個(gè) ResourceManager,資源相關(guān)的內(nèi)容都由這個(gè)服務(wù)負(fù)責(zé) (TaskManager 處理槽)
  3. JobManager:控控制單個(gè)應(yīng)用的執(zhí)行,JobManager將JobGraph 轉(zhuǎn)換為 ExecutionGraph. JobManager向ResourceManager申請(qǐng)任務(wù)槽,一旦收到足夠數(shù)量的TaskManager任務(wù)槽,就會(huì)將ExecutionGraph任務(wù)分發(fā)給TaskManager。 JobManager 還要負(fù)責(zé)所有集中需要協(xié)調(diào)的工作,如創(chuàng)建檢查點(diǎn)。
  4. TaskManager:是JVM工作進(jìn)程,每個(gè)TaskManager提供一定數(shù)量的工作槽


    image.png
B. JobManager 數(shù)據(jù)結(jié)構(gòu)
  1. 在作業(yè)執(zhí)行期間,JobManager 會(huì)持續(xù)跟蹤各個(gè) task,決定何時(shí)調(diào)度下一個(gè)或一組 task,處理已完成的 task 或執(zhí)行失敗的情況
  2. JobManager 會(huì)接收到一個(gè) JobGraph,用來(lái)描述由多個(gè)算子頂點(diǎn) (JobVertex) 組成的數(shù)據(jù)流圖,以及中間結(jié)果數(shù)據(jù) (IntermediateDataSet)。
  3. JobManager 會(huì)將 JobGraph 轉(zhuǎn)成 ExecutionGraph。
  4. ExecutionVertex 會(huì)跟蹤子 task 的執(zhí)行狀態(tài)。ExecutionJobVertex 持有同一個(gè) JobVertext 的所有 ExecutionVertex ,并跟蹤整個(gè)算子的運(yùn)行狀態(tài)。
image.png
C. JobManager & TaskManager
image.png
D. Task slot

Task slot: 用于定義執(zhí)行資源, 一個(gè)TaskManager 有一個(gè)到多個(gè)Task slot
每個(gè)Task slot代表TaskManager的資源的固定子集。例如,具有三個(gè)插槽的TaskManager會(huì)將其托管內(nèi)存的1/3專(zhuān)用于每個(gè)插槽。分配資源意味著子任務(wù)不會(huì)與其他作業(yè)的子任務(wù)競(jìng)爭(zhēng)托管內(nèi)存,而是具有一定數(shù)量的保留托管內(nèi)存。請(qǐng)注意,此處未發(fā)生任何CPU隔離。當(dāng)前插槽僅將任務(wù)的托管內(nèi)存分開(kāi)。
對(duì)于分布式執(zhí)行,F(xiàn)link將操作符子任務(wù)連接到一起,形成多個(gè)任務(wù)。每個(gè)任務(wù)由一個(gè)線程執(zhí)行。將操作員鏈接到任務(wù)是一個(gè)有用的優(yōu)化:它減少了線程到線程的切換和緩沖的開(kāi)銷(xiāo),并增加了總體吞吐量,同時(shí)減少了延遲。
默認(rèn)情況下,F(xiàn)link允許子任務(wù)共享插槽,即使它們是不同任務(wù)的子任務(wù)也是如此,只要它們來(lái)自同一作業(yè)即可。結(jié)果是一個(gè)插槽可以容納整個(gè)作業(yè)管道。
默認(rèn)任務(wù)鏈?zhǔn)情_(kāi)啟的。


image.png

image.png
E. 算子、任務(wù)、處理槽
image.png
F. Job 狀態(tài)變化
image.png
G. Task 狀態(tài)變化
image.png

五. Execution Plan

  1. 獲取執(zhí)行計(jì)劃JSON:env.getExecutionPlan()
  2. 可視化執(zhí)行計(jì)劃:https://flink.apache.org/visualizer/

六. DataStream API 與DataSet API區(qū)別

DataSet API
  1. 批式處理,其接口封裝類(lèi)似于Spark的Dataset,支持豐富的函數(shù)操作,比如map/fliter/join/cogroup等
  2. 數(shù)據(jù)源創(chuàng)建初始數(shù)據(jù)集,例如來(lái)自文件或Java集合等靜態(tài)數(shù)據(jù)
  3. 所有的操作為Operator的子類(lèi),實(shí)現(xiàn)具體邏輯,比如Join邏輯是在JoinOperator中實(shí)現(xiàn)
  4. Dataset的實(shí)現(xiàn)在flink-javamodule中
DataStram API
  1. 流式處理,其結(jié)構(gòu)封裝實(shí)現(xiàn)輸入流的處理,其也實(shí)現(xiàn)了豐富的函數(shù)支持
  2. 所有的操作為StreamOperator的子類(lèi),實(shí)現(xiàn)具體邏輯,比如Join邏輯是在IntervalJoinOperator中實(shí)現(xiàn)的
  3. DataStream的實(shí)現(xiàn)在flink-streaming-java中

七. Flink 時(shí)間語(yǔ)義

A. Time時(shí)間
  • Event Time:事件時(shí)間戳可以從每個(gè)event中提取。在事件時(shí)間,時(shí)間的進(jìn)展取決于數(shù)據(jù),而不取決于任何掛鐘。事件時(shí)間程序必須指定如何生成事件時(shí)間水印,這是在事件時(shí)間中發(fā)出進(jìn)展信號(hào)的機(jī)制。
  • Ingestion Time:攝入時(shí)間是事件進(jìn)入Flink的時(shí)間。在內(nèi)部,攝取時(shí)間與事件時(shí)間很相似,但是具有自動(dòng)時(shí)間戳分配和自動(dòng)水印生成。
  • Processing Time:處理時(shí)間是指執(zhí)行相應(yīng)操作(算子)的機(jī)器的系統(tǒng)時(shí)間。它提供了最佳的性能和最低的延遲。與事件時(shí)間相比,攝取時(shí)間程序不能處理任何無(wú)序的事件或延遲的數(shù)據(jù),但程序不必指定如何生成水印。
B. 事件時(shí)間處理
  1. Timestamps
  • Flink事件時(shí)間流應(yīng)用程序處理的所有記錄都必須帶有時(shí)間戳。
  • 隨著流的前進(jìn),流記錄的時(shí)間戳大致是升序?;旧显谒袑?shí)際案列中時(shí)間戳都會(huì)出現(xiàn)一定程度的亂序。
  • 當(dāng)Flink以事件時(shí)間模式處理數(shù)據(jù)流時(shí),它基于記錄的時(shí)間戳評(píng)估基于時(shí)間的算子(eg. 窗口算子,會(huì)在事件時(shí)間超過(guò)窗口結(jié)束邊界時(shí)觸發(fā)計(jì)算推動(dòng)進(jìn)度前進(jìn))。Flink將時(shí)間戳編碼為16字節(jié)長(zhǎng)的值,并將它們作為元數(shù)據(jù)附加到記錄中。
  1. Watermarks
  • Watermarks 是一種全局進(jìn)度度量,它指示我們確信不會(huì)再出現(xiàn)延遲事件的時(shí)間點(diǎn)。即:Watermarks 時(shí)間 T < 接下來(lái)接收到的事件時(shí)間T'
  • Watermarks 允許我們?cè)诮Y(jié)果的精確性和延遲之間做出取舍。
  • Watermarks 必須是單調(diào)遞增的。
  • 對(duì)于事件時(shí)間窗口和處理無(wú)序事件的算子來(lái)說(shuō),Watermarks 都是必不可少的。一旦接收到 Watermarks,算子就會(huì)被通知某個(gè)時(shí)間間隔內(nèi)的所有時(shí)間戳都已被觀察到,并且要么觸發(fā)計(jì)算,要么接收事件。
C. 分配時(shí)間戳與生成水位線
  • 水位線用于告訴算子不必再等待那些時(shí)間戳小于或等于水位線的事件。
  • 水位線和時(shí)間戳都是通過(guò)自1970-01-01 00:00:00 以來(lái)的毫秒數(shù)指定。
  • 通過(guò)SourceFunction分配時(shí)間戳及生成水位線
  • 時(shí)間戳分配及水位線生成器
    • 周期性水位分配器 : AssignerWithPeriodicWatermarks (默認(rèn) 200ms)
      • AscendingTimestampExtractor:?jiǎn)握{(diào)遞
      • BoundedOutOfOrdernessTimestampExtractor(maxOutOfOrderness):最大允許的亂序時(shí)間
    • 定點(diǎn)水位分配器: AssignerWithPunctuatedWatermarks

PS: 時(shí)間蹉分配與水位線的生成盡可能的接近數(shù)據(jù)源。如果某些初始化的過(guò)濾或者其他轉(zhuǎn)換操作不會(huì)引起元素的重新分發(fā),可以考慮在分配時(shí)間戳之前使用它。重新分發(fā)可能會(huì)導(dǎo)致數(shù)據(jù)亂序 (keyBy())。

D. 多輸入流Watermark

一個(gè)任務(wù)為它每個(gè)輸入分區(qū)都維護(hù)一個(gè)分區(qū)水位線。當(dāng)收到某個(gè)分區(qū)傳來(lái)的水位線后,會(huì)以當(dāng)前值和接收值較大的去更新相應(yīng)分區(qū)水位線值。然后,任務(wù)會(huì)把事件時(shí)間時(shí)鐘調(diào)整為所有分區(qū)水位線中最小的值。
對(duì)于有多條輸入流的算子(union, coFlatMap),它們的任務(wù)也是利用全部分區(qū)最小值來(lái)計(jì)算事件時(shí)間時(shí)鐘的。

E. 處理遲到數(shù)據(jù)
  • 遲到是指元素到達(dá)算子后,本應(yīng)參加與貢獻(xiàn)的算子已經(jīng)計(jì)算完成。在事件時(shí)間窗口算子的環(huán)境下,如果事件時(shí)間到達(dá)算子時(shí)窗口分配器為其分配的窗口已經(jīng)因?yàn)樗阕铀痪€超過(guò)了它的結(jié)束時(shí)間而計(jì)算完畢,那么事件就是遲到的。
  • 通過(guò)比較事件時(shí)間戳和當(dāng)前水位線來(lái)識(shí)別遲到事件。
  1. 丟棄事件:事件時(shí)間窗口默認(rèn)行為
  2. 重定向遲到事件:sideOutputLateData(OutputTag<T> outputTag) ;getSideOutput(OutputTag<T> outputTag)
  3. 基于遲到事件更新結(jié)果:對(duì)不完整的結(jié)果進(jìn)行重新計(jì)算并發(fā)出更新
    • 算子中保存用于再次計(jì)算結(jié)果的狀態(tài)
    • 受結(jié)果更新影響的下游算子或者外部系統(tǒng)能夠處理更新
  4. 指定延遲容忍度:allowedLateness, 窗口算子在水位線超過(guò)窗口結(jié)束時(shí)間戳之后不會(huì)立即刪除窗口,而是會(huì)將窗口繼續(xù)保留該延遲容忍度時(shí)間。在這段額外時(shí)間內(nèi)到達(dá)的遲到元素會(huì)像按時(shí)到達(dá)的元素一樣交給觸發(fā)器處理。水位線超過(guò)窗口時(shí)間加延遲容忍度間隔,窗口才會(huì)被刪除,此后所有的遲到數(shù)據(jù)都會(huì)被丟棄。

水位線、延遲及完整性

  • 水位線用于平衡延遲和結(jié)果的完整性
  • 水位線遠(yuǎn)落后于已處理記錄的時(shí)間蹉,導(dǎo)致延遲增大,數(shù)據(jù)準(zhǔn)確性較高。
  • 生成的水位線過(guò)于緊迫,水位線可能大于部分后來(lái)的數(shù)據(jù)的時(shí)間蹉,計(jì)算可能已經(jīng)觸發(fā),導(dǎo)致數(shù)據(jù)不夠完整和精確,但具體較低的延遲。

八. flink 消息投遞機(jī)制

At most once 至多一次:隨意丟棄事件,沒(méi)有保障
At least once 至少一次:不丟失事件,事件可能會(huì)被處理多次。源頭會(huì)在緩沖區(qū)需要支持重放
Exactly once 精確一次:不丟失事件,每個(gè)事件對(duì)于內(nèi)部狀態(tài)的更新都只有一次。數(shù)據(jù)重放。(檢查點(diǎn))

九. checkPoint檢查點(diǎn)和savePoint保存點(diǎn)

Flink 狀態(tài)
只有在每一個(gè)單獨(dú)的事件上進(jìn)行轉(zhuǎn)換操作的應(yīng)用才不需要狀態(tài),換言之,每一個(gè)具有一定復(fù)雜度的流處理應(yīng)用都是有狀態(tài)的。任何運(yùn)行基本業(yè)務(wù)邏輯的流處理應(yīng)用都需要在一定時(shí)間內(nèi)存儲(chǔ)所接收的事件或中間結(jié)果,以供后續(xù)的某個(gè)時(shí)間點(diǎn)(例如收到下一個(gè)事件或者經(jīng)過(guò)一段特定時(shí)間)進(jìn)行訪問(wèn)并進(jìn)行后續(xù)處理。

checkPoint

  • 周期性生成,根據(jù)配置的策略自動(dòng)丟棄
  • 應(yīng)用被手動(dòng)停止后,檢查點(diǎn)會(huì)被刪除。
  • 可以通過(guò)配置來(lái)保留 checkpoint,這些被保留的 checkpoint 在作業(yè)失敗或取 消時(shí)不會(huì)被清除
  • 保證應(yīng)用在出現(xiàn)故障時(shí)可以順利重啟。

savePoint

  • 用戶(hù)觸發(fā),也可周期性生成
  • Flink 不會(huì)自動(dòng)清理
  • 擴(kuò)縮容,遷移,暫停應(yīng)用,應(yīng)用狀態(tài)歸檔,回到過(guò)去

Flink checkPoint算法

  • Flink 使用 Chandy-Lamport algorithm 算法的一種變體,稱(chēng)為異步 barrier 快照(asynchronous barrier snapshotting)
  • 當(dāng) checkpoint coordinator(job manager 的一部分)指示 task manager 開(kāi)始 checkpoint 時(shí),它會(huì)讓所有 sources 記錄它們的偏移量,并將編號(hào)的 checkpoint barriers 插入到它們的流中。這些 barriers 流經(jīng) job graph,標(biāo)注每個(gè) checkpoint 前后的流部分。
    image.png

    image.png

    checkPoint對(duì)性能的影響
  • Flink 檢查點(diǎn)算法能夠在不停止整個(gè)應(yīng)用的情況下為流式應(yīng)用生成一致的分布式檢查點(diǎn),但是會(huì)增加應(yīng)用的處理延遲
  • 將狀態(tài)存入檢查點(diǎn)的過(guò)程中,會(huì)處于阻塞狀態(tài),此時(shí)的輸入會(huì)進(jìn)入緩沖區(qū),由于狀態(tài)可能非常大,還可能通過(guò)網(wǎng)絡(luò)寫(xiě)入遠(yuǎn)程存儲(chǔ)系統(tǒng),因此會(huì)持續(xù)很長(zhǎng)一段時(shí)間。
  • 檢查點(diǎn)由狀態(tài)后端負(fù)責(zé)生成,具體的拷貝過(guò)程由狀態(tài)后端決定。文件系統(tǒng)狀態(tài)后端和RocksDB狀態(tài)后端支持異步生成檢查點(diǎn)。當(dāng)狀態(tài)后端將當(dāng)前狀態(tài)的本地拷貝完成后,任務(wù)就可以繼續(xù)執(zhí)行。后臺(tái)進(jìn)程會(huì)異步的將本地狀態(tài)快照拷貝到遠(yuǎn)程,然后在完成檢查點(diǎn)后通知任務(wù)。異步生成檢查點(diǎn)可以有效的降低任務(wù)恢復(fù)數(shù)據(jù)處理所需要的時(shí)間。 RocksDB還可以進(jìn)行增量生成檢查點(diǎn),可以降低傳輸?shù)臄?shù)據(jù)量。
  • 對(duì)于那些需要極低延遲且能容忍至少一次狀態(tài)保障的應(yīng)用,可以配置讓Flink在分隔符對(duì)齊的過(guò)程中不緩存那些已經(jīng)收到分隔符所對(duì)應(yīng)分區(qū)的記錄,而是直接處理他。

Task 故障恢復(fù)之重啟策略

  1. 固定間隔策略 (Fixed delay)
  • restart-strategy: fixed-delay
  • restart-strategy.fixed-delay.attempts: 3
  • restart-strategy.fixed-delay.delay: 10 s
  1. 失敗率策略 (Failure rate)
  • restart-strategy: failure-rate
  • restart-strategy.failure-rate.max-failures-per-interval: 3
  • restart-strategy.failure-rate.failure-rate-interval: 5 min
  • restart-strategy.failure-rate.delay: 10 s
  1. 回退重啟策略(Fallback Restart)
  • 使用集群定義的重啟策略。
  1. 無(wú)重啟 (No restart)
  • restart-strategy: none
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容