spark淺談

學(xué)習(xí)和使用一段時間的spark, 對spark的總結(jié)一下,希望對大家有用,不介紹怎么使用, 只從設(shè)計上談?wù)劇?/p>

spark解決了什么問題?

說spark前一定要說一下, 就不得不提Google的三駕馬車:Google FS、MapReduce、BigTable。其中對應(yīng)開源實(shí)現(xiàn)如下:

Google FS -> hdfs、
MapReduce -> hadoop mapreduce
BigTable  ->  hbase

spark就是處理 mapreduce慢的問題。
在spark沒出現(xiàn)前, hadoop是 v1 版本 有兩個問題,

  1. 一個就是 hadoop的namenode單點(diǎn)以及內(nèi)存問題(數(shù)據(jù)的node是放在內(nèi)存中), v2也都解決了。
  2. hadoop的機(jī)器資源管理和計算管理都是 mapreduce進(jìn)程管理,就是執(zhí)行任務(wù)和資源都是mapduce一個在管理, v2獨(dú)立出 yarn才解決這個問題的
  3. mapreduce慢的問題, 還是不能解決。 一開始定位就是在廉價的機(jī)器上運(yùn)行。 定位不同。

說下mapreduce核心

  1. 移動數(shù)據(jù)不如移動計算。 比如數(shù)據(jù)在一個節(jié)點(diǎn)上, 那就把計算放在這個節(jié)點(diǎn)上, 這樣就沒有網(wǎng)絡(luò)磁盤IO了, 當(dāng)然需要考慮機(jī)器的負(fù)載繁忙等。
  2. 合久必分,分久必合。 數(shù)據(jù)量很大, 處理不了,就拆分,分發(fā)到多臺機(jī)器上,開始運(yùn)算,運(yùn)算結(jié)果再進(jìn)行合并,最后輸出。
    這就是 map(分) reduce(合) 中間還有shuffle(洗牌)。 map和reduce都是并行的。

hadoop mapreduce是基于 文件的,相當(dāng)于以數(shù)據(jù)為中心。 大量的磁盤網(wǎng)絡(luò)IO。 一個mapreduce只能計算一個結(jié)果,不能迭代計算。 必須是前一個mapreduce的輸出文件作為下一個輸出。

spark就是解決mapreduce的慢的, spark是內(nèi)存計算, 將數(shù)據(jù)加載到內(nèi)存中計算, 所有速度快。 spark也有map reduce概念。
進(jìn)行迭代計算。 數(shù)據(jù)在內(nèi)存中, 上一步的計算結(jié)果,可以在下一步進(jìn)行使用。

另外一個原因:
spark開發(fā)更容易,hadoop的mapreduce很麻煩,每次都要有 map,reuduce, driver三個類。

spark介紹

Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計的快速通用的計算引擎,是一種開源的類Hadoop MapReduce的通用并行框架,擁有Hadoop MapReduce所具有的優(yōu)點(diǎn)。

Spark不同于MapReduce的是,Spark的Job中間輸出結(jié)果可以保存在內(nèi)存中,從而不再需要讀寫HDFS,因此Spark能更好地適用于數(shù)據(jù)挖掘與機(jī)器學(xué)習(xí)等需要迭代的MapReduce的算法。

Spark 主要有三個特點(diǎn)

首先,高級 API 剝離了對集群本身的關(guān)注,Spark 應(yīng)用開發(fā)者可以專注于應(yīng)用所要做的計算本身。

其次,Spark 很快,支持交互式計算和復(fù)雜算法。

最后,Spark 是一個通用引擎,可用它來完成各種各樣的運(yùn)算,包括 SQL 查詢、文本處理、機(jī)器學(xué)習(xí)等,而在 Spark 出現(xiàn)之前,我們一般需要學(xué)習(xí)各種各樣的引擎來分別處理這些需求。

總結(jié)一下:從各種方向上(比如開發(fā)速度和運(yùn)行速度等)來看,Spark都優(yōu)于Hadoop MapReduce;同時,Spark還提供大數(shù)據(jù)生態(tài)的一站式解決方案

spark架構(gòu)

spark core是基礎(chǔ),上面都是轉(zhuǎn)成 core來執(zhí)行的。

spark是分布式,分成master和 work.

部署方式有很多種, 不同方式,對節(jié)點(diǎn)稱呼不同

  1. spark的自身集群管理 master worker, 發(fā)布的是driver
  2. YARN 集群配合 hdfs使用的, 這個使用最多, spark沒有存儲。 所有用yarn和hdfs最密切。
  3. mesos
  4. k8s

spark核心

spark core的數(shù)據(jù)類型計算三種 RDD,Broadcast Variables,Accumulators
RDD:彈性分布式數(shù)據(jù)集
Broadcast Variables: 廣播變量 將變量廣播到所有執(zhí)行的節(jié)點(diǎn) 只讀
Accumulators: 累加器, 執(zhí)行節(jié)點(diǎn)可以將累加結(jié)果回傳到 driver, 執(zhí)行節(jié)點(diǎn),只寫。

核心是 RDD,包括SQL的數(shù)據(jù)類型 DataFrame和DataSet以及 stream的 DStream也是對RDD包裝的。

RDD特點(diǎn)
1)一組分區(qū)(Partition),即數(shù)據(jù)集的基本組成單位;
2)一個計算每個分區(qū)的函數(shù);
3)RDD之間的依賴關(guān)系;
4)一個Partitioner,即RDD的分片函數(shù);
5)一個列表,存儲存取每個Partition的優(yōu)先位置(preferred location)。

spark的功能都是在上面RDD數(shù)據(jù)結(jié)構(gòu)特點(diǎn)上擴(kuò)展完成的。

1. 分區(qū)

spark是分布式的, 分區(qū)就天然支持了, 可以提高并行度。 比如統(tǒng)計一個文件的word數(shù)量, 那不同分區(qū),不同task進(jìn)行處理,
最后將各個分區(qū)的結(jié)果合并就可以了。 分區(qū)可以改變。

2. 數(shù)據(jù)是只讀

RDD加的數(shù)據(jù)都是只讀的。 只讀保證了任務(wù)失敗重跑冪等性。 每一步執(zhí)行都是產(chǎn)生新的RDD,不會修改原RDD。

3. 函數(shù)

函數(shù)就是操作,這就是spark中的算子,RDD的操作算子包括兩類,一類叫做transformations,它是用來將RDD進(jìn)行轉(zhuǎn)化,構(gòu)建RDD的血緣關(guān)系;另一類叫做actions,它是用來觸發(fā)RDD的計算,得到RDD的相關(guān)計算結(jié)果或者將RDD保存的文件系統(tǒng)中。
就是所說的 惰性計算,沒有觸發(fā)計算,都是記錄計算步驟,觸發(fā)了步驟,才開始執(zhí)行。

4. 依賴

RDDs通過操作算子進(jìn)行轉(zhuǎn)換,轉(zhuǎn)換得到的新RDD包含了從其他RDDs衍生所必需的信息,RDDs之間維護(hù)著這種血緣關(guān)系,也稱之為依賴。

這是spark數(shù)據(jù)失敗重跑的依據(jù)。 DAG: 有向無環(huán)圖。 spark的迭代計算。 函數(shù)式編程鏈?zhǔn)?,在RDD中會保存一個依賴, 在上一個執(zhí)行完。 每一步就一個點(diǎn), 這樣構(gòu)成一個圖。

5. 緩存

如果在應(yīng)用程序中多次使用同一個RDD,可以將該RDD緩存起來,該RDD只有在第一次計算的時候會根據(jù)血緣關(guān)系得到分區(qū)的數(shù)據(jù),在后續(xù)其他地方用到該RDD的時候,會直接從緩存處取而不用再根據(jù)血緣關(guān)系計算,這樣就加速后期的重用。

6. checkpoint

雖然RDD的血緣關(guān)系天然地可以實(shí)現(xiàn)容錯,當(dāng)RDD的某個分區(qū)數(shù)據(jù)失敗或丟失,可以通過血緣關(guān)系重建。但是對于長時間迭代型應(yīng)用來說,隨著迭代的進(jìn)行,RDDs之間的血緣關(guān)系會越來越長,一旦在后續(xù)迭代過程中出錯,則需要通過非常長的血緣關(guān)系去重建,勢必影響性能。為此,RDD支持checkpoint將數(shù)據(jù)保存到持久化的存儲中,這樣就可以切斷之前的血緣關(guān)系,因為checkpoint后的RDD不需要知道它的父RDDs了,它可以從checkpoint處拿到數(shù)據(jù)。就是將數(shù)據(jù)持久化, 切斷DAG圖。

編程模型

給個示例:

package org.jackson.exp

import org.apache.spark.{SparkConf, SparkContext}


object Wd {
  def main(args: Array[String]): Unit = {
    // 設(shè)置 conf
    val conf = new SparkConf().setMaster("local[*]").setAppName("WC")
    // 創(chuàng)建SparkContext,該對象是提交spark App的入口
    val sc = new SparkContext(conf)
    sc.textFile("/Users/zego/IdeaProjects/sparkOne/input").
      flatMap(_.split(" ")).  // 將一行進(jìn)行按 " "拆分
      map((_, 1)). // 轉(zhuǎn)換數(shù)據(jù)類型 tuple
      reduceByKey(_ + _). // 基于key進(jìn)行 value 相加
      coalesce(1).   // 修改分區(qū)數(shù)
      saveAsTextFile("/Users/zego/IdeaProjects/sparkOne/output")

    sc.stop()
  }
}

不同分區(qū),不同task


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容