使用 mapPartitions,按每個分區(qū)計算結(jié)果
如果每條記錄的開銷太大,例:
rdd.map{x=>conn=getDBConn;conn.write(x.toString);conn.close}
則可以使用MapPartitions,按每個分區(qū)計算結(jié)果,如
rdd.mapPartitions(records => conn.getDBConn;for(item <- records)
write(item.toString); conn.close)
使用mapPartitions可以更靈活地操作數(shù)據(jù),例如對一個很大的數(shù)據(jù)求TopN,當(dāng)N不是很大時,可以先使用mapPartitions對每個partition求TopN,collect結(jié)果到本地之后再做排序取TopN。這樣相比直接對全量數(shù)據(jù)做排序取TopN效率要高很多。
使用coalesce調(diào)整分片的數(shù)量
coalesce可以調(diào)整分片的數(shù)量。coalesce函數(shù)有兩個參數(shù):
coalesce(numPartitions: Int, shuffle: Boolean = false)
當(dāng)shuffle為true的時候,函數(shù)作用與repartition(numPartitions:?Int)相同,會將數(shù)據(jù)通過Shuffle的方式重新分區(qū);當(dāng)shuffle為false的時候,則只是簡單的將父RDD的多個partition合并到同一個task進(jìn)行計算,shuffle為false時,如果numPartitions大于父RDD的切片數(shù),那么分區(qū)不會重新調(diào)整。
遇到下列場景,可選擇使用coalesce算子:
● 當(dāng)之前的操作有很多filter時,使用coalesce減少空運(yùn)行的任務(wù)數(shù)量。此時使用coalesce(numPartitions,
false),numPartitions小于父RDD切片數(shù)。
● 當(dāng)輸入切片個數(shù)太大,導(dǎo)致程序無法正常運(yùn)行時使用。
● 當(dāng)任務(wù)數(shù)過大時候Shuffle壓力太大導(dǎo)致程序掛住不動,或者出現(xiàn)linux資源受限的問題。此時需要對數(shù)據(jù)重新進(jìn)行分區(qū),使用coalesce(numPartitions,?true)。
localDir 配置
Spark的Shuffle過程需要寫本地磁盤,Shuffle是Spark性能的瓶頸,I/O是Shuffle的瓶頸。配置多個磁盤則可以并行的把數(shù)據(jù)寫入磁盤。如果節(jié)點(diǎn)中掛載多個磁盤,則在每個磁盤配置一個Spark的localDir,這將有效分散Shuffle文件的存放,提高磁盤I/O的效率。如果只有一個磁盤,配置了多個目錄,性能提升效果不明顯。
collect 小數(shù)據(jù)
大數(shù)據(jù)量不適用collect操作。
collect操作會將Executor的數(shù)據(jù)發(fā)送到Driver端,因此使用collect前需要確保Driver端內(nèi)存足夠,以免Driver進(jìn)程發(fā)生OutOfMemory異常。當(dāng)不確定數(shù)據(jù)量大小時,可使用saveAsTextFile等操作把數(shù)據(jù)寫入HDFS中。只有在能夠大致確定數(shù)據(jù)大小且driver內(nèi)存充足的時候,才能使用collect。
使用reduceByKey
reduceByKey會在Map端做本地聚合,使得Shuffle過程更加平緩,而groupByKey等Shuffle操作不會在Map端做聚合。因此能使用reduceByKey的地方盡量使用該算子,避免出現(xiàn)groupByKey().map(x=>(x._1,x._2.size))這類實現(xiàn)方式。
廣播 map 代替數(shù)組
當(dāng)每條記錄需要查表,如果是Driver端用廣播方式傳遞的數(shù)據(jù),數(shù)據(jù)結(jié)構(gòu)優(yōu)先采用set/map而不是Iterator,因為Set/Map的查詢速率接近O(1),而Iterator是O(n)。
數(shù)據(jù)傾斜
當(dāng)數(shù)據(jù)發(fā)生傾斜(某一部分?jǐn)?shù)據(jù)量特別大),雖然沒有GC(Gabage?Collection,垃圾回收),但是task執(zhí)行時間嚴(yán)重不一致。
● 需要重新設(shè)計key,以更小粒度的key使得task大小合理化。?
● 修改并行度。
優(yōu)化數(shù)據(jù)結(jié)構(gòu)
● 把數(shù)據(jù)按列存放,讀取數(shù)據(jù)時就可以只掃描需要的列。
● 使用 Hash Shuffle 時,通過設(shè)置spark.shuffle.consolidateFiles為true,來合并shuffle中間文件,減少shuffle文件的數(shù)量,減少文件IO操作以提升性能。最終文件數(shù)為reduce?tasks數(shù)目。