spark常見問題分析

分析

spark常見的問題不外乎oom:

我們首先看一下Spark 的內(nèi)存模型:

Spark在一個Executor中的內(nèi)存分為三塊,一塊是execution內(nèi)存,一塊是storage內(nèi)存,一塊是other內(nèi)存。

execution內(nèi)存是執(zhí)行內(nèi)存,文檔中說join,aggregate都在這部分內(nèi)存中執(zhí)行,shuffle的數(shù)據(jù)也會先緩存在這個內(nèi)存中,滿了再寫入磁盤,能夠減少IO。其實map過程也是在這個內(nèi)存中執(zhí)行的。

storage內(nèi)存是存儲broadcast,cache,persist數(shù)據(jù)的地方。

other內(nèi)存是程序執(zhí)行時預(yù)留給自己的內(nèi)存。

OOM的問題通常出現(xiàn)在execution這塊內(nèi)存中,因為storage這塊內(nèi)存在存放數(shù)據(jù)滿了之后,會直接丟棄內(nèi)存中舊的數(shù)據(jù),對性能有影響但是不會有OOM的問題。

Spark中的OOM問題不外乎以下三種情況

map執(zhí)行中內(nèi)存溢出
shuffle后內(nèi)存溢出
driver內(nèi)存溢出
前兩種情況發(fā)生在executor中,最后情況發(fā)生在driver中

我們針對每種情況具體分析

Driver heap:

Driver heap OOM的三大原因:
(1).用戶在Driver端口生成大對象, 比如創(chuàng)建了一個大的集合數(shù)據(jù)結(jié)構(gòu)
解決思路:
1.1. 考慮將該大對象轉(zhuǎn)化成Executor端加載. 例如調(diào)用sc.textFile/sc.hadoopFile等
1.2. 如若無法避免, 自我評估該大對象占用的內(nèi)存, 相應(yīng)增加driver-memory的值
(2).從Executor端收集數(shù)據(jù)回Driver端
比如Collect. 某個Stage中Executor端發(fā)回的所有數(shù)據(jù)量不能超過spark.driver.maxResultSize,默認1g. 如果用戶增加該值, 請對應(yīng)增加2delta increase到Driver Memory, resultSize該值只是數(shù)據(jù)序列化之后的Size, 如果是Collect的操作會將這些數(shù)據(jù)反序列化收集, 此時真正所需內(nèi)存需要膨脹2-5倍, 甚至10倍.
解決思路:
2.1. 本身不建議將大的數(shù)據(jù)從Executor端, collect回來. 建議將Driver端對collect回來的數(shù)據(jù)所做的操作, 轉(zhuǎn)化成Executor端RDD操作.
2.2. 如若無法避免, 自我評collect需要的內(nèi)存, 相應(yīng)增加driver-memory的值
(3)Spark本身框架的數(shù)據(jù)消耗.
現(xiàn)在在Spark1.6版本之后主要由Spark UI數(shù)據(jù)消耗, 取決于作業(yè)的累計Task個數(shù).
解決思路:
3.1. 考慮縮小大numPartitions的Stage的partition個數(shù), 例如從HDFS load的partitions一般自動計算, 但是后續(xù)用戶的操作中做了過濾等操作已經(jīng)大大減少數(shù)據(jù)量, 此時可以縮小Parititions。
3.2. 通過參數(shù)spark.ui.retainedStages(默認1000)/spark.ui.retainedJobs(默認1000)控制.
3.3. 實在沒法避免, 相應(yīng)增加內(nèi)存.

Executor heap:

map過程產(chǎn)生大量對象導(dǎo)致內(nèi)存溢出:
數(shù)據(jù)傾斜導(dǎo)致內(nèi)存溢出:
coalesce調(diào)用導(dǎo)致內(nèi)存溢出:
shuffle后內(nèi)存溢出:
(1) reduce oom?
原因:reduce task 去map端獲取數(shù)據(jù),reduce一邊拉取數(shù)據(jù)一邊聚合,reduce端有一塊聚合內(nèi)存(executor memory * 0.2),也就是這塊內(nèi)存不夠
解決方法:
1.增加reduce 聚合操作的內(nèi)存的比例
2.增加Executor memory的大小 --executor-memory 5G
3.減少reduce task每次拉取的數(shù)據(jù)量 設(shè)置spak.reducer.maxSizeInFlight 24m, 拉取的次數(shù)就多了,因此建立連接的次數(shù)增多,有可能會連接不上(正好趕上map task端進行GC)

(2) shuffle file cannot find or executor lost?
解決方法:
當出現(xiàn)以下異常時:shuffle file cannot find,executor lost、task lost,out of memory,可以調(diào)節(jié)

(3) Executor的堆外內(nèi)存大小
問題原因:
1.map task所運行的executor內(nèi)存不足,導(dǎo)致executor
掛掉了,executor里面的BlockManager就掛掉了,導(dǎo)致ConnectionManager不能用,也就無法建立連接,從而不能拉取數(shù)據(jù)
2.executor并沒有掛掉
2.1 BlockManage之間的連接失?。╩ap task所運行的executor正在GC)
2.2建立連接成功,map task所運行的executor正在GC
3.reduce task向Driver中的MapOutputTracker獲取shuffle file位置的時候出現(xiàn)了問題
解決方法:
1.增大Executor內(nèi)存(即堆內(nèi)內(nèi)存) ,申請的堆外內(nèi)存也會隨之增加--executor-memory 5G
2.增大堆外內(nèi)存 --conf spark.yarn.executor.memoryoverhead 2048M
--conf spark.executor.memoryoverhead 2048M
(默認申請的堆外內(nèi)存是Executor內(nèi)存的10%,真正處理大數(shù)據(jù)的時候,這里都會出現(xiàn)問題,導(dǎo)致spark作業(yè)反復(fù)崩潰,無法運行;此時就會去調(diào)節(jié)這個參數(shù),到至少1G(1024M),甚至說2G、4G)

注:在shuffle過程中可調(diào)的參數(shù):

spark.shuffle.file.buffer
默認值:32k
參數(shù)說明:該參數(shù)用于設(shè)置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數(shù)據(jù)寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。
調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當增加這個參數(shù)的大小(比如64k),從而減少shuffle write過程中溢寫磁盤文件的次數(shù),也就可以減少磁盤IO次數(shù),進而提升性能。在實踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。
注:shuffle中有以下操作會使用到該參數(shù):map端:spill、合并文件時

spark.reducer.maxSizeInFlight
默認值:48m
參數(shù)說明:該參數(shù)用于設(shè)置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數(shù)據(jù)。
調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當增加這個參數(shù)的大小(比如96m),從而減少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù),進而提升性能。在實踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。
錯誤:reduce oom
reduce task去map拉數(shù)據(jù),reduce 一邊拉數(shù)據(jù)一邊聚合 reduce段有一塊聚合內(nèi)存(executor memory * 0.2)
解決辦法:1、增加reduce 聚合的內(nèi)存的比例 設(shè)置spark.shuffle.memoryFraction
2、 增加executor memory的大小 --executor-memory 5G
3、減少reduce task每次拉取的數(shù)據(jù)量 設(shè)置spark.reducer.maxSizeInFlight 24m

spark.shuffle.io.maxRetries
默認值:3
參數(shù)說明:shuffle read task從shuffle write task所在節(jié)點拉取屬于自己的數(shù)據(jù)時,如果因為網(wǎng)絡(luò)異常導(dǎo)致拉取失敗,是會自動進行重試的。該參數(shù)就代表了可以重試的最大次數(shù)。如果在指定次數(shù)之內(nèi)拉取還是沒有成功,就可能會導(dǎo)致作業(yè)執(zhí)行失敗。
調(diào)優(yōu)建議:對于那些包含了特別耗時的shuffle操作的作業(yè),建議增加重試最大次數(shù)(比如60次),以避免由于JVM的full gc或者網(wǎng)絡(luò)不穩(wěn)定等因素導(dǎo)致的數(shù)據(jù)拉取失敗。在實踐中發(fā)現(xiàn),對于針對超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過程,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。
shuffle file not find taskScheduler不負責(zé)重試task,由DAGScheduler負責(zé)重試stage

spark.shuffle.io.retryWait
默認值:5s
參數(shù)說明:具體解釋同上,該參數(shù)代表了每次重試拉取數(shù)據(jù)的等待間隔,默認是5s。
調(diào)優(yōu)建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩(wěn)定性。

spark.shuffle.memoryFraction
默認值:0.2
參數(shù)說明:該參數(shù)代表了Executor內(nèi)存中,分配給shuffle read task進行聚合操作的內(nèi)存比例,默認是20%。
調(diào)優(yōu)建議:在資源參數(shù)調(diào)優(yōu)中講解過這個參數(shù)。如果內(nèi)存充足,而且很少使用持久化操作,建議調(diào)高這個比例,給shuffle read的聚合操作更多內(nèi)存,以避免由于內(nèi)存不足導(dǎo)致聚合過程中頻繁讀寫磁盤。在實踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù)可以將性能提升10%左右。

spark.shuffle.manager
默認值:sort
參數(shù)說明:該參數(shù)用于設(shè)置ShuffleManager的類型。Spark 1.5以后,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默認選項,但是Spark 1.2以及之后的版本默認都是SortShuffleManager了。tungsten-sort與sort類似,但是使用了tungsten計劃中的堆外內(nèi)存管理機制,內(nèi)存使用效率更高。
調(diào)優(yōu)建議:由于SortShuffleManager默認會對數(shù)據(jù)進行排序,因此如果你的業(yè)務(wù)邏輯中需要該排序機制的話,則使用默認的SortShuffleManager就可以;而如果你的業(yè)務(wù)邏輯不需要對數(shù)據(jù)進行排序,那么建議參考后面的幾個參數(shù)調(diào)優(yōu),通過bypass機制或優(yōu)化的HashShuffleManager來避免排序操作,同時提供較好的磁盤讀寫性能。這里要注意的是,tungsten-sort要慎用,因為之前發(fā)現(xiàn)了一些相應(yīng)的bug。

spark.shuffle.sort.bypassMergeThreshold
默認值:200
參數(shù)說明:當ShuffleManager為SortShuffleManager時,如果shuffle read task的數(shù)量小于這個閾值(默認是200),則shuffle write過程中不會進行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會將每個task產(chǎn)生的所有臨時磁盤文件都合并成一個文件,并會創(chuàng)建單獨的索引文件。
調(diào)優(yōu)建議:當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數(shù)調(diào)大一些,大于shuffle read task的數(shù)量。那么此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產(chǎn)生大量的磁盤文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles
默認值:false
參數(shù)說明:如果使用HashShuffleManager,該參數(shù)有效。如果設(shè)置為true,那么就會開啟consolidate機制,會大幅度合并shuffle write的輸出文件,對于shuffle read task數(shù)量特別多的情況下,這種方法可以極大地減少磁盤IO開銷,提升性能。
調(diào)優(yōu)建議:如果的確不需要SortShuffleManager的排序機制,那么除了使用bypass機制,還可以嘗試將spark.shffle.manager參數(shù)手動指定為hash,使用HashShuffleManager,同時開啟consolidate機制。在實踐中嘗試過,發(fā)現(xiàn)其性能比開啟了bypass機制的SortShuffleManager要高出10%~30%。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1、 性能調(diào)優(yōu) 1.1、 分配更多資源 1.1.1、分配哪些資源? Executor的數(shù)量 每個Executor所...
    Frank_8942閱讀 4,840評論 2 36
  • 1.1、 分配更多資源 1.1.1、分配哪些資源? Executor的數(shù)量 每個Executor所能分配的CPU數(shù)...
    miss幸運閱讀 3,283評論 3 15
  • Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計的快速通用的計算引擎。Spark是UC Berkeley AM...
    大佛愛讀書閱讀 2,979評論 0 20
  • spark-submit的時候如何引入外部jar包 在通過spark-submit提交任務(wù)時,可以通過添加配置參數(shù)...
    博弈史密斯閱讀 3,022評論 1 14
  • Kafka是什么 Kafka是最初由Linkedin公司開發(fā),是一個分布式、分區(qū)的、多副本的、多訂閱者,基于zoo...
    Andone1cc閱讀 320評論 0 1

友情鏈接更多精彩內(nèi)容