一、Flink整體架構(gòu)
Flink整體架構(gòu)可以分為APIs&Libraries、Core和Deploy三層:
- Libraries層也被稱作Flink應(yīng)用組件層,是在API層之上構(gòu)建滿足了特定應(yīng)用領(lǐng)域的計(jì)算框架,包括面向流處理的CEP(復(fù)雜事件處理)、類SQL操作,面向批處理的FlinkML(機(jī)器學(xué)習(xí)庫(kù))、Gelly(圖處理)等;APIs層主要實(shí)現(xiàn)了面向流處理對(duì)應(yīng)的DataStream API,面向批處理對(duì)應(yīng)的DataSet API。
- Core層提供了Flink運(yùn)行時(shí)的全部核心實(shí)現(xiàn),例如支持分布式Stream作業(yè)執(zhí)行、JobGraph到ExecutionGraph的映射和調(diào)度等,為API層提供了基礎(chǔ)服務(wù)。
- Deploy層支持多種部署模式,包括本地、集群(Standalone、YARN、Kubernetes)及云部署(GCE/EC2)。

接下來(lái)我們從上向下,依次介紹Flink的架構(gòu)設(shè)計(jì)與實(shí)現(xiàn),如有不當(dāng)之處歡迎交流與拍磚~
二、Flink API
Flink提供了多種抽象的編程接口,適用于不同層級(jí)用戶的需求,如下圖所示:

2.1 Stateful Processing Function
最底層級(jí)的抽象提供了強(qiáng)大且靈活的編程能力,在其中可以直接操作狀態(tài)數(shù)據(jù)、TimeService等服務(wù),同時(shí)可以注冊(cè)事件時(shí)間和處理時(shí)間回調(diào)定時(shí)器,使程序能夠?qū)崿F(xiàn)更加復(fù)雜的計(jì)算。使用Stateful Processing Function需要借助DataStream API。雖然靈活度很高,但是使用復(fù)雜度也相對(duì)較高,且在DataStreamAPI中已經(jīng)封裝了豐富的算子可以直接使用,因此除非用戶需要自定義比較復(fù)雜的算子(如直接操作狀態(tài)數(shù)據(jù)等),否則無(wú)須使用Stateful Processing Function來(lái)開發(fā)Flink作業(yè)。
2.2 DataStream&DataSet API
大多數(shù)應(yīng)用都是針對(duì)Core API進(jìn)行編程 :DataStream API( 有界或無(wú)界流數(shù)據(jù)) 和DataSet API(有界數(shù)據(jù)集)。這些 API提供了通用的數(shù)據(jù)處理操作, 比如由用戶定義的多種形式的轉(zhuǎn)換( transformations)、連接( joins)、聚合( aggregations)、窗口操作( windows) 等等。DataSet API 為有界數(shù)據(jù)集提供了額外的支持, 例如循環(huán)與迭代。
值得一提的是,雖然Table和SQL API已經(jīng)能夠做到批流一體,但這僅是在邏輯層面上,最終還是會(huì)轉(zhuǎn)換成DataSet API和DataStream API對(duì)應(yīng)的作業(yè)。在未來(lái)的版本中,F(xiàn)link將逐漸通過DataStream處理有界數(shù)據(jù)集和無(wú)界數(shù)據(jù)集,實(shí)現(xiàn)真正意義上的批流一體。
2.3 Flink SQL & Table API
Flink提供的高層級(jí)的抽象是Table API與Flink SQL 。其中Table API 是以表為中心的聲明式編程,表可能會(huì)動(dòng)態(tài)變化(在表達(dá)流數(shù)據(jù)時(shí))。Table API遵循關(guān)系模型:表有二維數(shù)據(jù)結(jié)構(gòu)(schema,類似于關(guān)系數(shù)據(jù)庫(kù)中的表),同時(shí)API提供常用的查詢操作,例如select、project、join、group-by、aggregate等。Table API 可以通過用戶自定義函數(shù)( UDF)進(jìn)行擴(kuò)展。除此之外,Table API程序在執(zhí)行之前會(huì)經(jīng)過內(nèi)置優(yōu)化器進(jìn)行優(yōu)化。
Flink SQL在語(yǔ)法與表達(dá)能力上與Table API類似,只是以SQL查詢表達(dá)式的形式編寫和表達(dá)邏輯。SQL抽象與Table API交互密切,SQL查詢可以直接在Table API定義的表上執(zhí)行。
三、DataStream解析
DataStream API用于構(gòu)建流式類型的Flink程序,處理實(shí)時(shí)無(wú)界數(shù)據(jù)流,是Flink系統(tǒng)中最重要的API。本節(jié)結(jié)合一個(gè)簡(jiǎn)單的示例對(duì)DataStream API進(jìn)行淺析。
3.1 WordCount示例
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataStream<String> dataStream = env.readTextFile("the_path_for_input");
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
dataStream.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(value -> value.f0)
.sum(1);
counts.print();
// execute program
env.execute("Streaming WordCount");
從上面WordCount代碼示例可以看出,一個(gè)Flink流處理程序主要包括以下3個(gè)部分:
- StreamExecutionEnvironment初始化:該部分主要?jiǎng)?chuàng)建和初始化StreamExecutionEnvironment,提供通過DataStream API構(gòu)建Flink作業(yè)需要的執(zhí)行環(huán)境,包括設(shè)定ExecutionConfig、CheckpointConfig等配置信息以及StateBackend和TimeCharacteristic等變量。
- 業(yè)務(wù)邏輯轉(zhuǎn)換代碼:該模塊是用戶編寫轉(zhuǎn)換邏輯的區(qū)域,在streamExecutionEnvironment中提供了創(chuàng)建DataStream的方法,例如通過StreamExecutionEnvironment.readTextFile()方法讀取文本數(shù)據(jù)并構(gòu)建DataStreamSource數(shù)據(jù)集,之后所有的DataStream轉(zhuǎn)換操作都會(huì)以DataStreamSource為頭部節(jié)點(diǎn)。同時(shí),DataStreamAPI中提供了各種轉(zhuǎn)換操作,例如map、reduce、join等算子,用戶可以通過這些轉(zhuǎn)換操作構(gòu)建完整的Flink計(jì)算邏輯。
- 執(zhí)行應(yīng)用程序:編寫完Flink應(yīng)用后,必須調(diào)用ExecutionEnvironment.execute()方法執(zhí)行整個(gè)應(yīng)用程序,在execute()方法中會(huì)基于DataStream之間的轉(zhuǎn)換操作生成StreamGraph,并將StreamGraph結(jié)構(gòu)轉(zhuǎn)換為JobGraph,最終將JobGraph提交到指定的Session集群中運(yùn)行。
3.2 DataStream結(jié)構(gòu)
DataStream數(shù)據(jù)結(jié)構(gòu)包含兩個(gè)主要成員:streamExecutionEnvironment和transformation。DataStream用于表達(dá)業(yè)務(wù)轉(zhuǎn)換邏輯,可以通過transformation生成新的DataStream。DataStream實(shí)際上并不存儲(chǔ)真實(shí)數(shù)據(jù)。

如上圖所示,DataStream之間的轉(zhuǎn)換操作都是通過StreamTransformation進(jìn)行的,例如當(dāng)用戶執(zhí)行DataStream.map()方法轉(zhuǎn)換時(shí),底層對(duì)應(yīng)的便是OneInputTransformation轉(zhuǎn)換操作。在DataStream轉(zhuǎn)換的過程中,不管是哪種類型的轉(zhuǎn)換操作,都是按照相同方式進(jìn)行:首先將用戶自定義的函數(shù)(如示例中的new Tokenizer())封裝到Operator中,然后將Operator封裝到Transformation轉(zhuǎn)換操作結(jié)構(gòu)中,最后將Transformation寫入StreamExecutionEnvironment提供的Transformation集合。通過DataStream之間的轉(zhuǎn)換操作形成Pipeline拓?fù)?,即StreamGraph數(shù)據(jù)結(jié)構(gòu),最終通過StreamGraph生成JobGraph并提交到集群上運(yùn)行。
3.3 Transformation
由上文可以知道,DataStream之間的轉(zhuǎn)換操作都是基于Transformation來(lái)實(shí)現(xiàn)的,每種Transformation實(shí)現(xiàn)都和DataStream的一個(gè)接口方法對(duì)應(yīng)。

從上面的的Transformation類圖可以看出,Transformation的子類涵蓋了所有的DataStream轉(zhuǎn)換操作。常用到的StreamMap、StreamFilter算子封裝在OneInputTransformation中,即單輸入類型的轉(zhuǎn)換操作;常見的雙輸入類型算子有join、connect等,對(duì)應(yīng)支持雙輸入類型轉(zhuǎn)換的TwoInputTransformation。
在Transformation的基礎(chǔ)上又抽象出了PhysicalTransformation類。PhysicalTransformation中提供了setChainingStrategy方法,可以將上下游算子按照指定的策略連接,從而減少網(wǎng)絡(luò)數(shù)據(jù)傳輸、提高計(jì)算性能。ChainingStrategy支持如下三種策略:
- ALWAYS:代表該Transformation中的算子會(huì)和上游算子盡可能地鏈化,最終將多個(gè)Operator組合成OperatorChain。OperatorChain中的Operator會(huì)運(yùn)行在同一個(gè)SubTask實(shí)例中,這樣做的目的主要是優(yōu)化性能,減少Operator之間的網(wǎng)絡(luò)傳輸。
- NEVER:代表該Transformation中的Operator永遠(yuǎn)不會(huì)和上下游算子之間鏈化,因此對(duì)應(yīng)的Operator會(huì)運(yùn)行在獨(dú)立的SubTask實(shí)例中。
- HEAD:代表該Transformation對(duì)應(yīng)的Operator為頭部算子,不支持上游算子鏈化,但是可以和下游算子鏈化,實(shí)際上就是OperatorChain中的HeaderOperator。
這里整理了常用的Transformation,如下圖所示:

四、運(yùn)行時(shí)架構(gòu)
Flink客戶端會(huì)將用戶的作業(yè)轉(zhuǎn)換為JobGraph結(jié)構(gòu)并提交至集群的運(yùn)行時(shí)中,對(duì)作業(yè)進(jìn)行調(diào)度并拆分成Task繼續(xù)調(diào)度和執(zhí)行。運(yùn)行時(shí)中的核心組件和服務(wù)會(huì)分工并協(xié)調(diào)合作,最終完成整個(gè)Job的調(diào)度和執(zhí)行。

4.1 主要組件
這里先介紹幾個(gè)Flink中的重要概念:
- Job: 一個(gè)Job對(duì)應(yīng)一個(gè)用戶提交的作業(yè),也即對(duì)應(yīng)一個(gè)jobGraph
- Task: Flink會(huì)基于jobGraph中每個(gè)算子的鏈化策略(見3.3)和用戶設(shè)置的并發(fā)度,將一個(gè)Job拆分為多個(gè)Task,并為每個(gè)Task申請(qǐng)資源執(zhí)行
- Slot: 是Flink中并發(fā)執(zhí)行的最小單位,可以理解為Java中的線程。Slot由集群中的TaskManager提供、ResourceManager統(tǒng)一管理。
Dispatcher
Dispatcher主要負(fù)責(zé)接收客戶端提交的JobGraph對(duì)象(例如CLI客戶端或FlinkWebUI提交的任務(wù)最終都會(huì)發(fā)送至Dispatcher組件),并對(duì)JobGraph進(jìn)行分發(fā)和執(zhí)行。其中就包含根據(jù)JobGraph對(duì)象啟動(dòng)JobManager服務(wù),專門用于管理整個(gè)任務(wù)的生命周期。
ResourceManager
ResourceManager有兩個(gè)主要職責(zé):負(fù)責(zé)管理Flink集群中的計(jì)算資源,其中計(jì)算資源主要來(lái)自TaskManager組件;以及接收來(lái)自JobManager的SlotRequest。
如果采用集群部署方式,則ResourceManager會(huì)動(dòng)態(tài)地向集群資源管理器申請(qǐng)Container并啟動(dòng)TaskManager,例如HadoopYarn、Kubernetes等。對(duì)于不同的集群資源管理器,ResourceManager的實(shí)現(xiàn)也會(huì)有所不同。
JobManager
Dispatcher會(huì)根據(jù)接收的JobGraph對(duì)象為任務(wù)創(chuàng)建JobManager服務(wù),由后者對(duì)整個(gè)任務(wù)的生命周期進(jìn)行管理。JobManager會(huì)將JobGraph轉(zhuǎn)換成ExecutionGraph結(jié)構(gòu),并通過內(nèi)部調(diào)度程序?qū)xecutionGraph中的ExecutionVertex節(jié)點(diǎn)進(jìn)行調(diào)度和執(zhí)行,最終會(huì)經(jīng)過ResourceManager向指定的TaskManager提交和運(yùn)行Task實(shí)例。同時(shí)也會(huì)監(jiān)控各個(gè)Task的運(yùn)行狀況,直到整個(gè)作業(yè)中所有的Task都執(zhí)行完畢或停止。
和Dispatcher組件一樣,JobManager組件本身也是RPC服務(wù),因此具備RPC通信的能力,可以與ResourceManager進(jìn)行RPC通信、申請(qǐng)任務(wù)的計(jì)算資源。當(dāng)任務(wù)執(zhí)行完畢后,JobManager服務(wù)也會(huì)關(guān)閉并釋放任務(wù)占用的計(jì)算資源。
TaskManager
TaskManager負(fù)責(zé)向整個(gè)集群提供Slot計(jì)算資源、并管理JobManager提交的Task任務(wù)。TaskManager會(huì)向JobManager服務(wù)提供從ResourceManager中申請(qǐng)和分配的Slot計(jì)算資源,JobManager最終會(huì)根據(jù)分配到的Slot計(jì)算資源將Task提交到TaskManager上運(yùn)行。
4.2 執(zhí)行流程
接下來(lái)我們看整個(gè)集群中各個(gè)主要組件的啟動(dòng)流程。如上圖,我們以Session類型(見第五節(jié))的集群為例進(jìn)行說(shuō)明,F(xiàn)link Session集群的啟動(dòng)流程主要包含如下步驟:
- 用戶通過客戶端命令啟動(dòng)SessionCluster,此時(shí)會(huì)觸發(fā)整個(gè)集群服務(wù)的啟動(dòng)過程,客戶端會(huì)向集群資源管理器申請(qǐng)Container計(jì)算資源以啟動(dòng)運(yùn)行時(shí)中的管理節(jié)點(diǎn)。
- ClusterManagement會(huì)為運(yùn)行時(shí)集群分配Application主節(jié)點(diǎn)需要的資源并啟動(dòng)主節(jié)點(diǎn)服務(wù),例如在HadoopYarn資源管理器中會(huì)分配并啟動(dòng)Flink管理節(jié)點(diǎn)對(duì)應(yīng)的Container。
- 客戶端將用戶提交的應(yīng)用程序代碼經(jīng)過本地運(yùn)行生成JobGraph結(jié)構(gòu),然后通過ClusterClient將JobGraph提交到集群運(yùn)行時(shí)中運(yùn)行。
- 此時(shí)集群運(yùn)行時(shí)中的Dispatcher服務(wù)會(huì)接收到ClusterClient提交的JobGraph對(duì)象,然后根據(jù)JobGraph啟動(dòng)JobManagerRPC服務(wù)。JobManager是每個(gè)提交的作業(yè)都會(huì)單獨(dú)創(chuàng)建的作業(yè)管理服務(wù),生命周期和整個(gè)作業(yè)的生命周期一致。
- 當(dāng)JobManagerRPC服務(wù)啟動(dòng)后,下一步就是根據(jù)JobGraph配置的計(jì)算資源向ResourceManager服務(wù)申請(qǐng)運(yùn)行Task實(shí)例需要的Slot計(jì)算資源。
- 此時(shí)ResourceManager接收到JobManager提交的資源申請(qǐng)后,先判斷集群中是否有足夠的Slot資源滿足作業(yè)的資源申請(qǐng),如果有則直接向JobManager分配計(jì)算資源,如果沒有則動(dòng)態(tài)地向外部集群資源管理器申請(qǐng)啟動(dòng)額外的Container以提供Slot計(jì)算資源。
- 如果在集群資源管理器(例如HadoopYarn)中有足夠的Container計(jì)算資源,就會(huì)根據(jù)ResourceManager的命令啟動(dòng)指定的TaskManager實(shí)例。
- TaskManager啟動(dòng)后會(huì)主動(dòng)向ResourceManager注冊(cè)Slot信息,即其自身能提供的全部Slot資源。ResourceManager接收到TaskManager中的Slot計(jì)算資源時(shí),就會(huì)立即向該TaskManager發(fā)送Slot資源申請(qǐng),為JobManager服務(wù)分配提交任務(wù)所需的Slot計(jì)算資源。
- 當(dāng)TaskManager接收到ResourceManager的資源分配請(qǐng)求后,TaskManager會(huì)對(duì)符合申請(qǐng)條件的SlotRequest進(jìn)行處理,然后立即向JobManager提供Slot資源。
- 此時(shí)JobManager會(huì)接收到來(lái)自TaskManager的offerslots消息,接下來(lái)會(huì)向Slot所在的TaskManager申請(qǐng)?zhí)峤籘ask實(shí)例。TaskManager接收到來(lái)自JobManager的Task啟動(dòng)申請(qǐng)后,會(huì)在已經(jīng)分配的Slot卡槽中啟動(dòng)Task線程。
- TaskManager中啟動(dòng)的Task線程會(huì)周期性地向JobManager匯報(bào)任務(wù)運(yùn)行狀態(tài),直到完成整個(gè)任務(wù)運(yùn)行。
五、Flink部署模式
常見的Flink部署方式有如下三種:
- Standalone:?jiǎn)螜C(jī)部署
- Flink on YARN:YARN集群部署
- Flink on Kubernetes:容器化部署
這里以Flink on YARN來(lái)說(shuō)明兩種常用的部署模式:
- Session-Cluster模式:Session-Cluster模式需要先啟動(dòng)集群,然后再提交作業(yè),接著會(huì)向YARN申請(qǐng)一塊空間后,資源永遠(yuǎn)保持不變。如果資源滿了,下一個(gè)作業(yè)就無(wú)法提交,只能等到Y(jié)ARN中的其中一個(gè)作業(yè)執(zhí)行完成后釋放了資源,下個(gè)作業(yè)才會(huì)正常提交。所有作業(yè)共享Dispatcher和ResourceManager、共享資源,適合規(guī)模小執(zhí)行時(shí)間短的作業(yè)。

- Per-Job-Cluster模式:一個(gè)Job會(huì)對(duì)應(yīng)一個(gè)集群,每提交一個(gè)作業(yè)會(huì)根據(jù)自身的情況,都會(huì)單獨(dú)向YARN申請(qǐng)資源,直到作業(yè)執(zhí)行完成,一個(gè)作業(yè)的失敗與否并不會(huì)影響下一個(gè)作業(yè)的正常提交和運(yùn)行。獨(dú)享Dispatcher和ResourceManager,按需接受資源申請(qǐng),適合規(guī)模大長(zhǎng)時(shí)間運(yùn)行的作業(yè)。

參考資料
- Flink源碼
- 《Flink設(shè)計(jì)與實(shí)現(xiàn) 核心原理與源碼解析》--張利兵
- B站Flink教程