1. 分配更多的資源
首選,一定范圍內(nèi),增加資源與性能的提升成正比,在資源最大化后考慮后面的調優(yōu)
1.1 分配哪些資源
executor‐memory、executor‐cores、driver‐memory
1.2 在哪里可以設置這些資源
在spark-submit shell 腳本進行集群資源的分配
1.3資源最大化如何實現(xiàn)
第一種情況:標準模式 standalone模式下
? ? 比如共有20個worker節(jié)點,每個節(jié)點的配置是8g內(nèi)存,10個cpu
那么實際分配資源的時候可以給20個excutor,每個excutor的內(nèi)存8g,每個excutor的使用cpu為10
?
第二種情況:Yarn
? ? 先計算出yarn集群的所有大小,比如一共500g內(nèi)存,100個cpu; 這個時候可以分配的最大資源,比如給定50個executor、每個executor的內(nèi)存大小10g,每個executor使用的cpu 數(shù)為2。
2. 提高并行度
并行度:各個stage中的task的數(shù)量,也就代表了spark各個階段stage的并行度,我們說的RDD并行計算是在各個stage中并行計算的,而stage和stage之間是串行的,也就是又先后順序的。
?
并行度的提高應該結合當前資源的情況進行設置。比如,50個excutor,每個excutor內(nèi)存10g,3個cpu core數(shù),那么task應該至少為150個,少于150個就會造成資源浪費,一般每個core分配2-3個task比較合理(也就是2-3個partition)
2.1如何提高并行度
2.2.1增加task數(shù)量(sparkcore)
第一種方式
通過設置參數(shù)spark.default.parallelism
默認是沒有值的,如果設置了值為10,它會在shuffle的過程才會起作用。 比如
val rdd2 = rdd1.reduceByKey(_+_) 此時rdd2的分區(qū)數(shù)就是10,rdd1的分區(qū)數(shù)不受這個參數(shù)的影響。
可以通過在構建SparkConf對象的時候設置,例如:new SparkConf().set("spark.defalut.parallelism","500")
第二種方式
通過給RDD進行重新分區(qū)間接實現(xiàn)增加task的操作(因為一個RDD會有多個分區(qū),每個分區(qū)對應一個分片,每個分片會對應一個task,增加partition數(shù)量就是間接增加了task數(shù)量)(可以想象成mr中的一個文件就是一個RDD)
2.2.2提高sparksql運行的task數(shù)量
通過設置參數(shù) spark.shuffle.partitions=500 默認是200
3. RDD的重用和持久化
rdd1-->rdd2-->rdd3-->rdd4
rdd1-->rdd2-->rdd3-->rdd5
可以把多次使用的RDD進行持久化(rdd3),避免重復計算
3.1如何進行持久化
可以使用cache或者persist方法
cache是persist中的一種,將RDD緩存到內(nèi)存,參數(shù)
persist中存在多個緩存級別以及多個緩存份數(shù)(磁盤內(nèi)存)
3.2rdd持久化的時可以采用序列
如果因為內(nèi)存無法存儲公共的rdd,導致oom,那么就可以采用將RDD序列化到內(nèi)存中,序列化后會大大減少內(nèi)存空間。
如果序列化后內(nèi)存還是oom了,只能將RDD緩存到磁盤
序列化的缺點,獲取數(shù)據(jù)時要進行反序列化,優(yōu)點:可以減少占用的內(nèi)存空間和提高網(wǎng)絡傳輸?shù)乃俾?/p>
3.3 廣播變量的使用
spark中分布式執(zhí)行代碼需要傳遞到各個excutor中的task上運行。對于固定的數(shù)據(jù),每次都要從driver端發(fā)送到各個task中去,造成大量的網(wǎng)絡傳輸消耗??梢蕴崆皩⑦@些數(shù)據(jù)廣播到每個excutor中,每個excutor中的task共用一份數(shù)據(jù)。
廣播變量最初的時候會在driver端保存一份副本,task在運行時,會在所在的excutor對應的blockmanager中,嘗試獲取廣播變量,如果沒有就會到driver中加載數(shù)據(jù)并存到blockmanager中
executor的BlockManager除了從driver上拉取,也可能從其他節(jié)點的BlockManager上拉取變量副本,網(wǎng)絡距離 越近越好。
3.3.1如何使用廣播變量
例如: (1) 通過sparkContext的broadcast方法把數(shù)據(jù)轉換成廣播變量,類型為Broadcast, val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6)) (2) 然后executor上的BlockManager就可以拉取該廣播變量的副本獲取具體的數(shù)據(jù)。 獲取廣播變量中的值可以通過調用其value方法 val array: Array[Int] = broadcastArray.value
4.降低cache操作的內(nèi)存占比
jvm內(nèi)存不充足的時候,會出現(xiàn)的問題: (1)、頻繁minor gc,也會導致頻繁spark停止工作 (2)、老年代囤積大量活躍對象(短生命周期的對象),導致頻繁full gc,full gc時間很長,短則數(shù)十秒,長則數(shù)分 鐘,甚至數(shù)小時??赡軐е聅park長時間停止工作。 (3)、頻繁gc會嚴重影響spark的性能和運行的速度。
cache操作的內(nèi)存占比為堆內(nèi)存的0.6 也就是百分之60,可以適當調節(jié),降低該值, 修改spark.storage.memoryFraction參數(shù) 可以設置為0.5‐‐‐>0.4‐‐‐‐‐‐>0.3 例如: new SparkConf().set("spark.storage.memoryFraction","0.4") 把cache操作的內(nèi)存占比修改為堆內(nèi)存的百分之40,讓堆內(nèi)存可以容納更多的對象,減少gc的頻率,提高spark任務運行 的速度和性能。
5.數(shù)據(jù)傾斜調優(yōu)
?5.1數(shù)據(jù)傾斜發(fā)生時的現(xiàn)象
1.有些task很快運行完,有些要一兩個小時
2.原本正常執(zhí)行的spark作業(yè),某天突然oom(內(nèi)存溢出)異常,這種情況比較少見
5.2數(shù)據(jù)傾斜發(fā)生的原理
shuffle完成后會對相同的key進行拉取到某個節(jié)點上的task進行處理,如果某個key對應的數(shù)據(jù)量特別大的話,就會發(fā)生數(shù)據(jù)傾斜
5.3如何定位導致數(shù)據(jù)傾斜的代碼
數(shù)據(jù)傾斜只會發(fā)生在shuffle過程中,常用的并且可能會觸發(fā)shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、 repartition等出現(xiàn)數(shù)據(jù)傾斜時,可能就是你的代碼中使用了這些算子中的某一個所導致的。
???? 5.3.1某個task執(zhí)行特別慢的情況
???? 5.3.2某個task莫名其妙內(nèi)存溢出的情況
數(shù)據(jù)傾斜的解決方案
解決方案一:使用Hive ETL預處理數(shù)據(jù)
解決方案二:過濾少數(shù)導致傾斜的key
解決方案三:提高shuffle操作的并行度
解決方案四:兩階段聚合(局部聚合+全局聚合)
解決方案五:將reduce join轉為map join
解決方案六:采樣傾斜key并分拆join操作