Spark是什么
a)?是一種通用的大數(shù)據(jù)計算框架
b)?Spark Core?離線計算
Spark SQL?交互式查詢
Spark Streaming?實(shí)時流式計算
Spark MLlib?機(jī)器學(xué)習(xí)
Spark GraphX?圖計算
c)?特點(diǎn):
i.?一站式:一個技術(shù)堆棧解決大數(shù)據(jù)領(lǐng)域的計算問題
ii.?基于內(nèi)存
d)?Spark2009年誕生于伯克利大學(xué)的AMPLab實(shí)驗室
2010年正式開源了Spark項目
2013年Spark成為Apache下的項目
2014年飛速發(fā)展,成為Apache的頂級項目
2015年在國內(nèi)興起,代替mr,hive,storm等
作者:辛湜(shi)
e)?Spark和Hive:
Spark優(yōu)點(diǎn):
i.?速度快
ii.?Spark SQL支持大量不同的數(shù)據(jù)源
f)?Spark?和Storm
i.?計算模型不一樣
ii.?Spark吞吐量大
g)?特點(diǎn):快,易用,通用,兼容性
h)?spark運(yùn)行模式
i.?local(本地)
ii.?standalone(集群)
iii.?on yarn(由?yarn作為資源調(diào)度Spark負(fù)責(zé)任務(wù)調(diào)度和計算)
iv.?on mesos(由mesos作為資源調(diào)度S)
v.?on cloud()
i)?配置步驟
=======================on yarn====================
【說明】
1.?spark任務(wù)運(yùn)行在yarn上,由yarn來進(jìn)行資源調(diào)度和管理,spark只負(fù)責(zé)任務(wù)的調(diào)度?和計算
2.不需要配置和啟動spark集群
3.只需要在提交任務(wù)的節(jié)點(diǎn)上安裝并配置spark on yarn?模式
4.必須找一臺節(jié)點(diǎn)安裝spark
5.?步驟:
i.安裝配置JDK
ii.?vi spark-env.sh
1.?export ?JAVA_HOME=/opt/modules/jdk1.7_6.0
2.?export ?HADOOP_CONF_DIR = /opt/modules/hadoop/etc/hadoop
iii.測試spark on yarn?模式是否安裝成功
iv.網(wǎng)絡(luò)測試:http://hadoop-yarn1.beicai.com:8088
=====================sdandalone模式==============
【說明】
1.??spark運(yùn)行在spark?集群上,由spark進(jìn)行資源調(diào)度管理,同時還負(fù)責(zé)任務(wù)的調(diào)度和?計算
2.需要配置和啟動spark集群
3.?步驟:
i.安裝配置JDK
ii.上傳并解壓Spark
iii.建立軟連接?ln -s spark spark?或者修改名稱
iv.?配置環(huán)境變量
v.安裝配置Spark,修改spark配置文件(spark-env.sh, slaves)
1.?vi spark-env.sh
a)?export ?JAVA_HOME=/opt/modules/jdk(jdk位置)
b)?export SPARK_MASTER_IP=hadoop-yarn1.beicai.com
c)?export SPARK_MASTER_PORT=7077
2.??vi slaves(用于指定在哪些節(jié)點(diǎn)上啟動worker)
a)?hadoop-yarn2.beicai.com
hadoop-yarn3.beicai.com
vi.將spark發(fā)送給其他主機(jī)
vii.?啟動
/opt/modules/spark/bin/start-all.sh
vii.?查看SparkUI界面:http://hadoop-yarn1.beicai.com:8080
4.?
j)?
一、Spark原理
1、Spark的運(yùn)行原理
i、分布式
Ii、主要基于內(nèi)存(少數(shù)情況基于磁盤)
Iii、迭代式計算
2、Spark?計算模式?VS ?MapReduce ?計算模式對比
Mr這種計算模型比較固定,只有兩種階段,map階段和reduce階段,兩個階段結(jié)束?后,任務(wù)就結(jié)束了,這意味著我們的操作很有限,只能在map階段和reduce階段,?也同時意味著可能需要多個mr任務(wù)才能處理完這個job
Spark?是迭代式計算,一個階段結(jié)束后,后面可以有多個階段,直至任務(wù)計算完?成,也就意味著我們可以做很多的操作,這就是Spark計算模型比mr?強(qiáng)大的地方
三、什么是Spark RDD?
1、什么是RDD?
彈性的,分布式的,數(shù)據(jù)集
(RDD在邏輯上可以看出來是代表一個HDFS上的文件,他分為多個分區(qū),散落?在Spark的多個節(jié)點(diǎn)上)
3、RDD----彈性
當(dāng)RDD的某個分區(qū)的數(shù)據(jù)保存到某個節(jié)點(diǎn)上,當(dāng)這個節(jié)點(diǎn)的內(nèi)存有限,保存不了這個?分區(qū)的全部數(shù)據(jù)時,Spark就會有選擇性的將部分?jǐn)?shù)據(jù)保存到硬盤上,例如:當(dāng)worker?的內(nèi)存只能保存20w條數(shù)據(jù)時,但是RDD的這個分區(qū)有30w條數(shù)據(jù),這時候Spark就?會將多余的10w條數(shù)據(jù),保存到硬盤上去。Spark的這種有選擇性的在內(nèi)存和硬盤之間的權(quán)衡機(jī)制就是RDD的彈性特點(diǎn)所在
4、Spark的容錯性
RDD最重要的特性就是,提供了容錯性,可以自動的從失敗的節(jié)點(diǎn)上恢復(fù)過來,即如?果某個節(jié)點(diǎn)上的RDD partition(數(shù)據(jù)),因為節(jié)點(diǎn)的故障丟了,那么RDD會自動的通過?自己的數(shù)據(jù)來源重新計算該partition,這一切對使用者來說是透明的
2、Spark的開發(fā)類型
(1)、核心開發(fā):離線批處理?/?演示性的交互式數(shù)據(jù)處理
(2)、SQL查詢:底層都是RDD和計算操作
(3)、底層都是RDD和計算操作
(4)、機(jī)器學(xué)習(xí)
(5)、圖計算
3、Spark?核心開發(fā)(Spark-core == Spark-RDD)步驟
(1)、創(chuàng)建初始的RDD
(2)、對初始的RDD進(jìn)行轉(zhuǎn)換操作形成新的RDD,然后對新的RDD再進(jìn)行操作,直?至操作計算完成
(3)、將最后的RDD的數(shù)據(jù)保存到某種介質(zhì)中(hive、hdfs,MySQL、hbase...)
五、Spark原理
Driver,Master,Worker,Executor,Task各個節(jié)點(diǎn)之間的聯(lián)系
Spark中的各節(jié)點(diǎn)的作用:
1、driver的作用:
(1)、 向master進(jìn)行任務(wù)的注冊
(2)、構(gòu)建運(yùn)行任務(wù)的基本環(huán)境
(3)、接受該任務(wù)的executor的反向注冊
(4)、向?qū)儆谠撊蝿?wù)的executor分配任務(wù)
2、什么是driver?
我們編寫的程序打成jar包后,然后找一臺能夠連接spark集群的節(jié)點(diǎn)做任務(wù)的driver,具體的表現(xiàn)為SparkSubmit
3、Master的作用:
(1)、監(jiān)控集群;
(2)、動態(tài)感知worker的上下線;
(3)、接受driver端注冊請求;
(4)、任務(wù)資源的調(diào)度
4、Worker的作用:
(1)、定時向master匯報狀態(tài);
(2)、接受master資源調(diào)度命令,進(jìn)行資源的調(diào)度
(3)、啟動任務(wù)的容器Executor
5、Executor的作用:
(1)、保存計算的RDD分區(qū)數(shù)據(jù);
(2)、向Driver反向注冊;
(3)、接受Driver端發(fā)送來的任務(wù)Task,作用在RDD上進(jìn)行執(zhí)行
Spark 編程的流程:
1、我們編寫的程序打包成jar包,然后調(diào)用Spark-Submit?腳本做任務(wù)的提交
2、啟動driver做任務(wù)的初始化
3、Driver會將任務(wù)極其參數(shù)(core,memory,driver相關(guān)的參數(shù))進(jìn)行封裝成ApplicationDescript通過taskSchedulerImpl?提交給Master
4、Master接受到driver端注冊任務(wù)請求時,會將請求參數(shù)進(jìn)行解析,并封裝成APP,然后進(jìn)行持久化,并且加入到其任務(wù)隊列中的waitingAPPs
5、當(dāng)輪到咱們提交的任務(wù)運(yùn)行時,master會調(diào)用schedule()這個方法,做任務(wù)資源調(diào)度
6、Master將調(diào)度好的資源封裝成launchExecutor,發(fā)送給指定的worker
7、Worker接收到發(fā)送來的launchExecutor時,會將其解析并封裝成ExecutorRunner,然后調(diào)用start方法,啟動Executor
8、Executor啟動后,會向任務(wù)的Driver進(jìn)行反向注冊
9、當(dāng)屬于這個任務(wù)的所有executor啟動成功并反向注冊完之后,driver會結(jié)束SparkContext對象的初始化
10、當(dāng)sc?初始化成功后,意味著運(yùn)行任務(wù)的基本環(huán)境已經(jīng)準(zhǔn)備好了,driver會繼續(xù)運(yùn)行我們編寫好的代碼
11、開始注冊初始的RDD,并且不斷的進(jìn)行轉(zhuǎn)換操作,當(dāng)觸發(fā)了一個action算子時,意味著觸發(fā)了一個job,此時driver就會將RDD之間的依賴關(guān)系劃分成一個一個的stage,并將stage封裝成taskset,然后將taskset中的每個task進(jìn)行序列化,封裝成launchtask,發(fā)送給指定的executor執(zhí)行
12、Executor接受到driver發(fā)送過來的任務(wù)task,會對task進(jìn)行反序列化,然后將對應(yīng)的算子(flatmap,map,reduceByKey。。。。)作用在RDD分區(qū)上
六、RDD詳解
1、什么是RDD?
RDD(Resilient Disttibuted Dataset)叫做彈性的分布式的數(shù)據(jù)集,是spark中最基本的數(shù)據(jù)抽象,它代表一個不可變,可分區(qū),里面的元素可并行計算的集合
2、RDD的特點(diǎn):
自動容錯
位置感知性調(diào)度
伸縮性
3、RDD的屬性:
(1)、一組分片(partition),即數(shù)據(jù)集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務(wù)處理,并決定并行計算的粒度,用戶可以在創(chuàng)建RDD時指定RDD的分片個數(shù),如果沒有指定,那么就會采用默認(rèn)值,默認(rèn)值就是程序所分配到的CPU Core的數(shù)目
(2)、一個計算每個分區(qū)的函數(shù)。Spark中RDD的計算是以分片為單位的,每個RDD都會實(shí)現(xiàn)computer函數(shù)以達(dá)到這個目的。Computer函數(shù)會對迭代器進(jìn)行復(fù)合,不需要保存每次計算的結(jié)果。
(3)、RDD之間的依賴關(guān)系。RDD的每次轉(zhuǎn)換都會生成一個新的RDD,所以RDD之間就會形成類似于流水一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時,Spark可以通過這個依賴關(guān)系重新計算丟失的分區(qū)數(shù)據(jù),而不是對RDD的所有分區(qū)進(jìn)行重新計算。
(4)、一個partition,即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個是基于hashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于key-value的RDD,才會有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了partition RDD Shuffle輸出時的分片數(shù)量。
(5)、一個列表,存儲存取每個Partition的優(yōu)先位置(preferred location)。對于一個HDFD文件來說。這個列表保存的就是每個Partition所在的快的位置。按照“移動數(shù)據(jù)不如移動計算”的理念。Spark在進(jìn)行任務(wù)調(diào)度的時候,會盡可能的將計算任務(wù)分配到所要處理數(shù)據(jù)塊的存儲位置。
4、RDD的創(chuàng)建:
進(jìn)行Spark核心編程時,首先要做的事就是創(chuàng)建一個初始的RDD。Spark Core提供了三種創(chuàng)建RDD的方式:
(1)、使用程序中的集合創(chuàng)建RDD?(調(diào)用parallelize()方法)
(2)、使用本地文件創(chuàng)建RDD(調(diào)用textFile()方法)
(3)、使用HDFD文件創(chuàng)建RDD ?(調(diào)用textFile()方法)
七、算子
1、什么是算子?
是RDD中定義的作用在每一個RDD分片上的函數(shù),可以對RDD中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換?和操作
2、RDD算子的分類
(1)、Transformation算子,這類算子變換不觸發(fā)提交作業(yè)(特點(diǎn)就是lazy特性)
返回的是一個RDD
(2)、Action算子,這類算子會觸發(fā)SparkContext提交作業(yè)(觸發(fā)一個spark job的運(yùn)行,從而觸發(fā)這個action之前所有的transformation的執(zhí)行)
返回的是一個spark對象
3、常用的Transformation算子
八、RDD分區(qū)排序
I、分區(qū)
兩種實(shí)現(xiàn)方式:coalesce和?repartition(底層調(diào)用coalesce)
coalesce(numPartitons,isShuffle)
第一個參數(shù)是重分區(qū)后的數(shù)量,第二個參數(shù)是是否進(jìn)行shuffle
如果原來有N個分區(qū),重分區(qū)后有M個分區(qū)
如果?M > N ,必須將第二參數(shù)設(shè)置為true(也就是進(jìn)行shuffle),等價于?repartition(numPartitons) ???如果是false將不起作用 ?
如果M < N
100-->10?重分區(qū)后的分區(qū)數(shù)比原來的小的多,那么久需要使用shuffle,也即是設(shè)置為true
100-->90?重分區(qū)后的分區(qū)數(shù)和原來的差不多的,那么就不需要使用shuffle,也就是設(shè)置為false
II、排序
sortBy(x => x)這個算子中帶有隱式轉(zhuǎn)換參數(shù)
x?能夠排序(比較大小),那么這個類就必須有比較大小的功能,也就是實(shí)現(xiàn)了compareTo?或者compare
實(shí)現(xiàn)二次排序有兩種方法:
1、繼承Comparable?接口 或者?Ordered
2、隱式轉(zhuǎn)換:可以定義隱式轉(zhuǎn)換函數(shù)(Ordered)或者隱式轉(zhuǎn)換值(Ordering)
九、自定義分區(qū)
自定義分區(qū)
要求:按照key將對應(yīng)的value輸出到指定的分區(qū)中
解釋:自定義一個自定義分區(qū)類,繼承partitioner,實(shí)現(xiàn)他的兩個方法
1、numPartitions
2、getPartition
具體的功能根據(jù)項目的要求自定義實(shí)現(xiàn),然后調(diào)用partitionBy方法,new出自定義的類,傳入?yún)?shù)即可
九、RDD持久化原理
1、持久化場景:對于一個rdd會被多次引用到,并且這個rdd計算過程復(fù)雜,計算時間特變耗時
2、如何進(jìn)行持久化,調(diào)用rdd.persist方法或cache方法,cache方法底層就是調(diào)用persist方法
******************persist(StorageLevel.MEMORY_ONLY)*******************
如果對RDD做持久化,默認(rèn)持久化級別是storageLevel.MEMORY_ONLY ,也就是持久化到內(nèi)存中去,這種持久化級別是效率最快的,但是由于是純Java?對象,保存到內(nèi)存中,那么內(nèi)存可能保存的數(shù)量就會較少
***************persist(StorageLevel.MEMORY_ONLY_SER)****************
如果當(dāng)我們集群資源有限時,那么我們可以采用MEMORY_ONLY_SER,也就是將Java對象進(jìn)行序列化之后持久到內(nèi)存中去,這種持久化的好處是能夠持久化更多的數(shù)據(jù)到內(nèi)存中,但是由于在持久化時需要序列化,取出來之后又需要反序列化這一過程,這個過程會消耗CPU計算資源,性能相對于MEMORY_ONLY?這種持久化級別來說稍微弱點(diǎn),但是還是比較高效的
3、如何選擇RDD持久化策略?
Spark提供的多種持久化級別,主要是為了在CPU和內(nèi)存消耗之間進(jìn)行取舍,下面是一些通用的持久化級別的選擇建議:
1)、優(yōu)先使用MEMORY_ONLY,如果可以緩存所有數(shù)據(jù)的話,那么就使用這種策略,因為純內(nèi)存速度最快,而且沒有序列化,不需要消耗CPU進(jìn)行反序列化操作
2)、如果MEMORY_ONLY策略,無法存儲所有數(shù)據(jù)的話,那么使用MEMORY_ONLY_SER,將數(shù)據(jù)進(jìn)行序列化存儲,純內(nèi)存操作還是非常快的,只是要消耗CPU進(jìn)行反序列化
3)、如果需要進(jìn)行快速的失敗恢復(fù),那么就選擇帶后綴為_2的策略,進(jìn)行數(shù)據(jù)的備份,這樣在失敗時,就不需要重新計算了
4、能不使用DISK相關(guān)的策略,就不要使用,有的時候,從磁盤讀取數(shù)據(jù),還不如重新計算一次
十一、共享變量
1、共享變量分為兩種:廣播變量?和?累加器
廣播變量(broadcast)
2、日常所遇問題
因為每個task都需要拷貝這樣的一個副本到executor去執(zhí)行,那么我們可以想象一下,如果有1000?個task在某個worker上執(zhí)行,而這個副本有100M,那么意味著我們需要拷貝100G的數(shù)據(jù)都到某個worker上執(zhí)行,這樣的話會大大消耗我們的網(wǎng)絡(luò)流量,同時會加大executor的內(nèi)存消耗,從而增加了我們spark作業(yè)的運(yùn)行時間,大大降低了spark作業(yè)的運(yùn)行效率,增加了作業(yè)失敗的概率
3、如何解決以上問題,也就是說什么時候使用廣播變量?
當(dāng)RDD引用到了一個外部變量并且這個外部變量數(shù)據(jù)量不小,同時這個RDD對應(yīng)的task數(shù)量特別多,那么此時使用廣播共享變量再合適不過了
我們可以將這種大的外部變量做成廣播變量,外部變量做成廣播變量的時候,那么每個executor的內(nèi)存中只會有一個外部變量,而這個副本針對所有的task都是共享的,這樣的話就減少了網(wǎng)絡(luò)流量消耗,降低了executor的內(nèi)存消耗,提高了spark作業(yè)運(yùn)行效率和縮短了運(yùn)行時間,同時降低了作業(yè)失敗的概率
4、廣播變量的使用流程:
1)、某個executor的第一個task先執(zhí)行,首先會從自己的blockManager中查找外部變量,如果沒有就從鄰居的executor的blockManager的內(nèi)存中獲取這個外部變量,如果還是獲取不到,就從driver端獲取,拷貝這個外部變量到本地的executor的blockManager
2)、當(dāng)這個executor的其他task執(zhí)行時,就不需要從外面獲取這個外部變量的副本,直接從本地的blockManager中獲取即可
5、如何獲取廣播變量的值?
可以直接調(diào)用廣播變量的value()?這個方法即可
【注意】廣播變量是只讀的,不可寫
累加器(Accumulator)
Spark提供的Accumulator?,主要用于多個節(jié)點(diǎn)對一個變量進(jìn)行共享性的操作,Accumulator只是提供了累加的功能。但是卻給我們提供了多個task對一個變量并行操作的功能,但是task只能對Accumulator進(jìn)行累加操作
【注意】task只能對Accumulator進(jìn)行類加操作,只有Driver程序可以讀取Accumulator的值
RDD分區(qū)和容錯機(jī)制講解
1、RDD?的Lineage血統(tǒng)
RDD只支持粗粒度轉(zhuǎn)換,即在大量記錄上執(zhí)行的單個操作,將創(chuàng)建RDD的一系列Lineage(血統(tǒng))記錄下來。以便恢復(fù)丟失的分區(qū)
2、RDD的依賴關(guān)系
RDD和它的父RDD的關(guān)系有兩種不同的類型:
1)、窄依賴(一對一,多對一)
形象的比喻:獨(dú)生子女
2)、寬依賴(多對多)
形象的比喻:超生
注釋:劃分stage的依據(jù)就是寬依賴,也就是RDD之間是否有shuffle,shuffle過程就是一個寬依賴過程,shuffle之前的tasks就屬于一個stage,shuffle之后的也屬于一個stage,shuffle之前和之后的操作都是窄依賴
【注意】shuffle過程分為:shuffle Write過程 和?shuffle read過程
4、DAG的生成(有向無環(huán)圖)和任務(wù)的劃分
DAG(Directed Acyclic Graph)叫做有向無環(huán)圖(有方向無循環(huán)的圖)
5、一個wordCount過程會產(chǎn)生多少個RDD?
至少會產(chǎn)生五個RDD,
第一個,從HDFS中加載后得到一個RDD(即使用sc.textFile()算子),即HadoopRDD
在sc.textFile()過程中還會產(chǎn)生一個RDD(調(diào)用map算子),產(chǎn)生一個MapPartitionRDD
第二個,使用flatMap算子,得到一個MapPartitionRDD
第三個,使用map算子,得到一個MapPartitionRDD
第四個,使用reduceByKey算子,也就是在經(jīng)過了shuffle過程后又會得到一個shuffledRDD
第五個,使用saveAsTextFile算子,再產(chǎn)生一個MapPartitionRDD?
spark程序提交流程講解
Spark任務(wù)簡介:
Spark-submit--->SparkSubmit-->main-->submit-->doRunMain-->RunMain-->通過反射創(chuàng)建我們編寫的主類的實(shí)例對象,調(diào)用main方法-->開始執(zhí)行我們編寫的代碼-->初始化SparkContext對象-->創(chuàng)建初始的RDD-->觸發(fā)action算子-->提交job-->worker執(zhí)行任務(wù)-->任務(wù)結(jié)束
Spark任務(wù)詳解:
1)、將我們編寫的程序打成jar包
2)、調(diào)用spark-submit腳本提交任務(wù)到集群上運(yùn)行
3)、運(yùn)行sparkSubmit的main方法,在這個方法中通過反射的方式創(chuàng)建我們編寫的主類的實(shí)例對象,然后調(diào)用main方法,開始執(zhí)行我們的代碼(注意,我們的spark程序中的driver就運(yùn)行在sparkSubmit進(jìn)程中)
4)、當(dāng)代碼運(yùn)行到創(chuàng)建SparkContext對象時,那就開始初始化SparkContext對象了
5)、在初始化SparkContext對象的時候,會創(chuàng)建兩個特別重要的對象,分別是:DAGScheduler
和TaskScheduler
【DAGScheduler的作用】將RDD的依賴切分成一個一個的stage,然后將stage作為taskSet提交給DriverActor
6)、在構(gòu)建taskScheduler的同時,會創(chuàng)建兩個非常重要的對象,分別是DriverActor和ClientActor
【clientActor的作用】向master注冊用戶提交的任務(wù)
【DriverActor的作用】接受executor的反向注冊,將任務(wù)提交給executor
7)、當(dāng)clientActor啟動后,會將用戶提交的任務(wù)和相關(guān)的參數(shù)封裝到ApplicationDescription對象中,然后提交給master進(jìn)行任務(wù)的注冊
8)、當(dāng)master接受到clientActor提交的任務(wù)請求時,會將請求參數(shù)進(jìn)行解析,并封裝成Application,然后將其持久化,然后將其加入到任務(wù)隊列waitingApps中
9)、當(dāng)輪到我們提交的任務(wù)運(yùn)行時,就開始調(diào)用schedule(),進(jìn)行任務(wù)資源的調(diào)度
10)、master將調(diào)度好的資源封裝到launchExecutor中發(fā)送給指定的worker
11)、worker接受到Maseter發(fā)送來的launchExecutor時,會將其解壓并封裝到ExecutorRunner中,然后調(diào)用這個對象的start(),?啟動Executor
12)、Executor啟動后會向DriverActor進(jìn)行反向注冊
13)、driverActor會發(fā)送注冊成功的消息給Executor
14)、Executor接受到DriverActor注冊成功的消息后會創(chuàng)建一個線程池,用于執(zhí)行DriverActor發(fā)送過來的task任務(wù)
15)、當(dāng)屬于這個任務(wù)的所有的Executor啟動并反向注冊成功后,就意味著運(yùn)行這個任務(wù)的環(huán)境已經(jīng)準(zhǔn)備好了,driver會結(jié)束SparkContext對象的初始化,也就意味著new SparkContext這句代碼運(yùn)行完成
16)、當(dāng)初始化sc成功后,driver端就會繼續(xù)運(yùn)行我們編寫的代碼,然后開始創(chuàng)建初始的RDD,然后進(jìn)行一系列轉(zhuǎn)換操作,當(dāng)遇到一個action算子時,也就意味著觸發(fā)了一個job
17)、driver會將這個job提交給DAGScheduler
18)、DAGScheduler將接受到的job,從最后一個算子向前推導(dǎo),將DAG依據(jù)寬依賴劃分成一個一個的stage,然后將stage封裝成taskSet,并將taskSet中的task提交給DriverActor
19)、DriverActor接受到DAGScheduler發(fā)送過來的task,會拿到一個序列化器,對task進(jìn)行序列化,然后將序列化好的task封裝到launchTask中,然后將launchTask發(fā)送給指定的Executor
20)、Executor接受到了DriverActor發(fā)送過來的launchTask時,會拿到一個反序列化器,對launchTask進(jìn)行反序列化,封裝到TaskRunner中,然后從Executor這個線程池中獲取一個線程,將反序列化好的任務(wù)中的算子作用在RDD對應(yīng)的分區(qū)上
【注意】
Spark的任務(wù)分為為兩種:
a、shuffleMapTask:shuffle之前的任務(wù)
b、resultTask:shuffle之后的任務(wù)
Spark任務(wù)的本質(zhì):
將RDD的依賴關(guān)系切分成一個一個的stage,然后將stage作為TaskSet分批次的發(fā)送到Executor上執(zhí)行
十三、Checkpoint
1、使用checkpoint的場景:
某個RDD會被多次引用,計算特別復(fù)雜,計算特別耗時
擔(dān)心中間某些關(guān)鍵的,在后面會反復(fù)幾次使用的RDD,可能會因為節(jié)點(diǎn)的故障,導(dǎo)致持久化數(shù)據(jù)的丟失
2、如何對RDD進(jìn)行checkpoint?
1)、設(shè)置還原點(diǎn)目錄,設(shè)置checkpoint目錄
2)、調(diào)用RDD的checkpoint的方法對該RDD進(jìn)行checkpoint
3、checkpoint的原理
1)、RDD調(diào)用了checkpoint方法之后,就接受RDDCheckpointData對象的管理
2)、RDDCheckpointData對象會負(fù)責(zé)將調(diào)用了checkpoint的RDD?的狀態(tài)設(shè)置為MarkedForCheckpoint
3)、當(dāng)這個RDD所在的job運(yùn)行結(jié)束后,會調(diào)用最后一個RDD的doCheckpoint,根據(jù)其血統(tǒng)向上查找,查找到被標(biāo)注為MarkedForCheckpoint狀態(tài)的RDD,將其狀態(tài)改變?yōu)閏heckpointingInProgress
4)、啟動一個單獨(dú)的job,將血統(tǒng)中標(biāo)記為checkpointingInProgress的RDD進(jìn)行checkpoint,也就是將RDD的數(shù)據(jù)寫入到checkpoint的目錄中去
5)、當(dāng)某個節(jié)點(diǎn)發(fā)生故障,導(dǎo)致包括持久化的數(shù)據(jù)全部丟失,此時會從還原點(diǎn)目錄還原RDD的每個分區(qū)的數(shù)據(jù),這樣就不需要從頭開始計算一次
4、checkpoint需要注意的地方
因為RDD在做checkpoint的時候,會單獨(dú)啟動一個job對需要進(jìn)行checkpoint的RDD進(jìn)行重新計算,這樣就會增加spark作業(yè)運(yùn)行時間,所以spark強(qiáng)烈建議在做checkpoint之前,應(yīng)該對需要進(jìn)行checkpoint的RDD進(jìn)行持久化(即調(diào)用?.cache)
5、checkpoint?和持久化的區(qū)別
1)、是否改變血統(tǒng):
持久化(.cache):不會改變RDD的依賴關(guān)系,也就是不會改變其血統(tǒng)
Checkpoint:會改變RDD的血統(tǒng),做了checkpoint的RDD會清除其所有的依賴關(guān)系,并將其父RDD強(qiáng)制設(shè)置為checkpointRDD,并且將RDD的狀態(tài)更改為checkpointed
2)、RDD的數(shù)據(jù)的可靠性:
持久化:只是將RDD的數(shù)據(jù)持久化到內(nèi)存或磁盤中,但是如果節(jié)點(diǎn)發(fā)生故障,那么持久化的數(shù)據(jù)還是會丟失
Checkpoint:checkpoint的數(shù)據(jù)保存在第三方高可靠的分布式的文件系統(tǒng)中,機(jī)試節(jié)點(diǎn)發(fā)生故障,數(shù)據(jù)也不會丟失,所以checkpoint比持久化可靠性更高
6、后續(xù)
我們實(shí)現(xiàn)了checkpoint?之后,在某個task?又調(diào)用了該RDD的iterator()?方法時,就實(shí)現(xiàn)了高容錯機(jī)制,即使RDD的持久化數(shù)據(jù)丟失,或者壓根兒就沒有持久化,但是還是可以通過readCheckpointOrComputer()?方法,優(yōu)先從父RDD-----checkpointRDD中讀取,HDFS(外部文件系統(tǒng))的數(shù)據(jù)
第二部分?spark-sql
一、Spark-SQL前世今生
1、Spark SQL的特點(diǎn)
1)、支持多種數(shù)據(jù)源:Hive、RDD、Parquet、JSON、JDBC等。
2)、多種性能優(yōu)化技術(shù):in-memory columnar storage、byte-code generation、cost model動態(tài)評估等。
3)、組件擴(kuò)展性:對于SQL的語法解析器、分析器以及優(yōu)化器,用戶都可以自己重新開發(fā),并且動態(tài)擴(kuò)展
2、Spark SQL的性能優(yōu)化技術(shù)簡介
1)、內(nèi)存列存儲(in-memory columnar storage)
2)、字節(jié)碼生成技術(shù)(byte-code generation)
3)、Scala代碼編寫的優(yōu)化
3、Spark SQL and DataFrame
Spark SQL是Spark中的一個模塊,主要用于進(jìn)行結(jié)構(gòu)化數(shù)據(jù)的處理。它提供的最核心的編程抽象,就是DataFrame。同時Spark SQL還可以作為分布式的SQL查詢引擎。Spark SQL最重要的功能之一,就是從Hive中查詢數(shù)據(jù)。
DataFrame,可以理解為是,以列的形式組織的,分布式的數(shù)據(jù)集合。它其實(shí)和關(guān)系型數(shù)據(jù)庫中的表非常類似,但是底層做了很多的優(yōu)化。DataFrame可以通過很多來源進(jìn)行構(gòu)建,包括:結(jié)構(gòu)化的數(shù)據(jù)文件,Hive中的表,外部的關(guān)系型數(shù)據(jù)庫,以及RDD。
二、Spark-sql的使用
1、RDD轉(zhuǎn)換為DataFrame(兩種)
1)、使用反射的方式來推斷包含了特定數(shù)據(jù)類型的RDD的元數(shù)據(jù)
2)、通過編程接口來創(chuàng)建DataFrame
2、UDF自定義函數(shù)和UDAF自定義聚合函數(shù)
UDF,其實(shí)更多的是針對單行輸入,返回一個輸出
UDAF,則可以針對多行輸入,進(jìn)行聚合計算,返回一個輸出,功能更加強(qiáng)大
3、Spark-SQL工作原理
SqlParse ?--------->解析器
Analyser ?--------->分析器
Optimizer ?--------->優(yōu)化器
SparkPlan ?--------->物理計劃
流程:
1)、自己編寫的SQL語句
大家要知道,只要在數(shù)據(jù)庫類型的技術(shù)里面,比如:最傳統(tǒng)的MySQL,Oracle等,包括現(xiàn)在大數(shù)據(jù)領(lǐng)域的數(shù)據(jù)倉庫,比如hive,他的基本的SQL執(zhí)行的模型,都是類似的,首先都要生成一條SQL語句的執(zhí)行計劃
2)、通過SqlParser(解析器)生成未解析的邏輯計劃(unresolved LogicalPlan)
3)、通過Analyzer(分析器)生成解析后的邏輯計劃(resolved LogicalPlan)
4)、通過Optimizer(優(yōu)化器)生成優(yōu)化后的邏輯計劃(optimized LogicalPlan)
實(shí)際上,比如傳統(tǒng)的Oracle等數(shù)據(jù)庫,通常都會生成多個執(zhí)行計劃,然后呢,最后有一個優(yōu)化器,針對多個計劃,選擇一個最好的計劃,而SparkSql這兒的優(yōu)化指的是,比如說,剛生成的執(zhí)行計劃中,有些地方的性能是顯而易見的,不太好,舉例說明:
比如說,我們有一個SQL語句,select name from (select ... from ...) where ..=..;
此時,在執(zhí)行計劃解析出來的時候,其實(shí)就是按照他原封不動的樣子,來解析成可以執(zhí)行的計劃,但是呢,Optimizer?在這里其實(shí)就會對執(zhí)行計劃進(jìn)行優(yōu)化,比如說,發(fā)現(xiàn)where?條件,其實(shí)可以放在子查詢中,這樣,子查詢的數(shù)量大大變小,可以優(yōu)化執(zhí)行速度,此時,可能就會變成如下這樣:select name from (select name from ...where ..=..)
5)、通過SparkPlan,生成最后的物理計劃(PhysicalPlan)
到物理計劃這里,那么其實(shí)就是非常“接地氣”的計劃了。就是說,已經(jīng)很明朗了,從那幾個文件讀取什么數(shù)據(jù),從那幾個文件中讀取,如何進(jìn)行關(guān)聯(lián)等等
6)、在executor中執(zhí)行物理計劃
邏輯的執(zhí)行計劃,更多的是偏向于邏輯,比如說吧,大致就是這種樣子的,
From table students=>filter ... => select name ...
這里基本上,邏輯計劃都是采用Tree?,樹形結(jié)構(gòu)
7)、生成RDD
Select ?name ?from ?students =>?解析,從哪里去查詢,students表,在哪個文件里,從哪個文件中查詢哪些數(shù)據(jù),比如說是name這個列,此外,復(fù)雜的SQL,還有,比如說查詢時,是否對表中的數(shù)據(jù)進(jìn)行過濾和篩選,更不用說,復(fù)雜時,需要有多表的JOIN(咋傳統(tǒng)數(shù)據(jù)庫中,比如MySQL,執(zhí)行計劃還涉及到如何掃描和利用索引)
4、spark-SQL性能優(yōu)化
1)、設(shè)置shuffle過程的并行度:spark.sql.shuffle.partitions(SQLContext.setConf())
2)、在hive數(shù)據(jù)倉庫建設(shè)過程中,合理設(shè)置數(shù)據(jù)類型,比如能設(shè)置為int的,就不要設(shè)置為bigInt,減少數(shù)據(jù)類型導(dǎo)致不必要的內(nèi)存開銷
3)、編寫SQL時,盡量給出明確的列名,比如select name from students。不要寫select *?的方式。
4)、并行處理查詢結(jié)果:對于spark-SQL查詢的結(jié)果,如果數(shù)據(jù)量比較大,比如超過1000條,那么就不要一次性的collect()到driver再處理,使用foreach()算子,并行處理查詢結(jié)果
5)、緩存表:對于一條SQL語句可能對此使用到的表,可以對其進(jìn)行緩存,使用?sqlContext.cacheTable(tableName),或者DataFrame.cache()即可,spark-SQL會用內(nèi)存列存儲的格式進(jìn)行表的緩存,然后spark-sql就可以僅僅掃描需要使用的列,并且自動優(yōu)化壓縮,來最小化內(nèi)存使用和GC開銷,SQLContext.uncacheTable(tableName)可以將表從緩存中移除,用SQLContext。setConf(),設(shè)置spark.sql.inMemoryColumnarStorage.batchSize參數(shù)(默認(rèn)10000),可以設(shè)置列存儲的單位
6)、廣播join表:spark.sql.autoBroadcastJoinThreshold,默認(rèn)10485760 (10 MB)。在內(nèi)存夠用的情況下,可以增加其大小,參數(shù)設(shè)置了一個表在join的時候,最大在多大以內(nèi),可以被廣播出去優(yōu)化性能
5、Hive on Spark配置
1)、安轉(zhuǎn)配置好Hive和Spark
2)、Set hive.execution.engine=spark;
3)、set spark.master=spark://mini1:7077
第三部分spark-streaming
1, ?Dstream
?
Dstream是sparkStreaming的數(shù)據(jù)模型,本質(zhì)就是一連串不間斷的RDD,但是它是一個時間段的RDD.這些時間段的RDD源源不斷的連接在一起。
這個時間可以自己設(shè)置,時間設(shè)置的越短,實(shí)時性越高,但是性能消耗也越大。
2, ?spark streaming從kafka獲取數(shù)據(jù),有哪幾種方式?
?
有兩種方式:
1.通過receiver的方式,
2,通過direct的方式,dirrect的方式需要自己來管理偏移量。
?
3, ?sparkStreaming和storm的區(qū)別
sparkStreaming是spark里面的一個做流式準(zhǔn)實(shí)時計算的組件,它使用的數(shù)據(jù)結(jié)構(gòu)是Dstream,Dstream里面是一連串時間片的rdd。
相比于storm,sparkStreaming在實(shí)時性,保證數(shù)據(jù)不丟失方面都不占用優(yōu)勢,spark streaming在spark支持者眼中的優(yōu)勢是spark Streaming具有高吞吐性,最本質(zhì)來說,sparkStreaming相比于storm的優(yōu)勢是sparkStreaming可以和spark core,spark SQL無縫整合。
4.對于需要多次引用的,并且這個dstream計算時間特別耗時,數(shù)據(jù)特別重要,那么我們就需要對dstream進(jìn)行checkpoint,(只有多次引用的,進(jìn)行持久化就可以了),因為即使對這個dstream進(jìn)行持久化,數(shù)據(jù)也可能會丟失,而checkpoint數(shù)據(jù)丟失的可能性小,但是這樣會影響spark-streaming的數(shù)據(jù)吞吐量,因為在做計算的同時,還需要將數(shù)據(jù)寫入到外部存儲系統(tǒng)中,會降低spark性能,影響吞吐量,非必要情況下不建議使用
5.如何對dstream做checkpoint
?
首先設(shè)置還原點(diǎn)目錄,其次調(diào)用dstream的checkpoint方法
【注意】:dstream的checkpoint的周期一定要是產(chǎn)生batch時間的整數(shù)倍,同時spark官方建議將checkpoint的時間設(shè)置為至少10秒。通常來說,將checkpoint間隔設(shè)置為窗口操作的滑動間隔的5-10倍
6.spark程序在啟動時,會去這個checkpointPath目錄下查看是否有保存的driver的元數(shù)據(jù)(1.dstream的操作轉(zhuǎn)換關(guān)系,2.未處理完的batch)信息,當(dāng)spark-streaming程序在二次啟動后就會去checkpointPath目錄下還原這個程序,加載未處理的batch元數(shù)據(jù)信息在內(nèi)存中恢復(fù),繼續(xù)進(jìn)行任務(wù)處理
7.為了保證spark-streaming程序7*24小時運(yùn)行,那么我們程序應(yīng)該具備高可靠性,怎樣具備高可靠性?
?
a.程序出現(xiàn)故障,driver死掉了,流式程序應(yīng)該具備自動重啟的功能
b.沒有計算完成的rdd在程序異常停止后,下次啟動后還會將未處理的rdd進(jìn)行處理
【注意】:要在spark_submit中,添加--deploy-mode參數(shù),默認(rèn)其值為client,即在提交應(yīng)用的機(jī)器上啟動driver,但是要能夠自動重啟driver,就必須將其值設(shè)置為cluster;此外,需要添加--supervise參數(shù),失敗后自動重啟
//spark_submit --executor-memory 1g --total-execute-cores 5 --deploy-model cluster --supervise
8.啟用預(yù)寫機(jī)制
a.預(yù)寫日志機(jī)制,簡寫為WAL,全稱為Write Ahead Log,從spark1.2版本開始,就引入了基于容錯的文件系統(tǒng)的WAL機(jī)制。如果啟用該機(jī)制,Receiver接收到的所有數(shù)據(jù)都會寫入配置的checkpoint目錄中的預(yù)寫日志。這中機(jī)制可以讓driver在恢復(fù)的時候,避免數(shù)據(jù)丟失,并且可以確保整個實(shí)時計算過程中零數(shù)據(jù)丟失
b.要配置該機(jī)制,首先調(diào)用StreamingContext的checkpoint()方法設(shè)置一個checkpoint目錄,然后需要將spark.streaming.receiver.writeAheadLog.enable參數(shù)設(shè)置為true
然而,這種極強(qiáng)的可靠性機(jī)制,會導(dǎo)致Receiver的吞吐量大幅度下降,因為單位時間內(nèi),有相當(dāng)一部分時間需要將數(shù)據(jù)寫入預(yù)寫日志。如果又希望開啟預(yù)寫日志機(jī)制,確保數(shù)據(jù)零損失,又不希望影響系統(tǒng)的吞吐量,那么可以創(chuàng)建多個輸入DStream,啟動多個Receiver
此外,在啟用了預(yù)寫日志機(jī)制之后,推薦將復(fù)制持久化機(jī)制禁用掉,因為所有數(shù)據(jù)已經(jīng)保存在容錯的文件系統(tǒng)中,不需要在用復(fù)制機(jī)制進(jìn)行持久化,保存一份副本,只要將輸入的DStream的持久化機(jī)制設(shè)置一下即可,persist(StorageLevel.MEMORY_AND_DISK_SER)。
9.spark-Streaming checkpoint概述
每一個spark-streaming應(yīng)用,正常來說,都是7*24小時運(yùn)轉(zhuǎn)的,這就是實(shí)時計算程序的特點(diǎn),因為要持續(xù)不斷的對數(shù)據(jù)進(jìn)行計算,因此,對實(shí)時計算應(yīng)用的要求,應(yīng)該是必須要能夠?qū)εc應(yīng)用程序邏輯無關(guān)的失敗,進(jìn)行容錯
如果要實(shí)現(xiàn)這個目標(biāo),Spark Streaming程序就必須將足夠的信息checkpoint到容錯的存儲系統(tǒng)上,從而讓它能夠從失敗中進(jìn)行恢復(fù)
有兩種數(shù)據(jù)需要被進(jìn)行checkpoint
1.元數(shù)據(jù)checkpoint-將定義了流式計算邏輯的信息,保存到容錯的存儲系統(tǒng)上,比如HDFS,當(dāng)運(yùn)行spark Streaming應(yīng)用程序的Driver進(jìn)程所在的節(jié)點(diǎn)失敗時,該信息可以用于進(jìn)行恢復(fù),元數(shù)據(jù)信息包括:
a.配置信息---創(chuàng)建spark Streaming應(yīng)用程序的配置信息,比如sparkConf中的信息
b.DStream的操作信息---定義了spark Stream應(yīng)用程序的計算邏輯的DStream操作信息
c.未處理的batch信息---那些job正在排隊,還沒處理的batch信息
2.數(shù)據(jù)checkpoint---將實(shí)時計算過程中產(chǎn)生的RDD的數(shù)據(jù)保存到可靠的存儲系統(tǒng)中。對于一些將多個batch的數(shù)據(jù)進(jìn)行聚合,有狀態(tài)的transformation操作,這是非常有用的,在這種transformation操作中,生成的RDD是依賴于之前batch的RDD的,這會導(dǎo)致隨著時間的推移,RDD的依賴鏈條變得越來越長,要避免由于依賴鏈條越來越長,導(dǎo)致的一起變得越來越長的失敗恢復(fù)時間,有狀態(tài)的transformation操作執(zhí)行過程中中間產(chǎn)生的RDD,會定期被checkpoint到可靠的存儲系統(tǒng)上,比如HDFS,從而削減RDD的依賴鏈條進(jìn)而縮短失敗恢復(fù)時,RDD的恢復(fù)時間。一句話概括,元數(shù)據(jù)checkpoint主要是為了從driver失敗中進(jìn)行恢復(fù);而RDD checkpoint主要是為了使用到有狀態(tài)的transformation操作時,能夠在其生產(chǎn)出的數(shù)據(jù)丟失時,進(jìn)行快速的失敗恢復(fù)
10.何時啟用checkpoint機(jī)制?
a.使用了有狀態(tài)的transformation操作----比如updateStateByKey,或者reduceByKeyAndWindow操作被使用了,那么checkpoint目錄要求必須提供的,也就是必須開啟checkpoint機(jī)制,從而進(jìn)行周期性的RDD checkpoint
b.要保證可以從Driver失敗中進(jìn)行恢復(fù)-----元數(shù)據(jù)checkpoint需要啟用,來進(jìn)行這種情況的恢復(fù)
【注意】并不是說,所有的spark streaming應(yīng)用程序,都要啟用checkpoint機(jī)制,如果既不強(qiáng)制要求從Driver失敗中自動進(jìn)行恢復(fù),又沒使用有狀態(tài)的transformation操作,那么就不需要啟用checkpoint,事實(shí)上,這么做反而是有助于提升性能的
11.如何自動從Driver失敗中恢復(fù)過來
?
要能夠自動從Driver失敗中恢復(fù)過來運(yùn)行spark Streaming應(yīng)用程序的集群,就必須監(jiān)控Driver運(yùn)行的過程,并且在他失敗時將他重啟,對于spark自身的standalone模式,需要進(jìn)行一些配置去supervise driver,在他失敗時將其重啟
首先,要在spark-submit中,添加--deploy-mode參數(shù),默認(rèn)其值為client,即在提交應(yīng)用的機(jī)器上啟動Driver,但是,要能夠自動重啟Driver,就必須將其值設(shè)置為cluster,此外,需要添加--supervise參數(shù)
?
部分原理圖稍后上傳。?