spark中cache()、persist()、checkpoint()解析

“夫唯兵者,不祥之器,物或惡之,故有道者不處。
君子居則貴左,用兵則貴右。
兵者不祥之器,非君子之器,不得已而用之,恬淡為上。
勝而不美,而美之者,是樂(lè)殺人。
夫樂(lè)殺人者,則不可得志于天下矣。
吉事尚左,兇事尚右。
偏將軍居左,上將軍居右,言以喪禮處之。
殺人之眾,以悲哀泣之,戰(zhàn)勝以喪禮處之?!?sup>[1]

Spark對(duì)RDD的持久化操作(cache()、persist()、checkpoint())是很重要的,可以將rdd存放在不同的存儲(chǔ)介質(zhì)中,方便后續(xù)的操作能重復(fù)使用。

cache()

persist()

cache和persist都是用于將一個(gè)RDD進(jìn)行緩存,這樣在之后使用的過(guò)程中就不需要重新計(jì)算,可以大大節(jié)省程序運(yùn)行時(shí)間。
cache和persist的區(qū)別:cache只有一個(gè)默認(rèn)的緩存級(jí)別MEMORY_ONLY,而persist可以根據(jù)情況設(shè)置其它的緩存級(jí)別。
RDD的緩存級(jí)別:

查看 StorageLevel 類(lèi)的源碼
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}

可以看到這里列出了12種緩存級(jí)別,但這些有什么區(qū)別呢?可以看到每個(gè)緩存級(jí)別后面都跟了一個(gè)StorageLevel的構(gòu)造函數(shù),里面包含了4個(gè)或5個(gè)參數(shù),如下:

val MEMORY_ONLY = new StorageLevel(false, true, false, true)

查看其構(gòu)造函數(shù)
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}

可以看到StorageLevel類(lèi)的主構(gòu)造器包含了5個(gè)參數(shù):

  • useDisk:使用硬盤(pán)(外存)
  • useMemory:使用內(nèi)存
  • useOffHeap:使用堆外內(nèi)存,這是Java虛擬機(jī)里面的概念,堆外內(nèi)存意味著把內(nèi)存對(duì)象分配在Java虛擬機(jī)的堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理(而不是虛擬機(jī))。這樣做的結(jié)果就是能保持一個(gè)較小的堆,以減少垃圾收集對(duì)應(yīng)用的影響。
  • deserialized:反序列化,其逆過(guò)程序列化(Serialization)是java提供的一種機(jī)制,將對(duì)象表示成一連串的字節(jié);而反序列化就表示將字節(jié)恢復(fù)為對(duì)象的過(guò)程。序列化是對(duì)象永久化的一種機(jī)制,可以將對(duì)象及其屬性保存起來(lái),并能在反序列化后直接恢復(fù)這個(gè)對(duì)象
  • replication:備份數(shù)(在多個(gè)節(jié)點(diǎn)上備份)

理解了這5個(gè)參數(shù),StorageLevel 的12種緩存級(jí)別就不難理解了。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用這種緩存級(jí)別的RDD將存儲(chǔ)在硬盤(pán)以及內(nèi)存中,使用序列化(在硬盤(pán)中),并且在多個(gè)節(jié)點(diǎn)上備份2份(正常的RDD只有一份)

checkpoint()

sc.sparkContext.setCheckpointDir('...')
......
......
rdd.cache()
rdd.checkpoint()
......

checkpoint接口是將RDD持久化到HDFS中,與persist的區(qū)別是checkpoint會(huì)切斷此RDD之前的依賴關(guān)系,而persist會(huì)保留依賴關(guān)系。checkpoint的兩大作用:一是spark程序長(zhǎng)期駐留,過(guò)長(zhǎng)的依賴會(huì)占用很多的系統(tǒng)資源,定期checkpoint可以有效的節(jié)省資源;二是維護(hù)過(guò)長(zhǎng)的依賴關(guān)系可能會(huì)出現(xiàn)問(wèn)題,一旦spark程序運(yùn)行失敗,RDD的容錯(cuò)成本會(huì)很高。

注意:checkpoint執(zhí)行前要先進(jìn)行cache,避免兩次計(jì)算。


  1. 老子《道德經(jīng)》第三十一章,老子故里,中國(guó)鹿邑。 ?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容