Spark累加器(Accumulator)陷阱及解決辦法

程序中可能會使用到spark提供的累加器功能,可是如果你不了解它的運行機制,有時候會帶來一些負面作用(錯誤的累加結(jié)果)。

下文會介紹產(chǎn)生錯誤結(jié)果的原因,并提供一種解決辦法。

新博客地址

Accumulator簡介

Accumulator是spark提供的累加器,顧名思義,該變量只能夠增加。

只有driver能獲取到Accumulator的值(使用value方法),Task只能對其做增加操作(使用 +=)。你也可以在為Accumulator命名(不支持Python),這樣就會在spark web ui中顯示,可以幫助你了解程序運行的情況。

Accumulator使用

使用示例

舉個最簡單的accumulator的使用例子:


//在driver中定義

val accum = sc.accumulator(0, "Example Accumulator")

//在task中進行累加

sc.parallelize(1 to 10).foreach(x=> accum += 1)

//在driver中輸出

accum.value

//結(jié)果將返回10

res: 10

累加器的錯誤用法


val accum= sc.accumulator(0, "Error Accumulator")

val data = sc.parallelize(1 to 10)

//用accumulator統(tǒng)計偶數(shù)出現(xiàn)的次數(shù),同時偶數(shù)返回0,奇數(shù)返回1

val newData = data.map{x => {

if(x%2 == 0){

accum += 1

0

}else 1

}}

//使用action操作觸發(fā)執(zhí)行

newData.count

//此時accum的值為5,是我們要的結(jié)果

accum.value

//繼續(xù)操作,查看剛才變動的數(shù)據(jù),foreach也是action操作

newData.foreach(println)

//上個步驟沒有進行累計器操作,可是累加器此時的結(jié)果已經(jīng)是10了

//這并不是我們想要的結(jié)果

accum.value

原因分析

官方對這個問題的解釋如下描述:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

我們都知道,spark中的一系列transform操作會構(gòu)成一串長的任務(wù)鏈,此時需要通過一個action操作來觸發(fā),accumulator也是一樣。因此在一個action操作之前,你調(diào)用value方法查看其數(shù)值,肯定是沒有任何變化的。

所以在第一次count(action操作)之后,我們發(fā)現(xiàn)累加器的數(shù)值變成了5,是我們要的答案。

之后又對新產(chǎn)生的的newData進行了一次foreach(action操作),其實這個時候又執(zhí)行了一次map(transform)操作,所以累加器又增加了5。最終獲得的結(jié)果變成了10。

這里寫圖片描述

解決辦法

看了上面的分析,大家都有這種印象了,那就是使用累加器的過程中只能使用一次action的操作才能保證結(jié)果的準確性。

事實上,還是有解決方案的,只要將任務(wù)之間的依賴關(guān)系切斷就可以了。什么方法有這種功能呢?你們肯定都想到了,cache,persist。調(diào)用這個方法的時候會將之前的依賴切除,后續(xù)的累加器就不會再被之前的transfrom操作影響到了。

這里寫圖片描述

val accum= sc.accumulator(0, "Error Accumulator")

val data = sc.parallelize(1 to 10)

//代碼和上方相同

val newData = data.map{x => {...}}

//使用cache緩存數(shù)據(jù),切斷依賴。

newData.cache.count

//此時accum的值為5

accum.value

newData.foreach(println)

//此時的accum依舊是5

accum.value

總結(jié)

使用Accumulator時,為了保證準確性,只使用一次action操作。如果需要使用多次則使用cache或persist操作切斷依賴。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容