1.1 Spark 是什么
Apache Spark 是一個快速的,多用途的集群計算系統(tǒng),相當于Hadoop MapReduce ,Spark 使用了內(nèi)存來保存中間結(jié)果,能在數(shù)據(jù)還未寫入磁盤的時候在內(nèi)存中進行運算.
Spark 只是一個計算框架,不像Hadoop 一樣包含分布式文件系統(tǒng)和完備的調(diào)度系統(tǒng),如果要使用 Spark ,就需要搭載其他的文件系統(tǒng)和其他的調(diào)度系統(tǒng).
為什么會有Spark ?

Spark 產(chǎn)生之前,已經(jīng)有非常成熟的計算系統(tǒng)的存在了,比如我們的 MapReduce ,這些計算系統(tǒng)提供了高層次的 API ,把計算運行在集群中并提供容錯能力,從而實現(xiàn)分布式計算.
雖然這些框架提供了大量的對訪問利用計算資源的抽象,但是他們?nèi)鄙倭死梅植际絻?nèi)存的抽象,這些框架的缺點在于,多個計算之間的數(shù)據(jù)服用就是將中間的結(jié)果數(shù)據(jù)寫入到一個穩(wěn)定的文件系統(tǒng)中(比如: HDFS ),所以會產(chǎn)生數(shù)據(jù)的復(fù)制和備份,磁盤的 I/O 以及數(shù)據(jù)的 序列化, 所以這些框架在遇到需要在多個計算之間復(fù)用中間結(jié)果的操作的時會非常的不高效.
而這些復(fù)用中間結(jié)果的操作是非常的常見的,例如: 迭代式計算,交互式的數(shù)據(jù)挖掘, 圖形計算等等
認識到這個問題之后,學(xué)學(xué)術(shù)界提出了一個新的模型 : RDDS
RDDS 是一個可以容錯且并行的數(shù)據(jù)結(jié)構(gòu),它可以讓用戶顯式的將中間結(jié)果數(shù)據(jù)集保存在內(nèi)存中,并且通過控制數(shù)據(jù)集的分區(qū)來達到數(shù)據(jù)存放處理的最優(yōu)化.
同時 RDDS 也提供了豐富的 API 來操作數(shù)據(jù)集
后來 RDDS 被 AMPLab 在 一個叫做 Spark 的框架中進行了開源
總結(jié):
- Spark 是 Apache 的一個開源框架
- Spark 的母公司叫 Databticks
- Spark 是為了解決 MapReduce 等過去的計算系統(tǒng)無法在內(nèi)存中保存中間結(jié)果的問題
- Spark 的核心是 RDDS ,RDDS 不僅誰一個計算框架,也是一種數(shù)據(jù)結(jié)構(gòu).
- RDDS即Resilient distributed datasets(彈性分布式數(shù)據(jù)集)。
1.2 Spark 的特點(優(yōu)點)
理解 Spark 的特點,從而去理解 為什么要使用 Spark
速度快
- Spark 在內(nèi)存中運行時,速度是 Hadoop MapReduce 的 100 倍
- Spark 基于硬盤的運算速度 大概是 Hadoop MapReduce 的 10 倍
- Spark 實現(xiàn)了一種叫做 RDDS 的 DAG(Directed Acyclic Graph)叫做有向無環(huán)圖 執(zhí)行引擎,其數(shù)據(jù)緩存在內(nèi)存中 可以迭代處理.
易用
- Spark 支持 JAVA ,Scala , Python ,R , SQL 等多種語言的 API
- Spark 支持超過 80 個 高級運算符使得 用戶非常容易的構(gòu)建并行計算程序
- Spark 可以使用基于 Scala ,Python ,R , SQL 的 shell 交互式查詢
通用
- Spark 提供一個完整的技術(shù)棧,包括 SQL 執(zhí)行, Dataset 命令式 API, 機器學(xué)習(xí)庫 MLlib ,圖計算框架 GraphX ,流計算 SparkStreaming.
- 用戶可以在同一個應(yīng)用同時使用這些工具 , 這一點是劃時代的.
兼容
- Spark 可以運行在 Hadoop Yarn ,Apache Mesos, Kubernets, Spark Standalone 等集群中
- Spark 可以訪問 HBase , HDFS , Hive , Cassandra 在內(nèi)的多種數(shù)據(jù)庫
總結(jié)
- 支持 多種語言的 API
- 可以擴展至 超過 8K 個節(jié)點
- 能夠在內(nèi)存中緩存數(shù)據(jù)集,以實現(xiàn)交互式數(shù)據(jù)分析
- 提供命令行窗口, 減少探索式的數(shù)據(jù)分析的反應(yīng)時間
1.3 Spark 組件
理解 Spark 能做什么
理解 Spark 的學(xué)習(xí)路線
- Spark 最核心的功能是 RDDS , RDDS 存在于 Spark-Core 這個包中,這個包也是 Spark 最核心的包
同時 Spark 在 Spark-Core 的上層提供了很多的工具,以便適用于不同類型的計算
Spark-Core 和 RDDs(彈性分布式數(shù)據(jù)集 Resilient distributed datasets)
- Spark SQL 在 Spark-core 基礎(chǔ)之上帶出了一個名為 DataSet 和 DataFrame 的數(shù)據(jù)抽象化的概念
- Spark SQL 提供了在 DataSet 和 DataFrame 之上執(zhí)行 SQL 的能力
- Spark SQL 提供了DSL ,可以通過 Scala , Java , Python 等語言來操作 DataSet 和 DataFrame
- 它還支持 使用 JDBC / ODBC 服務(wù)器操作 SQL 語言
Spark Streaming
- Spark Streaming 充分利用 Spark-Core 的快速調(diào)度能力來運行 流分析
- 它截取小批量的數(shù)據(jù)并可以對之運行 RDD Transformation
- 它提供了在同一個程序中同時使用 流分析 和批量分析的能力
MLlib
- MLlib 是 Spark 上分布式機器學(xué)習(xí)的框架, Spark 分布式內(nèi)存的架構(gòu)比 Hadoop 吸盤式的 Apache Mahout 快上 10 倍, 有很大的擴展性
- MLlib 可以使用許多常見的 機器學(xué)習(xí) 和 統(tǒng)計算法 ,簡化大規(guī)模 機器學(xué)習(xí)
- 匯總統(tǒng)計,相關(guān)性,分層抽樣,假設(shè)檢定,隨機數(shù)據(jù)生成
- 支持向量機,回歸,線性回歸,邏輯回歸,決策樹,樸素貝葉斯
- 協(xié)同過濾, ALS
- k-means
- SVD 奇異值分解, PCA 主要成分分析
- TF-IDF, Word2Vec , StandardScaler
- SGD隨機梯度下降,L-BFGS
Graphx
- Graphx 是分布式圖計算框架,提供了一組可以表達圖計算的 API ,Graphx 還對這種抽象化提供了優(yōu)化運行
總結(jié)
- Spark 提供了批處理(RDDS) , 結(jié)構(gòu)化查詢(DataFrame), 流計算 (SparkStreaming),機器學(xué)習(xí)(MLlib),圖計算(GraphX) 組件
- 這些組件都是依托于 通用的 計算引擎 RDDS 而構(gòu)建出來的,所以 Spark-core 的 RDDS 是整個 Spark 的基礎(chǔ)
1.4 Spark 和 Hadoop 的差別

Spark 集群搭建
從Spark 的集群架構(gòu)開始,理解分布式環(huán)境,以及 Spark 的運行原理
理解Spark 的集群搭建,包括高可用的搭建方式
2.1 Spark 集群的結(jié)構(gòu)
通過應(yīng)用運行流程,理解分布式調(diào)度的基礎(chǔ)概念
Sprak 如果將程序運行在一個集群中?
Spark 自身是沒有集群管理工具的,但是如果想要管理數(shù)以千計的機器集群,就必須擁有一個集群的管理工具,所以 Spark集群 可以借助外部的集群工具來進行管理
-
整個的流程使用 Spark 的 Client 提交任務(wù), 找到集群管理工具去申請資源, 然后將計算任務(wù)分發(fā)到集群中運行。
流程
名詞解釋
Driver
- 該進程調(diào)用 Spark 程序的 Main 方法,并且啟動 SparkContext
Cluster Manager
- 該進程負責(zé)和外部集群工具打交道,申請和釋放集群資源
worker
- 該進程是一個守護進程,負責(zé)啟動和管理 Executor
Executor
-
該進程是一個 JVM 虛擬機,負責(zé)運行 Spark Task
運行一個 Spark 程序大致經(jīng)歷如下幾個步驟
1.啟動 Driver ,創(chuàng)建 SparkContext
- Client 提交程序給 Driver ,Driver 向Cluster Manager 申請集群資源
- 資源申請完畢,在 Worker 中 啟動 Executor
- Driver 將程序轉(zhuǎn)化為 Tasks ,分發(fā)給 Executor 執(zhí)行
問題一: Spark 程序可以運行在什么地方?
- 集群:一組協(xié)同工作的計算機,任務(wù)在調(diào)度軟件的控制下,表現(xiàn)的好像在使用一臺計算機一樣
- 集群管理工具: 調(diào)度任務(wù)到集群的軟件
- 常見的機權(quán)管理工具: Hadoop Yarn , Apache Mesos, Kubernetes
Spark 可以將任務(wù)運行在兩種模式下:
- 單機: 使用線程模擬并行來運行程序
- 集群: 使用集群故管理器 來和不同類型的集群交互,將任務(wù)運行在集群中
Spark 可以使用的集群管理工具有:
- Spark Standalone (自帶的)
- Hadoop Yarn
- Apache Mesos
- Kubernetes
問題二: Driver 和 Worker 什么時候啟動?


在Standalone 集群中,分為兩個角色, Master 和 Slave , 而 Slave 就是Worker ,所以在 Standalone 集群中,啟動之初就會創(chuàng)建固定數(shù)量的 Worker
Driver 的啟動分為兩種模式: Client 和 Cluster
在 Client 模式下, Driver 運行在 Client 端,在Client 啟動的時候被啟動。
-
在 Cluster 模式下,Driver 運行在某個 Worker 中,隨著應(yīng)用的提交而啟動。
在Yarn 集群模式下,也依然分為 Client 模式和 Cluster 模式,較新的版本中已經(jīng)開始廢棄 Client 模式了,所以上圖所示的就是 Cluster 模式
如果要在 Yarn 中運行 Spark 程序,首先要和 ResourceManager 交互,開啟 ApplicationMaster ,其中運行了 Driver ,Driver 創(chuàng)建基礎(chǔ)環(huán)境后,會由 ResourceManager 提供到對應(yīng)的容器中,運行 Executor , Executot 會反向向 Driver 注冊自己,并申請 Tasks 執(zhí)行。
總結(jié)
- Master 負責(zé)總控,調(diào)度,管理和協(xié)調(diào) Worker ,保留資源狀況等。
- Slave 對應(yīng)的 Worker 節(jié)點,用于啟動 Executor 執(zhí)行 Task ,定期向 Master 匯報
- Driver 運行在 Client 或者 Slave(Worker) 中,默認運行在 Slave(Worker ) 中
RDD 是什么?
定義:
RDD,全稱為 Resilient Distributed Datasets ,是一個容錯的,并行的數(shù)據(jù)結(jié)構(gòu),可以讓用戶顯式地將數(shù)據(jù)存儲到磁盤和內(nèi)存中,并能控制數(shù)據(jù)的分區(qū)。
同時,RDD, 還提供了一組豐富的操作來操作這些數(shù)據(jù),在這些操作中,包括 map ,flatMap ,filter 等轉(zhuǎn)換操作實現(xiàn)了 MONAD 模式,很好的契合了 Scala 的集合操作,除此之外,RDD 還提供了 join , groupBy ,reduceBykey 等更方便的操作,以支持創(chuàng)建的數(shù)據(jù)運算,通常來講,針對數(shù)據(jù)處理有幾種常見的模型: 包括: Iterative Algorithms, Relational Queries, MapReduce, Stream Processing.例如 Hadoop MapReduce 采用了 MapReduce 模型, Storm 則采用了 Stream Processing 模型.
而RDD 則混合了這四種模型,使得 Spark 可以應(yīng)用于各種大數(shù)據(jù)的處理場景。
RDD 作為數(shù)據(jù)結(jié)構(gòu),本質(zhì)上是一個只讀的分區(qū)記錄集合,一個RDD 可以包含多個分區(qū),每個分區(qū)就是一個DataSet 片段。RDD 之間可以相互的依賴,如果 RDD 的每個分區(qū)最多只能被一個子 RDD 的一個分區(qū)使用,則稱之為: “窄依賴” ,若被多個子 RDD 的分區(qū)依賴,稱之為“寬依賴” ,不同的操作,依據(jù)其特性,可能會產(chǎn)生不同的依賴 ,列入 Map 操作則會插產(chǎn)生窄依賴,而 Join 操作,就會產(chǎn)生寬依賴。
特點
- RDD 是一個編程模型
- RDD允許用戶顯式的指定數(shù)據(jù)存放在內(nèi)存或者磁盤
- RDD 是分布式的,用戶可以控制 RDD 的分區(qū)
- RDD 是一個編程模型
- RDD提供了豐富的操作API
- RDD 提供了reduceByKey ,groupByKey 等操作符,用以操作 Key-Value 型數(shù)據(jù)
- RDD提供了 max ,min , mean 等操作符,用以操作數(shù)字型數(shù)據(jù)
- RDD 提供了 map ,flatMap ,filter 等操作符,用以實現(xiàn) Monad 模式(簡單說,Monad就是一種設(shè)計模式,表示將一個運算過程,通過函數(shù)拆解成互相連接的多個步驟。你只要提供下一步運算所需的函數(shù),整個運算就會自動進行下去。)
3.RDD 是混合型編程模型,可以支持迭代計算,關(guān)系查詢,MapReduce ,流計算
4.RDD 是只讀的。
5.RDD 之間有依賴關(guān)系,更具執(zhí)行操作的操作符的不同,依賴關(guān)系可以分為 寬依賴 和 窄依賴。

(wordcount.txt)單詞統(tǒng)計的 案例程序從結(jié)構(gòu)上可以用上圖表示,分為兩大部分
存儲
- 文件如果存放在 HDFS 上 ,就是分塊的,類似上圖所示,這個 wordcount.txt 分了三塊
計算
Spark 不僅可以讀HDFS ,Spark 還可以讀取很多其他的數(shù)據(jù)集,Spark 可以從數(shù)據(jù)集中創(chuàng)建出 RDD
比如: 上圖中,使用了一個RDD 表示 HDFS 上的 某一個文件,這個文件在 HDFS 中是分為 3塊的,那么 RDD 在讀取的時候也會有 3 個分區(qū), 每個 RDD 的分區(qū)對應(yīng)了一個 HDFS 的分塊。
后續(xù)RDD 在計算的時候,可以更改分區(qū),也可以繼續(xù)保持三個分區(qū),每個分區(qū)之間有依賴關(guān)系,列入說 RDD2 的分區(qū)1 依賴了 RDD1 的分區(qū)1
RDD之所以要設(shè)計為有分區(qū),是因為要進行分布式的計算,每個不同的分區(qū)在不同的線程,或者進程,甚至節(jié)點中,從而做到了并行計算。
總結(jié)
- RDD 是彈性分布式數(shù)據(jù)集
- RDD 一個非常重要的基礎(chǔ)是RDD 可以分區(qū),因為RDD 是運行在分布式的環(huán)境下的。
4.1 創(chuàng)建RDD
SparkContext 程序的入口
val conf = new SparkConf().setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
SparkContext 是 Spark-core 的入口組件,是一個 Spark 的程序的入口,是一個元老級別的API 了.
如果把 一個 Spark 程序分為前后端,那么服務(wù)端就是可以運行在 Spark 程序的集群,而 Driver 就是 Spark 的前端,在 Driver 中 SparkContext 是最主要的組件,也是 Driver 在運行時首先會創(chuàng)建的組件,是 Driver 的核心組件。
SparkContext 從提供的 API 來看,主要的作用是來連接集群,創(chuàng)建 RDD ,累加器, 廣播變量等 功能。
簡單的說,RDD 有三種創(chuàng)建方式
RDD 可以通過本地集合之間創(chuàng)建
RDD 也可以通過讀取外部數(shù)據(jù)集來創(chuàng)建
RDD 也可以通過其他的 RDD 衍生而來
通過本地集合直接創(chuàng)建 RDD
val conf = new SparkConf().setMaster("local[2]")
val sc = new SparkContext(conf)
val list = List(1, 2, 3, 4, 5, 6)
val rddParallelize = sc.parallelize(list, 2)
val rddMake = sc.makeRDD(list, 2)
通過 parallelize 和 makeRDD 這兩個 API 可以通過本地集合創(chuàng)建 RDD
這兩個 API 本質(zhì)上是一樣的, 在 makeRDD 這個方法的內(nèi)部, 最終也是調(diào)用了 parallelize
因為不是從外部直接讀取數(shù)據(jù)集的, 所以沒有外部的分區(qū)可以借鑒, 于是在這兩個方法都都有兩個參數(shù), 第一個參數(shù)是本地集合, 第二個參數(shù)是分區(qū)數(shù)
通過讀取外部文件創(chuàng)建 RDD
val conf = new SparkConf().setMaster("local[2]")
val sc = new SparkContext(conf)
val source: RDD[String] = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt")
訪問方式:
支持訪問文件夾
sc.textFile("hdfs:///dataset")
支持訪問壓縮文件
sc.textFile("hdfs:///dataset/words.gz")
支持通過通配符訪問
sc.textFile("hdfs:///dataset/*.txt")
注意
如果把 Spark 應(yīng)用泡在集群上,則 worker 有可能在任何一個節(jié)點上運行
所以如果使用 如下形式訪問本地文件的話,要確保所有的 worker 中的對應(yīng)的路徑上有這個文件,否則可能會報錯,無法找到這個文件。
file:///…?
分區(qū)
默認情況下讀取 hdfs 中的文件的時候,每個 hdfs 的 block 對應(yīng)一個 RDD 的 partition ,block 的默認是 128M
通過第二個參數(shù),可以指定分區(qū)數(shù)量
sc.textFile("hdfs://node01:8020/dataset/wordcount.txt", 20)
如果通過第二個參數(shù)指定了分區(qū),這個分區(qū)的數(shù)量一定不能小于 “block” 的數(shù)量
通過其他的 RDD 衍生 新的 RDD
val conf = new SparkConf().setMaster("local[2]")
val sc = new SparkContext(conf)
val source: RDD[String] = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt", 20)
val words = source.flatMap { line => line.split(" ") }
source 是通過讀取 HDFS 中的文件所創(chuàng)建的
words 是通過 source 調(diào)用算子 map 生成的新 RDD
通常每個 CPU core 對應(yīng) 2-4 個分區(qū)是合理的值
支持的平臺
支持Hadoop幾乎所有的數(shù)據(jù)格式,支持HDFS 的訪問
通過第三方的這次hi,可以訪問Aws 和 阿里云中的文件,可以到對應(yīng)的平臺查看API
總結(jié):
RDD 可以通過三種方式來創(chuàng)建,本地集合創(chuàng)建,外部數(shù)據(jù)集創(chuàng)建,其他的RDD 衍生。
RDD 算子
理解各個算子的作用
通過理解算子的作用,反向理解 WordCount 程序,以及 Spark的要點
Map算子
sc.parallelize(Seq(1, 2, 3))
.map( num => num * 10 )
.collect()


作用
把RDD 中的數(shù)據(jù)一對一的轉(zhuǎn)為 另外的一種形式
調(diào)用
def map[U: ClassTag](f: T ? U): RDD[U]
參數(shù)
f → Map 算子是 原RDD → 新RDD 的過程, 這個函數(shù)的參數(shù)是原 RDD 數(shù)據(jù), 返回值是經(jīng)過函數(shù)轉(zhuǎn)換的新 RDD 的數(shù)據(jù)
注意點
Map 是一對一,如果函數(shù)是 String → Array [String ] 則新的 RDD 中每條數(shù)據(jù)就是一個數(shù)組
FlatMap 算子
sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
.flatMap( line => line.split(" ") )
.collect()


作用
FlatMap 算子和 Map 算子雷士,但是FlatMap 是一對多
調(diào)用
def flatMap[U: ClassTag](f: T ? List[U]): RDD[U]
參數(shù)
f → 參數(shù)是原 RDD 數(shù)據(jù), 返回值是經(jīng)過函數(shù)轉(zhuǎn)換的新 RDD 的數(shù)據(jù), 需要注意的是返回值是一個集合, 集合中的數(shù)據(jù)會被展平后再放入新的 RDD
注意點
flatMap 其實是兩個操作,是 map + flatten ,也是先轉(zhuǎn)換,后把轉(zhuǎn)換而來的List 展開。
ReduceByKey
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.reduceByKey( (curr, agg) => curr + agg )
.collect()


作用
首先按照 Key 分組,接下來把整組的 Value 計算出一個聚合值,這個操作非常類似于 MapReduce 中的 Reduce
調(diào)用
def reduceByKey(func: (V, V) ? V): RDD[(K, V)]
參數(shù)
func → 執(zhí)行數(shù)據(jù)處理的函數(shù), 傳入兩個參數(shù), 一個是當前值, 一個是局部匯總, 這個函數(shù)需要有一個輸出, 輸出就是這個 Key 的匯總結(jié)果
注意點
ReduceByKey 只能作用于 Key-Value 型數(shù)據(jù), Key-Value 型數(shù)據(jù)在當前語境中特指 Tuple2
ReduceByKey 是一個需要 Shuffled 的操作
和其它的 Shuffled 相比, ReduceByKey是高效的, 因為類似 MapReduce 的, 在 Map 端有一個 Cominer, 這樣 I/O 的數(shù)據(jù)便會減少
總結(jié)
map 和 flatMap 算是都是轉(zhuǎn)換,只是 flatMap 在轉(zhuǎn)換過后會在執(zhí)行展開,所以 map 是一對一,flatMap 是一對多
reduceByKey 類似于 MapReduce 中的reduce


