Apache Flink——DataStream API 執(zhí)行環(huán)境

前言

Flink 有非常靈活的分層 API 設(shè)計(jì),其中的核心層就是 DataStream/DataSet API。由于新版本已經(jīng)實(shí)現(xiàn)了流批一體,DataSet API 將被棄用,官方推薦統(tǒng)一使用 DataStream API 處理流數(shù)據(jù)和批數(shù)據(jù)。

DataStream(數(shù)據(jù)流)本身是 Flink 中一個(gè)用來(lái)表示數(shù)據(jù)集合的類(Class),我們編寫的Flink 代碼其實(shí)就是基于這種數(shù)據(jù)類型的處理,所以這套核心 API 就以 DataStream 命名。對(duì)于批處理和流處理,我們都可以用這同一套 API 來(lái)實(shí)現(xiàn)。

一個(gè) Flink 程序,其實(shí)就是對(duì) DataStream 的各種轉(zhuǎn)換。具體來(lái)說(shuō),代碼基本上都由以下幾部分構(gòu)成:

  • 獲取執(zhí)行環(huán)境(execution environment)
  • 讀取數(shù)據(jù)源(source)
  • 定義基于數(shù)據(jù)的轉(zhuǎn)換操作(transformations)
  • 定義計(jì)算結(jié)果的輸出位置(sink)
  • 觸發(fā)程序執(zhí)行(execute)

其中,獲取環(huán)境和觸發(fā)執(zhí)行,都可以認(rèn)為是針對(duì)執(zhí)行環(huán)境的操作。

一、執(zhí)行環(huán)境(Execution Environment)

Flink 程序可以在各種上下文環(huán)境中運(yùn)行:我們可以在本地 JVM 中執(zhí)行程序,也可以提交到遠(yuǎn)程集群上運(yùn)行。

不同的環(huán)境,代碼的提交運(yùn)行的過(guò)程會(huì)有所不同。這就要求我們?cè)谔峤蛔鳂I(yè)執(zhí)行計(jì)算時(shí),首先必須獲取當(dāng)前 Flink 的運(yùn)行環(huán)境,從而建立起與 Flink 框架之間的聯(lián)系。只有獲取了環(huán)境上下文信息,才能將具體的任務(wù)調(diào)度到不同的 TaskManager 執(zhí)行。

1.1 創(chuàng)建執(zhí)行環(huán)境

  • StreamExecutionEnvironment.getExecutionEnvironment()
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

這種“智能”的方式不需要我們額外做判斷,用起來(lái)簡(jiǎn)單高效,是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。

1.2 StreamExecutionEnvironment.createLocalEnvironment()

這個(gè)方法返回一個(gè)本地執(zhí)行環(huán)境??梢栽谡{(diào)用時(shí)傳入一個(gè)參數(shù),指定默認(rèn)的并行度;如果不傳入,則默認(rèn)并行度就是本地的 CPU 核心數(shù)。

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

1.3 StreamExecutionEnvironment.createRemoteEnvironment()

這個(gè)方法返回集群執(zhí)行環(huán)境。需要在調(diào)用時(shí)指定 JobManager 的主機(jī)名和端口號(hào),并指定要在集群中運(yùn)行的 Jar 包。

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                                            "host", // JobManager 主機(jī)名
                                             1234, // JobManager 進(jìn)程端口號(hào)
                                             "path/to/jarFile.jar" // 提交給 JobManager 的 JAR 包
                                            );

二、執(zhí)行模式(Execution Mode)

我們獲取到的執(zhí)行環(huán)境,是一個(gè) StreamExecutionEnvironment,顧名思義它應(yīng)該是做流處理的。那對(duì)于批處理,又應(yīng)該怎么獲取執(zhí)行環(huán)境呢?

在之前的 Flink 版本中,批處理的執(zhí)行環(huán)境與流處理類似,是調(diào)用類ExecutionEnvironment的靜態(tài)方法,返回它的對(duì)象:

// 批處理環(huán)境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

// 流處理環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

而從 1.12.0 版本起,F(xiàn)link 實(shí)現(xiàn)了 API 上的流批統(tǒng)一。DataStream API 新增了一個(gè)重要特性:可以支持不同的“執(zhí)行模式”(execution mode),通過(guò)簡(jiǎn)單的設(shè)置就可以讓一段 Flink 程序在流處理和批處理之間切換。這樣一來(lái),DataSet API 也就沒(méi)有存在的必要了。

  • 流執(zhí)行模式(STREAMING)
    這是 DataStream API 最經(jīng)典的模式,一般用于需要持續(xù)實(shí)時(shí)處理的無(wú)界數(shù)據(jù)流。默認(rèn)情況下,程序使用的就是 STREAMING 執(zhí)行模式。

  • 批執(zhí)行模式(BATCH)
    專門用于批處理的執(zhí)行模式, 這種模式下,F(xiàn)link 處理作業(yè)的方式類似于MapReduce 框架。對(duì)于不會(huì)持續(xù)計(jì)算的有界數(shù)據(jù),我們用這種模式處理會(huì)更方便。

  • 自動(dòng)模式(AUTOMATIC)
    在這種模式下,將由程序根據(jù)輸入數(shù)據(jù)源是否有界,來(lái)自動(dòng)選擇執(zhí)行模式。

2.1 BATCH 模式的配置方法

由于 Flink 程序默認(rèn)是 STREAMING 模式,我們這里重點(diǎn)介紹一下 BATCH 模式的配置。
主要有兩種方式:

  • 1、通過(guò)命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH

在提交作業(yè)時(shí),增加 execution.runtime-mode 參數(shù),指定值為 BATCH。

  • 2、通過(guò)代碼配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

在代碼中,直接基于執(zhí)行環(huán)境調(diào)用 setRuntimeMode 方法,傳入 BATCH 模式。

建議:不要在代碼中配置,而是使用命令行。在提交作業(yè)時(shí)指定參數(shù)可以更加靈活,在代碼中可擴(kuò)展太差,不建議使用。

三、觸發(fā)程序執(zhí)行

有了執(zhí)行環(huán)境,我們就可以構(gòu)建程序的處理流程了:基于環(huán)境讀取數(shù)據(jù)源,進(jìn)而進(jìn)行各種轉(zhuǎn)換操作,最后輸出結(jié)果到外部系統(tǒng)。

需要注意的是,寫完輸出(sink)操作并不代表程序已經(jīng)結(jié)束。因?yàn)楫?dāng) main()方法被調(diào)用時(shí),其實(shí)只是定義了作業(yè)的每個(gè)執(zhí)行操作,然后添加到數(shù)據(jù)流圖中;這時(shí)并沒(méi)有真正處理數(shù)據(jù)——因?yàn)閿?shù)據(jù)可能還沒(méi)來(lái)。Flink 是由事件驅(qū)動(dòng)的,只有等到數(shù)據(jù)到來(lái),才會(huì)觸發(fā)真正的計(jì)算,這也被稱為“延遲執(zhí)行”或“懶執(zhí)行”(lazy execution)。

所以我們需要顯式地調(diào)用執(zhí)行環(huán)境的 execute()方法,來(lái)觸發(fā)程序執(zhí)行。execute()方法將一直等待作業(yè)完成,然后返回一個(gè)執(zhí)行結(jié)果(JobExecutionResult)。

env.execute();

參考:
https://blog.csdn.net/weixin_45417821/article/details/124141186

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容