為什么需要調(diào)優(yōu)
在大數(shù)據(jù)計算領(lǐng)域,Spark已經(jīng)成為了越來越流行、越來越受歡迎的計算平臺之一。然而,通過Spark開發(fā)出高性能的大數(shù)據(jù)計算作業(yè),并不是那么簡單的。如果沒有對Spark作業(yè)進(jìn)行合理的調(diào)優(yōu),Spark作業(yè)的執(zhí)行速度可能會很慢,這樣就完全體現(xiàn)不出Spark作為一種快速大數(shù)據(jù)計算引擎的優(yōu)勢來。因此,想要用好Spark,就必須對其進(jìn)行合理的性能優(yōu)化。
Spark的性能調(diào)優(yōu)由開發(fā)調(diào)優(yōu)、資源調(diào)優(yōu)、數(shù)據(jù)傾斜調(diào)優(yōu)、shuffle調(diào)優(yōu)幾個部分組成。開發(fā)調(diào)優(yōu)和資源調(diào)優(yōu)是所有Spark作業(yè)都需要注意和遵循的一些基本原則,是高性能Spark作業(yè)的基礎(chǔ);數(shù)據(jù)傾斜調(diào)優(yōu),主要用一套完整的用來解決Spark作業(yè)數(shù)據(jù)傾斜的解決方案;shuffle調(diào)優(yōu),面向的是對Spark的原理有較深層次掌握的開發(fā)者。
性能優(yōu)化學(xué)習(xí)
學(xué)習(xí)Spark開發(fā)調(diào)優(yōu)和資源調(diào)優(yōu)比較好的方式是參考美團(tuán)點(diǎn)評技術(shù)團(tuán)隊的技術(shù)博客Spark性能調(diào)優(yōu)-基礎(chǔ)篇,這里已經(jīng)寫得非常全面了,學(xué)習(xí)完就可以掌握Spark性能調(diào)優(yōu)的基礎(chǔ)部分了??傮w可以分為兩個方面:
- 開發(fā)調(diào)優(yōu)
Spark性能優(yōu)化的第一步,就是要在開發(fā)Spark作業(yè)的過程中注意和應(yīng)用一些性能優(yōu)化的基本原則。開發(fā)調(diào)優(yōu),包括:RDD lineage設(shè)計、算子的合理使用、特殊操作的優(yōu)化等。在開發(fā)過程中,時時刻刻都應(yīng)該注意以上原則,并將這些原則根據(jù)具體的業(yè)務(wù)以及實(shí)際的應(yīng)用場景,靈活地運(yùn)用到自己的Spark作業(yè)中。- 避免創(chuàng)建重復(fù)的RDD
- 盡可能復(fù)用同一個RDD
- 對多次使用的RDD進(jìn)行持久化
- 盡量避免使用shuffle類算子
- 使用map-side預(yù)聚合的shuffle操作
- 使用高性能的算子
- 廣播大變量
- 使用Kryo優(yōu)化序列化性能
- 優(yōu)化數(shù)據(jù)結(jié)構(gòu)
- 資源調(diào)優(yōu)
在開發(fā)完Spark作業(yè)之后,就該為作業(yè)配置合適的資源了。Spark的資源參數(shù),基本都可以在spark-submit命令中作為參數(shù)設(shè)置。很多Spark初學(xué)者,通常不知道該設(shè)置哪些必要的參數(shù),以及如何設(shè)置這些參數(shù),最后就只能胡亂設(shè)置,甚至壓根兒不設(shè)置。資源參數(shù)設(shè)置的不合理,可能會導(dǎo)致沒有充分利用集群資源,作業(yè)運(yùn)行會極其緩慢;或者設(shè)置的資源過大,隊列沒有足夠的資源來提供,進(jìn)而導(dǎo)致各種異常。調(diào)整的主要是一系列的資源相關(guān)參數(shù)。
所謂的Spark資源參數(shù)調(diào)優(yōu),其實(shí)主要就是對Spark運(yùn)行過程中各個使用資源的地方,通過調(diào)節(jié)各種參數(shù),來優(yōu)化資源使用的效率,從而提升Spark作業(yè)的執(zhí)行性能。- num-executors
- executor-memory
- executor-cores
- driver-memory
- spark.default.parallelism
- spark.storage.memoryFraction
- spark.shuffle.memoryFraction
性能優(yōu)化實(shí)踐
以MovieLens數(shù)據(jù)集為基礎(chǔ),完成Spark的Map-Side Join和Reduce Side Join例子(過濾出評分高于4.0分的電影,要求顯示電影ID 電影名稱 電影分?jǐn)?shù)),并比較性能優(yōu)劣。應(yīng)該如何調(diào)整不同的spark-submit參數(shù)獲得最佳效果(運(yùn)行時間),并給出基于目前的運(yùn)行環(huán)境最優(yōu)參數(shù)設(shè)置方案。
查看數(shù)據(jù)
簡單查看一下所有表的結(jié)構(gòu)才能完成目標(biāo)任務(wù)。
所有評級都包含在“ratings.dat”文件中,并且位于格式如下:
| 用戶名 | MovieID | 評級 | 時間戳 |
|---|
- UserID的范圍在1到6040之間
- MovieID的范圍在1到3952之間
- 評級為5星級(僅限全星評級)
- 時間戳以秒為單位表示
- 每個用戶至少有20個評級
用戶信息位于“users.dat”文件中,如下所示
| 用戶名 | 性別 | 年齡 | 職業(yè) | 郵政編碼 |
|---|
性別用男性表示“M”,女性表示“F”表示
-
年齡選自以下范圍:
- 1:“18歲以下”
- 18:“18-24”
- 25:“25-34”
- 35:“35-44”
- 45:“45-49”
- 50:“50-55”
- 56:“56+”
職業(yè)選自0-20的數(shù)字,分別代表不同意義(具體意義可查看官網(wǎng))
電影信息位于文件“movies.dat”中,如下所示
| MovieID | 標(biāo)題 | 流派 |
|---|
- 流派是安裝分隔符(|)分開的,關(guān)鍵字符的拼接,如(Animation|Children's|Comedy)
- 由于意外重復(fù),某些MovieID與電影不對應(yīng)
條目和/或測試條目 - 電影大多是手動輸入的,因此可能存在錯誤和不一致
所以任務(wù)就是把3個表連接起來并按條件過濾,但是不同的連接方式在性能上會出現(xiàn)極大的差距。
Reduce Side Join
- 當(dāng)兩個文件/目錄中的數(shù)據(jù)非常大,難以將某一個存放到內(nèi)存中時,Reduce-side Join是一種解決思路。該算法需要通過Map和Reduce兩個階段完成,在Map階段,將key相同的記錄劃分給同一個Reduce Task(需標(biāo)記每條記錄的來源,便于在Reduce階段合并),在Reduce階段,對key相同的進(jìn)行合并。
- reduce-side-join 的缺陷在于會將key相同的數(shù)據(jù)發(fā)送到同一個partition中進(jìn)行運(yùn)算,大數(shù)據(jù)集的傳輸需要長時間的IO,同時任務(wù)并發(fā)度收到限制,還可能造成數(shù)據(jù)傾斜。
- Spark提供了Join算子,可以直接通過該算子實(shí)現(xiàn)reduce-side join,但要求RDD中的記錄必須是pair,即RDD[KEY, VALUE]
Reduce Side Join
簡單的講就是先把集群上key相同的數(shù)據(jù)拉取到一個節(jié)點(diǎn)(shuffle操作),為每個key的數(shù)據(jù)創(chuàng)建一個task進(jìn)行join連接操作,然后再把每個key連接的結(jié)果進(jìn)行匯總。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object ReduceJoin {
def main(args: Array[String]){
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ratingRDD = sc.textFile(args(0)) //rating.dat表
val moviesRDD = sc.textFile(args(1)) //movies.dat表
val startTime = System.currentTimeMillis()//開始時間
val ratingPair = ratingRDD.map { x => //將數(shù)據(jù)轉(zhuǎn)化為(K,V),K為movieID,V為平均分,直接join
val temp = x.split("::") //按照原始數(shù)據(jù)格式拆分為RDD格式
(temp(1),(temp(2).toFloat,1))
}.reduceByKey((x,y) => (x._1.toFloat+y._1.toFloat,x._2+y._2)).mapValues(x =>
x._1/x._2 //通過每部電影總分和評論的人數(shù)計算出平均分
).filter(x => x._2.toFloat >= 4.0)//過濾出分?jǐn)?shù)高于4分的電影
val moviePair = moviesRDD.map { x => //將它的類型轉(zhuǎn)化為(K,V),K為movieID,方便join操作
val temp = x.split("::")
(temp(0),temp(1))
}
//println(ratingPair.count()) //查看rating表是否成功過濾掉4.0以下的電影
//根據(jù)key(movieID)進(jìn)行連接,并將數(shù)據(jù)從KV形式格式化為原始格式
val result = moviePair.join(ratingPair).map(x => (x._1,x._2._1,x._2._2))
result.saveAsTextFile(args(2))
val endTime = System.currentTimeMillis()//結(jié)束時間
println("運(yùn)行時間(秒)"+(endTime-startTime)*0.001)
}
}
然后編寫相應(yīng)的運(yùn)行腳本,這里submit的時間隨便使用最簡單的幾個參數(shù),因為目的是對比map-side join和reduce-side join性能上的差異。
#!/bin/bash
hdfs dfs -rm -r /tmp/result
spark-submit --class ReduceJoin --master yarn-cluster /usr/tmp/untitled.jar /tmp/input/ratings.dat /tmp/input/movies.dat /tmp/result
提交任務(wù)后就可以去master的8088端口查看spark任務(wù)的執(zhí)行情況了,18088端口查看執(zhí)行記錄和詳細(xì)過程,看到Reduce-side join任務(wù)執(zhí)行情況如下:



[圖片上傳中...(image.png-94aeb0-1535712775641-0)]
這里首先需要明白job,stage,task的概念。簡單的講,我們提交一個作業(yè)到spark,spark首先根據(jù)提交作業(yè)中的action算子將作業(yè)分為若干個job。
之后對于每個job而言,Spark是根據(jù)shuffle類算子來進(jìn)行stage的劃分。如果我們的代碼中執(zhí)行了某個shuffle類算子(比如reduceByKey、join等),那么就會在該算子處,劃分出一個stage界限來??梢源笾吕斫鉃椋瑂huffle算子執(zhí)行之前的代碼會被劃分為一個stage,shuffle算子執(zhí)行以及之后的代碼會被劃分為下一個stage。
每個stage執(zhí)行一部分代碼片段,并為每個stage創(chuàng)建一批task,然后將這些task分配到各個Executor進(jìn)程中執(zhí)行。task是最小的計算單元,負(fù)責(zé)執(zhí)行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),只是每個task處理的數(shù)據(jù)不同而已。
這里只有一個job是因為只有一個action算子(savaAsTextFile),3個stage是因為reduceByKey屬于shuffle算子,還有未經(jīng)協(xié)同劃分的join也屬于shuffle算子,一起將job分成了3個stage,每個stage的2個task是因為RDD數(shù)據(jù)被存在了兩臺機(jī)器上。通過時間統(tǒng)計可以看到stage1是最消耗時間的,因為它要執(zhí)行reduceByKey的shuffle操作,會把key相同的數(shù)據(jù)集中到一個節(jié)點(diǎn),在這個時候數(shù)據(jù)是整個評論數(shù)據(jù)集。而后面求平均后過濾再join的時候數(shù)據(jù)已經(jīng)變得不是那么多了,所以這里的shuffle相對消耗時間較少(網(wǎng)絡(luò),IO少)。
Map-Side Join
- Map-side Join使用場景是一個大表和一個小表的連接操作,其中,“小表”是指文件足夠小,可以加載到內(nèi)存中。該算法可以將join算子執(zhí)行在Map端,無需經(jīng)歷shuffle和reduce等階段,因此效率非常高。
- 在Hadoop MapReduce中, map-side join是借助DistributedCache實(shí)現(xiàn)的。DistributedCache可以幫我們將小文件分發(fā)到各個節(jié)點(diǎn)的Task工作目錄下,這樣,我們只需在程序中將文件加載到內(nèi)存中(比如保存到Map數(shù)據(jù)結(jié)構(gòu)中),然后借助Mapper的迭代機(jī)制,遍歷另一個大表中的每一條記錄,并查找是否在小表中,如果在則輸出,否則跳過。
- 在Apache Spark中,同樣存在類似于DistributedCache的功能,稱為“廣播變量”(Broadcast variable)。其實(shí)現(xiàn)原理與DistributedCache非常類似,但提供了更多的數(shù)據(jù)/文件廣播算法,包括高效的P2P算法,該算法在節(jié)點(diǎn)數(shù)目非常多的場景下,效率遠(yuǎn)遠(yuǎn)好于DistributedCache這種基于HDFS共享存儲的方式。使用MapReduce DistributedCache時,用戶需要顯示地使用File API編寫程序從本地讀取小表數(shù)據(jù),而Spark則不用,它借助Scala語言強(qiáng)大的函數(shù)閉包特性,可以隱藏數(shù)據(jù)/文件廣播過程,讓用戶編寫程序更加簡單。

簡單的講就是把需要join的數(shù)據(jù)集中較小的那個數(shù)據(jù)集進(jìn)行廣播(因為在分布式系統(tǒng)應(yīng)用中,存儲數(shù)據(jù)都是用RDD對象,每個RDD對象中的數(shù)據(jù)都被劃分為多個分區(qū),每個節(jié)點(diǎn)都只持有部分分區(qū),也就是數(shù)據(jù)集的一部分,而廣播就是讓每個節(jié)點(diǎn)都持有被廣播數(shù)據(jù)的完整信息),然后在每個節(jié)點(diǎn)上(map端操作)將自己節(jié)點(diǎn)上持有的部分?jǐn)?shù)據(jù)和被廣播的表進(jìn)行連接即可。但是需要注意,因為那個小的數(shù)據(jù)集要被廣播,所以要求每個節(jié)點(diǎn)的內(nèi)存必須足夠存儲被廣播的那個數(shù)據(jù)集,不然就不能進(jìn)行map-side-join。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object MapJoin {
def main(args: Array[String]){
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ratingRDD = sc.textFile(args(0)) //rating.dat表
val moviesRDD = sc.textFile(args(1)) //movies.dat表
val startTime = System.currentTimeMillis()//開始時間
val ratingPair = ratingRDD.map { x => //將數(shù)據(jù)轉(zhuǎn)化為(K,V),K為movieID,V為平均分
val temp = x.split("::") //按照原始數(shù)據(jù)格式拆分為RDD格式
(temp(1),(temp(2).toFloat,1))
}.reduceByKey((x,y) => (x._1.toFloat+y._1.toFloat,x._2+y._2)).mapValues(x =>
x._1/x._2 //通過每部電影總分和評論的人數(shù)計算出平均分
).filter(x => x._2 >= 4.0)//過濾出分?jǐn)?shù)高于4分的電影
val moviePair = moviesRDD.map { x => //將數(shù)據(jù)轉(zhuǎn)化為(K,V),K為movieID,V為電影name
val temp = x.split("::")
(temp(0),temp(1))
}.collectAsMap//保存為map 進(jìn)行廣播
var moviesBroadcast = sc.broadcast(moviePair) //將電影數(shù)據(jù)集廣播,使每個節(jié)點(diǎn)都有一份完整的,就不需要shuffle
var result = ratingPair.map({ x =>
var movies = moviesBroadcast.value //取出廣播變量內(nèi)容值
var name = movies.getOrElse(x._1,"No") //取出當(dāng)前movieID的電影名字
(x._1,(name,x._2)) //
})
result.map(x => (x._1,x._2._1,x._2._2)).saveAsTextFile(args(2))//重新定義輸出格式并輸出
val endTime = System.currentTimeMillis()//結(jié)束時間
println("運(yùn)行時間(秒)"+(endTime-startTime)*0.001)
}
}




這里出現(xiàn)了兩個Job的原因是有兩個action算子(saveAsTextFile,collectAsMap)。在Job0中,只進(jìn)行了一個工作collectAsMap,是為了后面廣播方便。在Job1中,因為我們避免了耗時的join的Shuffle操作,自然就只有兩個stage了。
-
這里還有一個地方可以改進(jìn),在廣播的時候我有兩個選擇,廣播內(nèi)容為(電影ID,電影名字)的RDD和(電影ID,電影平均分),雖然兩個數(shù)據(jù)集的行數(shù)一樣,但是,電影名字的字節(jié)數(shù)遠(yuǎn)遠(yuǎn)大于平均分的字節(jié)數(shù),所以廣播(電影ID,電影平均分),最后再把分?jǐn)?shù)低于4.0的過濾掉,可以傳輸更小的字節(jié)數(shù),節(jié)約IO和網(wǎng)絡(luò)傳輸時耗,時間縮短了3s。
運(yùn)行結(jié)果分析
- 從整體運(yùn)行時間來看,Reduce-side Join和Map-side Join分別為27s和24s,其最大的原因就是Reduce-side Join有更多的Shuffle操作,增加網(wǎng)絡(luò)和IO時耗。
- Map-side Join的時耗主要是collectAsMap操作耗時,而Reduce-side Join的時耗主要是兩個shuffle操作。在這里主要是為了計算電影評分的平均分,使得Map-side Join不得不用了一次shuffle操作,如果只是單純的連接表,不需要求平均值的話,那么Map-side Join就不需要shuffle操作,會變得非常快了,這樣Map-side Join和Reduce-side Join的差異會更明顯了。
- 所以在開發(fā)過程中盡肯能的要去避免shuffle操作,用高性能的算子去代替。
Spark參數(shù)優(yōu)化
前面說了,除了可以在開發(fā)過程中進(jìn)行開發(fā)調(diào)優(yōu),還可以靈活的分配資源,使在現(xiàn)有資源上運(yùn)行達(dá)到最優(yōu)。以Map-side Join為例,使用3臺centos6.5虛擬機(jī),內(nèi)存分別為6 2 2,cup為4核心i5。

詳細(xì)原理見上圖。我們使用spark-submit提交一個Spark作業(yè)之后,這個作業(yè)就會啟動一個對應(yīng)的Driver進(jìn)程。根據(jù)你使用的部署模式(deploy-mode)不同,Driver進(jìn)程可能在本地啟動,也可能在集群中某個工作節(jié)點(diǎn)上啟動。Driver進(jìn)程本身會根據(jù)我們設(shè)置的參數(shù),占有一定數(shù)量的內(nèi)存和CPU core。而Driver進(jìn)程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的資源管理集群,美團(tuán)?大眾點(diǎn)評使用的是YARN作為資源管理集群)申請運(yùn)行Spark作業(yè)需要使用的資源,這里的資源指的就是Executor進(jìn)程。YARN集群管理器會根據(jù)我們?yōu)镾park作業(yè)設(shè)置的資源參數(shù),在各個工作節(jié)點(diǎn)上,啟動一定數(shù)量的Executor進(jìn)程,每個Executor進(jìn)程都占有一定數(shù)量的內(nèi)存和CPU core。
在申請到了作業(yè)執(zhí)行所需的資源之后,Driver進(jìn)程就會開始調(diào)度和執(zhí)行我們編寫的作業(yè)代碼了。Driver進(jìn)程會將我們編寫的Spark作業(yè)代碼分拆為多個stage,每個stage執(zhí)行一部分代碼片段,并為每個stage創(chuàng)建一批task,然后將這些task分配到各個Executor進(jìn)程中執(zhí)行。task是最小的計算單元,負(fù)責(zé)執(zhí)行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),只是每個task處理的數(shù)據(jù)不同而已。一個stage的所有task都執(zhí)行完畢之后,會在各個節(jié)點(diǎn)本地的磁盤文件中寫入計算中間結(jié)果,然后Driver就會調(diào)度運(yùn)行下一個stage。下一個stage的task的輸入數(shù)據(jù)就是上一個stage輸出的中間結(jié)果。如此循環(huán)往復(fù),直到將我們自己編寫的代碼邏輯全部執(zhí)行完,并且計算完所有的數(shù)據(jù),得到我們想要的結(jié)果為止。
Spark是根據(jù)shuffle類算子來進(jìn)行stage的劃分。如果我們的代碼中執(zhí)行了某個shuffle類算子(比如reduceByKey、join等),那么就會在該算子處,劃分出一個stage界限來。可以大致理解為,shuffle算子執(zhí)行之前的代碼會被劃分為一個stage,shuffle算子執(zhí)行以及之后的代碼會被劃分為下一個stage。因此一個stage剛開始執(zhí)行的時候,它的每個task可能都會從上一個stage的task所在的節(jié)點(diǎn),去通過網(wǎng)絡(luò)傳輸拉取需要自己處理的所有key,然后對拉取到的所有相同的key使用我們自己編寫的算子函數(shù)執(zhí)行聚合操作(比如reduceByKey()算子接收的函數(shù))。這個過程就是shuffle。
當(dāng)我們在代碼中執(zhí)行了cache/persist等持久化操作時,根據(jù)我們選擇的持久化級別的不同,每個task計算出來的數(shù)據(jù)也會保存到Executor進(jìn)程的內(nèi)存或者所在節(jié)點(diǎn)的磁盤文件中。
因此Executor的內(nèi)存主要分為三塊:第一塊是讓task執(zhí)行我們自己編寫的代碼時使用,默認(rèn)是占Executor總內(nèi)存的20%;第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進(jìn)行聚合等操作時使用,默認(rèn)也是占Executor總內(nèi)存的20%;第三塊是讓RDD持久化時使用,默認(rèn)占Executor總內(nèi)存的60%。
task的執(zhí)行速度是跟每個Executor進(jìn)程的CPU core數(shù)量有直接關(guān)系的。一個CPU core同一時間只能執(zhí)行一個線程。而每個Executor進(jìn)程上分配到的多個task,都是以每個task一條線程的方式,多線程并發(fā)運(yùn)行的。如果CPU core數(shù)量比較充足,而且分配到的task數(shù)量比較合理,那么通常來說,可以比較快速和高效地執(zhí)行完這些task線程。
以上就是Spark作業(yè)的基本運(yùn)行原理的說明,大家可以結(jié)合上圖來理解。理解作業(yè)基本原理,是我們進(jìn)行資源參數(shù)調(diào)優(yōu)的基本前提。
num-executors
- 參數(shù)說明:該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個Executor進(jìn)程來執(zhí)行。Driver在向YARN集群管理器申請資源時,YARN集群管理器會盡可能按照你的設(shè)置來在集群的各個工作節(jié)點(diǎn)上,啟動相應(yīng)數(shù)量的Executor進(jìn)程。這個參數(shù)非常之重要,如果不設(shè)置的話,默認(rèn)只會給你啟動少量的Executor進(jìn)程,此時你的Spark作業(yè)的運(yùn)行速度是非常慢的。
- 參數(shù)調(diào)優(yōu)建議:每個Spark作業(yè)的運(yùn)行一般設(shè)置50~100個左右的Executor進(jìn)程比較合適,設(shè)置太少或太多的Executor進(jìn)程都不好。設(shè)置的太少,無法充分利用集群資源;設(shè)置的太多的話,大部分隊列可能無法給予充分的資源。
之前沒有手動設(shè)置這個參數(shù),可以看到spark啟動了2個executor進(jìn)程。我這里只有3臺虛擬機(jī),嘗試就設(shè)置為3了。發(fā)現(xiàn)性能降低了很多,估計是因為我的數(shù)據(jù)太小了,節(jié)點(diǎn)多了就加大了shuffle的消耗,所以設(shè)置為1,發(fā)現(xiàn)更快了,所以這真的是因為數(shù)據(jù)太小,數(shù)據(jù)傳輸?shù)臅r間代價大于數(shù)據(jù)處理的時間代價。平常大數(shù)據(jù)情況下這個參數(shù)應(yīng)該根據(jù)經(jīng)驗設(shè)置為50-100。


executor-memory
- 參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進(jìn)程的內(nèi)存。Executor內(nèi)存的大小,很多時候直接決定了Spark作業(yè)的性能,而且跟常見的JVM OOM異常,也有直接的關(guān)聯(lián)。
- 參數(shù)調(diào)優(yōu)建議:每個Executor進(jìn)程的內(nèi)存設(shè)置4G~8G較為合適。但是這只是一個參考值,具體的設(shè)置還是得根據(jù)不同部門的資源隊列來定。可以看看自己團(tuán)隊的資源隊列的最大內(nèi)存限制是多少,num-executors乘以executor-memory,是不能超過隊列的最大內(nèi)存量的。此外,如果你是跟團(tuán)隊里其他人共享這個資源隊列,那么申請的內(nèi)存量最好不要超過資源隊列最大總內(nèi)存的1/3 ~1/2,避免你自己的Spark作業(yè)占用了隊列所有的資源,導(dǎo)致別的同學(xué)的作業(yè)無法運(yùn)行。

executor-cores
- 參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進(jìn)程的CPU core數(shù)量。這個參數(shù)決定了每個Executor進(jìn)程并行執(zhí)行task線程的能力。因為每個CPU core同一時間只能執(zhí)行一個task線程,因此每個Executor進(jìn)程的CPU core數(shù)量越多,越能夠快速地執(zhí)行完分配給自己的所有task線程。
- 參數(shù)調(diào)優(yōu)建議:Executor的CPU core數(shù)量設(shè)置為2~4個較為合適。同樣得根據(jù)不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core限制是多少,再依據(jù)設(shè)置的Executor數(shù)量,來決定每個Executor進(jìn)程可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個隊列,那么num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學(xué)的作業(yè)運(yùn)行。
由于我的集群是運(yùn)行在虛擬機(jī)上的,所以所有節(jié)點(diǎn)共享windows的cpu,即每個節(jié)點(diǎn)相當(dāng)于有4個cpu,所以設(shè)置為4,發(fā)現(xiàn)報錯了,應(yīng)該是資源不足,設(shè)置為2也報錯,應(yīng)該是虛擬機(jī)的cpu限制機(jī)制吧,所以只能設(shè)置為1或者默認(rèn)了。
driver-memory
- 參數(shù)說明:該參數(shù)用于設(shè)置Driver進(jìn)程的內(nèi)存。
- 參數(shù)調(diào)優(yōu)建議:Driver的內(nèi)存通常來說不設(shè)置,或者設(shè)置1G左右應(yīng)該就夠了。唯一需要注意的一點(diǎn)是,如果需要使用collect算子將RDD的數(shù)據(jù)全部拉取到Driver上進(jìn)行處理,那么必須確保Driver的內(nèi)存足夠大,否則會出現(xiàn)OOM內(nèi)存溢出的問題。
這個參數(shù)只要保證進(jìn)行collect算子的時候,所有數(shù)據(jù)全部集中到Driver進(jìn)程不會oom就行了,我這里數(shù)據(jù)相當(dāng)小就不用設(shè)置了。
spark.default.parallelism
- 參數(shù)說明:該參數(shù)用于設(shè)置每個stage的默認(rèn)task數(shù)量。這個參數(shù)極為重要,如果不設(shè)置可能會直接影響你的Spark作業(yè)性能。
- 參數(shù)調(diào)優(yōu)建議:Spark作業(yè)的默認(rèn)task數(shù)量為500~1000個較為合適。很多同學(xué)常犯的一個錯誤就是不去設(shè)置這個參數(shù),那么此時就會導(dǎo)致Spark自己根據(jù)底層HDFS的block數(shù)量來設(shè)置task的數(shù)量,默認(rèn)是一個HDFS block對應(yīng)一個task。通常來說,Spark默認(rèn)設(shè)置的數(shù)量是偏少的(比如就幾十個task),如果task數(shù)量偏少的話,就會導(dǎo)致你前面設(shè)置好的Executor的參數(shù)都前功盡棄。試想一下,無論你的Executor進(jìn)程有多少個,內(nèi)存和CPU有多大,但是task只有1個或者10個,那么90%的Executor進(jìn)程可能根本就沒有task執(zhí)行,也就是白白浪費(fèi)了資源!因此Spark官網(wǎng)建議的設(shè)置原則是,設(shè)置該參數(shù)為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數(shù)量為300個,那么設(shè)置1000個task是可以的,此時可以充分地利用Spark集群的資源。
首先隨便查看一個stage的信息,發(fā)現(xiàn)每個executor的task為1(因為文件很小只有一個hadoop block),只有一個線程完全沒有并發(fā),效率很低。根據(jù)num-executors * executor-cores的2~3倍,我這里就設(shè)置為2,和默認(rèn)相比進(jìn)步了1s。


spark.storage.memoryFraction
- 參數(shù)說明:該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認(rèn)是0.6。也就是說,默認(rèn)Executor 60%的內(nèi)存,可以用來保存持久化的RDD數(shù)據(jù)。根據(jù)你選擇的不同的持久化策略,如果內(nèi)存不夠時,可能數(shù)據(jù)就不會持久化,或者數(shù)據(jù)會寫入磁盤。
- 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中,有較多的RDD持久化操作,該參數(shù)的值可以適當(dāng)提高一些,保證持久化的數(shù)據(jù)能夠容納在內(nèi)存中。避免內(nèi)存不夠緩存所有的數(shù)據(jù),導(dǎo)致數(shù)據(jù)只能寫入磁盤中,降低了性能。但是如果Spark作業(yè)中的shuffle類操作比較多,而持久化操作比較少,那么這個參數(shù)的值適當(dāng)降低一些比較合適。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢(通過spark web ui可以觀察到作業(yè)的gc耗時),意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個參數(shù)的值。


spark.shuffle.memoryFraction
- 參數(shù)說明:該參數(shù)用于設(shè)置shuffle過程中一個task拉取到上個stage的task的輸出后,進(jìn)行聚合操作時能夠使用的Executor內(nèi)存的比例,默認(rèn)是0.2。也就是說,Executor默認(rèn)只有20%的內(nèi)存用來進(jìn)行該操作。shuffle操作在進(jìn)行聚合時,如果發(fā)現(xiàn)使用的內(nèi)存超出了這個20%的限制,那么多余的數(shù)據(jù)就會溢寫到磁盤文件中去,此時就會極大地降低性能。
- 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的內(nèi)存占比,提高shuffle操作的內(nèi)存占比比例,避免shuffle過程中數(shù)據(jù)過多時內(nèi)存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢,意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個參數(shù)的值。

序列化算法
在Spark的架構(gòu)中,在網(wǎng)絡(luò)中傳遞的或者緩存在內(nèi)存、硬盤中的對象需要進(jìn)行序列化操作,序列化的作用主要是利用時間換空間:
- 分發(fā)給Executor上的Task
- 需要緩存的RDD(前提是使用序列化方式緩存)
- 廣播變量
- Shuffle過程中的數(shù)據(jù)緩存
- 使用receiver方式接收的流數(shù)據(jù)緩存
- 算子函數(shù)中使用的外部變量
上面的六種數(shù)據(jù),通過Java序列化(默認(rèn)的序列化方式)形成一個二進(jìn)制字節(jié)數(shù)組,大大減少了數(shù)據(jù)在內(nèi)存、硬盤中占用的空間,減少了網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)拈_銷,并且可以精確的推測內(nèi)存使用情況,降低GC頻率。
但是在序列化和反序列化的過程中,會消耗大量的時間,所以選擇一個好的序列化算法很重要。目前Spark使用Kryo比Java默認(rèn)的序列化快10倍。具體原理可見Kryo參考,這里只需要添加配置使用Kryo即可。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") //使用Kryo序列化庫
運(yùn)行時間變成了17s,再去查看序列化和反序列化的時耗:


總結(jié)
- 首先要從根源進(jìn)行優(yōu)化,也就是編寫程序的時候,比如注意避免創(chuàng)建重復(fù)RDD、持久化常使用的RDD等編碼方式。
- 編碼過程中盡量少的出現(xiàn)shuffle操作,用其它操作代替。
- 序列化和反序列化使用得非常多,所以使用Kryo比默認(rèn)快10倍是非常重要的。
- 對于資源而言,沒有絕對的配置方法,首先要理解每個資源參數(shù)的意義和使用經(jīng)驗,再根據(jù)自己的集群狀態(tài)來做調(diào)整。

