Spark_day01

1.1 Spark 是什么
  • Apache Spark 是一個快速的,多用途的集群計算系統(tǒng),相當于Hadoop MapReduce ,Spark 使用了內(nèi)存來保存中間結(jié)果,能在數(shù)據(jù)還未寫入磁盤的時候在內(nèi)存中進行運算.

  • Spark 只是一個計算框架,不像Hadoop 一樣包含分布式文件系統(tǒng)和完備的調(diào)度系統(tǒng),如果要使用 Spark ,就需要搭載其他的文件系統(tǒng)和其他的調(diào)度系統(tǒng).

為什么會有Spark ?
  • Spark 產(chǎn)生之前,已經(jīng)有非常成熟的計算系統(tǒng)的存在了,比如我們的 MapReduce ,這些計算系統(tǒng)提供了高層次的 API ,把計算運行在集群中并提供容錯能力,從而實現(xiàn)分布式計算.

  • 雖然這些框架提供了大量的對訪問利用計算資源的抽象,但是他們?nèi)鄙倭死梅植际絻?nèi)存的抽象,這些框架的缺點在于,多個計算之間的數(shù)據(jù)服用就是將中間的結(jié)果數(shù)據(jù)寫入到一個穩(wěn)定的文件系統(tǒng)中(比如: HDFS ),所以會產(chǎn)生數(shù)據(jù)的復(fù)制和備份,磁盤的 I/O 以及數(shù)據(jù)的 序列化, 所以這些框架在遇到需要在多個計算之間復(fù)用中間結(jié)果的操作的時會非常的不高效.

  • 而這些復(fù)用中間結(jié)果的操作是非常的常見的,例如: 迭代式計算,交互式的數(shù)據(jù)挖掘, 圖形計算等等

  • 認識到這個問題之后,學(xué)學(xué)術(shù)界提出了一個新的模型 : RDDS

  • RDDS 是一個可以容錯且并行的數(shù)據(jù)結(jié)構(gòu),它可以讓用戶顯式的將中間結(jié)果數(shù)據(jù)集保存在內(nèi)存中,并且通過控制數(shù)據(jù)集的分區(qū)來達到數(shù)據(jù)存放處理的最優(yōu)化.

  • 同時 RDDS 也提供了豐富的 API 來操作數(shù)據(jù)集

  • 后來 RDDS 被 AMPLab 在 一個叫做 Spark 的框架中進行了開源

總結(jié):
  1. Spark 是 Apache 的一個開源框架
  2. Spark 的母公司叫 Databticks
  3. Spark 是為了解決 MapReduce 等過去的計算系統(tǒng)無法在內(nèi)存中保存中間結(jié)果的問題
  4. Spark 的核心是 RDDS ,RDDS 不僅誰一個計算框架,也是一種數(shù)據(jù)結(jié)構(gòu).
  5. RDDS即Resilient distributed datasets(彈性分布式數(shù)據(jù)集)。
1.2 Spark 的特點(優(yōu)點)

理解 Spark 的特點,從而去理解 為什么要使用 Spark

速度快
  • Spark 在內(nèi)存中運行時,速度是 Hadoop MapReduce 的 100 倍
  • Spark 基于硬盤的運算速度 大概是 Hadoop MapReduce 的 10 倍
  • Spark 實現(xiàn)了一種叫做 RDDS 的 DAG(Directed Acyclic Graph)叫做有向無環(huán)圖 執(zhí)行引擎,其數(shù)據(jù)緩存在內(nèi)存中 可以迭代處理.
易用
  • Spark 支持 JAVA ,Scala , Python ,R , SQL 等多種語言的 API
  • Spark 支持超過 80 個 高級運算符使得 用戶非常容易的構(gòu)建并行計算程序
  • Spark 可以使用基于 Scala ,Python ,R , SQL 的 shell 交互式查詢
通用
  • Spark 提供一個完整的技術(shù)棧,包括 SQL 執(zhí)行, Dataset 命令式 API, 機器學(xué)習(xí)庫 MLlib ,圖計算框架 GraphX ,流計算 SparkStreaming.
  • 用戶可以在同一個應(yīng)用同時使用這些工具 , 這一點是劃時代的.
兼容
  • Spark 可以運行在 Hadoop Yarn ,Apache Mesos, Kubernets, Spark Standalone 等集群中
  • Spark 可以訪問 HBase , HDFS , Hive , Cassandra 在內(nèi)的多種數(shù)據(jù)庫
總結(jié)
  • 支持 多種語言的 API
  • 可以擴展至 超過 8K 個節(jié)點
  • 能夠在內(nèi)存中緩存數(shù)據(jù)集,以實現(xiàn)交互式數(shù)據(jù)分析
  • 提供命令行窗口, 減少探索式的數(shù)據(jù)分析的反應(yīng)時間
1.3 Spark 組件

理解 Spark 能做什么
理解 Spark 的學(xué)習(xí)路線

  • Spark 最核心的功能是 RDDS , RDDS 存在于 Spark-Core 這個包中,這個包也是 Spark 最核心的包
    同時 Spark 在 Spark-Core 的上層提供了很多的工具,以便適用于不同類型的計算
Spark-Core 和 RDDs(彈性分布式數(shù)據(jù)集 Resilient distributed datasets)
  • Spark SQL 在 Spark-core 基礎(chǔ)之上帶出了一個名為 DataSet 和 DataFrame 的數(shù)據(jù)抽象化的概念
  • Spark SQL 提供了在 DataSet 和 DataFrame 之上執(zhí)行 SQL 的能力
  • Spark SQL 提供了DSL ,可以通過 Scala , Java , Python 等語言來操作 DataSet 和 DataFrame
  • 它還支持 使用 JDBC / ODBC 服務(wù)器操作 SQL 語言
Spark Streaming
  • Spark Streaming 充分利用 Spark-Core 的快速調(diào)度能力來運行 流分析
  • 它截取小批量的數(shù)據(jù)并可以對之運行 RDD Transformation
  • 它提供了在同一個程序中同時使用 流分析 和批量分析的能力
MLlib
  • MLlib 是 Spark 上分布式機器學(xué)習(xí)的框架, Spark 分布式內(nèi)存的架構(gòu)比 Hadoop 吸盤式的 Apache Mahout 快上 10 倍, 有很大的擴展性
  • MLlib 可以使用許多常見的 機器學(xué)習(xí) 和 統(tǒng)計算法 ,簡化大規(guī)模 機器學(xué)習(xí)
  • 匯總統(tǒng)計,相關(guān)性,分層抽樣,假設(shè)檢定,隨機數(shù)據(jù)生成
  • 支持向量機,回歸,線性回歸,邏輯回歸,決策樹,樸素貝葉斯
  • 協(xié)同過濾, ALS
  • k-means
  • SVD 奇異值分解, PCA 主要成分分析
  • TF-IDF, Word2Vec , StandardScaler
  • SGD隨機梯度下降,L-BFGS
Graphx
  • Graphx 是分布式圖計算框架,提供了一組可以表達圖計算的 API ,Graphx 還對這種抽象化提供了優(yōu)化運行
總結(jié)
  • Spark 提供了批處理(RDDS) , 結(jié)構(gòu)化查詢(DataFrame), 流計算 (SparkStreaming),機器學(xué)習(xí)(MLlib),圖計算(GraphX) 組件
  • 這些組件都是依托于 通用的 計算引擎 RDDS 而構(gòu)建出來的,所以 Spark-core 的 RDDS 是整個 Spark 的基礎(chǔ)
1.4 Spark 和 Hadoop 的差別
Spark 和 Hadoop 的差別

Spark 集群搭建

從Spark 的集群架構(gòu)開始,理解分布式環(huán)境,以及 Spark 的運行原理
理解Spark 的集群搭建,包括高可用的搭建方式

2.1 Spark 集群的結(jié)構(gòu)

通過應(yīng)用運行流程,理解分布式調(diào)度的基礎(chǔ)概念

Sprak 如果將程序運行在一個集群中?
  • Spark 自身是沒有集群管理工具的,但是如果想要管理數(shù)以千計的機器集群,就必須擁有一個集群的管理工具,所以 Spark集群 可以借助外部的集群工具來進行管理

  • 整個的流程使用 Spark 的 Client 提交任務(wù), 找到集群管理工具去申請資源, 然后將計算任務(wù)分發(fā)到集群中運行。


    流程
名詞解釋
Driver
  • 該進程調(diào)用 Spark 程序的 Main 方法,并且啟動 SparkContext
Cluster Manager
  • 該進程負責(zé)和外部集群工具打交道,申請和釋放集群資源
worker
  • 該進程是一個守護進程,負責(zé)啟動和管理 Executor
Executor
  • 該進程是一個 JVM 虛擬機,負責(zé)運行 Spark Task


運行一個 Spark 程序大致經(jīng)歷如下幾個步驟

1.啟動 Driver ,創(chuàng)建 SparkContext

  1. Client 提交程序給 Driver ,Driver 向Cluster Manager 申請集群資源
  2. 資源申請完畢,在 Worker 中 啟動 Executor
  3. Driver 將程序轉(zhuǎn)化為 Tasks ,分發(fā)給 Executor 執(zhí)行
問題一: Spark 程序可以運行在什么地方?
  • 集群:一組協(xié)同工作的計算機,任務(wù)在調(diào)度軟件的控制下,表現(xiàn)的好像在使用一臺計算機一樣
  • 集群管理工具: 調(diào)度任務(wù)到集群的軟件
  • 常見的機權(quán)管理工具: Hadoop Yarn , Apache Mesos, Kubernetes

Spark 可以將任務(wù)運行在兩種模式下:

  • 單機: 使用線程模擬并行來運行程序
  • 集群: 使用集群故管理器 來和不同類型的集群交互,將任務(wù)運行在集群中
Spark 可以使用的集群管理工具有:
  • Spark Standalone (自帶的)
  • Hadoop Yarn
  • Apache Mesos
  • Kubernetes
問題二: Driver 和 Worker 什么時候啟動?

  • 在Standalone 集群中,分為兩個角色, Master 和 Slave , 而 Slave 就是Worker ,所以在 Standalone 集群中,啟動之初就會創(chuàng)建固定數(shù)量的 Worker

  • Driver 的啟動分為兩種模式: Client 和 Cluster

  • 在 Client 模式下, Driver 運行在 Client 端,在Client 啟動的時候被啟動。

  • 在 Cluster 模式下,Driver 運行在某個 Worker 中,隨著應(yīng)用的提交而啟動。


  • 在Yarn 集群模式下,也依然分為 Client 模式和 Cluster 模式,較新的版本中已經(jīng)開始廢棄 Client 模式了,所以上圖所示的就是 Cluster 模式

  • 如果要在 Yarn 中運行 Spark 程序,首先要和 ResourceManager 交互,開啟 ApplicationMaster ,其中運行了 Driver ,Driver 創(chuàng)建基礎(chǔ)環(huán)境后,會由 ResourceManager 提供到對應(yīng)的容器中,運行 Executor , Executot 會反向向 Driver 注冊自己,并申請 Tasks 執(zhí)行。

總結(jié)
  • Master 負責(zé)總控,調(diào)度,管理和協(xié)調(diào) Worker ,保留資源狀況等。
  • Slave 對應(yīng)的 Worker 節(jié)點,用于啟動 Executor 執(zhí)行 Task ,定期向 Master 匯報
  • Driver 運行在 Client 或者 Slave(Worker) 中,默認運行在 Slave(Worker ) 中

RDD 是什么?

定義:
  • RDD,全稱為 Resilient Distributed Datasets ,是一個容錯的,并行的數(shù)據(jù)結(jié)構(gòu),可以讓用戶顯式地將數(shù)據(jù)存儲到磁盤和內(nèi)存中,并能控制數(shù)據(jù)的分區(qū)。
    同時,RDD, 還提供了一組豐富的操作來操作這些數(shù)據(jù),在這些操作中,包括 map ,flatMap ,filter 等轉(zhuǎn)換操作實現(xiàn)了 MONAD 模式,很好的契合了 Scala 的集合操作,除此之外,RDD 還提供了 join , groupBy ,reduceBykey 等更方便的操作,以支持創(chuàng)建的數(shù)據(jù)運算,

  • 通常來講,針對數(shù)據(jù)處理有幾種常見的模型: 包括: Iterative Algorithms, Relational Queries, MapReduce, Stream Processing.例如 Hadoop MapReduce 采用了 MapReduce 模型, Storm 則采用了 Stream Processing 模型.

  • 而RDD 則混合了這四種模型,使得 Spark 可以應(yīng)用于各種大數(shù)據(jù)的處理場景。
    RDD 作為數(shù)據(jù)結(jié)構(gòu),本質(zhì)上是一個只讀的分區(qū)記錄集合,一個RDD 可以包含多個分區(qū),每個分區(qū)就是一個DataSet 片段。

  • RDD 之間可以相互的依賴,如果 RDD 的每個分區(qū)最多只能被一個子 RDD 的一個分區(qū)使用,則稱之為: “窄依賴” ,若被多個子 RDD 的分區(qū)依賴,稱之為“寬依賴” ,不同的操作,依據(jù)其特性,可能會產(chǎn)生不同的依賴 ,列入 Map 操作則會插產(chǎn)生窄依賴,而 Join 操作,就會產(chǎn)生寬依賴。

特點
  1. RDD 是一個編程模型
  • RDD允許用戶顯式的指定數(shù)據(jù)存放在內(nèi)存或者磁盤
  • RDD 是分布式的,用戶可以控制 RDD 的分區(qū)
  1. RDD 是一個編程模型
  • RDD提供了豐富的操作API
  • RDD 提供了reduceByKey ,groupByKey 等操作符,用以操作 Key-Value 型數(shù)據(jù)
  • RDD提供了 max ,min , mean 等操作符,用以操作數(shù)字型數(shù)據(jù)
  • RDD 提供了 map ,flatMap ,filter 等操作符,用以實現(xiàn) Monad 模式(簡單說,Monad就是一種設(shè)計模式,表示將一個運算過程,通過函數(shù)拆解成互相連接的多個步驟。你只要提供下一步運算所需的函數(shù),整個運算就會自動進行下去。)

3.RDD 是混合型編程模型,可以支持迭代計算,關(guān)系查詢,MapReduce ,流計算
4.RDD 是只讀的。
5.RDD 之間有依賴關(guān)系,更具執(zhí)行操作的操作符的不同,依賴關(guān)系可以分為 寬依賴 和 窄依賴。


(wordcount.txt)單詞統(tǒng)計的 案例程序從結(jié)構(gòu)上可以用上圖表示,分為兩大部分
存儲
  • 文件如果存放在 HDFS 上 ,就是分塊的,類似上圖所示,這個 wordcount.txt 分了三塊
計算
  • Spark 不僅可以讀HDFS ,Spark 還可以讀取很多其他的數(shù)據(jù)集,Spark 可以從數(shù)據(jù)集中創(chuàng)建出 RDD

  • 比如: 上圖中,使用了一個RDD 表示 HDFS 上的 某一個文件,這個文件在 HDFS 中是分為 3塊的,那么 RDD 在讀取的時候也會有 3 個分區(qū), 每個 RDD 的分區(qū)對應(yīng)了一個 HDFS 的分塊。

  • 后續(xù)RDD 在計算的時候,可以更改分區(qū),也可以繼續(xù)保持三個分區(qū),每個分區(qū)之間有依賴關(guān)系,列入說 RDD2 的分區(qū)1 依賴了 RDD1 的分區(qū)1

  • RDD之所以要設(shè)計為有分區(qū),是因為要進行分布式的計算,每個不同的分區(qū)在不同的線程,或者進程,甚至節(jié)點中,從而做到了并行計算。

總結(jié)
  • RDD 是彈性分布式數(shù)據(jù)集
  • RDD 一個非常重要的基礎(chǔ)是RDD 可以分區(qū),因為RDD 是運行在分布式的環(huán)境下的。
4.1 創(chuàng)建RDD
SparkContext 程序的入口
val conf = new SparkConf().setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)

SparkContext 是 Spark-core 的入口組件,是一個 Spark 的程序的入口,是一個元老級別的API 了.
如果把 一個 Spark 程序分為前后端,那么服務(wù)端就是可以運行在 Spark 程序的集群,而 Driver 就是 Spark 的前端,在 Driver 中 SparkContext 是最主要的組件,也是 Driver 在運行時首先會創(chuàng)建的組件,是 Driver 的核心組件。
SparkContext 從提供的 API 來看,主要的作用是來連接集群,創(chuàng)建 RDD ,累加器, 廣播變量等 功能。

簡單的說,RDD 有三種創(chuàng)建方式
RDD 可以通過本地集合之間創(chuàng)建
RDD 也可以通過讀取外部數(shù)據(jù)集來創(chuàng)建
RDD 也可以通過其他的 RDD 衍生而來

通過本地集合直接創(chuàng)建 RDD
val conf = new SparkConf().setMaster("local[2]")
val sc = new SparkContext(conf)

val list = List(1, 2, 3, 4, 5, 6)
val rddParallelize = sc.parallelize(list, 2)
val rddMake = sc.makeRDD(list, 2)

通過 parallelize 和 makeRDD 這兩個 API 可以通過本地集合創(chuàng)建 RDD
這兩個 API 本質(zhì)上是一樣的, 在 makeRDD 這個方法的內(nèi)部, 最終也是調(diào)用了 parallelize
因為不是從外部直接讀取數(shù)據(jù)集的, 所以沒有外部的分區(qū)可以借鑒, 于是在這兩個方法都都有兩個參數(shù), 第一個參數(shù)是本地集合, 第二個參數(shù)是分區(qū)數(shù)

通過讀取外部文件創(chuàng)建 RDD
val conf = new SparkConf().setMaster("local[2]")
val sc = new SparkContext(conf)
val source: RDD[String] = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt")
訪問方式:

支持訪問文件夾

sc.textFile("hdfs:///dataset")

支持訪問壓縮文件

sc.textFile("hdfs:///dataset/words.gz")

支持通過通配符訪問

sc.textFile("hdfs:///dataset/*.txt")
注意

如果把 Spark 應(yīng)用泡在集群上,則 worker 有可能在任何一個節(jié)點上運行
所以如果使用 如下形式訪問本地文件的話,要確保所有的 worker 中的對應(yīng)的路徑上有這個文件,否則可能會報錯,無法找到這個文件。

file:///…?
分區(qū)

默認情況下讀取 hdfs 中的文件的時候,每個 hdfs 的 block 對應(yīng)一個 RDD 的 partition ,block 的默認是 128M
通過第二個參數(shù),可以指定分區(qū)數(shù)量

sc.textFile("hdfs://node01:8020/dataset/wordcount.txt", 20)

如果通過第二個參數(shù)指定了分區(qū),這個分區(qū)的數(shù)量一定不能小于 “block” 的數(shù)量

通過其他的 RDD 衍生 新的 RDD
val conf = new SparkConf().setMaster("local[2]")
val sc = new SparkContext(conf)

val source: RDD[String] = sc.textFile("hdfs://node01:8020/dataset/wordcount.txt", 20)
val words = source.flatMap { line => line.split(" ") }

source 是通過讀取 HDFS 中的文件所創(chuàng)建的
words 是通過 source 調(diào)用算子 map 生成的新 RDD

通常每個 CPU core 對應(yīng) 2-4 個分區(qū)是合理的值
支持的平臺

支持Hadoop幾乎所有的數(shù)據(jù)格式,支持HDFS 的訪問
通過第三方的這次hi,可以訪問Aws 和 阿里云中的文件,可以到對應(yīng)的平臺查看API

總結(jié):

RDD 可以通過三種方式來創(chuàng)建,本地集合創(chuàng)建,外部數(shù)據(jù)集創(chuàng)建,其他的RDD 衍生。

RDD 算子

理解各個算子的作用
通過理解算子的作用,反向理解 WordCount 程序,以及 Spark的要點

Map算子
sc.parallelize(Seq(1, 2, 3))
  .map( num => num * 10 )
  .collect()
作用

把RDD 中的數(shù)據(jù)一對一的轉(zhuǎn)為 另外的一種形式

調(diào)用
def map[U: ClassTag](f: T ? U): RDD[U]
參數(shù)

f → Map 算子是 原RDD → 新RDD 的過程, 這個函數(shù)的參數(shù)是原 RDD 數(shù)據(jù), 返回值是經(jīng)過函數(shù)轉(zhuǎn)換的新 RDD 的數(shù)據(jù)

注意點

Map 是一對一,如果函數(shù)是 String → Array [String ] 則新的 RDD 中每條數(shù)據(jù)就是一個數(shù)組

FlatMap 算子
sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
  .flatMap( line => line.split(" ") )
  .collect()
作用

FlatMap 算子和 Map 算子雷士,但是FlatMap 是一對多

調(diào)用
def flatMap[U: ClassTag](f: T ? List[U]): RDD[U]
參數(shù)

f → 參數(shù)是原 RDD 數(shù)據(jù), 返回值是經(jīng)過函數(shù)轉(zhuǎn)換的新 RDD 的數(shù)據(jù), 需要注意的是返回值是一個集合, 集合中的數(shù)據(jù)會被展平后再放入新的 RDD

注意點

flatMap 其實是兩個操作,是 map + flatten ,也是先轉(zhuǎn)換,后把轉(zhuǎn)換而來的List 展開。

ReduceByKey
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
  .reduceByKey( (curr, agg) => curr + agg )
  .collect()
作用

首先按照 Key 分組,接下來把整組的 Value 計算出一個聚合值,這個操作非常類似于 MapReduce 中的 Reduce

調(diào)用

def reduceByKey(func: (V, V) ? V): RDD[(K, V)]

參數(shù)

func → 執(zhí)行數(shù)據(jù)處理的函數(shù), 傳入兩個參數(shù), 一個是當前值, 一個是局部匯總, 這個函數(shù)需要有一個輸出, 輸出就是這個 Key 的匯總結(jié)果

注意點

ReduceByKey 只能作用于 Key-Value 型數(shù)據(jù), Key-Value 型數(shù)據(jù)在當前語境中特指 Tuple2
ReduceByKey 是一個需要 Shuffled 的操作
和其它的 Shuffled 相比, ReduceByKey是高效的, 因為類似 MapReduce 的, 在 Map 端有一個 Cominer, 這樣 I/O 的數(shù)據(jù)便會減少

總結(jié)

map 和 flatMap 算是都是轉(zhuǎn)換,只是 flatMap 在轉(zhuǎn)換過后會在執(zhí)行展開,所以 map 是一對一,flatMap 是一對多
reduceByKey 類似于 MapReduce 中的reduce

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容