In Spark Streaming, jobs are generated continuously, and some of them are built on empty RDDs. In most cases there is no point in submitting such jobs to the cluster: running a job that produces no results wastes compute resources and database connections, and may create empty output files.
This article introduces two scenarios for detecting empty RDDs. The first is data received through a Receiver, which always produces a BlockRDD or a WriteAheadLogBackedBlockRDD per batch; the second is data received through the Direct Kafka approach, which produces a KafkaRDD.
- Case 1: data received through a Receiver. Using word count as an example, the code below shows how to handle empty RDDs.
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[3]")
    conf.set("spark.testing.memory", "2147480000")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Receiver-based source: each batch is backed by a BlockRDD
    // (or a WriteAheadLogBackedBlockRDD when the WAL is enabled).
    val lines = ssc.socketTextStream("10.10.63.106", 8589, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((num1: Int, num2: Int) => num1 + num2, 2)

    wordCounts.foreachRDD { rdd =>
      // Check the parent (pre-shuffle) RDD: its partition count follows the BlockRDD.
      if (rdd.dependencies(0).rdd.partitions.isEmpty) {
        println(">>>RDD:Empty")
      } else {
        rdd.foreach(x => println(x._1 + "\t" + x._2))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
For convenience, rdd.foreach(x => println(x._1 + "\t" + x._2)) is used inside foreachRDD to print the results; this is just a simple demonstration. In production, the output would normally go to an external storage system such as MySQL, Redis, or HDFS.
Three ways of testing whether the RDD is empty are compared below; let's see how they differ:
Method 1: if(rdd.count == 0)
The RDD's count operation triggers an action and submits a job, so this is not what we want.
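For illustration, a minimal sketch of method 1 (reusing the wordCounts stream from the example above) would look like this; the emptiness test itself already submits a job:

// Sketch of method 1: rdd.count() is an action, so even an empty batch schedules a job.
wordCounts.foreachRDD { rdd =>
  if (rdd.count() == 0) {
    println(">>>RDD:Empty")   // this answer already cost a full job
  } else {
    rdd.foreach(x => println(x._1 + "\t" + x._2))
  }
}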
Method 2: if(rdd.partitions.isEmpty)
This checks whether the RDD has any partitions, so we need to look at where this rdd comes from. After the chain of transformations in the WordCount example above, the final reduceByKey produces a ShuffledRDD. After reduceByKey, the partition count is determined by the default parallelism or by the user-specified number of partitions, not by the partition count of the original BlockRDD. A ShuffledRDD's partition count can never be 0, so if(rdd.partitions.isEmpty) never fires here. When is this check effective? Only when the current RDD is in the same stage as the BlockRDD, because within a stage the partition count does not change.
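As a sketch of the one case where method 2 does work, check an RDD that is still in the same stage as the BlockRDD, for example the lines stream from the example above, before any shuffle:

// Works: `lines` is backed directly by the BlockRDD, so an empty batch
// really does produce zero partitions.
lines.foreachRDD { rdd =>
  if (rdd.partitions.isEmpty) println(">>>BlockRDD:Empty")
}

// Does not work: `wordCounts` is a ShuffledRDD whose partition count is fixed
// by reduceByKey's numPartitions argument (2 in the example), never zero.
wordCounts.foreachRDD { rdd =>
  if (rdd.partitions.isEmpty) println("never reached")
}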
Method 3: if(rdd.dependencies(0).rdd.partitions.isEmpty)
Follow the RDD's dependencies backwards to find the BlockRDD: when a BlockRDD is created, its partition count is determined by blockInfos (the metadata of the blocks received by the Receiver). The relevant Spark source is:
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id => ssc.sparkContext.env.blockManager.master.contains(id) }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
If blockInfos is empty, the BlockRDD has zero partitions, which is why checking the BlockRDD's partition count works. In the example we only check whether the parent of the current RDD has any partitions, because that parent and the BlockRDD are in the same stage and therefore have the same number of partitions. The RDD lineage can be inspected with rdd.toDebugString or on the web UI, which also shows the stage boundaries.
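As a more general, hedged sketch: instead of hard-coding rdd.dependencies(0), you can walk the lineage back to the root RDD and test its partitions. The helper name rootRDD below is made up for illustration, and it assumes every RDD on the path has exactly one parent, as discussed above:

import org.apache.spark.rdd.RDD
import scala.annotation.tailrec

// Follow the single-parent dependency chain back to the source RDD,
// e.g. the BlockRDD / WriteAheadLogBackedBlockRDD behind a Receiver stream.
@tailrec
def rootRDD(rdd: RDD[_]): RDD[_] =
  if (rdd.dependencies.isEmpty) rdd else rootRDD(rdd.dependencies.head.rdd)

wordCounts.foreachRDD { rdd =>
  if (rootRDD(rdd).partitions.isEmpty) {
    println(">>>RDD:Empty")
  } else {
    rdd.foreach(x => println(x._1 + "\t" + x._2))
  }
}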
- Case 2: data received through the Direct Kafka approach. Again using WordCount as an example, the code is as follows.
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaDemo {
  def main(args: Array[String]): Unit = {
    val topics = "DirectKafkaDemo"
    val brokers = "*:9092,*:9092"
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    sparkConf.set("spark.testing.memory", "2147480000")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Direct approach: each batch starts from a KafkaRDD at the root of the lineage.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val result = messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    result.foreachRDD { rdd =>
      // Walk back through the lineage to the KafkaRDD and use its count(),
      // which reads the offset ranges instead of running a job.
      val num = rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.count()
      if (num > 0) {
        rdd.foreachPartition { data =>
          // MDBManager: the author's MySQL connection helper (definition not shown).
          val conn = MDBManager.getConnection
          conn.setAutoCommit(false)
          val sql = "insert into word set key1=?,num=?;"
          val preparedStatement = conn.prepareStatement(sql)
          data.foreach { record =>
            val key = record._1
            val count = record._2
            preparedStatement.setString(1, key)
            preparedStatement.setInt(2, count)
            preparedStatement.addBatch()
            println("key:" + key + "\tnum:" + count)
          }
          preparedStatement.executeBatch()
          conn.commit()
          conn.close()
        }
      } else {
        println(">>>>>>>>>>>>>>>>>>>>>>RDD Empty")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
Here the KafkaRDD's count is used to decide whether the batch is empty; only when it is non-empty are the results written to the database, which avoids unnecessary database operations. The KafkaRDD is reached through the dependency chain shown below; note that the lineage depends on how the job is written, so the exact chain must match your own code.
val num = rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.dependencies(0).rdd.count()
Now look at KafkaRDD's count() method, which overrides RDD's count:
override def count(): Long = offsetRanges.map(_.count).sum
It does not trigger a runJob; instead it derives the number of records from the Kafka partition offsets, taking advantage of this Kafka-specific property. By following the lineage back to the KafkaRDD and calling its count(), we know whether the KafkaRDD is empty, and if it is, there is no need to run a job at all.
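A hedged alternative that avoids hard-coding the dependency chain: in the direct approach, the RDD handed to foreachRDD on the original messages stream (before any transformation) is the KafkaRDD itself and implements HasOffsetRanges, so the record count can be read straight from the offset ranges. The sketch below only shows the counting; wiring it into the processing of result (for example via transform) is left out:

import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD { rdd =>
  // Only the root KafkaRDD carries HasOffsetRanges; after map/reduceByKey the
  // offset information is no longer reachable this way.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val recordCount = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum
  if (recordCount == 0) {
    println(">>>>>>>>>>>>>>>>>>>>>>RDD Empty")
  }
}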
Would checking the KafkaRDD's partition count also work? Look at how the KafkaRDD's partitions are built:
override def getPartitions: Array[Partition] = {
  offsetRanges.zipWithIndex.map { case (o, i) =>
    val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
    new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
  }.toArray
}
The partition count equals the number of offsetRanges, and since offsetRanges are derived from the Kafka topic's partitions, their number is fixed. The KafkaRDD therefore always has the same number of partitions whether or not those partitions contain any data, so checking the KafkaRDD's partition count cannot detect an empty batch.
Summary
How an empty batch manifests differs between data-ingestion methods, so the key is to follow the RDD lineage correctly back to the source RDD. This technique must be applied with the business logic and the actual way the RDDs are built in mind. The lineages discussed here all have a single parent RDD; if an RDD has multiple parents, decide case by case whether the approach still applies.