SpringStreaming+Kafka

摘自 :
Spark踩坑記——Spark Streaming+Kafka

[TOC]

SpringStreaming+Kafka

1.SpringStreaming+Kafka 接受數(shù)據(jù)和發(fā)送數(shù)據(jù)

(1)SparkStreaming 接受kafka方式

  • 基于Received的方式
    基于Receiverd方式獲取數(shù)據(jù).png

    KafkaStream-Recevied方式.jpg
  • 基于DirectKafkaStreaming
    DirectKafkaStreaming獲取kafka數(shù)據(jù).png

DirectKafkaStreaming 相比較 ReceiverKafkaStreaming

  • 簡(jiǎn)化的并行:在Receiver的方式中我們提到創(chuàng)建多個(gè)Receiver之后利用union來合并成一個(gè)Dstream的方式提高數(shù)據(jù)傳輸并行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對(duì)應(yīng)的并行讀取Kafka數(shù)據(jù),這種映射關(guān)系也更利于理解和優(yōu)化。
  • 高效:在Receiver的方式中,為了達(dá)到0數(shù)據(jù)丟失需要將數(shù)據(jù)存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數(shù)據(jù),浪費(fèi)!而第二種方式不存在這個(gè)問題,只要我們Kafka的數(shù)據(jù)保留時(shí)間足夠長(zhǎng),我們都能夠從Kafka進(jìn)行數(shù)據(jù)恢復(fù)。
  • 精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統(tǒng)的從Kafka中讀取數(shù)據(jù)的方式,但由于Spark Streaming消費(fèi)的數(shù)據(jù)和Zookeeper中記錄的offset不同步,這種方式偶爾會(huì)造成數(shù)據(jù)重復(fù)消費(fèi)。而第二種方式,直接使用了簡(jiǎn)單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進(jìn)行記錄,消除了這種不一致性。

(2)Spark 發(fā)送數(shù)據(jù)至Kafka中

一般處理方式 : 在RDD.forpartition進(jìn)行操作

input.foreachRDD(rdd =>
  // 不能在這里創(chuàng)建KafkaProducer
  rdd.foreachPartition(partition =>
    partition.foreach{
      case x:String=>{
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String,String](props)
        val message=new ProducerRecord[String, String]("output",null,x)
        producer.send(message)
      }
    }
  )
) 

此方式的缺點(diǎn)在于每次foreach操作都需要重新創(chuàng)建一次kafkaProduce 主要花費(fèi)時(shí)間都在 創(chuàng)建連接的時(shí)候.
基于此我們以以下方式進(jìn)行操作

  • 首先,我們需要將KafkaProducer利用lazy val的方式進(jìn)行包裝如下:
 
    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
    class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
      /* This is the key idea that allows us to work around running into
         NotSerializableExceptions. */
      lazy val producer = createProducer()
      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, key, value))
      def send(topic: String, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, value))
    }
    object KafkaSink {
      import scala.collection.JavaConversions._
      def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
        val createProducerFunc = () => {
          val producer = new KafkaProducer[K, V](config)
          sys.addShutdownHook {
            // Ensure that, on executor JVM shutdown, the Kafka producer sends
            // any buffered messages to Kafka before shutting down.
            producer.close()
          }
          producer
        }
        new KafkaSink(createProducerFunc)
      }
      def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
    }
  • 之后我們利用廣播變量的形式,將KafkaProducer廣播到每一個(gè)executor,如下:
// 廣播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", Conf.brokers)
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
  • 這樣我們就能在每個(gè)executor中愉快的將數(shù)據(jù)輸入到kafka當(dāng)中:
//輸出到kafka
segmentedStream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreach(record => {
      kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
      // do something else
    })
  }
})

2.Spark streaming+Kafka調(diào)優(yōu)

2.1 批處理時(shí)間設(shè)置

參數(shù)設(shè)置:

2.2 合理的Kafka拉取量

參數(shù)設(shè)置: spark.streaming.kafka.maxRatePerPartition

2.3 緩存反復(fù)使用的Dstream(RDD)

DStream.cache()

2.4 設(shè)置合理的GC

長(zhǎng)期使用Java的小伙伴都知道,JVM中的垃圾回收機(jī)制,可以讓我們不過多的關(guān)注與內(nèi)存的分配回收,更加專注于業(yè)務(wù)邏輯,JVM都會(huì)為我們搞定。對(duì)JVM有些了解的小伙伴應(yīng)該知道,在Java虛擬機(jī)中,將內(nèi)存分為了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費(fèi)一定時(shí)間的,尤其是老年代的GC回收,需要對(duì)內(nèi)存碎片進(jìn)行整理,通常采用標(biāo)記-清楚的做法。同樣的在Spark程序中,JVM GC的頻率和時(shí)間也是影響整個(gè)Spark效率的關(guān)鍵因素。在通常的使用中建議:

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

2.5 設(shè)置合理的CPU資源數(shù)

CPU的core數(shù)量,每個(gè)executor可以占用一個(gè)或多個(gè)core,可以通過觀察CPU的使用率變化來了解計(jì)算資源的使用情況,例如,很常見的一種浪費(fèi)是一個(gè)executor占用了多個(gè)core,但是總的CPU使用率卻不高(因?yàn)橐粋€(gè)executor并不總能充分利用多核的能力),這個(gè)時(shí)候可以考慮讓么個(gè)executor占用更少的core,同時(shí)worker下面增加更多的executor,或者一臺(tái)host上面增加更多的worker來增加并行執(zhí)行的executor的數(shù)量,從而增加CPU利用率。但是增加executor的時(shí)候需要考慮好內(nèi)存消耗,因?yàn)橐慌_(tái)機(jī)器的內(nèi)存分配給越多的executor,每個(gè)executor的內(nèi)存就越小,以致出現(xiàn)過多的數(shù)據(jù)spill over甚至out of memory的情況。

2.6設(shè)置合理的parallelism

partition和parallelism,partition指的就是數(shù)據(jù)分片的數(shù)量,每一次task只能處理一個(gè)partition的數(shù)據(jù),這個(gè)值太小了會(huì)導(dǎo)致每片數(shù)據(jù)量太大,導(dǎo)致內(nèi)存壓力,或者諸多executor的計(jì)算能力無法利用充分;但是如果太大了則會(huì)導(dǎo)致分片太多,執(zhí)行效率降低。在執(zhí)行action類型操作的時(shí)候(比如各種reduce操作),partition的數(shù)量會(huì)選擇parent RDD中最大的那一個(gè)。而parallelism則指的是在RDD進(jìn)行reduce類操作的時(shí)候,默認(rèn)返回?cái)?shù)據(jù)的paritition數(shù)量(而在進(jìn)行map類操作的時(shí)候,partition數(shù)量通常取自parent RDD中較大的一個(gè),而且也不會(huì)涉及shuffle,因此這個(gè)parallelism的參數(shù)沒有影響)。所以說,這兩個(gè)概念密切相關(guān),都是涉及到數(shù)據(jù)分片的,作用方式其實(shí)是統(tǒng)一的。通過spark.default.parallelism可以設(shè)置默認(rèn)的分片數(shù)量,而很多RDD的操作都可以指定一個(gè)partition參數(shù)來顯式控制具體的分片數(shù)量。
在SparkStreaming+kafka的使用中,我們采用了Direct連接方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對(duì)應(yīng)的,我們一般默認(rèn)設(shè)置為Kafka中Partition的數(shù)量。

2.7使用高性能的算子

  • 使用reduceByKey/aggregateByKey替代groupByKey
  • 使用mapPartitions替代普通map
  • 使用foreachPartitions替代foreach
  • 使用filter之后進(jìn)行coalesce操作
  • 使用repartitionAndSortWithinPartitions替代repartition與sort類操作
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容