2017年柏林Flink Forward大會(huì)上Robert Metger的"Keep It Going: How to Reiably and Efficiently Operate Apache Flink"的演講很受歡迎。Robert的其中一個(gè)主題演講涉及到了如何估算Flink集群規(guī)模。Flink Forward大會(huì)的觀眾們認(rèn)為這個(gè)計(jì)算方法對(duì)他們很有用,因此我們把他的演講主題轉(zhuǎn)變成這篇博客。
Flink社區(qū)上經(jīng)常被問(wèn)起的一個(gè)問(wèn)題是當(dāng)從開(kāi)發(fā)轉(zhuǎn)到線上時(shí),如何估算集群規(guī)模大小。當(dāng)然,最準(zhǔn)確的的答案是根據(jù)需要,但是這并沒(méi)有什么用。這篇博客提出了一系列問(wèn)題使你能夠計(jì)算出一些基準(zhǔn)。
1 通過(guò)數(shù)學(xué)建立基準(zhǔn)
首先,思考一下你的應(yīng)用操作需要的資源基準(zhǔn)的指標(biāo)。
關(guān)鍵指標(biāo)如下:
- 每秒的數(shù)據(jù)量和每條數(shù)據(jù)的大小
- 去重key的數(shù)量和每個(gè)key的state大小
- state更新的數(shù)量和state backend的方式
最后,考慮一下你的服務(wù)等級(jí)協(xié)議(SLAS),比如宕機(jī)時(shí)間、延遲和最大的吞吐量。這些指標(biāo)將直接影響你的容量計(jì)算。
接下來(lái),看一下基于預(yù)算可用的資源大小。比如:
- 網(wǎng)絡(luò)容量,需要考慮外部服務(wù)的網(wǎng)絡(luò)消耗,比如Kakfa、HDFS等。
- 磁盤(pán)帶寬,比如你使用磁盤(pán)的state backend,比如RocksDB。同時(shí)需要考慮外部服務(wù)的磁盤(pán)使用,比如Kafka、HDFS等。
- 機(jī)器的數(shù)量和它們的CPU和內(nèi)存。
基于上述這些因素,你現(xiàn)在能夠估算正常流程的的資源基準(zhǔn)。另外,還需要增加一些資源用作異常的恢復(fù)和checkpointing。
2 樣例:計(jì)算
我現(xiàn)在通過(guò)一個(gè)集群上的虛擬job部署來(lái)描述整個(gè)資源基準(zhǔn)的建立過(guò)程。信封背計(jì)算法的所用到的數(shù)字是不精準(zhǔn)的,同時(shí)并沒(méi)有考慮的很全面。在后面,我會(huì)指出在做計(jì)算時(shí)的忽視的一些點(diǎn)。
2.1 Flink流式應(yīng)用樣例和硬件

在這個(gè)案列中,我將部署典型的Flink流式應(yīng)用,Kafka Topic的數(shù)據(jù)作為數(shù)據(jù)源。這個(gè)流接著使用keyed, aggregating window操作轉(zhuǎn)換。窗口操作執(zhí)行5分鐘的窗口聚合。同時(shí)假設(shè)有源源不斷的數(shù)據(jù)進(jìn)來(lái),window被設(shè)置成1分鐘滑動(dòng)一次。
這表示每分鐘執(zhí)行一次過(guò)去5分鐘內(nèi)的窗口聚合。這個(gè)流式應(yīng)用根據(jù)userId字段進(jìn)行聚合。Kafka Topic中消息的大小平均是2KB。
吞吐量是每秒100萬(wàn)條消息。為了理解窗口操作的state大小,你需要知道distinct Keys的數(shù)量,就是userIds的數(shù)量,這邊大約是5000萬(wàn)個(gè)不同的用戶ID。對(duì)于每個(gè)用戶,你需要計(jì)算4個(gè)數(shù)字,通過(guò)longs(8 byte)存儲(chǔ)。
現(xiàn)在,讓我們總結(jié)一下這個(gè)任務(wù)的關(guān)鍵指標(biāo):
- 消息大?。?KB
- 吞吐量:1000000 msg/sec
- Distinct Keys: 500000000(窗口聚合:每個(gè)key4個(gè)long大小)
- Checkpointing: 每分鐘一次
— 硬件:
- 5臺(tái)機(jī)器
- 10 gigabit 以太網(wǎng)
- 每臺(tái)機(jī)器運(yùn)行一個(gè)Flink TaskManager
- 磁盤(pán)通過(guò)網(wǎng)絡(luò)掛載
— Kafka 獨(dú)立部署

總共有5臺(tái)機(jī)器運(yùn)行這個(gè)job,每臺(tái)機(jī)器上運(yùn)行一個(gè)TaskManager。磁盤(pán)通過(guò)網(wǎng)絡(luò)掛載,同時(shí)有10 gigabit的以太網(wǎng)接入。同時(shí)Kafka是獨(dú)立部署在其他機(jī)器上。
每臺(tái)機(jī)器有16CPU核。為了簡(jiǎn)化的需要,這邊不考慮CPU和內(nèi)存的使用情況。在實(shí)際情況下,你需要根據(jù)應(yīng)用邏輯和state backend的使用,來(lái)考慮內(nèi)存的使用。這個(gè)例子使用RocksDB state backend。(它是健壯的,同時(shí)對(duì)內(nèi)存需求比較低)。
2.2 單機(jī)計(jì)算
為了理解整個(gè)job運(yùn)行部署的資源需求,最容易的方式是關(guān)注單臺(tái)機(jī)器和TaskManager的操作。你可以通過(guò)單臺(tái)機(jī)器計(jì)算出來(lái)的數(shù)字來(lái)推斷整個(gè)集群的資源需求。
默認(rèn)(所有的操作都有并行度和沒(méi)有特殊的調(diào)度限制)所有的操作在每臺(tái)機(jī)器上都有運(yùn)行。
在這個(gè)例子中,Kafka source, 窗口操作和Kafka sink都運(yùn)行在每臺(tái)機(jī)器上。

keyBy是一個(gè)分離的操作,因此資源需求計(jì)算比較容易。在現(xiàn)實(shí)中,keyBy是一個(gè)API,連接了Kafak Source和窗口操作。
我現(xiàn)在將從頭到底理解這些操作的網(wǎng)絡(luò)資源需求。
2.3 Kafka source
為了計(jì)算Kafka source收到的數(shù)據(jù)量,首先需要計(jì)算Kafka的聚合輸入。sources每秒收到100萬(wàn)消息,每條消息2KB大小。
2GB/s除以5臺(tái)機(jī)器,得到如下結(jié)果:
集群中每臺(tái)機(jī)器上TaskManager的source收到400MB/s的數(shù)據(jù)。

2.4 Shuffle / keyBY
接下來(lái),你需要確保同一個(gè)key的所有事件落在一些機(jī)器上。這邊,你從kafka中讀取的數(shù)據(jù)可能被重新分區(qū)。
shuffle過(guò)程發(fā)送所有擁有相同key的數(shù)據(jù)到同一臺(tái)機(jī)器,因此這邊把400MB/s的數(shù)據(jù)分割成一個(gè)根據(jù)userId分區(qū)的流。
平均來(lái)看,你將發(fā)送80MB/s的數(shù)據(jù)到每一臺(tái)機(jī)器。這個(gè)分析是從單臺(tái)機(jī)器的角度,但是一些數(shù)據(jù)已經(jīng)在目標(biāo)機(jī)器上了,因此要減去80MB/s。

2.5 Window Emit and Kafka Sink
接下去的問(wèn)題是窗口操作發(fā)送多少數(shù)據(jù)到Kafka Sink。結(jié)果是67MB/s,讓我們看一下如何計(jì)算。
窗口操作為每個(gè)key保持了4個(gè)數(shù)字(longs)聚合。每一分鐘,操作將發(fā)送當(dāng)前的聚合值。每個(gè)key發(fā)送2ints(user_id, window_ts)和4 longs。
然后乘以keys數(shù)量(500000000除以機(jī)器數(shù)量)
然后計(jì)算每秒的大?。?br>
這表示每個(gè)TaskManager從窗口操作中平均發(fā)送67MB/s的用戶數(shù)據(jù)。因?yàn)镵afka sink運(yùn)行在每個(gè)TaskManager上,所以沒(méi)有進(jìn)一步的分區(qū)操作。這就是從Flink到Kafka的發(fā)送的數(shù)據(jù)量。

從窗口操作中得到的數(shù)據(jù)每分鐘會(huì)發(fā)送一次。在實(shí)際中,這個(gè)操作不會(huì)發(fā)以67MB/s的發(fā)送數(shù)據(jù),而是在一分鐘之內(nèi)的幾秒間到達(dá)最大帶寬。
現(xiàn)在,總結(jié)一下:
- 進(jìn)來(lái)的數(shù)據(jù):720MB/s(400+320)per machine
- 出去的數(shù)據(jù):387MB/s(320+67)per machine

2.6 State和Checkpointing
到目前為止,我們僅僅計(jì)算了Flink處理的用戶數(shù)據(jù)。你同時(shí)還需要考慮磁盤(pán)的使用,比如存儲(chǔ)state 和checkpointing。為了計(jì)算磁盤(pán)的花銷,你需要查看窗口計(jì)算如何進(jìn)入state。Kafka Source也需要保持一些state,但是跟窗口操作的state相比,可以忽略不計(jì)。
為了理解窗口操作的state大小,讓我們換一個(gè)角度看這個(gè)問(wèn)題。Flink計(jì)算5分鐘的時(shí)間窗口,并且1分鐘滑動(dòng)一次。Flink是通過(guò)保持5個(gè)窗口來(lái)實(shí)現(xiàn)滑動(dòng)窗口。根據(jù)先前提到的,在使用窗口時(shí),你需要為每個(gè)窗口保持40bytes的狀態(tài),并且窗口是提前聚合的。對(duì)于每一條到來(lái)的事件,你首先需要取出當(dāng)前聚合值,再更新聚合值,然后把新值寫(xiě)回去。

這意味著:
有40MB/s的磁盤(pán)讀寫(xiě)(每臺(tái)機(jī)器上)。根據(jù)先前說(shuō)的,磁盤(pán)是通過(guò)網(wǎng)絡(luò)掛載的。因此需要在先前的基礎(chǔ)上增加這個(gè)值。
現(xiàn)在總共需要的資源如下:
- 進(jìn)入的數(shù)據(jù):760MB/s(400MB/s data in + 320MB/s shuffle + 40MB/s state)
- 出去的數(shù)據(jù):427MB/s(320MB/s shuffle + 67MB/s data out + 40MB/s state)

上述的計(jì)算只考慮了事件到達(dá)窗口操作時(shí)觸發(fā)時(shí)state的進(jìn)入。此外,你還需要checkpoint和容錯(cuò)機(jī)制。因?yàn)?,如果一臺(tái)機(jī)器或者其他任何東西掛掉,你需要恢復(fù)你的窗口并繼續(xù)處理。
根據(jù)先前所說(shuō),Checkpointing是每隔1分鐘執(zhí)行一次,并且每個(gè)checkpoint會(huì)復(fù)制整個(gè)job的狀態(tài)到(通過(guò)網(wǎng)絡(luò)掛載)文件系統(tǒng)。
現(xiàn)在,讓我們快速的計(jì)算一下每臺(tái)機(jī)器的state大小:
接著算每秒的值:
和窗口操作類似,checkpointing也是每分鐘執(zhí)行一次。它嘗試全速發(fā)送數(shù)據(jù)到外部存儲(chǔ)。Checkpointing引起了額外的state進(jìn)入。(自從Flink1.3后,RocksDB支持增量checkpointing來(lái)降低每次checkpoint時(shí)所需的網(wǎng)絡(luò)傳輸。)
計(jì)算更新如下:
- 進(jìn)入的數(shù)據(jù):760MB/s(400 + 320 + 40)
- 出去的數(shù)據(jù):760MB/s(320 + 67 + 40 + 333)

這意味著整個(gè)集群網(wǎng)絡(luò)流量是:
400是80MB的state讀寫(xiě)乘以5臺(tái)機(jī)器。2335是Kafka進(jìn)和出的總值。
整個(gè)硬件的網(wǎng)絡(luò)可用容量如下:

免責(zé)聲明:上述這些計(jì)算沒(méi)有包含協(xié)議的花銷,比如TCP、Ethernet和RPC(在Flink、Kafka和HDFS等中)。但是上述的計(jì)算仍舊對(duì)如何計(jì)算一個(gè)job的資源有指導(dǎo)意義。
Scale Your Way
基于我的分析,這個(gè)例子中,5個(gè)節(jié)點(diǎn)的集群,在典型的操作中,每個(gè)機(jī)器需要處理760MB/s的數(shù)據(jù)進(jìn)出,同時(shí)每臺(tái)機(jī)器可以處理的容量是1250MB/s。這樣保留了40%的網(wǎng)絡(luò)容量來(lái)應(yīng)對(duì)我剛才提到的復(fù)雜度,比如網(wǎng)絡(luò)協(xié)議花銷,事件重放,數(shù)據(jù)傾斜引起的不平均的負(fù)載等。
當(dāng)然,沒(méi)有一個(gè)標(biāo)準(zhǔn)答案來(lái)說(shuō)明留40%的余量是否合適。但是這個(gè)算法可以給你一個(gè)好的開(kāi)始。嘗試上述的計(jì)算,修改上述的參數(shù)為你自己的參數(shù)。Happy scaling!
翻譯源
How To Size Your Apache Flink? Cluster: A Back-of-the-Envelope Calculation