==Spark Series (9): How DAGScheduler Works

Spark Series (9): How DAGScheduler Works - 會飛的紙盒 - cnblogs
http://www.cnblogs.com/jianyuan/p/Spark%E7%B3%BB%E5%88%97%E4%B9%8BDAGScheduler%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86.html


We use wordcount as the running example for an in-depth analysis.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object wordcount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordcount").setMaster("local")

    val sc = new SparkContext(conf)
    // Produces HadoopRDD -> MapPartitionsRDD
    val lines = sc.textFile("C://Users//Administrator//Desktop//wordcount.txt", 1)
    // Produces a FlatMappedRDD (newer Spark versions implement flatMap with a MapPartitionsRDD as well)
    val words = lines.flatMap(line => line.split(" "))
    // Produces a MapPartitionsRDD
    val pairs = words.map(word => (word, 1))
    // Produces MapPartitionsRDD -> ShuffledRDD -> MapPartitionsRDD, i.e. three RDDs
    // (map-side combine, shuffle, reduce-side merge); newer Spark versions create a single
    // ShuffledRDD here and perform both aggregation steps around the shuffle
    val result = pairs.reduceByKey(_ + _)
    // foreach is an action: it triggers the job through SparkContext.runJob, which hands it to the DAGScheduler
    result.foreach(count => println(count))
  }
}
```

Notes:

1. Internally, textFile first calls hadoopFile to create a HadoopRDD of key-value pairs (the key is the byte offset of each line in the text file, the value is the line's content), and then maps it to a MapPartitionsRDD whose elements contain only the line content.

2. RDD itself has no reduceByKey method, so calling reduceByKey() on an RDD triggers a Scala implicit conversion: the compiler searches the implicit scope, finds the rddToPairRDDFunctions() conversion in RDD's companion object, and converts the RDD into a PairRDDFunctions, which does define reduceByKey().
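To see the mechanism from note 2 in isolation, here is a minimal sketch that does not use Spark at all. `MiniRDD`, `MiniPairFunctions` and `miniRddToPairFunctions` are hypothetical stand-ins for RDD, PairRDDFunctions and rddToPairRDDFunctions; the sketch only shows that a conversion placed in the companion object is found without any import, which is how the note describes the lookup for RDD.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for RDD: keeps its data locally and has no reduceByKey.
class MiniRDD[T](val data: Seq[T]) {
  def map[U](f: T => U): MiniRDD[U] = new MiniRDD(data.map(f))
  def flatMap[U](f: T => Iterable[U]): MiniRDD[U] = new MiniRDD(data.flatMap(f))
}

// Hypothetical stand-in for PairRDDFunctions: only makes sense for key-value data.
class MiniPairFunctions[K, V](self: MiniRDD[(K, V)]) {
  def reduceByKey(f: (V, V) => V): MiniRDD[(K, V)] =
    new MiniRDD(self.data.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }.toSeq)
}

object MiniRDD {
  // Defined in the companion object, so it is part of the implicit scope of MiniRDD[...]
  // and is found without an import.
  implicit def miniRddToPairFunctions[K, V](rdd: MiniRDD[(K, V)]): MiniPairFunctions[K, V] =
    new MiniPairFunctions(rdd)
}

val counts = new MiniRDD(Seq("a b", "b c b"))
  .flatMap(line => line.split(" ").toList)
  .map(word => (word, 1))
  .reduceByKey(_ + _) // compiled as MiniRDD.miniRddToPairFunctions(...).reduceByKey(_ + _)
```

Removing the companion object makes the last line fail to compile, because MiniRDD has no reduceByKey of its own.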

How the stage-splitting algorithm works

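Starting from the RDD on which the action was called, the algorithm walks the lineage backwards. It first creates a stage for this last RDD. While walking back, whenever it finds that an RDD has a wide (shuffle) dependency on a parent RDD, it creates a new stage for that parent, and the parent becomes the last RDD of the new stage. Parents reached through narrow dependencies stay in the current stage. This repeats, splitting at wide dependencies and grouping across narrow ones, until every RDD has been visited.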
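The backward walk described above can be sketched on a toy lineage model. `Node`, `Narrow`, `Shuffle`, `SimpleStage` and `splitStages` are hypothetical names, not Spark classes, and the sketch ignores caching, stage ids and partitions; it only shows how wide dependencies cut the lineage into stages. Applied to the wordcount lineage it yields two stages.

```scala
import scala.collection.mutable

// Hypothetical, simplified lineage model (these are not Spark's classes).
sealed trait Dep { def parent: Node }
case class Narrow(parent: Node) extends Dep  // e.g. map, flatMap
case class Shuffle(parent: Node) extends Dep // e.g. reduceByKey
case class Node(name: String, deps: List[Dep] = Nil)

// A stage is named after its last RDD and holds every RDD reachable from it via narrow deps.
case class SimpleStage(last: String, rdds: List[String])

def splitStages(finalRdd: Node): List[SimpleStage] = {
  val stages = mutable.ListBuffer.empty[SimpleStage]
  val created = mutable.Set.empty[String] // last-RDD names of stages already built

  def buildStage(last: Node): Unit =
    if (created.add(last.name)) {
      val members = mutable.ListBuffer.empty[String]
      val shuffleParents = mutable.ListBuffer.empty[Node]
      val visited = mutable.Set.empty[String]
      var toVisit = List(last) // walk backwards from the stage's last RDD
      while (toVisit.nonEmpty) {
        val rdd = toVisit.head
        toVisit = toVisit.tail
        if (visited.add(rdd.name)) {
          members += rdd.name
          rdd.deps.foreach {
            case Narrow(p)  => toVisit = p :: toVisit // narrow: parent stays in this stage
            case Shuffle(p) => shuffleParents += p    // wide: parent is the last RDD of a new stage
          }
        }
      }
      shuffleParents.foreach(buildStage) // build parent stages first
      stages += SimpleStage(last.name, members.toList)
    }

  buildStage(finalRdd)
  stages.toList
}

// The wordcount lineage from the example above.
val hadoopRdd = Node("HadoopRDD")
val linesRdd  = Node("lines", List(Narrow(hadoopRdd)))
val wordsRdd  = Node("words", List(Narrow(linesRdd)))
val pairsRdd  = Node("pairs", List(Narrow(wordsRdd)))
val resultRdd = Node("result (ShuffledRDD)", List(Shuffle(pairsRdd)))

val stages = splitStages(resultRdd)
```

Parent stages are appended before the stages that depend on them, which is also the order in which they have to run: the first stage contains pairs, words, lines and HadoopRDD, the second only the ShuffledRDD.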

Why stages are split

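An application submitted to Spark runs as jobs, one per action. After a job is submitted, it is split into multiple stages; the tasks of each stage are wrapped in a TaskSet and submitted to the TaskScheduler, which sends them to executors for execution.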

Source code analysis

Execution flow once the action in the wordcount program above is called:

foreach (RDD.scala) -> runJob (SparkContext.scala) -> runJob (DAGScheduler.scala) -> submitJob (DAGScheduler.scala) -> eventProcessLoop.post(JobSubmitted) (DAGScheduler.scala) -> onReceive (DAGScheduler.scala) -> case JobSubmitted -> handleJobSubmitted (entry point)

The DAGScheduler implementation class belongs to the package org.apache.spark.scheduler.
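The eventProcessLoop.post step decouples submission from processing: submitJob only enqueues a JobSubmitted event, and a separate thread later dispatches it to handleJobSubmitted through onReceive. Below is a minimal sketch of that pattern, assuming a blocking queue drained by one background thread; `MiniEventLoop`, `SchedulerEvent` and `StopLoop` are hypothetical names, and the real DAGScheduler event loop handles many more event types.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable

// Hypothetical events, named after the DAGScheduler event in the call chain above.
sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int, finalRddName: String) extends SchedulerEvent
case object StopLoop extends SchedulerEvent

// post() only enqueues; a single background thread takes events off the queue and
// dispatches them in onReceive(), so events are handled one at a time, in order.
class MiniEventLoop(handleJobSubmitted: (Int, String) => Unit) {
  private val queue = new LinkedBlockingQueue[SchedulerEvent]()
  private val thread = new Thread("mini-dag-scheduler-event-loop") {
    override def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case StopLoop => running = false
          case event    => onReceive(event)
        }
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: SchedulerEvent): Unit = queue.put(event)
  def stop(): Unit = { post(StopLoop); thread.join() }

  private def onReceive(event: SchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rddName) => handleJobSubmitted(jobId, rddName)
    case StopLoop                     => ()
  }
}

// Usage: the "submitter" posts two events and returns immediately; the loop thread handles them.
val handled = mutable.ArrayBuffer.empty[Int]
val bothHandled = new CountDownLatch(2)
val loop = new MiniEventLoop((jobId, _) => { handled += jobId; bothHandled.countDown() })
loop.start()
loop.post(JobSubmitted(0, "ShuffledRDD"))
loop.post(JobSubmitted(1, "ShuffledRDD"))
val finishedInTime = bothHandled.await(5, TimeUnit.SECONDS)
loop.stop() // join() also makes the buffer's contents visible to this thread
```

Because one thread processes every event, the handler needs no locking of its own; this is the main reason to funnel submissions through a queue.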

**handleJobSubmitted**

**Purpose: analyzes stage dependencies, creates the stages, and submits the corresponding job.**
```scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
{
  var finalStage: Stage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Create finalStage from the job's last RDD and register it in the DAGScheduler's
    // internal cache (stageIdToStage)
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    // Create an ActiveJob from finalStage, which is the last stage of this job
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    // A single-partition job with no parent stages may run locally
    // (only if local execution is enabled and allowed for this job)
    if (shouldRunLocally) {
      // Compute very short actions like first() or take() with no parent stages locally.
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      // Run the job locally
      runLocally(job)
    } else {
      // Register the job in the in-memory caches
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      // Submit finalStage: submitStage recursively submits missing parent stages first,
      // and stages whose parents are still missing are put into waitingStages
      submitStage(finalStage)
    }
  }
  submitWaitingStages()
}
```
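The shouldRunLocally check decides whether a job bypasses stage submission entirely. Pulled out as a plain function (the parameter names are my own, and the scenarios below are hypothetical), it makes clear why wordcount never takes the local path: its final stage has a parent stage.

```scala
// Mirrors the shouldRunLocally condition in handleJobSubmitted (parameters renamed for the sketch).
def shouldRunLocally(localExecutionEnabled: Boolean, allowLocal: Boolean,
                     numParentStages: Int, numPartitions: Int): Boolean =
  localExecutionEnabled && allowLocal && numParentStages == 0 && numPartitions == 1

// Hypothetical scenarios; the flag values are assumptions for illustration.
// A take(1)-style job on one partition of a freshly read RDD: no parent stages.
val takeJob = shouldRunLocally(localExecutionEnabled = true, allowLocal = true,
                               numParentStages = 0, numPartitions = 1)
// The wordcount foreach: its final stage has one parent (the shuffle-map stage before reduceByKey).
val wordcountJob = shouldRunLocally(localExecutionEnabled = true, allowLocal = true,
                                    numParentStages = 1, numPartitions = 1)
// The same take-style job with local execution disabled.
val disabledJob = shouldRunLocally(localExecutionEnabled = false, allowLocal = true,
                                   numParentStages = 0, numPartitions = 1)
```

All four conditions must hold, so any job containing a shuffle always goes through submitStage.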

submitStage

Purpose: the entry point of the stage-splitting algorithm

```scala
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Get the parent stages of this stage that have not been computed yet
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing == Nil) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // Create the stage's tasks, one per partition that still needs to be computed
        submitMissingTasks(stage, jobId.get)
      } else {
        // Recursively submit the missing parent stages first
        for (parent <- missing) {
          submitStage(parent)
        }
        // Park this stage in waitingStages until its parents have finished
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}
```
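The recursion in submitStage, together with waitingStages, can be simulated in a few lines. `MiniStage` and `MiniScheduler` are hypothetical; "submitting" a stage just records its id, and `stageFinished` is a simplified stand-in for what the DAGScheduler does when a stage completes (re-checking the waiting stages). For wordcount, the shuffle-map stage is submitted first and the result stage waits for it.

```scala
import scala.collection.mutable

// Hypothetical stage model: `parents` are the shuffle-map stages this stage reads from.
class MiniStage(val id: Int, val parents: List[MiniStage]) {
  var isAvailable = false // becomes true once all of the stage's output exists
}

class MiniScheduler {
  val waitingStages = mutable.LinkedHashSet.empty[MiniStage]
  val runningStages = mutable.LinkedHashSet.empty[MiniStage]
  val submitted = mutable.ArrayBuffer.empty[Int] // stage ids in submission order

  // Stand-in for getMissingParentStages: parents whose output is not available yet.
  private def missingParents(stage: MiniStage): List[MiniStage] =
    stage.parents.filterNot(_.isAvailable).sortBy(_.id)

  def submitStage(stage: MiniStage): Unit =
    if (!waitingStages(stage) && !runningStages(stage)) {
      val missing = missingParents(stage)
      if (missing.isEmpty) {
        runningStages += stage // stand-in for submitMissingTasks
        submitted += stage.id
      } else {
        missing.foreach(submitStage) // submit the parents first, recursively
        waitingStages += stage       // then wait for them to finish
      }
    }

  // When a stage finishes, retry every waiting stage (similar in spirit to submitWaitingStages).
  def stageFinished(stage: MiniStage): Unit = {
    runningStages -= stage
    stage.isAvailable = true
    val retry = waitingStages.toList.sortBy(_.id)
    waitingStages.clear()
    retry.foreach(submitStage)
  }
}

// wordcount: stage 0 = shuffle-map stage (textFile .. map), stage 1 = result stage (after reduceByKey).
val mapStage = new MiniStage(0, Nil)
val resultStage = new MiniStage(1, List(mapStage))
val scheduler = new MiniScheduler
scheduler.submitStage(resultStage)
val submittedFirst = scheduler.submitted.toList             // only the parent runs at first
val waitingFirst = scheduler.waitingStages.map(_.id).toList // the result stage waits
scheduler.stageFinished(mapStage)
val submittedAll = scheduler.submitted.toList
```

Calling submitStage on the final stage is enough: the recursion discovers and submits the ancestors, and the waiting set remembers what still has to run.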


getMissingParentStages

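Purpose:
The concrete implementation of the stage-splitting algorithm.

How it works:

Starting from the stage's last RDD, it walks backwards. As long as the dependencies it meets are narrow, the parent RDDs belong to the same stage and no new stage is created. When it meets a wide (shuffle) dependency, it gets or creates a stage for the RDD on the parent side of that dependency and, if that stage's output is not yet available, returns it as a missing parent stage.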

```scala
// Concrete implementation of the stage-splitting algorithm.
// If every dependency reached from the stage's last RDD is narrow, no new stage is created;
// for each wide dependency, the stage for the RDD on the parent side is obtained and returned.
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      // Only look at the dependencies if some partition of this RDD is not cached
      if (getCacheLocs(rdd).contains(Nil)) {
        for (dep <- rdd.dependencies) {
          dep match {
            // Wide (shuffle) dependency
            case shufDep: ShuffleDependency[_, _, _] =>
              // Get or create the shuffle-map stage for the parent (isShuffleMap = true)
              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
              if (!mapStage.isAvailable) {
                // Its map output is not available yet, so it is a missing parent stage
                missing += mapStage
              }
            // Narrow dependency
            case narrowDep: NarrowDependency[_] =>
              // The parent RDD stays in the same stage; push it onto the stack to visit later
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  // Start the traversal from the stage's last RDD
  waitingForVisit.push(stage.rdd)
  while (!waitingForVisit.isEmpty) {
    visit(waitingForVisit.pop())
  }
  // Return the missing parent stages
  missing.toList
}
```

Note:

The stage-splitting algorithm is implemented jointly by submitStage() and getMissingParentStages().
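The following sketch isolates the traversal in getMissingParentStages: an explicit stack instead of recursion, a visited set, narrow dependencies pushed onto the stack, and shuffle dependencies turned into missing parent stages unless their output already exists. The model is hypothetical (`MRdd`, `MNarrow`, `MShuffle`, and a parent stage is represented only by its shuffle id); `fullyCached` stands in for the `getCacheLocs(rdd).contains(Nil)` check.

```scala
import scala.collection.mutable

// Hypothetical lineage model for this sketch (not Spark's RDD/Dependency classes).
sealed trait MDep
case class MNarrow(rdd: MRdd) extends MDep
case class MShuffle(rdd: MRdd, shuffleId: Int) extends MDep
class MRdd(val name: String, val deps: List[MDep], val fullyCached: Boolean = false)

// Returns the shuffle ids whose map stages must still run before finalRdd's stage can.
def missingParentShuffles(finalRdd: MRdd, availableShuffles: Set[Int]): List[Int] = {
  val missing = mutable.LinkedHashSet.empty[Int]
  val visited = mutable.Set.empty[MRdd]
  val waitingForVisit = mutable.Stack.empty[MRdd] // explicit stack instead of recursion
  waitingForVisit.push(finalRdd)
  while (waitingForVisit.nonEmpty) {
    val rdd = waitingForVisit.pop()
    // Skip RDDs already seen, and do not look behind an RDD whose partitions are all cached.
    if (visited.add(rdd) && !rdd.fullyCached) {
      rdd.deps.foreach {
        case MShuffle(_, shuffleId) => if (!availableShuffles(shuffleId)) missing += shuffleId
        case MNarrow(parent)        => waitingForVisit.push(parent)
      }
    }
  }
  missing.toList
}

val source   = new MRdd("HadoopRDD", Nil)
val pairs    = new MRdd("pairs", List(MNarrow(source)))
val shuffled = new MRdd("ShuffledRDD", List(MShuffle(pairs, shuffleId = 0)))
val finalRdd = new MRdd("final", List(MNarrow(shuffled)))

val beforeMapStage = missingParentShuffles(finalRdd, availableShuffles = Set.empty) // shuffle 0 missing
val afterMapStage  = missingParentShuffles(finalRdd, availableShuffles = Set(0))    // nothing missing

// If the ShuffledRDD's partitions are all cached, the traversal stops there.
val cachedShuffled = new MRdd("ShuffledRDD (cached)", List(MShuffle(pairs, shuffleId = 0)), fullyCached = true)
val onCache = missingParentShuffles(new MRdd("final", List(MNarrow(cachedShuffled))), availableShuffles = Set.empty)
```

The last case shows why the cache check matters: a fully cached RDD cuts off the lineage behind it, so its upstream shuffle does not have to be recomputed.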

submitMissingTasks

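Purpose:

Creates a batch of tasks for the stage, one task per partition that still needs to be computed (so at most as many tasks as the stage has partitions).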

```scala
// Create a batch of tasks for the stage, one per partition that still needs to be computed
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingTasks.clear()

  // First figure out the indexes of partition ids to compute.
  // (partitions without map output, or result partitions that are not finished yet)
  val partitionsToCompute: Seq[Int] = {
    if (stage.isShuffleMap) {
      (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
    } else {
      val job = stage.resultOfJob.get
      (0 until job.numPartitions).filter(id => !job.finished(id))
    }
  }

  // ... (code omitted)

  // Add the stage to runningStages
  runningStages += stage

  // ... (code omitted)

  // Create one task per partition to compute, together with its preferred locations
  val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
    partitionsToCompute.map { id =>
      // Compute the preferred locations
      val locs = getPreferredLocs(stage.rdd, id)
      val part = stage.rdd.partitions(id)
      // Create a ShuffleMapTask
      new ShuffleMapTask(stage.id, taskBinary, part, locs)
    }
  } else {
    val job = stage.resultOfJob.get
    partitionsToCompute.map { id =>
      val p: Int = job.partitions(id)
      val part = stage.rdd.partitions(p)
      val locs = getPreferredLocs(stage.rdd, p)
      // The final stage gets ResultTasks
      new ResultTask(stage.id, taskBinary, part, locs, id)
    }
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    // Wrap the stage's tasks in a TaskSet and submit it through TaskScheduler.submitTasks()
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }

  // ... (code omitted)
}
```
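The partition selection in submitMissingTasks is easy to overlook: only partitions that still lack output get a task, and for a result stage the output index `id` and the RDD partition `p = job.partitions(id)` are different numbers. The sketch below reproduces just that bookkeeping with hypothetical types (`MiniShuffleMapTask`, `MiniResultTask`, `MiniTaskSet`); serialization, preferred locations and the real TaskScheduler are left out.

```scala
// Hypothetical task and TaskSet types named after the classes in the code above.
sealed trait MiniTask { def stageId: Int; def partition: Int }
case class MiniShuffleMapTask(stageId: Int, partition: Int) extends MiniTask
case class MiniResultTask(stageId: Int, partition: Int, outputId: Int) extends MiniTask
case class MiniTaskSet(tasks: Seq[MiniTask], stageId: Int, attempt: Int)

// Shuffle-map stage: one task per partition that has no map output yet.
def shuffleMapTaskSet(stageId: Int, numPartitions: Int, hasOutput: Set[Int], attempt: Int): Option[MiniTaskSet] = {
  val tasks = (0 until numPartitions).filterNot(hasOutput).map(p => MiniShuffleMapTask(stageId, p))
  if (tasks.nonEmpty) Some(MiniTaskSet(tasks, stageId, attempt)) else None
}

// Result stage: jobPartitions(id) is the RDD partition behind output index id;
// one task per output index that is not finished yet.
def resultTaskSet(stageId: Int, jobPartitions: IndexedSeq[Int], finished: Set[Int], attempt: Int): Option[MiniTaskSet] = {
  val tasks = jobPartitions.indices.filterNot(finished).map(id => MiniResultTask(stageId, jobPartitions(id), id))
  if (tasks.nonEmpty) Some(MiniTaskSet(tasks, stageId, attempt)) else None
}

// A 4-partition shuffle-map stage where partitions 1 and 3 already have map output,
// so only partitions 0 and 2 get tasks.
val mapSet = shuffleMapTaskSet(stageId = 0, numPartitions = 4, hasOutput = Set(1, 3), attempt = 1)
// A result stage for a job over RDD partitions 5 and 7, where output 0 is already finished.
val resultSet = resultTaskSet(stageId = 1, jobPartitions = Vector(5, 7), finished = Set(0), attempt = 0)
// Nothing left to compute: no TaskSet is produced.
val nothingLeft = shuffleMapTaskSet(stageId = 0, numPartitions = 2, hasOutput = Set(0, 1), attempt = 2)
```

Returning None for an empty task list mirrors the `if (tasks.size > 0)` guard: a TaskSet is only submitted when there is work to do.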

getPreferredLocsInternal

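Purpose:

Computes the preferred locations of the partition each task will process. Starting from the stage's last RDD, it first checks whether the partition is cached, then whether the RDD is checkpointed or has placement preferences of its own (as input RDDs such as HadoopRDD do). If either check yields locations, those become the task's preferred locations; otherwise it recurses into the parents of narrow dependencies and repeats the checks.

Call chain:

submitMissingTasks -> getPreferredLocs -> getPreferredLocsInternal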

```scala
// Compute the preferred locations of the partition a task will process.
// Starting from the stage's last RDD, check whether the partition is cached or checkpointed;
// if it is, the task's preferred locations are where that cached or checkpointed partition lives.
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_],Int)])
  : Seq[TaskLocation] =
{
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration.  SPARK-695
  if (!visited.add((rdd,partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (!cached.isEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  // (rdd.preferredLocations returns the checkpoint locations if the RDD is checkpointed)
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (!rddPrefs.isEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }
  // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  // Recurse into the parent RDDs, checking whether the corresponding partitions are cached or checkpointed
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }
    case _ =>
  }
  // If no partition from the stage's last RDD back to its first RDD is cached or checkpointed,
  // the task has no preferred locations (Nil)
  Nil
}
```
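The lookup order in getPreferredLocsInternal (cached partition, then the RDD's own preferences or checkpoint, then narrow-dependency parents) can be reproduced on a toy model. `LocRdd` and `preferredLocs` are hypothetical; hosts are plain strings instead of TaskLocation, and each narrow parent is given as a pair of the parent RDD and a getParents-style function.

```scala
import scala.collection.mutable

// Hypothetical lineage model for this sketch (not Spark's RDD class).
class LocRdd(
    val name: String,
    val cachedOn: Map[Int, Seq[String]] = Map.empty,          // partition -> hosts holding a cached copy
    val ownPrefs: Map[Int, Seq[String]] = Map.empty,          // e.g. HDFS block hosts of an input RDD
    val narrowParents: Seq[(LocRdd, Int => Seq[Int])] = Nil)  // (parent RDD, getParents function)

// Lookup order: visited check, cache, the RDD's own preferences, then narrow-dependency parents.
def preferredLocs(rdd: LocRdd, partition: Int, visited: mutable.Set[(LocRdd, Int)]): Seq[String] =
  if (!visited.add((rdd, partition))) Nil // already explored: avoids exponential re-visits
  else {
    val cached = rdd.cachedOn.getOrElse(partition, Nil)
    val prefs = rdd.ownPrefs.getOrElse(partition, Nil)
    if (cached.nonEmpty) cached
    else if (prefs.nonEmpty) prefs
    else rdd.narrowParents.iterator
      .flatMap { case (parent, getParents) => getParents(partition).iterator.map(inPart => (parent, inPart)) }
      .map { case (parent, inPart) => preferredLocs(parent, inPart, visited) }
      .find(_.nonEmpty) // lazy: the first parent partition with any preference wins
      .getOrElse(Nil)
  }

val oneToOne: Int => Seq[Int] = p => Seq(p) // like a one-to-one narrow dependency
val hadoopRdd = new LocRdd("HadoopRDD", ownPrefs = Map(0 -> Seq("host1", "host2")))
val linesRdd  = new LocRdd("lines", narrowParents = Seq(hadoopRdd -> oneToOne))
val wordsRdd  = new LocRdd("words", narrowParents = Seq(linesRdd -> oneToOne))

// Nothing cached: the walk reaches the input RDD and uses its block locations.
val fromInput = preferredLocs(wordsRdd, 0, mutable.Set.empty)
// Partition 1 has no preference anywhere in the chain.
val noPrefs = preferredLocs(wordsRdd, 1, mutable.Set.empty)
// If partition 0 of "lines" is cached on host3, the walk stops there.
val cachedLines = new LocRdd("lines (cached)", cachedOn = Map(0 -> Seq("host3")),
                             narrowParents = Seq(hadoopRdd -> oneToOne))
val fromCache = preferredLocs(new LocRdd("words", narrowParents = Seq(cachedLines -> oneToOne)), 0, mutable.Set.empty)
```

Using a lazy iterator with find keeps the early-exit behaviour of the `return locs` in the original loop without relying on a non-local return from a closure.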
