21、Spark核心編程之創(chuàng)建RDD(集合、本地文件、HDFS文件)

創(chuàng)建RDD

進(jìn)行Spark核心編程時(shí),首先要做的第一件事,就是創(chuàng)建一個(gè)初始的RDD。該RDD中,通常就代表和包含了Spark應(yīng)用程序的輸入源數(shù)據(jù)。然后在創(chuàng)建了初始的RDD之后,才可以通過Spark Core提供的transformation算子,對(duì)該RDD進(jìn)行轉(zhuǎn)換,來獲取其他的RDD。
Spark Core提供了三種創(chuàng)建RDD的方式,包括:使用程序中的集合創(chuàng)建RDD;使用本地文件創(chuàng)建RDD;使用HDFS文件創(chuàng)建RDD。

  1. 使用程序中的集合創(chuàng)建RDD,主要用于進(jìn)行測(cè)試,可以在實(shí)際部署到集群運(yùn)行之前,自己使用集合構(gòu)造測(cè)試數(shù)據(jù),來測(cè)試后面的spark應(yīng)用的流程。
  2. 使用本地文件創(chuàng)建RDD,主要用于臨時(shí)性地處理一些存儲(chǔ)了大量數(shù)據(jù)的文件。
  3. 使用HDFS文件創(chuàng)建RDD,應(yīng)該是最常用的生產(chǎn)環(huán)境處理方式,主要可以針對(duì)HDFS上存儲(chǔ)的大數(shù)據(jù),進(jìn)行離線批處理操作。

并行化集合創(chuàng)建RDD

如果要通過并行化集合來創(chuàng)建RDD,需要針對(duì)程序中的集合,調(diào)用SparkContext的parallelize()方法。Spark會(huì)將集合中的數(shù)據(jù)拷貝到集群上去,形成一個(gè)分布式的數(shù)據(jù)集合,也就是一個(gè)RDD。相當(dāng)于是,集合中的部分?jǐn)?shù)據(jù)會(huì)到一個(gè)節(jié)點(diǎn)上,而另一部分?jǐn)?shù)據(jù)會(huì)到其他節(jié)點(diǎn)上。然后就可以用并行的方式來操作這個(gè)分布式數(shù)據(jù)集合,即RDD。
調(diào)用parallelize()時(shí),有一個(gè)重要的參數(shù)可以指定,就是要將集合切分成多少個(gè)partition。Spark會(huì)為每一個(gè)partition運(yùn)行一個(gè)task來進(jìn)行處理。Spark官方的建議是,為集群中的每個(gè)CPU創(chuàng)建2~4個(gè)partition。Spark默認(rèn)會(huì)根據(jù)集群的情況來設(shè)置partition的數(shù)量。但是也可以在調(diào)用parallelize()方法時(shí),傳入第二個(gè)參數(shù),來設(shè)置RDD的partition數(shù)量。比如parallelize(arr, 10)
案例:1到10累加求和
使用Java實(shí)現(xiàn)

public class ParallelizeCollection {

    public static void main(String[] args) {
        // 創(chuàng)建SparkConf
        SparkConf sparkConf = new SparkConf().setAppName("ParallelizeCollectionJava").setMaster("local");
        // 創(chuàng)建JavaSparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        // 要通過并行化集合的方式創(chuàng)建RDD,那么就調(diào)用SparkContext以及其子類,的parallelize()方法
        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 執(zhí)行reduce算子操作
        // 相當(dāng)于,先進(jìn)行1 + 2 = 3;然后再用3 + 3 = 6;然后再用6 + 4 = 10。。。以此類推
        JavaRDD<Integer> numsRDD = javaSparkContext.parallelize(nums);
        Integer sum = numsRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        // 輸出累加的和
        System.out.println("sum = " + sum);
        // 關(guān)閉JavaSparkContext
        javaSparkContext.close();

    }
}

使用Scala實(shí)現(xiàn)

object ParallelizeCollection {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ParallelizeCollectionScala").setMaster("local")
    val sparkContext = new SparkContext(sparkConf)
    val numsRDD = sparkContext.parallelize(1 to 10)
    val sum = numsRDD.reduce(_ + _)
    println("sum = " + sum)
  }
}

使用本地文件和HDFS創(chuàng)建RDD

Spark是支持使用任何Hadoop支持的存儲(chǔ)系統(tǒng)上的文件創(chuàng)建RDD的,比如說HDFS、Cassandra、HBase以及本地文件。通過調(diào)用SparkContext的textFile()方法,可以針對(duì)本地文件或HDFS文件創(chuàng)建RDD。
有幾個(gè)事項(xiàng)是需要注意的:

  1. 如果是針對(duì)本地文件的話,如果是在windows上本地測(cè)試,windows上有一份文件即可;如果是在spark集群上針對(duì)linux本地文件,那么需要將文件拷貝到所有worker節(jié)點(diǎn)上。
  2. Spark的textFile()方法支持針對(duì)目錄、壓縮文件以及通配符進(jìn)行RDD創(chuàng)建。
  3. Spark默認(rèn)會(huì)為hdfs文件的每一個(gè)block創(chuàng)建一個(gè)partition,但是也可以通過textFile()的第二個(gè)參數(shù)手動(dòng)設(shè)置分區(qū)數(shù)量,只能比block數(shù)量多,不能比block數(shù)量少。
    案例:文件字?jǐn)?shù)統(tǒng)計(jì),本地文件
    java版本
/**
 * 使用本地文件創(chuàng)建RDD
 * 案例,統(tǒng)計(jì)文本文件字?jǐn)?shù)
 */
public class LocalFile {
    public static void main(String[] args) {
        // 創(chuàng)建SparkConf
        SparkConf sparkConf = new SparkConf().setAppName("LocalFileJava").setMaster("local");
        // 創(chuàng)建JavaSparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        // 調(diào)用SparkContext以及其子類的textFile()方法,針對(duì)本地文件創(chuàng)建RDD
        JavaRDD<String> linesRDD = javaSparkContext.textFile("E:\\testdata\\wordcount\\input\\1.txt");
        // 統(tǒng)計(jì)文本文件內(nèi)的字?jǐn)?shù)
        JavaRDD<Integer> wordsRDD = linesRDD.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return s.length();
            }
        });
        Integer sum = wordsRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        System.out.println("sum = " + sum);
        javaSparkContext.close();
    }
}

scala版本

object LocalFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LocalFileScala").setMaster("local")
    val context = new SparkContext(conf)
    val words = context.textFile("E:\\testdata\\wordcount\\input\\1.txt")
    val sum = words.map(word => word.length).reduce(_ + _)
    println("sum = " + sum)
  }
}

案例:文件字?jǐn)?shù)統(tǒng)計(jì),HDFS文件
java版本

/**
 * 使用Hdfs文件創(chuàng)建RDD
 * 案例,統(tǒng)計(jì)文本文件字?jǐn)?shù)
 */
public class HdfsFile {
    public static void main(String[] args) {
        // 創(chuàng)建SparkConf
        // 修改 去除setMaster()設(shè)置,修改setAppName()
        SparkConf sparkConf = new SparkConf().setAppName("HdfsFileJava");
        // 創(chuàng)建JavaSparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        // 調(diào)用SparkContext以及其子類的textFile()方法,針對(duì)Hdfs文件創(chuàng)建RDD
        JavaRDD<String> linesRDD = javaSparkContext.textFile("hdfs://hadoop-100:9000/testdate/1.txt");
        // 統(tǒng)計(jì)文本文件內(nèi)的字?jǐn)?shù)
        JavaRDD<Integer> wordsRDD = linesRDD.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return s.length();
            }
        });
        Integer sum = wordsRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        System.out.println("sum = " + sum);
        javaSparkContext.close();
    }
}

scala版本

object HdfsFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HdfsFileScala")
    val context = new SparkContext(conf)
    val words = context.textFile("hdfs://hadoop-100:9000/testdate/1.txt")
    val sum = words.map(word => word.length).reduce(_ + _)
    println("sum = " + sum)
  }
}

使用本地文件和HDFS創(chuàng)建RDD

Spark的textFile()除了可以針對(duì)上述幾種普通的文件創(chuàng)建RDD之外,還有一些特列的方法來創(chuàng)建RDD:

  1. SparkContext.wholeTextFiles()方法,可以針對(duì)一個(gè)目錄中的大量小文件,返回<filename, fileContent>組成的pair,作為一個(gè)PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每個(gè)元素就是文件中的一行文本。
  2. SparkContext.sequenceFileK, V方法,可以針對(duì)SequenceFile創(chuàng)建RDD,K和V泛型類型就是SequenceFile的key和value的類型。K和V要求必須是Hadoop的序列化類型,比如IntWritable、Text等。
  3. SparkContext.hadoopRDD()方法,對(duì)于Hadoop的自定義輸入類型,可以創(chuàng)建RDD。該方法接收J(rèn)obConf、InputFormatClass、Key和Value的Class。
  4. SparkContext.objectFile()方法,可以針對(duì)之前調(diào)用RDD.saveAsObjectFile()創(chuàng)建的對(duì)象序列化的文件,反序列化文件中的數(shù)據(jù),并創(chuàng)建一個(gè)RDD。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容