combineByKey
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
其中的參數(shù):
createCombiner:組合器函數(shù),用于將V類型轉換成C類型,輸入參數(shù)為RDD[K,V]中的V,輸出為C
mergeValue:合并值函數(shù),將一個C類型和一個V類型值合并成一個C類型,輸入參數(shù)為(C,V),輸出為C
mergeCombiners:合并組合器函數(shù),用于將兩個C類型值合并成一個C類型,輸入參數(shù)為(C,C),輸出為C
numPartitions:結果RDD分區(qū)數(shù),默認保持原有的分區(qū)數(shù)
partitioner:分區(qū)函數(shù),默認為HashPartitioner
mapSideCombine:是否需要在Map端進行combine操作,類似于MapReduce中的combine,默認為true
舉例理解:
假設我們要將一堆的各類水果給榨果汁,并且要求果汁只能是純的,不能有其他品種的水果。那么我們需要一下幾步:
1 定義我們需要什么樣的果汁。
--相當于Hadoop中的map
2 定義一個榨果汁機,即給定水果,就能給出我們定義的果汁。
** --相當于hadoop中的local combiner**
3 定義一個果汁混合器,即能將相同類型的水果果汁給混合起來。
--相當于全局進行combiner,對每個partition 進行操作
那么對比上述三步,combineByKey的三個函數(shù)也就是這三個功能
1 createCombiner就是定義了v如何轉換為c
2 mergeValue 就是定義了如何給定一個V將其與原來的C合并成新的C
3 就是定義了如何將相同key下的C給合并成一個C

補充
注意體會 與 aggregate 的區(qū)別 foldByKey (f:(v,v) => v)
(zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
def testFoldBYKey(): Unit ={
val rdd2 =sc.makeRDD(Array(("A",1),("A",2),("c",1),("b",1),("b",2)))
rdd2.foldByKey(2)(_+_).collect().foreach(println)
}