調(diào)優(yōu)的思路依賴(lài)平時(shí)工作中不斷總結(jié)所形成的豐富經(jīng)驗(yàn)。而這些是很難直接從知識(shí)文檔中獲取的,應(yīng)當(dāng)具體問(wèn)題具體分析,本文對(duì)Spark調(diào)優(yōu)進(jìn)行歸納總結(jié),縮短了你摸爬滾打的時(shí)間。
常規(guī)調(diào)優(yōu)
- 并行度調(diào)節(jié)
理想的并行度設(shè)置,應(yīng)該是讓并行度與資源相匹配,一般來(lái)說(shuō),task數(shù)量應(yīng)該設(shè)置為Spark作業(yè)總CPU core數(shù)量的2-3倍,task總數(shù)量盡量使并行執(zhí)行partition上限的3-5倍,可以避免task數(shù)量過(guò)少,Executor分配過(guò)多CPU core所造成的資源浪費(fèi)。 - 廣播變量
如果多個(gè)Executor都要使用同一數(shù)據(jù),每一個(gè)Executor都要去Driver端多次讀取會(huì)浪費(fèi)大量資源,此時(shí)首先會(huì)在自己本地的Executor對(duì)應(yīng)的BlockManager中嘗試獲取變量,如果本地沒(méi)有,BlockManager就會(huì)從Driver或者其他節(jié)點(diǎn)的BlockManager上遠(yuǎn)程拉取變量的副本,并由本地的BlockManager進(jìn)行管理;之后此Executor的所有task都會(huì)直接從本地的BlockManager中獲取變量。 - Kryo序列化
默認(rèn)情況下,Spark使用Java的序列化機(jī)制。Java的序列化機(jī)制使用方便,不需要額外的配置,在算子中使用的變量實(shí)現(xiàn)Serializable接口即可,但是,Java序列化機(jī)制的效率不高,序列化速度慢并且序列化后的數(shù)據(jù)所占用的空間依然較大。
Kryo序列化機(jī)制比Java序列化機(jī)制性能提高10倍左右,Spark之所以沒(méi)有默認(rèn)使用Kryo作為序列化類(lèi)庫(kù),是因?yàn)樗恢С炙袑?duì)象的序列化,同時(shí)Kryo需要用戶(hù)在使用前注冊(cè)需要序列化的類(lèi)型,不夠方便,但從Spark 2.0.0版本開(kāi)始,簡(jiǎn)單類(lèi)型、簡(jiǎn)單類(lèi)型數(shù)組、字符串類(lèi)型的Shuffling RDDs 已經(jīng)默認(rèn)使用Kryo序列化方式了。 - 調(diào)節(jié)本地等待時(shí)長(zhǎng)
根據(jù)Spark的task分配算法,Spark希望task能夠運(yùn)行在它要計(jì)算的數(shù)據(jù)所在的節(jié)點(diǎn)上,但是這些節(jié)點(diǎn)可用的資源可能已經(jīng)用盡,此時(shí)Spark會(huì)等待一段時(shí)間,默認(rèn)3s,如果等待指定時(shí)間后仍然無(wú)法在指定節(jié)點(diǎn)運(yùn)行,那么會(huì)自動(dòng)降級(jí),嘗試將task分配到比較差的本地化級(jí)別所對(duì)應(yīng)的節(jié)點(diǎn)上,比如將task分配到離它要計(jì)算的數(shù)據(jù)比較近的一個(gè)節(jié)點(diǎn),然后進(jìn)行計(jì)算,如果當(dāng)前級(jí)別仍然不行,那么繼續(xù)降級(jí)。 - RDD持久化緩存
算子調(diào)優(yōu)
- mapPartitions/foreachPartition
mapPartitions和foreachPartition算子針對(duì)每個(gè)分區(qū)只進(jìn)行一次操作,相比map算子對(duì)所有數(shù)據(jù)都要執(zhí)行一次操作,效率更高,但是如果數(shù)據(jù)量比較大的時(shí)候,一旦內(nèi)存不足,容易出現(xiàn)OOM,也就是內(nèi)存溢出。 - filter與coalesce的配合使用
通常filter之后,每個(gè)分區(qū)內(nèi)數(shù)據(jù)不一致,即數(shù)據(jù)傾斜,如果還按照之前每個(gè)partition分配的task數(shù),就會(huì)出現(xiàn)運(yùn)算速度的差異。
這個(gè)時(shí)候我們可以對(duì)數(shù)據(jù)進(jìn)行重新分區(qū),如果是分區(qū)合并,最好采用coalesce算子;如果是分區(qū)分解,采用repartition算子。 - repartition解決SparkSQL低并行度問(wèn)題
SparkSQL的并行度不允許用戶(hù)自己指定,所以前面所說(shuō)的并行度調(diào)節(jié)對(duì)SparkSQL無(wú)效,我們可以使用repertition算子,對(duì)SparkSQL查詢(xún)出來(lái)的結(jié)果重新分區(qū),stage的并行度就等于你手動(dòng)重新分區(qū)之后的值。 - reduceByKey本地聚合
reduceByKey會(huì)進(jìn)行本地的map聚合,效率比groupByKey高,所有我們可以考慮將
groupByKey替換成reduceByKey。
Shuffle調(diào)優(yōu)
- 調(diào)節(jié)map端緩沖區(qū)大小
map端緩沖的默認(rèn)配置是32KB,導(dǎo)致溢寫(xiě)次數(shù)過(guò)多,對(duì)性能影響比較大,適當(dāng)增大map端緩沖區(qū)。 - 調(diào)節(jié)reduce端拉取數(shù)據(jù)緩沖區(qū)的大小
Spark Shuffle過(guò)程中,shuffle reduce task的buffer緩沖區(qū)大小決定了reduce task每次能夠緩沖的數(shù)據(jù)量,也就是每次能夠拉取的數(shù)據(jù)量,適當(dāng)增加緩沖區(qū)大小,可以減
少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù),進(jìn)而提升性能。 - 增加reduce端拉取數(shù)據(jù)重試次數(shù)
Spark Shuffle過(guò)程中,reduce task拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試。建議增加重試最大次數(shù),可以大幅提升穩(wěn)定性。 - 調(diào)節(jié)reduce端拉取數(shù)據(jù)等待間隔
Spark Shuffle過(guò)程中,reduce task拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試,在一次失敗后,會(huì)等待一定的時(shí)間間隔再進(jìn)行重試,可以通過(guò)加大間隔時(shí)長(zhǎng),以增加shuffle操作的穩(wěn)定性。 - 調(diào)節(jié)SortShuffle排序操作閾值
對(duì)于SortShuffleManager,如果shuffle reduce task的數(shù)量小于某一閾值則shuffle write過(guò)程中不會(huì)進(jìn)行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫(xiě)數(shù)據(jù),但是最后會(huì)將每個(gè)task產(chǎn)生的所有臨時(shí)磁盤(pán)文件都合并成一個(gè)文件,并會(huì)創(chuàng)建單獨(dú)的索引文件。當(dāng)你使用SortShuffleManager時(shí),如果的確不需要排序操作,那么建議將這個(gè)參數(shù)調(diào)大一些,大于shuffle read task的數(shù)量,那么此時(shí)map-side就不會(huì)進(jìn)行排序了,減少了排序的性能開(kāi)銷(xiāo),但是這種方式下,依然會(huì)產(chǎn)生大量的磁盤(pán)文件,因此shuffle write性能有待提高。
Spark數(shù)據(jù)傾斜
- 聚合原始數(shù)據(jù)
如果Spark作業(yè)的數(shù)據(jù)來(lái)源于Hive表,那么可以先在Hive表中對(duì)數(shù)據(jù)進(jìn)行聚合,比如說(shuō)將同一key對(duì)應(yīng)的所有value用一種特殊的格式拼接到一個(gè)字符串里去,這樣,一個(gè)key就只有一條數(shù)據(jù)了;之后,對(duì)一個(gè)key的所有value進(jìn)行處理時(shí),只需要進(jìn)行map操作即可,無(wú)需再進(jìn)行任何的shuffle操作。通過(guò)上述方式就避免了執(zhí)行shuffle操作,也就不可能會(huì)發(fā)生任何的數(shù)據(jù)傾斜問(wèn)題。
還可以通過(guò)增大粒度的方式,減少key的數(shù)量,key之間的數(shù)據(jù)量差異也有可
能會(huì)減少,由此可以減輕數(shù)據(jù)傾斜的現(xiàn)象和問(wèn)題。 - 提高shuffle操作中的reduce并行度
增加shuffle read task的數(shù)量,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task,從而讓每個(gè)task處理比原來(lái)更少的數(shù)據(jù),在一定程度上緩解數(shù)據(jù)傾斜。 - 使用隨機(jī)key實(shí)現(xiàn)雙重聚合
通過(guò)map算子給每個(gè)數(shù)據(jù)的key添加隨機(jī)數(shù)前綴,將原先一樣的key變成不一樣的key,然后進(jìn)行第一次聚合,這樣就可以讓原本被一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合;隨后,去除掉每個(gè)key的前綴,再次進(jìn)行聚合。 - 將reducejoin轉(zhuǎn)換為mapjoin
將較小RDD中的數(shù)據(jù)直接通過(guò)collect算子拉取到Driver端的內(nèi)存中來(lái),然后對(duì)其創(chuàng)建一個(gè)Broadcast變量;接著對(duì)另外一個(gè)RDD執(zhí)行map類(lèi)算子,在算子函數(shù)內(nèi),從Broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進(jìn)行比對(duì),如果連接key相同的話(huà),那么就將兩個(gè)RDD的數(shù)據(jù)用你需要的方式連接起來(lái)。