Two Ways to Connect Spark to Kafka and Their Differences (Direct and Receiver)

Differences Between the Kafka Direct and Receiver Approaches

Receiver

The Receiver approach is implemented with Kafka's High-Level Consumer API. Data that the receiver fetches from Kafka is stored in the memory of the Spark executors (if the data volume surges and records pile up, this can easily cause OOM errors), and the jobs launched by Spark Streaming then process that data.
Under the default configuration, this approach can lose data when an underlying failure occurs. To get high reliability with zero data loss, you must enable Spark Streaming's Write Ahead Log (WAL). The WAL synchronously writes the data received from Kafka to a log on a distributed file system (such as HDFS or S3), so when a node fails the data can be recovered from the WAL, at the cost of lower throughput.
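
As a rough illustration of the WAL setup described above, here is a minimal sketch. It assumes the Spark 2.1 dependencies used later in this article; the object name `WalConfigSketch` and the app name are made up for illustration, and the checkpoint path is simply the one reused from the example further down.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalConfigSketch {
  // Assumed checkpoint location (same path as in the article's examples).
  // The write-ahead log files are kept under the checkpoint directory.
  val checkpointDir = "hdfs://node01:9000/kafka-ck"

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("receiver-with-wal") // hypothetical app name
      // Write every received block to the write-ahead log
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(5))
    // A checkpoint directory on a fault-tolerant file system is required for the WAL
    ssc.checkpoint(checkpointDir)
    ssc
  }
}
```

With the WAL on, the Spark documentation recommends a non-replicated storage level such as `StorageLevel.MEMORY_AND_DISK_SER` for the input stream, since the log already lives on replicated storage.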

Notes on usage:

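1. Simple to use, with little code and no manual offset management. The WAL must be enabled to guarantee that no data is lost, but this lowers throughput, and every record ends up stored twice (once in Kafka and once in the WAL), which wastes resources.
2. Exactly-once processing cannot be guaranteed: if the application dies after results have been written to external storage but before the offsets have been updated in ZooKeeper, that data is consumed again.
3. Kafka topic partitions are unrelated to the partitions of the RDDs that Spark Streaming generates. Increasing the per-topic partition count passed to KafkaUtils.createStream only increases the number of threads the receiver uses to read the topic; it does not increase Spark's processing parallelism.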
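
For point 3, a common workaround is to start several receivers, union their streams, and repartition before processing. The sketch below assumes the `spark-streaming-kafka-0-8` API used in this article; `ReceiverParallelismSketch`, `numReceivers`, and `processingPartitions` are illustrative names, not part of the original example.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverParallelismSketch {
  // numReceivers and processingPartitions are hypothetical tuning parameters.
  def unifiedStream(ssc: StreamingContext,
                    kafkaParams: Map[String, String],
                    topic: String,
                    numReceivers: Int,
                    processingPartitions: Int): DStream[(String, String)] = {
    // Each createStream call starts one receiver, and each receiver occupies one core.
    val streams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
    }
    // Merge all receivers into one DStream, then spread the data over more tasks.
    ssc.union(streams).repartition(processingPartitions)
  }
}
```

All receivers should share the same `group.id` in `kafkaParams` so that Kafka divides the topic partitions among them. The application also needs more cores than receivers, otherwise nothing is left to process the data.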

Direct

The Direct approach uses Kafka's Low-Level (Simple) Consumer API. It periodically queries Kafka for the latest offset of each topic+partition and uses that to define the offset range of each batch. When the job that processes the data starts, it uses the same Low-Level Consumer API to read exactly that offset range from Kafka.
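
To make the "offset range per batch" idea concrete, here is a toy sketch in plain Scala. It is not Spark's implementation: `BatchRange` is a hypothetical stand-in for Spark's `OffsetRange`, and `planBatch` and its parameters are invented for illustration.

```scala
object OffsetRangeSketch {
  // Hypothetical stand-in for Spark's OffsetRange: [fromOffset, untilOffset) of one partition.
  final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
    def count: Long = untilOffset - fromOffset
  }

  /** Plans one batch: each partition starts where the previous batch ended and
    * stops at the latest offset, capped at maxPerPartition records. */
  def planBatch(topic: String,
                consumed: Map[Int, Long], // partition -> next offset to read
                latest: Map[Int, Long],   // partition -> latest offset reported by Kafka
                maxPerPartition: Long): Seq[BatchRange] =
    latest.toSeq.sortBy(_._1).map { case (partition, latestOffset) =>
      val from = consumed.getOrElse(partition, 0L)
      val until = math.min(latestOffset, from + maxPerPartition)
      // Never produce a range that ends before it starts
      BatchRange(topic, partition, from, math.max(from, until))
    }

  def main(args: Array[String]): Unit = {
    val ranges = planBatch("direct-test",
      consumed = Map(0 -> 100L, 1 -> 250L),
      latest = Map(0 -> 180L, 1 -> 9000L),
      maxPerPartition = 5000L)
    ranges.foreach(println)
  }
}
```

In this run, partition 0 is read up to its latest offset (100 to 180), while partition 1 is capped at 5000 records (250 to 5250) and the remainder is left for the next batch.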

Notes on usage:

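1. When reading a topic, Spark creates one RDD partition for each Kafka partition and reads from Kafka in parallel, so there is a one-to-one mapping between Kafka partitions and RDD partitions.
2. The WAL is not needed. As long as Kafka itself keeps replicated copies of the data, lost data can be recovered by re-reading it from Kafka.
3. Spark Streaming tracks the offsets itself and stores them in its checkpoints, so the offsets are always consistent with what Spark has actually processed, and data is not consumed repeatedly.
4. More complex to use, with more code, and you have to monitor and maintain the offsets yourself, which increases development cost.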

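With the WAL enabled, the Receiver approach gives highly reliable, zero-loss delivery, but it cannot guarantee that data is processed exactly once; some data may be processed twice, because Spark and ZooKeeper can get out of sync. The Direct approach uses Kafka's simple API, and Spark Streaming itself tracks the consumed offsets and saves them in the checkpoint. Spark is always consistent with itself, so each record is consumed exactly once. (For the results to be exactly-once end to end, the output operation must also be idempotent or transactional.)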
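
The difference between the two guarantees can be shown with a toy simulation in plain Scala (no Spark or Kafka involved). Everything here is hypothetical: the four messages, the crash point, and the in-memory "sink" are only meant to show why keeping results and offsets in separate stores can produce duplicates, while updating them together cannot.

```scala
object DeliverySemanticsSketch {
  val messages: Vector[String] = Vector("a", "b", "c", "d")

  /** Results and offsets live in separate stores, so a crash can land between the two writes. */
  def separateCommit(crashAt: Int): Vector[String] = {
    var sink = Vector.empty[String]
    var committedOffset = 0
    var i = 0
    var alive = true
    while (alive && i < messages.length) {
      sink = sink :+ messages(i)        // step 1: write the result
      if (i == crashAt) alive = false   // crash before step 2
      else committedOffset = i + 1      // step 2: commit the offset
      i += 1
    }
    // Restart: resume from the last committed offset
    for (j <- committedOffset until messages.length) sink = sink :+ messages(j)
    sink
  }

  /** Result and offset are updated in one atomic step, so they can never disagree. */
  def atomicCommit(crashAt: Int): Vector[String] = {
    var state = (Vector.empty[String], 0) // (sink, next offset)
    var i = 0
    var alive = true
    while (alive && i < messages.length) {
      if (i == crashAt) alive = false   // crash before the atomic update
      else state = (state._1 :+ messages(i), i + 1)
      i += 1
    }
    // Restart: resume from the offset stored together with the results
    for (j <- state._2 until messages.length) state = (state._1 :+ messages(j), j + 1)
    state._1
  }

  def main(args: Array[String]): Unit = {
    println(separateCommit(1)) // "b" is written twice
    println(atomicCommit(1))   // every message is written once
  }
}
```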

Differences

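| Receiver | Direct |
| --- | --- |
| WAL must be enabled | WAL not required |
| Uses the high-level API | Uses the simple (low-level) API |
| Offsets maintained automatically in ZooKeeper | Offsets maintained manually |
| Cannot guarantee exactly-once processing | Data is processed exactly once |
| Simple code, little of it | Complex code, more of it |
| Topic partitions and RDD partitions are not one-to-one | Topic partitions and RDD partitions are one-to-one |
| Data is pulled from Kafka by the receiver | Each RDD partition pulls data from its corresponding Kafka partition (Kafka and RDD partition counts are equal) |
| .. | .. |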


Examples of the two ways to connect to Kafka (receiver & direct)

Maven dependencies

 <properties>
 <scala.version>2.11.8</scala.version>
 <spark.version>2.1.3</spark.version>
 <scala.binary.version>2.11</scala.binary.version>
 </properties>
 <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
       <!-- spark-streaming kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
 </dependencies>

Receiver

package xzw.shuai.kafka.demo

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKafkaReceiver {

  private val topics = "receiver-test"
  private val HDFS_PATH = "hdfs://node01:9000/kafka-ck"
  private val numThreads = 1

  def main(args: Array[String]): Unit = {
    // When the application is stopped, finish processing the current batch before shutting down
    System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
    // Rate limit for receivers: records pulled per batch <= 1000 * number of receivers * batch interval (seconds).
    // (spark.streaming.kafka.maxRatePerPartition only applies to the direct approach.)
    System.setProperty("spark.streaming.receiver.maxRate", "1000")
    val conf = new SparkConf().setMaster("local[2]").setAppName("receiver")
      // Register the JVM metrics source on the executors
      .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(HDFS_PATH)
    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
      "zookeeper.connect" -> "node01:2181,node02:2181,node03:2181",
      "group.id" -> "receiver"
    )
    
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val kafkaDStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_2)

    // word count
    kafkaDStream
      .map(_._2) // _1 is the message key, _2 is the message value
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print(10) // print the first 10 results of each batch

    ssc.start()
    ssc.awaitTermination()
  }
}
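
The rate-limit comment in the example above boils down to simple arithmetic, sketched here in plain Scala. The object and function names are made up for illustration. According to the Spark configuration docs, `spark.streaming.receiver.maxRate` limits each receiver, while `spark.streaming.kafka.maxRatePerPartition` limits each Kafka partition in the direct approach; the partition count of 3 below is an assumption.

```scala
object RateLimitSketch {
  /** Upper bound on records per batch: rate (records/second) x units x batch length.
    * A "unit" is one receiver for spark.streaming.receiver.maxRate, or one Kafka
    * partition for spark.streaming.kafka.maxRatePerPartition. */
  def maxRecordsPerBatch(ratePerSecond: Long, units: Int, batchSeconds: Long): Long =
    ratePerSecond * units * batchSeconds

  def main(args: Array[String]): Unit = {
    // 1000 records/s, 3 partitions (assumed), 5-second batches as in the example
    println(maxRecordsPerBatch(1000L, 3, 5L)) // 15000
  }
}
```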

Direct

package xzw.shuai.kafka.demo

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.zookeeper.data.Stat

object SparkKafkaDirect {
  private val zkHosts = "node01:2181,node02:2181,node03:2181"
  private val logger = Logger.getLogger("SparkKafkaDirect")
  private val zkPath = "/kafka-direct-test"
  private val topic = Set("direct-test")
  private val HDFS_PATH="hdfs://node01:9000/kafka-ck"
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("direct")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    val kafkaParams = Map(
      "metadata.broker.list" -> "node01:9091,node02:9092,node03:9092",
      "group.id" -> "direct"
    )

    val zkClient: ZkClient = new ZkClient(zkHosts)

    // Read the saved offsets
    val offsets: Option[Map[TopicAndPartition, Long]] = readOffset(zkClient)

    // Create the Kafka stream
    val kafkaDStream: InputDStream[(String, String)] = offsets match {
      // No saved offsets: consume with the direct approach from Kafka's default starting position
      case None =>
        println("start from scratch")
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
      case Some(offset) =>
        println("start with the offset")
        val messageHeader = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
        KafkaUtils.createDirectStream[String, String, StringDecoder,
          StringDecoder, (String, String)](ssc, kafkaParams, offset, messageHeader)
    }

    // word count
    kafkaDStream.map(_._2) // _1 is the message key, _2 is the message value
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print() // print the results of each batch

    // Save the offsets to ZooKeeper (any other storage medium would also work)
    kafkaDStream.foreachRDD(rdd =>
      saveOffset(zkClient, zkHosts, zkPath, rdd)
    )

    ssc.start()
    ssc.awaitTermination()

  }

  // Save offsets
  def saveOffset(zkClient: ZkClient, zkHost: String, zkPath: String, rdd: RDD[_]): Unit = {
    logger.info("save offsets to Zookeeper")
    val stopwatch = new Stopwatch()
    val offsetsRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsRanges.foreach(offsetRange => logger.debug(s"  Using $offsetRange"))
    // Store untilOffset (the end of this batch) so that a restart resumes after the batch
    // that was just processed; storing fromOffset would replay that batch.
    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}")
      .mkString(",")
    logger.info("writing offsets to Zookeeper zkClient=" + zkClient + "  zkHosts=" + zkHost + "  zkPath=" + zkPath + "  offsetsRangesStr:" + offsetsRangesStr)
    updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
    logger.info("done updating offsets in zookeeper. took " + stopwatch)
  }

  // Read offsets
  def readOffset(zkClient: ZkClient): Option[Map[TopicAndPartition, Long]] = {

    val stopwatch = new Stopwatch()

    val stat = new Stat()
    val dataAndStat: (Option[String], Stat) = try {
      (Some(zkClient.readData[String](zkPath, stat)), stat)
    } catch {
      // The node does not exist yet, so no offsets have been saved
      case _: ZkNoNodeException => (None, stat)
      case e2: Throwable => throw e2
    }

    // Parse the offsets
    dataAndStat._1 match {
      case Some(offsetsRangeStr) =>
        logger.info(s" Read offset ranges: $offsetsRangeStr")
        val offset: Map[TopicAndPartition, Long] = offsetsRangeStr.split(",")
          .map(str => str.split(":"))
          .map {
            case Array(partitions, offset) =>
              TopicAndPartition(topic.last, partitions.toInt) -> offset.toLong
          }.toMap
        logger.info("Done reading offsets from Zookeeper. Took " + stopwatch)
        Some(offset)
      case None =>
        logger.info(" No offsets found in Zookeeper. Took " + stopwatch)
        None
    }
  }

  // Update the offsets in ZooKeeper
  def updatePersistentPath(zkClient: ZkClient, zkPath: String, offsetsRangesStr: String): Unit = {
    try {
      zkClient.writeData(zkPath, offsetsRangesStr)
    } catch {
      // The write failed because the node does not exist, so create it
      case _: ZkNoNodeException =>
        createParentPath(zkClient, zkPath)
        try {
          // Create a persistent node (the "directory")
          // and write the offsets into it
          zkClient.createPersistent(zkPath, offsetsRangesStr)
        } catch {
          case _: ZkNodeExistsException =>
            zkClient.writeData(zkPath, offsetsRangesStr)
          case e2: Throwable => throw e2
        }
      case e2: Throwable => throw e2
    }
  }

  // Create the parent path if it does not exist
  def createParentPath(zkClient: ZkClient, zkPath: String): Unit = {
    val parentDir = zkPath.substring(0, zkPath.lastIndexOf('/'))
    if (parentDir.length != 0)
      zkClient.createPersistent(parentDir, true)
  }

  // Elapsed-time helper
  class Stopwatch {
    private val start = System.currentTimeMillis()

    override def toString: String = (System.currentTimeMillis() - start) + " ms"
  }

}
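
The offsets in the example above are stored in ZooKeeper as a single string of the form `partition:offset,partition:offset`. The round trip can be sketched in plain Scala as follows; `OffsetCodecSketch`, `encode`, and `decode` are hypothetical names introduced for illustration and are not part of the original code.

```scala
object OffsetCodecSketch {
  /** Encodes partition -> offset pairs as "partition:offset,partition:offset". */
  def encode(offsets: Map[Int, Long]): String =
    offsets.toSeq.sortBy(_._1).map { case (p, o) => s"$p:$o" }.mkString(",")

  /** Decodes the string back into a map; a malformed entry raises an exception. */
  def decode(s: String): Map[Int, Long] =
    if (s.isEmpty) Map.empty
    else s.split(",").map(_.split(":")).map {
      case Array(p, o) => p.toInt -> o.toLong
      case other => throw new IllegalArgumentException(s"bad entry: ${other.mkString(":")}")
    }.toMap

  def main(args: Array[String]): Unit = {
    val saved = encode(Map(0 -> 42L, 1 -> 7L))
    println(saved)         // 0:42,1:7
    println(decode(saved))
  }
}
```

Note that this format does not record the topic name, so it only works when a single topic is consumed, which is what the example assumes when it calls `topic.last`.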