產(chǎn)生的原因
- shuffle操作之后,1個(gè)key有80萬數(shù)據(jù),其它key8萬,這就會(huì)導(dǎo)致某個(gè)reducetask上被分配了88萬數(shù)據(jù)執(zhí)行,兩外兩個(gè)task完成之后等待這個(gè)task完成
- 在業(yè)務(wù)層面,產(chǎn)生的原因一般是網(wǎng)站被刷
造成影響
該作業(yè)執(zhí)行非常慢,或者直接OOM
定位問題
- 觀察spark ui 發(fā)現(xiàn)大部分task都執(zhí)行非???,刷刷刷,剩下幾個(gè)task,執(zhí)行的特別特別慢,前面的task,一般1s可以執(zhí)行完5個(gè);最后發(fā)現(xiàn)1000個(gè)task,998,999 task,要執(zhí)行1個(gè)小時(shí),2個(gè)小時(shí)才能執(zhí)行完一個(gè)task刷刷刷,突然OOM了
- 找代碼哪些地方有shuffle操作
解決問題思路
解決問題的本質(zhì)辦法:
- 預(yù)聚合,相當(dāng)于hadoop map 的 Combiner,在map端進(jìn)行預(yù)聚合
- 打散key,二次聚合
1、過濾異常數(shù)據(jù)
countByKey然后對(duì)這些 key 對(duì)應(yīng)的記錄進(jìn)行分析:
- 空值或者異常值之類的,大多是這個(gè)原因引起(網(wǎng)站被刷,生產(chǎn)環(huán)境經(jīng)常遇到)
在hue上寫spark sql,執(zhí)行l(wèi)eft join 操作,大量空值會(huì)產(chǎn)生數(shù)據(jù)傾斜,改為union ,優(yōu)化sql - 無效數(shù)據(jù),大量重復(fù)的測試數(shù)據(jù)或是對(duì)結(jié)果影響不大的有效數(shù)據(jù)
- 有效數(shù)據(jù),業(yè)務(wù)導(dǎo)致的正常數(shù)據(jù)分布
正對(duì)以上前兩種情況,直接過濾掉,第三種情況業(yè)務(wù)數(shù)據(jù)分布本身就傾斜,怎么辦?
2、業(yè)務(wù)導(dǎo)致的正常數(shù)據(jù)分布傾斜
提高 shuffle 并行度
- Spark SQL,還可通過
SET spark.sql.shuffle.partitions=[num_tasks]設(shè)置并行度 - RDD 操作 可在需要 Shuffle 的操作算子上直接設(shè)置并行度或者使用 spark.default.parallelism 設(shè)置。如果是
解決:大量不同的 Key 被分配到了相同的 Task 造成該 Task 數(shù)據(jù)量過大。
自定義 Partitioner
.groupByKey(new Partitioner() {
@Override
public int numPartitions() {
return 12;
}
@Override
public int getPartition(Object key) {
int id = Integer.parseInt(key.toString());
if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
return (id - 9500000) / 12;
} else {
return id % 12;
}
}
})
解決:使用自定義的 Partitioner 實(shí)現(xiàn)類代替默認(rèn)的 HashPartitioner,盡量將所有不同的 Key 均勻分配到不同的 Task 中。
對(duì)源數(shù)據(jù)進(jìn)行預(yù)聚合操作
- spark sql
執(zhí)行,優(yōu)化key(一大一小表)
某個(gè)key對(duì)應(yīng)的80萬數(shù)據(jù),某些key對(duì)應(yīng)幾百條,某些key對(duì)應(yīng)幾十條;現(xiàn)在,咱們直接在生成hive表的hive etl中,對(duì)數(shù)據(jù)進(jìn)行聚合。比如按key來分組,將key對(duì)應(yīng)的所有的values,全部用一種特殊的格式,拼接到一個(gè)字符串里面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。
對(duì)key進(jìn)行g(shù)roup,在spark中,拿到key=sessionid,values<Iterable>;hive etl中,直接對(duì)key進(jìn)行了聚合。那么也就意味著,每個(gè)key就只對(duì)應(yīng)一條數(shù)據(jù)。在spark中,就不需要再去執(zhí)行g(shù)roupByKey+map這種操作了。直接對(duì)每個(gè)key對(duì)應(yīng)的values字符串,map操作,進(jìn)行你需要的操作即可。key,values串。
Spark SQLSET spark.sql.autoBroadcastJoinThreshold=10485760 (10m)可以設(shè)置為20m - rdd 執(zhí)行(一大一小表)
使用廣播變量,進(jìn)行map端join,小表join大表轉(zhuǎn)為小表broadcast+map大表實(shí)現(xiàn),例如幾百M(fèi)B或者1~2GB
拆分 join 再 union 兩大表join
- spark sql
拆分sql優(yōu)化為union all 方式 - rdd
想辦法轉(zhuǎn)為大小表,過濾掉不需要的數(shù)據(jù),然后再使用broadcast+map方式