Streaming metadate checkpoint詳解

spark streaming的checkpoint目的是保證長時(shí)間運(yùn)行的任務(wù)在意外掛掉后保證數(shù)據(jù)不丟失,checkpoint包含兩種數(shù)據(jù):metadata和data,本篇主要討論對metadata的checkpoint。

如何checkpoint

  1. 如果要對metadata做checkpoint,首先要有一個(gè)可靠的文件系統(tǒng)保證數(shù)據(jù)的安全性,spark支持hdfs等。通過代碼streamingContext.checkpoint(checkpointDirectory)指定具體的存儲路徑;

  2. jobGenerator在每一個(gè)batch時(shí)間后調(diào)用generateJobs方法,在jobScheduler.submitJobSet提交任務(wù)后,執(zhí)行doCheckpoint方法來保存metadata;

  3. doCheckpoint方法中先判斷是否需要checkpoint,條件為ssc.checkpointDuration != null && ssc.checkpointDir != null,最重要的是指定后面的ssc.checkpointDir指定路徑,再判斷是否到時(shí)間,如果滿足條件進(jìn)行正式代碼;

  4. 通過ssc.graph.updateCheckpointData(time)調(diào)用DStream的updateCheckpointData,從而執(zhí)行每個(gè)DStream子類的checkpointData.update(currentTime),以DirectKafkaInputDStream為例,最后執(zhí)行的是DirectKafkaInputDStreamCheckpointData的update,目的是更新要持久的源數(shù)據(jù)checkpointData.data;通過dependencies.foreach(_.updateCheckpointData(currentTime))使所有依賴的DStream執(zhí)行;
  5. 所有DStream都執(zhí)行完update后,執(zhí)行CheckpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater),本次batchcheckpoint完成;

  6. 當(dāng)jobGenerator接收到batch完成事件后,通過jobGenerator.onBatchCompletion(jobSet.time)調(diào)用clearMetadata方法,最后執(zhí)行DStream的clearMetadata刪除generatedRDDs的過期RDD的metadata。

如何恢復(fù)

  1. 要從checkpoint中恢復(fù),在創(chuàng)建StreamingContext時(shí)略有不同,代碼如圖
  2. StreamingContext的getOrCreate方法中,先通過CheckpointReader.read( checkpointPath, new SparkConf(), hadoopConf, createOnError)反序列化出Checkpoint,如果Checkpoint不為空即路徑存在且有數(shù)據(jù),使用StreamingContext(null, _, null)構(gòu)造方法創(chuàng)建StreamingContext;

  3. StreamingContext.start后,在使用DStreamGraph的實(shí)例時(shí)時(shí)會判斷此實(shí)例為新創(chuàng)建或從checkpoint中恢復(fù),如從checkpoint中恢復(fù),則執(zhí)行g(shù)raph.restoreCheckpointData(),通過DStream的restoreCheckpointData最終調(diào)用DStream子類內(nèi)部的DStreamCheckpointData.restore將保存的RDD metadata寫回到generatedRDDs里;

  4. 同時(shí)jobGenerator在start時(shí),判斷ssc.isCheckpointPresent,實(shí)際就是判斷ssc里面的cp_是否有值從而執(zhí)行restart方法。restart方法首先從checkpoint的時(shí)間開始恢復(fù)任務(wù),然后生成從最后時(shí)間到restartTime時(shí)間序列;
  5. 調(diào)用graph.generateJobs生成job,在方法內(nèi)會調(diào)用DStream的generateJobs時(shí),在getOrCompute方法通過上面還原的generatedRDDs獲取對應(yīng)時(shí)間的RDD源數(shù)據(jù)信息,如果沒有再重新生成,最后提交任務(wù)。

創(chuàng)建與恢復(fù)區(qū)別

  1. 先看一下Checkpoint中包括哪些信息:

    val master = ssc.sc.master
    val framework = ssc.sc.appName
    val jars = ssc.sc.jars
    val graph = ssc.graph
    val checkpointDir = ssc.checkpointDir
    val checkpointDuration = ssc.checkpointDuration
    val pendingTimes = ssc.scheduler.getPendingTimes().toArray
    val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
    val sparkConfPairs = ssc.conf.getAll
    

    以上數(shù)據(jù)都是通過反序列化恢復(fù)得到的,對新程序的所有的配置都不會生效,比如隊(duì)列、資源數(shù)等。

  2. 恢復(fù)checkpoint時(shí),從文件系統(tǒng)反序列化數(shù)據(jù)成CheckPoint的具體代碼為Checkpoint.deserialize(fis, conf),所以還原的信息要與當(dāng)前編譯的serialVersion一致,否則會出現(xiàn)異常

  3. 在jobGenerator中,新創(chuàng)建的StreamingContext調(diào)用的是startFirstTime方法,會初始化DStream的一些數(shù)據(jù);而checkpoint恢復(fù)調(diào)用的是restart。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 兜兜轉(zhuǎn)轉(zhuǎn)這么多年,不知道為什么總是忘不了一個(gè)人,就是想見他一面,看看他過的怎么樣,不管他是否還記得我!
    走遠(yuǎn)201411閱讀 142評論 0 0
  • 我們家的男孩現(xiàn)在3歲,我準(zhǔn)備這樣養(yǎng)男孩。 男孩主要分成三個(gè)階段: 1、0-6歲幼兒園階段 這個(gè)階段關(guān)鍵字是“愛”。...
    余叔閱讀 257評論 0 1
  • 清晨,被婉轉(zhuǎn)悅耳的鳥鳴叫醒。我躺在被窩里,用心仔細(xì)地聽著這美妙的音樂。嘰嘰喳喳,嘰嘰喳喳,嘿!居然聽不見了。 于是...
    沐洋公子閱讀 127評論 0 1
  • 是不是我前世的愛情遺落在江南 若不然為什麼今世的寂寞總與江南有染。 ...
    禾鄉(xiāng)閱讀 659評論 2 19

友情鏈接更多精彩內(nèi)容