Spark調(diào)優(yōu)方案

調(diào)優(yōu)的思路依賴(lài)平時(shí)工作中不斷總結(jié)所形成的豐富經(jīng)驗(yàn)。而這些是很難直接從知識(shí)文檔中獲取的,應(yīng)當(dāng)具體問(wèn)題具體分析,本文對(duì)Spark調(diào)優(yōu)進(jìn)行歸納總結(jié),縮短了你摸爬滾打的時(shí)間。

常規(guī)調(diào)優(yōu)

  1. 并行度調(diào)節(jié)
    理想的并行度設(shè)置,應(yīng)該是讓并行度與資源相匹配,一般來(lái)說(shuō),task數(shù)量應(yīng)該設(shè)置為Spark作業(yè)總CPU core數(shù)量的2-3倍,task總數(shù)量盡量使并行執(zhí)行partition上限的3-5倍,可以避免task數(shù)量過(guò)少,Executor分配過(guò)多CPU core所造成的資源浪費(fèi)。
  2. 廣播變量
    如果多個(gè)Executor都要使用同一數(shù)據(jù),每一個(gè)Executor都要去Driver端多次讀取會(huì)浪費(fèi)大量資源,此時(shí)首先會(huì)在自己本地的Executor對(duì)應(yīng)的BlockManager中嘗試獲取變量,如果本地沒(méi)有,BlockManager就會(huì)從Driver或者其他節(jié)點(diǎn)的BlockManager上遠(yuǎn)程拉取變量的副本,并由本地的BlockManager進(jìn)行管理;之后此Executor的所有task都會(huì)直接從本地的BlockManager中獲取變量。
  3. Kryo序列化
    默認(rèn)情況下,Spark使用Java的序列化機(jī)制。Java的序列化機(jī)制使用方便,不需要額外的配置,在算子中使用的變量實(shí)現(xiàn)Serializable接口即可,但是,Java序列化機(jī)制的效率不高,序列化速度慢并且序列化后的數(shù)據(jù)所占用的空間依然較大。
    Kryo序列化機(jī)制比Java序列化機(jī)制性能提高10倍左右,Spark之所以沒(méi)有默認(rèn)使用Kryo作為序列化類(lèi)庫(kù),是因?yàn)樗恢С炙袑?duì)象的序列化,同時(shí)Kryo需要用戶(hù)在使用前注冊(cè)需要序列化的類(lèi)型,不夠方便,但從Spark 2.0.0版本開(kāi)始,簡(jiǎn)單類(lèi)型、簡(jiǎn)單類(lèi)型數(shù)組、字符串類(lèi)型的Shuffling RDDs 已經(jīng)默認(rèn)使用Kryo序列化方式了。
  4. 調(diào)節(jié)本地等待時(shí)長(zhǎng)
    根據(jù)Spark的task分配算法,Spark希望task能夠運(yùn)行在它要計(jì)算的數(shù)據(jù)所在的節(jié)點(diǎn)上,但是這些節(jié)點(diǎn)可用的資源可能已經(jīng)用盡,此時(shí)Spark會(huì)等待一段時(shí)間,默認(rèn)3s,如果等待指定時(shí)間后仍然無(wú)法在指定節(jié)點(diǎn)運(yùn)行,那么會(huì)自動(dòng)降級(jí),嘗試將task分配到比較差的本地化級(jí)別所對(duì)應(yīng)的節(jié)點(diǎn)上,比如將task分配到離它要計(jì)算的數(shù)據(jù)比較近的一個(gè)節(jié)點(diǎn),然后進(jìn)行計(jì)算,如果當(dāng)前級(jí)別仍然不行,那么繼續(xù)降級(jí)。
  5. RDD持久化緩存

算子調(diào)優(yōu)

  1. mapPartitions/foreachPartition
    mapPartitions和foreachPartition算子針對(duì)每個(gè)分區(qū)只進(jìn)行一次操作,相比map算子對(duì)所有數(shù)據(jù)都要執(zhí)行一次操作,效率更高,但是如果數(shù)據(jù)量比較大的時(shí)候,一旦內(nèi)存不足,容易出現(xiàn)OOM,也就是內(nèi)存溢出。
  2. filter與coalesce的配合使用
    通常filter之后,每個(gè)分區(qū)內(nèi)數(shù)據(jù)不一致,即數(shù)據(jù)傾斜,如果還按照之前每個(gè)partition分配的task數(shù),就會(huì)出現(xiàn)運(yùn)算速度的差異。
    這個(gè)時(shí)候我們可以對(duì)數(shù)據(jù)進(jìn)行重新分區(qū),如果是分區(qū)合并,最好采用coalesce算子;如果是分區(qū)分解,采用repartition算子。
  3. repartition解決SparkSQL低并行度問(wèn)題
    SparkSQL的并行度不允許用戶(hù)自己指定,所以前面所說(shuō)的并行度調(diào)節(jié)對(duì)SparkSQL無(wú)效,我們可以使用repertition算子,對(duì)SparkSQL查詢(xún)出來(lái)的結(jié)果重新分區(qū),stage的并行度就等于你手動(dòng)重新分區(qū)之后的值。
  4. reduceByKey本地聚合
    reduceByKey會(huì)進(jìn)行本地的map聚合,效率比groupByKey高,所有我們可以考慮將
    groupByKey替換成reduceByKey。

Shuffle調(diào)優(yōu)

  1. 調(diào)節(jié)map端緩沖區(qū)大小
    map端緩沖的默認(rèn)配置是32KB,導(dǎo)致溢寫(xiě)次數(shù)過(guò)多,對(duì)性能影響比較大,適當(dāng)增大map端緩沖區(qū)。
  2. 調(diào)節(jié)reduce端拉取數(shù)據(jù)緩沖區(qū)的大小
    Spark Shuffle過(guò)程中,shuffle reduce task的buffer緩沖區(qū)大小決定了reduce task每次能夠緩沖的數(shù)據(jù)量,也就是每次能夠拉取的數(shù)據(jù)量,適當(dāng)增加緩沖區(qū)大小,可以減
    少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù),進(jìn)而提升性能。
  3. 增加reduce端拉取數(shù)據(jù)重試次數(shù)
    Spark Shuffle過(guò)程中,reduce task拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試。建議增加重試最大次數(shù),可以大幅提升穩(wěn)定性。
  4. 調(diào)節(jié)reduce端拉取數(shù)據(jù)等待間隔
    Spark Shuffle過(guò)程中,reduce task拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試,在一次失敗后,會(huì)等待一定的時(shí)間間隔再進(jìn)行重試,可以通過(guò)加大間隔時(shí)長(zhǎng),以增加shuffle操作的穩(wěn)定性。
  5. 調(diào)節(jié)SortShuffle排序操作閾值
    對(duì)于SortShuffleManager,如果shuffle reduce task的數(shù)量小于某一閾值則shuffle write過(guò)程中不會(huì)進(jìn)行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫(xiě)數(shù)據(jù),但是最后會(huì)將每個(gè)task產(chǎn)生的所有臨時(shí)磁盤(pán)文件都合并成一個(gè)文件,并會(huì)創(chuàng)建單獨(dú)的索引文件。當(dāng)你使用SortShuffleManager時(shí),如果的確不需要排序操作,那么建議將這個(gè)參數(shù)調(diào)大一些,大于shuffle read task的數(shù)量,那么此時(shí)map-side就不會(huì)進(jìn)行排序了,減少了排序的性能開(kāi)銷(xiāo),但是這種方式下,依然會(huì)產(chǎn)生大量的磁盤(pán)文件,因此shuffle write性能有待提高。

Spark數(shù)據(jù)傾斜

  1. 聚合原始數(shù)據(jù)
    如果Spark作業(yè)的數(shù)據(jù)來(lái)源于Hive表,那么可以先在Hive表中對(duì)數(shù)據(jù)進(jìn)行聚合,比如說(shuō)將同一key對(duì)應(yīng)的所有value用一種特殊的格式拼接到一個(gè)字符串里去,這樣,一個(gè)key就只有一條數(shù)據(jù)了;之后,對(duì)一個(gè)key的所有value進(jìn)行處理時(shí),只需要進(jìn)行map操作即可,無(wú)需再進(jìn)行任何的shuffle操作。通過(guò)上述方式就避免了執(zhí)行shuffle操作,也就不可能會(huì)發(fā)生任何的數(shù)據(jù)傾斜問(wèn)題。
    還可以通過(guò)增大粒度的方式,減少key的數(shù)量,key之間的數(shù)據(jù)量差異也有可
    能會(huì)減少,由此可以減輕數(shù)據(jù)傾斜的現(xiàn)象和問(wèn)題。
  2. 提高shuffle操作中的reduce并行度
    增加shuffle read task的數(shù)量,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task,從而讓每個(gè)task處理比原來(lái)更少的數(shù)據(jù),在一定程度上緩解數(shù)據(jù)傾斜。
  3. 使用隨機(jī)key實(shí)現(xiàn)雙重聚合
    通過(guò)map算子給每個(gè)數(shù)據(jù)的key添加隨機(jī)數(shù)前綴,將原先一樣的key變成不一樣的key,然后進(jìn)行第一次聚合,這樣就可以讓原本被一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合;隨后,去除掉每個(gè)key的前綴,再次進(jìn)行聚合。
  4. 將reducejoin轉(zhuǎn)換為mapjoin
    將較小RDD中的數(shù)據(jù)直接通過(guò)collect算子拉取到Driver端的內(nèi)存中來(lái),然后對(duì)其創(chuàng)建一個(gè)Broadcast變量;接著對(duì)另外一個(gè)RDD執(zhí)行map類(lèi)算子,在算子函數(shù)內(nèi),從Broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進(jìn)行比對(duì),如果連接key相同的話(huà),那么就將兩個(gè)RDD的數(shù)據(jù)用你需要的方式連接起來(lái)。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • spark-submit的時(shí)候如何引入外部jar包 在通過(guò)spark-submit提交任務(wù)時(shí),可以通過(guò)添加配置參數(shù)...
    博弈史密斯閱讀 3,023評(píng)論 1 14
  • 前言 繼基礎(chǔ)篇講解了每個(gè)Spark開(kāi)發(fā)人員都必須熟知的開(kāi)發(fā)調(diào)優(yōu)與資源調(diào)優(yōu)之后,本文作為《Spark性能優(yōu)化指南》的...
    Alukar閱讀 958評(píng)論 0 2
  • 1. 數(shù)據(jù)傾斜發(fā)生時(shí)的現(xiàn)象 絕大多數(shù)task執(zhí)行得都非常快,但個(gè)別task執(zhí)行極慢。比如,總共有1000個(gè)task...
    CoderJed閱讀 664評(píng)論 0 1
  • 我想念你兄弟 當(dāng)年我們嗜酒如狂 而今已各奔東西 我想念你啊兄弟 那一年我們年少 翻墻去買(mǎi)啤酒 那個(gè)周末我們吐滿(mǎn)被褥...
    心物語(yǔ)閱讀 284評(píng)論 0 1
  • 作者:詩(shī)悅女兒 冬天走了, 春天來(lái)了, 春天,好似一陣風(fēng), 吹起了楊柳的長(zhǎng)發(fā); 春天,她好似哭了, 滴滴淚珠把草兒...
    慧鎂閱讀 269評(píng)論 0 2

友情鏈接更多精彩內(nèi)容