java大數(shù)據(jù)之flink

一、Flink簡介

1.1 Flink是什么

Apache Flink是一個開源的分布式,高性能,高可用,準(zhǔn)確的流處理框架。支持實時流處理和批處理

1.2 Flink官網(wǎng)

https://flink.apache.org/

1.3 Flink特性

(1)支持批處理和數(shù)據(jù)流程序處理

(2)優(yōu)雅流暢的支持java和scala api

(3)同時支持高吞吐量和低延遲

(4)支持事件處理和無序處理通過SataStream API,基于DataFlow數(shù)據(jù)流模型

(5)在不同的時間語義(時間時間,處理時間)下支持靈活的窗口(時間,技術(shù),會話,自定義觸發(fā)器)

(6)僅處理一次的容錯擔(dān)保

(7)自動反壓機(jī)制

(8)圖處理(批) 機(jī)器學(xué)習(xí)(批) 復(fù)雜事件處理(流)

(9)在dataSet(批處理)API中內(nèi)置支持迭代程序(BSP)

(10)高效的自定義內(nèi)存管理,和健壯的切換能力在in-memory和out-of-core中

(11)兼容hadoop的mapreduce和storm

(12)集成YARN,HDFS,Hbase 和其它hadoop生態(tài)系統(tǒng)的組件


1.4 Flink應(yīng)用場景

(1)優(yōu)化電子商務(wù)的實時搜索結(jié)果:阿里巴巴的所有基礎(chǔ)設(shè)施團(tuán)隊使用flink實時更新產(chǎn)品細(xì)節(jié)和庫存信息,為用戶提供更高的關(guān)聯(lián)性。

(2)針對數(shù)據(jù)分析團(tuán)隊提供實時流處理服務(wù):king通過flink-powered數(shù)據(jù)分析平臺提供實時數(shù)據(jù)分析,從游戲數(shù)據(jù)中大幅縮短了觀察時間

(3)網(wǎng)絡(luò)/傳感器檢測和錯誤檢測:Bouygues電信公司,是法國最大的電信供應(yīng)商之一,使用flink監(jiān)控其有線和無線網(wǎng)絡(luò),實現(xiàn)快速故障響應(yīng)。

(4)商業(yè)智能分析ETL:Zalando使用flink轉(zhuǎn)換數(shù)據(jù)以便于加載到數(shù)據(jù)倉庫,將復(fù)雜的轉(zhuǎn)換操作轉(zhuǎn)化為相對簡單的并確保分析終端用戶可以更快的訪問數(shù)據(jù)。

1.5 Flink適合于的地方

(1)多種數(shù)據(jù)源(有時不可靠):當(dāng)數(shù)據(jù)是由數(shù)以百萬計的不同用戶或設(shè)備產(chǎn)生的,它是安全的假設(shè)數(shù)據(jù)會按照事件產(chǎn)生的順序到達(dá),和在上游數(shù)據(jù)失敗的情況下,一些事件可能會比他們晚幾個小時,遲到的數(shù)據(jù)也需要計算,這樣的結(jié)果是準(zhǔn)確的。

(2)應(yīng)用程序狀態(tài)管理:當(dāng)程序變得更加的復(fù)雜,比簡單的過濾或者增強(qiáng)的數(shù)據(jù)結(jié)構(gòu),這個時候管理這些應(yīng)用的狀態(tài)將會變得比較難(例如:計數(shù)器,過去數(shù)據(jù)的窗口,狀態(tài)機(jī),內(nèi)置數(shù)據(jù)庫)。flink提供了工具,這些狀態(tài)是有效的,容錯的,和可控的,所以你不需要自己構(gòu)建這些功能。

(3)數(shù)據(jù)的快速處理:有一個焦點在實時或近實時用例場景中,從數(shù)據(jù)生成的那個時刻,數(shù)據(jù)就應(yīng)該是可達(dá)的。在必要的時候,flink完全有能力滿足這些延遲。

(4)海量數(shù)據(jù)處理:這些程序需要分布在很多節(jié)點運(yùn)行來支持所需的規(guī)模。flink可以在大型的集群中無縫運(yùn)行,就像是在一個小集群一樣

1.6 Flink分布式執(zhí)行

Flink分布式程序包含2個主要的進(jìn)程:JobManager和TaskManager.當(dāng)程序運(yùn)行時,不同的進(jìn)程就會參與其中,包括Jobmanager、TaskManager和JobClient


首先,F(xiàn)link程序提交給JobClient,JobClient再提交到JobManager,JobManager負(fù)責(zé)資源的協(xié)調(diào)和Job的執(zhí)行。一旦資源分配完成,task就會分配到不同的TaskManager,TaskManager會初始化線程去執(zhí)行task,并根據(jù)程序的執(zhí)行狀態(tài)向JobManager反饋,執(zhí)行的狀態(tài)包括starting、in progress、finished以及canceled和failing等。當(dāng)Job執(zhí)行完成,結(jié)果會返回給客戶端。


二、Flink的基本概念

2.1 Stream & Transformation & Operator

用戶實現(xiàn)的Flink程序是由Stream和Transformation這兩個基本構(gòu)建塊組成,其中Stream是一個中間結(jié)果數(shù)據(jù),而Transformation是一個操作,它對一個或多個輸入Stream進(jìn)行計算處理,輸出一個或多個結(jié)果Stream。當(dāng)一個Flink程序被執(zhí)行的時候,它會被映射為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似于一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結(jié)束于一個或多個Sink Operator。

下面是一個由Flink程序映射為Streaming Dataflow的示意圖,如下所示:


上圖中,F(xiàn)linkKafkaConsumer是一個Source Operator,map、keyBy、timeWindow、apply是Transformation Operator,RollingSink是一個Sink Operator。

2.2 Parallel Dataflow

在Flink中,程序天生是并行和分布式的:一個Stream可以被分成多個Stream分區(qū)(Stream Partitions),一個Operator可以被分成多個Operator Subtask,每一個Operator Subtask是在不同的線程中獨(dú)立執(zhí)行的。一個Operator的并行度,等于Operator Subtask的個數(shù),一個Stream的并行度總是等于生成它的Operator的并行度。


有關(guān)Parallel Dataflow的實例,如下圖所示:


上圖Streaming Dataflow的并行視圖中,展現(xiàn)了在兩個Operator之間的Stream的兩種模式:

[if !supportLists]·?[endif]One-to-one模式

比如從Source[1]到map()[1],它保持了Source的分區(qū)特性(Partitioning)和分區(qū)內(nèi)元素處理的有序性,也就是說map()[1]的Subtask看到數(shù)據(jù)流中記錄的順序,與Source[1]中看到的記錄順序是一致的。

[if !supportLists]·?[endif]Redistribution模式

這種模式改變了輸入數(shù)據(jù)流的分區(qū),比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多個不同的Subtask發(fā)送數(shù)據(jù),改變了數(shù)據(jù)流的分區(qū),這與實際應(yīng)用所選擇的Operator有關(guān)系。另外,Source Operator對應(yīng)2個Subtask,所以并行度為2,而Sink Operator的Subtask只有1個,故而并行度為1。

2.3 Task & Operator Chain

在Flink分布式執(zhí)行環(huán)境中,會將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執(zhí)行鏈,每個執(zhí)行鏈會在TaskManager上一個獨(dú)立的線程中執(zhí)行,如下圖所示:


上圖中上半部分表示的是一個Operator Chain,多個Operator通過Stream連接,而每個Operator在運(yùn)行時對應(yīng)一個Task;圖中下半部分是上半部分的一個并行版本,也就是對每一個Task都并行化為多個Subtask。

2.4 Time & Window

Flink支持基于時間窗口操作,也支持基于數(shù)據(jù)的窗口操作,如下圖所示:


上圖中,基于時間的窗口操作,在每個相同的時間間隔對Stream中的記錄進(jìn)行處理,通常各個時間間隔內(nèi)的窗口操作處理的記錄數(shù)不固定;而基于數(shù)據(jù)驅(qū)動的窗口操作,可以在Stream中選擇固定數(shù)量的記錄作為一個窗口,對該窗口中的記錄進(jìn)行處理。

有關(guān)窗口操作的不同類型,可以分為如下幾種:傾斜窗口(Tumbling Windows,記錄沒有重疊)、滑動窗口(Slide Windows,記錄有重疊)、會話窗口(Session Windows),具體可以查閱相關(guān)資料。

在處理Stream中的記錄時,記錄中通常會包含各種典型的時間字段,F(xiàn)link支持多種時間的處理,如下圖所示:


上圖描述了在基于Flink的流處理系統(tǒng)中,各種不同的時間所處的位置和含義,其中,Event Time表示事件創(chuàng)建時間,Ingestion Time表示事件進(jìn)入到Flink Dataflow的時間 ,Processing Time表示某個Operator對事件進(jìn)行處理事的本地系統(tǒng)時間(是在TaskManager節(jié)點上)。這里,談一下基于Event Time進(jìn)行處理的問題,通常根據(jù)Event Time會給整個Streaming應(yīng)用帶來一定的延遲性,因為在一個基于事件的處理系統(tǒng)中,進(jìn)入系統(tǒng)的事件可能會基于Event Time而發(fā)生亂序現(xiàn)象,比如事件來源于外部的多個系統(tǒng),為了增強(qiáng)事件處理吞吐量會將輸入的多個Stream進(jìn)行自然分區(qū),每個Stream分區(qū)內(nèi)部有序,但是要保證全局有序必須同時兼顧多個Stream分區(qū)的處理,設(shè)置一定的時間窗口進(jìn)行暫存數(shù)據(jù),當(dāng)多個Stream分區(qū)基于Event Time排列對齊后才能進(jìn)行延遲處理。所以,設(shè)置的暫存數(shù)據(jù)記錄的時間窗口越長,處理性能越差,甚至嚴(yán)重影響Stream處理的實時性。

有關(guān)基于時間的Streaming處理,可以參考官方文檔,在Flink中借鑒了Google使用的WaterMark實現(xiàn)方式,可以查閱相關(guān)資料。


三、Flink基本架構(gòu)

Flink系統(tǒng)的架構(gòu)與Spark類似,是一個基于Master-Slave風(fēng)格的架構(gòu),如下圖所示:


Flink集群啟動時,會啟動一個JobManager進(jìn)程、至少一個TaskManager進(jìn)程。在Local模式下,會在同一個JVM內(nèi)部啟動一個JobManager進(jìn)程和TaskManager進(jìn)程。當(dāng)Flink程序提交后,會創(chuàng)建一個Client來進(jìn)行預(yù)處理,并轉(zhuǎn)換為一個并行數(shù)據(jù)流,這是對應(yīng)著一個Flink Job,從而可以被JobManager和TaskManager執(zhí)行。在實現(xiàn)上,F(xiàn)link基于Actor實現(xiàn)了JobManager和TaskManager,所以JobManager與TaskManager之間的信息交換,都是通過事件的方式來進(jìn)行處理。

如上圖所示,F(xiàn)link系統(tǒng)主要包含如下3個主要的進(jìn)程:

3.1 JobManager

JobManager是Flink系統(tǒng)的協(xié)調(diào)者,它負(fù)責(zé)接收Flink Job,調(diào)度組成Job的多個Task的執(zhí)行。同時,JobManager還負(fù)責(zé)收集Job的狀態(tài)信息,并管理Flink集群中從節(jié)點TaskManager。JobManager所負(fù)責(zé)的各項管理功能,它接收到并處理的事件主要包括:

(1)RegisterTaskManager

在Flink集群啟動的時候,TaskManager會向JobManager注冊,如果注冊成功,則JobManager會向TaskManager回復(fù)消息AcknowledgeRegistration。

(2)SubmitJob

Flink程序內(nèi)部通過Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。

(3)CancelJob

請求取消一個Flink Job的執(zhí)行,CancelJob消息中包含了Job的ID,如果成功則返回消息CancellationSuccess,失敗則返回消息CancellationFailure。

(4)UpdateTaskExecutionState

TaskManager會向JobManager請求更新ExecutionGraph中的ExecutionVertex的狀態(tài)信息,更新成功則返回true。

(5)RequestNextInputSplit

運(yùn)行在TaskManager上面的Task,請求獲取下一個要處理的輸入Split,成功則返回NextInputSplit。

(6)JobStatusChanged

ExecutionGraph向JobManager發(fā)送該消息,用來表示Flink Job的狀態(tài)發(fā)生的變化,例如:RUNNING、CANCELING、FINISHED等。

3.2 TaskManager

TaskManager也是一個Actor,它是實際負(fù)責(zé)執(zhí)行計算的Worker,在其上執(zhí)行Flink Job的一組Task。每個TaskManager負(fù)責(zé)管理其所在節(jié)點上的資源信息,如內(nèi)存、磁盤、網(wǎng)絡(luò),在啟動的時候?qū)①Y源的狀態(tài)向JobManager匯報。TaskManager端可以分成兩個階段:

(1)注冊階段

TaskManager會向JobManager注冊,發(fā)送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然后TaskManager就可以進(jìn)行初始化過程。

(2)可操作階段

該階段TaskManager可以接收并處理與Task有關(guān)的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager無法連接到JobManager,這是TaskManager就失去了與JobManager的聯(lián)系,會自動進(jìn)入“注冊階段”,只有完成注冊才能繼續(xù)處理Task相關(guān)的消息。

3.3 JOB Client

當(dāng)用戶提交一個Flink程序時,會首先創(chuàng)建一個Client,該Client首先會對用戶提交的Flink程序進(jìn)行預(yù)處理,并提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,并建立到JobManager的連接,將Flink Job提交給JobManager。Client會將用戶提交的Flink程序組裝一個JobGraph, 并且是以JobGraph的形式提交的。一個JobGraph是一個Flink Dataflow,它由多個JobVertex組成的DAG。其中,一個JobGraph包含了一個Flink程序的如下信息:JobID、Job名稱、配置信息、一組JobVertex等。

3.4組件棧

Flink是一個分層架構(gòu)的系統(tǒng),每一層所包含的組件都提供了特定的抽象,用來服務(wù)于上層組件。Flink分層的組件棧如下圖所示:


下面,我們自下而上,分別針對每一層進(jìn)行解釋說明:

(1)Deployment層

該層主要涉及了Flink的部署模式,F(xiàn)link支持多種部署模式:本地、集群(Standalone/YARN)、云(GCE/EC2)。Standalone部署模式與Spark類似,這里,我們看一下Flink on YARN的部署模式,如下圖所示:


了解YARN的話,對上圖的原理非常熟悉,實際Flink也實現(xiàn)了滿足在YARN集群上運(yùn)行的各個組件:Flink YARN Client負(fù)責(zé)與YARN RM通信協(xié)商資源請求,F(xiàn)link JobManager和Flink TaskManager分別申請到Container去運(yùn)行各自的進(jìn)程。通過上圖可以看到,YARN AM與Flink JobManager在同一個Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請Container去啟動Flink TaskManager。待Flink成功運(yùn)行在YARN集群上,F(xiàn)link YARN Client就可以提交Flink Job到Flink JobManager,并進(jìn)行后續(xù)的映射、調(diào)度和計算處理。

(2)Runtime層

Runtime層提供了支持Flink計算的全部核心實現(xiàn),比如:支持分布式Stream處理、JobGraph到ExecutionGraph的映射、調(diào)度等等,為上層API層提供基礎(chǔ)服務(wù)。

(3)API層

API層主要實現(xiàn)了面向無界Stream的流處理和面向Batch的批處理API,其中面向流處理對應(yīng)DataStream API,面向批處理對應(yīng)DataSet API。

(4)Libraries層

該層也可以稱為Flink應(yīng)用框架層,根據(jù)API層的劃分,在API層之上構(gòu)建的滿足特定應(yīng)用的實現(xiàn)計算框架,也分別對應(yīng)于面向流處理和面向批處理兩類。面向流處理支持:CEP(復(fù)雜事件處理)、基于SQL-like的操作(基于Table的關(guān)系操作);面向批處理支持:FlinkML(機(jī)器學(xué)習(xí)庫)、Gelly(圖處理)。


四、Flink內(nèi)部原理

4.1容錯機(jī)制

Flink基于Checkpoint機(jī)制實現(xiàn)容錯,它的原理是不斷地生成分布式Streaming數(shù)據(jù)流Snapshot。在流處理失敗時,通過這些Snapshot可以恢復(fù)數(shù)據(jù)流處理。理解Flink的容錯機(jī)制,首先需要了解一下Barrier這個概念:

Stream Barrier是Flink分布式Snapshotting中的核心元素,它會作為數(shù)據(jù)流的記錄被同等看待,被插入到數(shù)據(jù)流中,將數(shù)據(jù)流中記錄的進(jìn)行分組,并沿著數(shù)據(jù)流的方向向前推進(jìn)。每個Barrier會攜帶一個Snapshot ID,屬于該Snapshot的記錄會被推向該Barrier的前方。因為Barrier非常輕量,所以并不會中斷數(shù)據(jù)流。帶有Barrier的數(shù)據(jù)流,如下圖所示:


基于上圖,我們通過如下要點來說明:

[if !supportLists]·?[endif]出現(xiàn)一個Barrier,在該Barrier之前出現(xiàn)的記錄都屬于該Barrier對應(yīng)的Snapshot,在該Barrier之后出現(xiàn)的記錄屬于下一個Snapshot

[if !supportLists]·?[endif]來自不同Snapshot多個Barrier可能同時出現(xiàn)在數(shù)據(jù)流中,也就是說同一個時刻可能并發(fā)生成多個Snapshot

[if !supportLists]·?[endif]當(dāng)一個中間(Intermediate)Operator接收到一個Barrier后,它會發(fā)送Barrier到屬于該Barrier的Snapshot的數(shù)據(jù)流中,等到Sink Operator接收到該Barrier后會向Checkpoint Coordinator確認(rèn)該Snapshot,直到所有的Sink Operator都確認(rèn)了該Snapshot,才被認(rèn)為完成了該Snapshot

這里還需要強(qiáng)調(diào)的是,Snapshot并不僅僅是對數(shù)據(jù)流做了一個狀態(tài)的Checkpoint,它也包含了一個Operator內(nèi)部所持有的狀態(tài),這樣才能夠在保證在流處理系統(tǒng)失敗時能夠正確地恢復(fù)數(shù)據(jù)流處理。也就是說,如果一個Operator包含任何形式的狀態(tài),這種狀態(tài)必須是Snapshot的一部分。Operator的狀態(tài)包含兩種:一種是系統(tǒng)狀態(tài),一個Operator進(jìn)行計算處理的時候需要對數(shù)據(jù)進(jìn)行緩沖,所以數(shù)據(jù)緩沖區(qū)的狀態(tài)是與Operator相關(guān)聯(lián)的,以窗口操作的緩沖區(qū)為例,F(xiàn)link系統(tǒng)會收集或聚合記錄數(shù)據(jù)并放到緩沖區(qū)中,直到該緩沖區(qū)中的數(shù)據(jù)被處理完成;另一種是用戶自定義狀態(tài)(狀態(tài)可以通過轉(zhuǎn)換函數(shù)進(jìn)行創(chuàng)建和修改),它可以是函數(shù)中的Java對象這樣的簡單變量,也可以是與函數(shù)相關(guān)的Key/Value狀態(tài)。對于具有輕微狀態(tài)的Streaming應(yīng)用,會生成非常輕量的Snapshot而且非常頻繁,但并不會影響數(shù)據(jù)流處理性能。Streaming應(yīng)用的狀態(tài)會被存儲到一個可配置的存儲系統(tǒng)中,例如HDFS。在一個Checkpoint執(zhí)行過程中,存儲的狀態(tài)信息及其交互過程,如下圖所示:


在Checkpoint過程中,還有一個比較重要的操作——Stream Aligning。當(dāng)Operator接收到多個輸入的數(shù)據(jù)流時,需要在Snapshot Barrier中對數(shù)據(jù)流進(jìn)行排列對齊,如下圖所示:


具體排列過程如下:

[if !supportLists]·?[endif]Operator從一個incoming Stream接收到Snapshot Barrier n,然后暫停處理,直到其它的incoming Stream的Barrier n(否則屬于2個Snapshot的記錄就混在一起了)到達(dá)該Operator

[if !supportLists]·?[endif]接收到Barrier n的Stream被臨時擱置,來自這些Stream的記錄不會被處理,而是被放在一個Buffer中

[if !supportLists]·?[endif]一旦最后一個Stream接收到Barrier n,Operator會emit所有暫存在Buffer中的記錄,然后向Checkpoint Coordinator發(fā)送Snapshot n

[if !supportLists]·?[endif]繼續(xù)處理來自多個Stream的記錄

基于Stream Aligning操作能夠?qū)崿F(xiàn)Exactly Once語義,但是也會給流處理應(yīng)用帶來延遲,因為為了排列對齊Barrier,會暫時緩存一部分Stream的記錄到Buffer中,尤其是在數(shù)據(jù)流并行度很高的場景下可能更加明顯,通常以最遲對齊Barrier的一個Stream為處理Buffer中緩存記錄的時刻點。在Flink中,提供了一個開關(guān),選擇是否使用Stream Aligning,如果關(guān)掉則Exactly Once會變成At least once。

4.2 調(diào)度機(jī)制

在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將一個JobGraph轉(zhuǎn)換映射為一個ExecutionGraph,如下圖所示:


通過上圖可以看出:JobGraph是一個Job的用戶邏輯視圖表示,將一個用戶要對數(shù)據(jù)流進(jìn)行的處理表示為單個DAG圖(對應(yīng)于JobGraph),DAG圖由頂點(JobVertex)和中間結(jié)果集(IntermediateDataSet)組成,其中JobVertex表示了對數(shù)據(jù)流進(jìn)行的轉(zhuǎn)換操作,比如map、flatMap、filter、keyBy等操作,而IntermediateDataSet是由上游的JobVertex所生成,同時作為下游的JobVertex的輸入。

而ExecutionGraph是JobGraph的并行表示,也就是實際JobManager調(diào)度一個Job在TaskManager上運(yùn)行的邏輯視圖,它也是一個DAG圖,是由ExecutionJobVertex、IntermediateResult(或IntermediateResultPartition)組成,ExecutionJobVertex實際對應(yīng)于JobGraph圖中的JobVertex,只不過在ExecutionJobVertex內(nèi)部是一種并行表示,由多個并行的ExecutionVertex所組成。另外,這里還有一個重要的概念,就是Execution,它是一個ExecutionVertex的一次運(yùn)行Attempt,也就是說,一個ExecutionVertex可能對應(yīng)多個運(yùn)行狀態(tài)的Execution,比如,一個ExecutionVertex運(yùn)行產(chǎn)生了一個失敗的Execution,然后還會創(chuàng)建一個新的Execution來運(yùn)行,這時就對應(yīng)這個2次運(yùn)行Attempt。每個Execution通過ExecutionAttemptID來唯一標(biāo)識,在TaskManager和JobManager之間進(jìn)行Task狀態(tài)的交換都是通過ExecutionAttemptID來實現(xiàn)的。


下面看一下,在物理上進(jìn)行調(diào)度,基于資源的分配與使用的一個例子,來自官網(wǎng),如下圖所示:


說明如下:

[if !supportLists]·?[endif]左上子圖:有2個TaskManager,每個TaskManager有3個Task Slot

[if !supportLists]·?[endif]左下子圖:一個Flink Job,邏輯上包含了1個data source、1個MapFunction、1個ReduceFunction,對應(yīng)一個JobGraph

[if !supportLists]·?[endif]左下子圖:用戶提交的Flink Job對各個Operator進(jìn)行的配置——data source的并行度設(shè)置為4,MapFunction的并行度也為4,ReduceFunction的并行度為3,在JobManager端對應(yīng)于ExecutionGraph

[if !supportLists]·?[endif]右上子圖:TaskManager 1上,有2個并行的ExecutionVertex組成的DAG圖,它們各占用一個Task Slot

[if !supportLists]·?[endif]右下子圖:TaskManager 2上,也有2個并行的ExecutionVertex組成的DAG圖,它們也各占用一個Task Slot

[if !supportLists]·?[endif]在2個TaskManager上運(yùn)行的4個Execution是并行執(zhí)行的

4.3 迭代機(jī)制

機(jī)器學(xué)習(xí)和圖計算應(yīng)用,都會使用到迭代計算,F(xiàn)link通過在迭代Operator中定義Step函數(shù)來實現(xiàn)迭代算法,這種迭代算法包括Iterate和Delta Iterate兩種類型,在實現(xiàn)上它們反復(fù)地在當(dāng)前迭代狀態(tài)上調(diào)用Step函數(shù),直到滿足給定的條件才會停止迭代。下面,對Iterate和Delta Iterate兩種類型的迭代算法原理進(jìn)行說明:

[if !supportLists]·?[endif]Iterate

Iterate Operator是一種簡單的迭代形式:每一輪迭代,Step函數(shù)的輸入或者是輸入的整個數(shù)據(jù)集,或者是上一輪迭代的結(jié)果,通過該輪迭代計算出下一輪計算所需要的輸入(也稱為Next Partial Solution),滿足迭代的終止條件后,會輸出最終迭代結(jié)果,具體執(zhí)行流程如下圖所示:


Step函數(shù)在每一輪迭代中都會被執(zhí)行,它可以是由map、reduce、join等Operator組成的數(shù)據(jù)流。下面通過官網(wǎng)給出的一個例子來說明Iterate Operator,非常簡單直觀,如下圖所示:


上面迭代過程中,輸入數(shù)據(jù)為1到5的數(shù)字,Step函數(shù)就是一個簡單的map函數(shù),會對每個輸入的數(shù)字進(jìn)行加1處理,而Next Partial Solution對應(yīng)于經(jīng)過map函數(shù)處理后的結(jié)果,比如第一輪迭代,對輸入的數(shù)字1加1后結(jié)果為2,對輸入的數(shù)字2加1后結(jié)果為3,直到對輸入數(shù)字5加1后結(jié)果為變?yōu)?,這些新生成結(jié)果數(shù)字2~6會作為第二輪迭代的輸入。迭代終止條件為進(jìn)行10輪迭代,則最終的結(jié)果為11~15。

[if !supportLists]·?[endif]Delta Iterate

Delta Iterate Operator實現(xiàn)了增量迭代,它的實現(xiàn)原理如下圖所示:


基于Delta Iterate Operator實現(xiàn)增量迭代,它有2個輸入,其中一個是初始Workset,表示輸入待處理的增量Stream數(shù)據(jù),另一個是初始Solution Set,它是經(jīng)過Stream方向上Operator處理過的結(jié)果。第一輪迭代會將Step函數(shù)作用在初始Workset上,得到的計算結(jié)果Workset作為下一輪迭代的輸入,同時還要增量更新初始Solution Set。如果反復(fù)迭代知道滿足迭代終止條件,最后會根據(jù)Solution Set的結(jié)果,輸出最終迭代結(jié)果。比如,我們現(xiàn)在已知一個Solution集合中保存的是,已有的商品分類大類中購買量最多的商品,而Workset輸入的是來自線上實時交易中最新達(dá)成購買的商品的人數(shù),經(jīng)過計算會生成新的商品分類大類中商品購買量最多的結(jié)果,如果某些大類中商品購買量突然增長,它需要更新Solution Set中的結(jié)果(原來購買量最多的商品,經(jīng)過增量迭代計算,可能已經(jīng)不是最多),最后會輸出最終商品分類大類中購買量最多的商品結(jié)果集合。更詳細(xì)的例子,可以參考官網(wǎng)給出的“Propagate Minimum in Graph”,這里不再累述。

4.4 Backpressure監(jiān)控

Backpressure在流式計算系統(tǒng)中會比較受到關(guān)注,因為在一個Stream上進(jìn)行處理的多個Operator之間,它們處理速度和方式可能非常不同,所以就存在上游Operator如果處理速度過快,下游Operator處可能機(jī)會堆積Stream記錄,嚴(yán)重會造成處理延遲或下游Operator負(fù)載過重而崩潰(有些系統(tǒng)可能會丟失數(shù)據(jù))。因此,對下游Operator處理速度跟不上的情況,如果下游Operator能夠?qū)⒆约禾幚頎顟B(tài)傳播給上游Operator,使得上游Operator處理速度慢下來就會緩解上述問題,比如通過告警的方式通知現(xiàn)有流處理系統(tǒng)存在的問題。

Flink Web界面上提供了對運(yùn)行Job的Backpressure行為的監(jiān)控,它通過使用Sampling線程對正在運(yùn)行的Task進(jìn)行堆棧跟蹤采樣來實現(xiàn),具體實現(xiàn)方式如下圖所示:


JobManager會反復(fù)調(diào)用一個Job的Task運(yùn)行所在線程的Thread.getStackTrace(),默認(rèn)情況下,JobManager會每間隔50ms觸發(fā)對一個Job的每個Task依次進(jìn)行100次堆棧跟蹤調(diào)用,根據(jù)調(diào)用調(diào)用結(jié)果來確定Backpressure,F(xiàn)link是通過計算得到一個比值(Radio)來確定當(dāng)前運(yùn)行Job的Backpressure狀態(tài)。在Web界面上可以看到這個Radio值,它表示在一個內(nèi)部方法調(diào)用中阻塞(Stuck)的堆棧跟蹤次數(shù),例如,radio=0.01,表示100次中僅有1次方法調(diào)用阻塞。Flink目前定義了如下Backpressure狀態(tài):

[if !supportLists]·?[endif]OK: 0 <= Ratio <= 0.10

[if !supportLists]·?[endif]LOW: 0.10 < Ratio <= 0.5

[if !supportLists]·?[endif]HIGH: 0.5 < Ratio <= 1

另外,F(xiàn)link還提供了3個參數(shù)來配置Backpressure監(jiān)控行為:

參數(shù)名稱默認(rèn)值說明

jobmanager.web.backpressure.refresh-interval60000默認(rèn)1分鐘,表示采樣統(tǒng)計結(jié)果刷新時間間隔

jobmanager.web.backpressure.num-samples100評估Backpressure狀態(tài),所使用的堆棧跟蹤調(diào)用次數(shù)

jobmanager.web.backpressure.delay-between-samples50默認(rèn)50毫秒,表示對一個Job的每個Task依次調(diào)用的時間間隔

通過上面?zhèn)€定義的Backpressure狀態(tài),以及調(diào)整相應(yīng)的參數(shù),可以確定當(dāng)前運(yùn)行的Job的狀態(tài)是否正常,并且保證不影響JobManager提供服務(wù)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容