反壓是什么
反壓是在實(shí)時(shí)數(shù)據(jù)處理中,數(shù)據(jù)管道某個(gè)節(jié)點(diǎn)上游產(chǎn)生數(shù)據(jù)的速度大于該節(jié)點(diǎn)處理數(shù)據(jù)速度的一種現(xiàn)象。反壓會(huì)從該節(jié)點(diǎn)向上游傳遞,一直到數(shù)據(jù)源,并降低數(shù)據(jù)源的攝入速度。這在流數(shù)據(jù)處理中非常常見,很多場景可以導(dǎo)致反壓的出現(xiàn),比如, GC導(dǎo)致短時(shí)間數(shù)據(jù)積壓,數(shù)據(jù)的波動(dòng)帶來的一段時(shí)間內(nèi)需處理的數(shù)據(jù)量大增,甚至是checkpoint本身都可能造成反壓。
反壓的原理

上面是一個(gè)Flink任務(wù)的流程圖,我們將反壓過程拆分成兩個(gè)部分:跨TaskManager的反壓過程和TaskManager內(nèi)的反壓過程,最后再介紹一下基于Credit的反壓過程。
跨TaskManager的反壓過程
跨TaskManager的反壓是怎么向上游傳播的呢?在這之前我們先了解一下Flink中TaskManager 之間數(shù)據(jù)的網(wǎng)絡(luò)傳輸過程。

從上圖可知,TaskManager A給TaskManager B發(fā)送數(shù)據(jù),TaskManager A做為Producer,TaskManager B做為Consumer。Producer處理完的數(shù)據(jù)首先輸出到Producer對應(yīng)的NetWork Buffer中,NetWorkBuffer就是圖中的ResultSubPartition和InputChannel。
ResultSubPartition和InputChannel都向LocalBufferPool申請Buffer空間,然后LocalBufferPool再向NetWork BufferPool申請內(nèi)存空間。這里,NetWork BufferPool是TaskManager內(nèi)所有Task共享的BufferPool,TaskManager初始化時(shí)就會(huì)向堆外內(nèi)存申請NetWork BufferPool。LocalBufferPool是每個(gè)Task自己的BufferPool,假如一個(gè)TaskManager內(nèi)運(yùn)行著5個(gè)Task,那么就會(huì)有5個(gè)LocalBufferPool,但TaskManager內(nèi)永遠(yuǎn)只有一個(gè)NetWork BufferPool。
Netty的Buffer也是初始化時(shí)直接向堆外內(nèi)存申請內(nèi)存空間。雖然可以申請,但是必須明白內(nèi)存申請肯定是有限制的,不可能無限制的申請,我們在啟動(dòng)任務(wù)時(shí)可以指定該任務(wù)最多可能申請多大的內(nèi)存空間用于NetWorkBuffer。
經(jīng)過netty的buffer后,數(shù)據(jù)又會(huì)被拷貝到Socket的Send Buffer中,最后通過Socket發(fā)送網(wǎng)絡(luò)請求,把Send Buffer中的數(shù)據(jù)發(fā)送到Consumer端的 Receive Buffer,并依圖中所示,在Consumer端向上傳遞直到Consumer Operator。
假設(shè)Producer端生產(chǎn)速率為2,Consumer端消費(fèi)速率為1。那么一段時(shí)間后消費(fèi)端(Task B)的Network buffer會(huì)打滿,即使向LocalBufferPool和Network BufferPool申請可用的資源,也會(huì)逐漸被用完。由于Network buffer已滿,Netty也就不會(huì)從receive buffer讀數(shù)據(jù)了,也即socket到netty的數(shù)據(jù)傳輸會(huì)阻塞。這樣receive buffer很快會(huì)用完,TCP的Socket通信有動(dòng)態(tài)反饋的流控機(jī)制,會(huì)把容量為0的消息反饋給上游發(fā)送端,所以上游的Socket就不會(huì)往下游再發(fā)送數(shù)據(jù)了。Producer端從send buffer向上傳遞過程類似,直到network buffer無空間可用,RecordWriter輸出就被wait,Task A不再生產(chǎn)數(shù)據(jù)。
TaskManager內(nèi)部的反壓過程

由于operator下游的buffer耗盡,此時(shí)Record Writer就會(huì)被阻塞,又由于Record Reader、Operator、Record Writer 都屬于同一個(gè)線程,所以Record Reader也會(huì)被阻塞。這時(shí)上游數(shù)據(jù)還在不斷寫入,不多久network buffer就會(huì)被用完,然后跟前面類似,經(jīng)是netty和socket,壓力就會(huì)向上游傳遞。
基于Credit的反壓過程

還以之前的圖為例,在每一次ResultSubPartition向InputChannel發(fā)送消息時(shí),都會(huì)發(fā)送一個(gè)backlog size告訴下游準(zhǔn)備發(fā)送多少消息,下游會(huì)計(jì)算 Buffer空間大小去接收消息,如果有充足的Buffer就返還給上游一個(gè)Credit告知可以發(fā)送消息的大?。▓D中ResultSubPartition和InputChannel之間的虛線表示最終還是需要通過Netty和Socket去通信,并不是直接通信)。
相同的場景,上游生產(chǎn)的速率為2,下游消費(fèi)的速率為1,這樣InputChannel中的內(nèi)存很快就會(huì)耗盡,通信過程中就會(huì)返回credit=0給ResultSubPartition告知上游,下游已經(jīng)沒有空間了,上游也就不再繼續(xù)發(fā)送數(shù)據(jù)給netty,直到下游消費(fèi)給InputChannel騰出空間了,數(shù)據(jù)才會(huì)繼續(xù)發(fā)送。
基于credit的反壓過程,效率比之前要高,因?yàn)橹灰掠蜪nputChannel空間耗盡,就能通過credit讓上游ResultSubPartition感知到,不需要在通過netty和socket層來一層一層的傳遞。另外,它還解決了由于一個(gè)Task反壓導(dǎo)致 TaskManager和TaskManager之間的Socket阻塞的問題。
反壓的影響
一般短時(shí)間的反壓并不會(huì)對實(shí)時(shí)任務(wù)太大影響,如果是持續(xù)性的反壓就需要注意了,意味著任務(wù)本身存在瓶頸,可能導(dǎo)致潛在的不穩(wěn)定或者數(shù)據(jù)延遲,尤其是數(shù)據(jù)量較大的場景下。
反壓的影響主要體現(xiàn)在Flink中checkpoint過程上,主要影響兩個(gè)方面:
- 反壓出現(xiàn)時(shí),相關(guān)數(shù)據(jù)流阻塞,會(huì)使數(shù)據(jù)管道中數(shù)據(jù)處理速度變慢,按正常數(shù)據(jù)量間隔插入的barrier也會(huì)被阻塞,進(jìn)而拉長,checkpoint時(shí)間,可能導(dǎo)致checkpoint超時(shí),甚至失敗。
- 在對齊checkpoint場景中,算子接收多個(gè)管道輸入,輸入較快的管道數(shù)據(jù)state會(huì)被緩存起來,等待輸入較慢的管道數(shù)據(jù)barrier對齊,這樣由于輸入較快管道數(shù)據(jù)沒被處理,一直積壓可能導(dǎo)致OOM或者內(nèi)存資源耗盡的不穩(wěn)定問題。
如何定位反壓
定位造成反壓問題的節(jié)點(diǎn),通常有兩種途徑。
- 壓監(jiān)控面板;
- Flink Task Metrics
前者簡單易上手,適合簡單分析,后者信息豐富,但需要更多背景知識(shí),適合系統(tǒng)分析。
反壓監(jiān)控面板
Flink Web UI 的反壓監(jiān)控提供了 SubTask 級別的反壓監(jiān)控,通過周期性對 Task線程的棧信息采樣,得到線程被阻塞在請求 Buffer(意味著被下游隊(duì)列阻塞)的比例來判斷該節(jié)點(diǎn)是否處于反壓狀態(tài)。默認(rèn)配置如下:
- OK: 0 <= Ratio <= 0.10
- LOW: 0.10 < Ratio <= 0.5
- HIGH: 0.5 < Ratio <= 1
值得注意的是,反壓的根源節(jié)點(diǎn)并不一定會(huì)在反壓監(jiān)控面板體現(xiàn)出高反壓,因?yàn)榉磯好姘灞O(jiān)控的是發(fā)送端,如果某個(gè)節(jié)點(diǎn)是性能瓶頸并不會(huì)導(dǎo)致它本身出現(xiàn)高反壓,而是導(dǎo)致它的上游出現(xiàn)高反壓??傮w來看,如果我們找到第一個(gè)出現(xiàn)反壓的節(jié)點(diǎn),那么反壓根源要么是就這個(gè)節(jié)點(diǎn),要么是它緊接著的下游節(jié)點(diǎn)。

Task Metrics
由于各指標(biāo)在不同版本里,具體含義并不一致,在這里就先不列了,感興趣的可以參考官網(wǎng)的介紹
分析反壓的具體原因
根據(jù)上面介紹的方法,可以定位到反壓問題的節(jié)點(diǎn),也即數(shù)據(jù)處理的瓶頸,然后可以分析造成反壓的原因。下面列出了一些從基本到復(fù)雜的原因。也要注意到,反壓也可能是短暫的,比如,短時(shí)負(fù)載過大,檢查點(diǎn)生成或者任務(wù)重啟時(shí)處理積壓的數(shù)據(jù)等,都會(huì)造成反壓,但這些場景通??梢院雎?。另外需要注意的是,分析和解決反壓問題的過程也會(huì)受瓶頸本身非連續(xù)性的影響。
系統(tǒng)資源
首先,需要檢查機(jī)器的資源使用情況,像CPU、網(wǎng)絡(luò)、磁盤I/O等。如果一些資源負(fù)載過高,就可以進(jìn)行下面的處理:
1、嘗試優(yōu)化代碼;
2、針對特定資源對Flink進(jìn)行調(diào)優(yōu);
3、增加并發(fā)或者增加機(jī)器
垃圾回收
性能問題常常源自過長的GC時(shí)長。這種情況下可以通過打印GC日志,或者使用一些內(nèi)存/GC分析工具來定位問題。
CPU/線程瓶頸
有時(shí)候,如果一個(gè)或者一些線程造成CPU瓶頸,而此時(shí),整個(gè)機(jī)器的CPU使用率還相對較低,這種CPU瓶頸不容易發(fā)現(xiàn)。比如,如果一個(gè)48核的CPU,有一個(gè)線程成為瓶頸,這時(shí)CPU的使用率只有2%。這種情況下可以考慮使用代碼分析工具來定位熱點(diǎn)線程。
線程爭用
跟上面CPU/線程瓶頸問題類似,一個(gè)子任務(wù)可能由于對共享資源的高線程爭用成為瓶頸。同樣的,CPU分析工具對于探查這類問題也很有用。
負(fù)載不均
如果瓶頸是數(shù)據(jù)傾斜造成的,可以嘗試刪除傾斜數(shù)據(jù),或者通過改變數(shù)據(jù)分區(qū)策略將造成數(shù)據(jù)的key值拆分,或者也可以進(jìn)行本地聚合/預(yù)聚合。
上面幾項(xiàng)并不是全部場景。通常,解決數(shù)據(jù)處理過程中的瓶頸問題,進(jìn)而消除反壓,首先需要定位問題節(jié)點(diǎn)(瓶頸所在),然后找到原因,尋找原因,一般從檢查資源過載開始。