Spark Core源碼精讀計(jì)劃#4:SparkContext提供的其他功能

目錄

前言

前面兩篇文章一直在講SparkContext初始化的內(nèi)部邏輯,除此之外,它也對(duì)外提供一部分其他功能,我們挑選幾個(gè)主要的來(lái)簡(jiǎn)要了解。SparkContext還有一個(gè)伴生對(duì)象,里面涉及到一些SparkContext創(chuàng)建的內(nèi)部機(jī)制。

本文就是SparkContext概況的收尾。在它的背后,還有形形色色的更加底層的邏輯等著我們?nèi)ヌ剿鳌?/p>

SparkContext提供的其他功能

生成RDD

在文章#0中,我們提到了生成RDD的兩種方法,一是對(duì)內(nèi)存中存在的數(shù)據(jù)執(zhí)行并行化(Parallelize)操作,二是從外部存儲(chǔ)中的數(shù)據(jù)源讀取。這兩類(lèi)方法都在SparkContext中。以下是parallelize()方法的代碼。

代碼#4.1 - o.a.s.SparkContext.parallelize()方法

  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

該方法生成的RDD類(lèi)型為ParallelCollectionRDD。numSlices就是該RDD的分區(qū)數(shù),默認(rèn)值與TaskScheduler的Task并行度相同。這個(gè)方法非常簡(jiǎn)單,因此在Spark入門(mén)教程中經(jīng)常會(huì)用到它。

從外部數(shù)據(jù)源讀取并生成RDD的方法比較多,為了簡(jiǎn)潔,我們只看代碼#0.1中出現(xiàn)的textFile()方法。

代碼#4.2 - o.a.s.SparkContext.textFile()與hadoopFile()方法

  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()

    FileSystem.getLocal(hadoopConfiguration)

    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

可見(jiàn),textFile()方法用TextInputFormat格式讀取HDFS上指定路徑的文件,生成HadoopRDD,再將其中的具體內(nèi)容用map()算子提取出來(lái)。HadoopRDD是一個(gè)Pair RDD,它內(nèi)部存儲(chǔ)的是二元組,如上面代碼中的(LongWritable, Text)二元組。

廣播變量

廣播變量是Spark兩種共享變量中的一種。所謂廣播,就是Driver直接向每個(gè)Worker節(jié)點(diǎn)發(fā)送同一份數(shù)據(jù)的只讀副本,而不像通常一樣通過(guò)Task來(lái)計(jì)算。廣播變量適合處理多節(jié)點(diǎn)跨Stage的共享數(shù)據(jù),特別是輸入數(shù)據(jù)量較大的集合,可以提高效率。

下面是broadcast()方法的源碼。它在上文代碼#4.2中已經(jīng)出現(xiàn)過(guò),用來(lái)廣播序列化過(guò)的Hadoop配置信息。

代碼#4.3 - o.a.s.SparkContext.broadcast()方法

  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

廣播變量的產(chǎn)生依賴(lài)于Spark執(zhí)行環(huán)境里的廣播管理器BroadcastManager,因此在之后閱讀SparkEnv的源碼時(shí),會(huì)詳細(xì)分析廣播的內(nèi)部機(jī)制。

累加器

累加器與廣播變量一樣,也是Spark的共享變量。顧名思義,累加器就是一個(gè)能夠累積結(jié)果值的變量,最常見(jiàn)的用途是做計(jì)數(shù)。它在Driver端創(chuàng)建和讀取,Executor端(也就是各個(gè)Task)只能做累加操作。SparkContext已經(jīng)提供了數(shù)值型累加器的創(chuàng)建方法,如長(zhǎng)整型的LongAccumulator。

代碼#4.4 - o.a.s.SparkContext.longAccumulator()方法

  def longAccumulator: LongAccumulator = {
    val acc = new LongAccumulator
    register(acc)
    acc
  }

  def longAccumulator(name: String): LongAccumulator = {
    val acc = new LongAccumulator
    register(acc, name)
    acc
  }

所有累加器的基類(lèi)都是AccumulatorV2抽象類(lèi),我們也可以自定義其他類(lèi)型的累加器。特征AccumulatorParam則用于封裝累加器對(duì)應(yīng)的數(shù)據(jù)類(lèi)型及累加操作,在后面的文章中也會(huì)閱讀到與累加器相關(guān)的源碼。

運(yùn)行Job

SparkContext提供了很多種runJob()方法的重載來(lái)運(yùn)行一個(gè)Job,也就是觸發(fā)RDD動(dòng)作算子的執(zhí)行。歸根結(jié)底,所有runJob()方法的重載都會(huì)調(diào)用如下所示的邏輯。

代碼#4.5 - o.a.s.SparkContext.runJob()方法

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

可見(jiàn),它最終調(diào)用了DAGScheduler.runJob()方法來(lái)運(yùn)行Job。它會(huì)將需要計(jì)算的RDD及其分區(qū)列表傳入,在計(jì)算完成后,將結(jié)果傳回給resultHandler回調(diào)方法。在運(yùn)行Job的同時(shí),還會(huì)對(duì)RDD本身保存其檢查點(diǎn)。關(guān)于DAGScheduler的細(xì)節(jié),在涉及調(diào)度邏輯時(shí)會(huì)深入了解。

SparkContext伴生對(duì)象

前文代碼#2.11里的createTaskScheduler()方法就來(lái)自SparkContext伴生對(duì)象。除了它之外,伴生對(duì)象主要用來(lái)跟蹤并維護(hù)SparkContext的創(chuàng)建與激活。

伴生對(duì)象中的屬性

代碼#4.6 - SparkContext伴生對(duì)象中的屬性

  private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()

  private val activeContext: AtomicReference[SparkContext] =
    new AtomicReference[SparkContext](null)

  private var contextBeingConstructed: Option[SparkContext] = None

這三個(gè)屬性都與SparkContext的創(chuàng)建過(guò)程相關(guān)。SPARK_CONTEXT_CONSTRUCTOR_LOCK是SparkContext構(gòu)造過(guò)程中使用的鎖對(duì)象,用來(lái)保證線(xiàn)程安全性。activeContext用于保存當(dāng)前活動(dòng)的SparkContext的原子引用。contextBeingConstructed用于保存當(dāng)前正在創(chuàng)建的SparkContext。

markPartiallyConstructed()方法

這個(gè)方法實(shí)際上在SparkContext主構(gòu)造方法的開(kāi)頭就被調(diào)用了,它將當(dāng)前的SparkContext標(biāo)記為正在創(chuàng)建。

代碼#4.7 - o.a.s.SparkContext.markPartiallyConstructed()方法

  private[spark] def markPartiallyConstructed(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
      contextBeingConstructed = Some(sc)
    }
  }

可見(jiàn),最終是調(diào)用了assertNoOtherContextIsRunning()方法。這是一個(gè)私有方法,它檢測(cè)當(dāng)前是否有多個(gè)SparkContext實(shí)例在運(yùn)行,并根據(jù)spark.driver.allowMultipleContexts參數(shù)的設(shè)置拋出異?;蜉敵鼍?。

setActiveContext()方法

與上面的方法相對(duì),它是在SparkContext主構(gòu)造方法的結(jié)尾處調(diào)用的,將當(dāng)前的SparkContext標(biāo)記為已激活。

代碼#4.8 - o.a.s.SparkContext.setActiveContext()方法

  private[spark] def setActiveContext(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
      contextBeingConstructed = None
      activeContext.set(sc)
    }

getOrCreate()方法

該方法是除new SparkContext()之外,另一種更好的創(chuàng)建SparkContext的途徑。它會(huì)檢查當(dāng)前有沒(méi)有已經(jīng)激活的SparkContext,如果有則直接復(fù)用,沒(méi)有的話(huà)再創(chuàng)建。

代碼#4.9 - o.a.s.SparkContext.getOrCreate()方法

  def getOrCreate(config: SparkConf): SparkContext = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      if (activeContext.get() == null) {
        setActiveContext(new SparkContext(config), allowMultipleContexts = false)
      } else {
        if (config.getAll.nonEmpty) {
          logWarning("Using an existing SparkContext; some configuration may not take effect.")
        }
      }
      activeContext.get()
    }
  }

總結(jié)

本文對(duì)SparkContext初始化邏輯之外剩下的一些邏輯做了簡(jiǎn)要介紹,包括SparkContext提供的其他功能,及其伴生對(duì)象中的一些細(xì)節(jié)。這樣,我們就對(duì)SparkContext有了相對(duì)全面的了解。

接下來(lái),我們會(huì)選擇幾個(gè)SparkContext組件初始化邏輯中涉及到的重要組件,對(duì)它們的實(shí)現(xiàn)機(jī)制加以分析。下一篇仍然計(jì)劃從基礎(chǔ)開(kāi)始講起,就是LiveListenerBus及以其為代表的事件總線(xiàn)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容