Spark簡(jiǎn)介
相較于國(guó)內(nèi)外較多的大數(shù)據(jù)處理框架,Spark以基低延時(shí)的出色表現(xiàn),正在成為繼Hadoop的MapReduce之后,新的,最具有影響的大數(shù)據(jù)框架之一。以Spark為核心的生態(tài)圈,最底層為分布式存儲(chǔ)系統(tǒng)HDFS,S3,Hypertable或者其它格式的存儲(chǔ)系統(tǒng)如Hbase。資源管理采用Mesos,YARN等集群資源管理模式,或者spark自帶的獨(dú)立運(yùn)行模式 。Spark sql提供SQL查詢(xún)服務(wù),性能比Hive快3-50倍。MLIB提供機(jī)器學(xué)習(xí)服務(wù);GraphX提供圖計(jì)算服務(wù);Spark Streaming將流式計(jì)算分解成一系列短小的批處理(Micro Bath)計(jì)算。

好處:資源利用率高。多種框架共享資源,使用均衡。實(shí)現(xiàn)數(shù)據(jù)共享。多種框架共享數(shù)據(jù)和硬件資源,減少數(shù)據(jù)分散帶來(lái)的成本。有效降低運(yùn)維和管理成本:共享模式只需要少量的維護(hù)人員.
Spark已經(jīng)成為整合以大數(shù)據(jù)應(yīng)用的標(biāo)準(zhǔn)平臺(tái):交互式查詢(xún),包括SQL;實(shí)時(shí)流處理;復(fù)雜的分析,包括機(jī)器學(xué)習(xí),圖計(jì)算;批處理。
Spark特點(diǎn):
快速:Spark有先進(jìn)的DAG執(zhí)行引擎,支持循環(huán)數(shù)據(jù)流和內(nèi)存計(jì)算;Spark程序在內(nèi)存中的運(yùn)行速度 是Hadoop MapReduce運(yùn)行速度的100倍,在磁盤(pán)上的運(yùn)行速度是Hadoop MapReduce運(yùn)行速度的10倍。
易用:Spark支持Java,Scala,Python語(yǔ)言。
通用:Spark可以與SQL,Streaming以及復(fù)雜的分析良好結(jié)合。
有效集成Hadoop:Spark可以指定Hadoop,Yarn的版本來(lái)編譯出合適的發(fā)行版本,Spark也能很容易地運(yùn)行在EC2,Mesos上,或以Standalong模式運(yùn)行,并從HDFS,HBase,Cassandra 和其他Hadoop數(shù)據(jù)源讀取數(shù)據(jù)。
Spark應(yīng)用場(chǎng)景:
1.快速查詢(xún)系統(tǒng),基于日志數(shù)據(jù)的快速查詢(xún)系統(tǒng)業(yè)務(wù)構(gòu)建于Spark之上,利用其快速查詢(xún)以及內(nèi)存表等優(yōu)勢(shì),能夠承擔(dān)大部分日志數(shù)據(jù)的即時(shí)查詢(xún)工作;在性能方面,普遍比Hive快2-10倍,如果使用內(nèi)存表的功能,性能將會(huì)比hive快百倍。
2.實(shí)時(shí)日志采集處理,通過(guò)Spark Streaming實(shí)時(shí)進(jìn)行業(yè)務(wù)日志采集,快速迭代處理,并進(jìn)行綜合分析,能夠滿(mǎn)足線(xiàn)上系統(tǒng)分析要求。
3.業(yè)務(wù)推薦系統(tǒng),使用spark將業(yè)務(wù)推薦系統(tǒng)的小時(shí)和天級(jí)別的模型訓(xùn)練轉(zhuǎn)變?yōu)榉昼娂?jí)別的模型訓(xùn)練,有效優(yōu)化相關(guān)排名,個(gè)性化推薦以及熱點(diǎn)點(diǎn)擊分析等 。
4.定制廣告系統(tǒng),在定制廣告業(yè)務(wù)方面需要大數(shù)據(jù)做應(yīng)用分析,效果分析,定向優(yōu)化等,借助spark快速迭代的優(yōu)勢(shì),實(shí)現(xiàn)了在“數(shù)據(jù)實(shí)時(shí)采集,算法實(shí)時(shí)訓(xùn)練,系統(tǒng)實(shí)時(shí)預(yù)測(cè)”的全流程實(shí)時(shí)并行高維算法,支持上億的請(qǐng)求量處理;模擬廣告投放計(jì)算效率高,延時(shí)小,同MapReduce相比延時(shí)至少降低一個(gè)數(shù)量級(jí)。
5.用戶(hù)圖計(jì)算,利用Graphx解決了許多生產(chǎn)問(wèn)題,包括以下計(jì)算場(chǎng)景;基于度分布的中樞節(jié)點(diǎn)發(fā)現(xiàn),基于最大連通圖的社區(qū)發(fā)現(xiàn),基于三角形計(jì)數(shù)的關(guān)系衡量,基于隨機(jī)游走的用戶(hù)屬性傳播等。
Spark SQL是spark的一個(gè)處理結(jié)構(gòu)化數(shù)據(jù)的模塊,提供一個(gè)DataFrame編程抽象。它可以看作是一個(gè)分布式SQL查詢(xún)引擎,主要由Catalyst優(yōu)化,Spark SQL內(nèi)核,Hive支持三部分組成。
從1.3開(kāi)始在原有SchemaRDD的基礎(chǔ)上提供了與R風(fēng)格類(lèi)似的DataFrame API.
DataFrame是以指定列(named columns)組織的分布式數(shù)據(jù)集合,在Spark SQL中相當(dāng)于關(guān)系數(shù)據(jù)庫(kù)的一個(gè)表,或R/Python的一個(gè)數(shù)據(jù)框架,但后臺(tái)更加優(yōu)化。 DataFrame 支持多種數(shù)據(jù)源構(gòu)建,包括:結(jié)構(gòu)化數(shù)據(jù)文件(Parquet,JSON)加載,HIVE表讀取,外部數(shù)據(jù)庫(kù)讀取,現(xiàn)有RDD轉(zhuǎn)化,以及SQLContext運(yùn)行sql查詢(xún)結(jié)果創(chuàng)建DataFrame.

Spark Streaming:
Spark Streaming屬于核心Spark Api的擴(kuò)展,它支持高吞吐量和容錯(cuò)的實(shí)時(shí)流數(shù)據(jù)處理,它可以接受來(lái)自kafka,flume,twitter,zeroMQ或Tcp Socket的數(shù)據(jù)源,使用復(fù)雜的算法表達(dá)和高級(jí)功能來(lái)進(jìn)行處理,如Map,Reduce,Join, window等,處理的結(jié)果數(shù)據(jù)能夠存入文件系統(tǒng),數(shù)據(jù)庫(kù)。

Saprk部署和運(yùn)行
本地部署模式,獨(dú)立模式部署,YARN模式部署,以及基于各種模式的應(yīng)用程序運(yùn)行。
Spark下載路徑:http://spark.apache.org/downloads.html
安裝JAVA:
下載并安裝好java后,進(jìn)行配置:在/etc/profile文件中增加變量
sudo vim /etc/profile
export JAVA_HOME=$YOUR_JAVA_HOME# //實(shí)際安裝路徑
export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
如果想立即生效,可以通過(guò)運(yùn)行# source /etc/profile,否則只能在下次用戶(hù)重新登錄加載環(huán)境變量時(shí)生效。運(yùn)行 java -version測(cè)試。
關(guān)于JDK環(huán)境變量配置一般包括4種方式:
1.在用戶(hù)環(huán)境變量文件 /etc/profile文件中添加變量,需要具有root權(quán)限才能進(jìn)行配置,對(duì)linux下所有用戶(hù)長(zhǎng)期有效。
2.在用戶(hù)目錄下的.profile文件中增加變量,對(duì)當(dāng)前用戶(hù)長(zhǎng)期生效 。
3.直接運(yùn)行export命令定義變量,只對(duì)當(dāng)前shell臨時(shí)有效。在shell的命令行下直接使用[export 變量名=變量值]定義變量,該變量只在當(dāng)前的shell或其子shell下是有效的,若shell關(guān)閉,變量則生效,再打開(kāi)新shell時(shí)就沒(méi)有這個(gè)變量,若需要使用,則還需要重新定義 。
4.在系統(tǒng)環(huán)境變量 /etc/environment中進(jìn)行配置。

Spark部署主要包括Local模式部署,Standalone模式部署,YARN模式部署,Mesos模式部署。
Standalone- Spark獨(dú)立部署意味著Spark占據(jù)HDFS(Hadoop分布式文件系統(tǒng))頂部的位置,Spark 框架自身也自帶了完整的資源調(diào)度管理服務(wù),可以獨(dú)立部署到一個(gè)集群中。
Spark on YARN - Spark 可以運(yùn)行于YARN之上,和Hadoop 進(jìn)行統(tǒng)一部署。資源管理和調(diào)度依賴(lài)YARN,分布式存儲(chǔ)依賴(lài)HDFS。
Spark on Mesos - Mesos 是一種資源調(diào)度管理框架,可以為運(yùn)行在它上面的Spark 提供資源調(diào)度服務(wù)。
ssh免密:
ssh-keygen -t rsa // /root/.ssh/id_rsa
cd /root/.ssh
cp id_rsa.pub authorized_keys
cd /usr/local
ssh-copy-id -i node002 //發(fā)送到node002
ssh node002
Spark standalone集群:
/usr/spark/spark3/conf
#mv spark-env.sh.template spark-env.sh
export SPARK_MASTER_IP=node001
export SPARK_MASTER_PORT=7077
export SPARK_EXECUTOR_INSTANCES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=256M
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_CONF_DIR=/usr/spark/spark3/conf
export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
export JRE_HOME=${JAVA_HOME}/jre
mv slaves.template slaves
vi slaves //打開(kāi)slaves去掉localhost,增加如下內(nèi)容
node002 #slave1,node001為master
將spark文件夾同步到其他從機(jī)
scp -r /usr/spark/spark3 root@node002:/usr/spark/
啟動(dòng)spark
/usr/spark/spark3/sbin/start-all.sh
打開(kāi)控制臺(tái):

在Standalone模式運(yùn)行Spark應(yīng)用程序
1.spark-shell運(yùn)行應(yīng)用程序
在Spark集群上運(yùn)行應(yīng)用程序,需要將Master的spark://ip:port傳遞給sparkcontext構(gòu)造函數(shù)。
在集群上運(yùn)行交互式的spark命令spark-shell,該命令將會(huì)用spark-env.sh中的SPARK_MASTER_IPD和SPARK_MASTER_PORT自動(dòng)設(shè)置Master.
./bin/spark-shell --master spark://ip:port
2.spark-submit啟動(dòng)應(yīng)用程序
spark-submit向集群提交spark應(yīng)用程序比較直接,對(duì)于獨(dú)立部署模式的集群,spark支持Client部署模式,即在提交應(yīng)用的客戶(hù)端進(jìn)程中部署Driver.
應(yīng)用程序通過(guò)spark-submit啟動(dòng),應(yīng)用程序的jar包將會(huì)自動(dòng)地分配給所有的Worker節(jié)點(diǎn);對(duì)于任何其他運(yùn)行應(yīng)用程序時(shí)需要依賴(lài)的Jar包,可以通過(guò)-jar聲明,jar包名之間用逗號(hào)隔開(kāi)。
./bin/spark-submit --class xxx.XX --master spark://node001:7077 --executor-memory 2G? --total-executor-cores 2 xxxx.xxxx.jar
應(yīng)用程序提交過(guò)程
Spark應(yīng)用程序在集群上以獨(dú)立進(jìn)程集合的形式運(yùn)行,接受用戶(hù)Driver程序中main函數(shù)sparkcontext對(duì)象的協(xié)調(diào)。當(dāng)任務(wù)提交到集群上,SparkContext對(duì)象可以與多種集群管理(Standalone部署,Mesos,YARN模式)連接,這些集群管理器負(fù)責(zé)為所有應(yīng)用分配資源。一旦連接建立,Spark可以在集群的節(jié)點(diǎn)上獲得Executor,這些Executor進(jìn)程負(fù)責(zé)執(zhí)行計(jì)算和為應(yīng)用程序存儲(chǔ)數(shù)據(jù)。

(1)sparkContext向資源管理器注冊(cè)并申請(qǐng)資源
(2)資源管理器根據(jù)預(yù)先設(shè)定的算法,在資源池里分配合適的Executor運(yùn)行資源
(3)應(yīng)用(Main函數(shù)里的算子)構(gòu)建有向無(wú)環(huán)圖
(4)DAGScheduler將圖轉(zhuǎn)換成TaskSet
(5)TaskScheduler負(fù)責(zé)TaskSet的任務(wù)分發(fā)。
Executor之間可以相互通信。
SparkShell是交互控制臺(tái),使用scala程序;./bin/spark-shell --master local[*] //--master用來(lái)設(shè)置context將要連接并使用的資源主節(jié)點(diǎn),master的值是standalone模式的spark集群地址,Mesos,Yarn集群的url.或者是一個(gè)local地址;使用--jars可以添加jar包的路徑,逗號(hào)隔開(kāi)多個(gè)。spark-shell本質(zhì)是在后臺(tái)調(diào)用了spark-submit腳本來(lái)啟動(dòng)應(yīng)用程序。

scala> val textFile = sc.textFile("file:///usr/spark/spark3/README.md")
textFile: org.apache.spark.rdd.RDD[String] = file:///usr/spark/spark3/README.md MapPartitionsRDD[1] at textFile at <console>:24
加載HDFS文件和本地文件都是使用textFile,區(qū)別是添加前綴(hdfs:// 和 file://)進(jìn)行標(biāo)識(shí),從本地文件讀取文件直接返回MapPartitionsRDD,而從HDFS讀取的文件先轉(zhuǎn)成HadoopRDD,然后隱式轉(zhuǎn)換成MapPartitionsRDD.它們都是Spark的彈性分布式數(shù)據(jù)集(RDD)。