Flink 使用介紹相關文檔目錄
批模式和流模式
Flink從誕生以來,在設計上一套架構(gòu)同時支持批模式和流模式。在Flink1.12之前,F(xiàn)link針對批處理作業(yè)和流處理作業(yè)分別提供了2套不同的API。用戶對于批處理作業(yè)和流處理作業(yè)需要編寫不同的應用代碼。一定程度上限制了Flink的靈活性。幸運的是Flink1.12版本之后,社區(qū)廢棄了Flink DataSet API(批模式)API。流模式API可以同時兼容批處理模式。這一舉措打破了Flink批流融合的最后一道障礙。
Flink使用統(tǒng)一的方式去處理流式作業(yè)和批式作業(yè)。對于有界的輸入源,無論Flink配置成什么執(zhí)行模式,都可以保證最終的結(jié)果是一致的。通常來說Flink的流處理模式會按照間隔(Trigger等控制)輸出增量的中間結(jié)果,對于批模式而言,中間的處理結(jié)果不會被輸出,只會等到數(shù)據(jù)攝入處理完畢之后,輸出最終的處理結(jié)果。所以說上面最終的結(jié)果一致指的是不考慮流模式中間結(jié)果輸出的情況下,流模式等到有界數(shù)據(jù)全部攝入之后,最終輸出的結(jié)果和使用批模式運行時完全一樣的。
就適用范圍來說,批模式適用于有界數(shù)據(jù)源。流模式適用于無界/有界數(shù)據(jù)源。
Flink針對不同的處理模式,會有針對應的優(yōu)化措施。除此之外對于部分算子還會有行為上的不同。本篇以官網(wǎng)Execution Mode (Batch/Streaming) | Apache Flink為準,介紹下Flink在批模式和流模式下的不同行為和優(yōu)化方式。
配置方式
配置Flink的運行模式有3種。flink-conf.yaml配置文件,應用代碼中編寫和提交作業(yè)時候指定。優(yōu)先級依次從低到高。后面的配置會覆蓋前面的配置。
flink-conf配置文件配置
配置項的名稱為execution.runtime-mode。有如下3個值:
- STREAMING:使用流模式。這一項是默認值。
- BATCH:使用批模式。
- AUTOMATIC:讓Flink基于數(shù)據(jù)源是否有界來決定采用哪種模式。
在應用代碼中配置
應用代碼如下所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設置為流模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
// 設置為批模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
// 自動模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)
在提交作業(yè)時候配置
提交命令如下:
bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
上面的命令中使用-Dexecution.runtime-mode來覆蓋默認的執(zhí)行模式配置。建議使用提交作業(yè)的時候命令行指定執(zhí)行模式的方式。這種方式切換執(zhí)行模式非常靈活,不需要修改和重新編譯作業(yè)代碼。
行為區(qū)別
任務調(diào)度和shuffle
和Spark類似,F(xiàn)link也是以shuffle邊界劃分stage。
這個可以參考官網(wǎng)的解釋。官網(wǎng)的例子為:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements(...);
source.name("source")
.map(...).name("map1")
.map(...).name("map2")
.rebalance()
.map(...).name("map3")
.map(...).name("map4")
.keyBy((value) -> value)
.map(...).name("map5")
.map(...).name("map6")
.sinkTo(...).name("sink");
對于map,flatMap,filter等算子,他們的數(shù)據(jù)來源是單一的(只從某個上游的分區(qū))。這些算子如果沒有改變并行度的話,可以將這些算子整合到一個Task中運行。這個術(shù)語在Flink中叫做chaining。簡單起來可以將chain在一起的一系列算子理解為他們組成了一個算子,他們整體是調(diào)度的最小單位。Task可以以多個并行度運行,每一個并行度的實例稱之為一個subtask。Chaining在一起的算子被完整的包含在subtask中,整體參與調(diào)度。這種優(yōu)化可以顯著減少task數(shù)量,減少不必要的線程執(zhí)行上下文切換,減少不必要的網(wǎng)絡數(shù)據(jù)交換和線程/進程間數(shù)據(jù)交換。
對于rebalance,keyBy等算子,他們所需的數(shù)據(jù)不一定來自同一個數(shù)據(jù)分區(qū)。所以說這類數(shù)據(jù)源往往伴隨著數(shù)據(jù)的分發(fā)和交換。他們叫做shuffle算子。Flink和Spark類似,也是以這類算子來劃分執(zhí)行計劃的stage。因此上面的例子這個作業(yè)被劃分為了三個task:Task1(map1, map2),Task2(map3, map4),Task3(map5, map6, sink)。中間使用rebalance和keyBy算子連接在一起。
批模式和流模式的Task資源分配和網(wǎng)絡shuffle的行為是不同的。
對于流模式而言,由于數(shù)據(jù)是持續(xù)不斷的到來,結(jié)果要求持續(xù)不斷的輸出,因此所有的task都必須同時運行,還需要占有足夠多的資源來應對瞬間的高流量沖擊,確保流處理作業(yè)的低延遲。所以說,taskManager必須分配足夠的資源和slot給所有的task。網(wǎng)絡shuffle是pipelined,上游的數(shù)據(jù)會被立刻發(fā)往下游。以吞吐量換取低延遲。但是高頻次發(fā)送少量數(shù)據(jù)會影響網(wǎng)絡通信性能。針對流處理業(yè)務的特點,F(xiàn)link做出了取舍。
對于批模式而言,不要求作業(yè)延遲很低,也不關注作業(yè)運行的中間結(jié)果。因此,各個stage不用同時執(zhí)行。事實上,他們是分批執(zhí)行的,也就是說可以集中所有資源運行一個stage,將中間結(jié)果緩存起來,然后銷毀任務,再分配資源開始運行下一個stage。中間結(jié)果緩存起來,即便是上游stage已經(jīng)運行完畢停止了,下游stage也可以讀取上游計算結(jié)果。下游stage如果計算過程中出現(xiàn)問題導致失敗,還可以重啟stage,從緩存結(jié)果再次讀取中間結(jié)果,沒必要重新計算整個批處理作業(yè)。Flink在批模式下犧牲延遲換取高吞吐量。
狀態(tài)后端
流模式:使用狀態(tài)后端。
批模式:狀態(tài)后端配置會被忽略。對于keyed(分區(qū))操作,由于數(shù)據(jù)已按照key排序,狀態(tài)只需保存一個key下的數(shù)據(jù)。當處理到下一個key的時候,這些數(shù)據(jù)會被刪除。個人理解是批模式由于數(shù)據(jù)源是有界的,可以對數(shù)據(jù)進行預加工(按照key排序),狀態(tài)僅緩存上一步的中間結(jié)果即可。
處理順序
流模式和批模式針對數(shù)據(jù)到來順序的處理有很大不同。
對于流模式,數(shù)據(jù)到來的順序本身也是一種有意義的信息(亂序的問題除外,F(xiàn)link有應對措施)。所以說語義上保留接收到的數(shù)據(jù)順序,不會調(diào)整。數(shù)據(jù)一旦到來會立刻被處理。
對于批模式,除了排序操作外,F(xiàn)link先處理哪個數(shù)據(jù)并不是很重要,因此Flink可以做出針對應優(yōu)化。對于KeyedStream,會對key排序,一次處理同一個key下的所有數(shù)據(jù)。對于broadcast和非keyed數(shù)據(jù),不排序。通過本人的另一篇博客Flink 源碼之batch問題處理可以發(fā)現(xiàn)批處理模式下,F(xiàn)link一次處理同一個key下的所有數(shù)據(jù)。
Event time/watermark
流模式認為數(shù)據(jù)可能亂序。為了解決亂序問題Flink引入了watermark機制。Flink數(shù)據(jù)源(或者配置)中可以指定watermark發(fā)送策略(周期發(fā)送還是接收到數(shù)據(jù)的時候發(fā)送)。watermark攜帶了一個時間戳,F(xiàn)link算子接收到watermark的時候會讀取這個時間戳,認為該時間戳之前的數(shù)據(jù)都已經(jīng)全部到齊,可以開始計算(比如window計算,window的結(jié)束時間在watermark時間戳之前,這時候認為該window中所有的數(shù)據(jù)都已經(jīng)接收到,可以開始計算)。因此使用watermark可一定程度上避免亂序問題(watermark時間戳比接收到元素的時間戳早一些,起到了等待數(shù)據(jù)的作用),避免遲到數(shù)據(jù)影響統(tǒng)計結(jié)果。
批模式中數(shù)據(jù)是否亂序沒有意義。根本無需watermark,只需要在數(shù)據(jù)攝入結(jié)束的時候發(fā)送MAX_WATERMARK,讓所有的數(shù)據(jù)參與計算就可以了。
Processing time
Processing time是Flink集群的時間。與之相對的Event time是數(shù)據(jù)攜帶的時間。同一個數(shù)據(jù)中Event time是不會改變的,因此基于event time的數(shù)據(jù)處理結(jié)果是確定的。但數(shù)據(jù)的processing time和數(shù)據(jù)被真正處理的時間是相關的。基于processing time的數(shù)據(jù)處理結(jié)果是不確定/不可重現(xiàn)的。
在流模式中,processing time和event time通常具有相關性。比如event time一小時時間跨度的實時數(shù)據(jù),通常他們的processing time跨度也是一小時。這樣processing time可用于early fire,提前輸出中間計算的結(jié)果來預測最終結(jié)果。參見Flink 源碼之 Table early fire 和 late fire。
對于批模式processing time和event time沒有任何相關性。允許用戶使用processing time和注冊processing time定時器,但是event time定時器僅在數(shù)據(jù)輸入結(jié)束的時候觸發(fā)。
失敗恢復
作業(yè)執(zhí)行圖會被劃分為多個pipelined region。對于blocking 的數(shù)據(jù)交換方式,結(jié)果分區(qū)會在上游全部計算完成后再交由下游進行消費,數(shù)據(jù)會持久化到本地,支持多次消費。對于pipelined 數(shù)據(jù)交換,上游結(jié)果分區(qū)的產(chǎn)出和下游任務節(jié)點的消費是同時進行的,所有數(shù)據(jù)不會被持久化且只能讀取一次。由pipelined邊相連的節(jié)點構(gòu)成了一個region。Region為Flink故障重啟恢復的最小單位。
對于流模式,作業(yè)形成一個pipelined region,因此遇到故障,作業(yè)全部重啟。
批模式只需要重啟失敗的stage(region)。因為該region和上游region的數(shù)據(jù)交換方式為blocked,數(shù)據(jù)可以持久化到本地且支持重復消費。
算子行為
部分算子的行為在批處理和流處理模式下的行為不同。通常體現(xiàn)在聚合類型算子在流計算模式會輸出中間結(jié)果,在批處理模式下只輸出最終結(jié)果。
比如reduce 或者 sum等在流模式使用滾動輸出方式(每到來一條數(shù)據(jù)計算并輸出一次),批模式僅輸出最終結(jié)果。
迭代計算和依賴checkpoint的算子不支持。
為了演示聚合算子行為的不同,我們編寫如下例子:
public class BatchStreamDemo {
public static void main(String[] args) throws Exception {
doBatch();
// doStream();
}
public static void doBatch() throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
DataStreamSource<Integer> integerDataStreamSource = streamExecutionEnvironment.fromElements(1, 2, 3, 4, 5, 6);
integerDataStreamSource.keyBy((value) -> 1).sum(0).print();
streamExecutionEnvironment.execute();
}
public static void doStream() throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<Integer> integerDataStreamSource = streamExecutionEnvironment.fromElements(1, 2, 3, 4, 5, 6);
integerDataStreamSource.keyBy((value) -> 1).sum(0).print();
streamExecutionEnvironment.execute();
}
}
分別運行doBatch和doStream方法。我們發(fā)現(xiàn)同樣是sum算子,批處理只會輸出最后的計算結(jié)果,流模式每次讀進來一個數(shù)據(jù),都會將當前已讀取到數(shù)據(jù)求和一次,將結(jié)果輸出。
參考文獻
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API