[Spark Java API] Transformations (9) — sortByKey, repartitionAndSortWithinPartitions

sortByKey


Official documentation:

Sort the RDD by key, so that each partition contains a sorted range of the elements in ascending order. 
Calling `collect` or `save` on the resulting RDD will return or output an ordered list of records (in the `save` case, 
they will be written to multiple `part-X` files in the filesystem, in order of the keys).

Function signatures:

def sortByKey(): JavaPairRDD[K, V]
def sortByKey(ascending: Boolean): JavaPairRDD[K, V]
def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V]
def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V]

Source code analysis:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

sortByKey() sorts the records of an RDD[(K, V)] by key; ascending = true gives ascending order, false descending. The data dependency of sortByKey() is simple: a shuffle first gathers records into their target partitions using a range partitioner, then all records within each partition are sorted by key, so the records of the resulting RDD come out globally ordered. At present, sortByKey() buffers all of a partition's records in an array and then sorts that array.

Example:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
// Pair each element with a pseudo-random value; the fixed seed makes runs reproducible.
final Random random = new Random(100);
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
    }
});

JavaPairRDD<Integer,Integer> sortByKeyRDD = javaPairRDD.sortByKey();
System.out.println(sortByKeyRDD.collect());
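Because sortByKey is range-partition-then-local-sort, its global-ordering guarantee can be illustrated without a Spark cluster. The sketch below is plain Java (no Spark dependency; the helper name and the fixed split boundary are illustrative stand-ins for what a RangePartitioner would sample from the data): records are routed to a partition by key range, each partition's buffered array is sorted, and reading the partitions in order yields a globally sorted result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortByKeySketch {

    // Plain-Java sketch (no Spark): range-partition records by key, then sort
    // each partition's buffered array, as sortByKey does after its shuffle.
    // Each record is an int[]{key, value}; "boundary" plays the role of the
    // split point a RangePartitioner would compute by sampling the keys.
    static List<int[]> rangeSortByKey(List<int[]> records, int boundary) {
        List<int[]> part0 = new ArrayList<>();
        List<int[]> part1 = new ArrayList<>();
        for (int[] r : records) {                 // the "shuffle": route by key range
            (r[0] <= boundary ? part0 : part1).add(r);
        }
        Comparator<int[]> byKey = Comparator.comparingInt(r -> r[0]);
        part0.sort(byKey);                        // local sort inside each partition
        part1.sort(byKey);
        List<int[]> out = new ArrayList<>(part0); // partition 0's key range lies
        out.addAll(part1);                        // entirely below partition 1's, so
        return out;                               // the concatenation is globally sorted
    }

    public static void main(String[] args) {
        List<int[]> records = Arrays.asList(
                new int[]{1, 9}, new int[]{6, 2}, new int[]{4, 7},
                new int[]{3, 0}, new int[]{7, 5}, new int[]{2, 1}, new int[]{5, 8});
        for (int[] r : rangeSortByKey(records, 4)) {
            System.out.println(r[0] + "," + r[1]);
        }
    }
}
```

The keys come out as 1 through 7 in order, mirroring what sortByKeyRDD.collect() prints above.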

repartitionAndSortWithinPartitions


Official documentation:

Repartition the RDD according to the given partitioner and, within each resulting partition,
sort records by their keys. This is more efficient than calling `repartition`
and then sorting within each partition because it can push the sorting down into the shuffle machinery.

Function signatures:

def repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaPairRDD[K, V]
def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K]): JavaPairRDD[K, V]

Source code analysis:

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {  
  new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}

As the source shows, this method repartitions the RDD according to the given partitioner and sorts the records by key within each resulting partition. Compared with sortByKey-style code that repartitions first and then sorts inside each partition, this is more efficient because the sorting is pushed down into the shuffle stage itself.

Example:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
final Random random = new Random();
JavaPairRDD<Integer, Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
        return new Tuple2<Integer, Integer>(integer, random.nextInt(10));
    }
});

JavaPairRDD<Integer, Integer> repartitionAndSortRDD = javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
    @Override
    public int numPartitions() {
        return 2;
    }

    @Override
    public int getPartition(Object key) {
        // floorMod guards against negative hash codes, which would
        // otherwise produce an invalid (negative) partition id.
        return Math.floorMod(key.toString().hashCode(), numPartitions());
    }
});
System.out.println(repartitionAndSortRDD.collect());
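To see the contrast with sortByKey, the same idea can be sketched for repartitionAndSortWithinPartitions in plain Java (no Spark; the helper name is an illustrative stand-in): records are hash-partitioned, then each partition is sorted by key as part of the "shuffle". Each partition comes out sorted internally, but, unlike the range-partitioned sortByKey, there is no ordering across partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class RepartitionAndSortSketch {

    // Hash-partition int[]{key, value} records into numPartitions buckets and
    // sort each bucket by key — the per-partition guarantee that
    // repartitionAndSortWithinPartitions provides. floorMod keeps the
    // partition id non-negative even for negative hash codes.
    static List<List<int[]>> hashPartitionAndSort(List<int[]> records, int numPartitions) {
        List<List<int[]>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (int[] r : records) {                 // the "shuffle": route by key hash
            parts.get(Math.floorMod(Integer.hashCode(r[0]), numPartitions)).add(r);
        }
        for (List<int[]> part : parts) {          // sorting happens "inside the shuffle"
            part.sort(Comparator.comparingInt(r -> r[0]));
        }
        return parts;
    }

    public static void main(String[] args) {
        List<int[]> records = Arrays.asList(
                new int[]{1, 9}, new int[]{6, 2}, new int[]{4, 7},
                new int[]{3, 0}, new int[]{7, 5}, new int[]{2, 1}, new int[]{5, 8});
        List<List<int[]>> parts = hashPartitionAndSort(records, 2);
        for (int i = 0; i < parts.size(); i++) {
            StringBuilder sb = new StringBuilder("partition " + i + ":");
            for (int[] r : parts.get(i)) {
                sb.append(" ").append(r[0]);
            }
            System.out.println(sb);
        }
    }
}
```

With two partitions and integer keys, the even keys land sorted in partition 0 (2 4 6) and the odd keys sorted in partition 1 (1 3 5 7): each partition is ordered, but reading them back-to-back is not a globally sorted sequence.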
