exactly once指的是在處理數(shù)據(jù)的過程中,系統(tǒng)有很好的容錯(cuò)性(fault-tolerance),能夠保證數(shù)據(jù)處理不重不丟,每一條數(shù)據(jù)僅被處理一次。
Spark具備很好的機(jī)制來保證exactly once的語義,具體體現(xiàn)在數(shù)據(jù)源的可重放性、計(jì)算過程中的容錯(cuò)性、以及寫入存儲(chǔ)介質(zhì)時(shí)的冪等性或者事務(wù)性。
數(shù)據(jù)源的可重放性
數(shù)據(jù)源具有可重放性指的是在出現(xiàn)問題時(shí)可以重新獲取到需要的數(shù)據(jù),一般Spark的數(shù)據(jù)源分為兩種:
第一種為基于文件系統(tǒng)的數(shù)據(jù)源,如HDFS,這種數(shù)據(jù)源的數(shù)據(jù)來自于文件,所以本身就具備了可重放性;
另一種為基于外部接收器的數(shù)據(jù)源,如Kafka,這種數(shù)據(jù)源就不一定能夠具備可重放性,需要具體考慮。
以Kafka為例,一般與Kafka配合使用的是Spark中的SparkStreaming模塊,用來對(duì)流式數(shù)據(jù)進(jìn)行準(zhǔn)實(shí)時(shí)的處理。而SparkStreaming接入Kafka的數(shù)據(jù)有兩種模式,一種為Receiver模式,一種為Direct模式。
Receiver模式
Receiver模式采用Kafka的高階consumer API,Kafka自己封裝了對(duì)數(shù)據(jù)的獲取邏輯,且通過Zookeeper管理offset信息,這種模式在與SparkStreaming對(duì)接時(shí),有以下特點(diǎn):
- Kafka中的partition數(shù)量與SparkStreaming中的并行度不是一一對(duì)應(yīng)的,SparkStreaming通過創(chuàng)建Receiver去讀取Kafka中數(shù)據(jù),createStream()方法傳入的并發(fā)參數(shù)代表的是讀取Kafka中topic+partition的線程數(shù),并不能提高SparkStreaming讀取數(shù)據(jù)的并行度。
- Kafka自己管理offset,Receiver作為一個(gè)高層的Consumer來消費(fèi)數(shù)據(jù),其消費(fèi)的偏移量(offset)由Kafka記錄在Zookeeper中,一旦出現(xiàn)錯(cuò)誤,那些已經(jīng)標(biāo)記為消費(fèi)過的數(shù)據(jù)將會(huì)丟失。
Receiver模式下,為了解決讀取數(shù)據(jù)時(shí)的并行度問題,可以創(chuàng)建多個(gè)DStream,然后union起來,具體可參考文章:http://www.itdecent.cn/p/c8669261165a;為了解決數(shù)據(jù)丟失的問題,可以選擇開啟Spark的WAL(write ahead log)機(jī)制,每次處理數(shù)據(jù)前將預(yù)寫日志寫入到HDFS中,如果節(jié)點(diǎn)出現(xiàn)錯(cuò)誤,可以從WAL中恢復(fù)。但是這種方法其實(shí)效率低下,不僅數(shù)據(jù)冗余(Kafka中有副本機(jī)制,Spark中還要存一份),且無法保證exactly once,數(shù)據(jù)可能重復(fù)消費(fèi)。
無論采取什么方法進(jìn)行補(bǔ)救,Receiver模式都不能夠?qū)崿F(xiàn)exactly once的語義,其根本原因是Kafka自己管理的offset與SparkStreaming實(shí)際處理數(shù)據(jù)的offset沒有同步導(dǎo)致的。
Direct模式
為了解決Receiver模式的弊病,Spark1.3中引入了Direct模式來替代Receiver模式,它使用Kafka的Simple consumer API,由Spark應(yīng)用自己管理offset信息,以達(dá)成exactly once的語義,其特點(diǎn)如下:
- Kafka中的partition與SparkStreaming中的partition一一對(duì)應(yīng),也就是SparkStreaming讀取數(shù)據(jù)的并行度取決于Kafka中partition的數(shù)量。
- 不依賴Receiver,而是通過低階api直接找到topic+partition的leader獲取數(shù)據(jù),并由SparkStreaming應(yīng)用自己負(fù)責(zé)追蹤維護(hù)消費(fèi)的offset。
由于SparkStreaming自己可以維護(hù)offset,所以應(yīng)用自身消費(fèi)的數(shù)據(jù)和偏移量之間的對(duì)應(yīng)關(guān)系確定的,數(shù)據(jù)也是同步的,所以可以實(shí)現(xiàn)exactly once的語義。
下面將給出Direct模式下,SparkStreaming應(yīng)用管理offset的方法案例,其中offset依然是存放在zookeeper中,但是由應(yīng)用自身來管理的,offset也可以放在Redis、MySQL、HBase中進(jìn)行管理,根據(jù)具體情況進(jìn)行選擇。
createDirectStream方法:
def createDirectStream(ssc:StreamingContext)(implicit streamingConfig: StreamingConfig, kc: SimpleKafkaCluster): InputDStream[(Array[Byte], Array[Byte])] = {
val topics = streamingConfig.topicSet
val groupId = streamingConfig.group
// 首先更新offset
setOrUpdateOffsets(topics, groupId)
//從zookeeper上讀取offset開始消費(fèi)message
val messages = {
val partitionsE = kc.getPartitions(topics)
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft)
throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
val consumerOffsets = consumerOffsetsE.right.get
consumerOffsets.foreach {
case (tp, n) => println("===================================" + tp.topic + "," + tp.partition + "," + n)
}
KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (Array[Byte], Array[Byte])](
ssc, streamingConfig.kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message))
}
messages
}
代碼中,參數(shù)streamingConfig中封裝了Kafka的具體參數(shù)信息,如topic名稱,broker list,消費(fèi)者組id等。kc則是Simple Consumer API的接口類,封裝了具體獲取Kafka數(shù)據(jù)的方法。代碼段剛開始的時(shí)候就調(diào)用了setOrUpdateOffsets()方法來更新offset,確保下面得到的數(shù)據(jù)是最新的。
setOrUpdateOffsets方法:
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
topics.foreach(topic => {
var hasConsumed = true
val partitionsE = kc.getPartitions(Set(topic))
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft) hasConsumed = false
// 某個(gè)groupid首次沒有offset信息,會(huì)報(bào)錯(cuò),從頭開始讀
if (hasConsumed) {// 消費(fèi)過
/**
* 如果streaming程序執(zhí)行的時(shí)候出現(xiàn)kafka.common.OffsetOutOfRangeException,
* 說明zk上保存的offsets已經(jīng)過時(shí)了,即kafka的定時(shí)清理策略已經(jīng)將包含該offsets的文件刪除。
* 針對(duì)這種情況,只要判斷一下zk上的consumerOffsets和earliestLeaderOffsets的大小,
* 如果consumerOffsets比earliestLeaderOffsets還小的話,說明consumerOffsets已過時(shí),
* 這時(shí)把consumerOffsets更新為earliestLeaderOffsets
*/
val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (earliestLeaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
val consumerOffsets = consumerOffsetsE.right.get
// 可能只是存在部分分區(qū)consumerOffsets過時(shí),所以只更新過時(shí)分區(qū)的consumerOffsets為earliestLeaderOffsets
var offsets: Map[TopicAndPartition, Long] = Map()
consumerOffsets.foreach({ case(tp, n) =>
val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
if (n < earliestLeaderOffset) {
println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
" offsets已經(jīng)過時(shí),更新為" + earliestLeaderOffset)
offsets += (tp -> earliestLeaderOffset)
}
})
if (!offsets.isEmpty) {
kc.setConsumerOffsets(groupId, offsets)
}
} else {// 沒有消費(fèi)過
val reset = streamingConfig.resetSign.toLowerCase
var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
if (reset == Some("smallest")) {// 從頭消費(fèi)
val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
} else { // 從最新offset處消費(fèi)
val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
}
val offsets = leaderOffsets.map {
case (tp, offset) => (tp, offset.offset)
}
kc.setConsumerOffsets(groupId, offsets)
}
})
}
最后是updateZKOffsets方法,用于應(yīng)用輸出數(shù)據(jù)后,更新zk中的offset:
def updateZKOffsets(rdd: RDD[(Array[Byte], Array[Byte])]) : Unit = {
val groupId = streamingConfig.group
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (offsets <- offsetsList) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
對(duì)于一個(gè)SparkStreaming應(yīng)用來說,每個(gè)批次通過createDirectStream方法來獲取zookeeper中最新的offset,然后使用simple kafka api獲取數(shù)據(jù),消費(fèi)處理完之后,再通過updateZKOffsets方法,更新這個(gè)duration消費(fèi)的offset至zookeeper,以此過程保證了exactly once的語義。
計(jì)算過程中的容錯(cuò)性
以上所述僅保證了在讀取數(shù)據(jù)源過程中的exactly once,數(shù)據(jù)讀取成功后,在Spark應(yīng)用中做處理時(shí),是怎么保證數(shù)據(jù)不重不丟的呢?Spark在容錯(cuò)性這一方面交出了令人滿意的答卷。撇去Driver與Executor的高可用性不說,Spark應(yīng)用內(nèi)部則采用checkpoint和lineage的機(jī)制來確保容錯(cuò)性。
lineage
一般翻譯為血統(tǒng),簡(jiǎn)單來說就是RDD在轉(zhuǎn)化的過程中,由于父RDD與子RDD存在依賴關(guān)系(Dependency),從而形成的lineage,也可以理解為lineage串起了RDD DAG。
RDD可以進(jìn)行緩存,通過調(diào)用persist或者cache方法,將RDD持久化到內(nèi)存或者磁盤中,這樣緩存的RDD就可以被保留在計(jì)算節(jié)點(diǎn)的內(nèi)存中被重用,緩存是構(gòu)建Spark快速迭代的關(guān)鍵。
當(dāng)一個(gè)RDD丟失的情況下,Spark會(huì)去尋找它的父RDD是否已經(jīng)緩存,如果已經(jīng)緩存,就可以通過父RDD直接算出當(dāng)前的RDD,從而避免了緩存之前的RDD的計(jì)算過程,且只有丟失數(shù)據(jù)的partition需要進(jìn)行重算,這樣Spark就避免了RDD上的重復(fù)計(jì)算,能夠極大的提升計(jì)算速度。
緩存雖然可以提升Spark快速迭代計(jì)算的速度,但是緩存是會(huì)丟失的。
checkpoint
檢查點(diǎn)機(jī)制就是為了可以切斷l(xiāng)ineage的依賴關(guān)系,在某個(gè)重要的節(jié)點(diǎn),將RDD持久化到文件系統(tǒng)中(一般選擇HDFS),這樣就算之前的緩存已經(jīng)丟失了,也可以保證檢查點(diǎn)數(shù)據(jù)不會(huì)丟失,這樣在恢復(fù)的時(shí)候,會(huì)直接從檢查點(diǎn)的數(shù)據(jù)開始進(jìn)行計(jì)算,檢查點(diǎn)機(jī)制在SparkStreaming這種流式計(jì)算中發(fā)揮的作用會(huì)更大。
可以通過以下源碼為入口進(jìn)一步了解Spark的緩存和檢查點(diǎn)機(jī)制,RDD在進(jìn)行計(jì)算的時(shí)候會(huì)調(diào)用其iterator方法,在該方法中會(huì)首先去讀取緩存的數(shù)據(jù),如果沒有緩存的數(shù)據(jù)則會(huì)去讀取checkpoint的數(shù)據(jù)
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
Spark在計(jì)算過程中采用的lineage和checkpoint機(jī)制相互結(jié)合,取長(zhǎng)補(bǔ)短,再加上Spark各個(gè)組件底層本身就是具有高可用性,所以在Spark應(yīng)用在轉(zhuǎn)化計(jì)算的過程中,可是保證數(shù)據(jù)處理的exactly once。
寫入存儲(chǔ)介質(zhì)的冪等性或事務(wù)性
Spark進(jìn)行數(shù)據(jù)輸出的時(shí)候,為了達(dá)到exactly once,有兩種方式:
- 冪等更新
指多次寫入的結(jié)果總是寫入相同的數(shù)據(jù),比較典型的例子是key-value型數(shù)據(jù)庫,即使數(shù)據(jù)可能多次寫入,但是最終的結(jié)果也不會(huì)影響其正確性,Spark RDD的輸出方法saveAsTextFile在輸出的時(shí)候?qū)DD轉(zhuǎn)換成為PairRDD,總是將相同的數(shù)據(jù)寫入到文件系統(tǒng)中,而PairRDD的輸出方法本身就滿足key-value的模型,所以均滿足冪等更新。 - 事務(wù)更新
指所有的更新都是基于事務(wù)的,所以更新都是exactly once。Spark需要用戶自己實(shí)現(xiàn)事物機(jī)制,在foreachRDD方法中,用戶可以使用batch time和partition index來創(chuàng)建一個(gè)id,使用這個(gè)id來確保數(shù)據(jù)的唯一性,啟動(dòng)事務(wù)并使用這個(gè)id來更新外部系統(tǒng)數(shù)據(jù),如果這個(gè)id不存在則提交更新,如果這個(gè)id已經(jīng)存在那么則放棄更新。
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
另外,還有一些下游的存儲(chǔ)介質(zhì)本身就不支持冪等或是事務(wù)性寫入,比如kafka。Spark的task或是stage的失敗重做機(jī)制以及kafka本身的高可用寫入,都會(huì)造成一些數(shù)據(jù)重復(fù),這可能就需要Kafka本身去支持transaction write或者其下游應(yīng)用去實(shí)現(xiàn)去重機(jī)制。
最后,exactly once固然是個(gè)理想的狀態(tài),但其實(shí)現(xiàn)成本也是非常高的,在對(duì)數(shù)據(jù)可靠性要求不是很高的場(chǎng)景中,at-least-once甚至丟失少量數(shù)據(jù)也是可以作為一個(gè)選項(xiàng)考慮的。