
aggregate(RDD.scala)
大致的意思是aggregate接收兩個函數(shù),和一個初始化值。seqOp函數(shù)用于聚集每一個分區(qū),combOp用于聚集所有分區(qū)聚集后的結(jié)果。每一個分區(qū)的聚集,和最后所有分區(qū)的聚集都需要初始化值的參與。
舉例如下:
集群環(huán)境:一臺Master,三臺Worker,在spark-shell中測試
scala> val seqOp:(Int,Int)=>Int={(a,b)=>{println("seqOp"+a+"\t"+b);math.min(a,b)}}
seqOp: (Int, Int) => Int = <function2>
scala> val combOp:(Int,Int)=>Int={(a,b)=>{println("combOp"+a+"\t"+b);a+b}}
combOp: (Int, Int) => Int = <function2>
scala> val z=sc.parallelize(List(1,2,3,4,5,6,7,8),2)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> z.aggregate(3)(seqOp,combOp)
[Stage 13:> (0 + 0) / 2]combOp3 1
combOp 4 3
res13: Int = 7
為什么會等于7呢?

任務(wù)
可以看出有兩個任務(wù),原因是我們將List并發(fā)數(shù)設(shè)置為了2,Spark會將List拆分成2部分同時執(zhí)行。再進一步看這兩個任務(wù)的的統(tǒng)計信息:

任務(wù)的統(tǒng)計信息
可以看出這兩個任務(wù)在兩個worker上執(zhí)行,可以看到任務(wù)的啟動時間,執(zhí)行了多久等信息。再進一步看任務(wù)的stdout輸出日志:

任務(wù)一的輸出日志

任務(wù)二的輸出日志
可以看出spark將List拆分成了兩部分,啟動兩個任務(wù)分別執(zhí)行。再來看看seqOp函數(shù)表達的意思,seqOp取的是兩個數(shù)中的較小值。如第一半部分List(1,2,3,4),spark會拿初始值3與這個List中的每一個元素分別比較,最后得出的結(jié)果是1.同時,第二半部分List得出的結(jié)果是3,然后spark再將這兩部分得出的結(jié)果調(diào)用combOp處理,combOp是兩個數(shù)的相加,spark首先將初始值3加上1得出4,再加上3得到7.
接下來再看一下使用aggregate方法編寫wordcount例子。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
* User:cool coding
* Date:20171214
* Time:16:12:20
*
*/
object WordCount {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("wordcount").setMaster("local[2]")
val sc=new SparkContext(conf)
val data=sc.textFile("H:/hadoop/wordcount.txt");
val words: RDD[String] = data.flatMap(_.split(" "))
val countsMap=words.aggregate(mutable.HashMap[String,Int]())((agg:mutable.HashMap[String,Int], word)=>{
if(!agg.contains(word)){
agg.put(word,1)
}else{
agg.put(word,agg(word)+1)
}
agg
},(agg1:mutable.HashMap[String,Int],agg2:mutable.HashMap[String,Int])=> {
for((word,count)<-agg1){
if(!agg2.contains(word)){
agg2.put(word,1)
}else{
agg2.put(word,agg2(word)+count)
}
}
agg2
}
)
println(countsMap.toList)
}
}