Spark方法aggregate講解

首先看一下Spark官網(wǎng)對該方法的講解:
aggregate(RDD.scala)

大致的意思是aggregate接收兩個函數(shù),和一個初始化值。seqOp函數(shù)用于聚集每一個分區(qū),combOp用于聚集所有分區(qū)聚集后的結(jié)果。每一個分區(qū)的聚集,和最后所有分區(qū)的聚集都需要初始化值的參與。
舉例如下:

集群環(huán)境:一臺Master,三臺Worker,在spark-shell中測試

scala> val  seqOp:(Int,Int)=>Int={(a,b)=>{println("seqOp"+a+"\t"+b);math.min(a,b)}}
seqOp: (Int, Int) => Int = <function2>

scala> val combOp:(Int,Int)=>Int={(a,b)=>{println("combOp"+a+"\t"+b);a+b}}
combOp: (Int, Int) => Int = <function2>

scala> val z=sc.parallelize(List(1,2,3,4,5,6,7,8),2)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> z.aggregate(3)(seqOp,combOp)
[Stage 13:>                                        (0 + 0) / 2]combOp3 1
combOp 4 3
res13: Int = 7                                                                  

為什么會等于7呢?

我們首先看一下集群中任務(wù):
任務(wù)

可以看出有兩個任務(wù),原因是我們將List并發(fā)數(shù)設(shè)置為了2,Spark會將List拆分成2部分同時執(zhí)行。再進一步看這兩個任務(wù)的的統(tǒng)計信息:
任務(wù)的統(tǒng)計信息

可以看出這兩個任務(wù)在兩個worker上執(zhí)行,可以看到任務(wù)的啟動時間,執(zhí)行了多久等信息。再進一步看任務(wù)的stdout輸出日志:


任務(wù)一的輸出日志

任務(wù)二的輸出日志

可以看出spark將List拆分成了兩部分,啟動兩個任務(wù)分別執(zhí)行。再來看看seqOp函數(shù)表達的意思,seqOp取的是兩個數(shù)中的較小值。如第一半部分List(1,2,3,4),spark會拿初始值3與這個List中的每一個元素分別比較,最后得出的結(jié)果是1.同時,第二半部分List得出的結(jié)果是3,然后spark再將這兩部分得出的結(jié)果調(diào)用combOp處理,combOp是兩個數(shù)的相加,spark首先將初始值3加上1得出4,再加上3得到7.

接下來再看一下使用aggregate方法編寫wordcount例子。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

/**
  * User:cool coding
  * Date:20171214
  * Time:16:12:20
  *
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("wordcount").setMaster("local[2]")
    val sc=new SparkContext(conf)
    val data=sc.textFile("H:/hadoop/wordcount.txt");
    val words: RDD[String] = data.flatMap(_.split(" "))
    val countsMap=words.aggregate(mutable.HashMap[String,Int]())((agg:mutable.HashMap[String,Int], word)=>{
      if(!agg.contains(word)){
        agg.put(word,1)
      }else{
        agg.put(word,agg(word)+1)
      }
      agg
      },(agg1:mutable.HashMap[String,Int],agg2:mutable.HashMap[String,Int])=> {
         for((word,count)<-agg1){
           if(!agg2.contains(word)){
             agg2.put(word,1)
           }else{
             agg2.put(word,agg2(word)+count)
           }
         }
      agg2
    }
    )
    println(countsMap.toList)
  }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容