Spark Streaming提供了一個叫做DStream(Discretized Stream)的高級抽象,DStream表示一個持續(xù)不斷輸入的數(shù)據(jù)流,可以基于Kafka、TCP Socket、Flume等輸入數(shù)據(jù)流創(chuàng)建。在內(nèi)部,一個DStream實際上是由一個RDD序列組成的。
Spark Streaming提供了一個叫做DStream(Discretized Stream)的高級抽象,DStream表示一個持續(xù)不斷輸入的數(shù)據(jù)流,可以基于Kafka、TCP Socket、Flume等輸入數(shù)據(jù)流創(chuàng)建。在內(nèi)部,一個DStream實際上是由一個RDD序列組成的。Sparking Streaming是基于Spark平臺的,也就繼承了Spark平臺的各種特性,如容錯(Fault-tolerant)、可擴展(Scalable)、高吞吐(High-throughput)等。
在Spark Streaming中,每個DStream包含了一個時間間隔之內(nèi)的數(shù)據(jù)項的集合,我們可以理解為指定時間間隔之內(nèi)的一個batch,每一個batch就構成一個RDD數(shù)據(jù)集,所以DStream就是一個個batch的有序序列,時間是連續(xù)的,按照時間間隔將數(shù)據(jù)流分割成一個個離散的RDD數(shù)據(jù)集.
其實就是一個定時去kafka中消費數(shù)據(jù)的定時器,只不過數(shù)據(jù)是保存在rdd中而已。
object UserClickCountAnalytics {
def main(args: Array[String]): Unit = {
var masterUrl = "local[1]"
if (args.length > 0) {
masterUrl = args(0)
}
// Create a StreamingContext with the given master URL
val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
val ssc = new StreamingContext(conf, Seconds(5))
// Kafka configurations
val topics = Set("user_events")
val brokers = "10.10.4.126:9092,10.10.4.127:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
val dbIndex = 1
val clickHashKey = "app::users::click"
--------------------------------------------------------------------------
// Create a direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val events = kafkaStream.flatMap(line => {
val data = JSONObject.fromObject(line._2)
Some(data)
})
//也可以這樣建立stream對象
val ssc:StreamingContext=???
val kafkaParams:Map[String,String]=Map("group.id"->"terran",...)
val numDStreams=5
val topics=Map("zerg.hydra"->1)
val kafkaDStreams=(1to numDStreams).map{_=>KafkaUtils.createStream(ssc,kafkaParams,topics,...)}
//> collection of five *input* DStreams = handled by five receivers/tasks
val unionDStream=ssc.union(kafkaDStreams)// often unnecessary, just showcasing how to do it
//> single?DStream
val processingParallelism=20
val processingDStream=unionDStream(processingParallelism)
//> single DStream but now with 20 partitions
---------------------------------------------------------------------
// Compute user click times
/**? ??
?userClicks.foreachRDD拿到的是微批處理一個批次數(shù)據(jù)? ?
?rdd.foreachPartition拿到的是一個批次在Spark各節(jié)點對應的分區(qū)數(shù)據(jù)? ? partitionOfRecords.foreach拿到對應分區(qū)的每條數(shù)據(jù) */
val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
//通過foreachRDD來保存處理結果
userClicks.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
partitionOfRecords.foreach(pair => {
val uid = pair._1
val clickCount = pair._2
val jedis = RedisClient.pool.getResource
jedis.select(dbIndex)
jedis.hincrBy(clickHashKey, uid, clickCount)
RedisClient.pool.returnResource(jedis)
})
})
})
ssc.start()
ssc.awaitTermination()
}
}