一個(gè)作業(yè),多個(gè)TTL-Flink SQL 細(xì)粒度TTL配置的實(shí)現(xiàn)(一)

(轉(zhuǎn)自我的微信公眾號 KAMI說 )

Flink 是當(dāng)前最流行的分布式計(jì)算框架,其提供的 Table API 和 SQL 特性,使得開發(fā)者可以通過成熟,直觀、簡潔、表達(dá)力強(qiáng)的標(biāo)準(zhǔn) SQL 描述計(jì)算邏輯,大大減少其學(xué)習(xí)、開發(fā)和維護(hù)成本。

Flink SQL 支持面向無邊界輸入流的流處理。然而。聚合統(tǒng)計(jì)、窗口統(tǒng)計(jì)等計(jì)算是有狀態(tài)的。在流處理中,若這些狀態(tài)數(shù)據(jù)隨時(shí)間不斷堆積、不斷膨脹,會導(dǎo)致因?yàn)镺OM頻繁發(fā)生導(dǎo)致的作業(yè)崩潰、重啟。

從 Flink 1.6 版本開始,社區(qū)引入了狀態(tài) TTL(Time-To-Live) 特性。在通過Flink SQL 實(shí)現(xiàn)流處理時(shí),開發(fā)者可以為作業(yè) SQL 設(shè)置TTL,實(shí)現(xiàn)過期狀態(tài)的自動(dòng)清理,從而防止作業(yè)狀態(tài)無限膨脹。

然而,目前Flink SQL 只支持粗粒度的TTL設(shè)置,即一段 SQL 只能設(shè)置一個(gè)TTL。在一些常見的應(yīng)用場景中,這不足夠。

下面是一段計(jì)算DAU指標(biāo)的 SQL 代碼

SELECT
     t_date
   , COUNT(DISTINCT user_id) AS cnt_login
   , COUNT(DISTINCT CASE WHEN t_date = t_debut THEN user_id END) AS cnt_new
 FROM 
   (
     SELECT
        t_date
      , user_id
      , MIN(t_date) OVER (
              PARTITION BY user_id
              ORDER BY proctime
              ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
        ) AS t_debut
     FROM Login
 ) AS t
 GROUP BY t_date

這段SQL的業(yè)務(wù)意義很直觀,就是計(jì)算實(shí)時(shí)每日登陸用戶和新增登陸用戶。

  • 第一層的窗口統(tǒng)計(jì),計(jì)算每個(gè)用戶有史以來最小的登陸日期,即其新增日期
  • 第二層的聚合統(tǒng)計(jì),按天進(jìn)行聚合,計(jì)算每天的登陸用戶數(shù)和新增用戶數(shù)

然而,在TTL的設(shè)置上,我們面臨兩難狀況:

  • 不設(shè)置TTL。那么在第二層按天進(jìn)行的聚合統(tǒng)計(jì),COUNT DISTINCT計(jì)算帶來的狀態(tài)會隨著天數(shù)近乎線性增長,狀態(tài)會不斷膨脹,帶來OOM等一系列問題
  • 設(shè)置TTL,例如 n 天未訪問的狀態(tài)自動(dòng)清理。那么在第一層的窗口統(tǒng)計(jì),n天不活躍的用戶的登陸日期狀態(tài)就可能被清除,導(dǎo)致其后續(xù)再次登錄時(shí)被誤判為新增

要解決這個(gè)矛盾,我們實(shí)際上需要 Flink SQL 提供 TTL 的細(xì)粒度配置,即為一段SQL設(shè)置多個(gè) TTL :

  • 第一層的窗口統(tǒng)計(jì)不設(shè)置TTL,所有用戶的登陸日期狀態(tài)永久保留
  • 第二層的聚合統(tǒng)計(jì)設(shè)置 n 天的 TTL,保證其狀態(tài)不會無限增長

下面給大家介紹,如何實(shí)現(xiàn)Flink SQL的細(xì)粒度 TTL 配置。

大家都知道,在 Flink 中,通過 Table API 和 SQL 實(shí)現(xiàn)的流處理邏輯,最終會翻譯為基于 DataStream API 實(shí)現(xiàn)的 DataStream 作業(yè),返回這個(gè)作業(yè)輸出的 DataStream (writeToSink 本質(zhì)上也是先得到 DataStream 作業(yè),再為其輸出 DataStream 加上一個(gè) DataStreamSink) 。
從一段 SQL 到 DataStream 作業(yè),其過程簡單描述如下:

  1. 在 TableEnvironment,即“表環(huán)境”,將數(shù)據(jù)源注冊為動(dòng)態(tài)表。例如,通過表環(huán)境的接口registerDataStream, 作為源的DataStream,即數(shù)據(jù)流, 在表環(huán)境注冊為動(dòng)態(tài)表
  2. 通過表環(huán)境的接口 sqlQuery,將 SQL 構(gòu)造為 Table 對象
  3. 通過toAppendStream/toRetractedStream接口,即翻譯接口,將 Table 對象表達(dá)的作業(yè)邏輯,翻譯為 DataStream 作業(yè)。
從SQL到 DataStream 作業(yè)

在調(diào)用翻譯接口,將 Table 對象翻譯為 DataStream 作業(yè)時(shí),通過翻譯接口傳入的 TTL 配置,遞歸傳遞到各個(gè)計(jì)算節(jié)點(diǎn)的翻譯、構(gòu)造邏輯里,使得翻譯出來的 DataStream 算子的內(nèi)部狀態(tài)按照該 TTL 配置及時(shí)清理。

如果我們將上述計(jì)算DAU的SQL拆分成兩段,前者作為一個(gè)中間結(jié)果,提供給后者調(diào)用。

SQL1:

    SELECT
       t_date
     , user_id
     , MIN(t_date) OVER (
             PARTITION BY user_id
             ORDER BY proctime
             ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
       ) AS t_debut
    FROM Login

SQL2:

  SELECT
      t_date
    , COUNT(DISTINCT user_id) AS cnt_login
    , COUNT(DISTINCT CASE WHEN t_date = t_debut THEN user_id END) AS cnt_new
  FROM t_middle
  GROUP BY t_date

從第一段 SQL 構(gòu)建對應(yīng) Table 對象,再調(diào)用翻譯接口,翻譯成 DataStream 作業(yè),其輸出數(shù)據(jù)流為 s_middle。其可以使用 Row 作為流數(shù)據(jù)類型,各個(gè)字段的名稱和類型可以通過 Table 對象的 Schema獲得。顯然,這個(gè) DataStream 作業(yè)是原來完整DAU計(jì)算 DataStream 作業(yè)的一部分,其輸出為一個(gè)中間結(jié)果。

然后,將這個(gè)中間結(jié)果數(shù)據(jù)流 s_middle 在表環(huán)境重新注冊為動(dòng)態(tài)表 t_middle ,各個(gè)字段的名稱和類型可以通過 Table 對象的 Schema獲得。這是第二段 SQL 需要調(diào)用的中間結(jié)果動(dòng)態(tài)表。

最后,從第二段 SQL 構(gòu)建對應(yīng) Table 對象,再調(diào)用翻譯接口,加上 n 天的 TTL 配置,翻譯成 DataStream 作業(yè)。顯然,這個(gè) DataStream 作業(yè)是原來完整DAU計(jì)算 DataStream 作業(yè)的另外一部分,其輸出為完整的 DAU 計(jì)算結(jié)果。

顯然,第一段 SQL 對應(yīng)的計(jì)算節(jié)點(diǎn),其狀態(tài) TTL 為永不過期。第二段 SQL 對應(yīng)的計(jì)算節(jié)點(diǎn),其狀態(tài) TTL 為 n 天后過期!TTL的細(xì)粒度配置實(shí)現(xiàn)!

歸納一下,如果要給 Flink SQL 設(shè)置細(xì)粒度TTL配置,我們只需要:
1. 將原來一段 SQL 代碼,按照不同的TTL,改寫為前后依賴的多個(gè)子 SQL。
2. 對于每個(gè)子 SQL,若不是最下游的,進(jìn)行“翻譯-重注冊”:
a. 加上對應(yīng)的 TTL 配置,翻譯為 DataStream 作業(yè),得到其輸出數(shù)據(jù)流,其中,流數(shù)據(jù)類型使用 Row,各個(gè)字段的名稱和類型可以通過 Table 對象的 Schema獲得
b. 將中間結(jié)果數(shù)據(jù)流在表環(huán)境重新注冊,表名為下游子SQL調(diào)用的表名,各個(gè)字段的名稱和類型可以通過 Table 對象的 Schema獲得
3. 最后一個(gè)子 SQL,加上對應(yīng)的 TTL 配置,翻譯成 DataStream 作業(yè),其輸出數(shù)據(jù)流即為完整計(jì)算的輸出。


Flink SQL 細(xì)粒度TTL配置的實(shí)現(xiàn)

需要注意的是,處理時(shí)間(Process-Time)和事件時(shí)間(Event-Time)字段,對應(yīng)的數(shù)據(jù)類型在Flink Table API & SQL 的包 flink-table 中是私有的,在外部訪問會出錯(cuò)。
所以,在“翻譯-重注冊”過程中,需要特殊處理時(shí)間和事件時(shí)間字段:

  1. 通過 Table 對象的 Schema 找出時(shí)間特性字段,然后通過 Table.select 方法,剔除時(shí)間特性字段,再翻譯成 DataStream 作業(yè),得到中間結(jié)果數(shù)據(jù)流。
  2. 為中間結(jié)果數(shù)據(jù)流重新構(gòu)造時(shí)間特性字段,在重注冊為動(dòng)態(tài)表時(shí),按照原字段名重新聲明。

總結(jié)一下,整個(gè)細(xì)粒度TTL配置的實(shí)現(xiàn)過程實(shí)施:

  1. 按 TTL 的不同,將 SQL 拆解為多個(gè)子 SQL
  2. 對每個(gè)子 SQL 進(jìn)行“翻譯-重注冊”,包括時(shí)間特性字段的處理
  3. 最后一個(gè)子 SQL 完成翻譯,得到的 DataStream 作業(yè)的輸出便是完整計(jì)算邏輯的輸出

細(xì)心的讀者會發(fā)現(xiàn),如果中間的計(jì)算過程包含聚合計(jì)算,翻譯出的 DataStream 作業(yè)的輸出數(shù)據(jù)流只能是帶撤回標(biāo)志位的數(shù)據(jù)流(簡稱撤回流)DataStream<Tuple<Boolean, Row>>,無法直接重注冊到表環(huán)境中。上述的方法無法應(yīng)用于有多層 TTL 配置不一樣的聚合操作的 Flink SQL 中。

也就是說,要實(shí)現(xiàn)所有場景下的 Flink SQL 的細(xì)粒度 TTL 配置,我們必須實(shí)現(xiàn)撤回流注冊為動(dòng)態(tài)表這一特性。

本系列文的第二篇《Flink SQL 細(xì)粒度TTL配置的實(shí)現(xiàn)(二)》將給大家介紹具體的實(shí)現(xiàn)方法,需要對Flink Table API & SQL 的包 flink-table 的源碼進(jìn)行一點(diǎn)修改,盡情期待。

掃描下方二維碼關(guān)注我的公眾號“KAMI說”,有更多精彩原創(chuàng)內(nèi)容哦~

KAMI說
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容