I. Problem
Filter merchant traffic out of a real-time traffic log stream and accumulate the page views for each merchant, producing a running daily total per merchant.
When the clock passes midnight (24:00), reset the accumulators and start counting the new day's merchant traffic from zero.
II. Implementation Steps
1. Use Spark Streaming to read the real-time log stream from Kafka, producing a DStream.
2. Filter out merchant-page traffic, producing a DStream[(k, v)] (where k is the shopid and v is the pv count).
3. Call mapWithState on the DStream[(k, v)] to produce a MapWithStateDStream holding each merchant's accumulated traffic.
4. Call awaitTerminationOrTimeout(time) on the StreamingContext, with the timeout set to the number of milliseconds remaining until midnight, so that all of the above DStream computation stops at 24:00 each day.
5. Call stop on the StreamingContext. By default stop also terminates the SparkContext; calling stop(stopSparkContext = false, stopGracefully = true) keeps the SparkContext alive and lets the batches the StreamingContext has already received finish processing before it shuts down.
6. Repeat steps 1–5 in a loop: a new StreamingContext is then created automatically at 0:00 each day to accumulate that day's merchant traffic.
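The heart of step 3 is the state-update function handed to StateSpec.function: for each key it receives the new value from the current batch (an Option, since a key may have state but no new data) plus the stored running total, and returns the updated total. Stripped of Spark, the accumulation logic can be exercised as a plain fold over batches; the helper names below are made up for illustration:

```scala
object ShopPvState {
  // Per-batch update for one shop: add this batch's pv (if any) to the
  // stored total. Mirrors the Option[value] / state shape of mapWithState.
  def updateState(pv: Option[Int], state: Option[Int]): (Int, Option[Int]) = {
    val total = pv.getOrElse(0) + state.getOrElse(0)
    (total, Some(total)) // emit the running total and keep it as the new state
  }

  // Drive a sequence of batches for a single shop through the update
  // function, collecting the running total emitted after each batch.
  def runBatches(batches: Seq[Option[Int]]): Seq[Int] =
    batches.foldLeft((Seq.empty[Int], Option.empty[Int])) {
      case ((out, state), pv) =>
        val (total, next) = updateState(pv, state)
        (out :+ total, next)
    }._1
}
```

A batch with no data for the shop (None) leaves the total unchanged, which is exactly the behavior the real state function shows between visits to a quiet merchant page.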
III. Example Code
package com.demo.data

import java.util.Date

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{Logging, SparkConf}

import com.demo.data.kafka.KafkaService
import com.demo.data.util.Constants

/**
 * Created by phycsgy on 17/2/13.
 */
object KafkaToRedis extends App with Logging {
  val conf = new SparkConf().setAppName("SparkStreamingKafka")
  val sc = new SparkContext(conf)

  // Keep only merchant-page pageview log lines that carry a shop_id
  def shopTrafficFilter(log: String): Boolean = {
    (log contains "\"element_id\":\"pageview\"") &&
      (log contains "\"page_name\":\"shopinfo\"") &&
      ("\"shop_id\":\"[0-9]+\"".r findFirstIn log).nonEmpty
  }

  // Extract the shopid with a regular expression and pair it with a pv count of 1
  def shopInfoExtract(log: String): (String, Int) = {
    val pattern = "\"shop_id\":\"([0-9]+)\"".r
    val matchResult = pattern findFirstMatchIn log
    (matchResult.get.group(1), 1)
  }

  // Milliseconds from now until the next midnight (note: this Date constructor
  // is deprecated; it normalizes day-of-month overflow, e.g. Jan 32 -> Feb 1)
  def resetTime: Long = {
    val now = new Date()
    val tomorrowMidnight = new Date(now.getYear, now.getMonth, now.getDate + 1)
    tomorrowMidnight.getTime - now.getTime
  }

  // State-update function: add this batch's pv to the merchant's running total
  val mapFunction = (shopid: String, pv: Option[Int], state: State[Int]) => {
    val accuSum = pv.getOrElse(0) + state.getOption().getOrElse(0)
    val output = (shopid, accuSum)
    state.update(accuSum)
    output
  }
  val stateSpec = StateSpec.function(mapFunction)

  while (true) {
    val ssc = new StreamingContext(sc, Seconds(30))
    ssc.checkpoint("./")
    val kafkaService = new KafkaService
    val topicName = "log.traffic_data"
    // Read the log stream from Kafka
    val kafkaStream = kafkaService.getKafkaStream[String, StringDecoder](ssc, topicName, Constants.KAFKA_LARGEST_OFFSET)
    // Keep only merchant-page real-time traffic
    val shopTrafficStream = kafkaStream.map(msg => msg._2).filter(shopTrafficFilter).map(shopInfoExtract)
    // Build the running per-merchant traffic state
    val shopTrafficUpdateStateDStream = shopTrafficStream.mapWithState(stateSpec).stateSnapshots()
    // Show the top 10 merchants by accumulated real-time traffic
    shopTrafficUpdateStateDStream.foreachRDD {
      rdd => {
        rdd.top(10)(/* custom ordering */ TopElementOrdering)
          .foreach(item => println(item))
      }
    }
    ssc.start()
    // Block until midnight, then stop gracefully without killing the SparkContext
    ssc.awaitTerminationOrTimeout(resetTime)
    ssc.stop(stopSparkContext = false, stopGracefully = true)
  }
}
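The resetTime computation above leans on the long-deprecated java.util.Date day/month constructor. A minimal sketch of the same calculation with the java.time API (standard since Java 8) avoids the deprecated calls and handles month and year rollover explicitly; the object and method names here are illustrative, not part of the original code:

```scala
import java.time.{Duration, LocalDateTime}

object MidnightReset {
  // Milliseconds from `now` until the next local midnight.
  // plusDays(1).atStartOfDay() rolls over months and years correctly.
  def millisUntilMidnight(now: LocalDateTime): Long = {
    val nextMidnight = now.toLocalDate.plusDays(1).atStartOfDay()
    Duration.between(now, nextMidnight).toMillis
  }
}
```

In the streaming job this would be called as `ssc.awaitTerminationOrTimeout(MidnightReset.millisUntilMidnight(LocalDateTime.now()))`; taking the current time as a parameter keeps the function pure and easy to test.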