Spark Source Code Analysis: SparkContext Initialization

1. How does Spark run?

Spark is usually started from spark-shell.

The call hierarchy is:

spark-shell -> spark-submit -> repl.Main -> SparkILoop -> createSparkContext

2. Overview of Spark initialization

Spark initialization mainly involves the following components:

SparkEnv, TaskScheduler, DAGScheduler, and the web UI.
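
To make the relationship between these four pieces concrete, here is a toy sketch in plain Scala. Every name in it (`ToyConf`, `ToyEnv`, `ToyTaskScheduler`, `ToyDagScheduler`, `ToyWebUi`, `ToyContext`) is a hypothetical stand-in, not a Spark class. It only illustrates that the environment is built from the configuration, that the DAG scheduler is built on top of an existing task scheduler, and that the UI comes last in this article's ordering. The real SparkContext does considerably more, and the exact ordering may differ between Spark versions.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins for illustration only; none of these are Spark classes.
final case class ToyConf(settings: Map[String, String])
final class ToyEnv(val conf: ToyConf)
final class ToyTaskScheduler(val master: String)
final class ToyDagScheduler(val taskScheduler: ToyTaskScheduler, val env: ToyEnv)
final class ToyWebUi(val port: Int)

// Builds the four components in the order this article lists them and
// records each step so the order can be inspected afterwards.
final class ToyContext(conf: ToyConf) {
  private val steps = ListBuffer.empty[String]

  // Runs one construction step and remembers its name once it has finished.
  private def step[A](name: String)(build: => A): A = {
    val component = build
    steps += name
    component
  }

  // The environment is derived from the configuration.
  val env: ToyEnv = step("env")(new ToyEnv(conf))

  // The task scheduler is chosen from the (toy) master setting.
  val taskScheduler: ToyTaskScheduler =
    step("taskScheduler")(new ToyTaskScheduler(conf.settings.getOrElse("master", "local")))

  // The DAG scheduler needs an already constructed task scheduler and environment.
  val dagScheduler: ToyDagScheduler =
    step("dagScheduler")(new ToyDagScheduler(taskScheduler, env))

  // The port number is an arbitrary value chosen for this sketch.
  val webUi: ToyWebUi = step("webUi")(new ToyWebUi(4040))

  def initOrder: List[String] = steps.toList
}
```

Constructing `new ToyContext(ToyConf(Map("master" -> "local[2]")))` and reading `initOrder` returns the four step names in the order env, taskScheduler, dagScheduler, webUi. Because the DAG scheduler takes the task scheduler as a constructor argument, swapping those two steps would not compile, which is the dependency the sketch is meant to show.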

  • No1

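    SparkEnv is built from SparkConf. It mainly holds the BlockManager, MapOutputTracker, ShuffleFetcher, and ConnectionManager. (ShuffleFetcher and ConnectionManager do not appear in the constructor shown below, which lists a ShuffleManager instead; they appear to belong to an older Spark release.)

    The SparkEnv constructor is as follows: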

    class SparkEnv (
        val executorId: String,
        private[spark] val rpcEnv: RpcEnv,
        val serializer: Serializer,
        val closureSerializer: Serializer,
        val serializerManager: SerializerManager,
        val mapOutputTracker: MapOutputTracker, // caches MapStatus information
        val shuffleManager: ShuffleManager, // pluggable shuffle implementation
        val broadcastManager: BroadcastManager, // broadcast variables
        val blockManager: BlockManager, // block management
        val securityManager: SecurityManager, // security management
        val metricsSystem: MetricsSystem, // metrics
        val memoryManager: MemoryManager,
        val outputCommitCoordinator: OutputCommitCoordinator,
        val conf: SparkConf) extends Logging
    
  • No2

    Create the TaskScheduler. The SchedulerBackend is chosen according to the run mode (the master URL), and the TaskScheduler is initialized with it.

    /**
     * Create a task scheduler based on a given master URL.
     * Return a 2-tuple of the scheduler backend and the task scheduler.
     */
    private def createTaskScheduler(
        sc: SparkContext,
        master: String,
        deployMode: String): (SchedulerBackend, TaskScheduler) = {
      import SparkMasterRegex._
    
      // When running locally, don't try to re-execute tasks on failure.
      val MAX_LOCAL_TASK_FAILURES = 1
    
      master match {
        case "local" =>
          val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
          val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
          scheduler.initialize(backend)
          (backend, scheduler)
    
        case LOCAL_N_REGEX(threads) =>
          def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
          // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
          val threadCount = if (threads == "*") localCpuCount else threads.toInt
          if (threadCount <= 0) {
            throw new SparkException(s"Asked to run locally with $threadCount threads")
          }
          val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
          val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
          scheduler.initialize(backend)
          (backend, scheduler)
    
        case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
          def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
          // local[*, M] means the number of cores on the computer with M failures
          // local[N, M] means exactly N threads with M failures
          val threadCount = if (threads == "*") localCpuCount else threads.toInt
          val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
          val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
          scheduler.initialize(backend)
          (backend, scheduler)
    
        case SPARK_REGEX(sparkUrl) =>
          val scheduler = new TaskSchedulerImpl(sc)
          val masterUrls = sparkUrl.split(",").map("spark://" + _)
          val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
          scheduler.initialize(backend)
          (backend, scheduler)
    
        case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
          // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
          val memoryPerSlaveInt = memoryPerSlave.toInt
          if (sc.executorMemory > memoryPerSlaveInt) {
            throw new SparkException(
              "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
                memoryPerSlaveInt, sc.executorMemory))
          }
    
          val scheduler = new TaskSchedulerImpl(sc)
          val localCluster = new LocalSparkCluster(
            numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
          val masterUrls = localCluster.start()
          val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
          scheduler.initialize(backend)
          backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
            localCluster.stop()
          }
          (backend, scheduler)
    
        case masterUrl =>
          val cm = getClusterManager(masterUrl) match {
            case Some(clusterMgr) => clusterMgr
            case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
          }
          try {
            val scheduler = cm.createTaskScheduler(sc, masterUrl)
            val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
            cm.initialize(scheduler, backend)
            (backend, scheduler)
          } catch {
            case se: SparkException => throw se
            case NonFatal(e) =>
              throw new SparkException("External scheduler cannot be instantiated", e)
          }
      }
    }
    
    // TaskSchedulerImpl.initialize: stores the backend and builds the scheduling
    // pools according to the configured scheduling mode (FIFO or FAIR).
    def initialize(backend: SchedulerBackend) {
      this.backend = backend
      schedulableBuilder = {
        schedulingMode match {
          case SchedulingMode.FIFO =>
            new FIFOSchedulableBuilder(rootPool)
          case SchedulingMode.FAIR =>
            new FairSchedulableBuilder(rootPool, conf)
          case _ =>
            throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
              s"$schedulingMode")
        }
      }
      schedulableBuilder.buildPools()
    }
    
    
  • No3

    Create the DAGScheduler from the TaskScheduler instance.

    class DAGScheduler(
        private[scheduler] val sc: SparkContext,
        private[scheduler] val taskScheduler: TaskScheduler,
        listenerBus: LiveListenerBus,
        mapOutputTracker: MapOutputTrackerMaster,
        blockManagerMaster: BlockManagerMaster,
        env: SparkEnv,
        clock: Clock = new SystemClock())
      extends Logging {
    
      def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
        this(
          sc,
          taskScheduler,
          sc.listenerBus,
          sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
          sc.env.blockManager.master,
          sc.env)
      }
    
  • No4

    Start the web UI.
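
Going back to step No2, the way createTaskScheduler dispatches on the master URL can be tried out without a Spark installation. The sketch below is plain Scala. The regular expressions are assumptions written for this example, modeled on what the `SparkMasterRegex` import suggests, and `MasterKind`, its subclasses, `MasterUrlSketch`, and `classify` are hypothetical names introduced only for illustration. It mirrors the structure of the match, including the check that rejects a non-positive thread count, but it creates no schedulers or backends, and it signals the bad thread count with `require` rather than a SparkException.

```scala
// Hypothetical result type: which branch of the match a master URL would take.
sealed trait MasterKind
final case class Local(threads: Int, maxFailures: Int) extends MasterKind
final case class Standalone(masterUrls: List[String]) extends MasterKind
final case class LocalCluster(workers: Int, coresPerWorker: Int, memoryPerWorkerMb: Int)
  extends MasterKind
final case class External(masterUrl: String) extends MasterKind

object MasterUrlSketch {
  // Assumed patterns, written for this sketch rather than copied from Spark.
  private val LocalN = """local\[([0-9]+|\*)\]""".r
  private val LocalNFailures = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  private val LocalClusterRe =
    """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r
  private val SparkRe = """spark://(.*)""".r

  // Same value as MAX_LOCAL_TASK_FAILURES in the excerpt: no retries when local.
  private val MaxLocalTaskFailures = 1

  // "*" means one thread per available core; otherwise the number is used as given.
  private def threadCount(threads: String): Int =
    if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt

  def classify(master: String): MasterKind = master match {
    case "local" =>
      Local(1, MaxLocalTaskFailures)
    case LocalN(threads) =>
      val n = threadCount(threads)
      // Throws IllegalArgumentException; the excerpt throws SparkException here.
      require(n > 0, s"Asked to run locally with $n threads")
      Local(n, MaxLocalTaskFailures)
    case LocalNFailures(threads, maxFailures) =>
      Local(threadCount(threads), maxFailures.toInt)
    case SparkRe(sparkUrl) =>
      // A comma-separated host list yields one spark:// URL per master.
      Standalone(sparkUrl.split(",").map("spark://" + _).toList)
    case LocalClusterRe(workers, cores, memory) =>
      LocalCluster(workers.toInt, cores.toInt, memory.toInt)
    case other =>
      // Corresponds to the final branch, which asks getClusterManager for a match.
      External(other)
  }
}
```

For example, `MasterUrlSketch.classify("local[2, 3]")` returns `Local(2, 3)`, while `MasterUrlSketch.classify("yarn")` returns `External("yarn")`. Scala regex extractors match the whole string, so `local[2, 3]` cannot be picked up by the single-argument pattern; the order of the cases matters less than it may first appear.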
