Spark Streaming使用Kafka保證數(shù)據(jù)零丟失

源文件放在github,隨著理解的深入,不斷更新,如有謬誤之處,歡迎指正。原文鏈接https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/spark_streaming使用kafka保證數(shù)據(jù)零丟失.md

spark streaming從1.2開始提供了數(shù)據(jù)的零丟失,想享受這個(gè)特性,需要滿足如下條件:

1.數(shù)據(jù)輸入需要可靠的sources和可靠的receivers

2.應(yīng)用metadata必須通過應(yīng)用driver checkpoint

3.WAL(write ahead log)

可靠的sources和receivers

spark streaming可以通過多種方式作為數(shù)據(jù)sources(包括kafka),輸入數(shù)據(jù)通過receivers接收,通過replication存儲(chǔ)于spark中(為了faultolerance,默認(rèn)復(fù)制到兩個(gè)spark executors),如果數(shù)據(jù)復(fù)制完成,receivers可以知道(例如kafka中更新offsets到zookeeper中)。這樣當(dāng)receivers在接收數(shù)據(jù)過程中crash掉,不會(huì)有數(shù)據(jù)丟失,receivers沒有復(fù)制的數(shù)據(jù),當(dāng)receiver恢復(fù)后重新接收。

image
image

metadata checkpoint

可靠的sources和receivers,可以使數(shù)據(jù)在receivers失敗后恢復(fù),然而在driver失敗后恢復(fù)是比較復(fù)雜的,一種方法是通過checkpoint metadata到HDFS或者S3。metadata包括:

  • configuration
  • code
  • 一些排隊(duì)等待處理但沒有完成的RDD(僅僅是metadata,而不是data)


    image
    image

這樣當(dāng)driver失敗時(shí),可以通過metadata checkpoint,重構(gòu)應(yīng)用程序并知道執(zhí)行到那個(gè)地方。

數(shù)據(jù)可能丟失的場(chǎng)景

可靠的sources和receivers,以及metadata checkpoint也不可以保證數(shù)據(jù)的不丟失,例如:

  • 兩個(gè)executor得到計(jì)算數(shù)據(jù),并保存在他們的內(nèi)存中
  • receivers知道數(shù)據(jù)已經(jīng)輸入
  • executors開始計(jì)算數(shù)據(jù)
  • driver突然失敗
  • driver失敗,那么executors都會(huì)被kill掉
  • 因?yàn)閑xecutor被kill掉,那么他們內(nèi)存中得數(shù)據(jù)都會(huì)丟失,但是這些數(shù)據(jù)不再被處理
  • executor中的數(shù)據(jù)不可恢復(fù)

WAL

為了避免上面情景的出現(xiàn),spark streaming 1.2引入了WAL。所有接收的數(shù)據(jù)通過receivers寫入HDFS或者S3中checkpoint目錄,這樣當(dāng)driver失敗后,executor中數(shù)據(jù)丟失后,可以通過checkpoint恢復(fù)。


image
image

At-Least-Once

盡管WAL可以保證數(shù)據(jù)零丟失,但是不能保證exactly-once,例如下面場(chǎng)景:

  • Receivers接收完數(shù)據(jù)并保存到HDFS或S3

  • 在更新offset前,receivers失敗了


    image
    image
  • Spark Streaming以為數(shù)據(jù)接收成功,但是Kafka以為數(shù)據(jù)沒有接收成功,因?yàn)閛ffset沒有更新到zookeeper

  • 隨后receiver恢復(fù)了

  • 從WAL可以讀取的數(shù)據(jù)重新消費(fèi)一次,因?yàn)槭褂玫膋afka High-Level消費(fèi)API,從zookeeper中保存的offsets開始消費(fèi)

WAL的缺點(diǎn)

通過上面描述,WAL有兩個(gè)缺點(diǎn):

  • 降低了receivers的性能,因?yàn)閿?shù)據(jù)還要存儲(chǔ)到HDFS等分布式文件系統(tǒng)
  • 對(duì)于一些resources,可能存在重復(fù)的數(shù)據(jù),比如Kafka,在Kafka中存在一份數(shù)據(jù),在Spark Streaming也存在一份(以WAL的形式存儲(chǔ)在hadoop API兼容的文件系統(tǒng)中)

Kafka direct API

為了WAL的性能損失和exactly-once,spark streaming1.3中使用Kafka direct API。非常巧妙,Spark driver計(jì)算下個(gè)batch的offsets,指導(dǎo)executor消費(fèi)對(duì)應(yīng)的topics和partitions。消費(fèi)Kafka消息,就像消費(fèi)文件系統(tǒng)文件一樣。


image
image

1.不再需要kafka receivers,executor直接通過Kafka API消費(fèi)數(shù)據(jù)

2.WAL不再需要,如果從失敗恢復(fù),可以重新消費(fèi)

3.exactly-once得到了保證,不會(huì)再?gòu)腤AL中重復(fù)讀取數(shù)據(jù)

總結(jié)

主要說的是spark streaming通過各種方式來保證數(shù)據(jù)不丟失,并保證exactly-once,每個(gè)版本都是spark streaming越來越穩(wěn)定,越來越向生產(chǎn)環(huán)境使用發(fā)展。

參考

spark-streaming
Recent Evolution of Zero Data Loss Guarantee in Spark Streaming With Kafka

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容