This article is based on Spark 2.11.
1. Introduction
Spark Streaming (1) mentioned that the JobScheduler uses a JobGenerator to periodically build an RDD DAG from the DStream DAG and submit jobs. This article covers the JobScheduler in detail.
2. JobScheduler
The JobScheduler is started when StreamingContext#start is called. The startup sequence is:
StreamingContext#start
  -> JobScheduler#start
    -> ReceiverTracker#start
    -> JobGenerator#start
JobScheduler has the following members:
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
private val jobGenerator = new JobGenerator(this)

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null
// A tracker to track all the input stream information as well as processed record number
var inputInfoTracker: InputInfoTracker = null
- jobSets: a map from batch generation time to the jobs of that batch. The JobGenerator asks the DStreamGraph to generate one job per output DStream, then hands the resulting jobs, together with the batch time, back to the JobScheduler, which stores them in jobSets. Note that the JobGenerator does not submit jobs; submission is done by the JobScheduler.
- numConcurrentJobs: controls how many jobs may run at the same time.
- jobExecutor: a thread pool whose size is numConcurrentJobs. Jobs are submitted inside this pool and their results are awaited there. Since waiting for a result is a blocking operation, each thread can only run one job at a time.
- jobGenerator: the component the JobScheduler delegates job generation to.
- receiverTracker: started by the JobScheduler; it receives the data batch/block information reported by Receivers.
3. JobGenerator generates jobs
As mentioned above, the JobScheduler delegates job generation to the JobGenerator.
Below are the JobGenerator's core members:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
private var eventLoop: EventLoop[JobGeneratorEvent] = null
// last batch whose completion, checkpointing and metadata cleanup has been completed
private var lastProcessedBatch: Time = null
- timer: a recurring timer that drives periodic job generation. Its interval is the batchDuration passed in when the StreamingContext was created; every batchDuration it posts a GenerateJobs message to the eventLoop.
- eventLoop: runs continuously, receiving messages and handling them. The message types it handles include:
  - GenerateJobs: generate jobs using the DStreamGraph
  - DoCheckpoint: submit a new job to perform checkpointing
  - ClearMetadata and ClearCheckpointData: clean up metadata and checkpoint data after a batch completes
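The eventLoop pattern can be sketched as follows. This is a hypothetical, heavily simplified analogue of Spark's internal EventLoop, not the real class: a daemon thread drains a blocking queue and dispatches each message to a handler, which is why messages such as GenerateJobs and DoCheckpoint are processed serially, one at a time.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Illustrative event types mirroring the ones discussed above.
sealed trait Event
case class GenerateJobs(timeMs: Long) extends Event
case class DoCheckpoint(timeMs: Long) extends Event
case object PoisonPill extends Event // internal shutdown marker, not a Spark event

// A minimal event loop: post() enqueues, a single daemon thread dequeues
// and calls onReceive for each event in order.
class MiniEventLoop(onReceive: Event => Unit) {
  private val queue = new LinkedBlockingQueue[Event]()
  private val thread = new Thread("mini-event-loop") {
    override def run(): Unit = {
      var done = false
      while (!done) queue.take() match {
        case PoisonPill => done = true
        case e          => onReceive(e)
      }
    }
  }
  thread.setDaemon(true)
  def start(): Unit = thread.start()
  def post(event: Event): Unit = queue.put(event)
  def stop(): Unit = { queue.put(PoisonPill); thread.join() }
}
```

Because a single thread consumes the queue, handling one GenerateJobs event can never overlap with handling the next; Spark's real EventLoop additionally adds error handling and lifecycle checks.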
Generating jobs
Every batchDuration the timer posts a GenerateJobs event to the eventLoop. In the eventLoop's main loop, handling a GenerateJobs event leads to the following calls:
eventLoop#processEvent
  -> jobGenerator#generateJobs
Below is JobGenerator#generateJobs:
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // hand the jobs back to the JobScheduler, where they wait to be scheduled
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
- receiverTracker.allocateBlocksToBatch(time): using the current batch time, allocates the blocks reported so far to this batch. When RDDs are later created from the DStreams, the data for this batch is looked up by time.
- graph.generateJobs: generates the jobs.
- jobScheduler.submitJobSet: hands the jobs back to the JobScheduler, where they wait to be scheduled.
- eventLoop.post: creates a job to perform checkpointing.
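The first step, allocating blocks to a batch, amounts to freezing the set of reported-but-unallocated blocks under the batch time, so that later RDD creation for that time sees a stable set. A minimal sketch of that bookkeeping (hypothetical names; the real ReceiverTracker delegates to a ReceivedBlockTracker and adds write-ahead logging):

```scala
// An illustrative "block" of received data.
case class Block(id: Int)

class MiniReceiverTracker {
  private val unallocated = scala.collection.mutable.Queue[Block]()
  private val allocated = scala.collection.mutable.Map[Long, List[Block]]()

  // Receivers report blocks as they arrive.
  def addBlock(b: Block): Unit = synchronized { unallocated.enqueue(b) }

  // Freeze everything reported so far under this batch time.
  def allocateBlocksToBatch(time: Long): Unit = synchronized {
    allocated(time) = unallocated.dequeueAll(_ => true).toList
  }

  // RDD creation for a batch retrieves exactly the blocks frozen for its time.
  def getBlocksOfBatch(time: Long): List[Block] =
    synchronized { allocated.getOrElse(time, Nil) }
}
```

Blocks that arrive after allocateBlocksToBatch(time) are not part of that batch; they stay unallocated until the next batch time claims them.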
The second step, graph.generateJobs, creates jobs through the following call sequence:
DStreamGraph#generateJobs
  -> DStream#generateJob
// DStream#generateJob
private[streaming] def generateJob(time: Time): Option[Job] = {
  // materialize the DStream into an RDD for this batch time
  getOrCompute(time) match {
    case Some(rdd) =>
      // a function is created here that submits a job based on the RDD;
      // it runs when the JobScheduler schedules the job on the jobExecutor pool
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
A Job is created from the batch time plus the jobFunc function; jobFunc is executed only when the job is scheduled.
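The key point is that a Job is just a deferred computation. A minimal sketch of this shape (the class name is illustrative; Spark's real Job also tracks an output operation id and its result):

```scala
// A batch time plus a deferred function. Constructing the Job runs
// nothing; the Spark job is only submitted when the scheduler later
// calls run(), which invokes jobFunc.
class MiniJob(val timeMs: Long, jobFunc: () => Unit) {
  def run(): Unit = jobFunc()
}
```

This is why job generation (building the jobFunc closure) and job execution (calling run() in the jobExecutor pool) can happen on different threads at different times.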
4. JobScheduler schedules jobs
Section 3 described how the JobGenerator generates jobs and hands them back to the JobScheduler, and section 2 mentioned that the JobScheduler schedules jobs on the jobExecutor pool.
Below is JobScheduler's submitJobSet method:
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
In the line jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))), each job handed over by the JobGenerator is wrapped in a JobHandler and then scheduled on the jobExecutor thread pool.
JobHandler implements the Runnable interface so that it can run in the thread pool. Its run method is as follows:
def run() {
  val oldProps = ssc.sparkContext.getLocalProperties
  try {
    ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
    val formattedTime = UIUtils.formatBatchTime(
      job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
    val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
    val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
    ssc.sc.setJobDescription(
      s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
    ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
    ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    // We need to assign `eventLoop` to a temp variable. Otherwise, because
    // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
    // it's possible that when `post` is called, `eventLoop` happens to null.
    var _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details.
      SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
      }
    } else {
      // JobScheduler has been stopped.
    }
  } finally {
    ssc.sparkContext.setLocalProperties(oldProps)
  }
}
It calls Job#run, which executes jobFunc and thereby submits the Spark job.
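Stripped of the property bookkeeping, the lifecycle above is: notify JobStarted, run the job body, notify JobCompleted. A condensed, hypothetical sketch of that shape (MiniJobHandler is not Spark's class; the real JobHandler posts typed events to the scheduler's event loop and guards against it being stopped):

```scala
// A Runnable that brackets the job body with start/completion
// notifications, mirroring the JobStarted / JobCompleted events
// posted around job.run().
class MiniJobHandler(timeMs: Long, body: () => Unit, notify: String => Unit)
    extends Runnable {
  def run(): Unit = {
    notify(s"JobStarted($timeMs)")
    body() // executes jobFunc, which submits the actual Spark job
    notify(s"JobCompleted($timeMs)")
  }
}
```

Because the notifications and the body run on the same pool thread, listeners always observe JobStarted strictly before JobCompleted for a given job.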
Controlling job parallelism
The JobScheduler member numConcurrentJobs controls how many streaming jobs may run at the same time. It is read from the spark.streaming.concurrentJobs configuration and defaults to 1. numConcurrentJobs sets the number of threads in the jobExecutor pool, which in turn bounds the number of JobHandlers running concurrently (and each JobHandler wraps exactly one job).
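This bounding effect can be demonstrated with a plain fixed-size pool. The sketch below is not Spark code; it only shows the mechanism: with poolSize worker threads, at most poolSize "jobs" are ever in flight at once, because each worker blocks until its job finishes, just as a JobHandler thread blocks waiting for its job's result.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ConcurrencySketch {
  // Runs numJobs blocking tasks on a fixed pool and reports the highest
  // number of tasks ever observed running simultaneously.
  def maxObservedConcurrency(poolSize: Int, numJobs: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val lock = new Object
    var running = 0
    var maxObserved = 0
    (1 to numJobs).foreach { _ =>
      pool.execute(new Runnable {
        def run(): Unit = {
          lock.synchronized { running += 1; maxObserved = math.max(maxObserved, running) }
          Thread.sleep(20) // simulate the blocking wait for a job's result
          lock.synchronized { running -= 1 }
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    lock.synchronized(maxObserved)
  }
}
```

With poolSize = 1 (the default spark.streaming.concurrentJobs) the tasks serialize completely; raising poolSize lets batches overlap, which can improve throughput but also lets batches complete out of order, so it should be increased with care.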