進(jìn)行Spark核心編程時,首先要做的第一件事,就是創(chuàng)建一個初始的RDD。該RDD中,通常就代表和包含了Spark應(yīng)用程序的輸入源數(shù)據(jù)。然后在創(chuàng)建了初始的RDD之后,才可以通過Spark Core提供的transformation算子,對該RDD進(jìn)行轉(zhuǎn)換,來獲取其他的RDD。
Spark Core提供了三種創(chuàng)建RDD的方式,包括:使用程序中的集合創(chuàng)建RDD;使用本地文件創(chuàng)建RDD;使用HDFS文件創(chuàng)建RDD。
1.并行化集合創(chuàng)建RDD
如果要通過并行化集合來創(chuàng)建RDD,需要針對程序中的集合,調(diào)用SparkContext的parallelize()方法。Spark會將集合中的數(shù)據(jù)拷貝到集群上去,形成一個分布式的數(shù)據(jù)集合,也就是一個RDD。相當(dāng)于是,集合中的部分?jǐn)?shù)據(jù)會到一個節(jié)點上,而另一部分?jǐn)?shù)據(jù)會到其他節(jié)點上。然后就可以用并行的方式來操作這個分布式數(shù)據(jù)集合,即RDD。
案例:1到10累加求和
val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)
調(diào)用parallelize()時,有一個重要的參數(shù)可以指定,就是要將集合切分成多少個partition。Spark會為每一個partition運行一個task來進(jìn)行處理。Spark官方的建議是,為集群中的每個CPU創(chuàng)建2~4個partition。Spark默認(rèn)會根據(jù)集群的情況來設(shè)置partition的數(shù)量。但是也可以在調(diào)用parallelize()方法時,傳入第二個參數(shù),來設(shè)置RDD的partition數(shù)量。比如parallelize(arr, 10)
2. 使用本地文件和HDFS創(chuàng)建RDD
Spark是支持使用任何Hadoop支持的存儲系統(tǒng)上的文件創(chuàng)建RDD的,比如說HDFS、Cassandra、HBase以及本地文件。通過調(diào)用SparkContext的textFile()方法,可以針對本地文件或HDFS文件創(chuàng)建RDD。
有幾個事項是需要注意的:
- 如果是針對本地文件的話,如果是在windows上本地測試,windows上有一份文件即可;如果是在spark- 集群上針對linux本地文件,那么需要將文件拷貝到所有worker節(jié)點上。
- Spark的textFile()方法支持針對目錄、壓縮文件以及通配符進(jìn)行RDD創(chuàng)建。
- Spark默認(rèn)會為hdfs文件的每一個block創(chuàng)建一個partition,但是也可以通過textFile()的第二個參數(shù)手動設(shè)置分區(qū)數(shù)量,只能比block數(shù)量多,不能比block數(shù)量少。
案例:文件字?jǐn)?shù)統(tǒng)計
val rdd = sc.textFile("data.txt")
val wordCount = rdd.map(line => line.length).reduce(_ + _)
Spark的textFile()除了可以針對上述幾種普通的文件創(chuàng)建RDD之外,還有一些特列的方法來創(chuàng)建RDD:
- SparkContext.wholeTextFiles()方法,可以針對一個目錄中的大量小文件,返回<filename, fileContent>組成的pair,作為一個PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每個元素就是文件中的一行文本。
- SparkContext.sequenceFileK, V方法,可以針對SequenceFile創(chuàng)建RDD,K和V泛型類型就是SequenceFile的key和value的類型。K和V要求必須是Hadoop的序列化類型,比如IntWritable、Text等。
- SparkContext.hadoopRDD()方法,對于Hadoop的自定義輸入類型,可以創(chuàng)建RDD。該方法接收J(rèn)obConf、InputFormatClass、Key和Value的Class。
- SparkContext.objectFile()方法,可以針對之前調(diào)用RDD.saveAsObjectFile()創(chuàng)建的對象序列化的文件,反序列化文件中的數(shù)據(jù),并創(chuàng)建一個RDD。