spark算子1:repartitionAndSortWithinPartitions

repartitionAndSortWithinPartitions算是一個(gè)高效的算子,是因?yàn)樗仁褂胷epartition And sortByKey 效率高,這是由于它的排序是在shuffle過程中進(jìn)行,一邊shuffle,一邊排序;具體見spark shuffle的讀操作;
關(guān)于為什么比repartition And sortByKey 效率高,首先簡要分析repartition 和sortbykey'的流程:

(1)rePartition
(2)sortByKey

repartitionAndSortWithinPartitions的使用

(1)使用repartitionAndSortWithinPartitions時(shí),需要自己傳入一個(gè)分區(qū)器參數(shù),這個(gè)分區(qū)器 可以是系統(tǒng)提供的,也可以是自定義的:例如以下Demo中使用的KeyBasePartitioner,同時(shí)需要自定義一個(gè)排序的隱式變量,當(dāng)我們使用repartitionAndSortWithinPartitions時(shí),我們自定義的my_self_Ordering 排序規(guī)則就會傳入到def implicitly[T](implicit e: T) = e
(2)二次排序
排序規(guī)則都需要在自定義的隱式變量my_self_Ordering中實(shí)現(xiàn)

private val ordering = implicitly[Ordering[K]]
//這里是使用了上下文界定,這個(gè)T就是Ordering[K]
def implicitly[T](implicit e: T) = e 
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
  }
Demo案例
val sparkConf = new SparkConf().setAppName("test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val wordsRDD: RDD[String] = sc.textFile("D:\\Spark_數(shù)據(jù)\\numbers_data.txt")
    val resultRDD = wordsRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(each => (each._2, each._1))
/**
      * key怎么排序,在這里定義
      * 為什么在這里聲明一個(gè)隱式變量呢,是因?yàn)樵谠创a中,方法中有一個(gè)隱式參數(shù);不設(shè)置是按照默認(rèn)的排序規(guī)則進(jìn)行排序;
      */
    implicit val my_self_Ordering = new Ordering[String] {
      override def compare(a: String, b: String): Int = {
        val a_b: Array[String] = a.split("_")
        val a_1 = a_b(0).toInt
        val a_2 = a_b(1).toInt
        val b_b = b.split("_")
        val b_1 = b_b(0).toInt
        val b_2 = b_b(1).toInt
        if (a_1 == b_1) {
          a_2 - b_2
        } else {
          a_1 - b_1
        }
      }
    }

val rdd = resultRDD.map(x => (x._1 + "_" + x._2, x._2)).repartitionAndSortWithinPartitions(new KeyBasePartitioner(2))
/**
    * 自定義分區(qū)器
    *
    * @param partitions
    */
  class KeyBasePartitioner(partitions: Int) extends Partitioner {
     //分區(qū)數(shù)
    override def numPartitions: Int = partitions
    //該方法決定了你的數(shù)據(jù)被分到那個(gè)分區(qū)里面
    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[String]
      Math.abs(k.hashCode() % numPartitions)
    }
  }

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容