(轉(zhuǎn)自我的微信公眾號 KAMI說 )
Flink 是當(dāng)前最流行的分布式計(jì)算框架,其提供的 Table API 和 SQL 特性,使得開發(fā)者可以通過成熟,直觀、簡潔、表達(dá)力強(qiáng)的標(biāo)準(zhǔn) SQL 描述計(jì)算邏輯,大大減少其學(xué)習(xí)、開發(fā)和維護(hù)成本。
Flink SQL 支持面向無邊界輸入流的流處理。然而。聚合統(tǒng)計(jì)、窗口統(tǒng)計(jì)等計(jì)算是有狀態(tài)的。在流處理中,若這些狀態(tài)數(shù)據(jù)隨時(shí)間不斷堆積、不斷膨脹,會導(dǎo)致因?yàn)镺OM頻繁發(fā)生導(dǎo)致的作業(yè)崩潰、重啟。
從 Flink 1.6 版本開始,社區(qū)引入了狀態(tài) TTL(Time-To-Live) 特性。在通過Flink SQL 實(shí)現(xiàn)流處理時(shí),開發(fā)者可以為作業(yè) SQL 設(shè)置TTL,實(shí)現(xiàn)過期狀態(tài)的自動(dòng)清理,從而防止作業(yè)狀態(tài)無限膨脹。
然而,目前Flink SQL 只支持粗粒度的TTL設(shè)置,即一段 SQL 只能設(shè)置一個(gè)TTL。在一些常見的應(yīng)用場景中,這不足夠。
一
下面是一段計(jì)算DAU指標(biāo)的 SQL 代碼
SELECT
t_date
, COUNT(DISTINCT user_id) AS cnt_login
, COUNT(DISTINCT CASE WHEN t_date = t_debut THEN user_id END) AS cnt_new
FROM
(
SELECT
t_date
, user_id
, MIN(t_date) OVER (
PARTITION BY user_id
ORDER BY proctime
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
) AS t_debut
FROM Login
) AS t
GROUP BY t_date
這段SQL的業(yè)務(wù)意義很直觀,就是計(jì)算實(shí)時(shí)每日登陸用戶和新增登陸用戶。
- 第一層的窗口統(tǒng)計(jì),計(jì)算每個(gè)用戶有史以來最小的登陸日期,即其新增日期
- 第二層的聚合統(tǒng)計(jì),按天進(jìn)行聚合,計(jì)算每天的登陸用戶數(shù)和新增用戶數(shù)
然而,在TTL的設(shè)置上,我們面臨兩難狀況:
- 不設(shè)置TTL。那么在第二層按天進(jìn)行的聚合統(tǒng)計(jì),COUNT DISTINCT計(jì)算帶來的狀態(tài)會隨著天數(shù)近乎線性增長,狀態(tài)會不斷膨脹,帶來OOM等一系列問題
- 設(shè)置TTL,例如 n 天未訪問的狀態(tài)自動(dòng)清理。那么在第一層的窗口統(tǒng)計(jì),n天不活躍的用戶的登陸日期狀態(tài)就可能被清除,導(dǎo)致其后續(xù)再次登錄時(shí)被誤判為新增
要解決這個(gè)矛盾,我們實(shí)際上需要 Flink SQL 提供 TTL 的細(xì)粒度配置,即為一段SQL設(shè)置多個(gè) TTL :
- 第一層的窗口統(tǒng)計(jì)不設(shè)置TTL,所有用戶的登陸日期狀態(tài)永久保留
- 第二層的聚合統(tǒng)計(jì)設(shè)置 n 天的 TTL,保證其狀態(tài)不會無限增長
下面給大家介紹,如何實(shí)現(xiàn)Flink SQL的細(xì)粒度 TTL 配置。
二
大家都知道,在 Flink 中,通過 Table API 和 SQL 實(shí)現(xiàn)的流處理邏輯,最終會翻譯為基于 DataStream API 實(shí)現(xiàn)的 DataStream 作業(yè),返回這個(gè)作業(yè)輸出的 DataStream (writeToSink 本質(zhì)上也是先得到 DataStream 作業(yè),再為其輸出 DataStream 加上一個(gè) DataStreamSink) 。
從一段 SQL 到 DataStream 作業(yè),其過程簡單描述如下:
- 在 TableEnvironment,即“表環(huán)境”,將數(shù)據(jù)源注冊為動(dòng)態(tài)表。例如,通過表環(huán)境的接口
registerDataStream, 作為源的DataStream,即數(shù)據(jù)流, 在表環(huán)境注冊為動(dòng)態(tài)表 - 通過表環(huán)境的接口
sqlQuery,將 SQL 構(gòu)造為 Table 對象 - 通過toAppendStream/toRetractedStream接口,即翻譯接口,將 Table 對象表達(dá)的作業(yè)邏輯,翻譯為 DataStream 作業(yè)。

在調(diào)用翻譯接口,將 Table 對象翻譯為 DataStream 作業(yè)時(shí),通過翻譯接口傳入的 TTL 配置,遞歸傳遞到各個(gè)計(jì)算節(jié)點(diǎn)的翻譯、構(gòu)造邏輯里,使得翻譯出來的 DataStream 算子的內(nèi)部狀態(tài)按照該 TTL 配置及時(shí)清理。
如果我們將上述計(jì)算DAU的SQL拆分成兩段,前者作為一個(gè)中間結(jié)果,提供給后者調(diào)用。
SQL1:
SELECT
t_date
, user_id
, MIN(t_date) OVER (
PARTITION BY user_id
ORDER BY proctime
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
) AS t_debut
FROM Login
SQL2:
SELECT
t_date
, COUNT(DISTINCT user_id) AS cnt_login
, COUNT(DISTINCT CASE WHEN t_date = t_debut THEN user_id END) AS cnt_new
FROM t_middle
GROUP BY t_date
從第一段 SQL 構(gòu)建對應(yīng) Table 對象,再調(diào)用翻譯接口,翻譯成 DataStream 作業(yè),其輸出數(shù)據(jù)流為 s_middle。其可以使用 Row 作為流數(shù)據(jù)類型,各個(gè)字段的名稱和類型可以通過 Table 對象的 Schema獲得。顯然,這個(gè) DataStream 作業(yè)是原來完整DAU計(jì)算 DataStream 作業(yè)的一部分,其輸出為一個(gè)中間結(jié)果。
然后,將這個(gè)中間結(jié)果數(shù)據(jù)流 s_middle 在表環(huán)境重新注冊為動(dòng)態(tài)表 t_middle ,各個(gè)字段的名稱和類型可以通過 Table 對象的 Schema獲得。這是第二段 SQL 需要調(diào)用的中間結(jié)果動(dòng)態(tài)表。
最后,從第二段 SQL 構(gòu)建對應(yīng) Table 對象,再調(diào)用翻譯接口,加上 n 天的 TTL 配置,翻譯成 DataStream 作業(yè)。顯然,這個(gè) DataStream 作業(yè)是原來完整DAU計(jì)算 DataStream 作業(yè)的另外一部分,其輸出為完整的 DAU 計(jì)算結(jié)果。
顯然,第一段 SQL 對應(yīng)的計(jì)算節(jié)點(diǎn),其狀態(tài) TTL 為永不過期。第二段 SQL 對應(yīng)的計(jì)算節(jié)點(diǎn),其狀態(tài) TTL 為 n 天后過期!TTL的細(xì)粒度配置實(shí)現(xiàn)!
三
歸納一下,如果要給 Flink SQL 設(shè)置細(xì)粒度TTL配置,我們只需要:
1. 將原來一段 SQL 代碼,按照不同的TTL,改寫為前后依賴的多個(gè)子 SQL。
2. 對于每個(gè)子 SQL,若不是最下游的,進(jìn)行“翻譯-重注冊”:
a. 加上對應(yīng)的 TTL 配置,翻譯為 DataStream 作業(yè),得到其輸出數(shù)據(jù)流,其中,流數(shù)據(jù)類型使用 Row,各個(gè)字段的名稱和類型可以通過 Table 對象的 Schema獲得
b. 將中間結(jié)果數(shù)據(jù)流在表環(huán)境重新注冊,表名為下游子SQL調(diào)用的表名,各個(gè)字段的名稱和類型可以通過 Table 對象的 Schema獲得
3. 最后一個(gè)子 SQL,加上對應(yīng)的 TTL 配置,翻譯成 DataStream 作業(yè),其輸出數(shù)據(jù)流即為完整計(jì)算的輸出。

需要注意的是,處理時(shí)間(Process-Time)和事件時(shí)間(Event-Time)字段,對應(yīng)的數(shù)據(jù)類型在Flink Table API & SQL 的包 flink-table 中是私有的,在外部訪問會出錯(cuò)。
所以,在“翻譯-重注冊”過程中,需要特殊處理時(shí)間和事件時(shí)間字段:
- 通過 Table 對象的 Schema 找出時(shí)間特性字段,然后通過 Table.select 方法,剔除時(shí)間特性字段,再翻譯成 DataStream 作業(yè),得到中間結(jié)果數(shù)據(jù)流。
- 為中間結(jié)果數(shù)據(jù)流重新構(gòu)造時(shí)間特性字段,在重注冊為動(dòng)態(tài)表時(shí),按照原字段名重新聲明。
總結(jié)一下,整個(gè)細(xì)粒度TTL配置的實(shí)現(xiàn)過程實(shí)施:
- 按 TTL 的不同,將 SQL 拆解為多個(gè)子 SQL
- 對每個(gè)子 SQL 進(jìn)行“翻譯-重注冊”,包括時(shí)間特性字段的處理
- 最后一個(gè)子 SQL 完成翻譯,得到的 DataStream 作業(yè)的輸出便是完整計(jì)算邏輯的輸出
四
細(xì)心的讀者會發(fā)現(xiàn),如果中間的計(jì)算過程包含聚合計(jì)算,翻譯出的 DataStream 作業(yè)的輸出數(shù)據(jù)流只能是帶撤回標(biāo)志位的數(shù)據(jù)流(簡稱撤回流)DataStream<Tuple<Boolean, Row>>,無法直接重注冊到表環(huán)境中。上述的方法無法應(yīng)用于有多層 TTL 配置不一樣的聚合操作的 Flink SQL 中。
也就是說,要實(shí)現(xiàn)所有場景下的 Flink SQL 的細(xì)粒度 TTL 配置,我們必須實(shí)現(xiàn)撤回流注冊為動(dòng)態(tài)表這一特性。
本系列文的第二篇《Flink SQL 細(xì)粒度TTL配置的實(shí)現(xiàn)(二)》將給大家介紹具體的實(shí)現(xiàn)方法,需要對Flink Table API & SQL 的包 flink-table 的源碼進(jìn)行一點(diǎn)修改,盡情期待。
掃描下方二維碼關(guān)注我的公眾號“KAMI說”,有更多精彩原創(chuàng)內(nèi)容哦~
