Spark 默認采用的是資源預(yù)分配的方式。這其實也和按需做資源分配的理念是有沖突的。這篇文章會詳細介紹Spark 動態(tài)資源分配原理。
前言
最近在使用Spark Streaming程序時,發(fā)現(xiàn)如下幾個問題:
- 高峰和低峰Spark Streaming每個周期要處理的數(shù)據(jù)量相差三倍以上,預(yù)分配資源會導(dǎo)致低峰的時候資源的大量浪費。
- Spark Streaming 跑的數(shù)量多了后,資源占用相當(dāng)可觀。
所以便有了要開發(fā)一套針對Spark Streaming 動態(tài)資源調(diào)整的想法。我在文章最后一個章節(jié)給出了一個可能的設(shè)計方案。不過要做這件事情,首先我們需要了解現(xiàn)有的Spark 已經(jīng)實現(xiàn)的 Dynamic Resource Allocation 機制,以及為什么它無法滿足現(xiàn)有的需求。
入口
在SparkContext 中可以看到這一行:
_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
} else {
None
}
通過spark.dynamicAllocation.enabled參數(shù)開啟后就會啟動ExecutorAllocationManager。
這里有我第一個吐槽的點,這么直接new出來,好歹也做個配置,方便第三方開發(fā)個新的組件可以集成進去。但是Spark很多地方都是這么搞的,完全沒有原來Java社區(qū)的風(fēng)格。
動態(tài)調(diào)整資源面臨的問題
我們先看看,動態(tài)資源調(diào)整需要解決哪幾個問題:
- Cache問題。如果需要移除的Executor含有RDD cache該如何辦?
- Shuffle問題。 如果需要移除的Executor包含了Shuffle Write先關(guān)數(shù)據(jù)該怎么辦?
- 添加和刪除之后都需要告知DAGSchedule進行相關(guān)信息更新。
Cache去掉了重算即可。為了防止數(shù)據(jù)抖動,默認包含有Cache的Executor是不會被刪除的,因為默認的Idle時間設(shè)置的非常大:
private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.cachedExecutorIdleTimeout",
s"${Integer.MAX_VALUE}s")
你可以自己設(shè)置從而去掉這個限制。
而對于Shuffle,則需要和Yarn集成,需要配置yarn.nodemanager.aux-services。具體配置方式,大家可以Google。這樣Spark Executor就不用保存Shuffle狀態(tài)了。
觸發(fā)條件
添加Worker的觸發(fā)條件是:
- 有Stage正在運行,并且預(yù)估需要的Executors > 現(xiàn)有的
刪除Woker的觸發(fā)條件是:
- 一定時間內(nèi)(默認60s)沒有task運行的Executor
我們看到觸發(fā)條件還是比較簡單的。這種簡單就意味著用戶需要根據(jù)實際場景,調(diào)整各個時間參數(shù),比如到底多久沒有運行task的Executor才需要刪除。
默認檢測時間是100ms:
private val intervalMillis: Long = 100
如何實現(xiàn)Container的添加和釋放
只有ApplicationMaster才能夠向Yarn發(fā)布這些動作。而真正的中控是org.apache.spark.ExecutorAllocationManager,所以他們之間需要建立一個通訊機制。對應(yīng)的方式是在ApplicationMaster有一個private class AMEndpoint(類,比如刪除釋放容器的動作在里就有:
case KillExecutors(executorIds) =>
logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
Option(allocator) match {
case Some(a) => executorIds.foreach(a.killExecutor)
case None => logWarning("Container allocator is not ready to kill executors yet.")
}
context.reply(true)
而ExecutorAllocationManager則是引用YarnSchedulerBackend實例,該實例持有ApplicationMaster的 RPC引用
private var amEndpoint: Option[RpcEndpointRef]
如何獲取調(diào)度信息
要觸發(fā)上面描述的操作,就需要任務(wù)的調(diào)度信息。這個是通過ExecutorAllocationListener extends SparkListener來完成的。具體是在 ExecutorAllocationMaster的start函數(shù)里,會將該Listener實例添加到SparkContext里的listenerBus里,從而實現(xiàn)對DAGSchecude等模塊的監(jiān)聽。機制可以參看這篇文章 Spark ListenerBus 和 MetricsSystem 體系分析。
根據(jù)上面的分析,我們至少要知道如下三個信息:
- Executor上是否為空,如果為空,就可以標(biāo)記為Idle.只要超過一定的時間,就可以刪除掉這個Executor.
- 正在跑的Task有多少
- 等待調(diào)度的Task有多少
這里是以Stage為區(qū)分的。分別以三個變量來表示:
private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
名字已經(jīng)很清楚了。值得說的是stageIdToTaskIndices,其實就是stageId 對應(yīng)的正在運行的task id 集合。
那么怎么計算出等待調(diào)度的task數(shù)量呢?計算方法如下:
stageIdToNumTasks(stageId) - stageIdToTaskIndices(stageId).size
這些都是動態(tài)更新變化的,因為有了監(jiān)聽器,所以任務(wù)那邊有啥變化,這邊都會得到通知。
定時掃描器
有了上面的鋪墊,我們現(xiàn)在進入核心方法:
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
updateAndSyncNumExecutorsTarget(now)
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
removeExecutor(executorId)
}
!expired
}
}
該方法會每隔100ms被調(diào)度一次。你可以理解為一個監(jiān)控線程。
Executor判定為空閑的機制
只要有一個task結(jié)束,就會判定有哪些Executor已經(jīng)沒有任務(wù)了。然后會被加入待移除列表。在放到removeTimes的時候,會把當(dāng)前時間now + executorIdleTimeoutS * 1000 作為時間戳存儲起來。當(dāng)調(diào)度進程掃描這個到Executor時,會判定時間是不是到了,到了的話就執(zhí)行實際的remove動作。在這個期間,一旦有task再啟動,并且正好運行在這個Executor上,則又會從removeTimes列表中被移除。 那么這個Executor就不會被真實的刪除了。
Executor 需要增加的情況
首先,系統(tǒng)會根據(jù)下面的公式計算出實際需要的Executors數(shù)目:
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
接著每個計算周期到了之后,會和當(dāng)前已經(jīng)有的Executors數(shù):numExecutorsTarget 進行比較。
如果發(fā)現(xiàn) maxNumExecutorsNeeded < numExecutorsTarget 則會發(fā)出取消還有沒有執(zhí)行的Container申請。并且重置每次申請的容器數(shù)為1,也就是numExecutorsToAdd=1
否則如果發(fā)現(xiàn)當(dāng)前時間now >= addTime(addTime 每次會增加一個sustainedSchedulerBacklogTimeoutS ,避免申請容器過于頻繁),則會進行新容器的申請,如果是第一次,則增加一個(numExecutorsToAdd),如果是第二次則增加2個以此按倍數(shù)類推。直到maxNumExecutorsNeeded <= numExecutorsTarget ,然后就會重置numExecutorsToAdd。
所以我們會發(fā)現(xiàn),我們并不是一次性就申請足夠的資源,而是每隔sustainedSchedulerBacklogTimeoutS次時間,按[1,2,4,8]這種節(jié)奏去申請資源的。因為在某個sustainedSchedulerBacklogTimeoutS期間,可能已經(jīng)有很多任務(wù)完成了,其實不需要那么多資源了。而按倍數(shù)上升的原因是,防止為了申請到足夠的資源時間花費過長。這是一種權(quán)衡。
DRA評價
我們發(fā)現(xiàn),DRA(Dynamic Resource Allocation)涉及到的點還是很多的,雖然邏輯比較簡單,但是和任務(wù)調(diào)度密切相關(guān),是一個非常動態(tài)的過程。這個設(shè)計本身也是面向一個通用的調(diào)度方式。
我個人建議如果采用了DRA,可以注意如下幾點:
- 設(shè)置一個合理的minExecutors-maxExecutors值
- 將Executor對應(yīng)的cpuCore 最好設(shè)置為<=3 ,避免Executor數(shù)目下降時,等不及新申請到資源,已有的Executor就因為任務(wù)過重而導(dǎo)致集群掛掉。
- 如果程序中有shuffle,例如(reduce,groupBy),建議設(shè)置一個合理的并行數(shù),避免殺掉過多的Executors。
- 對于每個Stage持續(xù)時間很短的應(yīng)用,其實不適合這套機制。這樣會頻繁增加和殺掉Executors,造成系統(tǒng)顛簸。而Yarn對資源的申請?zhí)幚硭俣炔⒉豢臁?/li>
Spark Streaming該使用什么機制動態(tài)調(diào)整資源
現(xiàn)有的DRA機制其實適合長時的批處理過程中,每個Stage需要的資源量不一樣,并且耗時都比較長。Spark Streaming 可以理解為循環(huán)的微批處理。而DRA是在每次微批處理起作用,可能還沒等DRA反應(yīng)過來,這個周期就已經(jīng)過了。
Spark Streaming需要一個從全局一天24小時來考慮。每個調(diào)度周期的processing time可能更適合作為增減Executors的標(biāo)準(zhǔn)。同時如果發(fā)生delay的話,則可以擴大資源申請的速度。并且,因為是周期性的,釋放和新增動作只會發(fā)生在一個新的周期的開始,所以他并不會面臨現(xiàn)有 DRA的問題,譬如需要通過額外的方式保存Shuffle 狀態(tài)等。 所以實現(xiàn)起來更加容易。我們可能需要同時監(jiān)聽StreamingContext的一些信息。
具體而言:
每個周期檢查上個周期的處理時間 ,設(shè)為 preProcessingTime,周期為duration, 一般而言,我們的Spark Streaming程序都會讓preProcessingTime < duration。否則會發(fā)生delay。
如果 preProcessingTime > 0.8 * duration,則一次性將資源申請到maxExecutors。
如果preProcessingTime < duration,則應(yīng)該刪除的Worker為
removeExecutorNum = currentExecutors * ((duration -preProcessingTime)/duration - 0.2)
其中0.2 為預(yù)留的worker數(shù)。如果removeExecutorNum如果<=0 則不進行任何操作。
假設(shè)duration =10s, preProcessingTime= 5s, currentExecutors=100,則我們理論上認為 只要保留50%的資源即可。
但是為了防止延時,我們其實額外保留一些20%資源。也就意味著我們刪除30個Executor。 我們并不會一次性將資源都釋放掉。假設(shè)我們增加一個新的參數(shù)spark.streaming.release.num.duration=5,這個參數(shù)意味著我們需要花費5個周期釋放掉這30個Executor的資源。也就是當(dāng)前這個周期,我們要釋放掉 6個Executor。
接著到下一個周期,重復(fù)上面的計算。 直到計算結(jié)果 <=0 為止。