一、為什么使用共享變量?
當(dāng)Spark在使用某個算子對RDD進(jìn)行運(yùn)算時,如果需要用到外部變量,比如對RDD[Int]中的每個element乘以一個系數(shù)factor,得到一個新的RDD(定義在main方法中,driver進(jìn)程):
val num = 3 # 外部變量
val newRdd = rdd.map(num => num * factor) # transformation操作,lazy方式執(zhí)行
newRdd.count # action操作,立即執(zhí)行
那么需要將factor從driver端通過網(wǎng)絡(luò)傳輸?shù)剿衪ask中(task數(shù)=RDD的partition數(shù),每個task只能處理自己的那個factor副本),如果說1個executor負(fù)責(zé)處理多個task,那么該executor將會得到多份相同的factor,這種方式無疑增加了網(wǎng)絡(luò)傳輸?shù)拈_銷和內(nèi)存開銷。
很顯然,如果同一個executor節(jié)點(diǎn)上的所有task能夠共享同一份factor副本,那么將有效減小網(wǎng)絡(luò)傳輸?shù)拈_銷和內(nèi)存開銷,這時候“共享變量”就閃亮登場了,這里的“共享“是指被多個task所共享。
Spark的共享變量包括兩類:廣播變量和累加變量。

上圖紅色箭頭表示使用共享變量,藍(lán)色箭頭表示不使用共享變量。
二、Broadcast Variable 廣播變量
2.1 廣播變量的特點(diǎn)
- 廣播變量即將一個外部變量從driver端高效地通過網(wǎng)絡(luò)發(fā)送到各executor節(jié)點(diǎn),采取的是高效地“廣播”方式,即先發(fā)送給離driver端最近(Spark內(nèi)部定義了這種近的衡量方法)的一個executor(比如executor A),然后executor A會將該變量發(fā)送到它附近的executor,以此類推這種指數(shù)型增長的發(fā)送方式,可以有效地節(jié)省網(wǎng)絡(luò)傳輸開銷;
- 一個application可能涉及到多個stage,在每個stage(driver中的DAG Scheduler將main方法代碼解析得到DAG,分析每個stage得到該stage的taskset,taskset發(fā)送給task scheduler)中Spark會自動發(fā)送task所需的common data,只有當(dāng)廣播變量跨stage被task使用,才認(rèn)為是有效的;
- 由于廣播變量會涉及到網(wǎng)絡(luò)傳輸,因此必然涉及到對象的序列化和反序列化,driver端的外部變量被序列化之后得到byte數(shù)組,通過網(wǎng)絡(luò)發(fā)送到集群其他機(jī)器的executor節(jié)點(diǎn)并以序列化的格式緩存在內(nèi)存中,再反序列化為對象(在task執(zhí)行之前完成);
- executor只能讀?。╮ead-only)廣播變量,而不能修改它;
- 廣播變量是從driver(即main方法)中定義并發(fā)出(sc.broadcast(factor)),在executor端接收并使用(broadcast.value),比如:
# 被廣播的外部變量list
val sparkSession = SparkSession.builder().master("yarn").appName("Datalake")getOrCreate()
val sc = sparkSession.sparkContext
// driver端定義廣播變量
val listB = sc.broadcast(list)
// 初始化操作,用mappartition比map更加高效
val rddMap: RDD[(Int, Row)] = oldRdd.mapPartitions {
partition => {
// initialization
// executor端接收廣播變量
InitUtil.init(listB.value)
partition.map(row => (getPartitionOrder(row), getNewRow(row)))
}
}
2.2 典型case:小表和大表做join操作
大表和小表做join操作時,可以把小表broadcast到各個節(jié)點(diǎn),從而就可以把join操作轉(zhuǎn)變成普通的操作(hashmap.get),以避免耗時的shuffle操作。
代碼可以參考博客:http://www.itdecent.cn/p/0c77036ad01b
三、Accumulator 累加變量
3.1 累加變量特點(diǎn)
- 累加變量主要用于多個task對同一個變量進(jìn)行共享性(并行)的操作;
- 累加器主要分為三種,即:LongAccumulator(長整數(shù))、DoubleAccumulator(浮點(diǎn)數(shù))、CollectionAccumulator(集合);
- 可以實(shí)現(xiàn)自定義的累加器(比如BigDecimalAccumulator),比如:
import java.math.BigInteger
import org.apache.spark.util.AccumulatorV2
class BigIntegerAccumulator extends AccumulatorV2 {
var num = BigInteger.ZERO
def BigIntegerAccumulator(num: BigInteger) {
this.num = num
}
override def isZero: Boolean = {
num.compareTo(BigInteger.ZERO)
}
override def copy(): AccumulatorV2[Nothing, Nothing] = new BigIntegerAccumulator()
override def reset(): Unit = {
num = BigInteger.ZERO
}
override def add(v: Nothing): Unit = {
}
override def merge(other: AccumulatorV2[Nothing, Nothing]): Unit = {
num = num.add(other.value)
}
override def value(): BigInteger = {
num
}
}
import org.apache.spark.sql.SparkSession
class Main {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
val sc = spark.sparkContext
// 直接new自定義的累加器
val bigIntegerAccumulator = new BigIntegerAccumulator()
// 然后在SparkContext上注冊一下
sc.register(bigIntegerAccumulator, "bigIntegerAccumulator")
bigIntegerAccumulator.reset()
rdd.repartition(6).map(element => {
println(element)
bigIntegerAccumulator.add(new BigInteger("1"))
})
rdd.count()
// rdd.count()
}
}
- 重點(diǎn):累加變量的最終結(jié)果應(yīng)該不受累加順序的影響,比如StringAccumulator就是一個錯誤的例子,就相當(dāng)于開了多個線程,每個線程隨機(jī)sleep若干毫秒然后往StringBuffer中追加字符,但是最后追加出來的字符串是無法被預(yù)測的;
- executor端只能對累加變量做累加操作,driver端只能讀取累加變量的值;
3.2 使用陷進(jìn):避免重復(fù)累加
每執(zhí)行一個task,就會對累加變量做一次累加操作。我們知道Spark存在兩種操作,即transformation和action,前者是lazy方式執(zhí)行,只有遇到action操作才會執(zhí)行之前的transformation操作(當(dāng)前action和上一個action之間的)。設(shè)想,如果一個application的main方法中,對一個RDD先后執(zhí)行了1個transfromation操作和2個action操作的話,那么同樣的transformation操作會被執(zhí)行兩次,這就會導(dǎo)致執(zhí)行了2*partition個數(shù)(個)task,那么累加變量就被重復(fù)多做了“parttiion個數(shù)”次累加操作。比如:
class Main {
def main(args: Array[String]): Unit = {
......
rdd.repartition(6).map(element => {
println(element)
bigIntegerAccumulator.add(new BigInteger("1"))
})
rdd.count()
rdd.count()
}
}
解決方法就是:對上一次的transformation計(jì)算結(jié)果進(jìn)行cache,這樣的話遇到第二個count操作時就不會再做一次transformation操作了,比如:
class Main {
def main(args: Array[String]): Unit = {
......
rdd.repartition(6).map(element => {
println(element)
bigIntegerAccumulator.add(new BigInteger("1"))
}).cache()
rdd.count()
rdd.count()
}
}
執(zhí)行第一個count時,會執(zhí)行 rdd.repartition(6).map() 操作并將結(jié)果進(jìn)行了cache;執(zhí)行第二個count時,不會再執(zhí)行rdd.repartition(6).map() 操作。
參考:
https://blog.csdn.net/qq_35866165/article/details/86671302