Spark 中 RDD 算子 ReduceByKey 和 GroupByKey 使用方法和區(qū)別

在對RDDPair(一種特殊的 RDD,即RDD[(key, Row)])進(jìn)行操作時經(jīng)常會用到 reduceByKey() 和 groupByKey() 兩個算子。下面看看兩者的區(qū)別和使用方法:

一、reduceByKey(func) 和 groupByKey() 的區(qū)別

  • reduceByKey(func):顧名思義,是針對 RDDPair 中具有相同 key 的所有 row 做 reduce 操作,操作內(nèi)容由函數(shù) func 確定,可以自定義,比如:形如 (0, BACA) 這樣的 row,現(xiàn)在需要對 key 相同的所有row(即 BACA)使用"-"拼接成一個長字符串,比如(1,TMWTYV-PYSAJV);

  • groupByKey(): 顧名思義,是針對 RDDPair 中具有相同 key 的所有 row 分組,相同 key 對應(yīng)的 row 匯總生成一個sequence;本身不能自定義函數(shù),只能通過額外通過map(func)來實(shí)現(xiàn)。比如:(0,CompactBuffer(ZCEXLX, BKSGQD, ICRWVA, PXFBAC, SUBCYR, OMEQVV, TMBPHW))。

ReduceByKey 示例
GroupByKey 示例

使用reduceByKey()的時候,本地的數(shù)據(jù)先進(jìn)行merge然后再傳輸?shù)讲煌?jié)點(diǎn)再進(jìn)行merge,最終得到最終結(jié)果。
而使用groupByKey()的時候,并不進(jìn)行本地的merge,全部數(shù)據(jù)傳出,得到全部數(shù)據(jù)后才會進(jìn)行聚合成一個sequence,groupByKey()傳輸速度明顯慢于reduceByKey()。
雖然groupByKey().map(func)也能實(shí)現(xiàn)reduceByKey(func)功能,但是優(yōu)先使用reduceByKey(func)

區(qū)別:

區(qū)別項(xiàng) reduceByKey groupByKey 備注
功能 針對 RDDPair 中具有相同 key 的所有 row 做 reduce 操作 針對 RDDPair 中具有相同 key 的所有 row 分組
能自定義函數(shù) 可以自定義reduce函數(shù)
輸出 一個 key 對應(yīng)一個row 一個key 對應(yīng)多個row的sequence
性能 更高 更低 groupByKey.map(func) 可以實(shí)現(xiàn) reduceByKey,但是盡量用 reduceByKey,因?yàn)楦咝?/td>

二、Scala 代碼--使用方法

  • rddMap.groupByKey(自定義partitioner);
  • rddMap.reduceByKey(自定義reduce函數(shù)) 或者類似 rddMap.reduceByKey(_ + "-" + ) ,其中 _ + "-" + _ 中的""表示 key 相同的兩個 row
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object TestSparkShuffle {

  class MyPartitioner(partitionNum: Int) extends Partitioner() {
    override def numPartitions: Int = partitionNum

    override def getPartition(key: Any): Int = {
      if (key.asInstanceOf[Int] % 2 == 0) {
        0
      } else {
        1
      }
    }
  }

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local").appName("test").getOrCreate()
    val sc = spark.sparkContext
    val arr = new ArrayBuffer[String]
    genStrArr(36, arr)
    val rdd = sc.parallelize(arr)
    val rddMap: RDD[(Int, String)] = rdd.mapPartitions(
      partition => {
        partition.map(str => (getKey(str), str))
      }
    )
    rddMap.foreach(x => println(x))
    // 按照 key 進(jìn)行分組,且key為奇、偶數(shù)的row各分在0、1分區(qū)內(nèi)
    val rddMap2 = rddMap.groupByKey(new MyPartitioner(2))
    rddMap2.foreach(x => println(x))
    // 對 rddMap 中的row按照row的key,同樣的key的value相繼使用"-"拼接起來
    val rddMap3 = rddMap.reduceByKey(reduceFunc) 
    //  val rddMap3 = rddMap.reduceByKey(_ + "-" +  _)    // _ + "-" +  _ 中的"_"表示 key 相同的兩個value
    rddMap3.foreach(x => println(x))
    println(rddMap.count())

  }

  // reduce 函數(shù),將兩個字符串使用"-"拼接
  def reduceFunc(x: String, y : String): String = {
    x + "-" +  y
  }


  def getKey(str: String): Int = {
    Math.abs(str.hashCode % 6)
  }

   // 生成size為num的字符串?dāng)?shù)組,每個字符串長度為6,由A~Z隨機(jī)構(gòu)成
  def genStrArr(num : Int, arr: ArrayBuffer[String]): Unit = {
    val baseChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    val charLen = 6
    val rand = new Random()
    for (x <- Range(0, num)) {
      var subStr = ""
      for (i <- Range(0, charLen)) {
        val order = rand.nextInt(baseChars.length)
        subStr += baseChars.charAt(order)
      }
      arr.append(subStr)
    }
  }


}


測試結(jié)果:

# groupByKey 結(jié)果
(4,CompactBuffer(HCAESV, OZNIQU, WIIWNX, MEFMUZ, TVFPRH, EMSZJC))
(0,CompactBuffer(ZCEXLX, BKSGQD, ICRWVA, PXFBAC, SUBCYR, OMEQVV, TMBPHW))
(2,CompactBuffer(XTAKJH, HOUFFR, KIJCNU, BDILZU, SJFGRN, IZPCHR, RIPRRA, UUGZER))
(1,CompactBuffer(TMWTYV, PYSAJV))
(3,CompactBuffer(UHQTWN, YSLXXE, PNIMWJ, NAYYWU, EYPRPM, SXGUQO, DDSNIY, EXPSPM))
(5,CompactBuffer(ZOGCRZ, VORGBM, CUZZFS, SLFBWC, PFRFRA))


# reduceByKey 結(jié)果
(4,HCAESV-OZNIQU-WIIWNX-MEFMUZ-TVFPRH-EMSZJC)
(0,ZCEXLX-BKSGQD-ICRWVA-PXFBAC-SUBCYR-OMEQVV-TMBPHW)
(1,TMWTYV-PYSAJV)
(3,UHQTWN-YSLXXE-PNIMWJ-NAYYWU-EYPRPM-SXGUQO-DDSNIY-EXPSPM)
(5,ZOGCRZ-VORGBM-CUZZFS-SLFBWC-PFRFRA)
(2,XTAKJH-HOUFFR-KIJCNU-BDILZU-SJFGRN-IZPCHR-RIPRRA-UUGZER)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容