Spark Core 性能調(diào)優(yōu)總結(jié)

使用 mapPartitions,按每個分區(qū)計算結(jié)果

如果每條記錄的開銷太大,例:

rdd.map{x=>conn=getDBConn;conn.write(x.toString);conn.close}

則可以使用MapPartitions,按每個分區(qū)計算結(jié)果,如

rdd.mapPartitions(records => conn.getDBConn;for(item <- records)

write(item.toString); conn.close)

使用mapPartitions可以更靈活地操作數(shù)據(jù),例如對一個很大的數(shù)據(jù)求TopN,當(dāng)N不是很大時,可以先使用mapPartitions對每個partition求TopN,collect結(jié)果到本地之后再做排序取TopN。這樣相比直接對全量數(shù)據(jù)做排序取TopN效率要高很多。

使用coalesce調(diào)整分片的數(shù)量

coalesce可以調(diào)整分片的數(shù)量。coalesce函數(shù)有兩個參數(shù):

coalesce(numPartitions: Int, shuffle: Boolean = false)

當(dāng)shuffle為true的時候,函數(shù)作用與repartition(numPartitions:?Int)相同,會將數(shù)據(jù)通過Shuffle的方式重新分區(qū);當(dāng)shuffle為false的時候,則只是簡單的將父RDD的多個partition合并到同一個task進(jìn)行計算,shuffle為false時,如果numPartitions大于父RDD的切片數(shù),那么分區(qū)不會重新調(diào)整。

遇到下列場景,可選擇使用coalesce算子:

● 當(dāng)之前的操作有很多filter時,使用coalesce減少空運(yùn)行的任務(wù)數(shù)量。此時使用coalesce(numPartitions,

false),numPartitions小于父RDD切片數(shù)。

● 當(dāng)輸入切片個數(shù)太大,導(dǎo)致程序無法正常運(yùn)行時使用。

● 當(dāng)任務(wù)數(shù)過大時候Shuffle壓力太大導(dǎo)致程序掛住不動,或者出現(xiàn)linux資源受限的問題。此時需要對數(shù)據(jù)重新進(jìn)行分區(qū),使用coalesce(numPartitions,?true)。

localDir 配置

Spark的Shuffle過程需要寫本地磁盤,Shuffle是Spark性能的瓶頸,I/O是Shuffle的瓶頸。配置多個磁盤則可以并行的把數(shù)據(jù)寫入磁盤。如果節(jié)點(diǎn)中掛載多個磁盤,則在每個磁盤配置一個Spark的localDir,這將有效分散Shuffle文件的存放,提高磁盤I/O的效率。如果只有一個磁盤,配置了多個目錄,性能提升效果不明顯。

collect 小數(shù)據(jù)

大數(shù)據(jù)量不適用collect操作。

collect操作會將Executor的數(shù)據(jù)發(fā)送到Driver端,因此使用collect前需要確保Driver端內(nèi)存足夠,以免Driver進(jìn)程發(fā)生OutOfMemory異常。當(dāng)不確定數(shù)據(jù)量大小時,可使用saveAsTextFile等操作把數(shù)據(jù)寫入HDFS中。只有在能夠大致確定數(shù)據(jù)大小且driver內(nèi)存充足的時候,才能使用collect。

使用reduceByKey

reduceByKey會在Map端做本地聚合,使得Shuffle過程更加平緩,而groupByKey等Shuffle操作不會在Map端做聚合。因此能使用reduceByKey的地方盡量使用該算子,避免出現(xiàn)groupByKey().map(x=>(x._1,x._2.size))這類實現(xiàn)方式。

廣播 map 代替數(shù)組

當(dāng)每條記錄需要查表,如果是Driver端用廣播方式傳遞的數(shù)據(jù),數(shù)據(jù)結(jié)構(gòu)優(yōu)先采用set/map而不是Iterator,因為Set/Map的查詢速率接近O(1),而Iterator是O(n)。

數(shù)據(jù)傾斜

當(dāng)數(shù)據(jù)發(fā)生傾斜(某一部分?jǐn)?shù)據(jù)量特別大),雖然沒有GC(Gabage?Collection,垃圾回收),但是task執(zhí)行時間嚴(yán)重不一致。

● 需要重新設(shè)計key,以更小粒度的key使得task大小合理化。?

● 修改并行度。

優(yōu)化數(shù)據(jù)結(jié)構(gòu)

● 把數(shù)據(jù)按列存放,讀取數(shù)據(jù)時就可以只掃描需要的列。

● 使用 Hash Shuffle 時,通過設(shè)置spark.shuffle.consolidateFiles為true,來合并shuffle中間文件,減少shuffle文件的數(shù)量,減少文件IO操作以提升性能。最終文件數(shù)為reduce?tasks數(shù)目。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容