spark源碼解析

如何閱讀源碼?
任何一個程序,或者一個框架,無論做什么,多么復雜,都會有唯一的入口。通過這個入口,能夠找到一條主線,這條主線就是這個程序或者框架的核心。
圍繞這條主線,追溯整個調(diào)用鏈路,就能發(fā)掘出框架中的核心抽象,將這些抽象的作用搞懂,同時將它們之間的關系通過uml表示出來,這樣,源碼的大體結構就一覽無余了。
源碼會涉及很多包,其實包本身就是對源碼的分類和抽象,這些包的功能也要弄清楚。源碼分為核心代碼和輔助代碼,比如io相關,屬于輔助部分。core包下面一般都是主流程下核心代碼。
理清主線,理清抽象概念,理清抽象之間的關系之后,再去看具體的方法和處理過程,相信,這樣閱讀起來就順暢的多。
還有一些比較難理解的點,比如處理某些特定問題時的思路和算法,或者是某種復雜的數(shù)據(jù)結構,需要著重挖掘。
源碼都是水平較高的程序員所寫,使用面向?qū)ο筮M行抽象,還會涉及到很多設計模式,所以讀源碼之前,對各種設計模式要有所了解。
另外最重要的基礎,其實是語言基礎。比如閱讀spark源碼,scala基礎一定要好,這樣在閱讀的過程中才不會被各種攔路虎攔住。

尋找入口,有兩種方式。
一是找到框架或者程序的二進制程序,然后追溯編譯該程序的類。
二是啟動之后閱讀日志,從日志中追溯主調(diào)用鏈。

以上。

以spark為例子。
通過spark-submit命令提交spark任務,能夠找到對應的啟動類org.apache.spark.deploy.SparkSubmit,其main方法如下

  def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

submit方法分成兩步,首先準備環(huán)境,設置類加載路徑(classpath),環(huán)境變量,啟動類,其次通過反射調(diào)用用戶的spark任務

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
try {
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        findCause(t) match {
          case SparkUserAppException(exitCode) =>
            System.exit(exitCode)

          case t: Throwable =>
            throw t
        }
    }

用戶的任務中首先需要創(chuàng)建sparkSession,創(chuàng)建時候先查看當前線程中是否已有實例

  /** The active SparkSession for the current thread. */
  private val activeThreadSession = new InheritableThreadLocal[SparkSession]
def getOrCreate(): SparkSession = synchronized {
        // Get the session from current thread's active session.
      var session = activeThreadSession.get()
      if ((session ne null) && !session.sparkContext.isStopped) {
        options.foreach { case (k, v) => session.conf.set(k, v) }
        if (options.nonEmpty) {
          logWarning("Using an existing SparkSession; some configuration may not take effect.")
        }
        return session
      }
}

如果沒有,使用默認defaultSession(從哪里來?),如果defaultSession不可用,創(chuàng)建。創(chuàng)建sparkSession前,首先創(chuàng)建sparkContext

// No active nor global default session. Create a new one.
val sparkContext = userSuppliedContext.getOrElse {
  // set app name if not given
  val randomAppName = java.util.UUID.randomUUID().toString
  val sparkConf = new SparkConf()
  options.foreach { case (k, v) => sparkConf.set(k, v) }
  if (!sparkConf.contains("spark.app.name")) {
    sparkConf.setAppName(randomAppName)
  }
  val sc = SparkContext.getOrCreate(sparkConf)
  // maybe this is an existing SparkContext, update its SparkConf which maybe used
  // by SparkSession
  options.foreach { case (k, v) => sc.conf.set(k, v) }
  if (!sc.conf.contains("spark.app.name")) {
    sc.conf.setAppName(randomAppName)
  }
  sc
}

所以關鍵類是sparkContext,嗯,最核心的類了,先看看它有哪些核心成員變量。

private var _conf: SparkConf = _
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _env: SparkEnv = _  //提供spark任務運行時環(huán)境
private var _jobProgressListener: JobProgressListener = _  // 跟蹤task級的各種信息,用于展示在spark ui
private var _statusTracker: SparkStatusTracker = _  // 提供監(jiān)控job和stage的api,有些被應用到spark ui
private var _progressBar: Option[ConsoleProgressBar] = None  // 用于在控制臺輸出stage進程信息
private var _ui: Option[SparkUI] = None
private var _hadoopConfiguration: Configuration = _
private var _executorMemory: Int = _
private var _schedulerBackend: SchedulerBackend = _  // 后臺調(diào)度進程
private var _taskScheduler: TaskScheduler = _  // 通過SchedulerBackend實現(xiàn)調(diào)度的task調(diào)度器
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _  // 面向stage的調(diào)度器,核心類,切分stage,構成DAG都在此類中完成
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None  // 根據(jù)負載動態(tài)分配或者回收executor
private var _cleaner: Option[ContextCleaner] = None  // 異步清理器,清除RDD,shuffle和broadcast的狀態(tài)
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _

逐個過一下核心的成員變量。
sparkEnv提供spark任務的運行時環(huán)境,看一類的聲明

  class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf
)

(未完待續(xù))

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容