創(chuàng)建RDD
進(jìn)行Spark核心編程時(shí),首先要做的第一件事,就是創(chuàng)建一個(gè)初始的RDD。該RDD中,通常就代表和包含了Spark應(yīng)用程序的輸入源數(shù)據(jù)。然后在創(chuàng)建了初始的RDD之后,才可以通過Spark Core提供的transformation算子,對(duì)該RDD進(jìn)行轉(zhuǎn)換,來獲取其他的RDD。
Spark Core提供了三種創(chuàng)建RDD的方式,包括:使用程序中的集合創(chuàng)建RDD;使用本地文件創(chuàng)建RDD;使用HDFS文件創(chuàng)建RDD。
- 使用程序中的集合創(chuàng)建RDD,主要用于進(jìn)行測(cè)試,可以在實(shí)際部署到集群運(yùn)行之前,自己使用集合構(gòu)造測(cè)試數(shù)據(jù),來測(cè)試后面的spark應(yīng)用的流程。
- 使用本地文件創(chuàng)建RDD,主要用于臨時(shí)性地處理一些存儲(chǔ)了大量數(shù)據(jù)的文件。
- 使用HDFS文件創(chuàng)建RDD,應(yīng)該是最常用的生產(chǎn)環(huán)境處理方式,主要可以針對(duì)HDFS上存儲(chǔ)的大數(shù)據(jù),進(jìn)行離線批處理操作。
并行化集合創(chuàng)建RDD
如果要通過并行化集合來創(chuàng)建RDD,需要針對(duì)程序中的集合,調(diào)用SparkContext的parallelize()方法。Spark會(huì)將集合中的數(shù)據(jù)拷貝到集群上去,形成一個(gè)分布式的數(shù)據(jù)集合,也就是一個(gè)RDD。相當(dāng)于是,集合中的部分?jǐn)?shù)據(jù)會(huì)到一個(gè)節(jié)點(diǎn)上,而另一部分?jǐn)?shù)據(jù)會(huì)到其他節(jié)點(diǎn)上。然后就可以用并行的方式來操作這個(gè)分布式數(shù)據(jù)集合,即RDD。
調(diào)用parallelize()時(shí),有一個(gè)重要的參數(shù)可以指定,就是要將集合切分成多少個(gè)partition。Spark會(huì)為每一個(gè)partition運(yùn)行一個(gè)task來進(jìn)行處理。Spark官方的建議是,為集群中的每個(gè)CPU創(chuàng)建2~4個(gè)partition。Spark默認(rèn)會(huì)根據(jù)集群的情況來設(shè)置partition的數(shù)量。但是也可以在調(diào)用parallelize()方法時(shí),傳入第二個(gè)參數(shù),來設(shè)置RDD的partition數(shù)量。比如parallelize(arr, 10)
案例:1到10累加求和
使用Java實(shí)現(xiàn)
public class ParallelizeCollection {
public static void main(String[] args) {
// 創(chuàng)建SparkConf
SparkConf sparkConf = new SparkConf().setAppName("ParallelizeCollectionJava").setMaster("local");
// 創(chuàng)建JavaSparkContext
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
// 要通過并行化集合的方式創(chuàng)建RDD,那么就調(diào)用SparkContext以及其子類,的parallelize()方法
List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 執(zhí)行reduce算子操作
// 相當(dāng)于,先進(jìn)行1 + 2 = 3;然后再用3 + 3 = 6;然后再用6 + 4 = 10。。。以此類推
JavaRDD<Integer> numsRDD = javaSparkContext.parallelize(nums);
Integer sum = numsRDD.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
// 輸出累加的和
System.out.println("sum = " + sum);
// 關(guān)閉JavaSparkContext
javaSparkContext.close();
}
}
使用Scala實(shí)現(xiàn)
object ParallelizeCollection {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ParallelizeCollectionScala").setMaster("local")
val sparkContext = new SparkContext(sparkConf)
val numsRDD = sparkContext.parallelize(1 to 10)
val sum = numsRDD.reduce(_ + _)
println("sum = " + sum)
}
}
使用本地文件和HDFS創(chuàng)建RDD
Spark是支持使用任何Hadoop支持的存儲(chǔ)系統(tǒng)上的文件創(chuàng)建RDD的,比如說HDFS、Cassandra、HBase以及本地文件。通過調(diào)用SparkContext的textFile()方法,可以針對(duì)本地文件或HDFS文件創(chuàng)建RDD。
有幾個(gè)事項(xiàng)是需要注意的:
- 如果是針對(duì)本地文件的話,如果是在windows上本地測(cè)試,windows上有一份文件即可;如果是在spark集群上針對(duì)linux本地文件,那么需要將文件拷貝到所有worker節(jié)點(diǎn)上。
- Spark的textFile()方法支持針對(duì)目錄、壓縮文件以及通配符進(jìn)行RDD創(chuàng)建。
- Spark默認(rèn)會(huì)為hdfs文件的每一個(gè)block創(chuàng)建一個(gè)partition,但是也可以通過textFile()的第二個(gè)參數(shù)手動(dòng)設(shè)置分區(qū)數(shù)量,只能比block數(shù)量多,不能比block數(shù)量少。
案例:文件字?jǐn)?shù)統(tǒng)計(jì),本地文件
java版本
/**
* 使用本地文件創(chuàng)建RDD
* 案例,統(tǒng)計(jì)文本文件字?jǐn)?shù)
*/
public class LocalFile {
public static void main(String[] args) {
// 創(chuàng)建SparkConf
SparkConf sparkConf = new SparkConf().setAppName("LocalFileJava").setMaster("local");
// 創(chuàng)建JavaSparkContext
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
// 調(diào)用SparkContext以及其子類的textFile()方法,針對(duì)本地文件創(chuàng)建RDD
JavaRDD<String> linesRDD = javaSparkContext.textFile("E:\\testdata\\wordcount\\input\\1.txt");
// 統(tǒng)計(jì)文本文件內(nèi)的字?jǐn)?shù)
JavaRDD<Integer> wordsRDD = linesRDD.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
}
});
Integer sum = wordsRDD.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
System.out.println("sum = " + sum);
javaSparkContext.close();
}
}
scala版本
object LocalFile {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("LocalFileScala").setMaster("local")
val context = new SparkContext(conf)
val words = context.textFile("E:\\testdata\\wordcount\\input\\1.txt")
val sum = words.map(word => word.length).reduce(_ + _)
println("sum = " + sum)
}
}
案例:文件字?jǐn)?shù)統(tǒng)計(jì),HDFS文件
java版本
/**
* 使用Hdfs文件創(chuàng)建RDD
* 案例,統(tǒng)計(jì)文本文件字?jǐn)?shù)
*/
public class HdfsFile {
public static void main(String[] args) {
// 創(chuàng)建SparkConf
// 修改 去除setMaster()設(shè)置,修改setAppName()
SparkConf sparkConf = new SparkConf().setAppName("HdfsFileJava");
// 創(chuàng)建JavaSparkContext
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
// 調(diào)用SparkContext以及其子類的textFile()方法,針對(duì)Hdfs文件創(chuàng)建RDD
JavaRDD<String> linesRDD = javaSparkContext.textFile("hdfs://hadoop-100:9000/testdate/1.txt");
// 統(tǒng)計(jì)文本文件內(nèi)的字?jǐn)?shù)
JavaRDD<Integer> wordsRDD = linesRDD.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
}
});
Integer sum = wordsRDD.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
System.out.println("sum = " + sum);
javaSparkContext.close();
}
}
scala版本
object HdfsFile {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HdfsFileScala")
val context = new SparkContext(conf)
val words = context.textFile("hdfs://hadoop-100:9000/testdate/1.txt")
val sum = words.map(word => word.length).reduce(_ + _)
println("sum = " + sum)
}
}
使用本地文件和HDFS創(chuàng)建RDD
Spark的textFile()除了可以針對(duì)上述幾種普通的文件創(chuàng)建RDD之外,還有一些特列的方法來創(chuàng)建RDD:
- SparkContext.wholeTextFiles()方法,可以針對(duì)一個(gè)目錄中的大量小文件,返回<filename, fileContent>組成的pair,作為一個(gè)PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每個(gè)元素就是文件中的一行文本。
- SparkContext.sequenceFileK, V方法,可以針對(duì)SequenceFile創(chuàng)建RDD,K和V泛型類型就是SequenceFile的key和value的類型。K和V要求必須是Hadoop的序列化類型,比如IntWritable、Text等。
- SparkContext.hadoopRDD()方法,對(duì)于Hadoop的自定義輸入類型,可以創(chuàng)建RDD。該方法接收J(rèn)obConf、InputFormatClass、Key和Value的Class。
- SparkContext.objectFile()方法,可以針對(duì)之前調(diào)用RDD.saveAsObjectFile()創(chuàng)建的對(duì)象序列化的文件,反序列化文件中的數(shù)據(jù),并創(chuàng)建一個(gè)RDD。