如何閱讀源碼?
任何一個程序,或者一個框架,無論做什么,多么復雜,都會有唯一的入口。通過這個入口,能夠找到一條主線,這條主線就是這個程序或者框架的核心。
圍繞這條主線,追溯整個調(diào)用鏈路,就能發(fā)掘出框架中的核心抽象,將這些抽象的作用搞懂,同時將它們之間的關系通過uml表示出來,這樣,源碼的大體結構就一覽無余了。
源碼會涉及很多包,其實包本身就是對源碼的分類和抽象,這些包的功能也要弄清楚。源碼分為核心代碼和輔助代碼,比如io相關,屬于輔助部分。core包下面一般都是主流程下核心代碼。
理清主線,理清抽象概念,理清抽象之間的關系之后,再去看具體的方法和處理過程,相信,這樣閱讀起來就順暢的多。
還有一些比較難理解的點,比如處理某些特定問題時的思路和算法,或者是某種復雜的數(shù)據(jù)結構,需要著重挖掘。
源碼都是水平較高的程序員所寫,使用面向?qū)ο筮M行抽象,還會涉及到很多設計模式,所以讀源碼之前,對各種設計模式要有所了解。
另外最重要的基礎,其實是語言基礎。比如閱讀spark源碼,scala基礎一定要好,這樣在閱讀的過程中才不會被各種攔路虎攔住。
尋找入口,有兩種方式。
一是找到框架或者程序的二進制程序,然后追溯編譯該程序的類。
二是啟動之后閱讀日志,從日志中追溯主調(diào)用鏈。
以上。
以spark為例子。
通過spark-submit命令提交spark任務,能夠找到對應的啟動類org.apache.spark.deploy.SparkSubmit,其main方法如下
def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
submit方法分成兩步,首先準備環(huán)境,設置類加載路徑(classpath),環(huán)境變量,啟動類,其次通過反射調(diào)用用戶的spark任務
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
try {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
用戶的任務中首先需要創(chuàng)建sparkSession,創(chuàng)建時候先查看當前線程中是否已有實例
/** The active SparkSession for the current thread. */
private val activeThreadSession = new InheritableThreadLocal[SparkSession]
def getOrCreate(): SparkSession = synchronized {
// Get the session from current thread's active session.
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.conf.set(k, v) }
if (options.nonEmpty) {
logWarning("Using an existing SparkSession; some configuration may not take effect.")
}
return session
}
}
如果沒有,使用默認defaultSession(從哪里來?),如果defaultSession不可用,創(chuàng)建。創(chuàng)建sparkSession前,首先創(chuàng)建sparkContext
// No active nor global default session. Create a new one.
val sparkContext = userSuppliedContext.getOrElse {
// set app name if not given
val randomAppName = java.util.UUID.randomUUID().toString
val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }
if (!sparkConf.contains("spark.app.name")) {
sparkConf.setAppName(randomAppName)
}
val sc = SparkContext.getOrCreate(sparkConf)
// maybe this is an existing SparkContext, update its SparkConf which maybe used
// by SparkSession
options.foreach { case (k, v) => sc.conf.set(k, v) }
if (!sc.conf.contains("spark.app.name")) {
sc.conf.setAppName(randomAppName)
}
sc
}
所以關鍵類是sparkContext,嗯,最核心的類了,先看看它有哪些核心成員變量。
private var _conf: SparkConf = _
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _env: SparkEnv = _ //提供spark任務運行時環(huán)境
private var _jobProgressListener: JobProgressListener = _ // 跟蹤task級的各種信息,用于展示在spark ui
private var _statusTracker: SparkStatusTracker = _ // 提供監(jiān)控job和stage的api,有些被應用到spark ui
private var _progressBar: Option[ConsoleProgressBar] = None // 用于在控制臺輸出stage進程信息
private var _ui: Option[SparkUI] = None
private var _hadoopConfiguration: Configuration = _
private var _executorMemory: Int = _
private var _schedulerBackend: SchedulerBackend = _ // 后臺調(diào)度進程
private var _taskScheduler: TaskScheduler = _ // 通過SchedulerBackend實現(xiàn)調(diào)度的task調(diào)度器
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _ // 面向stage的調(diào)度器,核心類,切分stage,構成DAG都在此類中完成
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None // 根據(jù)負載動態(tài)分配或者回收executor
private var _cleaner: Option[ContextCleaner] = None // 異步清理器,清除RDD,shuffle和broadcast的狀態(tài)
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
逐個過一下核心的成員變量。
sparkEnv提供spark任務的運行時環(huán)境,看一類的聲明
class SparkEnv (
val executorId: String,
private[spark] val rpcEnv: RpcEnv,
val serializer: Serializer,
val closureSerializer: Serializer,
val serializerManager: SerializerManager,
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockManager: BlockManager,
val securityManager: SecurityManager,
val metricsSystem: MetricsSystem,
val memoryManager: MemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf
)
(未完待續(xù))