【轉(zhuǎn)載】Yarn上常駐Spark-Streaming程序調(diào)優(yōu)

對于長時間運行的Spark Streaming作業(yè),一旦提交到Y(jié)ARN群集便需要永久運行,直到有意停止。任何中斷都會引起嚴重的處理延遲,并可能導(dǎo)致數(shù)據(jù)丟失或重復(fù)。YARN和Apache Spark都不是為了執(zhí)行長時間運行的服務(wù)而設(shè)計的。但是,它們已經(jīng)成功地滿足了近實時數(shù)據(jù)處理作業(yè)的常駐需求。成功并不一定意味著沒有技術(shù)挑戰(zhàn)。
這篇博客總結(jié)了在安全的YARN集群上,運行一個關(guān)鍵任務(wù)且長時間的Spark Streaming作業(yè)的經(jīng)驗。您將學(xué)習如何將Spark Streaming應(yīng)用程序提交到Y(jié)ARN群集,以避免在值班時候的不眠之夜。

Fault tolerance

在YARN集群模式下,Spark驅(qū)動程序與Application Master(應(yīng)用程序分配的第一個YARN容器)在同一容器中運行。此過程負責從YARN 驅(qū)動應(yīng)用程序和請求資源(Spark執(zhí)行程序)。重要的是,Application Master消除了在應(yīng)用程序生命周期中運行的任何其他進程的需要。即使一個提交Spark Streaming作業(yè)的邊緣Hadoop節(jié)點失敗,應(yīng)用程序也不會受到影響。

要以集群模式運行Spark Streaming應(yīng)用程序,請確保為spark-submit命令提供以下參數(shù):

spark-submit --master yarn --deploy-mode cluster

由于Spark驅(qū)動程序和Application Master共享一個JVM,Spark驅(qū)動程序中的任何錯誤都會阻止我們長期運行的工作。幸運的是,可以配置重新運行應(yīng)用程序的最大嘗試次數(shù)。設(shè)置比默認值2更高的值是合理的(從YARN集群屬性yarn.resourcemanager.am.max嘗試中導(dǎo)出)。對我來說,4工作相當好,即使失敗的原因是永久性的,較高的值也可能導(dǎo)致不必要的重新啟動。

spark-submit --master yarn --deploy-mode cluster  --conf spark.yarn.maxAppAttempts=4

如果應(yīng)用程序運行數(shù)天或數(shù)周,而不重新啟動或重新部署在高度使用的群集上,則可能在幾個小時內(nèi)耗盡4次嘗試。為了避免這種情況,嘗試計數(shù)器應(yīng)該在每個小時都重置。

spark-submit --master yarn --deploy-mode cluster     \
--conf spark.yarn.maxAppAttempts=4  \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h

另一個重要的設(shè)置是在應(yīng)用程序發(fā)生故障之前executor失敗的最大數(shù)量。默認情況下是max(2 * num executors,3),非常適合批處理作業(yè),但不適用于長時間運行的作業(yè)。該屬性具有相應(yīng)的有效期間,也應(yīng)設(shè)置。

spark-submit --master yarn --deploy-mode cluster  \
   --conf spark.yarn.maxAppAttempts=4  \
   --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
   --conf spark.yarn.max.executor.failures={8 * num_executors}  \
   --conf spark.yarn.executor.failuresValidityInterval=1h

對于長時間運行的作業(yè),您也可以考慮在放棄作業(yè)之前提高任務(wù)失敗的最大數(shù)量。默認情況下,任務(wù)將重試4次,然后作業(yè)失敗。

spark-submit --master yarn --deploy-mode cluster    \
 --conf spark.yarn.maxAppAttempts=4  \
 --conf spark.yarn.am.attemptFailuresValidityInterval=1h   \
 --conf spark.yarn.max.executor.failures={8 * num_executors}  \
 --conf spark.yarn.executor.failuresValidityInterval=1h  \
 --conf spark.task.maxFailures=8

Performance

當Spark Streaming應(yīng)用程序提交到集群時,必須定義運行作業(yè)的YARN隊列。我強烈建議使用YARN Capacity Scheduler并將長時間運行的作業(yè)提交到單獨的隊列。沒有一個單獨的YARN隊列,您的長時間運行的工作遲早將被的大量Hive查詢搶占。

spark-submit --master yarn --deploy-mode cluster  \
   --conf spark.yarn.maxAppAttempts=4  \
   --conf spark.yarn.am.attemptFailuresValidityInterval=1h  \
   --conf spark.yarn.max.executor.failures={8 * num_executors}  \
   --conf spark.yarn.executor.failuresValidityInterval=1h  \
   --conf spark.task.maxFailures=8  \
   --queue realtime_queue

Spark Streaming工作的另一個重要問題是保持處理時間的穩(wěn)定性和高度可預(yù)測性。處理時間應(yīng)保持在批次持續(xù)時間以下以避免延誤。我發(fā)現(xiàn)Spark的推測執(zhí)行有很多幫助,特別是在繁忙的群集中。當啟用推測性執(zhí)行時,批處理時間更加穩(wěn)定。只有當Spark操作是冪等時,才能啟用推測模式。

spark-submit --master yarn --deploy-mode cluster  \
 --conf spark.yarn.maxAppAttempts=4  \
 --conf spark.yarn.am.attemptFailuresValidityInterval=1h  \
 --conf spark.yarn.max.executor.failures={8 * num_executors}  \
 --conf spark.yarn.executor.failuresValidityInterval=1h  \
 --conf spark.task.maxFailures=8  \
 --queue realtime_queue \
 --conf spark.speculation=true

Security

在安全的HDFS群集上,長時間運行的Spark Streaming作業(yè)由于Kerberos票據(jù)到期而失敗。沒有其他設(shè)置,當Spark Streaming作業(yè)提交到集群時,會發(fā)布Kerberos票證。當票證到期時Spark Streaming作業(yè)不能再從HDFS寫入或讀取數(shù)據(jù)。
在理論上(基于文檔),應(yīng)該將Kerberos主體和keytab作為spark-submit命令傳遞:

spark-submit --master yarn --deploy-mode cluster  \
   --conf spark.yarn.maxAppAttempts=4  \
   --conf spark.yarn.am.attemptFailuresValidityInterval=1h  \
   --conf spark.yarn.max.executor.failures={8 * num_executors}  \
   --conf spark.yarn.executor.failuresValidityInterval=1h  \
   --conf spark.task.maxFailures=8  \
   --queue realtime_queue \
   --conf spark.speculation=true \
   --principal user/hostname@domain \
   --keytab /path/to/foo.keytab

實際上,由于幾個錯誤(HDFS-9276, SPARK-11182)必須禁用HDFS緩存。如果沒有,Spark將無法從HDFS上的文件讀取更新的令牌。

spark-submit --master yarn --deploy-mode cluster  \
   --conf spark.yarn.maxAppAttempts=4  \
   --conf spark.yarn.am.attemptFailuresValidityInterval=1h  \
   --conf spark.yarn.max.executor.failures={8 * num_executors}  \
   --conf spark.yarn.executor.failuresValidityInterval=1h  \
   --conf spark.task.maxFailures=8  \
   --queue realtime_queue \
   --conf spark.speculation=true \
   --principal user/hostname@domain \
   --keytab /path/to/foo.keytab \
   --conf spark.hadoop.fs.hdfs.impl.disable.cache=true

Mark Grover指出,這些錯誤只影響在HA模式下配置了NameNodes的HDFS集群。

Logging

訪問Spark應(yīng)用程序日志的最簡單方法是配置Log4j控制臺追加程序,等待應(yīng)用程序終止并使用yarn logs -applicationId [applicationId]命令。不幸的是終止長時間運行的Spark Streaming作業(yè)來訪問日志是不可行的。
我建議安裝和配置Elastic,Logstash和Kibana(ELK套裝)。ELK的安裝和配置是超出了這篇博客的范圍,但請記住記錄以下上下文字段:

YARN application id
YARN container hostname
Executor id (Spark driver is always 000001, Spark executors start from 000002)
YARN attempt (to check how many times Spark driver has been restarted)

Log4j配置使用Logstash特定的appender和布局定義應(yīng)該傳遞給spark-submit命令:

spark-submit --master yarn --deploy-mode cluster  \
   --conf spark.yarn.maxAppAttempts=4  \
   --conf spark.yarn.am.attemptFailuresValidityInterval=1h  \
   --conf spark.yarn.max.executor.failures={8 * num_executors}  \
   --conf spark.yarn.executor.failuresValidityInterval=1h  \
   --conf spark.task.maxFailures=8  \
   --queue realtime_queue \
   --conf spark.speculation=true \
   --principal user/hostname@domain \
   --keytab /path/to/foo.keytab \
   --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
   --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
   --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties  \
   --files /path/to/log4j.properties

最后,Spark Job的Kibana儀表板可能如下所示:


image.png

Monitoring

長時間運行的工作全天候運行,所以了解歷史指標很重要。Spark UI僅在有限數(shù)量的批次中保留統(tǒng)計信息,并且在重新啟動后,所有度量標準都消失了。再次,需要外部工具。我建議安裝Graphite用于收集指標和Grafana來建立儀表板。
首先,Spark需要配置為將指標報告給Graphite,準備metrics.properties文件:

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=[hostname]
*.sink.graphite.port=[port]
*.sink.graphite.prefix=some_meaningful_name

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

Graceful stop

最后一個難題是如何以優(yōu)雅的方式停止部署在YARN上的Spark Streaming應(yīng)用程序。停止(甚至殺死)YARN應(yīng)用程序的標準方法是使用命令yarn application -kill [applicationId]。這個命令會停止Spark Streaming應(yīng)用程序,但這可能發(fā)生在批處理中。因此,如果該作業(yè)是從Kafka讀取數(shù)據(jù)然后在HDFS上保存處理結(jié)果,并最終提交Kafka偏移量,當作業(yè)在提交偏移之前停止工作時,您應(yīng)該預(yù)見到HDFS會有重復(fù)的數(shù)據(jù)。
解決優(yōu)雅關(guān)機問題的第一個嘗試是在關(guān)閉程序時回調(diào)Spark Streaming Context的停止方法。

sys.addShutdownHook {
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}

令人失望的是,由于Spark應(yīng)用程序幾乎立即被殺死,一個退出回調(diào)函數(shù)來不及完成已啟動的批處理任務(wù)。此外,不能保證JVM會調(diào)用shutdown hook。
在撰寫本博客文章時,唯一確認的YARN Spark Streaming應(yīng)用程序的確切方法是通知應(yīng)用程序關(guān)于計劃關(guān)閉,然后以編程方式停止流式傳輸(但不是關(guān)閉掛鉤)。命令yarn application -kill 如果通知應(yīng)用程序在定義的超時后沒有停止,則應(yīng)該僅用作最后手段。

可以使用HDFS上的標記文件(最簡單的方法)或使用驅(qū)動程序上公開的簡單Socket / HTTP端點(復(fù)雜方式)通知應(yīng)用程序。
因為我喜歡KISS原理,下面你可以找到shell腳本偽代碼,用于啟動/停止Spark Streaming應(yīng)用程序使用標記文件:

start() {
    hdfs dfs -touchz /path/to/marker/my_job_unique_name
    spark-submit ...
}

stop() {
    hdfs dfs -rm /path/to/marker/my_job_unique_name
    force_kill=true
    application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
    for i in `seq 1 10`; do
        application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
        if [ -n "$application_status" ]; then
            sleep 60s
        else
            force_kill=false
            break
        fi
    done
    $force_kill && yarn application -kill ${application_id}
}

在Spark Streaming應(yīng)用程序中,后臺線程應(yīng)該監(jiān)視標記文件,當文件消失時停止上下文調(diào)用

streamingContext.stop(stopSparkContext = true, stopGracefully = true).

Summary

可以看到,部署在YARN上的關(guān)鍵任務(wù)Spark Streaming應(yīng)用程序的配置相當復(fù)雜。以上提出的技術(shù),由一些非常聰明的開發(fā)人員經(jīng)過漫長而冗長乏味的迭代學(xué)習。最終,部署在高可用的YARN集群上的長期運行的Spark Streaming應(yīng)用非常穩(wěn)定。

原文地址:http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-yarn/

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容