學(xué)習(xí)和使用一段時間的spark, 對spark的總結(jié)一下,希望對大家有用,不介紹怎么使用, 只從設(shè)計上談?wù)劇?/p>
spark解決了什么問題?
說spark前一定要說一下, 就不得不提Google的三駕馬車:Google FS、MapReduce、BigTable。其中對應(yīng)開源實(shí)現(xiàn)如下:
Google FS -> hdfs、
MapReduce -> hadoop mapreduce
BigTable -> hbase
spark就是處理 mapreduce慢的問題。
在spark沒出現(xiàn)前, hadoop是 v1 版本 有兩個問題,
- 一個就是 hadoop的namenode單點(diǎn)以及內(nèi)存問題(數(shù)據(jù)的node是放在內(nèi)存中), v2也都解決了。
- hadoop的機(jī)器資源管理和計算管理都是 mapreduce進(jìn)程管理,就是執(zhí)行任務(wù)和資源都是mapduce一個在管理, v2獨(dú)立出 yarn才解決這個問題的
- mapreduce慢的問題, 還是不能解決。 一開始定位就是在廉價的機(jī)器上運(yùn)行。 定位不同。
說下mapreduce核心:
-
移動數(shù)據(jù)不如移動計算。 比如數(shù)據(jù)在一個節(jié)點(diǎn)上, 那就把計算放在這個節(jié)點(diǎn)上, 這樣就沒有網(wǎng)絡(luò)磁盤IO了, 當(dāng)然需要考慮機(jī)器的負(fù)載繁忙等。 -
合久必分,分久必合。 數(shù)據(jù)量很大, 處理不了,就拆分,分發(fā)到多臺機(jī)器上,開始運(yùn)算,運(yùn)算結(jié)果再進(jìn)行合并,最后輸出。
這就是 map(分) reduce(合) 中間還有shuffle(洗牌)。 map和reduce都是并行的。
hadoop mapreduce是基于 文件的,相當(dāng)于以數(shù)據(jù)為中心。 大量的磁盤網(wǎng)絡(luò)IO。 一個mapreduce只能計算一個結(jié)果,不能迭代計算。 必須是前一個mapreduce的輸出文件作為下一個輸出。
spark就是解決mapreduce的慢的, spark是內(nèi)存計算, 將數(shù)據(jù)加載到內(nèi)存中計算, 所有速度快。 spark也有map reduce概念。
進(jìn)行迭代計算。 數(shù)據(jù)在內(nèi)存中, 上一步的計算結(jié)果,可以在下一步進(jìn)行使用。
另外一個原因:
spark開發(fā)更容易,hadoop的mapreduce很麻煩,每次都要有 map,reuduce, driver三個類。
spark介紹
Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計的快速通用的計算引擎,是一種開源的類Hadoop MapReduce的通用并行框架,擁有Hadoop MapReduce所具有的優(yōu)點(diǎn)。
Spark不同于MapReduce的是,Spark的Job中間輸出結(jié)果可以保存在內(nèi)存中,從而不再需要讀寫HDFS,因此Spark能更好地適用于數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)等需要迭代的MapReduce的算法。
Spark 主要有三個特點(diǎn)
首先,高級 API 剝離了對集群本身的關(guān)注,Spark 應(yīng)用開發(fā)者可以專注于應(yīng)用所要做的計算本身。
其次,Spark 很快,支持交互式計算和復(fù)雜算法。
最后,Spark 是一個通用引擎,可用它來完成各種各樣的運(yùn)算,包括 SQL 查詢、文本處理、機(jī)器學(xué)習(xí)等,而在 Spark 出現(xiàn)之前,我們一般需要學(xué)習(xí)各種各樣的引擎來分別處理這些需求。
總結(jié)一下:從各種方向上(比如開發(fā)速度和運(yùn)行速度等)來看,Spark都優(yōu)于Hadoop MapReduce;同時,Spark還提供大數(shù)據(jù)生態(tài)的一站式解決方案
spark架構(gòu)

spark core是基礎(chǔ),上面都是轉(zhuǎn)成 core來執(zhí)行的。
spark是分布式,分成master和 work.

部署方式有很多種, 不同方式,對節(jié)點(diǎn)稱呼不同
- spark的自身集群管理 master worker, 發(fā)布的是driver
- YARN 集群配合 hdfs使用的, 這個使用最多, spark沒有存儲。 所有用yarn和hdfs最密切。
- mesos
- k8s
spark核心
spark core的數(shù)據(jù)類型計算三種 RDD,Broadcast Variables,Accumulators
RDD:彈性分布式數(shù)據(jù)集
Broadcast Variables: 廣播變量 將變量廣播到所有執(zhí)行的節(jié)點(diǎn) 只讀
Accumulators: 累加器, 執(zhí)行節(jié)點(diǎn)可以將累加結(jié)果回傳到 driver, 執(zhí)行節(jié)點(diǎn),只寫。
核心是 RDD,包括SQL的數(shù)據(jù)類型 DataFrame和DataSet以及 stream的 DStream也是對RDD包裝的。
RDD特點(diǎn)
1)一組分區(qū)(Partition),即數(shù)據(jù)集的基本組成單位;
2)一個計算每個分區(qū)的函數(shù);
3)RDD之間的依賴關(guān)系;
4)一個Partitioner,即RDD的分片函數(shù);
5)一個列表,存儲存取每個Partition的優(yōu)先位置(preferred location)。
spark的功能都是在上面RDD數(shù)據(jù)結(jié)構(gòu)特點(diǎn)上擴(kuò)展完成的。
1. 分區(qū)
spark是分布式的, 分區(qū)就天然支持了, 可以提高并行度。 比如統(tǒng)計一個文件的word數(shù)量, 那不同分區(qū),不同task進(jìn)行處理,
最后將各個分區(qū)的結(jié)果合并就可以了。 分區(qū)可以改變。
2. 數(shù)據(jù)是只讀
RDD加的數(shù)據(jù)都是只讀的。 只讀保證了任務(wù)失敗重跑冪等性。 每一步執(zhí)行都是產(chǎn)生新的RDD,不會修改原RDD。
3. 函數(shù)
函數(shù)就是操作,這就是spark中的算子,RDD的操作算子包括兩類,一類叫做transformations,它是用來將RDD進(jìn)行轉(zhuǎn)化,構(gòu)建RDD的血緣關(guān)系;另一類叫做actions,它是用來觸發(fā)RDD的計算,得到RDD的相關(guān)計算結(jié)果或者將RDD保存的文件系統(tǒng)中。
就是所說的 惰性計算,沒有觸發(fā)計算,都是記錄計算步驟,觸發(fā)了步驟,才開始執(zhí)行。
4. 依賴
RDDs通過操作算子進(jìn)行轉(zhuǎn)換,轉(zhuǎn)換得到的新RDD包含了從其他RDDs衍生所必需的信息,RDDs之間維護(hù)著這種血緣關(guān)系,也稱之為依賴。
這是spark數(shù)據(jù)失敗重跑的依據(jù)。 DAG: 有向無環(huán)圖。 spark的迭代計算。 函數(shù)式編程鏈?zhǔn)?,在RDD中會保存一個依賴, 在上一個執(zhí)行完。 每一步就一個點(diǎn), 這樣構(gòu)成一個圖。
5. 緩存
如果在應(yīng)用程序中多次使用同一個RDD,可以將該RDD緩存起來,該RDD只有在第一次計算的時候會根據(jù)血緣關(guān)系得到分區(qū)的數(shù)據(jù),在后續(xù)其他地方用到該RDD的時候,會直接從緩存處取而不用再根據(jù)血緣關(guān)系計算,這樣就加速后期的重用。
6. checkpoint
雖然RDD的血緣關(guān)系天然地可以實(shí)現(xiàn)容錯,當(dāng)RDD的某個分區(qū)數(shù)據(jù)失敗或丟失,可以通過血緣關(guān)系重建。但是對于長時間迭代型應(yīng)用來說,隨著迭代的進(jìn)行,RDDs之間的血緣關(guān)系會越來越長,一旦在后續(xù)迭代過程中出錯,則需要通過非常長的血緣關(guān)系去重建,勢必影響性能。為此,RDD支持checkpoint將數(shù)據(jù)保存到持久化的存儲中,這樣就可以切斷之前的血緣關(guān)系,因為checkpoint后的RDD不需要知道它的父RDDs了,它可以從checkpoint處拿到數(shù)據(jù)。就是將數(shù)據(jù)持久化, 切斷DAG圖。
編程模型
給個示例:
package org.jackson.exp
import org.apache.spark.{SparkConf, SparkContext}
object Wd {
def main(args: Array[String]): Unit = {
// 設(shè)置 conf
val conf = new SparkConf().setMaster("local[*]").setAppName("WC")
// 創(chuàng)建SparkContext,該對象是提交spark App的入口
val sc = new SparkContext(conf)
sc.textFile("/Users/zego/IdeaProjects/sparkOne/input").
flatMap(_.split(" ")). // 將一行進(jìn)行按 " "拆分
map((_, 1)). // 轉(zhuǎn)換數(shù)據(jù)類型 tuple
reduceByKey(_ + _). // 基于key進(jìn)行 value 相加
coalesce(1). // 修改分區(qū)數(shù)
saveAsTextFile("/Users/zego/IdeaProjects/sparkOne/output")
sc.stop()
}
}
不同分區(qū),不同task
