sparkRddFunctionNote

zipWithIndex:首先基于分區(qū)索引? 然后基于分區(qū)內元素索引 第一個元素是第一個分區(qū)的第一個元素 最后一個元素是最后一個分區(qū)的最后一個元素 Index的返回類型是Long類型而不是Int 如果RDD不止一個分區(qū),則觸發(fā)一個spark job,如果是根據groupBy()返回的RDD 不能保證一個分區(qū)內的元素排序,所以 如果需要確保每一個元素的索引序列,需要針對RDD使用sortByKey() 算子 進行sort? 或者保存進一個文件

val seqRdd = sc.parallelize(List("Mary","Jim","Green","Jack","Tony"))

seqRdd.mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,Mary)

// (partitionIndex:00,Jim)

// (partitionIndex:01,Green)

// (partitionIndex:01,Jack)

// (partitionIndex:01,Tony)

seqRdd.zipWithIndex().collect().foreach(println)

// (Mary,0)

// (Jim,1)

// (Green,2)

// (Jack,3)

// (Tony,4)

seqRdd.zipWithIndex().mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(Mary,0))

// (partitionIndex:00,(Jim,1))

// (partitionIndex:01,(Green,2))

// (partitionIndex:01,(Jack,3))

// (partitionIndex:01,(Tony,4))

zip:

將當前RDD與另外一個RDD進行zip操作,返回從兩個RDD返回的第一個元素對/第二個元素對 作為key-value對,假定兩個RDD 具有兩個相同數據量的分區(qū)且每個分區(qū)的元素數量相同,如果存在某一個分區(qū)對,其元素數量不相同,則拋出SparkException:Can Only zip RDDs with the same number of elements in each partition

CollectAsMap:將當前RDD的key-value 對 以Map形式返回至master節(jié)點,如果一個key有多個value。只能返回一個value

seqRdd.zip(seqRdd).collectAsMap().foreach(println);seqRdd.zip(seqRdd).foreach(println);

// (Jim,Jim) (Jack,Jack) (Green,Green) (Mary,Mary) (Tony,Tony)

// (Mary,Mary)(Jim,Jim)(Green,Green)(Jack,Jack)(Tony,Tony)

zipWithUniqueId:

使用唯一的Long id 與當前RDD 進行Zips操作,在第K個分區(qū)內的元素(其分區(qū)內索引分別為0,1,2,3,4)的id分別為:k,k+n,k+2*n,k+3*n... n為分區(qū)總數,因此在這里存在一定的跳躍,但是與zipWithIndex不同,該方法不會觸發(fā)spark job,其他與其一致。

seqRdd.zipWithUniqueId().collect.foreach(println)

// (Mary,0)

// (Jim,2)

// (Green,1)

// (Jack,3)

// (Tony,5)

// 諸如 combineByKey等算子,在指定參數名及其具體值時,需要注意其參數名應與聲明時參數名一致。

val seq1Rdd = sc.parallelize(List("Mary","Jim","Green","Jack","Tony"))

val seq2Rdd = sc.parallelize(List(1,2,1,2,1))

val zipRdd = seq2Rdd.zip(seq1Rdd)

combineRdd:方法向后兼容,使用一系列常用的聚合函數對RDD的每個Key 聯(lián)合組成其value值

val combineRdd = zipRdd.combineByKey(createCombiner = (x:String) => List(x),mergeValue = (x:List[String],y:String) => x.:+(y),mergeCombiners = (x:List[String],y:List[String]) => x.:::(y))

combineRdd.foreach(println)

// (2,(List(Jack,Jim)))

// (1,(List(Green,Tony,Mary)))

解析:

combineByKey屬于Key-value算子,做的是聚合操作,這種變換不會觸發(fā)作業(yè)的提交,主要有三個參數:

createCombiner function:一個組合函數 用于將RDD[K,V] 中的V轉換成一個新的值C1

mergeValue function: 合并值函數,將一個C1類型值和一個V類型值合并成一個C2類型,輸入參數為(C1,V) 輸出為新的C2

mergeCombiners function:合并組合器函數 用于將兩個C2類型值合并成一個C3類型 輸入參數為(C2,C2) 輸出為C2

val createCombine = (x:String) => List(x)

val mergeValue = (x:List[String],y:String) => y :: x

// Adda an element at the beginning of this list.{{{ 1 :: List(2,3) = List(2,3).::(1) = List(1,2,3)}}}

val mergerCombiners = (x:List[String],y:List[String]) => x ::: y // Adds the elements of a given list in front of this list. {{{ List(1,2) ::: List(3,4) = List(3,4).:::List(1,2) = List(1,2,3,4)}}}

val combineRdd2 = zipRdd.combineByKey(createCombine,mergeValue,mergerCombiners)

combineRdd2.foreach(println)

// (2,(List(Jim,Jack)))

// (1,(List(Mary,Tony,Green)))

aggregateByKey:

使用combine組合函數和一個中性netural“zero value”對每一個key的多個value進行聚合操作,可以返回不同于RDD中固有數據類型V的結果類型U,一個merge操作將V 轉換為U,另外一個merge操作將兩個U聚合,(集合可反復遍歷) 之前的數據類型轉換操作是在一個分區(qū)內部進行的? 之后的將U進行聚合操作是在分區(qū)之間進行的。

為避免內存泄漏,這些函數均可以被修改并且返回他們的第一個參數而不是創(chuàng)建一個新的U實例。 計算的時候與分區(qū)的關系很大,注意分區(qū)的作用。

val seqRdd = sc.parallelize(List(("cat",2),("dog",12),("cat",12),("cat",5),("mouse",4),("mouse",2)))

seqRdd.mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(cat,2))

// (partitionIndex:00,(dog,12))

// (partitionIndex:00,(cat,12))

// (partitionIndex:01,(cat,5))

// (partitionIndex:01,(mouse,4))

// (partitionIndex:01,(mouse,2))

val aggregateFuncRdd = seqRdd.aggregateByKey[Int](zeroValue = 0)(seqOp = (value:Int,zeroValue:Int) => math.max(value,zeroValue),combOp = (value1:Int,value2:Int) => value1 + value2)

aggregateFuncRdd.collect.foreach(println)

// (dog,12) (cat,17) (mouse,4)

seqRdd.aggregateByKey[Int](zeroValue = 10)(seqOp = (value:Int,zeroValue:Int) => math.max(value,zeroValue),combOp = (value1:Int,value2:Int) => value1 + value2).collect.foreach(println)

// (dog,12) (cat,22) (mouse,10)? 首先和當前分區(qū)內的同一key的多個value與zeroValue結合進行seqOp運算,分區(qū)0內cat 對應的value最大為12,dog為12,分區(qū)1內cat對應的value最大為5 mouse為4? 再將其與ZeroValue進行計算 0-cat-Max 12 0-dog-Max 12 1-cat-Max 10 1-mouse-Max 10? 然后再將兩個分區(qū)的同一key的多個value進行相加? 得到 (dog,12) (cat,22) (mouse,10)

seqRdd.aggregateByKey[Int](zeroValue = 10)(seqOp = (value:Int,zeroValue:Int) => math.max(value,zeroValue),combOp = (value1:Int,value2:Int) => value1 + value2).collect.foreach(println)

// (dog,100) (cat,200) (mouse,100)

countByKey:

計算每一個Key對應的元素數量,并講結果collect至本地Map格式,結果Map必須相對較小 才能夠全部加載進Driver's 內存 處理大數據量的話? 可以返回一個RDD[K,Long] 代替一個Map

seqRdd.countByKey() // self.mapValue(_ => 1L).reduceByKey(_+_).collect().toMap

seqRdd.countByValue()// map(value => (value,null)).countByKey()? 當前value不是key-value對的value? 而是指代整個key-value 以(value,count)對的本地Map形式返回當前RDD的不同value的個數(此處應該是一個Bug)

seqRdd.map(_._2).countByValue()

sortByKey():

進行了shuffle操作,shuffle之后被重新分區(qū),排序靠前的元素在低序號分區(qū),排序靠后的元素在高序號分區(qū),每一個分區(qū)內包含一段已經排序好的元素,針對結果如果調用collect 或者save算子,將會返回或者輸出一段已經排序好的記錄,調用save算子時將會在文件系統(tǒng)內生成多個依照key進行排序的“part-x”的文件

val ze = sc.parallelize(List("dog","tiger","tac","cat","gnu","panther"),2)

val zf = ze.map(x => (x.length,x))

zf.mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(3,dog))

// (partitionIndex:00,(5,tiger))

// (partitionIndex:00,(3,tac))

// (partitionIndex:01,(3,cat))

// (partitionIndex:01,(3,gnu))

// (partitionIndex:01,(7,panther))

zf.sortByKey().foreachPartition(x => println("sortByKey....",x.toList.mkString(",")))

// (sortByKey....,(5,tiger),(7,panther))

// (sortByKey....,(3,dog),(3,tac),(3,cat),(3,gnu))

zf.sortByKey().mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("partitionIndex:0"+Index,x)).toIterator}.collect().foreach(println)

// (partitionIndex:00,(3,dog))

// (partitionIndex:00,(3,tac))

// (partitionIndex:00,(3,cat))

// (partitionIndex:00,(3,gnu))

// (partitionIndex:01,(5,tiger))

// (partitionIndex:01,(7,panther))

foldByKey():使用一個組合函數 和中性值 neutral “zero value” 數據類型與Value類型相同,執(zhí)行時首先針對某一分區(qū)內相關元素進行組合函數計算 然后 結合 zero value 進行計算。。 然后根據Key將不同分區(qū)的組合計算結果進行組合函數運算

zf.foldByKey(zeroValue="#")((x:String,y:String) => x + y)..mapPartitionsWithIndex{(index,iter) => iter.toList.map(x => ("foldByKey:"+Index,x)).toIterator}.collect().foreach(println)

// (foldByKey:1,(3,#dogtac#catgnu))

// (foldByKey:1,(7,#panther))

// (foldByKey:1,(5,#tiger))

Join:

fullOuterJoin:全連接 (key,(Some(v1),Some(v2)))

join:連接 (key,(v1,v2))

leftOuterJoin:左連接 (key,(v1,Some(v2)))

rightOuterJoin:右連接 (key,(Some(v1),v2))

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容