repartitionAndSortWithinPartitions counts as an efficient operator because it is faster than calling repartition followed by sortByKey: its sort is performed during the shuffle itself, so records are sorted while they are being shuffled (see the read side of the Spark shuffle for details).
To understand why it beats repartition plus sortByKey, first briefly review the flow of those two operators:
(1) repartition
(2) sortByKey
Using repartitionAndSortWithinPartitions
(1) When calling repartitionAndSortWithinPartitions you need to pass in a partitioner. It can be one of the partitioners Spark provides or a custom one, such as the KeyBasePartitioner used in the demo below. You also need to define a custom implicit ordering; when repartitionAndSortWithinPartitions is invoked, the custom my_self_Ordering rule is resolved through def implicitly[T](implicit e: T) = e.
(2) Secondary sort
All of the ordering rules are implemented in the custom implicit value my_self_Ordering.
// Inside OrderedRDDFunctions, the key ordering is captured via a context bound;
// here T is Ordering[K]:
private val ordering = implicitly[Ordering[K]]

// implicitly (from scala.Predef) simply returns the implicit value of type T in scope
def implicitly[T](implicit e: T) = e

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
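The implicitly/context-bound mechanism is plain Scala, so it can be demonstrated without Spark. In the sketch below, implicitlyLocal and smallest are names invented for illustration; the pattern mirrors how OrderedRDDFunctions captures Ordering[K]:

```scala
object ImplicitlyDemo {
  // Same definition as scala.Predef.implicitly: return the implicit value of type T in scope.
  def implicitlyLocal[T](implicit e: T): T = e

  // A context bound [K: Ordering] desugars into an implicit Ordering[K] parameter,
  // which is exactly what `private val ordering = implicitly[Ordering[K]]` captures.
  def smallest[K: Ordering](xs: Seq[K]): K = {
    val ord = implicitlyLocal[Ordering[K]] // summon the ordering from implicit scope
    xs.min(ord)
  }
}
```

If you define your own implicit Ordering for the key type before the call, it is the one that gets summoned, which is exactly how my_self_Ordering is picked up in the demo below.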
Demo
val sparkConf = new SparkConf().setAppName("test").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val wordsRDD: RDD[String] = sc.textFile("D:\\Spark_數(shù)據(jù)\\numbers_data.txt")
val resultRDD = wordsRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(each => (each._2, each._1))
/**
 * The key ordering is defined here.
 * We declare an implicit value because repartitionAndSortWithinPartitions
 * resolves an implicit Ordering[K] in the source; without one, the default
 * ordering for the key type is used.
 */
implicit val my_self_Ordering = new Ordering[String] {
  override def compare(a: String, b: String): Int = {
    val a_b: Array[String] = a.split("_")
    val a_1 = a_b(0).toInt
    val a_2 = a_b(1).toInt
    val b_b = b.split("_")
    val b_1 = b_b(0).toInt
    val b_2 = b_b(1).toInt
    if (a_1 == b_1) {
      a_2 - b_2
    } else {
      a_1 - b_1
    }
  }
}
val rdd = resultRDD.map(x => (x._1 + "_" + x._2, x._2)).repartitionAndSortWithinPartitions(new KeyBasePartitioner(2))
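The reason a custom ordering is needed here: keys such as "10_1" and "2_3" compare incorrectly under the default lexicographic String ordering, while the secondary sort must compare both numeric components. A standalone version of the same compare logic (the object name SecondarySortOrdering is made up for this sketch):

```scala
object SecondarySortOrdering {
  // Keys look like "<first>_<second>"; compare the first numbers, and break
  // ties on the second numbers — the same rule my_self_Ordering implements.
  val ord: Ordering[String] = new Ordering[String] {
    override def compare(a: String, b: String): Int = {
      val pa = a.split("_")
      val pb = b.split("_")
      val a1 = pa(0).toInt
      val a2 = pa(1).toInt
      val b1 = pb(0).toInt
      val b2 = pb(1).toInt
      if (a1 == b1) a2 - b2 else a1 - b1
    }
  }
}
```

Note that lexicographically "10_1" sorts before "2_3" (because '1' < '2'), even though numerically 10 > 2, so relying on the default String ordering would scramble the result.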
/**
 * Custom partitioner.
 *
 * @param partitions the number of partitions
 */
class KeyBasePartitioner(partitions: Int) extends Partitioner {

  // number of partitions
  override def numPartitions: Int = partitions

  // decides which partition each key (and its record) is routed to
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[String]
    Math.abs(k.hashCode() % numPartitions)
  }
}
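To see the end-to-end effect without starting a Spark job, the partition-then-sort-within-partition behavior can be simulated with plain collections, reusing the getPartition rule and the numeric key ordering. RepartitionSortSketch and its members are illustrative names, and Spark itself performs the sort inside the shuffle rather than as a separate pass afterwards:

```scala
object RepartitionSortSketch {
  // Mirror of KeyBasePartitioner.getPartition.
  def getPartition(key: String, numPartitions: Int): Int =
    Math.abs(key.hashCode % numPartitions)

  // Mirror of my_self_Ordering: numeric comparison on "x_y" keys,
  // expressed via a (first, second) tuple ordering.
  val keyOrdering: Ordering[String] = Ordering.by((k: String) => {
    val parts = k.split("_")
    (parts(0).toInt, parts(1).toInt)
  })

  // Bucket records by partition, then sort each bucket with the key ordering —
  // the same per-partition layout repartitionAndSortWithinPartitions produces.
  def run(data: Seq[(String, Int)], numPartitions: Int): Map[Int, Seq[(String, Int)]] =
    data.groupBy { case (k, _) => getPartition(k, numPartitions) }
      .map { case (p, recs) => p -> recs.sortBy(_._1)(keyOrdering) }
}
```

With a single partition, every record lands in bucket 0 and comes out sorted numerically by both key components.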