Spark中的combineByKey

在數(shù)據(jù)分析中,處理Key,Value的Pair數(shù)據(jù)是極為常見的場景。譬如說,對(duì)Pair數(shù)據(jù)按照key分組、聚合,又或者更抽象的,則是根據(jù)key對(duì)value進(jìn)行fold運(yùn)算。

如果我們對(duì)編碼的態(tài)度有些敷衍,大約會(huì)將其分別定義為三個(gè)函數(shù):gruopByKey、aggregateByKey、foldByKey。站在調(diào)用者的角度,如此設(shè)計(jì)無可厚非,相反我還得擊節(jié)贊嘆。因?yàn)閺暮瘮?shù)名來看,確實(shí)體貼地照顧了用戶的知識(shí)結(jié)構(gòu)。換個(gè)角度,站在實(shí)現(xiàn)這一邊,你可能會(huì)發(fā)現(xiàn)這三個(gè)函數(shù)乃孿生兄弟,具有相同的血統(tǒng)。

所謂“抽象”,就是要尋找實(shí)現(xiàn)上的共同特征。OO也好,F(xiàn)P也罷,都格外重視抽象的能力,因?yàn)槌橄竽茉诤艽蟪潭壬匣睘楹?,且具備?yīng)對(duì)變化的能力。只是各自抽象的層次不同罷了。與OO中抽象接口的設(shè)計(jì)思路是等同的,個(gè)人認(rèn)為,F(xiàn)P只是將抽象做到了極致,落實(shí)到了類型之上(暫且放開業(yè)務(wù)的羈絆),函數(shù)不過就是類型的轉(zhuǎn)換罷了。

在Spark的語境中,RDD是核心數(shù)據(jù)結(jié)構(gòu),從Scala的語法出發(fā),可以認(rèn)為RDD是一個(gè)類型類,或者M(jìn)onad容器(個(gè)人如此認(rèn)為,若有不妥,還請(qǐng)方家指正)。這個(gè)容器到底裝了什么數(shù)據(jù)呢?得看你如何為其裝載數(shù)據(jù),前面提到了Pair類型,在本文場景下,RDD實(shí)則是一個(gè)RDD[(K, V)]類型的數(shù)據(jù)。

“前戲”到此結(jié)束,現(xiàn)在來看看groupByKey、aggregateByKey和foldByKey到底要做什么(what to do)?

groupByKey是將一堆結(jié)構(gòu)形如(K, V)的數(shù)據(jù)根據(jù)K分組,我們暫且將這個(gè)分組過程看做是一個(gè)黑盒子,想一想,輸出會(huì)是什么?

aggregateByKey是將一堆結(jié)構(gòu)形如(K, V)的數(shù)據(jù)根據(jù)K對(duì)數(shù)據(jù)進(jìn)行聚合運(yùn)算,它的輸出又會(huì)是什么呢

fold是針對(duì)一個(gè)集合中的數(shù)據(jù)進(jìn)行折疊運(yùn)算,運(yùn)算符則取決傳入的函數(shù),例如sum或者produce等。因而foldByKey就是將一堆結(jié)構(gòu)形如(K, V)的數(shù)據(jù)根據(jù)K對(duì)數(shù)據(jù)進(jìn)行折疊運(yùn)算,那么,它的輸出又會(huì)是什么?

現(xiàn)在需要一點(diǎn)推演的能力。首先,不管過程如何,這三個(gè)運(yùn)算接收的參數(shù)總是相同的,皆為(K, V)。輸出結(jié)果定然不同,但它們卻又具有相同的特征,即它們都是根據(jù)K分別對(duì)數(shù)據(jù)進(jìn)行運(yùn)算,換言之,計(jì)算可能不同,計(jì)算的結(jié)果可能不同,但結(jié)果一定是根據(jù)K來組織的。這里的K不就是Pair的key嗎?對(duì)應(yīng)的Value呢?我們還不知道,然而不管是阿貓阿狗,它總可以用一個(gè)抽象的類型參數(shù)來代表,記為C,結(jié)果就變成了(K, C)。這就是我們要找的一個(gè)抽象:

RDD[(K, V)] -> RDD[(K, C)]

然而,這個(gè)抽象還不夠,我們需要找到將V(可能是多個(gè)V)變成C的方法。

現(xiàn)在假設(shè)我們要尋找的抽象是一臺(tái)超級(jí)酷的果汁機(jī)。它能同時(shí)接受各種各樣的水果,然后聰明地按照水果的種類分別榨出不同的果汁。蘋果歸蘋果汁,橙子歸橙汁,西瓜歸西瓜汁。我們?yōu)樗x類型為Fruit,果汁定義為Juice,根據(jù)前面的分析,這個(gè)過程就是將RDD[(String, Fruit)]轉(zhuǎn)換為RDD[(String, Juice)]。

注意,在榨果汁前,水果可能有很多,即使是相同類型的水果,也會(huì)作為不同的RDD元素:

("apple", apple1), ("orange", orange1), ("apple", apple2)

轉(zhuǎn)換的結(jié)果是每種水果只有一杯果汁(只是容量不同罷了):

("apple", appleJuice), ("orange", orangeJuice)

那么,這個(gè)果汁機(jī)該由什么元件構(gòu)成呢?現(xiàn)在,我們化身為機(jī)械師,想想這個(gè)果汁機(jī)的組成元件:

  • 首先,它需要一個(gè)元件提供將各種水果榨為各種果汁的功能;
  • 其次,它需要提供將果汁進(jìn)行混合的功能;
  • 最后,為了避免混合錯(cuò)誤,還得提供能夠根據(jù)水果類型進(jìn)行混合的功能。

注意第二個(gè)函數(shù)和第三個(gè)函數(shù)的區(qū)別,前者只提供混合功能,即能夠?qū)⒉煌萜鞯墓b到一個(gè)容器中,而后者的輸入已有一個(gè)前提,那就是已經(jīng)按照水果類型放到不同的區(qū)域,果汁機(jī)在混合果汁時(shí),并不會(huì)混淆不同區(qū)域的果汁。否則你得到的果汁就不是蘋果汁或者橙汁,而是混合味兒的果汁。

再回到函數(shù)這邊來。從函數(shù)的抽象層面看,這些操作具有共同的特征,都是將類型為RDD[(K,V)]的數(shù)據(jù)處理為RDD[(K,C)]。這里的V和C可以是相同類型,也可以是不同類型。這種操作并非單純地對(duì)Pair的value進(jìn)行map,而是針對(duì)不同的key值對(duì)原有的value進(jìn)行聯(lián)合(Combine)。因而,不僅類型可能不同,元素個(gè)數(shù)也可能不同。

于是,我們百折千回地尋找到了這個(gè)高度抽象的操作combineByKey。該方法在Spark的定義如下所示:

  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = {
    //實(shí)現(xiàn)略
  }

聲明式風(fēng)格與命令式風(fēng)格不同之處在于它說明了代碼做了什么(what to do),而不是怎么做(how to do)。combineByKey函數(shù)主要接受了三個(gè)函數(shù)作為參數(shù),分別為createCombiner、mergeValue、mergeCombiners。這三個(gè)函數(shù)足以說明它究竟做了什么。理解了這三個(gè)函數(shù),就可以很好地理解combineByKey。

要將RDD[(K,V)]combine為RDD[(K,C)],就需要提供一個(gè)函數(shù),能夠完成從V到C的combine,稱之為combiner。如果V和C類型一致,則函數(shù)為V => V。倘若C是一個(gè)集合,例如Iterable[V],則createCombiner為V => Iterable[V]。

mergeValue則將原RDD中Pair的Value合并為操作后的C類型數(shù)據(jù)。合并操作的實(shí)現(xiàn)決定了結(jié)果的運(yùn)算方式。所以,mergeValue更像是聲明了一種合并方式,它是由整個(gè)combine運(yùn)算的結(jié)果來導(dǎo)向的。函數(shù)的輸入為原RDD中Pair的V,輸出為結(jié)果RDD中Pair的C。

最后的mergeCombiners則會(huì)根據(jù)每個(gè)Key對(duì)應(yīng)的多個(gè)C,進(jìn)行歸并。

再回到果汁機(jī)的案例。果汁機(jī)的功能類似于groupByKey+foldByKey操作。但我們沒有必要自己去實(shí)現(xiàn)這個(gè)榨取果汁的功能,可以直接調(diào)用combineByKey函數(shù):

case class Juice(volumn: Int) {
    def add(j: Juice):Juice = Juice(volumn + j.volumn)
}
case class Fruit(kind: String, weight: Int) {
    def makeJuice:Juice = Juice(weight * 100)
}
val apple1 = Fruit("apple", 5)
val apple2 = Fruit("apple", 8)
val orange1 = Fruit("orange", 10)
     
val fruit = sc.parallelize(List(("apple", apple1) , ("orange", orange1) , ("apple", apple2))) 
val juice = fruit.combineByKey(
    f => f.makeJuice,
    (j:Juice, f) => j.add(f.makeJuice),
    (j1:Juice, j2:Juice) => j1.add(j2) 
)

執(zhí)行juice.collect,結(jié)果為:

Array[(String, Juice)] = Array((orange, Juice(1000)), (apple, Juice(1300)))

RDD中有許多針對(duì)Pair RDD的操作在內(nèi)部實(shí)現(xiàn)都調(diào)用了combineByKey函數(shù)(Spark的2.0版本則實(shí)現(xiàn)為combineByKeyWithClassTag)。例如groupByKey:

class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging
  with SparkHadoopMapReduceUtil
  with Serializable {
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
        val createCombiner = (v: V) => CompactBuffer(v)
        val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
        val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
        val bufs = combineByKey[CompactBuffer[V]](
          createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
        bufs.asInstanceOf[RDD[(K, Iterable[V])]]
      }
}

groupByKey函數(shù)針對(duì)PairRddFunctions的RDD[(K, V)]按照key對(duì)value進(jìn)行分組。它在內(nèi)部調(diào)用了combineByKey函數(shù),傳入的三個(gè)函數(shù)分別承擔(dān)了如下職責(zé):

  • createCombiner是將原RDD中的K類型轉(zhuǎn)換為Iterable[V]類型,實(shí)現(xiàn)為CompactBuffer。
  • mergeValue實(shí)則就是將原RDD的元素追加到CompactBuffer中,即將追加操作(+=)視為合并操作。
  • mergeCombiners則負(fù)責(zé)針對(duì)每個(gè)key值所對(duì)應(yīng)的Iterable[V],提供合并功能。

再例如,我們要針對(duì)科目對(duì)成績求平均值:

val scores = sc.parallelize(List(("chinese", 88.0) , ("chinese", 90.5) , ("math", 60.0), ("math", 87.0)))

平均值并不能一次獲得,而是需要求得各個(gè)科目的總分以及科目的數(shù)量。因此,我們需要針對(duì)scores進(jìn)行combine,從(String, Float)combine為(String, (Float, Int))。在調(diào)用combineByKey函數(shù)后,再通過map來獲得平均值。代碼如下:

val avg = scores.combineByKey(
    (v) => (v, 1),
    (acc: (Float, Int), v) => (acc._1 + v, acc._2 + 1),
    (acc1:(Float, Int), acc2:(Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }

除了可以進(jìn)行g(shù)roup、average之外,根據(jù)傳入的函數(shù)實(shí)現(xiàn)不同,我們還可以利用combineByKey完成諸如aggregate、fold等操作。這是一個(gè)高度的抽象,但從聲明的角度來看,卻又不需要了解過多的實(shí)現(xiàn)細(xì)節(jié)。這正是函數(shù)式編程的魅力。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容