spark從入門到精通4:之創(chuàng)建RDD

進(jìn)行Spark核心編程時,首先要做的第一件事,就是創(chuàng)建一個初始的RDD。該RDD中,通常就代表和包含了Spark應(yīng)用程序的輸入源數(shù)據(jù)。然后在創(chuàng)建了初始的RDD之后,才可以通過Spark Core提供的transformation算子,對該RDD進(jìn)行轉(zhuǎn)換,來獲取其他的RDD。
Spark Core提供了三種創(chuàng)建RDD的方式,包括:使用程序中的集合創(chuàng)建RDD;使用本地文件創(chuàng)建RDD;使用HDFS文件創(chuàng)建RDD。

1.并行化集合創(chuàng)建RDD

如果要通過并行化集合來創(chuàng)建RDD,需要針對程序中的集合,調(diào)用SparkContext的parallelize()方法。Spark會將集合中的數(shù)據(jù)拷貝到集群上去,形成一個分布式的數(shù)據(jù)集合,也就是一個RDD。相當(dāng)于是,集合中的部分?jǐn)?shù)據(jù)會到一個節(jié)點上,而另一部分?jǐn)?shù)據(jù)會到其他節(jié)點上。然后就可以用并行的方式來操作這個分布式數(shù)據(jù)集合,即RDD。

案例:1到10累加求和

val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)

調(diào)用parallelize()時,有一個重要的參數(shù)可以指定,就是要將集合切分成多少個partition。Spark會為每一個partition運行一個task來進(jìn)行處理。Spark官方的建議是,為集群中的每個CPU創(chuàng)建2~4個partition。Spark默認(rèn)會根據(jù)集群的情況來設(shè)置partition的數(shù)量。但是也可以在調(diào)用parallelize()方法時,傳入第二個參數(shù),來設(shè)置RDD的partition數(shù)量。比如parallelize(arr, 10)

2. 使用本地文件和HDFS創(chuàng)建RDD

Spark是支持使用任何Hadoop支持的存儲系統(tǒng)上的文件創(chuàng)建RDD的,比如說HDFS、Cassandra、HBase以及本地文件。通過調(diào)用SparkContext的textFile()方法,可以針對本地文件或HDFS文件創(chuàng)建RDD。

有幾個事項是需要注意的:

  1. 如果是針對本地文件的話,如果是在windows上本地測試,windows上有一份文件即可;如果是在spark- 集群上針對linux本地文件,那么需要將文件拷貝到所有worker節(jié)點上。
  2. Spark的textFile()方法支持針對目錄、壓縮文件以及通配符進(jìn)行RDD創(chuàng)建。
  3. Spark默認(rèn)會為hdfs文件的每一個block創(chuàng)建一個partition,但是也可以通過textFile()的第二個參數(shù)手動設(shè)置分區(qū)數(shù)量,只能比block數(shù)量多,不能比block數(shù)量少。

案例:文件字?jǐn)?shù)統(tǒng)計

val rdd = sc.textFile("data.txt")
val wordCount = rdd.map(line => line.length).reduce(_ + _)

Spark的textFile()除了可以針對上述幾種普通的文件創(chuàng)建RDD之外,還有一些特列的方法來創(chuàng)建RDD:

  1. SparkContext.wholeTextFiles()方法,可以針對一個目錄中的大量小文件,返回<filename, fileContent>組成的pair,作為一個PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每個元素就是文件中的一行文本。
  2. SparkContext.sequenceFileK, V方法,可以針對SequenceFile創(chuàng)建RDD,K和V泛型類型就是SequenceFile的key和value的類型。K和V要求必須是Hadoop的序列化類型,比如IntWritable、Text等。
  3. SparkContext.hadoopRDD()方法,對于Hadoop的自定義輸入類型,可以創(chuàng)建RDD。該方法接收J(rèn)obConf、InputFormatClass、Key和Value的Class。
  4. SparkContext.objectFile()方法,可以針對之前調(diào)用RDD.saveAsObjectFile()創(chuàng)建的對象序列化的文件,反序列化文件中的數(shù)據(jù),并創(chuàng)建一個RDD。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容