3.2 彈性分布式數(shù)據(jù)集
本節(jié)簡單介紹RDD,并介紹RDD與分布式共享內(nèi)存的異同。
3.2.1 RDD簡介
在集群背后,有一個非常重要的分布式數(shù)據(jù)架構(gòu),即彈性分布式數(shù)據(jù)集(resilient distributed dataset,RDD),它是邏輯集中的實體,在集群中的多臺機器上進行了數(shù)據(jù)分區(qū)。通過對多臺機器上不同RDD分區(qū)的控制,就能夠減少機器之間的數(shù)據(jù)重排(data shuffling)。Spark提供了“partitionBy”運算符,能夠通過集群中多臺機器之間對原始RDD進行數(shù)據(jù)再分配來創(chuàng)建一個新的RDD。RDD是Spark的核心數(shù)據(jù)結(jié)構(gòu),通過RDD的依賴關(guān)系形成Spark的調(diào)度順序。通過對RDD的操作形成整個Spark程序。
(1)RDD的兩種創(chuàng)建方式
1)從Hadoop文件系統(tǒng)(或與Hadoop兼容的其他持久化存儲系統(tǒng),如Hive、Cassandra、Hbase)輸入(如HDFS)創(chuàng)建。
2)從父RDD轉(zhuǎn)換得到新的RDD。
(2)RDD的兩種操作算子
對于RDD可以有兩種計算操作算子:Transformation(變換)與Action(行動)。
1)Transformation(變換)。
Transformation操作是延遲計算的,也就是說從一個RDD轉(zhuǎn)換生成另一個RDD的轉(zhuǎn)換操作不是馬上執(zhí)行,需要等到有Actions操作時,才真正觸發(fā)運算。
2)Action(行動)
Action算子會觸發(fā)Spark提交作業(yè)(Job),并將數(shù)據(jù)輸出到Spark系統(tǒng)。
(3)RDD的重要內(nèi)部屬性
1)分區(qū)列表。
2)計算每個分片的函數(shù)。
3)對父RDD的依賴列表。
4)對Key-Value 對數(shù)據(jù)類型RDD的分區(qū)器,控制分區(qū)策略和分區(qū)數(shù)。
5)每個數(shù)據(jù)分區(qū)的地址列表(如HDFS上的數(shù)據(jù)塊的地址)。
3.2.2 RDD與分布式共享內(nèi)存的異同
RDD是一種分布式的內(nèi)存抽象,表3-1列出了RDD與分布式共享內(nèi)存(Distributed Shared Memory,DSM)的對比。在DSM系統(tǒng)[插圖]中,應(yīng)用可以向全局地址空間的任意位置進行讀寫操作。DSM是一種通用的內(nèi)存數(shù)據(jù)抽象,但這種通用性同時也使其在商用集群上實現(xiàn)有效的容錯性和一致性更加困難。
RDD與DSM主要區(qū)別在于[插圖],不僅可以通過批量轉(zhuǎn)換創(chuàng)建(即“寫”)RDD,還可以對任意內(nèi)存位置讀寫。RDD限制應(yīng)用執(zhí)行批量寫操作,這樣有利于實現(xiàn)有效的容錯。特別是,由于RDD可以使用Lineage(血統(tǒng))來恢復(fù)分區(qū),基本沒有檢查點開銷。失效時只需要重新計算丟失的那些RDD分區(qū),就可以在不同節(jié)點上并行執(zhí)行,而不需要回滾(Roll Back)整個程序。
表3-1 RDD與DSM的對比
[插圖]
通過備份任務(wù)的復(fù)制,RDD還可以處理落后任務(wù)(即運行很慢的節(jié)點),這點與MapReduce類似,DSM則難以實現(xiàn)備份任務(wù),因為任務(wù)及其副本均需讀寫同一個內(nèi)存位置的數(shù)據(jù)。
與DSM相比,RDD模型有兩個優(yōu)勢。第一,對于RDD中的批量操作,運行時將根據(jù)數(shù)據(jù)存放的位置來調(diào)度任務(wù),從而提高性能。第二,對于掃描類型操作,如果內(nèi)存不足以緩存整個RDD,就進行部分緩存,將內(nèi)存容納不下的分區(qū)存儲到磁盤上。
另外,RDD支持粗粒度和細(xì)粒度的讀操作。RDD上的很多函數(shù)操作(如count和collect等)都是批量讀操作,即掃描整個數(shù)據(jù)集,可以將任務(wù)分配到距離數(shù)據(jù)最近的節(jié)點上。同時,RDD也支持細(xì)粒度操作,即在哈?;蚍秶謪^(qū)的RDD上執(zhí)行關(guān)鍵字查找。
后續(xù)將算子從兩個維度結(jié)合在3.3節(jié)對RDD算子進行詳細(xì)介紹。
1)Transformations(變換)和Action(行動)算子維度。
2)在Transformations算子中再將數(shù)據(jù)類型維度細(xì)分為:Value數(shù)據(jù)類型和Key-Value對數(shù)據(jù)類型的Transformations算子。Value型數(shù)據(jù)的算子封裝在RDD類中可以直接使用,Key-Value 對數(shù)據(jù)類型的算子封裝于PairRDDFunctions類中,用戶需要引入import org.apache.spark.SparkContext._才能夠使用。進行這樣的細(xì)分是由于不同的數(shù)據(jù)類型處理思想不太一樣,同時有些算子是不同的。
3.2.3 Spark的數(shù)據(jù)存儲
Spark數(shù)據(jù)存儲的核心是彈性分布式數(shù)據(jù)集(RDD)。RDD可以被抽象地理解為一個大的數(shù)組(Array),但是這個數(shù)組是分布在集群上的。邏輯上RDD的每個分區(qū)叫一個Partition。
在Spark的執(zhí)行過程中,RDD經(jīng)歷一個個的Transfomation算子之后,最后通過Action算子進行觸發(fā)操作。邏輯上每經(jīng)歷一次變換,就會將RDD轉(zhuǎn)換為一個新的RDD,RDD之間通過Lineage產(chǎn)生依賴關(guān)系,這個關(guān)系在容錯中有很重要的作用。變換的輸入和輸出都是RDD。RDD會被劃分成很多的分區(qū)分布到集群的多個節(jié)點中。分區(qū)是個邏輯概念,變換前后的新舊分區(qū)在物理上可能是同一塊內(nèi)存存儲。這是很重要的優(yōu)化,以防止函數(shù)式數(shù)據(jù)不變性(immutable)導(dǎo)致的內(nèi)存需求無限擴張。有些RDD是計算的中間結(jié)果,其分區(qū)并不一定有相應(yīng)的內(nèi)存或磁盤數(shù)據(jù)與之對應(yīng),如果要迭代使用數(shù)據(jù),可以調(diào)cache()函數(shù)緩存數(shù)據(jù)。
圖3-2為RDD的數(shù)據(jù)存儲模型。
[插圖]
圖3-2 RDD數(shù)據(jù)管理模型
圖3-2中的RDD_1含有5個分區(qū)(p1、p2、p3、p4、p5),分別存儲在4個節(jié)點(Node1、node2、Node3、Node4)中。RDD_2含有3個分區(qū)(p1、p2、p3),分布在3個節(jié)點(Node1、Node2、Node3)中。
在物理上,RDD對象實質(zhì)上是一個元數(shù)據(jù)結(jié)構(gòu),存儲著Block、Node等的映射關(guān)系,以及其他的元數(shù)據(jù)信息。一個RDD就是一組分區(qū),在物理數(shù)據(jù)存儲上,RDD的每個分區(qū)對應(yīng)的就是一個Block,Block可以存儲在內(nèi)存,當(dāng)內(nèi)存不夠時可以存儲到磁盤上。
每個Block中存儲著RDD所有數(shù)據(jù)項的一個子集,暴露給用戶的可以是一個Block的迭代器(例如,用戶可以通過mapPartitions獲得分區(qū)迭代器進行操作),也可以就是一個數(shù)據(jù)項(例如,通過map函數(shù)對每個數(shù)據(jù)項并行計算)。本書會在后面章節(jié)具體介紹數(shù)據(jù)管理的底層實現(xiàn)細(xì)節(jié)。
如果是從HDFS等外部存儲作為輸入數(shù)據(jù)源,數(shù)據(jù)按照HDFS中的數(shù)據(jù)分布策略進行數(shù)據(jù)分區(qū),HDFS中的一個Block對應(yīng)Spark的一個分區(qū)。同時Spark支持重分區(qū),數(shù)據(jù)通過Spark默認(rèn)的或者用戶自定義的分區(qū)器決定數(shù)據(jù)塊分布在哪些節(jié)點。例如,支持Hash分區(qū)(按照數(shù)據(jù)項的Key值取Hash值,Hash值相同的元素放入同一個分區(qū)之內(nèi))和Range分區(qū)(將屬于同一數(shù)據(jù)范圍的數(shù)據(jù)放入同一分區(qū))等分區(qū)策略。
下面具體介紹這些算子的功能。
3.3 Spark算子分類及功能
本節(jié)將主要介紹Spark算子的作用,以及算子的分類。
1.Saprk算子的作用
圖3-3描述了Spark的輸入、運行轉(zhuǎn)換、輸出。在運行轉(zhuǎn)換中通過算子對RDD進行轉(zhuǎn)換。算子是RDD中定義的函數(shù),可以對RDD中的數(shù)據(jù)進行轉(zhuǎn)換和操作。
[插圖]
圖3-3 Spark算子和數(shù)據(jù)空間
1)輸入:在Spark程序運行中,數(shù)據(jù)從外部數(shù)據(jù)空間(如分布式存儲:textFile讀取HDFS等,parallelize方法輸入Scala集合或數(shù)據(jù))輸入Spark,數(shù)據(jù)進入Spark運行時數(shù)據(jù)空間,轉(zhuǎn)化為Spark中的數(shù)據(jù)塊,通過BlockManager進行管理。
2)運行:在Spark數(shù)據(jù)輸入形成RDD后便可以通過變換算子,如fliter等,對數(shù)據(jù)進行操作并將RDD轉(zhuǎn)化為新的RDD,通過Action算子,觸發(fā)Spark提交作業(yè)。如果數(shù)據(jù)需要復(fù)用,可以通過Cache算子,將數(shù)據(jù)緩存到內(nèi)存。
3)輸出:程序運行結(jié)束數(shù)據(jù)會輸出Spark運行時空間,存儲到分布式存儲中(如saveAsTextFile輸出到HDFS),或Scala數(shù)據(jù)或集合中(collect輸出到Scala集合,count返回Scala int型數(shù)據(jù))。
Spark的核心數(shù)據(jù)模型是RDD,但RDD是個抽象類,具體由各子類實現(xiàn),如MappedRDD、ShuffledRDD等子類。Spark將常用的大數(shù)據(jù)操作都轉(zhuǎn)化成為RDD的子類。
2.算子的分類
大致可以分為三大類算子。
1)Value數(shù)據(jù)類型的Transformation算子,這種變換并不觸發(fā)提交作業(yè),針對處理的數(shù)據(jù)項是Value型的數(shù)據(jù)。
2)Key-Value數(shù)據(jù)類型的Transfromation算子,這種變換并不觸發(fā)提交作業(yè),針對處理的數(shù)據(jù)項是Key-Value型的數(shù)據(jù)對。
3)Action算子,這類算子會觸發(fā)SparkContext提交Job作業(yè)。
下面分別對這3類算子進行詳細(xì)介紹。
3.3.1 Value型Transformation算子
處理數(shù)據(jù)類型為Value型的Transformation算子可以根據(jù)RDD變換算子的輸入分區(qū)與輸出分區(qū)關(guān)系分為以下幾種類型。
1)輸入分區(qū)與輸出分區(qū)一對一型。
2)輸入分區(qū)與輸出分區(qū)多對一型。
3)輸入分區(qū)與輸出分區(qū)多對多型。
4)輸出分區(qū)為輸入分區(qū)子集型。
5)還有一種特殊的輸入與輸出分區(qū)一對一的算子類型:Cache型。Cache算子對RDD分區(qū)進行緩存。
1.輸入分區(qū)與輸出分區(qū)一對一型
(1)map
將原來RDD的每個數(shù)據(jù)項通過map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€新的元素。源碼中的map算子相當(dāng)于初始化一個RDD,新RDD叫作MappedRDD(this,sc.clean(f))。
圖3-4中的每個方框表示一個RDD分區(qū),左側(cè)的分區(qū)經(jīng)過用戶自定義函數(shù)f:T->U映射為右側(cè)的新的RDD分區(qū)。但是實際只有等到Action算子觸發(fā)后,這個f函數(shù)才會和其他函數(shù)在一個Stage中對數(shù)據(jù)進行運算。V1輸入f轉(zhuǎn)換輸出V’1。
[插圖]
圖3-4 map算子對RDD轉(zhuǎn)換
(2)flatMap
將原來RDD中的每個元素通過函數(shù)f轉(zhuǎn)換為新的元素,并將生成的RDD的每個集合中的元素合并為一個集合。內(nèi)部創(chuàng)建 FlatMappedRDD(this,sc.clean(f))。
圖3-5中小方框表示RDD的一個分區(qū),對分區(qū)進行flatMap函數(shù)操作,flatMap中傳入的函數(shù)為f:T->U,T和U可以是任意的數(shù)據(jù)類型。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù)f轉(zhuǎn)換為新的數(shù)據(jù)。外部大方框可以認(rèn)為是一個RDD分區(qū),小方框代表一個集合。V1、V2、V3在一個集合作為RDD的一個數(shù)據(jù)項,轉(zhuǎn)換為V’1、V’2、V’3后,將結(jié)合拆散,形成為RDD中的數(shù)據(jù)項。
[插圖]
圖3-5 flapMap算子對RDD轉(zhuǎn)換
(3)mapPartitions
mapPartitions函數(shù)獲取到每個分區(qū)的迭代器,在函數(shù)中通過這個分區(qū)整體的迭代器對整個分區(qū)的元素進行操作。內(nèi)部實現(xiàn)是生成MapPartitionsRDD。圖3-6中的方框代表一個RDD分區(qū)。
圖3-6中,用戶通過函數(shù)f(iter)=>iter.filter(_>=3)對分區(qū)中的所有數(shù)據(jù)進行過濾,>=3的數(shù)據(jù)保留。一個方塊代表一個RDD分區(qū),含有1、2、3的分區(qū)過濾只剩下元素3。
[插圖]
圖3-6 mapPartitions算子對RDD轉(zhuǎn)換
(4)glom
glom函數(shù)將每個分區(qū)形成一個數(shù)組,內(nèi)部實現(xiàn)是返回的GlommedRDD。圖3-7中的每個方框代表一個RDD分區(qū)。
圖3-7中的方框代表一個分區(qū)。該圖表示含有V1、V2、V3的分區(qū)通過函數(shù)glom形成一個數(shù)組Array[(V1),(V2),(V3)]。
[插圖]
圖3-7 glom算子對RDD轉(zhuǎn)換
2.輸入分區(qū)與輸出分區(qū)多對一型
(1)union
使用union函數(shù)時需要保證兩個RDD元素的數(shù)據(jù)類型相同,返回的RDD數(shù)據(jù)類型和被合并的RDD元素數(shù)據(jù)類型相同,并不進行去重操作,保存所有元素。如果想去重,可以使用distinct()。++符號相當(dāng)于uion函數(shù)操作。
圖3-8中左側(cè)的大方框代表兩個RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。含有V1,V2…U4的RDD和含有V1,V8…U8的RDD合并所有元素形成一個RDD。V1、V1、V2、V8形成一個分區(qū),其他元素同理進行合并。
(2)cartesian
對兩個RDD內(nèi)的所有元素進行笛卡爾積操作。操作后,內(nèi)部實現(xiàn)返回CartesianRDD。圖3-9中左側(cè)的大方框代表兩個RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。
[插圖]
圖3-8 union算子對RDD轉(zhuǎn)換
圖3-9中的大方框代表RDD,大方框中的小方框代表RDD分區(qū)。例如,V1和另一個RDD中的W1、W2、Q5進行笛卡爾積運算形成(V1,W1)、(V1,W2)、(V1,Q5)。
[插圖]
圖3-9 cartesian算子對RDD轉(zhuǎn)換
3.輸入分區(qū)與輸出分區(qū)多對多型
groupBy:將元素通過函數(shù)生成相應(yīng)的Key,數(shù)據(jù)就轉(zhuǎn)化為Key-Value 格式,之后將Key相同的元素分為一組。
函數(shù)實現(xiàn)如下。
①sc.clean()函數(shù)將用戶函數(shù)預(yù)處理:
val cleanF=sc.clean(f)
②對數(shù)據(jù)map進行函數(shù)操作,最后再對groupByKey進行分組操作。
this.map(t=>(cleanF(t),t)).groupByKey(p)
其中,p中確定了分區(qū)個數(shù)和分區(qū)函數(shù),也就決定了并行化的程度。圖3-10中的方框代表RDD分區(qū)。
圖3-10中的方框代表一個RDD分區(qū),相同key的元素合并到一個組。例如,V1,V2合并為一個Key-Value對,其中key為“V”,Value為“V1,V2”,形成V,Seq(V1,V2)。
[插圖]
圖3-10 groupBy算子對RDD轉(zhuǎn)換
4.輸出分區(qū)為輸入分區(qū)子集型
(1)filter
filter的功能是對元素進行過濾,對每個元素應(yīng)用f函數(shù),返回值為true的元素在RDD中保留,返回為false的將過濾掉。內(nèi)部實現(xiàn)相當(dāng)于生成FilteredRDD(this,sc.clean(f))。
下面代碼為函數(shù)的本質(zhì)實現(xiàn)。
def filter(f:T=>Boolean):RDD[T]=new FilteredRDD(this,sc.clean(f))
圖3-11中的每個方框代表一個RDD分區(qū)。T可以是任意的類型。通過用戶自定義的過濾函數(shù)f,對每個數(shù)據(jù)項進行操作,將滿足條件,返回結(jié)果為true的數(shù)據(jù)項保留。例如,過濾掉V2、V3保留了V1,將區(qū)分命名為V1'。
[插圖]
圖3-11 filter算子對RDD轉(zhuǎn)換
(2)distinct
distinct將RDD中的元素進行去重操作。圖3-12中的方框代表RDD分區(qū)。
圖3-12中的每個方框代表一個分區(qū),通過distinct函數(shù),將數(shù)據(jù)去重。例如,重復(fù)數(shù)據(jù)V1、V1去重后只保留一份V1。
[插圖]
圖3-12 distinct算子對RDD轉(zhuǎn)換
(3)subtract
subtract相當(dāng)于進行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。
圖3-13中左側(cè)的大方框代表兩個RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。V1在兩個RDD中均有,根據(jù)差集運算規(guī)則,新RDD不保留,V2在第一個RDD有,第二個RDD沒有,則在新RDD元素中包含V2。
[插圖]
圖3-13 subtract算子對RDD轉(zhuǎn)換
(4)sample
sample將RDD這個集合內(nèi)的元素進行采樣,獲取所有元素的子集。用戶可以設(shè)定是否有放回的抽樣、百分比、隨機種子,進而決定采樣方式。
內(nèi)部實現(xiàn)是生成SampledRDD(withReplacement,fraction,seed)。
函數(shù)參數(shù)設(shè)置如下。
□withReplacement=true,表示有放回的抽樣;
□withReplacement=false,表示無放回的抽樣。
圖3-14中的每個方框是一個RDD分區(qū)。通過sample函數(shù),采樣50%的數(shù)據(jù)。V1、V2、U1、U2、U3、U4采樣出數(shù)據(jù)V1和U1、U2,形成新的RDD。
(5)takeSample
takeSample()函數(shù)和上面的sample函數(shù)是一個原理,但是不使用相對比例采樣,而是按設(shè)定的采樣個數(shù)進行采樣,同時返回結(jié)果不再是RDD,而是相當(dāng)于對采樣后的數(shù)據(jù)進行Collect(),返回結(jié)果的集合為單機的數(shù)組。
圖3-15中左側(cè)的方框代表分布式的各個節(jié)點上的分區(qū),右側(cè)方框代表單機上返回的結(jié)果數(shù)組。通過takeSample對數(shù)據(jù)采樣,設(shè)置為采樣一份數(shù)據(jù),返回結(jié)果為V1。
[插圖]
圖3-14 sample算子對RDD轉(zhuǎn)換
5.Cache型
(1)cache
cache將RDD元素從磁盤緩存到內(nèi)存,相當(dāng)于persist(MEMORY_ONLY)函數(shù)的功能。圖3-14中的方框代表RDD分區(qū)。
圖3-16中的每個方框代表一個RDD分區(qū),左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲在磁盤,通過cache算子將數(shù)據(jù)緩存在內(nèi)存。
[插圖]
圖3-15 takeSample算子對RDD轉(zhuǎn)換
[插圖]
圖3-16 cache算子對RDD轉(zhuǎn)換
(2)persist
persist函數(shù)對RDD進行緩存操作。數(shù)據(jù)緩存在哪里由StorageLevel枚舉類型確定。有以下幾種類型的組合(見圖3-15),DISK代表磁盤,MEMORY代表內(nèi)存,SER代表數(shù)據(jù)是否進行序列化存儲。
下面為函數(shù)定義,StorageLevel是枚舉類型,代表存儲模式,用戶可以通過圖3-17按需選擇。
persist(newLevel:Stor ageLevel)
圖3-17中列出persist函數(shù)可以緩存的模式。例如,MEMORY_AND_DISK_SER代表數(shù)據(jù)可以存儲在內(nèi)存和磁盤,并且以序列化的方式存儲。其他同理。
[插圖]
圖3-17 persist算子對RDD轉(zhuǎn)換
圖3-18中的方框代表RDD分區(qū)。disk代表存儲在磁盤,mem代表存儲在內(nèi)存。數(shù)據(jù)最初全部存儲在磁盤,通過persist(MEMORY_AND_DISK)將數(shù)據(jù)緩存到內(nèi)存,但是有的分區(qū)無法容納在內(nèi)存,例如:圖3-18中將含有V1,V2,V3的RDD存儲到磁盤,將含有U1,U2的RDD仍舊存儲在內(nèi)存。
[插圖]
圖3-18 Persist算子對RDD轉(zhuǎn)換
3.3.2 Key-Value型Transformation算子
Transformation處理的數(shù)據(jù)為Key-Value形式的算子,大致可以分為3種類型:輸入分區(qū)與輸出分區(qū)一對一、聚集、連接操作。
1.輸入分區(qū)與輸出分區(qū)一對一
mapValues:針對(Key,Value)型數(shù)據(jù)中的 Value進行Map操作,而不對Key進行處理。
圖3-19中的方框代表RDD分區(qū)。a=>a+2代表只對(V1,1)數(shù)據(jù)中的1進行加2操作,返回結(jié)果為3。
[插圖]
圖3-19 mapValues算子RDD對轉(zhuǎn)換
2.對單個RDD或兩個RDD聚集
(1)單個RDD聚集
1)combineByKey。
定義combineByKey算子的代碼如下。
combineByKey[C](createCombiner:(V)? C,
mergeValue:(C,V)? C,
mergeCombiners:(C,C)? C,
partitioner:Partitioner
mapSideCombine:Boolean=true,
serializer:Serializer=null):RDD[(K,C)]
說明:
□createCombiner:V=>C,在C不存在的情況下,如通過V創(chuàng)建seq C。
□mergeValue:(C,V)=>C,當(dāng)C已經(jīng)存在的情況下,需要merge,如把item V加到seq C中,或者疊加。
□mergeCombiners:(C,C)=>C,合并兩個C。
□partitioner:Partitioner(分區(qū)器),Shuffle時需要通過Partitioner的分區(qū)策略進行分區(qū)。
□mapSideCombine:Boolean=true,為了減小傳輸量,很多combine可以在map端先做。例如,疊加可以先在一個partition中把所有相同的Key的Value疊加,再shuffle。
□serializerClass:String=null,傳輸需要序列化,用戶可以自定義序列化類。
例如,相當(dāng)于將元素為(Int,Int)的RDD轉(zhuǎn)變?yōu)榱耍↖nt,Seq[Int])類型元素的RDD。
圖3-20中的方框代表RDD分區(qū)。通過combineByKey,將(V1,2)、(V1,1)數(shù)據(jù)合并為(V1,Seq(2,1))。
[插圖]
圖3-20 comBineByKey算子對RDD轉(zhuǎn)換
2)reduceByKey。
reduceByKey是更簡單的一種情況,只是兩個值合并成一個值,所以createCombiner很簡單,就是直接返回v,而mergeValue和mergeCombiners的邏輯相同,沒有區(qū)別。
函數(shù)實現(xiàn)代碼如下。
def reduceByKey(partitioner:Partitioner,func:(V,V)=>V):RDD[(K,V)]={
combineByKey[V]((v:V)=>v,func,func,partitioner)
}
圖3-21中的方框代表RDD分區(qū)。通過用戶自定義函數(shù)(A,B)=>(A+B),將相同Key的數(shù)據(jù)(V1,2)、(V1,1)的value相加,結(jié)果為(V1,3)。
[插圖]
圖3-21 reduceByKey算子對RDD轉(zhuǎn)換
3)partitionBy。
partitionBy函數(shù)對RDD進行分區(qū)操作。
函數(shù)定義如下。
partitionBy(partitioner:Partitioner)
如果原有RDD的分區(qū)器和現(xiàn)有分區(qū)器(partitioner)一致,則不重分區(qū),如果不一致,則相當(dāng)于根據(jù)分區(qū)器生成一個新的ShuffledRDD。
圖3-22中的方框代表RDD分區(qū)。通過新的分區(qū)策略將原來在不同分區(qū)的V1、V2數(shù)據(jù)都合并到了一個分區(qū)。
[插圖]
圖3-22 partitionBy算子對RDD轉(zhuǎn)換
(2)對兩個RDD進行聚集
cogroup函數(shù)將兩個RDD進行協(xié)同劃分,cogroup函數(shù)的定義如下。
cogroup[W](other:RDD[(K,W)],numPartitions:Int):RDD[(K,(Iterable[V],Iterable[W]))]
對在兩個RDD中的Key-Value類型的元素,每個RDD相同Key的元素分別聚合為一個集合,并且返回兩個RDD中對應(yīng)Key的元素集合的迭代器。
(K,(Iterable[V],Iterable[W]))
其中,Key和Value,Value是兩個RDD下相同Key的兩個數(shù)據(jù)集合的迭代器所構(gòu)成的元組。
圖3-23中的大方框代表RDD,大方框內(nèi)的小方框代表RDD中的分區(qū)。將RDD1中的數(shù)據(jù)(U1,1)、(U1,2)和RDD2中的數(shù)據(jù)(U1,2)合并為(U1,((1,2),(2)))。
[插圖]
圖3-23 Cogroup算子對RDD轉(zhuǎn)換
3.連接
(1)join
□oin對兩個需要連接的RDD進行cogroup函數(shù)操作,cogroup原理請見上文。cogroup操作之后形成的新RDD,對每個key下的元素進行笛卡爾積操作,返回的結(jié)果再展平,對應(yīng)Key下的所有元組形成一個集合,最后返回RDD[(K,(V,W))]
下面代碼為join的函數(shù)實現(xiàn),本質(zhì)是通過cogroup算子先進行協(xié)同劃分,再通過flatMapValues將合并的數(shù)據(jù)打散。
this.cogroup(other,partitioner).flatMapValues { case(vs,ws)=>
for(v <- vs;w <- ws)yield(v,w)}
圖3-24是對兩個RDD的join操作示意圖。大方框代表RDD,小方框代表RDD中的分區(qū)。函數(shù)對擁有相同Key的元素(例如V1)為Key,以做連接后的數(shù)據(jù)結(jié)果為(V1,(1,1))和(V1,(1,2))。
[插圖]
圖3-24 join算子對RDD轉(zhuǎn)換
(2)leftOutJoin和rightOutJoin
LeftOutJoin(左外連接)和RightOutJoin(右外連接)相當(dāng)于在join的基礎(chǔ)上先判斷一側(cè)的RDD元素是否為空,如果為空,則填充為空。如果不為空,則將數(shù)據(jù)進行連接運算,并返回結(jié)果。
下面代碼是leftOutJoin的實現(xiàn)。
if(ws.isEmpty){
vs.map(v=>(v,None))
}else {
for(v <- vs;w <- ws)yield(v,Some(w))
}
3.3.3 Actions算子
本質(zhì)上在Actions算子中通過SparkContext執(zhí)行提交作業(yè)的runJob操作,觸發(fā)了RDD DAG的執(zhí)行。
例如,Actions算子collect函數(shù)的代碼如下,感興趣的讀者可以順著這個入口進行源碼剖析。
/*返回這個RDD的所有數(shù)據(jù),結(jié)果以數(shù)組形式存儲*/
def collect():Array[T]={
/*提交Job*/
val results=sc.runJob(this,(iter:Iterator[T])=>iter.toArray)
Array.concat(results:_*)
}
下面根據(jù)Action算子的輸出空間將Action算子進行分類:無輸出、HDFS、Scala集合和數(shù)據(jù)類型。
1.無輸出
(1)foreach
對RDD中的每個元素都應(yīng)用f函數(shù)操作,不返回RDD和Array,而是返回Uint。
圖3-25表示foreach算子通過用戶自定義函數(shù)對每個數(shù)據(jù)項進行操作。本例中自定義函數(shù)為println(),控制臺打印所有數(shù)據(jù)項。
2.HDFS
(1)saveAsTextFile
函數(shù)將數(shù)據(jù)輸出,存儲到HDFS的指定目錄。
下面為函數(shù)的內(nèi)部實現(xiàn)。
this.map(x=>(NullWritable.get(),new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)
將RDD中的每個元素映射轉(zhuǎn)變?yōu)?Null,x.toString),然后再將其寫入HDFS。
圖3-26中左側(cè)的方框代表RDD分區(qū),右側(cè)方框代表HDFS的Block。通過函數(shù)將RDD的每個分區(qū)存儲為HDFS中的一個Block。
[插圖]
圖3-25 foreach算子對RDD轉(zhuǎn)換
[插圖]
圖3-26 saveAsHadoopFile算子對RDD轉(zhuǎn)換
(2)saveAsObjectFile
saveAsObjectFile將分區(qū)中的每10個元素組成一個Array,然后將這個Array序列化,映射為(Null,BytesWritable(Y))的元素,寫入HDFS為SequenceFile的格式。
下面代碼為函數(shù)內(nèi)部實現(xiàn)。
map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))
圖3-27中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表HDFS的Block。通過函數(shù)將RDD的每個分區(qū)存儲為HDFS上的一個Block。
[插圖]
圖3-27 saveAsObjectFile算子對RDD轉(zhuǎn)換
3.Scala集合和數(shù)據(jù)類型
(1)collect
collect相當(dāng)于toArray,toArray已經(jīng)過時不推薦使用,collect將分布式的RDD返回為一個單機的scala Array數(shù)組。在這個數(shù)組上運用scala的函數(shù)式操作。
圖3-28中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表單機內(nèi)存中的數(shù)組。通過函數(shù)操作,將結(jié)果返回到Driver程序所在的節(jié)點,以數(shù)組形式存儲。
(2)collectAsMap
collectAsMap對(K,V)型的RDD數(shù)據(jù)返回一個單機HashMap。對于重復(fù)K的RDD元素,后面的元素覆蓋前面的元素。
圖3-29中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表單機數(shù)組。數(shù)據(jù)通過collectAsMap函數(shù)返回給Driver程序計算結(jié)果,結(jié)果以HashMap形式存儲。
[插圖]
圖3-28 Collect算子對RDD轉(zhuǎn)換
[插圖]
圖3-29 collectAsMap算子對RDD轉(zhuǎn)換
(3)reduceByKeyLocally
實現(xiàn)的是先reduce再collectAsMap的功能,先對RDD的整體進行reduce操作,然
后再收集所有結(jié)果返回為一個HashMap。
(4)lookup
下面代碼為lookup的聲明。
lookup(key:K):Seq[V]
Lookup函數(shù)對(Key,Value)型的RDD操作,返回指定Key對應(yīng)的元素形成的Seq。這個函數(shù)處理優(yōu)化的部分在于,如果這個RDD包含分區(qū)器,則只會對應(yīng)處理K所在的分區(qū),然后返回由(K,V)形成的Seq。如果RDD不包含分區(qū)器,則需要對全RDD元素進行暴力掃描處理,搜索指定K對應(yīng)的元素。
圖3-30中的左側(cè)方框代表RDD分區(qū),右側(cè)方框代表Seq,最后結(jié)果返回到Driver所在節(jié)點的應(yīng)用中。
(5)count
count返回整個RDD的元素個數(shù)。內(nèi)部函數(shù)實現(xiàn)如下。
Def count():Long=sc.runJob(this,Utils.getIteratorSize_).sum
在圖3-31中,返回數(shù)據(jù)的個數(shù)為5。一個方塊代表一個RDD分區(qū)。
[插圖]
圖3-30 lookup對RDD轉(zhuǎn)換
[插圖]
圖3-31 count對RDD轉(zhuǎn)換
(6)top
top可返回最大的k個元素。函數(shù)定義如下。
top(num:Int)(implicit ord:Ordering[T]):Array[T]
相近函數(shù)說明如下。
□top返回最大的k個元素。
□take返回最小的k個元素。
□takeOrdered返回最小的k個元素,并且在返回的數(shù)組中保持元素的順序。
□first相當(dāng)于top(1)返回整個RDD中的前k個元素,可以定義排序的方式Ordering[T]。返回的是一個含前k個元素的數(shù)組。
(7)reduce
reduce函數(shù)相當(dāng)于對RDD中的元素進行reduceLeft函數(shù)的操作。函數(shù)實現(xiàn)如下。
Some(iter.reduceLeft(cleanF))
reduceLeft先對兩個元素<K,V>進行reduce函數(shù)操作,然后將結(jié)果和迭代器取出的下一個元素<k,V>進行reduce函數(shù)操作,直到迭代器遍歷完所有元素,得到最后結(jié)果。
在RDD中,先對每個分區(qū)中的所有元素<K,V>的集合分別進行reduceLeft。每個分區(qū)形成的結(jié)果相當(dāng)于一個元素<K,V>,再對這個結(jié)果集合進行reduceleft操作。
例如:用戶自定義函數(shù)如下。
f:(A,B)=>(A._1+"@"+B._1,A._2+B._2)
圖3-32中的方框代表一個RDD分區(qū),通過用戶自定函數(shù)f將數(shù)據(jù)進行reduce運算。示例最后的返回結(jié)果為V1@[插圖]V2U!@U2@U3@U4,12。
[插圖]
圖3-32 reduce算子對RDD轉(zhuǎn)換
(8)fold
fold和reduce的原理相同,但是與reduce不同,相當(dāng)于每個reduce時,迭代器取的第一個元素是zeroValue。
圖3-33中通過下面的用戶自定義函數(shù)進行fold運算,圖中的一個方框代表一個RDD分區(qū)。讀者可以參照(7)reduce函數(shù)理解。
fold(("V0@",2))((A,B)=>(A._1+"@"+B._1,A._2+B._2))
[插圖]
圖3-33 fold算子對RDD轉(zhuǎn)換
(9)aggregate
aggregate先對每個分區(qū)的所有元素進行aggregate操作,再對分區(qū)的結(jié)果進行fold操作。
aggreagate與fold和reduce的不同之處在于,aggregate相當(dāng)于采用歸并的方式進行數(shù)據(jù)聚集,這種聚集是并行化的。而在fold和reduce函數(shù)的運算過程中,每個分區(qū)中需要進行串行處理,每個分區(qū)串行計算完結(jié)果,結(jié)果再按之前的方式進行聚集,并返回最終聚集結(jié)果。
函數(shù)的定義如下。
aggregate[B](z:B)(seqop:(B,A)? B,combop:(B,B)? B):B
圖3-34通過用戶自定義函數(shù)對RDD 進行aggregate的聚集操作,圖中的每個方框代表一個RDD分區(qū)。
rdd.aggregate("V0@",2)((A,B)=>(A._1+"@"+B._1,A._2+B._2)),
(A,B)=>(A._1+"@"+B_1,A._@+B_.2))
最后,介紹兩個計算模型中的兩個特殊變量。
廣播(broadcast)變量:其廣泛用于廣播Map Side Join中的小表,以及廣播大變量等場景。這些數(shù)據(jù)集合在單節(jié)點內(nèi)存能夠容納,不需要像RDD那樣在節(jié)點之間打散存儲。Spark運行時把廣播變量數(shù)據(jù)發(fā)到各個節(jié)點,并保存下來,后續(xù)計算可以復(fù)用。相比Hadoop的distributed cache,廣播的內(nèi)容可以跨作業(yè)共享。Broadcast的底層實現(xiàn)采用了BT機制。有興趣的讀者可以參考論文[插圖]。
[插圖]
圖3-34 aggregate算子對RDD轉(zhuǎn)換
㈡代表V。㈢代表U。
accumulator變量:允許做全局累加操作,如accumulator變量廣泛使用在應(yīng)用中記錄當(dāng)前的運行指標(biāo)的情景。
3.4 本章小結(jié)
本章主要介紹了Spark的計算模型,Spark將應(yīng)用程序整體翻譯為一個有向無環(huán)圖進行調(diào)度和執(zhí)行。相比MapReduce,Spark提供了更加優(yōu)化和復(fù)雜的執(zhí)行流。
讀者還可以深入了解Spark的運行機制與Spark算子,這樣能更加直觀地了解API的使用。Spark提供了更加豐富的函數(shù)式算子,這樣就為Spark上層組件的開發(fā)奠定了堅實的基礎(chǔ)。
通過閱讀本章,讀者可以對Spark計算模型進行更為宏觀的把握。相信讀者還想對Spark內(nèi)部執(zhí)行機制進行更深入的了解,下面章節(jié)就對Spark的內(nèi)核進行更深入的剖析。