對于長時間運行的Spark Streaming作業(yè),一旦提交到Y(jié)ARN群集便需要永久運行,直到有意停止。任何中斷都會引起嚴重的處理延遲,并可能導致數(shù)據(jù)丟失或重復。YARN和Apache Spark都不是為了執(zhí)行長時間運行的服務而設計的。但是,它們已經(jīng)成功地滿足了近實時數(shù)據(jù)處理作業(yè)的常駐需求。成功并不一定意味著沒有技術挑戰(zhàn)。
這篇博客總結(jié)了在安全的YARN集群上,運行一個關鍵任務且長時間的Spark Streaming作業(yè)的經(jīng)驗。您將學習如何將Spark Streaming應用程序提交到Y(jié)ARN群集,以避免在值班時候的不眠之夜。
Fault tolerance
在YARN集群模式下,Spark驅(qū)動程序與Application Master(應用程序分配的第一個YARN容器)在同一容器中運行。此過程負責從YARN 驅(qū)動應用程序和請求資源(Spark執(zhí)行程序)。重要的是,Application Master消除了在應用程序生命周期中運行的任何其他進程的需要。即使一個提交Spark Streaming作業(yè)的邊緣Hadoop節(jié)點失敗,應用程序也不會受到影響。
要以集群模式運行Spark Streaming應用程序,請確保為spark-submit命令提供以下參數(shù):
spark-submit --master yarn --deploy-mode cluster</pre>
由于Spark驅(qū)動程序和Application Master共享一個JVM,Spark驅(qū)動程序中的任何錯誤都會阻止我們長期運行的工作。幸運的是,可以配置重新運行應用程序的最大嘗試次數(shù)。設置比默認值2更高的值是合理的(從YARN集群屬性yarn.resourcemanager.am.max嘗試中導出)。對我來說,4工作相當好,即使失敗的原因是永久性的,較高的值也可能導致不必要的重新啟動。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4
如果應用程序運行數(shù)天或數(shù)周,而不重新啟動或重新部署在高度使用的群集上,則可能在幾個小時內(nèi)耗盡4次嘗試。為了避免這種情況,嘗試計數(shù)器應該在每個小時都重置。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h
另一個重要的設置是在應用程序發(fā)生故障之前executor失敗的最大數(shù)量。默認情況下是max(2 * num executors,3),非常適合批處理作業(yè),但不適用于長時間運行的作業(yè)。該屬性具有相應的有效期間,也應設置。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h
對于長時間運行的作業(yè),您也可以考慮在放棄作業(yè)之前提高任務失敗的最大數(shù)量。默認情況下,任務將重試4次,然后作業(yè)失敗。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8
Performance
當Spark Streaming應用程序提交到集群時,必須定義運行作業(yè)的YARN隊列。我強烈建議使用YARN Capacity Scheduler并將長時間運行的作業(yè)提交到單獨的隊列。沒有一個單獨的YARN隊列,您的長時間運行的工作遲早將被的大量Hive查詢搶占。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8 --queue realtime_queue
Spark Streaming工作的另一個重要問題是保持處理時間的穩(wěn)定性和高度可預測性。處理時間應保持在批次持續(xù)時間以下以避免延誤。我發(fā)現(xiàn)Spark的推測執(zhí)行有很多幫助,特別是在繁忙的群集中。當啟用推測性執(zhí)行時,批處理時間更加穩(wěn)定。只有當Spark操作是冪等時,才能啟用推測模式。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8 --queue realtime_queue --conf spark.speculation=true
Security
在安全的HDFS群集上,長時間運行的Spark Streaming作業(yè)由于Kerberos票據(jù)到期而失敗。沒有其他設置,當Spark Streaming作業(yè)提交到集群時,會發(fā)布Kerberos票證。當票證到期時Spark Streaming作業(yè)不能再從HDFS寫入或讀取數(shù)據(jù)。
在理論上(基于文檔),應該將Kerberos主體和keytab作為spark-submit命令傳遞:
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8 --queue realtime_queue --conf spark.speculation=true --principal user/hostname@domain --keytab /path/to/foo.keytab
實際上,由于幾個錯誤(HDFS-9276, SPARK-11182)必須禁用HDFS緩存。如果沒有,Spark將無法從HDFS上的文件讀取更新的令牌。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8 --queue realtime_queue --conf spark.speculation=true --principal user/hostname@domain --keytab /path/to/foo.keytab --conf spark.hadoop.fs.hdfs.impl.disable.cache=true
Mark Grover指出,這些錯誤只影響在HA模式下配置了NameNodes的HDFS集群。謝謝,馬克
Logging
訪問Spark應用程序日志的最簡單方法是配置Log4j控制臺追加程序,等待應用程序終止并使用yarn logs -applicationId [applicationId]命令。不幸的是終止長時間運行的Spark Streaming作業(yè)來訪問日志是不可行的。
我建議安裝和配置Elastic,Logstash和Kibana(ELK套裝)。ELK的安裝和配置是超出了這篇博客的范圍,但請記住記錄以下上下文字段:
- YARN application id
- YARN container hostname
- Executor id (Spark driver is always 000001, Spark executors start from 000002)
- YARN attempt (to check how many times Spark driver has been restarted)
Log4j配置使用Logstash特定的appender和布局定義應該傳遞給spark-submit命令:
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8 --queue realtime_queue --conf spark.speculation=true --principal user/hostname@domain --keytab /path/to/foo.keytab --conf spark.hadoop.fs.hdfs.impl.disable.cache=true --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties --files /path/to/log4j.properties
最后,Spark Job的Kibana儀表板可能如下所示:

Monitoring
長時間運行的工作全天候運行,所以了解歷史指標很重要。Spark UI僅在有限數(shù)量的批次中保留統(tǒng)計信息,并且在重新啟動后,所有度量標準都消失了。再次,需要外部工具。我建議安裝Graphite用于收集指標和Grafana來建立儀表板。
首先,Spark需要配置為將指標報告給Graphite,準備metrics.properties文件:
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=[hostname]
*.sink.graphite.port=[port] *.sink.graphite.prefix=some_meaningful_name
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
Graceful stop
最后一個難題是如何以優(yōu)雅的方式停止部署在YARN上的Spark Streaming應用程序。停止(甚至殺死)YARN應用程序的標準方法是使用命令yarn application -kill [applicationId]。這個命令會停止Spark Streaming應用程序,但這可能發(fā)生在批處理中。因此,如果該作業(yè)是從Kafka讀取數(shù)據(jù)然后在HDFS上保存處理結(jié)果,并最終提交Kafka偏移量,當作業(yè)在提交偏移之前停止工作時,您應該預見到HDFS會有重復的數(shù)據(jù)。
解決優(yōu)雅關機問題的第一個嘗試是在關閉程序時回調(diào)Spark Streaming Context的停止方法。
sys.addShutdownHook {
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}
令人失望的是,由于Spark應用程序幾乎立即被殺死,一個退出回調(diào)函數(shù)來不及完成已啟動的批處理任務。此外,不能保證JVM會調(diào)用shutdown hook。
在撰寫本博客文章時,唯一確認的YARN Spark Streaming應用程序的確切方法是通知應用程序關于計劃關閉,然后以編程方式停止流式傳輸(但不是關閉掛鉤)。命令yarn application -kill 如果通知應用程序在定義的超時后沒有停止,則應該僅用作最后手段。
可以使用HDFS上的標記文件(最簡單的方法)或使用驅(qū)動程序上公開的簡單Socket / HTTP端點(復雜方式)通知應用程序。
因為我喜歡KISS原理,下面你可以找到shell腳本偽代碼,用于啟動/停止Spark Streaming應用程序使用標記文件:
start() {
hdfs dfs -touchz /path/to/marker/my_job_unique_name
spark-submit ...
}
stop() {
hdfs dfs -rm /path/to/marker/my_job_unique_name
force_kill=true application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`) for i in `seq 1 10`; do application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)") if [ -n "$application_status" ]; then
sleep 60s else force_kill=false
break fi
done
$force_kill && yarn application -kill ${application_id}
}
在Spark Streaming應用程序中,后臺線程應該監(jiān)視標記文件,當文件消失時停止上下文調(diào)用
streamingContext.stop(stopSparkContext = true, stopGracefully = true).
Summary
可以看到,部署在YARN上的關鍵任務Spark Streaming應用程序的配置相當復雜。以上提出的技術,由一些非常聰明的開發(fā)人員經(jīng)過漫長而冗長乏味的迭代學習。最終,部署在高可用的YARN集群上的長期運行的Spark Streaming應用非常穩(wěn)定。
翻譯:http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-yarn/
關注公眾號:“程序員成長軟技能” ,日拱一卒,功不唐捐!