SparkStreaming 如何整合 Kafka

Kafka回顧

  • 核心概念圖解

Broker:安裝Kafka服務(wù)的機(jī)器就是一個(gè)broker

image

Producer:消息的生產(chǎn)者,負(fù)責(zé)將數(shù)據(jù)寫入到broker中(push)
Consumer:消息的消費(fèi)者,負(fù)責(zé)從kafka中拉取數(shù)據(jù)(pull),老版本的消費(fèi)者需要依賴zk,新版本的不需要
Topic: 主題,相當(dāng)于是數(shù)據(jù)的一個(gè)分類,不同topic存放不同業(yè)務(wù)的數(shù)據(jù) –主題:區(qū)分業(yè)務(wù)
Replication:副本,數(shù)據(jù)保存多少份(保證數(shù)據(jù)不丟失) –副本:數(shù)據(jù)安全
Partition:
分區(qū),是一個(gè)物理的分區(qū),一個(gè)分區(qū)就是一個(gè)文件,一個(gè)Topic可以有1~n個(gè)分區(qū),每個(gè)分區(qū)都有自己的副本 –分區(qū):并發(fā)讀寫
Consumer Group:消費(fèi)者組,一個(gè)topic可以有多個(gè)消費(fèi)者/組同時(shí)消費(fèi),多個(gè)消費(fèi)者如果在一個(gè)消費(fèi)者組中,那么他們不能重復(fù)消費(fèi)數(shù)據(jù) –消費(fèi)者組:提高消費(fèi)者消費(fèi)速度、方便統(tǒng)一管理
注意[1]:一個(gè)Topic可以被多個(gè)消費(fèi)者或者組訂閱,一個(gè)消費(fèi)者/組也可以訂閱多個(gè)主題
注意[2]:讀數(shù)據(jù)只能從Leader讀, 寫數(shù)據(jù)也只能往Leader寫,F(xiàn)ollower會(huì)從Leader那里同步數(shù)據(jù)過(guò)來(lái)做副本?。?!

  • 常用命令

啟動(dòng)kafka

/export/servers/kafka/bin/kafka-server-start.sh -daemon 

/export/servers/kafka/config/server.properties 

停止kafka

/export/servers/kafka/bin/kafka-server-stop.sh

查看topic信息

/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01:2181 

創(chuàng)建topic

/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic test 

查看某個(gè)topic信息

/export/servers/kafka/bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test 

刪除topic

/export/servers/kafka/bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic test

啟動(dòng)生產(chǎn)者–控制臺(tái)的生產(chǎn)者一般用于測(cè)試

/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka

啟動(dòng)消費(fèi)者–控制臺(tái)的消費(fèi)者一般用于測(cè)試

/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic spark_kafka--from-beginning 

消費(fèi)者連接到borker的地址

/export/servers/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic spark_kafka --from-beginning 

整合kafka兩種模式說(shuō)明

這同時(shí)也是一個(gè)面試題的熱點(diǎn)。開發(fā)中我們經(jīng)常會(huì)利用SparkStreaming實(shí)時(shí)地讀取kafka中的數(shù)據(jù)然后進(jìn)行處理,在spark1.3版本后,kafkaUtils里面提供了兩種創(chuàng)建DStream的方法:
1、Receiver接收方式:

  • KafkaUtils.createDstream(開發(fā)中不用,了解即可,但是面試可能會(huì)問(wèn))。
  • Receiver作為常駐的Task運(yùn)行在Executor等待數(shù)據(jù),但是一個(gè)Receiver效率低,需要開啟多個(gè),再手動(dòng)合并數(shù)據(jù)(union),再進(jìn)行處理,很麻煩
  • Receiver哪臺(tái)機(jī)器掛了,可能會(huì)丟失數(shù)據(jù),所以需要開啟WAL(預(yù)寫日志)保證數(shù)據(jù)安全,那么效率又會(huì)降低!
  • Receiver方式是通過(guò)zookeeper來(lái)連接kafka隊(duì)列,調(diào)用Kafka高階API,offset存儲(chǔ)在zookeeper,由Receiver維護(hù)。
  • spark在消費(fèi)的時(shí)候?yàn)榱吮WC數(shù)據(jù)不丟也會(huì)在Checkpoint中存一份offset,可能會(huì)出現(xiàn)數(shù)據(jù)不一致
  • 所以不管從何種角度來(lái)說(shuō),Receiver模式都不適合在開發(fā)中使用了,已經(jīng)淘汰了

2、Direct直連方式

  • KafkaUtils.createDirectStream(開發(fā)中使用,要求掌握)
  • Direct方式是直接連接kafka分區(qū)來(lái)獲取數(shù)據(jù),從每個(gè)分區(qū)直接讀取數(shù)據(jù)大大提高了并行能力
  • Direct方式調(diào)用Kafka低階API(底層API),offset自己存儲(chǔ)和維護(hù),默認(rèn)由Spark維護(hù)在checkpoint中,消除了與zk不一致的情況
  • 當(dāng)然也可以自己手動(dòng)維護(hù),把offset存在mysql、redis中
  • 所以基于Direct模式可以在開發(fā)中使用,且借助Direct模式的特點(diǎn)+手動(dòng)操作可以保證數(shù)據(jù)的Exactly once 精準(zhǔn)一次

總結(jié):

  • Receiver接收方式
  1. 多個(gè)Receiver接受數(shù)據(jù)效率高,但有丟失數(shù)據(jù)的風(fēng)險(xiǎn)
  2. 開啟日志(WAL)可防止數(shù)據(jù)丟失,但寫兩遍數(shù)據(jù)效率低。
  3. Zookeeper維護(hù)offset有重復(fù)消費(fèi)數(shù)據(jù)可能。
  4. 使用高層次的API
  • Direct直連方式
  1. 不使用Receiver,直接到kafka分區(qū)中讀取數(shù)據(jù)
  2. 不使用日志(WAL)機(jī)制
  3. Spark自己維護(hù)offset
  4. 使用低層次的API

擴(kuò)展:關(guān)于消息語(yǔ)義

image

注意: 開發(fā)中SparkStreaming和kafka集成有兩個(gè)版本:0.8及0.10+0.8版本有Receiver和Direct模式(但是0.8版本生產(chǎn)環(huán)境問(wèn)題較多,在Spark2.3之后不支持0.8版本了)。0.10以后只保留了direct模式(Reveiver模式不適合生產(chǎn)環(huán)境),并且0.10版本API有變化(更加強(qiáng)大)

image

結(jié)論:我們學(xué)習(xí)和開發(fā)都直接使用0.10版本中的direct模式

spark-streaming-kafka-0-8(了解)
1.Receiver
KafkaUtils.createDstream使用了receivers來(lái)接收數(shù)據(jù),利用的是Kafka高層次的消費(fèi)者api,偏移量由Receiver維護(hù)在zk中,對(duì)于所有的receivers接收到的數(shù)據(jù)將會(huì)保存在Spark executors中,然后通過(guò)Spark Streaming啟動(dòng)job來(lái)處理這些數(shù)據(jù),默認(rèn)會(huì)丟失,可啟用WAL日志,它同步將接受到數(shù)據(jù)保存到分布式文件系統(tǒng)上比如HDFS。保證數(shù)據(jù)在出錯(cuò)的情況下可以恢復(fù)出來(lái)。盡管這種方式配合著WAL機(jī)制可以保證數(shù)據(jù)零丟失的高可靠性,但是啟用了WAL效率會(huì)較低,且無(wú)法保證數(shù)據(jù)被處理一次且僅一次,可能會(huì)處理兩次。因?yàn)镾park和ZooKeeper之間可能是不同步的。(官方現(xiàn)在已經(jīng)不推薦這種整合方式。)

image

  • 準(zhǔn)備工作

1)啟動(dòng)zookeeper集群

zkServer.sh start

2)啟動(dòng)kafka集群

kafka-server-start.sh  /export/servers/kafka/config/server.properties 

3.創(chuàng)建topic

kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 3 --topic spark_kafka 

4.通過(guò)shell命令向topic發(fā)送消息

kafka-console-producer.sh --broker-list node01:9092 --topic  spark_kafka 

5.添加kafka的pom依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
  • API

通過(guò)receiver接收器獲取kafka中topic數(shù)據(jù),可以并行運(yùn)行更多的接收器讀取kafak topic中的數(shù)據(jù),這里為3個(gè)

val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
      val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
      stream
    }) 

如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)可以設(shè)置存儲(chǔ)級(jí)別(默認(rèn)StorageLevel.MEMORY_AND_DISK_SER_2)

代碼演示

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable

object SparkKafka {
  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建StreamingContext
    val config: SparkConf = 
new SparkConf().setAppName("SparkStream").setMaster("local[*]")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
//開啟WAL預(yù)寫日志,保證數(shù)據(jù)源端可靠性
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc,Seconds(5))
    ssc.checkpoint("./kafka")
//==============================================
    //2.準(zhǔn)備配置參數(shù)
    val zkQuorum = "node01:2181,node02:2181,node03:2181"
    val groupId = "spark"
    val topics = Map("spark_kafka" -> 2)//2表示每一個(gè)topic對(duì)應(yīng)分區(qū)都采用2個(gè)線程去消費(fèi),
//ssc的rdd分區(qū)和kafka的topic分區(qū)不一樣,增加消費(fèi)線程數(shù),并不增加spark的并行處理數(shù)據(jù)數(shù)量
    //3.通過(guò)receiver接收器獲取kafka中topic數(shù)據(jù),可以并行運(yùn)行更多的接收器讀取kafak topic中的數(shù)據(jù),這里為3個(gè)
    val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
      val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
      stream
    })
    //4.使用union方法,將所有receiver接受器產(chǎn)生的Dstream進(jìn)行合并
    val allDStream: DStream[(String, String)] = ssc.union(receiverDStream)
    //5.獲取topic的數(shù)據(jù)(String, String) 第1個(gè)String表示topic的名稱,第2個(gè)String表示topic的數(shù)據(jù)
    val data: DStream[String] = allDStream.map(_._2)
//==============================================
    //6.WordCount
    val words: DStream[String] = data.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
} 

2.Direct

Direct方式會(huì)定期地從kafka的topic下對(duì)應(yīng)的partition中查詢最新的偏移量,再根據(jù)偏移量范圍在每個(gè)batch里面處理數(shù)據(jù),Spark通過(guò)調(diào)用kafka簡(jiǎn)單的消費(fèi)者API讀取一定范圍的數(shù)據(jù)。
image
  • Direct的缺點(diǎn)是無(wú)法使用基于zookeeper的kafka監(jiān)控工具
  • Direct相比基于Receiver方式有幾個(gè)優(yōu)點(diǎn):
  1. 簡(jiǎn)化并行
    不需要?jiǎng)?chuàng)建多個(gè)kafka輸入流,然后union它們,sparkStreaming將會(huì)創(chuàng)建和kafka分區(qū)數(shù)一樣的rdd的分區(qū)數(shù),而且會(huì)從kafka中并行讀取數(shù)據(jù),spark中RDD的分區(qū)數(shù)和kafka中的分區(qū)數(shù)據(jù)是一一對(duì)應(yīng)的關(guān)系。
  2. 高效
    Receiver實(shí)現(xiàn)數(shù)據(jù)的零丟失是將數(shù)據(jù)預(yù)先保存在WAL中,會(huì)復(fù)制一遍數(shù)據(jù),會(huì)導(dǎo)致數(shù)據(jù)被拷貝兩次,第一次是被kafka復(fù)制,另一次是寫到WAL中。而Direct不使用WAL消除了這個(gè)問(wèn)題。
  3. 恰好一次語(yǔ)義(Exactly-once-semantics)
    Receiver讀取kafka數(shù)據(jù)是通過(guò)kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過(guò)數(shù)據(jù)保存在WAL中保證數(shù)據(jù)不丟失,但是可能會(huì)因?yàn)閟parkStreaming和ZK中保存的偏移量不一致而導(dǎo)致數(shù)據(jù)被消費(fèi)了多次。

Direct的Exactly-once-semantics(EOS)通過(guò)實(shí)現(xiàn)kafka低層次api,偏移量?jī)H僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問(wèn)題。

  • API
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 

代碼演示

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKafka2 {
  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建StreamingContext
    val config: SparkConf = 
new SparkConf().setAppName("SparkStream").setMaster("local[*]")
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc,Seconds(5))
    ssc.checkpoint("./kafka")
    //==============================================
    //2.準(zhǔn)備配置參數(shù)
    val kafkaParams = Map("metadata.broker.list" -> "node01:9092,node02:9092,node03:9092", "group.id" -> "spark")
    val topics = Set("spark_kafka")
    val allDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    //3.獲取topic的數(shù)據(jù)
    val data: DStream[String] = allDStream.map(_._2)
    //==============================================
    //WordCount
    val words: DStream[String] = data.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
} 

spark-streaming-kafka-0-10

  • 說(shuō)明

spark-streaming-kafka-0-10版本中,API有一定的變化,操作更加靈活,開發(fā)中使用

  • pom.xml
<!--<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
  • API:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

  • 創(chuàng)建topic
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic spark_kafka 
  • 啟動(dòng)生產(chǎn)者
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092,node01:9092,node01:9092 --topic spark_kafka 
  • 代碼演示
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKafkaDemo {
  def main(args: Array[String]): Unit = {
    //1.創(chuàng)建StreamingContext
    //spark.master should be set as local[n], n > 1
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc,Seconds(5))//5表示5秒中對(duì)數(shù)據(jù)進(jìn)行切分形成一個(gè)RDD
    //準(zhǔn)備連接Kafka的參數(shù)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      //earliest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無(wú)提交的offset時(shí),從頭開始消費(fèi)
      //latest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
      //none:topic各分區(qū)都存在已提交的offset時(shí),從offset后開始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常
      //這里配置latest自動(dòng)重置偏移量為最新的偏移量,即如果有偏移量從偏移量位置開始消費(fèi),沒有偏移量從新來(lái)的數(shù)據(jù)開始消費(fèi)
      "auto.offset.reset" -> "latest",
      //false表示關(guān)閉自動(dòng)提交.由spark幫你提交到Checkpoint或程序員手動(dòng)維護(hù)
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("spark_kafka")
    //2.使用KafkaUtil連接Kafak獲取數(shù)據(jù)
    val recordDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent,//位置策略,源碼強(qiáng)烈推薦使用該策略,會(huì)讓Spark的Executor和Kafka的Broker均勻?qū)?yīng)
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))//消費(fèi)策略,源碼強(qiáng)烈推薦使用該策略
    //3.獲取VALUE數(shù)據(jù)
    val lineDStream: DStream[String] = recordDStream.map(_.value())//_指的是ConsumerRecord
    val wrodDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_指的是發(fā)過(guò)來(lái)的value,即一行數(shù)據(jù)
    val wordAndOneDStream: DStream[(String, Int)] = wrodDStream.map((_,1))
    val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
    result.print()
    ssc.start()//開啟
    ssc.awaitTermination()//等待優(yōu)雅停止
  }
} 

好了,本篇主要講解的 SparkStreaming 整合 Kafka 的過(guò)程,

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1.SparkStreaming 1.6 + kafka 0.8.2 Receiver模式 1):采用receiv...
    喬一波一閱讀 1,817評(píng)論 0 4
  • 如何保證消息不丟失,不重復(fù)消費(fèi) 基于 Receiver-based 的 createStream 方法。recei...
    db9388a2d4c5閱讀 395評(píng)論 0 0
  • 前言 在WeTest輿情項(xiàng)目中,需要對(duì)每天千萬(wàn)級(jí)的游戲評(píng)論信息進(jìn)行詞頻統(tǒng)計(jì),在生產(chǎn)者一端,我們將數(shù)據(jù)按照每天的拉取...
    生活的探路者閱讀 2,220評(píng)論 0 6
  • 內(nèi)容 sparkStreaming簡(jiǎn)介 spark Streaming和Storm區(qū)別 Spark Streami...
    SUSUR_28f6閱讀 2,642評(píng)論 0 0
  • 張林波 寧波大發(fā)化纖 【日精進(jìn)打卡第4天】 【知~學(xué)習(xí)】 《大學(xué)》背誦1篇 精進(jìn)六條背讀2篇 【行~實(shí)踐】 一、修...
    36eb5a0f61cd閱讀 148評(píng)論 0 0

友情鏈接更多精彩內(nèi)容