This assumes you already know how jobs are divided up and submitted; if not, please read the Spark Streaming walkthrough post first.
We are now inside the generateJobs method of the JobGenerator class. The part we care about is this fragment of generateJobs:
jobScheduler.receiverTracker.allocateBlocksToBatch(time)
graph.generateJobs(time)
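For context, here is roughly what the surrounding method looks like (a simplified paraphrase of Spark's JobGenerator.generateJobs; error handling, imports, and exact field names vary by version):

private def generateJobs(time: Time): Unit = {
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // pin received blocks to this batch
    graph.generateJobs(time)                                 // generate jobs using those blocks
  } match {
    case Success(jobs) =>
      // hand the generated jobs to the JobScheduler for execution
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}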
What does allocateBlocksToBatch do? Here is the implementation of this method in ReceiverTracker:
def allocateBlocksToBatch(batchTime: Time): Unit = {
  if (receiverInputStreams.nonEmpty) {
    receivedBlockTracker.allocateBlocksToBatch(batchTime)
  }
}
Through receivedBlockTracker, the actual data (the blocks) is tied to a batch. receivedBlockTracker keeps two maps for this bookkeeping: streamIdToUnallocatedBlockQueues (per-stream queues of blocks that have been received but not yet assigned to a batch) and timeToAllocatedBlocks (batch time -> the blocks allocated to that batch). I won't walk through every detail; dig into the source if you are interested, or see the sketch below.
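For those who do want a peek, here is a simplified sketch of how the allocation works (paraphrased from ReceivedBlockTracker.allocateBlocksToBatch; the real method also writes a BatchAllocationEvent to the write-ahead log before mutating state, and guards against batch times arriving out of order):

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  // drain every stream's queue of not-yet-allocated blocks...
  val streamIdToBlocks = streamIds.map { streamId =>
    (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
  }.toMap
  // ...and pin the drained blocks to this batch time, so that later
  // lookups by batch time (getBlocksOfBatch) can find them
  timeToAllocatedBlocks.put(batchTime, AllocatedBlocks(streamIdToBlocks))
  lastAllocatedBatchTime = batchTime
}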
Next, graph.generateJobs(time): what does DStreamGraph do here?
def generateJobs(time: Time): Seq[Job] = {
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption // None is flattened away, so an idle output stream yields no job
    }
  }
  jobs
}
So it simply calls generateJob on every output stream. Let's look at one concrete output stream implementation, such as ForEachDStream:
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  // ...

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          // The interesting part: decorator-style, the RDD processing we
          // wrote in user code (foreachFunc) is deferred inside jobFunc
          // and only runs when the Job is actually executed.
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
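To make that concrete: the closure you pass to foreachRDD is exactly what arrives here as foreachFunc. A minimal, self-contained usage sketch (the socket source, host, and port are made up for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForEachDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("foreach-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999) // hypothetical source

    // This closure becomes the foreachFunc captured by a ForEachDStream.
    // Every batch interval it is wrapped into a Job's jobFunc and runs
    // only when the JobScheduler executes that Job.
    lines.foreachRDD { (rdd, time) =>
      println(s"batch $time: ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}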
Here you can see that the output stream actually calls the parent's getOrCompute method, which recurses level by level up the DStream lineage until it finally reaches the input stream's compute method.
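Roughly, the recursion looks like this (a simplified sketch of DStream.getOrCompute; the real method also validates the batch time, sets call-site properties, and persists/checkpoints the generated RDD when configured):

private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
  generatedRDDs.get(time).orElse {
    // a transformed DStream's compute calls parent.getOrCompute(time),
    // which is what makes the lookup recursive up the lineage
    val rddOption = compute(time)
    rddOption.foreach { rdd => generatedRDDs.put(time, rdd) }
    rddOption
  }
}

The recursion bottoms out at the input stream, so let's look at the input stream's compute method: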
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
    createBlockRDD(validTime, blockInfos)
  }
  Some(blockRDD)
}
As you can see, it first asks receiverTracker for all the block infos of this batch and then builds an RDD out of them. This closes the loop with the receiverTracker.allocateBlocksToBatch(time) call we started from: the blocks allocated to the batch there are exactly what getBlocksOfBatch returns here.
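To round things off, here is a simplified sketch of what createBlockRDD does in the non-write-ahead-log case (with the WAL enabled a WriteAheadLogBackedBlockRDD is built instead, and the real method also filters out block ids that no longer exist in the BlockManager):

private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  // each receiver-written block sitting in the BlockManager becomes one
  // partition of the resulting BlockRDD; an empty batch yields an empty RDD
  val blockIds = blockInfos.map(_.blockId.asInstanceOf[BlockId]).toArray
  new BlockRDD[T](ssc.sc, blockIds)
}

So the full chain is: receivers write blocks, allocateBlocksToBatch pins them to a batch time, generateJobs wraps the user code into a Job, and when the Job runs, compute turns the pinned blocks into the RDD the user code operates on.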