1. 談?wù)凷park RDD 的幾大特性,并深入講講體現(xiàn)在哪?
Spark的RDD有五大特性:
- A list of partitions:RDD是由多個(gè)分區(qū)(partition)組成的集合。
- A function for computing each split:對(duì)于RDD的計(jì)算,其實(shí)是RDD的每個(gè)分區(qū)都會(huì)執(zhí)行這個(gè)計(jì)算。
- A list of dependencies on other RDDs:RDD是一條依賴鏈,每一個(gè)RDD都會(huì)記錄其父RDD的信息。
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):分區(qū)器作用在K:V結(jié)構(gòu)的RDD中(HashPartition、RangPartition)。
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):計(jì)算就近原則,Spark會(huì)盡量的將計(jì)算和存儲(chǔ)放在同一個(gè)位置中。
- 具體體現(xiàn):
1、2體現(xiàn)的是Spark的分布式計(jì)算,Partition分布在多臺(tái)節(jié)點(diǎn)上,每臺(tái)節(jié)點(diǎn)上都有N個(gè)Partition在同步指定指定的function(計(jì)算)。
3體現(xiàn)的RDD的失敗恢復(fù)容錯(cuò)機(jī)制,RDD的創(chuàng)建只能通過創(chuàng)建或者由上一個(gè)RDD生成,創(chuàng)建RDD其實(shí)也能理解為依賴于文件或者集合數(shù)據(jù),子RDD出現(xiàn)故障,可以通過重新調(diào)用該依賴鏈上的父RDD來重新生成。
4體現(xiàn)的是RDD的分區(qū)特性,K:V結(jié)構(gòu)的數(shù)據(jù),可以通過默認(rèn)分區(qū)器(HashPartition)、范圍分區(qū)器(RangePartiton)或者自定義的分區(qū)器來進(jìn)行分區(qū)、打亂、shuffle、重分配。
5體現(xiàn)的計(jì)算的優(yōu)化之一:就近策略。Spark通過將計(jì)算和數(shù)據(jù)放置在同一進(jìn)程、同一節(jié)點(diǎn)、同一機(jī)架等方式來盡力減少shuffle的產(chǎn)生。
2. RDD的彈性主要體現(xiàn)在哪?
RDD又被稱為彈性分布式數(shù)據(jù)集,其彈性體現(xiàn)在:
- 自動(dòng)進(jìn)行內(nèi)存和磁盤的切換:主要體現(xiàn)在溢寫方面。
- 基于Lineage的高效容錯(cuò):也就是依賴鏈容錯(cuò),也叫血緣關(guān)系。
- task和stage在失敗后會(huì)進(jìn)行指定次數(shù)的重試機(jī)制:task會(huì)重試3次后調(diào)用stage重試機(jī)制,stage重試4次后任務(wù)退出。
- checkpoint和persist的數(shù)據(jù)持久化、緩存機(jī)制:checkpoint可以將備份保存在HDFS上,主要用于失敗重調(diào);persist(cache)將中間數(shù)據(jù)保存在內(nèi)存中,主要用于計(jì)算加速(后續(xù)計(jì)算中多次調(diào)用該數(shù)據(jù)集)。
3. 描述下Spark的任務(wù)提交流程?
4. 談?wù)凷park的寬窄依賴?
-
寬依賴:一個(gè)父RDD同時(shí)被多個(gè)子RDD依賴,也就是說該父RDD的數(shù)據(jù)會(huì)分發(fā)到多個(gè)子RDD上去,此時(shí)會(huì)觸發(fā)shuffle操作。寬依賴會(huì)觸發(fā)shuffle。
寬依賴.png -
窄依賴:與寬依賴相反,一個(gè)父RDD同時(shí)只被一個(gè)子RDD依賴,但是一個(gè)子RDD可能同時(shí)依賴多個(gè)父RDD。窄依賴不會(huì)觸發(fā)shuffle。
窄依賴1.png
窄依賴2.png
5. Spark的job、stage劃分,task跟分區(qū)的關(guān)系?
job由action算法劃分,每個(gè)action算子就會(huì)觸發(fā)一個(gè)新的job。
stage由寬依賴劃分,每個(gè)寬依賴算子就會(huì)將job切分為兩個(gè)stage。
task是Spark任務(wù)的最小執(zhí)行單位,運(yùn)行在Executor的線程池中,而partition是數(shù)據(jù)的最小單位,每個(gè)partition對(duì)應(yīng)由一個(gè)task執(zhí)行。所以Spark的并發(fā)通常與partition緊密關(guān)聯(lián)。
6. Spark的算子分為哪幾類?分別說說你常用的幾個(gè)算子?
Spark算子通常問題transformation算子和action算子。
- 常用的transform算子:map、mapPartition、flatmap、reduceByKey、groupByKey、combineByKey、filter等。
- 常用的action算子:collect、task、reduce、saveAsTextFile、count、first、foreach等。
7. 說說Spark的shuffle?與MapReduce的shuffle有什么不同?
Shuffle就是講數(shù)據(jù)按照一定規(guī)則打散并進(jìn)行重新排序的過程,在Spark中,shuffle的過程由寬依賴算子觸發(fā),一般都是groupByKey、reduceByKey等類型的操作算子。這些過程會(huì)將原來的數(shù)據(jù),按照key為分組重新將有序(分組)的數(shù)據(jù)分發(fā)到對(duì)應(yīng)的節(jié)點(diǎn)上去,這個(gè)過程就叫shuffle。
- MapReduce Shuffle
MapReduce Shuffle主要分為map端shffle和reduce端shuffle。- Map端shuffle
- 數(shù)據(jù)并不會(huì)直接寫入磁盤,這樣的IO效率太過低下。MR會(huì)首先將數(shù)據(jù)寫入環(huán)形緩沖區(qū)(默認(rèn)100M),并在寫入過程中進(jìn)行分區(qū)(Partition),對(duì)于每一個(gè)鍵值來說,都會(huì)增加一個(gè)partition的屬性。
- 當(dāng)環(huán)形緩沖區(qū)到達(dá)閾值時(shí)(默認(rèn)80%),數(shù)據(jù)就會(huì)溢寫(spill)到磁盤的臨時(shí)文件中(具體本地物理位置由參數(shù)
mapreduce.cluster.local.dir指定),并在寫入時(shí)進(jìn)行排序(快排)和預(yù)聚合(combine,可選,視業(yè)務(wù)環(huán)境而定;該操作會(huì)優(yōu)先在map端進(jìn)行數(shù)據(jù)預(yù)聚合,減少shuffle的數(shù)據(jù)量,可以有效提高shuffle的效率),溢寫完成后,數(shù)據(jù)首先按照partition屬性排序,其次按照key的hash值排序。 - 溢寫完成后,MR會(huì)將所有生成的臨時(shí)文件進(jìn)行聚合排序(歸并排序)并生成最終的正式文件,此時(shí)會(huì)按照partition進(jìn)行歸并,按照key的hash值進(jìn)行排序。當(dāng)溢寫文件數(shù)量超過參數(shù)
min.num.spills.for.combine的值時(shí)(默認(rèn)為3),MR會(huì)進(jìn)行第二次合并,直到文件數(shù)據(jù)在參數(shù)指定值之下。
Map端Shuffle.png
- Reduce端shuffle
- Reduce Tak從每個(gè)Map Task的結(jié)果文件中(合并之后的溢寫文件)拉取對(duì)應(yīng)的partition的數(shù)據(jù)。(數(shù)據(jù)結(jié)果文件會(huì)在Map端進(jìn)行排序,并且會(huì)額外生成一個(gè)索引文件記錄每個(gè)分區(qū)的起始偏移量,此時(shí)Reduce端直接根據(jù)偏移量拉取分區(qū)數(shù)據(jù)即可。)
-
Reduce Task在拉取數(shù)據(jù)時(shí),會(huì)再次進(jìn)行排序、合并(歸并排序)。拉取數(shù)據(jù)完成后,shuffle即結(jié)束。
Reduce端Shuffle.png
- Map端shuffle
- Spark Shuffle
Spark Shuffle是對(duì)MapReduce Shuffle的優(yōu)化。Spark的shuffle write等同于MR的Map shuffle,shuffle read等同于Reduce shuffle。
Spark在發(fā)展過程中,產(chǎn)生了多個(gè)Shuffle Manager,早期版本使用HashShuffleManager(1.2前,1.2版本后默認(rèn)shuffle管理器更改為SortShuffleManager,2.0后移除),后期使用SortShuffleManager。
HashShuffleManager也分為兩種:普通版本以及優(yōu)化后的合并版本。HashShuffleManager不具備排序的功能。- HashShuffleManager(未優(yōu)化)
- shuffle管理器根據(jù)shuffle write數(shù)量和shuffle read數(shù)量創(chuàng)建對(duì)應(yīng)的bucket緩存,以及對(duì)應(yīng)的ShuffleBlockFile臨時(shí)文件(緩存及文件數(shù)量取決于shuffle write的數(shù)量 * shuffle read的數(shù)量:每個(gè)shuffle write都可能包含有所有shuffle read的分組數(shù)據(jù),因此會(huì)創(chuàng)建對(duì)應(yīng)shuffle read數(shù)量的緩存和文件數(shù)量)。
- 溢寫完成后,每個(gè)shuffle read會(huì)從各個(gè)shuffle write溢寫成的ShuffleBlockFile中拉取數(shù)據(jù)(此時(shí)的RDD為ShuffleRDD),拉取到的數(shù)據(jù)優(yōu)先寫入內(nèi)存,內(nèi)存達(dá)到閾值溢寫到磁盤。
-
shuffle read拉取數(shù)據(jù)完成后,會(huì)對(duì)數(shù)據(jù)進(jìn)行聚合(如果寬依賴算子為reduceByKey等類型)。此時(shí)shuffle完成。
HashShuffleManager.png
- HashShuffleManager(優(yōu)化)
未優(yōu)化的HashShuffle會(huì)生成大量的小文件,會(huì)對(duì)文件系統(tǒng)造成很大的壓力,因此Spark針對(duì)該問題,出現(xiàn)了對(duì)普通HashShuffleManager的優(yōu)化手段。
該優(yōu)化手段通過參數(shù)spark.shuffle.consolidateFiles=true開啟,開啟后shuffle管理器會(huì)根據(jù)CPU Core的數(shù)量 * shuffle read的數(shù)量生成指定數(shù)量的bucket緩存和ShuffleBlockFile文件。其實(shí)就是將運(yùn)行在一個(gè)Core上的shuffle write生成的臨時(shí)文件進(jìn)行了歸并操作。此時(shí),每個(gè)ShuffleBlockFile文件都會(huì)對(duì)應(yīng)一個(gè)索引文件,用于標(biāo)記每個(gè)shuffle write在文件中的偏移位置。
HashShuffleManager(優(yōu)化).png - SortShuffleManager(普通)
SortShuffleManager也分為兩種模式:普通和bypass,bypass主要是針對(duì)少量數(shù)據(jù)的情況下使用。
普通SortShuffleManager與MR的Shuffle過程類似。- 數(shù)據(jù)寫入到一個(gè)內(nèi)存結(jié)構(gòu)中(根據(jù)不同的Shuffle算子,選用不同的內(nèi)存結(jié)果,如reduceByKey為Map結(jié)構(gòu),join為Array結(jié)構(gòu))。
- 內(nèi)存到達(dá)閾值,會(huì)溢寫數(shù)據(jù)到磁盤文件。在溢寫前會(huì)對(duì)數(shù)據(jù)按照key的hash值進(jìn)行排序(快排),然后再將數(shù)據(jù)分批(batch,默認(rèn)每batch為10000條)溢寫到磁盤。
- 溢寫完成后,SortShuffleManager會(huì)將所有臨時(shí)文件進(jìn)行合并,并生成索引文件(用于標(biāo)識(shí)shuffle read拉取數(shù)據(jù)的偏移量)。
- 此時(shí)shuffle write過程結(jié)束,每個(gè)shuffle write會(huì)生成1個(gè)臨時(shí)文件,最終生成task數(shù)量的臨時(shí)文件。
-
shuffle read根據(jù)索引文件拉取數(shù)據(jù)到本地,并執(zhí)行后續(xù)邏輯。此時(shí),整個(gè)shuffle過程結(jié)束。
SortShuffleManager.png
- SortShuffleManager(bypass)
bypass其實(shí)就是在shuffle數(shù)據(jù)量小的時(shí)候自動(dòng)運(yùn)行的模式,該模式放棄了排序的功能,整體功能等同于優(yōu)化后的HashShuffleManager。
bypass觸發(fā)機(jī)制如下:- shuffle write數(shù)量小于參數(shù)
spark.shuffle.sort.bypassMergeThreshold的設(shè)定值(默認(rèn)200)。 -
不是由聚合類算子(reduceByKey之類,只能由join之類的算子觸發(fā))觸發(fā)的shuffle。
SortShuffleManager(bypass).png
MapReduce與Spark的Shuffle的異同(主要根據(jù)SortShuffleManger來說)
- shuffle write數(shù)量小于參數(shù)
- shuffle的意義一致,都是將map端的數(shù)據(jù),按照制定key的hash值進(jìn)行分區(qū)后,將分區(qū)后的數(shù)據(jù)分別傳輸?shù)讲煌膔educe進(jìn)行處理,以提高數(shù)據(jù)的計(jì)算性能。
- 在Spark 1.2之后,兩者的shuffle都會(huì)先進(jìn)行排序。這樣有利于進(jìn)行預(yù)聚合(combine),并且對(duì)shuffle的IO性能也會(huì)產(chǎn)生一定的優(yōu)化。
- MapReduce每個(gè)階段劃分的很詳細(xì):map、spill、merge、shuffle、sort、reduce等。Spark根據(jù)不同的依賴對(duì)這些步驟進(jìn)行了聚合,最終只保留各個(gè)stage,所以spill、merge等功能需要包含在transformation內(nèi)。
- MR每個(gè)map和reduce都會(huì)觸發(fā)shuffle操作。Spark在一個(gè)stage內(nèi),不會(huì)觸發(fā)shuffle操作,只在stage間觸發(fā)。
圖片等信息來自于(https://blog.csdn.net/u012369535/article/details/90757029)
- HashShuffleManager(未優(yōu)化)
8. 了解bypass機(jī)制么?詳細(xì)說說?
在Spark 1.2之后,shuffle管理器由HashShuffleManager更改為SortShuffleManager。而bypass其實(shí)就是在shuffle數(shù)據(jù)量小的時(shí)候自動(dòng)運(yùn)行的模式,該模式放棄了排序的功能,整體功能等同于優(yōu)化后的HashShuffleManager。
具體參考第7題。
9. Spark和MR有什么異同?
- MR是基于磁盤的分布式計(jì)算框架,Spark是基于內(nèi)存的分布式計(jì)算框架。MR每個(gè)map和reduce之間,必然會(huì)產(chǎn)生shuffle將數(shù)據(jù)落盤;而Spark優(yōu)化了此功能,shuffle僅產(chǎn)生在stage間,stage內(nèi)部數(shù)據(jù)不落地,shuffle也是優(yōu)先寫在內(nèi)存中,內(nèi)存不足才會(huì)溢寫到磁盤。
- Spark具有更高的容錯(cuò)性。Spark通過checkpoint和persist(主要是緩存)進(jìn)行容錯(cuò),計(jì)算失敗后無需重頭進(jìn)行計(jì)算;而MR失敗后必須重頭進(jìn)行計(jì)算。
- Spark的框架更完善。Spark具有SparkCore、SparkSQL、SparkStreaming、SparkML、SparkGraph等一站式功能;而MR僅具備基礎(chǔ)的MapReduce離線分析引擎。
- Spark基于內(nèi)存進(jìn)行計(jì)算,在執(zhí)行海量數(shù)據(jù)計(jì)算時(shí),并不是太穩(wěn)定;而MR在海量數(shù)據(jù)計(jì)算時(shí),會(huì)比Spark運(yùn)行更穩(wěn)定。
- Spark基于內(nèi)存計(jì)算,在同樣數(shù)據(jù)量的情況下,執(zhí)行效率要遠(yuǎn)高于MR。
10. 談?wù)勀闫綍r(shí)是怎么應(yīng)對(duì)Spark的數(shù)據(jù)傾斜問題的?
在shuffle的時(shí)候,ShuffleManager會(huì)將各個(gè)節(jié)點(diǎn)上相同key的數(shù)據(jù)拉在一個(gè)shuffle read節(jié)點(diǎn)上進(jìn)行計(jì)算,此時(shí)如果某個(gè)key或者某個(gè)特殊值數(shù)據(jù)量過大,就會(huì)發(fā)生數(shù)據(jù)傾斜。數(shù)據(jù)傾斜只會(huì)發(fā)生在shuffle過程中。具體表現(xiàn)為:Spark任務(wù)執(zhí)行時(shí)間長,具體表現(xiàn)在大多task已完成,只有少部分task需要執(zhí)行很長時(shí)間;Spark發(fā)生OOM的問題也可能是數(shù)據(jù)傾斜導(dǎo)致的。
解決思路:
- 使用Hive ETL等工具預(yù)處理數(shù)據(jù)
使用Hive或者M(jìn)R等計(jì)算框架,提前對(duì)數(shù)據(jù)進(jìn)行join或者預(yù)聚合。
該方案主要思路就是將數(shù)據(jù)傾斜的壓力轉(zhuǎn)移到Hive、MR等計(jì)算框架,從而避免在Spark計(jì)算時(shí)發(fā)生OOM等問題。但是該方案無法實(shí)際上解決數(shù)據(jù)傾斜問題。 - 過濾少數(shù)導(dǎo)致數(shù)據(jù)傾斜的key
大多數(shù)時(shí)候,數(shù)據(jù)傾斜都是因?yàn)槟硞€(gè)key或者特殊值(null)而導(dǎo)致的,此時(shí)如果這些數(shù)據(jù)對(duì)業(yè)務(wù)本身并不會(huì)造成影響,那么可以在join或者分組前將其filter過濾掉。如計(jì)算數(shù)據(jù)內(nèi)存在多少個(gè)key,則可以過濾null值,在隨后的結(jié)果值+1即可。 - 提高shuffle并行度
Spark默認(rèn)的并行度只有200,有時(shí)候數(shù)據(jù)量很大,但是并行度很低,導(dǎo)致每個(gè)線程都需要計(jì)算很大的數(shù)據(jù)量,此時(shí)可能會(huì)導(dǎo)致任務(wù)執(zhí)行效率低。此時(shí)可以在執(zhí)行聚合類算子時(shí),傳遞并行度,如reduceByKey(1000),SparkSQL下需要通過參數(shù)來設(shè)置全局并行度spark.sql.shuffle.partitions=1000。 - 使用預(yù)聚合
對(duì)于reduceByKey或者SparkSQL中的group by等操作有效,為每個(gè)key生成隨機(jī)值,如hive -> hive_1,此時(shí)進(jìn)行聚合,因?yàn)槊總€(gè)key都追加了隨機(jī)值后綴,會(huì)將原來數(shù)據(jù)量大的key打散,在聚合后,將隨機(jī)值還原,再進(jìn)行第二階段的聚合,此時(shí)生成的結(jié)果為真實(shí)結(jié)果。
此方案僅適用于group by等聚合類業(yè)務(wù)場景。 - 使用map join
如果是大小表join,可以將小表進(jìn)行廣播,將reduce join更改為map join,此方案可以直接消除shuffle。
如果是雙大表,可以將其中一個(gè)大表進(jìn)行過濾,然后使用過濾后的小表再進(jìn)行map join操作。
如果雙大表都不可以進(jìn)行過濾,可以將其中key分布均勻的大表進(jìn)行拆分,拆分后的小表進(jìn)行map join操作,最后將所有結(jié)果union即為最終結(jié)果。
如果在業(yè)務(wù)場景中,雙表會(huì)頻繁使用join操作,此時(shí)可以用分桶表進(jìn)行優(yōu)化。 - 采樣傾斜key進(jìn)行分拆join
在方案5的第三條中,如果數(shù)據(jù)分布均勻的表key較少,但是數(shù)據(jù)量很大,拆分后也無法形成map join可以采用此方案。
對(duì)數(shù)據(jù)分布不均勻的表進(jìn)行采樣,確認(rèn)數(shù)據(jù)量較大的key,并將這些key(數(shù)據(jù)集A)和其他key(數(shù)據(jù)集B)拆分為兩個(gè)數(shù)據(jù)集,然后數(shù)據(jù)集B正常join,數(shù)據(jù)集A對(duì)key打上隨機(jī)后綴然后再進(jìn)行join(此時(shí),數(shù)據(jù)集B也需要指定同樣的隨機(jī)值操作),join結(jié)束后,還原key并與數(shù)據(jù)集A的結(jié)果進(jìn)行union。 - 隨機(jī)前綴和RDD擴(kuò)容
如果執(zhí)行join操作的表都是大表,都存在數(shù)據(jù)分布均勻,且數(shù)據(jù)分布不均勻的key很多時(shí),可以采用該方案。
與方案6類似,但是缺少拆分?jǐn)?shù)據(jù)A的過程。
11. 在平時(shí)的工作中,你對(duì)Spark做了什么優(yōu)化?
- 避免創(chuàng)建重復(fù)的RDD,盡量對(duì)RDD進(jìn)行復(fù)用。
- 對(duì)多次使用的RDD進(jìn)行持久化處理,使用cache或者persist算子,將中間數(shù)據(jù)緩存到內(nèi)存中,可以減少重復(fù)計(jì)算的過程。
- 盡量避免使用shuffle類算子。分布式計(jì)算中,shuffle是最影響任務(wù)性能的關(guān)鍵之一。
- 盡量使用map-side預(yù)聚合操作。如果無法避免shuffle,在業(yè)務(wù)場景支撐的情況下,可以使用具有預(yù)聚合的算子來替代普通聚合算子,如reduceByKey或者aggregateByKey替代groupByKey。
- 使用高性能的算子。如預(yù)聚合算子,分區(qū)算子(mapPartition),在filter后使用coalesce進(jìn)行分區(qū)收縮等。
- 多使用廣播變量,實(shí)現(xiàn)map join操作。
- 如果有自定義數(shù)據(jù)結(jié)果,盡量使用Kryo替代Java默認(rèn)序列化工具。
- 熟悉數(shù)據(jù)和業(yè)務(wù)場景,盡力減少數(shù)據(jù)傾斜的產(chǎn)生。
- 對(duì)部分參數(shù)進(jìn)行調(diào)優(yōu)。
spark.shuffle.file.buffer
spark.reducer.maxSizeInFlight
...
12. Spark的內(nèi)存管理機(jī)制了解嗎?堆外內(nèi)存了解嗎?
Spark內(nèi)存管理,主要是針對(duì)的是Executor的JVM內(nèi)存。
作為一個(gè)JVM進(jìn)程,Executor的內(nèi)存管理機(jī)制是建立在JVM的堆內(nèi)存上的,Spark對(duì)JVM堆內(nèi)存進(jìn)行了更詳細(xì)的分配,以充分利用JVM堆內(nèi)存。并引入了堆外內(nèi)存(Off-Heap)的機(jī)制,可以直接在工作節(jié)點(diǎn)的內(nèi)存中開辟空間,進(jìn)一步優(yōu)化了內(nèi)存的利用。
- 堆內(nèi)內(nèi)存
使用--excutor-memory進(jìn)行控制。堆內(nèi)內(nèi)存在Spark的不同版本里被分為兩類:靜態(tài)內(nèi)存管理機(jī)制和動(dòng)態(tài)內(nèi)存管理機(jī)制。-
靜態(tài)內(nèi)存管理機(jī)制(Spark1.x版本默認(rèn)內(nèi)存管理機(jī)制)
靜態(tài)內(nèi)存管理機(jī)制中,內(nèi)存分配后不可變更。
Spark靜態(tài)內(nèi)存管理機(jī)制.png -
動(dòng)態(tài)內(nèi)存管理機(jī)制(Spark2.x版本默認(rèn)內(nèi)存管理機(jī)制)
Storage區(qū)和Execution區(qū)可以動(dòng)態(tài)互相占用,但是在雙方都內(nèi)存不足的情況下,Storage動(dòng)態(tài)占用部分可被強(qiáng)制收回,Execution動(dòng)態(tài)占用部分不可被強(qiáng)制收回。
Spark動(dòng)態(tài)內(nèi)存管理機(jī)制.png
-
- 堆外內(nèi)存
默認(rèn)情況下,堆外內(nèi)存并不啟用,需要通過參數(shù)spark.memory.offHeap.enabled進(jìn)行啟用,并通過參數(shù)spark.memory.offHeap.size對(duì)堆外內(nèi)存進(jìn)行內(nèi)存空間大小分配。
Spark2.0后,堆外內(nèi)存管理機(jī)制由Tachyon變更為JDK Unsafe API,Spark可以直接操作操作系統(tǒng)內(nèi)存,減少了不必要的內(nèi)存開銷以及頻繁的GC操作。堆外內(nèi)存也可以被精確的申請(qǐng)和釋放。
Spark堆外內(nèi)存.png
具體信息請(qǐng)參考(https://blog.csdn.net/pre_tender/article/details/101517789)
13. 說說Spark的分區(qū)機(jī)制?
為什么要分區(qū):在分布式運(yùn)算中,最影響性能的往往是網(wǎng)絡(luò)間的通信行為(shuffle),在將數(shù)據(jù)進(jìn)行分區(qū)并將不同的分區(qū)傳輸?shù)街付ǖ挠?jì)算節(jié)點(diǎn)中,由某一個(gè)節(jié)點(diǎn)獨(dú)立計(jì)算某一塊分區(qū)的數(shù)據(jù),可以減少shuffle的數(shù)據(jù)量,從而提升任務(wù)執(zhí)行效率。
RDD的分區(qū)原則:盡可能使分區(qū)數(shù)量 == 任務(wù)CPU Core數(shù)量。
Spark的分區(qū)操作由分區(qū)器來完成。默認(rèn)分區(qū)器由兩個(gè):HashPartition和RangePartition。
- HashParition:
Spark默認(rèn)提供的分區(qū)器,通過key的hash值%分區(qū)數(shù)量,從而獲得該key的分區(qū)結(jié)果。該分區(qū)器可以將同一個(gè)key的數(shù)據(jù)分發(fā)到一臺(tái)節(jié)點(diǎn)上去,但是可能會(huì)造成數(shù)據(jù)傾斜問題。 - RangePartition:
通過抽樣確定各個(gè)Partition的Key范圍。首先會(huì)對(duì)采樣的key進(jìn)行排序,然后計(jì)算每個(gè)Partition平均包含的Key權(quán)重,最后采用平均分配原則來確定各個(gè)Partition包含的Key范圍。盡量保證每個(gè)分區(qū)中數(shù)據(jù)量的均勻,而且分區(qū)與分區(qū)之間是有序的,一個(gè)分區(qū)中的元素肯定都是比另一個(gè)分區(qū)內(nèi)的元素小或者大;但是分區(qū)內(nèi)的元素是不能保證順序的。 - 自定義分區(qū)器:
通過集成Partitioner,重寫numPartitions()、getPartition()方法,自定義特殊業(yè)務(wù)場景分區(qū)器。
14. RDD、DataFrame和Dataset有什么異同?
- RDD、DataFrame和Dataset都是Spark的彈性分布式數(shù)據(jù)集。
- 三者都為惰性的,只有在遇到action類算子才會(huì)觸發(fā)執(zhí)行。
- 三者都具有partition的概念。
- RDD一般在SparkCore的場景中使用。DF和DS在SparkSQL、StructStreaming、SparkML中使用。
- Dataset等同于RDD+scheam。
- DataFrame等同于Dataset[Row]。
- Dataset是強(qiáng)類型的,因此pyspark只能使用DataFrame。
8.DataFrame和DataSet可以保存為帶列頭的csv等特殊格式。
9.三者可以互相轉(zhuǎn)化。
15. Spark 廣播變量在項(xiàng)目中如何運(yùn)用的?
在Spark中,當(dāng)傳遞一個(gè)自定義個(gè)數(shù)據(jù)集(如黑名單、白名單),Spark默認(rèn)會(huì)在Driver進(jìn)行分發(fā),在join等shuffle類算子中,會(huì)在每個(gè)task都分發(fā)一份,這樣會(huì)造成大量的內(nèi)存資源浪費(fèi)和shuffle的產(chǎn)生。
廣播變量就是為了應(yīng)對(duì)該問題的。廣播變量將指定數(shù)據(jù)集分發(fā)在executor中,而非task中,從而減少了內(nèi)存占用,并且在join等shuffle類操作中,可以避免節(jié)點(diǎn)數(shù)據(jù)傳輸而產(chǎn)生的shuffle操作。
但是廣播變量是只讀的,不能進(jìn)行修改,而且由于廣播變量在每個(gè)executor中都會(huì)保存一份副本,因此如果該變量過大,會(huì)造成OOM的出現(xiàn)。
創(chuàng)建廣播變量:
val a = 3
val broadcast = sc.broadcast(a)
獲取廣播變量:
val c = broadcast.value
16. Spark 累加器在項(xiàng)目中用來做什么?
在Spark程序中,我們通常會(huì)對(duì)某一項(xiàng)值做監(jiān)控或者對(duì)程序進(jìn)行調(diào)試,這種時(shí)候都需要用來累加器(計(jì)數(shù)器)。
累加器在Driver進(jìn)行聲明并賦初始值,累加器只能在Driver讀取最終結(jié)果值,只能在Executor中進(jìn)行更改。
創(chuàng)建累加器
val a = sc.accumulator(0)
獲取累加器結(jié)果
val b = a.value
17. Repartition和Colease有什么區(qū)別?是寬依賴?還是窄依賴?
Colease和Repartition都是用來改變Spark程序分區(qū)數(shù)量的。
Colease只能縮小分區(qū)數(shù),不會(huì)產(chǎn)生shuffle操作,是窄依賴。Colease底層調(diào)用的Repartition類。
def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = false, logicalPlan)
}
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
extends UnaryNode {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
override def output: Seq[Attribute] = child.output
}
Repartition可以縮小和放大分區(qū)數(shù),默認(rèn)會(huì)產(chǎn)生shuffle操作,是寬依賴。如果指定進(jìn)行分區(qū)改變,底層調(diào)用的是Repartition類;如果根據(jù)指定字段進(jìn)行分區(qū)改變,底層調(diào)用的是RepartitionByExperssion類。
def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = true, logicalPlan)
}
@scala.annotation.varargs
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
}
@scala.annotation.varargs
def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
}
case class RepartitionByExpression(
partitionExpressions: Seq[Expression],
child: LogicalPlan,
numPartitions: Option[Int] = None) extends RedistributeData {
numPartitions match {
case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.")
case None => // Ok
}
18.SparkStreaming結(jié)合Kafka的兩種方式分別是什么?
Receiver和Direct兩種模式。
- Receiver模式:
SparkStreaming對(duì)Kafka的高級(jí)API消費(fèi)模式,需要消費(fèi)者連接zookeeper來讀取數(shù)據(jù),偏移量是由zookeeper進(jìn)行維護(hù)的。
但是該模式可能會(huì)出現(xiàn)一系列的問題:容易導(dǎo)致數(shù)據(jù)丟失;采用WAL浪費(fèi)資源;Zookeeper的偏移量記錄可能導(dǎo)致SparkStreaming的重復(fù)讀問題;效率偏低。 - Direct模式:
SparkStreaming對(duì)Kafka的低級(jí)API消費(fèi)模式,SparkStreaming直接連接kafka進(jìn)行數(shù)據(jù)消費(fèi),但是需要手動(dòng)維護(hù)偏移量。
該模式下:會(huì)根據(jù)Kafka的Partition數(shù)量自動(dòng)生成RDD的Partition數(shù)量;無需通過WAL來保證數(shù)據(jù)的完整性;可以保證數(shù)據(jù)只被讀了一次;
19. SparkStreaming的WAL了解嗎?
WAL(Write ahead logs):預(yù)寫日志。主要用于故障恢復(fù),保證數(shù)據(jù)的無丟失。
WAL使用文件系統(tǒng)或者數(shù)據(jù)庫作為數(shù)據(jù)持久化,先將數(shù)據(jù)寫入到持久化的日志文件中去,其后才執(zhí)行其他邏輯,此時(shí)如果程序崩潰,在重啟后可以直接讀取預(yù)寫日志進(jìn)行恢復(fù)。
預(yù)寫日志需要通過參數(shù)來開啟spark.streaming.receiver.writeAheadLog.enable=true,并且同時(shí)在SparkStreaming的環(huán)境中設(shè)置checkpoint的保存路徑。
20.SparkStreaming的反壓機(jī)制了解嗎?詳細(xì)介紹下?
SparkStreaming的反壓機(jī)制是1.5版本后退出的新特性,主要用于動(dòng)態(tài)處理數(shù)據(jù)的攝入速度。
當(dāng)批處理時(shí)間大于批次間隔時(shí),說明數(shù)據(jù)處理能力已經(jīng)小于數(shù)據(jù)的進(jìn)入速度,這種情況會(huì)導(dǎo)致數(shù)據(jù)的積壓,最終可能會(huì)引發(fā)程序OOM。
手動(dòng)情況下(一般都是Kafka),可以通過參數(shù)spark.streaming.kafka.maxRatePerPartition來手動(dòng)指定攝入的最大速度。但是這種方法需要提前預(yù)知程序的處理能力和數(shù)據(jù)峰值。
反壓機(jī)制由SparkStreaming動(dòng)態(tài)來調(diào)整數(shù)據(jù)的攝入速度,通過參數(shù)spark.streaming.backpressure.enabled=true來開啟反壓機(jī)制。
其他參數(shù):
spark.streaming.backpressure.enabled=false; // 開啟反壓機(jī)制。默認(rèn)為false。
spark.streaming.backpressure.initialRate // 設(shè)定初始化接收值。只適用于Receiver模式。
spark.streaming.kafka.maxRatePerPartition // 設(shè)定每個(gè)消費(fèi)線程最大消費(fèi)kafka分區(qū)的數(shù)量。默認(rèn)全部。
spark.streaming.stopGracefullyOnShutdown // 設(shè)定未處理數(shù)據(jù),不會(huì)強(qiáng)制killSparkStreaming程序。
21.SparkStreaming如何實(shí)現(xiàn)Exactly-Once語義?
容錯(cuò)語義一般有三種:
- At Most Once:最多一次
- At Least Once:最少一次
- Exactly Once:有且僅有一次
要想實(shí)現(xiàn)Exactly Once語義,必須在源碼、程序、目的三個(gè)環(huán)境都保證是Exactly Once語義。
源:通過Kafka的Direct手動(dòng)維護(hù)偏移量來保證Exactly Once語義。
程序:通過WAL來保證SparkStreaming程序的Exactly Once語義。
目的:通過事務(wù)(事務(wù)的原子性,要么成功要么失敗)或者冪等(多次寫入)來保證一次性語義。











