Spark Streaming 誤用.transform(func)函數(shù)導致的問題解析

Spark/Spark Streaming transform 是一個很強的方法,不過使用過程中可能也有一些值得注意的問題。在分析的問題,我們還會順帶討論下Spark Streaming 生成job的邏輯,從而讓大家知道問題的根源。

問題描述

今天有朋友貼了一段 gist,大家可以先看看這段代碼有什么問題。

特定情況你會發(fā)現(xiàn)UI 的Storage標簽上有很多新的Cache RDD,然后你以為是Cache RDD 不被釋放,但是通過Spark Streaming 數(shù)據(jù)清理機制分析我們可以排除這個問題。

接著通過給RDD的設(shè)置名字,名字帶上時間,發(fā)現(xiàn)是延時的Batch 也會產(chǎn)生cache RDD。那這是怎么回事呢?

另外還有一個問題,也是相同的原因造成的:我通過KafkaInputStream.transform 方法獲取Kafka偏移量,并且保存到HDFS上。然后發(fā)現(xiàn)一旦產(chǎn)生job(包括并沒有執(zhí)行的Job),都會生成了Offset,這樣如果出現(xiàn)宕機,你看到的最新Offset 其實就是延時的,而不是出現(xiàn)故障時的Offset了。這樣做恢復就變得困難了。

問題分析

其實是這樣,在transform里你可以做很多復雜的工作,但是transform接受到的函數(shù)比較特殊,是會在TransformedDStream.compute方法中執(zhí)行的,你需要確保里面的動作都是transformation(延時的),而不能是Action(譬如第一個例子里的count動作),或者不能有立即執(zhí)行的(比如我提到的例子里的自己通過HDFS API 將Kafka偏移量保存到HDFS)。

override def compute(validTime: Time): Option[RDD[U]] = {
    val parentRDDs = parents.map { parent => 
    ....
  //看這一句,你的函數(shù)在調(diào)用compute方法時,就會被調(diào)用
    val transformedRDD = transformFunc(parentRDDs, validTime)
    if (transformedRDD == null) {
      throw new SparkException.....
    }
    Some(transformedRDD)
  }

這里有兩個疑問:

  • 那些.map .transform 都是transformation,不是只有真實被提交后才會被執(zhí)行么?
  • DStream.compute 方法為什么會在generateJob的時候就被調(diào)用呢?

Spark Streaming generateJob 邏輯解析

在JobGenerator中,會定時產(chǎn)生一個GenerateJobs的事件:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

該事件會被DStreamGraph.generateJobs 處理,產(chǎn)生Job的邏輯 也很簡單,

def generateJobs(time: Time): Seq[Job] = {   
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        ........    
  }

就是調(diào)用各個outputStream 的generateJob方法,典型的outputStream如ForEachDStream。 以ForEachDStream為例,產(chǎn)生job的方式如下:

override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

我們看到,在這里會觸發(fā)所有的DStream鏈進行compute動作。也就意味著所有transformation產(chǎn)生的DStream的compute方法都會被調(diào)用。

正常情況下不會有什么問題,比如.map(func) 產(chǎn)生的MappedDStream里面在compute執(zhí)行時,func 都是被記住而不是被執(zhí)行。但是TransformedDStream 是比較特殊的,對應(yīng)的func是會被執(zhí)行的,在對應(yīng)的compute方法里,你會看到這行代碼:

val transformedRDD = transformFunc(parentRDDs, validTime)

這里的transformFunc 就是transform(func)里的func了。然而transform 又特別靈活,可以執(zhí)行各種RDD操作,這個時候Spark Streaming 是攔不住你的,一旦你使用了count之類的Action,產(chǎn)生Job的時候就會被立刻執(zhí)行,而不是等到Job被提交才執(zhí)行。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容