Spark排錯與優(yōu)化

一. 運(yùn)維

1. Master掛掉,standby重啟也失效

Master默認(rèn)使用512M內(nèi)存,當(dāng)集群中運(yùn)行的任務(wù)特別多時,就會掛掉,原因是master會讀取每個task的event log日志去生成spark ui,內(nèi)存不足自然會OOM,可以在master的運(yùn)行日志中看到,通過HA啟動的master自然也會因為這個原因失敗。

解決

  1. 增加Master的內(nèi)存占用,在Master節(jié)點spark-env.sh 中設(shè)置:

     export SPARK_DAEMON_MEMORY 10g # 根據(jù)你的實際調(diào)整
    
  2. 減少保存在Master內(nèi)存中的作業(yè)信息

     spark.ui.retainedJobs 500   # 默認(rèn)都是1000
     spark.ui.retainedStages 500
    

2. worker掛掉或假死

有時候我們還會在web ui中看到worker節(jié)點消失或處于dead狀態(tài),在該節(jié)點運(yùn)行的任務(wù)則會報各種 lost worker 的錯誤,引發(fā)原因和上述大體相同,worker內(nèi)存中保存了大量的ui信息導(dǎo)致gc時失去和master之間的心跳。

解決

  1. 增加Master的內(nèi)存占用,在Worker節(jié)點spark-env.sh 中設(shè)置:

     export SPARK_DAEMON_MEMORY 2g # 根據(jù)你的實際情況
    
  2. 減少保存在Worker內(nèi)存中的Driver,Executor信息

     spark.worker.ui.retainedExecutors 200   # 默認(rèn)都是1000
     spark.worker.ui.retainedDrivers 200   
    

二. 運(yùn)行錯誤

1.shuffle FetchFailedException

Spark Shuffle FetchFailedException解決方案

錯誤提示

  1. missing output location

     org.apache.spark.shuffle.MetadataFetchFailedException: 
     Missing an output location for shuffle 0
    
    missing output location
  2. shuffle fetch faild

     org.apache.spark.shuffle.FetchFailedException:
     Failed to connect to spark047215/192.168.47.215:50268
    
    shuffle fetch faild

    當(dāng)前的配置為每個executor使用1core,5GRAM,啟動了20個executor

解決

這種問題一般發(fā)生在有大量shuffle操作的時候,task不斷的failed,然后又重執(zhí)行,一直循環(huán)下去,直到application失敗。

faild

一般遇到這種問題提高executor內(nèi)存即可,同時增加每個executor的cpu,這樣不會減少task并行度。

  • spark.executor.memory 15G
  • spark.executor.cores 3
  • spark.cores.max 21

啟動的execuote數(shù)量為:7個

execuoterNum = spark.cores.max/spark.executor.cores 

每個executor的配置:

3core,15G RAM

消耗的內(nèi)存資源為:105G RAM

15G*7=105G

可以發(fā)現(xiàn)使用的資源并沒有提升,但是同樣的任務(wù)原來的配置跑幾個小時還在卡著,改了配置后幾分鐘就能完成。

2.Executor&Task Lost

錯誤提示

  1. executor lost

     WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local):
     ExecutorLostFailure (executor lost)
    
  2. task lost

     WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217):
     java.io.IOException: Connection from /192.168.47.217:55483 closed
    
  3. 各種timeout

     java.util.concurrent.TimeoutException: Futures timed out after [120 second]
    
     ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 
     has been quiet for 120000 ms while there are outstanding requests.
     Assuming connection is dead; please adjust spark.network.
     timeout if this is wrong
    

解決

由網(wǎng)絡(luò)或者gc引起,worker或executor沒有接收到executor或task的心跳反饋。
提高 spark.network.timeout 的值,根據(jù)情況改成300(5min)或更高。
默認(rèn)為 120(120s),配置所有網(wǎng)絡(luò)傳輸?shù)难訒r,如果沒有主動設(shè)置以下參數(shù),默認(rèn)覆蓋其屬性

  • spark.core.connection.ack.wait.timeout
  • spark.akka.timeout
  • spark.storage.blockManagerSlaveTimeoutMs
  • spark.shuffle.io.connectionTimeout
  • spark.rpc.askTimeout or spark.rpc.lookupTimeout

3.傾斜

錯誤提示

  1. 數(shù)據(jù)傾斜

    數(shù)據(jù)傾斜
  2. 任務(wù)傾斜
    差距不大的幾個task,有的運(yùn)行速度特別慢。

解決

大多數(shù)任務(wù)都完成了,還有那么一兩個任務(wù)怎么都跑不完或者跑的很慢,分為數(shù)據(jù)傾斜和task傾斜兩種。

  1. 數(shù)據(jù)傾斜
    數(shù)據(jù)傾斜大多數(shù)情況是由于大量的無效數(shù)據(jù)引起,比如null或者"",也有可能是一些異常數(shù)據(jù),比如統(tǒng)計用戶登錄情況時,出現(xiàn)某用戶登錄過千萬次的情況,無效數(shù)據(jù)在計算前需要過濾掉。
    數(shù)據(jù)處理有一個原則,多使用filter,這樣你真正需要分析的數(shù)據(jù)量就越少,處理速度就越快。

     sqlContext.sql("...where col is not null and col != ''")
    

具體可參考:
解決spark中遇到的數(shù)據(jù)傾斜問題

  1. 任務(wù)傾斜
    task傾斜原因比較多,網(wǎng)絡(luò)io,cpu,mem都有可能造成這個節(jié)點上的任務(wù)執(zhí)行緩慢,可以去看該節(jié)點的性能監(jiān)控來分析原因。以前遇到過同事在spark的一臺worker上跑R的任務(wù)導(dǎo)致該節(jié)點spark task運(yùn)行緩慢。
    或者可以開啟spark的推測機(jī)制,開啟推測機(jī)制后如果某一臺機(jī)器的幾個task特別慢,推測機(jī)制會將任務(wù)分配到其他機(jī)器執(zhí)行,最后Spark會選取最快的作為最終結(jié)果。
  • spark.speculation true
  • spark.speculation.interval 100 - 檢測周期,單位毫秒;
  • spark.speculation.quantile 0.75 - 完成task的百分比時啟動推測
  • spark.speculation.multiplier 1.5 - 比其他的慢多少倍時啟動推測。

4.OOM

錯誤提示

堆內(nèi)存溢出

java.lang.OutOfMemoryError: Java heap space

解決

內(nèi)存不夠,數(shù)據(jù)太多就會拋出OOM的Exeception,主要有driver OOM和executor OOM兩種

  1. driver OOM
    一般是使用了collect操作將所有executor的數(shù)據(jù)聚合到driver導(dǎo)致。盡量不要使用collect操作即可。

  2. executor OOM
    可以按下面的內(nèi)存優(yōu)化的方法增加code使用內(nèi)存空間

  • 增加executor內(nèi)存總量,也就是說增加spark.executor.memory的值
  • 增加任務(wù)并行度(大任務(wù)就被分成小任務(wù)了),參考下面優(yōu)化并行度的方法

5.task not serializable

錯誤提示

org.apache.spark.SparkException: Job aborted due to stage failure: 
Task not serializable: java.io.NotSerializableException: ...

解決

如果你在worker中調(diào)用了driver中定義的一些變量,Spark就會將這些變量傳遞給Worker,這些變量并沒有被序列化,所以就會看到如上提示的錯誤了。

val x = new X()  //在driver中定義的變量
dd.map{r => x.doSomething(r) }.collect  //map中的代碼在worker(executor)中執(zhí)行

除了上文的map,還有filter,foreach,foreachPartition等操作,還有一個典型例子就是在foreachPartition中使用數(shù)據(jù)庫創(chuàng)建連接方法。這些變量沒有序列化導(dǎo)致的任務(wù)報錯。

下面提供三種解決方法:

  1. 將所有調(diào)用到的外部變量直接放入到以上所說的這些算子中,這種情況最好使用foreachPartition減少創(chuàng)建變量的消耗。
  2. 將需要使用的外部變量包括sparkConf,SparkContext,都用 @transent進(jìn)行注解,表示這些變量不需要被序列化
  3. 將外部變量放到某個class中對類進(jìn)行序列化。

6.driver.maxResultSize太小

錯誤提示

Caused by: org.apache.spark.SparkException:
 Job aborted due to stage failure: Total size of serialized 
 results of 374 tasks (1026.0 MB) is bigger than
  spark.driver.maxResultSize (1024.0 MB)

解決

spark.driver.maxResultSize默認(rèn)大小為1G 每個Spark action(如collect)所有分區(qū)的序列化結(jié)果的總大小限制,簡而言之就是executor給driver返回的結(jié)果過大,報這個錯說明需要提高這個值或者避免使用類似的方法,比如countByValue,countByKey等。

將值調(diào)大即可

spark.driver.maxResultSize 2g

7.taskSet too large

錯誤提示

WARN TaskSetManager: Stage 198 contains a task of very large size (5953 KB). The maximum recommended task size is 100 KB.

這個WARN可能還會導(dǎo)致ERROR

Caused by: java.lang.RuntimeException: Failed to commit task

Caused by: org.apache.spark.executor.CommitDeniedException: attempt_201603251514_0218_m_000245_0: Not committed because the driver did not authorize commit

解決

如果你比較了解spark中的stage是如何劃分的,這個問題就比較簡單了。
一個Stage中包含的task過大,一般由于你的transform過程太長,因此driver給executor分發(fā)的task就會變的很大。
所以解決這個問題我們可以通過拆分stage解決。也就是在執(zhí)行過程中調(diào)用cache.count緩存一些中間數(shù)據(jù)從而切斷過長的stage。

8. driver did not authorize commit

driver did not authorize commit

9. 環(huán)境報錯

  1. driver節(jié)點內(nèi)存不足
    driver內(nèi)存不足導(dǎo)致無法啟動application,將driver分配到內(nèi)存足夠的機(jī)器上或減少driver-memory

     Java HotSpot(TM) 64-Bit Server VM warning: INFO:
    

    os::commit_memory(0x0000000680000000, 4294967296, 0) failed;
    error='Cannot allocate memory' (errno=12)

  2. hdfs空間不夠
    hdfs空間不足,event_log無法寫入,所以 ListenerBus會報錯 ,增加hdfs空間(刪除無用數(shù)據(jù)或增加節(jié)點)

     Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):
      File /tmp/spark-history/app-20151228095652-0072.inprogress 
      could only be replicated to 0 nodes instead of minReplication (=1)
    
     ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
     java.lang.reflect.InvocationTargetException
    
  3. spark編譯包與hadoop版本不一致
    下載對應(yīng)hadoop版本的spark包或自己編譯。

     java.io.InvalidClassException: org.apache.spark.rdd.RDD;
      local class incompatible: stream classdesc serialVersionUID
    
  4. driver機(jī)器端口使用過多
    在一臺機(jī)器上沒有指定端口的情況下,提交了超過15個任務(wù)。

     16/03/16 16:03:17 ERROR SparkUI: Failed to bind SparkUI
     java.net.BindException: 地址已在使用: Service 'SparkUI' failed after 16 retries!
    

    提交任務(wù)時指定app web ui端口號或是修改最大嘗試次數(shù)

--conf spark.ui.port=4100
--conf spark.port.maxRetries=50
  1. 中文亂碼

    使用write.csv等方法寫出到hdfs的文件,中文亂碼。JVM使用的字符集如果沒有指定,默認(rèn)會使用系統(tǒng)的字符集,因為各個節(jié)點系統(tǒng)字符集并不都是UTF8導(dǎo)致,所以會出現(xiàn)這個問題。直接給JVM指定字符集即可。

    spark-defaults.conf

     spark.executor.extraJavaOptions -Dfile.encoding=UTF-8
    

三. 一些python錯誤

1.python版本過低

java.io.UIException: Cannot run program "python2.7": error=2,沒有那個文件或目錄

spark使用的python版本為2.7,centOS默認(rèn)python版本為2.6,升級即可。

2.python權(quán)限不夠

錯誤提示

部分節(jié)點上有錯誤提示

java.io.IOExeception: Cannot run program "python2.7": error=13, 權(quán)限不夠

解決

新加的節(jié)點運(yùn)維裝2.7版本的python,python命令是正確的,python2.7卻無法調(diào)用,只要改改環(huán)境變量就好了。

3.pickle使用失敗

錯誤提示

TypeError: ('__cinit__() takes exactly 8 positional arguments (11 given)',
 <type 'sklearn.tree._tree.Tree'>, (10, array([1], dtype=int32), 1,
  <sklearn.tree._tree.RegressionCriterion object at 0x100077480>,
   50.0, 2, 1, 0.1, 10, 1, <mtrand.RandomState object at 0x10a55da08>))

4.python編碼錯誤

錯誤提示

UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128)

解決

方法1:

import sys  
reload(sys)  
sys.setdefaultencoding('utf-8')

方法2:

//報錯
str(u'中國')
//不報錯
str(u'中國'.encode('utf-8'))

解決

該pickle文件是在0.17版本的scikit-learn下訓(xùn)練出來的,有些機(jī)器裝的是0.14版本,版本不一致導(dǎo)致,升級可解決,記得將老版本數(shù)據(jù)清理干凈,否則會報各種Cannot import xxx的錯誤。

四. 一些優(yōu)化

1. 部分Executor不執(zhí)行任務(wù)

有時候會發(fā)現(xiàn)部分executor并沒有在執(zhí)行任務(wù),為什么呢?

(1) 任務(wù)partition數(shù)過少,
要知道每個partition只會在一個task上執(zhí)行任務(wù)。改變分區(qū)數(shù),可以通過 repartition 方法,即使這樣,在 repartition 前還是要從數(shù)據(jù)源讀取數(shù)據(jù),此時(讀入數(shù)據(jù)時)的并發(fā)度根據(jù)不同的數(shù)據(jù)源受到不同限制,常用的大概有以下幾種:

hdfs - block數(shù)就是partition數(shù)
mysql - 按讀入時的分區(qū)規(guī)則分partition
es - 分區(qū)數(shù)即為 es 的 分片數(shù)(shard)

(2) 數(shù)據(jù)本地性的副作用

taskSetManager在分發(fā)任務(wù)之前會先計算數(shù)據(jù)本地性,優(yōu)先級依次是:

process(同一個executor) -> node_local(同一個節(jié)點) -> rack_local(同一個機(jī)架) -> any(任何節(jié)點)

Spark會優(yōu)先執(zhí)行高優(yōu)先級的任務(wù),任務(wù)完成的速度很快(小于設(shè)置的spark.locality.wait時間),則數(shù)據(jù)本地性下一級別的任務(wù)則一直不會啟動,這就是Spark的延時調(diào)度機(jī)制。

舉個極端例子:運(yùn)行一個count任務(wù),如果數(shù)據(jù)全都堆積在某一臺節(jié)點上,那將只會有這臺機(jī)器在長期計算任務(wù),集群中的其他機(jī)器則會處于等待狀態(tài)(等待本地性降級)而不執(zhí)行任務(wù),造成了大量的資源浪費(fèi)。

判斷的公式為:

curTime – lastLaunchTime >= localityWaits(currentLocalityIndex)

其中 curTime 為系統(tǒng)當(dāng)前時間,lastLaunchTime 為在某優(yōu)先級下最后一次啟動task的時間

如果滿足這個條件則會進(jìn)入下一個優(yōu)先級的時間判斷,直到 any,不滿足則分配當(dāng)前優(yōu)先級的任務(wù)。

數(shù)據(jù)本地性任務(wù)分配的源碼在 taskSetManager.scala 。

如果存在大量executor處于等待狀態(tài),可以降低以下參數(shù)的值(也可以設(shè)置為0),默認(rèn)都是3s。

    spark.locality.wait
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack

當(dāng)你數(shù)據(jù)本地性很差,可適當(dāng)提高上述值,當(dāng)然也可以直接在集群中對數(shù)據(jù)進(jìn)行balance。

2. spark task 連續(xù)重試失敗

有可能哪臺worker節(jié)點出現(xiàn)了故障,task執(zhí)行失敗后會在該 executor 上不斷重試,達(dá)到最大重試次數(shù)后會導(dǎo)致整個 application 執(zhí)行失敗,我們可以設(shè)置失敗黑名單(task在該節(jié)點運(yùn)行失敗后會換節(jié)點重試),可以看到在源碼中默認(rèn)設(shè)置的是 0

private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
    conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)  

spark-default.sh 中設(shè)置

spark.scheduler.executorTaskBlacklistTime 30000

當(dāng) task 在該 executor 運(yùn)行失敗后會在其它 executor 中啟動,同時此 executor 會進(jìn)入黑名單30s(不會分發(fā)任務(wù)到該executor)。

3. 內(nèi)存

如果你的任務(wù)shuffle量特別大,同時rdd緩存比較少可以更改下面的參數(shù)進(jìn)一步提高任務(wù)運(yùn)行速度。

spark.storage.memoryFraction - 分配給rdd緩存的比例,默認(rèn)為0.6(60%),如果緩存的數(shù)據(jù)較少可以降低該值。
spark.shuffle.memoryFraction - 分配給shuffle數(shù)據(jù)的內(nèi)存比例,默認(rèn)為0.2(20%)
剩下的20%內(nèi)存空間則是分配給代碼生成對象等。

如果任務(wù)運(yùn)行緩慢,jvm進(jìn)行頻繁gc或者內(nèi)存空間不足,或者可以降低上述的兩個值。
"spark.rdd.compress","true" - 默認(rèn)為false,壓縮序列化的RDD分區(qū),消耗一些cpu減少空間的使用

4. 并發(fā)

mysql讀取并發(fā)度優(yōu)化
spark 讀取 hdfs 數(shù)據(jù)分區(qū)規(guī)則

spark.default.parallelism
發(fā)生shuffle時的并行度,在standalone模式下的數(shù)量默認(rèn)為core的個數(shù),也可手動調(diào)整,數(shù)量設(shè)置太大會造成很多小任務(wù),增加啟動任務(wù)的開銷,太小,運(yùn)行大數(shù)據(jù)量的任務(wù)時速度緩慢。

spark.sql.shuffle.partitions
sql聚合操作(發(fā)生shuffle)時的并行度,默認(rèn)為200,如果該值太小會導(dǎo)致OOM,executor丟失,任務(wù)執(zhí)行時間過長的問題

相同的兩個任務(wù):
spark.sql.shuffle.partitions=300:

并行度300

spark.sql.shuffle.partitions=500:

并行度500

速度變快主要是大量的減少了gc的時間。

但是設(shè)置過大會造成性能惡化,過多的碎片task會造成大量無謂的啟動關(guān)閉task開銷,還有可能導(dǎo)致某些task hang住無法執(zhí)行。

這里寫圖片描述

修改map階段并行度主要是在代碼中使用rdd.repartition(partitionNum)來操作。

5. shuffle

spark-sql join優(yōu)化
map-side-join 關(guān)聯(lián)優(yōu)化
spark range join 優(yōu)化

6. 磁盤

磁盤IO優(yōu)化

7.序列化

kryo Serialization

8.數(shù)據(jù)本地性

Spark不同Cluster Manager下的數(shù)據(jù)本地性表現(xiàn)
spark讀取hdfs數(shù)據(jù)本地性異常

9.代碼

編寫Spark程序的幾個優(yōu)化點

10.PySpark

PySpark Pandas UDF
在spark dataFrame 中使用 pandas dataframe
pypy on PySpark

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容