最近需要在Windows上配置python 開發(fā) Spark應(yīng)用,在此做一個總結(jié)筆記。
Spark 簡介
Spark的介紹及運行環(huán)境要求,引自 Spark 官方文檔
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It’s easy to run locally on one machine — all you need is to have java installed on your systemPATH, or the JAVA_HOME environment variable pointing to a Java installation.
Spark runs on Java 7+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark 2.1.0 uses Scala 2.11. You will need to use a compatible Scala version (2.11.x).
下載Spark
從官方網(wǎng)站下載tar包 http://spark.apache.org/downloads.html

Spark 由 Scala語言開發(fā),而Scala也是運行于JVM之上,因此也可以看作是跨平臺的,所以在下載 spark-2.1.0-bin-hadoop2.7.tgz 之后, 在Windows 平臺直接解壓即可。
在Spark 的 sbin 目錄下,并沒有提供Spark作為 Master 啟動腳本,所以在windows下,我們借助spark-shell, pyspark等方式啟動并調(diào)用Spark。
配置環(huán)境變量
假設(shè) spark-2.1.0-bin-hadoop2.7.tgz 已被解壓至E:\Spark, 接下來需要配置環(huán)境變量。 對于python開發(fā)來說,有三個環(huán)境變量至關(guān)重要。 開始-> 計算機, 右鍵點擊 -> 屬性->高級系統(tǒng)設(shè)置->環(huán)境變量,在系統(tǒng)環(huán)境變量中添加環(huán)境變量 SPARK_HOME和PYTHONPATH,并將Spark\bin加入到系統(tǒng)Path變量中。
SPARK_HOME=E:\Spark
Path=%SPARK_HOME%\bin;%Path%
PYTHONPATH=%SPARK_HOME%\Python;%SPARK_HOME%\Python\lib\py4j-0.10.4-src.zip
切記,如果在windows下已經(jīng)通過cmd打開命令窗口,則需要退出命令窗口再重新打開,以上設(shè)置的環(huán)境變量才會生效。
Python Spark入門示例
在Spark的安裝包,提供了經(jīng)典的入門示例程序,通過這些示例程序演示了基本的Spark開發(fā)和API調(diào)用過程。
1. Word Count
統(tǒng)計文本中某一單詞的重復(fù)次數(shù),是在技術(shù)面試中,特別是考察編程能力經(jīng)常遇到的面試題,網(wǎng)絡(luò)中也有各種語言的解題代碼。在Spark的示例中,通過非常精悍的代碼展示了Spark的強大。
from __future__ import print_function
import sys
from operator import add
# SparkSession:是一個對Spark的編程入口,取代了原本的SQLContext與HiveContext,方便調(diào)用Dataset和DataFrame API
# SparkSession可用于創(chuàng)建DataFrame,將DataFrame注冊為表,在表上執(zhí)行SQL,緩存表和讀取parquet文件。
from pyspark.sql import SparkSession
if __name__ == "__main__":
# Python 常用的簡單參數(shù)傳入
if len(sys.argv) != 2:
print("Usage: wordcount <file>", file=sys.stderr)
exit(-1)
# appName 為 Spark 應(yīng)用設(shè)定一個應(yīng)用名,改名會顯示在 Spark Web UI 上
# 假如SparkSession 已經(jīng)存在就取得已存在的SparkSession,否則創(chuàng)建一個新的。
spark = SparkSession\
.builder\
.appName("PythonWordCount")\
.getOrCreate()
# 讀取傳入的文件內(nèi)容,并寫入一個新的RDD實例lines中,此條語句所做工作有些多,不適合初學者,可以截成兩條語句以便理解。
# map是一種轉(zhuǎn)換函數(shù),將原來RDD的每個數(shù)據(jù)項通過map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€新的元素。原始RDD中的數(shù)據(jù)項與新RDD中的數(shù)據(jù)項是一一對應(yīng)的關(guān)系。
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
# flatMap與map類似,但每個元素輸入項都可以被映射到0個或多個的輸出項,最終將結(jié)果”扁平化“后輸出
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
# collect() 在驅(qū)動程序中將數(shù)據(jù)集的所有元素作為數(shù)組返回。 這在返回足夠小的數(shù)據(jù)子集的過濾器或其他操作之后通常是有用的。由于collect 是將整個RDD匯聚到一臺機子上,所以通常需要預(yù)估返回數(shù)據(jù)集的大小以免溢出。
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
spark.stop()
測試數(shù)據(jù), 可以拷貝下面的文字存入一個文本文件,比如a.txt
These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.
執(zhí)行測試結(jié)果

概念介紹
RDD:彈性分布式數(shù)據(jù)集,是一種特殊集合 ? 支持多種來源 ? 有容錯機制 ? 可以被緩存 ? 支持并行操作
RDD有兩種操作算子:
- Transformation(轉(zhuǎn)換):Transformation屬于延遲計算,當一個RDD轉(zhuǎn)換成另一個RDD時并沒有立即進行轉(zhuǎn)換,僅僅是記住了數(shù)據(jù)集的邏輯操作
- Ation(執(zhí)行):觸發(fā)Spark作業(yè)的運行,真正觸發(fā)轉(zhuǎn)換算子的計算
常見執(zhí)行錯誤
初次執(zhí)行Python Spark可能會遇到類似錯誤提示

之所以有上面提示的內(nèi)容,主要包含兩部分配置問題
1. 日志輸出
Spark在執(zhí)行過程中,很多INFO日志消息都會打印到屏幕,方便執(zhí)行者獲得更多的內(nèi)部細節(jié)。開發(fā)者可以根據(jù)需要設(shè)置$SPARK_HOME/conf下的log4j。在 $SPARK_HOME/conf 下 已經(jīng)預(yù)先存放了一份模版log4j.properties.template文件,開發(fā)者可以拷貝出一份 log4j.properties, 并設(shè)置成WARN
將log4j.properties.template 中的 INFO
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
修改為 WARN, 存入log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
2. Could not locate executable null\bin\winutils.exe
首先,下載 winutils.exe,并保存至 c:\hadoop\bin
https://github.com/steveloughran/winutils/blob/master/hadoop-2.7.1/bin/winutils.exe
其次,設(shè)置HADOOP_HOME環(huán)境變量,指向 c:\hadoop, 并將HADOOP_HOME加到系統(tǒng)變量PATH中
set HADOOP_HOME=c:\hadoop
set PATH=%HADOOP_HOME%\bin;%PATH%
參考:
https://spark.apache.org/docs/preview/api/python/pyspark.sql.html
https://spark.apache.org/docs/latest/programming-guide.html#transformations