101、Flink的基礎(chǔ)編程模型了解嗎?
上圖是來自Flink官網(wǎng)的運(yùn)行流程圖。通過上圖我們可以得知,F(xiàn)link 程序的基本構(gòu)建是數(shù)據(jù)輸入來自一個(gè) Source,Source 代表數(shù)據(jù)的輸入端,經(jīng)過 Transformation 進(jìn)行轉(zhuǎn)換,然后在一個(gè)或者多個(gè)Sink接收器中結(jié)束。數(shù)據(jù)流(stream)就是一組永遠(yuǎn)不會(huì)停止的數(shù)據(jù)記錄流,而轉(zhuǎn)換(transformation)是將一個(gè)或多個(gè)流作為輸入,并生成一個(gè)或多個(gè)輸出流的操作。執(zhí)行時(shí),F(xiàn)link程序映射到 streaming dataflows,由流(streams)和轉(zhuǎn)換操作(transformation operators)組成。
102、Flink集群有哪些角色?各自有什么作用?
Flink 程序在運(yùn)行時(shí)主要有 TaskManager,JobManager,Client三種角色。其中JobManager扮演著集群中的管理者M(jìn)aster的角色,它是整個(gè)集群的協(xié)調(diào)者,負(fù)責(zé)接收Flink Job,協(xié)調(diào)檢查點(diǎn),F(xiàn)ailover 故障恢復(fù)等,同時(shí)管理Flink集群中從節(jié)點(diǎn)TaskManager。
TaskManager是實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的Worker,在其上執(zhí)行Flink Job的一組Task,每個(gè)TaskManager負(fù)責(zé)管理其所在節(jié)點(diǎn)上的資源信息,如內(nèi)存、磁盤、網(wǎng)絡(luò),在啟動(dòng)的時(shí)候?qū)①Y源的狀態(tài)向JobManager匯報(bào)。
Client是Flink程序提交的客戶端,當(dāng)用戶提交一個(gè)Flink程序時(shí),會(huì)首先創(chuàng)建一個(gè)Client,該Client首先會(huì)對(duì)用戶提交的Flink程序進(jìn)行預(yù)處理,并提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,并建立到JobManager的連接,將Flink Job提交給JobManager。
103、說說 Flink 資源管理中 Task Slot 的概念
在Flink架構(gòu)角色中我們提到,TaskManager是實(shí)際負(fù)責(zé)執(zhí)行計(jì)算的Worker,TaskManager 是一個(gè) JVM 進(jìn)程,并會(huì)以獨(dú)立的線程來執(zhí)行一個(gè)task或多個(gè)subtask。為了控制一個(gè) TaskManager 能接受多少個(gè) task,F(xiàn)link 提出了 Task Slot 的概念。
簡(jiǎn)單的說,TaskManager會(huì)將自己節(jié)點(diǎn)上管理的資源分為不同的Slot:固定大小的資源子集。這樣就避免了不同Job的Task互相競(jìng)爭(zhēng)內(nèi)存資源,但是需要主要的是,Slot只會(huì)做內(nèi)存的隔離。沒有做CPU的隔離。
104、說說 Flink 的常用算子?
Flink 最常用的常用算子包括:
Map:DataStream → DataStream,輸入一個(gè)參數(shù)產(chǎn)生一個(gè)參數(shù),map的功能是對(duì)輸入的參數(shù)進(jìn)行轉(zhuǎn)換操作。
Filter:過濾掉指定條件的數(shù)據(jù)。
KeyBy:按照指定的key進(jìn)行分組。
Reduce:用來進(jìn)行結(jié)果匯總合并。
Window:窗口函數(shù),根據(jù)某些特性將每個(gè)key的數(shù)據(jù)進(jìn)行分組(例如:在5s內(nèi)到達(dá)的數(shù)據(jù))
105、說說你知道的Flink分區(qū)策略?
什么要搞懂什么是分區(qū)策略。分區(qū)策略是用來決定數(shù)據(jù)如何發(fā)送至下游。目前 Flink 支持了8中分區(qū)策略的實(shí)現(xiàn)。
上圖是整個(gè)Flink實(shí)現(xiàn)的分區(qū)策略繼承圖:
GlobalPartitioner 數(shù)據(jù)會(huì)被分發(fā)到下游算子的第一個(gè)實(shí)例中進(jìn)行處理。
ShufflePartitioner 數(shù)據(jù)會(huì)被隨機(jī)分發(fā)到下游算子的每一個(gè)實(shí)例中進(jìn)行處理。
RebalancePartitioner 數(shù)據(jù)會(huì)被循環(huán)發(fā)送到下游的每一個(gè)實(shí)例中進(jìn)行處理。
RescalePartitioner 這種分區(qū)器會(huì)根據(jù)上下游算子的并行度,循環(huán)的方式輸出到下游算子的每個(gè)實(shí)例。這里有點(diǎn)難以理解,假設(shè)上游并行度為2,編號(hào)為A和B。下游并行度為4,編號(hào)為1,2,3,4。那么A則把數(shù)據(jù)循環(huán)發(fā)送給1和2,B則把數(shù)據(jù)循環(huán)發(fā)送給3和4。假設(shè)上游并行度為4,編號(hào)為A,B,C,D。下游并行度為2,編號(hào)為1,2。那么A和B則把數(shù)據(jù)發(fā)送給1,C和D則把數(shù)據(jù)發(fā)送給2。
BroadcastPartitioner 廣播分區(qū)會(huì)將上游數(shù)據(jù)輸出到下游算子的每個(gè)實(shí)例中。適合于大數(shù)據(jù)集和小數(shù)據(jù)集做Jion的場(chǎng)景。
ForwardPartitioner ForwardPartitioner 用于將記錄輸出到下游本地的算子實(shí)例。它要求上下游算子并行度一樣。簡(jiǎn)單的說,F(xiàn)orwardPartitioner用來做數(shù)據(jù)的控制臺(tái)打印。
KeyGroupStreamPartitioner Hash分區(qū)器。會(huì)將數(shù)據(jù)按 Key 的 Hash 值輸出到下游算子實(shí)例中。
CustomPartitionerWrapper 用戶自定義分區(qū)器。需要用戶自己實(shí)現(xiàn)Partitioner接口,來定義自己的分區(qū)邏輯。例如:
staticclassCustomPartitionerimplementsPartitioner {@Overridepublicintpartition(Stringkey, int numPartitions) {switch(key){case"1":return1;case"2":return2;case"3":return3;default:return4;? ? ? ? ? }? ? ? }? }
106、Flink的并行度了解嗎?Flink的并行度設(shè)置是怎樣的?
Flink中的任務(wù)被分為多個(gè)并行任務(wù)來執(zhí)行,其中每個(gè)并行的實(shí)例處理一部分?jǐn)?shù)據(jù)。這些并行實(shí)例的數(shù)量被稱為并行度。
我們?cè)趯?shí)際生產(chǎn)環(huán)境中可以從四個(gè)不同層面設(shè)置并行度:
操作算子層面(Operator Level)
執(zhí)行環(huán)境層面(Execution Environment Level)
客戶端層面(Client Level)
系統(tǒng)層面(System Level)
需要注意的優(yōu)先級(jí):算子層面>環(huán)境層面>客戶端層面>系統(tǒng)層面。
107、Flink的Slot和parallelism有什么區(qū)別?
官網(wǎng)上十分經(jīng)典的圖:
slot是指taskmanager的并發(fā)執(zhí)行能力,假設(shè)我們將 taskmanager.numberOfTaskSlots 配置為3 那么每一個(gè) taskmanager 中分配3個(gè) TaskSlot, 3個(gè) taskmanager 一共有9個(gè)TaskSlot。
parallelism是指taskmanager實(shí)際使用的并發(fā)能力。假設(shè)我們把 parallelism.default 設(shè)置為1,那么9個(gè) TaskSlot 只能用1個(gè),有8個(gè)空閑。
108、Flink有沒有重啟策略?說說有哪幾種?
Flink 實(shí)現(xiàn)了多種重啟策略。
固定延遲重啟策略(Fixed Delay Restart Strategy)
故障率重啟策略(Failure Rate Restart Strategy)
沒有重啟策略(No Restart Strategy)
Fallback重啟策略(Fallback Restart Strategy)
109、用過Flink中的分布式緩存嗎?如何使用?
Flink實(shí)現(xiàn)的分布式緩存和Hadoop有異曲同工之妙。目的是在本地讀取文件,并把他放在 taskmanager 節(jié)點(diǎn)中,防止task重復(fù)拉取。
valenv = ExecutionEnvironment.getExecutionEnvironment// register a file from HDFSenv.registerCachedFile("hdfs:///path/to/your/file","hdfsFile")// register a local executable file (script, executable, ...)env.registerCachedFile("file:///path/to/exec/file","localExecFile",true)// define your program and execute...valinput: DataSet[String] = ...valresult: DataSet[Integer] = input.map(new MyMapper())...env.execute()
110、說說Flink中的廣播變量,使用時(shí)需要注意什么?
我們知道Flink是并行的,計(jì)算過程可能不在一個(gè) Slot 中進(jìn)行,那么有一種情況即:當(dāng)我們需要訪問同一份數(shù)據(jù)。那么Flink中的廣播變量就是為了解決這種情況。
我們可以把廣播變量理解為是一個(gè)公共的共享變量,我們可以把一個(gè)dataset 數(shù)據(jù)集廣播出去,然后不同的task在節(jié)點(diǎn)上都能夠獲取到,這個(gè)數(shù)據(jù)在每個(gè)節(jié)點(diǎn)上只會(huì)存在一份。
111、說說Flink中的窗口?
來一張官網(wǎng)經(jīng)典的圖:
Flink 支持兩種劃分窗口的方式,按照time和count。如果根據(jù)時(shí)間劃分窗口,那么它就是一個(gè)time-window 如果根據(jù)數(shù)據(jù)劃分窗口,那么它就是一個(gè)count-window。
flink支持窗口的兩個(gè)重要屬性(size和interval)
如果size=interval,那么就會(huì)形成tumbling-window(無重疊數(shù)據(jù)) 如果size>interval,那么就會(huì)形成sliding-window(有重疊數(shù)據(jù)) 如果size< interval, 那么這種窗口將會(huì)丟失數(shù)據(jù)。比如每5秒鐘,統(tǒng)計(jì)過去3秒的通過路口汽車的數(shù)據(jù),將會(huì)漏掉2秒鐘的數(shù)據(jù)。
通過組合可以得出四種基本窗口:
time-tumbling-window 無重疊數(shù)據(jù)的時(shí)間窗口,設(shè)置方式舉例:timeWindow(Time.seconds(5))
time-sliding-window 有重疊數(shù)據(jù)的時(shí)間窗口,設(shè)置方式舉例:timeWindow(Time.seconds(5), Time.seconds(3))
count-tumbling-window無重疊數(shù)據(jù)的數(shù)量窗口,設(shè)置方式舉例:countWindow(5)
count-sliding-window 有重疊數(shù)據(jù)的數(shù)量窗口,設(shè)置方式舉例:countWindow(5,3)
112、說說Flink中的狀態(tài)存儲(chǔ)?
Flink在做計(jì)算的過程中經(jīng)常需要存儲(chǔ)中間狀態(tài),來避免數(shù)據(jù)丟失和狀態(tài)恢復(fù)。選擇的狀態(tài)存儲(chǔ)策略不同,會(huì)影響狀態(tài)持久化如何和 checkpoint 交互。
Flink提供了三種狀態(tài)存儲(chǔ)方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
113、Flink 中的時(shí)間有哪幾類
Flink 中的時(shí)間和其他流式計(jì)算系統(tǒng)的時(shí)間一樣分為三類:事件時(shí)間,攝入時(shí)間,處理時(shí)間三種。
如果以 EventTime 為基準(zhǔn)來定義時(shí)間窗口將形成EventTimeWindow,要求消息本身就應(yīng)該攜帶EventTime。
(1)如果以 IngesingtTime 為基準(zhǔn)來定義時(shí)間窗口將形成 IngestingTimeWindow,
(2)以 source 的systemTime為準(zhǔn)。如果以 ProcessingTime 基準(zhǔn)來定義時(shí)間窗口將形成 ProcessingTimeWindow,
(3)以 operator 的systemTime 為準(zhǔn)。
114、Flink 中水印是什么概念,起到什么作用?
Watermark 是 Apache Flink 為了處理 EventTime 窗口計(jì)算提出的一種機(jī)制, 本質(zhì)上是一種時(shí)間戳。一般來講Watermark經(jīng)常和Window一起被用來處理亂序事件。
115、Flink Table & SQL 熟悉嗎?TableEnvironment這個(gè)類有什么作用
TableEnvironment是Table API和SQL集成的核心概念。
這個(gè)類主要用來:
(1)在內(nèi)部catalog中注冊(cè)表
(2)注冊(cè)外部catalog
(3)執(zhí)行SQL查詢
(4)注冊(cè)用戶定義(標(biāo)量,表或聚合)函數(shù)
(5)將DataStream或DataSet轉(zhuǎn)換為表
持有對(duì)ExecutionEnvironment或StreamExecutionEnvironment的引用
116、Flink SQL的實(shí)現(xiàn)原理是什么?是如何實(shí)現(xiàn) SQL 解析的呢?
首先大家要知道 Flink 的SQL解析是基于Apache Calcite這個(gè)開源框架。
基于此,一次完整的SQL解析過程如下:
用戶使用對(duì)外提供Stream SQL的語法開發(fā)業(yè)務(wù)應(yīng)用
用calcite對(duì)StreamSQL進(jìn)行語法檢驗(yàn),語法檢驗(yàn)通過后,轉(zhuǎn)換成calcite的邏輯樹節(jié)點(diǎn);最終形成calcite的邏輯計(jì)劃
采用Flink自定義的優(yōu)化規(guī)則和calcite火山模型、啟發(fā)式模型共同對(duì)邏輯樹進(jìn)行優(yōu)化,生成最優(yōu)的Flink物理計(jì)劃
對(duì)物理計(jì)劃采用janino codegen生成代碼,生成用低階API DataStream 描述的流應(yīng)用,提交到Flink平臺(tái)執(zhí)行
117、Flink是如何支持批流一體的?
本道面試題考察的其實(shí)就是一句話:Flink的開發(fā)者認(rèn)為批處理是流處理的一種特殊情況。批處理是有限的流處理。Flink 使用一個(gè)引擎支持了DataSet API 和 DataStream API。
118、Flink是如何做到高效的數(shù)據(jù)交換的?
在一個(gè)Flink Job中,數(shù)據(jù)需要在不同的task中進(jìn)行交換,整個(gè)數(shù)據(jù)交換是有 TaskManager 負(fù)責(zé)的,TaskManager 的網(wǎng)絡(luò)組件首先從緩沖buffer中收集records,然后再發(fā)送。Records 并不是一個(gè)一個(gè)被發(fā)送的,二是積累一個(gè)批次再發(fā)送,batch 技術(shù)可以更加高效的利用網(wǎng)絡(luò)資源。
119、Flink是如何做容錯(cuò)的?
Flink 實(shí)現(xiàn)容錯(cuò)主要靠強(qiáng)大的CheckPoint機(jī)制和State機(jī)制。Checkpoint 負(fù)責(zé)定時(shí)制作分布式快照、對(duì)程序中的狀態(tài)進(jìn)行備份;State 用來存儲(chǔ)計(jì)算過程中的中間狀態(tài)。
120、Flink 分布式快照的原理是什么?
Flink的分布式快照是根據(jù)Chandy-Lamport算法量身定做的。簡(jiǎn)單來說就是持續(xù)創(chuàng)建分布式數(shù)據(jù)流及其狀態(tài)的一致快照。
核心思想是在 input source 端插入 barrier,控制 barrier 的同步來實(shí)現(xiàn) snapshot 的備份和 exactly-once 語義。
121、Flink 是如何保證Exactly-once語義的?
Flink通過實(shí)現(xiàn)兩階段提交和狀態(tài)保存來實(shí)現(xiàn)端到端的一致性語義。分為以下幾個(gè)步驟:
(1)開始事務(wù)(beginTransaction)創(chuàng)建一個(gè)臨時(shí)文件夾,來寫把數(shù)據(jù)寫入到這個(gè)文件夾里面
(2)預(yù)提交(preCommit)將內(nèi)存中緩存的數(shù)據(jù)寫入文件并關(guān)閉
(3)正式提交(commit)將之前寫完的臨時(shí)文件放入目標(biāo)目錄下。這代表著最終的數(shù)據(jù)會(huì)有一些延遲
丟棄(abort)丟棄臨時(shí)文件
若失敗發(fā)生在預(yù)提交成功后,正式提交前。可以根據(jù)狀態(tài)來提交預(yù)提交的數(shù)據(jù),也可刪除預(yù)提交的數(shù)據(jù)。
122、Flink 的 kafka 連接器有什么特別的地方?
Flink源碼中有一個(gè)獨(dú)立的connector模塊,所有的其他connector都依賴于此模塊,F(xiàn)link 在1.9版本發(fā)布的全新kafka連接器,摒棄了之前連接不同版本的kafka集群需要依賴不同版本的connector這種做法,只需要依賴一個(gè)connector即可。
123、說說 Flink的內(nèi)存管理是如何做的?
Flink 并不是將大量對(duì)象存在堆上,而是將對(duì)象都序列化到一個(gè)預(yù)分配的內(nèi)存塊上。此外,F(xiàn)link大量的使用了堆外內(nèi)存。如果需要處理的數(shù)據(jù)超出了內(nèi)存限制,則會(huì)將部分?jǐn)?shù)據(jù)存儲(chǔ)到硬盤上。Flink 為了直接操作二進(jìn)制數(shù)據(jù)實(shí)現(xiàn)了自己的序列化框架。
理論上Flink的內(nèi)存管理分為三部分:
Network Buffers:這個(gè)是在TaskManager啟動(dòng)的時(shí)候分配的,這是一組用于緩存網(wǎng)絡(luò)數(shù)據(jù)的內(nèi)存,每個(gè)塊是32K,默認(rèn)分配2048個(gè),可以通過“taskmanager.network.numberOfBuffers”修改
Memory Manage pool:大量的Memory Segment塊,用于運(yùn)行時(shí)的算法(Sort/Join/Shuffle等),這部分啟動(dòng)的時(shí)候就會(huì)分配。下面這段代碼,根據(jù)配置文件中的各種參數(shù)來計(jì)算內(nèi)存的分配方法。(heap or off-heap,這個(gè)放到下節(jié)談),內(nèi)存的分配支持預(yù)分配和lazy load,默認(rèn)懶加載的方式。
User Code,這部分是除了Memory Manager之外的內(nèi)存用于User code和TaskManager本身的數(shù)據(jù)結(jié)構(gòu)。
124、說說 Flink的序列化如何做的?
Java本身自帶的序列化和反序列化的功能,但是輔助信息占用空間比較大,在序列化對(duì)象時(shí)記錄了過多的類信息。
Apache Flink摒棄了Java原生的序列化方法,以獨(dú)特的方式處理數(shù)據(jù)類型和序列化,包含自己的類型描述符,泛型類型提取和類型序列化框架。
TypeInformation 是所有類型描述符的基類。它揭示了該類型的一些基本屬性,并且可以生成序列化器。TypeInformation 支持以下幾種類型:
(1)BasicTypeInfo: 任意Java 基本類型或 String 類型
(2)BasicArrayTypeInfo: 任意Java基本類型數(shù)組或 String 數(shù)組
(3)WritableTypeInfo: 任意 Hadoop Writable 接口的實(shí)現(xiàn)類
(4)TupleTypeInfo: 任意的 Flink Tuple 類型(支持Tuple1 to Tuple25)。Flink tuples 是固定長(zhǎng)度固定類型的Java Tuple實(shí)現(xiàn)
(5)CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)
(6)PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java對(duì)象的所有成員變量,要么是 public 修飾符定義,要么有 getter/setter 方法
(7)GenericTypeInfo: 任意無法匹配之前幾種類型的類
針對(duì)前六種類型數(shù)據(jù)集,F(xiàn)link皆可以自動(dòng)生成對(duì)應(yīng)的TypeSerializer,能非常高效地對(duì)數(shù)據(jù)集進(jìn)行序列化和反序列化。
125、 Flink中的Window出現(xiàn)了數(shù)據(jù)傾斜,你有什么解決辦法?
window產(chǎn)生數(shù)據(jù)傾斜指的是數(shù)據(jù)在不同的窗口內(nèi)堆積的數(shù)據(jù)量相差過多。本質(zhì)上產(chǎn)生這種情況的原因是數(shù)據(jù)源頭發(fā)送的數(shù)據(jù)量速度不同導(dǎo)致的。出現(xiàn)這種情況一般通過兩種方式來解決:
在數(shù)據(jù)進(jìn)入窗口前做預(yù)聚合
重新設(shè)計(jì)窗口聚合的key
126、 Flink中在使用聚合函數(shù) GroupBy、Distinct、KeyBy 等函數(shù)時(shí)出現(xiàn)數(shù)據(jù)熱點(diǎn)該如何解決?
數(shù)據(jù)傾斜和數(shù)據(jù)熱點(diǎn)是所有大數(shù)據(jù)框架繞不過去的問題。處理這類問題主要從3個(gè)方面入手:
在業(yè)務(wù)上規(guī)避這類問題
例如一個(gè)假設(shè)訂單場(chǎng)景,北京和上海兩個(gè)城市訂單量增長(zhǎng)幾十倍,其余城市的數(shù)據(jù)量不變。這時(shí)候我們?cè)谶M(jìn)行聚合的時(shí)候,北京和上海就會(huì)出現(xiàn)數(shù)據(jù)堆積,我們可以單獨(dú)數(shù)據(jù)北京和上海的數(shù)據(jù)。
Key的設(shè)計(jì)上
把熱key進(jìn)行拆分,比如上個(gè)例子中的北京和上海,可以把北京和上海按照地區(qū)進(jìn)行拆分聚合。
參數(shù)設(shè)置
Flink 1.9.0 SQL(Blink Planner) 性能優(yōu)化中一項(xiàng)重要的改進(jìn)就是升級(jí)了微批模型,即 MiniBatch。原理是緩存一定的數(shù)據(jù)后再觸發(fā)處理,以減少對(duì)State的訪問,從而提升吞吐和減少數(shù)據(jù)的輸出量。
127、Flink任務(wù)延遲高,想解決這個(gè)問題,你會(huì)如何入手?
在Flink的后臺(tái)任務(wù)管理中,我們可以看到Flink的哪個(gè)算子和task出現(xiàn)了反壓。最主要的手段是資源調(diào)優(yōu)和算子調(diào)優(yōu)。資源調(diào)優(yōu)即是對(duì)作業(yè)中的Operator的并發(fā)數(shù)(parallelism)、CPU(core)、堆內(nèi)存(heap_memory)等參數(shù)進(jìn)行調(diào)優(yōu)。作業(yè)參數(shù)調(diào)優(yōu)包括:并行度的設(shè)置,State的設(shè)置,checkpoint的設(shè)置。
128、Flink是如何處理反壓的?
Flink 內(nèi)部是基于 producer-consumer 模型來進(jìn)行消息傳遞的,F(xiàn)link的反壓設(shè)計(jì)也是基于這個(gè)模型。Flink 使用了高效有界的分布式阻塞隊(duì)列,就像 Java 通用的阻塞隊(duì)列(BlockingQueue)一樣。下游消費(fèi)者消費(fèi)變慢,上游就會(huì)受到阻塞。
129、Flink的反壓和Strom有哪些不同?
Storm 是通過監(jiān)控 Bolt 中的接收隊(duì)列負(fù)載情況,如果超過高水位值就會(huì)將反壓信息寫到 Zookeeper ,Zookeeper 上的 watch 會(huì)通知該拓?fù)涞乃?Worker 都進(jìn)入反壓狀態(tài),最后 Spout 停止發(fā)送 tuple。
Flink中的反壓使用了高效有界的分布式阻塞隊(duì)列,下游消費(fèi)變慢會(huì)導(dǎo)致發(fā)送端阻塞。二者最大的區(qū)別是Flink是逐級(jí)反壓,而Storm是直接從源頭降速。
130、 Operator Chains(算子鏈)這個(gè)概念你了解嗎?
為了更高效地分布式執(zhí)行,F(xiàn)link會(huì)盡可能地將operator的subtask鏈接(chain)在一起形成task。每個(gè)task在一個(gè)線程中執(zhí)行。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時(shí)提高整體的吞吐量。這就是我們所說的算子鏈。
131、 Flink什么情況下才會(huì)把Operator chain在一起形成算子鏈?
兩個(gè)operator chain在一起的的條件:
上下游的并行度一致
下游節(jié)點(diǎn)的入度為1 (也就是說下游節(jié)點(diǎn)沒有來自其他節(jié)點(diǎn)的輸入)
上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會(huì)解釋 slot group)
下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認(rèn)是ALWAYS)
上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認(rèn)是HEAD)
兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
用戶沒有禁用 chain
132、 說說Flink1.9的新特性?
支持hive讀寫,支持UDF
Flink SQL TopN和GroupBy等優(yōu)化
Checkpoint跟savepoint針對(duì)實(shí)際業(yè)務(wù)場(chǎng)景做了優(yōu)化
Flink state查詢
133、消費(fèi)kafka數(shù)據(jù)的時(shí)候,如何處理臟數(shù)據(jù)?
可以在處理前加一個(gè)fliter算子,將不符合規(guī)則的數(shù)據(jù)過濾出去。
134、Flink Job的提交流程
用戶提交的Flink Job會(huì)被轉(zhuǎn)化成一個(gè)DAG任務(wù)運(yùn)行,分別是:StreamGraph、JobGraph、ExecutionGraph,F(xiàn)link中JobManager與TaskManager,JobManager與Client的交互是基于Akka工具包的,是通過消息驅(qū)動(dòng)。整個(gè)Flink Job的提交還包含著ActorSystem的創(chuàng)建,JobManager的啟動(dòng),TaskManager的啟動(dòng)和注冊(cè)。
135、Flink所謂"三層圖"結(jié)構(gòu)是哪幾個(gè)"圖"?
一個(gè)Flink任務(wù)的DAG生成計(jì)算圖大致經(jīng)歷以下三個(gè)過程:
StreamGraph 最接近代碼所表達(dá)的邏輯層面的計(jì)算拓?fù)浣Y(jié)構(gòu),按照用戶代碼的執(zhí)行順序向StreamExecutionEnvironment添加StreamTransformation構(gòu)成流式圖。
JobGraph 從StreamGraph生成,將可以串聯(lián)合并的節(jié)點(diǎn)進(jìn)行合并,設(shè)置節(jié)點(diǎn)之間的邊,安排資源共享slot槽位和放置相關(guān)聯(lián)的節(jié)點(diǎn),上傳任務(wù)所需的文件,設(shè)置檢查點(diǎn)配置等。相當(dāng)于經(jīng)過部分初始化和優(yōu)化處理的任務(wù)圖。
ExecutionGraph 由JobGraph轉(zhuǎn)換而來,包含了任務(wù)具體執(zhí)行所需的內(nèi)容,是最貼近底層實(shí)現(xiàn)的執(zhí)行圖。
136、JobManger在集群中扮演了什么角色?
JobManager 負(fù)責(zé)整個(gè) Flink 集群任務(wù)的調(diào)度以及資源的管理,從客戶端中獲取提交的應(yīng)用,然后根據(jù)集群中 TaskManager 上 TaskSlot 的使用情況,為提交的應(yīng)用分配相應(yīng)的 TaskSlot 資源并命令 TaskManager 啟動(dòng)從客戶端中獲取的應(yīng)用。
JobManager 相當(dāng)于整個(gè)集群的 Master 節(jié)點(diǎn),且整個(gè)集群有且只有一個(gè)活躍的 JobManager ,負(fù)責(zé)整個(gè)集群的任務(wù)管理和資源管理。
JobManager 和 TaskManager 之間通過 Actor System 進(jìn)行通信,獲取任務(wù)執(zhí)行的情況并通過 Actor System 將應(yīng)用的任務(wù)執(zhí)行情況發(fā)送給客戶端。
同時(shí)在任務(wù)執(zhí)行的過程中,F(xiàn)link JobManager 會(huì)觸發(fā) Checkpoint 操作,每個(gè) TaskManager 節(jié)點(diǎn) 收到 Checkpoint 觸發(fā)指令后,完成 Checkpoint 操作,所有的 Checkpoint 協(xié)調(diào)過程都是在 Fink JobManager 中完成。
當(dāng)任務(wù)完成后,F(xiàn)link 會(huì)將任務(wù)執(zhí)行的信息反饋給客戶端,并且釋放掉 TaskManager 中的資源以供下一次提交任務(wù)使用。
137、JobManger在集群?jiǎn)?dòng)過程中起到什么作用?
JobManager的職責(zé)主要是接收Flink作業(yè),調(diào)度Task,收集作業(yè)狀態(tài)和管理TaskManager。它包含一個(gè)Actor,并且做如下操作:
RegisterTaskManager: 它由想要注冊(cè)到JobManager的TaskManager發(fā)送。注冊(cè)成功會(huì)通過AcknowledgeRegistration消息進(jìn)行Ack。
SubmitJob: 由提交作業(yè)到系統(tǒng)的Client發(fā)送。提交的信息是JobGraph形式的作業(yè)描述信息。
CancelJob: 請(qǐng)求取消指定id的作業(yè)。成功會(huì)返回CancellationSuccess,否則返回CancellationFailure。
UpdateTaskExecutionState: 由TaskManager發(fā)送,用來更新執(zhí)行節(jié)點(diǎn)(ExecutionVertex)的狀態(tài)。成功則返回true,否則返回false。
RequestNextInputSplit: TaskManager上的Task請(qǐng)求下一個(gè)輸入split,成功則返回NextInputSplit,否則返回null。
JobStatusChanged:它意味著作業(yè)的狀態(tài)(RUNNING, CANCELING, FINISHED,等)發(fā)生變化。這個(gè)消息由ExecutionGraph發(fā)送。
138、TaskManager在集群中扮演了什么角色?
TaskManager 相當(dāng)于整個(gè)集群的 Slave 節(jié)點(diǎn),負(fù)責(zé)具體的任務(wù)執(zhí)行和對(duì)應(yīng)任務(wù)在每個(gè)節(jié)點(diǎn)上的資源申請(qǐng)和管理。
客戶端通過將編寫好的 Flink 應(yīng)用編譯打包,提交到 JobManager,然后 JobManager 會(huì)根據(jù)已注冊(cè)在 JobManager 中 TaskManager 的資源情況,將任務(wù)分配給有資源的 TaskManager節(jié)點(diǎn),然后啟動(dòng)并運(yùn)行任務(wù)。
TaskManager 從 JobManager 接收需要部署的任務(wù),然后使用 Slot 資源啟動(dòng) Task,建立數(shù)據(jù)接入的網(wǎng)絡(luò)連接,接收數(shù)據(jù)并開始數(shù)據(jù)處理。同時(shí) TaskManager 之間的數(shù)據(jù)交互都是通過數(shù)據(jù)流的方式進(jìn)行的。
可以看出,F(xiàn)link 的任務(wù)運(yùn)行其實(shí)是采用多線程的方式,這和 MapReduce 多 JVM 進(jìn)行的方式有很大的區(qū)別,F(xiàn)link 能夠極大提高 CPU 使用效率,在多個(gè)任務(wù)和 Task 之間通過 TaskSlot 方式共享系統(tǒng)資源,每個(gè) TaskManager 中通過管理多個(gè) TaskSlot 資源池進(jìn)行對(duì)資源進(jìn)行有效管理。
139、TaskManager在集群?jiǎn)?dòng)過程中起到什么作用?
TaskManager的啟動(dòng)流程較為簡(jiǎn)單:?jiǎn)?dòng)類:org.apache.flink.runtime.taskmanager.TaskManager 核心啟動(dòng)方法 :selectNetworkInterfaceAndRunTaskManager 啟動(dòng)后直接向JobManager注冊(cè)自己,注冊(cè)完成后,進(jìn)行部分模塊的初始化。
140、Flink 計(jì)算資源的調(diào)度是如何實(shí)現(xiàn)的?
TaskManager中最細(xì)粒度的資源是Task slot,代表了一個(gè)固定大小的資源子集,每個(gè)TaskManager會(huì)將其所占有的資源平分給它的slot。
通過調(diào)整 task slot 的數(shù)量,用戶可以定義task之間是如何相互隔離的。每個(gè) TaskManager 有一個(gè)slot,也就意味著每個(gè)task運(yùn)行在獨(dú)立的 JVM 中。每個(gè) TaskManager 有多個(gè)slot的話,也就是說多個(gè)task運(yùn)行在同一個(gè)JVM中。
而在同一個(gè)JVM進(jìn)程中的task,可以共享TCP連接(基于多路復(fù)用)和心跳消息,可以減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸,也能共享一些數(shù)據(jù)結(jié)構(gòu),一定程度上減少了每個(gè)task的消耗。每個(gè)slot可以接受單個(gè)task,也可以接受多個(gè)連續(xù)task組成的pipeline,如下圖所示,F(xiàn)latMap函數(shù)占用一個(gè)taskslot,而key Agg函數(shù)和sink函數(shù)共用一個(gè)taskslot:
141、簡(jiǎn)述Flink的數(shù)據(jù)抽象及數(shù)據(jù)交換過程?
Flink 為了避免JVM的固有缺陷例如java對(duì)象存儲(chǔ)密度低,F(xiàn)GC影響吞吐和響應(yīng)等,實(shí)現(xiàn)了自主管理內(nèi)存。MemorySegment就是Flink的內(nèi)存抽象。默認(rèn)情況下,一個(gè)MemorySegment可以被看做是一個(gè)32kb大的內(nèi)存塊的抽象。這塊內(nèi)存既可以是JVM里的一個(gè)byte[],也可以是堆外內(nèi)存(DirectByteBuffer)。
在MemorySegment這個(gè)抽象之上,F(xiàn)link在數(shù)據(jù)從operator內(nèi)的數(shù)據(jù)對(duì)象在向TaskManager上轉(zhuǎn)移,預(yù)備被發(fā)給下個(gè)節(jié)點(diǎn)的過程中,使用的抽象或者說內(nèi)存對(duì)象是Buffer。
對(duì)接從Java對(duì)象轉(zhuǎn)為Buffer的中間對(duì)象是另一個(gè)抽象StreamRecord。
142、Flink 中的分布式快照機(jī)制是如何實(shí)現(xiàn)的?
Flink的容錯(cuò)機(jī)制的核心部分是制作分布式數(shù)據(jù)流和操作算子狀態(tài)的一致性快照。這些快照充當(dāng)一致性checkpoint,系統(tǒng)可以在發(fā)生故障時(shí)回滾。Flink用于制作這些快照的機(jī)制在“分布式數(shù)據(jù)流的輕量級(jí)異步快照”中進(jìn)行了描述。它受到分布式快照的標(biāo)準(zhǔn)Chandy-Lamport算法的啟發(fā),專門針對(duì)Flink的執(zhí)行模型而定制。
barriers在數(shù)據(jù)流源處被注入并行數(shù)據(jù)流中。快照n的barriers被插入的位置(我們稱之為Sn)是快照所包含的數(shù)據(jù)在數(shù)據(jù)源中最大位置。例如,在Apache Kafka中,此位置將是分區(qū)中最后一條記錄的偏移量。將該位置Sn報(bào)告給checkpoint協(xié)調(diào)器(Flink的JobManager)。
然后barriers向下游流動(dòng)。當(dāng)一個(gè)中間操作算子從其所有輸入流中收到快照n的barriers時(shí),它會(huì)為快照n發(fā)出barriers進(jìn)入其所有輸出流中。一旦sink操作算子(流式DAG的末端)從其所有輸入流接收到barriers n,它就向checkpoint協(xié)調(diào)器確認(rèn)快照n完成。在所有sink確認(rèn)快照后,意味快照著已完成。
一旦完成快照n,job將永遠(yuǎn)不再向數(shù)據(jù)源請(qǐng)求Sn之前的記錄,因?yàn)榇藭r(shí)這些記錄(及其后續(xù)記錄)將已經(jīng)通過整個(gè)數(shù)據(jù)流拓?fù)?,也即是已?jīng)被處理結(jié)束。
143、簡(jiǎn)單說說FlinkSQL的是如何實(shí)現(xiàn)的?
Flink 將 SQL 校驗(yàn)、SQL 解析以及 SQL 優(yōu)化交給了Apache Calcite。Calcite 在其他很多開源項(xiàng)目里也都應(yīng)用到了,譬如 Apache Hive, Apache Drill, Apache Kylin, Cascading。Calcite 在新的架構(gòu)中處于核心的地位,如下圖所示。
構(gòu)建抽象語法樹的事情交給了 Calcite 去做。SQL query 會(huì)經(jīng)過 Calcite 解析器轉(zhuǎn)變成 SQL 節(jié)點(diǎn)樹,通過驗(yàn)證后構(gòu)建成 Calcite 的抽象語法樹(也就是圖中的 Logical Plan)。另一邊,Table API 上的調(diào)用會(huì)構(gòu)建成 Table API 的抽象語法樹,并通過 Calcite 提供的 RelBuilder 轉(zhuǎn)變成 Calcite 的抽象語法樹。然后依次被轉(zhuǎn)換成邏輯執(zhí)行計(jì)劃和物理執(zhí)行計(jì)劃。
在提交任務(wù)后會(huì)分發(fā)到各個(gè) TaskManager 中運(yùn)行,在運(yùn)行時(shí)會(huì)使用 Janino 編譯器編譯代碼后運(yùn)行。
144、 Flink CDC了解嗎?
Flink CDC Connector 是ApacheFlink的一組數(shù)據(jù)源連接器,使用變化數(shù)據(jù)捕獲change data capture (CDC))從不同的數(shù)據(jù)庫中提取變更數(shù)據(jù)。Flink CDC連接器將Debezium集成為引擎來捕獲數(shù)據(jù)變更。因此,它可以充分利用Debezium的功能。
支持讀取數(shù)據(jù)庫快照,并且能夠持續(xù)讀取數(shù)據(jù)庫的變更日志,即使發(fā)生故障,也支持exactly-once 的處理語義。
對(duì)于DataStream API的CDC connector,用戶無需部署Debezium和Kafka,即可在單個(gè)作業(yè)中使用多個(gè)數(shù)據(jù)庫和表上的變更數(shù)據(jù)。
對(duì)于Table/SQL API 的CDC connector,用戶可以使用SQL DDL創(chuàng)建CDC數(shù)據(jù)源,來監(jiān)視單個(gè)表上的數(shù)據(jù)變更。