因?yàn)镾park Streaming流程序比較特殊,所以不能直接執(zhí)行kill -9 這種暴力方式停掉,如果使用這種方式停程序,那么就有可能丟失數(shù)據(jù)或者重復(fù)消費(fèi)數(shù)據(jù)。
為什么呢?因?yàn)榱鞒绦蛞坏┢饋砘旧鲜且粋€7*24小時的狀態(tài),除非特殊情況,否則是不會停的,因?yàn)槊繒r每刻都有可能在處理數(shù)據(jù),如果要停,也一定要確認(rèn)當(dāng)前正在處理的數(shù)據(jù)執(zhí)行完畢,并且不能在接受新的數(shù)據(jù),只有這樣才能保證不丟不重。
如何優(yōu)雅的關(guān)閉spark streaming呢?方式主要有三種:
第一種:全人工介入
首先程序里面設(shè)置下面的配置參數(shù)
sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true")//優(yōu)雅的關(guān)閉
然后按照下面的步驟依次操作:
(1)通過Hadoop 8088頁面找到運(yùn)行的程序
(2)打開spark ui的監(jiān)控頁面
(3)打開executor的監(jiān)控頁面
(4)登錄liunx找到驅(qū)動節(jié)點(diǎn)所在的機(jī)器ip以及運(yùn)行的端口號
(5)然后執(zhí)行一個封裝好的命令
sudo ss -tanlp |? grep 5555 |awk '{print $6}'|awk? -F, '{print $2}' | sudo? xargs kill -15
從上面的步驟可以看出,這樣停掉一個spark streaming程序是比較復(fù)雜的。那么有沒有更加優(yōu)雅的方式來停止它呢?答案是有的
第二種:使用HDFS系統(tǒng)做消息通知
在驅(qū)動程序中,加一段代碼,這段代碼的作用每隔一段時間可以是10秒也可以是3秒,掃描HDFS上某一個文件,如果發(fā)現(xiàn)這個文件存在,就調(diào)用StreamContext對象stop方法,自己優(yōu)雅的終止自己,其實(shí)這里HDFS可以換成redis,zk,hbase,db都可以,這里唯一的問題就是依賴了外部的一個存儲系統(tǒng)來達(dá)到消息通知的目的,如果使用了這種方式后。停止流程序就比較簡單了,登錄上有hdfs客戶端的機(jī)器,然后touch一個空文件到指定目錄,然后等到間隔的掃描時間到之后,發(fā)現(xiàn)有文件存在,就知道需要關(guān)閉程序了。
第三種:內(nèi)部暴露一個socket或者h(yuǎn)ttp端口用來接收請求,等待觸發(fā)關(guān)閉流程序
這種方式,需要在driver啟動一個socket線程,或者h(yuǎn)ttp服務(wù),這里推薦使用http服務(wù),因?yàn)閟ocket有點(diǎn)偏底層處理起來稍微復(fù)雜點(diǎn),如果使用http服務(wù),我們可以直接用內(nèi)嵌的jetty,對外暴露一個http接口,spark ui頁面用的也是內(nèi)嵌的jetty提供服務(wù),所以我不需要在pom里面引入額外的依賴,在關(guān)閉的時候,找到驅(qū)動所在ip,就可以直接通過curl或者瀏覽器就直接關(guān)閉流程序。
找到驅(qū)動程序所在的ip,可以在程序啟動的log中看到,也可以在spark master ui的頁面上找到。這種方式不依賴任何外部的存儲系統(tǒng),僅僅部署的時候需要一個額外的端口號用來暴露http服務(wù)。
至此,關(guān)于優(yōu)雅的停止spark streaming的主流方式已經(jīng)介紹完畢,推薦使用第二種或者第三種,如果想要最大程度減少對外部系統(tǒng)的依賴,推薦使用第三種方式。