程序中可能會使用到spark提供的累加器功能,可是如果你不了解它的運行機制,有時候會帶來一些負面作用(錯誤的累加結(jié)果)。
下文會介紹產(chǎn)生錯誤結(jié)果的原因,并提供一種解決辦法。
Accumulator簡介
Accumulator是spark提供的累加器,顧名思義,該變量只能夠增加。
只有driver能獲取到Accumulator的值(使用value方法),Task只能對其做增加操作(使用 +=)。你也可以在為Accumulator命名(不支持Python),這樣就會在spark web ui中顯示,可以幫助你了解程序運行的情況。
Accumulator使用
使用示例
舉個最簡單的accumulator的使用例子:
//在driver中定義
val accum = sc.accumulator(0, "Example Accumulator")
//在task中進行累加
sc.parallelize(1 to 10).foreach(x=> accum += 1)
//在driver中輸出
accum.value
//結(jié)果將返回10
res: 10
累加器的錯誤用法
val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator統(tǒng)計偶數(shù)出現(xiàn)的次數(shù),同時偶數(shù)返回0,奇數(shù)返回1
val newData = data.map{x => {
if(x%2 == 0){
accum += 1
0
}else 1
}}
//使用action操作觸發(fā)執(zhí)行
newData.count
//此時accum的值為5,是我們要的結(jié)果
accum.value
//繼續(xù)操作,查看剛才變動的數(shù)據(jù),foreach也是action操作
newData.foreach(println)
//上個步驟沒有進行累計器操作,可是累加器此時的結(jié)果已經(jīng)是10了
//這并不是我們想要的結(jié)果
accum.value
原因分析
官方對這個問題的解釋如下描述:
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
我們都知道,spark中的一系列transform操作會構(gòu)成一串長的任務(wù)鏈,此時需要通過一個action操作來觸發(fā),accumulator也是一樣。因此在一個action操作之前,你調(diào)用value方法查看其數(shù)值,肯定是沒有任何變化的。
所以在第一次count(action操作)之后,我們發(fā)現(xiàn)累加器的數(shù)值變成了5,是我們要的答案。
之后又對新產(chǎn)生的的newData進行了一次foreach(action操作),其實這個時候又執(zhí)行了一次map(transform)操作,所以累加器又增加了5。最終獲得的結(jié)果變成了10。
解決辦法
看了上面的分析,大家都有這種印象了,那就是使用累加器的過程中只能使用一次action的操作才能保證結(jié)果的準確性。
事實上,還是有解決方案的,只要將任務(wù)之間的依賴關(guān)系切斷就可以了。什么方法有這種功能呢?你們肯定都想到了,cache,persist。調(diào)用這個方法的時候會將之前的依賴切除,后續(xù)的累加器就不會再被之前的transfrom操作影響到了。
val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//代碼和上方相同
val newData = data.map{x => {...}}
//使用cache緩存數(shù)據(jù),切斷依賴。
newData.cache.count
//此時accum的值為5
accum.value
newData.foreach(println)
//此時的accum依舊是5
accum.value
總結(jié)
使用Accumulator時,為了保證準確性,只使用一次action操作。如果需要使用多次則使用cache或persist操作切斷依賴。