關(guān)注公眾號(hào):“程序員成長軟技能” ,日拱一卒,功不唐捐!????????Spark Streaming 應(yīng)用定位是長期執(zhí)行的。但如何優(yōu)雅的關(guān)閉它,使正在被處理的消息在作業(yè)停止前被妥善處理?很多博文建議我們必須通過JVM關(guān)閉的鉤子,可在此 查看相關(guān)代碼。但是,這個(gè)方法在新的Spark版本(1.4版本之后)中不能正常工作,并且會(huì)引起死鎖情況。
????????目前有兩種方式去優(yōu)雅的關(guān)閉Spark Streaming作業(yè)。第一種方法是設(shè)置spark.streaming.stopGracefullyOnShutdown參數(shù)值為true(默認(rèn)是false)。這個(gè)參數(shù)在解決Spark優(yōu)雅關(guān)閉的issue中引入。開發(fā)者不再需要去調(diào)用ssc.stop()函數(shù),只需要向Driver發(fā)送SIGTERM信號(hào)。在實(shí)踐中,我們需要如下操作:
- 在Spark UI上找到Driver進(jìn)程運(yùn)行在哪個(gè)節(jié)點(diǎn)。在Yarn Cluster部署模式下,Driver進(jìn)程和AM運(yùn)行在同一個(gè)Container。
- 登陸運(yùn)行Driver的節(jié)點(diǎn),并且執(zhí)行ps -ef |grep java |grep ApplicationMaster 去找到進(jìn)程ID。請(qǐng)注意,你搜索的字符串可能會(huì)因?yàn)樽鳂I(yè)或者環(huán)境等原因不同。
- 執(zhí)行kill -SIGTERM <AM-PID> 命令,發(fā)送SIGTERM信號(hào)給進(jìn)程。
在Spark Driver接收到SIGTERM信號(hào)后,你會(huì)在日志中看到類似如下的消息:
17/02/02 01:31:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM*
17/02/02 01:31:35 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook...
17/02/02 01:31:45 INFO streaming.StreamingContext: StreamingContext stopped successfully**
17/02/02 01:31:45 INFO spark.SparkContext: Invoking stop() from shutdown hook...
17/02/02 01:31:45 INFO spark.SparkContext: Successfully stopped SparkContext...
17/02/02 01:31:45 INFO util.ShutdownHookManager: Shutdown hook called*
????????需要注意,默 spark.yarn.maxAppAttempts默認(rèn)使用Yarn的yarn.resourcemanager.am.max-attempts的值。而yarn.resourcemanager.am.max-attempts值默認(rèn)為2。因此,在執(zhí)行kill命令A(yù)M第一次停止后,Yarn將會(huì)自動(dòng)啟動(dòng)另一個(gè)AM/Driver。你需要第二次kill掉它。你可以在spark-submit設(shè)置--conf spark.yarn.maxAppAttempts=1 ,但是你必須考慮清楚,因?yàn)槿绱伺渲煤驞river失敗后將沒機(jī)會(huì)重試。
????????你不能使用yarn application -kill <applicationid>去kill作業(yè)。這個(gè)命令不會(huì)發(fā)送SIGTERM信號(hào)給container,而是幾乎同時(shí)發(fā)送SIGKILL信號(hào)。SIGTERM和SIGKILL之間的時(shí)間間隔可以使用yarn.nodemanager.sleep-delay-before-sigkill.ms (默認(rèn) 250)去配置。當(dāng)然,你可以增大該值,但是, 在一定程度上,即使我調(diào)整到60000(1分鐘),它仍然不起作用。作業(yè)的containers幾乎是立即被kill掉,并且日志中僅包含如下內(nèi)容:
17/02/02 12:12:27 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM*
17/02/02 12:12:27 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook*
????????所以,我不建議使用yarn application -kill <applicationid> 命令去發(fā)送SIGTERM信號(hào)。
????????第二個(gè)解決方案是以某種方式通知Spark Streaming應(yīng)用它需要優(yōu)雅的關(guān)閉,而不是使用SIGTERM信號(hào)。一種方式是在HDFS上放一個(gè)標(biāo)識(shí)文件,Spark Streaming應(yīng)用周期性的去檢測(cè)它。如果標(biāo)識(shí)文件存在了,就調(diào)用scc.stop(true, true) 。第一個(gè)true意思是Spark context需要被停止。第二個(gè)true意思是需要優(yōu)雅的關(guān)閉,允許正在處理的消息完成。
????????至關(guān)重要的是,不要在micro-batch的代碼中調(diào)用ssc.stop(true, true),試想一下,如果你在微批代碼中調(diào)用ssc.stop(true, true),它將等待所有正在被處理的消息完成,包括當(dāng)前正在執(zhí)行的微批。但是,當(dāng)前的微批不會(huì)結(jié)束,直到ssc.stop(true, true)結(jié)束返回。這是一種死鎖的情況。所以,你必須在另一個(gè)線程中執(zhí)行標(biāo)識(shí)文件檢測(cè)和調(diào)用ssc.stop(true, true)。我在github上放了一個(gè)簡單的樣例,此樣例里我在mian線程中在ssc.start()后執(zhí)行檢測(cè)和調(diào)用ssc.stop() 。你可以在這里找到源碼。當(dāng)然,使用HDFS標(biāo)識(shí)文件僅僅是一種方法,其他可選擇的方法有使用一個(gè)單獨(dú)的線程監(jiān)聽一個(gè)socket,啟動(dòng)一個(gè)RESTful服務(wù)等等。
????????期待在將來的release中,Spark會(huì)考慮更優(yōu)雅的方案。比如,在Spark UI中可以增加一個(gè)按鈕,去優(yōu)雅的停止Spark Streaming作業(yè),這樣,我們就不需要憑借定制化的編碼或者使用PID和SIGTERM信號(hào)了。
翻譯:http://blog.parseconsulting.com/2017/02/how-to-shutdown-spark-streaming-job.html
關(guān)注公眾號(hào):“程序員成長軟技能” ,日拱一卒,功不唐捐!