2019-08-05

Spark core Insight

1.深入理解 RDD 的內(nèi)在邏輯

  1. 能夠使用 RDD 的 算子
  2. 理解 RDD 算子的 Shuffle 和 緩存
  3. 理解 RDD 整體的使用流程
  4. 理解 RDD 的調(diào)度原理
  5. 理解 Spark 中常見(jiàn)的分布式變量共享方式

1. 深入 RDD

  • 深入理解RDD 的內(nèi)在邏輯,以及 RDD 的內(nèi)部屬性( RDD 由什么組成)

1.1 案例

需求
  • 給定一個(gè)網(wǎng)站的訪問(wèn)記錄,俗稱 Access log ( 需要這份數(shù)據(jù)的可以私聊我發(fā)給你.)
    計(jì)算其中出現(xiàn)的獨(dú)立 IP ,以及其訪問(wèn)的次數(shù)
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)

val result = sc.textFile("dataset/access_log_sample.txt")
  .map(item => (item.split(" ")(0), 1))
  .filter(item => StringUtils.isNotBlank(item._1))
  .reduceByKey((curr, agg) => curr + agg)
  .sortBy(item => item._2, false)
  .take(10)

result.foreach(item => println(item))
  • 針對(duì)這個(gè)小案例,我們問(wèn)出了五個(gè)問(wèn)題?
1. 假設(shè)要針對(duì)整個(gè)網(wǎng)站的歷史數(shù)據(jù)進(jìn)行處理,數(shù)據(jù)量有1T,如何處理?
  • 放在集群中,利用集群多臺(tái)計(jì)算機(jī)來(lái)并行處理數(shù)據(jù).
2. 如何放在集群中運(yùn)行?
簡(jiǎn)單來(lái)講,并行計(jì)算就是同時(shí)使用多個(gè)計(jì)算資源解決同一個(gè)問(wèn)題, 有四個(gè)要點(diǎn):
  • 要解決的問(wèn)題必須可以分解為多個(gè)可以并發(fā)計(jì)算的部分
  • 每個(gè)部分要可以在不同處理器上被同時(shí)執(zhí)行
  • 需要一個(gè)共享內(nèi)存的機(jī)制
  • 需要一個(gè)總體上的協(xié)作機(jī)制來(lái)進(jìn)行資源調(diào)度
3. 如果放在集群中的話,可能要對(duì)整個(gè)計(jì)算任務(wù)進(jìn)行分解, 那么應(yīng)該如何分解任務(wù)呢?
概述:
  • 對(duì)于 HDFS 中的文件,是分為不同的 Block 的
  • 在進(jìn)行計(jì)算的時(shí)候,就可以按照 Block 來(lái)劃分,每一個(gè) Block 對(duì)應(yīng)著一個(gè)不同的計(jì)算單元
擴(kuò)展
  • RDD 并沒(méi)有真實(shí)的存放數(shù)據(jù),數(shù)據(jù)是從 HDFS 中讀取的,在計(jì)算的過(guò)程中讀取即可。
  • RDD 至少是需要可以分片的,因?yàn)?HDFS 中的文件就是分片的, RDD 分片的意義在于表示對(duì)源數(shù)據(jù)集每個(gè)分片的計(jì)算, RDD 可以分片,也就意味著 可以并行計(jì)算。
移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算,這是一個(gè)基礎(chǔ)的優(yōu)化,如何做到?
  • 小標(biāo)題的意思是說(shuō),早大數(shù)據(jù)的情況下,移動(dòng)數(shù)據(jù)到另外一臺(tái)機(jī)器上會(huì)涉及到帶寬等問(wèn)題,所以基本上應(yīng)該減少數(shù)據(jù)量的傳輸過(guò)程,也就是數(shù)據(jù)盡量就在本臺(tái)機(jī)器上面進(jìn)行計(jì)算。
  • 每一個(gè)計(jì)算單元需要記錄其存儲(chǔ)單元的位置,盡量調(diào)度過(guò)去
5. 在集群中運(yùn)行,需要很多節(jié)點(diǎn)之間配合,出錯(cuò)的概率也更高,出錯(cuò)了怎么辦?

假設(shè) RDD1 RDD2 RDD3 在轉(zhuǎn)換的過(guò)程中,RDD2 出錯(cuò)了,有兩種辦法可以解決:

  1. 緩存 RDD2 的數(shù)據(jù),直接回復(fù) RDD2 ,類似 HDFS 上的備份機(jī)制。
  2. 記錄 RDD2 的依賴關(guān)系,通過(guò)其父類的 RDD 來(lái)回復(fù) RDD2 ,這種方式會(huì)減少很多的數(shù)據(jù)交互和保存的磁盤(pán)空間。
如何通過(guò)父級(jí) RDD 來(lái) 恢復(fù)?
  1. 記錄 RDD2 的父親是 RDD1
  2. 記錄 RDD2 的計(jì)算函數(shù), 例如 記錄(下面就是一個(gè)計(jì)算函數(shù)):
RDD2 = RDD1.map(…?), map(…?) 
  1. 當(dāng) RDD2 計(jì)算出錯(cuò)的時(shí)候, 可以通過(guò) 父級(jí) RDD 和計(jì)算函數(shù)來(lái)恢復(fù) RDD2
6. 假如任務(wù)特別的復(fù)雜,流程特別的長(zhǎng),有很多的 RDD 之間的依賴關(guān)系,如何優(yōu)化呢?
  • 上面提到了可以使用依賴關(guān)系來(lái)進(jìn)行容錯(cuò),但是如果依賴關(guān)系特別長(zhǎng)的時(shí)候,這種方式其實(shí)也比較低效,這個(gè)時(shí)候就應(yīng)該使用另外一種方式,就是記錄數(shù)據(jù)集的狀態(tài)。
在 RDD 中有兩個(gè)手段可以做到
  1. 緩存
  2. CheckPoint

1.2 再談 RDD

  • 理解 RDD 為什么會(huì)出現(xiàn)
  • 理解 RDD 的主要特點(diǎn)
  • 理解 RDD 的五大屬性

1.2.1 RDD 為什么會(huì)出現(xiàn)?

在 RDD 出現(xiàn)之前,當(dāng)時(shí) MapReduce 是比較主流的,而 MapReduce 如何執(zhí)行迭代計(jì)算的任務(wù)呢?
  • 多個(gè) MapReduce 任務(wù)之間沒(méi)有基于 內(nèi)存的數(shù)據(jù)共享方式,只能通過(guò)磁盤(pán)來(lái)進(jìn)行共享
  • 因?yàn)樯婕暗搅舜疟P(pán)的I/O讀寫(xiě)共享,這種方式效率明顯低下。
RDD 如何解決迭代計(jì)算非常低效的問(wèn)題的呢?
  • 在 Spark 中,其實(shí)最終 Job3 從邏輯上的計(jì)算過(guò)程是:
Job3 = (Job1.map).filter
  • 整個(gè)過(guò)程是共享內(nèi)存的,而不需要將中間結(jié)果存放在可靠的分布式文件系統(tǒng)中。
  • 這種方式可以在保證容錯(cuò)的前提下,提供更多的靈活的,更快的執(zhí)行速度, RDD 在執(zhí)行迭代型任務(wù)的時(shí)候的表現(xiàn)可以通過(guò)下面這個(gè)代碼體現(xiàn)。
// 線性回歸
val points = sc.textFile(...)
   .map(...)
   .persist(...)
val w = randomValue
for (i <- 1 to 10000) {
   val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
       .reduce(_ + _)
   w -= gradient
}
  • 在這個(gè)例子中,進(jìn)行了大致 10000 次數(shù)的迭代,如果在 MapReduce 中實(shí)現(xiàn)的話,可能需要運(yùn)行很多 Job, 每個(gè) Job 之間都要通過(guò) HDFS 共享結(jié)果,那么速度明顯會(huì)比 RDD 慢很多很多。

1.2.3 RDD 的特點(diǎn)

RDD 不僅是數(shù)據(jù)集,也是編程模型
  • RDD 既是一種數(shù)據(jù)結(jié)構(gòu),同時(shí)也提供了上層的 API , 同時(shí) RDD 的 API 和 Scala 中對(duì)集合運(yùn)算 API 非常的相似了,同時(shí)也有各種 算子。


RDD 的算子大致分為兩類:
  • Transformation(轉(zhuǎn)換操作) : 例如 map , flatMap , filter 等
  • Action( 動(dòng)作操作)例如: reduce ,collect , show 等
  • 執(zhí)行 RDD 的時(shí)候,在執(zhí)行到Transformation(轉(zhuǎn)換)操作的時(shí)候,并不會(huì)立刻執(zhí)行,知道遇見(jiàn)了 Action (動(dòng)作)操作的時(shí)候,才是真正的觸發(fā)執(zhí)行,這個(gè)點(diǎn)特叫做 : 惰性求值
RDD 可以分區(qū)
  • RDD 是一個(gè)分布式計(jì)算框架,所以,一定要能過(guò)進(jìn)行分區(qū)計(jì)算的,只有能分區(qū)了,才能利用集群的并行計(jì)算能力。
  • 同時(shí),RDD 不需要始終被 具體化,也就是說(shuō): RDD 中可以沒(méi)有數(shù)據(jù) ,只要有足夠的信息知道自己是從誰(shuí)計(jì)算得來(lái)的就可以了,這是一種非常高效的容錯(cuò)方式
RDD 是只讀的
  • RDD 是 只讀的,不允許任何形式的修改,雖說(shuō)不能因?yàn)?RDD 和 HDFS 都是只讀的,就認(rèn)為分布式存儲(chǔ)系統(tǒng)必須設(shè)計(jì)為 只讀的,但是設(shè)計(jì)為 只讀的,會(huì)顯著降低問(wèn)題的復(fù)雜度,因?yàn)?RDD 需要可以容錯(cuò), 可以惰性求值, 可以移動(dòng)計(jì)算, 所以很難支持修改.
  • RDD2 中可能沒(méi)有數(shù)據(jù), 只是保留了依賴關(guān)系和計(jì)算函數(shù), 那修改啥?
  • 如果因?yàn)橹С中薷? 而必須保存數(shù)據(jù)的話, 怎么容錯(cuò)?
  • 如果允許修改, 如何定位要修改的那一行? RDD 的轉(zhuǎn)換是粗粒度的, 也就是說(shuō), RDD 并不感知具體每一行在哪.

RDD 是可以容錯(cuò)的


RDD 的容錯(cuò)有兩種方式
  • 保存 RDD 之間的依賴關(guān)系, 以及計(jì)算函數(shù), 出現(xiàn)錯(cuò)誤重新計(jì)算
  • 直接將 RDD 的數(shù)據(jù)存放在外部存儲(chǔ)系統(tǒng), 出現(xiàn)錯(cuò)誤直接讀取, Checkpoint

1.2.3 什么叫做彈性分布式數(shù)據(jù)集

分布式
  • RDD 支持分區(qū), 可以運(yùn)行在集群中
彈性
  • RDD 支持高效的容錯(cuò)
  • RDD 中的數(shù)據(jù)即可以緩存在內(nèi)存中, 也可以緩存在磁盤(pán)中, 也可以緩存在外部存儲(chǔ)中
數(shù)據(jù)集
  • RDD 可以不保存具體數(shù)據(jù), 只保留創(chuàng)建自己的必備信息, 例如依賴和計(jì)算函數(shù)
  • RDD 也可以緩存起來(lái), 相當(dāng)于存儲(chǔ)具體數(shù)據(jù)

總結(jié): RDD 的 五大屬性

首先整理一下上面所提到的 RDD 所要實(shí)現(xiàn)的功能:
  1. RDD 有分區(qū)
  2. RDD 要可以通過(guò)依賴關(guān)系和計(jì)算函數(shù)進(jìn)行容錯(cuò)
  3. RDD 要針對(duì)數(shù)據(jù)本地性進(jìn)行優(yōu)化
  4. RDD 支持 MapReduce 形式的計(jì)算, 所以要能夠?qū)?shù)據(jù)進(jìn)行 Shuffled
對(duì)于 RDD 來(lái)說(shuō), 其中應(yīng)該有什么內(nèi)容呢? 如果站在 RDD 設(shè)計(jì)者的角度上, 這個(gè)類中, 至少需要什么屬性?
  • Partition List 分片列表, 記錄 RDD 的分片, 可以在創(chuàng)建 RDD 的時(shí)候指定分區(qū)數(shù)目, 也可以通過(guò)算子來(lái)生成新的 RDD 從而改變分區(qū)數(shù)目
  • Compute Function 為了實(shí)現(xiàn)容錯(cuò), 需要記錄 RDD 之間轉(zhuǎn)換所執(zhí)行的計(jì)算函數(shù)
  • RDD Dependencies RDD 之間的依賴關(guān)系, 要在 RDD 中記錄其上級(jí) RDD 是誰(shuí), 從而實(shí)現(xiàn)容錯(cuò)和計(jì)算
  • Partitioner 為了執(zhí)行 Shuffled 操作, 必須要有一個(gè)函數(shù)用來(lái)計(jì)算數(shù)據(jù)應(yīng)該發(fā)往哪個(gè)分區(qū)
  • Preferred Location 優(yōu)先位置, 為了實(shí)現(xiàn)數(shù)據(jù)本地性操作, 從而移動(dòng)計(jì)算而不是移動(dòng)存儲(chǔ), 需要記錄每個(gè) RDD 分區(qū)最好應(yīng)該放置在什么位置

2. RDD 的 算子

1.理解 RDD 的算子分類, 以及其特性
2.理解常見(jiàn)算子的使用

分類

RDD 中的算子從功能上分為兩大類

  1. Transformation(轉(zhuǎn)換) 它會(huì)在一個(gè)已經(jīng)存在的 RDD 上創(chuàng)建一個(gè)新的 RDD, 將舊的 RDD 的數(shù)據(jù)轉(zhuǎn)換為另外一種形式后放入新的 RDD
  2. Action(動(dòng)作) 執(zhí)行各個(gè)分區(qū)的計(jì)算任務(wù), 將的到的結(jié)果返回到 Driver 中

RDD 中可以存放各種類型的數(shù)據(jù), 那么對(duì)于不同類型的數(shù)據(jù), RDD 又可以分為三類

  • 針對(duì)基礎(chǔ)類型(例如 String)處理的普通算子
  • 針對(duì) Key-Value 數(shù)據(jù)處理的 byKey 算子
  • 針對(duì)數(shù)字類型數(shù)據(jù)處理的計(jì)算算子

特點(diǎn)

  • Spark 中所有的 Transformations 是 Lazy(惰性) 的, 它們不會(huì)立即執(zhí)行獲得結(jié)果. 相反, 它們只會(huì)記錄在數(shù)據(jù)集上要應(yīng)用的操作. 只有當(dāng)需要返回結(jié)果給 Driver 時(shí), 才會(huì)執(zhí)行這些操作, 通過(guò) DAGScheduler ,TaskScheduler 分發(fā)到集群中運(yùn)行, 這個(gè)特性叫做 惰性求值
  • 默認(rèn)情況下, 每一個(gè) Action 運(yùn)行的時(shí)候, 其所關(guān)聯(lián)的所有 Transformation RDD 都會(huì)重新計(jì)算, 但是也可以使用 presist 方法將 RDD 持久化到磁盤(pán)或者內(nèi)存中. 這個(gè)時(shí)候?yàn)榱讼麓慰梢愿斓脑L問(wèn), 會(huì)把數(shù)據(jù)保存到集群上.

2.1 Transformations 算子

map(T ? U)

  .map( num => num * 10 )
  .collect()
map

map
作用
  • 把 RDD 中的數(shù)據(jù) 一對(duì)一 的轉(zhuǎn)為另一種形式
簽名
def map[U: ClassTag](f: T ? U): RDD[U]
參數(shù)

f → Map 算子是 原RDD → 新RDD 的過(guò)程, 傳入函數(shù)的參數(shù)是原 RDD 數(shù)據(jù), 返回值是經(jīng)過(guò)函數(shù)轉(zhuǎn)換的新 RDD 的數(shù)據(jù)

注意點(diǎn)
  • Map 是一對(duì)一, 如果函數(shù)是 String → Array[String] 則新的 RDD 中每條數(shù)據(jù)就是一個(gè)數(shù)組

flatMap(T ? List[U])

sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
  .flatMap( line => line.split(" ") )
  .collect()
flatMap

flatMap
作用
  • FlatMap 算子和 Map 算子類似, 但是 FlatMap 是一對(duì)多
  • 就是說(shuō),我們的一個(gè)“hello lily" 字符串變成了 Hello ,lily 兩個(gè)了。和map 不同的是,map 只是在原先的上面進(jìn)行修改。
調(diào)用
def flatMap[U: ClassTag](f: T ? List[U]): RDD[U]
參數(shù)

f → 參數(shù)是原 RDD 數(shù)據(jù), 返回值是經(jīng)過(guò)函數(shù)轉(zhuǎn)換的新 RDD 的數(shù)據(jù), 需要注意的是返回值是一個(gè)集合, 集合中的數(shù)據(jù)會(huì)被展平后再放入新的 RDD

注意點(diǎn)
  • flatMap 其實(shí)是兩個(gè)操作, 是 map + flatten, 也就是先轉(zhuǎn)換, 后把轉(zhuǎn)換而來(lái)的 List 展開(kāi)
  • Spark 中并沒(méi)有直接展平 RDD 中數(shù)組的算子, 可以使用 flatMap 做這件事

filter(T ? Boolean)

sc.parallelize(Seq(1, 2, 3))
  .filter( value => value >= 3 )
  .collect()
filter

filter
作用
  • Filter 算子的主要作用是過(guò)濾掉不需要的內(nèi)容,也就是說(shuō)保留符合條件的內(nèi)容,為true ,則保留。

mapPartitions(List[T] ? List[U])

  • RDD[T] ? RDD[U] 和 map 類似, 但是針對(duì)整個(gè)分區(qū)的數(shù)據(jù)轉(zhuǎn)換

mapPartitionsWithIndex

  • 和 mapPartitions 類似, 只是在函數(shù)中增加了分區(qū)的 Index

mapValues

sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
  .mapValues( value => value * 10 )
  .collect()
mapvalues

mapvalues
作用
  • MapValues 只能作用于 Key-Value 型數(shù)據(jù), 和 Map 類似, 也是使用函數(shù)按照轉(zhuǎn)換數(shù)據(jù), 不同點(diǎn)是 MapValues 只轉(zhuǎn)換 Key-Value 中的 Value

sample(withReplacement, fraction, seed)

sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  .sample(withReplacement = true, 0.6, 2)
  .collect()
sample

sample
作用
  • Sample 算子可以從一個(gè)數(shù)據(jù)集中抽樣出來(lái)一部分, 常用作于減小數(shù)據(jù)集以保證運(yùn)行速度, 并且盡可能少規(guī)律的損失
參數(shù)
  • Sample 接受第一個(gè)參數(shù)為 withReplacement, 意為是否取樣以后是否還放回原數(shù)據(jù)集供下次使用, 簡(jiǎn)單的說(shuō), 如果這個(gè)參數(shù)的值為 true, 則抽樣出來(lái)的數(shù)據(jù)集中可能會(huì)有重復(fù)
  • Sample 接受第二個(gè)參數(shù)為 fraction, 意為抽樣的比例
  • Sample 接受第三個(gè)參數(shù)為 seed, 隨機(jī)數(shù)種子, 用于 Sample 內(nèi)部隨機(jī)生成下標(biāo), 一般不指定, 使用默認(rèn)值

union(other)并集

val rdd1 = sc.parallelize(Seq(1, 2, 3))
val rdd2 = sc.parallelize(Seq(4, 5, 6))
rdd1.union(rdd2)
  .collect()
union

union

intersection(other)

val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd2 = sc.parallelize(Seq(4, 5, 6, 7, 8))
rdd1.intersection(rdd2)
  .collect()
image.png

image.png
作用
  • Intersection 算子是一個(gè)集合操作, 用于求得 左側(cè)集合 和 右側(cè)集合 的交集, 換句話說(shuō), 就是左側(cè)集合和右側(cè)集合都有的元素, 并生成一個(gè)新的 RDD

subtract(other, numPartitions)(差集)

  • 可以設(shè)置分區(qū)數(shù)

distinct(numPartitions)(去重)

sc.parallelize(Seq(1, 1, 2, 2, 3))
  .distinct()
  .collect()
distinct

distinct
作用
  • Distinct 算子用于去重
注意點(diǎn)
  • Distinct 是一個(gè)需要 Shuffled 的操作
  • 本質(zhì)上 Distinct 就是一個(gè) reductByKey, 把重復(fù)的合并為一個(gè)

reduceByKey((V, V) ? V, numPartition)

sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
  .reduceByKey( (curr, agg) => curr + agg )
  .collect()
reduceByKey

reduceByKey
作用

首先按照 Key 分組生成一個(gè) Tuple, 然后針對(duì)每個(gè)組執(zhí)行 reduce 算子

調(diào)用
def reduceByKey(func: (V, V) ? V): RDD[(K, V)]
參數(shù)

func → 執(zhí)行數(shù)據(jù)處理的函數(shù), 傳入兩個(gè)參數(shù), 一個(gè)是當(dāng)前值, 一個(gè)是局部匯總, 這個(gè)函數(shù)需要有一個(gè)輸出, 輸出就是這個(gè) Key 的匯總結(jié)果

注意點(diǎn)

ReduceByKey 只能作用于 Key-Value 型數(shù)據(jù), Key-Value 型數(shù)據(jù)在當(dāng)前語(yǔ)境中特指 Tuple2
ReduceByKey 是一個(gè)需要 Shuffled 的操作
和其它的 Shuffled 相比, ReduceByKey是高效的, 因?yàn)轭愃?MapReduce 的, 在 Map 端有一個(gè) Cominer, 這樣 I/O 的數(shù)據(jù)便會(huì)減少

groupByKey()

sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
  .groupByKey()
  .collect()
groupByKey

groupByKey
作用
  • GroupByKey 算子的主要作用是按照 Key 分組, 和 ReduceByKey 有點(diǎn)類似, 但是 GroupByKey 并不求聚合, 只是列舉 Key 對(duì)應(yīng)的所有 Value
注意點(diǎn)
  • GroupByKey 是一個(gè) Shuffled
  • GroupByKey 和 ReduceByKey 不同, 因?yàn)樾枰信e Key 對(duì)應(yīng)的所有數(shù)據(jù), 所以無(wú)法在 Map 端做 Combine, 所以 GroupByKey 的性能并沒(méi)有 ReduceByKey 好

combineByKey()

val rdd = sc.parallelize(Seq(
  ("zhangsan", 99.0),
  ("zhangsan", 96.0),
  ("lisi", 97.0),
  ("lisi", 98.0),
  ("zhangsan", 97.0))
)

val combineRdd = rdd.combineByKey(
  score => (score, 1),
  (scoreCount: (Double, Int),newScore) => (scoreCount._1 + newScore, scoreCount._2 + 1),
  (scoreCount1: (Double, Int), scoreCount2: (Double, Int)) =>
    (scoreCount1._1 + scoreCount2._1, scoreCount1._2 + scoreCount2._2)
)

val meanRdd = combineRdd.map(score => (score._1, score._2._1 / score._2._2))

meanRdd.collect()
combineByKey
作用
  • 對(duì)數(shù)據(jù)集按照 Key 進(jìn)行聚合
調(diào)用
combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner],[serializer])
參數(shù)
  • createCombiner 將 Value 進(jìn)行初步轉(zhuǎn)換
  • mergeValue 在每個(gè)分區(qū)把上一步轉(zhuǎn)換的結(jié)果聚合
  • mergeCombiners 在所有分區(qū)上把每個(gè)分區(qū)的聚合結(jié)果聚合
  • partitioner 可選, 分區(qū)函數(shù)
  • mapSideCombiner 可選, 是否在 Map 端 Combine
  • serializer 序列化器
注意點(diǎn)
  • combineByKey 的要點(diǎn)就是三個(gè)函數(shù)的意義要理解
  • groupByKey, reduceByKey 的底層都是 combineByKey

aggregateByKey()

val rdd = sc.parallelize(Seq(("手機(jī)", 10.0), ("手機(jī)", 15.0), ("電腦", 20.0)))
val result = rdd.aggregateByKey(0.8)(
  seqOp = (zero, price) => price * zero,
  combOp = (curr, agg) => curr + agg
).collect()
println(result)
aggregateByKey
作用

聚合所有 Key 相同的 Value, 換句話說(shuō), 按照 Key 聚合 Value

調(diào)用

rdd.aggregateByKey(zeroValue)(seqOp, combOp)

參數(shù)

zeroValue 初始值
seqOp 轉(zhuǎn)換每一個(gè)值的函數(shù)
comboOp 將轉(zhuǎn)換過(guò)的值聚合的函數(shù)

注意點(diǎn) 為什么需要兩個(gè)函數(shù)?

aggregateByKey 運(yùn)行將一個(gè) RDD[(K, V)] 聚合為 RDD[(K, U)], 如果要做到這件事的話, 就需要先對(duì)數(shù)據(jù)做一次轉(zhuǎn)換, 將每條數(shù)據(jù)從 V 轉(zhuǎn)為 U, seqOp 就是干這件事的 當(dāng) seqOp 的事情結(jié)束以后, comboOp 把其結(jié)果聚合
和 reduceByKey 的區(qū)別

  • aggregateByKey 最終聚合結(jié)果的類型和傳入的初始值類型保持一致
  • reduceByKey 在集合中選取第一個(gè)值作為初始值, 并且聚合過(guò)的數(shù)據(jù)類型不能改變

foldByKey(zeroValue)((V, V) ? V)

sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
  .foldByKey(zeroValue = 10)( (curr, agg) => curr + agg )
  .collect()
foldByKey

foldByKey
作用

和 ReduceByKey 是一樣的, 都是按照 Key 做分組去求聚合, 但是 FoldByKey 的不同點(diǎn)在于可以指定初始值

調(diào)用

foldByKey(zeroValue)(func)

參數(shù)

zeroValue 初始值
func seqOp 和 combOp 相同, 都是這個(gè)參數(shù)

注意點(diǎn)

FoldByKey 是 AggregateByKey 的簡(jiǎn)化版本, seqOp 和 combOp 是同一個(gè)函數(shù)
FoldByKey 指定的初始值作用于每一個(gè) Value

join(other, numPartitions)

val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))
val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("a", 12)))

rdd1.join(rdd2).collect()
Join
作用

將兩個(gè) RDD 按照相同的 Key 進(jìn)行連接

調(diào)用
join(other, [partitioner or numPartitions])
參數(shù)

other 其它 RDD
partitioner or numPartitions 可選, 可以通過(guò)傳遞分區(qū)函數(shù)或者分區(qū)數(shù)量來(lái)改變分區(qū)

注意點(diǎn)
  • Join 有點(diǎn)類似于 SQL 中的內(nèi)連接, 只會(huì)再結(jié)果中包含能夠連接到的 Key
  • Join 的結(jié)果是一個(gè)笛卡爾積形式, 例如 "a", 1), ("a", 2 和 "a", 10), ("a", 11 的 Join 結(jié)果集是 "a", 1, 10), ("a", 1, 11), ("a", 2, 10), ("a", 2, 11

cogroup(other, numPartitions)

val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 5), ("b", 2), ("b", 6), ("c", 3), ("d", 2)))
val rdd2 = sc.parallelize(Seq(("a", 10), ("b", 1), ("d", 3)))
val rdd3 = sc.parallelize(Seq(("b", 10), ("a", 1)))

val result1 = rdd1.cogroup(rdd2).collect()
val result2 = rdd1.cogroup(rdd2, rdd3).collect()

/*
執(zhí)行結(jié)果:
Array(
  (d,(CompactBuffer(2),CompactBuffer(3))),
  (a,(CompactBuffer(1, 2, 5),CompactBuffer(10))),
  (b,(CompactBuffer(2, 6),CompactBuffer(1))),
  (c,(CompactBuffer(3),CompactBuffer()))
)
 */
println(result1)

/*
執(zhí)行結(jié)果:
Array(
  (d,(CompactBuffer(2),CompactBuffer(3),CompactBuffer())),
  (a,(CompactBuffer(1, 2, 5),CompactBuffer(10),CompactBuffer(1))),
  (b,(CompactBuffer(2, 6),CompactBuffer(1),Co...
 */
println(result2)
cogroup
作用

多個(gè) RDD 協(xié)同分組, 將多個(gè) RDD 中 Key 相同的 Value 分組

調(diào)用
cogroup(rdd1, rdd2, rdd3, [partitioner or numPartitions])
參數(shù)

rdd…? 最多可以傳三個(gè) RDD 進(jìn)去, 加上調(diào)用者, 可以為四個(gè) RDD 協(xié)同分組
partitioner or numPartitions 可選, 可以通過(guò)傳遞分區(qū)函數(shù)或者分區(qū)數(shù)來(lái)改變分區(qū)

注意點(diǎn)

對(duì) RDD1, RDD2, RDD3 進(jìn)行 cogroup, 結(jié)果中就一定會(huì)有三個(gè) List, 如果沒(méi)有 Value 則是空 List, 這一點(diǎn)類似于 SQL 的全連接, 返回所有結(jié)果, 即使沒(méi)有關(guān)聯(lián)上
CoGroup 是一個(gè)需要 Shuffled 的操作

cartesian(other)

(RDD[T], RDD[U]) ? RDD[(T, U)]
  • 生成兩個(gè) RDD 的笛卡爾積

sortBy(ascending, numPartitions)

val rdd1 = sc.parallelize(Seq(("a", 3), ("b", 2), ("c", 1)))
val sortByResult = rdd1.sortBy( item => item._2 ).collect()
val sortByKeyResult = rdd1.sortByKey().collect()

println(sortByResult)
println(sortByKeyResult)
作用

排序相關(guān)相關(guān)的算子有兩個(gè), 一個(gè)是 sortBy, 另外一個(gè)是 sortByKey

調(diào)用
sortBy(func, ascending, numPartitions)
參數(shù)

func 通過(guò)這個(gè)函數(shù)返回要排序的字段
ascending 是否升序
numPartitions 分區(qū)數(shù)

注意點(diǎn)

普通的 RDD 沒(méi)有 sortByKey, 只有 Key-Value 的 RDD 才有
sortBy 可以指定按照哪個(gè)字段來(lái)排序, sortByKey 直接按照 Key 來(lái)排序

partitionBy(partitioner)

  • 使用用傳入的 partitioner 重新分區(qū), 如果和當(dāng)前分區(qū)函數(shù)相同, 則忽略操作

coalesce(numPartitions)

  • 減少分區(qū)數(shù)
val rdd = sc.parallelize(Seq(("a", 3), ("b", 2), ("c", 1)))
val oldNum = rdd.partitions.length

val coalesceRdd = rdd.coalesce(4, shuffle = true)
val coalesceNum = coalesceRdd.partitions.length

val repartitionRdd = rdd.repartition(4)
val repartitionNum = repartitionRdd.partitions.length

print(oldNum, coalesceNum, repartitionNum)
作用

一般涉及到分區(qū)操作的算子常見(jiàn)的有兩個(gè), repartitioin 和 coalesce, 兩個(gè)算子都可以調(diào)大或者調(diào)小分區(qū)數(shù)量

調(diào)用
repartitioin(numPartitions)
coalesce(numPartitions, shuffle)
參數(shù)

numPartitions 新的分區(qū)數(shù)
shuffle 是否 shuffle, 如果新的分區(qū)數(shù)量比原分區(qū)數(shù)大, 必須 Shuffled, 否則重分區(qū)無(wú)效

注意點(diǎn)

repartition 和 coalesce 的不同就在于 coalesce 可以控制是否 Shuffle
repartition 是一個(gè) Shuffled 操作

repartition(numPartitions)

  • 重新分區(qū)

repartitionAndSortWithinPartitions

  • 重新分區(qū)的同時(shí)升序排序, 在 partitioner 中排序, 比先重分區(qū)再排序要效率高, 建議使用在需要分區(qū)后再排序的場(chǎng)景使用

常見(jiàn)的 Transformation 類型的 RDD

map
flatMap
filter
groupBy
reduceByKey

常見(jiàn)的 Action 類型的 RDD

collect
countByKey
reduce

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容