Spark 數(shù)據(jù)傾斜解決方案

Spark中的數(shù)據(jù)傾斜問題主要指shuffle過程中出現(xiàn)的數(shù)據(jù)傾斜問題,是由于不同的key對應(yīng)的數(shù)據(jù)量不同導(dǎo)致的不同task所處理的數(shù)據(jù)量不同的問題。

例如,reduce點(diǎn)一共要處理100萬條數(shù)據(jù),第一個(gè)和第二個(gè)task分別被分配到了1萬條數(shù)據(jù),計(jì)算5分鐘內(nèi)完成,第三個(gè)task分配到了98萬數(shù)據(jù),此時(shí)第三個(gè)task可能需要10個(gè)小時(shí)完成,這使得整個(gè)Spark作業(yè)需要10個(gè)小時(shí)才能運(yùn)行完成,這就是數(shù)據(jù)傾斜所帶來的后果。

注意,要區(qū)分開數(shù)據(jù)傾斜與數(shù)據(jù)量過量這兩種情況,數(shù)據(jù)傾斜是指少數(shù)task被分配了絕大多數(shù)的數(shù)據(jù),因此少數(shù)task運(yùn)行緩慢;數(shù)據(jù)過量是指所有task被分配的數(shù)據(jù)量都很大,相差不多,所有task都運(yùn)行緩慢。

數(shù)據(jù)傾斜的表現(xiàn):

  1. Spark作業(yè)的大部分task都執(zhí)行迅速,只有有限的幾個(gè)task執(zhí)行的非常慢,此時(shí)可能出現(xiàn)了數(shù)據(jù)傾斜,作業(yè)可以運(yùn)行,但是運(yùn)行得非常慢;
  2. Spark作業(yè)的大部分task都執(zhí)行迅速,但是有的task在運(yùn)行過程中會突然報(bào)出OOM,反復(fù)執(zhí)行幾次都在某一個(gè)task報(bào)出OOM錯(cuò)誤,此時(shí)可能出現(xiàn)了數(shù)據(jù)傾斜,作業(yè)無法正常運(yùn)行。

定位數(shù)據(jù)傾斜問題:

  1. 查閱代碼中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根據(jù)代碼邏輯判斷此處是否會出現(xiàn)數(shù)據(jù)傾斜;
  2. 查看Spark作業(yè)的log文件,log文件對于錯(cuò)誤的記錄會精確到代碼的某一行,可以根據(jù)異常定位到的代碼位置來明確錯(cuò)誤發(fā)生在第幾個(gè)stage,對應(yīng)的shuffle算子是哪一個(gè);

數(shù)據(jù)傾斜解決方案

1. 聚合原數(shù)據(jù)

(1) 避免shuffle過程

絕大多數(shù)情況下,Spark作業(yè)的數(shù)據(jù)來源都是Hive表,這些Hive表基本都是經(jīng)過ETL之后的昨天的數(shù)據(jù)。

為了避免數(shù)據(jù)傾斜,我們可以考慮避免shuffle過程,如果避免了shuffle過程,那么從根本上就消除了發(fā)生數(shù)據(jù)傾斜問題的可能。

如果Spark作業(yè)的數(shù)據(jù)來源于Hive表,那么可以先在Hive表中對數(shù)據(jù)進(jìn)行聚合,例如按照key進(jìn)行分組,將同一key對應(yīng)的所有value用一種特殊的格式拼接到一個(gè)字符串里去,這樣,一個(gè)key就只有一條數(shù)據(jù)了;之后,對一個(gè)key的所有value進(jìn)行處理時(shí),只需要進(jìn)行map操作即可,無需再進(jìn)行任何的shuffle操作。

通過上述方式就避免了執(zhí)行shuffle操作,也就不可能會發(fā)生任何的數(shù)據(jù)傾斜問題。

對于Hive表中數(shù)據(jù)的操作,不一定是拼接成一個(gè)字符串,也可以是直接對key的每一條數(shù)據(jù)進(jìn)行累計(jì)計(jì)算。要區(qū)分開,處理的數(shù)據(jù)量大和數(shù)據(jù)傾斜的區(qū)別。

(2) 縮小key粒度(增大數(shù)據(jù)傾斜可能性,降低每個(gè)task的數(shù)據(jù)量)
key的數(shù)量增加,可能使數(shù)據(jù)傾斜更嚴(yán)重。

(3) 增大key粒度(減小數(shù)據(jù)傾斜可能性,增大每個(gè)task的數(shù)據(jù)量)
如果沒有辦法對每個(gè)key聚合出來一條數(shù)據(jù),在特定場景下,可以考慮擴(kuò)大key的聚合粒度。

例如,目前有10萬條用戶數(shù)據(jù),當(dāng)前key的粒度是(省,城市,區(qū),日期),現(xiàn)在我們考慮擴(kuò)大粒度,將key的粒度擴(kuò)大為(省,城市,日期),這樣的話,key的數(shù)量會減少,key之間的數(shù)據(jù)量差異也有可能會減少,由此可以減輕數(shù)據(jù)傾斜的現(xiàn)象和問題。(此方法只針對特定類型的數(shù)據(jù)有效,當(dāng)應(yīng)用場景不適宜時(shí),會加重?cái)?shù)據(jù)傾斜)。

2. 過濾導(dǎo)致傾斜的key

如果在Spark作業(yè)中允許丟棄某些數(shù)據(jù),那么可以考慮將可能導(dǎo)致數(shù)據(jù)傾斜的key進(jìn)行過濾,濾除可能導(dǎo)致數(shù)據(jù)傾斜的key對應(yīng)的數(shù)據(jù),這樣,在Spark作業(yè)中就不會發(fā)生數(shù)據(jù)傾斜了。

3. 提高shuffle操作中的reduce并行度

當(dāng)方案一和方案二對于數(shù)據(jù)傾斜的處理沒有很好的效果時(shí),可以考慮提高shuffle過程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的數(shù)量,那么每個(gè)task分配到的數(shù)據(jù)量就會相應(yīng)減少,由此緩解數(shù)據(jù)傾斜問題。

  1. reduce端并行度的設(shè)置

在大部分的shuffle算子中,都可以傳入一個(gè)并行度的設(shè)置參數(shù),比如reduceByKey(500),這個(gè)參數(shù)會決定shuffle過程中reduce端的并行度,在進(jìn)行shuffle操作的時(shí)候,就會對應(yīng)著創(chuàng)建指定數(shù)量的reduce task。對于Spark SQL中的shuffle類語句,比如group by、join等,需要設(shè)置一個(gè)參數(shù),即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度,該值默認(rèn)是200,對于很多場景來說都有點(diǎn)過小。

增加shuffle read task的數(shù)量,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task,從而讓每個(gè)task處理比原來更少的數(shù)據(jù)。舉例來說,如果原本有5個(gè)key,每個(gè)key對應(yīng)10條數(shù)據(jù),這5個(gè)key都是分配給一個(gè)task的,那么這個(gè)task就要處理50條數(shù)據(jù)。而增加了shuffle read task以后,每個(gè)task就分配到一個(gè)key,即每個(gè)task就處理10條數(shù)據(jù),那么自然每個(gè)task的執(zhí)行時(shí)間都會變短了。

  1. reduce端并行度設(shè)置存在的缺陷
    提高reduce端并行度并沒有從根本上改變數(shù)據(jù)傾斜的本質(zhì)和問題(方案一和方案二從根本上避免了數(shù)據(jù)傾斜的發(fā)生),只是盡可能地去緩解和減輕shuffle reduce task的數(shù)據(jù)壓力,以及數(shù)據(jù)傾斜的問題,適用于有較多key對應(yīng)的數(shù)據(jù)量都比較大的情況。

該方案通常無法徹底解決數(shù)據(jù)傾斜,因?yàn)槿绻霈F(xiàn)一些極端情況,比如某個(gè)key對應(yīng)的數(shù)據(jù)量有100萬,那么無論你的task數(shù)量增加到多少,這個(gè)對應(yīng)著100萬數(shù)據(jù)的key肯定還是會分配到一個(gè)task中去處理,因此注定還是會發(fā)生數(shù)據(jù)傾斜的。所以這種方案只能說是在發(fā)現(xiàn)數(shù)據(jù)傾斜時(shí)嘗試使用的第一種手段,嘗試去用最簡單的方法緩解數(shù)據(jù)傾斜而已,或者是和其他方案結(jié)合起來使用。

在理想情況下,reduce端并行度提升后,會在一定程度上減輕數(shù)據(jù)傾斜的問題,甚至基本消除數(shù)據(jù)傾斜;但是,在一些情況下,只會讓原來由于數(shù)據(jù)傾斜而運(yùn)行緩慢的task運(yùn)行速度稍有提升,或者避免了某些task的OOM問題,但是,仍然運(yùn)行緩慢,此時(shí),要及時(shí)放棄方案三,開始嘗試后面的方案。

4. 使用隨機(jī)key實(shí)現(xiàn)雙重聚合

當(dāng)使用了類似于groupByKey、reduceByKey這樣的算子時(shí),可以考慮使用隨機(jī)key實(shí)現(xiàn)雙重聚合。

首先,通過map算子給每個(gè)數(shù)據(jù)的key添加隨機(jī)數(shù)前綴,對key進(jìn)行打散,將原先一樣的key變成不一樣的key,然后進(jìn)行第一次聚合,這樣就可以讓原本被一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合;隨后,去除掉每個(gè)key的前綴,再次進(jìn)行聚合。

此方法對于由groupByKey、reduceByKey這類算子造成的數(shù)據(jù)傾斜由比較好的效果,僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

5. 將reduce join轉(zhuǎn)換為map join

正常情況下,join操作都會執(zhí)行shuffle過程,并且執(zhí)行的是reduce join,也就是先將所有相同的key和對應(yīng)的value匯聚到一個(gè)reduce task中,然后再進(jìn)行join。

普通的join是會走shuffle過程的,而一旦shuffle,就相當(dāng)于會將相同key的數(shù)據(jù)拉取到一個(gè)shuffle read task中再進(jìn)行join,此時(shí)就是reduce join。但是如果一個(gè)RDD是比較小的,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來實(shí)現(xiàn)與join同樣的效果,也就是map join,此時(shí)就不會發(fā)生shuffle操作,也就不會發(fā)生數(shù)據(jù)傾斜。
(注意,RDD是并不能進(jìn)行廣播的,只能將RDD內(nèi)部的數(shù)據(jù)通過collect拉取到Driver內(nèi)存然后再進(jìn)行廣播)

  • 核心思路
    不使用join算子進(jìn)行連接操作,而使用Broadcast變量與map類算子實(shí)現(xiàn)join操作,進(jìn)而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。將較小RDD中的數(shù)據(jù)直接通過collect算子拉取到Driver端的內(nèi)存中來,然后對其創(chuàng)建一個(gè)Broadcast變量;接著對另外一個(gè)RDD執(zhí)行map類算子,在算子函數(shù)內(nèi),從Broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進(jìn)行比對,如果連接key相同的話,那么就將兩個(gè)RDD的數(shù)據(jù)用你需要的方式連接起來。

根據(jù)上述思路,根本不會發(fā)生shuffle操作,從根本上杜絕了join操作可能導(dǎo)致的數(shù)據(jù)傾斜問題。

當(dāng)join操作有數(shù)據(jù)傾斜問題并且其中一個(gè)RDD的數(shù)據(jù)量較小時(shí),可以優(yōu)先考慮這種方式,效果非常好

  • 不適用場景
    由于Spark的廣播變量是在每個(gè)Executor中保存一個(gè)副本,如果兩個(gè)RDD數(shù)據(jù)量都比較大,那么如果將一個(gè)數(shù)據(jù)量比較大的 RDD做成廣播變量,那么很有可能會造成內(nèi)存溢出。

6. sample采樣對傾斜key單獨(dú)進(jìn)行join

在Spark中,如果某個(gè)RDD只有一個(gè)key,那么在shuffle過程中會默認(rèn)將此key對應(yīng)的數(shù)據(jù)打散,由不同的reduce端task進(jìn)行處理。

當(dāng)由單個(gè)key導(dǎo)致數(shù)據(jù)傾斜時(shí),可有將發(fā)生數(shù)據(jù)傾斜的key單獨(dú)提取出來,組成一個(gè)RDD,然后用這個(gè)原本會導(dǎo)致傾斜的key組成的RDD根其他RDD單獨(dú)join,此時(shí),根據(jù)Spark的運(yùn)行機(jī)制,此RDD中的數(shù)據(jù)會在shuffle階段被分散到多個(gè)task中去進(jìn)行join操作。

  • 適用場景
    對于RDD中的數(shù)據(jù),可以將其轉(zhuǎn)換為一個(gè)中間表,或者是直接使用countByKey()的方式,看一個(gè)這個(gè)RDD中各個(gè)key對應(yīng)的數(shù)據(jù)量,此時(shí)如果你發(fā)現(xiàn)整個(gè)RDD就一個(gè)key的數(shù)據(jù)量特別多,那么就可以考慮使用這種方法。當(dāng)數(shù)據(jù)量非常大時(shí),可以考慮使用sample采樣獲取10%的數(shù)據(jù),然后分析這10%的數(shù)據(jù)中哪個(gè)key可能會導(dǎo)致數(shù)據(jù)傾斜,然后將這個(gè)key對應(yīng)的數(shù)據(jù)單獨(dú)提取出來。

  • 不適用場景
    如果一個(gè)RDD中導(dǎo)致數(shù)據(jù)傾斜的key很多,那么此方案不適用。

7. 使用隨機(jī)數(shù)以及擴(kuò)容進(jìn)行join

如果在進(jìn)行join操作時(shí),RDD中有大量的key導(dǎo)致數(shù)據(jù)傾斜,那么進(jìn)行分拆key也沒什么意義,此時(shí)就只能使用最后一種方案來解決問題了,對于join操作,我們可以考慮對其中一個(gè)RDD數(shù)據(jù)進(jìn)行擴(kuò)容,另一個(gè)RDD進(jìn)行稀釋后再join。

我們會將原先一樣的key通過附加隨機(jī)前綴變成不一樣的key,然后就可以將這些處理后的“不同key”分散到多個(gè)task中去處理,而不是讓一個(gè)task處理大量的相同key。這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進(jìn)行單獨(dú)處理,需要對整個(gè)RDD進(jìn)行數(shù)據(jù)擴(kuò)容,對內(nèi)存資源要求很高。

  • 核心思想

選擇一個(gè)RDD,使用flatMap進(jìn)行擴(kuò)容,對每條數(shù)據(jù)的key添加數(shù)值前綴(1~N的數(shù)值),將一條數(shù)據(jù)映射為多條數(shù)據(jù);(擴(kuò)容)

選擇另外一個(gè)RDD,進(jìn)行map映射操作,每條數(shù)據(jù)的key都打上一個(gè)隨機(jī)數(shù)作為前綴(1~N的隨機(jī)數(shù));(稀釋)

將兩個(gè)處理后的RDD,進(jìn)行join操作。

  • 局限性:

如果兩個(gè)RDD都很大,那么將RDD進(jìn)行N倍的擴(kuò)容顯然行不通;
使用擴(kuò)容的方式只能緩解數(shù)據(jù)傾斜,不能徹底解決數(shù)據(jù)傾斜問題。

  • 對方案六進(jìn)一步優(yōu)化分析:

當(dāng)RDD中有幾個(gè)key導(dǎo)致數(shù)據(jù)傾斜時(shí),方案六不再適用,而方案七又非常消耗資源,此時(shí)可以引入方案七的思想完善方案六:

  1. 對包含少數(shù)幾個(gè)數(shù)據(jù)量過大的key的那個(gè)RDD,通過sample算子采樣出一份樣本來,然后統(tǒng)計(jì)一下每個(gè)key的數(shù)量,計(jì)算出來數(shù)據(jù)量最大的是哪幾個(gè)key。
  2. 然后將這幾個(gè)key對應(yīng)的數(shù)據(jù)從原來的RDD中拆分出來,形成一個(gè)單獨(dú)的RDD,并給每個(gè)key都打上n以內(nèi)的隨機(jī)數(shù)作為前綴,而不會導(dǎo)致傾斜的大部分key形成另外一個(gè)RDD。
  3. 接著將需要join的另一個(gè)RDD,也過濾出來那幾個(gè)傾斜key對應(yīng)的數(shù)據(jù)并形成一個(gè)單獨(dú)的RDD,將每條數(shù)據(jù)膨脹成n條數(shù)據(jù),這n條數(shù)據(jù)都按順序附加一個(gè)0~n的前綴,不會導(dǎo)致傾斜的大部分key也形成另外一個(gè)RDD。
  4. 再將附加了隨機(jī)前綴的獨(dú)立RDD與另一個(gè)膨脹n倍的獨(dú)立RDD進(jìn)行join,此時(shí)就可以將原先相同的key打散成n份,分散到多個(gè)task中去進(jìn)行join了。
  5. 而另外兩個(gè)普通的RDD就照常join即可。
  6. 最后將兩次join的結(jié)果使用union算子合并起來即可,就是最終的join結(jié)果。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容