spark streaming的checkpoint目的是保證長時(shí)間運(yùn)行的任務(wù)在意外掛掉后保證數(shù)據(jù)不丟失,checkpoint包含兩種數(shù)據(jù):metadata和data,本篇主要討論對metadata的checkpoint。
如何checkpoint
如果要對metadata做checkpoint,首先要有一個(gè)可靠的文件系統(tǒng)保證數(shù)據(jù)的安全性,spark支持hdfs等。通過代碼streamingContext.checkpoint(checkpointDirectory)指定具體的存儲路徑;
jobGenerator在每一個(gè)batch時(shí)間后調(diào)用generateJobs方法,在jobScheduler.submitJobSet提交任務(wù)后,執(zhí)行doCheckpoint方法來保存metadata;
doCheckpoint方法中先判斷是否需要checkpoint,條件為ssc.checkpointDuration != null && ssc.checkpointDir != null,最重要的是指定后面的ssc.checkpointDir指定路徑,再判斷是否到時(shí)間,如果滿足條件進(jìn)行正式代碼;
-
通過ssc.graph.updateCheckpointData(time)調(diào)用DStream的updateCheckpointData,從而執(zhí)行每個(gè)DStream子類的checkpointData.update(currentTime),以DirectKafkaInputDStream為例,最后執(zhí)行的是DirectKafkaInputDStreamCheckpointData的update,目的是更新要持久的源數(shù)據(jù)checkpointData.data;通過dependencies.foreach(_.updateCheckpointData(currentTime))使所有依賴的DStream執(zhí)行;
所有DStream都執(zhí)行完update后,執(zhí)行CheckpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater),本次batchcheckpoint完成;
當(dāng)jobGenerator接收到batch完成事件后,通過jobGenerator.onBatchCompletion(jobSet.time)調(diào)用clearMetadata方法,最后執(zhí)行DStream的clearMetadata刪除generatedRDDs的過期RDD的metadata。
如何恢復(fù)
-
要從checkpoint中恢復(fù),在創(chuàng)建StreamingContext時(shí)略有不同,代碼如圖
StreamingContext的getOrCreate方法中,先通過CheckpointReader.read( checkpointPath, new SparkConf(), hadoopConf, createOnError)反序列化出Checkpoint,如果Checkpoint不為空即路徑存在且有數(shù)據(jù),使用StreamingContext(null, _, null)構(gòu)造方法創(chuàng)建StreamingContext;
StreamingContext.start后,在使用DStreamGraph的實(shí)例時(shí)時(shí)會判斷此實(shí)例為新創(chuàng)建或從checkpoint中恢復(fù),如從checkpoint中恢復(fù),則執(zhí)行g(shù)raph.restoreCheckpointData(),通過DStream的restoreCheckpointData最終調(diào)用DStream子類內(nèi)部的DStreamCheckpointData.restore將保存的RDD metadata寫回到generatedRDDs里;
-
同時(shí)jobGenerator在start時(shí),判斷ssc.isCheckpointPresent,實(shí)際就是判斷ssc里面的cp_是否有值從而執(zhí)行restart方法。restart方法首先從checkpoint的時(shí)間開始恢復(fù)任務(wù),然后生成從最后時(shí)間到restartTime時(shí)間序列;
-
調(diào)用graph.generateJobs生成job,在方法內(nèi)會調(diào)用DStream的generateJobs時(shí),在getOrCompute方法通過上面還原的generatedRDDs獲取對應(yīng)時(shí)間的RDD源數(shù)據(jù)信息,如果沒有再重新生成,最后提交任務(wù)。
創(chuàng)建與恢復(fù)區(qū)別
-
先看一下Checkpoint中包括哪些信息:
val master = ssc.sc.master val framework = ssc.sc.appName val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConfPairs = ssc.conf.getAll以上數(shù)據(jù)都是通過反序列化恢復(fù)得到的,對新程序的所有的配置都不會生效,比如隊(duì)列、資源數(shù)等。
恢復(fù)checkpoint時(shí),從文件系統(tǒng)反序列化數(shù)據(jù)成CheckPoint的具體代碼為Checkpoint.deserialize(fis, conf),所以還原的信息要與當(dāng)前編譯的serialVersion一致,否則會出現(xiàn)異常
在jobGenerator中,新創(chuàng)建的StreamingContext調(diào)用的是startFirstTime方法,會初始化DStream的一些數(shù)據(jù);而checkpoint恢復(fù)調(diào)用的是restart。



