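Preface
Spark YarnShuffleService runs as an auxiliary service of the NodeManager in Hadoop YARN and lives inside the NodeManager process. As most people know, this external service takes shuffle pressure off the Executors themselves, and because the service is long-lived, Shuffle Write files can stay with the service rather than with the Executor, which is what makes advanced Spark features such as dynamic resource allocation possible.
Of course, this really just moves the pressure onto the NodeManager. If the NodeManager suffers from heavy GC, it has trouble responding to Shuffle Clients; frequent Shuffle Client retries, and even FetchFailed exceptions, mostly trace back to this. It can also expose the NodeManager to OOM risk, for example YARN-7110.
It has to be said that Spark YarnShuffleService is a fairly stable module. A newer service with older clients, or an older service with newer clients, usually works without trouble, because the module has barely changed. In addition, the service's jar has to sit on the NodeManager classpath, and pushing a Hadoop-side upgrade is tedious and difficult, so as long as compatibility tests passed we were not eager to be the ones pushing it. As a result, the Spark 2.1.2 based branch deployed early on in our production cluster has gloriously served everything from Spark 1.6.x to Spark 2.3.x.
Everything has its trigger, though: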
For users who enabled external shuffle service, this feature can only be worked when external shuffle service is newer than Spark 2.2.
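In this situation we had no choice but to upgrade.
Under cover of darkness, while the Hadoop Team was rolling out Capacity Scheduler, CGroup, NodeLabel, overcommit, the move to 2.9 and so on, we chimed in: we have a Spark 2.3.2 jar here, brothers, take it and use it.
The problems (symptoms)
Of course, an upgrade of this scale could not be done in one go. After N rollbacks, the dust finally "settled". Then the users started coming in, and so did the problems.
1. NodeManager process eats CPU
On a 48-core machine, every core was at 60-80% utilization, all of it consumed by the NodeManager process. How are Containers supposed to run properly?
We quickly took a jstack of the NodeManager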

Wow, IO operations eating this much CPU. What is going on?
A look through the code:
/** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }
So the Shuffle Server was running 96 threads on the server side (2 x 48 cores).
/**
 * Specifies an upper bound on the number of Netty threads that Spark requires by default.
 * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
 * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
 * at a premium.
 *
 * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
 * allocation. It can be overridden by setting the number of serverThreads and clientThreads
 * manually in Spark's configuration.
 */
private val MAX_DEFAULT_NETTY_THREADS = 8
According to the official comment, 2-4 cores are generally enough to transfer roughly 10 Gb/s, so to be on the safe side we set this parameter to 8.
spark.shuffle.io.serverThreads=8
Sure enough, the NodeManager's CPU consumption came down and was capped at 600%-800%, but those few CPUs were still effectively saturated.
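The thread-count arithmetic in this section can be sketched as follows. This is only an illustration of the documented "0 means 2x #cores" default quoted above; the class and method names are hypothetical and are not part of Spark.

```java
public final class ShuffleServerThreads {
    /**
     * Hypothetical helper mirroring the documented default of
     * spark.shuffle.io.serverThreads: 0 means "2 x number of cores".
     */
    public static int effectiveThreads(int configured, int cores) {
        if (configured < 0 || cores <= 0) {
            throw new IllegalArgumentException("configured must be >= 0 and cores > 0");
        }
        return configured > 0 ? configured : 2 * cores;
    }

    public static void main(String[] args) {
        // Default on the 48-core NodeManager host from the article.
        System.out.println(effectiveThreads(0, 48)); // 96
        // After setting spark.shuffle.io.serverThreads=8.
        System.out.println(effectiveThreads(8, 48)); // 8
    }
}
```

With the explicit setting the pool can saturate at most 8 cores, which matches the 600%-800% ceiling observed afterwards.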
2. ExecutorLostFailure
After Spark launches an Executor, the CoarseGrainedExecutorBackend on the Executor side first sends a RegisterExecutor message to the CoarseGrainedSchedulerBackend on the Driver side.
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
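When CoarseGrainedSchedulerBackend receives this message, it puts the executor into executorDataMap to complete the registration, sends an asynchronous message back to CoarseGrainedExecutorBackend (fire and forget), and then immediately calls makeOffers and starts dispatching tasks to this executor.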
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  // some code omitted here, including the opening of the enclosing block
    val data = new ExecutorData(executorRef, executorAddress, hostname,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    executorRef.send(RegisteredExecutor)
    // Note: some tests expect the reply to come after we put the executor in the map
    context.reply(true)
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    makeOffers()
  }
On receiving the RegisteredExecutor message, CoarseGrainedExecutorBackend starts processing it:
case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }
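Only this step is the real initialization of the Executor itself, and it is comparatively "time-consuming" and "error-prone". One part of it is initializing the block manager, which includes initializing the Shuffle Client. If the External Shuffle Service is enabled, it has to call registerWithExternalShuffleServer, which instantiates the Shuffle Client and registers it with the Shuffle Server (that is, port 7337 on the NodeManager).
This breaks down into two steps. The first creates the Shuffle Client instance. In the second, the Shuffle Client sends a synchronous RegisterExecutor message to the Shuffle Server with a timeout of spark.shuffle.registration.timeout (5000), and retries that cycle up to spark.shuffle.registration.maxAttempts (3) times. If these two parameters are set too high, or if instantiating the Shuffle Client takes too long, the whole initialization of the Executor itself is blocked. The Executor-side heartbeat only starts after this step completes; until then no heartbeat is reported, and the Driver cannot update the Executor's state. Meanwhile tasks may already have been dispatched to this Executor, which leads to ExecutorLostFailure errors like the ones below, even though the Executor's own log often shows that no task was ever assigned to it.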

Basically, once the log below has scrolled past without a successful connection, the Executor is finished off.
2019-08-02 02:23:36,064 [129209] - ERROR [dispatcher-event-loop-0:Logging$class@91] - Failed to connect to external shuffle server, will retry 14 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to hadoop3909.jd.163.org/10.196.68.54:7337
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:201)
at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:142)
at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:289)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:286)
at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:260)
at org.apache.spark.executor.Executor.<init>(Executor.scala:116)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
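On the whole this can be considered a flaw in Spark itself: an executor should not be counted as an available compute node before it is truly initialized, and no tasks should be dispatched to it. The tasks here clearly just wait to no effect and then fail for nothing!
Furthermore, when Dynamic Executor Allocation and BlackList are both enabled and minExecutors is set to a small value such as 1, a task assigned to this node can trigger an error along the lines of "cannot run anywhere due to node and executor blacklist", which kills the whole Job. That is an even more serious problem. Two features meant to make Spark more robust, combined with one another, end up making it easier to bring down. Heh.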
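To get a feel for how long the registration loop described in this section can block Executor initialization, here is a rough model. It assumes every attempt runs into the full spark.shuffle.registration.timeout (taken to be in milliseconds) and that the client waits 5 seconds between attempts, as the "after waiting 5 seconds" log line suggests. The class and method names are hypothetical, and the real code path has other timeouts (connection setup, for instance) that this sketch ignores.

```java
public final class RegistrationBlockingTime {
    /**
     * Simplified worst case: each attempt times out, with a fixed sleep
     * between attempts (no sleep after the last one).
     */
    public static long worstCaseMillis(long timeoutMs, int maxAttempts, long sleepMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return maxAttempts * timeoutMs + (maxAttempts - 1) * sleepMs;
    }

    public static void main(String[] args) {
        // Values quoted in the article: timeout 5000, maxAttempts 3.
        System.out.println(worstCaseMillis(5000, 3, 5000));  // 25000
        // "will retry 14 more times" in the log implies 15 attempts in total.
        System.out.println(worstCaseMillis(5000, 15, 5000)); // 145000
    }
}
```

Under these assumptions, 15 attempts keep the Executor from starting its heartbeat for well over two minutes, during which the Driver may already have scheduled tasks onto it.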
3. Massive FetchFailed
One case is that the server closes the connection, actively or passively, and the client receives an RST:
org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
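This is usually caused by excessive pressure on the server. The common remedy for this kind of exception is to adjust the following two parameters: raise the client's retry count to give the client more chances, and lengthen the wait interval to give the server more breathing room to work through its load.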
spark.shuffle.io.maxRetries 15
spark.shuffle.io.retryWait 6s
But if the pressure on the server is simply too high, you have to tackle it on the server side; blindly tuning these two parameters only makes your job fail more slowly.
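"Failing more slowly" can be put in numbers with a small simulation. The sketch below assumes the client waits a fixed retryWait before each retry and gives up once maxRetries retries have failed; the names are hypothetical, nothing here actually sleeps, and the time spent inside each attempt is not counted.

```java
import java.util.function.BooleanSupplier;

public final class FetchRetryBudget {
    /**
     * Runs attempt until it succeeds or maxRetries retries are used up,
     * and returns the total simulated wait in milliseconds.
     */
    public static long simulatedWaitMillis(BooleanSupplier attempt, int maxRetries, long retryWaitMs) {
        long waited = 0;
        int retries = 0;
        while (!attempt.getAsBoolean()) {
            if (retries == maxRetries) {
                return waited; // give up; the real job would see a fetch failure here
            }
            retries++;
            waited += retryWaitMs; // wait before the next attempt
        }
        return waited;
    }

    public static void main(String[] args) {
        BooleanSupplier alwaysFails = () -> false;
        // spark.shuffle.io.maxRetries=15, spark.shuffle.io.retryWait=6s
        System.out.println(simulatedWaitMillis(alwaysFails, 15, 6000)); // 90000
        // Spark's documented defaults: 3 retries, 5s wait
        System.out.println(simulatedWaitMillis(alwaysFails, 3, 5000));  // 15000
    }
}
```

Against a server that never recovers, the tuned values spend 90 seconds per fetch just waiting instead of 15, and the fetch still fails.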
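The other case is that the client sees outstanding requests but no traffic on the channel; once the configured network timeout is exceeded, it closes the connection: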
2019-08-02 01:31:57,119 [673532] - INFO [Executor task launch worker for task 5038:Logging$class@54] - Code generated in 32.595886 ms
2019-08-02 01:46:55,672 [1572085] - ERROR [shuffle-client-6-4:TransportChannelHandler@144] - Connection to hadoop3816.jd.163.org/10.196.77.46:7337 has been quiet for 900000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
2019-08-02 01:46:55,674 [1572087] - ERROR [shuffle-client-6-4:TransportResponseHandler@144] - Still have 4 requests outstanding when connection from hadoop3816.jd.163.org/10.196.77.46:7337 is closed
2019-08-02 01:46:55,675 [1572088] - ERROR [shuffle-client-6-4:OneForOneBlockFetcher$1@138] - Failed while starting block fetches
java.io.IOException: Connection from hadoop3816.jd.163.org/10.196.77.46:7337 closed
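Other tasks on the same Executor that reuse this client then hit the "connection * closed" exception. Note the gap between the timestamps of the first and second log lines: because of the spark.network.timeout setting, this channel sat idle with no traffic for nearly 15 minutes. We can set spark.shuffle.io.connectionTimeout=xxs to control the timeout for this logic on its own, instead of relying on the global spark.network.timeout.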
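The fallback between the two settings can be sketched like this: spark.shuffle.io.connectionTimeout is used when present, otherwise spark.network.timeout, otherwise 120s (the documented default of spark.network.timeout). The resolver class is hypothetical, and its parser is deliberately simplified to accept only whole seconds such as "900" or "900s", whereas Spark accepts richer time strings.

```java
import java.util.HashMap;
import java.util.Map;

public final class ConnectionTimeoutResolver {
    // Simplified parser (assumption): only "<n>" or "<n>s", meaning seconds.
    static long parseSeconds(String value) {
        String t = value.trim();
        if (t.endsWith("s")) {
            t = t.substring(0, t.length() - 1);
        }
        return Long.parseLong(t);
    }

    /** Resolves the shuffle connection timeout in milliseconds. */
    public static long connectionTimeoutMs(Map<String, String> conf) {
        String network = conf.getOrDefault("spark.network.timeout", "120s");
        String shuffle = conf.getOrDefault("spark.shuffle.io.connectionTimeout", network);
        return parseSeconds(shuffle) * 1000L;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.network.timeout", "900s");
        System.out.println(connectionTimeoutMs(conf)); // 900000, as in the log above
        conf.put("spark.shuffle.io.connectionTimeout", "60s"); // illustrative value
        System.out.println(connectionTimeoutMs(conf)); // 60000
    }
}
```

The 60s value is only an example; the point is that the shuffle channel's idle timeout can be shortened without touching every other consumer of spark.network.timeout.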
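The cause (root cause)
Shuffle is essentially an IO-intensive operation, so the excessive CPU consumption is the real problem. On top of that, user jobs were not a variable in this whole process; the only variables were the Hadoop-related changes and the move of the Spark Yarn ShuffleService from 2.1 to 2.3. Setting Hadoop aside for now, let's first look at what changed in Spark itself.
SPARK-15074 was filed because a large job was found to spend a great deal of time in shuffle fetch, and jstack showed that most of that time went into repeatedly reading index files, which looks a lot like our scenario. The PR for that issue caches index files with a limit on the number of entries, whereas the Spark 2.3 we run has, since SPARK-21501, switched to a cache limited by file size, with a default of only 100m. For most shuffle jobs on our cluster that is nowhere near enough and amounts to having no cache at all, so a large share of the overhead went into reading index files.
Setting spark.shuffle.service.index.cache.size=6144m cut this overhead drastically, reduced CPU pressure on the NodeManager, and eased the Shuffle Client problems with registration, connections, transfer timeouts and dropped connections.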
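To illustrate what "caching by size" means, here is a minimal size-bounded LRU cache. It is a stand-in built on LinkedHashMap and is not the shuffle service's actual implementation; the class name, the byte[] payload and the tiny limits are all assumptions made for the example.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public final class SizeBoundedIndexCache {
    private final long maxBytes;
    private long currentBytes = 0;
    // Access-ordered map: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);

    public SizeBoundedIndexCache(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    public byte[] get(String indexFile) {
        return entries.get(indexFile);
    }

    public void put(String indexFile, byte[] content) {
        byte[] old = entries.put(indexFile, content);
        if (old != null) {
            currentBytes -= old.length;
        }
        currentBytes += content.length;
        // Evict least recently used entries until the total size fits again.
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            currentBytes -= it.next().getValue().length;
            it.remove();
        }
    }

    public int size() { return entries.size(); }

    public long bytes() { return currentBytes; }

    public static void main(String[] args) {
        SizeBoundedIndexCache cache = new SizeBoundedIndexCache(100);
        cache.put("a.index", new byte[40]);
        cache.put("b.index", new byte[40]);
        cache.get("a.index");               // touch a, so b becomes the eldest
        cache.put("c.index", new byte[40]); // 120 > 100, so b is evicted
        System.out.println(cache.size());   // 2
        System.out.println(cache.bytes());  // 80
    }
}
```

When the limit is small relative to the working set, almost every lookup misses and goes back to disk, which is the "equivalent to no cache" situation described above. A count-based limit instead bounds the number of entries but not the memory they occupy.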
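Summary
- Tuning spark.network.timeout to adjust every Spark network timeout at once is not recommended: the impact is too broad and hard to control. Use the individual timeout parameters to control each piece of logic separately.
- Large Spark timeout values can affect the initial logic of Executor registration and Task scheduling. There is a pitfall here; see above for details.
- Pay attention to how the Spark Yarn Shuffle Service's index file caching has changed. In 2.1 the total is limited by the number of entries, and when individual entries are too large this can put memory pressure on the NodeManager and risk OOM. In 2.3 the spark.shuffle.service.index.cache.size default of 100M is too small and needs to be raised.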