sparkstreaming和kafka整合

Spark Streaming提供了一個叫做DStream(Discretized Stream)的高級抽象,DStream表示一個持續(xù)不斷輸入的數(shù)據(jù)流,可以基于Kafka、TCP Socket、Flume等輸入數(shù)據(jù)流創(chuàng)建。在內(nèi)部,一個DStream實際上是由一個RDD序列組成的。

Spark Streaming提供了一個叫做DStream(Discretized Stream)的高級抽象,DStream表示一個持續(xù)不斷輸入的數(shù)據(jù)流,可以基于Kafka、TCP Socket、Flume等輸入數(shù)據(jù)流創(chuàng)建。在內(nèi)部,一個DStream實際上是由一個RDD序列組成的。Sparking Streaming是基于Spark平臺的,也就繼承了Spark平臺的各種特性,如容錯(Fault-tolerant)、可擴展(Scalable)、高吞吐(High-throughput)等。

在Spark Streaming中,每個DStream包含了一個時間間隔之內(nèi)的數(shù)據(jù)項的集合,我們可以理解為指定時間間隔之內(nèi)的一個batch,每一個batch就構成一個RDD數(shù)據(jù)集,所以DStream就是一個個batch的有序序列,時間是連續(xù)的,按照時間間隔將數(shù)據(jù)流分割成一個個離散的RDD數(shù)據(jù)集.

其實就是一個定時去kafka中消費數(shù)據(jù)的定時器,只不過數(shù)據(jù)是保存在rdd中而已。

object UserClickCountAnalytics {

def main(args: Array[String]): Unit = {

var masterUrl = "local[1]"

if (args.length > 0) {

masterUrl = args(0)

}

// Create a StreamingContext with the given master URL

val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")

val ssc = new StreamingContext(conf, Seconds(5))

// Kafka configurations

val topics = Set("user_events")

val brokers = "10.10.4.126:9092,10.10.4.127:9092"

val kafkaParams = Map[String, String](

"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

val dbIndex = 1

val clickHashKey = "app::users::click"

--------------------------------------------------------------------------

// Create a direct stream

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

val events = kafkaStream.flatMap(line => {

val data = JSONObject.fromObject(line._2)

Some(data)

})

//也可以這樣建立stream對象

val ssc:StreamingContext=???

val kafkaParams:Map[String,String]=Map("group.id"->"terran",...)

val numDStreams=5

val topics=Map("zerg.hydra"->1)

val kafkaDStreams=(1to numDStreams).map{_=>KafkaUtils.createStream(ssc,kafkaParams,topics,...)}

//> collection of five *input* DStreams = handled by five receivers/tasks

val unionDStream=ssc.union(kafkaDStreams)// often unnecessary, just showcasing how to do it

//> single?DStream

val processingParallelism=20

val processingDStream=unionDStream(processingParallelism)

//> single DStream but now with 20 partitions

---------------------------------------------------------------------

// Compute user click times

/**? ??

?userClicks.foreachRDD拿到的是微批處理一個批次數(shù)據(jù)? ?

?rdd.foreachPartition拿到的是一個批次在Spark各節(jié)點對應的分區(qū)數(shù)據(jù)? ? partitionOfRecords.foreach拿到對應分區(qū)的每條數(shù)據(jù) */

val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)

//通過foreachRDD來保存處理結果

userClicks.foreachRDD(rdd => {

rdd.foreachPartition(partitionOfRecords => {

partitionOfRecords.foreach(pair => {

val uid = pair._1

val clickCount = pair._2

val jedis = RedisClient.pool.getResource

jedis.select(dbIndex)

jedis.hincrBy(clickHashKey, uid, clickCount)

RedisClient.pool.returnResource(jedis)

})

})

})

ssc.start()

ssc.awaitTermination()

}

}

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容