【Spark Java API】broadcast、accumulator

broadcast


官方文檔描述:

Broadcast a read-only variable to the cluster, returning a 
[[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
The variable will be sent to each cluster only once.

函數(shù)原型:

def broadcast[T](value: T): Broadcast[T]

廣播變量允許程序員將一個(gè)只讀的變量緩存在每臺(tái)機(jī)器上,而不用在任務(wù)之間傳遞變量。廣播變量可被用于有效地給每個(gè)節(jié)點(diǎn)一個(gè)大輸入數(shù)據(jù)集的副本。Spark還嘗試使用高效地廣播算法來(lái)分發(fā)變量,進(jìn)而減少通信的開(kāi)銷(xiāo)。 Spark的動(dòng)作通過(guò)一系列的步驟執(zhí)行,這些步驟由分布式的洗牌操作分開(kāi)。Spark自動(dòng)地廣播每個(gè)步驟每個(gè)任務(wù)需要的通用數(shù)據(jù)。這些廣播數(shù)據(jù)被序列化地緩存,在運(yùn)行任務(wù)之前被反序列化出來(lái)。這意味著當(dāng)我們需要在多個(gè)階段的任務(wù)之間使用相同的數(shù)據(jù),或者以反序列化形式緩存數(shù)據(jù)是十分重要的時(shí)候,顯式地創(chuàng)建廣播變量才有用。

源碼分析:

def broadcast[T: ClassTag](value: T): Broadcast[T] = {  
  assertNotStopped()  
  if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {    
    // This is a warning instead of an exception in order to avoid breaking user programs that    
    // might have created RDD broadcast variables but not used them:    
    logWarning("Can not directly broadcast RDDs; instead, call collect() and "      
      + "broadcast the result (see SPARK-5063)")  
  }  
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)  
  val callSite = getCallSite  
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)  
  cleaner.foreach(_.registerBroadcastForCleanup(bc))  
  bc
}

實(shí)例:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);
final Broadcast<List<Integer>> broadcast = javaSparkContext.broadcast(data);
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() {    
  List<Integer> iList = broadcast.value();    
  @Override    
  public Integer call(Integer v1) throws Exception {        
    Integer isum = 0;        
    for(Integer i : iList)            
      isum += i;        
    return v1 + isum;    
  }
});
System.out.println(result.collect());

accumulator


官方文檔描述:

 Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
 values to using the `add` method. Only the master can access the accumulator's `value`.

函數(shù)原型:

def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T]
def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T])   
   : Accumulator[T]

累加器是僅僅被相關(guān)操作累加的變量,因此可以在并行中被有效地支持。它可以被用來(lái)實(shí)現(xiàn)計(jì)數(shù)器和sum。Spark原生地只支持?jǐn)?shù)字類(lèi)型的累加器,開(kāi)發(fā)者可以添加新類(lèi)型的支持。如果創(chuàng)建累加器時(shí)指定了名字,可以在Spark的UI界面看到。這有利于理解每個(gè)執(zhí)行階段的進(jìn)程(對(duì)于Python還不支持) 。
累加器通過(guò)對(duì)一個(gè)初始化了的變量v調(diào)用SparkContext.accumulator(v)來(lái)創(chuàng)建。在集群上運(yùn)行的任務(wù)可以通過(guò)add或者”+=”方法在累加器上進(jìn)行累加操作。但是,它們不能讀取它的值。只有驅(qū)動(dòng)程序能夠讀取它的值,通過(guò)累加器的value方法。

源碼分析:

def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])  
  : Accumulator[T] = {  
  val acc = new Accumulator(initialValue, param, Some(name))  
  cleaner.foreach(_.registerAccumulatorForCleanup(acc))  
  acc
}

實(shí)例:

class VectorAccumulatorParam implements AccumulatorParam<Vector> {    
  @Override    
  //合并兩個(gè)累加器的值。
  //參數(shù)r1是一個(gè)累加數(shù)據(jù)集合
  //參數(shù)r2是另一個(gè)累加數(shù)據(jù)集合
  public Vector addInPlace(Vector r1, Vector r2) {
    r1.addAll(r2);
    return r1;    
  }    
  @Override 
  //初始值   
  public Vector zero(Vector initialValue) {        
     return initialValue;    
  }    
  @Override
  //添加額外的數(shù)據(jù)到累加值中
  //參數(shù)t1是當(dāng)前累加器的值
  //參數(shù)t2是被添加到累加器的值    
  public Vector addAccumulator(Vector t1, Vector t2) {        
      t1.addAll(t2);        
      return t1;    
  }
}
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,5);

final Accumulator<Integer> accumulator = javaSparkContext.accumulator(0);
Vector initialValue = new Vector();
for(int i=6;i<9;i++)    
  initialValue.add(i);
//自定義累加器
final Accumulator accumulator1 = javaSparkContext.accumulator(initialValue,new VectorAccumulatorParam());
JavaRDD<Integer> result = javaRDD.map(new Function<Integer, Integer>() {    
  @Override    
  public Integer call(Integer v1) throws Exception {        
    accumulator.add(1);        
    Vector term = new Vector();        
    term.add(v1);        
    accumulator1.add(term);        
    return v1;    
  }
});
System.out.println(result.collect());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator.value());
System.out.println("~~~~~~~~~~~~~~~~~~~~~" + accumulator1.value());
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容