Kafka回顧
- 核心概念圖解
Broker:安裝Kafka服務(wù)的機(jī)器就是一個(gè)broker
Producer:消息的生產(chǎn)者,負(fù)責(zé)將數(shù)據(jù)寫入到broker中(push)
Consumer:消息的消費(fèi)者,負(fù)責(zé)從kafka中拉取數(shù)據(jù)(pull),老版本的消費(fèi)者需要依賴zk,新版本的不需要
Topic: 主題,相當(dāng)于是數(shù)據(jù)的一個(gè)分類,不同topic存放不同業(yè)務(wù)的數(shù)據(jù) –主題:區(qū)分業(yè)務(wù)
Replication:副本,數(shù)據(jù)保存多少份(保證數(shù)據(jù)不丟失) –副本:數(shù)據(jù)安全Partition:分區(qū),是一個(gè)物理的分區(qū),一個(gè)分區(qū)就是一個(gè)文件,一個(gè)Topic可以有1~n個(gè)分區(qū),每個(gè)分區(qū)都有自己的副本 –分區(qū):并發(fā)讀寫
Consumer Group:消費(fèi)者組,一個(gè)topic可以有多個(gè)消費(fèi)者/組同時(shí)消費(fèi),多個(gè)消費(fèi)者如果在一個(gè)消費(fèi)者組中,那么他們不能重復(fù)消費(fèi)數(shù)據(jù) –消費(fèi)者組:提高消費(fèi)者消費(fèi)速度、方便統(tǒng)一管理
注意[1]:一個(gè)Topic可以被多個(gè)消費(fèi)者或者組訂閱,一個(gè)消費(fèi)者/組也可以訂閱多個(gè)主題
注意[2]:讀數(shù)據(jù)只能從Leader讀, 寫數(shù)據(jù)也只能往Leader寫,F(xiàn)ollower會(huì)從Leader那里同步數(shù)據(jù)過(guò)來(lái)做副本?。?!
- 常用命令
啟動(dòng)kafka
/export/servers/kafka/bin/kafka-server-start.sh -daemon
/export/servers/kafka/config/server.properties
停止kafka
/export/servers/kafka/bin/kafka-server-stop.sh
查看topic信息
/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01:2181
創(chuàng)建topic
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic test
查看某個(gè)topic信息
/export/servers/kafka/bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
刪除topic
/export/servers/kafka/bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic test
啟動(dòng)生產(chǎn)者–控制臺(tái)的生產(chǎn)者一般用于測(cè)試
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
啟動(dòng)消費(fèi)者–控制臺(tái)的消費(fèi)者一般用于測(cè)試
/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic spark_kafka--from-beginning
消費(fèi)者連接到borker的地址
/export/servers/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic spark_kafka --from-beginning
整合kafka兩種模式說(shuō)明
這同時(shí)也是一個(gè)面試題的熱點(diǎn)。開發(fā)中我們經(jīng)常會(huì)利用SparkStreaming實(shí)時(shí)地讀取kafka中的數(shù)據(jù)然后進(jìn)行處理,在spark1.3版本后,kafkaUtils里面提供了兩種創(chuàng)建DStream的方法:
1、Receiver接收方式:
- KafkaUtils.createDstream(開發(fā)中不用,了解即可,但是面試可能會(huì)問(wèn))。
- Receiver作為常駐的Task運(yùn)行在Executor等待數(shù)據(jù),但是一個(gè)Receiver效率低,需要開啟多個(gè),再手動(dòng)合并數(shù)據(jù)(union),再進(jìn)行處理,很麻煩
- Receiver哪臺(tái)機(jī)器掛了,可能會(huì)丟失數(shù)據(jù),所以需要開啟WAL(預(yù)寫日志)保證數(shù)據(jù)安全,那么效率又會(huì)降低!
- Receiver方式是通過(guò)zookeeper來(lái)連接kafka隊(duì)列,調(diào)用Kafka高階API,offset存儲(chǔ)在zookeeper,由Receiver維護(hù)。
- spark在消費(fèi)的時(shí)候?yàn)榱吮WC數(shù)據(jù)不丟也會(huì)在Checkpoint中存一份offset,可能會(huì)出現(xiàn)數(shù)據(jù)不一致
- 所以不管從何種角度來(lái)說(shuō),Receiver模式都不適合在開發(fā)中使用了,已經(jīng)淘汰了
2、Direct直連方式
- KafkaUtils.createDirectStream(開發(fā)中使用,要求掌握)
- Direct方式是直接連接kafka分區(qū)來(lái)獲取數(shù)據(jù),從每個(gè)分區(qū)直接讀取數(shù)據(jù)大大提高了并行能力
- Direct方式調(diào)用Kafka低階API(底層API),offset自己存儲(chǔ)和維護(hù),默認(rèn)由Spark維護(hù)在checkpoint中,消除了與zk不一致的情況
- 當(dāng)然也可以自己手動(dòng)維護(hù),把offset存在mysql、redis中
- 所以基于Direct模式可以在開發(fā)中使用,且借助Direct模式的特點(diǎn)+手動(dòng)操作可以保證數(shù)據(jù)的Exactly once 精準(zhǔn)一次
總結(jié):
- Receiver接收方式
- 多個(gè)Receiver接受數(shù)據(jù)效率高,但有丟失數(shù)據(jù)的風(fēng)險(xiǎn)
- 開啟日志(WAL)可防止數(shù)據(jù)丟失,但寫兩遍數(shù)據(jù)效率低。
- Zookeeper維護(hù)offset有重復(fù)消費(fèi)數(shù)據(jù)可能。
- 使用高層次的API
- Direct直連方式
- 不使用Receiver,直接到kafka分區(qū)中讀取數(shù)據(jù)
- 不使用日志(WAL)機(jī)制
- Spark自己維護(hù)offset
- 使用低層次的API
擴(kuò)展:關(guān)于消息語(yǔ)義
注意: 開發(fā)中SparkStreaming和kafka集成有兩個(gè)版本:0.8及0.10+0.8版本有Receiver和Direct模式(但是0.8版本生產(chǎn)環(huán)境問(wèn)題較多,在Spark2.3之后不支持0.8版本了)。0.10以后只保留了direct模式(Reveiver模式不適合生產(chǎn)環(huán)境),并且0.10版本API有變化(更加強(qiáng)大)
結(jié)論:我們學(xué)習(xí)和開發(fā)都直接使用0.10版本中的direct模式
spark-streaming-kafka-0-8(了解)
1.Receiver
KafkaUtils.createDstream使用了receivers來(lái)接收數(shù)據(jù),利用的是Kafka高層次的消費(fèi)者api,偏移量由Receiver維護(hù)在zk中,對(duì)于所有的receivers接收到的數(shù)據(jù)將會(huì)保存在Spark executors中,然后通過(guò)Spark Streaming啟動(dòng)job來(lái)處理這些數(shù)據(jù),默認(rèn)會(huì)丟失,可啟用WAL日志,它同步將接受到數(shù)據(jù)保存到分布式文件系統(tǒng)上比如HDFS。保證數(shù)據(jù)在出錯(cuò)的情況下可以恢復(fù)出來(lái)。盡管這種方式配合著WAL機(jī)制可以保證數(shù)據(jù)零丟失的高可靠性,但是啟用了WAL效率會(huì)較低,且無(wú)法保證數(shù)據(jù)被處理一次且僅一次,可能會(huì)處理兩次。因?yàn)镾park和ZooKeeper之間可能是不同步的。(官方現(xiàn)在已經(jīng)不推薦這種整合方式。)
- 準(zhǔn)備工作
1)啟動(dòng)zookeeper集群
zkServer.sh start
2)啟動(dòng)kafka集群
kafka-server-start.sh /export/servers/kafka/config/server.properties
3.創(chuàng)建topic
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 3 --topic spark_kafka
4.通過(guò)shell命令向topic發(fā)送消息
kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
5.添加kafka的pom依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>
- API
通過(guò)receiver接收器獲取kafka中topic數(shù)據(jù),可以并行運(yùn)行更多的接收器讀取kafak topic中的數(shù)據(jù),這里為3個(gè)
val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
stream
})
如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)可以設(shè)置存儲(chǔ)級(jí)別(默認(rèn)StorageLevel.MEMORY_AND_DISK_SER_2)
代碼演示
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable
object SparkKafka {
def main(args: Array[String]): Unit = {
//1.創(chuàng)建StreamingContext
val config: SparkConf =
new SparkConf().setAppName("SparkStream").setMaster("local[*]")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
//開啟WAL預(yù)寫日志,保證數(shù)據(jù)源端可靠性
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("./kafka")
//==============================================
//2.準(zhǔn)備配置參數(shù)
val zkQuorum = "node01:2181,node02:2181,node03:2181"
val groupId = "spark"
val topics = Map("spark_kafka" -> 2)//2表示每一個(gè)topic對(duì)應(yīng)分區(qū)都采用2個(gè)線程去消費(fèi),
//ssc的rdd分區(qū)和kafka的topic分區(qū)不一樣,增加消費(fèi)線程數(shù),并不增加spark的并行處理數(shù)據(jù)數(shù)量
//3.通過(guò)receiver接收器獲取kafka中topic數(shù)據(jù),可以并行運(yùn)行更多的接收器讀取kafak topic中的數(shù)據(jù),這里為3個(gè)
val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
stream
})
//4.使用union方法,將所有receiver接受器產(chǎn)生的Dstream進(jìn)行合并
val allDStream: DStream[(String, String)] = ssc.union(receiverDStream)
//5.獲取topic的數(shù)據(jù)(String, String) 第1個(gè)String表示topic的名稱,第2個(gè)String表示topic的數(shù)據(jù)
val data: DStream[String] = allDStream.map(_._2)
//==============================================
//6.WordCount
val words: DStream[String] = data.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
2.Direct
Direct方式會(huì)定期地從kafka的topic下對(duì)應(yīng)的partition中查詢最新的偏移量,再根據(jù)偏移量范圍在每個(gè)batch里面處理數(shù)據(jù),Spark通過(guò)調(diào)用kafka簡(jiǎn)單的消費(fèi)者API讀取一定范圍的數(shù)據(jù)。- Direct的缺點(diǎn)是無(wú)法使用基于zookeeper的kafka監(jiān)控工具
- Direct相比基于Receiver方式有幾個(gè)優(yōu)點(diǎn):
-
簡(jiǎn)化并行
不需要?jiǎng)?chuàng)建多個(gè)kafka輸入流,然后union它們,sparkStreaming將會(huì)創(chuàng)建和kafka分區(qū)數(shù)一樣的rdd的分區(qū)數(shù),而且會(huì)從kafka中并行讀取數(shù)據(jù),spark中RDD的分區(qū)數(shù)和kafka中的分區(qū)數(shù)據(jù)是一一對(duì)應(yīng)的關(guān)系。 -
高效
Receiver實(shí)現(xiàn)數(shù)據(jù)的零丟失是將數(shù)據(jù)預(yù)先保存在WAL中,會(huì)復(fù)制一遍數(shù)據(jù),會(huì)導(dǎo)致數(shù)據(jù)被拷貝兩次,第一次是被kafka復(fù)制,另一次是寫到WAL中。而Direct不使用WAL消除了這個(gè)問(wèn)題。 -
恰好一次語(yǔ)義(Exactly-once-semantics)
Receiver讀取kafka數(shù)據(jù)是通過(guò)kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過(guò)數(shù)據(jù)保存在WAL中保證數(shù)據(jù)不丟失,但是可能會(huì)因?yàn)閟parkStreaming和ZK中保存的偏移量不一致而導(dǎo)致數(shù)據(jù)被消費(fèi)了多次。
Direct的Exactly-once-semantics(EOS)通過(guò)實(shí)現(xiàn)kafka低層次api,偏移量?jī)H僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問(wèn)題。
- API
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
代碼演示
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkKafka2 {
def main(args: Array[String]): Unit = {
//1.創(chuàng)建StreamingContext
val config: SparkConf =
new SparkConf().setAppName("SparkStream").setMaster("local[*]")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("./kafka")
//==============================================
//2.準(zhǔn)備配置參數(shù)
val kafkaParams = Map("metadata.broker.list" -> "node01:9092,node02:9092,node03:9092", "group.id" -> "spark")
val topics = Set("spark_kafka")
val allDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
//3.獲取topic的數(shù)據(jù)
val data: DStream[String] = allDStream.map(_._2)
//==============================================
//WordCount
val words: DStream[String] = data.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
spark-streaming-kafka-0-10
- 說(shuō)明
spark-streaming-kafka-0-10版本中,API有一定的變化,操作更加靈活,開發(fā)中使用
- pom.xml
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
- API:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
- 創(chuàng)建topic
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic spark_kafka
- 啟動(dòng)生產(chǎn)者
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092,node01:9092,node01:9092 --topic spark_kafka
- 代碼演示
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkKafkaDemo {
def main(args: Array[String]): Unit = {
//1.創(chuàng)建StreamingContext
//spark.master should be set as local[n], n > 1
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))//5表示5秒中對(duì)數(shù)據(jù)進(jìn)行切分形成一個(gè)RDD
//準(zhǔn)備連接Kafka的參數(shù)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SparkKafkaDemo",
//earliest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無(wú)提交的offset時(shí),從頭開始消費(fèi)
//latest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
//none:topic各分區(qū)都存在已提交的offset時(shí),從offset后開始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常
//這里配置latest自動(dòng)重置偏移量為最新的偏移量,即如果有偏移量從偏移量位置開始消費(fèi),沒有偏移量從新來(lái)的數(shù)據(jù)開始消費(fèi)
"auto.offset.reset" -> "latest",
//false表示關(guān)閉自動(dòng)提交.由spark幫你提交到Checkpoint或程序員手動(dòng)維護(hù)
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark_kafka")
//2.使用KafkaUtil連接Kafak獲取數(shù)據(jù)
val recordDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,//位置策略,源碼強(qiáng)烈推薦使用該策略,會(huì)讓Spark的Executor和Kafka的Broker均勻?qū)?yīng)
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))//消費(fèi)策略,源碼強(qiáng)烈推薦使用該策略
//3.獲取VALUE數(shù)據(jù)
val lineDStream: DStream[String] = recordDStream.map(_.value())//_指的是ConsumerRecord
val wrodDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_指的是發(fā)過(guò)來(lái)的value,即一行數(shù)據(jù)
val wordAndOneDStream: DStream[(String, Int)] = wrodDStream.map((_,1))
val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
result.print()
ssc.start()//開啟
ssc.awaitTermination()//等待優(yōu)雅停止
}
}
好了,本篇主要講解的 SparkStreaming 整合 Kafka 的過(guò)程,