解決Spark數(shù)據(jù)傾斜(三) 使用隨機(jī)前綴分散傾斜Key

原理

為數(shù)據(jù)量特別大的Key增加隨機(jī)前/后綴,使得原來Key相同的數(shù)據(jù)變?yōu)镵ey不相同的數(shù)據(jù),從而使傾斜的數(shù)據(jù)集分散到不同的Task中,徹底解決數(shù)據(jù)傾斜問題。Join另一則的數(shù)據(jù)中,與傾斜Key對應(yīng)的部分?jǐn)?shù)據(jù),與隨機(jī)前綴集作笛卡爾乘積,從而保證無論數(shù)據(jù)傾斜側(cè)傾斜Key如何加前綴,都能與之正常Join。

image

案例

通過如下SQL,將id為9億到9.08億共800萬條數(shù)據(jù)的id轉(zhuǎn)為9500048或者9500096,其它數(shù)據(jù)的id除以100取整。從而該數(shù)據(jù)集中,id為9500048和9500096的數(shù)據(jù)各400萬,其它id對應(yīng)的數(shù)據(jù)記錄數(shù)均為100條。這些數(shù)據(jù)存于名為test的表中。

對于另外一張小表test_new,取出50萬條數(shù)據(jù),并將id(遞增且唯一)除以100取整,使得所有id都對應(yīng)100條數(shù)據(jù)。

INSERT OVERWRITE TABLE test

SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 )

ELSE CAST(id/100 AS INT) END AS STRING),

name

FROM student_external

WHERE id BETWEEN 900000000 AND 1050000000;

INSERT OVERWRITE TABLE test_new

SELECT CAST(CAST(id/100 AS INT) AS STRING),

name

FROM student_delta_external

WHERE id BETWEEN 950000000 AND 950500000;

通過如下代碼(具體代碼請點(diǎn)擊“閱讀原文”),讀取test表對應(yīng)的文件夾內(nèi)的數(shù)據(jù)并轉(zhuǎn)換為JavaPairRDD存于leftRDD中,同樣讀取test表對應(yīng)的數(shù)據(jù)存于rightRDD中。通過RDD的join算子對leftRDD與rightRDD進(jìn)行Join,并指定并行度為48。

image

從下圖可看出,整個(gè)Join耗時(shí)1分54秒,其中Join Stage耗時(shí)1.7分鐘。

image

通過分析Join Stage的所有Task可知,在其它Task所處理記錄數(shù)為192.71萬的同時(shí)Task 32的處理的記錄數(shù)為992.72萬,故它耗時(shí)為1.7分鐘,遠(yuǎn)高于其它Task的約10秒。這與上文準(zhǔn)備數(shù)據(jù)集時(shí),將id為9500048為9500096對應(yīng)的數(shù)據(jù)量設(shè)置非常大,其它id對應(yīng)的數(shù)據(jù)集非常均勻相符合。

image

現(xiàn)通過如下操作,實(shí)現(xiàn)傾斜Key的分散處理

  • 將leftRDD中傾斜的key(即9500048與9500096)對應(yīng)的數(shù)據(jù)單獨(dú)過濾出來,且加上1到24的隨機(jī)前綴,并將前綴與原數(shù)據(jù)用逗號分隔(以方便之后去掉前綴)形成單獨(dú)的leftSkewRDD

  • 將rightRDD中傾斜key對應(yīng)的數(shù)據(jù)抽取出來,并通過flatMap操作將該數(shù)據(jù)集中每條數(shù)據(jù)均轉(zhuǎn)換為24條數(shù)據(jù)(每條分別加上1到24的隨機(jī)前綴),形成單獨(dú)的rightSkewRDD

  • 將leftSkewRDD與rightSkewRDD進(jìn)行Join,并將并行度設(shè)置為48,且在Join過程中將隨機(jī)前綴去掉,得到傾斜數(shù)據(jù)集的Join結(jié)果skewedJoinRDD

  • 將leftRDD中不包含傾斜Key的數(shù)據(jù)抽取出來作為單獨(dú)的leftUnSkewRDD

  • 對leftUnSkewRDD與原始的rightRDD進(jìn)行Join,并行度也設(shè)置為48,得到Join結(jié)果unskewedJoinRDD

  • 通過union算子將skewedJoinRDD與unskewedJoinRDD進(jìn)行合并,從而得到完整的Join結(jié)果集

具體實(shí)現(xiàn)代碼如下。(具體代碼請點(diǎn)擊“閱讀原文”)

image

從下圖可看出,整個(gè)Join耗時(shí)58秒,其中Join Stage耗時(shí)33秒。

image

通過分析Join Stage的所有Task可知

  • 由于Join分傾斜數(shù)據(jù)集Join和非傾斜數(shù)據(jù)集Join,而各Join的并行度均為48,故總的并行度為96

  • 由于提交任務(wù)時(shí),設(shè)置的Executor個(gè)數(shù)為4,每個(gè)Executor的core數(shù)為12,故可用Core數(shù)為48,所以前48個(gè)Task同時(shí)啟動(其Launch時(shí)間相同),后48個(gè)Task的啟動時(shí)間各不相同(等待前面的Task結(jié)束才開始)

  • 由于傾斜Key被加上隨機(jī)前綴,原本相同的Key變?yōu)椴煌腒ey,被分散到不同的Task處理,故在所有Task中,未發(fā)現(xiàn)所處理數(shù)據(jù)集明顯高于其它Task的情況

image

實(shí)際上,由于傾斜Key與非傾斜Key的操作完全獨(dú)立,可并行進(jìn)行。而本實(shí)驗(yàn)受限于可用總核數(shù)為48,可同時(shí)運(yùn)行的總Task數(shù)為48,故而該方案只是將總耗時(shí)減少一半(效率提升一倍)。如果資源充足,可并發(fā)執(zhí)行Task數(shù)增多,該方案的優(yōu)勢將更為明顯。在實(shí)際項(xiàng)目中,該方案往往可提升數(shù)倍至10倍的效率。

總結(jié)

適用場景

兩張表都比較大,無法使用Map則Join。其中一個(gè)RDD有少數(shù)幾個(gè)Key的數(shù)據(jù)量過大,另外一個(gè)RDD的Key分布較為均勻。

解決方案

將有數(shù)據(jù)傾斜的RDD中傾斜Key對應(yīng)的數(shù)據(jù)集單獨(dú)抽取出來加上隨機(jī)前綴,另外一個(gè)RDD每條數(shù)據(jù)分別與隨機(jī)前綴結(jié)合形成新的RDD(相當(dāng)于將其數(shù)據(jù)增到到原來的N倍,N即為隨機(jī)前綴的總個(gè)數(shù)),然后將二者Join并去掉前綴。然后將不包含傾斜Key的剩余數(shù)據(jù)進(jìn)行Join。最后將兩次Join的結(jié)果集通過union合并,即可得到全部Join結(jié)果。

優(yōu)勢

相對于Map則Join,更能適應(yīng)大數(shù)據(jù)集的Join。如果資源充足,傾斜部分?jǐn)?shù)據(jù)集與非傾斜部分?jǐn)?shù)據(jù)集可并行進(jìn)行,效率提升明顯。且只針對傾斜部分的數(shù)據(jù)做數(shù)據(jù)擴(kuò)展,增加的資源消耗有限。

劣勢

如果傾斜Key非常多,則另一側(cè)數(shù)據(jù)膨脹非常大,此方案不適用。而且此時(shí)對傾斜Key與非傾斜Key分開處理,需要掃描數(shù)據(jù)集兩遍,增加了開銷。

關(guān)注我的公眾號,后臺回復(fù)【JAVAPDF】獲取200頁面試題!

5萬人關(guān)注的大數(shù)據(jù)成神之路,不來了解一下嗎?

5萬人關(guān)注的大數(shù)據(jù)成神之路,真的不來了解一下嗎?

5萬人關(guān)注的大數(shù)據(jù)成神之路,確定真的不來了解一下嗎?

歡迎您關(guān)注《大數(shù)據(jù)成神之路》

[圖片上傳失敗...(image-1115b7-1593177454094)])

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容