問題導(dǎo)讀:
1.怎樣優(yōu)化Logical Plan?
2.怎樣優(yōu)化Stream Graph?
3.TimeWindow,EventTime,ProcessTime 和 Watermark四者之間的關(guān)系是什么?
序言
? ?? ? 時(shí)效性提升數(shù)據(jù)的價(jià)值,所以Flink這樣的流式(Streaming)計(jì)算系統(tǒng)應(yīng)用得越來越廣泛。
? ?? ? 廣大的普通用戶決定一個(gè)產(chǎn)品的界面和接口。
ETL開發(fā)者需要簡(jiǎn)單而有效的開發(fā)工具,從而把更多時(shí)間花在理業(yè)務(wù)和對(duì)口徑上。
因此流式計(jì)算系統(tǒng)都趨同以SQL作為唯一開發(fā)語言,讓用戶以Table形式操作Stream。
程序開發(fā)三部曲:First make it work, then make it right, and, finally, make it fast.
? ?? ?流計(jì)算開發(fā)者面對(duì)的現(xiàn)狀及趨勢(shì):
? ?? ?第一步,讓程序運(yùn)行起來。
開發(fā)者能用SQL方便地表達(dá)問題。
開發(fā)者能通過任務(wù)管理系統(tǒng)一體化地管理任務(wù),如:開發(fā),上線,調(diào)優(yōu),監(jiān)控和排查任務(wù)。
? ?? ? 第二步,讓程序運(yùn)行正確。
簡(jiǎn)單數(shù)據(jù)清洗之外的流計(jì)算開發(fā)需求通常會(huì)涉及到Streaming SQL的兩個(gè)核心擴(kuò)展:Window 和 Emit。
開發(fā)者深入理解Window和 Emit的語義是正確實(shí)現(xiàn)這些業(yè)務(wù)需求的關(guān)鍵,
否則無法在數(shù)據(jù)時(shí)效性和數(shù)據(jù)準(zhǔn)確性上做適合各個(gè)業(yè)務(wù)場(chǎng)景的決策和折中。
? ?? ? 第三步,讓程序運(yùn)行越來越快。
蘋果每年都會(huì)發(fā)布新手機(jī):使用了**芯片,性能提升了多少,耗電降低了多少,增加**功能...。
當(dāng)前,流計(jì)算系統(tǒng)每年也會(huì)有很大的性能提升和功能擴(kuò)展,但想要深入調(diào)優(yōu)及排錯(cuò),
還是要學(xué)習(xí)分布式系統(tǒng)的各個(gè)組件及原理,各種算子實(shí)現(xiàn)方法,性能優(yōu)化技術(shù)等知識(shí)。
以后,隨著系統(tǒng)的進(jìn)一步成熟和完善,開發(fā)者在性能優(yōu)化上的負(fù)擔(dān)會(huì)越來越低
無需了解底層技術(shù)實(shí)現(xiàn)細(xì)節(jié)和手動(dòng)配置各種參數(shù),就能享受性能和穩(wěn)定性的逐步提升。
? ?? ?分布式系統(tǒng)的一致性和可用性是一對(duì)矛盾。
流計(jì)算系統(tǒng)的數(shù)據(jù)準(zhǔn)確性和數(shù)據(jù)時(shí)效性也是一對(duì)矛盾。
應(yīng)用開發(fā)者都需要認(rèn)識(shí)到這些矛盾,并且知道自己在什么場(chǎng)景下該作何種取舍。
? ???本文希望通過剖析Flink Streaming SQL的三個(gè)具體例子:Union,Group By 和 Join ,
來依次闡述流式計(jì)算模型的核心概念:?What, Where, When, How?。
以便開發(fā)者加深對(duì)Streaming SQL的Window 和 Emit語義的理解,
從而能在數(shù)據(jù)準(zhǔn)確性和數(shù)據(jù)時(shí)效性上做適合業(yè)務(wù)場(chǎng)景的折中和取舍。
也順帶介紹Streaming SQL的底層實(shí)現(xiàn),以便于SQL任務(wù)的開發(fā)和調(diào)優(yōu)。
UNION
? ?? ? 通過這個(gè)例子來闡述Streaming SQL的底層實(shí)現(xiàn)和優(yōu)化手段:Logical Plan Optimization 和 Operator Chaining。
例子
改編自Flink?StreamSQLExample?。只在最外層加了一個(gè)Filter,以便觸發(fā)Filter下推及合并。
Source

SQL

Sink

運(yùn)行結(jié)果

轉(zhuǎn)換Table為Stream
? ?? ?? ???Flink 會(huì)把基于Table的Streaming SQL轉(zhuǎn)為基于Stream的底層算子,并同時(shí)完成Logical Plan及Operator Chaining等優(yōu)化
轉(zhuǎn)為邏輯計(jì)劃(Logical Plan)
上述UNION ALL SQL依據(jù)Relational Algebra轉(zhuǎn)換為下面的邏輯計(jì)劃:

SQL字段與邏輯計(jì)劃有如下的對(duì)應(yīng)關(guān)系:

優(yōu)化Logical Plan理論基礎(chǔ)冪等
數(shù)學(xué):??19 * 10 * 1??* 1 = 19 * 10 = 190
SQL:??SELECT * FROM (SELECT user, product FROM OrderA) =??SELECT user, product FROM OrderA
數(shù)學(xué):10 * 19 = 19 * 10 = 190
SQL:? ?tableA UNION ALL tableB??= tableB UNION ALL tableA
數(shù)學(xué):
(1900 * 0.5)* 0.2 = 1900 * (0.5 * 0.2) = 190
1900 * (1.0 + 0.01) = 1900 * 1.0 + 1900 * 0.01 = 1919
SQL:
SELECT * FROM (SELECT user, amount FROM OrderA) WHERE amount > 2?
SELECT * FROM (SELECT user, amount FROM OrderA WHERE amount > 2)
優(yōu)化過程
Flink的邏輯計(jì)劃優(yōu)化規(guī)則清單請(qǐng)見:?FlinkRuleSets
此Union All例子根據(jù)冪等,交換律和結(jié)合律來完成以下三步優(yōu)化:
消除冗余的Project
? ?? ?? ?? ?? ?? ?? ? 利用冪等特性,消除冗余的Project

下推Filter
? ?? ?? ?? ?? ?? ? 利用交換率和結(jié)合律特性,下推Filter。

合并Filter? ?
? ?? ?? ?? ?? ???利用結(jié)合律,合并Filter。

轉(zhuǎn)為物理計(jì)劃(Physical Plan)
轉(zhuǎn)換后的Flink的物理執(zhí)行計(jì)劃如下:

有Physical Plan優(yōu)化這一步驟,但對(duì)以上例子沒有效果,所以忽略。
? ?? ???這樣,加上Source和Sink,產(chǎn)生了如下的Stream Graph:

優(yōu)化Stream Graph
通過Task Chaining來減少上下游算子的數(shù)據(jù)傳輸消耗,從而提高性能。
Chaining判斷條件

Chaining結(jié)果??
? ?? ?? ?? ?按深度優(yōu)先的順序遍歷Stream Graph,最終產(chǎn)生5個(gè)Task任務(wù)。

GROUP BY
? ?? ? 將以滾動(dòng)窗口的GROUP BY來闡述Streaming SQL里的Window和Emit語義,
及其背后的Streaming的Where(Window)和When(Watermark和Trigger)的概念及關(guān)系。
例子?Source

Water Mark
? ?? ? 簡(jiǎn)單地把最新的EventTime減去Offset。

SQL
? ?? ? 按用戶加滾動(dòng)窗口進(jìn)行Group By。

Sink

轉(zhuǎn)換Table為Stream
? ?? ?因?yàn)閁nion All例子比較詳細(xì)地闡述了轉(zhuǎn)換規(guī)則,此處只討論特殊之處。
轉(zhuǎn)為邏輯計(jì)劃(Logical Plan)

優(yōu)化Logical Plan

GROUP BY優(yōu)化:把{“User + Window” -> SUM} 轉(zhuǎn)為 {User -> {Window -> SUM}}。
新的數(shù)據(jù)結(jié)構(gòu)確保同一User下所有Window都會(huì)被分配到同一個(gè)Operator,以便實(shí)現(xiàn)SessionWindow的Merge功能。

轉(zhuǎn)為物理計(jì)劃(Physical Plan)

優(yōu)化Stream Graph
? ?? ? 經(jīng)過Task Chaining優(yōu)化后,最終生成3個(gè)Task。

Streaming各基本概念之間的聯(lián)系
? ?? ?? ?此處希望以圖表的形式闡述各個(gè)概念之間的關(guān)系。
Window和EventTime
Flink支持三種Window類型:?Tumbling Windows?,?Sliding Windows?和?Session Windows
每個(gè)事件的EventTime決定事件會(huì)落到哪些TimeWindow。
但只有Window的第一個(gè)數(shù)據(jù)來到時(shí),Window才會(huì)被真正創(chuàng)建。

Window和WaterMark
? ?? ?? ???可以設(shè)置TimeWindow的AllowedLateness,從而使Window可以處理延時(shí)數(shù)據(jù)。
只有當(dāng)WaterMark超過TimeWindow.end + AllowedLateness時(shí),Window才會(huì)被銷毀。

TimeWindow,EventTime,ProcessTime 和 Watermark
? ?? ? 我們以WaterMark的推進(jìn)圖來闡述這四者之間的關(guān)系。
Window為TumbleWindow,窗口大小為1小時(shí),允許的數(shù)據(jù)延遲為1小時(shí)。

WaterMark和EventTime:
新數(shù)據(jù)的最新Eventime推進(jìn)WaterMark。
TimeWindow的生命周期:
? ?? ?以下三條數(shù)據(jù)的EventTime決定TimeWindow的狀態(tài)轉(zhuǎn)換。
數(shù)據(jù)1的Eventtime屬于Window[10:00, 11,00),因?yàn)閃indow不存在,所以創(chuàng)建此Window。
數(shù)據(jù)2的Eventime推進(jìn)WaterMark超過11:00(Window.end),所以觸發(fā)Pass End。
數(shù)據(jù)3的Eventime推進(jìn)WaterMark超過12:00(Window.end + allowedLateness), 所以關(guān)閉此Window。
TimeWindow的結(jié)果輸出:
? ?? ? 用戶可以通過Trigger來控制窗口結(jié)果的輸出,按窗口的狀態(tài)類型有以下三種Trigger。

Flink的Streaming SQL目前只支持PassEnd Trigger,且默認(rèn)AllowedLateness = 0。
? ?? ? 如果觸發(fā)頻率是Repeated,比如:每分鐘, 往下游輸出一次。那么這個(gè)時(shí)間只能是ProcessTime。
因?yàn)閃arkMark在不同場(chǎng)景下會(huì)有不同推進(jìn)速度,比如處理一小時(shí)的數(shù)據(jù),
可能只需十分鐘(重跑),一個(gè)小時(shí)(正常運(yùn)行)或 大于1小時(shí)(積壓)。
運(yùn)行結(jié)果
? ? 允許數(shù)據(jù)亂序是分布式系統(tǒng)能夠并發(fā)處理消息的前提。
當(dāng)前這個(gè)例子,數(shù)據(jù)如果亂序可以產(chǎn)生不同的輸出結(jié)果。
數(shù)據(jù)有序SUM算子接收到的數(shù)據(jù)
? ? 數(shù)據(jù)的Eventtime按升序排列。

WarterMark推進(jìn)圖
? ?每條新數(shù)據(jù)都能推進(jìn)Watermark。

結(jié)果輸出
? ???所有數(shù)據(jù)都被處理,沒有數(shù)據(jù)被丟棄。? ?? ?

數(shù)據(jù)亂序SUM算子接收到的數(shù)據(jù)
? ???第四條事件延時(shí)到來。

WarterMark推進(jìn)圖
? ?延遲的數(shù)據(jù)不會(huì)推進(jìn)WaterMark,且被丟棄。

輸出結(jié)果
沒有統(tǒng)計(jì)因延遲被丟棄的第四條事件。

JOIN
? ?將通過此例子來闡述Streaming的Retraction語義。
例子Source

SQL
廣告的展現(xiàn)LEFT JOIN 廣告的點(diǎn)擊來更新狀態(tài):showed 或 clicked。

Sink
? ?LEFT JOIN 可能會(huì)發(fā)送多條數(shù)據(jù)到下游。
因此必須轉(zhuǎn)為RetractionStream,讓下游算子有機(jī)會(huì)能撤銷前次輸出,從而只產(chǎn)生一條最終結(jié)果。

轉(zhuǎn)換Table為Stream
? ? RetractionStream沒有引入特殊變化。
轉(zhuǎn)為邏輯計(jì)劃(Logical Plan)

優(yōu)化Logical Plan??

轉(zhuǎn)為物理計(jì)劃(Physical Plan)

優(yōu)化Stream Graph

運(yùn)行結(jié)果
? ? 結(jié)果數(shù)據(jù)的首個(gè)字段為標(biāo)志位,True為正常數(shù)據(jù),F(xiàn)alse為Retract數(shù)據(jù)。

RetractJoin的執(zhí)行邏輯請(qǐng)見:NonWindowOuterJoin
? ? ImpressionId = 1這條數(shù)據(jù)的ReactJoin執(zhí)行過程。
1: Left流的Show消息先到:??Show("1", "show", "2018-10-10 10:10:10")
因?yàn)橹皼]有輸出,所以無需Retrcact。
只輸出:??(true, 1,2018-10-10 10:10:10,showed)
2: Right流的Click消息后到:Click("1", "click", "2018-10-10 10:13:11")
因?yàn)橹耙演敵鲞^結(jié)果,所以需要Retract,輸出:
(false, 1,2018-10-10 10:10:10,showed)
然后再輸出新結(jié)果,
(true, 1,2018-10-10 10:10:10,clicked)
? ?如上可知,Retraction流相當(dāng)于把一條UPDATE消息分別拆成一條DELETE和一條INSERT消息。
Retraction Stream
? ???雖然Retraction機(jī)制最多增加一倍的數(shù)據(jù)傳輸量,但能降低下游算子的存儲(chǔ)負(fù)擔(dān)和撤銷實(shí)現(xiàn)難度。
傳遞
我們?cè)贚eft Join的輸出流后加一個(gè)GROUP BY,以觀察Retraction流的后續(xù)算子的輸出。? ?

可能得到以下的GROUP BY輸出:

由此可見,Retraction具有傳遞性,RetractStream的后續(xù)的Stream也會(huì)是RetractionStream。
終止
? ?? ?最終需要支持Retraction的Sink來終止RetractionStream,比如:

最終輸出retractedResults:

存儲(chǔ)
只有外部存儲(chǔ)支持UPDATE或DELETE操作時(shí),才能實(shí)現(xiàn)RetractionSink。
常見的KV存儲(chǔ)和數(shù)據(jù)庫,如HBase,Mysql都可實(shí)現(xiàn)RetractionSink。
后續(xù)程序總能從這些存儲(chǔ)中讀取最新數(shù)據(jù),上游是否是Retraction流對(duì)用戶是透明的。
常見的消息隊(duì)列,如Kafka,只支持APPEND操作,則不能實(shí)現(xiàn)RetractionSink。
后續(xù)程序從這些消息隊(duì)列可能會(huì)讀到重復(fù)數(shù)據(jù),因此用戶需要在后續(xù)程序中處理重復(fù)數(shù)據(jù)。
總結(jié)
? ?? ?Flink Streaming SQL的實(shí)現(xiàn)從上到下共有三層:
其中“Streaming Data Model” 和 “Distributed Snapshot” 是Flink這個(gè)分布式流計(jì)算系統(tǒng)的核心架構(gòu)設(shè)計(jì)。
“Streaming Data Model”的What, Where, When, How?明確了流計(jì)算系統(tǒng)的表達(dá)能力及預(yù)期應(yīng)用場(chǎng)景。
“Distributed Snapshots”針對(duì)預(yù)期的應(yīng)用場(chǎng)景在數(shù)據(jù)準(zhǔn)確性,系統(tǒng)穩(wěn)定性和運(yùn)行性能上做了合適的折中。
? ? 本文通過實(shí)例闡述了流計(jì)算開發(fā)者需要了解的最上面兩層的概念和原理,
以便流計(jì)算開發(fā)者能在數(shù)據(jù)準(zhǔn)確性和數(shù)據(jù)時(shí)效性上做適合業(yè)務(wù)場(chǎng)景的折中和取舍。