我們平時很容易遇到說排序,并取前N個的狀況。
我們根據(jù)數(shù)據(jù)類型可以簡單分為重復(fù)鍵和不重復(fù)鍵的topN
MapReduce
對于MR來說,topN代碼比較多一些,在這里我只講講思路。
當無重復(fù)鍵的時候,
我們有數(shù)據(jù)("w"->2,"ww"->3,"r"->3)
我們的目的是對值進行排序,如用戶點擊了幾次網(wǎng)頁,值記錄的就是網(wǎng)頁。
map階段,我們要做的是獲取并且處理數(shù)據(jù),并完成本地的topN排序。
在排序時我們用的是java自帶的treeMap(它是一個基于紅黑樹的實現(xiàn))。
為什么要在map階段就進行排序呢?
因為在數(shù)據(jù)量巨大的時候,為了減少RPC和reduce的壓力。于是我們在map排好序并篩選出前N個。
reduce階段,我們只需要把map傳來的topN再進行一次排序篩選出前N個。
這樣我們的目的就達成了。
對于非唯一鍵,MR顯得笨拙一些,它必須先經(jīng)過一次reduce,把非唯一鍵變成唯一鍵后再重復(fù)上述操作。
spark
spark具有高層抽象函數(shù)。所以排序顯得十分簡單。在這里主要看看這幾個函數(shù)。
sortby
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T]
sortby函數(shù)可以完成對指定數(shù)據(jù)的排序,(k,v)既可以指定k也可以指定v,第二個參數(shù)是選擇正序還是逆序(默認是true正序,一般要topN的話用逆序),因為這是一個shuffle操作所以可要指定分區(qū)。sortbykey
比sortby少一個第一個參數(shù),它是僅對key的排序。sortwith
def sortWith(lt: (A, A) => Boolean): Repr = sorted(Ordering fromLessThan lt)
一種自定義排序的方法takeOrder
take
def take(num: Int): Array[T]
抽取rdd的前n個元素top
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
默認使用降序,并抽取前n個元素tabkeOrdered
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
默認使用升序,并抽取前n個元素
val arr=Map(1->2,2->3,3->1,4->4,4->5,10->23,12->21,10->2,9->1,0->2,9->3)
val conf=new SparkConf().setAppName("test")
val sc=new SparkContext(conf)
val rdd=sc.parallelize(arr.toList,4)
println(rdd.partitions.size+"======================================")
val rerdd=rdd.coalesce(3)
println((rerdd.partitions.size+"======================================"))
val pairs=rerdd.map(x=>new Tuple2(x._1,x._2))
val result=pairs.reduceByKey(_+_)
println(result.partitions.size+"======================================")
val partitions=result.sortBy(x=>x._2,false)
val res=partitions.take(3)
res.foreach(x=>println(x))
代碼的簡單實現(xiàn)。
思考:如果大量數(shù)據(jù)中進行topN有什么優(yōu)化呢?
個人認為剪枝是必要的,假如對于1-100分布的數(shù)服從正態(tài)分布,我們自然就可以過濾掉百分之50-70的數(shù)。
如果在已知平均值等情況下,更方便進行剪枝。