資源調(diào)優(yōu)
在開發(fā)完Spark作業(yè)之后,就該為作業(yè)配置合適的資源了。Spark的資源參數(shù),基本都可以在spark-submit命令中作為參數(shù)設(shè)置
Spark作業(yè)基本運(yùn)行原理

- spark-submit提交一個(gè)Spark作業(yè)之后,這個(gè)作業(yè)就會啟動一個(gè)對應(yīng)的Driver進(jìn)程
- 根據(jù)你使用的部署模式(deploy-mode:client/cluster)不同,Driver進(jìn)程可能在本地啟動,也可能在集群中某個(gè)工作節(jié)點(diǎn)上啟動
- Driver進(jìn)程本身會根據(jù)我們設(shè)置的參數(shù),占有一定數(shù)量的內(nèi)存和CPU Core
- Driver進(jìn)程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是YARN)申請運(yùn)行Spark作業(yè)需要使用的資源。資源指的就是Executor進(jìn)程。在各個(gè)工作節(jié)點(diǎn)上,啟動一定數(shù)量的Executor進(jìn)程,每個(gè)Executor進(jìn)程都占有一定數(shù)量的內(nèi)存和CPU Core
- 在申請到了作業(yè)執(zhí)行所需的資源之后,Driver進(jìn)程就會開始調(diào)度和執(zhí)行我們編寫的作業(yè)代碼了
- Driver進(jìn)程會將我們編寫的Spark作業(yè)代碼分拆為多個(gè)Stage,每個(gè)Stage執(zhí)行一部分代碼片段,并為每個(gè)Stage創(chuàng)建一批Task,然后將這些Task分配到各個(gè)Executor進(jìn)程中執(zhí)行
- Task是最小的計(jì)算單元,負(fù)責(zé)執(zhí)行一模一樣的計(jì)算邏輯(也就是我們編寫的某個(gè)代碼片段),只是每個(gè)Task處理的數(shù)據(jù)不同而已。一個(gè)Stage的所有Task都執(zhí)行完畢之后,會在各個(gè)節(jié)點(diǎn)本地的磁盤文件中寫入計(jì)算中間結(jié)果,然后Driver就會調(diào)度運(yùn)行下一個(gè)Stage。下一個(gè)Stage的Task的輸入數(shù)據(jù)就是上一個(gè)Stage輸出的中間結(jié)果
- Spark是根據(jù)Shuffle類算子來進(jìn)行Stage的劃分,Shuffle算子執(zhí)行之前的代碼會被劃分為一個(gè)Stage
Executor的內(nèi)存主要分為三塊
- 第一塊是讓task執(zhí)行我們自己編寫的代碼時(shí)使用,默認(rèn)是占Executor總內(nèi)存的20%
- 第二塊是讓Task通過Shuffle過程拉取了上一個(gè)Stage的Task的輸出后,進(jìn)行聚合等操作時(shí)使用,默認(rèn)也是占Executor總內(nèi)存的20%
- 第三塊是讓RDD持久化時(shí)使用,默認(rèn)占Executor總內(nèi)存的60%
參考Spark內(nèi)存架構(gòu)
Task的執(zhí)行速度是跟每個(gè)Executor進(jìn)程的CPU Core數(shù)量有直接關(guān)系的。一個(gè)CPU Core同一時(shí)間只能執(zhí)行一個(gè)線程。而每個(gè)Executor進(jìn)程上分配到的多個(gè)Task,都是以每個(gè)Task一條線程的方式,多線程并發(fā)運(yùn)行的。如果CPU Core數(shù)量比較充足,而且分配到的Task數(shù)量比較合理,那么通常來說,可以比較快速和高效地執(zhí)行完這些Task線程
資源參數(shù)調(diào)優(yōu)
num-executors
設(shè)置Spark作業(yè)總共要用多少個(gè)Executor進(jìn)程來執(zhí)行
Driver在向YARN集群管理器申請資源時(shí),YARN集群管理器會盡可能按照你的設(shè)置來在集群的各個(gè)工作節(jié)點(diǎn)上,啟動相應(yīng)數(shù)量的Executor進(jìn)程
每個(gè)Spark作業(yè)的運(yùn)行一般設(shè)置50~100個(gè)左右(根據(jù)集群的規(guī)模)的Executor進(jìn)程比較合適,設(shè)置太少或太多的Executor進(jìn)程都不好。設(shè)置的太少,無法充分利用集群資源;設(shè)置的太多的話,大部分隊(duì)列可能無法給予充分的資源
executor-memory
設(shè)置每個(gè)Executor進(jìn)程的內(nèi)存
Executor內(nèi)存的大小,很多時(shí)候直接決定了Spark作業(yè)的性能,而且跟常見的JVM OOM異常,也有直接的關(guān)聯(lián)
每個(gè)Executor進(jìn)程的內(nèi)存設(shè)置4G~8G較為合適,num-executors乘以executor-memory,是不能超過隊(duì)列的最大內(nèi)存量的,Spark集群可以設(shè)置每個(gè)executor最多使用的內(nèi)存大小。如果你是跟團(tuán)隊(duì)里其他人共享這個(gè)資源隊(duì)列,那么申請的內(nèi)存量最好不要超過資源隊(duì)列最大總內(nèi)存的1/3~1/2
executor-cores
設(shè)置每個(gè)Executor進(jìn)程的CPU core數(shù)量
決定了每個(gè)Executor進(jìn)程并行執(zhí)行task線程的能力
數(shù)量設(shè)置為2~4個(gè)較為合適,依據(jù)資源隊(duì)列的最大CPU Core限制是多少,再依據(jù)設(shè)置的Executor數(shù)量,來決定每個(gè)Executor進(jìn)程可以分配到幾個(gè)CPU Core
driver-memory
設(shè)置Driver進(jìn)程的內(nèi)存
Driver的內(nèi)存通常來說不設(shè)置,或者設(shè)置1G左右應(yīng)該就夠了
如果需要使用 collect 算子將RDD的數(shù)據(jù)全部拉取到Driver上進(jìn)行處理,那么必須確保Driver的內(nèi)存足夠大,否則會出現(xiàn)OOM內(nèi)存溢出的問題
spark.default.parallelism
設(shè)置每個(gè)stage的默認(rèn)task數(shù)量
不去設(shè)置這個(gè)參數(shù),那么Spark根據(jù)底層HDFS的block數(shù)量來設(shè)置task的數(shù)量,默認(rèn)是一個(gè)HDFS block對應(yīng)一個(gè)task,通常來說,Spark默認(rèn)設(shè)置的數(shù)量是偏少的
設(shè)置該參數(shù)為num-executors * executor-cores的2~3倍較為合適
如果task數(shù)量偏少的話,Executor進(jìn)程可能根本就沒有task執(zhí)行,也就是白白浪費(fèi)了資源
spark.storage.memoryFraction
設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認(rèn)是0.6
根據(jù)你選擇的不同的持久化策略,如果內(nèi)存不夠時(shí),可能數(shù)據(jù)就不會持久化,或者數(shù)據(jù)會寫入磁盤
如果Spark作業(yè)中,有較多的RDD持久化操作,該參數(shù)的值可以適當(dāng)提高一些
如果Spark作業(yè)中的Shuffle類操作比較多,而持久化操作比較少,那么這個(gè)參數(shù)的值適當(dāng)降低一些比較合適
如果發(fā)現(xiàn)作業(yè)由于頻繁的GC導(dǎo)致運(yùn)行緩慢(通過Spark WebUI可以觀察到作業(yè)的GC耗時(shí)),意味著Task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個(gè)參數(shù)的值
spark.shuffle.memoryFraction
設(shè)置Shuffle過程中一個(gè)task拉取到上個(gè)Stage的Task的輸出后,進(jìn)行聚合操作時(shí)能夠使用的Executor內(nèi)存的比例,默認(rèn)是0.2
Shuffle操作在進(jìn)行聚合時(shí),如果發(fā)現(xiàn)使用的內(nèi)存超出了這個(gè)20%的限制,那么多余的數(shù)據(jù)就會溢寫到磁盤文件中去,此時(shí)就會極大地降低性能
如果Spark作業(yè)中的RDD持久化操作較少,Shuffle操作較多時(shí),建議降低持久化操作的內(nèi)存占比,提高Shuffle操作的內(nèi)存占比比例
如果發(fā)現(xiàn)作業(yè)由于頻繁的GC導(dǎo)致運(yùn)行緩慢,意味著Task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個(gè)參數(shù)的值
示例
#args :
/usr/local/spark/bin/spark-submit --class Process \
--master yarn-cluster \
--name Process \
--queue fetech \
--num-executors 20 \
--driver-memory 5g \
--executor-memory 4g \
--executor-cores 2 \
--conf spark.default.parallelism=500 \
--conf spark.storage.memoryFraction=0.5 \
/process.jar $1 $2 $3 $4