??Apache Spark是開源的、分布式的、集成計算引擎,支持在計算機集群上的并行數(shù)據(jù)計算。Spark支持多種被廣泛使用的編程語言(Python,Java,Scala,R),包含一系列適用于不同任務處理的工具,從SQL數(shù)據(jù)分析,到流數(shù)據(jù)處理,到機器學習等,并且在單個PC上和數(shù)千臺服務器上都能夠友好運行,非常適合大數(shù)據(jù)分析處理。
spark應用的結構
??spark應用包含driver進程和executor進程。driver進程負責三件事情:保存維護spark應用的狀態(tài)信息,返回程序執(zhí)行結果,分配任務給executor進程。executor進程負責執(zhí)行driver進程分配的任務,并且返回任務狀態(tài)和結果給driver。
??除了driver進程和executor進程,spark應用還必須包含一個集群管理器,負責管理集群中的機器和分配計算資源。spark支持三種集群管理器:spark本身自帶的集群管理器,Apache Mesos,Hadoop YARN。集群管理器也有類似于driver和executor的driver節(jié)點(有時候也叫master節(jié)點)和worker節(jié)點,其區(qū)別在于,集群管理器的driver節(jié)點和worker節(jié)點都是物理機器,spark應用的driver進程和executor進程都是執(zhí)行程序的進程。當啟動spark應用時,程序會首先向集群管理器申請計算資源,然后在申請到資源執(zhí)行任務的過程中,集群管理器負責管理運行任務的機器。
執(zhí)行模式Execution Modes
??執(zhí)行模式的不同決定了在運行spark應用時資源是如何進行分配的。spark支持三種執(zhí)行模式可供選擇:cluster mode集群模式,client mode客戶端模式,local mode單機模式。
cluster mode集群模式
??集群模式是spark最常見的執(zhí)行模式,使用集群中的多臺機器更能發(fā)揮spark分布式計算的優(yōu)勢。在集群模式下spark應用啟動的過程是這樣的:
1)首先,用戶提交應用程序給集群管理器;
2)然后,集群管理器的driver節(jié)點在集群中的一臺worker節(jié)點上啟動driver進程,然后在其他worker節(jié)點上啟動executor進程;
- driver進程和executor進程在不同的worker節(jié)點上;
- 同一個worker節(jié)點上可能會啟動多個executor進程;
client mode客戶端模式
??客戶端模式與集群模式下,spark應用的啟動過程幾乎是相同的,唯一不同的一點是,在客戶端模式下,driver進程是在用戶提交應用程序的客戶端機器上啟動的,而不是在集群中的任何一臺worker節(jié)點上。
local mode單機模式
??單機模式與前兩種執(zhí)行模式完全不同,在單機模式下,driver進程和executor進程全部運行在本地的一臺機器上,并行計算是通過單機上的多線程實現(xiàn)的。單機模式不適合用于實際生產(chǎn)應用,通常用于在本地測試spark程序。
Spark應用的生命周期(spark程序外)
用戶請求
- 首先,由用戶提交spark應用程序;
1)在本機執(zhí)行應用程序,準備向集群管理器的driver節(jié)點提交申請,申請的內(nèi)容只有driver進程所需的資源;
- 用戶的應用程序中必須包含SparkSession,因為SparkSession是spark程序的入口,用于與集群傳遞信息(比如executor的數(shù)量等);
2)集群管理器的driver節(jié)點接收到用戶請求,并且在集群中的一臺worker節(jié)點上啟動driver進程;
3)集群管理器返回狀態(tài)信息給本機,本機上提交程序的用戶進程退出,此時,應用與本機斷開,并且開始在啟動的driver進程中運行。
啟動
1)driver進程中的SparkSession向集群管理器中的driver節(jié)點發(fā)出請求,請求的內(nèi)容是啟動executor進程;
2)driver節(jié)點接收到請求,并且對資源進行評估,如果資源滿足請求,driver節(jié)點在集群中的worker節(jié)點上啟動executor進程;
- executor的數(shù)量和每個executor的資源大小,用戶可以在使用spark-submit命令提交任務的時候通過參數(shù)進行配置;
3)集群的driver節(jié)點把啟動的executor地址等信息發(fā)送給driver進程;
執(zhí)行
driver進程和executor進程全部啟動之后,接下來的執(zhí)行過程就是在spark內(nèi)部的生命周期流程:driver負責向executor分配任務,executor執(zhí)行任務并且向driver返回任務的運行狀態(tài)(成功或失?。?。詳細的過程在下一部分spark內(nèi)部的生命周期中介紹。
執(zhí)行結束
在spark應用運行結束之后,首先driver進程關閉,并且把運行結果返回給集群管理器,然后集群管理器關閉executor進程。
Spark應用的生命周期(spark程序內(nèi))
在spark內(nèi)部實際執(zhí)行的是用戶提交的應用程序代碼,理解用戶程序在spark內(nèi)部的生命周期流程有助于任務的跟蹤和優(yōu)化。
SparkSession
spark應用的第一步是創(chuàng)建一個SparkSession,因為它是spark程序的入口,在很多交互模式下,系統(tǒng)會為用戶自動創(chuàng)建好一個SparkSession,但是在用戶編寫應用的時候必須手動創(chuàng)建SparkSession。
SparkSession創(chuàng)建之后,就開始執(zhí)行用戶提交的程序。在spark中,用戶提交的所有程序都會被編譯成底層RDD。
邏輯指令
Spark程序本質上由transformation操作和action操作組成,可以通過SQL API或者RDD API創(chuàng)建。為了理解spark程序是如何在集群上運行的,最重要的是清楚用戶編寫的程序是如何轉化成物理執(zhí)行計劃的。為此,我們以以下代碼為例,描述spark詳細描述:
df1 = spark.range(2,10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)") # 2500000000000
Spark Job
一般情況下,一個action操作對應一個Spark Job。每個Job又會分成多個stages,stages的數(shù)量取決于程序中有多少個shuffle類型的操作。
stages
Stages是能夠在多臺機器上并行計算以完成想同類型的操作的一組tasks。一般情況下,Spark會把盡量多連續(xù)的transformation操作包含在同一個stage中,直到遇到shuffle類型的操作,才重新開始一個新的stage。shuffle,中文直譯為洗牌,表示數(shù)據(jù)在不同executor之間的重新分配,比如對Dataframe按照某個字段進行排序,按照某個字段group之后聚合操作,都會引起數(shù)據(jù)的重新分配,屬于shuffle類型的操作。Spark在遇到shuffle操作之后重新開始一個stage,并且記錄下stages的順序。
Tasks
每個stages中包含了很多tasks,每個task對應于單個executor上面對一組數(shù)據(jù)進行的一組transformation操作。tasks的數(shù)量取決于數(shù)據(jù)的partition的數(shù)量,如果你有一個包含非常多數(shù)據(jù)的很大的partition,就會有一個大的task,但是如果把一個大的數(shù)據(jù)repartition成很多個小的partition,就會有很多個小任務,所以,把數(shù)據(jù)分成很多個partition意味著更多的數(shù)據(jù)可以并行計算。
另外,stages和tasks有兩個重要的特性:1)pipelining,spark會自動把連續(xù)的transformation操作放在一個stage中執(zhí)行;2)shuffle persistence,spark在執(zhí)行shuffle操作的時候,shuffle的前一個stage會首先把數(shù)據(jù)寫入到stage當前節(jié)點的磁盤上,然后,執(zhí)行shuffle的stage才啟動,并且從每個節(jié)點的磁盤文件中讀取對應的記錄(比如對應某些特定的key的記錄)執(zhí)行計算任務。這樣的機制的非常大的好處是,由于shuffle任務需要的數(shù)據(jù)已經(jīng)在前一個stage寫入到磁盤上,當shuffle任務執(zhí)行失敗的時候,spark只需要基于磁盤數(shù)據(jù)重新啟動shuffle任務,而不需要重新啟動所有的任務。