在對RDDPair(一種特殊的 RDD,即RDD[(key, Row)])進(jìn)行操作時經(jīng)常會用到 reduceByKey() 和 groupByKey() 兩個算子。下面看看兩者的區(qū)別和使用方法:
一、reduceByKey(func) 和 groupByKey() 的區(qū)別
reduceByKey(func):顧名思義,是針對 RDDPair 中具有
相同 key 的所有 row 做 reduce 操作,操作內(nèi)容由函數(shù) func 確定,可以自定義,比如:形如 (0, BACA) 這樣的 row,現(xiàn)在需要對 key 相同的所有row(即 BACA)使用"-"拼接成一個長字符串,比如(1,TMWTYV-PYSAJV);groupByKey(): 顧名思義,是針對 RDDPair 中具有
相同 key 的所有 row 分組,相同 key 對應(yīng)的 row 匯總生成一個sequence;本身不能自定義函數(shù),只能通過額外通過map(func)來實(shí)現(xiàn)。比如:(0,CompactBuffer(ZCEXLX, BKSGQD, ICRWVA, PXFBAC, SUBCYR, OMEQVV, TMBPHW))。


使用reduceByKey()的時候,本地的數(shù)據(jù)先進(jìn)行merge然后再傳輸?shù)讲煌?jié)點(diǎn)再進(jìn)行merge,最終得到最終結(jié)果。
而使用groupByKey()的時候,并不進(jìn)行本地的merge,全部數(shù)據(jù)傳出,得到全部數(shù)據(jù)后才會進(jìn)行聚合成一個sequence,groupByKey()傳輸速度明顯慢于reduceByKey()。
雖然groupByKey().map(func)也能實(shí)現(xiàn)reduceByKey(func)功能,但是優(yōu)先使用reduceByKey(func)。
區(qū)別:
| 區(qū)別項(xiàng) | reduceByKey | groupByKey | 備注 |
|---|---|---|---|
| 功能 | 針對 RDDPair 中具有相同 key 的所有 row 做 reduce 操作 | 針對 RDDPair 中具有相同 key 的所有 row 分組 | 無 |
| 能自定義函數(shù) | 可以自定義reduce函數(shù) | 否 | 無 |
| 輸出 | 一個 key 對應(yīng)一個row | 一個key 對應(yīng)多個row的sequence | 無 |
| 性能 | 更高 | 更低 | groupByKey.map(func) 可以實(shí)現(xiàn) reduceByKey,但是盡量用 reduceByKey,因?yàn)楦咝?/td> |
二、Scala 代碼--使用方法
- rddMap.groupByKey(自定義partitioner);
- rddMap.reduceByKey(自定義reduce函數(shù)) 或者類似 rddMap.reduceByKey(_ + "-" + ) ,其中 _ + "-" + _ 中的""表示 key 相同的兩個 row
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
object TestSparkShuffle {
class MyPartitioner(partitionNum: Int) extends Partitioner() {
override def numPartitions: Int = partitionNum
override def getPartition(key: Any): Int = {
if (key.asInstanceOf[Int] % 2 == 0) {
0
} else {
1
}
}
}
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local").appName("test").getOrCreate()
val sc = spark.sparkContext
val arr = new ArrayBuffer[String]
genStrArr(36, arr)
val rdd = sc.parallelize(arr)
val rddMap: RDD[(Int, String)] = rdd.mapPartitions(
partition => {
partition.map(str => (getKey(str), str))
}
)
rddMap.foreach(x => println(x))
// 按照 key 進(jìn)行分組,且key為奇、偶數(shù)的row各分在0、1分區(qū)內(nèi)
val rddMap2 = rddMap.groupByKey(new MyPartitioner(2))
rddMap2.foreach(x => println(x))
// 對 rddMap 中的row按照row的key,同樣的key的value相繼使用"-"拼接起來
val rddMap3 = rddMap.reduceByKey(reduceFunc)
// val rddMap3 = rddMap.reduceByKey(_ + "-" + _) // _ + "-" + _ 中的"_"表示 key 相同的兩個value
rddMap3.foreach(x => println(x))
println(rddMap.count())
}
// reduce 函數(shù),將兩個字符串使用"-"拼接
def reduceFunc(x: String, y : String): String = {
x + "-" + y
}
def getKey(str: String): Int = {
Math.abs(str.hashCode % 6)
}
// 生成size為num的字符串?dāng)?shù)組,每個字符串長度為6,由A~Z隨機(jī)構(gòu)成
def genStrArr(num : Int, arr: ArrayBuffer[String]): Unit = {
val baseChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
val charLen = 6
val rand = new Random()
for (x <- Range(0, num)) {
var subStr = ""
for (i <- Range(0, charLen)) {
val order = rand.nextInt(baseChars.length)
subStr += baseChars.charAt(order)
}
arr.append(subStr)
}
}
}
測試結(jié)果:
# groupByKey 結(jié)果
(4,CompactBuffer(HCAESV, OZNIQU, WIIWNX, MEFMUZ, TVFPRH, EMSZJC))
(0,CompactBuffer(ZCEXLX, BKSGQD, ICRWVA, PXFBAC, SUBCYR, OMEQVV, TMBPHW))
(2,CompactBuffer(XTAKJH, HOUFFR, KIJCNU, BDILZU, SJFGRN, IZPCHR, RIPRRA, UUGZER))
(1,CompactBuffer(TMWTYV, PYSAJV))
(3,CompactBuffer(UHQTWN, YSLXXE, PNIMWJ, NAYYWU, EYPRPM, SXGUQO, DDSNIY, EXPSPM))
(5,CompactBuffer(ZOGCRZ, VORGBM, CUZZFS, SLFBWC, PFRFRA))
# reduceByKey 結(jié)果
(4,HCAESV-OZNIQU-WIIWNX-MEFMUZ-TVFPRH-EMSZJC)
(0,ZCEXLX-BKSGQD-ICRWVA-PXFBAC-SUBCYR-OMEQVV-TMBPHW)
(1,TMWTYV-PYSAJV)
(3,UHQTWN-YSLXXE-PNIMWJ-NAYYWU-EYPRPM-SXGUQO-DDSNIY-EXPSPM)
(5,ZOGCRZ-VORGBM-CUZZFS-SLFBWC-PFRFRA)
(2,XTAKJH-HOUFFR-KIJCNU-BDILZU-SJFGRN-IZPCHR-RIPRRA-UUGZER)