Spark core Insight
1.深入理解 RDD 的內(nèi)在邏輯
- 能夠使用 RDD 的 算子
- 理解 RDD 算子的 Shuffle 和 緩存
- 理解 RDD 整體的使用流程
- 理解 RDD 的調(diào)度原理
- 理解 Spark 中常見(jiàn)的分布式變量共享方式
1. 深入 RDD
- 深入理解RDD 的內(nèi)在邏輯,以及 RDD 的內(nèi)部屬性( RDD 由什么組成)
1.1 案例
需求
- 給定一個(gè)網(wǎng)站的訪問(wèn)記錄,俗稱 Access log ( 需要這份數(shù)據(jù)的可以私聊我發(fā)給你.)
計(jì)算其中出現(xiàn)的獨(dú)立 IP ,以及其訪問(wèn)的次數(shù)
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)
val result = sc.textFile("dataset/access_log_sample.txt")
.map(item => (item.split(" ")(0), 1))
.filter(item => StringUtils.isNotBlank(item._1))
.reduceByKey((curr, agg) => curr + agg)
.sortBy(item => item._2, false)
.take(10)
result.foreach(item => println(item))
- 針對(duì)這個(gè)小案例,我們問(wèn)出了五個(gè)問(wèn)題?
1. 假設(shè)要針對(duì)整個(gè)網(wǎng)站的歷史數(shù)據(jù)進(jìn)行處理,數(shù)據(jù)量有1T,如何處理?
- 放在集群中,利用集群多臺(tái)計(jì)算機(jī)來(lái)并行處理數(shù)據(jù).
2. 如何放在集群中運(yùn)行?
簡(jiǎn)單來(lái)講,并行計(jì)算就是同時(shí)使用多個(gè)計(jì)算資源解決同一個(gè)問(wèn)題, 有四個(gè)要點(diǎn):
- 要解決的問(wèn)題必須可以分解為多個(gè)可以并發(fā)計(jì)算的部分
- 每個(gè)部分要可以在不同處理器上被同時(shí)執(zhí)行
- 需要一個(gè)共享內(nèi)存的機(jī)制
- 需要一個(gè)總體上的協(xié)作機(jī)制來(lái)進(jìn)行資源調(diào)度
3. 如果放在集群中的話,可能要對(duì)整個(gè)計(jì)算任務(wù)進(jìn)行分解, 那么應(yīng)該如何分解任務(wù)呢?

概述:
- 對(duì)于 HDFS 中的文件,是分為不同的 Block 的
- 在進(jìn)行計(jì)算的時(shí)候,就可以按照 Block 來(lái)劃分,每一個(gè) Block 對(duì)應(yīng)著一個(gè)不同的計(jì)算單元
擴(kuò)展
- RDD 并沒(méi)有真實(shí)的存放數(shù)據(jù),數(shù)據(jù)是從 HDFS 中讀取的,在計(jì)算的過(guò)程中讀取即可。
- RDD 至少是需要可以分片的,因?yàn)?HDFS 中的文件就是分片的, RDD 分片的意義在于表示對(duì)源數(shù)據(jù)集每個(gè)分片的計(jì)算, RDD 可以分片,也就意味著 可以并行計(jì)算。
移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算,這是一個(gè)基礎(chǔ)的優(yōu)化,如何做到?
- 小標(biāo)題的意思是說(shuō),早大數(shù)據(jù)的情況下,移動(dòng)數(shù)據(jù)到另外一臺(tái)機(jī)器上會(huì)涉及到帶寬等問(wèn)題,所以基本上應(yīng)該減少數(shù)據(jù)量的傳輸過(guò)程,也就是數(shù)據(jù)盡量就在本臺(tái)機(jī)器上面進(jìn)行計(jì)算。
- 每一個(gè)計(jì)算單元需要記錄其存儲(chǔ)單元的位置,盡量調(diào)度過(guò)去
5. 在集群中運(yùn)行,需要很多節(jié)點(diǎn)之間配合,出錯(cuò)的概率也更高,出錯(cuò)了怎么辦?
假設(shè) RDD1 RDD2 RDD3 在轉(zhuǎn)換的過(guò)程中,RDD2 出錯(cuò)了,有兩種辦法可以解決:
- 緩存 RDD2 的數(shù)據(jù),直接回復(fù) RDD2 ,類似 HDFS 上的備份機(jī)制。
- 記錄 RDD2 的依賴關(guān)系,通過(guò)其父類的 RDD 來(lái)回復(fù) RDD2 ,這種方式會(huì)減少很多的數(shù)據(jù)交互和保存的磁盤(pán)空間。
如何通過(guò)父級(jí) RDD 來(lái) 恢復(fù)?
- 記錄 RDD2 的父親是 RDD1
- 記錄 RDD2 的計(jì)算函數(shù), 例如 記錄(下面就是一個(gè)計(jì)算函數(shù)):
RDD2 = RDD1.map(…?), map(…?)
- 當(dāng) RDD2 計(jì)算出錯(cuò)的時(shí)候, 可以通過(guò) 父級(jí) RDD 和計(jì)算函數(shù)來(lái)恢復(fù) RDD2
6. 假如任務(wù)特別的復(fù)雜,流程特別的長(zhǎng),有很多的 RDD 之間的依賴關(guān)系,如何優(yōu)化呢?
- 上面提到了可以使用依賴關(guān)系來(lái)進(jìn)行容錯(cuò),但是如果依賴關(guān)系特別長(zhǎng)的時(shí)候,這種方式其實(shí)也比較低效,這個(gè)時(shí)候就應(yīng)該使用另外一種方式,就是記錄數(shù)據(jù)集的狀態(tài)。
在 RDD 中有兩個(gè)手段可以做到
- 緩存
- CheckPoint
1.2 再談 RDD
- 理解 RDD 為什么會(huì)出現(xiàn)
- 理解 RDD 的主要特點(diǎn)
- 理解 RDD 的五大屬性
1.2.1 RDD 為什么會(huì)出現(xiàn)?
在 RDD 出現(xiàn)之前,當(dāng)時(shí) MapReduce 是比較主流的,而 MapReduce 如何執(zhí)行迭代計(jì)算的任務(wù)呢?

- 多個(gè) MapReduce 任務(wù)之間沒(méi)有基于 內(nèi)存的數(shù)據(jù)共享方式,只能通過(guò)磁盤(pán)來(lái)進(jìn)行共享
- 因?yàn)樯婕暗搅舜疟P(pán)的I/O讀寫(xiě)共享,這種方式效率明顯低下。
RDD 如何解決迭代計(jì)算非常低效的問(wèn)題的呢?

- 在 Spark 中,其實(shí)最終 Job3 從邏輯上的計(jì)算過(guò)程是:
Job3 = (Job1.map).filter
- 整個(gè)過(guò)程是共享內(nèi)存的,而不需要將中間結(jié)果存放在可靠的分布式文件系統(tǒng)中。
- 這種方式可以在保證容錯(cuò)的前提下,提供更多的靈活的,更快的執(zhí)行速度, RDD 在執(zhí)行迭代型任務(wù)的時(shí)候的表現(xiàn)可以通過(guò)下面這個(gè)代碼體現(xiàn)。
// 線性回歸
val points = sc.textFile(...)
.map(...)
.persist(...)
val w = randomValue
for (i <- 1 to 10000) {
val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
.reduce(_ + _)
w -= gradient
}
- 在這個(gè)例子中,進(jìn)行了大致 10000 次數(shù)的迭代,如果在 MapReduce 中實(shí)現(xiàn)的話,可能需要運(yùn)行很多 Job, 每個(gè) Job 之間都要通過(guò) HDFS 共享結(jié)果,那么速度明顯會(huì)比 RDD 慢很多很多。
1.2.3 RDD 的特點(diǎn)
RDD 不僅是數(shù)據(jù)集,也是編程模型
-
RDD 既是一種數(shù)據(jù)結(jié)構(gòu),同時(shí)也提供了上層的 API , 同時(shí) RDD 的 API 和 Scala 中對(duì)集合運(yùn)算 API 非常的相似了,同時(shí)也有各種 算子。
RDD 的算子大致分為兩類:
- Transformation(轉(zhuǎn)換操作) : 例如 map , flatMap , filter 等
- Action( 動(dòng)作操作)例如: reduce ,collect , show 等
- 執(zhí)行 RDD 的時(shí)候,在執(zhí)行到Transformation(轉(zhuǎn)換)操作的時(shí)候,并不會(huì)立刻執(zhí)行,知道遇見(jiàn)了 Action (動(dòng)作)操作的時(shí)候,才是真正的觸發(fā)執(zhí)行,這個(gè)點(diǎn)特叫做 : 惰性求值
RDD 可以分區(qū)

- RDD 是一個(gè)分布式計(jì)算框架,所以,一定要能過(guò)進(jìn)行分區(qū)計(jì)算的,只有能分區(qū)了,才能利用集群的并行計(jì)算能力。
- 同時(shí),RDD 不需要始終被 具體化,也就是說(shuō): RDD 中可以沒(méi)有數(shù)據(jù) ,只要有足夠的信息知道自己是從誰(shuí)計(jì)算得來(lái)的就可以了,這是一種非常高效的容錯(cuò)方式
RDD 是只讀的

- RDD 是 只讀的,不允許任何形式的修改,雖說(shuō)不能因?yàn)?RDD 和 HDFS 都是只讀的,就認(rèn)為分布式存儲(chǔ)系統(tǒng)必須設(shè)計(jì)為 只讀的,但是設(shè)計(jì)為 只讀的,會(huì)顯著降低問(wèn)題的復(fù)雜度,因?yàn)?RDD 需要可以容錯(cuò), 可以惰性求值, 可以移動(dòng)計(jì)算, 所以很難支持修改.
- RDD2 中可能沒(méi)有數(shù)據(jù), 只是保留了依賴關(guān)系和計(jì)算函數(shù), 那修改啥?
- 如果因?yàn)橹С中薷? 而必須保存數(shù)據(jù)的話, 怎么容錯(cuò)?
- 如果允許修改, 如何定位要修改的那一行? RDD 的轉(zhuǎn)換是粗粒度的, 也就是說(shuō), RDD 并不感知具體每一行在哪.
RDD 是可以容錯(cuò)的

RDD 的容錯(cuò)有兩種方式
- 保存 RDD 之間的依賴關(guān)系, 以及計(jì)算函數(shù), 出現(xiàn)錯(cuò)誤重新計(jì)算
- 直接將 RDD 的數(shù)據(jù)存放在外部存儲(chǔ)系統(tǒng), 出現(xiàn)錯(cuò)誤直接讀取, Checkpoint
1.2.3 什么叫做彈性分布式數(shù)據(jù)集
分布式
- RDD 支持分區(qū), 可以運(yùn)行在集群中
彈性
- RDD 支持高效的容錯(cuò)
- RDD 中的數(shù)據(jù)即可以緩存在內(nèi)存中, 也可以緩存在磁盤(pán)中, 也可以緩存在外部存儲(chǔ)中
數(shù)據(jù)集
- RDD 可以不保存具體數(shù)據(jù), 只保留創(chuàng)建自己的必備信息, 例如依賴和計(jì)算函數(shù)
- RDD 也可以緩存起來(lái), 相當(dāng)于存儲(chǔ)具體數(shù)據(jù)
總結(jié): RDD 的 五大屬性
首先整理一下上面所提到的 RDD 所要實(shí)現(xiàn)的功能:
- RDD 有分區(qū)
- RDD 要可以通過(guò)依賴關(guān)系和計(jì)算函數(shù)進(jìn)行容錯(cuò)
- RDD 要針對(duì)數(shù)據(jù)本地性進(jìn)行優(yōu)化
- RDD 支持 MapReduce 形式的計(jì)算, 所以要能夠?qū)?shù)據(jù)進(jìn)行 Shuffled
對(duì)于 RDD 來(lái)說(shuō), 其中應(yīng)該有什么內(nèi)容呢? 如果站在 RDD 設(shè)計(jì)者的角度上, 這個(gè)類中, 至少需要什么屬性?
- Partition List 分片列表, 記錄 RDD 的分片, 可以在創(chuàng)建 RDD 的時(shí)候指定分區(qū)數(shù)目, 也可以通過(guò)算子來(lái)生成新的 RDD 從而改變分區(qū)數(shù)目
- Compute Function 為了實(shí)現(xiàn)容錯(cuò), 需要記錄 RDD 之間轉(zhuǎn)換所執(zhí)行的計(jì)算函數(shù)
- RDD Dependencies RDD 之間的依賴關(guān)系, 要在 RDD 中記錄其上級(jí) RDD 是誰(shuí), 從而實(shí)現(xiàn)容錯(cuò)和計(jì)算
- Partitioner 為了執(zhí)行 Shuffled 操作, 必須要有一個(gè)函數(shù)用來(lái)計(jì)算數(shù)據(jù)應(yīng)該發(fā)往哪個(gè)分區(qū)
- Preferred Location 優(yōu)先位置, 為了實(shí)現(xiàn)數(shù)據(jù)本地性操作, 從而移動(dòng)計(jì)算而不是移動(dòng)存儲(chǔ), 需要記錄每個(gè) RDD 分區(qū)最好應(yīng)該放置在什么位置
2. RDD 的 算子
1.理解 RDD 的算子分類, 以及其特性
2.理解常見(jiàn)算子的使用
分類
RDD 中的算子從功能上分為兩大類
- Transformation(轉(zhuǎn)換) 它會(huì)在一個(gè)已經(jīng)存在的 RDD 上創(chuàng)建一個(gè)新的 RDD, 將舊的 RDD 的數(shù)據(jù)轉(zhuǎn)換為另外一種形式后放入新的 RDD
- Action(動(dòng)作) 執(zhí)行各個(gè)分區(qū)的計(jì)算任務(wù), 將的到的結(jié)果返回到 Driver 中
RDD 中可以存放各種類型的數(shù)據(jù), 那么對(duì)于不同類型的數(shù)據(jù), RDD 又可以分為三類
- 針對(duì)基礎(chǔ)類型(例如 String)處理的普通算子
- 針對(duì) Key-Value 數(shù)據(jù)處理的 byKey 算子
- 針對(duì)數(shù)字類型數(shù)據(jù)處理的計(jì)算算子
特點(diǎn)
- Spark 中所有的 Transformations 是 Lazy(惰性) 的, 它們不會(huì)立即執(zhí)行獲得結(jié)果. 相反, 它們只會(huì)記錄在數(shù)據(jù)集上要應(yīng)用的操作. 只有當(dāng)需要返回結(jié)果給 Driver 時(shí), 才會(huì)執(zhí)行這些操作, 通過(guò) DAGScheduler ,TaskScheduler 分發(fā)到集群中運(yùn)行, 這個(gè)特性叫做 惰性求值
- 默認(rèn)情況下, 每一個(gè) Action 運(yùn)行的時(shí)候, 其所關(guān)聯(lián)的所有 Transformation RDD 都會(huì)重新計(jì)算, 但是也可以使用 presist 方法將 RDD 持久化到磁盤(pán)或者內(nèi)存中. 這個(gè)時(shí)候?yàn)榱讼麓慰梢愿斓脑L問(wèn), 會(huì)把數(shù)據(jù)保存到集群上.
2.1 Transformations 算子
map(T ? U)
.map( num => num * 10 )
.collect()


作用
- 把 RDD 中的數(shù)據(jù) 一對(duì)一 的轉(zhuǎn)為另一種形式
簽名
def map[U: ClassTag](f: T ? U): RDD[U]
參數(shù)
f → Map 算子是 原RDD → 新RDD 的過(guò)程, 傳入函數(shù)的參數(shù)是原 RDD 數(shù)據(jù), 返回值是經(jīng)過(guò)函數(shù)轉(zhuǎn)換的新 RDD 的數(shù)據(jù)
注意點(diǎn)
- Map 是一對(duì)一, 如果函數(shù)是 String → Array[String] 則新的 RDD 中每條數(shù)據(jù)就是一個(gè)數(shù)組
flatMap(T ? List[U])
sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
.flatMap( line => line.split(" ") )
.collect()


作用
- FlatMap 算子和 Map 算子類似, 但是 FlatMap 是一對(duì)多
- 就是說(shuō),我們的一個(gè)“hello lily" 字符串變成了 Hello ,lily 兩個(gè)了。和map 不同的是,map 只是在原先的上面進(jìn)行修改。
調(diào)用
def flatMap[U: ClassTag](f: T ? List[U]): RDD[U]
參數(shù)
f → 參數(shù)是原 RDD 數(shù)據(jù), 返回值是經(jīng)過(guò)函數(shù)轉(zhuǎn)換的新 RDD 的數(shù)據(jù), 需要注意的是返回值是一個(gè)集合, 集合中的數(shù)據(jù)會(huì)被展平后再放入新的 RDD
注意點(diǎn)
- flatMap 其實(shí)是兩個(gè)操作, 是 map + flatten, 也就是先轉(zhuǎn)換, 后把轉(zhuǎn)換而來(lái)的 List 展開(kāi)
- Spark 中并沒(méi)有直接展平 RDD 中數(shù)組的算子, 可以使用 flatMap 做這件事
filter(T ? Boolean)
sc.parallelize(Seq(1, 2, 3))
.filter( value => value >= 3 )
.collect()


作用
- Filter 算子的主要作用是過(guò)濾掉不需要的內(nèi)容,也就是說(shuō)保留符合條件的內(nèi)容,為true ,則保留。
mapPartitions(List[T] ? List[U])
- RDD[T] ? RDD[U] 和 map 類似, 但是針對(duì)整個(gè)分區(qū)的數(shù)據(jù)轉(zhuǎn)換
mapPartitionsWithIndex
- 和 mapPartitions 類似, 只是在函數(shù)中增加了分區(qū)的 Index
mapValues
sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
.mapValues( value => value * 10 )
.collect()


作用
- MapValues 只能作用于 Key-Value 型數(shù)據(jù), 和 Map 類似, 也是使用函數(shù)按照轉(zhuǎn)換數(shù)據(jù), 不同點(diǎn)是 MapValues 只轉(zhuǎn)換 Key-Value 中的 Value
sample(withReplacement, fraction, seed)
sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.sample(withReplacement = true, 0.6, 2)
.collect()


作用
- Sample 算子可以從一個(gè)數(shù)據(jù)集中抽樣出來(lái)一部分, 常用作于減小數(shù)據(jù)集以保證運(yùn)行速度, 并且盡可能少規(guī)律的損失
參數(shù)
- Sample 接受第一個(gè)參數(shù)為 withReplacement, 意為是否取樣以后是否還放回原數(shù)據(jù)集供下次使用, 簡(jiǎn)單的說(shuō), 如果這個(gè)參數(shù)的值為 true, 則抽樣出來(lái)的數(shù)據(jù)集中可能會(huì)有重復(fù)
- Sample 接受第二個(gè)參數(shù)為 fraction, 意為抽樣的比例
- Sample 接受第三個(gè)參數(shù)為 seed, 隨機(jī)數(shù)種子, 用于 Sample 內(nèi)部隨機(jī)生成下標(biāo), 一般不指定, 使用默認(rèn)值
union(other)并集
val rdd1 = sc.parallelize(Seq(1, 2, 3))
val rdd2 = sc.parallelize(Seq(4, 5, 6))
rdd1.union(rdd2)
.collect()


intersection(other)
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd2 = sc.parallelize(Seq(4, 5, 6, 7, 8))
rdd1.intersection(rdd2)
.collect()


作用
- Intersection 算子是一個(gè)集合操作, 用于求得 左側(cè)集合 和 右側(cè)集合 的交集, 換句話說(shuō), 就是左側(cè)集合和右側(cè)集合都有的元素, 并生成一個(gè)新的 RDD
subtract(other, numPartitions)(差集)
- 可以設(shè)置分區(qū)數(shù)
distinct(numPartitions)(去重)
sc.parallelize(Seq(1, 1, 2, 2, 3))
.distinct()
.collect()


作用
- Distinct 算子用于去重
注意點(diǎn)
- Distinct 是一個(gè)需要 Shuffled 的操作
- 本質(zhì)上 Distinct 就是一個(gè) reductByKey, 把重復(fù)的合并為一個(gè)
reduceByKey((V, V) ? V, numPartition)
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.reduceByKey( (curr, agg) => curr + agg )
.collect()


作用
首先按照 Key 分組生成一個(gè) Tuple, 然后針對(duì)每個(gè)組執(zhí)行 reduce 算子
調(diào)用
def reduceByKey(func: (V, V) ? V): RDD[(K, V)]
參數(shù)
func → 執(zhí)行數(shù)據(jù)處理的函數(shù), 傳入兩個(gè)參數(shù), 一個(gè)是當(dāng)前值, 一個(gè)是局部匯總, 這個(gè)函數(shù)需要有一個(gè)輸出, 輸出就是這個(gè) Key 的匯總結(jié)果
注意點(diǎn)
ReduceByKey 只能作用于 Key-Value 型數(shù)據(jù), Key-Value 型數(shù)據(jù)在當(dāng)前語(yǔ)境中特指 Tuple2
ReduceByKey 是一個(gè)需要 Shuffled 的操作
和其它的 Shuffled 相比, ReduceByKey是高效的, 因?yàn)轭愃?MapReduce 的, 在 Map 端有一個(gè) Cominer, 這樣 I/O 的數(shù)據(jù)便會(huì)減少
groupByKey()
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.groupByKey()
.collect()


作用
- GroupByKey 算子的主要作用是按照 Key 分組, 和 ReduceByKey 有點(diǎn)類似, 但是 GroupByKey 并不求聚合, 只是列舉 Key 對(duì)應(yīng)的所有 Value
注意點(diǎn)
- GroupByKey 是一個(gè) Shuffled
- GroupByKey 和 ReduceByKey 不同, 因?yàn)樾枰信e Key 對(duì)應(yīng)的所有數(shù)據(jù), 所以無(wú)法在 Map 端做 Combine, 所以 GroupByKey 的性能并沒(méi)有 ReduceByKey 好
combineByKey()
val rdd = sc.parallelize(Seq(
("zhangsan", 99.0),
("zhangsan", 96.0),
("lisi", 97.0),
("lisi", 98.0),
("zhangsan", 97.0))
)
val combineRdd = rdd.combineByKey(
score => (score, 1),
(scoreCount: (Double, Int),newScore) => (scoreCount._1 + newScore, scoreCount._2 + 1),
(scoreCount1: (Double, Int), scoreCount2: (Double, Int)) =>
(scoreCount1._1 + scoreCount2._1, scoreCount1._2 + scoreCount2._2)
)
val meanRdd = combineRdd.map(score => (score._1, score._2._1 / score._2._2))
meanRdd.collect()

作用
- 對(duì)數(shù)據(jù)集按照 Key 進(jìn)行聚合
調(diào)用
combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner],[serializer])
參數(shù)
- createCombiner 將 Value 進(jìn)行初步轉(zhuǎn)換
- mergeValue 在每個(gè)分區(qū)把上一步轉(zhuǎn)換的結(jié)果聚合
- mergeCombiners 在所有分區(qū)上把每個(gè)分區(qū)的聚合結(jié)果聚合
- partitioner 可選, 分區(qū)函數(shù)
- mapSideCombiner 可選, 是否在 Map 端 Combine
- serializer 序列化器
注意點(diǎn)
- combineByKey 的要點(diǎn)就是三個(gè)函數(shù)的意義要理解
- groupByKey, reduceByKey 的底層都是 combineByKey
aggregateByKey()
val rdd = sc.parallelize(Seq(("手機(jī)", 10.0), ("手機(jī)", 15.0), ("電腦", 20.0)))
val result = rdd.aggregateByKey(0.8)(
seqOp = (zero, price) => price * zero,
combOp = (curr, agg) => curr + agg
).collect()
println(result)

作用
聚合所有 Key 相同的 Value, 換句話說(shuō), 按照 Key 聚合 Value
調(diào)用
rdd.aggregateByKey(zeroValue)(seqOp, combOp)
參數(shù)
zeroValue 初始值
seqOp 轉(zhuǎn)換每一個(gè)值的函數(shù)
comboOp 將轉(zhuǎn)換過(guò)的值聚合的函數(shù)
注意點(diǎn) 為什么需要兩個(gè)函數(shù)?
aggregateByKey 運(yùn)行將一個(gè) RDD[(K, V)] 聚合為 RDD[(K, U)], 如果要做到這件事的話, 就需要先對(duì)數(shù)據(jù)做一次轉(zhuǎn)換, 將每條數(shù)據(jù)從 V 轉(zhuǎn)為 U, seqOp 就是干這件事的 當(dāng) seqOp 的事情結(jié)束以后, comboOp 把其結(jié)果聚合
和 reduceByKey 的區(qū)別
- aggregateByKey 最終聚合結(jié)果的類型和傳入的初始值類型保持一致
- reduceByKey 在集合中選取第一個(gè)值作為初始值, 并且聚合過(guò)的數(shù)據(jù)類型不能改變
foldByKey(zeroValue)((V, V) ? V)
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.foldByKey(zeroValue = 10)( (curr, agg) => curr + agg )
.collect()


作用
和 ReduceByKey 是一樣的, 都是按照 Key 做分組去求聚合, 但是 FoldByKey 的不同點(diǎn)在于可以指定初始值
調(diào)用
foldByKey(zeroValue)(func)
參數(shù)
zeroValue 初始值
func seqOp 和 combOp 相同, 都是這個(gè)參數(shù)
注意點(diǎn)
FoldByKey 是 AggregateByKey 的簡(jiǎn)化版本, seqOp 和 combOp 是同一個(gè)函數(shù)
FoldByKey 指定的初始值作用于每一個(gè) Value
join(other, numPartitions)
val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))
val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("a", 12)))
rdd1.join(rdd2).collect()

作用
將兩個(gè) RDD 按照相同的 Key 進(jìn)行連接
調(diào)用
join(other, [partitioner or numPartitions])
參數(shù)
other 其它 RDD
partitioner or numPartitions 可選, 可以通過(guò)傳遞分區(qū)函數(shù)或者分區(qū)數(shù)量來(lái)改變分區(qū)
注意點(diǎn)
- Join 有點(diǎn)類似于 SQL 中的內(nèi)連接, 只會(huì)再結(jié)果中包含能夠連接到的 Key
- Join 的結(jié)果是一個(gè)笛卡爾積形式, 例如 "a", 1), ("a", 2 和 "a", 10), ("a", 11 的 Join 結(jié)果集是 "a", 1, 10), ("a", 1, 11), ("a", 2, 10), ("a", 2, 11
cogroup(other, numPartitions)
val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 5), ("b", 2), ("b", 6), ("c", 3), ("d", 2)))
val rdd2 = sc.parallelize(Seq(("a", 10), ("b", 1), ("d", 3)))
val rdd3 = sc.parallelize(Seq(("b", 10), ("a", 1)))
val result1 = rdd1.cogroup(rdd2).collect()
val result2 = rdd1.cogroup(rdd2, rdd3).collect()
/*
執(zhí)行結(jié)果:
Array(
(d,(CompactBuffer(2),CompactBuffer(3))),
(a,(CompactBuffer(1, 2, 5),CompactBuffer(10))),
(b,(CompactBuffer(2, 6),CompactBuffer(1))),
(c,(CompactBuffer(3),CompactBuffer()))
)
*/
println(result1)
/*
執(zhí)行結(jié)果:
Array(
(d,(CompactBuffer(2),CompactBuffer(3),CompactBuffer())),
(a,(CompactBuffer(1, 2, 5),CompactBuffer(10),CompactBuffer(1))),
(b,(CompactBuffer(2, 6),CompactBuffer(1),Co...
*/
println(result2)

作用
多個(gè) RDD 協(xié)同分組, 將多個(gè) RDD 中 Key 相同的 Value 分組
調(diào)用
cogroup(rdd1, rdd2, rdd3, [partitioner or numPartitions])
參數(shù)
rdd…? 最多可以傳三個(gè) RDD 進(jìn)去, 加上調(diào)用者, 可以為四個(gè) RDD 協(xié)同分組
partitioner or numPartitions 可選, 可以通過(guò)傳遞分區(qū)函數(shù)或者分區(qū)數(shù)來(lái)改變分區(qū)
注意點(diǎn)
對(duì) RDD1, RDD2, RDD3 進(jìn)行 cogroup, 結(jié)果中就一定會(huì)有三個(gè) List, 如果沒(méi)有 Value 則是空 List, 這一點(diǎn)類似于 SQL 的全連接, 返回所有結(jié)果, 即使沒(méi)有關(guān)聯(lián)上
CoGroup 是一個(gè)需要 Shuffled 的操作
cartesian(other)
(RDD[T], RDD[U]) ? RDD[(T, U)]
- 生成兩個(gè) RDD 的笛卡爾積
sortBy(ascending, numPartitions)
val rdd1 = sc.parallelize(Seq(("a", 3), ("b", 2), ("c", 1)))
val sortByResult = rdd1.sortBy( item => item._2 ).collect()
val sortByKeyResult = rdd1.sortByKey().collect()
println(sortByResult)
println(sortByKeyResult)
作用
排序相關(guān)相關(guān)的算子有兩個(gè), 一個(gè)是 sortBy, 另外一個(gè)是 sortByKey
調(diào)用
sortBy(func, ascending, numPartitions)
參數(shù)
func 通過(guò)這個(gè)函數(shù)返回要排序的字段
ascending 是否升序
numPartitions 分區(qū)數(shù)
注意點(diǎn)
普通的 RDD 沒(méi)有 sortByKey, 只有 Key-Value 的 RDD 才有
sortBy 可以指定按照哪個(gè)字段來(lái)排序, sortByKey 直接按照 Key 來(lái)排序
partitionBy(partitioner)
- 使用用傳入的 partitioner 重新分區(qū), 如果和當(dāng)前分區(qū)函數(shù)相同, 則忽略操作
coalesce(numPartitions)
- 減少分區(qū)數(shù)
val rdd = sc.parallelize(Seq(("a", 3), ("b", 2), ("c", 1)))
val oldNum = rdd.partitions.length
val coalesceRdd = rdd.coalesce(4, shuffle = true)
val coalesceNum = coalesceRdd.partitions.length
val repartitionRdd = rdd.repartition(4)
val repartitionNum = repartitionRdd.partitions.length
print(oldNum, coalesceNum, repartitionNum)
作用
一般涉及到分區(qū)操作的算子常見(jiàn)的有兩個(gè), repartitioin 和 coalesce, 兩個(gè)算子都可以調(diào)大或者調(diào)小分區(qū)數(shù)量
調(diào)用
repartitioin(numPartitions)
coalesce(numPartitions, shuffle)
參數(shù)
numPartitions 新的分區(qū)數(shù)
shuffle 是否 shuffle, 如果新的分區(qū)數(shù)量比原分區(qū)數(shù)大, 必須 Shuffled, 否則重分區(qū)無(wú)效
注意點(diǎn)
repartition 和 coalesce 的不同就在于 coalesce 可以控制是否 Shuffle
repartition 是一個(gè) Shuffled 操作
repartition(numPartitions)
- 重新分區(qū)
repartitionAndSortWithinPartitions
- 重新分區(qū)的同時(shí)升序排序, 在 partitioner 中排序, 比先重分區(qū)再排序要效率高, 建議使用在需要分區(qū)后再排序的場(chǎng)景使用
常見(jiàn)的 Transformation 類型的 RDD
map
flatMap
filter
groupBy
reduceByKey
常見(jiàn)的 Action 類型的 RDD
collect
countByKey
reduce
