Flink SQL 性能優(yōu)化實(shí)戰(zhàn)

緣起


最近我們組在大規(guī)模上線Flink SQL作業(yè)。首先,在進(jìn)行跑批量初始化完歷史數(shù)據(jù)后,剩下的就是消費(fèi)Kafka歷史數(shù)據(jù)進(jìn)行追數(shù)了。但是發(fā)現(xiàn)某些作業(yè)的追數(shù)過(guò)程十分緩慢,要運(yùn)行一晚上甚至三四天才能追上最新數(shù)據(jù)。由于是實(shí)時(shí)數(shù)倉(cāng)指標(biāo)計(jì)算上線初期,經(jīng)常驗(yàn)證作業(yè)如果有問(wèn)題就得重蹈覆轍重新追數(shù),效率很低,于是我開(kāi)始分析Flink SQL的優(yōu)化。

問(wèn)題


insert into tableB
select a, max(b), max(c), sum(d) ...
from tableA
group by a

上面這個(gè)作業(yè)的簡(jiǎn)化版SQL,主要就是做一個(gè)分組聚合:

  1. 從tableA分組聚合出結(jié)果插入tableB

  2. tableA的聯(lián)合主鍵是:a,b(但是a的離散度已經(jīng)很高了)

  3. tableA的Flink表類型為upset-kafka

  4. tableB的Flink表類型為HBase

初步分析


這個(gè)作業(yè)跑在集群上的job graph如下:

image.png

可以看到有三個(gè)vertex:

  1. 第一個(gè)是TableSourceScan

  2. 第二個(gè)是ChangelogNormalize

  3. 第三個(gè)是GroupAggregate

TableSourceScan接入tableA表的upsert-kafka流;

ChangelogNormalize對(duì)upset-kafka進(jìn)行撤回語(yǔ)義的解析;

GroupAggregate對(duì)撤回流進(jìn)行分組聚合,然后寫(xiě)入tableB的HBase;

優(yōu)化思路1:local/global agg


agg分類:

  • group agg
select count(a) from t group by b
  • over agg
select count(a) over (partition by b order by c) from t
  • window agg
select count(a) from t group by tumble(ts, interval '10' seconds), b

local/global agg:

image.png

核心思想與hadoop的combiner是一致的,就是在mapreduce的過(guò)程中,在map階段就做一個(gè)預(yù)聚合,即combine操作。

[圖片上傳失敗...(image-c0ad24-1650075387085)]

帶來(lái)的收益是:減少網(wǎng)絡(luò)shuffle數(shù)據(jù),提升計(jì)算引擎的性能。

前提條件:

  1. agg的所有agg function都是mergeable(實(shí)現(xiàn)merge方法)

  2. table.optimizer.agg-phase-strategy為AUTO或TWO_PHASE

  3. Stream下,minibatch開(kāi)啟;Batch下,AUTO會(huì)根據(jù)cost選擇

解釋說(shuō)明:

mergeable其實(shí)就是能用分治法解決的計(jì)算問(wèn)題,例如sum、count等,而avg就不能用分治法先計(jì)算部分元素的avg,再計(jì)算最終avg了,結(jié)果有時(shí)候會(huì)出錯(cuò)。

table.optimizer.agg-phase-strategy:默認(rèn)為AUTO,意思是引擎盡量做預(yù)聚合;TWO_PHASE表示所有聚合操作都做預(yù)聚合;ONE_PHASE表示所有聚合都不做預(yù)聚合。

minibatch:即開(kāi)啟微批模式。主要有三個(gè)參數(shù):

  • table.exec.mini-batch.enabled:是否開(kāi)啟,默認(rèn)不開(kāi)啟
  • table.exec.mini-batch.size:微批的record buffer大小
  • table.exec.mini-batch.allow-latency:微批的time buffer大小

minibatch的本質(zhì)就是平衡實(shí)時(shí)性和吞吐量的刻度尺。

所以,local/global agg一共需要三個(gè)參數(shù)控制。

驗(yàn)證


經(jīng)過(guò)對(duì)比驗(yàn)證,在這個(gè)SQL場(chǎng)景下的效率提升很小。

local/global agg降低了第二個(gè)vertex即ChangelogNormalize的sent records的數(shù)據(jù)量,而并沒(méi)有使得第一個(gè)vertex的數(shù)據(jù)處理效率有顯著提升。

所以,這個(gè)作業(yè)的瓶頸并不在vertex間, 而在于第一個(gè)vertex的處理數(shù)據(jù)效率。

優(yōu)化思路二:調(diào)大并行度


這個(gè)思路的關(guān)鍵在于source upsert-kafka的分區(qū)數(shù),這是制約吞吐量的瓶頸。因?yàn)樵趗psert-kafka中,每個(gè)partition最多被一個(gè)Flink線程讀取。

增加了10倍的并行度,source分區(qū)也增加10倍后,作業(yè)周轉(zhuǎn)時(shí)間縮短了將近一半。

優(yōu)化思路三:RocksDB性能調(diào)優(yōu)


仔細(xì)分析這個(gè)SQL作業(yè),是對(duì)一個(gè)聯(lián)合主鍵的字段做group by,那么state一定會(huì)非常大。

經(jīng)過(guò)在對(duì)這個(gè)表在數(shù)倉(cāng)中的數(shù)據(jù)進(jìn)行分析,發(fā)現(xiàn)這個(gè)字段的離散度幾乎接近于主鍵的離散度。

而進(jìn)行g(shù)roup by必然要根據(jù)每一條upsert kafka的數(shù)據(jù)去查驗(yàn)在flink statebackend中物化的source table中該字段值的分布情況,這應(yīng)該是才是瓶頸所在!

沿著這個(gè)思路,開(kāi)始分析Flink的statebackend機(jī)制。

這里我們簡(jiǎn)單回顧一下Flink statebackend(后面再做專題總結(jié)):

由 Flink 管理的 keyed state 是一種分片的鍵/值存儲(chǔ),每個(gè) keyed state 的工作副本都保存在負(fù)責(zé)該鍵的 taskmanager 本地中。另外,Operator state 也保存在機(jī)器節(jié)點(diǎn)本地。Flink 定期獲取所有狀態(tài)的快照,并將這些快照復(fù)制到持久化的位置,例如分布式文件系統(tǒng)。

如果發(fā)生故障,F(xiàn)link 可以恢復(fù)應(yīng)用程序的完整狀態(tài)并繼續(xù)處理,就如同沒(méi)有出現(xiàn)過(guò)異常。

Flink 管理的狀態(tài)存儲(chǔ)在 state backend 中。Flink 有兩種 state backend 的實(shí)現(xiàn) – 一種基于 RocksDB 內(nèi)嵌 key/value 存儲(chǔ)將其工作狀態(tài)保存在磁盤(pán)上的,另一種基于堆的 state backend,將其工作狀態(tài)保存在 Java 的堆內(nèi)存中。這種基于堆的 state backend 有兩種類型:FsStateBackend,將其狀態(tài)快照持久化到分布式文件系統(tǒng);MemoryStateBackend,它使用 JobManager 的堆保存狀態(tài)快照。

image.png

當(dāng)使用基于堆的 state backend 保存狀態(tài)時(shí),訪問(wèn)和更新涉及在堆上讀寫(xiě)對(duì)象。但是對(duì)于保存在 RocksDBStateBackend 中的對(duì)象,訪問(wèn)和更新涉及序列化和反序列化,所以會(huì)有更大的開(kāi)銷。但 RocksDB 的狀態(tài)量?jī)H受本地磁盤(pán)大小的限制。還要注意,只有 RocksDBStateBackend 能夠進(jìn)行增量快照,這對(duì)于具有大量變化緩慢狀態(tài)的應(yīng)用程序來(lái)說(shuō)是大有裨益的。

所有這些 state backends 都能夠異步執(zhí)行快照,這意味著它們可以在不妨礙正在進(jìn)行的流處理的情況下執(zhí)行快照。

我們的線上一般采用的是RocksDB作為狀態(tài)后端,checkpoint dir采用hdfs文件系統(tǒng)。其實(shí)我個(gè)人覺(jué)得這個(gè)應(yīng)該根據(jù)作業(yè)的特性進(jìn)行選擇,根據(jù)我個(gè)人的經(jīng)驗(yàn)以及知識(shí)沉淀,選擇的主要因素是作業(yè)的state大小及對(duì)處理數(shù)據(jù)性能的要求:

  • RocksDBStateBackend可以突破內(nèi)存的限制,rocksDB的數(shù)據(jù)邏輯結(jié)構(gòu)和redis相似,但是數(shù)據(jù)的物理存儲(chǔ)結(jié)構(gòu)又和hbase相似,繼承自levelDB的LSM樹(shù)思想,缺點(diǎn)是性能太低

  • 而FsStateBackend是在做snapshot的時(shí)候才將內(nèi)存的state持久化到遠(yuǎn)端,速度接近于內(nèi)存狀態(tài)

  • MemoryStateBackend是純內(nèi)存的,一般只用做調(diào)試。

但是由于這個(gè)大狀態(tài)作業(yè)追數(shù)速度實(shí)在太慢,我甚至想過(guò):

在追數(shù)的時(shí)候用FsStateBackend,并配置大內(nèi)存,且把managed memory調(diào)成0,同時(shí)將ck的周期設(shè)置的很大,基本上不做ck,追上后savepoint。再把狀態(tài)后端換成RocksDB,并且從FSSatebackend的savepoint處恢復(fù),但是發(fā)現(xiàn)1.13才支持savepoint切換statebackend類型。

只剩下調(diào)優(yōu)RocksDB一條路了。根據(jù)之前對(duì)HBase的LSM原理的理解,進(jìn)行知識(shí)遷移,馬上對(duì)RocksDB有了一定的認(rèn)識(shí)。在HBase中調(diào)優(yōu)效果最明顯無(wú)乎:

blockcache讀緩存、memStore寫(xiě)緩存、增加布隆過(guò)濾器、提升compact效率

沿著這個(gè)思路,再查閱了一番RocksDB資料后,決定先對(duì)如下參數(shù)進(jìn)行調(diào)優(yōu):

  • state.backend.rocksdb.block.cache-size

  • state.backend.rocksdb.block.blocksize

Block 塊是 RocksDB 保存在磁盤(pán)中的 SST 文件的基本單位,它包含了一系列列有序的 Key 和 Value 集合,可以設(shè)置固定的大小。

image.png

但是,通過(guò)增加 Block Size,會(huì)顯著增加讀放大(Read Amplification)效應(yīng),令讀取數(shù)據(jù)時(shí),吞吐量下降。原因是 Block Size增加以后,如果 Block Cache 的大小沒(méi)有變,就會(huì)?大減少 Cache 中可存放的 Block 數(shù)。如果 Cache 中還存處理索引和過(guò)濾?等內(nèi)容,那么可放置的數(shù)據(jù)塊數(shù)目就會(huì)更少,可能需要更多的磁盤(pán) IO 操作,找到數(shù)據(jù)就更更慢了,此時(shí)讀取性能會(huì)大幅下降。反之,如果減小BlockSize,會(huì)讓讀的性能有不少提升,但是寫(xiě)性能會(huì)下降,?而且對(duì) SSD 壽命也不利。

因此我的調(diào)優(yōu)經(jīng)驗(yàn)是,如果需要增加 Block Size 的大小來(lái)提升讀寫(xiě)性能,請(qǐng)務(wù)必一并增加 Block Cache Size 的大小,這樣才可以取得比較好的讀寫(xiě)性能。Block Cache,緩存清除算法?用的是 LRU(Least Recently Used)。

驗(yàn)證


測(cè)試對(duì)比后發(fā)現(xiàn),原本半天左右完成的作業(yè)只需要一到兩個(gè)小時(shí)即可追上數(shù)據(jù)!

感悟


性能調(diào)優(yōu)就如同把脈治病,關(guān)鍵在于對(duì)癥下藥。

前期,要分析當(dāng)前場(chǎng)景下真正制約性能的瓶頸所在,后期,在癥結(jié)處用效果最明顯的方式處理癥結(jié)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容