spark structedStreaming是如何實現(xiàn)容錯的

sss如何實現(xiàn)eoc的

spark structed Streaming簡稱sss,它主要還是采用微批的模式提供端到端的eoc(exactly-once)語義,要實現(xiàn)eoc,需要3方面保證,一個是可以replay的source,二是框架提供作業(yè)狀態(tài)的持久化能力,三是sinker要實現(xiàn)冪等

DataSource

DataSource要replayable,就是指數(shù)據(jù)源可以追蹤當(dāng)前讀取的位置,并且能夠從上次失敗的位置重新消費數(shù)據(jù)
這兩點可以保證能夠從持久化的狀態(tài)中恢復(fù)任務(wù),比如apache kafka和Amazon Kinesis,kafka消費可以commit offset,可以根據(jù)offset seek到指定的位置開始消費;
如果是quickstart中的socket數(shù)據(jù)源類型,它就不能replay,也就無法實現(xiàn)eoc

追蹤數(shù)據(jù)處理的點位主要依賴spark提供的checkpoint機制,checkpoint保存的信息主要是當(dāng)前批次的數(shù)據(jù)源的點位等元數(shù)據(jù)信息

StreamingQueryManager的startQuery和createQuery方法,將checkpoint的位置傳給StreamExecution對象
StreamExecution初始化org.apache.spark.sql.execution.streaming.OffsetSeqLog,這就是文檔中提到的wal日志,從名字就可以猜測它的功能是順序?qū)憯?shù)據(jù)源的點位信息,類似于數(shù)據(jù)庫的事務(wù)日志哦,它記錄了已經(jīng)處理的每一批數(shù)據(jù)的點位信息,當(dāng)前批次N在處理完之前,就先把點位信息寫到OffsetSeqLog,第N批的點位寫進OffsetSeqLog意味著第N-1批數(shù)據(jù)已經(jīng)正確的交給了sinker

org.apache.spark.sql.execution.streaming.MicroBatchExecution#constructNextBatch()
我們來看這個代碼片段

updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
  assert(offsetLog.add(
    currentBatchId,
    availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
    s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
    logInfo(s"Committed offsets for batch $currentBatchId. " +
    s"Metadata ${offsetSeqMetadata.toString}")
 
  // NOTE: The following code is correct because runStream() processes exactly one
  // batch at a time. If we add pipeline parallelism (multiple batches in flight at
  // the same time), this cleanup logic will need to change.
 
  // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
  // sources to discard data from the previous batch.
  if (currentBatchId != 0) {
    val prevBatchOff = offsetLog.get(currentBatchId - 1)
    if (prevBatchOff.isDefined) {
      prevBatchOff.get.toStreamProgress(sources).foreach {
        case (src: Source, off) => src.commit(off)
        case (reader: MicroBatchReader, off) =>
          reader.commit(reader.deserializeOffset(off.json))
      }
    } else {
      throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
    }
  }
   
  // It is now safe to discard the metadata beyond the minimum number to retain.
  // Note that purge is exclusive, i.e. it purges everything before the target ID.
  if (minLogEntriesToMaintain < currentBatchId) {
    offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
    commitLog.purge(currentBatchId - minLogEntriesToMaintain)
  }
}

作業(yè)恢復(fù)的時候,如果offset checkpoint文件存在,那么sss會解析當(dāng)前批次的id,它對應(yīng)的要處理的offsets,以及已經(jīng)commit的N-1批次的offsets,

這時候sss再檢查這個第N批有沒有處理完畢,即看一下最后commit的offset是不是跟N批的offset相同,如果相同,那么就會執(zhí)行N+1新批次的處理,比如上次最后執(zhí)行到1102批,那么解析出來發(fā)現(xiàn)1102批處理完成了,那么批次從1103批開始執(zhí)行
否則重做1102批次

Data sinks

sinker要冪等,同時sss還提供一個commitLog,它用來記錄所有的已經(jīng)完成的batch的id,跟offsetLog一樣,這倆都可以retension


處理流程示意圖

trigger一批之后,總是先寫offsetlog,然后處理,處理完之后寫commitlog

前面提到了offset的錯誤恢復(fù),顯然,會有可能會重復(fù)消費一批數(shù)據(jù)進行處理,導(dǎo)致到達sinker的數(shù)據(jù)出現(xiàn)重復(fù),eoc因此要求sinker必須實現(xiàn)冪等,sinker自己去重,比如寫key -value類型的數(shù)據(jù)庫,redis等,重復(fù)的數(shù)據(jù)并不影響結(jié)果

sss的狀態(tài)管理

狀態(tài)管理僅僅給應(yīng)用提供了at-least once的支持,要實現(xiàn)eoc還需要sinker是冪等的,這跟flink的數(shù)據(jù)加barrier對齊后checkpoint不同,flink等于是框架層面實現(xiàn)了eoc,當(dāng)然flink也要求sinker必須是冪等的,否則還是有可能有重復(fù)數(shù)據(jù),比如kafka-fllink-kafka,因為寫給kafka sinker的那一批結(jié)果,雖然不用flink重新計算,但是kafka如果沒有開啟事務(wù),那么夸session的producer無法保證冪等,也就是不知道要寫的結(jié)果數(shù)據(jù),到底成功寫進去了幾條。

spark的狀態(tài)管理就是指持久化保存在checkpointLocation位置的那些數(shù)據(jù)

包括四類
1、數(shù)據(jù)源的信息,因為支持多種數(shù)據(jù)源,所有要知道你用了什么數(shù)據(jù)源
2、數(shù)據(jù)源的offsetLog
3、commitLog
4、應(yīng)用程序內(nèi)部的狀態(tài)(統(tǒng)計狀態(tài)等,用戶作業(yè)邏輯的狀態(tài),可能會非常多,非常大,比如你groupby url之類的試試)

前3個狀態(tài)信息,都是文本格式保存的,帶有版本信息,防止被不同版本的spark處理,破壞元數(shù)據(jù)等

故障恢復(fù)的流程

正常處理流程是先commit offset,再處理,被sinker完全處理后 再寫commitlog
這種情況下,在恢復(fù)的時候,加載最后commit的offsetLog,然后通過commitLog判斷那一批有沒有被正確處理

處理了,那么從新的offset開始處理,如果沒有,那么,就要重新做

應(yīng)該是按照微批做原子更新的,如果本批次沒有完成成功寫入commitLog,是可以回滾的

狀態(tài)管理跟flink的區(qū)別

整體上sss的這種狀態(tài)管理跟flink類似,spark因為是微批,那么就可以做批的原子狀態(tài)管理,flink因為是連續(xù)的流,所以必須用barrier機制同步各個算子的狀態(tài),也相當(dāng)于利用barrier實現(xiàn)了微批,只不過flink的微批是“異步的”,就是不用等你這一批執(zhí)行完,就可以不斷的執(zhí)行下一批,而sss的是“同步的”,顯然異步的似乎效率更高一些,但是checkpoint如果太頻繁,頻繁等待barrier一致的話,也會有很多算子干等著啥也不干,協(xié)調(diào)效率可能需要根據(jù)場景自行調(diào)優(yōu)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 內(nèi)容 sparkStreaming簡介 spark Streaming和Storm區(qū)別 Spark Streami...
    SUSUR_28f6閱讀 2,642評論 0 0
  • 1.前言 目前實時計算的業(yè)務(wù)場景越來越多,實時計算引擎技術(shù)及生態(tài)也越來越成熟。以Flink和Spark為首的實時計...
    java菜閱讀 1,807評論 0 1
  • 與一朵花對話 與一朵花對話 與生命對話 生命宛如一場盛放 為了這場盛放 承受了多少寒潮的凄苦 承受了幾多欲望的煎熬...
    千里軒閱讀 155評論 1 2
  • 李老師是小艾高中的班主任,個子矮小,長著一幅六棱角的臉型,不茍言笑,古板中透著幾分刻薄!說實話,小艾不是很喜歡他,...
    浮生貧樂閱讀 256評論 0 0
  • 當(dāng)孩子頂撞你的時候 作者:沉香紅 坐在高鐵上,前面有一個三歲小男孩,不斷的指責(zé)與批評媽媽,聲音很大,語言很不文明,...
    爾文書舍閱讀 158評論 0 1

友情鏈接更多精彩內(nèi)容