[Spark Java API] Transformation (13): zipWithIndex, zipWithUniqueId

zipWithIndex


Official documentation:

Zips this RDD with its element indices. The ordering is first based on the partition index 
and then the ordering of items within each partition. So the first item in the first partition 
gets index 0, and the last item in the last partition receives the largest index. 
This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
This method needs to trigger a spark job when this RDD contains more than one partitions.

Function signature:

def zipWithIndex(): JavaPairRDD[T, JLong]

This function pairs each element of the RDD with that element's index within the RDD, producing an RDD of key/value pairs.

Source code analysis:

def zipWithIndex(): RDD[(T, Long)] = withScope {  
    new ZippedWithIndexRDD(this)
}

/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
  val n = prev.partitions.length
  if (n == 0) {
    Array[Long]()
  } else if (n == 1) {
    Array(0L)
  } else {
    prev.context.runJob(
      prev,
      Utils.getIteratorSize _,
      0 until n - 1, // do not need to count the last partition
      allowLocal = false
    ).scanLeft(0L)(_ + _)
  }
}

override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
  val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
  firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
    (x._1, split.startIndex + x._2)
  }
}

As the source shows, the function returns a ZippedWithIndexRDD. ZippedWithIndexRDD first computes startIndices, the starting index of each partition (triggering a job to count all partitions except the last); compute then applies Scala's zipWithIndex to each partition's iterator and adds the partition's start index to obtain the global index.
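The startIndices computation can be illustrated outside of Spark: given the element count of each partition, a running cumulative sum (the scanLeft in the Scala source) yields each partition's starting index. A minimal Java sketch of this idea, assuming hypothetical partition sizes of 2, 2 and 3:

```java
import java.util.Arrays;

public class StartIndicesDemo {
    // Mirrors ZippedWithIndexRDD's startIndices: each partition's start
    // index is the total number of elements in all preceding partitions
    // (the scanLeft(0L)(_ + _) in the Scala source).
    static long[] startIndices(long[] partitionSizes) {
        long[] starts = new long[partitionSizes.length];
        long running = 0L;
        for (int i = 0; i < partitionSizes.length; i++) {
            starts[i] = running;          // cumulative sum so far
            running += partitionSizes[i]; // the last partition's size is never needed
        }
        return starts;
    }

    public static void main(String[] args) {
        // e.g. 7 elements split over 3 partitions as sizes 2, 2, 3
        System.out.println(Arrays.toString(startIndices(new long[]{2L, 2L, 3L})));
        // prints [0, 2, 4]
    }
}
```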

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
JavaPairRDD<Integer, Long> zipWithIndexRDD = javaRDD.zipWithIndex();
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipWithIndexRDD.collect());

zipWithUniqueId


Official documentation:

Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, 
n+k, 2*n+k, ..., where n is the number of partitions. So there may exist gaps, 
but this method won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].

Function signature:

def zipWithUniqueId(): JavaPairRDD[T, JLong]

This function pairs each element of the RDD with a unique Long id. The ids are generated as follows: the first element of partition k gets id k, and in general the i-th element (0-based) of partition k gets id i * n + k, where n is the total number of partitions in the RDD.

Source code analysis:

def zipWithUniqueId(): RDD[(T, Long)] = withScope {
  val n = this.partitions.length.toLong
  this.mapPartitionsWithIndex { case (k, iter) =>
    iter.zipWithIndex.map { case (item, i) =>
      (item, i * n + k)
    }
  }
}

As the source shows, zipWithUniqueId() uses mapPartitionsWithIndex() to obtain each partition's index k, then computes each element's id as i * n + k; no job is triggered because no partition needs to know the sizes of the others.
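The id formula can be checked without a cluster. A minimal Java sketch, assuming a hypothetical split of the 7 elements into three partitions of sizes 2, 2 and 3 (the actual split depends on how Spark partitions the data):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZipWithUniqueIdDemo {
    // Assigns ids the way zipWithUniqueId does: the i-th element (0-based)
    // of partition k gets id i * n + k, where n is the partition count.
    // Each result is a long[]{element, id} pair.
    static List<long[]> uniqueIds(List<List<Integer>> partitions) {
        int n = partitions.size();
        List<long[]> out = new ArrayList<>();
        for (int k = 0; k < n; k++) {
            List<Integer> part = partitions.get(k);
            for (int i = 0; i < part.size(); i++) {
                out.add(new long[]{part.get(i), (long) i * n + k});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = Arrays.asList(
                Arrays.asList(5, 1), Arrays.asList(1, 4), Arrays.asList(4, 2, 2));
        for (long[] pair : uniqueIds(parts)) {
            System.out.println("(" + pair[0] + "," + pair[1] + ")");
        }
        // partition 0 gets ids 0 and 3; partition 1 gets 1 and 4;
        // partition 2 gets 2, 5 and 8 -- ids 6 and 7 are gaps, as the docs note
    }
}
```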

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
JavaPairRDD<Integer, Long> zipWithUniqueIdRDD = javaRDD.zipWithUniqueId();
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipWithUniqueIdRDD.collect());
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容