Python海量數(shù)據(jù)處理之_Hadoop&Spark

1. 說明

?前篇介紹了安裝和使用Hadoop,本篇將介紹Hadoop+Spark的安裝配置及如何用Python調(diào)用Spark。
?當(dāng)數(shù)據(jù)以TB,PB計(jì)量時(shí),用單機(jī)處理數(shù)據(jù)變得非常困難,于是使用Hadoop建立計(jì)算集群處理海量數(shù)據(jù),Hadoop分為兩部分,一部分是數(shù)據(jù)存儲(chǔ)HDFS,另一部分是數(shù)據(jù)計(jì)算MapReduce。MapReduce框架將數(shù)據(jù)處理分成map,reduce兩段,使用起來比較麻煩,并且有一些限制,如:數(shù)據(jù)都是流式的,且必須所有Map結(jié)束后才能開始Reduce。我們可以引入Spark加以改進(jìn)。
?Spark的優(yōu)點(diǎn)在于它的中間結(jié)果保存在內(nèi)存中,而非HDFS文件系統(tǒng)中,所以速度很快。用Scala 語言可以像操作本地集合對(duì)象一樣輕松地操作分布式數(shù)據(jù)集。雖然它支持中間結(jié)果保存在內(nèi)存,但集群中的多臺(tái)機(jī)器仍然需要讀寫數(shù)據(jù)集,所以它經(jīng)常與HDFS共同使用。因此,它并非完全替代Hadoop。
?Spark的框架是使用Scala語言編寫的,Spark的開發(fā)可以使用語言有:Scala、R語言、Java、Python。

2. Scala

?Scala是一種類似java的編程語言,使用Scala語言相對(duì)來說代碼量更少,調(diào)用spark更方便,也可以將它和其它程序混用。
?在不安裝scala的情況下,啟動(dòng)hadoop和spark,python的基本例程也可以正常運(yùn)行。但出于進(jìn)一步開發(fā)的需要,最好安裝scala。

(1) 下載scala

?http://www.scala-lang.org/download/
?我下載的是與spark中一致的2.11版本的非源碼tgz包

(2) 安裝

$ cd /home/hadoop #用戶可選擇安裝的文件夾
$ tar xvzf tgz/scala-2.11.12.tgz
$ ln -s scala-2.11.12/ scala

在.bashrc中加入
export PATH=/home/hadoop/scala/bin:$PATH

3. 下載安裝Spark

(1) 下載spark

?http://spark.apache.org/downloads.html
?我下載的版本是:spark-2.2.1-bin-hadoop.2.7.tgz

(2) 安裝spark

$ cd /home/hadoop #用戶可選擇安裝的文件夾
$ tar xvzf spark-2.2.1-bin-hadoop2.7.tgz
$ ln -s spark-2.2.1-bin-hadoop2.7/ spark

在.bashrc中加入
export SPARK_HOME=/home/hadoop/spark
export PATH=$SPARK_HOME/bin:$PATH

(3) 配置文件

?不做配置,pyspark可以在本機(jī)上運(yùn)行,但不能使用集群中其它機(jī)器。配置文件在$SPARK_HOME/conf/目錄下。

i. 配置spark-env.sh

$ cd $SPARK_HOME/conf/
$ cp spark-env.sh.template spark-env.sh
按具體配置填寫內(nèi)容
export SCALA_HOME=/home/hadoop/scala
export JAVA_HOME=/exports/android/jdk/jdk1.8.0_91/
export SPARK_MASTER_IP=master
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/home/hadoop/hadoop/etc/hadoop/

ii. 設(shè)置主從服務(wù)器slave

$ cp slaves.template slaves 
在其中列出從服務(wù)器地址,單機(jī)不用設(shè)

iii. 設(shè)置spark-defaults.conf

$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
按具體配置填寫內(nèi)容
spark.master                     spark://master:7077
spark.eventLog.enabled           false
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              1g
spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

(4) 啟動(dòng)

?運(yùn)行spark之前,需要運(yùn)行hadoop,具體見之前的Hadoop文檔

$ $SPARK_HOME/sbin/start-all.sh

?該腳本啟動(dòng)了所有master和workers,在本機(jī)用jps查看,增加了Worker和Master,

4. 命令行調(diào)用

?下面我們來看看從程序?qū)用嫒绾问褂肧park

(1) 準(zhǔn)備工作

?在使用相對(duì)路徑時(shí),系統(tǒng)默認(rèn)是從hdfs://localhost:9000/中讀數(shù)據(jù),因此需要先把待處理的本地文件復(fù)制到HDFS上,常用命令見之前的Hadoop有意思。

$ hadoop fs -mkdir -p /usr/hadoop
$ hadoop fs -copyFromLocal README.md /user/hadoop/

(2) Spark命令行

$ pyspark
>>> textFile = spark.read.text("README.md")
>>> textFile.count() # 返回行數(shù)
>>> textFile.first() # 返回第一行
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark")) # 返回所有含Spark行的數(shù)據(jù)集

5. 程序

(1) 實(shí)現(xiàn)功能

?統(tǒng)計(jì)文件中的詞頻

(2) 代碼

?這里使用了spark自帶的例程 /home/hadoop/spark/examples/src/main/python/wordcount.py,和之前介紹過的hadoop程序一樣,同樣是實(shí)現(xiàn)的針對(duì)key,value的map,reduce,一個(gè)文件就完成了,看起來更簡(jiǎn)徢更靈活,像是hadoop自帶MapReduce的加強(qiáng)版。具體內(nèi)容如下:

from __future__ import print_function

import sys 
from operator import add 

from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect() # 收集結(jié)果
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()

(3) 運(yùn)行

?spark-submit命令在$HOME_SPARK/bin目錄下,之前設(shè)置了PATH,可以直接使用

$ spark-submit $SPARK_HOME/examples/src/main/python/wordcount.py /user/hadoop/README.md

?參數(shù)是hdfs中的文件路徑。
?此時(shí)訪問$SPARK_IP:8080端口,可以看到程序PythonWordCount正在hadoop中運(yùn)行。

6. 多臺(tái)機(jī)器上安裝Spark以建立集群

?和hadoop的集群設(shè)置類似,同樣是把整個(gè)spark目錄復(fù)制集群中其它的服務(wù)器上,用slaves文件設(shè)置主從關(guān)系,然后啟動(dòng)$SPARK_HOME/sbin/start-all.sh。正常開啟后可以通過網(wǎng)頁查看狀態(tài):SparkMaster_IP:8080

7. 參考

(1) 官方幫助文檔,具體見其python部分
http://spark.apache.org/docs/latest/quick-start.html
(2) Hadoop2.7.3+Spark2.1.0 完全分布式環(huán)境 搭建全過程
https://www.cnblogs.com/purstar/p/6293605.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容