Spark/Spark Streaming transform 是一個很強的方法,不過使用過程中可能也有一些值得注意的問題。在分析的問題,我們還會順帶討論下Spark Streaming 生成job的邏輯,從而讓大家知道問題的根源。
問題描述
今天有朋友貼了一段 gist,大家可以先看看這段代碼有什么問題。
特定情況你會發(fā)現(xiàn)UI 的Storage標簽上有很多新的Cache RDD,然后你以為是Cache RDD 不被釋放,但是通過Spark Streaming 數(shù)據(jù)清理機制分析我們可以排除這個問題。
接著通過給RDD的設(shè)置名字,名字帶上時間,發(fā)現(xiàn)是延時的Batch 也會產(chǎn)生cache RDD。那這是怎么回事呢?
另外還有一個問題,也是相同的原因造成的:我通過KafkaInputStream.transform 方法獲取Kafka偏移量,并且保存到HDFS上。然后發(fā)現(xiàn)一旦產(chǎn)生job(包括并沒有執(zhí)行的Job),都會生成了Offset,這樣如果出現(xiàn)宕機,你看到的最新Offset 其實就是延時的,而不是出現(xiàn)故障時的Offset了。這樣做恢復就變得困難了。
問題分析
其實是這樣,在transform里你可以做很多復雜的工作,但是transform接受到的函數(shù)比較特殊,是會在TransformedDStream.compute方法中執(zhí)行的,你需要確保里面的動作都是transformation(延時的),而不能是Action(譬如第一個例子里的count動作),或者不能有立即執(zhí)行的(比如我提到的例子里的自己通過HDFS API 將Kafka偏移量保存到HDFS)。
override def compute(validTime: Time): Option[RDD[U]] = {
val parentRDDs = parents.map { parent =>
....
//看這一句,你的函數(shù)在調(diào)用compute方法時,就會被調(diào)用
val transformedRDD = transformFunc(parentRDDs, validTime)
if (transformedRDD == null) {
throw new SparkException.....
}
Some(transformedRDD)
}
這里有兩個疑問:
- 那些.map .transform 都是transformation,不是只有真實被提交后才會被執(zhí)行么?
- DStream.compute 方法為什么會在generateJob的時候就被調(diào)用呢?
Spark Streaming generateJob 邏輯解析
在JobGenerator中,會定時產(chǎn)生一個GenerateJobs的事件:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
該事件會被DStreamGraph.generateJobs 處理,產(chǎn)生Job的邏輯 也很簡單,
def generateJobs(time: Time): Seq[Job] = {
val jobs = this.synchronized {
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
........
}
就是調(diào)用各個outputStream 的generateJob方法,典型的outputStream如ForEachDStream。 以ForEachDStream為例,產(chǎn)生job的方式如下:
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
我們看到,在這里會觸發(fā)所有的DStream鏈進行compute動作。也就意味著所有transformation產(chǎn)生的DStream的compute方法都會被調(diào)用。
正常情況下不會有什么問題,比如.map(func) 產(chǎn)生的MappedDStream里面在compute執(zhí)行時,func 都是被記住而不是被執(zhí)行。但是TransformedDStream 是比較特殊的,對應(yīng)的func是會被執(zhí)行的,在對應(yīng)的compute方法里,你會看到這行代碼:
val transformedRDD = transformFunc(parentRDDs, validTime)
這里的transformFunc 就是transform(func)里的func了。然而transform 又特別靈活,可以執(zhí)行各種RDD操作,這個時候Spark Streaming 是攔不住你的,一旦你使用了count之類的Action,產(chǎn)生Job的時候就會被立刻執(zhí)行,而不是等到Job被提交才執(zhí)行。