一種平衡Spark計算負(fù)載的中間數(shù)據(jù)存放策略


An intermediate data placement algorithm for load balancing in Spark computing environment

最近在研究一些Spark成本優(yōu)化的東西,看了一些論文稍微總結(jié)一下思路,方便思維拓寬和希望與大家交流!

本篇博文參考自:

Future Generation Computer Systems 78 (2018) 287–301:
《An intermediate data placement algorithm for load balancing in Spark computing environment》


1 文章概述及問題描述

Spark作為基于內(nèi)存迭代的云計算框架,其很容易發(fā)生數(shù)據(jù)傾斜,尤其是在Shuffle階段,reduce端所拉取的數(shù)據(jù)量很容易出現(xiàn)不平衡,這將導(dǎo)致某些reduce計算很久,使得整體計算發(fā)生延時,嚴(yán)重時會導(dǎo)致application失敗。本篇論文討論spark中mapreduce框架中所出現(xiàn)的shuffle階段的數(shù)據(jù)傾斜問題。

MR的數(shù)據(jù)格式全是K-V形式,因此數(shù)據(jù)的傾斜就是Key的傾斜,導(dǎo)致某些分區(qū)中的數(shù)據(jù)量過大,因此,對于reducer來說,partition的傾斜將會導(dǎo)致reduce的傾斜。

  • 為什么容易出現(xiàn)數(shù)據(jù)傾斜 *
    partition的大小取決于具有相關(guān)性的key/value的數(shù)量,由于spark中keys的分配調(diào)取由hash算法決定,因此不同的reducer之間會出現(xiàn)數(shù)據(jù)量不同的情況,有些reducer要處理的數(shù)據(jù)可能會非常大。

2 論文中的相關(guān)術(shù)語以及spark基礎(chǔ)

  1. clusters
    一個data clusters就是一組包含全部相同Key的map中間輸出結(jié)果的數(shù)據(jù)集合。
    在Spark中,所有的被相同的reduce處理的clustes組成一個分區(qū)(partition)。

  1. Bucket
    bucket是指一個序列的緩沖區(qū),用于收集并緩存map的輸出,相對應(yīng)的reduce會從相應(yīng)的bucket上拉取數(shù)據(jù)。

  1. MR*
    假設(shè)reduce個數(shù)為R,map個數(shù)為M,則每一個map將會創(chuàng)建R個bucket,所以bucket的總數(shù)為R*M

  1. spark-shuffle(1.1.0)中的輸出分布
    Data distribution of shuffle in Spark 1.1.0.

  1. Zipf distributions
    文本數(shù)據(jù)的key一般來說是服從Zipf distributions

參考:J. Lin, et al. The curse of zipf and limits to parallelization: A look at the stragglers problem in mapreduce, in: 7th Workshop on Large-Scale Distributed Systems for Information Retrieval, 2012, pp. 2000–2009.


3 方法概述

為了解決shuffle階段出現(xiàn)的數(shù)據(jù)傾斜,本文提出了一種分割與合并的算法(splitting and combination algorithm for skew intermediate data blocks (SCID)),這種算法采用基于水塘抽樣的抽樣算法,來自動偵測中間數(shù)據(jù)的分布情況,并且是在spark程序運行之前就可以判斷。

大致做法就是SCID會對鍵值集合的clustes的大小進(jìn)行排序,并且將會依次存放在相應(yīng)的bucket中,bucket的大小是固定的,某個bucket存滿后,該clustes會被切分,剩下的clusters將進(jìn)入下一次的迭代。經(jīng)過這樣的幾輪分配,每一個bucket中的數(shù)據(jù)大小幾乎相同,所以此時相應(yīng)的reducer拉取到的數(shù)據(jù)就是相同的。與此同時,我們可以知道,同一個bucket中可能會存放來自不同clusters的數(shù)據(jù)。

但是,上述算法所遇到的問題就是,如果map中間輸出數(shù)據(jù)的key的分布實現(xiàn)不知道的話,就無法在clusters上使用精確的判別機(jī)制來進(jìn)行切分。而這樣的算法只有在MR執(zhí)行前進(jìn)行才會顯得有意義,因此本文提出動態(tài)范圍分區(qū)(dynamic range partition)的方法,對輸入數(shù)據(jù)進(jìn)行以此預(yù)先的抽樣,并將抽樣結(jié)果輸入給一小部分的map tasks中,以實現(xiàn)分布統(tǒng)計,通過相應(yīng)的統(tǒng)計值,可以預(yù)測出map階段后所產(chǎn)生冊所有clusters的大小,這個結(jié)果將會作為split策略的輸入。

  • 注意,所有的策略均在真正spark程序運行之前進(jìn)行 *

綜上所述,本文所做的工作就是在原有spark架構(gòu)基礎(chǔ)上添加了一個統(tǒng)計分布功能用于抽樣并作為split和combine算法的輸入。這是一種細(xì)粒度(fine-grained)的算法,他能夠重新進(jìn)行partition,改變原有分區(qū)的大小并發(fā)送給相應(yīng)的buckets,從而解決了reduce端數(shù)據(jù)傾斜問題。


4 主要貢獻(xiàn)

使用水塘抽樣進(jìn)行輸入數(shù)據(jù)的采樣,并提出一種驗證模型來選擇合適的采樣率。這樣的模型在運行中考慮到了成本、效果以及方差的重要性
提出了一種切分以及合并鍵值對clusters的算法。通過以相同大小的clusters的組合來填充固定數(shù)量的buckets從而達(dá)到reduce端具有更好的負(fù)載均衡。
文章基于spark1.1.0驗證了本文算法模型帶來的性能提升和數(shù)據(jù)傾斜問題的減緩。


5 系統(tǒng)整體介紹

由前面的術(shù)語介紹以及mapreduce的原理可知:每一個split會由一個map來處理,并生成一中間結(jié)果,中間結(jié)果通常是被partitioned的,而且被分到相應(yīng)的bucket中去。本文使用如下表達(dá)式來來自 m map Tasks的中間結(jié)果:



根據(jù)bucket的概念,我們可以用下面的公式來表達(dá)一個cluster所分成的buckets:



所有屬于相同cluster的鍵值集合被放置在同一個partition中,所以根據(jù)key可以將所有的K進(jìn)行n個partition的劃分,用如下表示:

下圖流程是一個被改善過的Spark工作流程。


其中最重的組件就是 load balancing module。spark運行之前,這個組件會生成一個balanced partitioning策略,這個策略會指導(dǎo)該如何split clusters以及如何combine分割后的clusters。balance策略能夠被使用在shuffle階段。
關(guān)于load balancing module,在本文共包含以下兩個階段:

  1. Data samling:抽樣是在map之前進(jìn)行的,所謂的抽樣就是從全部的輸入split采樣小樣本,進(jìn)而通過對key的統(tǒng)計方法來預(yù)測map輸出結(jié)果中每一個cluster的大小。
  2. Splitting and combination:抽樣結(jié)束后,會產(chǎn)生一個clusters的切分和combine方法。一些過大的clusters可能會被切分多個以填充到不同的buckets中去。切分的主要還是要看固定的bucket的容量大小。

6 抽樣模型

6.1 數(shù)據(jù)偏移模型

模型的本質(zhì)就是對clusters、bucket等概念進(jìn)行結(jié)構(gòu)化,通過對每一個組件進(jìn)行結(jié)構(gòu)化的分析,最終引出偏移與均值和方差的條件關(guān)系,推導(dǎo)出FoS這樣一個偏移程度的指標(biāo)。因此,文中定義以下幾個公式:

1)因為一個cluster中是包含相同key的鍵值對數(shù)據(jù),因此,所有的clusters能夠被形式化如下:
C={C1,C2....Ci...,Cm}, 1<=i<=m
m是cluster的編號,Ci被定義為一種結(jié)構(gòu)體:Ci ={order,SC},其中order記錄了這個cluster的順序,而SC被定義為一個序列:SC = {SC1,SC2....,SCi...SCm} 1<=i<=m.其中SCi是整形變量,表示對應(yīng)的cluster的數(shù)據(jù)集的大小。
2)bucket可以形式化為:B = {B1,B2,....Bk,...Bn} 1<=k<=n 其中n為bucket的編號

3)前面說到,文本數(shù)據(jù)的key一般服從Zipf分布,這個模型里面設(shè)置了一個變量a(0.1<=a<=1.2)來控制偏移量,a越大代表偏移越大,a僅僅影響當(dāng)前輸入數(shù)據(jù)的偏移大小。本文使用矩陣P來表示這樣一種分布,pki表示從cluster Ci所獲得的鍵值對的數(shù)量,這個數(shù)量后面將會被bucket Bk所拉?。ㄕf到這,大家還是要搞清楚文中這幾個術(shù)語之間的關(guān)系的:map的輸出數(shù)據(jù)相當(dāng)于一堆鍵值對,擁有相同key的數(shù)據(jù)<tuples>組成一個cluster,而同時被相同reduce處理的clusters組成一個partition,partition最后會被放入bucket中),因此SCi可以被看作是在所有buckets中來自cluster Ci的總大小,可以用如下公式來定義:


4)bucket k中所包含的鍵值對的個數(shù)由BC(k) 來表示,對于數(shù)據(jù)的分布pki,外加上之前給的變量a,則分布可以表示為:


那么BC(k)的值可以表示為如下公式:

5)由上式,我們可以計算在當(dāng)前偏移參數(shù)a的前提下的所有buckets的鍵值對的平均數(shù)量如下公式所示,其中,n代表的是全部bucket的數(shù)量。

6)一般而言,被bucket所處理的中間數(shù)據(jù)能夠被看作是具有數(shù)據(jù)偏移標(biāo)準(zhǔn)差的,那么,當(dāng)滿足以下條件時,被bucket所處理的clusters將被視作具有數(shù)據(jù)偏移,std是所有buckets中key-value對個數(shù)的標(biāo)準(zhǔn)差:

7)所有clusters中的數(shù)據(jù)的標(biāo)準(zhǔn)差可以用以下公示表達(dá):

8)最后,為了歸一化表達(dá)數(shù)據(jù)偏移的程度,定義了FoS指標(biāo)(factor of skew)來評測所有buckets負(fù)載均衡的程度,FoS指標(biāo)值越小,則代表負(fù)載越均衡,則偏移也相應(yīng)的越?。?br>

6.2 水塘抽樣算法

為何要使用水塘抽樣?
在一般的編程語言中,常規(guī)的抽樣是使用偽隨機(jī)數(shù),對于大規(guī)模的數(shù)據(jù),特別是隨著采樣空間的增加,這樣簡單的偽隨機(jī)數(shù)不能保證所有樣本完全隨機(jī)化,不可避免地會產(chǎn)生一些重復(fù)樣本。而水塘抽樣則能夠有效避免這一問題,他將使得樣本出現(xiàn)的概率均相同,保證樣本的隨機(jī)性,特別是從某些序列流數(shù)據(jù)中抽取數(shù)據(jù)時,水塘抽樣可以保證原始key的分布更加接近于整體真實情況。
水塘抽樣是為了解決未知大小數(shù)據(jù)集的隨機(jī)數(shù)抽取問題,要求從一個未知大小的數(shù)據(jù)集中等概率地拿出k個數(shù)。尤其是在大數(shù)據(jù)背景下的采樣問題,對于大規(guī)模數(shù)據(jù),我們無法將其全部加載到內(nèi)存,此時需要根據(jù)內(nèi)存大小k來等概率地從全部數(shù)據(jù)中抽取大小為k的數(shù)。

水塘抽樣基本思想:
1、簡單場景:如果我們已知這個數(shù)據(jù)集只有3個數(shù)字,那么我們在拿取第一個數(shù)的時候,其出現(xiàn)在水池中的概率為1,拿取第二個數(shù)的時候,其出現(xiàn)在水池中的概率為1/2,在拿取最后一個數(shù)的時候,我們?yōu)榱说雀怕实胤祷?個數(shù),分為兩種情況:1)返回第三個數(shù):顯然如果要保留第三個數(shù)則齊概率為1/3。2)如果返回前兩個數(shù)的其中一個,則其概率為(1-1/3)1/2=1/3,即不選擇第三個數(shù)的概率選擇前兩個數(shù)任意一個的概率,因此水塘中每個數(shù)字出現(xiàn)的概率均相同。
2、復(fù)雜場景:文中的抽樣方法是需要返回k個數(shù),因此這里直接修改1中的返回一個數(shù)為k個數(shù)即可,即:每個數(shù)字在水池中出現(xiàn)的概率為k/n。
其算法流程如下:
1)初始化時,我們依次將前k個數(shù)加載到水池中。
2)隨后考慮第k+1個數(shù)的生死。此時分兩種情況:
a.水池中全部元素沒有被替換
b.水池中某個元素被第k+1個替換
先來看情況b:對于第k個元素,此時生成一個0i(i=k+1)的隨機(jī)數(shù)p,如果p<k(相當(dāng)于生成01的隨機(jī)數(shù)),則第k+1個數(shù)被選中,并且用這個數(shù)去替換水池中的某一個數(shù),此時第k+1個數(shù)在水庫中出現(xiàn)的概率為k/k+1,接下來我們要看看水庫中每個元素被替換的概率:條件概率,首先要第k+1個數(shù)被選中,其次是k個數(shù)中隨機(jī)選出一個來替換,則k個數(shù)中被替換的概率為(k/k+1)*(1/k)=1/k+1.那么水庫中原先的k個數(shù)每個數(shù)還能繼續(xù)出現(xiàn)的概率就等于1-1/(k+1)=k/k+1,可以看出,不管是新來的元素還是以前的元素的出現(xiàn)概率均為k/k+1。
對于情況a:如果所有元素都沒有被替換,就說明第k+1個元素沒有被選中,則此時水池中每個元素出現(xiàn)的概率就為1-第k+1個元素被選中的概率=1-k/(k+1)=1/k+1
這樣的一個規(guī)律可以用數(shù)學(xué)歸納法,直到證明完第i+1個數(shù)時仍然成立即可。
下面是水塘抽樣算法的偽代碼部分:

//stream代表數(shù)據(jù)流  
//reservoir代表返回長度為k的池塘  
//從stream中取前k個放入reservoir;  
for ( int i = 1; i < k; i++)  
   reservoir[i] = stream[i];  
for (i = k; stream != null; i++) {  
   p = random(0, i);  
   if (p < k) reservoir[p] = stream[i];  
return reservoir;

6.3 cluster容量大小的預(yù)測

建立在抽樣算法的輸出結(jié)果上,同時,基于抽樣算法中的假設(shè):抽樣得到的數(shù)據(jù)的key的分布與整體數(shù)據(jù)的key的分布是相同的,因此,能夠大致預(yù)測出每個map節(jié)點上每個cluster的數(shù)據(jù)大小。
步驟:

  1. 輸入數(shù)據(jù)是抽樣算法的輸出結(jié)果,用BS表示;同時MRjob用來表示處理BS的spark作業(yè)。
  2. 基于部署在map節(jié)點上的監(jiān)控工具,可以得到clusters個數(shù)的記錄,同時獲得每個cluster中k/v的個數(shù){SCi}.
  3. 由于抽樣數(shù)據(jù)中的每一個key的數(shù)量被認(rèn)為是與原始數(shù)據(jù)中key的數(shù)量成比例的,因此可以通過擴(kuò)大每一個clusters的個數(shù)來來近似估計cluster的真實大小。

7 Cluster的切分和重組模型(5. Cluster splitting and combining)

  1. 算法流程概述:
    clusters的切分和重組算法實際就是想法設(shè)法將全部的clusters打包并分配到大小相近的buckets中去。整體流程可參考下圖:


算法的輸入是前面我們所定義的{Ci}:clusters tuples的集合;{SCi}:每一個cluster在{Ci}中集合的個數(shù);B:當(dāng)前buckets的序列;RB:當(dāng)前bucket剩余容量。算法的輸出是矩陣P,代表著clusters的存放策略。
整體思想是先對map的中間結(jié)果產(chǎn)生的clusters進(jìn)行降序排序,然后從最大的cluster(SCm)開始判斷其與固定bucket(RB)的大小關(guān)系,如果SCm>=RB,則對SCm進(jìn)行切分,只將滿足bucke大小的那一部分放進(jìn)第一個bucket中,剩余的一部分作為一個segment將進(jìn)入第二輪判斷(注意,被spilt的剩余部分在第二輪迭代前會與其他的clusters進(jìn)行重新排序),以此類推;如果SCm<=RB,則直接將其放入該bucket中然后判斷下一個SCm-1是否滿足剩余空間大小,以此類推。整個流程分輪次進(jìn)行迭代判斷,也就是說,每一次只會填充一個bucket。其中RB是表示當(dāng)前bucket剩余空間大小,隨著clusters的填充,RB的值會不斷更新,某一輪的停止條件為尋找到一個SC的大小大于RB,則表示當(dāng)前輪的bucket即將被填滿。

那么問題來了:
在spark-1.1.0中,當(dāng)map的中間輸出達(dá)到20%時,就會有shuffle階段啟動,那么說明我們是無法等到全部clusters都生成后才去得到切分和重組策略?。?!
對于這個問題,其實我們需要知道,切分和重組算法其實相當(dāng)于一個模擬的過程,我們在作業(yè)執(zhí)行前通過采樣得到了一組采樣數(shù)據(jù),這組采樣數(shù)據(jù)能夠真實反映整體數(shù)據(jù)在map輸出時的大致分布情況,因此,在進(jìn)行切分和重組算法時,模型只需輸出在已知clusters分布的情況下的一個模擬bucket分配方式即可。

切分和重組算法的輸出矩陣P如下圖所示:Pij的定義回顧一下:第i個bucket從第j個cluster所拉取的k/v的數(shù)量。

matrixP.png

這個矩陣可以換一種形式來表示,下面的形式將直接表示出每個cluster中被每個bucket所處理的k/v的數(shù)量。

matrixP02.png

2.真實作業(yè)時的Cluster分配算法
真實作業(yè)時,map達(dá)到20%輸出時就會進(jìn)行shuffle,此算法就是根據(jù)先前的模擬放置策略來動態(tài)決定當(dāng)前輸出數(shù)據(jù)的存放。算法以矩陣P以及{Ci}(每一個clusters的大?。檩斎?,以{CBij}來表示每一個bucket的當(dāng)前負(fù)載情況,即代表當(dāng)前第i個bucket所拉取的第j個cluster的數(shù)據(jù)大小。
算法流程:
該算法用于真實作業(yè)情況下中間數(shù)據(jù)的放置。有兩種基本情況
1)首先對C進(jìn)行遍歷獲得Ki的位置,如果此時在C中找不到Ki,則按照默認(rèn)的hash來放置Ki。
2)如果找得到,則代表可以在矩陣P中獲取到該Ki。算法中實現(xiàn)這一過程其實是去遍歷相關(guān)列向量pj,在矩陣P中,我們知道列序號代表著key在clusters集合C中的位置,因此,對于向量pj,每一個元素都表示著該key在每個bucket中所被允許的最大的容量。解釋完pj,則這個流程可以概括為:在遍歷pj的同時,來判斷當(dāng)前每一個bucket的負(fù)載情況BCij是否小于pij的極限,也就是為了判斷Ki的位置,算法將會遍歷所有的bucket。


8 實驗環(huán)境

1、16節(jié)點
2、spark-1.1.0 backend:HDFS
3、用例:
1)sort,該用例的數(shù)據(jù)應(yīng)該是使用的谷歌全球排序比賽的數(shù)據(jù)
排序數(shù)據(jù)分為兩種:Daytona (stock car) 和 Indy (formula 1) 。
值得一提的是目前最快紀(jì)錄是騰訊。
2)Text Search 數(shù)據(jù):(English Wikipedia
archive data set)[https://en.wikipedia.org/wiki/Archive]
3)WordCount,該用例的數(shù)據(jù)未在文中看到...........

我的博客 : https://NingSM.github.io

轉(zhuǎn)載請注明原址,謝謝。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容