Flink 使用之批模式和流模式

Flink 使用介紹相關文檔目錄

Flink 使用介紹相關文檔目錄

批模式和流模式

Flink從誕生以來,在設計上一套架構(gòu)同時支持批模式和流模式。在Flink1.12之前,F(xiàn)link針對批處理作業(yè)和流處理作業(yè)分別提供了2套不同的API。用戶對于批處理作業(yè)和流處理作業(yè)需要編寫不同的應用代碼。一定程度上限制了Flink的靈活性。幸運的是Flink1.12版本之后,社區(qū)廢棄了Flink DataSet API(批模式)API。流模式API可以同時兼容批處理模式。這一舉措打破了Flink批流融合的最后一道障礙。

Flink使用統(tǒng)一的方式去處理流式作業(yè)和批式作業(yè)。對于有界的輸入源,無論Flink配置成什么執(zhí)行模式,都可以保證最終的結(jié)果是一致的。通常來說Flink的流處理模式會按照間隔(Trigger等控制)輸出增量的中間結(jié)果,對于批模式而言,中間的處理結(jié)果不會被輸出,只會等到數(shù)據(jù)攝入處理完畢之后,輸出最終的處理結(jié)果。所以說上面最終的結(jié)果一致指的是不考慮流模式中間結(jié)果輸出的情況下,流模式等到有界數(shù)據(jù)全部攝入之后,最終輸出的結(jié)果和使用批模式運行時完全一樣的。

就適用范圍來說,批模式適用于有界數(shù)據(jù)源。流模式適用于無界/有界數(shù)據(jù)源。

Flink針對不同的處理模式,會有針對應的優(yōu)化措施。除此之外對于部分算子還會有行為上的不同。本篇以官網(wǎng)Execution Mode (Batch/Streaming) | Apache Flink為準,介紹下Flink在批模式和流模式下的不同行為和優(yōu)化方式。

配置方式

配置Flink的運行模式有3種。flink-conf.yaml配置文件,應用代碼中編寫和提交作業(yè)時候指定。優(yōu)先級依次從低到高。后面的配置會覆蓋前面的配置。

flink-conf配置文件配置

配置項的名稱為execution.runtime-mode。有如下3個值:

  • STREAMING:使用流模式。這一項是默認值。
  • BATCH:使用批模式。
  • AUTOMATIC:讓Flink基于數(shù)據(jù)源是否有界來決定采用哪種模式。

在應用代碼中配置

應用代碼如下所示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設置為流模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
// 設置為批模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
// 自動模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)

在提交作業(yè)時候配置

提交命令如下:

bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar

上面的命令中使用-Dexecution.runtime-mode來覆蓋默認的執(zhí)行模式配置。建議使用提交作業(yè)的時候命令行指定執(zhí)行模式的方式。這種方式切換執(zhí)行模式非常靈活,不需要修改和重新編譯作業(yè)代碼。

行為區(qū)別

任務調(diào)度和shuffle

和Spark類似,F(xiàn)link也是以shuffle邊界劃分stage。

這個可以參考官網(wǎng)的解釋。官網(wǎng)的例子為:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> source = env.fromElements(...);

source.name("source")
    .map(...).name("map1")
    .map(...).name("map2")
    .rebalance()
    .map(...).name("map3")
    .map(...).name("map4")
    .keyBy((value) -> value)
    .map(...).name("map5")
    .map(...).name("map6")
    .sinkTo(...).name("sink");

對于map,flatMap,filter等算子,他們的數(shù)據(jù)來源是單一的(只從某個上游的分區(qū))。這些算子如果沒有改變并行度的話,可以將這些算子整合到一個Task中運行。這個術(shù)語在Flink中叫做chaining。簡單起來可以將chain在一起的一系列算子理解為他們組成了一個算子,他們整體是調(diào)度的最小單位。Task可以以多個并行度運行,每一個并行度的實例稱之為一個subtask。Chaining在一起的算子被完整的包含在subtask中,整體參與調(diào)度。這種優(yōu)化可以顯著減少task數(shù)量,減少不必要的線程執(zhí)行上下文切換,減少不必要的網(wǎng)絡數(shù)據(jù)交換和線程/進程間數(shù)據(jù)交換。

對于rebalance,keyBy等算子,他們所需的數(shù)據(jù)不一定來自同一個數(shù)據(jù)分區(qū)。所以說這類數(shù)據(jù)源往往伴隨著數(shù)據(jù)的分發(fā)和交換。他們叫做shuffle算子。Flink和Spark類似,也是以這類算子來劃分執(zhí)行計劃的stage。因此上面的例子這個作業(yè)被劃分為了三個task:Task1(map1, map2),Task2(map3, map4),Task3(map5, map6, sink)。中間使用rebalance和keyBy算子連接在一起。

批模式和流模式的Task資源分配和網(wǎng)絡shuffle的行為是不同的。

對于流模式而言,由于數(shù)據(jù)是持續(xù)不斷的到來,結(jié)果要求持續(xù)不斷的輸出,因此所有的task都必須同時運行,還需要占有足夠多的資源來應對瞬間的高流量沖擊,確保流處理作業(yè)的低延遲。所以說,taskManager必須分配足夠的資源和slot給所有的task。網(wǎng)絡shuffle是pipelined,上游的數(shù)據(jù)會被立刻發(fā)往下游。以吞吐量換取低延遲。但是高頻次發(fā)送少量數(shù)據(jù)會影響網(wǎng)絡通信性能。針對流處理業(yè)務的特點,F(xiàn)link做出了取舍。

對于批模式而言,不要求作業(yè)延遲很低,也不關注作業(yè)運行的中間結(jié)果。因此,各個stage不用同時執(zhí)行。事實上,他們是分批執(zhí)行的,也就是說可以集中所有資源運行一個stage,將中間結(jié)果緩存起來,然后銷毀任務,再分配資源開始運行下一個stage。中間結(jié)果緩存起來,即便是上游stage已經(jīng)運行完畢停止了,下游stage也可以讀取上游計算結(jié)果。下游stage如果計算過程中出現(xiàn)問題導致失敗,還可以重啟stage,從緩存結(jié)果再次讀取中間結(jié)果,沒必要重新計算整個批處理作業(yè)。Flink在批模式下犧牲延遲換取高吞吐量。

狀態(tài)后端

流模式:使用狀態(tài)后端。

批模式:狀態(tài)后端配置會被忽略。對于keyed(分區(qū))操作,由于數(shù)據(jù)已按照key排序,狀態(tài)只需保存一個key下的數(shù)據(jù)。當處理到下一個key的時候,這些數(shù)據(jù)會被刪除。個人理解是批模式由于數(shù)據(jù)源是有界的,可以對數(shù)據(jù)進行預加工(按照key排序),狀態(tài)僅緩存上一步的中間結(jié)果即可。

處理順序

流模式和批模式針對數(shù)據(jù)到來順序的處理有很大不同。

對于流模式,數(shù)據(jù)到來的順序本身也是一種有意義的信息(亂序的問題除外,F(xiàn)link有應對措施)。所以說語義上保留接收到的數(shù)據(jù)順序,不會調(diào)整。數(shù)據(jù)一旦到來會立刻被處理。

對于批模式,除了排序操作外,F(xiàn)link先處理哪個數(shù)據(jù)并不是很重要,因此Flink可以做出針對應優(yōu)化。對于KeyedStream,會對key排序,一次處理同一個key下的所有數(shù)據(jù)。對于broadcast和非keyed數(shù)據(jù),不排序。通過本人的另一篇博客Flink 源碼之batch問題處理可以發(fā)現(xiàn)批處理模式下,F(xiàn)link一次處理同一個key下的所有數(shù)據(jù)。

Event time/watermark

流模式認為數(shù)據(jù)可能亂序。為了解決亂序問題Flink引入了watermark機制。Flink數(shù)據(jù)源(或者配置)中可以指定watermark發(fā)送策略(周期發(fā)送還是接收到數(shù)據(jù)的時候發(fā)送)。watermark攜帶了一個時間戳,F(xiàn)link算子接收到watermark的時候會讀取這個時間戳,認為該時間戳之前的數(shù)據(jù)都已經(jīng)全部到齊,可以開始計算(比如window計算,window的結(jié)束時間在watermark時間戳之前,這時候認為該window中所有的數(shù)據(jù)都已經(jīng)接收到,可以開始計算)。因此使用watermark可一定程度上避免亂序問題(watermark時間戳比接收到元素的時間戳早一些,起到了等待數(shù)據(jù)的作用),避免遲到數(shù)據(jù)影響統(tǒng)計結(jié)果。

批模式中數(shù)據(jù)是否亂序沒有意義。根本無需watermark,只需要在數(shù)據(jù)攝入結(jié)束的時候發(fā)送MAX_WATERMARK,讓所有的數(shù)據(jù)參與計算就可以了。

Processing time

Processing time是Flink集群的時間。與之相對的Event time是數(shù)據(jù)攜帶的時間。同一個數(shù)據(jù)中Event time是不會改變的,因此基于event time的數(shù)據(jù)處理結(jié)果是確定的。但數(shù)據(jù)的processing time和數(shù)據(jù)被真正處理的時間是相關的。基于processing time的數(shù)據(jù)處理結(jié)果是不確定/不可重現(xiàn)的。

在流模式中,processing time和event time通常具有相關性。比如event time一小時時間跨度的實時數(shù)據(jù),通常他們的processing time跨度也是一小時。這樣processing time可用于early fire,提前輸出中間計算的結(jié)果來預測最終結(jié)果。參見Flink 源碼之 Table early fire 和 late fire。

對于批模式processing time和event time沒有任何相關性。允許用戶使用processing time和注冊processing time定時器,但是event time定時器僅在數(shù)據(jù)輸入結(jié)束的時候觸發(fā)。

失敗恢復

作業(yè)執(zhí)行圖會被劃分為多個pipelined region。對于blocking 的數(shù)據(jù)交換方式,結(jié)果分區(qū)會在上游全部計算完成后再交由下游進行消費,數(shù)據(jù)會持久化到本地,支持多次消費。對于pipelined 數(shù)據(jù)交換,上游結(jié)果分區(qū)的產(chǎn)出和下游任務節(jié)點的消費是同時進行的,所有數(shù)據(jù)不會被持久化且只能讀取一次。由pipelined邊相連的節(jié)點構(gòu)成了一個region。Region為Flink故障重啟恢復的最小單位。

對于流模式,作業(yè)形成一個pipelined region,因此遇到故障,作業(yè)全部重啟。

批模式只需要重啟失敗的stage(region)。因為該region和上游region的數(shù)據(jù)交換方式為blocked,數(shù)據(jù)可以持久化到本地且支持重復消費。

算子行為

部分算子的行為在批處理和流處理模式下的行為不同。通常體現(xiàn)在聚合類型算子在流計算模式會輸出中間結(jié)果,在批處理模式下只輸出最終結(jié)果。

比如reduce 或者 sum等在流模式使用滾動輸出方式(每到來一條數(shù)據(jù)計算并輸出一次),批模式僅輸出最終結(jié)果。

迭代計算和依賴checkpoint的算子不支持。

為了演示聚合算子行為的不同,我們編寫如下例子:

public class BatchStreamDemo {
    public static void main(String[] args) throws Exception {
        doBatch();
//        doStream();
    }

    public static void doBatch() throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        DataStreamSource<Integer> integerDataStreamSource = streamExecutionEnvironment.fromElements(1, 2, 3, 4, 5, 6);
        integerDataStreamSource.keyBy((value) -> 1).sum(0).print();
        streamExecutionEnvironment.execute();
    }

    public static void doStream() throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStreamSource<Integer> integerDataStreamSource = streamExecutionEnvironment.fromElements(1, 2, 3, 4, 5, 6);
        integerDataStreamSource.keyBy((value) -> 1).sum(0).print();
        streamExecutionEnvironment.execute();
    }
}

分別運行doBatchdoStream方法。我們發(fā)現(xiàn)同樣是sum算子,批處理只會輸出最后的計算結(jié)果,流模式每次讀進來一個數(shù)據(jù),都會將當前已讀取到數(shù)據(jù)求和一次,將結(jié)果輸出。

參考文獻

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/

https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams

https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API

https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling#FLIP119PipelinedRegionScheduling-PipelinedRegionScheduling

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容