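[Spark] Reading the SparkContext Source Code

SparkContext Initialization

SparkContext is the Spark context object created when an application starts. It is the main interface for Spark application development and the hub between upper-level Spark applications and the underlying implementation (SparkContext is responsible for sending tasks to executors).
During initialization, SparkContext mainly involves the following components: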

  • SparkEnv
  • DAGScheduler
  • TaskScheduler
  • SchedulerBackend
  • SparkUI

Creating SparkConf

The most important argument to the SparkContext constructor is SparkConf. When SparkContext initializes, it first builds a SparkConf object from its constructor arguments and then goes on to create SparkEnv.



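A SparkConf object is created to manage the property settings of a Spark application. The SparkConf class is fairly simple: it manages key/value properties through a hash map container.
The figure below shows the SparkConf class declaration, in which the settings variable is the hash map container: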



Below is the code in the SparkContext class that copies the SparkConf object:
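
As a rough illustration of why a copy is taken, here is a minimal sketch of a hash-map-backed configuration class. It is not Spark's code: `MiniConf`, its `copy` method, and `MiniConfDemo` are hypothetical names invented for this example. The point it shows is that once the context holds its own copy, later changes to the caller's object no longer affect it.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a configuration class; not Spark's SparkConf.
class MiniConf private (private val settings: mutable.HashMap[String, String]) {
  def this() = this(mutable.HashMap.empty[String, String])

  // Store a key/value pair and return this object so calls can be chained.
  def set(key: String, value: String): MiniConf = {
    settings(key) = value
    this
  }

  def get(key: String): Option[String] = settings.get(key)

  // Copy every entry into a fresh map so the copy is independent of the original.
  def copy(): MiniConf = new MiniConf(settings.clone())
}

object MiniConfDemo {
  def main(args: Array[String]): Unit = {
    val userConf = new MiniConf().set("spark.app.name", "demo")
    val contextConf = userConf.copy()           // what a context would keep
    userConf.set("spark.app.name", "changed")   // later change made by the caller
    println(contextConf.get("spark.app.name"))  // still the value at copy time
  }
}
```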

Creating the LiveListenerBus

This is a typical observer pattern. Different kinds of SparkListenerEvent events are posted to the LiveListenerBus; the SparkListenerBus then iterates over all of its registered SparkListener listeners and invokes the callback that corresponds to each event.

Below, SparkContext creates the LiveListenerBus object:

  // An asynchronous listener bus for Spark events
  private[spark] val listenerBus = new LiveListenerBus
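
The observer pattern described above can be sketched in a few lines. This is a simplified, synchronous model with hypothetical names (`Event`, `Listener`, `MiniListenerBus`, `RecordingListener`); the real LiveListenerBus is asynchronous, as the comment in the listing says, and that part is deliberately left out here.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified event and listener types; not Spark's classes.
sealed trait Event
final case class ApplicationStart(appName: String) extends Event
final case class EnvironmentUpdate(details: Map[String, String]) extends Event

// Listeners override only the callbacks they care about.
trait Listener {
  def onApplicationStart(e: ApplicationStart): Unit = ()
  def onEnvironmentUpdate(e: EnvironmentUpdate): Unit = ()
}

class MiniListenerBus {
  private val listeners = ArrayBuffer.empty[Listener]

  def addListener(l: Listener): Unit = listeners += l

  // Deliver one event to every registered listener, choosing the callback by event type.
  def post(event: Event): Unit = listeners.foreach { l =>
    event match {
      case e: ApplicationStart  => l.onApplicationStart(e)
      case e: EnvironmentUpdate => l.onEnvironmentUpdate(e)
    }
  }
}

// Example listener that records the names of started applications.
class RecordingListener extends Listener {
  val started = ArrayBuffer.empty[String]
  override def onApplicationStart(e: ApplicationStart): Unit = started += e.appName
}
```

A listener registered with `addListener` only reacts to the events whose callbacks it overrides; `RecordingListener` ignores `EnvironmentUpdate` because it keeps the default no-op for that callback.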

Creating the SparkEnv Runtime Environment

SparkEnv creates a series of objects, including MapOutputTracker, MasterActor, BlockManager, CacheManager, and HttpFileServer.
The figure below shows the code that creates SparkEnv:

The SparkEnv constructor takes the following parameters:

class SparkEnv (
    val executorId: String,
    val actorSystem: ActorSystem,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val cacheManager: CacheManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockTransferService: BlockTransferService,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val httpFileServer: HttpFileServer,
    val sparkFilesDir: String,
    val metricsSystem: MetricsSystem,
    val shuffleMemoryManager: ShuffleMemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging

The roles of several of these parameters are:

  • cacheManager: stores intermediate computation results
  • mapOutputTracker: caches MapStatus information and can fetch it from the MapOutputTrackerMaster
  • shuffleManager: maintains the routing table for shuffle data
  • broadcastManager: broadcast variables
  • blockManager: block management
  • securityManager: security management
  • httpFileServer: file server
  • sparkFilesDir: file storage directory
  • metricsSystem: metrics
  • conf: configuration

Creating SparkUI

Below is the code in SparkContext that initializes SparkUI:


The SparkUI initialization function registers a StorageStatusListener, which listens for storage changes so that they show up promptly on the Spark web page. The objects added through the attachTab method are exactly the tabs we see in the Spark web UI.

  /** Initialize all components of the server. */
  def initialize() {
    attachTab(new JobsTab(this))
    val stagesTab = new StagesTab(this)
    attachTab(stagesTab)
    attachTab(new StorageTab(this))
    attachTab(new EnvironmentTab(this))
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
    attachHandler(
      createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
  }

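Creating and Starting TaskScheduler and DAGScheduler

The most important initialization work in SparkContext is creating the TaskScheduler and the DAGScheduler; these two are the core of Spark.

Spark's design is very clean: it separates the whole DAG abstraction layer from actual task execution.

  • DAGScheduler parses Spark operations, generates stages, forms the DAG, finally splits the stages into tasks, and submits them to the TaskScheduler. It performs only static analysis.
  • TaskScheduler is dedicated to task execution. It handles only resource management, task assignment, and reporting on execution status.

The benefit of this design is that Spark can support different resource scheduling and execution platforms simply by providing different TaskScheduler implementations.

The code below selects the SchedulerBackend that matches Spark's run mode and starts the TaskScheduler:


The key point of createTaskScheduler is that it uses the master variable to determine how Spark is currently deployed, and then creates the corresponding SchedulerBackend subclass. The SchedulerBackend it creates is placed inside the TaskScheduler and plays an important role later, when tasks are dispatched.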
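
The idea of choosing a backend from the master string can be sketched with pattern matching on regular expressions. Everything here is an assumption made for illustration: `BackendKind` and its cases, `BackendChooser`, and the two patterns are hypothetical and cover only a few master strings; they do not reproduce the real createTaskScheduler or the real SchedulerBackend subclasses.

```scala
// Hypothetical backend descriptions; the real SchedulerBackend subclasses are not modeled here.
sealed trait BackendKind
case object LocalSingleThread extends BackendKind
final case class LocalThreads(n: Int) extends BackendKind
final case class Standalone(masterUrls: List[String]) extends BackendKind
final case class Unsupported(master: String) extends BackendKind

object BackendChooser {
  // Illustrative patterns for a few master strings (assumed for this sketch).
  private val LocalN = """local\[([0-9]+)\]""".r
  private val SparkUrl = """spark://(.*)""".r

  // A regex used as a pattern must match the whole string for its case to be taken.
  def choose(master: String): BackendKind = master match {
    case "local"         => LocalSingleThread
    case LocalN(threads) => LocalThreads(threads.toInt)
    case SparkUrl(hosts) => Standalone(hosts.split(",").toList.map("spark://" + _))
    case other           => Unsupported(other)
  }
}
```

Under these assumptions, `BackendChooser.choose("local[4]")` yields `LocalThreads(4)`, and an unrecognized string falls through to `Unsupported` rather than throwing.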

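TaskScheduler.start starts the corresponding SchedulerBackend and, when the application is not running locally and spark.speculation is enabled, starts a timer that periodically checks for speculatable tasks. Below is the source of this function (defined in TaskSchedulerImpl.scala):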

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
            SPECULATION_INTERVAL milliseconds) {
        Utils.tryOrExit { checkSpeculatableTasks() }
      }
    }
  }
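
The listing uses the actor system's scheduler for the periodic check. As a minimal sketch of the same "run a check at a fixed interval" idea without Akka, the example below uses the JDK's `ScheduledExecutorService`. `PeriodicChecker` is a hypothetical helper, and it simply ignores failures inside the check, which is a simplification and not the behavior of the `Utils.tryOrExit` call in the listing.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical helper: runs `check` repeatedly at a fixed interval on a background thread.
class PeriodicChecker(intervalMillis: Long)(check: () => Unit) {
  private val executor = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    val task = new Runnable {
      // Swallow failures so that one failing check does not cancel later runs.
      override def run(): Unit = try check() catch { case _: Exception => () }
    }
    // The first run happens after one interval, then repeats at the same interval.
    executor.scheduleAtFixedRate(task, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)
  }

  // Stop the background thread; no further checks run after this.
  def stop(): Unit = executor.shutdownNow()
}
```

A caller would construct it as `new PeriodicChecker(100)(() => println("checking"))`, call `start()`, and call `stop()` on shutdown so the background thread does not keep the JVM alive.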

Adding the EventLoggingListener

This listener is disabled by default and can be enabled with the spark.eventLog.enabled setting. Its main job is to record the events that occur in JSON format:

  // Optionally log Spark events
  private[spark] val eventLogger: Option[EventLoggingListener] = {
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
      logger.start()
      listenerBus.addListener(logger)
      Some(logger)
    } else None
  }

Posting SparkListenerEvent Events

Two kinds of events, SparkListenerEnvironmentUpdate and SparkListenerApplicationStart, are posted to the LiveListenerBus. Listeners for these events then handle them in their onEnvironmentUpdate and onApplicationStart methods.

  setupAndStartListenerBus()
  postEnvironmentUpdate()
  postApplicationStart()

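Key Functions in the SparkContext Class

textFile

To load the data to be processed, the most commonly used method is textFile. Underneath, it simply creates a HadoopRDD to serve as the starting RDD and maps each record to a String.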

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }


  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
   *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
   * operation will create many references to the same object.
   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
   * copy them using a `map` function.
   */
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions
      ): RDD[(K, V)] = {
    assertNotStopped()
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }
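
The note in the hadoopFile comment about re-used Writable objects is easy to miss, so here is a small model of the problem that runs without Hadoop or Spark. `MutableText` and `ReuseDemo` are hypothetical stand-ins: the reader hands out one shared mutable object, so keeping the references yields many copies of the last record, while converting each record to an immutable String first (as textFile does with `pair._2.toString`) keeps the values distinct.

```scala
// Hypothetical mutable record, standing in for a reused Hadoop Writable.
final class MutableText(var value: String) {
  override def toString: String = value
}

object ReuseDemo {
  // An iterator that hands out the same object every time, overwriting its contents.
  def reusingReader(lines: Seq[String]): Iterator[MutableText] = {
    val shared = new MutableText("")
    lines.iterator.map { line => shared.value = line; shared }
  }

  // Keeping the references directly: every element is the same object,
  // so all of them show whatever was written last.
  def collectRaw(lines: Seq[String]): List[String] =
    reusingReader(lines).toList.map(_.toString)

  // Copying each record to an immutable String before the next one is read.
  def collectCopied(lines: Seq[String]): List[String] =
    reusingReader(lines).map(_.toString).toList
}
```

With the input `Seq("a", "b", "c")`, `collectRaw` returns `List("c", "c", "c")` and `collectCopied` returns `List("a", "b", "c")`.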

runJob

The key point is that it calls dagScheduler.runJob.

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark. The allowLocal
   * flag specifies whether the scheduler can run the computation on the driver rather than
   * shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (stopped) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }
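
To make the shape of this call easier to see, the sketch below models "run a function on selected partitions and pass each result to a handler" in a single process. It is not Spark's scheduler: `MiniJobRunner`, `partitionsData`, and `sumPartitions` are hypothetical, there is no TaskContext, and the sketch assumes the handler receives the position of the partition within the `partitions` argument.

```scala
// Hypothetical, single-process model of running a function over chosen partitions;
// not Spark's scheduler.
object MiniJobRunner {
  def runJob[T, U](
      partitionsData: IndexedSeq[Seq[T]],   // stand-in for an RDD's partitions
      func: Iterator[T] => U,               // computed once per selected partition
      partitions: Seq[Int],                 // indices of the partitions to process
      resultHandler: (Int, U) => Unit): Unit = {
    partitions.zipWithIndex.foreach { case (partitionId, outputIndex) =>
      require(partitionId >= 0 && partitionId < partitionsData.length,
        s"Invalid partition $partitionId")
      // Hand the result to the caller together with its position in `partitions`.
      resultHandler(outputIndex, func(partitionsData(partitionId).iterator))
    }
  }

  // Example: sum the selected partitions and collect the per-partition results.
  def sumPartitions(data: IndexedSeq[Seq[Int]], partitions: Seq[Int]): Array[Int] = {
    val results = new Array[Int](partitions.length)
    runJob[Int, Int](data, _.sum, partitions, (i, r) => results(i) = r)
    results
  }
}
```

Passing results through a handler instead of returning a collection lets the caller decide how to store them; here `sumPartitions` writes each result into a pre-sized array.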

Notes

This source code walkthrough is based on the spark-1.3.1 source tree.

When reposting, please credit the author, Jason Ding, and the source.
GitCafe blog homepage (http://jasonding1354.gitcafe.io/)
GitHub blog homepage (http://jasonding1354.github.io/)
CSDN blog (http://blog.csdn.net/jasonding1354)
Jianshu homepage (http://www.itdecent.cn/users/2bd9b48f6ea8/latest_articles)
Search Google for jasonding1354 to find my blog homepage
