Spark Source Code: Worker

Spark Source Code: An Introduction to the Worker

Worker Overview

The Worker is the work node of the cluster; Drivers and Executors are generally launched on Workers.

Worker Code Overview

  1. Worker extends ThreadSafeRpcEndpoint, so it is itself a message loop and can communicate directly with other components;
  2. It encapsulates a set of data structures for recording Driver, Executor, Application, and related information;
  3. The Worker maintains its own resources internally;
  4. It holds the messaging structures used to communicate with other components;
  5. It coordinates with the Master, among other things.
    See the source below:
// Extends ThreadSafeRpcEndpoint: the Worker is itself a message loop
private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    systemName: String,
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {


  // Data structures for recording Drivers, Executors, and Applications
  var workDir: File = null
  val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
  val drivers = new HashMap[String, DriverRunner]
  val executors = new HashMap[String, ExecutorRunner]
  val finishedDrivers = new LinkedHashMap[String, DriverRunner]
  val appDirectories = new HashMap[String, Seq[String]]
  val finishedApps = new HashSet[String]


  // Internal resource accounting
  var coresUsed = 0
  var memoryUsed = 0

  def coresFree: Int = cores - coresUsed
  def memoryFree: Int = memory - memoryUsed
  
  // Messaging: how the Worker communicates with other components
  override def receive: PartialFunction[Any, Unit] = synchronized(......)
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {......}
  
  // Coordination with the Master
  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {......}
  private def tryRegisterAllMasters(): Array[JFuture[_]] = {......}
  private def reregisterWithMaster(): Unit = {......}
  ......
  
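The bookkeeping and the coresFree/memoryFree arithmetic above can be distilled into a small self-contained sketch. Everything here (WorkerState, launchExecutor) is an illustrative stand-in, not Spark's actual API:

```scala
import scala.collection.mutable

// Minimal sketch of the Worker's resource bookkeeping (illustrative, not Spark's API).
class WorkerState(val cores: Int, val memoryMb: Int) {
  val executors = new mutable.HashMap[String, Int]() // executorId -> cores held
  var coresUsed = 0
  var memoryUsedMb = 0

  def coresFree: Int = cores - coresUsed
  def memoryFreeMb: Int = memoryMb - memoryUsedMb

  // Accept an executor only if enough resources remain free.
  def launchExecutor(id: String, c: Int, memMb: Int): Boolean =
    if (c <= coresFree && memMb <= memoryFreeMb) {
      executors(id) = c
      coresUsed += c
      memoryUsedMb += memMb
      true
    } else false
}

val w = new WorkerState(cores = 8, memoryMb = 16384)
println(w.launchExecutor("exec-1", 4, 8192)) // true
println(w.coresFree)                         // 4
println(w.launchExecutor("exec-2", 8, 8192)) // false: only 4 cores free
```

The point of the derived coresFree/memoryFree defs is that the Master can query a Worker's free capacity without the Worker keeping a second, separately-updated counter.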

Inside the Worker

As before, we start from the Worker's initialization and startup, shown in the code below:

  // In the Worker companion object
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {

    // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
    rpcEnv
  }

Next, look at the Worker's startup method, onStart(). Because the Worker actively registers itself with the Master, registration is kicked off directly inside this startup method;

  override def onStart() {
    assert(!registered)
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    logInfo("Spark home: " + sparkHome)
    createWorkDir()
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    // Register with the Master
    registerWithMaster()

    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }

Step into the registerWithMaster() method:

  private def registerWithMaster() {
    // onDisconnected may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    registrationRetryTimer match {
      case None =>
        registered = false
        registerMasterFutures = tryRegisterAllMasters()
        connectionAttemptCount = 0
        registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReregisterWithMaster))
            }
          },
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }
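The None/Some(_) guard above, which prevents a second retry timer from being scheduled, can be sketched outside Spark with a plain ScheduledExecutorService. RetrySketch and its retry body are stand-ins; only the guard pattern mirrors the source:

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Sketch: schedule a fixed-rate registration retry only if none is pending.
object RetrySketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private var registrationRetryTimer: Option[ScheduledFuture[_]] = None
  @volatile var attempts = 0 // stand-in for the real retry body

  def registerWithMaster(): Boolean = registrationRetryTimer match {
    case None =>
      registrationRetryTimer = Some(scheduler.scheduleAtFixedRate(
        new Runnable { override def run(): Unit = attempts += 1 },
        0, 50, TimeUnit.MILLISECONDS))
      true  // a retry timer was started
    case Some(_) =>
      false // an attempt is already scheduled; do nothing
  }

  // Mirrors cancelLastRegistrationRetry(): stop retrying once registered.
  def cancelRetry(): Unit = {
    registrationRetryTimer.foreach(_.cancel(true))
    registrationRetryTimer = None
    scheduler.shutdown()
  }
}

println(RetrySketch.registerWithMaster()) // true: timer created
println(RetrySketch.registerWithMaster()) // false: already scheduled
Thread.sleep(120)
RetrySketch.cancelRetry()
println(RetrySketch.attempts >= 1)
```

Because onDisconnected can fire repeatedly, the Option guard is what keeps the Worker from stacking up duplicate retry schedules.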

Continuing on, step into tryRegisterAllMasters(). Why does this method exist? When the Master is deployed in HA mode there may be multiple Masters, so the Worker must register its information with every one of them. As the code below shows, the Worker uses a thread pool, registerMasterThreadPool, to spawn a thread per Master to communicate with it;

  private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    masterRpcAddresses.map { masterAddress =>
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = {
          try {
            logInfo("Connecting to master " + masterAddress + "...")
            val masterEndpoint =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            registerWithMaster(masterEndpoint)
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        }
      })
    }
  }
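A stripped-down version of this fan-out, with the RPC connection replaced by a placeholder function and made-up Master addresses, could look like:

```scala
import java.util.concurrent.{Callable, Executors, Future => JFuture}

// Sketch: submit one registration task per Master address (addresses are made up).
val masterAddresses = Seq("spark://master1:7077", "spark://master2:7077")
val registerMasterThreadPool = Executors.newFixedThreadPool(masterAddresses.size)

// Stand-in for setupEndpointRef + registerWithMaster(masterEndpoint).
def connectAndRegister(address: String): String = s"registered with $address"

val futures: Seq[JFuture[String]] = masterAddresses.map { addr =>
  registerMasterThreadPool.submit(new Callable[String] {
    override def call(): String = connectAndRegister(addr)
  })
}

futures.foreach(f => println(f.get())) // blocks until each attempt finishes
registerMasterThreadPool.shutdown()
```

Using a pool sized to the number of Masters lets all registration attempts run concurrently, so a slow or dead Master does not delay registration with the others.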

Step into the registerWithMaster(masterEndpoint) overload. Here you can see the actual registration request sent to the Master and the handling of the response status.
The Master-side handling of Worker registration was described in detail in [Spark Source Code: Master], so it is not repeated here;

  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
      .onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            handleRegisterResponse(msg)
          }
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }
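The ask-then-onComplete pattern above can be sketched with plain Scala Futures. Here askMaster and the String response are stand-ins for masterEndpoint.ask[RegisterWorkerResponse], and a Promise stands in for the Worker's reaction so we can observe it:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Sketch: an ask-style call returning a Future (response modeled as a String).
def askMaster(masterUp: Boolean): Future[String] =
  if (masterUp) Future.successful("RegisteredWorker")
  else Future.failed(new RuntimeException("master unreachable"))

val outcome = Promise[String]()
askMaster(masterUp = true).onComplete {
  case Success(msg) => outcome.success(s"handled: $msg")          // handleRegisterResponse(msg)
  case Failure(e)   => outcome.success(s"fatal: ${e.getMessage}") // the Worker would System.exit(1)
}
println(Await.result(outcome.future, 2.seconds)) // handled: RegisteredWorker
```

The real code runs the callback on ThreadUtils.sameThread because handling the response is cheap; the sketch uses the global execution context instead.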

Step into handleRegisterResponse() to see how the Worker handles the response to its registration.
The method handles three cases:

  1. Registration succeeded: the Worker calls changeMaster(masterRef, masterWebUiUrl) to save the current
    Master's information in its internal state, and schedules periodic heartbeats (plus work-dir cleanup, if enabled);
  2. Registration failed: if the Worker is not registered with any Master, it logs the error and exits;
  3. The Master is in standby: the response is ignored.

  private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
    msg match {
      case RegisteredWorker(masterRef, masterWebUiUrl) =>
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
        registered = true
        changeMaster(masterRef, masterWebUiUrl)
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(SendHeartbeat)
          }
        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
        if (CLEANUP_ENABLED) {
          logInfo(
            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(WorkDirCleanup)
            }
          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
        }

      case RegisterWorkerFailed(message) =>
        if (!registered) {
          logError("Worker registration failed: " + message)
          System.exit(1)
        }

      case MasterInStandby =>
        // Ignore. Master not yet ready.
    }
  }
  private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
    // activeMasterUrl it's a valid Spark url since we receive it from master.
    activeMasterUrl = masterRef.address.toSparkURL
    activeMasterWebUiUrl = uiUrl
    master = Some(masterRef)
    connected = true
    // Cancel any outstanding re-registration attempts because we found a new master
    cancelLastRegistrationRetry()
  }

Other Work the Worker Does

The Worker also tracks changes in Driver and Executor state, as shown below:

  private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {......}
  private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged): Unit = {......}
  ......
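How such a state-change handler might move an executor from the live map to the finished map can be sketched as follows. The states and the isFinished helper here are simplified stand-ins for Spark's ExecutorState:

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's ExecutorState.
object ExecutorState extends Enumeration {
  val RUNNING, EXITED, KILLED = Value
  def isFinished(s: Value): Boolean = s == EXITED || s == KILLED
}

val executors = new mutable.HashMap[String, ExecutorState.Value]()
val finishedExecutors = new mutable.LinkedHashMap[String, ExecutorState.Value]()

def handleExecutorStateChanged(id: String, state: ExecutorState.Value): Unit =
  if (ExecutorState.isFinished(state)) {
    // Move the executor from the live map to the finished map, freeing its slot.
    executors.remove(id)
    finishedExecutors(id) = state
  } else {
    executors(id) = state
  }

handleExecutorStateChanged("exec-1", ExecutorState.RUNNING)
handleExecutorStateChanged("exec-1", ExecutorState.EXITED)
println(executors.contains("exec-1")) // false
println(finishedExecutors("exec-1"))  // EXITED
```

This mirrors the executors / finishedExecutors split seen in the class fields earlier: live runners are what count against coresUsed and memoryUsed, while finished ones are kept only for display in the web UI.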
  

That concludes the walkthrough of the Worker source! The flow by which the Worker handles Drivers and Executors is shown in the figure below:

[Figure: Driver and Executor handling flow inside the Worker]