Spark Streaming整合Kafka

前幾章介紹了Kafka、Spark Streaming入門、Spark Streaming進(jìn)階。在這一章一起學(xué)習(xí)Spark Streaming和Kafka的整合。

概述

kafka作為一個(gè)實(shí)時(shí)的分布式消息隊(duì)列,實(shí)時(shí)的生產(chǎn)和消費(fèi)消息,這里我們可以利用SparkStreaming實(shí)時(shí)計(jì)算框架實(shí)時(shí)地讀取kafka中的數(shù)據(jù)然后進(jìn)行計(jì)算。在spark1.3版本后,kafkaUtils里面提供了兩個(gè)創(chuàng)建dstream的方法,一種為KafkaUtils.createDstream(需要receiver接收),另一種為KafkaUtils.createDirectStream。其中推薦使用KafkaUtils.createDirectStream的方式相比基于Receiver方式有幾個(gè)優(yōu)點(diǎn):

  • 簡(jiǎn)化并行
    不需要?jiǎng)?chuàng)建多個(gè)kafka輸入流,然后union它們,sparkStreaming將會(huì)創(chuàng)建和kafka分區(qū)一種的rdd的分區(qū)數(shù),而且會(huì)從kafka中并行讀取數(shù)據(jù),spark中RDD的分區(qū)數(shù)和kafka中的分區(qū)數(shù)據(jù)是一一對(duì)應(yīng)的關(guān)系。
  • 高效
    第一種實(shí)現(xiàn)數(shù)據(jù)的零丟失是將數(shù)據(jù)預(yù)先保存在WAL中,會(huì)復(fù)制一遍數(shù)據(jù),會(huì)導(dǎo)致數(shù)據(jù)被拷貝兩次,第一次是被kafka復(fù)制,另一次是寫到WAL中。而沒有receiver的這種方式消除了這個(gè)問題。
  • 恰好一次語義(Exactly-once-semantics)
    Receiver讀取kafka數(shù)據(jù)是通過kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過數(shù)據(jù)保存在WAL中保證數(shù)據(jù)不丟失,但是可能會(huì)因?yàn)閟parkStreaming和ZK中保存的偏移量不一致而導(dǎo)致數(shù)據(jù)被消費(fèi)了多次。EOS通過實(shí)現(xiàn)kafka低層次api,偏移量?jī)H僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問題。缺點(diǎn)是無法使用基于zookeeper的kafka監(jiān)控工具。
  • 版本限制
    除了以上的原因,由于在學(xué)習(xí)Kafka時(shí)安裝的版本是2.2.0,查詢官方文檔Spark Streaming整合Kafka在0.10已經(jīng)不支持Receiver的方式。
    image.png

綜上我們只演示KafkaUtils.createDirectStream的方式進(jìn)行整合。

整合流程

  • 啟動(dòng)zookeeper集群
zkServer.sh start
  • 啟動(dòng)kafka集群
    在啟動(dòng)之前在server.properties中根據(jù)虛擬機(jī)地址配置listeners的地址


    image.png

    因?yàn)椴慌渲迷趩?dòng)整合代碼時(shí)報(bào)Broker may not be available的錯(cuò)誤,通過百度后指定listeners的地址即可。
    啟動(dòng)kafka

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
  • 創(chuàng)建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafka_spark
//查看創(chuàng)建的topic,有記錄說明創(chuàng)建成功
kafka-topics.sh --list --zookeeper localhost:2181
  • 啟動(dòng)生成者,向topic中生產(chǎn)數(shù)據(jù)
./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_spark
  • 編寫SparkStreaming應(yīng)用程序
  • pom依賴
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>2.4.2</version>
</dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>2.4.3</version>
</dependency>

這里spark-streaming的版本我選擇的是spark-streaming_2.12:2.42版本,這是由于我本地用的Scala的環(huán)境是2.12.8,spark-streaming這個(gè)版本中用到的Scala版本就是2.12.8。之前我使用的是spark-streaming_2.12:2.11.8版本,項(xiàng)目啟動(dòng)時(shí)報(bào)環(huán)境不匹配的問題。所以在本地演示時(shí)需要選擇合適的版本。

  • Scala代碼
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
  * Spark Streaming整合Kafka
  *
  * @author zhiying.dong@hand-china.com 2019/05/24 16:54
  */
object KafkaDirectWordCount{
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("DirectKafka")
      .setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(2))

    val topicsSet = Array("kafka_spark")
    val kafkaParams = mutable.HashMap[String, String]()
    //必須添加以下參數(shù),否則會(huì)報(bào)錯(cuò)
    kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
    kafkaParams.put("group.id", "group1")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams
      )
    )

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 本地測(cè)試
    在生產(chǎn)者中輸入統(tǒng)計(jì)字符
image.png

觀察控制臺(tái)發(fā)現(xiàn)可以統(tǒng)計(jì)字符出現(xiàn)的此時(shí),說明Spark Streaming可以消費(fèi)到Kafka中生產(chǎn)的消息


image.png
  • 服務(wù)器測(cè)試
    和之前一樣,打包上傳到服務(wù)器,在通過以下命令啟動(dòng)
./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.3 --class com.imooc.spark.Test ~/lib/sparktrain-1.0.jar
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容