問題描述
基于Spark 3.0-SNAPSHOT(unreleased),做Spark-Terasort相關(guān)測試,任務正常的話分如下圖所示兩個stage,

第一個,stage 0,讀取hdfs input目錄數(shù)據(jù),并進行shuffle write
第二個,stage 1,進行shuffle read,并向hdfs output目錄的輸出
其中一次測試由于hdfs存儲的配額不足,導致stage 1失敗,fail 整個spark job,如下圖所示。

此時按照Spark on k8s正常的邏輯,會執(zhí)行到SparkContext.stop, 各類線程該停停該關(guān)關(guān),各executor進程應該收到exit的命令,然后做完這些,主線程退出,留給JVM收尾最后Driver 進程停止。當然Driver pod會留給k8s去進行垃圾回收。
然而在實際的情況下,卻發(fā)現(xiàn)整個Spark作業(yè)依然占著k8s集群的資源,Driver pod狀態(tài)一直處于running的狀態(tài)。

在client側(cè)自然也無法獲得該作業(yè)的“實際狀態(tài)”

在這種情況下,Spark on k8s作業(yè)就無法像類似Spark on yarn的作業(yè),不依靠一些額外的監(jiān)控手段才能感知app的運行狀態(tài)。
測試的過程中,模擬了各種失敗的場景,stage/job級別的異常基本上,都讓app卡死了.
原因分析
分析Spark on k8s作業(yè)的異常,和其他調(diào)度器(yarn等)作業(yè)也基本一致,各進程的日志信息,jstack信息,gc信息等,還可以通過Spark UI 獲取一些作業(yè)相關(guān)的信息。區(qū)別點在于Spark on k8s可能還需要看下各個Pod的狀態(tài)等相關(guān)信息
-
首先查看Driver Pod狀態(tài),未見異常
driver pod status -
查看Driver/executor進程jstack
driver jstack

果不其然,driver 進程中 DestoryJavaJVM被一個OkHttp WebSocket...非daemon線程給攔住了去路...
根據(jù)線程的名字可以猜到這個driver端啟動的k8s api server 通信的client有關(guān)
翻翻Spark 源碼
- Driver側(cè)的 k8s client有定義關(guān)閉自己的邏輯
- Spark 在初始化OkHttpClient的時候把ping interval設(shè)置為0,
val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null)))
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
.withRequestTimeout(clientType.requestTimeout(sparkConf))
.withConnectionTimeout(clientType.connectionTimeout(sparkConf))
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
(file, configBuilder) =>
configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
}.withOption(caCertFile) {
(file, configBuilder) => configBuilder.withCaCertFile(file)
}.withOption(clientKeyFile) {
(file, configBuilder) => configBuilder.withClientKeyFile(file)
}.withOption(clientCertFile) {
(file, configBuilder) => configBuilder.withClientCertFile(file)
}.withOption(namespace) {
(ns, configBuilder) => configBuilder.withNamespace(ns)
}.build()
而OkHttpClient 中 pingIntervalMillis 為0時,這個線程并不會被調(diào)度。。
public void initReaderAndWriter(
String name, long pingIntervalMillis, Streams streams) throws IOException {
synchronized (this) {
this.streams = streams;
this.writer = new WebSocketWriter(streams.client, streams.sink, random);
this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
if (pingIntervalMillis != 0) {
executor.scheduleAtFixedRate(
new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
}
if (!messageAndCloseQueue.isEmpty()) {
runWriter(); // Send messages that were enqueued before we were connected.
}
}
reader = new WebSocketReader(streams.client, streams.source, this);
}
開始凌亂了。。。
既然定義了自己關(guān)閉的邏輯,沒有正常的關(guān)閉,想必是有沒法捕獲的系統(tǒng)異常發(fā)生,比如OOM
但是第2點就沒法解釋了,jar包沖突,不兼容?嘗試把okhttp的構(gòu)件升級到kubernetes-client的依賴版本(3.12.0)依然沒有用。。
解決辦法
https://issues.apache.org/jira/browse/SPARK-27927
https://issues.apache.org/jira/browse/SPARK-27812
目前社區(qū)對這個問題有兩個類似的issue跟蹤,其中也不乏一些嘗試,不乏一些原因的猜測,但貌似都沒找到根源。
可以嘗試的方法,
- 回退kubernete-client到3.0版本
- 加上 -XX:OnOutOfMemoryError="kill -9 %p"來跑,可以規(guī)避一些driver oom導致的問題
- 使用UncaughtExceptionHandler
- 調(diào)用System.exit

