19 Spark Streaming中空RDD的處理

在Spark Streaming中,job不斷的產(chǎn)生,有時候會產(chǎn)生一些空RDD,而基于這些空RDD生成的job大多數(shù)情況下是沒必要提交到集群執(zhí)行的。執(zhí)行沒有結(jié)果的job,就是浪費計算資源,數(shù)據(jù)庫連接資源,產(chǎn)生空文件等。
這里介紹兩種判斷空RDD的方式,第一種是以Receiver接收數(shù)據(jù)時產(chǎn)生的BlockRDD或WriteAheadLogBackedBlockRDD,所有以Receiver方式接收數(shù)據(jù)都會產(chǎn)生BlockRDD或WriteAheadLogBackedBlockRDD,第二種是以Direct Kafka方式接收數(shù)據(jù)產(chǎn)生的KafkaRDD。

  1. 第一種情況,以Receiver方式接收數(shù)據(jù),計算wordCount為例來說明空RDD如何處理,代碼如下
object ReceiverWordCount {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[3]")
    conf.set("spark.testing.memory", "2147480000")
    val ssc = new StreamingContext(conf, Seconds(10))

    val lines = ssc.socketTextStream("10.10.63.106", 8589, StorageLevel.MEMORY_AND_DISK_SER)

    val words= lines.flatMap(_.split(""))
    val wordCounts= words.map(x => (x,1)).reduceByKey((num1:Int,num2:Int)=>num1+num2,2)
    wordCounts.foreachRDD(rdd=>{
     if(rdd.dependencies(0).rdd.partitions.isEmpty){
        println(">>>RDD:Empty")
      }else{
        rdd.foreach(x=>println(x._1+"\t"+x._2))
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

這里為了方便,在foreachRDD中使用了rdd.foreach(x=>println(x._1+"\t"+x._2))來打印結(jié)果,只是簡單的效果演示,生產(chǎn)環(huán)境一般會輸出到外部存儲系統(tǒng)中,例如mysql、redis 、hdfs等
這里總結(jié)了三種判斷空RDD方式的,我們來看一下這三種方式有什么不同:
第一種:if(rdd.count==0)
RDD的count操作會觸發(fā)一個action,提交一個job,這種方式不是我們想要的
第二種:if(rdd.partitions.isEmpty)
判斷rdd的partitions是否為空,那我們需要看一下這里的rdd是怎么得來的,經(jīng)過上面WordCount中的一系列transformation操作后,最后一個reduceByKey操作產(chǎn)生的ShuffledRDD 。經(jīng)過reduceByKey操作后,分區(qū)數(shù)量會受到默認分區(qū)數(shù)或用戶指定的分區(qū)數(shù)的影響,和最初BlockRDD的分區(qū)數(shù)不一樣,因為ShuffledRDD的分區(qū)數(shù)不可能為0,所以if(rdd.partitions.isEmpty)無效。if(rdd.partitions.isEmpty)在什么有效呢?只有在當(dāng)前rdd和BlockRDD在同一個stage時才會有效,因為分區(qū)數(shù)沒有變化
第三種:if(rdd.dependencies(0).rdd.partitions.isEmpty)
根據(jù)RDD的依賴關(guān)系,從后向前尋找BlockRDD,因為在BlockRDD生成的時候分區(qū)數(shù)受blockInfos(Receiver接收數(shù)據(jù)的元數(shù)據(jù)信息)的影響,代碼如下

private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

    if (blockInfos.nonEmpty) {
      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

      // Are WAL record handles present with all the blocks
      val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

      if (areWALRecordHandlesPresent) {
        // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
        val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
        val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
        new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
      } else {
        // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
        // others then that is unexpected and log a warning accordingly.
        if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
          if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
            logError("Some blocks do not have Write Ahead Log information; this is unexpected and data may not be recoverable after driver failures")
          } else {
            logWarning("Some blocks have Write Ahead Log information; this is unexpected")
          }
        }
        val validBlockIds = blockIds.filter { id => ssc.sparkContext.env.blockManager.master.contains(id) }
        if (validBlockIds.size != blockIds.size) {
          logWarning("Some blocks could not be recovered as they were not found in memory. " +
            "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
            "for more details.")
        }
        new BlockRDD[T](ssc.sc, validBlockIds)
      }
    } else {
      // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
      // according to the configuration
      if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, Array.empty, Array.empty, Array.empty)
      } else {
        new BlockRDD[T](ssc.sc, Array.empty)
      }
    }
}

如果blockInfos為空,BlockRDD的分區(qū)數(shù)也為空,所以要判斷BlockRDD的分區(qū)數(shù)。這里只判斷了當(dāng)前rdd的父RDD分區(qū)是否為空,因為父RDD和BlockRDD在同一個stage內(nèi),分區(qū)數(shù)是一致的。RDD的依賴關(guān)系可以通過rdd.toDebugString和web頁面獲得,stage劃分也可以通過web頁面獲得。

  1. 第二種情況,以Direct kafka的方式接收數(shù)據(jù)的方式,計算WordCount為例,代碼如下
object DirectKafkaDemo{
  def main(args: Array[String]) {

    val topics = "DirectKafkaDemo"
    val brokers = "*:9092,*:9092"
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    sparkConf.set("spark.testing.memory", "2147480000")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val result = messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    result.foreachRDD(rdd => {

      val num= rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.count()

      if(num>0) {
        rdd.foreachPartition(data => {
          val conn = MDBManager.getConnection
          conn.setAutoCommit(false)
          val sql = "insert into word set key1=?,num=?;"
          val preparedStatement = conn.prepareStatement(sql)
          data.foreach(recode => {
            val key = recode._1;
            val num = recode._2;
            preparedStatement.setString(1, key)
            preparedStatement.setInt(2, num)
            preparedStatement.addBatch()
            println("key:" + key + "\tnum:" + num)
          })
          preparedStatement.executeBatch()
          conn.commit()
          conn.close()
        })
      }else{
          println(">>>>>>>>>>>>>>>>>>>>>>RDD Empty")
      }
    })

    ssc.start()
    ssc.awaitTermination()

  }
}

這里使用了KafkaRDD的count操作來判斷KafkaRDD是否為空,如果不為空,將計算結(jié)果保存到數(shù)據(jù)庫中,減少不必要是數(shù)據(jù)庫操作。獲取KafkaRDD的代碼如下,不同代碼編寫RDD的依賴關(guān)系是不一樣的,要根據(jù)代碼而定

val num= rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.count()

看一下KafkaRDD的count()方法,他重寫了RDD的count方法,代碼如下

override def count(): Long = offsetRanges.map(_.count).sum

他并沒有觸發(fā)一個runJob操作,而是通過讀取kafka分區(qū)的offset偏移量來計算RDD記錄的個數(shù),這里是利用了kafka的特性。通過依賴關(guān)系找到KafkaRDD,然后調(diào)用KafkaRDD的count()方法,就知道KafkaRDD是否為空,如果KafkaRDD為空,就沒必要runJob了。
那么判斷KafkaRDD的分區(qū)數(shù)是否也可以,看一下KafkaRDD的分區(qū)數(shù)是怎么得來的,代碼如下

override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
}

和offsetRanges的數(shù)量有關(guān),因為offsetRanges是根據(jù)kafka的分區(qū)數(shù)而來,offsetRanges的數(shù)量是固定不變的,從而KafkaRDD的分區(qū)數(shù)是固定的,不管分區(qū)有沒有數(shù)據(jù),因此不能判斷KafkaRDD的分區(qū)數(shù)

總結(jié)
不同數(shù)據(jù)接收方式的RDD,表現(xiàn)數(shù)據(jù)為空都可能是不一樣的,通過RDD的依賴關(guān)系正確找到數(shù)據(jù)源RDD是最關(guān)鍵的。此方法使用一定要結(jié)合業(yè)務(wù)和RDD的具體生成方式,這里說的依賴關(guān)系都是之有一個父RDD,如果有多個父RDD要根據(jù)情況決定是否可以使用此方法。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容