我們這節(jié)課講一下RDD的持久化

這段代碼我們上午已經(jīng)看過(guò)了,有瑕疵大家看出來(lái)了嗎?
有什么瑕疵啊?
大家是否還記得我在第二節(jié)課的時(shí)候跟大家說(shuō),RDD實(shí)際是不存數(shù)據(jù)的?

我們?cè)僦匦轮v解一下這段代碼
sc.textfile是不是講數(shù)據(jù)加載到這個(gè)RDD里面去了?
對(duì)這個(gè)RDD執(zhí)行一個(gè)filter算子過(guò)濾,返回一個(gè)errors RDD
然后對(duì)errorsRDD又經(jīng)過(guò)一次過(guò)濾,然后再執(zhí)行一次count算子,我們把這個(gè)job叫做job0
然后第四行,又對(duì)這個(gè)errors執(zhí)行了一次filter,然后又count一次,我們把這個(gè)job叫做job1
那么在job1里面errors這個(gè)RDD,那我們說(shuō)了RDD里面不存數(shù)據(jù),那errors這個(gè)RDD里面的數(shù)據(jù)是通過(guò)lines這個(gè)RDD計(jì)算得來(lái)的,那rdd不存數(shù)據(jù),那他是不是要重新計(jì)算???
我再重新說(shuō)一遍,errors這個(gè)RDD由于他不存數(shù)據(jù),所以job1這個(gè)RDD里面的errors和job0里面的這個(gè)RDD,都是通過(guò)依賴的RDD計(jì)算過(guò)來(lái)的,他是怎么計(jì)算過(guò)來(lái)的,他是從lines這個(gè)RDD計(jì)算過(guò)來(lái)的
那lines這個(gè)RDD數(shù)據(jù)哪來(lái)的?他是通過(guò)textfile加載過(guò)來(lái)的
他其實(shí)是計(jì)算了幾遍?計(jì)算了兩遍對(duì)嗎?
那我們對(duì)他進(jìn)行優(yōu)化,怎么優(yōu)化??jī)?yōu)化的時(shí)候就讓他讀一遍就可以了唄?
我們重復(fù)是用errors重復(fù)使用兩次,那如果我們把errors里面的數(shù)據(jù)持久化一下,把他保存到內(nèi)存當(dāng)中去,或者保存到磁盤當(dāng)中去是不是就可以了?
我job1在使用errors的時(shí)候直接從磁盤或者內(nèi)存當(dāng)中去讀就可以了對(duì)吧?
那如果我們想給RDD做持久化,我們就要使用持久化的算子,cache、persist、checkpoint,這些算子都可以將RDD的數(shù)據(jù)進(jìn)行持久化
那么這三個(gè)算子有什么區(qū)別呢?
cache他是persist的一個(gè)簡(jiǎn)化版cache他默認(rèn)是將RDD的數(shù)據(jù)持久化到內(nèi)存里面去
persist這個(gè)算子我們可以自己指定持久化的級(jí)別,可以自定義,我們可以將RDD的數(shù)據(jù)持久化到磁盤上,如果你想要把數(shù)據(jù)持久化到磁盤上 ,必須要使用persist算子
因?yàn)閏ache是默認(rèn)將數(shù)據(jù)放到內(nèi)存
我們現(xiàn)在來(lái)寫代碼測(cè)試一下
我們看一下性能有沒有提升,我們剛才說(shuō)使用cache這個(gè)算子對(duì)剛才這個(gè)代碼進(jìn)行優(yōu)化,看性能有沒有提升,
package com.bjsxt.spark.persist
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by Albert on 2017/7/15.
*
*
* cache persist注意事項(xiàng)
*
*/
object T01Persist {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("T01Cache")
val sc = new SparkContext(conf)
var rdd = sc.textFile("log.txt")
rdd.cache()
//rdd.persist(StorageLevel.MEMORY_ONLY)
// chche() = persist() = persist(StorageLevel.MEMORY_ONLY)
val st = System.currentTimeMillis()
val count = rdd.count()
val et = System.currentTimeMillis()
println(s"count: $count \t Duration:${et-st}")
val st1 = System.currentTimeMillis()
val count1 = rdd.count()
val et1 = System.currentTimeMillis()
println(s"count: $count1 \t Duration:${et1-st1}")
}
}
這段代碼大家都可以理解吧?理解的舉手,要給我反饋真實(shí)的情況啊,如果不懂的人多了,我再通過(guò)其他案例給大家講
這里給大家總結(jié)一下持久化算子的注意事項(xiàng)
cache和persist使用注意事項(xiàng)
1、cache和persist算子都是懶執(zhí)行的,必須有一個(gè)Action算子觸發(fā)執(zhí)行
2、cache和persist算子的返回值必須賦值給一個(gè)變量,在下一個(gè)job中直接使用這個(gè)變量就是使用了持久化的數(shù)據(jù)
提問(wèn)如果一個(gè)Application里面只有一個(gè)job有必要使用持久化算子嗎?
答案是沒有必要對(duì)吧?
3、cache和persist算子后面不能緊跟Aciton類算子(不能直接rdd.cache().count())
為什么不能直接點(diǎn)出來(lái)Action類算子呢?
我們還記不記得Action類算子,都有什么類型的了呢?我們?cè)谥v算子的時(shí)候說(shuō)所有Action類算子,他有三種返回類型:無(wú)類型:foreach、HDFS、Scala數(shù)據(jù)類型
那我們r(jià)dd = rdd.cache返回的類型是RDD[]
incorrect: rdd = rdd.cache().count()
correct: val cacheRDD = rdd.cache
rdd.count()
好下面我們來(lái)看一下persist各種StorageLevel的級(jí)別,我們看一下Spark的源碼
如果我們要看源碼,我們要先找到這個(gè)方法
那cache這個(gè)方法在哪里呢?我們是在rdd. cache的這個(gè)方法吧?
所以我們?nèi)DD這個(gè)類里面找一下


private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
StorageLevel五個(gè)參數(shù)的意義
| 參數(shù)名稱 | 參數(shù)意義 |
|---|---|
| _useDisk | 是否使用磁盤 |
| _useMemory | 是否使用內(nèi)存 |
| _useOffHeap | 是否使用堆外內(nèi)存 |
| _deserialized | 是否不序列化,注意這里false代表序列化
|
| _replication | 副本數(shù)量默認(rèn)是1 |
我們看看MEMORY_ONLY 他的參數(shù)分別是
new StorageLevel(false, true, false, true)
是否使用磁盤,不使用
是否使用內(nèi)存,使用
是否使用堆外內(nèi)存,不使用
是否不序列化,是(意思就是不序列化)
沒有參數(shù)副本數(shù),所以副本數(shù)是1
其他的自己看一下記一下,老師在這著重講解一下
MEMORY_AND_DISK
MEMORY_AND_DISK他是內(nèi)存中一份磁盤中一份嗎?不是的
大家一定要記住,這個(gè)持久化級(jí)別,一共就只有一份
這個(gè)持久化的級(jí)別,會(huì)先往內(nèi)存里面持久化RDD,如果內(nèi)存不夠了,就往硬盤里面持久化
大家記住了嗎?
OFF_HEAP
OFF_HEAP 這個(gè)持久化級(jí)別的意思?
堆外內(nèi)存?什么是堆外內(nèi)存,堆以外的內(nèi)存就是堆外內(nèi)存對(duì)嗎?
JVM管理了一塊內(nèi)存對(duì)嗎?那不受JVM 的GC管理的內(nèi)存是不是就是堆外內(nèi)存???
- 參考閱讀:
什么是堆外內(nèi)存?堆內(nèi)內(nèi)存還是堆外內(nèi)存?
順帶普及一下內(nèi)存的知識(shí):
Stack and Heap 堆和棧的區(qū)別
我們?cè)诘谝还?jié)課講了一個(gè)
Tachyon現(xiàn)在改名叫Alluxio他是基于內(nèi)存的一個(gè)文件系統(tǒng),也是Berkeley技術(shù)架構(gòu)下的一個(gè)系統(tǒng),Tachyon可以和Spark進(jìn)行整合,整合好了以后,如果想用堆外內(nèi)存來(lái)持久化,就需要設(shè)置這種類型,明白嗎?如果你沒有整合Tachyon,在程序里面還使用了這個(gè)級(jí)別,程序會(huì)給你報(bào)錯(cuò)。
現(xiàn)在Tachyon用的公司還很少,國(guó)內(nèi)阿里、百度、華為才有一定規(guī)模的內(nèi)存分布式文件系統(tǒng)的集群
另外序列化的數(shù)據(jù)在持久化的時(shí)候會(huì)小一些,但帶來(lái)的就像加解密一樣,會(huì)影響性能。
有同學(xué)會(huì)問(wèn)到,我一直持久化,內(nèi)存越來(lái)越多,我怎么辦?
其實(shí)還有一個(gè)unpersist這個(gè)算子,會(huì)把rdd的持久化級(jí)別設(shè)置為StorageLevel.NONE
但一般情況下不需要使用
因?yàn)槌志没瘯?huì)有一個(gè)TTL機(jī)制,就是最近不常用的數(shù)據(jù),他會(huì)自動(dòng)清除
還有另外一個(gè)問(wèn)題,也是同學(xué)們經(jīng)常會(huì)問(wèn)到的,當(dāng)我內(nèi)存不夠的時(shí)候,我還是cache了一個(gè)RDD,會(huì)不會(huì)報(bào)OOM?
那我告訴大家一個(gè)結(jié)論,就是不會(huì),RDD的持久化,有多少內(nèi)存,他就放多少數(shù)據(jù),放不下的數(shù)據(jù)就不會(huì)進(jìn)行存儲(chǔ)

然后我們講一下checkpoint
checkpoint不僅僅能夠持久化數(shù)據(jù),還能夠?qū)DD的依賴關(guān)系切斷
持久化數(shù)據(jù)
那我們說(shuō)我們選用persist進(jìn)行RDD的持久化,我們可以指定持久化的級(jí)別,還可以設(shè)置數(shù)據(jù)的副本數(shù),他的數(shù)據(jù)要么放到內(nèi)存,要么放到磁盤上,并且還能有備份
那大家考慮一下,我們是用persist給我們持久化到內(nèi)存硬盤安全,還是使用checkpoint讓Spark給我們把數(shù)據(jù)持久化到hdfs上安全?
持久化到hdfs上更安全一些對(duì)嗎?
那為什么持久化到HDFS上安全呢?
因?yàn)檫@就是雙保險(xiǎn)了,所以會(huì)更安全對(duì)吧?
切斷RDD的依賴關(guān)系
我們說(shuō)checkpoint可以切斷RDD的依賴關(guān)系,當(dāng)我們業(yè)務(wù)非常復(fù)雜的時(shí)候,需要頻繁的對(duì)RDD轉(zhuǎn)換,頻繁的轉(zhuǎn)換,會(huì)導(dǎo)致,RDD的依賴lineage特別的長(zhǎng),如果中間某一個(gè)RDD的數(shù)據(jù)壞了,錯(cuò)了,是不是要重新計(jì)算???
如果我們把RDD的依賴關(guān)系給切斷,那重新計(jì)算就會(huì)快了是吧?
我們畫圖來(lái)解釋一下這個(gè)事情

checkpoint應(yīng)用場(chǎng)景
當(dāng)RDD的依賴關(guān)系非常長(zhǎng)的時(shí)候,如果rdd4的數(shù)據(jù)有問(wèn)題,就需要從rdd0重新計(jì)算,如果我checkpoint了rdd5,就會(huì)把rdd5的數(shù)據(jù)持久化到hdfs當(dāng)中去,并切斷rdd的依賴關(guān)系,rdd5由原來(lái)的整個(gè)lineage的依賴,轉(zhuǎn)換為依賴HDFS的checkpointRDD,不需要從rdd0開始逐個(gè)的計(jì)算了
checkpoint運(yùn)行機(jī)制
- 當(dāng)RDD的job執(zhí)行完畢后,會(huì)從finalRDD從后往前回溯數(shù)據(jù)
- 當(dāng)遇到某一個(gè)RDD調(diào)用了checkpoint方法是,會(huì)對(duì)當(dāng)前的RDD做一個(gè)標(biāo)記
- 當(dāng)checkpoint RDD5時(shí),會(huì)啟動(dòng)一個(gè)新的job線程進(jìn)行rdd5的數(shù)據(jù)計(jì)算
新的job會(huì)重新從rdd0開始計(jì)算,直到算出rdd5的數(shù)據(jù),然后將數(shù)據(jù)持久化到hdfs當(dāng)中去
優(yōu)化
在執(zhí)行checkpoint之前先對(duì)rdd5進(jìn)行一次cache
這樣做的好處是,不用重新計(jì)算,而是直接把cache里面的數(shù)據(jù)寫入到hdfs文件系統(tǒng)里面了
下面我通過(guò)代碼的方式,給大家演示如何使用checkpoint
package com.bjsxt.spark.persist
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by Albert on 2017/7/15.
*/
object T03Checkpoint {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("T01Cache")
val sc = new SparkContext(conf)
sc.setCheckpointDir("/Users/AlbertC/checkpointtest")
val rdd = sc.makeRDD(1 to 10)
rdd.checkpoint()
rdd.count()
sc.stop()
}
}
setChectpointDir是設(shè)置一個(gè)存儲(chǔ)路徑,這里我用的是我本地電腦的一個(gè)路徑也可以放到hdfs上面