原文鏈接:
https://www.cnblogs.com/hseagle/p/3664933.html
楔子
源碼閱讀是一件非常容易的事,也是一件非常難的事。容易的是代碼就在那里,一打開就可以看到。難的是要通過代碼明白作者當(dāng)初為什么要這樣設(shè)計(jì),設(shè)計(jì)之初要解決的主要問題是什么。
在對Spark的源碼進(jìn)行具體的走讀之前,如果想要快速對Spark的有一個(gè)整體性的認(rèn)識,閱讀Matei Zaharia做的Spark論文是一個(gè)非常不錯(cuò)的選擇。
在閱讀該論文的基礎(chǔ)之上,再結(jié)合Spark作者在2012 Developer Meetup上做的演講Introduction to Spark Internals,那么對于Spark的內(nèi)部實(shí)現(xiàn)會有一個(gè)比較大概的了解。
有了上述的兩篇文章奠定基礎(chǔ)之后,再來進(jìn)行源碼閱讀,那么就會知道分析的重點(diǎn)及難點(diǎn)。
基本概念(Basic Concepts)
RDD - resillient distributed dataset 彈性分布式數(shù)據(jù)集
Operation - 作用于RDD的各種操作分為transformation和action
Job - 作業(yè),一個(gè)JOB包含多個(gè)RDD及作用于相應(yīng)RDD上的各種operation
Stage - 一個(gè)作業(yè)分為多個(gè)階段
Partition - 數(shù)據(jù)分區(qū), 一個(gè)RDD中的數(shù)據(jù)可以分成多個(gè)不同的區(qū)
DAG - Directed Acycle graph, 有向無環(huán)圖,反應(yīng)RDD之間的依賴關(guān)系
Narrow dependency - 窄依賴,子RDD依賴于父RDD中固定的data partition
Wide Dependency - 寬依賴,子RDD對父RDD中的所有data partition都有依賴
Caching Managenment -- 緩存管理,對RDD的中間計(jì)算結(jié)果進(jìn)行緩存管理以加快整體的處理速度
編程模型(Programming Model)
RDD是只讀的數(shù)據(jù)分區(qū)集合,注意是數(shù)據(jù)集。
作用于RDD上的Operation分為transformantion和action。 經(jīng)Transformation處理之后,數(shù)據(jù)集中的內(nèi)容會發(fā)生更改,由數(shù)據(jù)集A轉(zhuǎn)換成為數(shù)據(jù)集B;而經(jīng)Action處理之后,數(shù)據(jù)集中的內(nèi)容會被歸約為一個(gè)具體的數(shù)值。
只有當(dāng)RDD上有action時(shí),該RDD及其父RDD上的所有operation才會被提交到cluster中真正的被執(zhí)行。
從代碼到動(dòng)態(tài)運(yùn)行,涉及到的組件如下圖所示。

演示代碼

valsc =newSparkContext("Spark://...","MyJob", home, jars)valfile = sc.textFile("hdfs://...")valerrors = file.filter(_.contains("ERROR"))errors.cache()errors.count()
運(yùn)行態(tài)(Runtime view)
不管什么樣的靜態(tài)模型,其在動(dòng)態(tài)運(yùn)行的時(shí)候無外乎由進(jìn)程,線程組成。
用Spark的術(shù)語來說,static view稱為dataset view,而dynamic view稱為parition view. 關(guān)系如圖所示

在Spark中的task可以對應(yīng)于線程,worker是一個(gè)個(gè)的進(jìn)程,worker由driver來進(jìn)行管理。
那么問題來了,這一個(gè)個(gè)的task是如何從RDD演變過來的呢?下節(jié)將詳細(xì)回答這個(gè)問題。
部署(Deployment view)
當(dāng)有Action作用于某RDD時(shí),該action會作為一個(gè)job被提交。
在提交的過程中,DAGScheduler模塊介入運(yùn)算,計(jì)算RDD之間的依賴關(guān)系。RDD之間的依賴關(guān)系就形成了DAG。
每一個(gè)JOB被分為多個(gè)stage,劃分stage的一個(gè)主要依據(jù)是當(dāng)前計(jì)算因子的輸入是否是確定的,如果是則將其分在同一個(gè)stage,避免多個(gè)stage之間的消息傳遞開銷。
當(dāng)stage被提交之后,由taskscheduler來根據(jù)stage來計(jì)算所需要的task,并將task提交到對應(yīng)的worker.
Spark支持以下幾種部署模式1)standalone 2)Mesos 3) yarn. 這些部署模式將作為taskscheduler的初始化入?yún)ⅰ?/p>

RDD接口(RDD Interface)
RDD由以下幾個(gè)主要部分組成
partitions --??? partition集合,一個(gè)RDD中有多少data partition
dependencies -- RDD依賴關(guān)系
compute(parition) -- 對于給定的數(shù)據(jù)集,需要作哪些計(jì)算
preferredLocations --? 對于data partition的位置偏好
partitioner -- 對于計(jì)算出來的數(shù)據(jù)結(jié)果如何分發(fā)
緩存機(jī)制(caching)
RDD的中間計(jì)算結(jié)果可以被緩存起來,緩存先選Memory,如果Memory不夠的話,將會被寫入到磁盤中。
根據(jù)LRU(last-recent update)來決定哪先內(nèi)容繼續(xù)保存在內(nèi)存,哪些保存到磁盤。
容錯(cuò)性(Fault-tolerant)
從最初始的RDD到衍生出來的最后一個(gè)RDD,中間要經(jīng)過一系列的處理。那么如何處理中間環(huán)節(jié)出現(xiàn)錯(cuò)誤的場景呢?
Spark提供的解決方案是只對失效的data partition進(jìn)行事件重演,而無須對整個(gè)數(shù)據(jù)全集進(jìn)行事件重演,這樣可以大大加快場景恢復(fù)的開銷。
RDD又是如何知道自己的data partition的number該是多少?如果是hdfs文件,那么hdfs文件的block將會成為一個(gè)重要的計(jì)算依據(jù)。
集群管理(cluster management)
task運(yùn)行在cluster之上,除了spark自身提供的standalone部署模式之外,spark還內(nèi)在支持yarn和mesos.
Yarn來負(fù)責(zé)計(jì)算資源的調(diào)度和監(jiān)控,根據(jù)監(jiān)控結(jié)果來重啟失效的task或者是重新distributed task一旦有新的node加入cluster的話。
這一部分的內(nèi)容需要參考yarn的文檔。
小結(jié)
在源碼閱讀時(shí),需要重點(diǎn)把握以下兩大主線。
靜態(tài)view?即 RDD, transformation and action
動(dòng)態(tài)view?即?life of a job, 每一個(gè)job又分為多個(gè)stage,每一個(gè)stage中可以包含多個(gè)rdd及其transformation,這些stage又是如何映射成為task被distributed到cluster中
參考資料(reference)
Introduction to Spark Internals?http://files.meetup.com/3138542/dev-meetup-dec-2012.pptx
Resilient Distributed Datasets: A Fault-tolerant Abstraction for In-Memory Cluster Computing?https://www.usenix.org/system/files/.../nsdi12-final138.pdf
Lightning-Fast Cluster Computing with Spark and Shark?http://www.meetup.com/TriHUG/events/112474102/