源文件放在github,隨著理解的深入,不斷更新,如有謬誤之處,歡迎指正。原文鏈接https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/spark_streaming使用kafka保證數(shù)據(jù)零丟失.md
spark streaming從1.2開始提供了數(shù)據(jù)的零丟失,想享受這個(gè)特性,需要滿足如下條件:
1.數(shù)據(jù)輸入需要可靠的sources和可靠的receivers
2.應(yīng)用metadata必須通過應(yīng)用driver checkpoint
3.WAL(write ahead log)
可靠的sources和receivers
spark streaming可以通過多種方式作為數(shù)據(jù)sources(包括kafka),輸入數(shù)據(jù)通過receivers接收,通過replication存儲(chǔ)于spark中(為了faultolerance,默認(rèn)復(fù)制到兩個(gè)spark executors),如果數(shù)據(jù)復(fù)制完成,receivers可以知道(例如kafka中更新offsets到zookeeper中)。這樣當(dāng)receivers在接收數(shù)據(jù)過程中crash掉,不會(huì)有數(shù)據(jù)丟失,receivers沒有復(fù)制的數(shù)據(jù),當(dāng)receiver恢復(fù)后重新接收。

metadata checkpoint
可靠的sources和receivers,可以使數(shù)據(jù)在receivers失敗后恢復(fù),然而在driver失敗后恢復(fù)是比較復(fù)雜的,一種方法是通過checkpoint metadata到HDFS或者S3。metadata包括:
- configuration
- code
-
一些排隊(duì)等待處理但沒有完成的RDD(僅僅是metadata,而不是data)
image
這樣當(dāng)driver失敗時(shí),可以通過metadata checkpoint,重構(gòu)應(yīng)用程序并知道執(zhí)行到那個(gè)地方。
數(shù)據(jù)可能丟失的場(chǎng)景
可靠的sources和receivers,以及metadata checkpoint也不可以保證數(shù)據(jù)的不丟失,例如:
- 兩個(gè)executor得到計(jì)算數(shù)據(jù),并保存在他們的內(nèi)存中
- receivers知道數(shù)據(jù)已經(jīng)輸入
- executors開始計(jì)算數(shù)據(jù)
- driver突然失敗
- driver失敗,那么executors都會(huì)被kill掉
- 因?yàn)閑xecutor被kill掉,那么他們內(nèi)存中得數(shù)據(jù)都會(huì)丟失,但是這些數(shù)據(jù)不再被處理
- executor中的數(shù)據(jù)不可恢復(fù)
WAL
為了避免上面情景的出現(xiàn),spark streaming 1.2引入了WAL。所有接收的數(shù)據(jù)通過receivers寫入HDFS或者S3中checkpoint目錄,這樣當(dāng)driver失敗后,executor中數(shù)據(jù)丟失后,可以通過checkpoint恢復(fù)。

At-Least-Once
盡管WAL可以保證數(shù)據(jù)零丟失,但是不能保證exactly-once,例如下面場(chǎng)景:
Receivers接收完數(shù)據(jù)并保存到HDFS或S3
-
在更新offset前,receivers失敗了
image Spark Streaming以為數(shù)據(jù)接收成功,但是Kafka以為數(shù)據(jù)沒有接收成功,因?yàn)閛ffset沒有更新到zookeeper
隨后receiver恢復(fù)了
從WAL可以讀取的數(shù)據(jù)重新消費(fèi)一次,因?yàn)槭褂玫膋afka High-Level消費(fèi)API,從zookeeper中保存的offsets開始消費(fèi)
WAL的缺點(diǎn)
通過上面描述,WAL有兩個(gè)缺點(diǎn):
- 降低了receivers的性能,因?yàn)閿?shù)據(jù)還要存儲(chǔ)到HDFS等分布式文件系統(tǒng)
- 對(duì)于一些resources,可能存在重復(fù)的數(shù)據(jù),比如Kafka,在Kafka中存在一份數(shù)據(jù),在Spark Streaming也存在一份(以WAL的形式存儲(chǔ)在hadoop API兼容的文件系統(tǒng)中)
Kafka direct API
為了WAL的性能損失和exactly-once,spark streaming1.3中使用Kafka direct API。非常巧妙,Spark driver計(jì)算下個(gè)batch的offsets,指導(dǎo)executor消費(fèi)對(duì)應(yīng)的topics和partitions。消費(fèi)Kafka消息,就像消費(fèi)文件系統(tǒng)文件一樣。

1.不再需要kafka receivers,executor直接通過Kafka API消費(fèi)數(shù)據(jù)
2.WAL不再需要,如果從失敗恢復(fù),可以重新消費(fèi)
3.exactly-once得到了保證,不會(huì)再?gòu)腤AL中重復(fù)讀取數(shù)據(jù)
總結(jié)
主要說的是spark streaming通過各種方式來保證數(shù)據(jù)不丟失,并保證exactly-once,每個(gè)版本都是spark streaming越來越穩(wěn)定,越來越向生產(chǎn)環(huán)境使用發(fā)展。
參考
spark-streaming
Recent Evolution of Zero Data Loss Guarantee in Spark Streaming With Kafka