Spark簡介
什么是Spark?
Apache Spark是由加州伯克利分校AMP實驗室開發(fā)的通用大數(shù)據(jù)處理框架。Spark提供了大數(shù)據(jù)處理的一站式解決方案,以Spark Core為基礎(chǔ)推出了Spark SQL、Spark Streaming、MLlib、GraphX、SparkR等組件。整個Spark生態(tài)體系稱為BDAS,即:伯克利數(shù)據(jù)分析棧。
Spark特點
Spark具有運行速度快、易用性好、通用型強(qiáng)和隨處運行的特點。
運行速度快(Speed)
- 如果Spark基于內(nèi)存讀取,速度是Hadoop的100倍;使用磁盤讀取,也是Hadoop的十倍。spark之所以能夠比Hadoop快,有兩點主要原因:基于內(nèi)存計算和引入DAG執(zhí)行引擎。
- Spark在迭代計算過程中數(shù)據(jù)默認(rèn)是保存在內(nèi)存中,后續(xù)計算直接讀取內(nèi)存中的結(jié)果即可。而Hadoop每一步計算都是直接將結(jié)果存儲在磁盤中,后續(xù)的計算從磁盤重新讀取上次計算結(jié)果?;趦?nèi)存讀取數(shù)據(jù)的速度比磁盤讀取的速度高出兩個數(shù)量級。
-
Spark在實際執(zhí)行任務(wù)前,將計算步驟根據(jù)依賴關(guān)系形成DAG圖(有向無環(huán)圖),在執(zhí)行過程中就會根據(jù)DAG圖的順序來執(zhí)行,這個過程還會對DAG進(jìn)行計算路徑的優(yōu)化,大大減少了I/O讀取操作。而Hadoop需要手動或者借助Oozie等工具來處理這些步驟之間的關(guān)系。
speed
易用性好(Ease of Use)
Spark支持Scala、Java、Python、R語言編寫應(yīng)用程序。Scala本身是一種高效、可擴(kuò)展的語言,能夠使用簡介的代碼處理較為復(fù)雜的工作。比如下面的WordCount,使用Hadoop需要幾十行代碼,而使用Scala只需要兩行。

通用型強(qiáng)(generality)
Spark提供了一站式的大數(shù)據(jù)解決方案,生態(tài)圈BADS包含了:提供內(nèi)存計算框架的Spark Core、用于結(jié)構(gòu)化查詢的Spark SQL、用于實時計算的Spark Streaming、用于機(jī)器學(xué)習(xí)的MLlib、用于圖計算的GraphX和用于數(shù)學(xué)計算的SparkR。所以針對大數(shù)據(jù)處理的任何一場景,Spark都為你提供了相應(yīng)的組件。

隨處運行(Runs Everywhere)
Spark提供了本地Local運行模式,用來學(xué)習(xí)和測試(當(dāng)然還有許多用途,比如我們正在做的一個項目就是基于Local模式的),對于集群部署模式,Spark能夠以YARN、Mesos和自身提供的Standalone作為資源管理調(diào)度框架來執(zhí)行作業(yè)。對于數(shù)據(jù)源,Spark能夠讀取HDFS、Cassandra、HBase、S3、Alluxio等數(shù)據(jù)源數(shù)據(jù)。

Spark歷史發(fā)展
Spark在2009年開始編寫,在隨后的四年里在伯克利AMP實驗室逐漸形成了現(xiàn)有Spark的雛形。在2013年6月成為Apache孵化項目,8個月后成為Apache頂級項目,隨后進(jìn)入了快速發(fā)展。在2014年5月發(fā)布了1.0.0正式版本,在隨后的時間里3到4個月發(fā)布一個小版本,在2016年7月推出了Spark2.0版本,到現(xiàn)在Spark穩(wěn)定版為spark 2.4.0。

Spark生態(tài)系統(tǒng)
Spark BDAS以Spark Core分布式計算引擎為核心,在Spark Core之上擴(kuò)展了用于結(jié)構(gòu)化查詢的Spark SQL、用于實時數(shù)據(jù)處理的Spark Streaming、用于機(jī)器學(xué)習(xí)的MLlib、用于圖計算的GraphX和用于統(tǒng)計分析的SparkR等。

Spark Core
Spark Core是整個Spark生態(tài)體系的核心,是一個基于內(nèi)存的分布式計算引擎。Spark Core中定義了彈性分布式數(shù)據(jù)集RDD、資源管理、任務(wù)調(diào)度、任務(wù)執(zhí)行、內(nèi)存計算等核心功能。其中資源管理既可以使用Spark自身提供的Standalone資源管理器,也可以使用第三方資源調(diào)度框架YARN、Mesos、Kubernetes等。相比較而言,第三方資源調(diào)度框架提供了更細(xì)粒度的資源管理。
Spark SQL
Spark SQL的前身是Shark,當(dāng)時Hive幾乎是SQL on Hadoop(將SQL翻譯成MapReduce作業(yè))的唯一選擇,鑒于Hive的性能以及Spark的兼容性,由此Shark而生。Shark就是Hive on Spark,將Hive的HQL翻譯成Spark上的RDD,然后再通過Hive的Metadata(實際為HDFS上的數(shù)據(jù)和文件)獲取數(shù)據(jù)庫信息,最后交由Spark運算。
在2014年7月1日SparkSubmit峰會上,Databricks宣布終止Shark的開發(fā),轉(zhuǎn)而開發(fā)自己的Spark SQL。因為Shark本身是對Hive的改造,只是替換了Hive的物理執(zhí)行引擎,使之能夠快速運行。但是,Shark繼承了大量的Hive代碼,因此對于優(yōu)化和維護(hù)增加了成本,并且Hive本身基于MapReduce設(shè)計的,而這部分設(shè)計成為了整個項目的瓶頸。
Spark SQL能夠使SQL查詢和Spark編程(RDD、Dataset、DataFrame)無縫混合,并且提供了統(tǒng)一訪問外部數(shù)據(jù)源的方式,包括:Hive、Avro、Parquet、JSON、ORC和JDBC等。對于Hive的集成,Spark SQL本身支持HiveQL語法以及Hive SerDes和UDF,允許你訪問已有的Hive倉庫。
Spark SQL特點:
- 引入了新的RDD類型:SchemaRDD。這樣就可以像傳統(tǒng)數(shù)據(jù)庫一樣來定義SchemaRDD。SchemaRDD由定義了列數(shù)據(jù)類型的行對象組成,SchemRDD既可以從已有的RDD轉(zhuǎn)換而來,還可以讀取從Parquet文件讀入,還可以同Hive中獲取。
- 內(nèi)嵌了Catalyst查詢優(yōu)化器。首先會把SQL語句解析成邏輯執(zhí)行計劃,然后利用Catalyst提供的一些類和接口,執(zhí)行一些簡單的執(zhí)行計劃優(yōu)化,最后變成RDD的計算。
- 混合不同的數(shù)據(jù)源計算。比如可以將從Hive中獲取的數(shù)據(jù),和從Parquet上獲取的數(shù)據(jù)進(jìn)行Join操作。
Spark SQL在性能上也比Shark有很大提升,Spark SQL主要做了以下幾點優(yōu)化:
- 內(nèi)存列存儲(In-Memory Columnar Storage):Spark SQL的表數(shù)據(jù)在內(nèi)存中存儲不是使用原生態(tài)的JVM對象存儲方式,而是采用內(nèi)存列存儲。
- 字節(jié)碼生成技術(shù)(Bytecode Generation):Spark 1.1.0在Catalyst模塊的Expressions增加了Codegen模塊,使用動態(tài)字節(jié)碼生成技術(shù),對匹配的表達(dá)式采用特定的代碼動態(tài)編譯。另外對SQL表達(dá)式的也做了GC優(yōu)化。
- Scala代碼優(yōu)化:Spark SQL在使用Scala編寫代碼的時候,盡量避免了一些低效、容易GC的代碼,雖然增加了編寫代碼難度,但對于用戶來說接口統(tǒng)一。

Spark SQL還對商業(yè)智能工具(BI)提供了JDBC和ODBC的標(biāo)準(zhǔn)支持。

Spark Streaming
Spark Streaming可以輕松構(gòu)建高吞吐、可擴(kuò)展、高容錯的流式應(yīng)用程序。Spark Streaming可以支持多種數(shù)據(jù)源進(jìn)行類似Map、Reduce、Join等復(fù)雜操作,然后將結(jié)果保存到外部系統(tǒng)、數(shù)據(jù)庫或?qū)崟r儀表盤上。

MLlib
MLlib是Apache Spark提供的可擴(kuò)展機(jī)器學(xué)習(xí)庫,提供了包括分類、聚類、回歸、協(xié)同過濾等算法,同時也包括了相關(guān)測試和數(shù)據(jù)生成器。
GraphX
GraphX是用來操作圖(比如社交網(wǎng)絡(luò)的朋友圈)的程序庫,可以進(jìn)行并行的圖計算。
其他
Spark生態(tài)體系還有許多框架比如SparkR、BlinkDB、Alluxio等,這些在之后的學(xué)習(xí)中可以在做總結(jié)分享。
Spark集群搭建
Spark部署可以分為Local、standalone、YARN、Mesos、Kubernetes(實驗階段)這幾種部署模式。其中Local為本地部署模式,在單個機(jī)器上部署(也支持偽分布式),用來進(jìn)行實驗、測試等。其他幾種部署模式都屬于集群部署模式,standalone部署模式使用spark自帶的資源管理器進(jìn)行資源分配,YARN、Mesos、Kubernetes等使用的是第三方資源管理框架。
這里講解是使用spark自帶的standalone部署模式部署,我們先來看下spark集群相關(guān)的概述。
集群模式概述
Spark應(yīng)用程序作為獨立的進(jìn)程在集群上運行,由主程序(main)的SparkContext來協(xié)調(diào)應(yīng)用程序和集群。SparkContext能夠連接多種集群管理器(standalone、YARN、Mesos等),一旦連接上集群管理器,Spark就能夠獲取集群節(jié)點中的Executor。Executor是用來計算和存儲應(yīng)用程序的進(jìn)程,由集群中的Worker節(jié)點啟動。當(dāng)?shù)玫皆搼?yīng)用程序的Executor后,就會發(fā)送應(yīng)用程序代碼到Executor上,最后SparkContext向executor發(fā)送需要運行的task。

上面是集群應(yīng)用程序運行的簡單架構(gòu),對于這架構(gòu)需要有幾點說明:
- 每個應(yīng)用程序都有自己的Executor,每個Executor使用多個線程執(zhí)行task。這樣做有利于在調(diào)度端(每個驅(qū)動程序調(diào)度自己的task)和執(zhí)行端(不同應(yīng)用程序的任務(wù)運行在不同的JVM中)相互隔離應(yīng)用程序。這樣做也導(dǎo)致不同應(yīng)用程序之間不能共享數(shù)據(jù),但是你可以使用外部存儲系統(tǒng)來達(dá)到這個目的。
- spark對于底層的集群管理器是未知的,無論使用什么集群管理器,spark只要獲取到Executor就能與其通信。這樣集群管理器作為spark的一個插件,可以根據(jù)自己的需求使用不同的集群管理器。
- 應(yīng)用程序提交雖然可以使用客戶端和服務(wù)器模式,但是無論什么模式都需要確保執(zhí)行應(yīng)用程序的節(jié)點能夠與其它work節(jié)點通信,因為驅(qū)動任務(wù)調(diào)度器需要和executor通信。
Spark目前支持的集群管理器有:Standalone、Apache Mesos 、Hadoop YARN 、Kubernetes (experimental) 。
關(guān)鍵術(shù)語描述
| 術(shù)語 | 描述 |
|---|---|
| Application | 構(gòu)建在spark上的用戶程序,由集群上的驅(qū)動程序(driver program)和執(zhí)行器(executor)組成。 |
| Application jar | 包含spark應(yīng)用程序的jar包,應(yīng)用程序如果依賴其它應(yīng)用,需要將這些依賴也打到這個jar包中。注意,spark和hadoop相關(guān)依賴不需要打進(jìn)來,在運行時spark會自動添加。 |
| Driver program | 運行應(yīng)用程序的main()函數(shù)和創(chuàng)建SparkContext的進(jìn)程。 |
| Cluster manager | 用于獲取資源的外部服務(wù),可以是standalone、YARN、Mesos等。 |
| Deploy model | 驅(qū)動程序運行的位置。cluster模式下驅(qū)動程序在集群內(nèi)部的節(jié)點執(zhí)行,client模式下驅(qū)動程序在用戶提交的集群外部節(jié)點上運行。 |
| Work node | 集群中運行應(yīng)用程序代碼的節(jié)點。 |
| Executor | 為執(zhí)行應(yīng)用程序在Work節(jié)點上啟動的進(jìn)程,用于運行task并將數(shù)據(jù)保存到磁盤或內(nèi)存中。每個應(yīng)用程序都有自己的executors。 |
| Task | 一個任務(wù)工作單元,發(fā)送給executor執(zhí)行。 |
| Job | 由Spark執(zhí)行action RDD產(chǎn)生,由多個task組成用于并行計算。 |
| Stage | 每個Job由DAGScheduler劃分成多個Stage,每個Stage包含一組task。 |
standalone集群安裝
安裝standalone模式,只需要在集群中的每個節(jié)點安裝Spark的編譯版本,你可以直接下載預(yù)編譯版本http://spark.apache.org/downloads.html,也可以下載源碼自己構(gòu)建http://spark.apache.org/docs/latest/building-spark.html。
部署環(huán)境
集群節(jié)點
| 節(jié)點 | 角色 |
|---|---|
| 192.168.0.1 | Master、Worker |
| 192.168.0.2 | Work |
| 192.168.0.3 | Work |
軟件版本
| 軟件 | 版本 |
|---|---|
| JDK | 1.8+ |
| Scala | 2.11+ |
| Spark | 2.4.0 |
spark下載
Spark預(yù)編譯版本和源代碼都可以從http://spark.apache.org/downloads.html 進(jìn)行下載,根據(jù)自己的需求選擇相應(yīng)版本,如果需要靈活控制對應(yīng)的hadoop、hive版本可以自己編譯源代碼。
wget http://mirror.bit.edu.cn/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz
編輯slave列表文件
在${SPARK_HOME}/conf目錄創(chuàng)建 slaves文件(目錄下已經(jīng)有了slaves.template,可以直接復(fù)制或重命名),并且將需要運行Work進(jìn)程節(jié)點的主機(jī)名稱(或IP)添加到給文件中。
cp conf/slaves.template conf/slaves
vim conf/slaves
#添加運行Work服務(wù)的主機(jī)
192.168.0.1
192.168.0.2
192.168.0.3
配置集群
你可以對集群進(jìn)一步配置,spark集群配置文件為conf/spark-env.sh,默認(rèn)conf目錄下有一個spark-env.sh.template,你可以直接復(fù)制或重命名。
cp conf/spark-env.sh.template conf/spark-env.sh
vim spark-env.sh
#添加集群配置屬性
JAVA_HOME=${JAVA_HOME} #指定jdk安裝路徑
SPARK_MASTER_HOST=192.168.0.1 #指定master節(jié)點主機(jī)名稱或IP
SPARK_MASTER_PORT=7077 #master后臺通信端口,默認(rèn)為7077
SPARK_MASTER_WEBUI_PORT=8080 #master web ui端口,默認(rèn)為8080
SPARK_MASTER_OPTS= #應(yīng)用于master的配置屬性,可配置屬性查看下面列表,使用-Dx=y模式配置,默認(rèn)為none
SPARK_LOCAL_DIR=/opt/spark/work-data #spark臨時目錄,包含map輸出文件和存儲在磁盤上的RDD。該目錄應(yīng)該在本地快速的磁盤上,可以用逗號分割指定多個目錄。默認(rèn)為${SPARK_HOME}/work
SPARK_WORKER_CORES=2 #允許spark應(yīng)用程序使用本地機(jī)器的cpu數(shù),默認(rèn)為全部可用cpu數(shù)
SPARK_WORKER_MEMORY=1G #允許spark應(yīng)用程序使用本地機(jī)器的內(nèi)存數(shù),默認(rèn)全部最少為1G。如果要為應(yīng)用程序指定內(nèi)存,使用spark.executor.memory屬性。
SPARK_WORKER_PORT=7078 #work后臺通信端口,默認(rèn)隨機(jī)
SPARK_WORKER_WEBUI_PORT=8081 #work web ui端口,默認(rèn)為8081
SPARK_WORKER_DIR=/opt/spark/work-data #用于運行應(yīng)用程序的目錄,里面存儲日志和臨時空間使用。默認(rèn)為${SPARK_HOME}/work
SPARK_WORKER_OPTS= #應(yīng)用于work的配置屬性,可配置屬性查看下面列表,使用-Dx=y模式配置,默認(rèn)為none
SPARK_DAEMON_MEMORY=1G #分配給spark master和worker自身運行的內(nèi)存。默認(rèn)為1g
SPARK_DAEMON_JAVA_OPTS= #spark master和worker自身使用的JVM參數(shù),使用-Dx=y模式配置
SPARK_MASTER_OPTS可配置屬性:
| 屬性 | 默認(rèn)值 | 含義 |
|---|---|---|
| spark.deploy.retainedApplications | 200 | 顯示完成應(yīng)用程序的最大數(shù)量,超過該設(shè)置會把舊的應(yīng)用程序會從ui中刪除 |
| spark.deploy.retainedDrivers | 200 | 顯示完成驅(qū)動程序的最大數(shù)量,超過該設(shè)置會把舊的驅(qū)動程序會從ui中刪除 |
| spark.deploy.spreadOut | true | standalone集群管理是否應(yīng)該跨節(jié)點分布應(yīng)用程序或者說是否將應(yīng)用程序盡可能分布到少量節(jié)點上。對于HDFS中數(shù)據(jù)局部性來說,擴(kuò)展通常更好,但是對于計算密集型的負(fù)載,整合到一起通常更有效。 |
| spark.deploy.defaultCores | 無限 | 如果沒有設(shè)置spark.cores.max,則為應(yīng)用程序提供最大的內(nèi)核數(shù)為該默認(rèn)值。如果都未設(shè)置,應(yīng)用程序會盡可能的獲取所有內(nèi)核。對于共享集群來說,通常設(shè)置一個較低的值,防止用戶默認(rèn)使用全部cpu |
| spark.deploy.maxExecutorRetries | 10 | 限制standalone集群管理器對于失敗的executor個數(shù)達(dá)到配置上限則進(jìn)行刪除,如果不配置,只要有一個executor正在運行,應(yīng)用程序就不會被刪除 |
| spark.worker.timeout | 60 | master在規(guī)定時間內(nèi)沒有接受work心跳則認(rèn)為該work丟失,以s為單位 |
SPARK_WORKER_OPTS可以配置屬性:
| 屬性 | 默認(rèn)值 | 含義 |
|---|---|---|
| spark.worker.cleanup.enabled | false | 啟動定時清理woker/application目錄,只對standalone部署模式有影響。只有應(yīng)用程序停止后,才會定期刪除 |
| spark.worker.cleanup.interval | 1800 (30 minutes) | 定時清除worker下舊的應(yīng)用程序工作目錄,以s為單位 |
| spark.worker.cleanup.appDataTtl | 604800 (7 days, 7 * 24 * 3600) | 每個woker下保留應(yīng)用程序工作目錄的秒數(shù)(跟上一個區(qū)別還不清楚) |
| spark.worker.ui.compressedLogFileLengthCacheSize | 100 | 設(shè)置壓縮日志文件緩存的大小 |
分發(fā)spark安裝
將安裝好的spark分發(fā)到各個slave節(jié)點。
scp -r spark-2.2.0-bin-hadoop2.7 yangjianzhang@192.168.0.2:/home/yangjianzhang/server/spark/
scp -r spark-2.2.0-bin-hadoop2.7 yangjianzhang@192.168.0.3:/home/yangjianzhang/server/spark/
啟動服務(wù)
spark啟動腳本在${SPARK_HOME}/sbin目錄下,包含了master和slave相關(guān)的啟動、停止腳本。如果我們要啟動所有slave(conf/slaves配置的),使用start-slaves.sh腳本。如果啟動本機(jī)slave,則使用start-slave.sh <master-spark-URL>
sbin/start-master.sh #啟動master服務(wù)
sbin/start-slaves.sh #啟動slave服務(wù)
訪問UI頁面
master UI地址為:http://master:8080/
worker UI地址為:http://worker:8081/
