Flink Streaming SQL深度篇

問題導(dǎo)讀:

1.怎樣優(yōu)化Logical Plan?

2.怎樣優(yōu)化Stream Graph?

3.TimeWindow,EventTime,ProcessTime 和 Watermark四者之間的關(guān)系是什么?

序言

? ?? ? 時(shí)效性提升數(shù)據(jù)的價(jià)值,所以Flink這樣的流式(Streaming)計(jì)算系統(tǒng)應(yīng)用得越來越廣泛。

? ?? ? 廣大的普通用戶決定一個(gè)產(chǎn)品的界面和接口。

ETL開發(fā)者需要簡(jiǎn)單而有效的開發(fā)工具,從而把更多時(shí)間花在理業(yè)務(wù)和對(duì)口徑上。

因此流式計(jì)算系統(tǒng)都趨同以SQL作為唯一開發(fā)語言,讓用戶以Table形式操作Stream。

程序開發(fā)三部曲:First make it work, then make it right, and, finally, make it fast.

? ?? ?流計(jì)算開發(fā)者面對(duì)的現(xiàn)狀及趨勢(shì):

? ?? ?第一步,讓程序運(yùn)行起來。

開發(fā)者能用SQL方便地表達(dá)問題。

開發(fā)者能通過任務(wù)管理系統(tǒng)一體化地管理任務(wù),如:開發(fā),上線,調(diào)優(yōu),監(jiān)控和排查任務(wù)。

? ?? ? 第二步,讓程序運(yùn)行正確。

簡(jiǎn)單數(shù)據(jù)清洗之外的流計(jì)算開發(fā)需求通常會(huì)涉及到Streaming SQL的兩個(gè)核心擴(kuò)展:Window 和 Emit。

開發(fā)者深入理解Window和 Emit的語義是正確實(shí)現(xiàn)這些業(yè)務(wù)需求的關(guān)鍵,

否則無法在數(shù)據(jù)時(shí)效性和數(shù)據(jù)準(zhǔn)確性上做適合各個(gè)業(yè)務(wù)場(chǎng)景的決策和折中。

? ?? ? 第三步,讓程序運(yùn)行越來越快。

蘋果每年都會(huì)發(fā)布新手機(jī):使用了**芯片,性能提升了多少,耗電降低了多少,增加**功能...。

當(dāng)前,流計(jì)算系統(tǒng)每年也會(huì)有很大的性能提升和功能擴(kuò)展,但想要深入調(diào)優(yōu)及排錯(cuò),

還是要學(xué)習(xí)分布式系統(tǒng)的各個(gè)組件及原理,各種算子實(shí)現(xiàn)方法,性能優(yōu)化技術(shù)等知識(shí)。

以后,隨著系統(tǒng)的進(jìn)一步成熟和完善,開發(fā)者在性能優(yōu)化上的負(fù)擔(dān)會(huì)越來越低

無需了解底層技術(shù)實(shí)現(xiàn)細(xì)節(jié)和手動(dòng)配置各種參數(shù),就能享受性能和穩(wěn)定性的逐步提升。

? ?? ?分布式系統(tǒng)的一致性和可用性是一對(duì)矛盾。

流計(jì)算系統(tǒng)的數(shù)據(jù)準(zhǔn)確性和數(shù)據(jù)時(shí)效性也是一對(duì)矛盾。

應(yīng)用開發(fā)者都需要認(rèn)識(shí)到這些矛盾,并且知道自己在什么場(chǎng)景下該作何種取舍。

? ???本文希望通過剖析Flink Streaming SQL的三個(gè)具體例子:Union,Group By 和 Join ,

來依次闡述流式計(jì)算模型的核心概念:?What, Where, When, How?。

以便開發(fā)者加深對(duì)Streaming SQL的Window 和 Emit語義的理解,

從而能在數(shù)據(jù)準(zhǔn)確性和數(shù)據(jù)時(shí)效性上做適合業(yè)務(wù)場(chǎng)景的折中和取舍。

也順帶介紹Streaming SQL的底層實(shí)現(xiàn),以便于SQL任務(wù)的開發(fā)和調(diào)優(yōu)。

UNION

? ?? ? 通過這個(gè)例子來闡述Streaming SQL的底層實(shí)現(xiàn)和優(yōu)化手段:Logical Plan Optimization 和 Operator Chaining。

例子

改編自Flink?StreamSQLExample?。只在最外層加了一個(gè)Filter,以便觸發(fā)Filter下推及合并。

Source


SQL


Sink


運(yùn)行結(jié)果


轉(zhuǎn)換Table為Stream

? ?? ?? ???Flink 會(huì)把基于Table的Streaming SQL轉(zhuǎn)為基于Stream的底層算子,并同時(shí)完成Logical Plan及Operator Chaining等優(yōu)化

轉(zhuǎn)為邏輯計(jì)劃(Logical Plan)

上述UNION ALL SQL依據(jù)Relational Algebra轉(zhuǎn)換為下面的邏輯計(jì)劃:

SQL字段與邏輯計(jì)劃有如下的對(duì)應(yīng)關(guān)系:

優(yōu)化Logical Plan理論基礎(chǔ)冪等

數(shù)學(xué):??19 * 10 * 1??* 1 = 19 * 10 = 190

SQL:??SELECT * FROM (SELECT user, product FROM OrderA) =??SELECT user, product FROM OrderA

交換律

數(shù)學(xué):10 * 19 = 19 * 10 = 190

SQL:? ?tableA UNION ALL tableB??= tableB UNION ALL tableA

結(jié)合律

數(shù)學(xué):

(1900 * 0.5)* 0.2 = 1900 * (0.5 * 0.2) = 190

1900 * (1.0 + 0.01) = 1900 * 1.0 + 1900 * 0.01 = 1919

SQL:

SELECT * FROM (SELECT user, amount FROM OrderA) WHERE amount > 2?

SELECT * FROM (SELECT user, amount FROM OrderA WHERE amount > 2)

優(yōu)化過程

Flink的邏輯計(jì)劃優(yōu)化規(guī)則清單請(qǐng)見:?FlinkRuleSets

此Union All例子根據(jù)冪等,交換律和結(jié)合律來完成以下三步優(yōu)化:

消除冗余的Project

? ?? ?? ?? ?? ?? ?? ? 利用冪等特性,消除冗余的Project

下推Filter

? ?? ?? ?? ?? ?? ? 利用交換率和結(jié)合律特性,下推Filter。


合并Filter? ?

? ?? ?? ?? ?? ???利用結(jié)合律,合并Filter。

轉(zhuǎn)為物理計(jì)劃(Physical Plan)

轉(zhuǎn)換后的Flink的物理執(zhí)行計(jì)劃如下:

有Physical Plan優(yōu)化這一步驟,但對(duì)以上例子沒有效果,所以忽略。

? ?? ???這樣,加上Source和Sink,產(chǎn)生了如下的Stream Graph:

優(yōu)化Stream Graph

通過Task Chaining來減少上下游算子的數(shù)據(jù)傳輸消耗,從而提高性能。

Chaining判斷條件


Chaining結(jié)果??

? ?? ?? ?? ?按深度優(yōu)先的順序遍歷Stream Graph,最終產(chǎn)生5個(gè)Task任務(wù)。


GROUP BY

? ?? ? 將以滾動(dòng)窗口的GROUP BY來闡述Streaming SQL里的Window和Emit語義,

及其背后的Streaming的Where(Window)和When(Watermark和Trigger)的概念及關(guān)系。

例子?Source


Water Mark

? ?? ? 簡(jiǎn)單地把最新的EventTime減去Offset。

SQL

? ?? ? 按用戶加滾動(dòng)窗口進(jìn)行Group By。

Sink


轉(zhuǎn)換Table為Stream

? ?? ?因?yàn)閁nion All例子比較詳細(xì)地闡述了轉(zhuǎn)換規(guī)則,此處只討論特殊之處。

轉(zhuǎn)為邏輯計(jì)劃(Logical Plan)

優(yōu)化Logical Plan


GROUP BY優(yōu)化:把{“User + Window” -> SUM} 轉(zhuǎn)為 {User -> {Window -> SUM}}。

新的數(shù)據(jù)結(jié)構(gòu)確保同一User下所有Window都會(huì)被分配到同一個(gè)Operator,以便實(shí)現(xiàn)SessionWindow的Merge功能。


轉(zhuǎn)為物理計(jì)劃(Physical Plan)


優(yōu)化Stream Graph

? ?? ? 經(jīng)過Task Chaining優(yōu)化后,最終生成3個(gè)Task。

Streaming各基本概念之間的聯(lián)系

? ?? ?? ?此處希望以圖表的形式闡述各個(gè)概念之間的關(guān)系。

Window和EventTime

Flink支持三種Window類型:?Tumbling Windows?,?Sliding Windows?和?Session Windows

每個(gè)事件的EventTime決定事件會(huì)落到哪些TimeWindow。

但只有Window的第一個(gè)數(shù)據(jù)來到時(shí),Window才會(huì)被真正創(chuàng)建。

Window和WaterMark

? ?? ?? ???可以設(shè)置TimeWindow的AllowedLateness,從而使Window可以處理延時(shí)數(shù)據(jù)。

只有當(dāng)WaterMark超過TimeWindow.end + AllowedLateness時(shí),Window才會(huì)被銷毀。

TimeWindow,EventTime,ProcessTime 和 Watermark

? ?? ? 我們以WaterMark的推進(jìn)圖來闡述這四者之間的關(guān)系。

Window為TumbleWindow,窗口大小為1小時(shí),允許的數(shù)據(jù)延遲為1小時(shí)。

WaterMark和EventTime:

新數(shù)據(jù)的最新Eventime推進(jìn)WaterMark。

TimeWindow的生命周期:

? ?? ?以下三條數(shù)據(jù)的EventTime決定TimeWindow的狀態(tài)轉(zhuǎn)換。

數(shù)據(jù)1的Eventtime屬于Window[10:00, 11,00),因?yàn)閃indow不存在,所以創(chuàng)建此Window。

數(shù)據(jù)2的Eventime推進(jìn)WaterMark超過11:00(Window.end),所以觸發(fā)Pass End。

數(shù)據(jù)3的Eventime推進(jìn)WaterMark超過12:00(Window.end + allowedLateness), 所以關(guān)閉此Window。

TimeWindow的結(jié)果輸出:

? ?? ? 用戶可以通過Trigger來控制窗口結(jié)果的輸出,按窗口的狀態(tài)類型有以下三種Trigger。

Flink的Streaming SQL目前只支持PassEnd Trigger,且默認(rèn)AllowedLateness = 0。

? ?? ? 如果觸發(fā)頻率是Repeated,比如:每分鐘, 往下游輸出一次。那么這個(gè)時(shí)間只能是ProcessTime。

因?yàn)閃arkMark在不同場(chǎng)景下會(huì)有不同推進(jìn)速度,比如處理一小時(shí)的數(shù)據(jù),

可能只需十分鐘(重跑),一個(gè)小時(shí)(正常運(yùn)行)或 大于1小時(shí)(積壓)。

運(yùn)行結(jié)果

? ? 允許數(shù)據(jù)亂序是分布式系統(tǒng)能夠并發(fā)處理消息的前提。

當(dāng)前這個(gè)例子,數(shù)據(jù)如果亂序可以產(chǎn)生不同的輸出結(jié)果。

數(shù)據(jù)有序SUM算子接收到的數(shù)據(jù)

? ? 數(shù)據(jù)的Eventtime按升序排列。

WarterMark推進(jìn)圖

? ?每條新數(shù)據(jù)都能推進(jìn)Watermark。

結(jié)果輸出

? ???所有數(shù)據(jù)都被處理,沒有數(shù)據(jù)被丟棄。? ?? ?

數(shù)據(jù)亂序SUM算子接收到的數(shù)據(jù)

? ???第四條事件延時(shí)到來。

WarterMark推進(jìn)圖

? ?延遲的數(shù)據(jù)不會(huì)推進(jìn)WaterMark,且被丟棄。

輸出結(jié)果

沒有統(tǒng)計(jì)因延遲被丟棄的第四條事件。

JOIN

? ?將通過此例子來闡述Streaming的Retraction語義。

例子Source


SQL

廣告的展現(xiàn)LEFT JOIN 廣告的點(diǎn)擊來更新狀態(tài):showed 或 clicked。

Sink

? ?LEFT JOIN 可能會(huì)發(fā)送多條數(shù)據(jù)到下游。

因此必須轉(zhuǎn)為RetractionStream,讓下游算子有機(jī)會(huì)能撤銷前次輸出,從而只產(chǎn)生一條最終結(jié)果。

轉(zhuǎn)換Table為Stream

? ? RetractionStream沒有引入特殊變化。

轉(zhuǎn)為邏輯計(jì)劃(Logical Plan)


優(yōu)化Logical Plan??


轉(zhuǎn)為物理計(jì)劃(Physical Plan)

優(yōu)化Stream Graph


運(yùn)行結(jié)果

? ? 結(jié)果數(shù)據(jù)的首個(gè)字段為標(biāo)志位,True為正常數(shù)據(jù),F(xiàn)alse為Retract數(shù)據(jù)。


RetractJoin的執(zhí)行邏輯請(qǐng)見:NonWindowOuterJoin

? ? ImpressionId = 1這條數(shù)據(jù)的ReactJoin執(zhí)行過程。

1: Left流的Show消息先到:??Show("1", "show", "2018-10-10 10:10:10")

因?yàn)橹皼]有輸出,所以無需Retrcact。

只輸出:??(true, 1,2018-10-10 10:10:10,showed)

2: Right流的Click消息后到:Click("1", "click", "2018-10-10 10:13:11")

因?yàn)橹耙演敵鲞^結(jié)果,所以需要Retract,輸出:

(false, 1,2018-10-10 10:10:10,showed)

然后再輸出新結(jié)果,

(true, 1,2018-10-10 10:10:10,clicked)

? ?如上可知,Retraction流相當(dāng)于把一條UPDATE消息分別拆成一條DELETE和一條INSERT消息。

Retraction Stream

? ???雖然Retraction機(jī)制最多增加一倍的數(shù)據(jù)傳輸量,但能降低下游算子的存儲(chǔ)負(fù)擔(dān)和撤銷實(shí)現(xiàn)難度。

傳遞

我們?cè)贚eft Join的輸出流后加一個(gè)GROUP BY,以觀察Retraction流的后續(xù)算子的輸出。? ?


可能得到以下的GROUP BY輸出:


由此可見,Retraction具有傳遞性,RetractStream的后續(xù)的Stream也會(huì)是RetractionStream。

終止

? ?? ?最終需要支持Retraction的Sink來終止RetractionStream,比如:

最終輸出retractedResults:

存儲(chǔ)

只有外部存儲(chǔ)支持UPDATE或DELETE操作時(shí),才能實(shí)現(xiàn)RetractionSink。

常見的KV存儲(chǔ)和數(shù)據(jù)庫,如HBase,Mysql都可實(shí)現(xiàn)RetractionSink。

后續(xù)程序總能從這些存儲(chǔ)中讀取最新數(shù)據(jù),上游是否是Retraction流對(duì)用戶是透明的。

常見的消息隊(duì)列,如Kafka,只支持APPEND操作,則不能實(shí)現(xiàn)RetractionSink。

后續(xù)程序從這些消息隊(duì)列可能會(huì)讀到重復(fù)數(shù)據(jù),因此用戶需要在后續(xù)程序中處理重復(fù)數(shù)據(jù)。

總結(jié)

? ?? ?Flink Streaming SQL的實(shí)現(xiàn)從上到下共有三層:

1:Streaming SQL

2:Streaming?和?Window

3:Distributed Snapshots

其中“Streaming Data Model” 和 “Distributed Snapshot” 是Flink這個(gè)分布式流計(jì)算系統(tǒng)的核心架構(gòu)設(shè)計(jì)。

“Streaming Data Model”的What, Where, When, How?明確了流計(jì)算系統(tǒng)的表達(dá)能力及預(yù)期應(yīng)用場(chǎng)景。

“Distributed Snapshots”針對(duì)預(yù)期的應(yīng)用場(chǎng)景在數(shù)據(jù)準(zhǔn)確性,系統(tǒng)穩(wěn)定性和運(yùn)行性能上做了合適的折中。

? ? 本文通過實(shí)例闡述了流計(jì)算開發(fā)者需要了解的最上面兩層的概念和原理,

以便流計(jì)算開發(fā)者能在數(shù)據(jù)準(zhǔn)確性和數(shù)據(jù)時(shí)效性上做適合業(yè)務(wù)場(chǎng)景的折中和取舍。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 概述 2019 年是大數(shù)據(jù)實(shí)時(shí)計(jì)算領(lǐng)域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (內(nèi)部的 Flin...
    王知無閱讀 3,332評(píng)論 2 11
  • 一、基本特性 1、Flink簡(jiǎn)介 Flink 是分布式實(shí)時(shí)和離線計(jì)算引擎,用于在無界數(shù)據(jù)流和有界數(shù)據(jù)流上進(jìn)行有狀...
    Tu_jc閱讀 677評(píng)論 0 0
  • 基礎(chǔ)概念考察 一、 簡(jiǎn)單介紹一下 Flink Flink 是一個(gè)框架和分布式處理引擎,用于對(duì)無界和有界數(shù)據(jù)流進(jìn)行有...
    Tim在路上閱讀 16,300評(píng)論 0 8
  • 基礎(chǔ)概念考察 一、 簡(jiǎn)單介紹一下 Flink Flink 是一個(gè)框架和分布式處理引擎,用于對(duì)無界和有界數(shù)據(jù)流進(jìn)行有...
    Tim在路上閱讀 871評(píng)論 0 9
  • 概述 2019 年是大數(shù)據(jù)實(shí)時(shí)計(jì)算領(lǐng)域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (內(nèi)部的 Flin...
    Yobhel閱讀 1,910評(píng)論 0 33

友情鏈接更多精彩內(nèi)容