4.3 RDD操作
RDD提供了一個(gè)抽象的分布式數(shù)據(jù)架構(gòu),我們不必?fù)?dān)心底層數(shù)據(jù)的分布式特性,而應(yīng)用邏輯可以表達(dá)為一系列轉(zhuǎn)換處理。
通常應(yīng)用邏輯是以一系列轉(zhuǎn)換(Transformation)和執(zhí)行(Action)來表達(dá)的,前者在RDD之間指定處理的相互依賴關(guān)系,后者指定輸出的形式。
其中:
□轉(zhuǎn)換:是指該操作從已經(jīng)存在的數(shù)據(jù)集上創(chuàng)建一個(gè)新的數(shù)據(jù)集,是數(shù)據(jù)集的邏輯操作,并沒有真正計(jì)算。
□執(zhí)行:是指該方法提交一個(gè)與前一個(gè)Action之間的所有Transformation組成的Job進(jìn)行計(jì)算,Spark會(huì)根據(jù)Action將作業(yè)切分成多個(gè)Job。
比如,Map操作傳遞數(shù)據(jù)集中的每一個(gè)元素經(jīng)過一個(gè)函數(shù),形成一個(gè)新的RDD轉(zhuǎn)換結(jié)果,而Reduce操作通過一些函數(shù)對(duì)RDD的所有元素進(jìn)行操作,并返回最終結(jié)果給Driver程序。
在默認(rèn)情況下,Spark所有的轉(zhuǎn)換操作都是惰性(Lazy)的,每個(gè)被轉(zhuǎn)換得到的RDD不會(huì)立即計(jì)算出結(jié)果,只是記下該轉(zhuǎn)換操作應(yīng)用的一些基礎(chǔ)數(shù)據(jù)集,可以有多個(gè)轉(zhuǎn)換結(jié)果。轉(zhuǎn)換只有在遇到一個(gè)Action時(shí)才會(huì)執(zhí)行,如圖4-2所示。
[插圖]
圖4-2 Spark轉(zhuǎn)換和執(zhí)行
這種設(shè)計(jì)使得Spark以更高的效率運(yùn)行。例如,可以通過將要在Reduce操作中使用的Map轉(zhuǎn)換來創(chuàng)建一個(gè)數(shù)據(jù)集,并且只返回Reduce的結(jié)果給驅(qū)動(dòng)程序,而不是整個(gè)Map所得的數(shù)據(jù)集。
每當(dāng)一個(gè)Job計(jì)算完成,其內(nèi)部的所有RDD都會(huì)被清除,如果在下一個(gè)Job中有用到其他Job中的RDD,會(huì)引發(fā)該RDD的再次計(jì)算,為避免這種情況,我們可以使用Persist(默認(rèn)是Cache)方法“持久化”一個(gè)RDD到內(nèi)存中。在這種情況下,Spark將會(huì)在集群中保留這個(gè)RDD,以便其他Job可以更快地訪問,另外,Spark也支持持久化RDD到磁盤中,或者復(fù)制RDD到各個(gè)節(jié)點(diǎn)。
下面,通過幾行簡單的程序,進(jìn)一步說明RDD的基礎(chǔ)知識(shí)。
? ? ? ? ? val lines=sc.textFile("data.txt")
val lineLengths=lines.map(s=>s.length)
val totalLength=lineLengths.reduce((a,b)=>a+b)
第一行讀取外部文件data.txt返回一個(gè)基礎(chǔ)的MappedRDD,該MappedRDD并不加載到內(nèi)存中或被執(zhí)行操作,lines只是記錄轉(zhuǎn)換操作結(jié)果的指針。
第二行定義了lineLengths作為一個(gè)Map轉(zhuǎn)換的結(jié)果,由于惰性機(jī)制的存在,lineLengths的值不會(huì)立即計(jì)算。
最后,運(yùn)行Reduce,該操作為一個(gè)Action。Spark將計(jì)算打散成多個(gè)任務(wù)以便在不同的機(jī)器上分別運(yùn)行,每臺(tái)機(jī)器并行運(yùn)行Map,并將結(jié)果進(jìn)行Reduce操作,返回結(jié)果值Driver程序。
如果需要繼續(xù)使用lineLengths,可以添加緩存Persist或Cache,該持久化會(huì)在執(zhí)行Reduce之前,第一次計(jì)算成功之后,將lineLengths保存在內(nèi)存中。
4.3.1 轉(zhuǎn)換操作
轉(zhuǎn)換操作是RDD的核心之一,通過轉(zhuǎn)換操作實(shí)現(xiàn)不同的RDD結(jié)果,作為下一次RDD計(jì)算的數(shù)據(jù)輸入,轉(zhuǎn)換操作不會(huì)觸發(fā)Job的提交,僅僅是標(biāo)記對(duì)RDD的操作,形成DAG圖,以供Action觸發(fā)Job提交后調(diào)用。
常用的轉(zhuǎn)換操作包括:基礎(chǔ)轉(zhuǎn)換操作和鍵-值轉(zhuǎn)換操作。
1.基礎(chǔ)轉(zhuǎn)換操作
表4-2列出了目前支持的基礎(chǔ)轉(zhuǎn)換操作,具體內(nèi)容請(qǐng)參見RDD的API官方文檔,以獲得更多的細(xì)節(jié)。
表4-2 基礎(chǔ)轉(zhuǎn)換操作
[插圖]
(續(xù))
[插圖]
2.鍵-值轉(zhuǎn)換操作
盡管大多數(shù)Spark操作都基于包含各種類型對(duì)象的RDD,但是一小部分特殊的卻只能在鍵-值對(duì)形式的RDD上執(zhí)行。其中,最普遍的就是分布式“洗牌”(shuffle)操作,比如通過鍵進(jìn)行分組或聚合元素。
例如,使用reduceByKey操作對(duì)文件中每行出現(xiàn)的文字次數(shù)進(jìn)行計(jì)數(shù),各種語言的示例如下。
在Scala中,只要在程序中導(dǎo)入org.apache.spark.SparkContext,就能使用Spark的隱式轉(zhuǎn)換,這些操作就可用于包含二元組對(duì)象的RDD(Scala中的內(nèi)建元組,可通過(a,b)創(chuàng)建),鍵-值對(duì)操作可用PairRDDFunction類,如果導(dǎo)入了轉(zhuǎn)換,該類將自動(dòng)封裝元組RDD。
? ? ? ? ? ? val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
基于counts,可以使用counts.sortByKey()按字母表順序?qū)@些鍵-值對(duì)排序,然后使用counts.collect(),以對(duì)象數(shù)組的形式向Driver返回結(jié)果。
順便說一句,進(jìn)行分組的groupByKey不進(jìn)行本地合并,而進(jìn)行聚合的reduceByKey會(huì)在本地對(duì)每個(gè)分區(qū)的數(shù)據(jù)合并后再做Shuffle,效率比groupByKey高得多。下面通過幾行基于Scala的代碼對(duì)鍵-值轉(zhuǎn)換操作進(jìn)行說明。
// 初始化List
scala>val data = List(("a",1),("b",1),("c",1),("a",2),("b",2),("c",2))
data: List[(String, Int)] = List((a,1), (b,1), (c,1), (a,2), (b,2), (c,2))
// 并行數(shù)組創(chuàng)建RDD
scala>val rdd =sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0]
// 按照key進(jìn)行reduceByKey操作
scala>val rbk = rdd.reduceByKey(_+_).collect
rbk: Array[(String, Int)] = Array((a,3), (b,3), (c,3))
// 按照key進(jìn)行g(shù)roupByKey操作
scala>val gbk = rdd.groupByKey().collect
gbk:? Array[(String,? Iterable[Int])]? =? Array((a,CompactBuffer(1,? 2)),? (b,
CompactBuffer(1, 2)), (c,CompactBuffer(1, 2)))
// 按照key進(jìn)行sortByKey操作
scala>val sbk = rdd.sortByKey().collect
sbk: Array[(String, Int)] = Array((a,1), (a,2), (b,1), (b,2), (c,1), (c,2))
表4-3列出了常用的健-值轉(zhuǎn)換。
表4-3 常用的鍵-值轉(zhuǎn)換
[插圖]
4.3.2 執(zhí)行操作
Spark將提交的Action與前一個(gè)Action之間的所有Transformation組成的Job進(jìn)行計(jì)算,并根據(jù)Action將作業(yè)切分成多個(gè)Job,指定Transformation的輸出結(jié)果。
1.常用執(zhí)行操作
這里以加載Spark自帶的本地文件README.md文件進(jìn)行測試,返回一個(gè)MappedRDD文件,進(jìn)行Filter轉(zhuǎn)換操作和Count執(zhí)行。
? ? ? ? ? // 讀取README.md數(shù)據(jù),并轉(zhuǎn)化為RDD
scala>val data = sc.textFile("file:///$SPARK_HOME/README.md")
data: org.apache.spark.rdd.RDD[String] = file:///$SPARK_HOME/README.md MappedRDD[1]
// 執(zhí)行f ilter操作,提取帶有"Spark"的子集
scala>val datafilter = data.filter(line =>line.contains("Spark"))
datafilter: org.apache.spark.rdd.RDD[String] = FilteredRDD[2]
// 執(zhí)行Action操作,輸出結(jié)果
scala>val datacount = datafilter.count()
datacount: Long = 21
如果想了解更多,請(qǐng)參考表4-4中列出的常用的執(zhí)行操作。
表4-4 常用的執(zhí)行操作
[插圖]
通過常用執(zhí)行操作,Spark可以實(shí)現(xiàn)大部分MapReduce流式計(jì)算的任務(wù),提升了計(jì)算效率,對(duì)Transformation操作進(jìn)行結(jié)果值輸出。
2.存儲(chǔ)執(zhí)行操作
常用存儲(chǔ)操作主要包含的執(zhí)行如表4-5所示。
表4-5 常用存儲(chǔ)操作包含的執(zhí)行
[插圖]
存儲(chǔ)執(zhí)行操作將結(jié)果進(jìn)行保存,以文本、序列化文件、對(duì)象文件的方式輸出到存儲(chǔ)設(shè)備進(jìn)行持久化。
4.3.3 控制操作
控制操作主要包括故障恢復(fù)、數(shù)據(jù)持久性,以及移除數(shù)據(jù)。其中,緩存操作Cache/Pesist是惰性的,在進(jìn)行執(zhí)行操作時(shí)才會(huì)執(zhí)行,而Unpesist是即時(shí)的,會(huì)立即釋放內(nèi)存。checkpoint會(huì)直接將RDD持久化到磁盤或HDFS等路徑,不同于Cache/Persist的是,被checkpoint的RDD不會(huì)因作業(yè)的結(jié)束而被消除,會(huì)一直存在,并可以被后續(xù)的作業(yè)直接讀取并加載。
1. RDD故障恢復(fù)
在一個(gè)典型的分布式系統(tǒng)中,容錯(cuò)機(jī)制主要是采取檢查點(diǎn)(checkpoint)機(jī)制和數(shù)據(jù)備份機(jī)制。故障恢復(fù)是由主動(dòng)檢查,以及不同機(jī)器之間的數(shù)據(jù)復(fù)制實(shí)現(xiàn)的。由于進(jìn)行故障恢復(fù)需要跨集群網(wǎng)絡(luò)來復(fù)制大量數(shù)據(jù),這無疑是相當(dāng)昂貴的。因此,在Spark中則采取了不同的方法進(jìn)行故障恢復(fù)。
作為一個(gè)大型的分布式集群,Spark針對(duì)工作負(fù)載會(huì)做出兩種假設(shè):
□處理時(shí)間是有限的;
□保持?jǐn)?shù)據(jù)持久性是外部數(shù)據(jù)源的職責(zé),主要是讓處理過程中的數(shù)據(jù)保持穩(wěn)定。
基于假設(shè),Spark在執(zhí)行期間發(fā)生數(shù)據(jù)丟失時(shí)會(huì)選擇折中方案,它會(huì)重新執(zhí)行之前的步驟來恢復(fù)丟失的數(shù)據(jù),但并不是說丟棄之前所有已經(jīng)完成的工作,而重新開始再來一遍。
假如其中一個(gè)RDD壞掉,RDD中有記錄之前的依賴關(guān)系,且依賴關(guān)系中記錄算子和分區(qū)。此時(shí),僅僅需要再執(zhí)行一遍父RDD的相應(yīng)分區(qū)。
但是,跨寬依賴的再執(zhí)行能夠涉及多個(gè)父RDD,從而引發(fā)全部的再執(zhí)行。為了規(guī)避這一點(diǎn),Spark會(huì)保持Map階段中間數(shù)據(jù)輸出的持久,在機(jī)器發(fā)生故障的情況下,再執(zhí)行只需要回溯Mapper持續(xù)輸出的相應(yīng)分區(qū),來獲取中間數(shù)據(jù)。
Spark還提供了數(shù)據(jù)檢查點(diǎn)和記錄日志,用于持久化中間RDD,這樣再執(zhí)行就不必追溯到最開始的階段。通過比較恢復(fù)延遲和檢查點(diǎn)開銷進(jìn)行權(quán)衡,Spark會(huì)自動(dòng)化地選擇相應(yīng)的策略進(jìn)行故障恢復(fù)。
2. RDD持久化
Spark的持久化,是指在不同轉(zhuǎn)換操作之間,將過程數(shù)據(jù)緩存在內(nèi)存中,實(shí)現(xiàn)快速重用,或者故障快速恢復(fù)。持久化主要分為兩類,主動(dòng)持久化和自動(dòng)持久化。
主動(dòng)持久化,主要目標(biāo)是RDD重用,從而實(shí)現(xiàn)快速處理,是Spark構(gòu)建迭代算法的關(guān)鍵。例如,持久化一個(gè)RDD,每一個(gè)節(jié)點(diǎn)都將把它的計(jì)算分塊結(jié)果保存在內(nèi)存中,并在該數(shù)據(jù)集(或者衍生數(shù)據(jù)集)進(jìn)行的后續(xù)Action中重用,使得后續(xù)Action執(zhí)行變得更加迅速(通???0倍)。
可以使用persist()方法標(biāo)記一個(gè)持久化的RDD,一旦被一個(gè)執(zhí)行(action)觸發(fā)計(jì)算,它將會(huì)被保留在計(jì)算節(jié)點(diǎn)的內(nèi)存中并重用。如果RDD的任一分區(qū)丟失,通過使用原先創(chuàng)建的轉(zhuǎn)換操作,它將會(huì)被自動(dòng)重算,不需要全部重算,而只計(jì)算丟失的部分。
此外,每一個(gè)RDD都可以用不同的保存級(jí)別進(jìn)行保存,從而允許持久化數(shù)據(jù)集在硬盤或內(nèi)存作為序列化的Java對(duì)象(節(jié)省空間),甚至跨節(jié)點(diǎn)復(fù)制。
持久化的等級(jí)選擇,是通過將一個(gè)StorageLevel對(duì)象傳遞給persist()方法進(jìn)行確定的,cache()方法調(diào)用persist()的默認(rèn)級(jí)別MEMORY_ONLY。表4-6是持久化的等級(jí)。
表4-6 持久化的等級(jí)
[插圖]
相對(duì)于MEMORY_ONLY_SER,OFF_HEAP減小了垃圾回收的開銷,同時(shí)也允許Executor變得更小且可共享內(nèi)存儲(chǔ)備,Executor的崩潰不會(huì)導(dǎo)致內(nèi)存中的緩存丟失。在這種模式下,Tachyon中的內(nèi)存是不可丟棄的。
自動(dòng)持久化,是指不需要用戶調(diào)用persist(),Spark自動(dòng)地保存一些Shuffle操作(如reduceByKey)的中間結(jié)果。這樣做是為了避免在Shuffle過程中一個(gè)節(jié)點(diǎn)崩潰時(shí)重新計(jì)算所有的輸入。
持久化時(shí),一旦設(shè)置了就不能改變,想要改變就要先去持久化。推薦用戶在重用RDD結(jié)果時(shí)調(diào)用Persist,這樣會(huì)使持久化變得可控。
Persist持久化RDD,修改了RDD的meta info中的StorageLevel。而檢查點(diǎn)在持久化的同時(shí)切斷Lineage,修改了RDD的meta info中的Lineage。二者均返回經(jīng)過修改的RDD對(duì)象自身,而非新的RDD對(duì)象,也均屬于Lazy操作。
3. 選擇存儲(chǔ)等級(jí)
Spark的不同存儲(chǔ)級(jí)別,旨在滿足內(nèi)存使用和CPU效率權(quán)衡上的不同需求,建議通過以下步驟進(jìn)行選擇:
□如果你的RDD可以很好地與默認(rèn)的存儲(chǔ)級(jí)別(MEMORY_ONLY)契合,那么就不需要做任何修改。這已經(jīng)是CPU使用效率最高的選項(xiàng),它使RDD的操作盡可能快。
□如果不能與MEMORY_ONLY很好地契合,建議使用MEMORY_ONLY_SER并選擇一個(gè)快速序列化的庫,使對(duì)象在有較高空間使用率的情況下,依然可以較快地被訪問。
□盡可能不要存儲(chǔ)數(shù)據(jù)到硬盤上,除非計(jì)算數(shù)據(jù)集的函數(shù),計(jì)算量特別大,或者它們過濾了大量的數(shù)據(jù)。否則,重新計(jì)算一個(gè)分區(qū)的速度與從硬盤中讀取的效率差不多。
□如果想擁有快速故障恢復(fù)能力,可使用復(fù)制存儲(chǔ)級(jí)別(例如,用Spark來響應(yīng)Web應(yīng)用的請(qǐng)求)。所有的存儲(chǔ)級(jí)別都有通過重新計(jì)算丟失數(shù)據(jù)恢復(fù)錯(cuò)誤的容錯(cuò)機(jī)制,但是復(fù)制存儲(chǔ)級(jí)別可以讓你在RDD上持續(xù)地運(yùn)行任務(wù),而不需要等待丟失的分區(qū)被重新計(jì)算。
□如果想要定義自己的存儲(chǔ)級(jí)別(如復(fù)制因子為3而不是2),可以使用StorageLevel單例對(duì)象的apply()方法。
4. 移除數(shù)據(jù)
RDD可以隨意在RAM中進(jìn)行緩存,因此它提供了更快速的數(shù)據(jù)訪問。目前,緩存的粒度為RDD級(jí)別,只能緩存全部的RDD。
Spark自動(dòng)監(jiān)視每個(gè)節(jié)點(diǎn)上使用的緩存,在集群中沒有足夠的內(nèi)存時(shí),Spark會(huì)根據(jù)緩存情況確定一個(gè)LRU(Least Recently Used,最近最少使用算法)的數(shù)據(jù)分區(qū)進(jìn)行刪除。
如果想手動(dòng)刪除RDD,而不想等待它從緩存中消失,可以使用RDD的unpersist()方法移除數(shù)據(jù),unpersist()方法是立即生效的。