目錄
前言
前面兩篇文章一直在講SparkContext初始化的內(nèi)部邏輯,除此之外,它也對(duì)外提供一部分其他功能,我們挑選幾個(gè)主要的來(lái)簡(jiǎn)要了解。SparkContext還有一個(gè)伴生對(duì)象,里面涉及到一些SparkContext創(chuàng)建的內(nèi)部機(jī)制。
本文就是SparkContext概況的收尾。在它的背后,還有形形色色的更加底層的邏輯等著我們?nèi)ヌ剿鳌?/p>
SparkContext提供的其他功能
生成RDD
在文章#0中,我們提到了生成RDD的兩種方法,一是對(duì)內(nèi)存中存在的數(shù)據(jù)執(zhí)行并行化(Parallelize)操作,二是從外部存儲(chǔ)中的數(shù)據(jù)源讀取。這兩類(lèi)方法都在SparkContext中。以下是parallelize()方法的代碼。
代碼#4.1 - o.a.s.SparkContext.parallelize()方法
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
該方法生成的RDD類(lèi)型為ParallelCollectionRDD。numSlices就是該RDD的分區(qū)數(shù),默認(rèn)值與TaskScheduler的Task并行度相同。這個(gè)方法非常簡(jiǎn)單,因此在Spark入門(mén)教程中經(jīng)常會(huì)用到它。
從外部數(shù)據(jù)源讀取并生成RDD的方法比較多,為了簡(jiǎn)潔,我們只看代碼#0.1中出現(xiàn)的textFile()方法。
代碼#4.2 - o.a.s.SparkContext.textFile()與hadoopFile()方法
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
FileSystem.getLocal(hadoopConfiguration)
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
可見(jiàn),textFile()方法用TextInputFormat格式讀取HDFS上指定路徑的文件,生成HadoopRDD,再將其中的具體內(nèi)容用map()算子提取出來(lái)。HadoopRDD是一個(gè)Pair RDD,它內(nèi)部存儲(chǔ)的是二元組,如上面代碼中的(LongWritable, Text)二元組。
廣播變量
廣播變量是Spark兩種共享變量中的一種。所謂廣播,就是Driver直接向每個(gè)Worker節(jié)點(diǎn)發(fā)送同一份數(shù)據(jù)的只讀副本,而不像通常一樣通過(guò)Task來(lái)計(jì)算。廣播變量適合處理多節(jié)點(diǎn)跨Stage的共享數(shù)據(jù),特別是輸入數(shù)據(jù)量較大的集合,可以提高效率。
下面是broadcast()方法的源碼。它在上文代碼#4.2中已經(jīng)出現(xiàn)過(guò),用來(lái)廣播序列化過(guò)的Hadoop配置信息。
代碼#4.3 - o.a.s.SparkContext.broadcast()方法
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
assertNotStopped()
require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
"Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
val callSite = getCallSite
logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
cleaner.foreach(_.registerBroadcastForCleanup(bc))
bc
}
廣播變量的產(chǎn)生依賴(lài)于Spark執(zhí)行環(huán)境里的廣播管理器BroadcastManager,因此在之后閱讀SparkEnv的源碼時(shí),會(huì)詳細(xì)分析廣播的內(nèi)部機(jī)制。
累加器
累加器與廣播變量一樣,也是Spark的共享變量。顧名思義,累加器就是一個(gè)能夠累積結(jié)果值的變量,最常見(jiàn)的用途是做計(jì)數(shù)。它在Driver端創(chuàng)建和讀取,Executor端(也就是各個(gè)Task)只能做累加操作。SparkContext已經(jīng)提供了數(shù)值型累加器的創(chuàng)建方法,如長(zhǎng)整型的LongAccumulator。
代碼#4.4 - o.a.s.SparkContext.longAccumulator()方法
def longAccumulator: LongAccumulator = {
val acc = new LongAccumulator
register(acc)
acc
}
def longAccumulator(name: String): LongAccumulator = {
val acc = new LongAccumulator
register(acc, name)
acc
}
所有累加器的基類(lèi)都是AccumulatorV2抽象類(lèi),我們也可以自定義其他類(lèi)型的累加器。特征AccumulatorParam則用于封裝累加器對(duì)應(yīng)的數(shù)據(jù)類(lèi)型及累加操作,在后面的文章中也會(huì)閱讀到與累加器相關(guān)的源碼。
運(yùn)行Job
SparkContext提供了很多種runJob()方法的重載來(lái)運(yùn)行一個(gè)Job,也就是觸發(fā)RDD動(dòng)作算子的執(zhí)行。歸根結(jié)底,所有runJob()方法的重載都會(huì)調(diào)用如下所示的邏輯。
代碼#4.5 - o.a.s.SparkContext.runJob()方法
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
可見(jiàn),它最終調(diào)用了DAGScheduler.runJob()方法來(lái)運(yùn)行Job。它會(huì)將需要計(jì)算的RDD及其分區(qū)列表傳入,在計(jì)算完成后,將結(jié)果傳回給resultHandler回調(diào)方法。在運(yùn)行Job的同時(shí),還會(huì)對(duì)RDD本身保存其檢查點(diǎn)。關(guān)于DAGScheduler的細(xì)節(jié),在涉及調(diào)度邏輯時(shí)會(huì)深入了解。
SparkContext伴生對(duì)象
前文代碼#2.11里的createTaskScheduler()方法就來(lái)自SparkContext伴生對(duì)象。除了它之外,伴生對(duì)象主要用來(lái)跟蹤并維護(hù)SparkContext的創(chuàng)建與激活。
伴生對(duì)象中的屬性
代碼#4.6 - SparkContext伴生對(duì)象中的屬性
private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
private val activeContext: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)
private var contextBeingConstructed: Option[SparkContext] = None
這三個(gè)屬性都與SparkContext的創(chuàng)建過(guò)程相關(guān)。SPARK_CONTEXT_CONSTRUCTOR_LOCK是SparkContext構(gòu)造過(guò)程中使用的鎖對(duì)象,用來(lái)保證線(xiàn)程安全性。activeContext用于保存當(dāng)前活動(dòng)的SparkContext的原子引用。contextBeingConstructed用于保存當(dāng)前正在創(chuàng)建的SparkContext。
markPartiallyConstructed()方法
這個(gè)方法實(shí)際上在SparkContext主構(gòu)造方法的開(kāi)頭就被調(diào)用了,它將當(dāng)前的SparkContext標(biāo)記為正在創(chuàng)建。
代碼#4.7 - o.a.s.SparkContext.markPartiallyConstructed()方法
private[spark] def markPartiallyConstructed(
sc: SparkContext,
allowMultipleContexts: Boolean): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
contextBeingConstructed = Some(sc)
}
}
可見(jiàn),最終是調(diào)用了assertNoOtherContextIsRunning()方法。這是一個(gè)私有方法,它檢測(cè)當(dāng)前是否有多個(gè)SparkContext實(shí)例在運(yùn)行,并根據(jù)spark.driver.allowMultipleContexts參數(shù)的設(shè)置拋出異?;蜉敵鼍?。
setActiveContext()方法
與上面的方法相對(duì),它是在SparkContext主構(gòu)造方法的結(jié)尾處調(diào)用的,將當(dāng)前的SparkContext標(biāo)記為已激活。
代碼#4.8 - o.a.s.SparkContext.setActiveContext()方法
private[spark] def setActiveContext(
sc: SparkContext,
allowMultipleContexts: Boolean): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
contextBeingConstructed = None
activeContext.set(sc)
}
getOrCreate()方法
該方法是除new SparkContext()之外,另一種更好的創(chuàng)建SparkContext的途徑。它會(huì)檢查當(dāng)前有沒(méi)有已經(jīng)激活的SparkContext,如果有則直接復(fù)用,沒(méi)有的話(huà)再創(chuàng)建。
代碼#4.9 - o.a.s.SparkContext.getOrCreate()方法
def getOrCreate(config: SparkConf): SparkContext = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
if (activeContext.get() == null) {
setActiveContext(new SparkContext(config), allowMultipleContexts = false)
} else {
if (config.getAll.nonEmpty) {
logWarning("Using an existing SparkContext; some configuration may not take effect.")
}
}
activeContext.get()
}
}
總結(jié)
本文對(duì)SparkContext初始化邏輯之外剩下的一些邏輯做了簡(jiǎn)要介紹,包括SparkContext提供的其他功能,及其伴生對(duì)象中的一些細(xì)節(jié)。這樣,我們就對(duì)SparkContext有了相對(duì)全面的了解。
接下來(lái),我們會(huì)選擇幾個(gè)SparkContext組件初始化邏輯中涉及到的重要組件,對(duì)它們的實(shí)現(xiàn)機(jī)制加以分析。下一篇仍然計(jì)劃從基礎(chǔ)開(kāi)始講起,就是LiveListenerBus及以其為代表的事件總線(xiàn)。