1、 性能調(diào)優(yōu)
1.1、 分配更多資源
1.1.1、分配哪些資源?
Executor的數(shù)量
每個Executor所能分配的CPU數(shù)量
每個Executor所能分配的內(nèi)存量
Driver端分配的內(nèi)存數(shù)量
1.1.2、在哪里分配這些資源?
在生產(chǎn)環(huán)境中,提交spark作業(yè)時,用的spark-submit shell腳本,里面調(diào)整對應(yīng)的參數(shù):
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置executor的數(shù)量
--driver-memory 100m \ 配置driver的內(nèi)存(影響不大)
--executor-memory 100m \ 配置每個executor的內(nèi)存大小
--total-executor-cores 3 \ 配置所有executor的cpu core數(shù)量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
1.1.3、調(diào)節(jié)到多大,算是最大呢?
常用的資源調(diào)度模式有Spark Standalone和Spark On Yarn。比如說你的每臺機(jī)器能夠給你使用60G內(nèi)存,10個cpu core,20臺機(jī)器。那么executor的數(shù)量是20。平均每個executor所能分配60G內(nèi)存和10個cpu core。
1.1.4、為什么多分配了這些資源以后,性能會得到提升?
加executor:
如果executor數(shù)量比較少,那么,能夠并行執(zhí)行的task數(shù)量就比較少,就意味著,我們的Application的并行執(zhí)行的能力就很弱。
比如有3個executor,每個executor有2個cpu core,那么同時能夠并行執(zhí)行的task,就是6個。6個執(zhí)行完以后,再換下一批6個task。
增加了executor數(shù)量以后,那么,就意味著,能夠并行執(zhí)行的task數(shù)量,也就變多了。比如原先是6個,現(xiàn)在可能可以并行執(zhí)行10個,甚至20個,100個。那么并行能力就比之前提升了數(shù)倍,數(shù)十倍。相應(yīng)的,性能(執(zhí)行的速度),也能提升數(shù)倍~數(shù)十倍。
增加每個executor的cpu core,也是增加了執(zhí)行的并行能力。原本20個executor,每個才2個cpu core。能夠并行執(zhí)行的task數(shù)量,就是40個task。
現(xiàn)在每個executor的cpu core,增加到了4個。能夠并行執(zhí)行的task數(shù)量,就是80個task。就物理性能來看,執(zhí)行的速度,提升了2倍。
增加每個executor的內(nèi)存量。增加了內(nèi)存量以后,對性能的提升,有三點(diǎn):
- 如果需要對RDD進(jìn)行cache,那么更多的內(nèi)存,就可以緩存更多的數(shù)據(jù),將更少的數(shù)據(jù)寫入磁盤,甚至不寫入磁盤。減少了磁盤IO。
- 對于shuffle操作,reduce端,會需要內(nèi)存來存放拉取的數(shù)據(jù)并進(jìn)行聚合。如果內(nèi)存不夠,也會寫入磁盤。如果給executor分配更多內(nèi)存以后,就有更少的數(shù)據(jù),需要寫入磁盤,甚至不需要寫入磁盤。減少了磁盤IO,提升了性能。
- 對于task的執(zhí)行,可能會創(chuàng)建很多對象。如果內(nèi)存比較小,可能會頻繁導(dǎo)致JVM堆內(nèi)存滿了,然后頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。內(nèi)存加大以后,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。
1.2、調(diào)節(jié)并行度
1.2.1、并行度的概念
就是指的是Spark作業(yè)中,各個stage的task數(shù)量,代表了Spark作業(yè)的在各個階段(stage)的并行度。
1.2.2、如果不調(diào)節(jié)并行度,導(dǎo)致并行度過低,會怎么樣?
比如現(xiàn)在spark-submit腳本里面,給我們的spark作業(yè)分配了足夠多的資源,比如50個executor,每個executor有10G內(nèi)存,每個executor有3個cpu core。基本已經(jīng)達(dá)到了集群或者yarn隊(duì)列的資源上限。task沒有設(shè)置,或者設(shè)置的很少,比如就設(shè)置了100個task,你的Application任何一個stage運(yùn)行的時候,都有總數(shù)在150個cpu core,可以并行運(yùn)行。但是你現(xiàn)在,只有100個task,平均分配一下,每個executor分配到2個task,ok,那么同時在運(yùn)行的task,只有100個,每個executor只會并行運(yùn)行2個task。每個executor剩下的一個cpu core, 就浪費(fèi)掉了。
你的資源雖然分配足夠了,但是問題是,并行度沒有與資源相匹配,導(dǎo)致你分配下去的資源都浪費(fèi)掉了。
合理的并行度的設(shè)置,應(yīng)該是要設(shè)置的足夠大,大到可以完全合理的利用你的集群資源。比如上面的例子,總共集群有150個cpu core,可以并行運(yùn)行150個task。那么就應(yīng)該將你的Application的并行度,至少設(shè)置成150,才能完全有效的利用你的集群資源,讓150個task,并行執(zhí)行。而且task增加到150個以后,即可以同時并行運(yùn)行,還可以讓每個task要處理的數(shù)據(jù)量變少。比如總共150G的數(shù)據(jù)要處理,如果是100個task,每個task計(jì)算1.5G的數(shù)據(jù),現(xiàn)在增加到150個task,可以并行運(yùn)行,而且每個task主要處理1G的數(shù)據(jù)就可以。
很簡單的道理,只要合理設(shè)置并行度,就可以完全充分利用你的集群計(jì)算資源,并且減少每個task要處理的數(shù)據(jù)量,最終,就是提升你的整個Spark作業(yè)的性能和運(yùn)行速度。
1.2.3、設(shè)置并行度
1)、task數(shù)量,至少設(shè)置成與Spark application的總cpu core數(shù)量相同(最理想情況,比如總共150個cpu core,分配了150個task,一起運(yùn)行,差不多同一時間運(yùn)行完畢)。
2)、官方是推薦,task數(shù)量,設(shè)置成spark application總cpu core數(shù)量的2~3倍,比如150個cpu core,基本要設(shè)置task數(shù)量為300~500。
實(shí)際情況,與理想情況不同的,有些task會運(yùn)行的快一點(diǎn),比如50s就完了,有些task,可能會慢一點(diǎn),要1分半才運(yùn)行完,所以如果你的task數(shù)量,剛好設(shè)置的跟cpu core數(shù)量相同,可能還是會導(dǎo)致資源的浪費(fèi)。比如150個task,10個先運(yùn)行完了,剩余140個還在運(yùn)行,但是這個時候,有10個cpu core就空閑出來了,就導(dǎo)致了浪費(fèi)。那如果task數(shù)量設(shè)置成cpu core總數(shù)的2~3倍,那么一個task運(yùn)行完了以后,另一個task馬上可以補(bǔ)上來,就盡量讓cpu core不要空閑,同時也是盡量提升spark作業(yè)運(yùn)行的效率和速度,提升性能。
3)、如何設(shè)置一個Spark Application的并行度?
spark.default.parallelism
SparkConf conf = new SparkConf()
.set("spark.default.parallelism", "500")
1.3、 重構(gòu)RDD架構(gòu)以及RDD持久化
1.3.1、RDD架構(gòu)重構(gòu)與優(yōu)化
盡量去復(fù)用RDD,差不多的RDD,可以抽取成為一個共同的RDD,供后面的RDD計(jì)算時,反復(fù)使用。
1.3.2、公共RDD一定要實(shí)現(xiàn)持久化
對于要多次計(jì)算和使用的公共RDD,一定要進(jìn)行持久化。
持久化,就是將RDD的數(shù)據(jù)緩存到內(nèi)存中/磁盤中(BlockManager)以后無論對這個RDD做多少次計(jì)算,那么都是直接取這個RDD的持久化的數(shù)據(jù),比如從內(nèi)存中或者磁盤中,直接提取一份數(shù)據(jù)。
1.3.3、持久化,是可以進(jìn)行序列化的
如果正常將數(shù)據(jù)持久化在內(nèi)存中,那么可能會導(dǎo)致內(nèi)存的占用過大,這樣的話,也許,會導(dǎo)致OOM內(nèi)存溢出。
當(dāng)純內(nèi)存無法支撐公共RDD數(shù)據(jù)完全存放的時候,就優(yōu)先考慮使用序列化的方式在純內(nèi)存中存儲。將RDD的每個partition的數(shù)據(jù),序列化成一個大的字節(jié)數(shù)組,就一個對象。序列化后,大大減少內(nèi)存的空間占用。
序列化的方式,唯一的缺點(diǎn)就是,在獲取數(shù)據(jù)的時候,需要反序列化。
如果序列化純內(nèi)存方式,還是導(dǎo)致OOM內(nèi)存溢出,就只能考慮磁盤的方式、內(nèi)存+磁盤的普通方式(無序列化)、內(nèi)存+磁盤(序列化)。
1.3.4、為了數(shù)據(jù)的高可靠性,而且內(nèi)存充足,可以使用雙副本機(jī)制,進(jìn)行持久化。
持久化的雙副本機(jī)制,持久化后的一個副本,因?yàn)闄C(jī)器宕機(jī)了,副本丟了,就還是得重新計(jì)算一次。持久化的每個數(shù)據(jù)單元,存儲一份副本,放在其他節(jié)點(diǎn)上面。從而進(jìn)行容錯。一個副本丟了,不用重新計(jì)算,還可以使用另外一份副本。這種方式,僅僅針對你的內(nèi)存資源極度充足的情況。
1.4、 廣播變量
1.3.1、概念及需求
Spark Application(我們自己寫的Spark作業(yè))最開始在Driver端,在我們提交任務(wù)的時候,需要傳遞到各個Executor的Task上運(yùn)行。對于一些只讀、固定的數(shù)據(jù)(比如從DB中讀出的數(shù)據(jù)),每次都需要Driver廣播到各個Task上,這樣效率低下。廣播變量允許將變量只廣播(提前廣播)給各個Executor。該Executor上的各個Task再從所在節(jié)點(diǎn)的BlockManager獲取變量,如果本地沒有,那么就從Driver遠(yuǎn)程拉取變量副本,并保存在本地的BlockManager中。此后這個executor上的task,都會直接使用本地的BlockManager中的副本。而不是從Driver獲取變量,從而提升了效率。
一個Executor只需要在第一個Task啟動時,獲得一份Broadcast數(shù)據(jù),之后的Task都從本節(jié)點(diǎn)的BlockManager中獲取相關(guān)數(shù)據(jù)。
1.3.2、使用方法
1)調(diào)用SparkContext.broadcast方法創(chuàng)建一個Broadcast[T]對象。任何序列化的類型都可以這么實(shí)現(xiàn)。
2)通過value方法訪問該對象的值。
3)變量只會被發(fā)送到各個節(jié)點(diǎn)一次,應(yīng)作為只讀值處理(修改這個值不會影響到別的節(jié)點(diǎn))
1.5、使用Kryo序列化
1.5.1、概念及需求
默認(rèn)情況下,Spark內(nèi)部是使用Java的序列化機(jī)制,ObjectOutputStream / ObjectInputStream,對象輸入輸出流機(jī)制,來進(jìn)行序列化。
這種默認(rèn)序列化機(jī)制的好處在于,處理起來比較方便,也不需要我們手動去做什么事情,只是,你在算子里面使用的變量,必須是實(shí)現(xiàn)Serializable接口的,可序列化即可。
但是缺點(diǎn)在于,默認(rèn)的序列化機(jī)制的效率不高,序列化的速度比較慢,序列化以后的數(shù)據(jù),占用的內(nèi)存空間相對還是比較大。
Spark支持使用Kryo序列化機(jī)制。這種序列化機(jī)制,比默認(rèn)的Java序列化機(jī)制速度要快,序列化后的數(shù)據(jù)更小,大概是Java序列化機(jī)制的1/10。
所以Kryo序列化優(yōu)化以后,可以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少,在集群中耗費(fèi)的內(nèi)存資源大大減少。
1.5.2、Kryo序列化機(jī)制啟用以后生效的幾個地方
1)、算子函數(shù)中使用到的外部變量,使用Kryo以后:優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅?,可以?yōu)化集群中內(nèi)存的占用和消耗。
2)、持久化RDD,優(yōu)化內(nèi)存的占用和消耗。持久化RDD占用的內(nèi)存越少,task執(zhí)行的時候,創(chuàng)建的對象,就不至于頻繁的占滿內(nèi)存,頻繁發(fā)生GC。
3)、shuffle:可以優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅堋?/p>
1.5.3、使用方法
第一步,在SparkConf中設(shè)置一個屬性,spark.serializer,org.apache.spark.serializer.KryoSerializer類。
第二步,注冊你使用的需要通過Kryo序列化的一些自定義類,SparkConf.registerKryoClasses()。
項(xiàng)目中的使用:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
1.6、 使用fastutil優(yōu)化數(shù)據(jù)格式
1.6.1、fastutil介紹
fastutil是擴(kuò)展了Java標(biāo)準(zhǔn)集合框架(Map、List、Set。HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue。
fastutil能夠提供更小的內(nèi)存占用,更快的存取速度。我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set,好處在于fastutil集合類可以減小內(nèi)存的占用,并且在進(jìn)行集合的遍歷、根據(jù)索引(或者key)獲取元素的值和設(shè)置元素的值的時候,提供更快的存取速度。
fastutil也提供了64位的array、set和list,以及高性能快速的,以及實(shí)用的IO類,來處理二進(jìn)制和文本類型的文件。
fastutil最新版本要求Java 7以及以上版本。
fastutil的每一種集合類型,都實(shí)現(xiàn)了對應(yīng)的Java中的標(biāo)準(zhǔn)接口(比如fastutil的map,實(shí)現(xiàn)了Java的Map接口),因此可以直接放入已有系統(tǒng)的任何代碼中。
fastutil還提供了一些JDK標(biāo)準(zhǔn)類庫中沒有的額外功能(比如雙向迭代器)。
fastutil除了對象和原始類型為元素的集合,fastutil也提供引用類型的支持,但是對引用類型是使用等于號(=)進(jìn)行比較的,而不是equals()方法。
fastutil盡量提供了在任何場景下都是速度最快的集合類庫。
1.6.2、Spark中應(yīng)用fastutil的場景
1)、如果算子函數(shù)使用了外部變量。第一,你可以使用Broadcast廣播變量優(yōu)化。第二,可以使用Kryo序列化類庫,提升序列化性能和效率。第三,如果外部變量是某種比較大的集合,那么可以考慮使用fastutil改寫外部變量,首先從源頭上就減少內(nèi)存的占用,通過廣播變量進(jìn)一步減少內(nèi)存占用,再通過Kryo序列化類庫進(jìn)一步減少內(nèi)存占用。
2)、在你的算子函數(shù)里,也就是task要執(zhí)行的計(jì)算邏輯里面,如果有邏輯中,出現(xiàn),要創(chuàng)建比較大的Map、List等集合,可能會占用較大的內(nèi)存空間,而且可能涉及到消耗性能的遍歷、存取等集合操作,此時,可以考慮將這些集合類型使用fastutil類庫重寫,使用了fastutil集合類以后,就可以在一定程度上,減少task創(chuàng)建出來的集合類型的內(nèi)存占用。避免executor內(nèi)存頻繁占滿,頻繁喚起GC,導(dǎo)致性能下降。
1.6.3、關(guān)于fastutil調(diào)優(yōu)的說明
fastutil其實(shí)沒有你想象中的那么強(qiáng)大,也不會跟官網(wǎng)上說的效果那么一鳴驚人。廣播變量、Kryo序列化類庫、fastutil,都是之前所說的,對于性能來說,類似于一種調(diào)味品,烤雞,本來就很好吃了,然后加了一點(diǎn)特質(zhì)的孜然麻辣粉調(diào)料,就更加好吃了一點(diǎn)。分配資源、并行度、RDD架構(gòu)與持久化,這三個就是烤雞。broadcast、kryo、fastutil,類似于調(diào)料。
比如說,你的spark作業(yè),經(jīng)過之前一些調(diào)優(yōu)以后,大概30分鐘運(yùn)行完,現(xiàn)在加上broadcast、kryo、fastutil,也許就是優(yōu)化到29分鐘運(yùn)行完、或者更好一點(diǎn),也許就是28分鐘、25分鐘。
shuffle調(diào)優(yōu),15分鐘。groupByKey用reduceByKey改寫,執(zhí)行本地聚合,也許10分鐘。跟公司申請更多的資源,比如資源更大的YARN隊(duì)列,1分鐘。
1.6.4、fastutil的使用
在pom.xml中引用fastutil的包
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
速度比較慢,可能是從國外的網(wǎng)去拉取jar包,可能要等待5分鐘,甚至幾十分鐘,不等
List<Integer> 相當(dāng)于 IntList
基本都是類似于IntList的格式,前綴就是集合的元素類型。特殊的就是Map,Int2IntMap,代表了key-value映射的元素類型。除此之外,還支持object、reference。
1.7、 調(diào)節(jié)數(shù)據(jù)本地化等待時長
1.7.1、task的locality有五種
1)、PROCESS_LOCAL:進(jìn)程本地化,代碼和數(shù)據(jù)在同一個進(jìn)程中,也就是在同一個executor中。計(jì)算數(shù)據(jù)的task由executor執(zhí)行,數(shù)據(jù)在executor的BlockManager中,性能最好。
2)、NODE_LOCAL:節(jié)點(diǎn)本地化,代碼和數(shù)據(jù)在同一個節(jié)點(diǎn)中。比如說,數(shù)據(jù)作為一個HDFS block塊,就在節(jié)點(diǎn)上,而task在節(jié)點(diǎn)上某個executor中運(yùn)行,或者是,數(shù)據(jù)和task在一個節(jié)點(diǎn)上的不同executor中,數(shù)據(jù)需要在進(jìn)程間進(jìn)行傳輸。
3)、NO_PREF:對于task來說,數(shù)據(jù)從哪里獲取都一樣,沒有好壞之分。
4)、RACK_LOCAL:機(jī)架本地化,數(shù)據(jù)和task在一個機(jī)架的兩個節(jié)點(diǎn)上,數(shù)據(jù)需要通過網(wǎng)絡(luò)在節(jié)點(diǎn)之間進(jìn)行傳輸。
5)、ANY:數(shù)據(jù)和task可能在集群中的任何地方,而且不在一個機(jī)架中,性能最差。
1.7.2、Spark的任務(wù)調(diào)度
Spark在Driver上,對Application的每一個stage的task進(jìn)行分配之前都會計(jì)算出每個task要計(jì)算的是哪個分片數(shù)據(jù)。Spark的task分配算法優(yōu)先會希望每個task正好分配到它要計(jì)算的數(shù)據(jù)所在的節(jié)點(diǎn),這樣的話,就不用在網(wǎng)絡(luò)間傳輸數(shù)據(jù)。
但是,有時可能task沒有機(jī)會分配到它的數(shù)據(jù)所在的節(jié)點(diǎn)。為什么呢,可能那個節(jié)點(diǎn)的計(jì)算資源和計(jì)算能力都滿了。所以這種時候, Spark會等待一段時間,默認(rèn)情況下是3s(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最后,實(shí)在是等待不了了,就會選擇一個比較差的本地化級別。比如說,將task分配到靠它要計(jì)算的數(shù)據(jù)所在節(jié)點(diǎn)比較近的一個節(jié)點(diǎn),然后進(jìn)行計(jì)算。
但是對于第二種情況,通常來說,肯定是要發(fā)生數(shù)據(jù)傳輸,task會通過其所在節(jié)點(diǎn)的BlockManager來獲取數(shù)據(jù),BlockManager發(fā)現(xiàn)自己本地沒有數(shù)據(jù),會通過一個getRemote()方法,通過TransferService(網(wǎng)絡(luò)數(shù)據(jù)傳輸組件)從數(shù)據(jù)所在節(jié)點(diǎn)的BlockManager中,獲取數(shù)據(jù),通過網(wǎng)絡(luò)傳輸回task所在節(jié)點(diǎn)。
對于我們來說,當(dāng)然不希望是類似于第二種情況的了。最好的,當(dāng)然是task和數(shù)據(jù)在一個節(jié)點(diǎn)上,直接從本地executor的BlockManager中獲取數(shù)據(jù),純內(nèi)存,或者帶一點(diǎn)磁盤IO。如果要通過網(wǎng)絡(luò)傳輸數(shù)據(jù)的話,性能肯定會下降的。大量網(wǎng)絡(luò)傳輸,以及磁盤IO,都是性能的殺手。
1.7.3、我們什么時候要調(diào)節(jié)這個參數(shù)
觀察spark作業(yè)的運(yùn)行日志。推薦大家在測試的時候,先用client模式在本地就直接可以看到比較全的日志。日志里面會顯示:starting task…,PROCESS LOCAL、NODE LOCAL
觀察大部分task的數(shù)據(jù)本地化級別,如果大多都是PROCESS_LOCAL,那就不用調(diào)節(jié)了。
如果是發(fā)現(xiàn),好多的級別都是NODE_LOCAL、ANY,那么最好就去調(diào)節(jié)一下數(shù)據(jù)本地化的等待時長。要反復(fù)調(diào)節(jié),每次調(diào)節(jié)完以后再運(yùn)行并觀察日志,看看大部分的task的本地化級別有沒有提升,看看整個spark作業(yè)的運(yùn)行時間有沒有縮短。注意,不要本末倒置,不要本地化級別是提升了,但是因?yàn)榇罅康牡却龝r長,spark作業(yè)的運(yùn)行時間反而增加了,那還是不要調(diào)節(jié)了。
1.7.4、怎么調(diào)節(jié)
spark.locality.wait,默認(rèn)是3s。6s,10s
默認(rèn)情況下,下面3個的等待時長,都是跟上面那個是一樣的,都是3s
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
new SparkConf().set("spark.locality.wait", "10")
2、JVM調(diào)優(yōu)
堆內(nèi)存存放我們創(chuàng)建的一些對象,有老年代和年輕代。理想情況下,老年代都是放一些生命周期很長的對象,數(shù)量應(yīng)該是很少的,比如數(shù)據(jù)庫連接池。我們在spark task執(zhí)行算子函數(shù)(我們自己寫的),可能會創(chuàng)建很多對象,這些對象都是要放入JVM年輕代中的。
每一次放對象的時候,都是放入eden區(qū)域,和其中一個survivor區(qū)域。另外一個survivor區(qū)域是空閑的。
當(dāng)eden區(qū)域和一個survivor區(qū)域放滿了以后(spark運(yùn)行過程中,產(chǎn)生的對象實(shí)在太多了),就會觸發(fā)minor gc,小型垃圾回收。把不再使用的對象,從內(nèi)存中清空,給后面新創(chuàng)建的對象騰出來點(diǎn)兒地方。
清理掉了不再使用的對象之后,那么也會將存活下來的對象(還要繼續(xù)使用的),放入之前空閑的那一個survivor區(qū)域中。這里可能會出現(xiàn)一個問題。默認(rèn)eden、survior1和survivor2的內(nèi)存占比是8:1:1。問題是,如果存活下來的對象是1.5,一個survivor區(qū)域放不下。此時就可能通過JVM的擔(dān)保機(jī)制(不同JVM版本可能對應(yīng)的行為),將多余的對象,直接放入老年代了。
如果你的JVM內(nèi)存不夠大的話,可能導(dǎo)致頻繁的年輕代內(nèi)存滿溢,頻繁的進(jìn)行minor gc。頻繁的minor gc會導(dǎo)致短時間內(nèi),有些存活的對象,多次垃圾回收都沒有回收掉。會導(dǎo)致這種短生命周期(其實(shí)不一定是要長期使用的)對象,年齡過大,垃圾回收次數(shù)太多還沒有回收到,跑到老年代。
老年代中,可能會因?yàn)閮?nèi)存不足,囤積一大堆,短生命周期的,本來應(yīng)該在年輕代中的,可能馬上就要被回收掉的對象。此時,可能導(dǎo)致老年代頻繁滿溢。頻繁進(jìn)行full gc(全局/全面垃圾回收)。full gc就會去回收老年代中的對象。full gc由于這個算法的設(shè)計(jì),是針對的是,老年代中的對象數(shù)量很少,滿溢進(jìn)行full gc的頻率應(yīng)該很少,因此采取了不太復(fù)雜,但是耗費(fèi)性能和時間的垃圾回收算法。full gc很慢。
full gc / minor gc,無論是快,還是慢,都會導(dǎo)致jvm的工作線程停止工作,stop the world。簡而言之,就是說,gc的時候,spark停止工作了。等著垃圾回收結(jié)束。
內(nèi)存不充足的時候,出現(xiàn)的問題:
- 頻繁minor gc,也會導(dǎo)致頻繁spark停止工作
- 老年代囤積大量活躍對象(短生命周期的對象),導(dǎo)致頻繁full gc,full gc時間很長,短則數(shù)十秒,長則數(shù)分鐘,甚至數(shù)小時??赡軐?dǎo)致spark長時間停止工作。
- 嚴(yán)重影響咱們的spark的性能和運(yùn)行的速度。
2.1、降低cache操作的內(nèi)存占比
spark中,堆內(nèi)存又被劃分成了兩塊,一塊是專門用來給RDD的cache、persist操作進(jìn)行RDD數(shù)據(jù)緩存用的。另外一塊用來給spark算子函數(shù)的運(yùn)行使用的,存放函數(shù)中自己創(chuàng)建的對象。
默認(rèn)情況下,給RDD cache操作的內(nèi)存占比,是0.6,60%的內(nèi)存都給了cache操作了。但是問題是,如果某些情況下cache不是那么的緊張,問題在于task算子函數(shù)中創(chuàng)建的對象過多,然后內(nèi)存又不太大,導(dǎo)致了頻繁的minor gc,甚至頻繁full gc,導(dǎo)致spark頻繁的停止工作。性能影響會很大。
針對上述這種情況,可以在任務(wù)運(yùn)行界面,去查看你的spark作業(yè)的運(yùn)行統(tǒng)計(jì),可以看到每個stage的運(yùn)行情況,包括每個task的運(yùn)行時間、gc時間等等。如果發(fā)現(xiàn)gc太頻繁,時間太長。此時就可以適當(dāng)調(diào)價(jià)這個比例。
降低cache操作的內(nèi)存占比,大不了用persist操作,選擇將一部分緩存的RDD數(shù)據(jù)寫入磁盤,或者序列化方式,配合Kryo序列化類,減少RDD緩存的內(nèi)存占用。降低cache操作內(nèi)存占比,對應(yīng)的,算子函數(shù)的內(nèi)存占比就提升了。這個時候,可能就可以減少minor gc的頻率,同時減少full gc的頻率。對性能的提升是有一定的幫助的。
一句話,讓task執(zhí)行算子函數(shù)時,有更多的內(nèi)存可以使用。
spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
2.2、調(diào)節(jié)executor堆外內(nèi)存與連接等待時長
調(diào)節(jié)executor堆外內(nèi)存
有時候,如果你的spark作業(yè)處理的數(shù)據(jù)量特別大,幾億數(shù)據(jù)量。然后spark作業(yè)一運(yùn)行,時不時的報(bào)錯,shuffle file cannot find,executor、task lost,out of memory(內(nèi)存溢出)。
可能是executor的堆外內(nèi)存不太夠用,導(dǎo)致executor在運(yùn)行的過程中,可能會內(nèi)存溢出,可能導(dǎo)致后續(xù)的stage的task在運(yùn)行的時候,要從一些executor中去拉取shuffle map output文件,但是executor可能已經(jīng)掛掉了,關(guān)聯(lián)的block manager也沒有了。所以會報(bào)shuffle output file not found,resubmitting task,executor lost。spark作業(yè)徹底崩潰。
上述情況下,就可以去考慮調(diào)節(jié)一下executor的堆外內(nèi)存。也許就可以避免報(bào)錯。此外,有時堆外內(nèi)存調(diào)節(jié)的比較大的時候,對于性能來說,也會帶來一定的提升。
可以調(diào)節(jié)堆外內(nèi)存的上限:
--conf spark.yarn.executor.memoryOverhead=2048
spark-submit腳本里面,去用--conf的方式,去添加配置。用new SparkConf().set()這種方式去設(shè)置是沒有用的!一定要在spark-submit腳本中去設(shè)置。
spark.yarn.executor.memoryOverhead(看名字,顧名思義,針對的是基于yarn的提交模式)
默認(rèn)情況下,這個堆外內(nèi)存上限大概是300M。通常在項(xiàng)目中,真正處理大數(shù)據(jù)的時候,這里都會出現(xiàn)問題,導(dǎo)致spark作業(yè)反復(fù)崩潰,無法運(yùn)行。此時就會去調(diào)節(jié)這個參數(shù),到至少1G(1024M),甚至說2G、4G。
通常這個參數(shù)調(diào)節(jié)上去以后,就會避免掉某些JVM OOM的異常問題,同時呢,會讓整體spark作業(yè)的性能,得到較大的提升。
調(diào)節(jié)連接等待時長
我們知道,executor會優(yōu)先從自己本地關(guān)聯(lián)的BlockManager中獲取某份數(shù)據(jù)。如果本地block manager沒有的話,那么會通過TransferService,去遠(yuǎn)程連接其他節(jié)點(diǎn)上executor的block manager去獲取。
而此時上面executor去遠(yuǎn)程連接的那個executor,因?yàn)閠ask創(chuàng)建的對象特別大,特別多,
頻繁的讓JVM堆內(nèi)存滿溢,正在進(jìn)行垃圾回收。而處于垃圾回收過程中,所有的工作線程全部停止,相當(dāng)于只要一旦進(jìn)行垃圾回收,spark / executor停止工作,無法提供響應(yīng)。
此時呢,就會沒有響應(yīng),無法建立網(wǎng)絡(luò)連接,會卡住。spark默認(rèn)的網(wǎng)絡(luò)連接的超時時長,是60s,如果卡住60s都無法建立連接的話,那么就宣告失敗了。
報(bào)錯幾次,幾次都拉取不到數(shù)據(jù)的話,可能會導(dǎo)致spark作業(yè)的崩潰。也可能會導(dǎo)致DAGScheduler,反復(fù)提交幾次stage。TaskScheduler反復(fù)提交幾次task。大大延長我們的spark作業(yè)的運(yùn)行時間。
可以考慮調(diào)節(jié)連接的超時時長:
--conf spark.core.connection.ack.wait.timeout=300
spark-submit腳本,切記,不是在new SparkConf().set()這種方式來設(shè)置的。
spark.core.connection.ack.wait.timeout(spark core,connection,連接,ack,wait timeout,建立不上連接的時候,超時等待時長)
調(diào)節(jié)這個值比較大以后,通常來說,可以避免部分的偶爾出現(xiàn)的某某文件拉取失敗,某某文件lost掉了。
3、Shuffle調(diào)優(yōu)
原理概述:
什么樣的情況下,會發(fā)生shuffle?
在spark中,主要是以下幾個算子:groupByKey、reduceByKey、countByKey、join(分情況,先groupByKey后再join是不會發(fā)生shuffle的),等等。
什么是shuffle?
groupByKey,要把分布在集群各個節(jié)點(diǎn)上的數(shù)據(jù)中的同一個key,對應(yīng)的values,都要集中到一塊兒,集中到集群中同一個節(jié)點(diǎn)上,更嚴(yán)密一點(diǎn)說,就是集中到一個節(jié)點(diǎn)的一個executor的一個task中。
然后呢,集中一個key對應(yīng)的values之后,才能交給我們來進(jìn)行處理,<key, Iterable<value>>。reduceByKey,算子函數(shù)去對values集合進(jìn)行reduce操作,最后變成一個value。countByKey需要在一個task中,獲取到一個key對應(yīng)的所有的value,然后進(jìn)行計(jì)數(shù),統(tǒng)計(jì)一共有多少個value。join,RDD<key, value>,RDD<key, value>,只要是兩個RDD中,key相同對應(yīng)的2個value,都能到一個節(jié)點(diǎn)的executor的task中,給我們進(jìn)行處理。
shuffle,一定是分為兩個stage來完成的。因?yàn)檫@其實(shí)是個逆向的過程,不是stage決定shuffle,是shuffle決定stage。
reduceByKey(+),在某個action觸發(fā)job的時候,DAGScheduler,會負(fù)責(zé)劃分job為多個stage。劃分的依據(jù),就是,如果發(fā)現(xiàn)有會觸發(fā)shuffle操作的算子,比如reduceByKey,就將這個操作的前半部分,以及之前所有的RDD和transformation操作,劃分為一個stage。shuffle操作的后半部分,以及后面的,直到action為止的RDD和transformation操作,劃分為另外一個stage。
3.1、合并map端輸出文件
3.1.1、如果不合并map端輸出文件的話,會怎么樣?
舉例實(shí)際生產(chǎn)環(huán)境的條件:
100個節(jié)點(diǎn)(每個節(jié)點(diǎn)一個executor):100個executor
每個executor:2個cpu core
總共1000個task:每個executor平均10個task
每個節(jié)點(diǎn),10個task,每個節(jié)點(diǎn)會輸出多少份map端文件?10 * 1000=1萬個文件
總共有多少份map端輸出文件?100 * 10000 = 100萬。
第一個stage,每個task,都會給第二個stage的每個task創(chuàng)建一份map端的輸出文件
第二個stage,每個task,會到各個節(jié)點(diǎn)上面去,拉取第一個stage每個task輸出的,屬于自己的那一份文件。
shuffle中的寫磁盤的操作,基本上就是shuffle中性能消耗最為嚴(yán)重的部分。
通過上面的分析,一個普通的生產(chǎn)環(huán)境的spark job的一個shuffle環(huán)節(jié),會寫入磁盤100萬個文件。
磁盤IO對性能和spark作業(yè)執(zhí)行速度的影響,是極其驚人和嚇人的。
基本上,spark作業(yè)的性能,都消耗在shuffle中了,雖然不只是shuffle的map端輸出文件這一個部分,但是這里也是非常大的一個性能消耗點(diǎn)。
3.1.2、開啟shuffle map端輸出文件合并的機(jī)制
new SparkConf().set("spark.shuffle.consolidateFiles", "true")
默認(rèn)情況下,是不開啟的,就是會發(fā)生如上所述的大量map端輸出文件的操作,嚴(yán)重影響性能。
3.1.3、合并map端輸出文件,對咱們的spark的性能有哪些方面的影響呢?
1、map task寫入磁盤文件的IO,減少:100萬文件 -> 20萬文件
2、第二個stage,原本要拉取第一個stage的task數(shù)量份文件,1000個task,第二個stage的每個task,都要拉取1000份文件,走網(wǎng)絡(luò)傳輸。合并以后,100個節(jié)點(diǎn),每個節(jié)點(diǎn)2個cpu core,第二個stage的每個task,主要拉取100 * 2 = 200個文件即可。此時網(wǎng)絡(luò)傳輸?shù)男阅芟囊泊蟠鬁p少。
分享一下,實(shí)際在生產(chǎn)環(huán)境中,使用了spark.shuffle.consolidateFiles機(jī)制以后,實(shí)際的性能調(diào)優(yōu)的效果:對于上述的這種生產(chǎn)環(huán)境的配置,性能的提升,還是相當(dāng)?shù)目捎^的。spark作業(yè),5個小時 -> 2~3個小時。
大家不要小看這個map端輸出文件合并機(jī)制。實(shí)際上,在數(shù)據(jù)量比較大,你自己本身做了前面的性能調(diào)優(yōu),executor上去->cpu core上去->并行度(task數(shù)量)上去,shuffle沒調(diào)優(yōu),shuffle就很糟糕了。大量的map端輸出文件的產(chǎn)生,對性能有比較惡劣的影響。
這個時候,去開啟這個機(jī)制,可以很有效的提升性能。
3.2、調(diào)節(jié)map端內(nèi)存緩沖與reduce端內(nèi)存占比
3.2.1、默認(rèn)情況下可能出現(xiàn)的問題
默認(rèn)情況下,shuffle的map task,輸出到磁盤文件的時候,統(tǒng)一都會先寫入每個task自己關(guān)聯(lián)的一個內(nèi)存緩沖區(qū)。
這個緩沖區(qū)大小,默認(rèn)是32kb。
每一次,當(dāng)內(nèi)存緩沖區(qū)滿溢之后,才會進(jìn)行spill溢寫操作,溢寫到磁盤文件中去。
reduce端task,在拉取到數(shù)據(jù)之后,會用hashmap的數(shù)據(jù)格式,來對各個key對應(yīng)的values進(jìn)行匯聚。
針對每個key對應(yīng)的values,執(zhí)行我們自定義的聚合函數(shù)的代碼,比如_ + _(把所有values累加起來)。
reduce task,在進(jìn)行匯聚、聚合等操作的時候,實(shí)際上,使用的就是自己對應(yīng)的executor的內(nèi)存,executor(jvm進(jìn)程,堆),默認(rèn)executor內(nèi)存中劃分給reduce task進(jìn)行聚合的比例是0.2。
問題來了,因?yàn)楸壤?.2,所以,理論上,很有可能會出現(xiàn),拉取過來的數(shù)據(jù)很多,那么在內(nèi)存中,放不下。這個時候,默認(rèn)的行為就是將在內(nèi)存放不下的數(shù)據(jù)都spill(溢寫)到磁盤文件中去。
在數(shù)據(jù)量比較大的情況下,可能頻繁地發(fā)生reduce端的磁盤文件的讀寫。
3.2.2、調(diào)優(yōu)方式
調(diào)節(jié)map task內(nèi)存緩沖:spark.shuffle.file.buffer,默認(rèn)32k(spark 1.3.x不是這個參數(shù),后面還有一個后綴,kb。spark 1.5.x以后,變了,就是現(xiàn)在這個參數(shù))
調(diào)節(jié)reduce端聚合內(nèi)存占比:spark.shuffle.memoryFraction,0.2
3.2.3、在實(shí)際生產(chǎn)環(huán)境中,我們在什么時候來調(diào)節(jié)兩個參數(shù)?
看Spark UI,如果你的公司是決定采用standalone模式,那么狠簡單,你的spark跑起來,會顯示一個Spark UI的地址,4040的端口。進(jìn)去觀察每個stage的詳情,有哪些executor,有哪些task,每個task的shuffle write和shuffle read的量,shuffle的磁盤和內(nèi)存讀寫的數(shù)據(jù)量。如果是用的yarn模式來提交,從yarn的界面進(jìn)去,點(diǎn)擊對應(yīng)的application,進(jìn)入Spark UI,查看詳情。
如果發(fā)現(xiàn)shuffle 磁盤的write和read,很大。這個時候,就意味著最好調(diào)節(jié)一些shuffle的參數(shù)。首先當(dāng)然是考慮開啟map端輸出文件合并機(jī)制。其次調(diào)節(jié)上面說的那兩個參數(shù)。調(diào)節(jié)的時候的原則:spark.shuffle.file.buffer每次擴(kuò)大一倍,然后看看效果,64,128。spark.shuffle.memoryFraction,每次提高0.1,看看效果。
不能調(diào)節(jié)的太大,太大了以后過猶不及,因?yàn)閮?nèi)存資源是有限的,你這里調(diào)節(jié)的太大了,其他環(huán)節(jié)的內(nèi)存使用就會有問題了。
3.2.4、調(diào)節(jié)以后的效果
map task內(nèi)存緩沖變大了,減少spill到磁盤文件的次數(shù)。reduce端聚合內(nèi)存變大了,減少spill到磁盤的次數(shù),而且減少了后面聚合讀取磁盤文件的數(shù)量。
3.3、HashShuffleManager與SortShuffleManager
3.3.1、shuffle調(diào)優(yōu)概述
大多數(shù)Spark作業(yè)的性能主要就是消耗在了shuffle環(huán) 節(jié),因?yàn)樵摥h(huán)節(jié)包含了大量的磁盤IO、序列化、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)炔僮?。因此,如果要讓作業(yè)的性能更上一層樓,就有必要對shuffle過程進(jìn)行調(diào)優(yōu)。但是也 必須提醒大家的是,影響一個Spark作業(yè)性能的因素,主要還是代碼開發(fā)、資源參數(shù)以及數(shù)據(jù)傾斜,shuffle調(diào)優(yōu)只能在整個Spark的性能調(diào)優(yōu)中占 到一小部分而已。因此大家務(wù)必把握住調(diào)優(yōu)的基本原則,千萬不要舍本逐末。下面我們就給大家詳細(xì)講解shuffle的原理,以及相關(guān)參數(shù)的說明,同時給出各個參數(shù)的調(diào)優(yōu)建議。
3.3.2、ShuffleManager發(fā)展概述
在Spark的源碼中,負(fù)責(zé)shuffle過程的執(zhí)行、計(jì)算和處理的組件主要就是ShuffleManager,也即shuffle管理器。
在Spark 1.2以前,默認(rèn)的shuffle計(jì)算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有著一個非常嚴(yán)重的弊端,就是會產(chǎn)生大量的中間磁盤文件,進(jìn)而由大量的磁盤IO操作影響了性能。
因此在Spark 1.2以后的版本中,默認(rèn)的ShuffleManager改成了SortShuffleManager。SortShuffleManager相較于 HashShuffleManager來說,有了一定的改進(jìn)。主要就在于,每個Task在進(jìn)行shuffle操作時,雖然也會產(chǎn)生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數(shù)據(jù)時,只要根據(jù)索引讀取每個磁盤文件中的部分?jǐn)?shù)據(jù)即可。
在spark 1.5.x以后,對于shuffle manager又出來了一種新的manager,tungsten-sort(鎢絲),鎢絲sort shuffle manager。官網(wǎng)上一般說,鎢絲sort shuffle manager,效果跟sort shuffle manager是差不多的。
但是,唯一的不同之處在于,鎢絲manager,是使用了自己實(shí)現(xiàn)的一套內(nèi)存管理機(jī)制,性能上有很大的提升, 而且可以避免shuffle過程中產(chǎn)生的大量的OOM,GC,等等內(nèi)存相關(guān)的異常。
3.3.3、hash、sort、tungsten-sort。如何來選擇?
1、需不需要數(shù)據(jù)默認(rèn)就讓spark給你進(jìn)行排序?就好像mapreduce,默認(rèn)就是有按照key的排序。如果不需要的話,其實(shí)還是建議搭建就使用最基本的HashShuffleManager,因?yàn)樽铋_始就是考慮的是不排序,換取高性能。
2、什么時候需要用sort shuffle manager?如果你需要你的那些數(shù)據(jù)按key排序了,那么就選擇這種吧,而且要注意,reduce task的數(shù)量應(yīng)該是超過200的,這樣sort、merge(多個文件合并成一個)的機(jī)制,才能生效把。但是這里要注意,你一定要自己考量一下,有沒有必要在shuffle的過程中,就做這個事情,畢竟對性能是有影響的。
3、如果你不需要排序,而且你希望你的每個task輸出的文件最終是會合并成一份的,你自己認(rèn)為可以減少性能開銷??梢匀フ{(diào)節(jié)bypassMergeThreshold這個閾值,比如你的reduce task數(shù)量是500,默認(rèn)閾值是200,所以默認(rèn)還是會進(jìn)行sort和直接merge的??梢詫㈤撝嫡{(diào)節(jié)成550,不會進(jìn)行sort,按照hash的做法,每個reduce task創(chuàng)建一份輸出文件,最后合并成一份文件。(一定要提醒大家,這個參數(shù),其實(shí)我們通常不會在生產(chǎn)環(huán)境里去使用,也沒有經(jīng)過驗(yàn)證說,這樣的方式,到底有多少性能的提升)
4、如果你想選用sort based shuffle manager,而且你們公司的spark版本比較高,是1.5.x版本的,那么可以考慮去嘗試使用tungsten-sort shuffle manager??纯葱阅艿奶嵘c穩(wěn)定性怎么樣。
總結(jié):
1、在生產(chǎn)環(huán)境中,不建議大家貿(mào)然使用第三點(diǎn)和第四點(diǎn): 2、如果你不想要你的數(shù)據(jù)在shuffle時排序,那么就自己設(shè)置一下,用hash shuffle manager。 3、如果你的確是需要你的數(shù)據(jù)在shuffle時進(jìn)行排序的,那么就默認(rèn)不用動,默認(rèn)就是sort shuffle manager?;蛘呤鞘裁??如果你壓根兒不care是否排序這個事兒,那么就默認(rèn)讓他就是sort的。調(diào)節(jié)一些其他的參數(shù)(consolidation機(jī)制)。(80%,都是用這種) spark.shuffle.manager:hash、sort、tungsten-sort spark.shuffle.sort.bypassMergeThreshold:200。自己可以設(shè)定一個閾值,默認(rèn)是200,當(dāng)reduce task數(shù)量少于等于200,map task創(chuàng)建的輸出文件小于等于200的,最后會將所有的輸出文件合并為一份文件。這樣做的好處,就是避免了sort排序,節(jié)省了性能開銷,而且還能將多個reduce task的文件合并成一份文件,節(jié)省了reduce task拉取數(shù)據(jù)的時候的磁盤IO的開銷。