RDD五大特性
外部存儲(chǔ)轉(zhuǎn)換為RDD
val datas=sc.textfile("address")
val wordrdd=rdd.flatmap(_.split(""))
val kvRdd=wordrdd.map((_,1))
val wordcoutrdd=kvrdd.reducebykey(_+_)
wordcoutrdd.collect
...transform中l(wèi)azy加載,不會(huì)執(zhí)行,直到action操作后才執(zhí)行
1 a list of partitions(一系列分區(qū)組成的)
2 每一個(gè)分區(qū)都有一個(gè)function進(jìn)行轉(zhuǎn)換
3 每個(gè)rdd都有對(duì)其他RDD的依賴
4 optionally,針對(duì)keyvalue形式的rdd可以指定分區(qū),告訴它如何分片
5 optionally,處理RDD的每個(gè)分片,split數(shù)據(jù)在哪里,就去哪計(jì)算,移動(dòng)計(jì)算
操作
- transformation
- 創(chuàng)建一個(gè)新的數(shù)據(jù)集
- lazy mode
- example: map flatmap filter groupbykey reducebykey sortbykey join union
- action
- 返回一個(gè)值給driverprogram:例如保存 saveastextfile("address")
- 緩存為后續(xù)執(zhí)行節(jié)省時(shí)間 persistent() cache()
RDD依賴
- 窄依賴
- 子RDD的每個(gè)分區(qū)依賴于常數(shù)個(gè)父分區(qū)
- 輸入輸出一對(duì)一的算子,且結(jié)果RDD的分區(qū)結(jié)構(gòu)不變
- 輸入輸出一對(duì)一,但結(jié)果的RDD分區(qū)結(jié)構(gòu)發(fā)生了變化
- 從輸入中選擇部分元素的算子
- 寬依賴
- 子RDD的分區(qū)依賴于所有的父RDD紛分區(qū)
- 對(duì)單個(gè)RDD基于key進(jìn)行重組和reduce
- 對(duì)兩個(gè)RDD基于key進(jìn)行join和重組
RDDshuffle
- 針對(duì)split重新調(diào)整分區(qū)的機(jī)制
- 發(fā)生調(diào)整分區(qū)的會(huì)有此過程
- jion操作、bykey操作
spark內(nèi)核分析
- RDD object
- DAG scheduler
- spark 階段劃分:根據(jù)有沒有shuffle
- task scheduler
- worker
案例
- sort
- sortbykey
- sortbyvalue
wordcount.map(x=>(x._2,x._1)).sortbykey(false).map(x=>(x._2,x._1)).collect(按照降序)
- top k
- wordcount.map(x=>(x._2,x._1)).sortbykey(false).map(x=>(x._2,x._1)).take(3)(取前三)
- group top key
- rdd.map(_.split(" ")).map(x=>(x(0),x(1))).grouByKey.map(x=>{val xx=x._1
val yy=x._2
(xx,yy.tolist.sorted.reverse.take(3)
)}).collect
//tolist實(shí)現(xiàn)將前面得到的是iterable
轉(zhuǎn)變?yōu)閘ist
默認(rèn)是升序,加reverse成降序