翻譯 Spark 共享變量部分的官方文檔(Spark 2.4.3)。
通常,當傳遞給 Spark 操作 (如 map 或 reduce ) 的函數(shù)在遠程集群節(jié)點上執(zhí)行時,在函數(shù)中使用的所有外部變量都是單獨拷貝的變量副本。這些變量被復制到每臺機器上,對遠程機器上的變量更新不會傳播回驅(qū)動程序。支持通用的、任務(wù)間的讀寫共享變量是很低效的。不過,Spark確實為兩種常見的使用模式提供了兩種有限的共享變量類型:廣播變量和累加器。
一、廣播變量
廣播變量允許程序員在每臺機器上保留一個只讀變量,而不是給每個 task 都發(fā)送一份它的副本。 例如,它們可用于使用一個有效的方式為每個節(jié)點提供很大的輸入數(shù)據(jù)集的副本。 Spark 還嘗試使用有效的廣播算法來分發(fā)廣播變量,以降低通信成本。
Spark 的所有 action 操作都是貫穿著很多個 stage 的,這些 stage 由 shuffle 操作進行劃分。 Spark 自動廣播每個 stage 中任務(wù)所需的公共數(shù)據(jù)。以這種方式廣播的數(shù)據(jù)是以序列化形式緩存并在運行每個 task 之前進行反序列化。所以,廣播變量在多個 stage 中的所有 task 都需要一份同樣的數(shù)據(jù)這樣的場景中很有用。
廣播變量是通過 SparkContext.broadcast(v) 這樣的方式創(chuàng)建的。它是將原始變量 v 包裹到自己封裝的變量中去,然后通過 .value() 這個方法獲取原始變量的值,代碼如下:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
當廣播變量被創(chuàng)建之后,在集群上所有的計算函數(shù)中都會使用廣播變量去計算,因此原始變量 v 就不需要多次被復制到很多個節(jié)點上了。另外,原始變量 v 在廣播變量被創(chuàng)建之后不可再被修改,如果在廣播變量創(chuàng)建之后再去修改原始變量 v 會導致集群中每個節(jié)點拿到的共享變量值不一樣。
二、累加器
累加器內(nèi)部是通過關(guān)聯(lián)和交換操作實現(xiàn) “add” 操作的變量,因此可以并發(fā)執(zhí)行。它可以用來實現(xiàn)計算器或者求和操作。Spark 天然支持數(shù)值類型的累加,程序員也可以自定義一些新的數(shù)據(jù)類型用來累加。
作為用戶,你可以創(chuàng)建命名或者未命名的累加器。如下圖所示,一個命名的累加器(counter) 會被展示在使用該累加器的 stage 的 web UI 上面。 Spark 會展示每一個被 Tasks 表中的任一個 task 修改過的累加器的值。

在 UI 中追蹤累加器的值可以幫助理解運行中的各個 stage 的進度。
一個數(shù)字類型的累加器可以通過這樣的方式創(chuàng)建:SparkContext.longAccumulator() 或者 SparkContext.doubleAccumulator() ,去計算 long 類型或者 double 類型的數(shù)值累加。集群中每個 task 在做累加計算任務(wù)的時候可以通過調(diào)用 add 方法去實現(xiàn)。但是,不可以在集群上讀取累加器的值。只有在 driver 程序中才可以讀取累加器的值,通過 value 這個方法。
下面的這段代碼是用累加器去將一個 array 中的每個元素相加:
LongAccumulator accum = jsc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
除了在代碼中使用內(nèi)建的 long 類型的累加器之外,程序員也可以通過繼承 AccumulatorV2 去實現(xiàn)想要的類型的累加器。AccumulatorV2 這個抽象類有很多個方法需要去重寫,如:reset 方法(用來將累加器置零的)、add 方法(用來和另外一個值做累加的)、merge 方法(用來合并另外一個相同類型的累加器到該累加器的)。其它需要被重寫的方法可以參考 API documentation 。比如,我們可以自定義一個累加器 MyVector 代表數(shù)學中的向量集合,可以這么寫:
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {
private MyVector myVector = MyVector.createZeroVector();
public void reset() {
myVector.reset();
}
public void add(MyVector v) {
myVector.add(v);
}
...
}
// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");
注意一點,當程序員自定義了一些數(shù)據(jù)類型的累加器之后,累加器的值的數(shù)據(jù)類型可以和你相加的元素類型不一致。
注意 :當 Spark 的 task 計算任務(wù)結(jié)束后,Spark 將會嘗試著將這個 task 中所有的累加計算合并到一個累加器上去。如果合并失敗,Spark 會忽略這次失敗,仍然認為這個 task 的計算任務(wù)是成功的,并且繼續(xù)跑其它的 task。所以,一個有 bug 的累加器將不會影響 Spark 的作業(yè),但一個累加器可能會在整個 Spark 作業(yè)成功跑完之后還沒有成功更新到最新的值。
由于累加器的 update 操作只會在 action 算子內(nèi)部執(zhí)行,Spark 保證了每個 task 對累加器的更新操作只有一次。比如重啟 task 不會更新累加器的值。在 transform 算子操作時,用戶需要知道每個 task 對累加器的更新操作可能不止一次,比如一但某個 task 或者 job 的 stage 被重新執(zhí)行。
累加器不會改變 Spark 的 lazy 特性。如果累加器的值在 RDD 某個操作中被更新了,他們的值只會在這個 RDD 的某個 action 操作的某個部分計算中更新。因此,累加器的更新不會保證在像 map 這樣的算子中被立刻更新。可以看下面的代碼片段:
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.