
Spark集群
一組計(jì)算機(jī)的集合,每個計(jì)算機(jī)節(jié)點(diǎn)作為獨(dú)立的計(jì)算資源,又可以虛擬出多個具備計(jì)算能力的虛擬機(jī),這些虛擬機(jī)是集群中的計(jì)算單元。Spark的核心模塊專注于調(diào)度和管理虛擬機(jī)之上分布式計(jì)算任務(wù)的執(zhí)行,集群中的計(jì)算資源則交給Cluster Manager這個角色來管理,Cluster Manager可以為自帶的Standalone、或第三方的Yarn和Mesos。
Cluster Manager一般采用Master-Slave結(jié)構(gòu)。以Yarn為例,部署ResourceManager服務(wù)的節(jié)點(diǎn)為Master,負(fù)責(zé)集群中所有計(jì)算資源的統(tǒng)一管理和分配;部署NodeManager服務(wù)的節(jié)點(diǎn)為Slave,負(fù)責(zé)在當(dāng)前節(jié)點(diǎn)創(chuàng)建一個或多個具備獨(dú)立計(jì)算能力的JVM實(shí)例,在Spark中,這些節(jié)點(diǎn)也叫做Worker。
另外還有一個Client節(jié)點(diǎn)的概念,是指用戶提交Spark Application時(shí)所在的節(jié)點(diǎn)。
Application
用戶自己寫的Spark應(yīng)用程序,批處理作業(yè)的集合。Application的main方法為應(yīng)用程序的入口,用戶通過Spark的API,定義了RDD和對RDD的操作。這里可以參考一段定義:
可以認(rèn)為應(yīng)用是多次批量計(jì)算組合起來的過程,在物理上可以表現(xiàn)為你寫的程序包+部署配置。應(yīng)用的概念類似于計(jì)算機(jī)中的程序,它只是一個藍(lán)本,尚沒有運(yùn)行起來?!?a target="_blank" rel="nofollow">spark學(xué)習(xí)筆記三:spark原理介紹
SparkContext
Spark最重要的API,用戶邏輯與Spark集群主要的交互接口,它會和Cluster Master交互,包括向它申請計(jì)算資源等。
Driver和Executor
Spark在執(zhí)行每個Application的過程中會啟動Driver和Executor兩種JVM進(jìn)程:
- Driver進(jìn)程為主控進(jìn)程,負(fù)責(zé)執(zhí)行用戶Application中的main方法,提交Job,并將Job轉(zhuǎn)化為Task,在各個Executor進(jìn)程間協(xié)調(diào)Task的調(diào)度。
- 運(yùn)行在Worker上的Executor進(jìn)程負(fù)責(zé)執(zhí)行Task,并將結(jié)果返回給Driver,同時(shí)為需要緩存的RDD提供存儲功能。

圖片來源 - Spark Cluster Mode Overview
Spark有Client和Cluster兩種部署Application的模式,Application以以Client模式部署時(shí),Driver運(yùn)行于Client節(jié)點(diǎn),而以Cluster模式部署時(shí),Driver運(yùn)行于Worker節(jié)點(diǎn),與Executor一樣由Cluster Manager啟動。
RDD
彈性分布式數(shù)據(jù)集,只讀分區(qū)記錄的集合,Spark對所處理數(shù)據(jù)的基本抽象。Spark中的計(jì)算可以簡單抽象為對RDD的創(chuàng)建、轉(zhuǎn)換和返回操作結(jié)果的過程:
- 創(chuàng)建
通過加載外部物理存儲(如HDFS)中的數(shù)據(jù)集,或Application中定義的對象集合(如List)來創(chuàng)建。RDD在創(chuàng)建后不可被改變,只可以對其執(zhí)行下面兩種操作。 - 轉(zhuǎn)換(Transformation)
對已有的RDD中的數(shù)據(jù)執(zhí)行計(jì)算進(jìn)行轉(zhuǎn)換,而產(chǎn)生新的RDD,在這個過程中有時(shí)會產(chǎn)生中間RDD。Spark對于Transformation采用惰性計(jì)算機(jī)制,遇到Transformation時(shí)并不會立即計(jì)算結(jié)果,而是要等遇到Action時(shí)一起執(zhí)行。 - 行動(Action)
對已有的RDD中的數(shù)據(jù)執(zhí)行計(jì)算產(chǎn)生結(jié)果,將結(jié)果返回Driver程序或?qū)懭氲酵獠课锢泶鎯ΑT贏ction過程中同樣有可能生成中間RDD。
Partition(分區(qū))
一個RDD在物理上被切分為多個Partition,即數(shù)據(jù)分區(qū),這些Partition可以分布在不同的節(jié)點(diǎn)上。Partition是Spark計(jì)算任務(wù)的基本處理單位,決定了并行計(jì)算的粒度,而Partition中的每一條Record為基本處理對象。例如對某個RDD進(jìn)行map操作,在具體執(zhí)行時(shí)是由多個并行的Task對各自分區(qū)的每一條記錄進(jìn)行map映射。
Dependency(依賴)
對RDD的Transformation或Action操作,讓RDD產(chǎn)生了父子依賴關(guān)系(事實(shí)上,Transformation或Action操作生成的中間RDD也存在依賴關(guān)系),這種依賴分為寬依賴和窄依賴兩種:
- NarrowDependency (窄依賴)
parent RDD中的每個Partition最多被child RDD中的一個Partition使用。讓RDD產(chǎn)生窄依賴的操作可以稱為窄依賴操作,如map、union。 - WideDependency (或ShuffleDependency,寬依賴)
parent RDD中的每個Partition被child RDD中的多個Partition使用,這時(shí)會依據(jù)Record的key進(jìn)行數(shù)據(jù)重組,這個過程即為Shuffle(洗牌)。讓RDD產(chǎn)生寬依賴的操作可以稱為寬依賴操作,如reduceByKey, groupByKey。
Spark根據(jù)用戶Application中的RDD的轉(zhuǎn)換和行動,生成RDD之間的依賴關(guān)系,RDD之間的計(jì)算鏈構(gòu)成了RDD的血統(tǒng)(Lineage),同時(shí)也生成了邏輯上的DAG(有向無環(huán)圖)。每一個RDD都可以根據(jù)其依賴關(guān)系一級一級向前回溯重新計(jì)算,這便是Spark實(shí)現(xiàn)容錯的一種手段:
RDD的每次轉(zhuǎn)換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過這個依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進(jìn)行重新計(jì)算。
——《Spark技術(shù)內(nèi)幕》-第3章-RDD實(shí)現(xiàn)詳解
Job
在一個Application中,以Action為劃分邊界的Spark批處理作業(yè)。前面提到,Spark采用惰性機(jī)制,對RDD的創(chuàng)建和轉(zhuǎn)換并不會立即執(zhí)行,只有在遇到第一個Action時(shí)才會生成一個Job,然后統(tǒng)一調(diào)度執(zhí)行。一個Job包含N個Transformation和1個Action。
Shuffle
有一部分Transformation或Action會讓RDD產(chǎn)生寬依賴,這樣過程就像是將父RDD中所有分區(qū)的Record進(jìn)行了“洗牌”(Shuffle),數(shù)據(jù)被打散重組,如屬于Transformation操作的join,以及屬于Action操作的reduce等,都會產(chǎn)生Shuffle。
Stage
一個Job中,以Shuffle為邊界劃分出的不同階段。每個階段包含一組可以被串行執(zhí)行的窄依賴或?qū)捯蕾嚥僮鳎?/p>
用戶提交的計(jì)算任務(wù)是一個由RDD構(gòu)成的DAG,如果RDD在轉(zhuǎn)換的時(shí)候需要做Shuffle,那么這個Shuffle的過程就將這個DAG分為了不同的階段(即Stage)。由于Shuffle的存在,不同的Stage是不能并行計(jì)算的,因?yàn)楹竺鍿tage的計(jì)算需要前面Stage的Shuffle的結(jié)果。
——《Spark技術(shù)內(nèi)幕》-第4章-Scheduler模塊詳解
在對Job中的所有操作劃分Stage時(shí),一般會按照倒序進(jìn)行,即從Action開始,遇到窄依賴操作,則劃分到同一個執(zhí)行階段,遇到寬依賴操作,則劃分一個新的執(zhí)行階段,且新的階段為之前階段的parent,然后依次類推遞歸執(zhí)行。child Stage需要等待所有的parent Stage執(zhí)行完之后才可以執(zhí)行,這時(shí)Stage之間根據(jù)依賴關(guān)系構(gòu)成了一個大粒度的DAG。
在一個Stage內(nèi),所有的操作以串行的Pipeline的方式,由一組Task完成計(jì)算。
Task
對一個Stage之內(nèi)的RDD進(jìn)行串行操作的計(jì)算任務(wù)。每個Stage由一組并發(fā)的Task組成(即TaskSet),這些Task的執(zhí)行邏輯完全相同,只是作用于不同的Partition。一個Stage的總Task的個數(shù)由Stage中最后的一個RDD的Partition的個數(shù)決定。
Spark Driver會根據(jù)數(shù)據(jù)所在的位置分配計(jì)算任務(wù),即把所有Task根據(jù)其Partition所在的位置分配給相應(yīng)的Executor,以盡量減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸(這也就是所謂的移動數(shù)據(jù)不如移動計(jì)算)。一個Executor內(nèi)同一時(shí)刻可以并行執(zhí)行的Task數(shù)由總CPU數(shù)/每個Task占用的CPU數(shù)決定,即spark.executor.cores / spark.task.cpus。
Task分為ShuffleMapTask和ResultTask兩種,位于最后一個Stage的Task為ResultTask,其他階段的屬于ShuffleMapTask。
Persist & Checkpoint
Persist
通過RDD的persist方法,可以將RDD的分區(qū)數(shù)據(jù)持久化在內(nèi)存或硬盤中,通過cache方法則是緩存到內(nèi)存。這里的persist和cache是一樣的機(jī)制,只不過cache是使用默認(rèn)的MEMORY_ONLY的存儲級別對RDD進(jìn)行persist,故“緩存”也就是一種“持久化”。
前面提到,只有觸發(fā)了一個Action之后,Spark才會提交Job進(jìn)行真正的計(jì)算。所以RDD只有經(jīng)過一次Action之后,才能將RDD持久化,然后在Job間共享,即如果兩個Job用到了相同的RDD,那么可以在第一個Job中對這個RDD進(jìn)行緩存,在第二個Job中就避免了RDD的重新計(jì)算。持久化機(jī)制使需要訪問重復(fù)數(shù)據(jù)的Application運(yùn)行地更快,是能夠提升Spark運(yùn)算速度的一個重要功能。
Checkpoint
調(diào)用RDD的checkpoint方法,可以將RDD保存到外部存儲中,如硬盤或HDFS。Spark引入checkpoint機(jī)制,是因?yàn)槌志没腞DD的數(shù)據(jù)有可能丟失或被替換,checkpoint可以在這時(shí)候發(fā)揮作用,避免重新計(jì)算。
創(chuàng)建checkpoint是在當(dāng)前Job完成后,由另外一個專門的Job完成:
也就是說需要checkpoint的RDD會被計(jì)算兩次。因此,在使用rdd.checkpoint()的時(shí)候,建議加上rdd.cache(),這樣第二次運(yùn)行的Job久不用再去計(jì)算該rdd了。
——Apache Spark的設(shè)計(jì)與實(shí)現(xiàn)- Cache和Checkpoint功能
一個Job在開始處理RDD的Partition時(shí),或者更準(zhǔn)確點(diǎn)說,在Executor中運(yùn)行的任務(wù)在獲取Partition數(shù)據(jù)時(shí),會先判斷是否被持久化,在沒有命中時(shí)再判斷是否保存了checkpoint,如果沒有讀取到則會重新計(jì)算該P(yáng)artition。
案例分析
這里借用@JerryLead的ComplexJob案例做一下分析:
object complexJob {
def main(args: Array[String]) {
val sc = new SparkContext("local", "ComplexJob test")
val data1 = Array[(Int, Char)](
(1, 'a'), (2, 'b'),
(3, 'c'), (4, 'd'),
(5, 'e'), (3, 'f'),
(2, 'g'), (1, 'h'))
val rangePairs1 = sc.parallelize(data1, 3)
val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))
val data2 = Array[(Int, String)]((1, "A"), (2, "B"),
(3, "C"), (4, "D"))
val pairs2 = sc.parallelize(data2, 2)
val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))
val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
val rangePairs3 = sc.parallelize(data3, 2)
val rangePairs = rangePairs2.union(rangePairs3)
val result = hashPairs1.join(rangePairs)
result.foreachWith(i => i)((x, i) => println("[result " + i + "] " + x))
println(result.toDebugString)
}
}
作者在這個例子中主要定義了一個對RDD的union和join操作,主要的RDD之間的關(guān)系如下圖所示:

Job的物理執(zhí)行圖:

圖片來源 - Job 物理執(zhí)行圖
參考作者畫的物理執(zhí)行圖,我們可以觀察到:
- 該Application僅有一個Job,由foreachWith這個Action觸發(fā)。
- 這個Job中有三個Stage,partitionBy操作對RDD重新分區(qū)產(chǎn)生了Shuffle,是劃分Stage0和Stage1的邊界。join操作則是Stage2和Stage0的邊界。
- 每個Stage的Task總數(shù)等于該階段的最后一個RDD的Partition個數(shù)。
- 每個Task都是串行執(zhí)行一個Stage內(nèi)的所有操作。
- Transformation操作的過程中會產(chǎn)生中間RDD。