[參考link]
[參考link]
[參考link]
[參考link]
[參考link]
[參考link]
[參考link]
[參考link]
-0- Spark概述
Spark是一個計算引擎,專為大規(guī)模數(shù)據(jù)處理而設(shè)計,快速、通用。
-1- SparkContext、sparkApp、Driver、Executor
val conf = new SparkConf().setAppName("appName")
implicit val sc: SparkContext = new SparkContext(conf)
...
...
...
sc.stop()
*SparkContext是整一個sparkApp的入口:
通過SparkContext()初始化一個SparkContext實(shí)例,一個SC實(shí)例就是一個Spark應(yīng)用;
*舉個簡單的sparkApp的例子:
saprkSql讀取hive表數(shù)據(jù)→DataFrame相關(guān)數(shù)據(jù)過濾處理→sparkSql將最終數(shù)據(jù)overwrite到hive表;
當(dāng)然,在sparkApp里還可以進(jìn)行hive表樣本數(shù)據(jù)讀取→機(jī)器學(xué)習(xí)模型訓(xùn)練→模型預(yù)測→預(yù)測結(jié)果overwrite到hive表這種任務(wù)腳本;
*一個Spark應(yīng)用程序,包含了一個Driver program和集群中多個Executor,Driver和Executor存在心跳機(jī)制確保存活;
*Driver負(fù)責(zé)控制執(zhí)行開發(fā)人員向spark引擎提交的應(yīng)用腳本中main入口的代碼,包括:
創(chuàng)建SparkContext、創(chuàng)建 RDD、對RDD 的transformation操作和action操作等;
*Executor負(fù)責(zé)運(yùn)行組成 Spark 應(yīng)用的任務(wù),并將結(jié)果返回給Driver進(jìn)程;Executor通過自身的塊管理器(Block Manager)為用戶程序中要求緩存的RDD提供內(nèi)存式存儲。RDD是直接緩存在Executor進(jìn)程內(nèi)的,因此任務(wù)可以在運(yùn)行時充分利用緩存數(shù)據(jù)加速運(yùn)算。
-2- SparkSession、sparkSQL
implicit val spark: SparkSession = SparkSession.builder()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.enableHiveSupport()
.getOrCreate()
val inputSql = s"""
|SELECT *
|FROM ${inputTable}
|WHERE date = '${date}'
|""".stripMargin
println("inputSql :" + inputSql)
val rawDF = spark.sql(inputSql)
rawDF.show(10)
*SparkSession是sparkSql的入口:
通過SparkSession.builder()初始化一個spark對象,spark.sql()就可以進(jìn)行跑SQL,完成從hive表取數(shù)、將數(shù)據(jù)overwrite到hive表的操作了。
*SparkSession 其實(shí)是封裝了SQLContext和HiveContext,spark應(yīng)用中有sparkSQL 操作的話必須創(chuàng)建一個 SQLContext 或者 HiveContext 的類實(shí)例,HiveContext繼承自 SQLContext,用于處理 hive 中的數(shù)據(jù)。
-3- DAG(Directed Acyclic Graph,有向無環(huán)圖)、RDD(ResilienntDistributedDatasets,彈性分布式數(shù)據(jù)集)
*spark的作業(yè)和任務(wù)調(diào)度系統(tǒng)是其核心,它能夠有效的進(jìn)行調(diào)度的根本原因是因?yàn)閷θ蝿?wù)劃分DAG和容錯。
*spark處理數(shù)據(jù)時,會將計算轉(zhuǎn)化為一個有向無環(huán)圖(DAG)的任務(wù)集。
*Spark中使用DAG對RDD的關(guān)系進(jìn)行建模,描述了RDD的依賴關(guān)系,這種關(guān)系也被稱之為lineage,RDD的依賴關(guān)系使用Dependency維護(hù),參考Spark RDD之Dependency,DAG在Spark中的對應(yīng)的實(shí)現(xiàn)為DAGScheduler。
*RDD :彈性分布式數(shù)據(jù)集,只讀的、分區(qū)(partition)記錄的集合
*RDD能夠有效的恢復(fù)DAG中故障和慢節(jié)點(diǎn)執(zhí)行的任務(wù),并且RDD提供一種基于粗粒度變換的接口,記錄創(chuàng)建數(shù)據(jù)集的“血統(tǒng)”,能夠?qū)崿F(xiàn)高效的容錯性。
*初代rdd處于血統(tǒng)的頂層,記錄任務(wù)所需的數(shù)據(jù)的分區(qū)信息,每個分區(qū)數(shù)據(jù)的讀取方法
*子代rdd不真正的存儲信息,只記錄血統(tǒng)信息
*真正的數(shù)據(jù)讀取,應(yīng)該是task具體被執(zhí)行的時候,觸發(fā)action操作的時候才發(fā)生的
-4- DataFrame、DataSet、Row

*在Spark中,DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集;
*DataFrame類似于傳統(tǒng)數(shù)據(jù)庫中的二維表格;
*DataFrame與RDD的主要區(qū)別在于:
DataFrame帶有schema元信息,即DataFrame的每一列都帶有名稱和類型。
這使得Spark SQL得以洞察更多的結(jié)構(gòu)信息,從而對藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時效率的目標(biāo)。
RDD所存數(shù)據(jù)元素的具體內(nèi)部結(jié)構(gòu)無從得知,Spark Core只能在stage層面進(jìn)行簡單、通用的流水線優(yōu)化。
-5- 算子:transformation()、action()
RDD的三大特性:
* 分布式存儲分布式存儲在最大的好處是可以讓數(shù)據(jù)在不同工作節(jié)點(diǎn)并行存儲,以便在需要數(shù)據(jù)時并行運(yùn)算。
* 彈性彈性指其在節(jié)點(diǎn)存儲時,既可以使用內(nèi)存,也可已使用外存,為使用者進(jìn)行大數(shù)據(jù)處理提供方便。
* 延遲計算一個完整的RDD運(yùn)行任務(wù)被分為兩部分:Transformation和Action.
hadoop計算引擎提供的接口只有map函數(shù)、reduce函數(shù)
spark計算引擎提供的是mapreduce的擴(kuò)展,提供兩類操作:Transformation系列操作、Action系列操作
-5.1- Transformation
Transformation用于對RDD的創(chuàng)建,RDD只能使用Transformation創(chuàng)建,同時還提供大量操作方法,包括map,filter,groupBy,join等,RDD利用這些操作生成新的RDD,但是需要注意,無論多少次Transformation,在RDD中真正數(shù)據(jù)計算Action之前都不可能真正運(yùn)行。
-5.2- Action
Action是數(shù)據(jù)執(zhí)行部分,其通過執(zhí)行count,reduce,collect等方法真正執(zhí)行數(shù)據(jù)的計算部分。實(shí)際上,RDD中所有的操作都是Lazy模式進(jìn)行,運(yùn)行在編譯中不會立即計算最終結(jié)果,而是記住所有操作步驟和方法,只有顯示的遇到啟動命令才執(zhí)行。這樣做的好處在于大部分前期工作在Transformation時已經(jīng)完成,當(dāng)Action工作時,只需要利用全部自由完成業(yè)務(wù)的核心工作。
-5.3- Spark 的 20個Transformations 操作函數(shù)總結(jié)及舉例
| function | 說明 | 注釋 |
|---|---|---|
| map(func) | 將func函數(shù)作用到數(shù)據(jù)集的每個元素,生成一個新的分布式的數(shù)據(jù)集并返回 | |
| filter(func) | 選出所有func返回值為true的元素,作為一個新的數(shù)據(jù)集返回 | |
| flatMap(func) | 與map相似,但是每個輸入的item能夠被map到0個或者更多的items輸出 | |
| mapPartitions(func) | 與map相似,但是mapPartitions的輸入函數(shù)單獨(dú)作用于RDD的每個分區(qū)(block)上,因此func的輸入和返回值都必須是迭代器iterator | |
| mapPartitionsWithIndex(func) | 與mapPartitions相似,但是輸入函數(shù)func提供了一個正式的參數(shù),可以用來表示分區(qū)的編號 | |
| sample(withReplacement, fraction, seed) | 從數(shù)據(jù)中抽樣,withReplacement表示是否有放回,withReplacement=true表示有放回抽樣,fraction為抽樣的概率(0<=fraction<=1),seed為隨機(jī)種子 | |
| union(otherDataset) | 并集操作,將源數(shù)據(jù)集與union中的輸入數(shù)據(jù)集取并集,默認(rèn)保留重復(fù)元素 | |
| intersection(otherDataset) | 交集操作,將源數(shù)據(jù)集與union中的輸入數(shù)據(jù)集取交集,并返回新的數(shù)據(jù)集 | |
| distinct([numTasks]) | 去除數(shù)據(jù)集中的重復(fù)元素 | |
| groupByKey([numTasks]) | 作用于由鍵值對(K, V)組成的數(shù)據(jù)集上,將Key相同的數(shù)據(jù)放在一起,返回一個由鍵值對(K, Iterable)組成的數(shù)據(jù)集 | |
| reduceByKey(func, [numTasks]) | 作用于鍵值對(K, V)上,按Key分組,然后將Key相同的鍵值對的Value都執(zhí)行func操作,得到一個值 | |
| aggregateByKey(zeroValue, seqOp, comOp, [numTasks]) | 在于鍵值對(K, V)的RDD中,按key將value進(jìn)行分組合并,合并時,將每個value和初始值作為seqOp函數(shù)的參數(shù),進(jìn)行計算,返回的結(jié)果作為一個新的鍵值對(K, V) | |
| sortByKey([ascending=True], [numTasks]) | 按照Key進(jìn)行排序,ascending的值默認(rèn)為True,True/False表示升序還是降序 | |
| join(otherDataset, [numTasks]) | 類似于SQL中的連接操作,即作用于鍵值對(K, V)和(K, W)上,返回元組 (K, (V, W)),spark也支持外連接,包括leftOuterJoin,rightOuterJoin和fullOuterJoin | |
| cogroup(otherDataset, [numTasks]) | 作用于鍵值對(K, V)和(K, W)上,返回元組 (K, (Iterable, Iterable)) | |
| cartesian(otherDataset) | 笛卡爾乘積,作用于數(shù)據(jù)集T和U上,返回(T, U),即數(shù)據(jù)集中每個元素的兩兩組合 | |
| pipe(command, [envVars]) | 將驅(qū)動程序中的RDD交給shell處理(外部進(jìn)程),例如Perl或bash腳本 | |
| coalesce(numPartitions) | 將RDD的分區(qū)數(shù)減小到numPartitions個 | |
| repartition(numPartitions) | 重組數(shù)據(jù),數(shù)據(jù)被重新隨機(jī)分區(qū)為numPartitions個,numPartitions可以比原來大,也可以比原來小,平衡各個分區(qū) | |
| repartitionAndSortWithinPartitions(partitioner) | 根據(jù)給定的partitioner函數(shù)重新將RDD分區(qū),并在分區(qū)內(nèi)排序 |
-5.4- Spark 的 12個Actions 操作函數(shù)總結(jié)及舉例
| function | 說明 | 注釋 |
|---|---|---|
| reduce(func) | 對數(shù)據(jù)集中的元素做聚集操作 | |
| collect() | 以數(shù)組形式返回數(shù)據(jù)集中所有的元素 | |
| count() | 返回數(shù)據(jù)集中元素的個數(shù) | |
| first() | 返回數(shù)據(jù)集中的第一個元素 | |
| take() | 以數(shù)組形式返回數(shù)據(jù)集中前n個元素 | |
| takeSample(withReplacement, num, [seed]) | 以數(shù)組形式返回從數(shù)據(jù)集中抽取的樣本數(shù)量為num的隨機(jī)樣本,有替換或者無替換的進(jìn)行采樣 | |
| takeOrdered(n, [ordering]) | 返回RDD的前n個元素,可以利用自然順序或者由用戶執(zhí)行排序的comparator | |
| saveAsTextFile(path) | 將數(shù)據(jù)集中的元素以文本文件(或者文本文件的一個集合)的形式寫入本地文件系統(tǒng),或者HDFS,或者其他Hadoop支持的文件系統(tǒng)的指定路徑path下。Spark會調(diào)用每個元素的toString方法,將其轉(zhuǎn)換為文本文件中的一行 | |
| saveAsSequenceFile(path) | 將數(shù)據(jù)集中的元素以Hadoop SequenceFile的形式寫入本地文件系統(tǒng),或者HDFS,或者其他Hadoop支持的文件系統(tǒng)的指定路徑path下。RDD的元素必須由實(shí)現(xiàn)了Hadoop的Writable接口的key-value鍵值對組成。 | |
| saveAsObjectFile(path) | 利用Java序列化,將數(shù)據(jù)集中的元素以一種簡單的形式進(jìn)行寫操作,并能夠利用SparkContext.objectFile()加載數(shù)據(jù) | |
| countByKey() | 只能作用于鍵值對(K, V)形式的RDDs上。按照Key進(jìn)行計數(shù),返回鍵值對(K, int)的哈希表 | |
| foreach(func) | 在數(shù)據(jù)集的每個元素上調(diào)用函數(shù)func |
-6- cache
-7- Job、Stage、Task、Spark Web UI
*一個action算子觸發(fā)一個job
*一個job中有好多的task,task是執(zhí)行job的邏輯單元(猜測是根據(jù)partition劃分任務(wù))
*一個job根據(jù)是否有shuffle發(fā)生可以分為好多的stage
-8- 參數(shù)配置
--conf spark.executor.instances=5
--conf spark.executor.cores=8
--conf spark.executor.memory=80G