RDD(Resilient Distributed Dataset),全稱彈性分布式數(shù)據(jù)集,是Spark對(duì)數(shù)據(jù)進(jìn)行的核心抽象概念。我們可以將RDD理解為一個(gè)不可變的分布式對(duì)象集合,他可以包含Python、Java、Scala 中任意類型的對(duì)象,甚至是用戶自定義的對(duì)象。Spark中的所有操作都是在RDD進(jìn)行的,包括創(chuàng)建RDD,轉(zhuǎn)化RDD跟調(diào)用RDD。
RDD創(chuàng)建
Spark有兩種方法創(chuàng)建RDD:讀取一個(gè)外部數(shù)據(jù)集,或在從程序中讀取一個(gè)對(duì)象集合(比如list 和set)。
# 讀取一個(gè)外部數(shù)據(jù)集
lines = sc.textFile("E:\Documents\Desktop\s.txt")
# 讀取一個(gè)對(duì)象集合
lines = sc.parallelize([1, 2, 3, 4])
RDD操作
RDD 支持兩種類型的操作: 轉(zhuǎn)化操作(transformation) 和行動(dòng)操作(action)。轉(zhuǎn)化操作會(huì)由一個(gè)RDD 生成一個(gè)新的RDD。行動(dòng)操作是對(duì)的RDD 內(nèi)容進(jìn)行操作,它們會(huì)把最終求得的結(jié)果返回到驅(qū)動(dòng)器程序,或者寫入外部存儲(chǔ)系統(tǒng)中。由于行動(dòng)操作需要生成實(shí)際的輸出,它們會(huì)強(qiáng)制執(zhí)行那些求值必須用到的RDD 的轉(zhuǎn)化操作。
RDD的轉(zhuǎn)化操作與行動(dòng)操作不同,是惰性求值的,也就是在被調(diào)用行動(dòng)操作之前Spark 不會(huì)開始計(jì)算。同樣創(chuàng)建操作也是一樣,數(shù)據(jù)并沒(méi)有被立刻讀取到內(nèi)存中,只是記錄了讀取操作需要的相關(guān)信息。我理解為這與tensorflowde的網(wǎng)絡(luò)構(gòu)建類似,我們之前編寫的代碼只是記錄了整個(gè)操作過(guò)程的計(jì)算流程圖,只有當(dāng)計(jì)算操作被激活時(shí),數(shù)據(jù)才會(huì)沿著之前定義的計(jì)算圖進(jìn)行計(jì)算。
轉(zhuǎn)化
許多轉(zhuǎn)化操作都是針對(duì)元素的,也就是說(shuō)這些轉(zhuǎn)化操作每次只會(huì)操作RDD 中的一個(gè)元素。RDD的轉(zhuǎn)化操作函數(shù)接受一個(gè)函數(shù),可以是匿名函數(shù)也可以是自定義函數(shù),然后返回對(duì)每個(gè)元素進(jìn)行操作后的RDD。
filter()
# 返回一個(gè)由通過(guò)傳給filter()的函數(shù)的元素組成的RDD
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd2 = rdd.filter(lambda x: x > 1)
>>> rdd2.collect()
[2, 3, 4]
map()
# 將函數(shù)應(yīng)用于RDD 中的每個(gè)元素,將返回值構(gòu)成新的RDD
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd2 = rdd.map(lambda x: x + 1)
>>> rdd2.collect()
[2, 3, 4, 5]
flatMap()
# 將函數(shù)應(yīng)用于RDD 中的每個(gè)元素,將返回的迭代器的所有內(nèi)容構(gòu)成新的RDD。通常用來(lái)切分單詞。
>>> rdd = sc.parallelize(['ad cd', 'ef gh', 'hi jk'])
>>> rdd2 = rdd.flatMap(lambda x: x.split(' '))
>>> rdd2.collect()
['ad', 'cd', 'ef', 'gh', 'hi', 'jk']
distinct()
# 去重
>>> rdd = sc.parallelize([2, 2, 3, 4, 5, 5])
>>> rdd2 = rdd2.distinct()
>>> rdd2.collect()
[4, 5, 2, 3]
union()
# 生成一個(gè)包含兩個(gè)RDD 中所有元素的RDD
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd2 = sc.parallelize([2, 2, 3, 4, 5, 5])
>>> rdd3 = rdd.union(rdd2)
>>> rdd3.collect()
[1, 2, 3, 4, 2, 2, 3, 4, 5, 5]
intersection()
# 求兩個(gè)RDD 共同的元素的RDD
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd2 = sc.parallelize([2, 2, 3, 4, 5, 5])
>>> rdd3 = rdd.intersection(rdd2)
>>> rdd3.collect()
[2, 3, 4]
subtract()
# 移除一個(gè)RDD 中的內(nèi)容
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd2 = sc.parallelize([2, 2, 3, 4, 5, 5])
>>> rdd3 = rdd.subtract(rdd2)
>>> rdd3.collect()
[1]
cartesian()
# 與另一個(gè)RDD 的笛卡爾積
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd2 = sc.parallelize([2, 3, 4, 5])
>>> rdd3 = rdd.cartesian(rdd2)
>>> rdd3.collect()
[(1, 2), (1, 3), (1, 4), (1, 5), (2, 2), (2, 3), (2, 4), (2, 5), (3, 2), (3, 3), (3, 4), (3, 5), (4, 2), (4, 3), (4, 4), (
4, 5)]
行動(dòng)
行動(dòng)操作對(duì)RDD進(jìn)行計(jì)算,并把結(jié)果返回到驅(qū)動(dòng)器程序中。
collect()
# 返回RDD 中的所有元素
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd.collect()
[1, 2, 3, 4]
count()
# RDD 中的元素個(gè)數(shù)
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd.count()
4
countByValue()
# 各元素在RDD 中出現(xiàn)的次
>>> rdd = sc.parallelize([2, 2, 3, 4, 5, 5])
>>> rdd.countByValue()
defaultdict(<class 'int'>, {2: 2, 3: 1, 4: 1, 5: 2})
>>>
take()
# 從RDD 中返回num 個(gè)元素
>>> rdd = sc.parallelize([2, 2, 3, 4, 5, 5])
>>> rdd.take(2)
[2, 2]
top()
從RDD 中返回最前面的num個(gè)元素
>>> rdd = sc.parallelize([2, 2, 3, 4, 5, 5])
>>> rdd.top(3)
[5, 5, 4]
takeOrdered()
# 從RDD 中按照默認(rèn)(升序)或指定排序規(guī)則,返回前num個(gè)元素。
>>> rdd = sc.parallelize([2, 2, 3, 4, 5, 5])
>>> rdd.takeOrdered(3)
[2, 2, 3]
reduce()
# 并行整合RDD 中所有數(shù)據(jù)
>>> rdd = sc.parallelize([2, 2, 3, 4, 5, 5])
>>> rdd.reduce(lambda x, y: x + y)
21
fold()
# 和reduce() 一樣, 但是需要提供初始值
>>> rdd = sc.parallelize([2, 2, 3, 4, 5, 5])
>>> rdd.fold(0, lambda x, y: x + y)
21
aggregate()
# 和reduce() 相似, 但是通常返回不同類型的函數(shù)
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
(10, 4)
foreach()
# 對(duì)RDD 中的每個(gè)元素使用給定的函數(shù),但是不把任何結(jié)果返回到驅(qū)動(dòng)器程序中。
持久化
Spark RDD 是惰性求值的,而有時(shí)我們希望能多次使用同一個(gè)RDD。如果簡(jiǎn)單地對(duì)RDD 調(diào)用行動(dòng)操作,Spark 每次都會(huì)重算RDD 以及它的所有依賴。這在迭代算法中消耗格外大,因?yàn)榈惴ǔ3?huì)多次使用同一組數(shù)據(jù)。
# 提供了不同類型的持久化參數(shù),可以傳遞到persist()中
pyspark.StorageLevel
# 持久化
rdd.persist
Tips
1.使用RDD.distinct() 轉(zhuǎn)化操作來(lái)生成一個(gè)只包含不同元素的新RDD時(shí)需要注意,distinct() 操作的開銷很大,因?yàn)樗枰獙⑺袛?shù)據(jù)通過(guò)網(wǎng)絡(luò)進(jìn)行混洗(shuffle),以確保每個(gè)元素都只有一份。此外subtract()等其他集合運(yùn)rdd算中,所有使用了去重的函數(shù)都會(huì)進(jìn)行混洗。
2.Python 會(huì)在把函數(shù)所在的對(duì)象也序列化傳出去。當(dāng)傳遞的對(duì)象是某個(gè)對(duì)象的成員,或者包含了對(duì)某個(gè)對(duì)象中一個(gè)字段的引用時(shí)(例如self.field),Spark 就會(huì)把整個(gè)對(duì)象發(fā)到工作節(jié)點(diǎn)上。