“夫唯兵者,不祥之器,物或惡之,故有道者不處。
君子居則貴左,用兵則貴右。
兵者不祥之器,非君子之器,不得已而用之,恬淡為上。
勝而不美,而美之者,是樂(lè)殺人。
夫樂(lè)殺人者,則不可得志于天下矣。
吉事尚左,兇事尚右。
偏將軍居左,上將軍居右,言以喪禮處之。
殺人之眾,以悲哀泣之,戰(zhàn)勝以喪禮處之?!?sup>[1]
Spark對(duì)RDD的持久化操作(cache()、persist()、checkpoint())是很重要的,可以將rdd存放在不同的存儲(chǔ)介質(zhì)中,方便后續(xù)的操作能重復(fù)使用。
cache()
persist()
cache和persist都是用于將一個(gè)RDD進(jìn)行緩存,這樣在之后使用的過(guò)程中就不需要重新計(jì)算,可以大大節(jié)省程序運(yùn)行時(shí)間。
cache和persist的區(qū)別:cache只有一個(gè)默認(rèn)的緩存級(jí)別MEMORY_ONLY,而persist可以根據(jù)情況設(shè)置其它的緩存級(jí)別。
RDD的緩存級(jí)別:
查看 StorageLevel 類(lèi)的源碼
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
......
}
可以看到這里列出了12種緩存級(jí)別,但這些有什么區(qū)別呢?可以看到每個(gè)緩存級(jí)別后面都跟了一個(gè)StorageLevel的構(gòu)造函數(shù),里面包含了4個(gè)或5個(gè)參數(shù),如下:
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
查看其構(gòu)造函數(shù)
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
......
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
......
}
可以看到StorageLevel類(lèi)的主構(gòu)造器包含了5個(gè)參數(shù):
- useDisk:使用硬盤(pán)(外存)
- useMemory:使用內(nèi)存
- useOffHeap:使用堆外內(nèi)存,這是Java虛擬機(jī)里面的概念,堆外內(nèi)存意味著把內(nèi)存對(duì)象分配在Java虛擬機(jī)的堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理(而不是虛擬機(jī))。這樣做的結(jié)果就是能保持一個(gè)較小的堆,以減少垃圾收集對(duì)應(yīng)用的影響。
- deserialized:反序列化,其逆過(guò)程序列化(Serialization)是java提供的一種機(jī)制,將對(duì)象表示成一連串的字節(jié);而反序列化就表示將字節(jié)恢復(fù)為對(duì)象的過(guò)程。序列化是對(duì)象永久化的一種機(jī)制,可以將對(duì)象及其屬性保存起來(lái),并能在反序列化后直接恢復(fù)這個(gè)對(duì)象
- replication:備份數(shù)(在多個(gè)節(jié)點(diǎn)上備份)
理解了這5個(gè)參數(shù),StorageLevel 的12種緩存級(jí)別就不難理解了。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用這種緩存級(jí)別的RDD將存儲(chǔ)在硬盤(pán)以及內(nèi)存中,使用序列化(在硬盤(pán)中),并且在多個(gè)節(jié)點(diǎn)上備份2份(正常的RDD只有一份)
checkpoint()
sc.sparkContext.setCheckpointDir('...')
......
......
rdd.cache()
rdd.checkpoint()
......
checkpoint接口是將RDD持久化到HDFS中,與persist的區(qū)別是checkpoint會(huì)切斷此RDD之前的依賴關(guān)系,而persist會(huì)保留依賴關(guān)系。checkpoint的兩大作用:一是spark程序長(zhǎng)期駐留,過(guò)長(zhǎng)的依賴會(huì)占用很多的系統(tǒng)資源,定期checkpoint可以有效的節(jié)省資源;二是維護(hù)過(guò)長(zhǎng)的依賴關(guān)系可能會(huì)出現(xiàn)問(wèn)題,一旦spark程序運(yùn)行失敗,RDD的容錯(cuò)成本會(huì)很高。
注意:checkpoint執(zhí)行前要先進(jìn)行cache,避免兩次計(jì)算。
-
老子《道德經(jīng)》第三十一章,老子故里,中國(guó)鹿邑。 ?