???? 簡單理解為:Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節(jié)點上獲取數(shù)據(jù)
Receiver
???? 使用Kafka的高層次Consumer API來實現(xiàn)。receiver從Kafka中獲取的數(shù)據(jù)都存儲在Spark Executor的內(nèi)存中,然后Spark Streaming啟動的job會去處理那些數(shù)據(jù)。然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數(shù)據(jù)。如果要啟用高可靠機制,讓數(shù)據(jù)零丟失,就必須啟用Spark Streaming的預(yù)寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數(shù)據(jù)寫入分布式文件系統(tǒng)(比如HDFS)上的預(yù)寫日志中。所以,即使底層節(jié)點出現(xiàn)了失敗,也可以使用預(yù)寫日志中的數(shù)據(jù)進行恢復(fù)。
注意事項:
1、Kafka中topic的partition與Spark中RDD的partition是沒有關(guān)系的,因此,在KafkaUtils.createStream()中,提高partition的數(shù)量,只會增加Receiver的數(shù)量,也就是讀取Kafka中topic partition的線程數(shù)量,不會增加Spark處理數(shù)據(jù)的并行度。
2、可以創(chuàng)建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收數(shù)據(jù)。
3、如果基于容錯的文件系統(tǒng),比如HDFS,啟用了預(yù)寫日志機制,接收到的數(shù)據(jù)都會被復(fù)制一份到預(yù)寫日志中。因此,KafkaUtils.createStream()中,設(shè)置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。
Direct
Spark1.3中引入Direct方式,用來替代掉使用Receiver接收數(shù)據(jù),這種方式會周期性地查詢Kafka,獲得每個topic+partition的最新的offset,從而定義每個batch的offset的范圍。當處理數(shù)據(jù)的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數(shù)據(jù)。
這種方式有如下優(yōu)點:
1、簡化并行讀?。喝绻x取多個partition,不需要創(chuàng)建多個輸入DStream,然后對它們進行union操作。Spark會創(chuàng)建跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取數(shù)據(jù)。所以在Kafka partition和RDD partition之間,有一個一對一的映射關(guān)系。
2、高性能:如果要保證零數(shù)據(jù)丟失,在基于receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數(shù)據(jù)實際上被復(fù)制了兩份,Kafka自己本身就有高可靠的機制會對數(shù)據(jù)復(fù)制一份,而這里又會復(fù)制一份到WAL中。而基于direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數(shù)據(jù)的復(fù)制,那么就可以通過Kafka的副本進行恢復(fù)。
3、一次且僅一次的事務(wù)機制:基于receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數(shù)據(jù)的傳統(tǒng)方式。這種方式配合著WAL機制可以保證數(shù)據(jù)零丟失的高可靠性,但是卻無法保證數(shù)據(jù)被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
基于direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保證數(shù)據(jù)是消費一次且僅消費一次。由于數(shù)據(jù)消費偏移量是保存在checkpoint中,因此,如果后續(xù)想使用kafka高級API消費數(shù)據(jù),需要手動的更新zookeeper中的偏移量