如何計(jì)算Flink集群規(guī)模:信封背計(jì)算法

2017年柏林Flink Forward大會(huì)上Robert Metger的"Keep It Going: How to Reiably and Efficiently Operate Apache Flink"的演講很受歡迎。Robert的其中一個(gè)主題演講涉及到了如何估算Flink集群規(guī)模。Flink Forward大會(huì)的觀眾們認(rèn)為這個(gè)計(jì)算方法對(duì)他們很有用,因此我們把他的演講主題轉(zhuǎn)變成這篇博客。

Flink社區(qū)上經(jīng)常被問(wèn)起的一個(gè)問(wèn)題是當(dāng)從開(kāi)發(fā)轉(zhuǎn)到線上時(shí),如何估算集群規(guī)模大小。當(dāng)然,最準(zhǔn)確的的答案是根據(jù)需要,但是這并沒(méi)有什么用。這篇博客提出了一系列問(wèn)題使你能夠計(jì)算出一些基準(zhǔn)。

1 通過(guò)數(shù)學(xué)建立基準(zhǔn)

首先,思考一下你的應(yīng)用操作需要的資源基準(zhǔn)的指標(biāo)。

關(guān)鍵指標(biāo)如下:

  • 每秒的數(shù)據(jù)量和每條數(shù)據(jù)的大小
  • 去重key的數(shù)量和每個(gè)key的state大小
  • state更新的數(shù)量和state backend的方式

最后,考慮一下你的服務(wù)等級(jí)協(xié)議(SLAS),比如宕機(jī)時(shí)間、延遲和最大的吞吐量。這些指標(biāo)將直接影響你的容量計(jì)算。

接下來(lái),看一下基于預(yù)算可用的資源大小。比如:

  • 網(wǎng)絡(luò)容量,需要考慮外部服務(wù)的網(wǎng)絡(luò)消耗,比如Kakfa、HDFS等。
  • 磁盤(pán)帶寬,比如你使用磁盤(pán)的state backend,比如RocksDB。同時(shí)需要考慮外部服務(wù)的磁盤(pán)使用,比如Kafka、HDFS等。
  • 機(jī)器的數(shù)量和它們的CPU和內(nèi)存。

基于上述這些因素,你現(xiàn)在能夠估算正常流程的的資源基準(zhǔn)。另外,還需要增加一些資源用作異常的恢復(fù)和checkpointing。

2 樣例:計(jì)算

我現(xiàn)在通過(guò)一個(gè)集群上的虛擬job部署來(lái)描述整個(gè)資源基準(zhǔn)的建立過(guò)程。信封背計(jì)算法的所用到的數(shù)字是不精準(zhǔn)的,同時(shí)并沒(méi)有考慮的很全面。在后面,我會(huì)指出在做計(jì)算時(shí)的忽視的一些點(diǎn)。

2.1 Flink流式應(yīng)用樣例和硬件

Example Flink Streaming job topology

在這個(gè)案列中,我將部署典型的Flink流式應(yīng)用,Kafka Topic的數(shù)據(jù)作為數(shù)據(jù)源。這個(gè)流接著使用keyed, aggregating window操作轉(zhuǎn)換。窗口操作執(zhí)行5分鐘的窗口聚合。同時(shí)假設(shè)有源源不斷的數(shù)據(jù)進(jìn)來(lái),window被設(shè)置成1分鐘滑動(dòng)一次。

這表示每分鐘執(zhí)行一次過(guò)去5分鐘內(nèi)的窗口聚合。這個(gè)流式應(yīng)用根據(jù)userId字段進(jìn)行聚合。Kafka Topic中消息的大小平均是2KB。

吞吐量是每秒100萬(wàn)條消息。為了理解窗口操作的state大小,你需要知道distinct Keys的數(shù)量,就是userIds的數(shù)量,這邊大約是5000萬(wàn)個(gè)不同的用戶ID。對(duì)于每個(gè)用戶,你需要計(jì)算4個(gè)數(shù)字,通過(guò)longs(8 byte)存儲(chǔ)。

現(xiàn)在,讓我們總結(jié)一下這個(gè)任務(wù)的關(guān)鍵指標(biāo):

  • 消息大?。?KB
  • 吞吐量:1000000 msg/sec
  • Distinct Keys: 500000000(窗口聚合:每個(gè)key4個(gè)long大小)
  • Checkpointing: 每分鐘一次

— 硬件:

  • 5臺(tái)機(jī)器
  • 10 gigabit 以太網(wǎng)
  • 每臺(tái)機(jī)器運(yùn)行一個(gè)Flink TaskManager
  • 磁盤(pán)通過(guò)網(wǎng)絡(luò)掛載

— Kafka 獨(dú)立部署

假設(shè)硬件步驟

總共有5臺(tái)機(jī)器運(yùn)行這個(gè)job,每臺(tái)機(jī)器上運(yùn)行一個(gè)TaskManager。磁盤(pán)通過(guò)網(wǎng)絡(luò)掛載,同時(shí)有10 gigabit的以太網(wǎng)接入。同時(shí)Kafka是獨(dú)立部署在其他機(jī)器上。

每臺(tái)機(jī)器有16CPU核。為了簡(jiǎn)化的需要,這邊不考慮CPU和內(nèi)存的使用情況。在實(shí)際情況下,你需要根據(jù)應(yīng)用邏輯和state backend的使用,來(lái)考慮內(nèi)存的使用。這個(gè)例子使用RocksDB state backend。(它是健壯的,同時(shí)對(duì)內(nèi)存需求比較低)。

2.2 單機(jī)計(jì)算

為了理解整個(gè)job運(yùn)行部署的資源需求,最容易的方式是關(guān)注單臺(tái)機(jī)器和TaskManager的操作。你可以通過(guò)單臺(tái)機(jī)器計(jì)算出來(lái)的數(shù)字來(lái)推斷整個(gè)集群的資源需求。

默認(rèn)(所有的操作都有并行度和沒(méi)有特殊的調(diào)度限制)所有的操作在每臺(tái)機(jī)器上都有運(yùn)行。

在這個(gè)例子中,Kafka source, 窗口操作和Kafka sink都運(yùn)行在每臺(tái)機(jī)器上。

A machine perspective - TaskManager n

keyBy是一個(gè)分離的操作,因此資源需求計(jì)算比較容易。在現(xiàn)實(shí)中,keyBy是一個(gè)API,連接了Kafak Source和窗口操作。

我現(xiàn)在將從頭到底理解這些操作的網(wǎng)絡(luò)資源需求。

2.3 Kafka source

為了計(jì)算Kafka source收到的數(shù)據(jù)量,首先需要計(jì)算Kafka的聚合輸入。sources每秒收到100萬(wàn)消息,每條消息2KB大小。
2KB \times 1000000/s = 2GB/s
2GB/s除以5臺(tái)機(jī)器,得到如下結(jié)果:
2GB/s \div 5 machines = 400MB/s
集群中每臺(tái)機(jī)器上TaskManager的source收到400MB/s的數(shù)據(jù)。

Kafka source calculation

2.4 Shuffle / keyBY

接下來(lái),你需要確保同一個(gè)key的所有事件落在一些機(jī)器上。這邊,你從kafka中讀取的數(shù)據(jù)可能被重新分區(qū)。
shuffle過(guò)程發(fā)送所有擁有相同key的數(shù)據(jù)到同一臺(tái)機(jī)器,因此這邊把400MB/s的數(shù)據(jù)分割成一個(gè)根據(jù)userId分區(qū)的流。
400MB/s \div 5 machines = 80MB/s
平均來(lái)看,你將發(fā)送80MB/s的數(shù)據(jù)到每一臺(tái)機(jī)器。這個(gè)分析是從單臺(tái)機(jī)器的角度,但是一些數(shù)據(jù)已經(jīng)在目標(biāo)機(jī)器上了,因此要減去80MB/s。
400MB/s - 80MB/s = 320MB/s

shuffle calculation

2.5 Window Emit and Kafka Sink

接下去的問(wèn)題是窗口操作發(fā)送多少數(shù)據(jù)到Kafka Sink。結(jié)果是67MB/s,讓我們看一下如何計(jì)算。
窗口操作為每個(gè)key保持了4個(gè)數(shù)字(longs)聚合。每一分鐘,操作將發(fā)送當(dāng)前的聚合值。每個(gè)key發(fā)送2ints(user_id, window_ts)和4 longs。
(2 \times4bytes) + (4\times8bytes) = 40 bytes \, per\, key
然后乘以keys數(shù)量(500000000除以機(jī)器數(shù)量)
500000000 \div5machines \times40bytes = 40GB
然后計(jì)算每秒的大?。?br> 40GB/min \div60=67MB/s
這表示每個(gè)TaskManager從窗口操作中平均發(fā)送67MB/s的用戶數(shù)據(jù)。因?yàn)镵afka sink運(yùn)行在每個(gè)TaskManager上,所以沒(méi)有進(jìn)一步的分區(qū)操作。這就是從Flink到Kafka的發(fā)送的數(shù)據(jù)量。

User data: From Kafka, shuffled to the window operators and back to Kafka

從窗口操作中得到的數(shù)據(jù)每分鐘會(huì)發(fā)送一次。在實(shí)際中,這個(gè)操作不會(huì)發(fā)以67MB/s的發(fā)送數(shù)據(jù),而是在一分鐘之內(nèi)的幾秒間到達(dá)最大帶寬。

現(xiàn)在,總結(jié)一下:

  • 進(jìn)來(lái)的數(shù)據(jù):720MB/s(400+320)per machine
  • 出去的數(shù)據(jù):387MB/s(320+67)per machine
How-to1-1.png

2.6 State和Checkpointing

到目前為止,我們僅僅計(jì)算了Flink處理的用戶數(shù)據(jù)。你同時(shí)還需要考慮磁盤(pán)的使用,比如存儲(chǔ)state 和checkpointing。為了計(jì)算磁盤(pán)的花銷,你需要查看窗口計(jì)算如何進(jìn)入state。Kafka Source也需要保持一些state,但是跟窗口操作的state相比,可以忽略不計(jì)。

為了理解窗口操作的state大小,讓我們換一個(gè)角度看這個(gè)問(wèn)題。Flink計(jì)算5分鐘的時(shí)間窗口,并且1分鐘滑動(dòng)一次。Flink是通過(guò)保持5個(gè)窗口來(lái)實(shí)現(xiàn)滑動(dòng)窗口。根據(jù)先前提到的,在使用窗口時(shí),你需要為每個(gè)窗口保持40bytes的狀態(tài),并且窗口是提前聚合的。對(duì)于每一條到來(lái)的事件,你首先需要取出當(dāng)前聚合值,再更新聚合值,然后把新值寫(xiě)回去。

Window State

這意味著:
40 \, bytes \,of \,state \times 200000 msg/s \, per \, machine = 40MB/s

有40MB/s的磁盤(pán)讀寫(xiě)(每臺(tái)機(jī)器上)。根據(jù)先前說(shuō)的,磁盤(pán)是通過(guò)網(wǎng)絡(luò)掛載的。因此需要在先前的基礎(chǔ)上增加這個(gè)值。

現(xiàn)在總共需要的資源如下:

  • 進(jìn)入的數(shù)據(jù):760MB/s(400MB/s data in + 320MB/s shuffle + 40MB/s state)
  • 出去的數(shù)據(jù):427MB/s(320MB/s shuffle + 67MB/s data out + 40MB/s state)

上述的計(jì)算只考慮了事件到達(dá)窗口操作時(shí)觸發(fā)時(shí)state的進(jìn)入。此外,你還需要checkpoint和容錯(cuò)機(jī)制。因?yàn)?,如果一臺(tái)機(jī)器或者其他任何東西掛掉,你需要恢復(fù)你的窗口并繼續(xù)處理。

根據(jù)先前所說(shuō),Checkpointing是每隔1分鐘執(zhí)行一次,并且每個(gè)checkpoint會(huì)復(fù)制整個(gè)job的狀態(tài)到(通過(guò)網(wǎng)絡(luò)掛載)文件系統(tǒng)。

現(xiàn)在,讓我們快速的計(jì)算一下每臺(tái)機(jī)器的state大小:
40 \, bytes \, of \, state \times 5 \, windows \times100000000 \, keys = 20GB
接著算每秒的值:
20GB \div 60 =333MB/s
和窗口操作類似,checkpointing也是每分鐘執(zhí)行一次。它嘗試全速發(fā)送數(shù)據(jù)到外部存儲(chǔ)。Checkpointing引起了額外的state進(jìn)入。(自從Flink1.3后,RocksDB支持增量checkpointing來(lái)降低每次checkpoint時(shí)所需的網(wǎng)絡(luò)傳輸。)

計(jì)算更新如下:

  • 進(jìn)入的數(shù)據(jù):760MB/s(400 + 320 + 40)
  • 出去的數(shù)據(jù):760MB/s(320 + 67 + 40 + 333)
Window State

這意味著整個(gè)集群網(wǎng)絡(luò)流量是:
(760 + 760)\times5 + 400 + 2335=10335MB/s
400是80MB的state讀寫(xiě)乘以5臺(tái)機(jī)器。2335是Kafka進(jìn)和出的總值。

整個(gè)硬件的網(wǎng)絡(luò)可用容量如下:

Networking requirements

免責(zé)聲明:上述這些計(jì)算沒(méi)有包含協(xié)議的花銷,比如TCP、Ethernet和RPC(在Flink、Kafka和HDFS等中)。但是上述的計(jì)算仍舊對(duì)如何計(jì)算一個(gè)job的資源有指導(dǎo)意義。

Scale Your Way

基于我的分析,這個(gè)例子中,5個(gè)節(jié)點(diǎn)的集群,在典型的操作中,每個(gè)機(jī)器需要處理760MB/s的數(shù)據(jù)進(jìn)出,同時(shí)每臺(tái)機(jī)器可以處理的容量是1250MB/s。這樣保留了40%的網(wǎng)絡(luò)容量來(lái)應(yīng)對(duì)我剛才提到的復(fù)雜度,比如網(wǎng)絡(luò)協(xié)議花銷,事件重放,數(shù)據(jù)傾斜引起的不平均的負(fù)載等。

當(dāng)然,沒(méi)有一個(gè)標(biāo)準(zhǔn)答案來(lái)說(shuō)明留40%的余量是否合適。但是這個(gè)算法可以給你一個(gè)好的開(kāi)始。嘗試上述的計(jì)算,修改上述的參數(shù)為你自己的參數(shù)。Happy scaling!

翻譯源

How To Size Your Apache Flink? Cluster: A Back-of-the-Envelope Calculation

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文是先介紹 Flink,再說(shuō) Flink的過(guò)去和現(xiàn)在 一、Flink介紹 Flink是一款分布式的計(jì)算引擎,它可...
    生活的探路者閱讀 1,403評(píng)論 0 22
  • 當(dāng)一個(gè)人給了你選擇的權(quán)力 說(shuō)明那顆心已經(jīng)碎得一塌糊涂 你只能選擇接受或是辜負(fù) 不被理解的心酸 終是他一個(gè)人的苦悶 ...
    斷橋情傷閱讀 186評(píng)論 0 0
  • 二姨夫是做生意的一把好手,這不就在城關(guān)十字街頭的道路旁邊,他種出來(lái)的西瓜賣(mài)一塊錢(qián)一斤,不遠(yuǎn)處的路對(duì)面也有另一家買(mǎi)西...
    知心玲姐閱讀 308評(píng)論 0 0
  • 對(duì)于愛(ài)情,我們有種莫名的執(zhí)著,我們總是尋求轟轟烈烈的愛(ài)情,那些驚天地泣鬼神的證明才能讓自己信服。 但事實(shí)上,一個(gè)人...
    萌石科技閱讀 559評(píng)論 0 0

友情鏈接更多精彩內(nèi)容