Spark詳解01概覽|Spark部署|執(zhí)行原理

概覽

拿到系統(tǒng)后,部署系統(tǒng)是第一件事,那么系統(tǒng)部署成功以后,各個節(jié)點都啟動了哪些服務(wù)?

部署圖

Spark部署圖

從部署圖中可以看到

  • 整個集群分為 Master 節(jié)點和 Worker 節(jié)點,相當(dāng)于 Hadoop 的 Master 和 Slave 節(jié)點。

  • Master 節(jié)點上常駐 Master 守護進程,負責(zé)管理全部的 Worker 節(jié)點。

  • Worker 節(jié)點上常駐 Worker 守護進程,負責(zé)與 Master 節(jié)點通信并管理 executors。

  • Driver 官方解釋是 “The process running the main() function of the application and creating the SparkContext”。Application 就是用戶自己寫的 Spark 程序(driver program),比如 WordCount.scala。如果 driver program 在 Master 上運行,比如在 Master 上運行

    ./bin/run-example SparkPi 10
    

那么 SparkPi 就是 Master 上的 Driver。如果是 YARN 集群,那么 Driver 可能被調(diào)度到 Worker 節(jié)點上運行(比如上圖中的 Worker Node 2)。另外,如果直接在自己的 PC 上運行 driver program,比如在 Eclipse 中運行 driver program,使用

```scala
val sc = new SparkContext("spark://master:7077", "AppName")
``` 

去連接 master 的話,driver 就在自己的 PC 上,但是不推薦這樣的方式,因為 PC 和 Workers 可能不在一個局域網(wǎng),driver 和 executor 之間的通信會很慢。

  • 每個 Worker 上存在一個或者多個 ExecutorBackend 進程。每個進程包含一個 Executor 對象,該對象持有一個線程池,每個線程可以執(zhí)行一個 task。

  • 每個 application 包含一個 driver 和多個 executors,每個 executor 里面運行的 tasks 都屬于同一個 application。

  • 在 Standalone 版本中,ExecutorBackend 被實例化成 CoarseGrainedExecutorBackend 進程。

    在我部署的集群中每個 Worker 只運行了一個 CoarseGrainedExecutorBackend 進程,沒有發(fā)現(xiàn)如何配置多個 CoarseGrainedExecutorBackend 進程。(應(yīng)該是運行多個 applications 的時候會產(chǎn)生多個進程,這個我還沒有實驗,)

    想了解 Worker 和 Executor 的關(guān)系詳情,可以參閱 @OopsOutOfMemory 同學(xué)寫的 Spark Executor Driver資源調(diào)度小結(jié) 。

  • Worker 通過持有 ExecutorRunner 對象來控制 CoarseGrainedExecutorBackend 的啟停。

了解了部署圖之后,我們先給出一個 job 的例子,然后概覽一下 job 如何生成與運行。

Job 例子

我們使用 Spark 自帶的 examples 包中的 GroupByTest,假設(shè)在 Master 節(jié)點運行,命令是

/* Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] */
    
bin/run-example GroupByTest 100 10000 1000 36

GroupByTest 具體代碼如下

package org.apache.spark.examples

import java.util.Random

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/**
  * Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers]
  */
object GroupByTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("GroupBy Test")
    var numMappers = 100
    var numKVPairs = 10000
    var valSize = 1000
    var numReducers = 36

    val sc = new SparkContext(sparkConf)

    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
      val ranGen = new Random
      var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
      for (i <- 0 until numKVPairs) {
        val byteArr = new Array[Byte](valSize)
        ranGen.nextBytes(byteArr)
        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
      }
      arr1
    }.cache
    // Enforce that everything has been calculated and in cache
    pairs1.count

    println(pairs1.groupByKey(numReducers).count)

    sc.stop()
  }
}

閱讀代碼后,用戶頭腦中 job 的執(zhí)行流程是這樣的:

Job執(zhí)行流程

具體流程很簡單,這里來估算下 data size 和執(zhí)行結(jié)果:

  1. 初始化 SparkConf()。
  2. 初始化 numMappers=100, numKVPairs=10,000, valSize=1000, numReducers= 36。
  3. 初始化 SparkContext。這一步很重要,是要確立 driver 的地位,里面包含創(chuàng)建 driver 所需的各種 actors 和 objects。
  4. 每個 mapper 生成一個 arr1: Array[(Int, Byte[])],length 為 numKVPairs。每一個 Byte[] 的 length 為 valSize,Int 為隨機生成的整數(shù)。Size(arr1) = numKVPairs * (4 + valSize) = 10MB,所以 Size(pairs1) = numMappers * Size(arr1) =1000MB。這里的數(shù)值計算結(jié)果都是約等于。
  5. 每個 mapper 將產(chǎn)生的 arr1 數(shù)組 cache 到內(nèi)存。
  6. 然后執(zhí)行一個 action 操作 count(),來統(tǒng)計所有 mapper 中 arr1 中的元素個數(shù),執(zhí)行結(jié)果是 numMappers * numKVPairs = 1,000,000。這一步主要是為了將每個 mapper 產(chǎn)生的 arr1 數(shù)組 cache 到內(nèi)存。
  7. 在已經(jīng)被 cache 的 paris1 上執(zhí)行 groupByKey 操作,groupByKey 產(chǎn)生的 reducer (也就是 partition) 個數(shù)為 numReducers。理論上,如果 hash(Key) 比較平均的話,每個 reducer 收到的 <Int, Array[Byte]> record 個數(shù)為 numMappers * numKVPairs / numReducer = 27,777,大小為 Size(pairs1) / numReducer = 27MB。
  8. reducer 將收到的 <Int, Byte[]> records 中擁有相同 Int 的 records 聚在一起,得到 <Int, list(Byte[], Byte[], ..., Byte[])>。
  9. 最后 count 將所有 reducer 中 records 個數(shù)進行加和,最后結(jié)果實際就是 pairs1 中不同的 Int 總個數(shù)。

Job 邏輯執(zhí)行圖

Job 的實際執(zhí)行流程比用戶頭腦中的要復(fù)雜,需要先建立邏輯執(zhí)行圖(或者叫數(shù)據(jù)依賴圖),然后劃分邏輯執(zhí)行圖生成 DAG 型的物理執(zhí)行圖,然后生成具體 task 執(zhí)行。分析一下這個 job 的邏輯執(zhí)行圖:

使用 RDD.toDebugString 可以看到整個 logical plan (RDD 的數(shù)據(jù)依賴關(guān)系)如下

  MapPartitionsRDD[3] at groupByKey at GroupByTest.scala:51 (36 partitions)
    ShuffledRDD[2] at groupByKey at GroupByTest.scala:51 (36 partitions)
      FlatMappedRDD[1] at flatMap at GroupByTest.scala:38 (100 partitions)
        ParallelCollectionRDD[0] at parallelize at GroupByTest.scala:38 (100 partitions)

用圖表示就是:

Job實際流程圖

需要注意的是 data in the partition 展示的是每個 partition 應(yīng)該得到的計算結(jié)果,并不意味著這些結(jié)果都同時存在于內(nèi)存中。

根據(jù)上面的分析可知:

  • 用戶首先 init 了一個0-99 的數(shù)組: 0 until numMappers
  • parallelize() 產(chǎn)生最初的 ParrallelCollectionRDD,每個 partition 包含一個整數(shù) i。
  • 執(zhí)行 RDD 上的 transformation 操作(這里是 flatMap)以后,生成 FlatMappedRDD,其中每個 partition 包含一個 Array[(Int, Array[Byte])]。
  • 第一個 count() 執(zhí)行時,先在每個 partition 上執(zhí)行 count,然后執(zhí)行結(jié)果被發(fā)送到 driver,最后在 driver 端進行 sum。
  • 由于 FlatMappedRDD 被 cache 到內(nèi)存,因此這里將里面的 partition 都換了一種顏色表示。
  • groupByKey 產(chǎn)生了后面兩個 RDD,為什么產(chǎn)生這兩個在后面章節(jié)討論。
  • 如果 job 需要 shuffle,一般會產(chǎn)生 ShuffledRDD。該 RDD 與前面的 RDD 的關(guān)系類似于 Hadoop 中 mapper 輸出數(shù)據(jù)與 reducer 輸入數(shù)據(jù)之間的關(guān)系。
  • MapPartitionsRDD 里包含 groupByKey() 的結(jié)果。
  • 最后將 MapPartitionsRDD 中的 每個value(也就是Array[Byte])都轉(zhuǎn)換成 Iterable 類型。
  • 最后的 count 與上一個 count 的執(zhí)行方式類似。

可以看到邏輯執(zhí)行圖描述的是 job 的數(shù)據(jù)流:job 會經(jīng)過哪些 transformation(),中間生成哪些 RDD 及 RDD 之間的依賴關(guān)系。

Job 物理執(zhí)行圖

邏輯執(zhí)行圖表示的是數(shù)據(jù)上的依賴關(guān)系,不是 task 的執(zhí)行圖。在 Hadoop 中,用戶直接面對 task,mapper 和 reducer 的職責(zé)分明:一個進行分塊處理,一個進行 aggregate。Hadoop 中 整個數(shù)據(jù)流是固定的,只需要填充 map() 和 reduce() 函數(shù)即可。Spark 面對的是更復(fù)雜的數(shù)據(jù)處理流程,數(shù)據(jù)依賴更加靈活,很難將數(shù)據(jù)流和物理 task 簡單地統(tǒng)一在一起。因此 Spark 將數(shù)據(jù)流和具體 task 的執(zhí)行流程分開,并設(shè)計算法將邏輯執(zhí)行圖轉(zhuǎn)換成 task 物理執(zhí)行圖,轉(zhuǎn)換算法后面的章節(jié)討論。

針對這個 job,我們先畫出它的物理執(zhí)行 DAG 圖如下:

Job 物理執(zhí)行圖

可以看到 GroupByTest 這個 application 產(chǎn)生了兩個 job,第一個 job 由第一個 action(也就是 pairs1.count)觸發(fā)產(chǎn)生,分析一下第一個 job:

  • 整個 job 只包含 1 個 stage(不明白什么是stage沒關(guān)系,后面章節(jié)會解釋,這里只需知道有這樣一個概念)。
  • Stage 0 包含 100 個 ResultTask。
  • 每個 task 先計算 flatMap,產(chǎn)生 FlatMappedRDD,然后執(zhí)行 action() 也就是 count(),統(tǒng)計每個 partition 里 records 的個數(shù),比如 partition 99 里面只含有 9 個 records。
  • 由于 pairs1 被聲明要進行 cache,因此在 task 計算得到 FlatMappedRDD 后會將其包含的 partitions 都 cache 到 executor 的內(nèi)存。
  • task 執(zhí)行完后,driver 收集每個 task 的執(zhí)行結(jié)果,然后進行 sum()。
  • job 0 結(jié)束。

第二個 job 由 pairs1.groupByKey(numReducers).count 觸發(fā)產(chǎn)生。分析一下該 job:

  • 整個 job 包含 2 個 stage。
  • Stage 1 包含 100 個 ShuffleMapTask,每個 task 負責(zé)從 cache 中讀取 pairs1 的一部分數(shù)據(jù)并將其進行類似 Hadoop 中 mapper 所做的 partition,最后將 partition 結(jié)果寫入本地磁盤。
  • Stage 0 包含 36 個 ResultTask,每個 task 首先 shuffle 自己要處理的數(shù)據(jù),邊 fetch 數(shù)據(jù)邊進行 aggregate 以及后續(xù)的 mapPartitions() 操作,最后進行 count() 計算得到 result。
  • task 執(zhí)行完后,driver 收集每個 task 的執(zhí)行結(jié)果,然后進行 sum()。
  • job 1 結(jié)束。

可以看到物理執(zhí)行圖并不簡單。與 MapReduce 不同的是,Spark 中一個 application 可能包含多個 job,每個 job 包含多個 stage,每個 stage 包含多個 task。怎么劃分 job,怎么劃分 stage,怎么劃分 task 等等問題會在后面的章節(jié)介紹。

Discussion

到這里,我們對整個系統(tǒng)和 job 的生成與執(zhí)行有了概念,而且還探討了 cache 等特性。
接下來的章節(jié)會討論 job 生成與執(zhí)行涉及到的系統(tǒng)核心功能,包括:

  1. 如何生成邏輯執(zhí)行圖
  2. 如何生成物理執(zhí)行圖
  3. 如何提交與調(diào)度 Job
  4. Task 如何生成、執(zhí)行與結(jié)果處理
  5. 如何進行 shuffle
  6. cache機制
  7. broadcast 機制
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容