Series index:
- Spark Streaming overview
- Spark Streaming initialization process
- Spark Streaming Receiver startup process analysis
- Spark Streaming data-preparation stage analysis (Receiver mode)
- Spark Streaming data-computation stage analysis
- Spark Streaming Backpressure analysis
- Spark Streaming Executor DynamicAllocation mechanism analysis
The Receiver is the main component of the data-preparation stage: it is responsible for ingesting external data, and its lifecycle is managed by the ReceiverTracker.
Starting the Receiver
1. Receiver extraction and Executor preparation
As described in "Spark Streaming initialization process", the JobScheduler creates and starts the ReceiverTracker when it starts up.
When the ReceiverTracker is constructed, it extracts the ReceiverInputDStreams from the DStreamGraph, so that the Receivers can later be pulled out of them and started one by one:
private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
When the ReceiverTracker starts, it mainly does two things:
- creates the ReceiverTrackerEndpoint, which receives status information from the Receivers;
- starts the Receivers.
The start method of ReceiverTracker is shown below:
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
The launchReceivers() method is what actually starts the Receivers; its code is as follows:
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // Send the start command
  endpoint.send(StartAllReceivers(receivers))
}
This method performs the following steps:
- Extract a Receiver from each ReceiverInputDStream, using the stream's id as the Receiver's id.
- Run runDummySparkJob, a trivial Spark job whose purpose is to ensure that the application's minimum share of requested Executors has been granted. That minimum share is determined jointly by "spark.cores.max" and "spark.scheduler.minRegisteredResourcesRatio" (the ratio's default depends on the cluster manager). While the number of registered Executors is below the minimum share, the job blocks and waits for Executors to register until the required minimum is met.
The code of runDummySparkJob is as follows:
/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
The logic is deliberately simple: without consuming many resources, it ensures that by the time the Receivers are scheduled, a large number of Executors have already registered, so that the Receivers can be spread as evenly as possible across different Executors.
- Send the StartAllReceivers message to the ReceiverTrackerEndpoint to start all Receivers.
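For reference, the minimum-share behavior that runDummySparkJob works around is governed by ordinary Spark configuration. A hypothetical setup might look like this (values are illustrative only; the properties themselves are standard Spark settings):

```
# spark-defaults.conf -- illustrative values only
spark.cores.max                                    8
spark.scheduler.minRegisteredResourcesRatio        0.75
spark.scheduler.maxRegisteredResourcesWaitingTime  30s
```

With these settings, scheduling does not begin until 75% of the requested cores have registered, or until the 30-second waiting time expires.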
When the ReceiverTrackerEndpoint receives the StartAllReceivers message, it will:
- schedule the Receivers, i.e. assign each Receiver its target location information;
- start the Receivers.
The handling logic is as follows:
case StartAllReceivers(receivers) =>
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    startReceiver(receiver, executors)
  }
2. Receiver scheduling
Receiver scheduling is performed by ReceiverSchedulingPolicy and falls into two phases:
- Global scheduling phase
This phase happens when the Receivers are scheduled for the first time; it ensures that the Receivers are spread as evenly as possible across the Executors. During scheduling, every Receiver is assigned its launch location information (location).
- Local scheduling phase
This phase happens when a Receiver is restarted; only the failed Receiver needs to be started again.
The global scheduling phase always occurs, so it is used below to explain Receiver scheduling in detail. The scheduling proceeds as follows:
- Collect the host address information of all Executors.
- Create numReceiversOnExecutor to record how many Receivers are assigned to each Executor.
- Create scheduledLocations to record each Receiver's candidate locations.
- Schedule the Receivers that specify a preferredLocation: for each such Receiver, among the Executors on the user-specified preferred host, pick the one currently running the fewest Receivers as the launch location, then update scheduledLocations and numReceiversOnExecutor.
- Schedule the Receivers without a preferredLocation: order the Executors by the number of Receivers already assigned, ascending, and assign each such Receiver one Executor.
- If idle Executors remain, append each of them to the candidate list of the Receiver that currently has the fewest candidates.
At this point the association between Receivers and Executors has been established.
The implementation of this scheduling is shown below:
/**
 * Try our best to schedule receivers with evenly distributed. However, if the
 * `preferredLocation`s of receivers are not even, we may not be able to schedule them evenly
 * because we have to respect them.
 *
 * Here is the approach to schedule executors:
 * <ol>
 *   <li>First, schedule all the receivers with preferred locations (hosts), evenly among the
 *       executors running on those host.</li>
 *   <li>Then, schedule all other receivers evenly among all the executors such that overall
 *       distribution over all the receivers is even.</li>
 * </ol>
 *
 * This method is called when we start to launch receivers at the first time.
 *
 * @return a map for receivers and their scheduled locations
 */
def scheduleReceivers(
    receivers: Seq[Receiver[_]],
    executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
  if (receivers.isEmpty) {
    return Map.empty
  }

  if (executors.isEmpty) {
    return receivers.map(_.streamId -> Seq.empty).toMap
  }

  val hostToExecutors = executors.groupBy(_.host)
  val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
  val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
  // Set the initial value to 0
  executors.foreach(e => numReceiversOnExecutor(e) = 0)

  // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
  // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
  for (i <- 0 until receivers.length) {
    // Note: preferredLocation is host but executors are host_executorId
    receivers(i).preferredLocation.foreach { host =>
      hostToExecutors.get(host) match {
        case Some(executorsOnHost) =>
          // preferredLocation is a known host. Select an executor that has the least receivers in
          // this host
          val leastScheduledExecutor =
            executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
          scheduledLocations(i) += leastScheduledExecutor
          numReceiversOnExecutor(leastScheduledExecutor) =
            numReceiversOnExecutor(leastScheduledExecutor) + 1
        case None =>
          // preferredLocation is an unknown host.
          // Note: There are two cases:
          // 1. This executor is not up. But it may be up later.
          // 2. This executor is dead, or it's not a host in the cluster.
          // Currently, simply add host to the scheduled executors.

          // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
          // this case
          scheduledLocations(i) += TaskLocation(host)
      }
    }
  }

  // For those receivers that don't have preferredLocation, make sure we assign at least one
  // executor to them.
  for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
    // Select the executor that has the least receivers
    val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
    scheduledLocationsForOneReceiver += leastScheduledExecutor
    numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
  }

  // Assign idle executors to receivers that have less executors
  val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
  for (executor <- idleExecutors) {
    // Assign an idle executor to the receiver that has least candidate executors.
    val leastScheduledExecutors = scheduledLocations.minBy(_.size)
    leastScheduledExecutors += executor
  }

  receivers.map(_.streamId).zip(scheduledLocations).toMap
}
This implementation has a caveat: even when a Receiver specifies a preferredLocation and that host does run one of the application's Executors, the Receiver is not guaranteed to land on that Executor, because the preferred host only becomes one of several candidate locations.
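To make the caveat concrete, here is a self-contained toy reduction of the three phases above. Note that `SchedulingSketch` and its members are invented names and this is a simplification, not ReceiverSchedulingPolicy itself (executors are plain "host:id" strings instead of ExecutorCacheTaskLocation, and the unknown-host branch is dropped). With one receiver preferring host1 and one receiver with no preference, the idle executor on host3 ends up appended to the first receiver's candidate list, so the TaskScheduler may place that receiver away from host1:

```scala
object SchedulingSketch {
  import scala.collection.mutable

  /** Returns, for each receiver, its list of candidate executors ("host:id" strings). */
  def schedule(executors: Seq[String], preferred: Seq[Option[String]]): Seq[Seq[String]] = {
    val hostToExecutors = executors.groupBy(_.split(":")(0))
    val candidates = Array.fill(preferred.length)(mutable.ArrayBuffer[String]())
    val load = mutable.HashMap(executors.map(_ -> 0): _*)

    // Phase 1: a preferred host contributes its least-loaded executor as a candidate.
    for (i <- preferred.indices; host <- preferred(i); es <- hostToExecutors.get(host)) {
      val e = es.minBy(load)
      candidates(i) += e
      load(e) += 1
    }
    // Phase 2: receivers still without a candidate get the globally least-loaded executor.
    for (c <- candidates if c.isEmpty) {
      val e = executors.minBy(load)
      c += e
      load(e) += 1
    }
    // Phase 3: idle executors are appended to the receiver with the fewest candidates --
    // this is the step that dilutes the preferredLocation guarantee.
    for (e <- executors if load(e) == 0) candidates.minBy(_.size) += e

    candidates.map(_.toSeq).toSeq
  }
}
```

Running `SchedulingSketch.schedule(Seq("host1:1", "host2:2", "host3:3"), Seq(Some("host1"), None))` gives receiver 0 the candidates `host1:1` and `host3:3`: in the real policy those candidate lists become the TaskLocations passed to submitJob, which Spark treats as a preference, not a constraint.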
3. Receiver startup
After the launch locations have been assigned, the startReceiver method is called to start each Receiver. The startup proceeds as follows:
- Wrap the Receiver into an RDD according to its preferredLocation:
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
  if (scheduledLocations.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
  } else {
    val preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
  }
receiverRDD.setName(s"Receiver $receiverId")
- Submit it as a Spark job; the Receiver runs inside a Task, on its own thread:
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
  receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
  case Success(_) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
  case Failure(e) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logError("Receiver has been stopped. Try to restart it.", e)
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
}(ThreadUtils.sameThread)
- When the Task runs, it executes startReceiverFunc, which creates and starts a ReceiverSupervisorImpl (Job and Task scheduling works the same way as for batch jobs and is not described again here):
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }
ReceiverSupervisorImpl provides all the methods needed to handle the data received by the Receiver; it also creates the BlockGenerator, which slices the Receiver's incoming data stream into blocks.
The start method of ReceiverSupervisorImpl is implemented as follows:
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
Here onStart() creates the BlockGenerator and starts it.
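The core idea of the BlockGenerator can be illustrated with a stripped-down sketch. This is not Spark's implementation: the class and member names below are invented, and the real BlockGenerator additionally uses a RecurringTimer, a listener interface, rate limiting, and state tracking. The core mechanism, though, is the same: records accumulate in a buffer, and once per block interval (`spark.streaming.blockInterval`, 200 ms by default) the buffer is swapped out and handed downstream as one block.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for BlockGenerator (names invented here).
class MiniBlockGenerator(blockIntervalMs: Long, pushBlock: Seq[Any] => Unit) {
  private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var stopped = false

  // Timer thread that slices the buffer into a block every blockIntervalMs.
  private val timer = new Thread("mini-block-generator") {
    override def run(): Unit = while (!stopped) {
      Thread.sleep(blockIntervalMs)
      slice()
    }
  }

  def start(): Unit = { timer.setDaemon(true); timer.start() }

  /** Stop the timer and flush whatever is left as a final block. */
  def stop(): Unit = { stopped = true; slice() }

  /** Called for every record; in Spark, Receiver.store eventually lands here. */
  def addData(record: Any): Unit = synchronized { currentBuffer += record }

  /** Swap the buffer out atomically and hand the finished block downstream. */
  private def slice(): Unit = {
    val block = synchronized {
      val b = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      b
    }
    if (block.nonEmpty) pushBlock(block.toSeq)
  }
}
```

In Spark, the pushed blocks are what the ReceiverSupervisorImpl stores via the BlockManager (with replication) and reports to the ReceiverTracker, which is why the block interval directly determines how many tasks each batch gets.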
The startReceiver() method first registers the Receiver with the ReceiverTracker and verifies that the Receiver is allowed to start. If so, it calls the Receiver's onStart method to begin receiving data. Its implementation is as follows:
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo(s"Starting receiver $streamId")
      receiverState = Started
      receiver.onStart()
      logInfo(s"Called receiver $streamId onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId,
        None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
Taking the SocketReceiver of SocketInputDStream (as used in WordCount) as an example, its onStart method is implemented as follows:
def onStart() {
  logInfo(s"Connecting to $host:$port")
  try {
    socket = new Socket(host, port)
  } catch {
    case e: ConnectException =>
      restart(s"Error connecting to $host:$port", e)
      return
  }
  logInfo(s"Connected to $host:$port")
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}

/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  try {
    val iterator = bytesToObjects(socket.getInputStream())
    while (!isStopped && iterator.hasNext) {
      store(iterator.next())
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    onStop()
  }
}
As the implementation above shows, data is received over a plain socket connection.
This concludes the Receiver startup flow. Once started, a Receiver continuously receives the incoming data stream, slices it into blocks, and replicates and distributes those blocks in preparation for the computation stage. The next article analyzes the data-preparation stage itself.