Spark-Streaming獲取kafka數(shù)據(jù)的兩種方式:Receiver與Direct的方式

???? 簡單理解為:Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節(jié)點上獲取數(shù)據(jù)

Receiver

???? 使用Kafka的高層次Consumer API來實現(xiàn)。receiver從Kafka中獲取的數(shù)據(jù)都存儲在Spark Executor的內(nèi)存中,然后Spark Streaming啟動的job會去處理那些數(shù)據(jù)。然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數(shù)據(jù)。如果要啟用高可靠機制,讓數(shù)據(jù)零丟失,就必須啟用Spark Streaming的預(yù)寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數(shù)據(jù)寫入分布式文件系統(tǒng)(比如HDFS)上的預(yù)寫日志中。所以,即使底層節(jié)點出現(xiàn)了失敗,也可以使用預(yù)寫日志中的數(shù)據(jù)進行恢復(fù)。

注意事項:

1、Kafka中topic的partition與Spark中RDD的partition是沒有關(guān)系的,因此,在KafkaUtils.createStream()中,提高partition的數(shù)量,只會增加Receiver的數(shù)量,也就是讀取Kafka中topic partition的線程數(shù)量,不會增加Spark處理數(shù)據(jù)的并行度。

2、可以創(chuàng)建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收數(shù)據(jù)。

3、如果基于容錯的文件系統(tǒng),比如HDFS,啟用了預(yù)寫日志機制,接收到的數(shù)據(jù)都會被復(fù)制一份到預(yù)寫日志中。因此,KafkaUtils.createStream()中,設(shè)置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。

Direct

Spark1.3中引入Direct方式,用來替代掉使用Receiver接收數(shù)據(jù),這種方式會周期性地查詢Kafka,獲得每個topic+partition的最新的offset,從而定義每個batch的offset的范圍。當處理數(shù)據(jù)的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數(shù)據(jù)。

這種方式有如下優(yōu)點:

1、簡化并行讀?。喝绻x取多個partition,不需要創(chuàng)建多個輸入DStream,然后對它們進行union操作。Spark會創(chuàng)建跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取數(shù)據(jù)。所以在Kafka partition和RDD partition之間,有一個一對一的映射關(guān)系。

2、高性能:如果要保證零數(shù)據(jù)丟失,在基于receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數(shù)據(jù)實際上被復(fù)制了兩份,Kafka自己本身就有高可靠的機制會對數(shù)據(jù)復(fù)制一份,而這里又會復(fù)制一份到WAL中。而基于direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數(shù)據(jù)的復(fù)制,那么就可以通過Kafka的副本進行恢復(fù)。

3、一次且僅一次的事務(wù)機制:基于receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數(shù)據(jù)的傳統(tǒng)方式。這種方式配合著WAL機制可以保證數(shù)據(jù)零丟失的高可靠性,但是卻無法保證數(shù)據(jù)被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。

基于direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保證數(shù)據(jù)是消費一次且僅消費一次。由于數(shù)據(jù)消費偏移量是保存在checkpoint中,因此,如果后續(xù)想使用kafka高級API消費數(shù)據(jù),需要手動的更新zookeeper中的偏移量

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容