概覽
拿到系統(tǒng)后,部署系統(tǒng)是第一件事,那么系統(tǒng)部署成功以后,各個節(jié)點都啟動了哪些服務(wù)?
部署圖

從部署圖中可以看到
整個集群分為 Master 節(jié)點和 Worker 節(jié)點,相當(dāng)于 Hadoop 的 Master 和 Slave 節(jié)點。
Master 節(jié)點上常駐 Master 守護進程,負責(zé)管理全部的 Worker 節(jié)點。
Worker 節(jié)點上常駐 Worker 守護進程,負責(zé)與 Master 節(jié)點通信并管理 executors。
-
Driver 官方解釋是 “The process running the main() function of the application and creating the SparkContext”。Application 就是用戶自己寫的 Spark 程序(driver program),比如 WordCount.scala。如果 driver program 在 Master 上運行,比如在 Master 上運行
./bin/run-example SparkPi 10
那么 SparkPi 就是 Master 上的 Driver。如果是 YARN 集群,那么 Driver 可能被調(diào)度到 Worker 節(jié)點上運行(比如上圖中的 Worker Node 2)。另外,如果直接在自己的 PC 上運行 driver program,比如在 Eclipse 中運行 driver program,使用
```scala
val sc = new SparkContext("spark://master:7077", "AppName")
```
去連接 master 的話,driver 就在自己的 PC 上,但是不推薦這樣的方式,因為 PC 和 Workers 可能不在一個局域網(wǎng),driver 和 executor 之間的通信會很慢。
每個 Worker 上存在一個或者多個 ExecutorBackend 進程。每個進程包含一個 Executor 對象,該對象持有一個線程池,每個線程可以執(zhí)行一個 task。
每個 application 包含一個 driver 和多個 executors,每個 executor 里面運行的 tasks 都屬于同一個 application。
-
在 Standalone 版本中,ExecutorBackend 被實例化成 CoarseGrainedExecutorBackend 進程。
在我部署的集群中每個 Worker 只運行了一個 CoarseGrainedExecutorBackend 進程,沒有發(fā)現(xiàn)如何配置多個 CoarseGrainedExecutorBackend 進程。(應(yīng)該是運行多個 applications 的時候會產(chǎn)生多個進程,這個我還沒有實驗,)
想了解 Worker 和 Executor 的關(guān)系詳情,可以參閱 @OopsOutOfMemory 同學(xué)寫的 Spark Executor Driver資源調(diào)度小結(jié) 。
Worker 通過持有 ExecutorRunner 對象來控制 CoarseGrainedExecutorBackend 的啟停。
了解了部署圖之后,我們先給出一個 job 的例子,然后概覽一下 job 如何生成與運行。
Job 例子
我們使用 Spark 自帶的 examples 包中的 GroupByTest,假設(shè)在 Master 節(jié)點運行,命令是
/* Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] */
bin/run-example GroupByTest 100 10000 1000 36
GroupByTest 具體代碼如下
package org.apache.spark.examples
import java.util.Random
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
/**
* Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers]
*/
object GroupByTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("GroupBy Test")
var numMappers = 100
var numKVPairs = 10000
var valSize = 1000
var numReducers = 36
val sc = new SparkContext(sparkConf)
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) {
val byteArr = new Array[Byte](valSize)
ranGen.nextBytes(byteArr)
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
}.cache
// Enforce that everything has been calculated and in cache
pairs1.count
println(pairs1.groupByKey(numReducers).count)
sc.stop()
}
}
閱讀代碼后,用戶頭腦中 job 的執(zhí)行流程是這樣的:

具體流程很簡單,這里來估算下 data size 和執(zhí)行結(jié)果:
- 初始化 SparkConf()。
- 初始化 numMappers=100, numKVPairs=10,000, valSize=1000, numReducers= 36。
- 初始化 SparkContext。這一步很重要,是要確立 driver 的地位,里面包含創(chuàng)建 driver 所需的各種 actors 和 objects。
- 每個 mapper 生成一個
arr1: Array[(Int, Byte[])],length 為 numKVPairs。每一個 Byte[] 的 length 為 valSize,Int 為隨機生成的整數(shù)。Size(arr1) = numKVPairs * (4 + valSize) = 10MB,所以Size(pairs1) = numMappers * Size(arr1) =1000MB。這里的數(shù)值計算結(jié)果都是約等于。 - 每個 mapper 將產(chǎn)生的 arr1 數(shù)組 cache 到內(nèi)存。
- 然后執(zhí)行一個 action 操作 count(),來統(tǒng)計所有 mapper 中 arr1 中的元素個數(shù),執(zhí)行結(jié)果是
numMappers * numKVPairs = 1,000,000。這一步主要是為了將每個 mapper 產(chǎn)生的 arr1 數(shù)組 cache 到內(nèi)存。 - 在已經(jīng)被 cache 的 paris1 上執(zhí)行 groupByKey 操作,groupByKey 產(chǎn)生的 reducer (也就是 partition) 個數(shù)為 numReducers。理論上,如果 hash(Key) 比較平均的話,每個 reducer 收到的 <Int, Array[Byte]> record 個數(shù)為
numMappers * numKVPairs / numReducer = 27,777,大小為Size(pairs1) / numReducer = 27MB。 - reducer 將收到的
<Int, Byte[]>records 中擁有相同 Int 的 records 聚在一起,得到<Int, list(Byte[], Byte[], ..., Byte[])>。 - 最后 count 將所有 reducer 中 records 個數(shù)進行加和,最后結(jié)果實際就是 pairs1 中不同的 Int 總個數(shù)。
Job 邏輯執(zhí)行圖
Job 的實際執(zhí)行流程比用戶頭腦中的要復(fù)雜,需要先建立邏輯執(zhí)行圖(或者叫數(shù)據(jù)依賴圖),然后劃分邏輯執(zhí)行圖生成 DAG 型的物理執(zhí)行圖,然后生成具體 task 執(zhí)行。分析一下這個 job 的邏輯執(zhí)行圖:
使用 RDD.toDebugString 可以看到整個 logical plan (RDD 的數(shù)據(jù)依賴關(guān)系)如下
MapPartitionsRDD[3] at groupByKey at GroupByTest.scala:51 (36 partitions)
ShuffledRDD[2] at groupByKey at GroupByTest.scala:51 (36 partitions)
FlatMappedRDD[1] at flatMap at GroupByTest.scala:38 (100 partitions)
ParallelCollectionRDD[0] at parallelize at GroupByTest.scala:38 (100 partitions)
用圖表示就是:

需要注意的是 data in the partition 展示的是每個 partition 應(yīng)該得到的計算結(jié)果,并不意味著這些結(jié)果都同時存在于內(nèi)存中。
根據(jù)上面的分析可知:
- 用戶首先 init 了一個0-99 的數(shù)組:
0 until numMappers - parallelize() 產(chǎn)生最初的 ParrallelCollectionRDD,每個 partition 包含一個整數(shù) i。
- 執(zhí)行 RDD 上的 transformation 操作(這里是 flatMap)以后,生成 FlatMappedRDD,其中每個 partition 包含一個 Array[(Int, Array[Byte])]。
- 第一個 count() 執(zhí)行時,先在每個 partition 上執(zhí)行 count,然后執(zhí)行結(jié)果被發(fā)送到 driver,最后在 driver 端進行 sum。
- 由于 FlatMappedRDD 被 cache 到內(nèi)存,因此這里將里面的 partition 都換了一種顏色表示。
- groupByKey 產(chǎn)生了后面兩個 RDD,為什么產(chǎn)生這兩個在后面章節(jié)討論。
- 如果 job 需要 shuffle,一般會產(chǎn)生 ShuffledRDD。該 RDD 與前面的 RDD 的關(guān)系類似于 Hadoop 中 mapper 輸出數(shù)據(jù)與 reducer 輸入數(shù)據(jù)之間的關(guān)系。
- MapPartitionsRDD 里包含 groupByKey() 的結(jié)果。
- 最后將 MapPartitionsRDD 中的 每個value(也就是Array[Byte])都轉(zhuǎn)換成 Iterable 類型。
- 最后的 count 與上一個 count 的執(zhí)行方式類似。
可以看到邏輯執(zhí)行圖描述的是 job 的數(shù)據(jù)流:job 會經(jīng)過哪些 transformation(),中間生成哪些 RDD 及 RDD 之間的依賴關(guān)系。
Job 物理執(zhí)行圖
邏輯執(zhí)行圖表示的是數(shù)據(jù)上的依賴關(guān)系,不是 task 的執(zhí)行圖。在 Hadoop 中,用戶直接面對 task,mapper 和 reducer 的職責(zé)分明:一個進行分塊處理,一個進行 aggregate。Hadoop 中 整個數(shù)據(jù)流是固定的,只需要填充 map() 和 reduce() 函數(shù)即可。Spark 面對的是更復(fù)雜的數(shù)據(jù)處理流程,數(shù)據(jù)依賴更加靈活,很難將數(shù)據(jù)流和物理 task 簡單地統(tǒng)一在一起。因此 Spark 將數(shù)據(jù)流和具體 task 的執(zhí)行流程分開,并設(shè)計算法將邏輯執(zhí)行圖轉(zhuǎn)換成 task 物理執(zhí)行圖,轉(zhuǎn)換算法后面的章節(jié)討論。
針對這個 job,我們先畫出它的物理執(zhí)行 DAG 圖如下:

可以看到 GroupByTest 這個 application 產(chǎn)生了兩個 job,第一個 job 由第一個 action(也就是 pairs1.count)觸發(fā)產(chǎn)生,分析一下第一個 job:
- 整個 job 只包含 1 個 stage(不明白什么是stage沒關(guān)系,后面章節(jié)會解釋,這里只需知道有這樣一個概念)。
- Stage 0 包含 100 個 ResultTask。
- 每個 task 先計算 flatMap,產(chǎn)生 FlatMappedRDD,然后執(zhí)行 action() 也就是 count(),統(tǒng)計每個 partition 里 records 的個數(shù),比如 partition 99 里面只含有 9 個 records。
- 由于 pairs1 被聲明要進行 cache,因此在 task 計算得到 FlatMappedRDD 后會將其包含的 partitions 都 cache 到 executor 的內(nèi)存。
- task 執(zhí)行完后,driver 收集每個 task 的執(zhí)行結(jié)果,然后進行 sum()。
- job 0 結(jié)束。
第二個 job 由 pairs1.groupByKey(numReducers).count 觸發(fā)產(chǎn)生。分析一下該 job:
- 整個 job 包含 2 個 stage。
- Stage 1 包含 100 個 ShuffleMapTask,每個 task 負責(zé)從 cache 中讀取 pairs1 的一部分數(shù)據(jù)并將其進行類似 Hadoop 中 mapper 所做的 partition,最后將 partition 結(jié)果寫入本地磁盤。
- Stage 0 包含 36 個 ResultTask,每個 task 首先 shuffle 自己要處理的數(shù)據(jù),邊 fetch 數(shù)據(jù)邊進行 aggregate 以及后續(xù)的 mapPartitions() 操作,最后進行 count() 計算得到 result。
- task 執(zhí)行完后,driver 收集每個 task 的執(zhí)行結(jié)果,然后進行 sum()。
- job 1 結(jié)束。
可以看到物理執(zhí)行圖并不簡單。與 MapReduce 不同的是,Spark 中一個 application 可能包含多個 job,每個 job 包含多個 stage,每個 stage 包含多個 task。怎么劃分 job,怎么劃分 stage,怎么劃分 task 等等問題會在后面的章節(jié)介紹。
Discussion
到這里,我們對整個系統(tǒng)和 job 的生成與執(zhí)行有了概念,而且還探討了 cache 等特性。
接下來的章節(jié)會討論 job 生成與執(zhí)行涉及到的系統(tǒng)核心功能,包括:
- 如何生成邏輯執(zhí)行圖
- 如何生成物理執(zhí)行圖
- 如何提交與調(diào)度 Job
- Task 如何生成、執(zhí)行與結(jié)果處理
- 如何進行 shuffle
- cache機制
- broadcast 機制