1、 createDirectStream
自 Spark-1.3.0 起,提供了不需要 Receiver 的方法。替代了使用 receivers 來(lái)接收數(shù)據(jù),該方法定期查詢(xún)每個(gè) topic+partition 的 lastest offset,并據(jù)此決定每個(gè) batch 要接收的 offsets 范圍
? 簡(jiǎn)化并行:不再需要?jiǎng)?chuàng)建多個(gè) kafka input DStream 然后再 union 這些 input DStream。使用 directStream,Spark Streaming會(huì)創(chuàng)建與 Kafka partitions 相同數(shù)量的 paritions 的 RDD,RDD 的 partition與 Kafka 的 partition 一一對(duì)應(yīng),這樣更易于理解及調(diào)優(yōu)
? 高效:在方式一中要保證數(shù)據(jù)零丟失需要啟用 WAL(預(yù)寫(xiě)日志),這會(huì)占用更多空間。而在方式二中,可以直接從 Kafka 指定的 topic 的指定 offsets 處恢復(fù)數(shù)據(jù),不需要使用 WAL
? 恰好一次語(yǔ)義保證:基于Receiver方式使用了 Kafka 的 high level API 來(lái)在 Zookeeper 中存儲(chǔ)已消費(fèi)的 offsets。這在某些情況下會(huì)導(dǎo)致一些數(shù)據(jù)被消費(fèi)兩次,比如 streaming app 在處理某個(gè) batch 內(nèi)已接受到的數(shù)據(jù)的過(guò)程中掛掉,但是數(shù)據(jù)已經(jīng)處理了一部分,但這種情況下無(wú)法將已處理數(shù)據(jù)的 offsets 更新到 Zookeeper 中,下次重啟時(shí),這批數(shù)據(jù)將再次被消費(fèi)且處理?;赿irect的方式,使用kafka的簡(jiǎn)單api,Spark Streaming自己就負(fù)責(zé)追蹤消費(fèi)的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保證數(shù)據(jù)是消費(fèi)一次且僅消費(fèi)一次。這種方式中,只要將 output 操作和保存 offsets 操作封裝成一個(gè)原子操作就能避免失敗后的重復(fù)消費(fèi)和處理,從而達(dá)到恰好一次的語(yǔ)義(Exactly-once)
2、 createStream
這種方法使用一個(gè) Receiver 來(lái)接收數(shù)據(jù)。在該 Receiver 的實(shí)現(xiàn)中使用了 Kafka high-level consumer API。Receiver 從 kafka 接收的數(shù)據(jù)將被存儲(chǔ)到 Spark executor 中,隨后啟動(dòng)的 job 將處理這些數(shù)據(jù)。
在默認(rèn)配置下,該方法失敗后會(huì)丟失數(shù)據(jù)(保存在 executor 內(nèi)存里的數(shù)據(jù)在 application 失敗后就沒(méi)了),若要保證數(shù)據(jù)不丟失,需要啟用 WAL(即預(yù)寫(xiě)日志至 HDFS、S3等),這樣再失敗后可以從日志文件中恢復(fù)數(shù)據(jù)。
在該函數(shù)中,會(huì)新建一個(gè) KafkaInputDStream對(duì)象,KafkaInputDStream繼承于 ReceiverInputDStream。KafkaInputDStream實(shí)現(xiàn)了getReceiver方法,返回接收器的實(shí)例
def getReceiver(): Receiver[(K, V)] = {
if (!useReliableReceiver) {
//< 不啟用 WAL
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
} else {
//< 啟用 WAL
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
}
}
需要注意的點(diǎn):
? Kafka Topic 的 partitions 與RDD 的 partitions 沒(méi)有直接關(guān)系,不能一一對(duì)應(yīng)。如果增加 topic 的 partition 個(gè)數(shù)的話僅僅會(huì)增加單個(gè) Receiver 接收數(shù)據(jù)的線程數(shù)。事實(shí)上,使用這種方法只會(huì)在一個(gè) executor 上啟用一個(gè) Receiver,該 Receiver 包含一個(gè)線程池,線程池的線程個(gè)數(shù)與所有 topics 的 partitions 個(gè)數(shù)總和一致,每條線程接收一個(gè) topic 的一個(gè) partition 的數(shù)據(jù)。而并不會(huì)增加處理數(shù)據(jù)時(shí)的并行度。
? 對(duì)于一個(gè) topic,可以使用多個(gè) groupid 相同的 input DStream 來(lái)使用多個(gè) Receivers 來(lái)增加并行度,然后 union 他們;對(duì)于多個(gè) topics,除了可以用上個(gè)辦法增加并行度外,還可以對(duì)不同的 topic 使用不同的 input DStream 然后 union 他們來(lái)增加并行度
? 如果你啟用了 WAL,為能將接收到的數(shù)據(jù)將以 log 的方式在指定的存儲(chǔ)系統(tǒng)備份一份,需要指定輸入數(shù)據(jù)的存儲(chǔ)等級(jí)為 StorageLevel.MEMORY_AND_DISK_SER 或 StorageLevel.MEMORY_AND_DISK_SER_2