Spark系列課程-00xxSpark RDD持久化

我們這節(jié)課講一下RDD的持久化

RDD的持久化

這段代碼我們上午已經(jīng)看過(guò)了,有瑕疵大家看出來(lái)了嗎?
有什么瑕疵啊?
大家是否還記得我在第二節(jié)課的時(shí)候跟大家說(shuō),RDD實(shí)際是不存數(shù)據(jù)的?

image.png

我們?cè)僦匦轮v解一下這段代碼

sc.textfile是不是講數(shù)據(jù)加載到這個(gè)RDD里面去了?
對(duì)這個(gè)RDD執(zhí)行一個(gè)filter算子過(guò)濾,返回一個(gè)errors RDD
然后對(duì)errorsRDD又經(jīng)過(guò)一次過(guò)濾,然后再執(zhí)行一次count算子,我們把這個(gè)job叫做job0
然后第四行,又對(duì)這個(gè)errors執(zhí)行了一次filter,然后又count一次,我們把這個(gè)job叫做job1

那么在job1里面errors這個(gè)RDD,那我們說(shuō)了RDD里面不存數(shù)據(jù),那errors這個(gè)RDD里面的數(shù)據(jù)是通過(guò)lines這個(gè)RDD計(jì)算得來(lái)的,那rdd不存數(shù)據(jù),那他是不是要重新計(jì)算???

我再重新說(shuō)一遍,errors這個(gè)RDD由于他不存數(shù)據(jù),所以job1這個(gè)RDD里面的errors和job0里面的這個(gè)RDD,都是通過(guò)依賴的RDD計(jì)算過(guò)來(lái)的,他是怎么計(jì)算過(guò)來(lái)的,他是從lines這個(gè)RDD計(jì)算過(guò)來(lái)的

那lines這個(gè)RDD數(shù)據(jù)哪來(lái)的?他是通過(guò)textfile加載過(guò)來(lái)的

他其實(shí)是計(jì)算了幾遍?計(jì)算了兩遍對(duì)嗎?
那我們對(duì)他進(jìn)行優(yōu)化,怎么優(yōu)化??jī)?yōu)化的時(shí)候就讓他讀一遍就可以了唄?

我們重復(fù)是用errors重復(fù)使用兩次,那如果我們把errors里面的數(shù)據(jù)持久化一下,把他保存到內(nèi)存當(dāng)中去,或者保存到磁盤當(dāng)中去是不是就可以了?

我job1在使用errors的時(shí)候直接從磁盤或者內(nèi)存當(dāng)中去讀就可以了對(duì)吧?

那如果我們想給RDD做持久化,我們就要使用持久化的算子,cache、persist、checkpoint,這些算子都可以將RDD的數(shù)據(jù)進(jìn)行持久化
那么這三個(gè)算子有什么區(qū)別呢?
cache他是persist的一個(gè)簡(jiǎn)化版cache他默認(rèn)是將RDD的數(shù)據(jù)持久化到內(nèi)存里面去

persist這個(gè)算子我們可以自己指定持久化的級(jí)別,可以自定義,我們可以將RDD的數(shù)據(jù)持久化到磁盤上,如果你想要把數(shù)據(jù)持久化到磁盤上 ,必須要使用persist算子
因?yàn)閏ache是默認(rèn)將數(shù)據(jù)放到內(nèi)存
我們現(xiàn)在來(lái)寫代碼測(cè)試一下

我們看一下性能有沒有提升,我們剛才說(shuō)使用cache這個(gè)算子對(duì)剛才這個(gè)代碼進(jìn)行優(yōu)化,看性能有沒有提升,

package com.bjsxt.spark.persist

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Albert on 2017/7/15.
  *
  *
  * cache persist注意事項(xiàng)
  * 
  */
object T01Persist {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("T01Cache")
    val sc = new SparkContext(conf)

    var rdd = sc.textFile("log.txt")
    
    rdd.cache()
    //rdd.persist(StorageLevel.MEMORY_ONLY)
    // chche() = persist()  = persist(StorageLevel.MEMORY_ONLY)


    val st = System.currentTimeMillis()
    val count = rdd.count()
    val et = System.currentTimeMillis()

    println(s"count: $count \t Duration:${et-st}")

    val st1 = System.currentTimeMillis()
    val count1 = rdd.count()
    val et1 = System.currentTimeMillis()

    println(s"count: $count1 \t Duration:${et1-st1}")
    
  }
}

這段代碼大家都可以理解吧?理解的舉手,要給我反饋真實(shí)的情況啊,如果不懂的人多了,我再通過(guò)其他案例給大家講

這里給大家總結(jié)一下持久化算子的注意事項(xiàng)

cache和persist使用注意事項(xiàng)
1、cache和persist算子都是懶執(zhí)行的,必須有一個(gè)Action算子觸發(fā)執(zhí)行
2、cache和persist算子的返回值必須賦值給一個(gè)變量,在下一個(gè)job中直接使用這個(gè)變量就是使用了持久化的數(shù)據(jù)

提問(wèn)如果一個(gè)Application里面只有一個(gè)job有必要使用持久化算子嗎?
答案是沒有必要對(duì)吧?

3、cache和persist算子后面不能緊跟Aciton類算子(不能直接rdd.cache().count())

為什么不能直接點(diǎn)出來(lái)Action類算子呢?
我們還記不記得Action類算子,都有什么類型的了呢?我們?cè)谥v算子的時(shí)候說(shuō)所有Action類算子,他有三種返回類型:無(wú)類型:foreach、HDFS、Scala數(shù)據(jù)類型
那我們r(jià)dd = rdd.cache返回的類型是RDD[]

incorrect: rdd = rdd.cache().count()
correct: val cacheRDD = rdd.cache
              rdd.count()

好下面我們來(lái)看一下persist各種StorageLevel的級(jí)別,我們看一下Spark的源碼

如果我們要看源碼,我們要先找到這個(gè)方法

那cache這個(gè)方法在哪里呢?我們是在rdd. cache的這個(gè)方法吧?
所以我們?nèi)DD這個(gè)類里面找一下

image.png
image.png
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)

StorageLevel五個(gè)參數(shù)的意義

參數(shù)名稱 參數(shù)意義
_useDisk 是否使用磁盤
_useMemory 是否使用內(nèi)存
_useOffHeap 是否使用堆外內(nèi)存
_deserialized 是否不序列化,注意這里false代表序列化
_replication 副本數(shù)量默認(rèn)是1

我們看看MEMORY_ONLY 他的參數(shù)分別是
new StorageLevel(false, true, false, true)
是否使用磁盤,不使用
是否使用內(nèi)存,使用
是否使用堆外內(nèi)存,不使用
是否不序列化,是(意思就是不序列化)

沒有參數(shù)副本數(shù),所以副本數(shù)是1

其他的自己看一下記一下,老師在這著重講解一下

MEMORY_AND_DISK

MEMORY_AND_DISK他是內(nèi)存中一份磁盤中一份嗎?不是的
大家一定要記住,這個(gè)持久化級(jí)別,一共就只有一份
這個(gè)持久化的級(jí)別,會(huì)先往內(nèi)存里面持久化RDD,如果內(nèi)存不夠了,就往硬盤里面持久化
大家記住了嗎?

OFF_HEAP
OFF_HEAP 這個(gè)持久化級(jí)別的意思?
堆外內(nèi)存?什么是堆外內(nèi)存,堆以外的內(nèi)存就是堆外內(nèi)存對(duì)嗎?
JVM管理了一塊內(nèi)存對(duì)嗎?那不受JVM 的GC管理的內(nèi)存是不是就是堆外內(nèi)存???

我們?cè)诘谝还?jié)課講了一個(gè)
Tachyon現(xiàn)在改名叫Alluxio他是基于內(nèi)存的一個(gè)文件系統(tǒng),也是Berkeley技術(shù)架構(gòu)下的一個(gè)系統(tǒng),Tachyon可以和Spark進(jìn)行整合,整合好了以后,如果想用堆外內(nèi)存來(lái)持久化,就需要設(shè)置這種類型,明白嗎?如果你沒有整合Tachyon,在程序里面還使用了這個(gè)級(jí)別,程序會(huì)給你報(bào)錯(cuò)。
現(xiàn)在Tachyon用的公司還很少,國(guó)內(nèi)阿里、百度、華為才有一定規(guī)模的內(nèi)存分布式文件系統(tǒng)的集群

另外序列化的數(shù)據(jù)在持久化的時(shí)候會(huì)小一些,但帶來(lái)的就像加解密一樣,會(huì)影響性能。

有同學(xué)會(huì)問(wèn)到,我一直持久化,內(nèi)存越來(lái)越多,我怎么辦?
其實(shí)還有一個(gè)unpersist這個(gè)算子,會(huì)把rdd的持久化級(jí)別設(shè)置為StorageLevel.NONE
但一般情況下不需要使用
因?yàn)槌志没瘯?huì)有一個(gè)TTL機(jī)制,就是最近不常用的數(shù)據(jù),他會(huì)自動(dòng)清除

還有另外一個(gè)問(wèn)題,也是同學(xué)們經(jīng)常會(huì)問(wèn)到的,當(dāng)我內(nèi)存不夠的時(shí)候,我還是cache了一個(gè)RDD,會(huì)不會(huì)報(bào)OOM?
那我告訴大家一個(gè)結(jié)論,就是不會(huì),RDD的持久化,有多少內(nèi)存,他就放多少數(shù)據(jù),放不下的數(shù)據(jù)就不會(huì)進(jìn)行存儲(chǔ)

RDD持久化時(shí)內(nèi)存不夠的處理機(jī)制

然后我們講一下checkpoint
checkpoint不僅僅能夠持久化數(shù)據(jù),還能夠?qū)DD的依賴關(guān)系切斷

持久化數(shù)據(jù)

那我們說(shuō)我們選用persist進(jìn)行RDD的持久化,我們可以指定持久化的級(jí)別,還可以設(shè)置數(shù)據(jù)的副本數(shù),他的數(shù)據(jù)要么放到內(nèi)存,要么放到磁盤上,并且還能有備份

那大家考慮一下,我們是用persist給我們持久化到內(nèi)存硬盤安全,還是使用checkpoint讓Spark給我們把數(shù)據(jù)持久化到hdfs上安全?
持久化到hdfs上更安全一些對(duì)嗎?
那為什么持久化到HDFS上安全呢?
因?yàn)檫@就是雙保險(xiǎn)了,所以會(huì)更安全對(duì)吧?

切斷RDD的依賴關(guān)系

我們說(shuō)checkpoint可以切斷RDD的依賴關(guān)系,當(dāng)我們業(yè)務(wù)非常復(fù)雜的時(shí)候,需要頻繁的對(duì)RDD轉(zhuǎn)換,頻繁的轉(zhuǎn)換,會(huì)導(dǎo)致,RDD的依賴lineage特別的長(zhǎng),如果中間某一個(gè)RDD的數(shù)據(jù)壞了,錯(cuò)了,是不是要重新計(jì)算???
如果我們把RDD的依賴關(guān)系給切斷,那重新計(jì)算就會(huì)快了是吧?

我們畫圖來(lái)解釋一下這個(gè)事情

checkpoint執(zhí)行機(jī)制及優(yōu)化方法

checkpoint應(yīng)用場(chǎng)景
當(dāng)RDD的依賴關(guān)系非常長(zhǎng)的時(shí)候,如果rdd4的數(shù)據(jù)有問(wèn)題,就需要從rdd0重新計(jì)算,如果我checkpoint了rdd5,就會(huì)把rdd5的數(shù)據(jù)持久化到hdfs當(dāng)中去,并切斷rdd的依賴關(guān)系,rdd5由原來(lái)的整個(gè)lineage的依賴,轉(zhuǎn)換為依賴HDFS的checkpointRDD,不需要從rdd0開始逐個(gè)的計(jì)算了
checkpoint運(yùn)行機(jī)制

  1. 當(dāng)RDD的job執(zhí)行完畢后,會(huì)從finalRDD從后往前回溯數(shù)據(jù)
  2. 當(dāng)遇到某一個(gè)RDD調(diào)用了checkpoint方法是,會(huì)對(duì)當(dāng)前的RDD做一個(gè)標(biāo)記
  3. 當(dāng)checkpoint RDD5時(shí),會(huì)啟動(dòng)一個(gè)新的job線程進(jìn)行rdd5的數(shù)據(jù)計(jì)算
    新的job會(huì)重新從rdd0開始計(jì)算,直到算出rdd5的數(shù)據(jù),然后將數(shù)據(jù)持久化到hdfs當(dāng)中去
    優(yōu)化
    在執(zhí)行checkpoint之前先對(duì)rdd5進(jìn)行一次cache
    這樣做的好處是,不用重新計(jì)算,而是直接把cache里面的數(shù)據(jù)寫入到hdfs文件系統(tǒng)里面了

下面我通過(guò)代碼的方式,給大家演示如何使用checkpoint

package com.bjsxt.spark.persist

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Albert on 2017/7/15.
  */
object T03Checkpoint {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("T01Cache")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/Users/AlbertC/checkpointtest")


    val rdd =  sc.makeRDD(1 to 10)

    rdd.checkpoint()

    rdd.count()

    sc.stop()
  }

}

setChectpointDir是設(shè)置一個(gè)存儲(chǔ)路徑,這里我用的是我本地電腦的一個(gè)路徑也可以放到hdfs上面

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容