Spark介紹、環(huán)境搭建及運(yùn)行

Apache Spark 簡介

Apache Spark 是什么

Apache Spark是一個分布式計(jì)算框架,旨在簡化運(yùn)行于計(jì)算機(jī)集群上的并行程序的編寫。該框架對資源調(diào)度,任務(wù)的提交、執(zhí)行和跟蹤,節(jié)點(diǎn)間的通信以及數(shù)據(jù)并行處理的內(nèi)在底層操作都進(jìn)行了抽象。它提供了一個更高級別的API用于處理分布式數(shù)據(jù)。下面的引用是Apache Spark自己的說明。

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Apache Spark 起源

Spark起源于加利福利亞大學(xué)伯克利分校的一個研究項(xiàng)目。學(xué)校當(dāng)時關(guān)注分布式機(jī)器學(xué)習(xí)算法的應(yīng)用情況。因此,Spark從一開始便為應(yīng)對迭代式應(yīng)用的高性能需求而設(shè)計(jì)。在這類應(yīng)用中,相同的數(shù)據(jù)會被多次訪問。該設(shè)計(jì)主要靠利用數(shù)據(jù)集內(nèi)存緩存以及啟動任務(wù)時的低延遲和低系統(tǒng)開銷來實(shí)現(xiàn)高性能。再加上其容錯性、靈活的分布式數(shù)據(jù)結(jié)構(gòu)和強(qiáng)大的函數(shù)式編程接口,Spark在各類基于機(jī)器學(xué)習(xí)和迭代分析的大規(guī)模數(shù)據(jù)處理任務(wù)上有廣泛的應(yīng)用,這也表明了其實(shí)用性。

Apache Spark 運(yùn)行模式

Apache Spark共支持四種運(yùn)行模式,每種模式各有其特點(diǎn),為了方便起見,本文基于的運(yùn)行模式為本地單機(jī)模式。

  • 本地單機(jī)模式:所有Spark進(jìn)程都運(yùn)行在同一個Java虛擬機(jī)(Java Vitural Machine,JVM)中
  • 集群單機(jī)模式:使用Spark自己內(nèi)置的任務(wù)調(diào)度框架
  • 基于Mesos:Mesos是一個流行的開源集群計(jì)算框架
  • 基于YARN:即Hadoop 2,它是一個與Hadoop關(guān)聯(lián)的集群計(jì)算和資源調(diào)度框架

Apache Spark 環(huán)境搭建

Spark能通過內(nèi)置的單機(jī)集群調(diào)度器來在本地運(yùn)行。此時,所有的Spark進(jìn)程運(yùn)行在同一個Java虛擬機(jī)中。這實(shí)際上構(gòu)造了一個獨(dú)立、多線程版本的Spark環(huán)境。本地模式很適合程序的原型設(shè)計(jì)、開發(fā)、調(diào)試及測試。同樣,它也適應(yīng)于在單機(jī)上進(jìn)行多核并行計(jì)算的實(shí)際場景。

本地構(gòu)建Spark環(huán)境的第一步是下載其版本包, 本文以spark-1.6.1-bin-hadoop2.4.tgz為例進(jìn)行安裝演示。下載完上述版本包后,解壓,并在終端進(jìn)入解壓時新建的主目錄。Spark的運(yùn)行依賴Scala編程語言,好在預(yù)編譯的二進(jìn)制包中已包含Scala運(yùn)行環(huán)境,我們不需要另外安裝Scala便可運(yùn)行Spark。但是,JRE(Java運(yùn)行時環(huán)境)或JDK(Java開發(fā)套件)是要安裝的。

>tar xfvz spark-1.6.1-bin-hadoop2.4.tgz
>cd spark-1.6.1-bin-hadoop2.4

用戶運(yùn)行Spark的腳本在該目錄的bin目錄下。我們可以運(yùn)行Spark附帶的一個示例程序來測試是否一切正常:

>./bin/run-example org.apache.spark.examples.SparkPi

該命令將在本地單機(jī)模式下執(zhí)行SparkPi這個示例。在該模式下,所有的Spark進(jìn)程均運(yùn)行于同一個JVM中,而并行處理則通過多線程來實(shí)現(xiàn)。默認(rèn)情況下,該示例會啟用與本地系統(tǒng)的CPU核心數(shù)目相同的線程。示例運(yùn)行完,應(yīng)可在輸出的結(jié)尾看到類似如下的提示,

Pi is roughly 3.14248

Apache Spark 基本概念

前一部分介紹了Apache Spark的安裝過程,接下來我們一起體驗(yàn)下在Spark上編程的樂趣。就像之前介紹的,Spark支持多種編程語言,包括Java、Scala、Python 和 R等。接下來首先介紹下Spark的編程模型,然后通過使用這四種不同的語言來演示Spark的編程運(yùn)行過程。

Spark 編程模型

任何Spark程序的編寫都是從SparkContext(或用Java編寫時的JavaSparkContext)開始的,SparkContext的初始化需要一個SparkConf對象,后者包含了Spark集群配置的各種參數(shù)(比如主節(jié)點(diǎn)的URL)。初始化后,我們便可用SparkContext對象所包含的各種方法來創(chuàng)建和操作分布式數(shù)據(jù)集和共享變量。Spark shell(在Scala和Python下可以,但不支持Java)能自動完成上述初始化。

Spark Shell

Spark支持用Scala或Python REPL(Read-Eval-Print-Loop,即交互式shell)來進(jìn)行交互式的程序編寫。由于輸入的代碼會被立即計(jì)算,shell能在輸入代碼時給出實(shí)時反饋。在Scala shell里,命令執(zhí)行結(jié)果的值與類型在代碼執(zhí)行完后也會顯示出來。
要想通過Scala來使用Spark shell,只需從Spark的主目錄執(zhí)行./bin/spark-shell。它會啟動Scala shell并初始化一個SparkContext對象。我們可以通過sc這個Scala值來調(diào)用這個對象。
要想在Python shell中使用Spark,直接運(yùn)行./bin/pyspark命令即可。與Scala shell類似, Python下的SparkContext對象可以通過Python變量sc來調(diào)用。

彈性分布式數(shù)據(jù)集(RDD)

上文提到的分布式數(shù)據(jù)集其實(shí)就是指RDD。RDD(Resilient Distributed Dataset,彈性分布式數(shù)據(jù)集)是Spark的核心概念之一。一個RDD代表一系列的“記錄”(嚴(yán)格來說,某種類型的對象)。這些記錄被分配或分區(qū)到一個集群的多個節(jié)點(diǎn)上(在本地模式下,可以類似地理解為單個進(jìn)程里的多個線程上)。Spark中的RDD具備容錯性,即當(dāng)某個節(jié)點(diǎn)或任務(wù)失敗時(因非用戶代碼錯誤的原因而引起,如硬件故障、網(wǎng)絡(luò)不通等),RDD會在余下的節(jié)點(diǎn)上自動重建,以便任務(wù)能最終完成。

創(chuàng)建RDD后,我們便有了一個可供操作的分布式記錄集。在Spark編程模式下,所有的操作被分為轉(zhuǎn)換(transformation)和執(zhí)行(action)兩種。一般來說,轉(zhuǎn)換操作是對一個數(shù)據(jù)集里的所有記錄執(zhí)行某種函數(shù),從而使記錄發(fā)生改變;而執(zhí)行通常是運(yùn)行某些計(jì)算或聚合操作,并將結(jié)果返回運(yùn)行SparkContext的那個驅(qū)動程序。

Apache Spark 編程入門

下面我們通過依次用Java、Python等種語言來編寫一個簡單的Spark數(shù)據(jù)處理程序。假設(shè)一存在一個名為UserPurchaseHistory.csv的文件,內(nèi)容如下所示。文件的每一行對應(yīng)一條購買記錄,從左到右的各列值依次為客戶名稱、商品名以及商品價格。

John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49

Spark 程序?qū)嵗?Java)

/**
* Created by hackx on 9/11/16.
*/

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
* 用Java編寫的一個簡單的Spark應(yīng)用
*/
public class JavaApp {
public static void main(String[] args) {

/*正如在Scala項(xiàng)目中一樣,我們首先需要初始化一個上下文對象。值得注意的是,
這里所使用的是JavaSparkContext類而不是之前的SparkContext。類似地,調(diào)用
JavaSparkContext對象,利用textFile函數(shù)來訪問數(shù)據(jù),然后將各行輸入分割成
多個字段。請注意下面代碼的高亮部分是如何使用匿名類來定義一個分割函數(shù)的。
該函數(shù)確定了如何對各行字符串進(jìn)行分割。*/

JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");

// 將CSV格式的原始數(shù)據(jù)轉(zhuǎn)化為(user,product,price)格式的記錄集
JavaRDD data = sc.textFile("data/UserPurchaseHistory.csv").map(new Function<String, String[]>() {
public String[] call(String s) throws Exception {

return s.split(",");
}
});

/*現(xiàn)在可以算一下用Scala時計(jì)算過的指標(biāo)。這里有兩點(diǎn)值得注意的地方,一是
下面Java API中有些函數(shù)(比如distinct和count)實(shí)際上和在Scala API中
一樣,二是我們定義了一個匿名類并將其傳給map函數(shù)。匿名類的定義方式可參
見代碼的高亮部分。*/

// 求總購買次數(shù)
long numPurchases = data.count();

// 求有多少個不同客戶購買過商品
long uniqueUsers = data.map(new Function<String[], String>() {
public String call(String[] strings) throws Exception {
return strings[0];
}
}).distinct().count();
System.out.println("Total purchases: " + numPurchases);
System.out.println("Unique users: " + uniqueUsers);
}
}

Spark 程序?qū)嵗?Python)

"""用Python編寫的一個簡單Spark應(yīng)用"""

from pyspark import SparkContext

sc = SparkContext("local[2]", "First Spark App")

# 將CSV格式的原始數(shù)據(jù)轉(zhuǎn)化為(user,product,price)格式的記錄集

data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line:

line.split(",")).map(lambda record: (record[0], record[1], record[2]))

# 求總購買次數(shù)

numPurchases = data.count()

# 求有多少不同客戶購買過商品

uniqueUsers = data.map(lambda record: record[0]).distinct().count()

# 求和得出總收入

totalRevenue = data.map(lambda record: float(record[2])).sum()

# 求最暢銷的產(chǎn)品是什么

products = data.map(lambda record: (record[1], 1.0)).

reduceByKey(lambda a, b: a + b).collect()

mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]

print "Total purchases: %d" % numPurchases

print "Unique users: %d" % uniqueUsers

print "Total revenue: %2.2f" % totalRevenue

print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])

運(yùn)行該腳本的最好方法是在腳本目錄下運(yùn)行如下命令:

>$SPARK_HOME/bin/spark-submit pythonapp.py

參考資料

Spark官網(wǎng)
Spark機(jī)器學(xué)習(xí)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容