在Spark中,每一個(gè)進(jìn)程包含一個(gè)executor對(duì)象,一個(gè)executor包含一個(gè)線(xiàn)程池,每個(gè)線(xiàn)程執(zhí)行一個(gè)tasks
- 線(xiàn)程池的好處就在于省去了線(xiàn)程頻繁啟停的開(kāi)銷(xiāo)
- task并發(fā)度的概念
1.每一個(gè)節(jié)點(diǎn)可以啟動(dòng)一個(gè)或者多個(gè)executor進(jìn)程
2.每一個(gè)executor進(jìn)程可以有多個(gè)core組成,每一個(gè)core一次只能執(zhí)行一個(gè)task,core是虛擬出的cpu,人為設(shè)定的,每次任務(wù)不能超過(guò)core數(shù)的并發(fā)度,也可以理解成線(xiàn)程池的最大線(xiàn)程數(shù)
內(nèi)存劃分
20% execution(執(zhí)行內(nèi)存)
join,groupby這類(lèi)算子設(shè)計(jì)內(nèi)存,shuffle數(shù)據(jù)都會(huì)緩存在這個(gè)內(nèi)存區(qū),如果內(nèi)存滿(mǎn),把數(shù)據(jù)寫(xiě)到磁盤(pán)(spill)
60% storage
存儲(chǔ)cache,presist,broadcast(廣播,數(shù)據(jù)量不是特別大的時(shí)候,類(lèi)似MapReduce里的-file分發(fā))數(shù)據(jù)
20% 留給程序自己
版本相關(guān)
1.6.0之前
每個(gè)類(lèi)的內(nèi)存是相互隔離的,導(dǎo)致了executor的內(nèi)存利用率不高,只能使用者自己調(diào)整參數(shù)來(lái)優(yōu)化內(nèi)存
1.6.0以上
execution和storage內(nèi)存是可以相互借用的,減少了OOM(out of memory)的情況發(fā)生
參數(shù)調(diào)優(yōu)
num-executors
- 該作業(yè)總共需要多少executor進(jìn)程執(zhí)行
- 建議:每個(gè)作業(yè)運(yùn)行一班設(shè)置50-100個(gè)左右較合適,設(shè)置太少無(wú)法充分利用集群資源,設(shè)置太多大部分隊(duì)列可能無(wú)法給予充分的資源
executor-memory
- 設(shè)置每個(gè)executor進(jìn)程的內(nèi)存,num-executors * executor-memory代表作業(yè)申請(qǐng)的總內(nèi)存了(盡量不要超過(guò)最大總內(nèi)存的1/3~1/2)
- 建議:設(shè)置4G~8G較合適
- executor內(nèi)存的大小,很多時(shí)候直接決定了spark作業(yè)的性能,而且跟常見(jiàn)的JVM OOM異常也有直接的關(guān)聯(lián)
executor-cores
- 每個(gè)executor進(jìn)程的CPU core數(shù)量,該參數(shù)決定每個(gè)executor進(jìn)行并行執(zhí)行task線(xiàn)程的能力,num-executors * executor-cores代表作業(yè)申請(qǐng)總CPU core數(shù),不要超過(guò)總CPU core的1/3~1/2
- 建議:設(shè)置2~4個(gè)較合適
- 每個(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)task進(jìn)程,因?yàn)槊總€(gè)executor進(jìn)程的CPU core數(shù)量越多,越能夠快速的執(zhí)行完分配給自己的所有task線(xiàn)程
driver-memory
- 設(shè)置Driver進(jìn)程的內(nèi)存
- 建議:通常不同設(shè)置,一般1G就夠了,若出現(xiàn)使用collect算子將RDD數(shù)據(jù)全部拉取到Driver上處理,就必須確保該值足夠大,否則OOM內(nèi)存溢出
spark.default.parallelism
- 每個(gè)stage的默認(rèn)task數(shù)量
- 建議:設(shè)置500~1000較合適,默認(rèn)一個(gè)HDFS的block對(duì)應(yīng)一個(gè)task,Spark默認(rèn)值偏少,這樣導(dǎo)致不能充分利用資源
spark.storage.memoryFraction
- 設(shè)置RDD持久化數(shù)據(jù)在executor內(nèi)存中能占的比例,默認(rèn)0.6,即默認(rèn)executor 60%內(nèi)存可以保存持久化RDD數(shù)據(jù)
- 建議:如有較多的持久化操作,可以設(shè)置高些,超出內(nèi)存的會(huì)頻繁GC,導(dǎo)致運(yùn)行緩慢
spark.shuffle.memoryFraction
- 聚合操作占executor內(nèi)存的比例,默認(rèn)0.2
- 建議:若持久化操作較少,但shuffle較多時(shí),可以降低持久化內(nèi)存占比,提高shuffle操作內(nèi)存占比
spark的作業(yè)提交
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \ #需要多少進(jìn)程
--executor-memory 6G \ #每個(gè)進(jìn)程需要多大內(nèi)存
--executor-cores 4 \ #每個(gè)進(jìn)程可以并發(fā)多少個(gè)task
--driver-memory 1G \ #默認(rèn)1G,本地上傳的時(shí)候需要設(shè)置,以供上傳的數(shù)據(jù)放入內(nèi)存,方便使用
--conf spark.default.parallelism=1000 \ #并發(fā)度
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3
開(kāi)發(fā)調(diào)優(yōu)原則
1.避免創(chuàng)建重復(fù)的RDD
- 對(duì)同一份數(shù)據(jù),只應(yīng)該創(chuàng)建一個(gè)RDD,不能創(chuàng)建多個(gè)RDD來(lái)代表同一份數(shù)據(jù),極大浪費(fèi)內(nèi)存
2.盡可能復(fù)用同一個(gè)人RDD
- 比如:一個(gè)RDD的數(shù)據(jù)格式是key-value,另一個(gè)單獨(dú)value類(lèi)型,這兩個(gè)RDD的value部分完全一樣,這樣可以復(fù)用,達(dá)到減少算子執(zhí)行次數(shù)
3.對(duì)多次使用的RDD進(jìn)行持久化處理
- 每次對(duì)一個(gè)RDD執(zhí)行一個(gè)算子操作時(shí),都會(huì)重新從源頭處理計(jì)算一遍,性能很差
- 對(duì)多次使用的RDD進(jìn)行持久化,將RDD的數(shù)據(jù)保存在內(nèi)存或者磁盤(pán)中,避免重復(fù)勞動(dòng)
-
借助cache()和persist()方法
示例
持久化級(jí)別
4.避免使用shuffle類(lèi)算子
- 最消耗性能的地方就是shuffle過(guò)程,因?yàn)橐却袛?shù)據(jù)準(zhǔn)備就緒才能執(zhí)行
- 將分布在集群眾多個(gè)節(jié)點(diǎn)上的同一個(gè)key,拉取到同一個(gè)節(jié)點(diǎn)上,進(jìn)行聚合和join處理,比如groupByKey,reduceByKey,join等算子,都會(huì)觸發(fā)shuffle
5.使用map-side預(yù)聚合的shuffle操作
- 一定要使用shuffle的,無(wú)法使用map類(lèi)算子替代的,盡量使用map-side預(yù)聚合的算子
- 思想類(lèi)似MapReduce中的Combiner(提前計(jì)算,減少數(shù)據(jù)規(guī)模)
- 可能的情況下使用reduceByKey或aggregateByKey算子替代groupByKey算子,因?yàn)閞educeByKey或aggregateByKey算子會(huì)使用用戶(hù)自定義的函數(shù)對(duì)每個(gè)節(jié)點(diǎn)本地相同的key進(jìn)行預(yù)聚合,而groupByKey算子不會(huì)預(yù)聚合

