深入理解groupByKey、reduceByKey

測(cè)試源碼

下面來看看groupByKey和reduceByKey的區(qū)別:

    val conf = new SparkConf().setAppName("GroupAndReduce").setMaster("local")
    val sc = new SparkContext(conf)
    val words = Array("one", "two", "two", "three", "three", "three")
    val wordsRDD = sc.parallelize(words).map(word => (word, 1))
    val wordsCountWithReduce = wordsRDD.
      reduceByKey(_ + _).
      collect().
      foreach(println)
    val wordsCountWithGroup = wordsRDD.
      groupByKey().
      map(w => (w._1, w._2.sum)).
      collect().
      foreach(println)

雖然兩個(gè)函數(shù)都能得出正確的結(jié)果, 但reduceByKey函數(shù)更適合使用在大數(shù)據(jù)集上。 這是因?yàn)镾park知道它可以在每個(gè)分區(qū)移動(dòng)數(shù)據(jù)之前將輸出數(shù)據(jù)與一個(gè)共用的key結(jié)合。

借助下圖可以理解在reduceByKey里發(fā)生了什么。 在數(shù)據(jù)對(duì)被搬移前,同一機(jī)器上同樣的key是怎樣被組合的( reduceByKey中的 lamdba 函數(shù))。然后 lamdba 函數(shù)在每個(gè)分區(qū)上被再次調(diào)用來將所有值 reduce成最終結(jié)果。整個(gè)過程如下:

image
image

另一方面,當(dāng)調(diào)用 groupByKey時(shí),所有的鍵值對(duì)(key-value pair) 都會(huì)被移動(dòng),在網(wǎng)絡(luò)上傳輸這些數(shù)據(jù)非常沒必要,因此避免使用 GroupByKey。

為了確定將數(shù)據(jù)對(duì)移到哪個(gè)主機(jī),Spark會(huì)對(duì)數(shù)據(jù)對(duì)的key調(diào)用一個(gè)分區(qū)算法。 當(dāng)移動(dòng)的數(shù)據(jù)量大于單臺(tái)執(zhí)行機(jī)器內(nèi)存總量時(shí)Spark會(huì)把數(shù)據(jù)保存到磁盤上。 不過在保存時(shí)每次會(huì)處理一個(gè)key的數(shù)據(jù),所以當(dāng)單個(gè) key 的鍵值對(duì)超過內(nèi)存容量會(huì)存在內(nèi)存溢出的異常。 這將會(huì)在之后發(fā)行的 Spark 版本中更加優(yōu)雅地處理,這樣的工作還可以繼續(xù)完善。 盡管如此,仍應(yīng)避免將數(shù)據(jù)保存到磁盤上,這會(huì)嚴(yán)重影響性能。

image
image

你可以想象一個(gè)非常大的數(shù)據(jù)集,在使用 reduceByKey 和 groupByKey 時(shí)他們的差別會(huì)被放大更多倍。

我們來看看兩個(gè)函數(shù)的實(shí)現(xiàn):

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
  /**
  * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
   * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

注意mapSideCombine=false,partitioner是HashPartitioner,但是groupByKey對(duì)小數(shù)據(jù)量比較好,一個(gè)key對(duì)應(yīng)的個(gè)數(shù)少于10個(gè)。

他們都調(diào)用了combineByKeyWithClassTag,我們?cè)賮砜纯?code>combineByKeyWithClassTag的定義:

  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]

combineByKey函數(shù)主要接受了三個(gè)函數(shù)作為參數(shù),分別為createCombiner、mergeValue、mergeCombiners。這三個(gè)函數(shù)足以說明它究竟做了什么。理解了這三個(gè)函數(shù),就可以很好地理解combineByKey。

combineByKey是將RDD[(K,V)]combine為RDD[(K,C)],因此,首先需要提供一個(gè)函數(shù),能夠完成從V到C的combine,稱之為combiner。如果V和C類型一致,則函數(shù)為V => V。倘若C是一個(gè)集合,例如Iterable[V],則createCombiner為V => Iterable[V]。

mergeValue則是將原RDD中Pair的Value合并為操作后的C類型數(shù)據(jù)。合并操作的實(shí)現(xiàn)決定了結(jié)果的運(yùn)算方式。所以,mergeValue更像是聲明了一種合并方式,它是由整個(gè)combine運(yùn)算的結(jié)果來導(dǎo)向的。函數(shù)的輸入為原RDD中Pair的V,輸出為結(jié)果RDD中Pair的C。

最后的mergeCombiners則會(huì)根據(jù)每個(gè)Key所對(duì)應(yīng)的多個(gè)C,進(jìn)行歸并。

例如:

var rdd1 = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 1), ("B", 2),("B",3),("B",4), ("C", 1)))
    rdd1.combineByKey(
      (v: Int) => v + "_",
      (c: String, v: Int) => c + "@" + v,
      (c1: String, c2: String) => c1 + "$" + c2
    ).collect.foreach(println)

result不確定歐,單機(jī)執(zhí)行不會(huì)調(diào)用mergeCombiners:

(B,1_@2@3@4)
(A,1_@2)
(C,1_)

在集群情況下:

(B,2_@3@4$1_)
(A,1_@2)
(C,1_)
或者
(B,1_$2_@3@4)
(A,1_@2)
(C,1_)

mapSideCombine=false時(shí),再體驗(yàn)一下運(yùn)行結(jié)果。

有許多函數(shù)比goupByKey好:

  1. 當(dāng)你combine元素時(shí),可以使用combineByKey,但是輸入值類型和輸出可能不一樣
  2. foldByKey合并每一個(gè) key 的所有值,在級(jí)聯(lián)函數(shù)和“零值”中使用。
    //使用combineByKey計(jì)算wordcount
    wordsRDD.map(word=>(word,1)).combineByKey(
      (v: Int) => v,
      (c: Int, v: Int) => c+v,
      (c1: Int, c2: Int) => c1 + c2
    ).collect.foreach(println)

    //使用foldByKey計(jì)算wordcount
    println("=======foldByKey=========")
    wordsRDD.map(word=>(word,1)).foldByKey(0)(_+_).foreach(println)

    //使用aggregateByKey計(jì)算wordcount
    println("=======aggregateByKey============")
    wordsRDD.map(word=>(word,1)).aggregateByKey(0)((u:Int,v)=>u+v,_+_).foreach(println)

foldByKey,aggregateByKey都是由combineByKey實(shí)現(xiàn),并且mapSideCombine=true,因此可以使用這些函數(shù)替代goupByKey。

參考

Spark中的combineByKey

databricks gitbooks

在Spark中盡量少使用GroupByKey函數(shù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容