PySpark on Yarn的相關依賴的解決方式

問題

Spark on Yarn是將yarn作為ClusterManager的運行模式,Spark會將資源(container)的管理與協調統一交給yarn去處理。

Spark on Yarn分為client/cluster模式:
對于client模式,Spark程序的Driver/SparkContext實例用戶提交機上,該機器可以位于yarn集群之內或之外,只需要起能正常與ResourceManager通信及正確配置HADOOP_CONF_DIR或YARN_CONF_DIR環(huán)境變量指向yarn集群。生產環(huán)境中,通常提交機不會是yarn集群內部的節(jié)點,手握配置權限的情況下,可以按需配置支撐Spark程序需要的軟件、環(huán)境、文件等。
對于cluster模式,Spark程序的Driver/SparkContext實例位于ApplicationMaster(am)中,am作為一個container可以起在yarn集群中任何一個NodeManager上,默認情況下,我們就需要為所有的節(jié)點機器準備好Spark程序需要的所有運行環(huán)境。

Python提供了非常豐富的數學運算、機器學習處理庫——如numpy、pandas、scipy等等。越來越多的同事希望利用這些高效的庫開發(fā)各種算法然后以PySpark程序跑到我們的Spark上。

對于scala/java寫的Spark程序,我們可以將我們所依賴的jar一起與我們的main函數所在的主程序打成一個fat jar,通過spark-submit提交后,這些依賴就會通過Yarn的Distribute Cache分發(fā)到所有節(jié)點支撐運行。
對于python寫的Spark程序如果有外部依賴就很尷尬了,python本身就是兩種語言,在所有NodeManager節(jié)點上安裝你所有需要的依賴對于IT運維人員也是一個非常痛苦的事情。

參考官方文檔

For Python, you can use the --py-files argument of spark-submit to add .py, .zip or .egg
files to be distributed with your application. If you depend on multiple Python files we recommend
packaging them into a .zip or .egg.

--py-files,可以解決部分依賴的問題,但對于有些場景就可能不是很方便,或者不可能實現。

  • 依賴太多,包括傳遞依賴
  • python包在deploy前需要依賴的C代碼提前編譯
  • 基于不同版本的python的pyspark跑在同一個yarn集群上

對于這些問題 ,社區(qū)也有相關的討論,詳細可以看下 這個ticket https://issues.apache.org/jira/browse/SPARK-13587

原理

pyspark原理的資料比較少,可以看下wiki

https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

可以看下上面鏈接中的圖,圖中左右分為driver/executor, 圖白色和綠色分python和java,可以看到不管PySpark適宜client還是cluster模式跑在yarn上,driver和executor端都有python的進程起著,這就需要集群中的所有節(jié)點都有相應的python依賴環(huán)境。

方案

從靈活性的角度來講,這里從前輩的討論中總結一下,提供一種在運行時創(chuàng)建python運行及相關依賴的辦法

1、下載并安裝anaconda
https://www.anaconda.com/download/#linux

2、安裝anaconda

sh Anaconda2-5.0.1-Linux-x86_64.sh

2、創(chuàng)建需要的依賴環(huán)境conda create

conda create --name py2env_take1 --quiet  --copy --yes python=2 numpy scipy pandas

打印如下消息

Package plan for installation in environment /home/hadoop/anaconda2/envs/py2env_take1:

The following NEW packages will be INSTALLED:

    ca-certificates: 2017.08.26-h1d4fec5_0
    certifi:         2017.11.5-py27h71e7faf_0
    intel-openmp:    2018.0.0-hc7b2577_8
    libedit:         3.1-heed3624_0
    libffi:          3.2.1-hd88cf55_4
    libgcc-ng:       7.2.0-h7cc24e2_2
    libgfortran-ng:  7.2.0-h9f7466a_2
    libstdcxx-ng:    7.2.0-h7a57d05_2
    mkl:             2018.0.1-h19d6760_4
    ncurses:         6.0-h9df7e31_2
    numpy:           1.13.3-py27hbcc08e0_0
    openssl:         1.0.2m-h26d622b_1
    pandas:          0.21.0-py27he307072_1
    pip:             9.0.1-py27ha730c48_4
    python:          2.7.14-hdd48546_24
    python-dateutil: 2.6.1-py27h4ca5741_1
    pytz:            2017.3-py27h001bace_0
    readline:        7.0-ha6073c6_4
    scipy:           1.0.0-py27hf5f0f52_0
    setuptools:      36.5.0-py27h68b189e_0
    six:             1.11.0-py27h5f960f1_1
    sqlite:          3.20.1-hb898158_2
    tk:              8.6.7-hc745277_3
    wheel:           0.30.0-py27h2bc6bb2_1
    zlib:            1.2.11-ha838bed_2

第一次根據網絡情況下載上述這些依賴,可能會比較久,以后就會快很多。

du -sh ~/anaconda2/envs/py2env_take1/
965M    /home/hadoop/anaconda2/envs/py2env_take1/

可以看到依賴包整個大小還是挺大的,對于一些實時性比較高的場景這種方式其實不太有利,有些不需要的依賴在創(chuàng)建的時候可以不打進去。當然我們還需要zip壓縮一下,可以減小部分網絡開銷。當然如果我們把這個環(huán)境直接提前put到hdfs,也就沒有這個問題了。

zip -r -9 -q py2env_take1.zip ./py2env_take1/
du -sh py2env_take1.zip
345M    py2env_take1.zip

這樣我們就可以通過--archives path/to/py2env_take1.zip#py2env的方式將python環(huán)境上傳并分發(fā)到spark各個進程的working dir。

測試

不會寫python,從spark示例代碼里拷一個出來玩玩

from __future__ import print_function

import numpy as np

from pyspark import SparkContext
# $example on$
from pyspark.mllib.stat import Statistics
# $example off$

if __name__ == "__main__":
    sc = SparkContext(appName="CorrelationsExample")  # SparkContext

    # $example on$
    seriesX = sc.parallelize([1.0, 2.0, 3.0, 3.0, 5.0])  # a series
    # seriesY must have the same number of partitions and cardinality as seriesX
    seriesY = sc.parallelize([11.0, 22.0, 33.0, 33.0, 555.0])

    # Compute the correlation using Pearson's method. Enter "spearman" for Spearman's method.
    # If a method is not specified, Pearson's method will be used by default.
    print("Correlation is: " + str(Statistics.corr(seriesX, seriesY, method="pearson")))

    data = sc.parallelize(
        [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([5.0, 33.0, 366.0])]
    )  # an RDD of Vectors

    # calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
    # If a method is not specified, Pearson's method will be used by default.
    print(Statistics.corr(data, method="pearson"))
    # $example off$

    sc.stop()

take1

bin/spark-submit --master yarn --deploy-mode client --proxy-user hzyaoqin  /home/hadoop/data/apache-spark/spark-2.1.2-bin-hadoop2.7/examples/src/main/python/mllib/correlations_example.py
17/11/24 23:51:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "/home/hadoop/data/apache-spark/spark-2.1.2-bin-hadoop2.7/examples/src/main/python/mllib/correlations_example.py", line 20, in <module>
    import numpy as np
ImportError: No module named numpy

client模式,提示未安裝numpy,失敗

take2

bin/spark-submit --master yarn --deploy-mode cluster --proxy-user hzyaoqin  /home/hadoop/data/apache-spark/spark-2.1.2-bin-hadoop2.7/examples/src/main/python/mllib/correlations_example.py

cluster 模式,失敗
查看am日志

Log Type: stdout
Log Upload Time: Fri Nov 24 23:49:11 +0800 2017
Log Length: 148
Traceback (most recent call last):
  File "correlations_example.py", line 20, in <module>
    import numpy as np
ImportError: No module named numpy

take3

bin/spark-submit --master yarn --deploy-mode cluster --proxy-user hzyaoqin --archives ~/anaconda2/envs/py2env_take2.zip#python2env  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python2env/py2env_take2/bin/python /home/hadoop/data/apache-spark/spark-2.1.2-bin-hadoop2.7/examples/src/main/python/mllib/correlations_example.py

輸出結果,成功

Log Type: stdout
Log Upload Time: Fri Nov 24 23:47:45 +0800 2017
Log Length: 149
Correlation is: 0.850028676877
[[ 1.          0.97888347  0.99038957]
 [ 0.97888347  1.          0.99774832]
 [ 0.99038957  0.99774832  1.        ]]

測試版本

https://www.apache.org/dyn/closer.lua/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
https://www.apache.org/dyn/closer.lua/spark/spark-2.1.2/spark-2.1.2-bin-hadoop2.7.tgz
https://www.apache.org/dyn/closer.lua/spark/spark-2.0.2/spark-2.0.2-bin-hadoop2.7.tgz

其他版本未測試

結論

1、依靠anaconda 創(chuàng)建python依賴環(huán)境
2、通過--archives 上傳該環(huán)境
3、通過spark.yarn.appMasterEnv.PYSPARK_PYTHON指定python執(zhí)行目錄
4、cluster模式可以,client模式顯式指定PYSPARK_PYTHON,會導致PYSPARK_PYTHON環(huán)境變量不能被spark.yarn.appMasterEnv.PYSPARK_PYTHON overwrite
5、如果executor端也有numpy等依賴,應該要指定spark.executorEnv.PYSPARK_PYTHON(I guess)
6、改日試下anaconda3 創(chuàng)建python3的隔離環(huán)境試下。

參考

https://github.com/massmutual/sample-pyspark-application

其他

關于Client模式下的問題,提了個PR,歡迎討論
https://github.com/apache/spark/pull/19840

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容