基于Apache Flink的流處理

第1章 狀態(tài)化流處理概述。

1.1 傳統(tǒng)的數(shù)據(jù)處理框架

事務(wù)型處理

企業(yè)在日常業(yè)務(wù)運(yùn)營(yíng)過程中會(huì)用到各類基于web的應(yīng)用,通常是業(yè)務(wù)系統(tǒng),比如訂單、客戶系統(tǒng)等等,通常一個(gè)應(yīng)用對(duì)于1個(gè)或多個(gè)數(shù)據(jù)庫(kù),應(yīng)用通過執(zhí)行遠(yuǎn)程數(shù)據(jù)庫(kù)系統(tǒng)的事務(wù)來讀取或更新狀態(tài)。

分析型處理

存儲(chǔ)于不同事務(wù)類型數(shù)據(jù)系統(tǒng)中的數(shù)據(jù),可以為企業(yè)提供業(yè)務(wù)運(yùn)營(yíng)相關(guān)的分析見解,通常是將數(shù)據(jù)從業(yè)務(wù)系統(tǒng)的數(shù)據(jù)庫(kù)中復(fù)制到數(shù)倉(cāng),然后再進(jìn)行分析和查詢,這個(gè)過程稱為ETL。

ETL:向數(shù)倉(cāng)拷貝數(shù)據(jù)的過程,提取-轉(zhuǎn)換-加載(Extract-Transform-Load)。

1.2 歷史演變

Lambda架構(gòu)

Lambda架構(gòu)是一種用于處理大規(guī)模實(shí)時(shí)數(shù)據(jù)的架構(gòu)模式,它結(jié)合了批處理和流處理的優(yōu)勢(shì)。在Lambda架構(gòu)中,數(shù)據(jù)流被同時(shí)發(fā)送到批處理層和提速層,然后將它們的結(jié)果合并以提供全面的分析視圖。

Lambda架構(gòu)的核心組件包括:

  • 批處理層(Batch Layer):批處理層負(fù)責(zé)處理大規(guī)模的歷史數(shù)據(jù)。它使用批處理作業(yè)來處理存儲(chǔ)在持久化數(shù)據(jù)存儲(chǔ)中的數(shù)據(jù),生成批處理視圖。這些批處理作業(yè)可以使用分布式計(jì)算框架(如Hadoop MapReduce或Spark)來執(zhí)行。
  • 提速層(Speed Layer):速度層負(fù)責(zé)處理實(shí)時(shí)數(shù)據(jù)流。它使用流處理引擎(如Apache Flink、Apache Storm或Spark Streaming)來處理數(shù)據(jù)流,并生成實(shí)時(shí)視圖。速度層通常采用近似實(shí)時(shí)的處理方式,以保證低延遲和高吞吐量。
  • 服務(wù)層(Serving Layer):服務(wù)層負(fù)責(zé)合并批處理層和速度層的結(jié)果,提供一致的查詢接口。它將批處理視圖和實(shí)時(shí)視圖進(jìn)行合并,并將結(jié)果存儲(chǔ)在可查詢的數(shù)據(jù)存儲(chǔ)系統(tǒng)中(如HBase或Cassandra)。這樣,用戶可以通過查詢接口實(shí)時(shí)地獲取數(shù)據(jù)分析結(jié)果。

Lambda架構(gòu)的優(yōu)勢(shì)在于它能夠處理大規(guī)模的實(shí)時(shí)數(shù)據(jù),并提供實(shí)時(shí)和全面的數(shù)據(jù)分析視圖。批處理層處理歷史數(shù)據(jù),可以進(jìn)行復(fù)雜的分析和計(jì)算,而提速層處理實(shí)時(shí)數(shù)據(jù)流,可以提供近似實(shí)時(shí)的結(jié)果。通過將批處理層和提速層的結(jié)果合并,Lambda架構(gòu)能夠提供一致和全面的數(shù)據(jù)分析能力。

1.3 狀態(tài)化流處理

狀態(tài)化流處理是一種設(shè)計(jì)模式:

  • 在狀態(tài)化流處理中,"狀態(tài)化"指的是將數(shù)據(jù)處理過程中的狀態(tài)顯式地管理和維護(hù)。傳統(tǒng)的流處理模型是無狀態(tài)的,每個(gè)事件都獨(dú)立地處理,沒有記憶或跟蹤之前的事件。而在狀態(tài)化流處理中,系統(tǒng)會(huì)維護(hù)一個(gè)狀態(tài),用于存儲(chǔ)和更新事件流的上下文信息。
  • 狀態(tài)化流處理適用于需要考慮事件之間的關(guān)系和上下文的場(chǎng)景。通過維護(hù)狀態(tài),系統(tǒng)可以跟蹤和處理事件流中的狀態(tài)變化,從而更好地理解和處理數(shù)據(jù)。
  • 狀態(tài)可以是簡(jiǎn)單的變量,也可以是更復(fù)雜的數(shù)據(jù)結(jié)構(gòu),取決于具體的應(yīng)用場(chǎng)景。狀態(tài)可以用于聚合、過濾、轉(zhuǎn)換等操作,以及用于實(shí)時(shí)計(jì)算、模式檢測(cè)、窗口聚合等高級(jí)分析。
  • 通過狀態(tài)化流處理,我們可以實(shí)現(xiàn)更復(fù)雜和有狀態(tài)的數(shù)據(jù)處理邏輯,從而能夠處理更豐富的實(shí)時(shí)數(shù)據(jù)分析和應(yīng)用場(chǎng)景。

通過定期將應(yīng)用狀態(tài)的一致性檢查點(diǎn)寫入遠(yuǎn)程持久化存儲(chǔ)實(shí)現(xiàn)狀態(tài)維護(hù),事件日志負(fù)責(zé)存儲(chǔ)事件流并將其分布式化,由于日志是追加形式,故事件的順序不會(huì)因向消費(fèi)者發(fā)布而改變。在出現(xiàn)故障時(shí),F(xiàn)link以此來進(jìn)行失敗恢復(fù),此外還可應(yīng)用于應(yīng)用更新、Bug修復(fù)、結(jié)果修正、集群遷移或針對(duì)不同版本應(yīng)用執(zhí)行A/B測(cè)試。

1.4 有狀態(tài)的流處理應(yīng)用

  • 事件驅(qū)動(dòng)型,例如實(shí)時(shí)推薦、異常檢測(cè)、欺詐識(shí)別。
  • 數(shù)據(jù)管道,例如實(shí)時(shí)計(jì)算。
  • 流式分析,例如實(shí)時(shí)監(jiān)控系統(tǒng)可以對(duì)傳感器數(shù)據(jù)流進(jìn)行分析,以檢測(cè)異常情況或趨勢(shì)變化;實(shí)時(shí)廣告投放系統(tǒng)可以根據(jù)用戶行為實(shí)時(shí)調(diào)整廣告內(nèi)容。

1.5 Flink特性

  • 提供精確1次( exactly-once )的狀態(tài)一致性保障。
  • 在每秒處理數(shù)百萬(wàn)條事件的同時(shí)保持毫秒級(jí)延遲?;贔link應(yīng)用可以擴(kuò)展到數(shù)千核心之上。
  • 支持高可用性配置(無單點(diǎn)失效), Kubernetes、YARN、Apache Mesos緊密集成,快速故障恢復(fù),動(dòng)態(tài)擴(kuò)縮容作業(yè)等?;谏鲜鎏攸c(diǎn),它可以 24 小時(shí)運(yùn)行流式應(yīng)用,幾乎無須停機(jī)。
  • 允許在不丟失應(yīng)用狀態(tài)的前提下更新作業(yè)的程序代碼,或進(jìn)行跨Flink集群的作業(yè)遷移。
  • 提供了詳細(xì)、可自由定制的系統(tǒng)及應(yīng)用指標(biāo)( metrics )集合,用于提前定位和響應(yīng)問題。

第2章 流處理基礎(chǔ)

2.1 DataFlow圖

有向圖,每個(gè)節(jié)點(diǎn)都稱為算子,表示計(jì)算;邊表示數(shù)據(jù)依賴關(guān)系。沒有輸入端的算子稱為數(shù)據(jù)源,數(shù)據(jù)源可以從TCP套接字、文件、Kafka主題或傳感器數(shù)據(jù)接口中獲取數(shù)據(jù),將原始數(shù)據(jù)轉(zhuǎn)換成適合后續(xù)處理的格式;沒有輸出端的算子稱為數(shù)據(jù)匯,其寫入的目標(biāo)可以是文件、數(shù)據(jù)庫(kù)、消息隊(duì)列或監(jiān)控接口等。

數(shù)據(jù)并行:統(tǒng)一操作的多個(gè)任務(wù)執(zhí)行在不同的數(shù)據(jù)子集上。
任務(wù)并行:不同算子的任務(wù)(基于相同或不同的數(shù)據(jù))并行計(jì)算。

2.2 數(shù)據(jù)交換策略

  • 轉(zhuǎn)發(fā)策略,發(fā)送端和接收端一對(duì)一地進(jìn)行數(shù)據(jù)傳輸。
  • 廣播策略,每個(gè)數(shù)據(jù)項(xiàng)都會(huì)發(fā)往下游算子的全部并行任務(wù),該策略會(huì)把數(shù)據(jù)復(fù)制多份且涉及網(wǎng)絡(luò)通信,因此代價(jià)比較高。
  • 基于鍵值的策略,根據(jù)某一鍵的值屬性對(duì)數(shù)據(jù)分區(qū),保證鍵相同的數(shù)據(jù)項(xiàng)交由同一任務(wù)處理。
  • 隨機(jī)策略,隨機(jī)均勻分配。

2.3 流處理中的基本概念

延遲和吞吐

延遲指處理一個(gè)事件所需的時(shí)間,吞吐是衡量系統(tǒng)處理能力(處理速率)的指標(biāo)。

窗口操作

是指創(chuàng)建一些稱為 “ 桶 ” 的有限事件集合將流數(shù)據(jù)劃分為有限大小的時(shí)間段或數(shù)據(jù)塊,對(duì)桶內(nèi)的數(shù)據(jù)進(jìn)行運(yùn)算的方式。常見的窗口類型有滑動(dòng)窗口、會(huì)話窗口。

處理時(shí)間

當(dāng)前算子的本地時(shí)鐘時(shí)間。

事件時(shí)間

數(shù)據(jù)流中事件實(shí)際發(fā)生的時(shí)間,它以附加在數(shù)據(jù)流中事件的時(shí)間戳為依據(jù)。

水位線

全局進(jìn)度指標(biāo),表示我們確信不會(huì)再有延遲事件到來的某個(gè)時(shí)間點(diǎn)。

狀態(tài)和一致性模型

有狀態(tài)算子要考慮到以下幾點(diǎn):

  • 狀態(tài)管理;
  • 狀態(tài)劃分;
  • 狀態(tài)恢復(fù)。
任務(wù)故障

對(duì)于流中的每個(gè)事件,任務(wù)都要執(zhí)行以下步驟:

  • 接收事件并存在本地緩沖區(qū);
  • 選擇性地更新內(nèi)部狀態(tài);
  • 產(chǎn)生輸出記錄;

其中任一步驟都有可能發(fā)生故障,需要結(jié)果保障處理。

結(jié)果保障
  • 至多一次
  • 至少一次
  • 精確一次
  • 端到端的精確一次

第3章 Apache Flink架構(gòu)

3.1 Flink組件

Flink各組件之間的交互過程
  • 作業(yè)管理器(JobManager):負(fù)責(zé)申請(qǐng)資源,任務(wù)分發(fā),任務(wù)調(diào)度執(zhí)行,checkpoint的協(xié)調(diào)執(zhí)行;可以搭建HA,雙master
  • 資源管理器(ResourceManager):負(fù)責(zé)管理任務(wù)管理器的插槽 slot
  • 任務(wù)管理器(TaskManager):負(fù)責(zé)任務(wù)的執(zhí)行,基于dataflow劃分出的task;與jobmanager保持心跳,匯報(bào)任務(wù)狀態(tài)
  • 分發(fā)器(Dispatcher):為應(yīng)用提交提供了rest接口;當(dāng)一個(gè)應(yīng)用被提交執(zhí)行時(shí),分發(fā)器就會(huì)啟動(dòng)并且將應(yīng)用移交給一個(gè)jobmanager

3.2 應(yīng)用部署

框架模式
  • Flink 應(yīng)用打包成 JAR 文件;
  • 通過客戶端提交到運(yùn)行的服務(wù),如:Dispatch、JobManager、YARN;
  • 運(yùn)行的服務(wù)接收 Flink 應(yīng)用,并確保其執(zhí)行
庫(kù)模式
  • Flink 會(huì)綁定到一個(gè)特定的容器鏡像(Docker)中;
  • 鏡像中包含著運(yùn)行 JobManager 以及 ResourceManager 代碼;
  • 容器啟動(dòng)后會(huì)自動(dòng)加載 JobManager 和 ResourceManager,并將綁定的作業(yè)提交執(zhí)行;
  • 另一個(gè)和作業(yè)無關(guān)的鏡像負(fù)責(zé)部署 TaskManager;
  • 容器通過鏡像啟動(dòng)后會(huì)自動(dòng)運(yùn)行 TaskManager,TaskManager 向 ResourceManager 注冊(cè);
  • 外部資源管理框架好處:負(fù)責(zé)鏡像啟動(dòng),并在發(fā)生故障時(shí)候容器能夠重啟

3.3 任務(wù)執(zhí)行

算子、任務(wù)及處理槽
  • 算子并行度是指在并行計(jì)算中,一個(gè)算子(操作)被分成多個(gè)并行任務(wù)執(zhí)行的數(shù)量。并行度的大小取決于可用的計(jì)算資源、問題的性質(zhì)以及并行算法的設(shè)計(jì)。
  • 將任務(wù)以切片的形式調(diào)度至處理槽中有一個(gè)好處: TaskManager 中的多個(gè)任務(wù)可以在同一進(jìn)程內(nèi)高效地執(zhí)行數(shù)據(jù)交換而無須訪問網(wǎng)絡(luò)。然而,任務(wù)過于集中也會(huì)使TaskManager負(fù)載變高,繼而可能導(dǎo)致性能下降。
  • TaskManager 會(huì)在同 JVM 進(jìn)程內(nèi)以多線程的方式執(zhí)行任務(wù)。和獨(dú)立進(jìn)程相比,線程更加輕量并且通信開銷更低,但無法嚴(yán)格地將任務(wù)彼此隔離。因此只要有 個(gè)任務(wù)運(yùn)行異常,就有可能“殺死” TaskManager 進(jìn)程,導(dǎo)致它上面運(yùn)行的所有任務(wù)都停止。如果將每個(gè) TaskManager 成只有 個(gè)處理槽,則可以限制應(yīng)用在 TaskManag 級(jí)別進(jìn)行隔離,即每個(gè) TaskManager 只運(yùn)行單個(gè)應(yīng)用的任務(wù)。

3.4 高可用性設(shè)置

流式應(yīng)用通常都會(huì)設(shè)計(jì)成 7 × 24 小時(shí)運(yùn)行,因此對(duì)于它很重要的點(diǎn)是:即便內(nèi)部進(jìn)程發(fā)生故障時(shí)也不能終止運(yùn)行。為了從故障中恢復(fù),系統(tǒng)首先要重啟故障進(jìn)程,隨后需要重啟應(yīng)用并恢復(fù)其狀態(tài)。

  • TaskManager 故障
    如果任務(wù)管理器發(fā)生故障,整個(gè)系統(tǒng)的可用處理槽會(huì)對(duì)應(yīng)減少。這時(shí)作業(yè)管理器會(huì)向資源管理器申請(qǐng)更多的處理槽,只有申請(qǐng)成功后應(yīng)用才會(huì)重啟,因此應(yīng)用的重啟策略決定了任務(wù)管理器的故障解決策略。
  • JobManager 故障
    任務(wù)管理器用于控制流式應(yīng)用執(zhí)行以及保存過程中的元數(shù)據(jù),因此任務(wù)管理器發(fā)生故障將導(dǎo)致流式應(yīng)用無法繼續(xù)處理數(shù)據(jù)。為了解決該問題,F(xiàn)link提供了高可用模式,支持在原 JobManager 消失的情況下將作業(yè)的管理職責(zé)及元數(shù)據(jù)遷移到另一個(gè)JobManager。


    Flink高可用模式

    Flink 中的高可用模式是基于 ZooKeeper 來完成的,它在 Flink 中主要用于“領(lǐng)導(dǎo)”選舉以及持久且高可用的數(shù)據(jù)存儲(chǔ)。 這種模式下,JobManager 會(huì)將 JobGraph 以及全部所需的元數(shù)據(jù)(例如應(yīng)用的 JAR 文件)寫入遠(yuǎn)程持久化存儲(chǔ)系統(tǒng)中。此外,JobManager 還會(huì)將存儲(chǔ)位置的路徑地址寫入 ZooKeeper 的數(shù)據(jù)存儲(chǔ)。
    JobManager 發(fā)生故障時(shí),其下應(yīng)用的所有任務(wù)都會(huì)自動(dòng)取消。新接手工作JobManager 會(huì)執(zhí)行以下步驟:
    1. 向 ZooKeeper 請(qǐng)求存儲(chǔ)位置,以獲取最新檢查點(diǎn)在遠(yuǎn)程存儲(chǔ)的狀態(tài)句柄。
    2. ResourceManager 申請(qǐng)?zhí)幚聿蹃砝^續(xù)執(zhí)行應(yīng)用
    3. 重啟應(yīng)用并利用最近 次檢查點(diǎn)重置任務(wù)狀態(tài)。
    如果是在容器環(huán)境(如 Kubernetes )中以庫(kù)模式部署運(yùn)行應(yīng)用,容器編排服務(wù)通常會(huì)自動(dòng)重啟故障的 JobManager 容器。當(dāng)運(yùn)行在 YARN Mesos 上面時(shí), Flink 的其余進(jìn)程會(huì)觸發(fā) JobManager 進(jìn)程重啟。Flink 沒有針對(duì)獨(dú)立集群模式提供重啟故障進(jìn)程的工具,因此有必要運(yùn)行一些后備 JobManager 來接管故障進(jìn)程的工作,對(duì)于 TaskManager 也是同樣。

3.5 Flink中的數(shù)據(jù)傳輸

TaskManager之間的數(shù)據(jù)傳輸
  • 在算子處理完數(shù)據(jù)后,為了不造成太大的網(wǎng)絡(luò)壓力,不會(huì)馬上發(fā)送,會(huì)先收集到緩沖區(qū)中,以批次形式發(fā)送
  • 每個(gè)TaskManager都有網(wǎng)絡(luò)緩沖池(每個(gè)緩沖默認(rèn) 32KB大小),用于不同機(jī)器數(shù)據(jù)傳輸
  • 如果接收端和發(fā)送端位于同一臺(tái)機(jī)器內(nèi),序列化先放入緩沖區(qū),緩沖區(qū)完畢放到隊(duì)列中,接收任務(wù)獲取數(shù)據(jù)再反序列化
  • 如果發(fā)送端和接收端不在同一個(gè)機(jī)器,放入緩存后,先發(fā)送到TaskManager的網(wǎng)絡(luò)緩沖池中,再進(jìn)行發(fā)送

通過網(wǎng)絡(luò)連接逐條發(fā)送記錄不但低效,還會(huì)導(dǎo)致很多額外開銷。若想充分利用網(wǎng)絡(luò)連接帶寬,就需要對(duì)數(shù)據(jù)進(jìn)行緩沖。在流處理環(huán)境下,緩沖的一個(gè)明顯缺點(diǎn)是會(huì)增加延遲,因?yàn)橛涗浭紫纫占骄彌_區(qū)中而不會(huì)立即發(fā)送。

基于信用值的流量控制

Flink實(shí)現(xiàn)了一個(gè)基于信用值的流量控制機(jī)制,它的工作原理如下:接收任務(wù)會(huì)給發(fā)送任務(wù)授予一定的信用值,其實(shí)就是保留一些用來接收它數(shù)據(jù)的網(wǎng)絡(luò)緩沖。一旦發(fā)送端收到信用通知,就會(huì)在信用值所限定的范圍 盡可能多傳輸緩沖數(shù)據(jù),并會(huì)附帶上積壓量(已經(jīng)填滿準(zhǔn)備傳輸?shù)木W(wǎng)絡(luò)緩沖數(shù)目)大小。接收端使用保留的緩沖來處理收到的數(shù)據(jù),同時(shí)依據(jù)各發(fā)送端的積壓量信息來計(jì)算所有相連的發(fā)送端在下一輪的信用優(yōu)先級(jí)。

由于發(fā)送端可以在接收端有足夠資源時(shí)立即傳輸數(shù)據(jù),所以基于信用值量控制有效降低延遲。 此外,信用值的授予是根據(jù)各發(fā)送端 數(shù)據(jù)積壓量來完成的,因此該機(jī)制還能在出現(xiàn)數(shù)據(jù)傾斜( data skew )時(shí)有效分配網(wǎng)絡(luò)資源。

任務(wù)鏈接
滿足任務(wù)鏈接條件的算子流水線

任務(wù)鏈接的前提條件是多個(gè)算子必須有相同的并行度且通過本地轉(zhuǎn)發(fā)通道相連,F(xiàn)link 在默認(rèn)情況下會(huì)開啟任務(wù)鏈接。

在任務(wù)鏈接模式下,多個(gè)算子的函數(shù)被“融合”到同個(gè)任務(wù)中,在同個(gè)線程內(nèi)執(zhí)行。函數(shù)生成的記錄只需通過簡(jiǎn)單方法調(diào)用就可以分別發(fā)往各自的下游函數(shù),因此函數(shù)之間的記錄傳輸基本上不存在序列化及通信開銷。

3.6 事件時(shí)間處理

在事件時(shí)間模式下, Flink 流式應(yīng)用處理的所有記錄都必須包含時(shí)間戳和水位線。

一般情況下只要保證流記錄的時(shí)間戳?xí)S著數(shù)據(jù)流的前進(jìn)大致遞增即可。

水位線擁有兩個(gè)基本屬性:1、必須單調(diào)遞增;2、和記錄的時(shí)間戳存在聯(lián)系。水位線的意義之一在于它允許應(yīng)用控制結(jié)果的完整性和延遲。

3.7 算子狀態(tài)

算子狀態(tài)

算子狀態(tài)的作用域是某個(gè)算子任務(wù),這意味著所有在同一個(gè)并行任務(wù)之內(nèi)的記錄都能訪問到相同的狀態(tài)。算子狀態(tài)不能通過其他任務(wù)訪問,無論該任務(wù)是否來自相同算子。
原語(yǔ)有三類:

  • 列表狀態(tài)
  • 聯(lián)合列表狀態(tài)
  • 廣播狀態(tài)
鍵值分區(qū)狀態(tài)

原語(yǔ)有三類:

  • 單值狀態(tài)
  • 列表狀態(tài)
  • 映射狀態(tài)
狀態(tài)后端

一個(gè)可插拔組件,主要負(fù)責(zé)兩件事:本地狀態(tài)管理和將狀態(tài)、以檢查點(diǎn)的形式寫入遠(yuǎn)程存儲(chǔ)。

3.8 算子擴(kuò)縮容

鍵控狀態(tài)

帶有鍵值分區(qū)狀態(tài)的算子在擴(kuò)縮容時(shí)會(huì)根據(jù)新的任務(wù)數(shù)量對(duì)鍵值重新分區(qū)。
為了降低狀態(tài)在不同任務(wù)之間遷移的必要成本, Flink 不會(huì)對(duì)單獨(dú)的鍵值實(shí)施再分配,而是會(huì)把所有鍵值分為不同的鍵值組。每個(gè)鍵值組都包含了部分鍵值, Flink 以此為單位把鍵值分配給不同任務(wù)。

算子狀態(tài)

列表狀態(tài):把所有狀態(tài)的列表?xiàng)l目收集起來,均勻分配給新的任務(wù)
聯(lián)合列表狀態(tài):將狀態(tài)列表的全部條目廣播到全部的任務(wù),由任務(wù)決定去留
廣播狀態(tài):把狀態(tài)直接拷貝到新的任務(wù)上

3.9 Flink檢查點(diǎn)算法

基于Chandy-Lamport分布式快照算法來實(shí)現(xiàn)。該算法不會(huì)暫停整個(gè)應(yīng)用,而是會(huì)把任務(wù)處理和檢查點(diǎn)分離,這樣在部分任務(wù)持久化狀態(tài)過程中,其他任務(wù)還可以繼續(xù)執(zhí)行。
檢查點(diǎn)的原理步驟:

  1. 由JobManager向Source數(shù)據(jù)源任務(wù)生成一個(gè)新的檢查點(diǎn)編號(hào),Source算子接收到信息后,暫停發(fā)出記錄,利用狀態(tài)后端觸發(fā)生成本地狀態(tài)檢查點(diǎn),狀態(tài)后端保存完檢查點(diǎn)后通知任務(wù),隨后任務(wù)向JobManager發(fā)送確認(rèn)信息,隨后恢復(fù)正常工作,然后生成特殊的CheckPoint Barrier記錄,以廣播的形式發(fā)送到下游任務(wù)。
  2. 當(dāng)下游Transform算子接收到新的檢查點(diǎn)分割符號(hào),會(huì)暫停處理并且緩存當(dāng)前流的數(shù)據(jù),等待接收其他分區(qū)的檢查點(diǎn)分隔符,所有分隔符到達(dá)后,通知狀態(tài)后端生成檢查點(diǎn),保存通知JobManager后,向下游發(fā)送檢查點(diǎn)分隔符CheckPoint Barrier后,繼續(xù)處理數(shù)據(jù)。
  3. Sink算子接收到分隔符后依次等待分隔符到齊后,生成快照并且寫入檢查點(diǎn),向JobManager確認(rèn)。
  4. 當(dāng)JobManager確認(rèn)已接受所有應(yīng)用任務(wù)返回檢查點(diǎn)確認(rèn)消息后,將此次檢查點(diǎn)標(biāo)記為完成。

第4章 設(shè)置Apache Flink開發(fā)環(huán)境

Java語(yǔ)言示例見 https://github.com/streaming-with-flink/examples-java


第5章 DataStream API(1.7版本)

5.1 Hello, Flink!

//針對(duì)傳感器數(shù)據(jù)流每5秒計(jì)算一次平均溫度
public static void main(String[] args) throws Exception {
    
    // 設(shè)置執(zhí)行環(huán)境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 在應(yīng)用中使用事件時(shí)間
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // 設(shè)置生成水位線的時(shí)間間隔,系統(tǒng)默認(rèn)為200毫秒
    env.getConfig().setAutoWatermarkInterval(1000L);

    // 從流式數(shù)據(jù)源中創(chuàng)建 DataStream<SensorReading>對(duì)象
    DataStream<SensorReading> sensorData = env
        // 設(shè)置數(shù)據(jù)源,這些數(shù)據(jù)流的來驚可以是消息隊(duì)列或文件,也可以是實(shí)時(shí)生成的
        .addSource(new SensorSource())
        // 負(fù)責(zé)分配事件時(shí)間所需的時(shí)間戳和水位線
        .assignTimestampsAndWatermarks(new SensorTimeAssigner());

    DataStream<SensorReading> avgTemp = sensorData
        // 使用內(nèi)聯(lián) lambda 函數(shù)把華氏溫度轉(zhuǎn)為攝氏溫度
        .map( r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
        // 按照傳感器 id 組織數(shù)據(jù)
        .keyBy(r -> r.id)
        // 將讀數(shù)按5秒的滾動(dòng)窗口分組
        .timeWindow(Time.seconds(1))
        // 使用用戶自定義函數(shù)計(jì)算平均溫度
        .apply(new TemperatureAverager());

    // 將結(jié)果流打印到標(biāo)準(zhǔn)輸出
    avgTemp.print();

    /**
     * 以上代碼構(gòu)建執(zhí)行計(jì)劃,構(gòu)建完的計(jì)劃會(huì)被轉(zhuǎn)成 JobGrap 并根據(jù)執(zhí)行環(huán)境類型的不同提交至本地或遠(yuǎn)程的 JobManager,
     * 只有在調(diào)用 execute()方法時(shí),系統(tǒng)才會(huì)觸發(fā)程序執(zhí)行。
     * 如果是提交至遠(yuǎn)程,除 JobGraph 之外,我們還要同時(shí)提供包含應(yīng)用所需全部類和依賴的 JAR 包。
     */
    env.execute("Compute average sensor temperature");
}

5.2 轉(zhuǎn)換操作

基本轉(zhuǎn)換
  • map
  • filter
  • flatMap
基于KeyedStream的轉(zhuǎn)換
  • keyBy
  • 滾動(dòng)聚合sum()、min()、max()、minBy()、maxBy()
  • reduce
多流轉(zhuǎn)換
  • union
  • connect、coMap、coFlatMap
  • split、select
分發(fā)轉(zhuǎn)換
  • 隨機(jī) shuffle
  • 輪流 rebalance
  • 重調(diào) rescale
  • 廣播 broadcast
  • 全局 global
  • 自定義 partitionCustom

5.3 富函數(shù)

open、close

  • DataStream API 中所有的轉(zhuǎn)換函數(shù)都有對(duì)應(yīng)的富函數(shù)。
  • 富函數(shù)可以在處理第一條數(shù)據(jù)之前進(jìn)行初始化操作,獲取到一些上下文信息。

第6章 基于時(shí)間和窗口的算子

6.1 配置時(shí)間特性

在定義分布式流處理應(yīng)用程序中的時(shí)間算子操作之前,我們先了解“時(shí)間”的含義,當(dāng)你指定了一個(gè)窗口用于收集每一分鐘的bucket中產(chǎn)生的事件時(shí),如何確定每個(gè)bucket中具體包含了哪些事件呢?在DataStream API,你可以在創(chuàng)建窗口的時(shí)候使用時(shí)間特性去告知Flink如何定義時(shí)間,時(shí)間特性是StreamExecutionEnvironment的一個(gè)屬性,包括了幾種時(shí)間類型:

處理時(shí)間(Processing Time)

處理時(shí)間是指執(zhí)行相應(yīng)算子操作的機(jī)器的系統(tǒng)時(shí)間。當(dāng)流式程序按處理時(shí)間運(yùn)行時(shí),所有基于時(shí)間的操作(如時(shí)間窗口)將使用運(yùn)行相應(yīng)算子的計(jì)算機(jī)的系統(tǒng)時(shí)鐘。每小時(shí)處理時(shí)間窗口將包括系統(tǒng)時(shí)鐘指示整小時(shí)的時(shí)間之間到達(dá)特定算子的所有記錄。例如,如果應(yīng)用程序在9:15 am開始運(yùn)行,則第一個(gè)每小時(shí)處理時(shí)間窗口將包括在9:15 am和10:00 am之間處理的事件,下一個(gè)窗口將包括在10:00 am和11:00 am之間處理的事件,以此類推。處理時(shí)間是最簡(jiǎn)單的時(shí)間概念,不需要流和機(jī)器之間的協(xié)調(diào),無需依賴水位線,它提供了最佳的性能和最低的延遲。但是,在分布式和異步環(huán)境中,處理時(shí)間不能提供結(jié)果確定性,因?yàn)樗菀资艿接涗浀竭_(dá)系統(tǒng)(例如從消息隊(duì)列寫入)的速度以及數(shù)據(jù)在上下游算子之間的處理速度的影響。

事件時(shí)間(Event Time)

Event Time指的是數(shù)據(jù)流中每個(gè)元素或者每個(gè)事件自帶的時(shí)間屬性,一般是指事件發(fā)生的時(shí)間,系統(tǒng)的邏輯時(shí)間由水位線去定義。正如我們?cè)凇皶r(shí)間戳”章節(jié)中了解到的,時(shí)間戳要么在進(jìn)入數(shù)據(jù)處理管道之前就存在于數(shù)據(jù)中,要么由源函數(shù)生成,在事件時(shí)間中,時(shí)間的進(jìn)度取決于數(shù)據(jù),而不取決于任何時(shí)鐘。當(dāng)水位線聲明某個(gè)時(shí)間間隔內(nèi)的所有時(shí)間戳都已接收到了,事件時(shí)間窗口將觸發(fā)計(jì)算。理想情況下,事件時(shí)間窗口會(huì)產(chǎn)生確定性結(jié)果,即使事件發(fā)生順序混亂,窗口結(jié)果將不依賴于讀取或處理流的速度。

注入時(shí)間(Ingest Time)

將源算子操作的處理時(shí)間指定為每個(gè)接入記錄的事件時(shí)間戳,并自動(dòng)生成水位線。它是EventTime和ProcessingTime的混合體。事件的接入時(shí)間是它進(jìn)入流處理器的時(shí)間。與事件時(shí)間相比,接入時(shí)間并沒有提供太多的實(shí)際價(jià)值,因?yàn)樗荒芴峁┐_定的結(jié)果,并且具有與事件時(shí)間相近的性能。

6.2 分配時(shí)間戳和生成水位線

DataStream API提供了TimestampAssigner接口,以便在元素被接入到流應(yīng)用程序后從元素中提取時(shí)間戳。通常,時(shí)間戳分配程序是在源函數(shù)之后立即調(diào)用的,因?yàn)榇蠖鄶?shù)分配程序在生成水位線時(shí)都對(duì)元素的時(shí)間戳順序作了假設(shè)性猜想。由于元素通常是并行攝入的,所以任何導(dǎo)致Flink跨并行流分區(qū)重新分配元素的操作,都會(huì)打亂元素的時(shí)間戳順序,例如并行性更改、KeyBy()或其他引起重新分配的操作。
最好的做法是分配時(shí)間戳,并在盡可能靠近源的地方甚至在SourceFunction內(nèi)生成水位線。根據(jù)應(yīng)用場(chǎng)景,在分配時(shí)間戳之前,如果這些操作沒有引起元素的重新分配,可以對(duì)輸入流應(yīng)用執(zhí)行過濾或轉(zhuǎn)換操作。
為了確保事件時(shí)間操作按預(yù)期運(yùn)行,應(yīng)該在任何依賴于事件時(shí)間的轉(zhuǎn)換之前調(diào)用分配器,例如, 在第一個(gè)事件時(shí)間窗口之前。

周期性水位線分配器

周期性地分配水位線意味著我們指示系統(tǒng)以固定的機(jī)器時(shí)間間隔檢查事件時(shí)間的進(jìn)度,默認(rèn)時(shí)間間隔設(shè)置是200毫秒。

//  設(shè)置時(shí)間間隔為1000毫秒
env.getConfig().setAutoWatermarkInterval(1000L)

實(shí)際上,每隔5秒,F(xiàn)link就會(huì)調(diào)用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果該方法返回非空值的時(shí)間戳大于前一個(gè)水位線的時(shí)間戳,則生成新的水位線。否則,如果方法返回一個(gè)空值,或者返回的水位線的時(shí)間戳小于最后發(fā)出的水位線的時(shí)間戳,則不會(huì)生成新水位線。

定點(diǎn)水位線分配器

有時(shí)輸入流包含特殊的元組或標(biāo)記,用于指示流的進(jìn)度。對(duì)于這種情況,或者當(dāng)可以根據(jù)輸入元素的其他屬性定義水位線時(shí),F(xiàn)link提供了AssignerWithPunctuatedWatermarks接口。它定義了checkAndGetNextWatermark()方法,該方法在extractTimestamp()之后為每個(gè)事件調(diào)用。該方法可以決定是否生成新的水位線。如果方法返回的非空水位線大于最新發(fā)出的水位線,則發(fā)出新水位線。

水位線、延遲及完整性問題
  • 現(xiàn)實(shí)中沒有完美的水位線,需要進(jìn)行有根據(jù)的猜測(cè),假設(shè)性設(shè)定數(shù)據(jù)整體之間的延遲,從而在應(yīng)用程序中生成水位線。需要使用關(guān)于源、網(wǎng)絡(luò)和分區(qū)的等因素來估計(jì)處理進(jìn)度和輸入記錄延遲的上限,也就是對(duì)遲到數(shù)據(jù)的容忍度。估計(jì)就意味著有出錯(cuò)的空間,在這種情況下,生成的水位線可能是不準(zhǔn)確的,往往會(huì)造成不必要數(shù)據(jù)延遲或應(yīng)用程序延遲變大。
  • 如果生成松散的水位線(水位線遠(yuǎn)遠(yuǎn)落后于處理過的記錄的時(shí)間戳),則會(huì)增加生成結(jié)果的延遲,但是可以更大程度上保證了結(jié)果完整性。此外,狀態(tài)的大小通常會(huì)增加,因?yàn)閼?yīng)用程序需要緩沖更多的數(shù)據(jù),直到觸發(fā)計(jì)算為止。在執(zhí)行計(jì)算時(shí),我們基本可以確定所有相關(guān)的數(shù)據(jù)都是可用的。
  • 另一方面,如果你生成了非常緊密的水位線,也就是設(shè)置了一個(gè)很小的遲到時(shí)間,這些水位線可能比一些后續(xù)記錄的時(shí)間戳更大,基于時(shí)間的計(jì)算可能在所有相關(guān)數(shù)據(jù)到達(dá)之前執(zhí)行,這樣做可能會(huì)產(chǎn)生不完整或不準(zhǔn)確的結(jié)果,但是好處是可以降低結(jié)果的延遲。
  • 與構(gòu)建的批處理應(yīng)用程序不同,在基于所有數(shù)據(jù)都可用的前提條件下,延遲/結(jié)果完整性是權(quán)衡流處理應(yīng)用程序的基本特征,流處理應(yīng)用程序處理的是接收到的無界數(shù)據(jù)。水位線是一種功能強(qiáng)大的解決方式,可以根據(jù)時(shí)間控制應(yīng)用程序的行為。除了水位線之外,F(xiàn)link還有許多特性來調(diào)整基于時(shí)間的操作的確切行為,如process函數(shù)和窗口觸發(fā)器,并提供了處理遲到數(shù)據(jù)的不同方法。

6.3 處理函數(shù)

DataStream API提供了一系列底層轉(zhuǎn)換,即process 函數(shù),這些函數(shù)可以訪問記錄的時(shí)間戳和水位線,并注冊(cè)將來某個(gè)特定時(shí)間觸發(fā)的定時(shí)器。此外,process函數(shù)還支持將記錄發(fā)送到多個(gè)輸出流。process函數(shù)通常用于構(gòu)建事件驅(qū)動(dòng)的應(yīng)用程序,并實(shí)現(xiàn)可能不適用于預(yù)定義窗口和轉(zhuǎn)換的自定義邏輯。例如,F(xiàn)link的SQL支持的大多數(shù)算子都是基于process函數(shù)實(shí)現(xiàn)的。

目前,F(xiàn)link提供八種不同的process函數(shù):ProcessFunction、KeyedProcessFunction、CoProcessFunction、ProcessJoinFunction、BroadcastProcessFunction、KeyedBroadcastProcessFunction、ProcessWindowFunction和ProcessAllWindowFunction。

時(shí)間服務(wù)和計(jì)時(shí)器

KeyedProcessFunction是一個(gè)非常通用的函數(shù),可以應(yīng)用于KeyedStream。對(duì)流的每個(gè)記錄調(diào)用該函數(shù),并返回零條、一條或多條記錄。所有process函數(shù)都實(shí)現(xiàn)RichFunction接口,提供open()、close()和getRuntimeContext()方法。另外,KeyedProcessFunction還提供了以下兩個(gè)方法:processElement()、onTimer()

當(dāng)定時(shí)器觸發(fā)時(shí),將調(diào)用onTimer()回調(diào)函數(shù)。processElement()和onTimer()方法是同步的,以防止對(duì)狀態(tài)的并發(fā)訪問和操作。

每個(gè)key可以有多個(gè)定時(shí)器(Timers),但每個(gè)時(shí)間戳只能有一個(gè)定時(shí)器。默認(rèn)情況下,KeyedProcessFunction會(huì)保留heap上的優(yōu)先級(jí)隊(duì)列中的所有定時(shí)器(Timers)的時(shí)間戳。但是,你可以配置RocksDB狀態(tài)后端來存儲(chǔ)定時(shí)器(Timer)。定時(shí)器與函數(shù)的任何其他狀態(tài)一起被存入檢查點(diǎn)。如果應(yīng)用程序需要從故障中恢復(fù),則在應(yīng)用程序重新啟動(dòng)時(shí)過期的所有處理時(shí)間定時(shí)器將在應(yīng)用程序恢復(fù)時(shí)立即觸發(fā)。

6.4 窗口算子

定義窗口算子

窗口算子可以應(yīng)用于key類型流或none-key類型流。key類型窗口上的窗口算子是并行計(jì)算的,而非key類型窗口是在單線程處理的。

要?jiǎng)?chuàng)建窗口算子,你需要指定兩個(gè)窗口組件:

  • 確定輸入流的元素如何分組到窗口中的窗口分配器,窗口分配器生成一個(gè)WindowedStream(非key類型數(shù)據(jù)流生成AllWindowedStream)。
  • 應(yīng)用于WindowedStream(或AllWindowedStream)上,并處理分配給窗口的元素的窗口函數(shù)。
內(nèi)置窗口分配器

Flink的內(nèi)置窗口分配器創(chuàng)建類型為TimeWindow的窗口。此窗口類型實(shí)質(zhì)上表示兩個(gè)時(shí)間戳之間的時(shí)間間隔,左閉右開。此類型窗口包括定義窗口邊界、檢查窗口是否相交以及合并重疊窗口的方法。常見的內(nèi)置窗口分配器有以下幾種:


滾動(dòng)窗口分配器將元素放入大小固定且互不重疊的窗口中

渭動(dòng)窗口分配器將元素放入大小固定且可能重疊的窗口中

會(huì)話窗口分配器將元素放入大小不同的活動(dòng)的非重疊窗口中
在窗口上應(yīng)用函數(shù)

窗口函數(shù)定義了對(duì)窗口中的數(shù)據(jù)元素執(zhí)行的計(jì)算邏輯。有兩種類型的函數(shù)可以用于窗口函數(shù):

  • 增量聚合函數(shù)(Incremental aggregation functions):在元素被添加到窗口并保持和更新單個(gè)值為窗口狀態(tài)時(shí)直接應(yīng)用增量聚合函數(shù)。這些函數(shù)通常非常節(jié)省空間,并最終產(chǎn)生聚合值作為結(jié)果。ReduceFunction和AggregateFunction都是增量聚合函數(shù)。
  • 全量窗口函數(shù)(Full window functions):收集窗口的所有元素,并在對(duì)所有收集的元素求值時(shí)遍歷元素列表。全量窗口函數(shù)通常需要更多的內(nèi)存,可以完成比增量聚合函數(shù)更復(fù)雜的邏輯。ProcessWindowFunction是一個(gè)全量窗口函數(shù)。
自定義函數(shù)
配置了增量聚合及全量窗口函數(shù)的窗口算子
觸發(fā)器

觸發(fā)器定義何時(shí)計(jì)算窗口并輸出窗口的結(jié)果。觸發(fā)器可以根據(jù)特定于時(shí)間或特定數(shù)據(jù)條件(如元素計(jì)數(shù)或某些接收到的元素值)中的處理情況決定是否觸發(fā)。例如,當(dāng)處理時(shí)間或水位線超過窗口結(jié)束邊界的時(shí)間戳?xí)r,將觸發(fā)前面討論的時(shí)間窗口的默認(rèn)觸發(fā)器。

每次調(diào)用觸發(fā)器時(shí),它都會(huì)生成一個(gè)TriggerResult來確定應(yīng)該對(duì)窗口執(zhí)行什么操作。TriggerResult可以取以下值之一:

  • CONTINUE(跳過)
  • FIRE(觸發(fā))
  • PURGE(清除)
  • FIRE_AND_PURGE(觸發(fā)并清除):首先計(jì)算窗口(FIRE),然后刪除所有狀態(tài)和元數(shù)據(jù)(PURGE)。
移除器

在Flink的窗口機(jī)制中,移除器是一個(gè)可選組件。它可以在窗口函數(shù)執(zhí)行之前或之后從窗口中刪除元素。

在將窗口函數(shù)應(yīng)用于窗口內(nèi)容之前和之后分別調(diào)用evictBefore()和evictAfter()方法。這兩個(gè)方法都有一個(gè)Iterable參數(shù)(服務(wù)于添加到窗口的所有元素)、窗口中的元素?cái)?shù)量(大小)參數(shù)、窗口對(duì)象和一個(gè)EvictorContext參數(shù)。通過調(diào)用可從Iterable獲得的Iterator對(duì)象上的remove()方法,可以從窗口中刪除元素。

6.5 基于時(shí)間的雙流 Join

基于間隔的Join
基于間隔的關(guān)聯(lián)(Interval Join)

interval join連接來自兩個(gè)具有公共key的流的事件,這兩個(gè)流之間的時(shí)間戳間隔不超過指定的時(shí)間間隔。

 input1
.keyBy(…)
.between(<lower-bound>, <upper-bound>) // 下界和上界定義為負(fù)的和正的時(shí)間間隔,例如,between(Time.hour(-1), Time.minute(15)).
.process(ProcessJoinFunction) // process pairs of matched events,join事件雙方都被傳遞到ProcessJoinFunction中
基于窗口的Join

兩個(gè)輸入流的元素都被分配到公共窗口,并在窗口完成時(shí)聯(lián)接(或分組)。

6.6 處理遲到數(shù)據(jù)

DataStream API提供了處理延遲事件的不同選項(xiàng):

  • 延遲事件可以簡(jiǎn)單地刪除。
  • 延遲事件可以重定向到單獨(dú)的流。
  • 計(jì)算結(jié)果可以根據(jù)延遲事件進(jìn)行更新,并且必須輸出更新。
重定向遲到事件

延遲事件還可以使用側(cè)輸出流特性重定向到另一個(gè)DataStream,可以使用常規(guī)的接收函數(shù)處理或發(fā)出延遲事件。根據(jù)業(yè)務(wù)需求,后期數(shù)據(jù)稍后可以通過定期的回填過程集成到流應(yīng)用程序的結(jié)果中。

基于遲到事件更新結(jié)果

延遲事件在它們應(yīng)該完成的計(jì)算之后到達(dá)算子。因此,算子輸出的結(jié)果是不完整或不準(zhǔn)確的。另一種策略是重新計(jì)算不完整的結(jié)果并輸出更新,而不是刪除或重定向延遲事件。但是,為了能夠重新計(jì)算和更新結(jié)果,需要考慮一些問題。

  • 支持重新計(jì)算和更新已輸出結(jié)果的算子需要在發(fā)出第一個(gè)結(jié)果后保留計(jì)算所需的所有狀態(tài)。但是,由于算子通常不可能永遠(yuǎn)保留所有狀態(tài),所以需要在某個(gè)時(shí)候清除狀態(tài)。一旦清除了某個(gè)結(jié)果的狀態(tài),就不能再更新該結(jié)果,只能刪除或重定向延遲事件。

  • 除了保持狀態(tài)外,下游算子或跟隨算子的外部系統(tǒng)還需要能夠處理這些更新。例如,將結(jié)果和key值窗口算子的更新寫入key值存儲(chǔ)的接收器算子可以通過使用upsert寫操作用最新更新結(jié)果覆蓋不準(zhǔn)確的結(jié)果來實(shí)現(xiàn)這一點(diǎn)。對(duì)于某些用例,可能還需要區(qū)分第一個(gè)結(jié)果和由于延遲事件而導(dǎo)致的更新。

窗口算子API提供了一個(gè)方法來顯式聲明你期望的遲到元素。在使用事件時(shí)間窗口時(shí),可以指定允許遲到的時(shí)間。允許遲到的窗口算子不會(huì)在水位線通過窗口的結(jié)束時(shí)間戳后刪除窗口及其狀態(tài)。相反,算子將繼續(xù)維護(hù)包括遲到時(shí)間段內(nèi)的完整窗口。當(dāng)一個(gè)遲到元素在允許的遲到周期內(nèi)到達(dá)時(shí),它就像一個(gè)正常到達(dá)的元素一樣被處理并傳遞給觸發(fā)器。當(dāng)水位線通過窗口的結(jié)束時(shí)間戳和延遲間隔時(shí),窗口最終被刪除,隨后的所有遲到元素被丟棄。


第7章 有狀態(tài)算子和應(yīng)用

有狀態(tài)算子及用戶函數(shù)都是流應(yīng)用中常見組成部分。事實(shí)上,由于數(shù)據(jù)會(huì)隨著時(shí)間以流式到來,大多數(shù)復(fù)雜一些的操作都需要存儲(chǔ)部分?jǐn)?shù)據(jù)或者中間結(jié)果,很多Flink內(nèi)置的DataStream算子、數(shù)據(jù)源以及數(shù)據(jù)匯都是有狀態(tài)的,它們需要對(duì)數(shù)據(jù)記錄進(jìn)行緩沖或者對(duì)中間結(jié)果以及元數(shù)據(jù)加以維護(hù)。

7.1 實(shí)現(xiàn)有狀態(tài)函數(shù)

鍵值分區(qū)狀態(tài)只能由作用在KeyedStream上面的函數(shù)使用,這個(gè)可以通過DataStream.keyBy()方法來得到一個(gè)KeyedStream。KeyedStream會(huì)根據(jù)指定鍵值進(jìn)行分區(qū)并記住鍵值的定義,作用在KeyedStream上的算子可以訪問它的鍵值定義上下文信息。Flink為鍵值分區(qū)狀態(tài)提供了多種數(shù)據(jù)類型,每個(gè)類型對(duì)應(yīng)了一種狀態(tài)結(jié)構(gòu),用戶可以根據(jù)自定義函數(shù)與狀態(tài)的交互方式或性能選擇不同的狀態(tài)類型:

  • ValueState[T]:用于保存類型為T的單個(gè)值??捎梅椒ㄓ衯alue()、update();
  • ListState[T]:用list結(jié)構(gòu)保存多個(gè)類型為T的元素,常用方法get()、add()、addAll()、update()等,但是它不支持刪除單個(gè)元素,我們可以使用update()方法更新整個(gè)列表,使用給定的列表值替換已有值;
  • MapState[K,V]:用于保存一組鍵到值的映射;
  • ReducingState[T]:提供了和ListState[T]相同的方法(除了addAll、update),但是你需要傳遞一個(gè)聚合函數(shù)ReduceFunction用來對(duì)存入的數(shù)據(jù)進(jìn)行聚合;
  • AggregatingState[I,O]:和ReducingState[T]行為類似,但它使用了更加通用的AggregatingFunction來聚合狀態(tài)內(nèi)部的值。

總結(jié)

  • 當(dāng)我們創(chuàng)建一個(gè)狀態(tài)對(duì)象時(shí),我們需要利用RichFunction中的RuntimeContext在Flink運(yùn)行時(shí)中注冊(cè)一個(gè)StateDescriptor;
  • 每個(gè)狀態(tài)類型都有自己特定的StateDescriptor,入?yún)⒅幸獙懭霠顟B(tài)名稱與類型,ReducingState和AggregatingState的描述符還需要接收一個(gè)ReducingFunction或AggregatingFunction對(duì)象,以此來對(duì)加入的值進(jìn)行聚合;
  • 狀態(tài)名稱的作用域是整個(gè)算子,我們可以通過在函數(shù)中注冊(cè)多個(gè)狀態(tài)描述符來創(chuàng)建多個(gè)狀態(tài)對(duì)象;
  • 狀態(tài)類型可以通過Class或TypeInformation對(duì)象指定,因?yàn)镕link要為狀態(tài)創(chuàng)建合適的序列化器,所有類型指定是強(qiáng)制的。
  • 通常情況下,狀態(tài)引用對(duì)象要在RichFunction的open()方法中初始化;
  • 我們一般會(huì)將狀態(tài)引用對(duì)象聲明為函數(shù)類的普通成員變量;
  • 對(duì)于函數(shù)類得外部傳入的參數(shù),我們也是以普通成員變量的方式通過構(gòu)造函數(shù)傳入;
  • 狀態(tài)引用對(duì)象只提供用于訪問狀態(tài)的接口而不會(huì)存儲(chǔ)狀態(tài)本身,具體保存工作交由狀態(tài)后端完成。

7.2 為有狀態(tài)的應(yīng)用開啟故障恢復(fù)

啟動(dòng)周期性檢查點(diǎn)功能

7.3 確保有狀態(tài)應(yīng)用的可維護(hù)性

flink利用保存點(diǎn)機(jī)制來對(duì)應(yīng)用及其狀態(tài)進(jìn)行維護(hù),但是需要初始版本應(yīng)用的全部有狀態(tài)算子都指定好兩個(gè)參數(shù),才可以在未來正常使用,這兩個(gè)參數(shù)是算子唯一標(biāo)識(shí)和最大并行度
算子的唯一標(biāo)識(shí)和最大并行度會(huì)被固定在保存點(diǎn)上,不可更改.一旦修改只能丟棄從頭開始運(yùn)行

指定算子唯一標(biāo)識(shí),uid方法

為使用鍵值分區(qū)狀態(tài)的算子定義最大并行度

7.4 有狀態(tài)應(yīng)用的性能及魯棒性

選擇狀態(tài)后端
  1. MemoryStateBackend

MemoryStateBackend 是將狀態(tài)維護(hù)在 Java 堆上的一個(gè)內(nèi)部狀態(tài)后端。鍵值狀態(tài)和窗口算子使用哈希表來存儲(chǔ)數(shù)據(jù)(values)和定時(shí)器(timers)。當(dāng)應(yīng)用程序 checkpoint 時(shí),此后端會(huì)在將狀態(tài)發(fā)給 JobManager 之前快照下狀態(tài),JobManager 也將狀態(tài)存儲(chǔ)在 Java 堆上。默認(rèn)情況下,MemoryStateBackend 配置成支持異步快照。異步快照可以避免阻塞數(shù)據(jù)流的處理,從而避免反壓的發(fā)生。當(dāng)然,使用 new MemoryStateBackend(MAX_MEM_STATE_SIZE, false)也可以禁用該特點(diǎn)

默認(rèn)情況下,每一個(gè)狀態(tài)的大小限制為 5 MB??梢酝ㄟ^ MemoryStateBackend 的構(gòu)造函數(shù)增加這個(gè)大小。狀態(tài)大小受到 akka 幀大小的限制(maxStateSize <= akka.framesize 默認(rèn) 10 M),所以無論怎么調(diào)整狀態(tài)大小配置,都不能大于 akka 的幀大小。也可以通過 akka.framesize 調(diào)整 akka 幀大小。
狀態(tài)的總大小不能超過 JobManager 的內(nèi)存。

  1. FsStateBackend

FsStateBackend需要配置的主要是文件系統(tǒng),如 URL(類型,地址,路徑)。

當(dāng)選擇使用 FsStateBackend時(shí),正在進(jìn)行的數(shù)據(jù)會(huì)被存在TaskManager的內(nèi)存中。在checkpoint時(shí),此后端會(huì)將狀態(tài)快照寫入配置的文件系統(tǒng)和目錄的文件中,同時(shí)會(huì)在JobManager的內(nèi)存中(在高可用場(chǎng)景下會(huì)存在 Zookeeper 中)存儲(chǔ)極少的元數(shù)據(jù)。容量限制上,單 TaskManager 上 State 總量不超過它的內(nèi)存,總大小不超過配置的文件系統(tǒng)容量。

默認(rèn)情況下,F(xiàn)sStateBackend 配置成提供異步快照,以避免在狀態(tài) checkpoint 時(shí)阻塞數(shù)據(jù)流的處理。該特性可以實(shí)例化 FsStateBackend 時(shí)傳入false的布爾標(biāo)志來禁用掉,例如:new FsStateBackend(path, false)

  1. RocksDBStateBackend

RocksDBStateBackend 的配置也需要一個(gè)文件系統(tǒng)(類型,地址,路徑)。

RocksDB 是一種嵌入式的本地?cái)?shù)據(jù)庫(kù)。RocksDBStateBackend 將處理中的數(shù)據(jù)使用 RocksDB 存儲(chǔ)在本地磁盤上。在 checkpoint 時(shí),整個(gè) RocksDB 數(shù)據(jù)庫(kù)會(huì)被存儲(chǔ)到配置的文件系統(tǒng)中,或者在超大狀態(tài)作業(yè)時(shí)可以將增量的數(shù)據(jù)存儲(chǔ)到配置的文件系統(tǒng)中。同時(shí) Flink 會(huì)將極少的元數(shù)據(jù)存儲(chǔ)在 JobManager 的內(nèi)存中,或者在 Zookeeper 中(對(duì)于高可用的情況)。RocksDB 默認(rèn)也是配置成異步快照的模式。

RocksDB是一個(gè) key/value 的內(nèi)存存儲(chǔ)系統(tǒng),和其他的 key/value 一樣,先將狀態(tài)放到內(nèi)存中,如果內(nèi)存快滿時(shí),則寫入到磁盤中,但需要注意RocksDB不支持同步的 Checkpoint,構(gòu)造方法中沒有同步快照這個(gè)選項(xiàng)。不過RocksDB支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲(chǔ)在外部文件系統(tǒng)(本地或HDFS),其容量限制只要單個(gè) TaskManager 上 State 總量不超過它的內(nèi)存+磁盤,單Key最大2G,總大小不超過配置的文件系統(tǒng)容量即可。

7.5 更新有狀態(tài)應(yīng)用


第8章 讀寫外部系統(tǒng)

8.1 應(yīng)用的一致性保證

Flink的檢查點(diǎn)和恢復(fù)機(jī)制定期的會(huì)保存應(yīng)用程序狀態(tài)的一致性檢查點(diǎn)。在故障的情況下,應(yīng)用程序的狀態(tài)將會(huì)從最近一次完成的檢查點(diǎn)恢復(fù),并繼續(xù)處理。盡管如此,可以使用檢查點(diǎn)來重置應(yīng)用程序的狀態(tài)無法完全達(dá)到令人滿意的一致性保證。相反,source和sink的連接器需要和Flink的檢查點(diǎn)和恢復(fù)機(jī)制進(jìn)行集成才能提供有意義的一致性保證。

為了給應(yīng)用程序提供恰好處理一次語(yǔ)義的狀態(tài)一致性保證,應(yīng)用程序的source連接器需要能夠?qū)ource的讀位置重置到之前保存的檢查點(diǎn)位置。當(dāng)處理一次檢查點(diǎn)時(shí),source操作符將會(huì)把source的讀位置持久化,并在恢復(fù)的時(shí)候從這些讀位置開始重新讀取。支持讀位置的檢查點(diǎn)的source連接器一般來說是基于文件的存儲(chǔ)系統(tǒng),如:文件流或者Kafka source(檢查點(diǎn)會(huì)持久化某個(gè)正在消費(fèi)的topic的讀偏移量)。如果一個(gè)應(yīng)用程序從一個(gè)無法存儲(chǔ)和重置讀位置的source連接器攝入數(shù)據(jù),那么當(dāng)任務(wù)出現(xiàn)故障的時(shí)候,數(shù)據(jù)就會(huì)丟失。也就是說我們只能提供at-most-once)的一致性保證。

Fink的檢查點(diǎn)和恢復(fù)機(jī)制和可以重置讀位置的source連接器結(jié)合使用,可以保證應(yīng)用程序不會(huì)丟失任何數(shù)據(jù)。盡管如此,應(yīng)用程序可能會(huì)發(fā)出兩次計(jì)算結(jié)果,因?yàn)閺纳弦淮螜z查點(diǎn)恢復(fù)的應(yīng)用程序所計(jì)算的結(jié)果將會(huì)被重新發(fā)送一次(一些結(jié)果已經(jīng)發(fā)送出去了,這時(shí)任務(wù)故障,然后從上一次檢查點(diǎn)恢復(fù),這些結(jié)果將被重新計(jì)算一次然后發(fā)送出去)。所以,可重置讀位置的source和Flink的恢復(fù)機(jī)制不足以提供端到端的恰好處理一次語(yǔ)義,即使應(yīng)用程序的狀態(tài)是恰好處理一次一致性級(jí)別。

一個(gè)志在提供端到端恰好處理一次語(yǔ)義一致性的應(yīng)用程序需要特殊的sink連接器。sink連接器可以在不同的情況下使用兩種技術(shù)來達(dá)到恰好處理一次一致性語(yǔ)義:冪等性寫入和事務(wù)性寫入。

冪等性寫入

一個(gè)冪等操作無論執(zhí)行多少次都會(huì)返回同樣的結(jié)果。例如,重復(fù)的向hashmap中插入同樣的key-value對(duì)就是冪等操作,因?yàn)轭^一次插入操作之后所有的插入操作都不會(huì)改變這個(gè)hashmap,因?yàn)閔ashmap已經(jīng)包含這個(gè)key-value對(duì)了。另一方面,append操作就不是冪等操作了,因?yàn)槎啻蝍ppend同一個(gè)元素將會(huì)導(dǎo)致列表每次都會(huì)添加一個(gè)元素。在流處理程序中,冪等寫入操作是很有意思的,因?yàn)閮绲葘懭氩僮骺梢詧?zhí)行多次但不改變結(jié)果。所以它們可以在某種程度上緩和Flink檢查點(diǎn)機(jī)制帶來的重播計(jì)算結(jié)果的效應(yīng)。

需要注意的是,依賴于冪等性sink來達(dá)到exactly-once語(yǔ)義的應(yīng)用程序,必須保證在從檢查點(diǎn)恢復(fù)以后,它將會(huì)覆蓋之前已經(jīng)寫入的結(jié)果。例如,一個(gè)包含有sink操作的應(yīng)用在sink到一個(gè)key-value存儲(chǔ)時(shí)必須保證它能夠確定的計(jì)算出將要更新的key值。同時(shí),從Flink程序sink到的key-value存儲(chǔ)中讀取數(shù)據(jù)的應(yīng)用,在Flink從檢查點(diǎn)恢復(fù)的過程中,可能會(huì)看到不想看到的結(jié)果。當(dāng)重播開始時(shí),之前已經(jīng)發(fā)出的計(jì)算結(jié)果可能會(huì)被更早的結(jié)果所覆蓋(因?yàn)樵诨謴?fù)過程中)。所以,一個(gè)消費(fèi)Flink程序輸出數(shù)據(jù)的應(yīng)用,可能會(huì)觀察到時(shí)間回退,例如讀到了比之前小的計(jì)數(shù)。也就是說,當(dāng)流處理程序處于恢復(fù)過程中時(shí),流處理程序的結(jié)果將處于不穩(wěn)定的狀態(tài),因?yàn)橐恍┙Y(jié)果被覆蓋掉,而另一些結(jié)果還沒有被覆蓋。一旦重播完成,也就是說應(yīng)用程序已經(jīng)通過了之前出故障的點(diǎn),結(jié)果將會(huì)繼續(xù)保持一致性。

事務(wù)性寫入

第二種實(shí)現(xiàn)端到端的恰好處理一次一致性語(yǔ)義的方法基于事務(wù)性寫入。其思想是只將最近一次成功保存的檢查點(diǎn)之前的計(jì)算結(jié)果寫入到外部系統(tǒng)中去。這樣就保證了在任務(wù)故障的情況下,端到端恰好處理一次語(yǔ)義。應(yīng)用將被重置到最近一次的檢查點(diǎn),而在這個(gè)檢查點(diǎn)之后并沒有向外部系統(tǒng)發(fā)出任何計(jì)算結(jié)果。通過只有當(dāng)檢查點(diǎn)保存完成以后再寫入數(shù)據(jù)這種方法,事務(wù)性的方法將不會(huì)遭受冪等性寫入所遭受的重播不一致的問題。盡管如此,事務(wù)性寫入?yún)s帶來了延遲,因?yàn)橹挥性跈z查點(diǎn)完成以后,我們才能看到計(jì)算結(jié)果。

Flink提供了兩種構(gòu)建模塊來實(shí)現(xiàn)事務(wù)性sink連接器:write-ahead-log(WAL,預(yù)寫式日志)sink和兩階段提交sink。WAL式sink將會(huì)把所有計(jì)算結(jié)果寫入到應(yīng)用程序的狀態(tài)中,等接到檢查點(diǎn)完成的通知,才會(huì)將計(jì)算結(jié)果發(fā)送到sink系統(tǒng)。因?yàn)閟ink操作會(huì)把數(shù)據(jù)都緩存在狀態(tài)后段,所以WAL可以使用在任何外部sink系統(tǒng)上。盡管如此,WAL還是無法提供刀槍不入的恰好處理一次語(yǔ)義的保證,再加上由于要緩存數(shù)據(jù)帶來的狀態(tài)后段的狀態(tài)大小的問題,WAL模型并不十分完美。

與之形成對(duì)比的,2PC sink需要sink系統(tǒng)提供事務(wù)的支持或者可以模擬出事務(wù)特性的模塊。對(duì)于每一個(gè)檢查點(diǎn),sink開始一個(gè)事務(wù),然后將所有的接收到的數(shù)據(jù)都添加到事務(wù)中,并將這些數(shù)據(jù)寫入到sink系統(tǒng),但并沒有提交(commit)它們。當(dāng)事務(wù)接收到檢查點(diǎn)完成的通知時(shí),事務(wù)將被commit,數(shù)據(jù)將被真正的寫入sink系統(tǒng)。這項(xiàng)機(jī)制主要依賴于一次sink可以在檢查點(diǎn)完成之前開始事務(wù),并在應(yīng)用程序從一次故障中恢復(fù)以后再commit的能力。

2PC協(xié)議依賴于Flink的檢查點(diǎn)機(jī)制。檢查點(diǎn)屏障是開始一個(gè)新的事務(wù)的通知,所有操作符自己的檢查點(diǎn)成功的通知是它們可以commit的投票,而作業(yè)管理器通知一個(gè)檢查點(diǎn)成功的消息是commit事務(wù)的指令。于WAL sink形成對(duì)比的是,2PC sinks依賴于sink系統(tǒng)和sink本身的實(shí)現(xiàn)可以實(shí)現(xiàn)恰好處理一次語(yǔ)義。更多的,2PC sink不斷的將數(shù)據(jù)寫入到sink系統(tǒng)中,而WAL寫模型就會(huì)有之前所述的問題。

不可重置的源 可重置的源
any sink at-most-once at-least-once
冪等性sink at-most-once exactly-once(當(dāng)從任務(wù)失敗中恢復(fù)時(shí),存在暫時(shí)的不一致性)
預(yù)寫式日志sink at-most-once at-least-once
2PC sink at-most-once exactly-once

第9章 搭建Flink運(yùn)行流式應(yīng)用


第10章 Flink和流式應(yīng)用運(yùn)維


第11章 還有什么

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容