大數(shù)據(jù)Spark面試,distinct去重原理,是如何實(shí)現(xiàn)的

最近,有位朋友問我,distinct去重原理是怎么實(shí)現(xiàn)的?

“在面試時(shí),面試官問他了解distinct算子嗎?”

“了解啊,Spark的rdd,一種transFormation去重的算子,主要用來去重的”。

“喲,看來你經(jīng)常使用distinct算子,對(duì)distinct算子很熟悉啊”。

“好說,好說”。

“那你能說說distinct是如何實(shí)現(xiàn)去重的嗎?”

我朋友支支吾吾半天:“就是這樣、那樣去重的啊”。

“這樣、那樣是怎么去重的呢”

“具體有點(diǎn)忘記了(其實(shí)是根本就不知道)”。

那么distinct,底層到底是如何實(shí)現(xiàn)去重功能的呢?這個(gè)是面試spark部分時(shí),經(jīng)常被問到的問題。

先來看一段代碼,我們測試一下distinct去重的作用:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkDistinct {  
  def main(args: Array[String]): Unit = {    
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkDistinct")    
    val sc: SparkContext = new SparkContext(conf)    //定義一個(gè)數(shù)組    
    val array: Array[Int] = Array(1,1,1,2,2,3,3,4)    //把數(shù)組轉(zhuǎn)為RDD算子,后面的數(shù)字2代表分區(qū),也可以指定3,4....個(gè)分區(qū),也可以不指定。   
    val line: RDD[Int] = sc.parallelize(array,2)      
    line.distinct().foreach(x => println(x))  //輸出的結(jié)果已經(jīng)去重:1,2,3,4 
 }
}

通過上面的代碼可以看出,使用distinct以后,會(huì)對(duì)重復(fù)的元素進(jìn)行去重。我們來看下源碼

/**   * Return a new RDD containing the distinct elements in this RDD.   */  
def distinct(numPartitions: Int(implicit ord: Ordering[T] = null): RDD[T] = withScope {    
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)  
}  

/**   * Return a new RDD containing the distinct elements in this RDD.   */
def distinct(): RDD[T] = withScope {    
    distinct(partitions.length)  
}

上面是distinct的源碼,有帶參和無參兩種。當(dāng)我們調(diào)用無參的distinct時(shí),底層調(diào)用的是如下源碼:

def distinct(): RDD[T] = withScope {    
  distinct(partitions.length)  
}

而無參distinct()中又調(diào)用了帶參數(shù)的distinct(partitions.length)。

其中,partitions.length代表是分區(qū)數(shù),而這個(gè)分區(qū)則是我們?cè)谑褂?sc.parallelize(array,2) 時(shí)指定的2個(gè)分區(qū)。

帶參數(shù)的distinct其內(nèi)部就很容易理解了,這就是一個(gè)wordcount統(tǒng)計(jì)單詞的方法,區(qū)別是:后者通過元組獲取了第一個(gè)單詞元素。

map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

其中,numPartitions就是分區(qū)數(shù)。

我們也可以寫成這樣:

map(x => (x, null)).reduceByKey((x, y) => x).map(_._1)

也可以這樣寫:

line.map(x =>(x,1)).reduceByKey(_+_).map(_._1)

通過上面的流程圖很清晰的看出來,distinct的原理流程。

使用map算子把元素轉(zhuǎn)為一個(gè)帶有null的元組;使用reducebykey對(duì)具有相同key的元素進(jìn)行統(tǒng)計(jì);之后再使用map算子,取得元組中的單詞元素,實(shí)現(xiàn)去重的效果。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容