spark 數(shù)據(jù)傾斜解決方案

產(chǎn)生的原因

  • shuffle操作之后,1個(gè)key有80萬數(shù)據(jù),其它key8萬,這就會(huì)導(dǎo)致某個(gè)reducetask上被分配了88萬數(shù)據(jù)執(zhí)行,兩外兩個(gè)task完成之后等待這個(gè)task完成
  • 在業(yè)務(wù)層面,產(chǎn)生的原因一般是網(wǎng)站被刷

造成影響

該作業(yè)執(zhí)行非常慢,或者直接OOM

定位問題

  • 觀察spark ui 發(fā)現(xiàn)大部分task都執(zhí)行非???,刷刷刷,剩下幾個(gè)task,執(zhí)行的特別特別慢,前面的task,一般1s可以執(zhí)行完5個(gè);最后發(fā)現(xiàn)1000個(gè)task,998,999 task,要執(zhí)行1個(gè)小時(shí),2個(gè)小時(shí)才能執(zhí)行完一個(gè)task刷刷刷,突然OOM了
  • 找代碼哪些地方有shuffle操作

解決問題思路

解決問題的本質(zhì)辦法:

  1. 預(yù)聚合,相當(dāng)于hadoop map 的 Combiner,在map端進(jìn)行預(yù)聚合
  2. 打散key,二次聚合

1、過濾異常數(shù)據(jù)

countByKey然后對(duì)這些 key 對(duì)應(yīng)的記錄進(jìn)行分析:

  • 空值或者異常值之類的,大多是這個(gè)原因引起(網(wǎng)站被刷,生產(chǎn)環(huán)境經(jīng)常遇到)
    在hue上寫spark sql,執(zhí)行l(wèi)eft join 操作,大量空值會(huì)產(chǎn)生數(shù)據(jù)傾斜,改為union ,優(yōu)化sql
  • 無效數(shù)據(jù),大量重復(fù)的測試數(shù)據(jù)或是對(duì)結(jié)果影響不大的有效數(shù)據(jù)
  • 有效數(shù)據(jù),業(yè)務(wù)導(dǎo)致的正常數(shù)據(jù)分布
    正對(duì)以上前兩種情況,直接過濾掉,第三種情況業(yè)務(wù)數(shù)據(jù)分布本身就傾斜,怎么辦?

2、業(yè)務(wù)導(dǎo)致的正常數(shù)據(jù)分布傾斜

提高 shuffle 并行度

  • Spark SQL,還可通過 SET spark.sql.shuffle.partitions=[num_tasks] 設(shè)置并行度
  • RDD 操作 可在需要 Shuffle 的操作算子上直接設(shè)置并行度或者使用 spark.default.parallelism 設(shè)置。如果是
    解決:大量不同的 Key 被分配到了相同的 Task 造成該 Task 數(shù)據(jù)量過大。

自定義 Partitioner

.groupByKey(new Partitioner() {
  @Override
  public int numPartitions() {
    return 12;
  }

  @Override
  public int getPartition(Object key) {
    int id = Integer.parseInt(key.toString());
    if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
      return (id - 9500000) / 12;
    } else {
      return id % 12;
    }
  }
})

解決:使用自定義的 Partitioner 實(shí)現(xiàn)類代替默認(rèn)的 HashPartitioner,盡量將所有不同的 Key 均勻分配到不同的 Task 中。

對(duì)源數(shù)據(jù)進(jìn)行預(yù)聚合操作

  • spark sql
    執(zhí)行,優(yōu)化key(一大一小表)
    某個(gè)key對(duì)應(yīng)的80萬數(shù)據(jù),某些key對(duì)應(yīng)幾百條,某些key對(duì)應(yīng)幾十條;現(xiàn)在,咱們直接在生成hive表的hive etl中,對(duì)數(shù)據(jù)進(jìn)行聚合。比如按key來分組,將key對(duì)應(yīng)的所有的values,全部用一種特殊的格式,拼接到一個(gè)字符串里面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。
    對(duì)key進(jìn)行g(shù)roup,在spark中,拿到key=sessionid,values<Iterable>;hive etl中,直接對(duì)key進(jìn)行了聚合。那么也就意味著,每個(gè)key就只對(duì)應(yīng)一條數(shù)據(jù)。在spark中,就不需要再去執(zhí)行g(shù)roupByKey+map這種操作了。直接對(duì)每個(gè)key對(duì)應(yīng)的values字符串,map操作,進(jìn)行你需要的操作即可。key,values串。
    Spark SQL SET spark.sql.autoBroadcastJoinThreshold=10485760 (10m)可以設(shè)置為20m
  • rdd 執(zhí)行(一大一小表)
    使用廣播變量,進(jìn)行map端join,小表join大表轉(zhuǎn)為小表broadcast+map大表實(shí)現(xiàn),例如幾百M(fèi)B或者1~2GB

拆分 join 再 union 兩大表join

  • spark sql
    拆分sql優(yōu)化為union all 方式
  • rdd
    想辦法轉(zhuǎn)為大小表,過濾掉不需要的數(shù)據(jù),然后再使用broadcast+map方式
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容