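Serialization
In real-world development you often write custom operations on RDDs. Keep two things in mind:
- Initialization runs on the Driver
- The actual computation runs on the Executors
Moving code and data between the two involves network communication, so whatever a task references must be serializable.
Example: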
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @date: 2020-10-27 17:41
 **/
object SuperWordCount {
  // Fields are instantiated on the Driver
  private val list = "and of see the to a in".split("\\s+")
  private val p = """[()\\?\\.,:;'’”“!\\? ]"""

  def main(args: Array[String]): Unit = {
    // sc is initialized on the Driver
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
    // Functions passed to RDD operations run on the Executors
    val lines = sc.textFile("/Users/baiwang/myproject/spark/data/swc.dat")
    lines.flatMap(_.split("\\s+"))
      .map(word => {
        word.toLowerCase()
          .replaceAll(p, "")
      }).filter(word => word.trim.length > 0 && !list.contains(word))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .collect().foreach(println(_))
  }
}
Serialization:
package com.hhb.spark.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @date: 2020-10-29 13:54
 **/
class MyClass1(x: Int) {
  val num = x
}

case class MyClass2(x: Int) {
  val num = x
}

class MyClass3(x: Int) extends Serializable {
  val num = x
}

object SerializableDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
    val rdd = sc.makeRDD(1 to 20)

    // Functions and methods can both be serialized
    def add1(x: Int) = x + 100
    val add2 = add1 _

    // rdd.map(add1(_)).foreach(println(_))
    // rdd.map(add2(_)).foreach(println(_))

    val object1 = new MyClass1(1)
    // MyClass1 is not serializable; the next line fails with
    // org.apache.spark.SparkException: Task not serializable
    // rdd.map(_ + object1.num).foreach(println(_))

    // Fix 1: use a case class
    val object2 = MyClass2(2)
    rdd.map(_ + object2.num).foreach(println(_))
    println("*" * 15)

    // Fix 2: implement the Serializable interface
    val object3 = new MyClass3(3)
    rdd.map(_ + object3.num).foreach(println(_))
  }
}
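The "Task not serializable" failure can be explored without a cluster. The sketch below is not Spark's own closure check. It only assumes that closures are shipped with Java serialization and tests whether a value survives an `ObjectOutputStream`. The object and method names (`SerializationCheckSketch`, `isJavaSerializable`) are made up for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheckSketch {
  // Hypothetical helper: true if Java serialization accepts the value.
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

A plain `new Object` is rejected while a `String` is accepted. An instance of a class like `MyClass1` would be rejected for the same reason, whereas a case class or a class extending `Serializable` passes.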
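RDD Dependencies
RDDs support only coarse-grained transformations, i.e. a single operation applied to a large number of records. The series of transformations that created an RDD is recorded as its Lineage so that lost partitions can be recovered.
An RDD's Lineage records the RDD's metadata and the transformations applied to it. When some partitions of the RDD are lost, this information is enough to recompute and restore them.

An RDD relates to its parent RDD(s) through one of two kinds of dependency: narrow dependency and wide dependency. Dependencies serve two purposes:
- Fault tolerance for data
- Dividing a job into stages
Narrow dependency: 1:1 or n:1
Wide dependency: n:m, which implies a shuffle


DAG (Directed Acyclic Graph): a chain of transformations on the original RDD forms a DAG, and the DAG is divided into Stages according to the dependencies between RDDs:
- For narrow dependencies, partition transformations are computed within a single Stage
- For wide dependencies, because of the Shuffle, downstream computation can only start after the parent RDD has been fully processed
- Wide dependencies are the boundaries at which Stages are divided

Task division in Spark involves: Driver program, Job, Stage (TaskSet) and Task
- Driver program: initializing a SparkContext creates a Spark application
- Job: each Action operator generates one Job
- Stage: a Job is divided into Stages according to the dependencies between RDDs; each wide dependency introduces a new Stage boundary
- Task: a Stage is a TaskSet; each unit of a Stage sent to an Executor for execution is a Task
- A Task is the smallest unit of scheduling in Spark. Each Stage contains many Tasks that run the same computation on different data
Note: Driver -> Job -> Stage -> Task is a 1-to-n relationship at every level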
# Narrow dependency
scala> val rdd1 = sc.parallelize(1 to 10 ,1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(11 to 20,1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:27
scala> rdd3.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = ArrayBuffer(org.apache.spark.RangeDependency@7572c6dd, org.apache.spark.RangeDependency@d8194cc)
scala> rdd3.dependencies(0).rdd
res1: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[0] at parallelize at <console>:24
# Print the data of rdd1
scala> rdd3.dependencies(0).rdd.collect
res2: Array[_] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
# Wide dependency
scala> val random = new scala.util.Random
random: scala.util.Random = scala.util.Random@81a48a8
scala> val arr = (1 to 100).map(idx => random.nextInt(100))
arr: scala.collection.immutable.IndexedSeq[Int] = Vector(96, 86, 87, 47, 60, 47, 40, 31, 75, 41, 13, 57, 16, 23, 47, 62, 75, 34, 64, 97, 37, 0, 71, 14, 24, 49, 95, 37, 58, 42, 51, 38, 19, 31, 76, 86, 62, 45, 88, 75, 99, 5, 6, 72, 75, 48, 60, 20, 21, 84, 64, 70, 21, 8, 12, 83, 76, 61, 37, 84, 39, 24, 61, 71, 61, 28, 33, 17, 0, 60, 90, 10, 72, 2, 70, 48, 2, 17, 51, 33, 4, 37, 91, 49, 75, 37, 15, 45, 67, 80, 22, 31, 67, 85, 75, 26, 75, 9, 12, 89)
scala> val rdd1 = sc.makeRDD(arr).map((_,1))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[4] at map at <console>:26
scala> val rdd2 = rdd1.reduceByKey(_+_)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[5] at reduceByKey at <console>:25
scala> rdd2.dependencies
res3: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@1aaea328)
scala> rdd2.dependencies(0).rdd
res4: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[4] at map at <console>:26
scala> rdd2.dependencies(0).rdd.dependencies
res5: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@623e4193)
scala> rdd2.dependencies(0).rdd.dependencies(0).rdd.collect
res6: Array[_] = Array(96, 86, 87, 47, 60, 47, 40, 31, 75, 41, 13, 57, 16, 23, 47, 62, 75, 34, 64, 97, 37, 0, 71, 14, 24, 49, 95, 37, 58, 42, 51, 38, 19, 31, 76, 86, 62, 45, 88, 75, 99, 5, 6, 72, 75, 48, 60, 20, 21, 84, 64, 70, 21, 8, 12, 83, 76, 61, 37, 84, 39, 24, 61, 71, 61, 28, 33, 17, 0, 60, 90, 10, 72, 2, 70, 48, 2, 17, 51, 33, 4, 37, 91, 49, 75, 37, 15, 45, 67, 80, 22, 31, 67, 85, 75, 26, 75, 9, 12, 89)
scala> rdd2.toDebugString
res7: String =
(5) ShuffledRDD[5] at reduceByKey at <console>:25 []
+-(5) MapPartitionsRDD[4] at map at <console>:26 []
| ParallelCollectionRDD[3] at makeRDD at <console>:26 []
WordCount Revisited
scala> val rdd1 = sc.textFile("/azkaban-wc/wc.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /azkaban-wc/wc.txt MapPartitionsRDD[7] at textFile at <console>:24
scala> val rdd2 = rdd1.flatMap(_.split("\\s+"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at <console>:25
scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:25
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[10] at reduceByKey at <console>:25
scala> val rdd5 = rdd4.sortBy(_._2,false)
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at sortBy at <console>:25
scala> val rdd6 = rdd4.sortByKey()
rdd6: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at sortByKey at <console>:25
# View the lineage
scala> rdd5.toDebugString
res8: String =
(2) MapPartitionsRDD[15] at sortBy at <console>:25 []
| ShuffledRDD[14] at sortBy at <console>:25 []
+-(2) MapPartitionsRDD[11] at sortBy at <console>:25 []
| ShuffledRDD[10] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[9] at map at <console>:25 []
| MapPartitionsRDD[8] at flatMap at <console>:25 []
| /azkaban-wc/wc.txt MapPartitionsRDD[7] at textFile at <console>:24 []
| /azkaban-wc/wc.txt HadoopRDD[6] at textFile at <console>:24 []
scala> rdd6.toDebugString
res9: String =
(2) ShuffledRDD[18] at sortByKey at <console>:25 []
+-(2) ShuffledRDD[10] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[9] at map at <console>:25 []
| MapPartitionsRDD[8] at flatMap at <console>:25 []
| /azkaban-wc/wc.txt MapPartitionsRDD[7] at textFile at <console>:24 []
| /azkaban-wc/wc.txt HadoopRDD[6] at textFile at <console>:24 []
# View the dependencies
scala> rdd1.dependencies
res10: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@64cd5efa)
scala> rdd1.dependencies(0).rdd
res11: org.apache.spark.rdd.RDD[_] = /azkaban-wc/wc.txt HadoopRDD[6] at textFile at <console>:24
scala> rdd5.dependencies(0).rdd
res12: org.apache.spark.rdd.RDD[_] = ShuffledRDD[14] at sortBy at <console>:25
scala> rdd6.dependencies(0).rdd
res13: org.apache.spark.rdd.RDD[_] = ShuffledRDD[10] at reduceByKey at <console>:25
scala> rdd6.dependencies(0)
res14: org.apache.spark.Dependency[_] = org.apache.spark.ShuffleDependency@3739758a
scala> rdd5.dependencies(0)
res15: org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@2c4d4204
scala> rdd1.dependencies(0).rdd
res16: org.apache.spark.rdd.RDD[_] = /azkaban-wc/wc.txt HadoopRDD[6] at textFile at <console>:24
# View the preferred locations
scala> res16.preferredLocations(res16.partitions(0))
res17: Seq[String] = ArraySeq(linux121, linux122, linux123)
scala> res16.preferredLocations(res16.partitions(1))
res18: Seq[String] = ArraySeq(linux121, linux122, linux123)
# There are only two partitions
scala> res16.preferredLocations(res16.partitions(3))
java.lang.ArrayIndexOutOfBoundsException: 3
... 49 elided
scala> rdd1.getNumPartitions
res20: Int = 2
# Check the file with the hdfs command
hdfs fsck /azkaban-wc/wc.txt -files -blocks -locations
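Question: how many Jobs, Stages and Tasks does the WordCount below have?
The count action runs as 1 Job with 3 Stages and 6 Tasks (2 partitions per Stage). Strictly speaking, sortByKey also launches a small sampling job of its own while building its RangePartitioner (see the RangePartitioner section), so the Spark UI may show more than one Job.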

val rdd1 = sc.textFile("/azkaban-wc/wc.txt")
val rdd2 = rdd1.flatMap(_.split("\\s+"))
val rdd3 = rdd2.map((_,1))
val rdd4 = rdd3.reduceByKey(_+_)
val rdd6 = rdd4.sortByKey()
rdd6.count
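
The Stage and Task counts can be checked with a toy model. This is only a sketch for a linear lineage: it assumes one Stage plus one more per wide dependency, and one Task per partition in every Stage. The names `Narrow`, `Wide` and `StageCountSketch` are illustrative, not Spark classes.

```scala
object StageCountSketch {
  sealed trait Dependency
  case object Narrow extends Dependency
  case object Wide extends Dependency

  // A linear lineage has one Stage plus one more per shuffle boundary.
  def stageCount(lineage: Seq[Dependency]): Int = lineage.count(_ == Wide) + 1

  // Assumes one Task per partition and an unchanged partition count in every Stage.
  def taskCount(lineage: Seq[Dependency], partitions: Int): Int =
    stageCount(lineage) * partitions

  // textFile -> flatMap -> map -> reduceByKey -> sortByKey
  val wordCount: Seq[Dependency] = Seq(Narrow, Narrow, Narrow, Wide, Wide)
}
```

With 2 partitions, `stageCount(wordCount)` is 3 and `taskCount(wordCount, 2)` is 6, matching the answer given for the count job.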

RDD Persistence/Caching
Operators involved: persist, cache, unpersist. persist and cache are lazy, like Transformations; unpersist removes the cached data directly without waiting for an Action.
Caching writes computed results to a storage medium chosen by the user through a storage level (currently memory, disk and off-heap memory are supported). With caching, Spark avoids recomputing an RDD, which can greatly speed up a job. RDD persistence is one of Spark's most important features and a key enabler for iterative algorithms and fast interactive queries.
One reason Spark is fast is that it can persist (cache) a dataset in memory. Once an RDD is persisted, every node keeps the partitions it computed in memory and reuses them in later Actions on that dataset (or datasets derived from it), which makes those Actions much faster.
Calling persist() marks an RDD as persistent. It is only a mark because the RDD is not computed and stored at the point where persist() appears; the result is persisted only after the first Action triggers the actual computation.

persist() and cache() both mark an RDD for persistence. Once persistence has been triggered, the RDD is kept in memory on the compute nodes and reused.
Deciding when to cache is a trade-off between space and speed. As a rule of thumb, cache an RDD when several Actions need it and it is expensive to compute.
Cached data can be lost, or evicted when memory runs short. RDD fault tolerance guarantees that the computation still completes correctly: lost data is recomputed from the chain of transformations. Partitions are largely independent of each other, so only the lost partitions are recomputed, not all of them.
persist() accepts a storage level as its argument. cache() calls persist(MEMORY_ONLY), i.e.:
cache() == persist(StorageLevel.MEMORY_ONLY)
Use unpersist() to manually remove a persisted RDD from the cache.
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
@DeveloperApi
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
}
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
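
To make the five constructor flags in the listing easier to read, here is a small sketch that decodes them into a description. `Level` is a hypothetical stand-in that merely mirrors the fields shown above; it is not Spark's `StorageLevel` class, and the wording of the description is an illustration only.

```scala
object StorageLevelSketch {
  // Mirrors the five constructor fields from the listing; not Spark's class.
  final case class Level(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
                         deserialized: Boolean, replication: Int = 1)

  val MemoryOnly: Level = Level(false, true, false, true)
  val MemoryAndDiskSer2: Level = Level(true, true, false, false, 2)

  def describe(level: Level): String = {
    val media = Seq(
      if (level.useMemory) Some("memory") else None,
      if (level.useDisk) Some("disk") else None,
      if (level.useOffHeap) Some("off-heap") else None
    ).flatten
    val where = if (media.isEmpty) "nowhere" else media.mkString("+")
    val form = if (level.deserialized) "deserialized objects" else "serialized bytes"
    s"$where, $form, ${level.replication} replica(s)"
  }
}
```

Reading the table this way shows that the `_SER` variants differ from their plain counterparts only in the `deserialized` flag, and the `_2` variants only in the replication count.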

An RDD is cached one partition at a time (a partition is stored whole; if memory cannot hold the entire partition, it is not stored at all). When the application finishes, the system cleans up the cached data.
scala> val list = List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26
# cache simply calls persist(MEMORY_ONLY)
# At this point the RDD has not been cached yet: cache is lazy and only takes effect when an Action runs
scala> rdd.cache
res2: rdd.type = ParallelCollectionRDD[0] at parallelize at <console>:26
# The first Action computes the RDD from start to finish and caches it
scala> rdd.count
# The second Action does not recompute from scratch; it reads from the cache
scala> rdd.collect().mkString(",")
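RDD Fault Tolerance: Checkpoint
Operator involved: checkpoint, which is also lazy
Besides persistence, Spark offers a checkpoint mechanism for saving data. A checkpoint writes the RDD to highly reliable storage, mainly for fault tolerance. In practice the data is written to a file system such as HDFS.
A long Lineage makes recovery expensive. Taking a checkpoint at an intermediate stage is cheaper: if a node fails later and partitions are lost, the Lineage is replayed starting from the checkpointed RDD.
cache and checkpoint differ significantly. cache computes the RDD and keeps it in memory, but the dependency chain must be retained: when a node goes down, the RDD cached on it is lost and has to be recomputed through the chain. checkpoint instead saves the RDD to HDFS, which is reliable multi-replica storage, so the dependency chain is no longer needed and is truncated.
Checkpointing is a good fit when:
- The DAG's Lineage is so long that recomputing it would be too expensive
- The checkpoint is taken on a wide dependency, where the payoff is larger
Like cache, checkpoint is lazy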
scala> val rdd1 = sc.parallelize(1 to 100000)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
# Set the checkpoint directory
scala> sc.setCheckpointDir("/spark-test/checkpoint")
20/10/29 18:11:47 WARN SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory '/spark-test/checkpoint' appears to be on the local filesystem.
scala> val rdd2 = rdd1.map(_ * 2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
# Mark the RDD for checkpointing
scala> rdd2.checkpoint
# Check whether the checkpoint has been written; checkpoint is lazy, so this is still false
scala> rdd2.isCheckpointed
res2: Boolean = false
# Dependencies of rdd2 before the checkpoint is written
scala> rdd2.dependencies(0).rdd
res3: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd2.dependencies(0).rdd.collect
res4: Array[_] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 1...
# Run an Action to trigger the checkpoint
scala> rdd2.count
res5: Long = 100000
# Check again whether the checkpoint has been written
scala> rdd2.isCheckpointed
res6: Boolean = true
# Check rdd2's dependencies again: after the checkpoint the lineage is truncated and now starts from the checkpoint RDD
scala> rdd2.dependencies(0).rdd
res7: org.apache.spark.rdd.RDD[_] = ReliableCheckpointRDD[2] at count at <console>:26
scala> rdd2.dependencies(0).rdd.collect
res8: Array[_] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200, 202, 204, 206, 208, 210, 212, 214, 216, 218, 220, 222, 224, 226, 228, 230, 232, 234, 236, 238, 240, 242, 244, 246, 248, 250, 252, 254, 256, 258, 260, 262, 264, 266, 268, 270, 272, 274, 276, 278, 280, 282, 284, 286, 288, 290, 292, 294, 296, 298, 300, 302, 304, 306, 308, 310, 312, 314, 316, 318, 320, 3...
# Location of the checkpoint file that the RDD depends on
scala> rdd2.getCheckpointFile
res9: Option[String] = Some(hdfs://linux121:9000/spark-test/checkpoint/8500794d-d6f8-49ef-98bd-b61c1343cbea/rdd-1)
Note: checkpoint files are not deleted when the job finishes; they have to be removed manually
RDD Partitions
spark.default.parallelism: default parallelism = 2
When it is not set explicitly in spark-defaults.conf, its value is determined as follows:
Local mode
spark-shell --master local[N] spark.default.parallelism = N
spark-shell --master local spark.default.parallelism = 1
Pseudo-distributed mode
x is the number of executors started on the local machine, y is the number of cores per executor, and z is the memory per executor
spark-shell --master local-cluster[x,y,z]
spark.default.parallelism = x * y
Distributed mode (Yarn or Standalone)
spark.default.parallelism = max(total number of cores held by the application's executors, 2)
Note: total number of cores on all executor nodes or 2, whichever is larger
These rules determine the default value of spark.default.parallelism when it is not set explicitly in spark-defaults.conf; if it is set, spark.default.parallelism = the configured value
When SparkContext is initialized it also derives two parameters from the spark.default.parallelism value obtained above
// Number of partitions for an RDD created from a collection
sc.defaultParallelism = spark.default.parallelism
// Minimum number of partitions for an RDD created from a file
sc.defaultMinPartitions = min(spark.default.parallelism,2)
With these parameters fixed, the number of partitions of an RDD can be worked out
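
The rules above can be written down as a small function. This is a sketch of the rules exactly as stated in this section, not Spark's implementation; the `Master` case classes and the function names are hypothetical.

```scala
object ParallelismSketch {
  sealed trait Master
  final case class Local(threads: Int) extends Master
  final case class LocalCluster(executors: Int, coresPerExecutor: Int, memoryMb: Int) extends Master
  final case class Cluster(totalExecutorCores: Int) extends Master

  // An explicit spark.default.parallelism setting wins over the derived value.
  def defaultParallelism(master: Master, configured: Option[Int] = None): Int =
    configured.getOrElse(master match {
      case Local(n)              => n
      case LocalCluster(x, y, _) => x * y
      case Cluster(cores)        => math.max(cores, 2)
    })

  // Lower bound on partitions for RDDs created from files.
  def defaultMinPartitions(parallelism: Int): Int = math.min(parallelism, 2)
}
```

For example, `local[4]` gives a parallelism of 4 and a minimum of 2 file partitions, while plain `local` gives 1 and 1.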

Ways to create an RDD:
From a collection
// If no partition count is given when creating the RDD, the partition count = sc.defaultParallelism
val rdd = sc.parallelize(1 to 100)
rdd.getNumPartitions
Note: put simply, the partition count equals the total number of cores
From textFile
val rdd = sc.textFile("/azkaban-wc/wc.txt")
rdd.getNumPartitions
If no partition count is given:
- Local file: partition count = max(number of local file splits, sc.defaultMinPartitions)
- HDFS file: partition count = max(number of HDFS blocks in the file, sc.defaultMinPartitions)
Notes:
- Number of local file splits = local file size / 32 MB
- When reading an HDFS file, a requested partition count smaller than the file's block count has no effect
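
The max(...) rule can be compared against a more detailed model of split computation. The sketch below is a simplified single-file model and rests on assumptions that are not stated in this section: that textFile delegates to Hadoop's file input format, that the split size is max(1, min(fileSize / minPartitions, blockSize)), and that a trailing remainder of less than 10% of a split is merged into the last split. All names are hypothetical.

```scala
object SplitCountSketch {
  // Assumed slop factor: a remainder of up to 10% is folded into the last split.
  private val SplitSlop = 1.1

  def splitCount(fileSize: Long, blockSize: Long, minPartitions: Int): Int = {
    require(fileSize > 0 && blockSize > 0, "sizes must be positive")
    val goalSize = fileSize / math.max(minPartitions, 1)
    val splitSize = math.max(1L, math.min(goalSize, blockSize))
    var remaining = fileSize
    var splits = 0
    // Cut full-size splits while more than 1.1 splits' worth of data remains.
    while (remaining.toDouble / splitSize > SplitSlop) {
      splits += 1
      remaining -= splitSize
    }
    // Whatever is left becomes the final split.
    if (remaining != 0) splits += 1
    splits
  }
}
```

Under this model a 300 MB file with 128 MB blocks and minPartitions = 2 yields 3 splits, in line with the rule above. An 11-byte file with minPartitions = 2 yields 3 rather than 2, which suggests the max(...) rule is best read as an approximation.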
RDD Partitioners
Do the following RDDs have a partitioner, and if so, of which type?
scala> val rdd1 = sc.textFile("/azkaban-wc/wc.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /azkaban-wc/wc.txt MapPartitionsRDD[5] at textFile at <console>:24
scala> val rdd2 = rdd1.flatMap(_.split("\\s+"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:25
scala> rdd1.partitioner
res4: Option[org.apache.spark.Partitioner] = None
scala> rdd2.partitioner
res5: Option[org.apache.spark.Partitioner] = None
scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at map at <console>:25
scala> rdd3.partitioner
res6: Option[org.apache.spark.Partitioner] = None
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:25
scala> rdd4.partitioner
res7: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
scala> val rdd5 = rdd4.sortByKey()
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at sortByKey at <console>:25
scala> rdd5.partitioner
res8: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@bdd2d498)
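Only key-value RDDs can have a partitioner; for value-only RDDs the partitioner is None
Purpose and types of partitioners:
In a PairRDD (key, value) many operations are key-based, and the system regroups the data by key (groupByKey, for example). Regrouping needs a rule: the most common is hash-based partitioning, and there is also a more complex, sampling-based Range partitioning.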

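HashPartitioner
The simplest and most commonly used partitioner, and the default one. For a given key it computes the hashCode and takes the remainder modulo the number of partitions; if the remainder is negative, it adds the number of partitions. The result is the ID of the partition the key belongs to. This guarantees that records with the same key land in the same partition.
Users can apply a partitioner explicitly with partitionBy and set the desired number of partitions through the partitioner's partitions parameter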
scala> val rdd1 = sc.makeRDD(1 to 100).map((_,1))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[13] at map at <console>:24
scala> rdd1.getNumPartitions
res9: Int = 5
// The data is merely split into roughly equal slices; no partitioner is used
scala> rdd1.glom.collect.foreach(x => println(x.toBuffer))
ArrayBuffer((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1), (11,1), (12,1), (13,1), (14,1), (15,1), (16,1), (17,1), (18,1), (19,1), (20,1))
ArrayBuffer((21,1), (22,1), (23,1), (24,1), (25,1), (26,1), (27,1), (28,1), (29,1), (30,1), (31,1), (32,1), (33,1), (34,1), (35,1), (36,1), (37,1), (38,1), (39,1), (40,1))
ArrayBuffer((41,1), (42,1), (43,1), (44,1), (45,1), (46,1), (47,1), (48,1), (49,1), (50,1), (51,1), (52,1), (53,1), (54,1), (55,1), (56,1), (57,1), (58,1), (59,1), (60,1))
ArrayBuffer((61,1), (62,1), (63,1), (64,1), (65,1), (66,1), (67,1), (68,1), (69,1), (70,1), (71,1), (72,1), (73,1), (74,1), (75,1), (76,1), (77,1), (78,1), (79,1), (80,1))
ArrayBuffer((81,1), (82,1), (83,1), (84,1), (85,1), (86,1), (87,1), (88,1), (89,1), (90,1), (91,1), (92,1), (93,1), (94,1), (95,1), (96,1), (97,1), (98,1), (99,1), (100,1))
scala> rdd1.partitioner
res11: Option[org.apache.spark.Partitioner] = None
// Explicitly use HashPartitioner
scala> val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(10))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[15] at partitionBy at <console>:25
scala> rdd2.glom.collect.foreach(x => println(x.toBuffer))
ArrayBuffer((70,1), (80,1), (90,1), (100,1), (30,1), (40,1), (10,1), (20,1), (50,1), (60,1))
ArrayBuffer((21,1), (31,1), (61,1), (71,1), (81,1), (91,1), (1,1), (11,1), (41,1), (51,1))
ArrayBuffer((22,1), (32,1), (2,1), (12,1), (62,1), (72,1), (82,1), (92,1), (42,1), (52,1))
ArrayBuffer((43,1), (53,1), (3,1), (13,1), (23,1), (33,1), (63,1), (73,1), (83,1), (93,1))
ArrayBuffer((64,1), (74,1), (84,1), (94,1), (24,1), (34,1), (4,1), (14,1), (44,1), (54,1))
ArrayBuffer((25,1), (35,1), (5,1), (15,1), (45,1), (55,1), (65,1), (75,1), (85,1), (95,1))
ArrayBuffer((66,1), (76,1), (86,1), (96,1), (6,1), (16,1), (46,1), (56,1), (26,1), (36,1))
ArrayBuffer((67,1), (77,1), (87,1), (97,1), (7,1), (17,1), (27,1), (37,1), (47,1), (57,1))
ArrayBuffer((8,1), (18,1), (48,1), (58,1), (28,1), (38,1), (68,1), (78,1), (88,1), (98,1))
ArrayBuffer((29,1), (39,1), (49,1), (59,1), (9,1), (19,1), (69,1), (79,1), (89,1), (99,1))
scala> rdd2.partitioner
res13: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@a)
scala> rdd2.getNumPartitions
res14: Int = 10
// Explicitly use RangePartitioner
scala> val rdd3 = rdd1.partitionBy(new org.apache.spark.RangePartitioner(10,rdd1))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[19] at partitionBy at <console>:25
scala> rdd3.glom.collect.foreach(x => println(x.toBuffer))
ArrayBuffer((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1))
ArrayBuffer((11,1), (12,1), (13,1), (14,1), (15,1), (16,1), (17,1), (18,1), (19,1), (20,1))
ArrayBuffer((21,1), (22,1), (23,1), (24,1), (25,1), (26,1), (27,1), (28,1), (29,1), (30,1))
ArrayBuffer((31,1), (32,1), (33,1), (34,1), (35,1), (36,1), (37,1), (38,1), (39,1), (40,1))
ArrayBuffer((41,1), (42,1), (43,1), (44,1), (45,1), (46,1), (47,1), (48,1), (49,1), (50,1))
ArrayBuffer((51,1), (52,1), (53,1), (54,1), (55,1), (56,1), (57,1), (58,1), (59,1), (60,1))
ArrayBuffer((61,1), (62,1), (63,1), (64,1), (65,1), (66,1), (67,1), (68,1), (69,1), (70,1))
ArrayBuffer((71,1), (72,1), (73,1), (74,1), (75,1), (76,1), (77,1), (78,1), (79,1), (80,1))
ArrayBuffer((81,1), (82,1), (83,1), (84,1), (85,1), (86,1), (87,1), (88,1), (89,1), (90,1))
ArrayBuffer((91,1), (92,1), (93,1), (94,1), (95,1), (96,1), (97,1), (98,1), (99,1), (100,1))
scala> rdd3.partitioner
res16: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@7e392d9e)
scala> rdd3.getNumPartitions
res17: Int = 10
Many operators accept a partitioner or a partition count; reduceByKey, for example, uses HashPartitioner by default
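
The hash rule described for HashPartitioner (hashCode modulo the partition count, shifted into the non-negative range) fits in a few lines. This is a sketch of that rule, not Spark's class; the object name is hypothetical, and sending a null key to partition 0 is an assumption.

```scala
object HashPartitionSketch {
  // Remainder that is always in [0, mod), even for negative hash codes.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Assumption: a null key is sent to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case k    => nonNegativeMod(k.hashCode, numPartitions)
  }
}
```

With 10 partitions the key 70 maps to partition 0 and the key 21 to partition 1, consistent with the glom output of rdd2 above.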

RangePartitioner
Put simply, it maps keys within a given range to one partition. Choosing the range boundaries is the critical part of the implementation, and it relies on reservoir sampling. sortByKey uses RangePartitioner.

The problem: before partitioning, the distribution of the data is unknown, so it has to be estimated by sampling. Spark's RangePartitioner samples the data with reservoir sampling.
Reservoir sampling: pick k samples from a set S of n items, where n is very large or unknown; it is particularly useful when the n items cannot all fit in main memory.
The sampling step calls collect(), which triggers an Action. See sortByKey: it is a Transformation, yet it runs an Action internally.
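
Reservoir sampling itself is short. The sketch below is the textbook single-pass version (often called Algorithm R) over an iterator of unknown length; it illustrates the idea and is not taken from Spark's source. The object and parameter names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object ReservoirSketch {
  // Keeps a uniform random sample of up to k items from a stream of unknown length.
  def sample[T](items: Iterator[T], k: Int, rng: Random): Vector[T] = {
    val reservoir = ArrayBuffer.empty[T]
    var seen = 0L
    items.foreach { item =>
      seen += 1
      if (reservoir.size < k) {
        // Fill the reservoir with the first k items.
        reservoir += item
      } else {
        // Replace a random slot with probability k / seen.
        val j = (rng.nextDouble() * seen).toLong
        if (j < k) reservoir(j.toInt) = item
      }
    }
    reservoir.toVector
  }
}
```

Only k items are held in memory at any time, which is why the technique suits data that does not fit in main memory. The sampled keys can then be sorted to pick range boundaries.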
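Custom Partitioners
Spark allows users to define their own Partitioner to control flexibly how an RDD is partitioned
Implement a custom partitioner that assigns keys as follows:
- partition 0: key < 100
- partition 1: 100 <= key < 200
- partition 2: 200 <= key < 300
- partition 3: 300 <= key < 400
- ... ...
- partition 9: 900 <= key < 1000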
package com.hhb.spark.core

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.immutable

/**
 * @description:
 * @date: 2020-10-31 16:41
 **/
class MyPartitioner(n: Int) extends Partitioner {
  override def numPartitions: Int = n

  // Keys 0-99 go to partition 0, 100-199 to partition 1, and so on
  override def getPartition(key: Any): Int = {
    key.toString.toInt / 100
  }
}

object TestMyPartitioner {
  def main(args: Array[String]): Unit = {
    // Create sc
    val sc = new SparkContext(new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]"))
    val random = new scala.util.Random()
    val arr: immutable.IndexedSeq[Int] = (1 to 100).map(_ => random.nextInt(1000))
    val rdd1 = sc.makeRDD(arr).map((_, 1))
    rdd1.glom.collect.foreach(x => println(x.toBuffer))
    println("**" * 15)
    val rdd2 = rdd1.partitionBy(new MyPartitioner(11))
    rdd2.glom.collect.foreach(x => println(x.toBuffer))
    // Stop sc
    sc.stop()
  }
}
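
A partitioner must return an index between 0 and numPartitions - 1, and `key / 100` only stays in range as long as the keys do. The sketch below shows one defensive variant of the same rule that clamps the result; clamping is a design choice made here for illustration, not something the original partitioner does, and `partitionOf` is a hypothetical name.

```scala
object RangeRuleSketch {
  // Same rule as key / 100, but clamped so the result always lies in [0, numPartitions).
  def partitionOf(key: Int, numPartitions: Int): Int =
    math.min(math.max(key / 100, 0), numPartitions - 1)
}
```

With 10 partitions, keys 99, 100 and 999 map to partitions 0, 1 and 9, and an out-of-range key such as 1500 falls into the last partition instead of producing an invalid index.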
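Broadcast Variables
Sometimes variables need to be shared across tasks, or between tasks and the Driver program. Spark provides two kinds of shared variables for this:
- Broadcast variables
- Accumulators
Both are mainly used to optimize Spark programs
A broadcast variable is sent out by the Driver and shared among the Executors on each node. It distributes a large object efficiently: a large read-only value is sent to all worker nodes (Executors) for use in one or more operations
Using a broadcast variable:
- Call SparkContext.broadcast on an object of type T to create a Broadcast[T]; any serializable type works (on the Driver)
- Access the value through the value property (on the Executors)
- The variable is sent to each Executor only once and is treated as read-only

Broadcast parameters:
- spark.broadcast.blockSize (default: 4m): the broadcast value is split into blocks, 4 MB each by default
- spark.broadcast.checksum (default: true): adds a checksum to broadcast blocks so that data corrupted on the way from the Driver to the Executors can be detected
- spark.broadcast.compress (default: true): whether to compress the value before sending
Using broadcast variables (Map Side Join)
An ordinary Join: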

package com.hhb.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-10-31 21:36
 **/
object JoinDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
    // Local files are split at 32 MB by default; set 128 MB here to mimic HDFS
    sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)
    // Read the product file, keyed by field 0
    val productRDD = sc.textFile("/Users/baiwang/myproject/spark/data/lagou_product_info.txt")
      .map(lines => {
        (lines.split(";")(0), lines)
      })
    // Read the order file, keyed by field 2
    val orderRDD = sc.textFile("/Users/baiwang/myproject/spark/data/orderinfo.txt")
      .map(lines => {
        (lines.split(";")(2), lines)
      })
    // This join involves a Shuffle
    val resultRDD: RDD[(String, (String, String))] = productRDD.join(orderRDD)
    println(resultRDD.count())
    Thread.sleep(10000000)
    sc.stop()
  }
}

Map Side Join

package com.hhb.spark.core

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-10-31 21:36
 **/
object MapSideJoin {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
    // Local files are split at 32 MB by default; set 128 MB here to mimic HDFS
    sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)
    // Read the product file, keyed by field 0
    val productRDD = sc.textFile("/Users/baiwang/myproject/spark/data/lagou_product_info.txt")
      .map(lines => {
        (lines.split(";")(0), lines)
      })
    // Collect the small table to the Driver and broadcast it
    val broadProduct: Broadcast[collection.Map[String, String]] = sc.broadcast(productRDD.collectAsMap())
    // Read the order file, keyed by field 2
    val orderRDD = sc.textFile("/Users/baiwang/myproject/spark/data/orderinfo.txt")
      .map(lines => {
        (lines.split(";")(2), lines)
      })
    // With a Shuffle:
    // val resultRDD: RDD[(String, (String, String))] = productRDD.join(orderRDD)
    // Without a Shuffle: look up each order's product in the broadcast map
    val resultRDD = orderRDD.map { case (pid, line) => {
      val productMap = broadProduct.value
      val productInfo = productMap.getOrElse(pid, null)
      (pid, productInfo, line)
    }
    }
    println(resultRDD.count())
    Thread.sleep(10000000)
    sc.stop()
  }
}
This approach involves no shuffle
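
Stripped of Spark, a map-side join is a lookup into an in-memory map while streaming over the large side. The sketch below shows that core idea with plain collections; it uses `Option` instead of `null` for missing products, which is a deliberate departure from the listing. All names and the sample records in the usage note are hypothetical.

```scala
object MapSideJoinSketch {
  // products: productId -> product line (the small side, held entirely in memory)
  // orders:   (productId, order line) pairs (the large side, processed record by record)
  def join(products: Map[String, String],
           orders: Seq[(String, String)]): Seq[(String, Option[String], String)] =
    orders.map { case (pid, line) => (pid, products.get(pid), line) }
}
```

Because every order record is resolved locally against the map, no records need to be regrouped by key. The trade-off is that the small side must fit in the memory of each Executor.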

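Accumulators
An accumulator lets a variable be accumulated across different Executors. It is defined and read on the Driver and updated on the Executors. Accumulators are also lazy and need an Action to trigger them: each Action applies the updates once, so triggering several Actions applies them several times. A classic use case is counting events in a Spark Streaming application.
// Approach 1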
scala> val data = sc.makeRDD(Seq("a b c","d e f"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> val rdd1 = data.flatMap(_.split("\\s+"))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at flatMap at <console>:25
scala> val rdd2 = rdd1.map( word => 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:25
scala> val count = rdd2.reduce(_+_)
count: Int = 6
// Approach 2
scala> var acc = 0
acc: Int = 0
scala> data.flatMap(_.split("\\s+")).foreach(word => acc+= 1)
scala> println(acc)
0
// Variables defined on the Driver are copied into every running Task; updating the copy inside a Task does not affect the variable on the Driver.
Spark's built-in accumulator types are:
- LongAccumulator: accumulates integers
- DoubleAccumulator: accumulates floating-point numbers
- CollectionAccumulator: accumulates collection elements
scala> val data = sc.makeRDD("a b c d e f g h i j k l m n".split("\\s+"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at makeRDD at <console>:24
scala> val acc1 = sc.longAccumulator
acc1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 125, name: None, value: 0)
scala> val acc2 = sc.doubleAccumulator
acc2: org.apache.spark.util.DoubleAccumulator = DoubleAccumulator(id: 126, name: None, value: 0.0)
scala> val acc3 =sc.collectionAccumulator[String]
acc3: org.apache.spark.util.CollectionAccumulator[String] = CollectionAccumulator(id: 128, name: None, value: [])
scala> val rdd = data.map{ word =>
| acc1.add(word.length)
| acc2.add(word.length)
| acc3.add(word)
| word
| }
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at map at <console>:31
scala> rdd.count
res7: Long = 14
scala> println(s"acc1=${acc1.value},acc2 = ${acc2.value},acc3=${acc3.value}")
acc1=14,acc2 = 14.0,acc3=[c, d, e, l, m, n, i, j, k, f, g, h, a, b]
scala> rdd.count
res10: Long = 14
scala> println(s"acc1=${acc1.value},acc2 = ${acc2.value},acc3=${acc3.value}")
acc1=28,acc2 = 28.0,acc3=[c, d, e, l, m, n, i, j, k, f, g, h, a, b, c, d, e, f, g, h, i, j, k, l, m, n, a, b]
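
The doubling from 14 to 28 follows from how updates are collected. The toy model below assumes that each task works on its own zeroed copy of the accumulator and that the Driver merges the copies when the tasks finish. `LongAcc` and `runAction` are hypothetical names, not Spark's accumulator API.

```scala
object AccumulatorSketch {
  // Hypothetical stand-in for an accumulator; not Spark's class.
  final class LongAcc(private var sum: Long = 0L) {
    def add(v: Long): Unit = sum += v
    def merge(other: LongAcc): Unit = sum += other.value
    def value: Long = sum
    def copyAndReset(): LongAcc = new LongAcc()
  }

  // Each "task" updates its own zeroed copy; the driver merges the copies afterwards.
  def runAction(driverAcc: LongAcc, partitions: Seq[Seq[String]]): Unit =
    partitions.foreach { partition =>
      val taskCopy = driverAcc.copyAndReset()
      partition.foreach(word => taskCopy.add(word.length))
      driverAcc.merge(taskCopy)
    }
}
```

Running the same action twice against the same accumulator adds every update twice, which is why caching the RDD or reading the accumulator after a single Action is the usual way to keep counts meaningful.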
Optimizing TopN
package com.hhb.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable

/**
 * @description:
 * @date: 2020-11-02 10:20
 **/
object TopNTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]"))
    val random = scala.util.Random
    val N = 10
    val list: immutable.IndexedSeq[String] = (1 to 50).flatMap {
      idx =>
        (1 to 2000).map { id =>
          f"group$idx%2d,${random.nextInt(100000)}"
        }
    }
    // Build the RDD of test data
    val valueRDD: RDD[(String, Int)] = sc.makeRDD(list).map {
      line =>
        val strArr = line.split(",")
        (strArr(0), strArr(1).toInt)
    }
    // Cache the data
    valueRDD.cache()
    // Approach 1: sort, take the N largest from the right, then reverse.
    // groupByKey shuffles all of the data
    val result1 = valueRDD.groupByKey().mapValues(x => x.toList.sorted.takeRight(N).reverse)
    result1.sortByKey().collect.foreach(println(_))
    println("--" * 15)
    // Approach 2: the shuffle cannot be avoided, so shuffle less data:
    // take the top N within each partition first, then merge the top N across partitions
    val result2 = valueRDD.aggregateByKey(List[Int]())(
      // Within a partition: the first argument is the accumulated list (starting from the initial value),
      // the second is a value from valueRDD; append the value, sort, and keep N
      (list, score) => (list :+ score).sorted.takeRight(N),
      // Across partitions: merge two lists and again keep N
      (list1, list2) => (list1 ++ list2).sorted.takeRight(N)
    ).mapValues(list => list.sorted.reverse)
    result2.sortByKey().collect.foreach(println(_))
  }
}
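
The two functions passed to aggregateByKey can be exercised on plain collections to see why the second approach returns the same answer while moving less data. The sketch assumes the scores of one key are spread over several partitions, modelled here as a `Seq[Seq[Int]]`; the names are hypothetical.

```scala
object TopNSketch {
  // Within a partition: add one score and keep only the n largest.
  def seqOp(acc: List[Int], score: Int, n: Int): List[Int] =
    (score :: acc).sorted.takeRight(n)

  // Across partitions: merge two partial results and keep only the n largest.
  def combOp(a: List[Int], b: List[Int], n: Int): List[Int] =
    (a ++ b).sorted.takeRight(n)

  // partitions: the scores of a single key, split across partitions.
  def topN(partitions: Seq[Seq[Int]], n: Int): List[Int] =
    partitions
      .map(_.foldLeft(List.empty[Int])((acc, score) => seqOp(acc, score, n)))
      .foldLeft(List.empty[Int])((a, b) => combOp(a, b, n))
      .reverse
}
```

Each partition contributes at most n values to the merge step, so the amount of data crossing the shuffle is bounded by n per key per partition rather than by the full group size.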
A first look at Spark internals
Job submission in Standalone mode
Standalone mode has four main components:
- Driver: the user's Spark application runs on the Driver and is executed by it
- Master: schedules and allocates resources, and monitors the cluster
- Worker: runs on one server in the cluster, manages that node's resources, and launches the Executors on that node
- Executor: one Worker can run several Executors; each Executor starts multiple threads (Tasks) that compute RDD partitions in parallel
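The three main components in SparkContext
DAGScheduler: splits the DAG into Stages
TaskScheduler: orders the Stages (TaskSets) submitted by the DAGScheduler by priority, then sends the tasks to Executors
SchedulerBackend: defines the handling of Executor-related events, including: recording a new executor's information when it registers and increasing the global resource count (cores); handling executor status updates and reclaiming the core when a task finishes; and other events such as stopping or removing an executor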

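Job submission steps in Standalone mode
1. Start the application and initialize the SparkContext
2. The Driver registers with the Master and requests resources
3. The Master checks the cluster's resources. If they are sufficient, it tells the Workers to launch Executors
4. Once started, each Executor registers with the Driver (reverse registration)
5. The Driver parses the DAG into Tasks and sends the Tasks to the Executors
6. The Executors report task status back to the Driver
7. The application finishes and the resources are released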

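Shuffle internals
"Shuffle" originally means shuffling cards, where the goal is to randomize the deck.
In Spark and Hadoop, the shuffle is not about randomizing data. It turns randomly arranged data into data that follows a definite rule. Shuffle is a special phase of the MapReduce framework that sits between Map and Reduce: when Map output is to be consumed by Reduce, the output has to be arranged by key and distributed to the Reducers. That process is the Shuffle.
Shuffle involves reads and writes on local disk (not HDFS) as well as network transfer. Most Spark jobs spend the bulk of their time in the Shuffle phase, so Shuffle performance directly affects the runtime of the whole program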
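To make "arranged by key and distributed to the Reducers" concrete, here is a minimal sketch in plain Scala (no Spark). It assumes a reducer is picked by the non-negative hash of the key modulo the number of reducers; the names ShuffleSketch, reducerFor and shuffle are made up for illustration and are not Spark APIs.

```scala
object ShuffleSketch {
  // Assumption: the target reducer is the non-negative key hash modulo the reducer count.
  def reducerFor(key: String, numReducers: Int): Int = {
    val mod = key.hashCode % numReducers
    if (mod < 0) mod + numReducers else mod
  }

  // Each inner Seq stands for the output of one map task.
  // The result maps reducer id -> (key -> all values for that key).
  def shuffle(mapOutputs: Seq[Seq[(String, Int)]], numReducers: Int): Map[Int, Map[String, Seq[Int]]] =
    mapOutputs.flatten
      .groupBy { case (key, _) => reducerFor(key, numReducers) }
      .map { case (reducer, records) =>
        reducer -> records.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
      }

  def main(args: Array[String]): Unit = {
    val mapOutputs = Seq(Seq("a" -> 1, "b" -> 1), Seq("a" -> 1, "c" -> 1))
    shuffle(mapOutputs, 2).toSeq.sortBy(_._1).foreach(println)
  }
}
```

All records with the same key end up at the same reducer, no matter which map task produced them. In a real job this regrouping is what costs the disk and network I/O described above.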
The Spark Shuffle implementation has gone through three phases: Hash, Sort, and Tungsten-Sort (off-heap sorting)
Spark 0.8 and earlier: Hash Based Shuffle
Spark 0.8.1: File Consolidation introduced for Hash Based Shuffle
Spark 0.9: ExternalAppendOnlyMap introduced
Spark 1.1: Sort Based Shuffle introduced, but Hash Based Shuffle remains the default
Spark 1.2: Sort Based Shuffle becomes the default
Spark 1.4: Tungsten-Sort Based Shuffle introduced
Spark 1.6: Tungsten-sort merged into Sort Based Shuffle
Spark 2.0: Hash Based Shuffle removed
In short:
Before Spark 1.1: Hash Shuffle
Spark 1.1: Sort Shuffle introduced
Spark 1.6: Tungsten-sort merged into Sort Shuffle
Spark 2.0: Hash Shuffle removed

Hash Based Shuffle V1
- Each Shuffle Map Task creates a separate file for every downstream Task
- The Shuffle produces a huge number of small files, which means too many files open at the same time and inefficient random I/O

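Hash Based Shuffle V2
Core idea of Hash Based Shuffle V2: allow different tasks to reuse the same group of disk files, which effectively merges the disk files of multiple tasks to some extent. This greatly reduces the number of disk files and improves shuffle write performance. It solves the Hash V1 problem to a degree, but not completely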

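Hash Shuffle avoids sorting, which helps performance. Overall, though, Hash Shuffle generates a huge number of small files (Hash Based Shuffle V2 mitigates this problem to some extent).
Sort Based Shuffle
Sort Based Shuffle greatly reduces the number of files produced during the Shuffle and improves Shuffle efficiency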
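The difference between the three variants is easiest to see as arithmetic. The sketch below uses the commonly cited file-count model; the formulas for V2 (one group of files per core) and for Sort Based Shuffle (one data file plus one index file per map task) are assumptions added here, not figures from this text, and the task and core counts are invented.

```scala
object ShuffleFileCount {
  // Hash Based Shuffle V1: every map task writes one file per downstream (reduce) task.
  def hashV1(mapTasks: Long, reduceTasks: Long): Long = mapTasks * reduceTasks

  // Hash Based Shuffle V2 (assumed model): map tasks that run one after another on the same
  // core reuse one group of files, so the count depends on cores instead of map tasks.
  def hashV2(totalCores: Long, reduceTasks: Long): Long = totalCores * reduceTasks

  // Sort Based Shuffle (assumed model): each map task writes one data file and one index file.
  def sortBased(mapTasks: Long): Long = mapTasks * 2

  def main(args: Array[String]): Unit = {
    val (mapTasks, reduceTasks, cores) = (1000L, 1000L, 100L) // hypothetical job size
    println(s"Hash V1:    ${hashV1(mapTasks, reduceTasks)} files") // 1000 * 1000 = 1000000
    println(s"Hash V2:    ${hashV2(cores, reduceTasks)} files")    // 100 * 1000 = 100000
    println(s"Sort based: ${sortBased(mapTasks)} files")           // 1000 * 2 = 2000
  }
}
```

Under these assumptions V2 helps only as far as the core count is smaller than the number of map tasks, which is why it eases the small-file problem without removing it.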

Spark Shuffle and Hadoop Shuffle are similar in purpose and function; they differ in implementation details
RDD programming optimization
RDD reuse
Avoid creating duplicate RDDs. During development, create only one RDD for a given dataset; do not create several RDDs that represent the same data
RDD caching/persistence
- When several operators are run against the same RDD, each run recomputes that RDD and all of its parent RDDs. This must be avoided: recomputing the same RDD is a huge waste of resources
- Persist RDDs that are used more than once. Persistence caches the shared RDD's data in memory or on disk, and later computations on the shared RDD read the data directly from memory or disk
- A persisted RDD can be stored in serialized form. When memory cannot hold the RDD's data in full, consider serialization to shrink the data so that it fits entirely in memory
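A minimal sketch of the persistence advice above, assuming a local master; the object name PersistSketch and the sample words are made up for illustration. StorageLevel.MEMORY_ONLY_SER keeps the cached partitions in serialized form, trading some CPU for a smaller memory footprint.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object PersistSketch {
  // Runs two actions on the same RDD; the persisted data is reused by the second action.
  def countAndTotalLength(sc: SparkContext, words: Seq[String]): (Long, Int) = {
    val lengths: RDD[Int] = sc.makeRDD(words).map(_.length)
    // Cache in serialized form so that more data fits in memory.
    lengths.persist(StorageLevel.MEMORY_ONLY_SER)
    val count = lengths.count()        // first action: computes and caches the partitions
    val total = lengths.reduce(_ + _)  // second action: reads the cached partitions
    lengths.unpersist()                // release the cache once it is no longer needed
    (count, total)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("PersistSketch"))
    try println(countAndTotalLength(sc, Seq("spark", "rdd", "shuffle"))) finally sc.stop()
  }
}
```

Without the persist call, the second action would rerun the map over the source data.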
Use filter wisely
- Run filter as early as possible to drop useless data
- After a filter has removed a large share of the data, use coalesce to repartition what is left
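As a rough illustration of the two points above, the sketch below filters early and then shrinks the partition count with coalesce. The data, the filter condition and the partition counts (100 and 4) are arbitrary choices for the example.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FilterCoalesceSketch {
  def keepMultiplesOfThousand(sc: SparkContext): RDD[Int] = {
    // 100 partitions of input data (hypothetical size).
    val numbers = sc.makeRDD(1 to 100000, 100)
    numbers
      .filter(_ % 1000 == 0) // filter first: only 100 of the 100000 records survive
      .coalesce(4)           // the surviving records no longer need 100 partitions
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("FilterCoalesceSketch"))
    try {
      val result = keepMultiplesOfThousand(sc)
      println(s"partitions=${result.getNumPartitions}, records=${result.count()}")
    } finally sc.stop()
  }
}
```

coalesce is called here without a shuffle, so it only merges existing partitions; later stages then run 4 tasks instead of 100 nearly empty ones.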
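Use high-performance operators
- Avoid groupByKey; depending on the scenario, use the higher-performance aggregation operators reduceByKey and aggregateByKey
- coalesce and repartition: where possible, prefer the operation that does not shuffle
- Use foreachPartition to optimize output operations
- map and mapPartitions: pick the appropriate operator. mapPartitions performs better, but can easily cause OOM on large data
- Use repartitionAndSortWithinPartitions instead of repartition followed by a sort
- Use cache, persist and checkpoint sensibly, and choose an appropriate storage level
- Use filter
- Reduce the number of scans over the data source (at the cost of a more complex algorithm)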
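The first bullet can be sketched as follows: both functions return the same per-key sums, but reduceByKey combines values inside each partition before the shuffle, while groupByKey ships every record. The object and function names are made up for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AggregationSketch {
  // Shuffles every (key, value) record, then sums on the reduce side.
  def sumWithGroupByKey(pairs: RDD[(String, Int)]): Map[String, Int] =
    pairs.groupByKey().mapValues(_.sum).collect().toMap

  // Sums within each partition first, so at most one record per key
  // and partition goes through the shuffle.
  def sumWithReduceByKey(pairs: RDD[(String, Int)]): Map[String, Int] =
    pairs.reduceByKey(_ + _).collect().toMap

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("AggregationSketch"))
    try {
      val pairs = sc.makeRDD(Seq("a" -> 1, "b" -> 1, "a" -> 1))
      println(sumWithGroupByKey(pairs))
      println(sumWithReduceByKey(pairs))
    } finally sc.stop()
  }
}
```

This is the same idea the TopN example uses with aggregateByKey: shrink the data inside each partition so that less of it crosses the shuffle.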
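Set a reasonable degree of parallelism
- In a Spark job, parallelism means the number of tasks in each stage
- Set parallelism to match the available resources. Put simply, as far as resources allow, set parallelism as high as possible so that the cluster's resources are fully used. A well-chosen parallelism improves the performance and speed of the whole Spark job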
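Two common ways to control the number of tasks are the spark.default.parallelism setting and the optional partition-count argument of shuffle operators. The sketch below shows both; the values 8 and 16 and the local[4] master are arbitrary, and the right numbers depend on the cluster at hand.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ParallelismSketch {
  // Returns (partitions of the source RDD, partitions after a default reduceByKey,
  // partitions after a reduceByKey with an explicit partition count).
  def partitionCounts(sc: SparkContext): (Int, Int, Int) = {
    val pairs = sc.makeRDD(1 to 1000).map(i => (i % 10, 1))
    val defaultReduce = pairs.reduceByKey(_ + _)      // falls back to the configured default
    val explicitReduce = pairs.reduceByKey(_ + _, 16) // explicit count for this shuffle only
    (pairs.getNumPartitions, defaultReduce.getNumPartitions, explicitReduce.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("ParallelismSketch")
      .set("spark.default.parallelism", "8") // hypothetical value
    val sc = new SparkContext(conf)
    try println(partitionCounts(sc)) finally sc.stop()
  }
}
```

Each partition becomes one task in its stage, so these partition counts are the parallelism the bullets above refer to.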
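Broadcast large variables
- By default, when an operator inside a task uses an external variable, every task gets its own copy of that variable, which causes unnecessary network transfer and memory use
- With a broadcast variable, only one copy is kept per Executor and all tasks on that Executor share it, which saves network and memory resources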
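A minimal sketch of the broadcast pattern, using a stop-word set in the spirit of the word-count example earlier in these notes; the object name BroadcastSketch and the sample data are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch {
  def removeStopWords(sc: SparkContext, words: Seq[String], stopWords: Set[String]): Array[String] = {
    // Ship the set once per Executor instead of once per task.
    val stopWordsBc = sc.broadcast(stopWords)
    sc.makeRDD(words)
      .filter(word => !stopWordsBc.value.contains(word)) // tasks read the shared copy via .value
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("BroadcastSketch"))
    try {
      val stopWords = "and of see the to a in".split("\\s+").toSet
      removeStopWords(sc, Seq("the", "spark", "in", "action"), stopWords).foreach(println)
    } finally sc.stop()
  }
}
```

The gain grows with the size of the variable and the number of tasks; for a set this small, the plain closure would work just as well.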