Spark學(xué)習(xí)筆記

1.Spark簡(jiǎn)述

Spark通過(guò)內(nèi)存計(jì)算能力,急劇的提高大數(shù)據(jù)處理速度。解決了Hadoop只適合于離線的高吞吐量、批量處理的業(yè)務(wù)場(chǎng)景的弊端,提出了實(shí)時(shí)計(jì)算的解決方法。

1.1 Spark特點(diǎn)

a.快速處理能力:Hadoop的MapReduce中間數(shù)據(jù)采用磁盤存儲(chǔ),而Spark優(yōu)先使用內(nèi)存避免大量的磁盤IO,極大的提高了計(jì)算速度;
b.支持性強(qiáng):Spark支持Java、Scala、Python等;
c.可查詢:Spark SQL支持SQL;
d.支持流計(jì)算:Spark支持實(shí)時(shí)的流計(jì)算Spark Streaming;
e.可用性高:Spark的Standalone模式支持多Master,避免了單點(diǎn)故障;
f.Spark支持的數(shù)據(jù)源多:Spark支持HDFS、Hbase、Hive、Cassandra等,方便數(shù)據(jù)遷移。

1.2 Spark術(shù)語(yǔ)

a.RDD(Resillient Distributed Dataset) :彈性分布式數(shù)據(jù)集,程序可以根據(jù)需要,將RDD的Partition的個(gè)數(shù)進(jìn)行增加和減少(一個(gè)Partition對(duì)應(yīng)一個(gè)task),以提高執(zhí)行效率。而且RDD支持容錯(cuò),如果有失效的RDD,那么Spark可以從父RDD重新生成子RDD;
RDD特征:
1)不可變性: RDD只能通過(guò)轉(zhuǎn)換生成需要的新的RDD;
2)分區(qū)性: RDD由多個(gè)partition構(gòu)成,每個(gè)partition獨(dú)立的存儲(chǔ)于各自的機(jī)器內(nèi)存+磁盤上;
3)內(nèi)存優(yōu)先: 可以全部或部分緩存在內(nèi)存中,當(dāng)內(nèi)存不夠時(shí)再使用磁盤存儲(chǔ);
4)彈性存儲(chǔ): RDD在運(yùn)行時(shí)內(nèi)存不夠時(shí),它會(huì)進(jìn)行內(nèi)存和磁盤進(jìn)行數(shù)據(jù)交換,這對(duì)開(kāi)發(fā)者是透明的;
5)容錯(cuò)性: 當(dāng)RDD數(shù)據(jù)被刪除或者丟失,可使用父RDD重新計(jì)算重新獲取RDD,而且用戶無(wú)感知。
6)本地計(jì)算: 當(dāng)程序和數(shù)據(jù)塊不在同一臺(tái)機(jī)器時(shí),優(yōu)先遷移程序而不是優(yōu)先遷移數(shù)據(jù)。

b.Task:執(zhí)行任務(wù),有ShuffleMapTask(對(duì)應(yīng)Hadoop的Map)和ResultTask(對(duì)應(yīng)Hadoop的Reduce);
c.Job:程序提交的作業(yè),由task構(gòu)成;
d.Stage:Job的分段,一個(gè)Job劃分成多個(gè)Stage;
e.Partition: 數(shù)據(jù)分區(qū),一個(gè)RDD可以后多個(gè)分區(qū)構(gòu)成;
f.Narrow依賴:即窄依賴,子RDD的Partition依賴父RDD中固定的一個(gè)Partition,例如map、filter、union等操作會(huì)產(chǎn)生窄依賴;
g.Shuffle依賴:即寬依賴,子RDD的Partition依賴父RDD中的所有Partition,例如groupByKey、reduceByKey、sortByKey等操作會(huì)產(chǎn)生寬依賴,會(huì)產(chǎn)生shuffle;
h.DAG(Directed Acycle Graph): 記錄RDD之前依賴關(guān)系的有向無(wú)環(huán)圖。
i.算子:是指對(duì)RDD的運(yùn)算,可以理解為Spark處理RDD 的函數(shù)
算子的分類:

i.1:Value型Transformation算子:

針對(duì)處理的數(shù)據(jù)項(xiàng)是value,例如一對(duì)一型(map,flatMap,mapPartition,glom),多對(duì)一型(union,cartesian),多對(duì)多型(groupBy),輸出分區(qū)為輸入分區(qū)子集型(filter,distinct,subtract,sample,takeSample),Cache型(cache:將RDD從磁盤緩存到內(nèi)存,persist:將RDD從進(jìn)行緩存,可以緩存到內(nèi)存或磁盤)。

i.2:Key-Value型Transformation算子:

針對(duì)處理的數(shù)據(jù)項(xiàng)是Key-Value形式的算子。例如一對(duì)一型(mapValues),聚集(combineByKey,reduceByKey,partitionBy,cogroup),連接(join,leftOutJoin,rigthOutJoin)。

i.3:Action算子:

Action算子是指通過(guò)SparkContext執(zhí)行提交作業(yè)的runJob操作,觸發(fā)RDD的DAG的執(zhí)行。例如無(wú)輸出(foreach),HDFS(saveAsTextFile,saveAsObjectFile),Scala集合型(collect,collectAsMap,reduceByKeyLocally,lookup,count,top,reduce,fold,aggregate)。

2.Spark的基本架構(gòu)

Spark基本架構(gòu).jpg

過(guò)程說(shuō)明:
Client 提交應(yīng)用(由Spark Action算子觸發(fā)),Cluster Manager找到一個(gè)Worker啟動(dòng)Driver,Driver向Cluster Manager申請(qǐng)資源,然后將應(yīng)用轉(zhuǎn)換為RDD DAG,再有DAG Scheduler 將RDD DAG 轉(zhuǎn)化為Stage DAG(一個(gè)Stage由一組相同的task集合構(gòu)成),然后提交給TaskScheduler,由TaskScheduler將task(每個(gè)task對(duì)應(yīng)一個(gè)Partition)交給Executor執(zhí)行。

a.Cluster Manager:
Spark集群管理器,主要負(fù)責(zé)資源的分配和管理,負(fù)責(zé)Worker上的內(nèi)存,CPU等資源的分配,不負(fù)責(zé)Executor的資源分配和管理。常用的Spark資源管理器有Standalone,YARN,Mesos,EC2等。

b.Driver:
是spark的驅(qū)動(dòng)節(jié)點(diǎn),執(zhí)行spark中的main方法,負(fù)責(zé)代碼的執(zhí)行工作。主要任務(wù)如下:
1.將用戶查詢轉(zhuǎn)換為任務(wù);
2.在Executor之間調(diào)度任務(wù);
3.跟蹤Executor的執(zhí)行情況;
4.支持UI展示運(yùn)行情況。

c.Worker:
Spark中的工作節(jié)點(diǎn),得到Cluster Manager的分配到資源的Worker的工作有:創(chuàng)建Executor,將自己的資源和任務(wù)分配給Executor,同步資源信息給Cluster Manager。

d.Executor:
Executor是一個(gè)JVM進(jìn)程:
1.負(fù)責(zé)執(zhí)行Spark中的task執(zhí)行,并將結(jié)果返回給Driver;
2.通過(guò)自己的Block Manager為RDD提供基于內(nèi)存的存儲(chǔ)(如果需要),因此RDD是可以直接緩存在Executor進(jìn)程內(nèi)的,故執(zhí)行效率高。

各組件之間通信采用Netty通信服務(wù),每個(gè)組件有一個(gè)InBox負(fù)責(zé)接收數(shù)據(jù),N個(gè)OutBox(N取決于要發(fā)送數(shù)據(jù)的接收組件的個(gè)數(shù))

2.1 Spark運(yùn)行模式:

模式 說(shuō)明
local 本地運(yùn)行,可以指定CPU的核心數(shù)量
Standalone Spark自帶的集群運(yùn)行模式
Mesos 在Mesos集群上運(yùn)行,Driver和Worker都運(yùn)行在Mesos上
yarn-client Driver運(yùn)行在本地,Worker運(yùn)行在YARN上
yarn-cluster Driver運(yùn)行在YARN上,Worker運(yùn)行在YARN上,線上環(huán)境基本都是這種模式

2.2 Spark的task提交過(guò)程

Spark-task-提交過(guò)程.jpg

a.spark根據(jù)transaction操作,將RDD構(gòu)建血緣關(guān)系圖,即DAG,然后由action算子觸發(fā)job調(diào)度執(zhí)行;

b.DAGScheduler負(fù)責(zé)Stage的調(diào)度,將job劃分成多個(gè)stage,然后將stage打包成taskSet交給TaskScheduler調(diào)度;

c.TaskScheduler負(fù)責(zé)task的調(diào)度,將從DAGScheduler獲取的TaskSet按照指定的調(diào)度策略(FIFO使用先進(jìn)先出的TaskSetManger隊(duì)列或者FAIR)分發(fā)到Executor上執(zhí)行;

d.SchedulerBackend負(fù)責(zé)提供可用資源,接收Executor的注冊(cè)信息,并維護(hù)Executor的狀態(tài),資源情況的上報(bào)等。

調(diào)度策略:
FIFO:使用先進(jìn)先出的隊(duì)列,TaskSetManger為一個(gè)節(jié)點(diǎn)。

FAIR:對(duì)TaskSetManger的runningTasks個(gè)數(shù),minShare值,weight值;
主要原則有:

runningTasks < minShare的先執(zhí)行;
minShare使用率低的先執(zhí)行;
weight使用率低的先執(zhí)行;

最后將排序后的TaskSetManger放入緩存隊(duì)列中,然后依次交給Executor執(zhí)行。

2.3 Job,Stage,task的劃分

Stage劃分.jpg

說(shuō)明:
每個(gè)Action算子是一個(gè)job,一個(gè)job由shuffle(寬依賴)分割成多個(gè)Stage,一個(gè)Stage內(nèi)有多少個(gè)Partition就產(chǎn)生多少個(gè)task。故一個(gè)job中task的數(shù)量 = stage數(shù)量 * 每個(gè)stage的task數(shù)量。
由shuffle寬依賴劃分Stage的原因:
shuffle寬依賴中子RDD的Partition會(huì)依賴父RDD的多個(gè)Partition,這樣就會(huì)出現(xiàn)一些父Partition沒(méi)有準(zhǔn)備好數(shù)據(jù),導(dǎo)致不能繼續(xù)計(jì)算,直到父RDD的所有Partition都準(zhǔn)備好了,才能夠執(zhí)行將父RDD轉(zhuǎn)換為子RDD的計(jì)算,而且往往需要跨節(jié)點(diǎn)數(shù)據(jù)傳輸。而窄依賴是父RDD的一個(gè)Partition決定了子RDD的一個(gè)Partition,直接計(jì)算就可以了。另外,在數(shù)據(jù)恢復(fù)時(shí),窄依賴只需要重新執(zhí)行丟失子RDD的Partition的父RDD的Partition即可,而shuffle寬依賴需要考慮通過(guò)恢復(fù)所有父RDD的Partition,然后通過(guò)計(jì)算再獲得子RDD的Partition。

Task的本地化等級(jí):

名稱 說(shuō)明
PROCESS_LOCAL 進(jìn)程本地化,即Task和對(duì)應(yīng)的Partition在一個(gè)Executor中,性能最好
NODE_LOCAL 節(jié)點(diǎn)本地化,即Task和對(duì)應(yīng)的Partition在一個(gè)Worker中,但是不在一個(gè)Executor中,數(shù)據(jù)需要進(jìn)程間傳輸
RACK_LOCAL 機(jī)架本地化,即Task和對(duì)應(yīng)的Partition在一個(gè)機(jī)架的兩個(gè)節(jié)點(diǎn)上,數(shù)據(jù)需要通過(guò)網(wǎng)絡(luò)在節(jié)點(diǎn)間傳輸
NO_PREF 對(duì)于Task來(lái)說(shuō),從哪里去都一樣,沒(méi)有優(yōu)劣之分
ANY Task和數(shù)據(jù)不在一個(gè)機(jī)架上,性能最差。

Spark在調(diào)度時(shí),盡可能的讓每個(gè)task以最高級(jí)別的本地性級(jí)別執(zhí)行,但是當(dāng)本地性級(jí)別對(duì)應(yīng)的所有節(jié)點(diǎn)都沒(méi)有空閑資源時(shí),那么Spark會(huì)等待重試,如果超過(guò)閾值時(shí)間,那么將會(huì)降低本地性級(jí)別啟動(dòng)。

Shuffle的任務(wù)個(gè)數(shù):
map task個(gè)數(shù):
當(dāng)Spark從HDFS讀取數(shù)據(jù)時(shí),task的個(gè)數(shù)由HDFS的split個(gè)數(shù)相同;
當(dāng)Spark的Shuffle之前執(zhí)行了repartition或者coalesce操作,那么task的個(gè)數(shù)和Partition個(gè)數(shù)相同。

reduce task個(gè)數(shù):
如果配置了spark.default.parallelism=N,那么reduce的task的個(gè)數(shù)為N;
如果沒(méi)有配置,那么task的個(gè)數(shù)和Partition個(gè)數(shù)相同。

3.Spark的Shuffle過(guò)程

3.1 我們先來(lái)看看Hadoop的Shuffle過(guò)程:
Hadoop-Shuffle.jpg
說(shuō)明:

Reduce獲得Map的中間輸出結(jié)果后,會(huì)對(duì)這些數(shù)據(jù)在磁盤上進(jìn)行merge sort,需要大量的IO。

3.2 Spark基于Hash的Shuffle:
Spark-Hash-Shuffle.jpg
說(shuō)明:

在Hash Shuffle的時(shí)候,每個(gè)task會(huì)根據(jù)Reduce的個(gè)數(shù)創(chuàng)建相同個(gè)數(shù)的bucket,故bucket的總個(gè)數(shù)是task個(gè)數(shù) * Reduce的個(gè)數(shù)。相比Hadoop的Shuffle,Hash Shuffle避免了不必要的排序。

缺點(diǎn):

task個(gè)數(shù)和Reduce的個(gè)數(shù)比較大時(shí),該Shuffle會(huì)生成大量的bucket文件,不但對(duì)系統(tǒng)產(chǎn)生很大壓力,也影響了IO吞吐量。另外,Map的中間結(jié)果是首先保存到內(nèi)存中的,然后再寫入磁盤,對(duì)內(nèi)存容量要求比較高。

3.3 Spark基于Hash的Shuffle的優(yōu)化
Spark-Hash-Shuffle的優(yōu)化.jpg
說(shuō)明:

使用Consolidation機(jī)制(spark.shuffle.consolidation=true開(kāi)啟),將一個(gè)CPU core的所有Task的輸出到一個(gè)ShuffleBlockFile文件中,不同的Task輸出到不同的ShuffleBlockFile的Segment中。即只有CPU core中第一個(gè)執(zhí)行的task要?jiǎng)?chuàng)建磁盤文件,后面該CPU core再執(zhí)行后面的task時(shí),復(fù)用之前第一個(gè)task創(chuàng)建的磁盤文件。

3.4 Spark基于sort的Shuffle
Spark-sort-shuffle.jpg

sort的Shuffle有兩種運(yùn)行模式
a.普通運(yùn)行模式
當(dāng)執(zhí)行shuffle時(shí),數(shù)據(jù)會(huì)首先寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)中(reduceByKey使用map結(jié)構(gòu),join使用Array結(jié)構(gòu)),每一條數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)后,會(huì)判斷是否達(dá)到了閾值,如果達(dá)到了閾值,那么根據(jù)key將內(nèi)存結(jié)構(gòu)中的數(shù)據(jù)進(jìn)行排序,排序之后,那么將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)使用Java的BufferedOutputStream分批寫入到磁盤中(默認(rèn)batch=10000),然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。多次flush內(nèi)存結(jié)構(gòu)數(shù)據(jù),會(huì)產(chǎn)生多個(gè)臨時(shí)文件,最后將這些臨時(shí)文件都進(jìn)行合并,即merge。

b.ByPass運(yùn)行模式
當(dāng)Shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)時(shí)(默認(rèn)為200)且不是聚合類的shuffle算子,則啟用ByPass模式。
其過(guò)程大致和未經(jīng)優(yōu)化的Hash Shuffle相同,只是后面多了一步文件merge的過(guò)程。
好處:

1.不會(huì)進(jìn)行排序;
2.向磁盤寫數(shù)據(jù)的方式不同;

4.Spark的內(nèi)存管理

Spark內(nèi)存.jpg

4.1 堆內(nèi)內(nèi)存(on-heap memory):

指Executor中的堆內(nèi)存,即JVM中的heap中的空間,由JVM統(tǒng)一管理。通過(guò)-executor-memory或spark.executor.memory參數(shù)可指定大小。

1.storage內(nèi)存: 主要保存RDD的緩存,廣播變量的內(nèi)存部分為;
2.Execution內(nèi)存:主要保存shuffle時(shí)產(chǎn)生的數(shù)據(jù);
3.other內(nèi)存:保存Spark內(nèi)部的對(duì)象實(shí)例,或者用戶自定義的對(duì)象實(shí)例。

由于Spark的堆內(nèi)內(nèi)存,交由JVM管理,故會(huì)出現(xiàn)Spark標(biāo)記為釋放的實(shí)例,在JVM方面還沒(méi)有被垃圾回收器回收,因此JVM實(shí)際可用的堆內(nèi)存小于Spark記錄的可用內(nèi)存,即Spark記錄的可用堆內(nèi)內(nèi)存會(huì)和JVM實(shí)際的堆內(nèi)內(nèi)存不一致,故有可能會(huì)導(dǎo)致OMM。

4.2 堆外內(nèi)存(off-heap memory):

Spark從操作系統(tǒng)獲取的內(nèi)存,存儲(chǔ)序列化的二進(jìn)制對(duì)象。Spark使用JDK Unsafe API管理堆外內(nèi)存,跳過(guò)了JVM,精準(zhǔn)的申請(qǐng)和釋放內(nèi)存,減少不必要的堆內(nèi)存開(kāi)銷,以及JVM GC帶來(lái)的性能問(wèn)題,也提高了內(nèi)存空間使用情況計(jì)算的精度。
默認(rèn)情況下不開(kāi)啟堆外內(nèi)存,可通過(guò)spark.memory.offHeap.enabled開(kāi)啟,并設(shè)置堆外內(nèi)存大小,堆外空間有storage內(nèi)存和Execution內(nèi)存,沒(méi)有other內(nèi)存空間。

4.3 內(nèi)存分配:

a.靜態(tài)內(nèi)存分配:

早期的Spark采用靜態(tài)內(nèi)存分配,即程序運(yùn)行期間的存儲(chǔ)內(nèi)存,執(zhí)行內(nèi)存和other內(nèi)存大小都是固定的,都是在Spark啟動(dòng)時(shí)設(shè)置好的。


image.png
缺點(diǎn):

需要對(duì)Spark深入了解,對(duì)任務(wù)邏輯很清楚的情況下,才可以分配好存儲(chǔ)空間,否則很容易空間分布不均衡。故目前很少使用這種分配方式。

b.統(tǒng)一內(nèi)存管理:

Spark1.6之后引入,即Storage和Execution存儲(chǔ)空間在同一塊區(qū)域,可以動(dòng)態(tài)占用另一方的空閑區(qū)域。

image.png

a.Spark啟動(dòng)時(shí),設(shè)置Storage和Execution空間大??;
b.當(dāng)一個(gè)空間不夠(放不下一個(gè)Block),另外一個(gè)空閑時(shí),則借用空閑空間; 當(dāng)Storage和Execution空間都不夠時(shí),將數(shù)據(jù)存儲(chǔ)到磁盤;
c.當(dāng)自己Execution內(nèi)存不夠時(shí),可以要求另一方歸還借用的空間;
d.當(dāng)自己Storage內(nèi)存不夠時(shí),無(wú)法可以要求另一方歸還借用的空間,主要因?yàn)镾huffle的過(guò)程比較復(fù)雜。

優(yōu)點(diǎn):
提高內(nèi)存使用率,降低Spark內(nèi)存維護(hù)難度,要防止緩存數(shù)據(jù)過(guò)多導(dǎo)致full GC帶來(lái)的性能問(wèn)題。

4.4 RDD的持久化:

持久化級(jí)別 說(shuō)明
MEMORY_ONLY(默認(rèn)級(jí)別) 以非序列化的JAVA對(duì)象的方式存儲(chǔ)在JVM的內(nèi)存中,當(dāng)內(nèi)存不夠存儲(chǔ)所有Partition時(shí),沒(méi)有存儲(chǔ)到JVM內(nèi)存中的Partition會(huì)在需要他的時(shí)候重新計(jì)算。
MEMORY_AND_DISK 以非序列化的JAVA對(duì)象的方式存儲(chǔ)在JVM的內(nèi)存中,當(dāng)JVM內(nèi)存不夠時(shí),會(huì)將超出的Partition持久化到磁盤
MEMORY_ONLY_SER 與MEMORY_ONLY相同,但是會(huì)將Java對(duì)象進(jìn)行序列化,減少了內(nèi)存開(kāi)銷,但是增大了CPU開(kāi)銷
MEMORY_AND_DISK_SER 與MEMORY_AND_DISK相同,但是會(huì)將Java對(duì)象進(jìn)行序列化,減少了內(nèi)存開(kāi)銷,但是增大了CPU開(kāi)銷
*_2(MEMORY_ONLY_2,MEMORY_AND_DISK_2,MEMORY_ONLY_SER_2,MEMORY_ONLY_SER_2 ) 表示持久化數(shù)據(jù)要有一個(gè)副本,保存到其他節(jié)點(diǎn)上,實(shí)現(xiàn)容錯(cuò),避免因?yàn)橐粋€(gè)RDD分區(qū)意外丟失導(dǎo)致所有數(shù)據(jù)重新計(jì)算;

4.5 RDD的緩存過(guò)程:

Unroll過(guò)程:

RDD在緩存到Storage之前,Partition以Iterator數(shù)據(jù)結(jié)構(gòu)來(lái)訪問(wèn),通過(guò)Iterator可以獲得每一條數(shù)據(jù)記錄(Record,序列化使用SerializedMemoryEntry數(shù)據(jù)結(jié)構(gòu)定義,使用字節(jié)緩沖區(qū)ByteBuffer存儲(chǔ)二進(jìn)制數(shù)據(jù);非序列化使用DeserializedMemoryEntry數(shù)據(jù)結(jié)構(gòu)定義,使用數(shù)組存儲(chǔ)),這些Record在邏輯上占用了Spark堆內(nèi)內(nèi)存的other空間,注意:同一個(gè)Partition內(nèi)的Record并不連續(xù)。

當(dāng)將RDD緩存到Storage內(nèi)存之后,Partition被轉(zhuǎn)換成Block,Record在堆內(nèi)或者堆外的Storage內(nèi)存中占用一塊連續(xù)的空間,這樣講Partition由不連續(xù)的存儲(chǔ)空間轉(zhuǎn)換為連續(xù)的存儲(chǔ)空間的過(guò)程,Spark稱之為Unroll。

每個(gè)Executor的Storage使用一個(gè)LinkedHashMap來(lái)管理所有堆內(nèi)內(nèi)存和堆外內(nèi)存中所有的Block實(shí)例,對(duì)LinkedHashMap中Block的新增和刪除,伴隨著內(nèi)存的申請(qǐng)和釋放。

Block的淘汰和落盤具體規(guī)則如下:
a.舊的Block和新的Block要同屬堆內(nèi)內(nèi)存或者同屬堆內(nèi)內(nèi)存;
b.新舊Block不能同屬一個(gè)RDD,避免循環(huán)淘汰;
c.舊的Block對(duì)應(yīng)的RDD不能處于被讀狀態(tài),避免發(fā)生一致性問(wèn)題;
d.LinkedHashMap的淘汰策略使用LRU方式,采用遍歷的方式,找到淘汰的Block;
e.只有存儲(chǔ)級(jí)別中包含了DISK,Block才進(jìn)行落盤,否則直接刪除該Block,需要的時(shí)候再重新計(jì)算得到被刪除的Block。

由于Storage空間容納的Iterator是有限的,當(dāng)前計(jì)算任務(wù),向Memory Manager申請(qǐng)足夠的Uroll空間(空間的估算方式,序列化的Partition直接累加計(jì)算即可,而且計(jì)算準(zhǔn)確;非序列化的Partition,使用遍歷Record的方式采樣估算所需要的Unroll空間,估算會(huì)精度會(huì)有偏差,當(dāng)程序執(zhí)行Unroll時(shí)遇到空間不足時(shí),Unroll會(huì)釋放已占有的空間)來(lái)臨時(shí)占位,當(dāng)空間不足時(shí),unroll失敗,直到空間足夠時(shí)才能夠執(zhí)行Unroll。

4.6 Execution內(nèi)存管理:

Execution主要保存Shuffle執(zhí)行時(shí)占用的內(nèi)存。

4.6.1 Shuffle Write

a.若在map端使用普通的排序方式,則會(huì)使用ExeternalSorter進(jìn)行外排,使用Spark堆內(nèi)內(nèi)存空間;
b.若在map端使用tungsten排序(鎢絲排序),則會(huì)使用ShuffleExeternalSorter直接對(duì)序列化形式的對(duì)象進(jìn)行排序,該方式可以使用Spark堆內(nèi)內(nèi)存空間和Spark堆外內(nèi)存空間(當(dāng)堆外內(nèi)存開(kāi)啟時(shí))。

4.6.2 Shuffle Read

a.在對(duì)reduce端數(shù)據(jù)進(jìn)行聚合時(shí),數(shù)據(jù)由Aggregator處理,使用Spark堆內(nèi)內(nèi)存空間;
b.如果最終結(jié)果需要排序,那么數(shù)據(jù)還需要ExeternalSorter進(jìn)行外排,使用Spark堆內(nèi)內(nèi)存空間。

ExeternalSorter和Aggregator使用AppendOnlyMap哈希表保存數(shù)據(jù)(當(dāng)內(nèi)存不夠時(shí),會(huì)保存到磁盤中)。

4.7 Block的管理:

image.png
a.BlockManagerMaster:

負(fù)責(zé)對(duì)各個(gè)節(jié)點(diǎn)上的BlockManager內(nèi)部數(shù)據(jù)的元數(shù)據(jù)進(jìn)行維護(hù)(主要是Block的增刪改等);

b.BlockManager:

每個(gè)BlockManager創(chuàng)建之后,首先將自己注冊(cè)到BlockManagerMaster,這時(shí)候BlockManagerMaster就會(huì)有對(duì)應(yīng)的BlockManagerInfo;

BlockManager進(jìn)行數(shù)據(jù)讀寫的時(shí)候,首先從本地讀?。ㄊ褂肈iskStore或者M(jìn)emoryStore),如果本地沒(méi)有對(duì)應(yīng)數(shù)據(jù)的話,通過(guò)TransferService與有數(shù)據(jù)的BlockManager建立連接,然后從遠(yuǎn)程拉取數(shù)據(jù)。

4.8 Spark共享變量:

默認(rèn)情況下,每個(gè)Spark的算子,使用了某個(gè)外部變量,那么這個(gè)變量就會(huì)復(fù)制到每個(gè)task中,這樣每個(gè)task讀取各自的外部變量。

4.8.1 Spark廣播變量(Broadcast Variable)

廣播變量在Spark開(kāi)始時(shí)是在Driver端,當(dāng)task使用廣播變量時(shí),先從自己所在Executor中BlockManager中獲取,則從Driver端或者其他節(jié)點(diǎn)的BlockManager獲取,該Executor后續(xù)的task使用廣播變量時(shí),自己從自己所在Executor中BlockManager中獲取。為每個(gè)Executor復(fù)制一份,避免了每個(gè)task復(fù)制一份的性能問(wèn)題,減少了IO和內(nèi)存消耗。廣播變量不能被task修改。

4.8.2 Spark累加器(Accumulator)

Spark Accumulator保存于Driver端,task對(duì)Accumulator的累加操作后,會(huì)把值發(fā)送到Driver端,Driver對(duì)值進(jìn)行匯總。task只能對(duì)Accumulator只能進(jìn)行累加操作,不能夠讀取。

5.Spark性能調(diào)優(yōu)

5.1 資源的配置:

在資源允許的情況下:
a.增加Executor個(gè)數(shù),提高task并行度;
b.增加每個(gè)Executor的CPU core的個(gè)數(shù),提高task并行度;
c.增加每個(gè)Executor的內(nèi)存,這樣可以緩存更多的數(shù)據(jù),為Shuffle提供更多的內(nèi)存,每個(gè)task也獲取了更多的內(nèi)存。

5.2 RDD優(yōu)化:

a.RDD復(fù)用,避免重復(fù)計(jì)算帶來(lái)的資源消耗;
b.RDD持久化,通過(guò)持久化,將多次使用的RDD緩存到內(nèi)存或磁盤中。

5.3 RDD并行度控制:

盡量讓并行度和資源匹配,Spark官方推薦,Stage中的task數(shù)量為Spark的總CPU core數(shù)量的2-3倍,避免不同CPU不同的性能,導(dǎo)致先執(zhí)行完task的CPU空閑。

5.4 廣播大變量:

廣播變量為每個(gè)Executor復(fù)制一份,避免了每個(gè)task復(fù)制一份的性能問(wèn)題,減少了IO和內(nèi)存消耗。

5.5 kyro序列化:

Spark默認(rèn)使用Java的序列化,但是Java的序列化性能和效率不高,使用kyro序列化的性能是Java序列化的性能高很多(官方說(shuō)是10倍)。

5.6 本地化等待時(shí)間:

Spark的希望task能夠運(yùn)行在它要計(jì)算的數(shù)據(jù)所在節(jié)點(diǎn)上(數(shù)據(jù)本地化思想,避免網(wǎng)絡(luò)傳輸),當(dāng)該節(jié)點(diǎn)的計(jì)算資源不夠時(shí),spark會(huì)等待(默認(rèn)等待3秒),如果超過(guò)了等待時(shí)間,那么task的本地化等級(jí)會(huì)降級(jí),然后再等待一段時(shí)間,否則再次降級(jí)。

本地化等待時(shí)間太長(zhǎng),會(huì)導(dǎo)致大量的等待時(shí)間過(guò)長(zhǎng),反而使Spark任務(wù)性能下降。

5.7 算子調(diào)優(yōu):

a.MapPartitions的使用場(chǎng)景;
b.foreachPartitions的使用場(chǎng)景;
c.filter和coalesce配合,過(guò)濾數(shù)據(jù)之后,考慮數(shù)據(jù)重新分區(qū)的問(wèn)題;
d.repartition重新分區(qū)提高并行度,Spark SQL的并行度不允許用戶指定,如讀取hive數(shù)據(jù)時(shí),HDFS的split個(gè)數(shù),即使所在Stage的并行度,即RDD的個(gè)數(shù)。但是在讀取到數(shù)據(jù)之后,可以立即使用repartition算子,進(jìn)行合理的分區(qū);
e.reduceByKey本地聚合,在map端對(duì)本地的數(shù)據(jù)進(jìn)行聚合。

5.8 Shuffle調(diào)優(yōu):

a.調(diào)整map端緩沖區(qū)(默認(rèn)32KB),避免因?yàn)榫彺鎱^(qū)不夠?qū)е碌乃⒈PIO操作;
b.調(diào)整reduce端緩沖區(qū)(默認(rèn)48MB),避免因?yàn)榫彺鎱^(qū)不夠?qū)е碌乃⒈PIO操作,減少數(shù)據(jù)拉去的次數(shù);
c.調(diào)整reduce端拉去數(shù)據(jù)重試次數(shù)(默認(rèn)3次),避免網(wǎng)絡(luò)不穩(wěn)定或者full gc導(dǎo)致的數(shù)據(jù)拉去失敗問(wèn)題;
d.調(diào)整reduce端拉去數(shù)據(jù)等待時(shí)間的閾值(默認(rèn)5秒),避免網(wǎng)絡(luò)不穩(wěn)定或者full gc導(dǎo)致的數(shù)據(jù)拉去失敗問(wèn)題;
e.調(diào)整SortShuffle排序操作閾值,即bypass操作的閾值,避免不必要的排序操作;

5.9 JVM調(diào)優(yōu):

a.降低cache操作的內(nèi)存占比;
b.調(diào)節(jié)Executor的堆外內(nèi)存;
c.調(diào)節(jié)連接等待時(shí)長(zhǎng);

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容