1.spark基礎(chǔ)-RDD

1.RDD創(chuàng)建

Spark是以RDD概念為中心運(yùn)行的。RDD是一個(gè)容錯(cuò)的、可以被并行操作的元素集合。創(chuàng)建一個(gè)RDD有兩個(gè)方法:在你的驅(qū)動(dòng)程序中并行化一個(gè)已經(jīng)存在的集合;從外部存儲(chǔ)系統(tǒng)中引用一個(gè)數(shù)據(jù)集,這個(gè)存儲(chǔ)系統(tǒng)可以是一個(gè)共享文件系統(tǒng),比如HDFS、HBase或任意提供了Hadoop輸入格式的數(shù)據(jù)來(lái)源。

(1) RDD的創(chuàng)建—— 并行化集合
并行化集合是通過(guò)在驅(qū)動(dòng)程序中一個(gè)現(xiàn)有的迭代器或集合上調(diào)用SparkContext的parallelize方法建立的。為了創(chuàng)建一個(gè)能夠并行操作的分布數(shù)據(jù)集,集合中的元素都會(huì)被拷貝

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)    #建立了分布數(shù)據(jù)集,可以進(jìn)行一些并行的操作

并行化中可以自己設(shè)置數(shù)據(jù)集劃分成分片的數(shù)量(一般是spark集群自動(dòng)進(jìn)行設(shè)定的),比如sc.parallelize(data, 10)

(2)外部數(shù)據(jù)集
PySpark可以通過(guò)Hadoop支持的外部數(shù)據(jù)源(包括本地文件系統(tǒng)、HDFS、 Cassandra、HBase、亞馬遜S3等等)建立分布數(shù)據(jù)集。Spark支持文本文件、序列文件以及其他任何Hadoop輸入格式文件.

(1)通過(guò)文本文件創(chuàng)建RDD要使用SparkContext的textfile方法

from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="zzz_KMeans")
    #調(diào)用文件的url/本地文件路徑等
    lines = sc.textFile("your_hdfs_path")

注意

  • 包括textFile在內(nèi)的所有基于文件的Spark讀入方法,都支持將文件夾、壓縮文件、包含通配符的路徑作為參數(shù)
  • textFile方法也可以傳入第二個(gè)可選參數(shù)來(lái)控制文件的分片數(shù)量。默認(rèn)情況下,Spark會(huì)為文件的每一個(gè)塊(在HDFS中塊的大小默認(rèn)是64MB)創(chuàng)建一個(gè)分片。但是你也可以通過(guò)傳入一個(gè)更大的值來(lái)要求Spark建立更多的分片。注意,分片的數(shù)量絕不能小于文件塊的數(shù)量。

(3)其他

除了文本文件之外,pyspark還支持一些其他的數(shù)據(jù)格式

  • SparkContext.wholeTextFiles能夠讀入包含多個(gè)小文本文件的目錄,然后為每一個(gè)文件返回一個(gè)(文件名,內(nèi)容)對(duì)。這是與textFile方法為每一個(gè)文本行返回一條記錄相對(duì)應(yīng)的。
  • RDD.saveAsPickleFile和SparkContext.pickleFile支持將RDD以串行化的Python對(duì)象格式存儲(chǔ)起來(lái)。串行化的過(guò)程中會(huì)以默認(rèn)10個(gè)一批的數(shù)量批量處理。
  • 序列文件和其他Hadoop輸入輸出格式。

數(shù)據(jù)庫(kù)

2.RDD基本操作

RDD的操作,整體上分為兩類: 轉(zhuǎn)化操作和啟動(dòng)操作。

轉(zhuǎn)化操作

  • 都是惰性求值的,就是說(shuō)它們并不會(huì)立刻真的計(jì)算出結(jié)果。相反,它們僅僅是記錄下了轉(zhuǎn)換操作的操作對(duì)象(比如:一個(gè)文件)。只有當(dāng)一個(gè)啟動(dòng)操作被執(zhí)行,要向驅(qū)動(dòng)程序返回結(jié)果時(shí),轉(zhuǎn)化操作才會(huì)真的開始計(jì)算
  • 每一個(gè)由轉(zhuǎn)化操作得到的RDD都會(huì)在每次執(zhí)行啟動(dòng)操作時(shí)重新計(jì)算生成。也可以調(diào)用persist或者cache方法將RDD永久化到內(nèi)存中。

啟動(dòng)操作

常見(jiàn)的轉(zhuǎn)化操作:

轉(zhuǎn)化操作 作用
map(func) 返回一個(gè)新的分布數(shù)據(jù)集,由原數(shù)據(jù)集元素經(jīng)func處理后的結(jié)果組成
filter(func) 返回一個(gè)新的數(shù)據(jù)集,由傳給func返回True的原數(shù)據(jù)集元素組成
flatMap(func) 與map類似,但是每個(gè)傳入元素可能有0或多個(gè)返回值,func可以返回一個(gè)序列而不是一個(gè)值
mapParitions(func) 類似map,但是RDD的每個(gè)分片都會(huì)分開獨(dú)立運(yùn)行,所以func的參數(shù)和返回值必須都是迭代器
mapParitionsWithIndex(func) 類似mapParitions,但是func有兩個(gè)參數(shù),第一個(gè)是分片的序號(hào),第二個(gè)是迭代器。返回值還是迭代器
sample(withReplacement, fraction, seed) 使用提供的隨機(jī)數(shù)種子取樣,然后替換或不替換
union(otherDataset) 返回新的數(shù)據(jù)集,包括原數(shù)據(jù)集和參數(shù)數(shù)據(jù)集的所有元素
intersection(otherDataset) 返回新數(shù)據(jù)集,是兩個(gè)集的交集
distinct([numTasks]) 返回新的集,包括原集中的不重復(fù)元素
groupByKey([numTasks]) 當(dāng)用于鍵值對(duì)RDD時(shí)返回(鍵,值迭代器)對(duì)的數(shù)據(jù)集
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 用于鍵值對(duì)RDD時(shí)返回(K,U)對(duì)集,對(duì)每一個(gè)Key的value進(jìn)行聚集計(jì)算sortByKey([ascending], [numTasks])用于鍵值對(duì)RDD時(shí)會(huì)返回RDD按鍵的順序排序,升降序由第一個(gè)參數(shù)決定
join(otherDataset, [numTasks]) 用于鍵值對(duì)(K, V)和(K, W)RDD時(shí)返回(K, (V, W))對(duì)RDD
cogroup(otherDataset, [numTasks]) 用于兩個(gè)鍵值對(duì)RDD時(shí)返回(K, (V迭代器, W迭代器))RDD
cartesian(otherDataset) 用于T和U類型RDD時(shí)返回(T, U)對(duì)類型鍵值對(duì)RDD
pipe(command, [envVars]) 通過(guò)shell命令管道處理每個(gè)RDD分片
coalesce(numPartitions) 把RDD的分片數(shù)量降低到參數(shù)大小
repartition(numPartitions) 重新打亂RDD中元素順序并重新分片,數(shù)量由參數(shù)決定
repartitionAndSortWithinPartitions(partitioner) 按照參數(shù)給定的分片器重新分片,同時(shí)每個(gè)分片內(nèi)部按照鍵排序

常見(jiàn)的啟動(dòng)操作:

啟動(dòng)操作 作用
reduce(func) 使用func進(jìn)行聚集計(jì)算,func的參數(shù)是兩個(gè),返回值一個(gè),兩次func運(yùn)行應(yīng)當(dāng)是完全解耦的,這樣才能正確地并行運(yùn)算
collect() 向驅(qū)動(dòng)程序返回?cái)?shù)據(jù)集的元素組成的數(shù)組
count() 返回?cái)?shù)據(jù)集元素的數(shù)量
first() 返回?cái)?shù)據(jù)集的第一個(gè)元素
take(n) 返回前n個(gè)元素組成的數(shù)組
takeSample(withReplacement, num, [seed]) 返回一個(gè)由原數(shù)據(jù)集中任意num個(gè)元素的suzuki,并且替換之
takeOrder(n, [ordering]) 返回排序后的前n個(gè)元素
saveAsTextFile(path) 將數(shù)據(jù)集的元素寫成文本文件
saveAsSequenceFile(path) 將數(shù)據(jù)集的元素寫成序列文件,這個(gè)API只能用于Java和Scala程序
saveAsObjectFile(path) 將數(shù)據(jù)集的元素使用Java的序列化特性寫到文件中,這個(gè)API只能用于Java和Scala程序
countByCount() 只能用于鍵值對(duì)RDD,返回一個(gè)(K, int) hashmap,返回每個(gè)key的出現(xiàn)次數(shù)
foreach(func) 對(duì)數(shù)據(jù)集的每個(gè)元素執(zhí)行func, 通常用于完成一些帶有副作用的函數(shù),比如更新累加器(見(jiàn)下文)或與外部存儲(chǔ)交互等

RDD持久化

主要用的兩個(gè)方法persistcache

  • Spark的一個(gè)重要功能就是在將數(shù)據(jù)集持久化(或緩存)到內(nèi)存中以便在多個(gè)操作中重復(fù)使用。當(dāng)我們持久化一個(gè)RDD是,每一個(gè)節(jié)點(diǎn)將這個(gè)RDD的每一個(gè)分片計(jì)算并保存到內(nèi)存中以便在下次對(duì)這個(gè)數(shù)據(jù)集(或者這個(gè)數(shù)據(jù)集衍生的數(shù)據(jù)集)的計(jì)算中可以復(fù)用。這使得接下來(lái)的計(jì)算過(guò)程速度能夠加快(經(jīng)常能加快超過(guò)十倍的速度).

  • 每一個(gè)持久化的RDD都有一個(gè)可變的存儲(chǔ)級(jí)別,這個(gè)級(jí)別使得用戶可以改變RDD持久化的儲(chǔ)存位置.

  • Spark會(huì)自動(dòng)監(jiān)視每個(gè)節(jié)點(diǎn)的緩存使用同時(shí)使用LRU算法丟棄舊數(shù)據(jù)分片。如果你想手動(dòng)刪除某個(gè)RDD而不是等待它被自動(dòng)刪除,調(diào)用RDD.unpersist()方法。

共享變量

  • 廣播變量
  • 累加器

3.RDD分區(qū)

有時(shí)候需要重新設(shè)置Rdd的分區(qū)數(shù)量:

  • 比如Rdd的分區(qū)中,Rdd分區(qū)比較多,但是每個(gè)Rdd的數(shù)據(jù)量比較小,需要設(shè)置一個(gè)比較合理的分區(qū)。或者需要把Rdd的分區(qū)數(shù)量調(diào)大。
  • 還有就是通過(guò)設(shè)置一個(gè)Rdd的分區(qū)來(lái)達(dá)到設(shè)置生成的文件的數(shù)量。

有兩種方法是可以重設(shè)Rdd的分區(qū):分別是 coalesce()方法和repartition()

spark中的數(shù)據(jù)是分布式的

4.數(shù)據(jù)輸出/保存

rdd.saveAsTextFile()

【參考資料】

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容