package com.meng.nan.day720
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
object Custom {
  // Demonstrates a custom Spark accumulator that counts selected words
  // on the executors while the driver runs a word-pairing transformation.
  def main(args: Array[String]): Unit = {
    // Silence noisy framework logging so the accumulator output is visible.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.spark_project").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setMaster("local[8]")
      .setAppName("Custom")
    val sc = new SparkContext(conf)

    val list = List(
      "hello you",
      "i hate you",
      "i miss you",
      "i love you",
      "fuck you"
    )
    val words = sc.parallelize(list).flatMap(_.split("\\s+"))

    // Create and register the custom accumulator before using it in a
    // transformation; registration gives it a name in the Spark UI.
    val mAccu = new MyAccumulator
    sc.register(mAccu, "sparkAccu") // fixed typo: was "aparkAccu"

    val pairs = words.map(word => {
      // Accumulate only the tracked words; adds happen on the executors.
      if (word == "you" || "hate" == word) {
        mAccu.add(word)
      }
      (word, 1)
    })

    // An action is required to trigger the lazy map() above (and the adds);
    // only read the accumulator value after the action completes.
    pairs.count()
    println(mAccu.value)
    sc.stop()
  }
}
// Custom accumulator aggregating word -> occurrence count across tasks.
// IN type is a single word (String); OUT type is the mutable count map.
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
  var map = mutable.Map[String, Int]()

  // True only when no counts have been recorded yet. The original returned
  // a hard-coded `true`, which violates the AccumulatorV2 contract once the
  // map is non-empty (Spark validates copyAndReset().isZero at registration
  // and uses isZero to detect zero-valued accumulators).
  override def isZero: Boolean = map.isEmpty

  // Current accumulated value.
  override def value: mutable.Map[String, Int] = map

  // Create an independent copy for shipping to executors. Must clone the
  // map: the original assigned `this.map` directly, so the copy and the
  // original aliased the same mutable map and corrupted each other's state.
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
    val myAccu = new MyAccumulator
    myAccu.map = this.map.clone()
    myAccu
  }

  // Reset this accumulator to its zero value (empty map).
  override def reset(): Unit = map.clear()

  // Record one occurrence of `word`.
  override def add(word: String): Unit = {
    map.put(word, map.getOrElse(word, 0) + 1)
  }

  // Merge the counts accumulated by another task into this accumulator.
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    for ((word, count) <- other.value) {
      map.put(word, map.getOrElse(word, 0) + count)
    }
  }
}