上一節(jié)我們是不是講到,Driver,Application注冊到Master上面后,Master中調(diào)用scheduler()進(jìn)行資源調(diào)度,在這個里面通過LaunchDriver(),LaunchExecutor(),向Worker發(fā)出啟動Driver,Exeutor的請求(或者說命令),
Worker接收到發(fā)來的請求,通過創(chuàng)建DriverRunner,ExecutorRunner線程來啟動我們的Driver與Application.在啟動完成后,根據(jù)第一章節(jié)我們分析的Spark核心原理,Executor會反向注冊到Driver上,這樣Driver就清楚哪些Executor在執(zhí)行Application,實際上到了這個時候,我們的SparkContext已經(jīng)全部初使化完成。
接下來我們就要繼續(xù)執(zhí)行我們自己編寫的代碼程序,其實一個Application包括多個JOB,那么JOB是如何劃分的呢? 實際上一個Action操作,會劃分一個JOB,就是說多個Action操作就會有多個JOB,JOB執(zhí)行的順序是從第一個開始。
下面我們以wordcount實例來詳細(xì)分析一下JOB的劃分:
val lines = sc.textFile(...)
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
counts.foreach(count => println(count._1 + ": " + count._2))
看到上面的幾行代碼,大家是不是太熟悉了,這就是我們學(xué)習(xí)spark的第一個例子。
首先我們先來分析第一行代碼:
val lines = sc.textFile()
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
上面的代碼是不是很熟悉,
- 首先,textFile會調(diào)用一個hadoopFile(...)的方法,我們看里面的參數(shù),TextInputFormat這是不是很熟悉,這是hadoop里讀取文本文件的解析器啊
- 后面的classOf[LongWritable], classOf[Text]這就是hadoop
map()函數(shù)的k1,v1吧
這可都是hadoop里的東東。下面我們看hadoopFile方法的代碼
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
//這個是不是我們把hadoop配置數(shù)據(jù)序列化后在廣播出去,共享廣播這個我們在之前講過,不會忘記吧!
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
不難看出,這個方法返回了一個HadoopRDD算子
接著我們看后面調(diào)用了map(...),map方法具體我們這里就不分析,以前的章節(jié)我們分析過了,
.map(pair => pair._2.toString).setName(path)
其中的pair是不是一個tuple,也就是我們Hadoop的k1,v1,那么pair._2.toStrig,是不是我們讀取文件的每一行內(nèi)容。
接下來我們繼續(xù)執(zhí)行代碼:
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
這兩行與上面分析map的相似,我們就不重復(fù)了。接著我們執(zhí)行
val counts = pairs.reduceByKey(_ + _)
這行代碼我們驚奇的發(fā)現(xiàn)在RDD類里沒有reduceByKey方法,這是為什么了?沒有方法如何調(diào)用。
大家是不是想到scala中的隱式轉(zhuǎn)換,在RDD類中找查到了隱式轉(zhuǎn)換方法rddToPairRDDFunctions(),把rdd轉(zhuǎn)換成了rddToPairRDDFunctions,返回這個對象,如下面代碼
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}
果然在PairRDDFunction類中找到這個方法:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}`
接下來我們執(zhí)行最后一行代碼,
counts.foreach(count => println(count._1 + ": " + count._2))
當(dāng)我們看foreach時,明白這就是一個action算子。它調(diào)用了sparkContext的runJob方法,來劃分一個JOB
def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
//執(zhí)行SparkContext中的runJob
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
經(jīng)過多個runJob()重載的調(diào)用,最后我們找到最終的調(diào)用方法:
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
if (stopped) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
//調(diào)用sparkContext之前初使化 時創(chuàng)建的DAGScheduler的runJob方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
看到這里大家 是不是明白了,最終其實就是調(diào)用了sparkContext之前初使化 時創(chuàng)建的DAGScheduler的runJob方法,里面的第一個參數(shù)rdd是不是以前的rdd算子。
這里也說明了JOB的劃分都是在DAGScheduler里完成的。接下來我們會在在下一章節(jié)會對DAGScheduler如何劃分JOB進(jìn)行深入的分析.
本章中每一個字(包括源碼注解)都是作者敲出來的,你感覺有用,幫點擊'喜歡'