1.RDD創(chuàng)建
Spark是以RDD概念為中心運(yùn)行的。RDD是一個(gè)容錯(cuò)的、可以被并行操作的元素集合。創(chuàng)建一個(gè)RDD有兩個(gè)方法:在你的驅(qū)動(dòng)程序中并行化一個(gè)已經(jīng)存在的集合;從外部存儲(chǔ)系統(tǒng)中引用一個(gè)數(shù)據(jù)集,這個(gè)存儲(chǔ)系統(tǒng)可以是一個(gè)共享文件系統(tǒng),比如HDFS、HBase或任意提供了Hadoop輸入格式的數(shù)據(jù)來(lái)源。
(1) RDD的創(chuàng)建—— 并行化集合
并行化集合是通過(guò)在驅(qū)動(dòng)程序中一個(gè)現(xiàn)有的迭代器或集合上調(diào)用SparkContext的parallelize方法建立的。為了創(chuàng)建一個(gè)能夠并行操作的分布數(shù)據(jù)集,集合中的元素都會(huì)被拷貝
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data) #建立了分布數(shù)據(jù)集,可以進(jìn)行一些并行的操作
并行化中可以自己設(shè)置數(shù)據(jù)集劃分成分片的數(shù)量(一般是spark集群自動(dòng)進(jìn)行設(shè)定的),比如sc.parallelize(data, 10)
(2)外部數(shù)據(jù)集
PySpark可以通過(guò)Hadoop支持的外部數(shù)據(jù)源(包括本地文件系統(tǒng)、HDFS、 Cassandra、HBase、亞馬遜S3等等)建立分布數(shù)據(jù)集。Spark支持文本文件、序列文件以及其他任何Hadoop輸入格式文件.
(1)通過(guò)文本文件創(chuàng)建RDD要使用SparkContext的textfile方法
from pyspark import SparkContext
if __name__ == "__main__":
sc = SparkContext(appName="zzz_KMeans")
#調(diào)用文件的url/本地文件路徑等
lines = sc.textFile("your_hdfs_path")
注意
- 包括textFile在內(nèi)的所有基于文件的Spark讀入方法,都支持將文件夾、壓縮文件、包含通配符的路徑作為參數(shù)
- textFile方法也可以傳入第二個(gè)可選參數(shù)來(lái)控制文件的分片數(shù)量。默認(rèn)情況下,Spark會(huì)為文件的每一個(gè)塊(在HDFS中塊的大小默認(rèn)是64MB)創(chuàng)建一個(gè)分片。但是你也可以通過(guò)傳入一個(gè)更大的值來(lái)要求Spark建立更多的分片。注意,分片的數(shù)量絕不能小于文件塊的數(shù)量。
(3)其他
除了文本文件之外,pyspark還支持一些其他的數(shù)據(jù)格式
- SparkContext.wholeTextFiles能夠讀入包含多個(gè)小文本文件的目錄,然后為每一個(gè)文件返回一個(gè)(文件名,內(nèi)容)對(duì)。這是與textFile方法為每一個(gè)文本行返回一條記錄相對(duì)應(yīng)的。
- RDD.saveAsPickleFile和SparkContext.pickleFile支持將RDD以串行化的Python對(duì)象格式存儲(chǔ)起來(lái)。串行化的過(guò)程中會(huì)以默認(rèn)10個(gè)一批的數(shù)量批量處理。
- 序列文件和其他Hadoop輸入輸出格式。
數(shù)據(jù)庫(kù)
2.RDD基本操作
RDD的操作,整體上分為兩類: 轉(zhuǎn)化操作和啟動(dòng)操作。
轉(zhuǎn)化操作
- 都是
惰性求值的,就是說(shuō)它們并不會(huì)立刻真的計(jì)算出結(jié)果。相反,它們僅僅是記錄下了轉(zhuǎn)換操作的操作對(duì)象(比如:一個(gè)文件)。只有當(dāng)一個(gè)啟動(dòng)操作被執(zhí)行,要向驅(qū)動(dòng)程序返回結(jié)果時(shí),轉(zhuǎn)化操作才會(huì)真的開始計(jì)算 - 每一個(gè)由轉(zhuǎn)化操作得到的RDD都會(huì)在每次執(zhí)行啟動(dòng)操作時(shí)重新計(jì)算生成。也可以調(diào)用persist或者cache方法將RDD永久化到內(nèi)存中。
啟動(dòng)操作
常見(jiàn)的轉(zhuǎn)化操作:
| 轉(zhuǎn)化操作 | 作用 |
|---|---|
| map(func) | 返回一個(gè)新的分布數(shù)據(jù)集,由原數(shù)據(jù)集元素經(jīng)func處理后的結(jié)果組成 |
| filter(func) | 返回一個(gè)新的數(shù)據(jù)集,由傳給func返回True的原數(shù)據(jù)集元素組成 |
| flatMap(func) | 與map類似,但是每個(gè)傳入元素可能有0或多個(gè)返回值,func可以返回一個(gè)序列而不是一個(gè)值 |
| mapParitions(func) | 類似map,但是RDD的每個(gè)分片都會(huì)分開獨(dú)立運(yùn)行,所以func的參數(shù)和返回值必須都是迭代器 |
| mapParitionsWithIndex(func) | 類似mapParitions,但是func有兩個(gè)參數(shù),第一個(gè)是分片的序號(hào),第二個(gè)是迭代器。返回值還是迭代器 |
| sample(withReplacement, fraction, seed) | 使用提供的隨機(jī)數(shù)種子取樣,然后替換或不替換 |
| union(otherDataset) | 返回新的數(shù)據(jù)集,包括原數(shù)據(jù)集和參數(shù)數(shù)據(jù)集的所有元素 |
| intersection(otherDataset) | 返回新數(shù)據(jù)集,是兩個(gè)集的交集 |
| distinct([numTasks]) | 返回新的集,包括原集中的不重復(fù)元素 |
| groupByKey([numTasks]) | 當(dāng)用于鍵值對(duì)RDD時(shí)返回(鍵,值迭代器)對(duì)的數(shù)據(jù)集 |
| aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用于鍵值對(duì)RDD時(shí)返回(K,U)對(duì)集,對(duì)每一個(gè)Key的value進(jìn)行聚集計(jì)算sortByKey([ascending], [numTasks])用于鍵值對(duì)RDD時(shí)會(huì)返回RDD按鍵的順序排序,升降序由第一個(gè)參數(shù)決定 |
| join(otherDataset, [numTasks]) | 用于鍵值對(duì)(K, V)和(K, W)RDD時(shí)返回(K, (V, W))對(duì)RDD |
| cogroup(otherDataset, [numTasks]) | 用于兩個(gè)鍵值對(duì)RDD時(shí)返回(K, (V迭代器, W迭代器))RDD |
| cartesian(otherDataset) | 用于T和U類型RDD時(shí)返回(T, U)對(duì)類型鍵值對(duì)RDD |
| pipe(command, [envVars]) | 通過(guò)shell命令管道處理每個(gè)RDD分片 |
| coalesce(numPartitions) | 把RDD的分片數(shù)量降低到參數(shù)大小 |
| repartition(numPartitions) | 重新打亂RDD中元素順序并重新分片,數(shù)量由參數(shù)決定 |
| repartitionAndSortWithinPartitions(partitioner) | 按照參數(shù)給定的分片器重新分片,同時(shí)每個(gè)分片內(nèi)部按照鍵排序 |
常見(jiàn)的啟動(dòng)操作:
| 啟動(dòng)操作 | 作用 |
|---|---|
| reduce(func) | 使用func進(jìn)行聚集計(jì)算,func的參數(shù)是兩個(gè),返回值一個(gè),兩次func運(yùn)行應(yīng)當(dāng)是完全解耦的,這樣才能正確地并行運(yùn)算 |
| collect() | 向驅(qū)動(dòng)程序返回?cái)?shù)據(jù)集的元素組成的數(shù)組 |
| count() | 返回?cái)?shù)據(jù)集元素的數(shù)量 |
| first() | 返回?cái)?shù)據(jù)集的第一個(gè)元素 |
| take(n) | 返回前n個(gè)元素組成的數(shù)組 |
| takeSample(withReplacement, num, [seed]) | 返回一個(gè)由原數(shù)據(jù)集中任意num個(gè)元素的suzuki,并且替換之 |
| takeOrder(n, [ordering]) | 返回排序后的前n個(gè)元素 |
| saveAsTextFile(path) | 將數(shù)據(jù)集的元素寫成文本文件 |
| saveAsSequenceFile(path) | 將數(shù)據(jù)集的元素寫成序列文件,這個(gè)API只能用于Java和Scala程序 |
| saveAsObjectFile(path) | 將數(shù)據(jù)集的元素使用Java的序列化特性寫到文件中,這個(gè)API只能用于Java和Scala程序 |
| countByCount() | 只能用于鍵值對(duì)RDD,返回一個(gè)(K, int) hashmap,返回每個(gè)key的出現(xiàn)次數(shù) |
| foreach(func) | 對(duì)數(shù)據(jù)集的每個(gè)元素執(zhí)行func, 通常用于完成一些帶有副作用的函數(shù),比如更新累加器(見(jiàn)下文)或與外部存儲(chǔ)交互等 |
RDD持久化
主要用的兩個(gè)方法persist和cache
Spark的一個(gè)重要功能就是在將數(shù)據(jù)集持久化(或緩存)到內(nèi)存中以便在多個(gè)操作中重復(fù)使用。當(dāng)我們持久化一個(gè)RDD是,每一個(gè)節(jié)點(diǎn)將這個(gè)RDD的每一個(gè)分片計(jì)算并保存到內(nèi)存中以便在下次對(duì)這個(gè)數(shù)據(jù)集(或者這個(gè)數(shù)據(jù)集衍生的數(shù)據(jù)集)的計(jì)算中可以復(fù)用。這使得接下來(lái)的計(jì)算過(guò)程速度能夠加快(經(jīng)常能加快超過(guò)十倍的速度).
每一個(gè)持久化的RDD都有一個(gè)可變的存儲(chǔ)級(jí)別,這個(gè)級(jí)別使得用戶可以改變RDD持久化的儲(chǔ)存位置.
Spark會(huì)自動(dòng)監(jiān)視每個(gè)節(jié)點(diǎn)的緩存使用同時(shí)使用LRU算法丟棄舊數(shù)據(jù)分片。如果你想手動(dòng)刪除某個(gè)RDD而不是等待它被自動(dòng)刪除,調(diào)用
RDD.unpersist()方法。
共享變量
- 廣播變量
- 累加器
3.RDD分區(qū)
有時(shí)候需要重新設(shè)置Rdd的分區(qū)數(shù)量:
- 比如Rdd的分區(qū)中,Rdd分區(qū)比較多,但是每個(gè)Rdd的數(shù)據(jù)量比較小,需要設(shè)置一個(gè)比較合理的分區(qū)。或者需要把Rdd的分區(qū)數(shù)量調(diào)大。
- 還有就是通過(guò)設(shè)置一個(gè)Rdd的分區(qū)來(lái)達(dá)到設(shè)置生成的文件的數(shù)量。
有兩種方法是可以重設(shè)Rdd的分區(qū):分別是 coalesce()方法和repartition()
spark中的數(shù)據(jù)是分布式的
4.數(shù)據(jù)輸出/保存
rdd.saveAsTextFile()
【參考資料】