Spark Streaming 實(shí)時統(tǒng)計商戶當(dāng)日累計PV流量

一、問題

對實(shí)時流量日志過濾篩選商戶流量,對每個商戶的流量進(jìn)行累計,統(tǒng)計商戶實(shí)時累計流量。

當(dāng)時間超過24時時,重新統(tǒng)計當(dāng)日商戶的實(shí)時累計流量。

二、實(shí)現(xiàn)步驟

1、采用Spark Streaming讀取Kafka中的實(shí)時日志流,生成DStream

2、過濾其中的商戶頁流量,生成DStream[k,v] (注:k為shopid, v為pv)

3、采用Spark Streaming中DStream[k,v]的mapWithState方法生成商戶累計流量MapWithStateDStream

4、通過調(diào)用StreamingContext中的awaitTerminationOrTimeout(time) 方法設(shè)置當(dāng)前StreamingContext的終止時間實(shí)現(xiàn)在每天24時終止所有上述DStream計算。

5、調(diào)用StreamingContext中的stop方法,終止StreamingContext。調(diào)用stop方法默認(rèn)會終止SparkContext,設(shè)置stop(stopSparkContext:Boolean = false,stopGracefully:Boolean = true)參數(shù),可以實(shí)現(xiàn)不終止SparkContext,同時能夠保持StreamingContext已經(jīng)接受的Batch能夠處理完成后再終止StreamingContext

6、重復(fù)1~5,即可以再次日0時自動生成新的StreamingContext統(tǒng)計當(dāng)日商戶累計流量

三、案例代碼

package com.demo.data

import java.util
import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{Logging, SparkConf}
import com.demo.data.kafka.KafkaService
import com.demo.data.util.Constants

/**
  * Created by phycsgy on 17/2/13.
  */

object KafkaToRedis extends App with Logging{

    val conf = new SparkConf().setAppName("SparkStreamingKafka")
    val sc = new SparkContext(conf)

    //過濾商戶頁P(yáng)V流量
    def shopTrafficFilter(log:String):Boolean = {
        (log contains "\"element_id\":\"pageview\"") &
        (log contains "\"page_name\":\"shopinfo\"") &
        ("\"shop_id\":\"[0-9]+\"".r findFirstIn log).nonEmpty
    }

    //正則表達(dá)式提取shopid
    def shopInfoExtract(log:String) = {
      val parttern = "\"shop_id\":\"([0-9]+)\"".r
      val matchResult = parttern findFirstMatchIn log
      Tuple2(matchResult.get.group(1),1)
    }

    //計算當(dāng)前時間距離次日凌晨的時長(毫秒數(shù))
    def resetTime = {
      val now = new Date()
      val tomorrowMidnight = new Date(now.getYear,now.getMonth,now.getDate+1)
      tomorrowMidnight.getTime - now.getTime

    }

    //商戶實(shí)時流量狀態(tài)更新函數(shù)
    val mapFuction = (shopid: String, pv: Option[Int], state: State[Int]) => {
      val accuSum = pv.getOrElse(0) + state.getOption().getOrElse(0)
      val output = (shopid,accuSum)
      state.update(accuSum)
      output
    }

    val stateSpec = StateSpec.function(mapFuction)

    while(true){

      val ssc = new StreamingContext(sc, Seconds(30))
      ssc.checkpoint("./")
      val kafkaService = new KafkaService
      val topicName = "log.traffic_data"
      //從kafka讀取日志流
      val kafkaStream = kafkaService.getKafkaStream[String, StringDecoder](ssc, topicName, Constants.KAFKA_LARGEST_OFFSET)
      //過濾商戶頁實(shí)時流量
      val shopTrafficStream = kafkaStream.map(msg => msg._2).filter(shopTrafficFilter).map(shopInfoExtract)
      //生成商戶頁流量實(shí)時累計狀態(tài)
      val shopTrafficUpdateStateDStream = shopTrafficStream.mapWithState(stateSpec).stateSnapshots()
      //展示商戶頁實(shí)時累計流量TOP10的商戶
      shopTrafficUpdateStateDStream.foreachRDD{
        rdd => {
          //取TOP10商戶
          rdd.top(10)(/*自定義排序方法*/TopElementOrdering)
            .foreach(item => println(item))
        }
      }

      ssc.start()
      //
      ssc.awaitTerminationOrTimeout(resetTime)
      ssc.stop(false,true)

    }

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容