翻譯:Hadoop權威指南之Spark-4

本文原始地址

Persistence

回到本章開頭的例子,我們可以把“年度-氣溫”的中間數(shù)據(jù)集緩存在內(nèi)存中:

scala> tuples.cache()
res1: tuples.type = MappedRDD[4] at map at <console>:18

調(diào)用cache()不會立刻把RDD緩存到內(nèi)存中,只是對這個RDD做一個標記,當Spark job運行的時候,實際的緩存行為才會發(fā)生。因此我們首先強制運行一個job:

scala> tuples.reduceByKey((a, b) => Math.max(a, b)).foreach(println(_))
INFO BlockManagerInfo: Added rdd_4_0 in memory on 192.168.1.90:64640
INFO BlockManagerInfo: Added rdd_4_1 in memory on 192.168.1.90:64640
(1950,22)
(1949,111)

關于BlockManagerInfo的日志顯示,作為job運行的一部分,RDD的分區(qū)會被保持在內(nèi)存中。日志顯示這個RDD的編號是4(在調(diào)用cache()方法之后的控制臺輸出中,也能看到這個信息),它包含兩個分區(qū),標簽分別是0和1。如果在這個緩存的數(shù)據(jù)集上運行另一個job,我們會看到這個RDD將從內(nèi)存中加載。這次我們計算最低氣溫:

scala> tuples.reduceByKey((a, b) => Math.min(a, b)).foreach(println(_))
INFO BlockManager: Found block rdd_4_0 locally
INFO BlockManager: Found block rdd_4_1 locally
(1949,78)
(1950,-11)

這是在微小數(shù)據(jù)集上的簡單示例,但是對于更大的job,節(jié)省的時間將很可觀。在MapReduce中,為了執(zhí)行另一個計算,輸入數(shù)據(jù)集必須再次從磁盤加載。即使中間數(shù)據(jù)可以作為輸入(比如一個清洗后的數(shù)據(jù)集,無效行和不必要的字段都已移除),也不能改變“數(shù)據(jù)必須從磁盤加載”的事實,這是很慢的。Spark會把數(shù)據(jù)集緩存在一個跨集群的內(nèi)存高速緩存中,這就意味著任何基于此數(shù)據(jù)集的計算都會執(zhí)行的非??臁?/p>

在對數(shù)據(jù)進行交互式探索時,這種效率是極其有用的。這也自然適合某些類型的算法,比如迭代算法,一次迭代計算的結果可以緩存在內(nèi)存中,成為下次迭代計算的輸入。這種算法也可以用MapReduce實現(xiàn),每次迭代都是一個單獨的MapReduce job,因此每次迭代的結果必須寫入磁盤,然后在下次迭代時再讀回來。

緩存的RDD只能被同一個application中的job獲取。要在不同的application之間共享數(shù)據(jù)集,第一個application必須使用某個saveAs*()方法(saveAsTextFile(),saveAsHadoopFile()等等)來寫到外部存儲中,然后第二個application使用SparkContext中的對應方法(textFile(),hadoopFile()等等)再次加載。同樣的,當一個application終止時,它緩存的所有RDD都被銷毀,除非顯式的保存下來,否則不能再次訪問。

Persistence levels

調(diào)用cache()會把RDD的每個分區(qū)持久化到執(zhí)行器(executor)的內(nèi)存中。如果執(zhí)行器沒有足夠的內(nèi)存來存儲這個RDD分區(qū),計算不會失敗,相反該分區(qū)將會根據(jù)需要進行重算。對于帶有很多trsansformation的復雜程序,這是很昂貴的。因此Spark提供了不同類型的持久化行為供用戶選擇,在調(diào)用persist()時指定StorageLevel參數(shù)即可。

默認的持久化級別是MEMORY_ONLY,這種方式使用對象的常規(guī)內(nèi)存表示。要使用更緊湊的表現(xiàn)形式,可以把分區(qū)中的元素序列化為字節(jié)數(shù)組(byte array)。這種級別是MEMORY_ONLY_SER,相比MEMORY_ONLY,這種級別會導致CPU的壓力,如果序列化之后的RDD分區(qū)能夠適應內(nèi)存,而常規(guī)的內(nèi)存表示不適合,那么這種壓力就是值得的。MEMORY_ONLY_SER還會減輕垃圾回收的壓力,因為每個RDD都以字節(jié)數(shù)組的形式存儲,而不是很多的對象。

在driver程序的日志文件中,檢查BlockManager相關的信息,可以看到一個RDD分區(qū)是否不適合內(nèi)存。另外,每個driver的SparkContext會在4040端口啟動一個HTTP服務,提供關于運行環(huán)境以及正在運行的job的有用信息,包括緩存的RDD分區(qū)的信息。

默認情況下,使用常規(guī)的Java序列化框架來序列化RDD分區(qū),不過Kryo序列化框架(下節(jié)討論)通常是更好的選擇,在大小和速度兩方面都更優(yōu)秀。如果把序列化后的分區(qū)進行壓縮,可以節(jié)省更多的空間(再一次付出CPU的代價),設置spark.rdd.compress屬性為true來啟用壓縮,屬性spark.io.compression.codec是可選設置。

如果重算一個數(shù)據(jù)集非常昂貴,那么MEMORY_AND_DISK(如果數(shù)據(jù)集在內(nèi)存中放不下,就寫到磁盤上)或者MEMORY_AND_DISK_SER(如果序列化后的數(shù)據(jù)集在內(nèi)存中放不下,就寫到磁盤上)是合適的。

還有一些更高級的和實驗中的持久化級別,用來在集群中的多個節(jié)點上復制分區(qū),或者使用off-heap內(nèi)存——更多細節(jié),查看Spark文檔。

Serialization

在Spark中需要考慮序列化的兩個方面:序列化數(shù)據(jù)和序列化函數(shù)(或閉包)。

Data

首先來看數(shù)據(jù)的序列化。默認情況下,Spark使用Java序列化框架在執(zhí)行器之間的網(wǎng)絡上傳輸數(shù)據(jù),或者以序列化的形式來緩存數(shù)據(jù)。對程序員來說,Java的序列化很好理解,只需確定你使用的類實現(xiàn)了java.io.Serializable接口或者java.io.Externalizable接口,但從性能和大小的角度來看,這種方式的效率不高。

對于大多數(shù)的Spark程序,更好的選擇是Kryo序列化框架。Kryo是一個高效的通用的Java序列化庫。要使用Kryo,在driver程序的SparkConf上設置spark.serializer屬性如下:

conf.set("spark.serializer",  "org.apache.spark.serializer.KryoSerializer")

Kryo不要求你的類實現(xiàn)特定接口,因此簡單的Java對象不需要任何改動即可在RDD中使用。話雖如此,如果在使用一個類之前把它注冊到Kryo會更加高效。這是因為Kryo會創(chuàng)建一個引用,指向那個序列化對象的類(一個對象對應一個引用),如果類已注冊,該引用是個整數(shù)ID,如果類沒有注冊,該引用是類的全名。這個引導僅僅適用于你自己的類,Scala類和許多其他的框架類(比如Avro Generic或者Thrift類)已經(jīng)由Spark注冊了。

向Kryo注冊類也很簡單。創(chuàng)建一個KryoRegistrator的子類,覆蓋registerClasses()方法:

class CustomKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[WeatherRecord])
  }
}

最后,在driver程序中,把屬性spark.kryo.registrator設置為你的KryoRegistrator實現(xiàn)類的完整類名:

conf.set("spark.kryo.registrator", "CustomKryoRegistrator")

Functions

通常,函數(shù)的序列化將"剛好工作":在Scala中,函數(shù)都是可序列化的,使用標準Java序列化機制。這也是Spark向遠程執(zhí)行器節(jié)點發(fā)送函數(shù)時使用的方式。即使在本地模式下運行,Spark也會序列化函數(shù)。如果你在無意中引入了不可序列化的函數(shù)(比如,從一個非序列化類的方法轉(zhuǎn)換過來的函數(shù)),你會在開發(fā)過程的早期階段發(fā)現(xiàn)它。

Shared Variables

Spark程序經(jīng)常需要訪問一些數(shù)據(jù),這些數(shù)據(jù)不是一個RDD的一部分。例如,下面的程序在一個map()操作中使用了一個查找表(lookup table):

val lookup = Map(1 -> "a", 2 -> "e", 3 -> "i", 4 -> "o", 5 -> "u")
val result = sc.parallelize(Array(2, 1, 3)).map(lookup(_))
assert(result.collect().toSet === Set("a", "e", "i"))

這段程序會正確工作(變量lookup被序列化為閉包的一部分,傳遞給map()),但是還有一個更高效的方式來達到同樣的目的:使用廣播變量。

Broadcast Variables

廣播變量在序列化之后發(fā)送給每一個執(zhí)行器,在那里緩存起來,因此后續(xù)的任務可以在需要時訪問。這與普通的變量不同。普通的變量會序列化為閉包的一部分,然后在網(wǎng)絡上傳輸,一個任務一次傳輸。廣播變量的角色,與MapReduce中的分布式緩存相似,不過Spark內(nèi)部的實現(xiàn)是把數(shù)據(jù)存儲在內(nèi)存中,僅當內(nèi)存被耗盡時才寫到磁盤。

廣播變量的創(chuàng)建方法是,把需要廣播的變量傳遞給SparkContext的broadcast()方法。T類型的變量被包裝進Broadcast[T],然后返回:

val lookup: Broadcast[Map[Int, String]] =
    sc.broadcast(Map(1 -> "a", 2 -> "e", 3 -> "i", 4 -> "o", 5 -> "u"))
val result = sc.parallelize(Array(2, 1, 3)).map(lookup.value(_))
assert(result.collect().toSet === Set("a", "e", "i"))

在RDD的map()操作中,調(diào)用這個廣播變量的value來訪問它。

顧名思義,廣播變量是單向傳送的,從driver到task——沒有辦法更新一個廣播變量,然后回傳給driver。為此,我們需要一個累加器。

Accumulators

累加器是一個共享變量,和MapReduce中的計數(shù)器一樣,任務只能對其增加。在job完成以后,累加器的最終值可以在driver程序中獲取。下面的例子中,使用累加器計算一個整數(shù)RDD中的元素數(shù)量,同時使用reduce()操作對RDD中的值求和:

val count: Accumulator[Int] = sc.accumulator(0)
val result = sc.parallelize(Array(1, 2, 3))
  .map(i => { count += 1; i })
  .reduce((x, y) => x + y)
assert(count.value === 3)
assert(result === 6)

第一行代碼使用SparkContext的accumulator()方法,創(chuàng)建了一個累加器變量count。map()操作是一個恒等函數(shù),副作用是增加count。當Spark job的結果計算出來之后,累加器的值通過調(diào)用value來訪問。

在這個例子中,我們使用一個Int作為累加器,但任何的數(shù)值類型都是可以的。Spark還提供了兩種方法,一是使用累加器的結果類型與“被增量”的類型不同(參見SparkContext的accumulable()方法),二是可以累加可變集合中的值(通過accumulableCollection())。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容