什么是RDD?
RDD是Spark中的數(shù)據(jù)抽象,全稱彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets)。RDD可以理解為將一個大的數(shù)據(jù)集合以分布式的形式保存在集群服務(wù)器的內(nèi)存中。RDD是一個容錯的、并行的數(shù)據(jù)結(jié)構(gòu),可以讓用戶顯式地將數(shù)據(jù)存儲到磁盤和內(nèi)存中,并能控制數(shù)據(jù)的分區(qū)。
RDD是Spark的核心,也是整個Spark的架構(gòu)基礎(chǔ)。
RDD的特點(diǎn):
-
不可變性
RDD是一種不可變的數(shù)據(jù)結(jié)構(gòu)。一旦創(chuàng)建,就不能修改。 -
分片
RDD表示的是一組數(shù)據(jù)的分區(qū)。這些分區(qū)分布在集群的多個節(jié)點(diǎn)上。RDD中存儲了分區(qū)和數(shù)據(jù)物理分區(qū)之間關(guān)系的映射。 -
容錯性
RDD可以自動處理節(jié)點(diǎn)出現(xiàn)故障的情況。當(dāng)某個節(jié)點(diǎn)出現(xiàn)故障時,該節(jié)點(diǎn)上存儲的數(shù)據(jù)將無法被訪問。Spark會在其他節(jié)點(diǎn)上重建丟失的RDD分區(qū)數(shù)據(jù)。RDD中存儲了血統(tǒng)信息,通過血統(tǒng)信息,Spark可以恢復(fù)RDD的部分信息,當(dāng)節(jié)點(diǎn)出現(xiàn)故障的時候,可以基于血統(tǒng)恢復(fù)整個RDD。 - 內(nèi)存計算
RDD類提供了一套支持內(nèi)存計算的API。RDD可以在內(nèi)存中緩存或長期駐留,從而提升了對RDD操作的效率。
RDD的5個主要屬性:
//只計算一次
protected def getPartitions: Array[Partition]
//對一個分片進(jìn)行計算,得出一個可遍歷的結(jié)果
def compute(split: Partition, context: TaskContext): Iterator[T]
//只計算一次,計算RDD對父RDD的依賴
protected def getDependencies: Seq[Dependency[_]] = deps
//可選的,分區(qū)的方法,針對第4點(diǎn),類似于mapreduce當(dāng)中的Paritioner接口,控制key分到哪個reduce
@transient val partitioner: Option[Partitioner] = None
//可選的,指定優(yōu)先位置,輸入?yún)?shù)是split分片,輸出結(jié)果是一組優(yōu)先的節(jié)點(diǎn)位置
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
- 一組分片(Partition)。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決定并行計算的粒度。
-
一個計算每個分區(qū)的函數(shù)。Spark中的RDD的計算是以分片為單位的,每個RDD都會實(shí)現(xiàn)
compute函數(shù)來完成計算任務(wù)。 - RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會生成一個新的RDD,所以RDD之間就會形成類似于劉水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時,Spark可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),這樣就不用對RDD的所有分區(qū)進(jìn)行重新計算。源RDD沒有依賴,通過依賴關(guān)系描述血統(tǒng)(lineage)。
-
一個Partitioner(RDD的分片函數(shù))。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個是基于哈希的
HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于key-value的RDD,才會有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函數(shù)決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時的分片數(shù)量。 - 一個存儲每個Partition的優(yōu)先位置的列表。對于一個HDFS文件來說,這個列表保存的就是每個partition所在的塊的位置。Spark在進(jìn)行任務(wù)調(diào)度的時候,會盡可能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置(移動數(shù)據(jù)不如移動計算)。
RDD的創(chuàng)建
可以通過兩種方式創(chuàng)建RDD:
- 由并行化集合創(chuàng)建
- 由外部存儲系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng)、Hadoop文件系統(tǒng)HDFS或從Hadoop接口API創(chuàng)建
// 由并行化集合創(chuàng)建
val list = (1 t0 100).toList
val rdd = sc.parallelize(list)
// 由外部存儲系統(tǒng)創(chuàng)建
val rdd = sc.textFile("/path/data")
val rdd = sc.wholeTextFiles("/path/data")
RDD的操作
RDD轉(zhuǎn)換操作
轉(zhuǎn)換操作指的是在原RDD實(shí)例上進(jìn)行計算,然后創(chuàng)建一個新的RDD實(shí)例。
RDD中的所有的轉(zhuǎn)換操作都是惰性的,在執(zhí)行RDD的轉(zhuǎn)換操作的時候,并不會直接計算結(jié)果,而是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集上的轉(zhuǎn)換動作,只有行動操作時,這些轉(zhuǎn)換才會真正的去執(zhí)行。這樣設(shè)計的好處是更加有效率的運(yùn)行。
惰性求值的好處:
Spark使用惰性求值,這樣就可以把一些操作合并到一起來減少計算數(shù)據(jù)的步驟。
RDD行動操作
行動操作指的是向驅(qū)動器程序返回結(jié)果或把結(jié)果寫入外部系統(tǒng)的操作。
Spark在調(diào)用RDD的行動操作的時候,會觸發(fā)Spark中的連鎖反應(yīng)。當(dāng)調(diào)用的行動操作的時候,Spark會嘗試創(chuàng)建作為調(diào)用者的RDD。如果這個RDD是從文件中創(chuàng)建的,那么Spark會在worker節(jié)點(diǎn)上讀取文件至內(nèi)存中。如果這個RDD是通過其他RDD的轉(zhuǎn)換得到的,Spark會嘗試創(chuàng)建其父RDD。這個過程會一直持續(xù)下去,直到Spark找到根RDD。然后Spark就會真正執(zhí)行這些生成RDD所必須的轉(zhuǎn)換計算。最后完成行動操作,將結(jié)果返回給驅(qū)動程序或者寫入外部存儲。
RDD的緩存
Spark速度非??斓脑蛑?,就是在不同操作中在內(nèi)存中持久化一個數(shù)據(jù)集。當(dāng)持久化一個RDD后,每一個節(jié)點(diǎn)都將把計算的分片結(jié)果保存在內(nèi)存中,并在對此數(shù)據(jù)集進(jìn)行的其他動作中重用。這使得后續(xù)的動作變得更加迅速。緩存是Spark構(gòu)建迭代算法和快速交互式查詢的關(guān)鍵。所以我們在開發(fā)過程中,對經(jīng)常使用的RDD要進(jìn)行緩存操作,以提升程序運(yùn)行效率。
RDD緩存的方法
RDD類提供了兩種緩存方法:
rdd.cache()
rdd.persist()
cache方法其實(shí)是將RDD存儲在集群中Worker的內(nèi)存中。
persist是一個通用的cache方法。它可以將RDD存儲在內(nèi)存中或硬盤上或者二者皆有。
| 存儲級別 | 描述 |
|---|---|
| MEMORY_ONLY | 默認(rèn)選項(xiàng),RDD的(分區(qū))數(shù)據(jù)直接以Java對象的形式存儲于JVM的內(nèi)存中,如果內(nèi)存空間不足,某些分區(qū)的數(shù)據(jù)將不會被緩存,需要在使用的時候重新計算。 |
| MEMORY_AND_DISK | RDD的數(shù)據(jù)直接以Java對象的形式存儲于JVM的內(nèi)存中,如果內(nèi)存空間不中,某些分區(qū)的數(shù)據(jù)會被存儲至磁盤,使用的時候從磁盤讀取。 |
| MEMORY_ONLY_SER (Java and Scala) | RDD的數(shù)據(jù)(Java對象)序列化之后存儲于JVM的內(nèi)存中(一個分區(qū)的數(shù)據(jù)為內(nèi)存中的一個字節(jié)數(shù)組),相比于MEMORY_ONLY能夠有效節(jié)約內(nèi)存空間(特別是使用一個快速序列化工具的情況下),但讀取數(shù)據(jù)時需要更多的CPU開銷;如果內(nèi)存空間不足,處理方式與MEMORY_ONLY相同。 |
| MEMORY_AND_DISK_SER (Java and Scala) | 相比于MEMORY_ONLY_SER,在內(nèi)存空間不足的情況下,將序列化之后的數(shù)據(jù)存儲于磁盤。 DISK_ONLY 僅僅使用磁盤存儲RDD的數(shù)據(jù)(未經(jīng)序列化)。 |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 以MEMORY_ONLY_2為例,MEMORY_ONLY_2相比于MEMORY_ONLY存儲數(shù)據(jù)的方式是相同的,不同的是會將數(shù)據(jù)備份到集群中兩個不同的節(jié)點(diǎn),其余情況類似。 |
| OFF_HEAP (experimental) | 與MEMORY_ONLY_SER類似,但是存儲在非堆的內(nèi)存中,需要開啟非堆內(nèi)存。 |
緩存的容錯
緩存是有可能丟失(如機(jī)器宕機(jī)),或者存儲于內(nèi)存的數(shù)據(jù)由于內(nèi)存不足而被刪除。RDD的緩存的容錯機(jī)制保證了即使緩存丟失也能保證計算的正確執(zhí)行。通過基于RDD的一系列的轉(zhuǎn)換,丟失的數(shù)據(jù)會被重新計算。因?yàn)镽DD的各個Partition是相對獨(dú)立的,所以在重新計算的時候只需要計算丟失部分Partition即可,不需要重新計算全部的Partition。因此,在一個緩存RDD的節(jié)點(diǎn)出現(xiàn)故障的時候,Spark會在另外的節(jié)點(diǎn)上自動重新創(chuàng)建出現(xiàn)故障的節(jié)點(diǎn)中存儲的分區(qū)。
RDD的檢查點(diǎn)
RDD的緩存能夠在第一次計算完成后,將計算結(jié)果保存到內(nèi)存、本地文件系統(tǒng)或者Tachyon中。通過緩存,Spark避免了RDD上的重復(fù)計算,能夠極大地提升計算速度。但是,如果緩存丟失了,則需要重新計算。如果計算特別復(fù)雜或者計算特別耗時,那么緩存丟失對于整個Job的影響是不容忽視的。為了避免緩存丟失重新計算帶來的開銷,所以Spark引入了檢查點(diǎn)(checkpoint)機(jī)制。
緩存是在計算結(jié)束后,直接將計算結(jié)果通過用戶定義的存儲級別寫入不同的介質(zhì)。而檢查點(diǎn)不同,它是在計算完成后,重新建立一個Job來計算。所以為了避免重復(fù)計算,推薦先將RDD緩存,這樣在進(jìn)行檢查點(diǎn)操作時就可以快速完成。
RDD依賴
Spark會根據(jù)用戶提交的計算邏輯中的RDD的轉(zhuǎn)換和動作來生動RDD之間的依賴關(guān)系,同時這個計算鏈也就生成了邏輯上的DAG。
RDD之間的依賴關(guān)系包括:
- 一個RDD是從哪些RDD轉(zhuǎn)換而來,即RDD的parent RDD(s)是什么
- 依賴于parent RDD(s)的哪些分區(qū)
Spark中的依賴關(guān)系主要體現(xiàn)為兩種形式:
- 窄依賴:是指父RDD的每一個分區(qū)最多被一個子RDD的分區(qū)使用。
- 寬依賴:是指子RDD的每個分區(qū)都依賴于所有父RDD的所有所有分區(qū)或多個分區(qū)。
- 窄依賴允許在單個集群節(jié)點(diǎn)上流水線式執(zhí)行,這個節(jié)點(diǎn)可以計算所有父級分區(qū)。
- 在窄依賴中,節(jié)點(diǎn)失敗后的恢復(fù)更加高效。因?yàn)橹挥衼G失的父級分區(qū)需要重新計算,并且這些丟失的父級分區(qū)可以并行地在不同節(jié)點(diǎn)上重新計算。但是在寬依賴的繼承關(guān)系中,單個失敗的節(jié)點(diǎn)可能導(dǎo)致一個RDD的所有先祖RDD中的一些分區(qū)丟失,導(dǎo)致計算的重新執(zhí)行。