前言
Flink 有非常靈活的分層 API 設(shè)計(jì),其中的核心層就是 DataStream/DataSet API。由于新版本已經(jīng)實(shí)現(xiàn)了流批一體,DataSet API 將被棄用,官方推薦統(tǒng)一使用 DataStream API 處理流數(shù)據(jù)和批數(shù)據(jù)。
DataStream(數(shù)據(jù)流)本身是 Flink 中一個(gè)用來(lái)表示數(shù)據(jù)集合的類(Class),我們編寫的Flink 代碼其實(shí)就是基于這種數(shù)據(jù)類型的處理,所以這套核心 API 就以 DataStream 命名。對(duì)于批處理和流處理,我們都可以用這同一套 API 來(lái)實(shí)現(xiàn)。
一個(gè) Flink 程序,其實(shí)就是對(duì) DataStream 的各種轉(zhuǎn)換。具體來(lái)說(shuō),代碼基本上都由以下幾部分構(gòu)成:
- 獲取執(zhí)行環(huán)境(execution environment)
- 讀取數(shù)據(jù)源(source)
- 定義基于數(shù)據(jù)的轉(zhuǎn)換操作(transformations)
- 定義計(jì)算結(jié)果的輸出位置(sink)
- 觸發(fā)程序執(zhí)行(execute)
其中,獲取環(huán)境和觸發(fā)執(zhí)行,都可以認(rèn)為是針對(duì)執(zhí)行環(huán)境的操作。

一、執(zhí)行環(huán)境(Execution Environment)
Flink 程序可以在各種上下文環(huán)境中運(yùn)行:我們可以在本地 JVM 中執(zhí)行程序,也可以提交到遠(yuǎn)程集群上運(yùn)行。
不同的環(huán)境,代碼的提交運(yùn)行的過(guò)程會(huì)有所不同。這就要求我們?cè)谔峤蛔鳂I(yè)執(zhí)行計(jì)算時(shí),首先必須獲取當(dāng)前 Flink 的運(yùn)行環(huán)境,從而建立起與 Flink 框架之間的聯(lián)系。只有獲取了環(huán)境上下文信息,才能將具體的任務(wù)調(diào)度到不同的 TaskManager 執(zhí)行。
1.1 創(chuàng)建執(zhí)行環(huán)境
- StreamExecutionEnvironment.getExecutionEnvironment()
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
這種“智能”的方式不需要我們額外做判斷,用起來(lái)簡(jiǎn)單高效,是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。
1.2 StreamExecutionEnvironment.createLocalEnvironment()
這個(gè)方法返回一個(gè)本地執(zhí)行環(huán)境??梢栽谡{(diào)用時(shí)傳入一個(gè)參數(shù),指定默認(rèn)的并行度;如果不傳入,則默認(rèn)并行度就是本地的 CPU 核心數(shù)。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
1.3 StreamExecutionEnvironment.createRemoteEnvironment()
這個(gè)方法返回集群執(zhí)行環(huán)境。需要在調(diào)用時(shí)指定 JobManager 的主機(jī)名和端口號(hào),并指定要在集群中運(yùn)行的 Jar 包。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
"host", // JobManager 主機(jī)名
1234, // JobManager 進(jìn)程端口號(hào)
"path/to/jarFile.jar" // 提交給 JobManager 的 JAR 包
);
二、執(zhí)行模式(Execution Mode)
我們獲取到的執(zhí)行環(huán)境,是一個(gè) StreamExecutionEnvironment,顧名思義它應(yīng)該是做流處理的。那對(duì)于批處理,又應(yīng)該怎么獲取執(zhí)行環(huán)境呢?
在之前的 Flink 版本中,批處理的執(zhí)行環(huán)境與流處理類似,是調(diào)用類ExecutionEnvironment的靜態(tài)方法,返回它的對(duì)象:
// 批處理環(huán)境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// 流處理環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
而從 1.12.0 版本起,F(xiàn)link 實(shí)現(xiàn)了 API 上的流批統(tǒng)一。DataStream API 新增了一個(gè)重要特性:可以支持不同的“執(zhí)行模式”(execution mode),通過(guò)簡(jiǎn)單的設(shè)置就可以讓一段 Flink 程序在流處理和批處理之間切換。這樣一來(lái),DataSet API 也就沒(méi)有存在的必要了。
流執(zhí)行模式(STREAMING)
這是 DataStream API 最經(jīng)典的模式,一般用于需要持續(xù)實(shí)時(shí)處理的無(wú)界數(shù)據(jù)流。默認(rèn)情況下,程序使用的就是 STREAMING 執(zhí)行模式。批執(zhí)行模式(BATCH)
專門用于批處理的執(zhí)行模式, 這種模式下,F(xiàn)link 處理作業(yè)的方式類似于MapReduce 框架。對(duì)于不會(huì)持續(xù)計(jì)算的有界數(shù)據(jù),我們用這種模式處理會(huì)更方便。自動(dòng)模式(AUTOMATIC)
在這種模式下,將由程序根據(jù)輸入數(shù)據(jù)源是否有界,來(lái)自動(dòng)選擇執(zhí)行模式。
2.1 BATCH 模式的配置方法
由于 Flink 程序默認(rèn)是 STREAMING 模式,我們這里重點(diǎn)介紹一下 BATCH 模式的配置。
主要有兩種方式:
- 1、通過(guò)命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH
在提交作業(yè)時(shí),增加 execution.runtime-mode 參數(shù),指定值為 BATCH。
- 2、通過(guò)代碼配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代碼中,直接基于執(zhí)行環(huán)境調(diào)用 setRuntimeMode 方法,傳入 BATCH 模式。
建議:不要在代碼中配置,而是使用命令行。在提交作業(yè)時(shí)指定參數(shù)可以更加靈活,在代碼中可擴(kuò)展太差,不建議使用。
三、觸發(fā)程序執(zhí)行
有了執(zhí)行環(huán)境,我們就可以構(gòu)建程序的處理流程了:基于環(huán)境讀取數(shù)據(jù)源,進(jìn)而進(jìn)行各種轉(zhuǎn)換操作,最后輸出結(jié)果到外部系統(tǒng)。
需要注意的是,寫完輸出(sink)操作并不代表程序已經(jīng)結(jié)束。因?yàn)楫?dāng) main()方法被調(diào)用時(shí),其實(shí)只是定義了作業(yè)的每個(gè)執(zhí)行操作,然后添加到數(shù)據(jù)流圖中;這時(shí)并沒(méi)有真正處理數(shù)據(jù)——因?yàn)閿?shù)據(jù)可能還沒(méi)來(lái)。Flink 是由事件驅(qū)動(dòng)的,只有等到數(shù)據(jù)到來(lái),才會(huì)觸發(fā)真正的計(jì)算,這也被稱為“延遲執(zhí)行”或“懶執(zhí)行”(lazy execution)。
所以我們需要顯式地調(diào)用執(zhí)行環(huán)境的 execute()方法,來(lái)觸發(fā)程序執(zhí)行。execute()方法將一直等待作業(yè)完成,然后返回一個(gè)執(zhí)行結(jié)果(JobExecutionResult)。
env.execute();
參考:
https://blog.csdn.net/weixin_45417821/article/details/124141186