Spark基本架構(gòu)及原理

Hadoop 和 Spark 的關(guān)系

Spark 運(yùn)算比 Hadoop 的 MapReduce 框架快的原因是因?yàn)?Hadoop 在一次 MapReduce 運(yùn)算之后,會(huì)將數(shù)據(jù)的運(yùn)算結(jié)果從內(nèi)存寫入到磁盤中,第二次 Mapredue 運(yùn)算時(shí)在從磁盤中讀取數(shù)據(jù),所以其瓶頸在2次運(yùn)算間的多余 IO 消耗. Spark 則是將數(shù)據(jù)一直緩存在內(nèi)存中,直到計(jì)算得到最后的結(jié)果,再將結(jié)果寫入到磁盤,所以多次運(yùn)算的情況下, Spark 是比較快的. 其優(yōu)化了迭代式工作負(fù)載

Hadoop 的局限 Spark的改進(jìn)
抽象層次低,學(xué)習(xí)成本較高 通過使用RDD的統(tǒng)一抽象,實(shí)現(xiàn)數(shù)據(jù)處理邏輯的代碼非常簡(jiǎn)潔
只有 Map , Reduce 操作 通過RDD提供了很多轉(zhuǎn)換和動(dòng)作,實(shí)現(xiàn)了很多基本操作,如Sort, Join等
一個(gè)Job只有Map和Reduce兩個(gè)階段,復(fù)雜的程序需要大量的Job來完成,且Job之間的依賴關(guān)系需要開發(fā)者自行管理 一個(gè)Job可以包含RDD的多個(gè)轉(zhuǎn)換操作,在調(diào)度時(shí)可以生成多個(gè)階段(Stage),而且如果多個(gè)map操作的RDD的分區(qū)不變,是可以放在同一個(gè)Task中進(jìn)行
處理邏輯隱藏在代碼細(xì)節(jié)中,缺乏整體邏輯視圖 RDD的轉(zhuǎn)換支持流式API,提供處理邏輯的整體視圖
對(duì)迭代式數(shù)據(jù)處理性能比較差,Reduce與下一步Map之間的中間結(jié)果只能存放在HDFS中 通過內(nèi)存緩存數(shù)據(jù),可大大提高迭代式計(jì)算的性能,內(nèi)存不足時(shí)可以溢出到本地磁盤,而不是HDFS
ReduceTask需要等待所有MapTask都完成后才可以開始 分區(qū)相同的轉(zhuǎn)換構(gòu)成流水線放在一個(gè)Task中運(yùn)行,分區(qū)不同的轉(zhuǎn)換需要Shuffle,被劃分到不同的Stage中,需要等待前面的Stage完成后才可以開始
時(shí)延高,只適用Batch數(shù)據(jù)處理,對(duì)于交互式數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理的支持不夠 通過將流拆成小的batch提供Discretized Stream處理流數(shù)據(jù)

Spark 的主要特點(diǎn)還包括:

  1. 提供 Cache 機(jī)制來支持需要反復(fù)迭代計(jì)算或者多次數(shù)據(jù)共享,減少數(shù)據(jù)讀取的 IO 開銷;
  2. 提供了一套支持 DAG 圖的分布式并行計(jì)算的編程框架,減少多次計(jì)算之間中間結(jié)果寫到 Hdfs 的開銷;
  3. 使用多線程池模型減少 Task 啟動(dòng)開稍, shuffle 過程中避免不必要的 sort 操作并減少磁盤 IO 操作。(Hadoop 的 Map 和 reduce 之間的 shuffle 需要 sort)

Spark 系統(tǒng)架構(gòu)

Application: Appliction都是指用戶編寫的Spark應(yīng)用程序,其中包括一個(gè)Driver功能的代碼和分布在集群中多個(gè)節(jié)點(diǎn)上運(yùn)行的Executor代碼

Driver: Spark中的Driver即運(yùn)行上述Application的main函數(shù)并創(chuàng)建SparkContext,創(chuàng)建SparkContext的目的是為了準(zhǔn)備Spark應(yīng)用程序的運(yùn)行環(huán)境,在Spark中有SparkContext負(fù)責(zé)與ClusterManager通信,進(jìn)行資源申請(qǐng)、任務(wù)的分配和監(jiān)控等,當(dāng)Executor部分運(yùn)行完畢后,Driver同時(shí)負(fù)責(zé)將SparkContext關(guān)閉,通常用SparkContext代表Driver

Executor: 某個(gè)Application運(yùn)行在worker節(jié)點(diǎn)上的一個(gè)進(jìn)程, 該進(jìn)程負(fù)責(zé)運(yùn)行某些Task, 并且負(fù)責(zé)將數(shù)據(jù)存到內(nèi)存或磁盤上,每個(gè)Application都有各自獨(dú)立的一批Executor, 在Spark on Yarn模式下,其進(jìn)程名稱為CoarseGrainedExecutor Backend。一個(gè)CoarseGrainedExecutor Backend有且僅有一個(gè)Executor對(duì)象, 負(fù)責(zé)將Task包裝成taskRunner,并從線程池中抽取一個(gè)空閑線程運(yùn)行Task, 這個(gè)每一個(gè)oarseGrainedExecutor Backend能并行運(yùn)行Task的數(shù)量取決與分配給它的cpu個(gè)數(shù)。

Cluter Manager:指的是在集群上獲取資源的外部服務(wù)。目前有三種類型

  1.   Standalon : spark原生的資源管理,由Master負(fù)責(zé)資源的分配
    
  2. Apache Mesos:與hadoop MR兼容性良好的一種資源調(diào)度框架
  3. Hadoop Yarn: 主要是指Yarn中的ResourceManager

Worker: 集群中任何可以運(yùn)行Application代碼的節(jié)點(diǎn),在Standalone模式中指的是通過slave文件配置的Worker節(jié)點(diǎn),在Spark on Yarn模式下就是NoteManager節(jié)點(diǎn)。

Task: 被送到某個(gè)Executor上的工作單元,但hadoopMR中的MapTask和ReduceTask概念一樣,是運(yùn)行Application的基本單位,多個(gè)Task組成一個(gè)Stage,而Task的調(diào)度和管理等是由TaskScheduler負(fù)責(zé)。

Job: 包含多個(gè)Task組成的并行計(jì)算,往往由Spark Action觸發(fā)生成, 一個(gè)Application中往往會(huì)產(chǎn)生多個(gè)Job。

Stage: 每個(gè)Job會(huì)被拆分成多組Task, 作為一個(gè)TaskSet, 其名稱為Stage,Stage的劃分和調(diào)度是有DAGScheduler來負(fù)責(zé)的,Stage有非最終的Stage(Shuffle Map Stage)和最終的Stage(Result Stage)兩種,Stage的邊界就是發(fā)生shuffle的地方。

DAGScheduler: 根據(jù)Job構(gòu)建基于Stage的DAG(Directed Acyclic Graph有向無環(huán)圖),并提交Stage給TASkScheduler。 其劃分Stage的依據(jù)是RDD之間的依賴的關(guān)系找出開銷最小的調(diào)度方法,如下圖:

file

TASKSedulter: 將TaskSET提交給worker運(yùn)行,每個(gè)Executor運(yùn)行什么Task就是在此處分配的. TaskScheduler維護(hù)所有TaskSet,當(dāng)Executor向Driver發(fā)生心跳時(shí),TaskScheduler會(huì)根據(jù)資源剩余情況分配相應(yīng)的Task。另外TaskScheduler還維護(hù)著所有Task的運(yùn)行標(biāo)簽,重試失敗的Task。下圖展示了TaskScheduler的作用:

file

在不同運(yùn)行模式中任務(wù)調(diào)度器具體為:

  1. Spark on Standalone模式為TaskScheduler
  2. YARN-Client模式為YarnClientClusterScheduler
  3. YARN-Cluster模式為YarnClusterScheduler

運(yùn)行層次圖如下:

file

Job=多個(gè)stage,Stage=多個(gè)同種task, Task分為ShuffleMapTask和ResultTask,Dependency分為ShuffleDependency和NarrowDependency

整個(gè) Spark 集群中,分為 Master 節(jié)點(diǎn)與 worker 節(jié)點(diǎn),其中 Master 節(jié)點(diǎn)負(fù)責(zé)將串行任務(wù)變成可并行執(zhí)行的任務(wù)集Tasks, 同時(shí)還負(fù)責(zé)出錯(cuò)問題處理等,而 Worker 節(jié)點(diǎn)負(fù)責(zé)執(zhí)行任務(wù).
  Driver 的功能是創(chuàng)建 SparkContext, 負(fù)責(zé)執(zhí)行用戶寫的 Application 的 main 函數(shù)進(jìn)程,Application 就是用戶寫的程序.
  不同的模式可能會(huì)將 Driver 調(diào)度到不同的節(jié)點(diǎn)上執(zhí)行.集群管理模式里, local 一般用于本地調(diào)試.
  每個(gè) Worker 上存在一個(gè)或多個(gè) Executor 進(jìn)程,該對(duì)象擁有一個(gè)線程池,每個(gè)線程負(fù)責(zé)一個(gè) Task 任務(wù)的執(zhí)行.根據(jù) Executor 上 CPU-core 的數(shù)量,其每個(gè)時(shí)間可以并行多個(gè) 跟 core 一樣數(shù)量的 Task.Task 任務(wù)即為具體執(zhí)行的 Spark 程序的任務(wù).

file

spark運(yùn)行流程圖如下:

file
  1. 構(gòu)建Spark Application的運(yùn)行環(huán)境,啟動(dòng)SparkContext
  2. SparkContext向資源管理器(可以是Standalone,Mesos,Yarn)申請(qǐng)運(yùn)行Executor資源,并啟動(dòng)StandaloneExecutorbackend,
  3. Executor向SparkContext申請(qǐng)Task
  4. SparkContext將應(yīng)用程序分發(fā)給Executor
  5. SparkContext構(gòu)建成DAG圖,將DAG圖分解成Stage、將Taskset發(fā)送給Task Scheduler,最后由Task Scheduler將Task發(fā)送給Executor運(yùn)行
  6. Task在Executor上運(yùn)行,運(yùn)行完釋放所有資源

Spark運(yùn)行特點(diǎn):

  1. 每個(gè)Application獲取專屬的executor進(jìn)程,該進(jìn)程在Application期間一直駐留,并以多線程方式運(yùn)行Task。這種Application隔離機(jī)制是有優(yōu)勢(shì)的,無論是從調(diào)度角度看(每個(gè)Driver調(diào)度他自己的任務(wù)),還是從運(yùn)行角度看(來自不同Application的Task運(yùn)行在不同JVM中),當(dāng)然這樣意味著Spark Application不能跨應(yīng)用程序共享數(shù)據(jù),除非將數(shù)據(jù)寫入外部存儲(chǔ)系統(tǒng)
  2. Spark與資源管理器無關(guān),只要能夠獲取executor進(jìn)程,并能保持相互通信就可以了
  3. 提交SparkContext的Client應(yīng)該靠近Worker節(jié)點(diǎn)(運(yùn)行Executor的節(jié)點(diǎn)),最好是在同一個(gè)Rack里,因?yàn)镾park Application運(yùn)行過程中SparkContext和Executor之間有大量的信息交換
  4. Task采用了數(shù)據(jù)本地性和推測(cè)執(zhí)行的優(yōu)化機(jī)制

Spark作業(yè)基本運(yùn)行原理

file

我們使用spark-submit提交一個(gè)Spark作業(yè)之后,這個(gè)作業(yè)就會(huì)啟動(dòng)一個(gè)對(duì)應(yīng)的Driver進(jìn)程。根據(jù)你使用的部署模式(deploy-mode)不同,Driver進(jìn)程可能在本地啟動(dòng),也可能在集群中某個(gè)工作節(jié)點(diǎn)上啟動(dòng)。Driver進(jìn)程本身會(huì)根據(jù)我們?cè)O(shè)置的參數(shù),占有一定數(shù)量的內(nèi)存和CPU core。而Driver進(jìn)程要做的第一件事情,就是向集群管理器(YARN或者其他資源管理集群)申請(qǐng)運(yùn)行Spark作業(yè)需要使用的資源,這里的資源指的就是Executor進(jìn)程。YARN集群管理器會(huì)根據(jù)我們?yōu)镾park作業(yè)設(shè)置的資源參數(shù),在各個(gè)工作節(jié)點(diǎn)上,啟動(dòng)一定數(shù)量的Executor進(jìn)程,每個(gè)Executor進(jìn)程都占有一定數(shù)量的內(nèi)存和CPU core。

在申請(qǐng)到了作業(yè)執(zhí)行所需的資源之后,Driver進(jìn)程就會(huì)開始調(diào)度和執(zhí)行我們編寫的作業(yè)代碼了。Driver進(jìn)程會(huì)將我們編寫的Spark作業(yè)代碼分拆為多個(gè)stage,每個(gè)stage執(zhí)行一部分代碼片段,并為每個(gè)stage創(chuàng)建一批task,然后將這些task分配到各個(gè)Executor進(jìn)程中執(zhí)行。task是最小的計(jì)算單元,負(fù)責(zé)執(zhí)行一模一樣的計(jì)算邏輯(也就是我們自己編寫的某個(gè)代碼片段),只是每個(gè)task處理的數(shù)據(jù)不同而已。一個(gè)stage的所有task都執(zhí)行完畢之后,會(huì)在各個(gè)節(jié)點(diǎn)本地的磁盤文件中寫入計(jì)算中間結(jié)果,然后Driver就會(huì)調(diào)度運(yùn)行下一個(gè)stage。下一個(gè)stage的task的輸入數(shù)據(jù)就是上一個(gè)stage輸出的中間結(jié)果。如此循環(huán)往復(fù),直到將我們自己編寫的代碼邏輯全部執(zhí)行完,并且計(jì)算完所有的數(shù)據(jù),得到我們想要的結(jié)果為止。

Spark是根據(jù)shuffle類算子來進(jìn)行stage的劃分。如果我們的代碼中執(zhí)行了某個(gè)shuffle類算子(比如reduceByKey、join等),那么就會(huì)在該算子處,劃分出一個(gè)stage界限來??梢源笾吕斫鉃?,shuffle算子執(zhí)行之前的代碼會(huì)被劃分為一個(gè)stage,shuffle算子執(zhí)行以及之后的代碼會(huì)被劃分為下一個(gè)stage。因此一個(gè)stage剛開始執(zhí)行的時(shí)候,它的每個(gè)task可能都會(huì)從上一個(gè)stage的task所在的節(jié)點(diǎn),去通過網(wǎng)絡(luò)傳輸拉取需要自己處理的所有key,然后對(duì)拉取到的所有相同的key使用我們自己編寫的算子函數(shù)執(zhí)行聚合操作(比如reduceByKey()算子接收的函數(shù))。這個(gè)過程就是shuffle。

當(dāng)我們?cè)诖a中執(zhí)行了cache/persist等持久化操作時(shí),根據(jù)我們選擇的持久化級(jí)別的不同,每個(gè)task計(jì)算出來的數(shù)據(jù)也會(huì)保存到Executor進(jìn)程的內(nèi)存或者所在節(jié)點(diǎn)的磁盤文件中。

因此Executor的內(nèi)存主要分為三塊:第一塊是讓task執(zhí)行我們自己編寫的代碼時(shí)使用,默認(rèn)是占Executor總內(nèi)存的20%;第二塊是讓task通過shuffle過程拉取了上一個(gè)stage的task的輸出后,進(jìn)行聚合等操作時(shí)使用,默認(rèn)也是占Executor總內(nèi)存的20%;第三塊是讓RDD持久化時(shí)使用,默認(rèn)占Executor總內(nèi)存的60%。

task的執(zhí)行速度是跟每個(gè)Executor進(jìn)程的CPU core數(shù)量有直接關(guān)系的。一個(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)線程。而每個(gè)Executor進(jìn)程上分配到的多個(gè)task,都是以每個(gè)task一條線程的方式,多線程并發(fā)運(yùn)行的。如果CPU core數(shù)量比較充足,而且分配到的task數(shù)量比較合理,那么通常來說,可以比較快速和高效地執(zhí)行完這些task線程。


file

lightbatis 數(shù)據(jù)庫訪問包 Lightbatis !

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容