spark—RDD

1.什么是RDD
RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個不可變、可分區(qū)、里面的元素可并行計算的集合。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動容錯、位置感知性調(diào)度和可伸縮性。RDD允許用戶在執(zhí)行多個查詢時顯式地將工作集緩存在內(nèi)存中,后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度。

2.RDD屬性
(1)一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決定并行計算的粒度。用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù),如果沒有指定,那么就會采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

(2)一個計算每個分區(qū)的函數(shù)。Spark中RDD的計算是以分片為單位的,每個RDD都會實(shí)現(xiàn)compute函數(shù)以達(dá)到這個目的。compute函數(shù)會對迭代器進(jìn)行復(fù)合,不需要保存每次計算的結(jié)果。

(3)RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時,Spark可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進(jìn)行重新計算。

(4)一個Partitioner,即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時的分片數(shù)量。

(5)一個列表,存儲存取每個Partition的優(yōu)先位置(preferred location)。對于一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數(shù)據(jù)不如移動計算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時候,會盡可能地將計算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲位置。

3.RDD操作

  • 轉(zhuǎn)換操作(Transformation)
    返回一個新的RDD操作。

1.map(func)
返回一個新的RDD,該RDD由每一個輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成
2.filter(func)
返回一個新的RDD,該RDD由經(jīng)過func函數(shù)計算后返回值為true的輸入元素組成
3.flatMap(func)
類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應(yīng)該返回一個序列,而不是單一元素)
4.mapPartitions(func)
類似于map,但獨(dú)立地在RDD的每一個分片上運(yùn)行,因此在類型為T的RDD上運(yùn)行時,func的函數(shù)類型必須是Iterator[T] => Iterator[U]
5.mapPartitionsWithIndex(func)
類似于mapPartitions,但func帶有一個整數(shù)參數(shù)表示分片的索引值,因此在類型為T的RDD上運(yùn)行時,func的函數(shù)類型必須是
(Int, Interator[T]) => Iterator[U]
6.sample(withReplacement, fraction, seed)
根據(jù)fraction指定的比例對數(shù)據(jù)進(jìn)行采樣,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換,seed用于指定隨機(jī)數(shù)生成器種子
7.union(otherDataset)
對源RDD和參數(shù)RDD求并集后返回一個新的RDD
8.intersection(otherDataset)
對源RDD和參數(shù)RDD求交集后返回一個新的RDD
9.distinct([numTasks]))
對源RDD進(jìn)行去重后返回一個新的RDD
10.groupByKey([numTasks])
在一個(K,V)的RDD上調(diào)用,返回一個(K, Iterator[V])的RDD
11.reduceByKey(func, [numTasks])
在一個(K,V)的RDD上調(diào)用,返回一個(K,V)的RDD,使用指定的reduce函數(shù),將相同key的值聚合到一起,與groupByKey類似,reduce任務(wù)的個數(shù)可以通過第二個可選的參數(shù)來設(shè)置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
先按分區(qū)聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對k/y的RDD進(jìn)行操作
12.sortByKey([ascending], [numTasks])
在一個(K,V)的RDD上調(diào)用,K必須實(shí)現(xiàn)Ordered接口,返回一個按照key進(jìn)行排序的(K,V)的RDD
13.sortBy(func,[ascending], [numTasks])
與sortByKey類似,但是更靈活 第一個參數(shù)是根據(jù)什么排序 第二個是怎么排序 false倒序 第三個排序后分區(qū)數(shù) 默認(rèn)與原RDD一樣
14.join(otherDataset, [numTasks])
在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個相同key對應(yīng)的所有元素對在一起的(K,(V,W))的RDD 相當(dāng)于內(nèi)連接(求交集)
15.cogroup(otherDataset, [numTasks])
在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD
16.cartesian(otherDataset)
兩個RDD的笛卡爾積 的成很多個K/V
17.pipe(command, [envVars])
調(diào)用外部程序
18.coalesce(numPartitions)
重新分區(qū) 第一個參數(shù)是要分多少區(qū),第二個參數(shù)是否shuffle 默認(rèn)false 少分區(qū)變多分區(qū) true 多分區(qū)變少分區(qū) false
19.repartition(numPartitions)
重新分區(qū) 必須shuffle 參數(shù)是要分多少區(qū) 少變多
20.repartitionAndSortWithinPartitions(partitioner)
重新分區(qū)+排序 比先分區(qū)再排序效率高 對K/V的RDD進(jìn)行操作
21.foldByKey(zeroValue)(seqOp)
該函數(shù)用于K/V做折疊,合并處理 ,與aggregate類似 第一個括號的參數(shù)應(yīng)用于每個V值 第二括號函數(shù)是聚合例如:+
22.combineByKey
合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
23.partitionBy(partitioner)
對RDD進(jìn)行分區(qū) partitioner是分區(qū)器 例如new HashPartition(2
cache
RDD緩存,可以避免重復(fù)計算從而減少時間,區(qū)別:cache內(nèi)部調(diào)用了persist算子,cache默認(rèn)就一個緩存級別MEMORY-ONLY ,而persist則可以選擇緩存級別
persist
24.Subtract(rdd)
返回前rdd元素不在后rdd的rdd
25.leftOuterJoin
leftOuterJoin類似于SQL中的左外關(guān)聯(lián)left outer join,返回結(jié)果以前面的RDD為主,關(guān)聯(lián)不上的記錄為空。只能用于兩個RDD之間的關(guān)聯(lián),如果要多個RDD關(guān)聯(lián),多關(guān)聯(lián)幾次即可。
26.rightOuterJoin
rightOuterJoin類似于SQL中的有外關(guān)聯(lián)right outer join,返回結(jié)果以參數(shù)中的RDD為主,關(guān)聯(lián)不上的記錄為空。只能用于兩個RDD之間的關(guān)聯(lián),如果要多個RDD關(guān)聯(lián),多關(guān)聯(lián)幾次即可
27.subtractByKey
substractByKey和基本轉(zhuǎn)換操作中的subtract類似只不過這里是針對K的,返回在主RDD中出現(xiàn),并且不在otherRDD中出現(xiàn)的元素

  • 行動操作(Action)
    向驅(qū)動程序返回結(jié)果,或把結(jié)果寫入外部系統(tǒng)。

1.reduce(func)
通過func函數(shù)聚集RDD中的所有元素,這個功能必須是課交換且可并聯(lián)的
2.collect()
在驅(qū)動程序中,以數(shù)組的形式返回數(shù)據(jù)集的所有元素
3.count()
返回RDD的元素個數(shù)
4.first()
返回RDD的第一個元素(類似于take(1))
5.take(n)
返回一個由數(shù)據(jù)集的前n個元素組成的數(shù)組
6.takeSample(withReplacement,num, [seed])
返回一個數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed用于指定隨機(jī)數(shù)生成器種子
7.takeOrdered(n, [ordering])
8.saveAsTextFile(path)
將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對于每個元素,Spark將會調(diào)用toString方法,將它裝換為文件中的文本
9.saveAsSequenceFile(path)
將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。
10.saveAsObjectFile(path)
11.countByKey()
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應(yīng)的元素個數(shù)。
12.foreach(func)
在數(shù)據(jù)集的每一個元素上,運(yùn)行函數(shù)func進(jìn)行更新。
13.aggregate
先對分區(qū)進(jìn)行操作,在總體操作
14.reduceByKeyLocally
15.lookup
16.top
17.fold

4.一個spark實(shí)例

    public static void main(String[] args) {
        // 使用local模式,不需要啟動spark集群
        SparkConf sparkConf = new SparkConf().setAppName("wordCount ").setMaster("local[2]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaRDD<String> file = ctx.textFile("分析的文件路徑", 6);
        file.persist(StorageLevel.MEMORY_ONLY());
        file.cache();
        Comparator<Tuple2<String, Integer>> orderCompare = new TupleComparator();
        List<Tuple2<String, Integer>> wordToCounts = file
                // 轉(zhuǎn)換操作,將每行以空格分割
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                // 轉(zhuǎn)換操作
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
                // 轉(zhuǎn)換操作
                .reduceByKey((s1, s2) -> s1 + s2)// 將相同的key進(jìn)行reduce,并將value相加
                // 行動操作
                .takeOrdered(50, orderCompare);
        wordToCounts.forEach(line -> System.out.println(line._1() + ":" + line._2()));
    }

5.RDD依賴關(guān)系
由于RDD是粗粒度的操作數(shù)據(jù)集,每個Transformation操作都會生成一個新的RDD,所以RDD之間就會形成類似流水線的前后依賴關(guān)系;RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。如圖所示顯示了RDD之間的依賴關(guān)系。

image

從圖中可知:

窄依賴:是指每個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map、filter、union等操作都會產(chǎn)生窄依賴;(獨(dú)生子女)

寬依賴:是指一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都會產(chǎn)生寬依賴;(超生)

需要特別說明的是對join操作有兩種情況:

(1)圖中左半部分join:如果兩個RDD在進(jìn)行join操作時,一個RDD的partition僅僅和另一個RDD中已知個數(shù)的Partition進(jìn)行join,那么這種類型的join操作就是窄依賴,例如圖1中左半部分的join操作(join with inputs co-partitioned);

(2)圖中右半部分join:其它情況的join操作就是寬依賴,例如圖1中右半部分的join操作(join with inputs not co-partitioned),由于是需要父RDD的所有partition進(jìn)行join的轉(zhuǎn)換,這就涉及到了shuffle,因此這種類型的join操作也是寬依賴。

為什么需要依賴關(guān)系
(1) 窄依賴(narrow dependencies)可以支持在同一個集群Executor上,以pipeline管道形式順序執(zhí)行多條命令,例如在執(zhí)行了map后,緊接著執(zhí)行filter。分區(qū)內(nèi)的計算收斂,不需要依賴所有分區(qū)的數(shù)據(jù),可以并行地在不同節(jié)點(diǎn)進(jìn)行計算。所以它的失敗恢復(fù)也更有效,因?yàn)樗恍枰匦掠嬎銇G失的parent partition即可

(2)寬依賴(shuffle dependencies) 則需要所有的父分區(qū)都是可用的,必須等RDD的parent partition數(shù)據(jù)全部ready之后才能開始計算,可能還需要調(diào)用類似MapReduce之類的操作進(jìn)行跨節(jié)點(diǎn)傳遞。從失敗恢復(fù)的角度看,shuffle dependencies 牽涉RDD各級的多個parent partition。

5.RDD分區(qū)
我們分析這樣一個應(yīng)用,它在內(nèi)存中保存著一張很大的用戶信息表(UserData)——也就是一個由(UserId, UserInfo)對組成的RDD,其中UserInfo包含一個該用戶所訂閱的主題列表。該應(yīng)用會周期性性地將這張表與一個小文件進(jìn)行組合,這個小文件中存著過去五分鐘內(nèi)發(fā)生的事件(events)——其實(shí)就是一個由(UserID, LinkInfo)對組成的表。如果我們要進(jìn)行對用戶訪問情況的統(tǒng)計,就需要對這兩個表進(jìn)行join操作,以獲得(UserID,UserInfo,LinkInfo)信息。

如圖默認(rèn)情況下,join操作會將兩個數(shù)據(jù)集中的所有的鍵的哈希值都求出來,將哈希值相同的記錄傳送到同一臺機(jī)器上,之后在該機(jī)器上對所有鍵相同的記錄進(jìn)行join操作。


image.png

所以這種情況之下,每次進(jìn)行join都會有數(shù)據(jù)混洗的問題。造成了很大的網(wǎng)絡(luò)傳輸開銷。

這種情況之下由于UserData表比events表要大得多,所以選擇將UserData進(jìn)行分區(qū)。如果對UserData進(jìn)行分區(qū),之后Spark就會知曉該RDD是根據(jù)鍵的哈希值來分區(qū)的,這樣在調(diào)用join()時,Spark就會利用這一點(diǎn)。當(dāng)調(diào)用UserData.join(events)時,Spark只會對events進(jìn)行數(shù)據(jù)混洗操作,將events中特定的UserID的記錄發(fā)送到userData的對應(yīng)分區(qū)所在的那臺機(jī)器上。如下圖:


image.png

自定義分區(qū)
我們都知道Spark內(nèi)部提供了HashPartitioner和RangePartitioner兩種分區(qū)策略,這兩種分區(qū)策略在很多情況下都適合我們的場景。但是有些情況下,Spark內(nèi)部不能符合咱們的需求,這時候我們就可以自定義分區(qū)策略。為此,Spark提供了相應(yīng)的接口,我們只需要擴(kuò)展Partitioner抽象類,然后實(shí)現(xiàn)里面的三個方法:

abstract class Partitioner extends Serializable {
  int numPartitions()
  int getPartition()
  boolean equals(String s)
}

ps:HashPartitioner和RangePartitioner
https://www.cnblogs.com/liuming1992/p/6377540.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容