spark-RDD算子操作分類

RDD算子操作分類

測(cè)試用例說(shuō)明 前置方法

   /**
   * 提供初始化方法,完成輸出目錄的清理
   * 在每個(gè)Test方法之前先運(yùn)行
   */
  @Before
  def init(): Unit ={

    val fileSystem: FileSystem = FileSystem.get(new Configuration())

    val path = new Path("output")

    // 如果輸出目錄存在,就刪除
    if (fileSystem.exists(path)){
      fileSystem.delete(path,true)
    }

  }

后置方法

  /**
   *  每次測(cè)試完成后運(yùn)行
   */
  @After
  def stop(): Unit ={
    sc.stop()
  }

成員變量

val sc = new SparkContext(new SparkConf().setAppName("My app").setMaster("local[*]"))

1.taansformation(轉(zhuǎn)換)

它可以實(shí)現(xiàn)把一個(gè)rdd轉(zhuǎn)換成一個(gè)新的rdd,它是延遲加載,不會(huì)立即觸發(fā)任務(wù)的真正運(yùn)行。
比如 flatMap/map/reduceBykey

1.1map

/**
   * map 特點(diǎn):  分區(qū)之間是并行(真正并行取決于cores)運(yùn)算
   * *
   * 同一個(gè)分區(qū)內(nèi),一個(gè)元素執(zhí)行完所有的轉(zhuǎn)換操作后,才開(kāi)始下一個(gè)元素!
   */
  @Test
  def testMap(): Unit = {
    val list: List[Int] = List(1, 2, 3, 4)

    val rdd: RDD[Int] = sc.makeRDD(list, 2)

    val rdd2: RDD[Int] = rdd.map(x => {
      println(x + "執(zhí)行了第一次Map操作!")
      x
    })

    val rdd3: RDD[Int] = rdd2.map(x => {
      println(x+"執(zhí)行了第二次Map操作!")
      x
    })
    rdd3.saveAsTextFile("output")
  }
image.png

  /*
      map :  def map[U: ClassTag](f: T => U): RDD[U]

              對(duì)當(dāng)前RDD中的每個(gè)元素執(zhí)行map操作,返回一個(gè)新的元素,將元素放入新的MapPartitionsRDD中!

              特點(diǎn): ①map操作后,不會(huì)改變分區(qū)數(shù)
                     ②分區(qū)間的數(shù)據(jù)也不會(huì)發(fā)生交換
   */
  @Test
  def test2(): Unit = {

    val list = List(1, 2, 3, 4)

    val rdd: RDD[Int] = sc.makeRDD(list, 2)

    val rdd1: RDD[Int] = rdd.map(x => x + 1)

    rdd1.saveAsTextFile("output")

  }
image.png

image.png

1.2 mapPartitions

 /*
      mapPartitions :   將一個(gè)分區(qū)作為一個(gè)整體,調(diào)用一次map函數(shù),轉(zhuǎn)換后生成新的分區(qū)集合!
      def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]

        和map的區(qū)別: ①傳入的函數(shù)不同,map將一個(gè)元素轉(zhuǎn)為另一個(gè)元素, mapPartitions將一個(gè)集合變?yōu)榱硪粋€(gè)集合!
                     ② mapPartiion邏輯:  cleanedF(iter)    批處理
                        map邏輯:  iter.map(cleanF)         個(gè)體處理
                     ③map是全量處理:  RDD中有x個(gè)元素,返回的集合也有x個(gè)元素
                        mapPartition只要返回一個(gè)集合,進(jìn)行過(guò)濾或添加操作!
                     ④ 本質(zhì)是mapPartition是一個(gè)集合調(diào)用一次
                          在特殊場(chǎng)景,節(jié)省性能,例如將一個(gè)分區(qū)的數(shù)據(jù),寫入到數(shù)據(jù)庫(kù)中
                     ⑤ map是將一個(gè)元素的所有轉(zhuǎn)換操作運(yùn)行結(jié)束后,再繼續(xù)開(kāi)始下一個(gè)元素!
                       mapPartition: 多個(gè)分區(qū)并行開(kāi)始轉(zhuǎn)換操作,一個(gè)分區(qū)的所有數(shù)據(jù)全部運(yùn)行結(jié)束后,mapPartition才結(jié)束!
                            一旦某個(gè)分區(qū)中的元素沒(méi)有處理完,整個(gè)分區(qū)的數(shù)據(jù)都無(wú)法釋放!需要更大的內(nèi)存!



         spark是分布式運(yùn)算: 時(shí)刻分清Driver 和 Executor
                            Executor執(zhí)行的是Task(封裝了RDD的執(zhí)行邏輯)



   */
 @Test
  def testMapPartitions(): Unit = {

    // 封裝RDD操作的邏輯
    val list = List(1, 2, 3, 4)

    val rdd: RDD[Int] = sc.makeRDD(list, 2)

    // 將分區(qū)中的奇數(shù)提取出來(lái)
    val result: RDD[Int] = rdd.mapPartitions(x => {

      x.filter(elem => elem % 2 == 1).toIterator
    })

    result.saveAsTextFile("output")
  }
image.png

image.png

1.3 mapPartitionsWithIndex

 /*
      mapPartitionsWithIndex :  執(zhí)行邏輯  f(index, iter) : index是當(dāng)前分區(qū)的索引
                                                          iter是分區(qū)的迭代器

                                 將一個(gè)分區(qū)整體執(zhí)行一次map操作,可以使用分區(qū)的index!
   */
  @Test
  def test7(): Unit = {

    val list = List(1, 2, 3, 4)

    val rdd: RDD[Int] = sc.makeRDD(list, 2)

    val result: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, iter) => iter.map(elem => (index, elem)))

    result.saveAsTextFile("output")
  }
image.png

image.png

1.5 flatMap


  /*
        flatMap : 先map再扁平化。不會(huì)改變分區(qū)和分區(qū)邏輯!

        將List(List(1,2),3,List(4,5))進(jìn)行扁平化操作


   */
  @Test
  def test() : Unit ={

    val list = List(List(1, 2), 3, List(4, 5))

    val rdd: RDD[Any] = sc.makeRDD(list, 2)

    val result: RDD[Any] = rdd.flatMap {
      // 將單個(gè)Int,轉(zhuǎn)成集合
      case x: Int => List(x)
      case y: List[_] => y

    }

    result.saveAsTextFile("output")

  }
image.png

image.png

1.6 glom

  /*
      glom():  將一個(gè)分區(qū)的所有元素合并到一個(gè)Array中
   */
  @Test
  def test2() : Unit ={

    val list = List(1,2,3,4)

    val rdd: RDD[Any] = sc.makeRDD(list, 2)

    val result: RDD[Array[Any]] = rdd.glom()

    //  result.collect() => Array [ Array[Any],Array[Any] ]
    result.collect().foreach(x=>println(x.mkString(",")))

  }
image.png

2.action(動(dòng)作)

它會(huì)觸發(fā)任務(wù)的真正運(yùn)行
比如collect/saveAstextFile

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容